在回答這個問題之前,我們需要先了解另一個東西--event streaming。
我覺得,event streaming 是一個動態的概念,它描述了一個個 event ( "something happened" in the world ) 在不同主體間連續地、正確地流動的狀態。(這裡我想搞個動圖的,不過 plantuml 不支援,所以只能靠想象了。。)
event source 產生 event,event source 可以是資料庫、感測器、移動裝置、應用程式,等等。
event broker 持久化 event,以備 event sink 可以隨時獲取它們。
event sink 實時或回顧性地從 broker 中獲取 event 進行處理。
有的人可能會問,為什麼需要 broker,event 從 source 直接流到 sink 不行嗎?當然可以,但是不夠解耦,要麼 event source 需要事先知道誰需要這些 event,要麼 event sink 需要知道 event 從哪裡來。
現在,我們可以在腦子裡想象出 event streaming 的樣子:event 由 source 產生,然後流向 broker,在 broker 被持久化,再流到 sink。並不複雜對吧?
我們可以在很多的應用場景中找到 event streaming 的身影,例如:
實時處理支付、金融交易、客戶訂單等等;
實時跟蹤和監控物流進度;
持續捕獲和分析來自物聯網裝置或其他裝置的感測器資料;
不同資料來源的資料連線;
作為資料平臺、事件驅動架構和微服務等的技術基礎;
等等。
現在我們回過頭來回答問題:kafka 是什麼?
我認為,如果說 event streaming 是一種規範的話,那麼 kafka 就是 event streaming 的一種具體實現。
從最上層的抽象看,kafka 由三個部分組成:
其中,producer 釋出 event,broker 持久化 even,consumer 訂閱 event。其中,producer 和 consumer 完全解耦,互不知曉。
不過,這是概念檢視,不是物理檢視。具體實現會因為 source 或 sink 的不同而有所不同。
當 event source 為普通應用程式時,可以在程式中引入 Producer API 和 Consumer API 來完成與 broker 的互動。這些 API 涵蓋了大部分主流語言,例如 Java、Scala、Go、Python、C/C++,除此之外,我們也可以直接使用 REST API 呼叫。
但是,並不是所有 source 或 sink 都能使用 API 的方式,例如,實時捕獲資料庫的更改、檔案的更改,從 RabbitMQ 匯入匯出訊息,等等。
這個時候就需要使用 connector 來完成整合。通常情況下,connector 並不需要我們自己開發,kafka 社群為我們提供了大量的 connector 來滿足我們的使用需求。
接下來我們再來補充下 broker 的一些細節。//zzs001
通常情況下,我們的 broker 會接收到很多不同型別的 event ,broker 需要區分它們,以便正確地路由。topic 就發揮了作用,它有點類似檔案系統的目錄,而 event 就類似於目錄裡的檔案,sink 想要什麼 event,只要找到對應的 topic 就行了。
同一 topic 可以有零個或多個 producer 和 consumer,不同於傳統 MQ,kafka 的 event 消費後並不刪除,為什麼這麼做呢?這個我們後續的部落格會說的。
除此之外,一個 topic 會劃分成一個或多個 partition,這些 partition 一般分佈在不同的 broker 範例。producer 釋出的 event 會根據某種策略分配到不同的 partition,這樣做的好處是,consumer 可以同時從多臺 broker 讀取 event,從而大大提高吞吐量。另外,為了高可用,同一個 partition 還會有多個副本,它們分佈在不同的 broker 範例。
需要注意一下,當同一 topic 的 event 被分發到多個 partition 時,寫入和讀取的順序就不能保證了,對於需要嚴格控制順序的 topic,partition 需要設定為 1。
kafka 那麼受歡迎,還有一個很重要的原因,就是它提供了流式處理類庫,支援對儲存於Kafka內的資料進行流式處理和分析。這部分內容,我也是剛入門而已,後續部落格再好好研究。
kafka:3.2.1
os:CentOS Linux release 8.3.2011
JDK:1.8.0_291
注意,kafka 3.2.1 要求本地環境安裝 Java 8 及以上版本
從 下載頁面下載安裝包。
解壓安裝包。
tar -xzf kafka_2.13-3.2.1.tgz
進入到解壓目錄,我們看看 kafka 的目錄結構。
cd kafka_2.13-3.2.1
ls -al
接下來,我們啟動 broker 的部分,需要按照順序依次啟動 zookeeper 和 kafka server。
先啟動 zookeeper(後續版本可能不再需要 zookeeper)。
bin/zookeeper-server-start.sh config/zookeeper.properties
開啟另一個對談,再啟動 kafka server。
bin/kafka-server-start.sh config/server.properties
現在,單機版 broker 已經就緒,我們可以開始使用了。
producer 釋出的 event 會持久化在對應的 topic 中,才能路由給正確的 consumer。所以,在讀寫 event 之前,我們需要先建立 topic。
開啟另一個對談,執行以下命令。
# 建立topic zzs001
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
# 查詢topic
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
接下來我們用 kafka 自帶的 console-consumer 和 console-producer 讀寫 event。
使用 console-producer 寫 event 時,我們每輸入一行並回車,就會向 topic 寫入一個 event。
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
寫完之後我們可以按 Ctrl + C 退出。
接著,我們使用 console-consumer 讀 event。可以看到,剛寫的 event 被讀到了。
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
讀完我們按 Ctrl + C 退出。
我們可以在兩個對談中保持 producer 和 consumer 不退出,當我們在 producer 寫入 event 時, consumer 將實時讀取到。
前面提到過,topic 的 event 會被持久化下來,而且被消費過的 event 並不會刪除。這一點很容易驗證,我們可以再開一個 consumer 來讀取,它還是能讀到被別人讀過的 event。
前面提到過,有的 source 或 sink 需要依賴 connector 來讀寫 event,接下來我們以檔案為例,演示如何從已有檔案中將 event 匯入 topic,並從 topic 中匯出到另一個檔案中。
首先我們需要一個可以匯入匯出檔案的 connector,預設情況下,在 kafka 的 libs 目錄就有這樣一個 jar 包--connect-file-3.2.1.jar。我們需要在 connect 的設定中引入這個包。
vi config/connect-standalone.properties
按 i 進入編輯,新增或修改plugin.path=libs/connect-file-3.2.1.jar
。
按 ESC 後輸入 :wq 儲存並退出。除此之外,這個檔案還可以用來設定需要連線哪個 broker,以及 event 的序列化方式等。
然後,我們建立一個 test.txt 作為 event source,並寫入 event。
echo -e "foo\nbar" > test.txt
現在我們先啟動 event source 的 connector,將 test.txt 的 event 寫入名為 connect-test 的 topic。config/connect-file-source.properties 已經設定好了connector 名稱、event source 的檔案、topic,等等。
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties
執行片刻後我們可以按 Ctrl + C 退出。
這時,我們可以先通過 consumer-console 檢視 topic 上是否有這些 event。可以看到,event 已經成功匯入,至於格式為什麼是這樣的,這個以後再說明。
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
現在我們啟動 event sink 的 connector,將 topic 的 event 匯入到 test.sink.txt。connect-file-sink.properties 已經設定好了connector 名稱、event source 的檔案、topic,等等 。
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink.properties
執行片刻後我們可以按 Ctrl + C 退出。
這時檢視 test.sink.txt,可以看到 event 成功匯出。
和前面一樣,這裡我們也可以保持 event source 和 event sink 的 connector 不退出,測試實時生產和消費 event。
這部分內容後續再補充。
走到這一步,我們已經完成了 kafka 的入門學習。
接下來,我們可以通過以下步驟關閉 kafka。
如果 producer 或 consumer 還在執行,Ctrl + C 退出;
Ctrl + C 退出 kafka server;
Ctrl + C 退出 zookeeper;
如果想清除 kafka 的資料,包括我們建立的 topic 和 event、紀錄檔等,執行以下命令:
rm -rf /tmp/kafka-logs /tmp/zookeeper /tmp/connect.offsets
以上內容是最近學習 kafka 的一些思考和總結(主要參考官方檔案),如有錯誤,歡迎指正。
任何的事物,都可以被更簡單、更連貫、更系統地瞭解。希望我的文章能夠幫到你。
最後,感謝閱讀。
本文為原創文章,轉載請附上原文出處連結:https://www.cnblogs.com/ZhangZiSheng001/p/16641755.html