kafka詳解(一)--kafka是什麼及怎麼用

2022-08-31 12:02:30

kafka是什麼

在回答這個問題之前,我們需要先了解另一個東西--event streaming。

什麼是event streaming

我覺得,event streaming 是一個動態的概念,它描述了一個個 event ( "something happened" in the world ) 在不同主體間連續地、正確地流動的狀態。(這裡我想搞個動圖的,不過 plantuml 不支援,所以只能靠想象了。。)

event source 產生 event,event source 可以是資料庫、感測器、移動裝置、應用程式,等等。

event broker 持久化 event,以備 event sink 可以隨時獲取它們。

event sink 實時或回顧性地從 broker 中獲取 event 進行處理。

有的人可能會問,為什麼需要 broker,event 從 source 直接流到 sink 不行嗎?當然可以,但是不夠解耦,要麼 event source 需要事先知道誰需要這些 event,要麼 event sink 需要知道 event 從哪裡來。

現在,我們可以在腦子裡想象出 event streaming 的樣子:event 由 source 產生,然後流向 broker,在 broker 被持久化,再流到 sink。並不複雜對吧?

event streaming用來幹嘛

我們可以在很多的應用場景中找到 event streaming 的身影,例如:

  • 實時處理支付、金融交易、客戶訂單等等;

  • 實時跟蹤和監控物流進度;

  • 持續捕獲和分析來自物聯網裝置或其他裝置的感測器資料;

  • 不同資料來源的資料連線;

  • 作為資料平臺、事件驅動架構和微服務等的技術基礎;

等等。

kafka是什麼

現在我們回過頭來回答問題:kafka 是什麼?

我認為,如果說 event streaming 是一種規範的話,那麼 kafka 就是 event streaming 的一種具體實現

kafka的架構

概念檢視

從最上層的抽象看,kafka 由三個部分組成:

其中,producer 釋出 event,broker 持久化 even,consumer 訂閱 event。其中,producer 和 consumer 完全解耦,互不知曉。

不過,這是概念檢視,不是物理檢視。具體實現會因為 source 或 sink 的不同而有所不同。

物理檢視

Producer/Consumer API

當 event source 為普通應用程式時,可以在程式中引入 Producer API 和 Consumer API 來完成與 broker 的互動。這些 API 涵蓋了大部分主流語言,例如 Java、Scala、Go、Python、C/C++,除此之外,我們也可以直接使用 REST API 呼叫。

Connector

但是,並不是所有 source 或 sink 都能使用 API 的方式,例如,實時捕獲資料庫的更改、檔案的更改,從 RabbitMQ 匯入匯出訊息,等等。

這個時候就需要使用 connector 來完成整合。通常情況下,connector 並不需要我們自己開發,kafka 社群為我們提供了大量的 connector 來滿足我們的使用需求。

topic&partition

接下來我們再來補充下 broker 的一些細節。//zzs001

通常情況下,我們的 broker 會接收到很多不同型別的 event ,broker 需要區分它們,以便正確地路由。topic 就發揮了作用,它有點類似檔案系統的目錄,而 event 就類似於目錄裡的檔案,sink 想要什麼 event,只要找到對應的 topic 就行了。

同一 topic 可以有零個或多個 producer 和 consumer,不同於傳統 MQ,kafka 的 event 消費後並不刪除,為什麼這麼做呢?這個我們後續的部落格會說的。

除此之外,一個 topic 會劃分成一個或多個 partition,這些 partition 一般分佈在不同的 broker 範例。producer 釋出的 event 會根據某種策略分配到不同的 partition,這樣做的好處是,consumer 可以同時從多臺 broker 讀取 event,從而大大提高吞吐量。另外,為了高可用,同一個 partition 還會有多個副本,它們分佈在不同的 broker 範例。

需要注意一下,當同一 topic 的 event 被分發到多個 partition 時,寫入和讀取的順序就不能保證了,對於需要嚴格控制順序的 topic,partition 需要設定為 1。

Streams

kafka 那麼受歡迎,還有一個很重要的原因,就是它提供了流式處理類庫,支援對儲存於Kafka內的資料進行流式處理和分析。這部分內容,我也是剛入門而已,後續部落格再好好研究。

如何使用kafka

環境說明

kafka:3.2.1

os:CentOS Linux release 8.3.2011

JDK:1.8.0_291

注意,kafka 3.2.1 要求本地環境安裝 Java 8 及以上版本

下載安裝

從 下載頁面下載安裝包。

解壓安裝包。

tar -xzf kafka_2.13-3.2.1.tgz

啟動broker

進入到解壓目錄,我們看看 kafka 的目錄結構。

cd kafka_2.13-3.2.1
ls -al

接下來,我們啟動 broker 的部分,需要按照順序依次啟動 zookeeper 和 kafka server。

先啟動 zookeeper(後續版本可能不再需要 zookeeper)。

bin/zookeeper-server-start.sh config/zookeeper.properties

開啟另一個對談,再啟動 kafka server。

bin/kafka-server-start.sh config/server.properties

現在,單機版 broker 已經就緒,我們可以開始使用了。

建立topic

producer 釋出的 event 會持久化在對應的 topic 中,才能路由給正確的 consumer。所以,在讀寫 event 之前,我們需要先建立 topic。

開啟另一個對談,執行以下命令。

# 建立topic  zzs001
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
# 查詢topic  
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092

簡單的讀寫event

接下來我們用 kafka 自帶的 console-consumer 和 console-producer 讀寫 event。

使用 console-producer 寫 event 時,我們每輸入一行並回車,就會向 topic 寫入一個 event。

bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092

寫完之後我們可以按 Ctrl + C 退出。

接著,我們使用 console-consumer 讀 event。可以看到,剛寫的 event 被讀到了。

bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

讀完我們按 Ctrl + C 退出。

我們可以在兩個對談中保持 producer 和 consumer 不退出,當我們在 producer 寫入 event 時, consumer 將實時讀取到。

前面提到過,topic 的 event 會被持久化下來,而且被消費過的 event 並不會刪除。這一點很容易驗證,我們可以再開一個 consumer 來讀取,它還是能讀到被別人讀過的 event。

使用connect匯入匯出

前面提到過,有的 source 或 sink 需要依賴 connector 來讀寫 event,接下來我們以檔案為例,演示如何從已有檔案中將 event 匯入 topic,並從 topic 中匯出到另一個檔案中。

首先我們需要一個可以匯入匯出檔案的 connector,預設情況下,在 kafka 的 libs 目錄就有這樣一個 jar 包--connect-file-3.2.1.jar。我們需要在 connect 的設定中引入這個包。

vi config/connect-standalone.properties

按 i 進入編輯,新增或修改plugin.path=libs/connect-file-3.2.1.jar

按 ESC 後輸入 :wq 儲存並退出。除此之外,這個檔案還可以用來設定需要連線哪個 broker,以及 event 的序列化方式等。

然後,我們建立一個 test.txt 作為 event source,並寫入 event。

echo -e "foo\nbar" > test.txt

現在我們先啟動 event source 的 connector,將 test.txt 的 event 寫入名為 connect-test 的 topic。config/connect-file-source.properties 已經設定好了connector 名稱、event source 的檔案、topic,等等。

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties

執行片刻後我們可以按 Ctrl + C 退出。

這時,我們可以先通過 consumer-console 檢視 topic 上是否有這些 event。可以看到,event 已經成功匯入,至於格式為什麼是這樣的,這個以後再說明。

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning

現在我們啟動 event sink 的 connector,將 topic 的 event 匯入到 test.sink.txt。connect-file-sink.properties 已經設定好了connector 名稱、event source 的檔案、topic,等等 。

bin/connect-standalone.sh config/connect-standalone.properties  config/connect-file-sink.properties

執行片刻後我們可以按 Ctrl + C 退出。

這時檢視 test.sink.txt,可以看到 event 成功匯出。

和前面一樣,這裡我們也可以保持 event source 和 event sink 的 connector 不退出,測試實時生產和消費 event。

使用streams處理

這部分內容後續再補充。

停止

走到這一步,我們已經完成了 kafka 的入門學習。

接下來,我們可以通過以下步驟關閉 kafka。

  1. 如果 producer 或 consumer 還在執行,Ctrl + C 退出;

  2. Ctrl + C 退出 kafka server;

  3. Ctrl + C 退出 zookeeper;

如果想清除 kafka 的資料,包括我們建立的 topic 和 event、紀錄檔等,執行以下命令:

rm -rf /tmp/kafka-logs /tmp/zookeeper  /tmp/connect.offsets

結語

以上內容是最近學習 kafka 的一些思考和總結(主要參考官方檔案),如有錯誤,歡迎指正。

任何的事物,都可以被更簡單、更連貫、更系統地瞭解。希望我的文章能夠幫到你。

最後,感謝閱讀。

參考資料

Apache Kafka 官方檔案

相關原始碼請移步:https://github.com/ZhangZiSheng001/kafka-demo

本文為原創文章,轉載請附上原文出處連結:https://www.cnblogs.com/ZhangZiSheng001/p/16641755.html