將伺服器新增到Kafka叢集很容易,只需為其分配唯一的代理ID,然後在新伺服器上啟動Kafka。但是,不會為這些新伺服器自動分配任何資料分割區,因此,除非將分割區移至它們,否則在建立新主題之前它們將不會做任何工作。因此,通常在將計算機新增到群集時,您將需要將一些現有資料遷移到這些計算機。
資料遷移過程是手動啟動的,但完全自動化。在幕後,Kafka會將新伺服器新增為要遷移的分割區的跟隨者,並允許其完全複製該分割區中的現有資料。新伺服器完全複製該分割區的內容並加入同步副本後,現有副本之一將刪除其分割區的資料。
分割區重新分配工具可用於在代理之間移動分割區。理想的分割區分配將確保所有代理之間的資料負載和分割區大小均勻。分割區重新分配工具沒有能力自動研究Kafka群集中的資料分佈,並四處移動分割區以實現均勻的負載分佈。因此,管理員必須弄清楚應該移動哪些主題或分割區。
分割區重新分配工具可以在3種互斥模式下執行:
建立一個分割區為三,副本數為二的topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic topic_test
檢視topic情況
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic_test
編寫遷移檔案
vi topics-to-move.json
topics-to-move.json:
{"topics": [{"topic": "topic_test"}], "version":1}
一旦json檔案准備就緒,使用分割區重新分配工具生成候選分配
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"topic_test","partition":2,"replicas":[2,0],"log_dirs":["any","any"]},{"topic":"topic_test","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"topic_test","partition":0,"replicas":[0,1],"log_dirs":["any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"topic_test","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"topic_test","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"topic_test","partition":1,"replicas":[1,2],"log_dirs":["any","any"]}]}
該工具生成一個候選分配,將所有分割區從主題topic_test移動到代理1,2。但請注意,此時分割區移動尚未開始,它只是告訴您當前的分配和建議的新分配。應儲存當前分配,以防您想要回滾它。新的賦值應保
存在json檔案中(例如expand-cluster-reassignment.json),以使用–execute選項輸入到工具
儲存當前分配
vi old-expand-cluster-reassignment.json
old-expand-cluster-reassignment.json:
{"version":1,"partitions":[{"topic":"topic_test","partition":2,"replicas":[2,0],"log_dirs":["any","any"]},{"topic":"topic_test","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"topic_test","partition":0,"replicas":[0,1],"log_dirs":["any","any"]}]}
vi expand-cluster-reassignment.json
expand-cluster-reassignment.json:
{"version":1,"partitions":[{"topic":"topic_test","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"topic_test","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"topic_test","partition":1,"replicas":[1,2],"log_dirs":["any","any"]}]}
執行遷移
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
檢視遷移結果
可以看到分割區已經遷移成功
也可以通過命令檢視結果
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
首先我們需要還原原本分割區
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file old-expand-cluster-reassignment.json --execute
然後將副本2遷移至分割區0中
編寫遷移檔案
vi increase-replication-factor.json
increase-replication-factor.json:
{"version":1,"partitions":[{"topic":"topic_test","partition":0,"replicas":[0,1,2]}]}
執行遷移
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
可以看到副本2已經成功遷移到分割區1中。