5種kafka消費端效能優化方法

2022-09-21 18:00:26
摘要:帶你瞭解基於FusionInsight HD&MRS的5種kafka消費端效能優化方法。

本文分享自華為雲社群《FusionInsight HD&MRSkafka消費端效能優化方法》,作者: 穿夾克的壞猴子。

kafka消費端效能優化主要從下面幾個方面優化:

1.介面使用方面優化:

舊版本highlevel-consumer:偏移量資訊儲存在zookeeper,最大消費執行緒數與分割區數量相同,不推薦

舊版本simpleconsumer:自行選擇儲存偏移量的方式,可以實現多執行緒消費單分割區,若無特殊的效能要求,不推薦

新版本highlevel-consumer:偏移量資訊儲存在kafka指定的topic中,預設情況下最大消費執行緒數與分割區數量相同,可以實現多執行緒消費單分割區,推薦

2.引數調優(以下引數需根據現網環境評估調至合適的值):

2.1 舊版本消費者(kafka old API)引數調優

fetch.message.max.bytes:該引數為一次性從kafka叢集中獲取的資料塊大小。在升級到651版本後這個引數需要調大,否則容易出現獲取資料限制的報錯。建議調整大小不小於kafka的伺服器端引數message.max.bytes。

注意如何確認為舊版本:如果生產者的設定方式包含如下這些設定,則為舊版本:group.id/zookeeper.connect

2.2 新版本引數(kafka new API)引數調優

max.poll.records:意味消費者一次poll()操作,能夠獲取的最巨量資料量,調整這個值能提升吞吐量,於此同時也需要同步提升max.poll.interval.ms的引數大小。

fetch.max.bytes:意味server端可返回給consumer的最巨量資料大小,增加可以提升吞吐量,但是在使用者端和伺服器端網路延遲比較大的環境下,建議可以減小該值,防止業務處理資料超時。

heartbeat.interval.ms:消費超時時間,consumer與kafka之間的超時時間,該引數不能超過session.timeout.ms,通常設定為session.timeout.ms的三分之一,預設值:3000。

max.partition.fetch.bytes:限制每個consumer發起fetch請求時候,讀到資料(record)的限制,設定過大,consumer本地快取的資料就會越多,可能影響記憶體的使用,預設值:1048576。

fetch.max.bytes:server端可返回給consumer的最巨量資料大小,數值可大於max.partition.fetch.bytes,一般設定為預設值即可,預設值:52428800

session.timeout.ms:使用consumer組管理offset時,consumer與broker之間的心跳超時時間,如果consumer消費資料的頻率非常低,建議增大這個引數值,預設值:10000。

auto.offset.reset:消費過程中無法找到資料消費到的offset位置,所選擇的消費策略,earliest:從頭開始消費,可能會消費到重複資料,latest:從資料末尾開始消費,可能會丟失資料。預設值:earlist。

max.poll.interval.ms:消費者在每一輪poll() (拉取資料之間的最大時間延遲),如果此超時時間期滿之前poll()沒有被再次呼叫,則消費者被視為失敗,並且分組將觸發rebalance,以便將分割區重新分配給別的成員。

如果,再兩次poll之間需要新增過多複雜的,耗時的邏輯,需要延長這個時間,預設值:300s。

max.poll.records:消費者一次poll()操作,能夠獲取的最巨量資料量,增加這個引數值,會增加一次性拉取資料的資料量,確保拉取資料的時間,至少在max.poll.interval.ms規定的範圍之內,預設值:500。

2.3 Simpleconsumer引數調優

simpleconsumer在初始化階段需要傳一個fetchsize的引數,比如:consumer=new SimpleConsumer(leaderBroker,a_port,100000,64*1024,clientName)中64*1024,該參數列示simpleconsumer一次性獲取的資料大小,如果該值過大則可能會導致request時間過長,使用過程中應該降低這個值,保證消費頻率。

使用SimpleConsumer的核心需求是:多執行緒消費單個分割區,以達到提升效能的要求,如果沒有這樣需求,不建議使用這個這種消費方式

3.消費端頻繁rebalance導致效能下降調優:

3.1因業務處理能力不足導致的:

session.timout.ms控制心跳超時時間。

heartbeat.interval.ms控制心跳傳送頻率,建議該值不超過session.timout.ms的三分之一。

max.poll.interval.ms控制每次poll的間隔,時間=獲取資料的時間+處理資料的時間,如果max.poll.records設定的值在max.poll.interval.ms指定的時間內沒有處理完成會觸發rebalance,這裡給出一個相對較為合理的設定,建議在預計的處理時間的基礎上再加1分鐘。

max.poll.records 每個批次處理的資料條數,預設為500條。如果處理能力較低,建議可以減小這個值。

3.2 非正常消費者頻繁的存取kafka叢集導致頻繁rebalance:

收集kafka-request.log,檢視異常的topic有哪些使用者端節點在消費,cat kafka-request.* | grep 「topic=topicName」 | grep 「apikey=FETCH」 | awk –F’from connection’ ‘{print $2}’ | awk –F’;’ ‘{print $1}’ | awk –F’-’ ‘{print $2}’ | awk –F’:’ ‘{print $1}’ | sort | uniq –c | sort -nr ,找出不應該產生消費行為的節點,停止異常節點上消費者

4.版本引發效能下降優化

FI 8.0.2版本之前kafka SimpleAclAuthorizer鑑權異常導致效能下降,8.0.2版本在使用非安全埠(21005或者9092埠)時會出現叢集效能下降的問題,表現:kafka-root.log中出現大量ExitcodeException:id:Default#Principal:no such user報錯。

解決辦法:升級到FI 8023以上版本。

臨時規避辦法:業務側使用21007埠存取kafka,去掉鑑權外掛即allow.everyone.if.no.acl.found=true,將以下kafka伺服器端設定置為空:authorizer.class.name=。

5.FI 6513~6516版本的核心問題引發的效能異常

6513版本在kafka引入社群的的lazy index功能後,在新的segment建立的過程中可能會導致並行建立失敗的問題,常見的報錯(server.log中)如以下兩種型別:

(1)java.lang.InternalError: a fault occurred in a recent unsafe memory access operation in compiled Java code;

(2)java.lang.IllegalArgumentException: requirement failed: Attempt to append to a full index;

當出現以上兩種型別的報錯的時候可以斷定是版本問題導致,問題預警如:https://support.huawei.com/enterprise/zh/bulletins-product/ENEWS2000007844;
解決方案:升級到6517版本以上版本或者打入緊急修補程式:https://support.huawei.com/enterprise/zh/cloud-computing/fusioninsight-hd-pid-21110924/software/251482609?idAbsPath=fixnode01%7C7919749%7C7941815%7C19942925%7C250430185%7C21110924;

臨時規避方案:重啟異常的broker範例。

 

點選關注,第一時間瞭解華為雲新鮮技術~