紀錄檔分段檔案切分包含以下4個條件,滿足其一即可:
log.segment.bytes
設定的值。log.segment.bytes
引數的預設值為 1073741824
,即1GBlog.roll.ms
或log.roll.hours
引數設定的值。如果同時設定了log.roll.ms
和log.roll.hours
引數,那麼log.roll.ms
的優先順序高,預設情況下,只設定了log.roll.hours
引數,其值為168,即7天。log.index.size.max.bytes
設定的值。log.index.size .max.bytes
的預設值為10485760
,即10MBInteger.MAX_VALUE
, 即要追加的訊息的偏移量不能轉變為相對偏移量(offset - baseOffset > Integer.MAX_VALUE)。Controller作為Kafka叢集中的核心元件,它的主要作用是在Apache ZooKeeper的幫助下管理和協調整個Kafka叢集。
Controller與Zookeeper進行互動,獲取與更新叢集中的後設資料資訊。其他broker並不直接與zookeeper進行通訊,而是與Controller進行通訊並同步Controller中的後設資料資訊。
Kafka叢集中每個節點都可以充當Controller節點,但叢集中同時只能有一個Controller節點。
Controller簡單來說,就是kafka叢集的狀態管理者
controller競選機制:簡單說,先來先上!
Broker 在啟動時,會嘗試去 ZooKeeper 中建立 /controller 節點。Kafka 當前選舉控制器的規則是:第一個成功建立 /controller 節點的 Broker 會被指定為控制器。
在Kafka叢集中會有一個或者多個broker,其中有一個broker會被選舉為控制器(Kafka Controller),它負責維護整個叢集中所有分割區和副本的狀態及分割區leader的選舉。
當某個分割區的leader副本出現故障時,由控制器負責為該分割區選舉新的leader副本。當檢測到某個分割區的ISR集合發生變化時,由控制器負責通知所有broker更新其後設資料資訊。當使用kafka-topics.sh指令碼為某個topic增加分割區數量時,同樣還是由控制器負責分割區的重新分配。
Kafka中的控制器選舉的工作依賴於Zookeeper,成功競選為控制器的broker會在Zookeeper中建立/controller這個臨時(EPHEMERAL)節點,此臨時節點的內容參考如下:
{"version":1,"brokerid":0,"timestamp":"1529210278988"}
其中version在目前版本中固定為1,brokerid表示成為控制器的broker的id編號,timestamp表示競選成為控制器時的時間戳。
在任意時刻,叢集中有且僅有一個控制器。每個broker啟動的時候會去嘗試去讀取zookeeper上的/controller節點的brokerid的值,如果讀取到brokerid的值不為-1,則表示已經有其它broker節點成功競選為控制器,所以當前broker就會放棄競選;如果Zookeeper中不存在/controller這個節點,或者這個節點中的資料異常,那麼就會嘗試去建立/controller這個節點,當前broker去建立節點的時候,也有可能其他broker同時去嘗試建立這個節點,只有建立成功的那個broker才會成為控制器,而建立失敗的broker則表示競選失敗。每個broker都會在記憶體中儲存當前控制器的brokerid值,這個值可以標識為activeControllerId。
對Zookeeper中的/admin/reassign_partitions節點註冊PartitionReassignmentListener,用來處理分割區重分配的動作。
對Zookeeper中的/isr_change_notification節點註冊IsrChangeNotificetionListener,用來處理ISR集合變更的動作。
對Zookeeper中的/admin/preferred-replica-election節點新增PreferredReplicaElectionListener,用來處理優先副本選舉。
對Zookeeper中的/brokers/topics節點新增TopicChangeListener,用來處理topic增減的變化;
對Zookeeper中的/admin/delete_topics節點新增TopicDeletionListener,用來處理刪除topic的動作
對Zookeeper中的/brokers/ids/節點新增BrokerChangeListener,用來處理broker增減的變化
從Zookeeper中讀取獲取當前所有與topic、partition以及broker有關的資訊並進行相應的管理。
對各topic所對應的Zookeeper中的/brokers/topics/[topic]節點新增PartitionModificationsListener,用來監聽topic中的分割區分配變化。並將最新資訊同步給其他所有broker。
使用者端請求建立一個topic時,每一個分割區副本在broker上的分配,是由叢集controller來決定;
結論:裡面會建立出來兩個亂數
第一個亂數確定0號分割區leader的位置,往後1號分割區2號分割區的leader依次往後順延1
第二個亂數確定每個分割區的第一個副本的位置 在leader所在機器上往後順延(亂數+1)臺機器,該臺機器就是第一個副本的位置,剩餘副本依次往後順延1
// 舉例:
// broker_id = 0~19 一共20臺機器
// 分割區數20,副本數10
// 第一個亂數:19
// 第二個亂數:0
(0,ArrayBuffer(19, 0, 1, 2, 3, 4, 5, 6, 7, 8))
(1,ArrayBuffer(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))
(2,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
(3,ArrayBuffer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11))
(4,ArrayBuffer(3, 4, 5, 6, 7, 8, 9, 10, 11, 12))
(5,ArrayBuffer(4, 5, 6, 7, 8, 9, 10, 11, 12, 13))
(6,ArrayBuffer(5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
(7,ArrayBuffer(6, 7, 8, 9, 10, 11, 12, 13, 14, 15))
(8,ArrayBuffer(7, 8, 9, 10, 11, 12, 13, 14, 15, 16))
(9,ArrayBuffer(8, 9, 10, 11, 12, 13, 14, 15, 16, 17))
(10,ArrayBuffer(9, 10, 11, 12, 13, 14, 15, 16, 17, 18))
(11,ArrayBuffer(10, 11, 12, 13, 14, 15, 16, 17, 18, 19))
(12,ArrayBuffer(11, 12, 13, 14, 15, 16, 17, 18, 19, 0))
(13,ArrayBuffer(12, 13, 14, 15, 16, 17, 18, 19, 0, 1))
(14,ArrayBuffer(13, 14, 15, 16, 17, 18, 19, 0, 1, 2))
(15,ArrayBuffer(14, 15, 16, 17, 18, 19, 0, 1, 2, 3))
(16,ArrayBuffer(15, 16, 17, 18, 19, 0, 1, 2, 3, 4))
(17,ArrayBuffer(16, 17, 18, 19, 0, 1, 2, 3, 4, 5))
(18,ArrayBuffer(17, 18, 19, 0, 1, 2, 3, 4, 5, 6))
(19,ArrayBuffer(18, 19, 0, 1, 2, 3, 4, 5, 6, 7))
// 其分佈策略原始碼如下:
private def assignReplicasToBrokersRackUnaware(
nPartitions: Int, //分割區的個數 10
replicationFactor: Int, //副本的個數 5
brokerList: Seq[Int],//broker的集合 8 0~7
fixedStartIndex: Int//預設值是-1 固定開始的索引位置
startPartitionId: Int): Map[Int, Seq[Int]] //預設值是-1 分割區開始的位置
= {
val ret = mutable.Map[Int, Seq[Int]]()
val brokerArray = brokerList.toArray
val startIndex = if (fixedStartIndex >= 0) {
fixedStartIndex
}else {
rand.nextInt(brokerArray.length)
}
var currentPartitionId = math.max(0, startPartitionId)
var nextReplicaShift = if (fixedStartIndex >= 0) {
fixedStartIndex
}else {
rand.nextInt(brokerArray.length)
}
for (_ <- 0 until nPartitions) {
if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0)){
nextReplicaShift += 1
}
val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
for (j <- 0 until replicationFactor - 1) {
replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
}
ret.put(currentPartitionId, replicaBuffer)
currentPartitionId += 1
}
ret
}
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
(firstReplicaIndex + shift) % nBrokers
}
副本因子不能大於 Broker 的個數(報錯:Replication factor: 4 larger than available brokers: 3.);
partition_0的第1個副本(leader副本)放置位置是隨機從 brokerList 選擇的;
其他分割區的第1個副本(leader)放置位置相對於paritition_0分割區依次往後移(也就是如果我們有5個 Broker,5個分割區,假設partition0分割區放在broker4上,那麼partition1將會放在broker5上;patition2將會放在broker1上;partition3在broker2,依次類);
各分割區剩餘的副本相對於分割區前一個副本偏移亂數nextReplicaShift+1,然後後面的副本依次加1
分割區 leader 副本的選舉由控制器controller負責具體實施。
當建立分割區(建立主題或增加分割區都有建立分割區的動作)或Leader下線(此時分割區需要選舉一個新的leader上線來對外提供服務)的時候都需要執行 leader 的選舉動作。
選舉策略:按照 ISR集合中副本的順序查詢第一個存活的副本,並且這個副本在 ISR 集合中
一個分割區的AR集合在partition分配的時候就被指定,並且只要不發生重分配的情況,集合內部副本的順序是保持不變的,而分割區的 ISR 集合中副本的順序可能會改變;
生產者工作流程圖:
一個生產者使用者端由兩個執行緒協調執行,這兩個執行緒分別為主執行緒和 Sender 執行緒 。
在主執行緒中由kafkaProducer建立訊息,然後通過可能的攔截器、序列化器和分割區器的作用之後快取到訊息累加器(RecordAccumulator, 也稱為訊息收集器)中。
Sender 執行緒負責從RecordAccumulator 獲取訊息並將其傳送到 Kafka 中;
RecordAccumulator主要用來快取訊息以便Sender 執行緒可以批次傳送,進而減少網路傳輸的資源消耗以提升效能。RecordAccumulator快取的大小可以通過生產者使用者端引數buffer.memory
設定,預設值為33554432B
,即32M。如果生產者傳送訊息的速度超過傳送到伺服器的速度,則會導致生產者空間不足,這個時候 KafkaProducer.send()方法呼叫要麼被阻塞,要麼丟擲異常,這個取決於引數max.block.ms
的設定,此引數的預設值為60000
,即60秒。
主執行緒中傳送過來的訊息都會被迫加到 RecordAccumulator 的某個雙端佇列( Deque )中,
RecordAccumulator內部為每個分割區都維護了一個雙端佇列,即Deque<ProducerBatch>。
訊息寫入快取時,追加到雙端佇列的尾部;
Sender讀取訊息時,從雙端佇列的頭部讀取。注意:ProducerBatch 是指一個訊息批次;
與此同時,會將較小的 ProducerBatch 湊成一個較大 ProducerBatch ,也可以減少網路請求的次數以提升整體的吞吐量。
ProducerBatch 大小和batch.size
引數也有著密切的關係。當一條訊息(ProducerRecord ) 流入 RecordAccumulator 時,會先尋找與訊息分割區所對應的雙端佇列(如果沒有則新建),再從這個雙端佇列的尾部獲取一個ProducerBatch (如果沒有則新建),檢視 ProducerBatch中是否還可以寫入這個ProducerRecord,如果可以寫入就直接寫入,如果不可以則需要建立一個新的Producer Batch。在新建 ProducerBatch時評估這條訊息的大小是否超過 batch.size
引數大小,如果不超過,那麼就以 batch.size
引數的大小來建立 ProducerBatch。
如果生產者使用者端需要向很多分割區傳送訊息, 則可以將buffer.memory引數適當調大以增加整體的吞吐量。
Sender從 RecordAccumulator 獲取快取的訊息之後,會進一步將<分割區,Deque<Producer Batch>的形式轉變成<Node,List<ProducerBatch>>的形式,其中Node表示Kafka叢集broker節點。對於網路連線來說,生產者使用者端是與具體broker節點建立的連線,也就是向具體的broker節點傳送訊息,而並不關心訊息屬於哪一個分割區;而對於KafkaProducer的應用邏輯而言,我們只關注向哪個分割區中傳送哪些訊息,所以在這裡需要做一個應用邏輯層面到網路I/O層面的轉換。
在轉換成<Node, List<ProducerBatch>>的形式之後, Sender會進一步封裝成<Node,Request> 的形式,這樣就可以將 Request 請求發往各個Node了,這裡的Request是Kafka各種協定請求;
請求在從sender執行緒發往Kafka之前還會儲存到InFlightRequests中,InFlightRequests儲存物件的具體形式為 Map<Nodeld, Deque<request>>,它的主要作用是快取了已經發出去但還沒有收到伺服器端響應的請求(Nodeld 是一個 String 型別,表示節點的 id 編號)。與此同時,InFlightRequests 還提供了許多管理類的方法,並且通過設定引數還可以限制每個連線(也就是使用者端與 Node之間的連線)最多快取的請求數。這個設定引數為 max.in.flight.request.per.connection
,預設值為5,即每個連線最多隻能快取5個未響應的請求,超過該數值之後就不能再向這個連線傳送更多的請求了,除非有快取的請求收到了響應( Response )。通過比較 Deque<Request> 的size與這個引數的大小來判斷對應的 Node中是否己經堆積了很多未響應的訊息,如果真是如此,那麼說明這個 Node 節點負載較大或網路連線有問題,再繼續傳送請求會增大請求超時的可能。
kafka 在 producer 裡面提供了訊息確認機制。我們可以通過設定來決定訊息傳送到對應分割區的幾個副本才算訊息傳送成功。可以在構造producer 時通過acks引數指定(在 0.8.2.X 前是通過 request.required.acks 引數設定的)。這個引數支援以下三種值:
acks = 0:意味著如果生產者能夠通過網路把訊息傳送出去,那麼就認為訊息已成功寫入 kafka 。在這種情況下還是有可能發生錯誤,比如傳送的物件不能被序列化或者網路卡發生故障,但如果是分割區離線或整個叢集長時間不可用,那就不會收到任何錯誤。在 acks=0 模式下的執行速度是非常快的(這就是為什麼很多基準測試都是基於這個模式),你可以得到驚人的吞吐量和頻寬利用率,不過如果選擇了這種模式,大概率會丟失一些訊息。
acks = 1:意味著leader 在收到訊息並把它寫入到分割區資料檔案(不一定同步到磁碟上)時會返回確認或錯誤響應。在這個模式下,如果發生正常的 leader 選舉,生產者會在選舉時收到一個 LeaderNotAvailableException 異常,如果生產者能恰當地處理這個錯誤,它會重試傳送悄息,最終訊息會安全到達新的 leader 那裡。不過在這個模式下仍然有可能丟失資料,比如訊息已經成功寫入 leader,但在訊息被複制到 follower 副本之前 leader發生崩潰。
acks = all(這個和 request.required.acks = -1 含義一樣):意味著 leader 在返回確認或錯誤響應之前,會等待所有同步副本都收到悄息。如果和 min.insync.replicas 引數結合起來,就可以決定在返回確認前至少有多少個副本能夠收到悄息,生產者會一直重試直到訊息被成功提交。不過這也是最慢的做法,因為生產者在繼續傳送其他訊息之前需要等待所有副本都收到當前的訊息。
acks | 含義 |
---|---|
0 | Producer往叢集傳送資料不需要等到叢集的確認資訊,不確保訊息傳送成功。安全性最低但是效率最高。 |
1 | Producer往叢集傳送資料只要 leader成功寫入訊息就可以傳送下一條,只確保Leader 接收成功。 |
-1 或 all | Producer往叢集傳送資料需要所有的ISR Follower 都完成從 Leader 的同步才會傳送下一條,確保 Leader傳送成功和所有的副本都成功接收。安全性最高,但是效率最低。 |
生產者將acks設定為all,是否就一定不會丟資料呢?
否!如果在某個時刻ISR列表只剩leader自己了,那麼就算acks=all,收到這條資料還是隻有一個點;
可以配合另外一個引數緩解此情況: 最小同步副本數>=2
BROKER端引數: min.insync.replicas(預設1)
生產者的ack=all,也不能完全保證資料傳送的100%可靠性
為什麼?因為,如果伺服器端目標partition的同步副本只有leader自己了,此時,它收到資料就會給生產者反饋成功!
可以修改伺服器端的一個引數(分割區最小ISR數[min.insync.replicas]>=2),來避免此問題;
acks是控制kafka伺服器端向生產者應答訊息寫入成功的條件;生產者根據得到的確認資訊,來判斷訊息傳送是否成功;
這個引數用來限制生產者使用者端能傳送的訊息的最大值,預設值為 1048576B ,即 lMB
一般情況下,這個預設值就可以滿足大多數的應用場景了。
這個引數還涉及一些其它引數的聯動,比如 broker 端(topic級別引數)的 message.max.bytes引數(預設1000012),如果設定錯誤可能會引起一些不必要的異常;比如將 broker 端的 message.max.bytes 引數設定為10B ,而 max.request.size引數設定為20B,那麼當傳送一條大小為 15B 的訊息時,生產者使用者端就會報出異常;
retries引數用來設定生產者重試的次數,預設值為2147483647,即在發生異常的時候不進行任何重試動作。
訊息在從生產者發出到成功寫入伺服器之前可能發生一些臨時性的異常,比如網路抖動、 leader 副本的選舉等,這種異常往往是可以自行恢復的,生產者可以通過設定 retries大於0的值,以此通過內部重試來恢復而不是一味地將異常拋給生產者的應用程式。如果重試達到設定的次數,那麼生產者就會放棄重試並返回異常。重試還和另一個引數 retry.backoff.ms 有關,這個引數的預設值為100,它用來設定兩次重試之間的時間間隔,避免無效的頻繁重試 。如果將 retries引數設定為非零值,並且 max .in.flight.requests.per.connection 引數設定為大於1的值,那可能會出現錯序的現象:如果批次1訊息寫入失敗,而批次2訊息寫入成功,那麼生產者會重試傳送批次1的訊息,此時如果批次1的訊息寫入成功,那麼這兩個批次的訊息就出現了錯序。
對於某些應用來說,順序性非常重要 ,比如MySQL binlog的傳輸,如果出現錯誤就會造成非常嚴重的後果;一般而言,在需要保證訊息順序的場合建議把引數max.in.flight.requests.per.connection 設定為1 ,而不是把retries設定為0,不過這樣也會影響整體的吞吐。
這個引數用來指定訊息的壓縮方式,預設值為「none",即預設情況下,訊息不會被壓縮。該引數還可以設定為 "gzip","snappy" 和 "lz4"。對訊息進行壓縮可以極大地減少網路傳輸、降低網路I/O,從而提高整體的效能 。訊息壓縮是一種以時間換空間的優化方式,如果對時延有一定的要求,則不推薦對訊息進行壓縮;
每個Batch要存放batch.size大小的資料後,才可以傳送出去。比如說batch.size預設值是16KB,那麼裡面湊夠16KB的資料才會傳送。理論上來說,提升batch.size的大小,可以允許更多的資料緩衝在recordAccumulator裡面,那麼一次Request傳送出去的資料量就更多了,這樣吞吐量可能會有所提升。但是batch.size也不能過大,要是資料老是緩衝在Batch裡遲遲不傳送出去,那麼傳送訊息的延遲就會很高。一般可以嘗試把這個引數調節大些,利用生產環境發訊息負載測試一下。
這個引數用來指定生產者傳送 ProducerBatch 之前等待更多訊息( ProducerRecord )加入
ProducerBatch 時間,預設值為0。生產者使用者端會在ProducerBatch填滿或等待時間超過linger.ms 值時傳送出去。
增大這個引數的值會增加訊息的延遲,但是同時能提升一定的吞吐量。
冪等性,就是一個操作重複做,也不會影響最終的結果!
int a = 1;
a++; // 非冪等操作
val map = new HashMap()
map.put(「a」,1); // 冪等操作
在kafka中,同一條訊息,生產者如果多次重試傳送,在伺服器中的結果如果還是隻有一條,這就是具備冪等性;否則,就不具備冪等性!
用來指定分割區器,預設:org.apache.kafka.internals.DefaultPartitioner
預設分割區器的分割區規則:
自定義partitioner需要實現org.apache.kafka.clients.producer.Partitioner介面
會觸發rebalance(消費者)的事件可能是如下任意一種:
將分割區的消費權從一個消費者移到另一個消費者稱為再均衡(rebalance),如何rebalance也涉及到分割區分配策略。
kafka有兩種的分割區分配策略:range(預設) 和 roundrobin(新版本中又新增了另外2種)
我們可以通過partition.assignment.strategy
引數選擇 range 或 roundrobin。
partition.assignment.strategy
引數預設的值是range。
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor
例1:
假設有TOPIC_A有5個分割區,由3個consumer(C1,C2,C3)來消費;5/3得到商1,餘2,則每個消費者至少分1個分割區,前兩個消費者各多1個分割區C1: 2個分割區,C2:2個分割區,C3:1個分割區
接下來,就按照「區間」進行分配:
TOPIC_A-0 TOPIC_A-1 TOPIC_A-2 TOPIC_A_3 TOPIC_A-4
C1: TOPIC_A-0, TOPIC_A-1
C2 : TOPIC_A-2, TOPIC_A_3
C3: TOPIC_A-4
例2:
假設TOPIC_A有5個分割區,TOPIC_B有3個分割區,由2個consumer(C1,C2)來消費
5/2得到商2,餘1,則C1有3個分割區,C2有2個分割區,得到結果
C1: TOPIC_A-0 TOPIC_A-1 TOPIC_A-2
C2: TOPIC_A-3 TOPIC_A-4
3/2得到商1,餘1,則C1有2個分割區,C2有1個分割區,得到結果
C1: TOPIC_B-0 TOPIC_B-1
C2: TOPIC_B-2
C1: TOPIC_A-0 TOPIC_A-1 TOPIC_A-2 TOPIC_B-0 TOPIC_B-1
C2: TOPIC_A-3 TOPIC_A-4 TOPIC_B-2
以上述「例2」來舉例:
TOPIC_A-0 TOPIC_B-0 TOPIC_A-1 TOPIC_A-2 TOPIC_B-1 TOPIC_A-3 TOPIC_A-4 TOPIC_B-2
C1: TOPIC_A-0 TOPIC_A-1 TOPIC_B-1
C2: TOPIC_B-0 TOPIC_A-2 TOPIC_A-3
C3 TOPIC_A-4
對應的類叫做: org.apache.kafka.clients.consumer.StickyAssignor
sticky策略的特點:
再均衡的過程中,還是會讓各消費者先取消自身的分割區,然後再重新分配(只不過是分配過程中會盡量讓原來屬於誰的分割區依然分配給誰)
對應的類叫做: org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
sticky策略的特點:
消費組在消費資料的時候,有兩個角色進行組內的各事務的協調;
角色1: Group Coordinator (組協調器) 位於伺服器端(就是某個broker)
組協調器的定位:
coordinator在我們組記偏移量的__consumer_offsets分割區的leader所在broker上
查詢Group Coordinator的方式:
先根據消費組groupid的hashcode值計算它應該所在__consumer_offsets 中的分割區編號; 分割區數
Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
groupMetadataTopicPartitionCount為__consumer_offsets的分割區總數,這個可以通過broker端引數offset.topic.num.partitions來設定,預設值是50;
找到對應的分割區號後,再尋找此分割區leader副本所在broker節點,則此節點即為自己的Grouping Coordinator;
角色2: Group Leader (組長) 位於消費端(就是消費組中的某個消費者)
組長的定位:隨機選的哦!!!
每個消費組在伺服器端對應一個GroupCoordinator其進行管理,GroupCoordinator是Kafka伺服器端中用於管理消費組的元件。
消費者使用者端中由ConsumerCoordinator元件負責與GroupCoordinator行互動;
ConsumerCoordinator和GroupCoordinator最重要的職責就是負責執行消費者rebalance操作
如果想控制消費者在發生再均衡時執行一些特定的工作,可以通過訂閱主題時註冊「再均衡監聽器」來實現;
場景舉例:在發生再均衡時,處理消費位移
如果A消費者消費掉的一批訊息還沒來得及提交offset,而它所負責的分割區在rebalance中轉移給了B消費者,則有可能發生資料的重複消費處理。此情形下,可以通過再均衡監聽器做一定程度的補救;
程式碼範例:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import java.util.Properties;
/**
* 消費組再均衡觀察
*/
public class ConsumerDemo2 {
public static void main(String[] args) {
//1.建立kafka的消費者物件,附帶著把組態檔搞定
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092,linux02:9092,linux03:9092");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"g01");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//2.訂閱主題(確定需要消費哪一個或者多個主題)
//我現在想看看如果我的消費者組裡面,多了一個消費者或者少了一個消費者,他有沒有給我做再均衡
consumer.subscribe(Arrays.asList("reb-1", "reb-2"), new ConsumerRebalanceListener() {
/**
* 這個方法是將原來的分配情況全部取消,或者說把所有的分割區全部回收了
* 這個全部取消很噁心,原來的消費者消費的好好的,他一下子就給他全部停掉了
* @param collection
*/
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
System.out.println("我原來的均衡情況是:"+collection + "我已經被回收了!!");
}
/**
* 這個方法是當上面的分配情況全部取消以後,呼叫這個方法,來再次分配,這是在均衡分配後的情況
* @param collection
*/
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
System.out.println("我是重新分配後的結果:"+collection);
}
});
while (true){
consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
}
}
}
CAP理論作為分散式系統的基礎理論,它描述的是一個分散式系統在以下三個特性中:
最多滿足其中的兩個特性。也就是下圖所描述的。分散式系統要麼滿足CA,要麼CP,要麼AP。無法同時滿足CAP。
分割區容錯性:指的分散式系統中的某個節點或者網路分割區出現了故障的時候,整個系統仍然能對外提供滿足一致性和可用性的服務。也就是說部分故障不影響整體使用。事實上我們在設計分散式系統時都會考慮到bug,硬體,網路等各種原因造成的故障,所以即使部分節點或者網路出現故障,我們要求整個系統還是要繼續使用的(不繼續使用,相當於只有一個分割區,那麼也就沒有後續的一致性和可用性了)
可用性:一直可以正常的做讀寫操作。簡單而言就是使用者端一直可以正常存取並得到系統的正常響應。使用者角度來看就是不會出現系統操作失敗或者存取超時等問題。
一致性:在分散式系統完成某寫操作後任何讀操作,都應該獲取到該寫操作寫入的那個最新的值。相當於要求分散式系統中的各節點時時刻刻保持資料的一致性。
Kafka 作為一個商業級訊息中介軟體,資料可靠性和可用性是優先考慮的重點,兼顧資料一致性;
參考檔案:https://www.cnblogs.com/lilpig/p/16840963.html
Kafka 0.11.0.0 版本開始引入了冪等性與事務這兩個特性,以此來實現 EOS ( exactly once semantics ,精確一次處理語意)
生產者在進行傳送失敗後的重試時(retries),有可能會重複寫入訊息,而使用 Kafka冪等性功能之後就可以避免這種情況。
開啟冪等性功能,只需要顯式地將生產者引數 enable.idempotence
設定為 true (預設值為 false):
props.put("enable.idempotence",true);
在開啟冪等性功能時,如下幾個引數必須正確設定:
如有違反,則會丟擲ConfigException異常
;
producer.send(「aaa」)
訊息aaa就擁有了一個唯一的序列號, 如果這條訊息傳送失敗,producer內部自動重試(retry),此時序列號不變;
producer.send(「bbb」)
訊息bbb擁有一個新的序列號
注意:kafka只保證producer單個對談中的單個分割區冪等;
主要原理:
開始事務-->傳送一個ControlBatch訊息(事務開始)
提交事務-->傳送一個ControlBatch訊息(事務提交)
放棄事務-->傳送一個ControlBatch訊息(事務終止)
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"doit01:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// acks
props.setProperty(ProducerConfig.ACKS_CONFIG,"-1");
// 生產者的重試次數
props.setProperty(ProducerConfig.RETRIES_CONFIG,"3");
// 飛行中的請求快取最大數量
props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"3");
// 開啟冪等性
props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true");
// 設定事務id
props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"trans_001");
事務控制的程式碼模板
// 初始化事務
producer.initTransaction( )
// 開啟事務
producer.beginTransaction( )
// 幹活
// 提交事務
producer.commitTransaction( )
// 異常回滾(放棄事務) catch裡面
producer.abortTransaction( )
消費者api是會拉取到尚未提交事務的資料的;只不過可以選擇是否讓使用者看到!
是否讓使用者看到未提交事務的資料,可以通過消費者引數來設定:
isolation.level=read_uncommitted(預設值)
isolation.level=read_committed
使用者的程式,要從kafka讀取源資料,資料處理的結果又要寫入kafka
kafka能實現端到端的事務控制(比起上面的「基礎」事務,多了一個功能,通過producer可以將consumer的消費偏移量繫結到事務上提交)
producer.sendOffsetsToTransaction(offsets,consumer_id)
為了實現事務,應用程式必須提供唯一transactional.id,並且開啟生產者的冪等性
properties.put ("transactional.id","transactionid00001");
properties.put ("enable.idempotence",true);
「消費kafka-處理-生產結果到kafka」典型場景下的程式碼結構範例:
package com.doit.day04;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class Exercise_kafka2kafka {
public static void main(String[] args) {
Properties props = new Properties();
//消費者的
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "shouwei");
//自動提交偏移量
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
//寫生產者的一些屬性
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//設定ack 開啟冪等性必須設定的三個引數
props.setProperty(ProducerConfig.ACKS_CONFIG,"-1");
props.setProperty(ProducerConfig.RETRIES_CONFIG,"3");
props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"3");
//開啟冪等性
props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true");
//開啟事務
props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"id_fro_39_19");
//消費資料
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
//初始化事務
producer.initTransactions();
//訂閱主題
consumer.subscribe(Arrays.asList("eventlog"));
while (true){
//拉取資料
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
try {
//開啟事務
producer.beginTransaction();
for (ConsumerRecord<String, String> record : poll) {
String value = record.value();
//將value的值寫入到另外一個topic中
producer.send(new ProducerRecord<String,String>("k2k",value));
}
producer.flush();
//提交偏移量
consumer.commitAsync();
//提交事務
producer.commitTransaction();
} catch (ProducerFencedException e) {
//放棄事務
producer.abortTransaction();
}
}
}
}
使用Zero-Copy (零拷貝)技術來進一步提升效能;
零拷貝是指將資料直接從磁碟檔案複製到網路卡裝置中,而不需要經由應用程式之手;
零拷貝大大提高了應用程式的效能,減少了核心和使用者模式之間的上下文切換;對於Linux系統而言,零拷貝技術依賴於底層的 sendfile( )方法實現;對應於Java 語言,FileChannal.transferTo( )方法的底層實現就是 sendfile( )方法;