本教學演示了如何從Spring Kafka傳送和接收訊息。 首先建立一個Spring Kafka Producer,它能夠將訊息傳送到Kafka主題。 接下來建立一個Spring Kafka Consumer,它能夠收聽傳送給Kafka的訊息。使用適當的鍵/值序列化器和反序列化器來組態它們。 最後,使用簡單的Spring Boot應用程式演示應用程式。
要下載並安裝Apache Kafka,請閱讀此處的官方文件。本教學假定使用預設組態啟動伺服器,並且不更改任何伺服器埠。
注意:在使用 Kafka 之前,需要安裝好
Spring Kafka:2.1.4.RELEASE
Spring Boot:2.0.0.RELEASE
Apache Kafka:kafka_2.11-1.0.0
Maven:3.5
請參考以下專案結構來構建專案。
在這個專案中,使用Apache Maven來管理專案依賴項。確保以下依賴項存在於類路徑上。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.yiibai.spring.kafka</groupId>
<artifactId>producer-consumer</artifactId>
<version>1.0.0-SNAPSHOT</version>
<url>/20/238/8840.html-boot</url>
<name>Spring Kafka - ${project.artifactId}</name>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
</parent>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring-kafka.version>2.1.4.RELEASE</spring-kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>${spring-kafka.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
這個專案是從傳送訊息開始,使用KafkaTemplate
類來包裝Producer並提供高階操作以將資料傳送到Kafka主題。 提供非同步和同步方法,非同步方法返回Future
。
package com.yiibai.kafka.producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class Sender {
private static final Logger LOG = LoggerFactory.getLogger(Sender.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Value("${app.topic.foo}")
private String topic;
public void send(String message){
LOG.info("sending message='{}' to topic='{}'", message, topic);
kafkaTemplate.send(topic, message);
}
}
使用ProducerFactory
的實現來組態KafkaTemplate
,更具體地說是DefaultKafkaProducerFactory
。可以使用Map <String,Object>
初始化這個生產者工廠。使用從ProducerConfig
類中獲取鍵。
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
指定用於建立與Kafka群集的初始連線的主機/埠對列表。用戶端將使用所有伺服器,而不管此處指定哪些伺服器進行引導/此列表僅影響用於發現整套伺服器的初始主機。此列表應採用host1:port1,host2:port2,....
的形式。由於這些伺服器僅用於初始連線以發現完整的叢集成員資格(可能會動態更改),因此此列表不需要包含完整集 伺服器(但是,如果伺服器關閉,可能需要多個伺服器)。ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
指定用於實現org.apache.kafka.common.serialization.Serializer
介面的鍵的序列化程式類。ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
指定用於實現org.apache.kafka.common.serialization.Serializer
介面的值的序列化程式類。有關組態選項的完整列表,請檢視ProducerConfig類。
package com.yiibai.kafka.producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class SenderConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
接下來,將演示如何從Kafka主題中收聽訊息。 Receiver
類將使用Kafka主題訊息。建立一個Listen()
方法並使用@KafkaListener
注釋對其進行了注釋,該注釋將該方法標記為指定主題上的Kafka訊息偵聽器的目標。
package com.yiibai.kafka.consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
@Service
public class Receiver {
private static final Logger LOG = LoggerFactory.getLogger(Receiver.class);
@KafkaListener(topics = "${app.topic.foo}")
public void listen(@Payload String message) {
LOG.info("received message='{}'", message);
}
}
此機制需要在其中一個@Configuration
類和偵聽器容器工廠上使用@EnableKafka
注釋,該工廠用於組態基礎ConcurrentMessageListenerContainer
。使用SenderConfig
類中相同型別的鍵/值反序列化器。
ConsumerConfig.GROUP_ID_CONFIG
指定一個唯一字串,用於標識此使用者所屬的組。ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
指定當Kafka中沒有初始偏移量或伺服器上當前偏移量不再存在時要執行的操作(例如,因為該資料已被刪除):earliest
: 自動將偏移重置為最早的偏移量latest
: 自動將偏移重置為最新的偏移量none
: 如果沒有找到消費者組的先前偏移量,則向消費者丟擲異常anything else
: 向消費者丟擲異常。消費者使用消費者組名稱標記自己,並且發布到主題的每個記錄被傳遞到每個訂閱消費者組中的一個消費者範例。 消費者範例可以在單獨的進程中,也可以在不同的機器。
如果所有使用者範例具有相同的使用者組,則記錄將有效地在使用者範例上進行負載平衡。 如果所有消費者範例具有不同的消費者組,則每個記錄將被廣播到所有消費者進程。
有關組態選項的完整列表,請檢視ConsumerConfig類。
package com.yiibai.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class ReceiverConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
需要建立了一個application.yml 屬性檔案,該檔案位於src/main/resources 檔案夾中。 這些屬性通過spring boot在組態類中注入。
spring:
kafka:
bootstrap-servers: localhost:9092
app:
topic:
foo: foo.t
logging:
level:
root: ERROR
org.springframework.web: ERROR
com.memorynotfound: DEBUG
現在,編寫一個簡單的Spring Boot應用程式來演示應用程式。 為了使這個演示工作,需要前先在埠9092
上執行localhost的Kafka伺服器(Kafka的預設組態)。
package com.yiibai.kafka;
import com.yiibai.kafka.producer.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ProducerConsumerApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(ProducerConsumerApplication.class, args);
}
@Autowired
private Sender sender;
@Override
public void run(String... strings) throws Exception {
sender.send("Spring Kafka Producer and Consumer Example");
}
}
使用 Maven 命令構建專案:
mvn clean install
看到構建成功後,執行以下Java命令,執行Jar程式:
java -jar target\producer-consumer-1.0.0-SNAPSHOT.jar
當執行應用程式時,應該會得到類似以下的結果: