當kafka分割區不能再增加的情況下,使用多執行緒提升kafka消費能力(附原始碼)

2020-09-25 11:01:12

前兩天csdn提醒我又多了一個粉絲,又激發了寫作的動力,不要點開看我的粉絲數,哈哈!      

正常情況下,kafka的消費執行緒資料是分割區(patition)一對一,單個patition是kafka並行操作的最小單元,kafka只允許單個partition的資料被一個consumer執行緒消費,例如我們做20個分割區,實際上就對應著20個消費執行緒,當我們做一些活動的時候,就會有發生訊息量猛增,而我們的消費執行緒有限,處理訊息的能力有可能跟不上,導致大量的訊息堆積處理不完。

這時我們可能就需求要優化,加大處理能力,多數人可能會想到增加分割區,分割區是可以增加,但是不可能一直無限向上增加,我們這裡參用多執行緒的方案。

package com.imcbb;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.concurrent.*;

/**
 * @author kevin
 * Date 2020-09-24
 * Time 09:43
 */
@Service
public class KafkaConsumer {
    private static Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);


    ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 3, 10, TimeUnit.SECONDS,
            new SynchronousQueue<>(),
            new ThreadFactoryBuilder().setNameFormat("KThread-%d").build(),
            (r, executor) -> {
                logger.warn("Ops,Rejected!");
                try {
                    executor.getQueue().put(r);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
//                        new ThreadPoolExecutor.CallerRunsPolicy()

    );

    @KafkaListener(topics = "myTopic")
    public void listen(ConsumerRecord<?, ?> cr) {

        executor.execute(() -> {
            logger.info("---------" + cr.toString());
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

    }
}

在上面的程式碼中,我們建立了一個執行緒池來消費一個分割區的訊息,這裡有兩個需要特別注意的點:

1.執行緒池裡存放task的佇列這裡我們使用了SynchronousQueue阻塞佇列,這個佇列很有趣,是一個無容量佇列,具體有興趣的大家再上網查這裡不做過多的解釋,使用SynchronousQueue是因為,如果系統發生異常宕機或者應用發版重新啟動時防止佇列堆積的訊息丟失(當然這也無法完全避免線上程工作中,系統發生異常導致執行緒掛掉,如果要求可用性更高,可考慮先存入資料庫,redis中,再做補償處理機制)。

2.當訊息過多,執行緒池也忙不過時,我們這裡有兩種處理辦法

2.1 讓監聽消費執行緒阻塞,使用new ThreadPoolExecutor.CallerRunsPolicy(),此時訊息將不會再消費

2.1 自定義拒絕策略,把任務再放回去

(r, executor) -> {
    logger.warn("Ops,Rejected!");
    try {
        executor.getQueue().put(r);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

}

以上兩種方案都可以解決問題,選一種使用即可,兩種方案都無法保證處理訊息的順序。

使用第二種有一個好處,就是可以通過紀錄檔檢視消費端的一個消費能力,看看有沒有進到拒絕裡,來適當的調整執行緒數。

以上就是多執行緒消費的辦法,這個在我們生產環境經過考驗的,哈哈!

文章簡單帖了些程式碼,完整寫了一個demo給大家,可以自行下載參考:https://github.com/kevinmails/kafka-consumer-demo

前提要在本機上安裝一下kafka,官方有安裝手冊(最小可用版本,本地測試夠用了),如果大家看官方留言搞不定,留言給我,我再寫一篇安裝文章。官方有安裝手冊:https://kafka.apache.org/quickstart

希望對大家有幫助!

參考:https://www.confluent.io/blog/how-choose-number-topics-partitions-kafka-cluster/