Spring消費者和生產者


本教學演示了如何傳送和接收來自Spring Kafka的訊息。 首先建立一個能夠傳送訊息給Kafka主題的Spring Kafka Producer。 接下來,我們建立一個Spring Kafka Consumer,它可以收聽傳送給Kafka主題的訊息。使用適當的鍵/值序列化器和解串器來組態它們。 最後用一個簡單的Spring Boot應用程式演示應用程式。

下載並安裝Apache Kafka

要下載並安裝Apache Kafka,請閱讀官方文件( https://kafka.apache.org/quickstart )。 本教學假設伺服器使用預設組態啟動,並且沒有更改伺服器埠。

Maven依賴

這個專案中,使用Apache Maven來管理專案依賴關係。 確保以下依賴關係在類路徑中。pom.xml 檔案的內容如下所示 -

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.yiibai.spring.kafka</groupId>
    <artifactId>producer-consumer</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <url>https://www.tw511.com/kafka</url>
    <name>Spring Kafka - ${project.artifactId}</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-kafka.version>2.1.4.RELEASE</spring-kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
        <defaultGoal>compile</defaultGoal>
    </build>

</project>

Spring Kafka傳送訊息到主題

使用ProducerKafkaTemplate類傳送訊息,並提供將資料傳送到Kafka主題的高階操作。 提供非同步和同步方法,非同步方法返回Future

package com.yiibai.kafka.producer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Sender {

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${app.topic.foo}")
    private String topic;

    public void send(String message){
        LOG.info("sending message='{}' to topic='{}'", message, topic);
        kafkaTemplate.send(topic, message);
    }
}

為了能成功地傳送訊息給Kafka主題,我們需要組態KafkaTemplate。 此組態由SenderConfig類處理。

使用ProducerFactory的實現來組態KafkaTemplate,更具體地說是使用DefaultKafkaProducerFactory。可以使用Map <String,Object>來初始化這個生產者工廠,從ProducerConfig類獲取的鍵。

  • ProducerConfig.BOOTSTRAP_SERVERS_CONFIG指定用於建立到Kafka叢集的初始連線的主機/埠對列表。 用戶端將使用所有伺服器,而不管這裡指定哪些伺服器用於引導/此列表僅影響用於發現全套伺服器的初始主機。 此列表應採用host1:port1host2:port2...的形式。由於這些伺服器僅用於初始連線以發現完整叢集成員資格(可能會動態更改),因此此列表不需包含完整集合 的伺服器(不過,如果伺服器停機,可能需要多個伺服器)。

  • ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG指定實現org.apache.kafka.common.serialization.Serializer介面的鍵的序列化程式類。

  • ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG指定實現org.apache.kafka.common.serialization.Serializer介面的值的序列化程式類。

有關組態選項的完整列表,請檢視ProducerConfig類(https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/ProducerConfig.html )。

package com.yiibai.kafka.producer;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class SenderConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

Spring Kafka監聽來自主題的訊息

接下來,我們將演示如何監聽來自Kafka主題的訊息。 Receiver類將使用來自Kafka主題的訊息。 建立Listen()方法並使用@KafkaListener註解對其進行了注釋,該注釋將方法標記為指定主題上的Kafka訊息偵聽器的目標。

package com.yiibai.kafka.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class Receiver {

    private static final Logger LOG = LoggerFactory.getLogger(Receiver.class);

    @KafkaListener(topics = "${app.topic.foo}")
    public void listen(@Payload String message) {
        LOG.info("received message='{}'", message);
    }

}

該機制需要在@Configuration類和偵聽器容器工廠之一上使用@EnableKafka註解,該工廠用於組態底層ConcurrentMessageListenerContainer

使用SenderConfig類中使用的相同型別的鍵/值反序列化器是非常重要的。

  • ConsumerConfig.GROUP_ID_CONFIG指定一個唯一的字串,標識此使用者所屬的使用者組。
  • ConsumerConfig.AUTO_OFFSET_RESET_CONFIG指定在Kafka中沒有初始偏移量或當前偏移量不再存在於伺服器上(例如,因為該資料已被刪除)時要執行的操作:
    • earliest: 自動將偏移重置為最早的偏移量
    • latest: 自動將偏移量重置為最新的偏移量
    • none: 如果未找到消費者組的前一個偏移量,則向消費者丟擲異常
    • anything else: 向消費者丟擲異常。

消費者用消費者組名稱標記自己,並且發布到主題的每個記錄都被傳送到每個訂閱消費者組中的一個消費者範例。 消費者範例可以在單獨的進程中或在單獨的機器上。

如果所有消費者範例具有相同的消費者組,則記錄將有效地在消費者範例上進行負載均衡。 如果所有消費者範例具有不同的消費者組,則每個記錄將被廣播到所有消費者進程。

有關組態選項的完整列表,請檢視ConsumerConfig類(https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html )。

package com.yiibai.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class ReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

使用application.yml組態應用程式

建立一個在src/main/resources檔案夾中的application.yml屬性檔案。 這些屬性通過spring引導注入到組態類中。

spring:
  kafka:
    bootstrap-servers: localhost:9092

app:
  topic:
    foo: foo.t

logging:
  level:
    root: ERROR
    org.springframework.web: ERROR
    com.yiibai: DEBUG

執行應用程式

最後,編寫一個簡單的Spring Boot應用程式來演示應用程式。 為了使這個演示工作,需要在埠9092上執行的本地主機上的Kafka伺服器,這是Kafka的預設組態。

在執行這個專案程式之前,需要執行 zookeeper 和 kafka ,如下所示 -

啟動zookeeper服務 -

D:\software\kafka_2.12-1.0.1\bin\windows> zookeeper-server-start.bat D:\software\kafka_2.12-1.0.1\config\zookeeper.properties

啟動kafka服務 -

D:\software\kafka_2.12-1.0.1\bin\windows> kafka-server-start.bat D:\software\kafka_2.12-1.0.1\config\server.properties

應用程式的實現 -

package com.yiibai.kafka;

import com.yiibai.kafka.producer.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ProducerConsumerApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(ProducerConsumerApplication.class, args);
    }

    @Autowired
    private Sender sender;

    @Override
    public void run(String... strings) throws Exception {
        sender.send("Spring Kafka Producer and Consumer Example");
    }
}

當我們執行應用程式時,應該會得到類似下面的輸出。


  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.0.0.RELEASE)

2018-03-14 14:40:41.454  INFO 9740 --- [           main] c.y.kafka.ProducerConsumerApplication    : Starting ProducerConsumerApplication on MY-PC with PID 9740 (F:\worksp\spring-kafka\producer-consumer\target\classes started by Administrator in F:\worksp\spring-kafka\producer-consumer)
2018-03-14 14:40:41.458 DEBUG 9740 --- [           main] c.y.kafka.ProducerConsumerApplication    : Running with Spring Boot v2.0.0.RELEASE, Spring v5.0.4.RELEASE
2018-03-14 14:40:41.458  INFO 9740 --- [           main] c.y.kafka.ProducerConsumerApplication    : No active profile set, falling back to default profiles: default
2018-03-14 14:40:47.512  INFO 9740 --- [           main] c.y.kafka.ProducerConsumerApplication    : Started ProducerConsumerApplication in 6.567 seconds (JVM running for 7.084)
2018-03-14 14:40:47.514  INFO 9740 --- [           main] com.yiibai.kafka.producer.Sender         : sending message='Spring Kafka Producer and Consumer Example' to topic='foo.t'
2018-03-14 14:40:49.413  INFO 9740 --- [ntainer#0-0-C-1] com.yiibai.kafka.consumer.Receiver       : received message='Spring Kafka Producer and Consumer Example'