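In this tutorial, we demonstrate how to configure Spring Kafka with Spring Boot. Spring Boot auto-configures Spring Kafka with sensible defaults, and these defaults can be overridden in the application.yml properties file.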
Project setup
Spring Kafka: 2.1.4.RELEASE
Spring Boot: 2.0.0.RELEASE
Apache Kafka: kafka_2.11-1.0.0
Maven: 3.5
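Earlier tutorials showed how to create a Kafka consumer and producer by configuring each of them manually. In this example, we let Spring Boot configure them with sensible defaults.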
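Download and install Apache Kafka
To download and install Apache Kafka, read the official documentation ( https://kafka.apache.org/quickstart ). This tutorial assumes that the server is started with the default configuration and that the server port has not been changed.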
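Maven dependencies
This project uses Apache Maven to manage its dependencies. Make sure the following dependencies are on the classpath. The content of the pom.xml file is shown below: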
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.yiibai.spring.kafka</groupId>
    <artifactId>springboot-config</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <url>https://www.tw511.com</url>
    <description>Spring Kafka Spring Boot</description>
    <name>Spring Kafka - ${project.artifactId}</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-kafka.version>2.1.4.RELEASE</spring-kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>
        <!-- testing -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
        <defaultGoal>compile</defaultGoal>
    </build>
</project>
[Figure: directory structure of the whole project]
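Sending Spring Kafka messages with Spring Boot
Spring Boot auto-configures and initializes a KafkaTemplate based on the properties set in the application.yml properties file. Annotating the Sender class with @Service makes it eligible for automatic detection by the Spring container.
The code of Sender.java is shown below: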
package com.yiibai.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Sender {

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    // Auto-configured by Spring Boot from the spring.kafka.* properties
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Topic name, read from app.topic.foo in application.yml
    @Value("${app.topic.foo}")
    private String topic;

    public void send(String message) {
        LOG.info("sending message='{}' to topic='{}'", message, topic);
        // The send is asynchronous; the returned future is ignored here
        kafkaTemplate.send(topic, message);
    }
}
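Receiving Kafka messages with Spring Boot
The ConcurrentKafkaListenerContainerFactory and KafkaMessageListenerContainer beans are also auto-configured by Spring Boot. They can optionally be configured through the application.yml properties file.
When a method is annotated with @KafkaListener, Spring Kafka automatically creates a message listener container for it.
The code of Receiver.java is shown below: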
package com.yiibai.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class Receiver {

    private static final Logger LOG = LoggerFactory.getLogger(Receiver.class);

    // Listens on the topic named by app.topic.foo in application.yml
    @KafkaListener(topics = "${app.topic.foo}")
    public void receive(@Payload String message,
                        @Headers MessageHeaders headers) {
        LOG.info("received message='{}'", message);
        // Log every message header together with its value
        headers.keySet().forEach(key -> LOG.info("{}: {}", key, headers.get(key)));
    }
}
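Configuring the application with application.yml
Spring Boot tries to auto-configure the application based on the dependencies declared in the pom.xml file and sets sensible defaults. No Consumer, Producer, or KafkaTemplate bean has been configured here; Spring Boot auto-configures them with its own defaults. These values can be overridden in the application.yml properties file. More information is available in the Spring Boot Kafka properties documentation.
An application.yml properties file is also created in the src/main/resources folder. Spring Boot injects these properties into the configuration classes.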
spring:
  kafka:
    consumer:
      group-id: foo
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

app:
  topic:
    foo: foo.t

logging:
  level:
    root: WARN
    org.springframework.web: INFO
    com.yiibai: DEBUG
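The override behaviour described above can be pictured as a simple map merge: start from the defaults, then let the entries from application.yml win. The sketch below is only a simplified illustration of that idea and is not how Spring Boot actually binds properties. The class name EffectiveKafkaProperties and its methods are hypothetical, and the two default values are assumptions chosen for illustration (localhost:9092 is the broker address this tutorial relies on, and latest is the Kafka consumer's own default for auto.offset.reset).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: illustrates "defaults first, overrides win".
class EffectiveKafkaProperties {

    // Copies the defaults, then applies the overrides on top of them.
    static Map<String, String> merge(Map<String, String> defaults,
                                     Map<String, String> overrides) {
        Map<String, String> effective = new LinkedHashMap<>(defaults);
        effective.putAll(overrides);
        return effective;
    }

    static Map<String, String> effective() {
        // Assumed defaults, used here for illustration only
        Map<String, String> defaults = new LinkedHashMap<>();
        defaults.put("spring.kafka.bootstrap-servers", "localhost:9092");
        defaults.put("spring.kafka.consumer.auto-offset-reset", "latest");

        // Values taken from the application.yml shown above
        Map<String, String> overrides = new LinkedHashMap<>();
        overrides.put("spring.kafka.consumer.group-id", "foo");
        overrides.put("spring.kafka.consumer.auto-offset-reset", "earliest");

        return merge(defaults, overrides);
    }

    public static void main(String[] args) {
        effective().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

In this sketch the broker address keeps its default because application.yml does not mention it, while auto-offset-reset takes the value from the file.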
Running the application
Before running this project, ZooKeeper and Kafka must be running, as shown below.
Start the ZooKeeper service:
D:\software\kafka_2.12-1.0.1\bin\windows> zookeeper-server-start.bat D:\software\kafka_2.12-1.0.1\config\zookeeper.properties
Start the Kafka service:
D:\software\kafka_2.12-1.0.1\bin\windows> kafka-server-start.bat D:\software\kafka_2.12-1.0.1\config\server.properties
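Once both services are started, it can be useful to confirm that something is accepting connections on the broker port before launching the application. The following is a minimal sketch using only the JDK; the class name KafkaPortCheck and the method isReachable are hypothetical, and localhost:9092 is assumed because the tutorial uses the default server configuration. It only checks that a TCP connection can be opened, not that the listener is really a Kafka broker.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: TCP-level reachability check for the broker port.
class KafkaPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed default broker address for this tutorial
        boolean up = isReachable("localhost", 9092, 1000);
        System.out.println(up
                ? "Port 9092 is accepting connections"
                : "Nothing is listening on localhost:9092");
    }
}
```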
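Finally, a simple Spring Boot application demonstrates the setup. For this demo to work, a Kafka server must be running on localhost at port 9092, which is Kafka's default configuration.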
package com.yiibai.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringKafkaApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaApplication.class, args);
    }

    @Autowired
    private Sender sender;

    // Runs once after the application context has started
    @Override
    public void run(String... strings) throws Exception {
        sender.send("Spring Kafka and Spring Boot Configuration Example");
    }
}
Example
When the application is run, output similar to the following should appear.
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.0.0.RELEASE)
2018-03-14 11:22:25.177 INFO 2892 --- [ main] com.yiibai.kafka.SpringKafkaApplication : Starting SpringKafkaApplication on MY-PC with PID 2892 (F:\worksp\spring-kafka\springboot-config\target\classes started by Administrator in F:\worksp\spring-kafka\springboot-config)
2018-03-14 11:22:25.181 DEBUG 2892 --- [ main] com.yiibai.kafka.SpringKafkaApplication : Running with Spring Boot v2.0.0.RELEASE, Spring v5.0.4.RELEASE
2018-03-14 11:22:25.182 INFO 2892 --- [ main] com.yiibai.kafka.SpringKafkaApplication : No active profile set, falling back to default profiles: default
2018-03-14 11:22:26.869 INFO 2892 --- [ main] com.yiibai.kafka.SpringKafkaApplication : Started SpringKafkaApplication in 2.208 seconds (JVM running for 2.751)
2018-03-14 11:22:26.871 INFO 2892 --- [ main] com.yiibai.kafka.Sender : sending message='Spring Kafka and Spring Boot Configuration Example' to topic='foo.t'
... ...
2018-03-14 11:22:36.035 WARN 2892 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-1, groupId=foo] Error while fetching metadata with correlation id 10 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:36.156 WARN 2892 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 7 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:36.163 WARN 2892 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-1, groupId=foo] Error while fetching metadata with correlation id 12 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:36.433 WARN 2892 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 8 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:36.436 WARN 2892 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-1, groupId=foo] Error while fetching metadata with correlation id 14 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:38.559 WARN 2892 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-1, groupId=foo] Error while fetching metadata with correlation id 16 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:38.559 WARN 2892 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 9 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:40.028 WARN 2892 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 10 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:56.203 INFO 2892 --- [ntainer#0-0-C-1] com.yiibai.kafka.Receiver : received message='Spring Kafka and Spring Boot Configuration Example'
2018-03-14 11:22:56.205 INFO 2892 --- [ntainer#0-0-C-1] com.yiibai.kafka.Receiver : kafka_offset: 0
2018-03-14 11:22:56.206 INFO 2892 --- [ntainer#0-0-C-1] com.yiibai.kafka.Receiver : kafka_nativeHeaders: RecordHeaders(headers = [], isReadOnly = false)
2018-03-14 11:22:56.206 INFO 2892 --- [ntainer#0-0-C-1] com.yiibai.kafka.Receiver : kafka_consumer: org.apache.kafka.clients.consumer.KafkaConsumer@68cba188
2018-03-14 11:22:56.206 INFO 2892 --- [ntainer#0-0-C-1] com.yiibai.kafka.Receiver : kafka_timestampType: CREATE_TIME
2018-03-14 11:22:56.206 INFO 2892 --- [ntainer#0-0-C-1] com.yiibai.kafka.Receiver : kafka_receivedMessageKey: null
2018-03-14 11:22:56.207 INFO 2892 --- [ntainer#0-0-C-1] com.yiibai.kafka.Receiver : kafka_receivedPartitionId: 0
2018-03-14 11:22:56.207 INFO 2892 --- [ntainer#0-0-C-1] com.yiibai.kafka.Receiver : kafka_receivedTopic: foo.t
2018-03-14 11:22:56.207 INFO 2892 --- [ntainer#0-0-C-1] com.yiibai.kafka.Receiver : kafka_receivedTimestamp: 1520997760772
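The kafka_receivedTimestamp header in the output above is a record timestamp in milliseconds since the Unix epoch. As a small worked example, the sketch below converts the logged value with java.time. The class name ReceivedTimestamp and its methods are hypothetical, and the UTC+8 offset is an assumption inferred from the log lines, whose local clock reads 11:22 at the time the message was produced.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical helper: converts a Kafka record timestamp to readable form.
class ReceivedTimestamp {

    // Interprets the value as milliseconds since 1970-01-01T00:00:00Z.
    static Instant toInstant(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    // Same instant, shown as local date-time at a fixed UTC offset.
    static LocalDateTime atOffsetHours(long epochMillis, int hours) {
        return toInstant(epochMillis).atOffset(ZoneOffset.ofHours(hours)).toLocalDateTime();
    }

    public static void main(String[] args) {
        long receivedTimestamp = 1520997760772L; // value from the log above
        System.out.println(toInstant(receivedTimestamp));        // 2018-03-14T03:22:40.772Z
        System.out.println(atOffsetHours(receivedTimestamp, 8)); // 2018-03-14T11:22:40.772
    }
}
```

Under that assumed offset, the result falls between the last producer warning at 11:22:40.028 and the receive log at 11:22:56, which is consistent with a CREATE_TIME timestamp set when the record was sent.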