Kafka詳解

2022-10-14 06:01:15

Kafka介紹

  Kafka是最初由Linkedin公司開發,是一個分散式、支援分割區的(partition)、多副本的(replica),基於zookeeper協調的分散式訊息系統,它的最大的特性就是可以實時的處理大量資料以滿足各種需求場景:比如基於hadoop的批次處理系統、低延遲的實時系統、Storm/Spark流式處理引擎,web/nginx紀錄檔、存取紀錄檔,訊息服務等等,用scala語言編寫,Linkedin於2010年貢獻給了Apache基金會併成為頂級開源專案。

  實際上算作是分散式的流處理平臺,具備訊息中間間的功能,在巨量資料領域作為流計算的平臺,也會做訊息分發。

 

Kafka常見的使用場景

  【1】紀錄檔收集:一個公司可以用Kafka收集各種服務的log,通過kafka以統一介面服務的方式開放給各種consumer,例如hadoop、Hbase、Solr等。

  【2】訊息系統:解耦和生產者和消費者、快取訊息等。

  【3】使用者活動跟蹤:Kafka經常被用來記錄web使用者或者app使用者的各種活動,如瀏覽網頁、搜尋、點選等活動,這些活動資訊被各個伺服器釋出到kafka的topic中,然後訂閱者通過訂閱這些topic來做實時的監控分析,或者裝載到hadoop、資料倉儲中做離線分析和挖掘。

  【4】運營指標:Kafka也經常用來記錄運營監控資料。包括收集各種分散式應用的資料,生產各種操作的集中反饋,比如報警和報告。

 

Kafka基本概念

  【1】kafka是一個分散式的,分割區的訊息(官方稱之為commit log)服務。它提供一個訊息系統應該具備的功能,但是確有著獨特的設計。可以這樣來說,Kafka借鑑了JMS規範的思想,但是確並沒有完全遵循JMS規範。

  【2】基礎的訊息(Message)相關術語:

名稱
解釋
Broker
訊息中介軟體處理節點,一個Kafka節點就是一個broker,一個或者多個Broker可以組成一個Kafka叢集
Topic
Kafka根據topic對訊息進行歸類,釋出到Kafka叢集的每條訊息都需要指定一個topic
Producer
訊息生產者,向Broker傳送訊息的使用者端
Consumer
訊息消費者,從Broker讀取訊息的使用者端
ConsumerGroup
每個Consumer屬於一個特定的Consumer Group,一條訊息可以被多個不同的Consumer Group消費,但是一個Consumer Group中只能有一個Consumer能夠消費該訊息
Partition
物理上的概念,一個topic可以分為多個partition,每個partition內部訊息是有序的

  【3】從宏觀層面上看,producer通過網路傳送訊息到Kafka叢集,然後consumer來進行消費,如下圖:

       

  【4】伺服器端(brokers)和使用者端(producer、consumer)之間通訊通過TCP協定來完成。

 

kafka基本使用(原生API)

建立主題 

  【1】建立一個名字為「test」的Topic,這個topic只有一個partition,並且備份因子也設定為1:

bin/kafka-topics.sh --create --zookeeper 192.168.65.60:2181 --replication-factor 1 --partitions 1 --topic test

  【2】通過以下命令來檢視kafka中目前存在的topic

bin/kafka-topics.sh --list --zookeeper 192.168.65.60:2181

  【3】除了通過手工的方式建立Topic,當producer釋出一個訊息到某個指定的Topic,這個Topic如果不存在,就自動建立。

  【4】刪除主題

bin/kafka-topics.sh --delete --topic test --zookeeper 192.168.65.60:2181

 

傳送訊息

  【1】kafka自帶了一個producer命令使用者端,可以從本地檔案中讀取內容,或者我們也可以以命令列中直接輸入內容,並將這些內容以訊息的形式傳送到kafka叢集中。在預設情況下,每一個行會被當做成一個獨立的訊息。

  【2】執行釋出訊息的指令碼,然後在命令中輸入要傳送的訊息的內容:

//指定往哪個broker(也就是伺服器)上發訊息
bin/kafka-console-producer.sh --broker-list 192.168.65.60:9092 --topic test >this is a msg >this is a another msg

 

消費訊息

  【1】對於consumer,kafka同樣也攜帶了一個命令列使用者端,會將獲取到內容在命令中進行輸出,預設是消費最新的訊息:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.65.60:9092 --topic test   

  【2】想要消費之前的訊息可以通過--from-beginning引數指定,如下命令:

//這裡便凸顯了與傳統訊息中介軟體的不同,消費完,訊息依舊保留(預設保留在磁碟一週)
bin/kafka-console-consumer.sh --bootstrap-server 192.168.65.60:9092 --from-beginning --topic test

  【3】通過不同的終端視窗來執行以上的命令,你將會看到在producer終端輸入的內容,很快就會在consumer的終端視窗上顯示出來。

  【4】所有的命令都有一些附加的選項;當我們不攜帶任何引數執行命令的時候,將會顯示出這個命令的詳細用法

執行bin/kafka-console-consumer.sh 命令顯示所有的可選引數

 

消費訊息型別分析

消費多主題

bin/kafka-console-consumer.sh --bootstrap-server 192.168.65.60:9092 --whitelist "test|test-2"

單播消費

  【1】一條訊息只能被某一個消費者消費的模式,類似queue模式,只需讓所有消費者在同一個消費組裡即可

  【2】分別在兩個使用者端執行如下消費命令,然後往主題裡傳送訊息,結果只有一個使用者端能收到訊息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.65.60:9092  --consumer-property group.id=testGroup --topic test

多播消費

  【1】一條訊息能被多個消費者消費的模式,類似publish-subscribe模式費,針對Kafka同一條訊息只能被同一個消費組下的某一個消費者消費的特性,要實現多播只要保證這些消費者屬於不同的消費組即可。我們再增加一個消費者,該消費者屬於testGroup-2消費組,結果兩個使用者端都能收到訊息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.65.60:9092 --consumer-property group.id=testGroup-2 --topic test 

檢視消費組名

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.65.60:9092 --list 

檢視消費組的消費偏移量

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.65.60:9092 --describe --group testGroup

//current-offset:當前消費組的已消費偏移量
//log-end-offset:主題對應分割區訊息的結束偏移量(HW)
//lag:當前消費組未消費的訊息數

 

主題Topic和訊息紀錄檔Log詳解

  【1】可以理解Topic是一個類別的名稱,同類訊息傳送到同一個Topic下面。對於每一個Topic,下面可以有多個分割區(Partition)紀錄檔檔案:【分散式儲存的思想】

                    

  【2】Partition是一個有序的message序列,這些message按順序新增到一個叫做commit log的檔案中。每個partition中的訊息都有一個唯一的編號,稱之為offset,用來唯一標示某個分割區中的message。 

  【3】每個partition,都對應一個commit log檔案。一個partition中的message的offset都是唯一的,但是不同的partition中的message的offset可能是相同的。

  【4】kafka一般不會刪除訊息,不管這些訊息有沒有被消費。只會根據設定的紀錄檔保留時間(log.retention.hours)確認訊息多久被刪除,預設保留最近一週的紀錄檔訊息。kafka的效能與保留的訊息資料量大小沒有關係,因此儲存大量的資料訊息紀錄檔資訊不會有什麼影響。

  【5】每個consumer是基於自己在commit log中的消費進度(offset)來進行工作的。在kafka中,消費offset由consumer自己來維護;一般情況下我們按照順序逐條消費commit log中的訊息,當然我可以通過指定offset來重複消費某些訊息,或者跳過某些訊息。

  【6】這意味kafka中的consumer對叢集的影響是非常小的,新增一個或者減少一個consumer,對於叢集或者其他consumer來說,都是沒有影響的,因為每個consumer維護各自的消費offset。

 

建立多個分割區的主題:

bin/kafka-topics.sh --create --zookeeper 192.168.65.60:2181 --replication-factor 1 --partitions 2 --topic test1

檢視下topic的情況

bin/kafka-topics.sh --describe --zookeeper 192.168.65.60:2181 --topic test1

 

 以下是輸出內容的解釋,第一行是所有分割區的概要資訊,之後的每一行表示每一個partition的資訊。

leader節點負責給定partition的所有讀寫請求。
replicas 表示某個partition在哪幾個broker上存在備份。不管這個幾點是不是」leader「,甚至這個節點掛了,也會列出。
isr 是replicas的一個子集,它只列出當前還存活著的,並且已同步備份了該partition的節點。

 

我們可以執行相同的命令檢視之前建立的名稱為」test「的topic
bin/kafka-topics.sh --describe --zookeeper 192.168.65.60:2181 --topic test
之前設定了topic的partition數量為1,備份因子為1,因此顯示就如上所示了。
可以進入kafka的資料檔案儲存目錄檢視test和test1主題的訊息紀錄檔檔案:
訊息紀錄檔檔案主要存放在分割區資料夾裡的以log結尾的紀錄檔檔案裡,如下是test1主題對應的分割區0的訊息紀錄檔:

 

當然我們也可以通過如下命令增加topic的分割區數量(目前kafka不支援減少分割區):

bin/kafka-topics.sh -alter --partitions 3 --zookeeper 192.168.65.60:2181 --topic test

 

理解Topic,Partition和Broker

  【1】一個topic,代表邏輯上的一個業務資料集,比如按資料庫裡不同表的資料操作訊息區分放入不同topic,訂單相關操作訊息放入訂單topic,使用者相關操作訊息放入使用者topic,對於大型網站來說,後端資料都是海量的,訂單訊息很可能是非常巨量的,比如有幾百個G甚至達到TB級別,如果把這麼多資料都放在一臺機器上可定會有容量限制問題,那麼就可以在topic內部劃分多個partition來分片儲存資料,不同的partition可以位於不同的機器上,每臺機器上都執行一個Kafka的程序Broker。

為什麼要對Topic下資料進行分割區儲存?

  【1】commit log檔案會受到所在機器的檔案系統大小的限制,分割區之後可以將不同的分割區放在不同的機器上,相當於對資料做了分散式儲存理論上一個topic可以處理任意數量的資料

  【2】為了提高並行度

 

叢集消費

  【1】log的partitions分佈在kafka叢集中不同的broker上,每個broker可以請求備份其他broker上partition上的資料。kafka叢集支援設定一個partition備份的數量。

  【2】針對每個partition,都有一個broker起到「leader」的作用,0個或多個其他的broker作為「follwers」的作用。leader處理所有的針對這個partition的讀寫請求,而followers被動複製leader的結果,不提供讀寫(主要是為了保證多副本資料與消費的一致性)。如果這個leader失效了,其中的一個follower將會自動的變成新的leader。

Producers

  【1】生產者將訊息傳送到topic中去,同時負責選擇將message傳送到topic的哪一個partition中。通過round-robin做簡單的負載均衡。也可以根據訊息中的某一個關鍵字來進行區分。通常第二種方式使用的更多。

Consumers

  【1】傳統的訊息傳遞模式有2種:佇列( queue) 和(publish-subscribe)

queue模式:多個consumer從伺服器中讀取資料,訊息只會到達一個consumer。
publish-subscribe模式:訊息會被廣播給所有的consumer。

  【2】Kafka基於這2種模式提供了一種consumer的抽象概念:consumer group。

queue模式:所有的consumer都位於同一個consumer group 下。
publish-subscribe模式:所有的consumer都有著自己唯一的consumer group。

  

  上圖說明:由2個broker組成的kafka叢集,某個主題總共有4個partition(P0-P3),分別位於不同的broker上。這個叢集由2個Consumer Group消費, A有2個consumer instances ,B有4個。

  通常一個topic會有幾個consumer group,每個consumer group都是一個邏輯上的訂閱者( logical subscriber )。每個consumer group由多個consumer instance組成,從而達到可延伸和容災的功能。

 

消費順序

  【1】一個partition同一個時刻在一個consumer group中只能有一個consumer instance在消費,從而保證消費順序。

  【2】consumer group中的consumer instance的數量不能比一個Topic中的partition的數量多,否則,多出來的consumer消費不到訊息

  【3】Kafka只在partition的範圍內保證訊息消費的區域性順序性,不能在同一個topic中的多個partition中保證總的消費順序性。

  【4】如果有在總體上保證消費順序的需求,那麼我們可以通過將topic的partition數量設定為1,將consumer group中的consumer instance數量也設定為1,但是這樣會影響效能,所以kafka的順序消費很少用。

 

Java使用者端存取Kafka

引入maven依賴

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>2.4.1</version>
</dependency>

訊息傳送端程式碼

public class MsgProducer {
    private final static String TOPIC_NAME = "my-replicated-topic";

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        /* *********************************設定部分******************************************************/
        Properties props = new Properties();
        //叢集架構存取叢集,防止單節點故障發不出去
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");
         /*
         發出訊息持久化機制引數
        (1)acks=0: 表示producer不需要等待任何broker確認收到訊息的回覆,就可以繼續傳送下一條訊息。效能最高,但是最容易丟訊息。(海量資料紀錄檔的話推薦這個,丟些訊息其實並不影響)
        (2)acks=1: 至少要等待leader已經成功將資料寫入本地log,但是不需要等待所有follower是否成功寫入。就可以繼續傳送下一條訊息。這種情況下,如果follower沒有成功備份資料,而此時leader又掛掉,則訊息會丟失。
        (一般場景都可以用在這個,適中,也是預設值) (3)acks=-1或all: 需要等待 min.insync.replicas(預設為1,推薦設定大於等於2,即多少個節點寫入成功即可) 這個引數設定的副本個數都成功寫入紀錄檔,這種策略 會保證只要有一個備份存活就不會丟失資料。這是最強的資料保證。一般除非是金融級別,或跟錢打交道的場景才會使用這種設定。(金融場景,對訊息十分敏感,不允許丟訊息)
*/ props.put(ProducerConfig.ACKS_CONFIG, "1"); /* 傳送失敗會重試,重試次數設定,預設重試間隔100ms,重試能保證訊息傳送的可靠性,但是也可能造成訊息重複傳送,比如網路抖動,所以需要在接收者那邊做好訊息接收的冪等性處理。 */ props.put(ProducerConfig.RETRIES_CONFIG, 3); //重試間隔設定 props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
     //因為kafka並不是採取,你寫一條,我就發一條的操作,而是在本機申請了,一塊緩衝記憶體。你寫入一條,優先是寫到了本地緩衝記憶體中
     //然後使用者端還會有一個執行緒不斷的從這個本地緩衝記憶體中拿資料放入,傳送緩衝區(batch緩衝區【16KB】),拉滿就會傳送。(這也是效能為什麼會高的原因之一,還會壓縮資料)
//設定傳送訊息的本地緩衝區,如果設定了該緩衝區,訊息會先傳送到本地緩衝區,可以提高訊息傳送效能,預設值是33554432,即32MB props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); /* kafka本地執行緒會從緩衝區取資料,批次傳送到broker, 設定批次傳送訊息的大小,預設值是16384,即16kb,就是說一個batch滿了16kb就傳送出去 */ props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); /* 預設值是0,意思就是訊息必須立即被傳送,但這樣會影響效能 一般設定10毫秒左右,就是說這個訊息傳送完後會進入原生的一個batch,如果10毫秒內,這個batch滿了16kb就會隨batch一起被傳送出去 如果10毫秒內,batch沒滿,那麼也必須把訊息傳送出去,不能讓訊息的傳送延遲時間太長 */ props.put(ProducerConfig.LINGER_MS_CONFIG, 10); //把傳送的key從字串序列化為位元組陣列 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //把傳送訊息value從字串序列化為位元組陣列 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); Producer<String, String> producer = new KafkaProducer<String, String>(props); /* *********************************使用部分******************************************************/ int msgNum = 5; final CountDownLatch countDownLatch = new CountDownLatch(msgNum); //用於非同步的展示 for (int i = 1; i <= msgNum; i++) { Order order = new Order(i, 100 + i, 1, 1000.00); /* *********************************指定分割區與不指定分割區*****************************************/ //指定傳送分割區 /*ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME , 0, order.getOrderId().toString(), JSON.toJSONString(order));*/ //未指定傳送分割區,具體傳送的分割區計算公式:hash(key)%partitionNum ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME , order.getOrderId().toString(), JSON.toJSONString(order)); /* *********************************同步傳送與非同步傳送(傳送完後面還有邏輯優先非同步)*****************************************/ //等待訊息傳送成功的同步阻塞方法 RecordMetadata metadata = producer.send(producerRecord).get(); System.out.println("同步方式傳送訊息結果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset()); //非同步回撥方式傳送訊息 /*producer.send(producerRecord, new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { System.err.println("傳送訊息失敗:" + exception.getStackTrace()); } if (metadata != null) { System.out.println("非同步方式傳送訊息結果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset()); } countDownLatch.countDown(); } });*/ //業務... } countDownLatch.await(5, TimeUnit.SECONDS); producer.close(); } } @Data public class Order { private Integer orderId; private Integer productId; private Integer productNum; private Double orderAmount; public Order(Integer orderId, Integer productId, Integer productNum, Double orderAmount) { super(); this.orderId = orderId; this.productId = productId; this.productNum = productNum; this.orderAmount = orderAmount; } }

  部分說明:

    【1】如果不指定分割區:

    【2】如果設定了重試引數,kafka2.4.1版本如何保證冪等機制:

 

訊息接收端程式碼

public class MsgConsumer {
    private final static String TOPIC_NAME = "my-replicated-topic";
    private final static String CONSUMER_GROUP_NAME = "testGroup";

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");
        // 消費分組名
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
        // 是否自動提交offset,預設就是true
        /*props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // 自動提交offset的間隔時間
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");*/
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        /*
        當消費主題的是一個新的消費組,或者指定offset的消費方式,offset不存在,那麼應該如何消費
        latest(預設) :只消費自己啟動之後傳送到主題的訊息
        earliest:第一次從頭開始消費,以後按照消費offset記錄繼續消費,這個需要區別於consumer.seekToBeginning(每次都從頭開始消費)
        */
        //props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        /*
        consumer給broker傳送心跳的間隔時間,broker接收到心跳如果此時有rebalance發生會通過心跳響應將
        rebalance方案下發給consumer,這個時間可以稍微短一點,預設3s
        */
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
        /*
        伺服器端broker多久感知不到一個consumer心跳就認為他故障了,會將其踢出消費組,對應的Partition也會被重新分配給其他consumer,預設是10秒
        */
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);


        //一次poll最大拉取訊息的條數,如果消費者處理速度很快,可以設定大點,如果處理速度一般,可以設定小點,預設500
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        /*
        如果兩次poll操作間隔超過了這個時間,broker就會認為這個consumer處理能力太弱,會將其踢出消費組,將分割區分配給別的consumer消費
     有種適者生存的感覺,把處理能力弱的(如2核4G比4核8G弱,而且沒在限定時間內來拿任務)驅逐。【要麼檢查處理過程是否能夠優化縮短時間,要麼調整一次性獲取的條數,要麼增大間隔時間(間隔時間預設的一般不建議調整)】
*/ props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList(TOPIC_NAME)); // 消費指定分割區 //consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); //訊息回溯消費 /*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));*/ //指定offset消費 /*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
     //指定從TOPIC_NAME的第10條開始消費 consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);
*/ //從指定時間點開始消費 /*List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME); //從1小時前開始消費 long fetchDataTime = new Date().getTime() - 1000 * 60 * 60; Map<TopicPartition, Long> map = new HashMap<>();
     //將topic的所有partition拿出來 for (PartitionInfo par : topicPartitions) { map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime); }
     //尋找每個partition的符合時間節點的offset Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
     //針對每個partition進行消費 for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) { TopicPartition key = entry.getKey(); OffsetAndTimestamp value = entry.getValue(); if (key == null || value == null) continue; Long offset = value.offset(); System.out.println("partition-" + key.partition() + "|offset-" + offset); System.out.println(); //根據消費裡的timestamp確定offset if (value != null) { consumer.assign(Arrays.asList(key)); consumer.seek(key, offset); } }
*/ while (true) { /* * poll() API 是拉取訊息的長輪詢 */ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.printf("收到訊息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } if (records.count() > 0) { // 手動同步提交offset,當前執行緒會阻塞直到offset提交成功 // 一般使用同步提交,因為提交之後一般也沒有什麼邏輯程式碼了 //consumer.commitSync(); // 手動非同步提交offset,當前執行緒提交offset不會阻塞,可以繼續處理後面的程式邏輯 /*consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception != null) { System.err.println("Commit failed for " + offsets); System.err.println("Commit failed exception: " + exception.getStackTrace()); } } });*/ } } } }

  部分說明:

    【1】如果設定為 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); 自動提交:

      一般需要設定提交時間 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); 【如1秒後提交】,但是由於不可預測到業務完成需要多久時間(假設5s),那麼在處理過程中會出現伺服器宕機的情況,導致訊息丟失。如果出現業務執行很快,在0.5s的時候就已經執行業務完成,但是在0.8s的時候伺服器宕機,會造成訊息已經消費了,但是中介軟體不知道(又會發給第二個消費者消費),導致訊息重複消費。【所以自動提交不太可取

    【2】如果設定為 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); 手動提交:

      那麼會有兩個選擇:同步提交與非同步提交。

    【3】訊息回溯消費的機制是怎麼實現的:

      因為kafka的訊息儲存在log檔案裡面,而且對應的還會有index與timeindex(可以加快對於訊息的檢索),根據設定給予的offset可以快速定位到是哪個log檔案,因為檔名就是offset偏移值。快速拿出資料就可以進行消費了。此外根據時間回溯也是一樣不過量會更大一點。

    【4】針對已經存在的tipoc,如果有新的消費組加入,預設是將當前tipoc的最後offset傳給消費組,作為其已消費的記錄。故,如果是要它來幫忙處理訊息的,要設定為props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); ,這個消費組如果是已經存在的,那麼這個引數其實不會變動已有的offset。預設處理巨量資料量的應該採用latest。業務場景則用earliest。

 

Spring Boot整合Kafka

引入spring boot kafka依賴

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

application.yml設定如下:

spring:
  kafka:
    bootstrap-servers: 192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094
    producer: # 生產者
      retries: 3 # 設定大於0的值,則使用者端會將傳送失敗的記錄重新傳送
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
      # 指定訊息key和訊息體的編解碼方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 當每一條記錄被消費者監聽器(ListenerConsumer)處理之後提交
      # RECORD
      # 當每一批poll()的資料被消費者監聽器(ListenerConsumer)處理之後提交
      # BATCH
      # 當每一批poll()的資料被消費者監聽器(ListenerConsumer)處理之後,距離上次提交時間大於TIME時提交
      # TIME
      # 當每一批poll()的資料被消費者監聽器(ListenerConsumer)處理之後,被處理record數量大於等於COUNT時提交
      # COUNT
      # TIME | COUNT 有一個條件滿足時提交
      # COUNT_TIME
      # 當每一批poll()的資料被消費者監聽器(ListenerConsumer)處理之後, 手動呼叫Acknowledgment.acknowledge()後提交
      # MANUAL
      # 手動呼叫Acknowledgment.acknowledge()後立即提交
      # MANUAL_IMMEDIATE
      ack-mode: MANUAL_IMMEDIATE

 

傳送者程式碼:

@RestController
public class KafkaController {

    private final static String TOPIC_NAME = "my-replicated-topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/send")
    public void send() {
        kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a msg");
    }

}

 

消費者程式碼:

@Component
public class MyConsumer {

    /**
     * @KafkaListener(groupId = "testGroup", topicPartitions = {
     *             @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
     *             @TopicPartition(topic = "topic2", partitions = "0",
     *                     partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
     *     },concurrency = "6")
     *  //concurrency就是同組下的消費者個數,就是並行消費數,必須小於等於分割區總數
     * @param record
     */
    @KafkaListener(topics = "my-replicated-topic",groupId = "zGroup")
    public void listenZGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        System.out.println(value);
        System.out.println(record);
        //手動提交offset
        //ack.acknowledge();
    }

    //設定多個消費組
    /*@KafkaListener(topics = "my-replicated-topic",groupId = "tuGroup")
    public void listenTuGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        System.out.println(value);
        System.out.println(record);
        ack.acknowledge();
    }*/
}

 

Kafka設計原理詳解

Kafka核心總控制器Controller

  在Kafka叢集中會有一個或者多個broker,其中有一個broker會被選舉為控制器(Kafka Controller),它負責管理整個叢集中所有分割區和副本的狀態。

當某個分割區的leader副本出現故障時,由控制器負責為該分割區選舉新的leader副本。
當檢測到某個分割區的ISR集合發生變化時,由控制器負責通知所有broker更新其後設資料資訊。
當使用kafka-topics.sh指令碼為某個topic增加分割區數量時,同樣還是由控制器負責讓新分割區被其他節點感知到

 

Controller選舉機制

  【1】在kafka叢集啟動的時候,會自動選舉一臺broker作為controller來管理整個叢集,選舉的過程是叢集中每個broker都會嘗試在zookeeper上建立一個 /controller 臨時節點,zookeeper會保證有且僅有一個broker能建立成功,這個broker就會成為叢集的總控器controller。

  【2】當這個controller角色的broker宕機了,此時zookeeper臨時節點會消失,叢集裡其他broker會一直監聽這個臨時節點,發現臨時節點消失了,就競爭再次建立臨時節點,就是我們上面說的選舉機制,zookeeper又會保證有一個broker成為新的controller。

  【3】具備控制器身份的broker需要比其他普通的broker多一份職責,具體細節如下:

1.監聽broker相關的變化。為Zookeeper中的/brokers/ids/節點新增BrokerChangeListener,用來處理broker增減的變化。
2.監聽topic相關的變化。為Zookeeper中的/brokers/topics節點新增TopicChangeListener,用來處理topic增減的變化;為Zookeeper中的/admin/delete_topics節點新增TopicDeletionListener,用來處理刪除topic的動作。
3.從Zookeeper中讀取獲取當前所有與topic、partition以及broker有關的資訊並進行相應的管理。對於所有topic所對應的Zookeeper中的/brokers/topics/[topic]節點新增PartitionModificationsListener,用來監聽topic中的分割區分配變化。
4.更新叢集的後設資料資訊,同步到其他普通的broker節點中。

 

Partition副本選舉Leader機制

  【1】controller感知到分割區leader所在的broker掛了(controller監聽了很多zk節點可以感知到broker存活),controller會從ISR列表(引數unclean.leader.election.enable=false的前提下)裡挑第一個broker作為leader(第一個broker最先放進ISR列表,可能是同步資料最多的副本)【這種會阻塞直到ISR列表有資料】,如果引數unclean.leader.election.enable為true,代表在ISR列表裡所有副本都掛了的時候可以在ISR列表以外的副本中選leader,這種設定,可以提高可用性,但是選出的新leader有可能資料少很多。【其實就是知道/broker/ids/下面的資料沒了】

  【2】副本進入ISR列表有兩個條件:

1.副本節點不能產生分割區,必須能與zookeeper保持對談以及跟leader副本網路連通
2.副本能複製leader上的所有寫操作,並且不能落後太多。(與leader副本同步滯後的副本,是由 replica.lag.time.max.ms 設定決定的,超過這個時間都沒有跟leader同步過的一次的副本會被移出ISR列表)

 

消費者消費訊息的offset記錄機制

  【1】每個consumer會定期將自己消費分割區的offset提交給kafka內部topic:__consumer_offsets,提交過去的時候,key是consumerGroupId+topic+分割區號,value就是當前offset的值,kafka會定期清理topic裡的訊息,最後就保留最新的那條資料。【相當於記錄了這個消費組在這個topic的某分割區上消費到了哪

  【2】因為__consumer_offsets可能會接收高並行的請求,kafka預設給其分配50個分割區(可以通過offsets.topic.num.partitions設定),這樣可以通過加機器的方式抗大並行。

  【3】通過如下公式可以選出consumer消費的offset要提交到__consumer_offsets的哪個分割區

公式:hash(consumerGroupId)  %  __consumer_offsets主題的分割區數

  【4】早期這個就是記錄在zookeeper中,但是並行度不高,所以才轉到了broker中。

 

消費者Rebalance機制(再平衡機制)

  【1】rebalance就是說如果消費組裡的消費者數量有變化或消費的分割區數有變化,kafka會重新分配消費者消費分割區的關係。比如consumer group中某個消費者掛了,此時會自動把分配給他的分割區交給其他的消費者,如果他又重啟了,那麼又會把一些分割區重新交還給他。

  【2】注意:rebalance只針對subscribe這種不指定分割區消費的情況,如果通過assign這種消費方式指定了分割區,kafka不會進行rebanlance。

  【3】如下情況可能會觸發消費者rebalance

1.消費組裡的consumer增加或減少了
2.動態給topic增加了分割區
3.消費組訂閱了更多的topic

  【4】rebalance過程中,消費者無法從kafka消費訊息,這對kafka的TPS會有影響,如果kafka叢集內節點較多,比如數百個,那重平衡可能會耗時極多,所以應儘量避免在系統高峰期的重平衡發生。

 

消費者Rebalance分割區分配策略:

  【1】主要有三種rebalance的策略:range、round-robin、sticky。

  【2】Kafka 提供了消費者使用者端引數partition.assignment.strategy 來設定消費者與訂閱主題之間的分割區分配策略。預設情況為range分配策略

  【3】假設一個主題有10個分割區(0-9),現在有三個consumer消費:

    1)range策略就是按照分割區序號排序,假設 n=分割區數/消費者數量 = 3, m=分割區數%消費者數量 = 1,那麼前 m 個消費者每個分配 n+1 個分割區,後面的(消費者數量-m )個消費者每個分配 n 個分割區。比如分割區0~3給一個consumer,分割區4~6給一個consumer,分割區7~9給一個consumer。

    2)round-robin策略就是輪詢分配,比如分割區0、3、6、9給一個consumer,分割區1、4、7給一個consumer,分割區2、5、8給一個consumer。

    3)sticky策略初始時分配策略與round-robin類似,但是在rebalance的時候,需要保證如下兩個原則。

1)分割區的分配要儘可能均勻 。
2)分割區的分配儘可能與上次分配的保持相同。

      當兩者發生衝突時,第一個目標優先於第二個目標 。這樣可以最大程度維持原來的分割區分配的策略。比如對於第一種range情況的分配,如果第三個consumer掛了,那麼重新用sticky策略分配的結果如下:

consumer1除了原有的0~3,會再分配一個7
consumer2除了原有的4~6,會再分配8和9

 

Rebalance過程

  【1】當有消費者加入消費組時,消費者、消費組及組協調器之間會經歷以下幾個階段。圖示過程:

  

 

  【2】第一階段:選擇組協調器

    組協調器GroupCoordinator:每個consumer group都會選擇一個broker作為自己的組協調器coordinator,負責監控這個消費組裡的所有消費者的心跳,以及判斷是否宕機,然後開啟消費者rebalance。

    consumer group中的每個consumer啟動時會向kafka叢集中的某個節點傳送 FindCoordinatorRequest 請求來查詢對應的組協調器GroupCoordinator,並跟其建立網路連線。

    組協調器選擇方式

      consumer消費的offset要提交到__consumer_offsets的哪個分割區,這個分割區leader對應的broker就是這個consumer group的coordinator

  【3】第二階段:加入消費組JOIN GROUP

    在成功找到消費組所對應的 GroupCoordinator 之後就進入加入消費組的階段,在此階段的消費者會向 GroupCoordinator 傳送 JoinGroupRequest 請求,並處理響應。然後GroupCoordinator 從一個consumer group中選擇第一個加入group的consumer作為leader(消費組協調器),把consumer group情況傳送給這個leader,接著這個leader會負責制定分割區方案。

  【4】第三階段( SYNC GROUP)

    consumer leader通過給GroupCoordinator傳送SyncGroupRequest,接著GroupCoordinator就把分割區方案下發給各個consumer【心跳的時候】,他們會根據指定分割區的leader broker進行網路連線以及訊息消費。

 

producer釋出訊息機制剖析

  【1】寫入方式

    producer 採用 push 模式將訊息釋出到 broker,每條訊息都被 append 到 patition 中,屬於順序寫磁碟(順序寫磁碟效率比隨機寫記憶體要高,保障 kafka 吞吐率)。

  【2】訊息路由

    producer 傳送訊息到 broker 時,會根據分割區演演算法選擇將其儲存到哪一個 partition。其路由機制為:

1. 指定了 patition,則直接使用;
2. 未指定 patition 但指定 key,通過對 key 的 value 進行hash 選出一個 patition
3. patition 和 key 都未指定,使用輪詢選出一個 patition。

  【3】寫入流程

    如圖:

    

    說明:

1. producer 先從 zookeeper 的 "/brokers/.../state" 節點找到該 partition 的 leader
2. producer 將訊息傳送給該 leader
3. leader 將訊息寫入本地 log
4. followers 從 leader pull 訊息,寫入本地 log 後 向leader 傳送 ACK
5. leader 收到所有 ISR 中的 replica 的 ACK 後,增加 HW(high watermark,最後 commit 的 offset) 並向 producer 傳送 ACK

 

HW與LEO詳解

  【1】HW俗稱高水位,HighWatermark的縮寫,取一個partition對應的ISR中最小的LEO(log-end-offset)作為HW,consumer最多隻能消費到HW所在的位置。另外每個replica都有HW,leader和follower各自負責更新自己的HW的狀態。對於leader新寫入的訊息,consumer不能立刻消費,leader會等待該訊息被所有ISR中的replicas同步後更新HW,此時訊息才能被consumer消費。這樣就保證瞭如果leader所在的broker失效,該訊息仍然可以從新選舉的leader中獲取。對於來自內部broker的讀取請求,沒有HW的限制。

  【2】如圖詳細的說明了當producer生產訊息至broker後,ISR以及HW和LEO的流轉過程:

  

 

  【3】故,Kafka的複製機制既不是完全的同步複製,也不是單純的非同步複製。事實上,同步複製要求所有能工作的follower都複製完,這條訊息才會被commit,這種複製方式極大的影響了吞吐率。而非同步複製方式下,follower非同步的從leader複製資料,資料只要被leader寫入log就被認為已經commit,這種情況下如果follower都還沒有複製完,落後於leader時,突然leader宕機,則會丟失資料。而Kafka的這種使用ISR的方式則很好的均衡了確保資料不丟失以及吞吐率。再回顧下訊息傳送端對發出訊息持久化機制引數acks的設定,我們結合HW和LEO來看下acks=1的情況。

  【4】結合HW和LEO看下 acks=1的情況:

  

 

ISR機制

  【1】概念:

AR(Assigned Repllicas)一個partition的所有副本(就是replica,不區分leader或follower)
ISR(In-Sync Replicas)能夠和 leader 保持同步的 follower + leader本身 組成的集合。
OSR(Out-Sync Relipcas)不能和 leader 保持同步的 follower 集合
公式:AR = ISR + OSR

 

  【2】副本資料同步機制

   

 

  【3】Kafka 選擇了第二種方案,原因如下:

同樣為了容忍 n 臺節點的故障,第一種方案需要 2n+1 個副本,而第二種方案只需要 n+1 個副本,而 Kafka 的每個分割區都有大量的資料,第一種方案會造成大量資料的冗餘。
雖然第二種方案的網路延遲會比較高,但網路延遲對 Kafka 的影響較小。

 

  【4】ISR踢出replica

# 預設10000 即 10秒
replica.lag.time.max.ms
# 允許 follower 副本落後 leader 副本的訊息數量,超過這個數量後,follower 會被踢出 ISR
replica.lag.max.messages

  【5】總結:leader會維持一個與其保持同步的replica集合,該集合就是ISR,每一個leader partition都有一個ISR,leader動態維護, 要保證kafka不丟失message,就要保證ISR這組集合存活(至少有一個存活),並且訊息commit成功,Partition leader 保持同步的 Partition Follower 集合, 當 ISR 中的Partition Follower ,完成資料的同步之後,就會給 leader 傳送 ack 如果Partition follower長時間(replica.lag.time.max.ms) 未向leader同步資料,則該Partition Follower將被踢出ISR,Partition Leader 發生故障之後,就會從 ISR 中選舉新的 Partition Leader。當replica重新追上了leader,OSR中的replica就會重新加入ISR中

  【6】問題點:Kafka對外依然可以聲稱是完全同步,但是承諾是對AR中的所有replica完全同步了嗎?並沒有。Kafka只保證對ISR集合中的所有副本保證完全同步。至於,ISR到底有多少個follower,那不知道!Kafka是一定會保證leader接收到的訊息完全同步給ISR中的所有副本。而最壞的情況下,ISR中只剩leader自己。故,才會有副本選舉Leader機制中的兩種不同的方式

 

紀錄檔分段儲存

  【1】Kafka 一個分割區的訊息資料對應儲存在一個資料夾下,以topic名稱+分割區號命名,訊息在分割區內是分段(segment)儲存,每個段的訊息都儲存在不一樣的log檔案裡,這種特性方便old segment file快速被刪除,kafka規定了一個段位的 log 檔案最大為 1G,做這個限制目的是為了方便把 log 檔案載入到記憶體去操作:

# 部分訊息的offset索引檔案,kafka每次往分割區發4K(可設定)訊息就會記錄一條當前訊息的offset到index檔案,(有點類似於MYSQL的索引,B+樹的快速定位)
# 如果要定位訊息的offset會先在這個檔案裡快速定位,再去log檔案裡找具體訊息
00000000000000000000.index
# 訊息儲存檔案,主要存offset和訊息體
00000000000000000000.log
# 訊息的傳送時間索引檔案,kafka每次往分割區發4K(可設定)訊息就會記錄一條當前訊息的傳送時間戳與對應的offset到timeindex檔案,
# 如果需要按照時間來定位訊息的offset,會先在這個檔案裡查詢
00000000000000000000.timeindex

//另一份紀錄檔段檔案
00000000000005367851.index
00000000000005367851.log
00000000000005367851.timeindex
//又是一份紀錄檔段檔案
00000000000009936472.index
00000000000009936472.log
00000000000009936472.timeindex

  【2】這個 9936472 之類的數位,就是代表了這個紀錄檔段檔案裡包含的起始 Offset,也就說明這個分割區裡至少都寫入了接近 1000 萬條資料了。

  【3】Kafka Broker 有一個引數,log.segment.bytes,限定了每個紀錄檔段檔案的大小,最大就是 1GB

  【4】一個紀錄檔段檔案滿了,就自動開一個新的紀錄檔段檔案來寫入,避免單個檔案過大,影響檔案的讀寫效能,這個過程叫做 log rolling,正在被寫入的那個紀錄檔段檔案,叫做 active log segment。

 

zookeeper節點資料圖:

       

kafka紀錄檔平臺架構(圖示)