kafka生產者作為訊息傳送中很重要的一環,這裡面可是大有文章,你知道生產者訊息傳送的流程嗎?知道訊息是如何發往哪個分割區的嗎?如何保證生產者訊息的可靠性嗎?如何保證訊息傳送的順序嗎?如果對於這些問題還比較模糊的話,那麼很有必要看看這篇文章了,本文主要是基於kafka3.x版本講解。
kafka生產者最重要的就是訊息傳送的整個流程,我們來看下究竟是怎麼一回事把。
在訊息傳送的過程中,涉及到了兩個執行緒——main
執行緒和 Sender
執行緒。在 main
執行緒中建立了一個雙端佇列 RecordAccumulator
。main
執行緒將訊息傳送給 RecordAccumulator
,Sender
執行緒不斷從 RecordAccumulator
中拉取訊息傳送到 Kafka Broker
。
kafkaProducer
建立訊息,然後通過可能的攔截器、序列化器和分割區器的作用之後快取到訊息累加器(RecordAccumulator
, 也稱為訊息收集器)中。Sender
執行緒負責從 RecordAccumulator
獲取訊息並將其傳送到 Kafka
中。RecordAccumulator
主要用來快取訊息以便 Sender
執行緒可以批次傳送,進而減少網路傳輸的資源消耗以提升效能。RecordAccumulator
快取的大小可以通過生產者使用者端引數 buffer.memory
設定,預設值為 33554432B
,即 32M
。RecordAccumulator
的某個雙端佇列( Deque
)中,RecordAccumulator
內部為每個分割區都維護了一個雙端佇列,即 Deque<ProducerBatch>
, 訊息寫入快取時,追加到雙端佇列的尾部。Sender
讀取訊息時,從雙端佇列的頭部讀取。ProducerBatch
是指一個訊息批次;與此同時,會將較小的 ProducerBatch
湊成一個較大 ProducerBatch
,也可以減少網路請求的次數以提升整體的吞吐量。ProducerBatch
大小可以通過batch.size
控制,預設16kb
。Sender
執行緒會在有資料積累到batch.size
,預設16kb,或者如果資料遲遲未達到batch.size
,Sender
執行緒等待linger.ms
設定的時間到了之後就會獲取資料。linger.ms
單位ms
,預設值是0ms
,表示沒有延遲。Sender
從 RecordAccumulator
獲取快取的訊息之後,會將資料封裝成網路請求<Node,Request>
的形式,這樣就可以將 Request
請求發往各個 Node
了。sender
執行緒發往 Kafka
之前還會儲存到 InFlightRequests
中,它的主要作用是快取了已經發出去但還沒有收到伺服器端響應的請求。InFlightRequests
預設每個分割區下最多快取5個請求,可以通過設定引數為max.in.flight.request.per. connection
修改。Request
通過通道Selector
傳送到kafka
節點。acks
.Leader
收到資料後應答。Request
請求接受到kafka的響應結果,如果成功的話,從InFlightRequests
清除請求,否則的話需要進行重發操作,可以通過設定項retries
決定,當訊息傳送出現錯誤的時候,系統會重發訊息。retries
表示重試次數。預設是 int 最大值,2147483647
。RecordAccumulator
中的資料。現在我們來看看kafka生產者中常用且關鍵的設定引數。
bootstrap.servers
生產者連線叢集所需的 broker 地 址 清 單 。 例 如hadoop102:9092,hadoop103:9092,hadoop104:9092
,可以設定 1 個或者多個,中間用逗號隔開。注意這裡並非需要所有的 broker 地址,因為生產者從給定的 broker
裡查詢到其他 broker
資訊。
key.serializer
和 value.serializer
指定傳送訊息的 key 和 value 的序列化型別。一定要寫全類名。
buffer.memory
RecordAccumulator
緩衝區總大小,預設 32m。
batch.size
緩衝區一批資料最大值,預設 16k。適當增加該值,可以提高吞吐量,但是如果該值設定太大,會導致資料傳輸延遲增加。
linger.ms
如果資料遲遲未達到 batch.size
,kafka等待這個時間之後就會傳送資料。單位 ms,預設值是 0ms,表示沒有延遲。生產環境建議該值大小為 5-100ms
之間。
max.request.size
這個引數用來限制生產者使用者端能傳送的訊息的最大值,預設值為 1048576B ,即 lMB 一般情況下,這個預設值就可以滿足大多數的應用場景了。
compression.type
這個引數用來指定訊息的壓縮方式,預設值為「none
",即預設情況下,訊息不會被壓縮。該引數還可以設定為 "gzip
","snappy
" 和 "lz4
"。對訊息進行壓縮可以極大地減少網路傳輸、降低網路 I/O,從而提高整體的效能 。訊息壓縮是一種以時間換空間的優化方式,如果對時延有一定的要求,則不推薦對訊息進行壓縮;
acks
acks
的值為0,1和-1或者all。
Producer
往叢集傳送資料不需要等到叢集的返回,不確保訊息傳送成功。安全性最低但是效率最高。Producer
往叢集傳送資料只要 Leader
成功寫入訊息就可以傳送下一條,只確保 Leader
接收成功。Producer
往叢集傳送資料需要所有的 ISR Follower
都完成從 Leader
的同步才會傳送下一條,確保Leader 傳送
成功和所有的副本都成功接收。安全性最高,但是效率最低。max.in.flight.requests.per.connection
允許最多沒有返回 ack 的次數,預設為 5,開啟冪等性要保證該值是 1-5 的數位。
retries
和retry.backoff.ms
當訊息傳送出現錯誤的時候,系統會重發訊息。retries
表示重試次數。在kafka3.4.0預設是 int 最大值,2147483647
。如果設定了重試,還想保證訊息的有序性,需要設定max.in.flight.requests.per.connection
=1否則在重試此失敗訊息的時候,其他的訊息可能傳送成功了。另外retry.backoff.ms
控制兩次重試之間的時間間隔,預設是 100ms。
更多kafka生產者的設定可以查閱官網https://kafka.apache.org/documentation/#producerconfigs
。
通常情況下,生產者傳送訊息分為以下4個步驟:
(1)設定生產者使用者端引數及建立相應的生產者範例
(2)構建待傳送的訊息
(3)傳送訊息
(4)關閉生產者範例
我們直接上程式碼。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.3.0</version>
</dependency>
public static void main(String[] args) {
// 1. 建立 kafka 生產者的設定物件
Properties properties = new Properties();
// 2. 給 kafka 設定物件新增設定資訊:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// key,value 序列化(必須):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// 3. 建立 kafka 生產者物件
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
// 4. 呼叫 send 方法,傳送訊息
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new
ProducerRecord<>("first", Integer.toString(i), "hello " + i));
}
// 5. 關閉資源
kafkaProducer.close();
}
ProducerRecord
kafka傳送時主要構造出ProducerRecord
物件,包含傳送的主題,partition,key,value等。
public class ProducerRecord<K, V> {
private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;
}
kafka提供了3種傳送訊息的模式,發後即忘,同步傳送和非同步傳送,我們直接上程式碼。
fire-and-forget
)發後即忘,它只管往 Kafka 傳送,並不關心訊息是否正確到達。 在大多數情況下,這種傳送方式沒有問題。 不過在某些時候(比如發生不可重試異常時)會造成訊息的丟失。 這種傳送方式的效能最高,可靠性最差。
Future<RecordMetadata> send = producer.send(rcd);
sync
****)只需在上面種傳送方式的基礎上,再呼叫一下 get()方法即可,該方法時阻塞的。
// 同步傳送
kafkaProducer.send(new ProducerRecord<>("first","kafka" + i)).get();
async
****)回撥函數會在 producer
收到 ack
時呼叫,為非同步呼叫,該方法有兩個引數,分別是 RecordMetadata
和Exception
,如果 Exception
為 null
,說明訊息傳送成功,如果 Exception
不為 null
,說明訊息傳送失敗。
注意:訊息傳送失敗會自動重試,不需要我們在回撥函數中手動重試。
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new
ProducerRecord<>("first", Integer.toString(i), "hello " + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
// 沒有異常,輸出資訊到控制檯
System.out.println(" 主題: " +
metadata.topic() + "->" + "分割區:" + metadata.partition());
} else {
// 出現異常列印
exception.printStackTrace();
}
}
});
}
kafka設計上存在分割區的,它有下面兩個好處:
Partition
在一個Broker
上儲存,可以把海量的資料按照分割區切割成一塊一塊資料儲存在多臺Broker
上。合理控制分割區的任務,可以實現負載均衡的效果。那究竟生產者是按照什麼樣的策略發往到不同的分割區呢?
根據生產者的傳送流程,其中會經過分割區器,預設情況下是使用DefaultPartitioner
,具體邏輯如下:
kafka
傳送訊息的時候構造訊息物件ProducerRecord
,可以傳入指定的partition
, 那麼訊息就會傳送這個指定的分割區。例如partition=0,所有資料寫入分割區0。
// 傳送訊息到0號分割區
kafkaProducer.send(new
ProducerRecord<>("first", 0, Integer.toString(i), "hello " + i));
partition
值但有key
的情況下,將key
的hash
值與topic
的partition
數進行取餘得到partition
值;例如:key1
的hash
值=5, key2
的hash
值=6 ,topic
的partition
數=2,那麼key1
對應的value1
寫入1號分割區,key2
對應的value2
寫入0號分割區。
partition
值又沒有key
值的情況下,Kafka採用Sticky Partition
(黏性分割區器),會隨機選擇一個分割區,並儘可能一直使用該分割區,待該分割區的batch
已滿或者已完成,Kafka
再隨機一個分割區進行使用(和上一次的分割區不同)。例如:第一次隨機選擇0號分割區,等0號分割區當前批次滿了(預設16k)或者linger.ms
設定的時間到, Kafka
再隨機一個分割區進行使用(如果還是0會繼續隨機)。
如果預設的分割區規則不滿足需求,我們也可以自定義一個分割區器。比如我們實現一個分割區器實現,傳送過來的資料中如果包含 alvin
,就發往 0 號分割區,不包含 alvin
,就發往 1 號分割區。
Partitioner
/**
* 1. 實現介面 Partitioner
* 2. 實現 3 個方法:partition,close,configure
* 3. 編寫 partition 方法,返回分割區號
*/
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 獲取訊息
String msgValue = value.toString();
// 建立 partition
int partition;
// 判斷訊息是否包含 alvin
if (msgValue.contains("alvin")){
partition = 0;
}else {
partition = 1;
}
// 返回分割區號
return partition;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
// 新增自定義分割區器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.alvin.kafka.producer.MyPartitioner");
// 傳送訊息 略~~
對比著前面kafka生產者的傳送流程,kafka生產者提供的一些設定引數可以有助於提高生產者的吞吐量。
引數名稱 | 描述 |
---|---|
buffer.memory |
RecordAccumulator 緩衝區總大小,預設 32m。適當增加該值,可以提高吞吐量。 |
batch.size |
緩衝區一批資料最大值,預設 16k。適當增加該值,可以提高吞吐量,但是如果該值設定太大,會導致資料傳輸延遲增加。 |
linger.ms |
如果資料遲遲未達到 batch.size ,sender 執行緒等待 linger.time 之後就會傳送資料。單位 ms,預設值是 0ms,表示沒有延遲。生產環境建議該值大小為 5-100ms 之間。 |
compression.type |
指定訊息的壓縮方式,預設值為「none ",即預設情況下,訊息不會被壓縮。該引數還可以設定為 "gzip ","snappy " 和 "lz4 "。對訊息進行壓縮可以極大地減少網路傳輸、降低網路 I/O,從而提高整體的效能 。 |
為了保證訊息傳送的可靠性,kafka
在 producer
裡面提供了訊息確認機制。我們可以通過設定來決 定訊息傳送到對應分割區的幾個副本才算訊息傳送成功。可以在定義 producer
時通過 acks
引數指定。
生產者傳送過來的資料,不需要等資料落盤應答。
生產者傳送過來的資料,Leader
收到資料後應答。
生產者傳送過來的資料,Leader
和ISR
佇列裡面的所有節點收齊資料後應答。
ISR
概念:(同步副本)。每個分割區的leader
會維護一個ISR
列表,ISR
列表裡面就是follower
副本 的Borker
編 號 , 只 有 跟 得 上Leader
的follower
副 本 才 能 加 入 到ISR
裡 面 , 這 個 是 通 過replica.lag.time.max.ms
=30000(預設值)引數設定的,只有ISR
裡的成員才有被選為leader
的可能。
如果Leader
收到資料,所有Follower
都開始同步資料,但有一個Follower
,因為某種故障,遲遲不能與Leader
進行同步,那這個問題怎麼解決呢?
Leader
維護了一個動態的in-sync replica set
(ISR
),意為和Leader
保持同步的Follower+Leader
集合(leader:0,isr:0,1,2)
。如果Follower
長時間未向Leader
傳送通訊請求或同步資料,則該Follower
將被踢出ISR
。該時間閾值由replica.lag.time.max.ms
引數設定,預設30s
。
小結:資料完全可靠條件 = ACK
級別設定為-1 + 分割區副本大於等於2 + ISR裡應答的最小副本數量大於等於2。
acks=0
,生產者傳送過來資料就不管了,可靠性差,效率高;acks=1
,生產者傳送過來資料Leader
應答,可靠性中等,效率中等;acks=-1或者all
,生產者傳送過來資料Leader
和ISR
佇列裡面所有Follwer
應答,可靠性高,效率低;在生產環境中,acks=0
很少使用;acks=1
,一般用於傳輸普通紀錄檔,允許丟個別資料;acks=-1
,一般用於傳輸和錢相關的資料,對可靠性要求比較高的場景。
kafka作為分散式訊息系統,難免會出現重複訊息或者丟訊息的情況,會存在3種資料傳遞語意。
ack級別設定為0, 可以保證資料不重複,但是不能保證資料不丟失, 所以叫做最多一次。
ack級別設定為-1 + 分割區副本大於等於2 + ISR
裡應答的最小副本數量大於等於2可能會出現至少一次的訊息。比如下圖中在傳送過程Leader節點宕機,訊息就會重試,就有可能出現訊息的重複。
At Least Once
可以保證資料不丟失,但是不能保證資料不重複。
對於一些非常重要的資訊,比如和錢相關的資料,要求資料既不能重複也不丟失。這在kafka中可以通過冪等性和事務的特性實現。
精確一次(Exactly Once) = 冪等性 + 至少一次( ack=-1 + 分割區副本數>=2 + ISR最小副本數量>=2) 。
冪等性,簡單來說,就是一個操作重複做,每次的結果都一樣。開啟冪等性功能,引數enable.idempotence
設定為 true即可,在3.x版本中預設情況下也是true。具體實現原理如下:
producer
在初始化時會生成一個 producer_id
,併為每個目標 partition
維護一個「序列號」。producer
每傳送一條訊息,會將<producer_id
,分割區>對應的「序列號」加 1。broker
伺服器端端會為每一對<producer_id,分割區>
維護一個序列號,對於每收到的一條訊息,會判斷伺服器端 的 SN_old
和接收到的訊息中的 SN_new
進行對比:SN_OLD+1
= SN_NEW
,正常情況 SN_old+1
>SN_new
,說明是重複寫入的資料,直接丟棄SN_old+1
<SN_new
,說明中間有資料尚未寫入,或者是發生了亂序,或者是資料丟失,將丟擲嚴重異常:OutOfOrderSequenceException
。根據前面的生產者傳送流程可以知道,要想保證訊息投遞的順序性:
max.in.flight.requests.per.connection=1
max.in.flight.requests.per.connection
需要設定為1。max.in.flight.requests.per.connection
需要設定小於等於5。因為在kafka1.x以後,啟用冪等後,kafka伺服器端會快取producer
發來的最近5個request
的後設資料,故無論如何,都可以保證最近5個request
的資料都是有序的。
本文總結了kafka生產者整個訊息傳送的流程,只有明白了這個流程以後,那麼我們對於一些生產者訊息傳送的一些問題才有更加深刻的理解。
歡迎關注個人公眾號【JAVA旭陽】交流學習
本文來自部落格園,作者:JAVA旭陽,轉載請註明原文連結:https://www.cnblogs.com/alvinscript/p/17422784.html