kafka使用scala開發,支援多語言使用者端(c++、java、python、go等)
Kafka最先由LinkedIn公司開發,之後成為Apache的頂級專案。
Kafka是一個分散式的、分割區化、可複製提交的紀錄檔服務
LinkedIn使用Kafka實現了公司不同應用程式之間的鬆耦和,那麼作為一個可延伸、高可靠的訊息系統
支援高Throughput的應用
scale out:無需停機即可延伸機器
持久化:通過將資料持久化到硬碟以及replication防止資料丟失
支援online和offline的場景
Kafka是分散式的,其所有的構件borker(伺服器端叢集)、producer(訊息生產)、consumer(訊息消費者)都可以是分散式的。
在訊息的生產時可以使用一個標識topic來區分,且可以進行分割區;每一個分割區都是一個順序的、不可變的訊息佇列, 並且可以持續的新增。
同時為釋出和訂閱提供高吞吐量。據瞭解,Kafka每秒可以生產約25萬訊息(50 MB),每秒處理55萬訊息(110 MB)。
訊息被處理的狀態是在consumer端維護,而不是由server端維護。當失敗時能自動平衡
監控:主機通過Kafka傳送與系統和應用程式健康相關的指標,然後這些資訊會被收集和處理從而建立監控儀表盤並行送警告。
訊息佇列: 應用程度使用Kafka作為傳統的訊息系統實現標準的佇列和訊息的釋出—訂閱,例如搜尋和內容提要(Content Feed)。比起大多數的訊息系統來說,Kafka有更好的吞吐量,內建的分割區,冗餘及容錯性,這讓Kafka成為了一個很好的大規模訊息處理應用的解決方案。訊息系統 一般吞吐量相對較低,但是需要更小的端到端延時,並嚐嚐依賴於Kafka提供的強大的永續性保障。在這個領域,Kafka足以媲美傳統訊息系統,如ActiveMR或RabbitMQ
站點的使用者活動追蹤: 為了更好地理解使用者行為,改善使用者體驗,將使用者檢視了哪個頁面、點選了哪些內容等資訊傳送到每個資料中心的Kafka叢集上,並通過Hadoop進行分析、生成日常報告。
流處理:儲存收集流資料,以提供之後對接的Storm或其他流式計算框架進行處理。很多使用者會將那些從原始topic來的資料進行 階段性處理,彙總,擴充或者以其他的方式轉換到新的topic下再繼續後面的處理。例如一個文章推薦的處理流程,可能是先從RSS資料來源中抓取文章的內 容,然後將其丟入一個叫做「文章」的topic中;後續操作可能是需要對這個內容進行清理,比如回覆正常資料或者刪除重複資料,最後再將內容匹配的結果返 還給使用者。這就在一個獨立的topic之外,產生了一系列的實時資料處理的流程。
紀錄檔聚合:使用Kafka代替紀錄檔聚合(log aggregation)。紀錄檔聚合一般來說是從伺服器上收集紀錄檔檔案,然後放到一個集中的位置(檔案伺服器或HDFS)進行處理。然而Kafka忽略掉 檔案的細節,將其更清晰地抽象成一個個紀錄檔或事件的訊息流。這就讓Kafka處理過程延遲更低,更容易支援多資料來源和分散式資料處理。比起以紀錄檔為中心的 系統比如Scribe或者Flume來說,Kafka提供同樣高效的效能和因為複製導致的更高的耐用性保證,以及更低的端到端延遲
永續性紀錄檔:Kafka可以為一種外部的永續性紀錄檔的分散式系統提供服務。這種紀錄檔可以在節點間備份資料,併為故障節點資料回覆提供一種重新同步的機制。Kafka中紀錄檔壓縮功能為這種用法提供了條件。在這種用法中,Kafka類似於Apache BookKeeper專案。
1.Topic(話題):Kafka中用於區分不同類別資訊的類別名稱。由producer指定
2.Producer(生產者):將訊息釋出到Kafka特定的Topic的物件(過程)
3.Consumers(消費者):訂閱並處理特定的Topic中的訊息的物件(過程)
4.Broker(Kafka服務叢集):已釋出的訊息儲存在一組伺服器中,稱之為Kafka叢集。叢集中的每一個伺服器都是一個代理(Broker). 消費者可以訂閱一個或多個話題,並從Broker拉資料,從而消費這些已釋出的訊息。
5.Partition(分割區):Topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的佇列。partition中的每條訊息都會被分配一個有序的id(offset)
Message:訊息,是通訊的基本單位,每個producer可以向一個topic(主題)釋出一些訊息。
訊息由一個固定大小的報頭和可變長度但不透明的位元組陣列負載。報頭包含格式版本和CRC32效驗和以檢測損壞或截斷
1. 4 byte CRC32 of the message
2. 1 byte "magic" identifier to allow format changes, value is 0 or 1
3. 1 byte "attributes" identifier to allow annotations on the message independent of the version
bit 0 ~ 2 : Compression codec
0 : no compression
1 : gzip
2 : snappy
3 : lz4
bit 3 : Timestamp type
0 : create time
1 : log append time
bit 4 ~ 7 : reserved
4. (可選) 8 byte timestamp only if "magic" identifier is greater than 0
5. 4 byte key length, containing length K
6. K byte key
7. 4 byte payload length, containing length V
8. V byte payload
Producer:Producer即生產者,訊息的產生者,是訊息的⼊口。
kafka cluster:kafka叢集,一臺或多臺服務器組成
Consumer:消費者,即訊息的消費方,是訊息的出口。
我們看上⾯的架構圖中,producer就是生產者,是資料的入口。Producer在寫入資料的時候會把資料 寫入到leader中,不會直接將資料寫入follower!那leader怎麼找呢?寫入的流程又是什麼樣的呢?我 們看下圖:
1.⽣產者從Kafka叢集獲取分割區leader資訊
2.⽣產者將訊息傳送給leader
3.leader將訊息寫入本地磁碟
4.follower從leader拉取訊息資料
5.follower將訊息寫入本地磁碟後向leader傳送ACK
6.leader收到所有的follower的ACK之後向生產者傳送ACK
那在kafka中,如果某個topic有多個partition,producer⼜怎麼知道該將資料發往哪個partition呢? kafka中有幾個原則:
1.partition在寫入的時候可以指定需要寫入的partition,如果有指定,則寫入對應的partition。
2.如果沒有指定partition,但是設定了資料的key,則會根據key的值hash出一個partition。
3.如果既沒指定partition,又沒有設定key,則會採用輪詢⽅式,即每次取一小段時間的資料寫入某
個partition,下一小段的時間寫入下一個partition
producer在向kafka寫入訊息的時候,可以設定引數來確定是否確認kafka接收到資料,這個引數可設定 的值為 0,1,all
最後要注意的是,如果往不存在的topic寫資料,kafka會⾃動建立topic,partition和replication的數量 預設設定都是1。
topic 是同⼀類別的訊息記錄(record)的集合。在Kafka中,⼀個主題通常有多個訂閱者。對於每個 主題,Kafka叢集維護了⼀個分割區資料⽇志⽂件結構如下:
每個partition都是⼀個有序並且不可變的訊息記錄集合。當新的資料寫⼊時,就被追加到partition的末 尾。在每個partition中,每條訊息都會被分配⼀個順序的唯⼀標識,這個標識被稱為offset,即偏移 量。注意,Kafka只保證在同⼀個partition內部訊息是有序的,在不同partition之間,並不能保證訊息 有序。
Kafka可以設定⼀個保留期限,⽤來標識⽇志會在Kafka叢集內保留多⻓時間。Kafka叢集會保留在保留 期限內所有被髮布的訊息,不管這些訊息是否被消費過。⽐如保留期限設定為兩天,那麼資料被髮布到 Kafka叢集的兩天以內,所有的這些資料都可以被消費。當超過兩天,這些資料將會被清空,以便為後 續的資料騰出空間。由於Kafka會將資料進⾏持久化儲存(即寫⼊到硬碟上),所以保留的資料⼤⼩可 以設定為⼀個⽐較⼤的值。
Partition在伺服器上的表現形式就是⼀個⼀個的⽂件夾,每個partition的⽂件夾下⾯會有多組segment ⽂件,每組segment⽂件⼜包含 .index ⽂件、 .log ⽂件、 .timeindex ⽂件三個⽂件,其中 .log ⽂ 件就是實際儲存message的地⽅,⽽ .index 和 .timeindex ⽂件為索引⽂件,⽤於檢索訊息。
多個消費者範例可以組成⼀個消費者組,並⽤⼀個標籤來標識這個消費者組。⼀個消費者組中的不同消 費者範例可以運⾏在不同的程序甚⾄不同的伺服器上。
如果所有的消費者範例都在同⼀個消費者組中,那麼訊息記錄會被很好的均衡的傳送到每個消費者實 例。
如果所有的消費者範例都在不同的消費者組,那麼每⼀條訊息記錄會被⼴播到每⼀個消費者範例。
舉個例⼦,如上圖所示⼀個兩個節點的Kafka叢集上擁有⼀個四個partition(P0-P3)的topic。有兩個 消費者組都在消費這個topic中的資料,消費者組A有兩個消費者範例,消費者組B有四個消費者範例。 從圖中我們可以看到,在同⼀個消費者組中,每個消費者範例可以消費多個分割區,但是每個分割區最多隻 能被消費者組中的⼀個範例消費。也就是說,如果有⼀個4個分割區的主題,那麼消費者組中最多隻能有4 個消費者範例去消費,多出來的都不會被分配到分割區。其實這也很好理解,如果允許兩個消費者範例同 時消費同⼀個分割區,那麼就⽆法記錄這個分割區被這個消費者組消費的offset了。如果在消費者組中動態 的上線或下線消費者,那麼Kafka叢集會⾃動調整分割區與消費者範例間的對應關係。
Go語言中連線kafka使用第三方庫: github.com/Shopify/sarama。
go get github.com/Shopify/sarama
注意事項: sarama v1.20之後的版本加入了zstd壓縮演演算法,需要用到cgo,在Windows平臺編譯時會提示類似如下錯誤: github.com/DataDog/zstd exec: "gcc":executable file not found in %PATH% 所以在Windows平臺請使用v1.19版本的sarama。(如果不會版本控制請檢視部落格里面的go module章節)
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
// 基於sarama第三方庫開發的kafka client
func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // 傳送完資料需要leader和follow都確認
config.Producer.Partitioner = sarama.NewRandomPartitioner // 新選出一個partition
config.Producer.Return.Successes = true // 成功交付的訊息將在success channel返回
// 構造一個訊息
msg := &sarama.ProducerMessage{}
msg.Topic = "web_log"
msg.Value = sarama.StringEncoder("this is a test log")
// 連線kafka
client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
if err != nil {
fmt.Println("producer closed, err:", err)
return
}
defer client.Close()
// 傳送訊息
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send msg failed, err:", err)
return
}
fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
// kafka consumer
func main() {
consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
if err != nil {
fmt.Printf("fail to start consumer, err:%v\n", err)
return
}
partitionList, err := consumer.Partitions("web_log") // 根據topic取到所有的分割區
if err != nil {
fmt.Printf("fail to get list of partition:err%v\n", err)
return
}
fmt.Println(partitionList)
for partition := range partitionList { // 遍歷所有的分割區
// 針對每個分割區建立一個對應的分割區消費者
pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest)
if err != nil {
fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
return
}
defer pc.AsyncClose()
// 非同步從每個分割區消費資訊
go func(sarama.PartitionConsumer) {
for msg := range pc.Messages() {
fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
}
}(pc)
}
}