Kafka 最佳實踐,涉及
Kafka 能夠對接到 Spark、Flink、Flume 等多個主流的流資料處理技術。利用 Kafka 高吞吐量的特點,客戶可以通過 Kafka 建立傳輸通道,把應用側的海量資料傳輸到流資料處理引擎中,資料經過處理分析後,可支援後端巨量資料分析,AI 模型訓練等多種業務。
Kafka 最常用也是我最熟悉的場景是紀錄檔分析系統。典型的實現方式是在使用者端部署 紀錄檔收集器(如 Fluentd、Filebeat 或者 Logstash 等)進行紀錄檔採集,並將資料傳送到 Kafka,之後通過後端的 ES 等進行資料運算,再搭建一個展示層如 Kibana 進行統計分析資料的展示。
隨著有價值的用例的出現,物聯網(IoT)正得到越來越多的關注。然而,一個關鍵的挑戰是整合裝置和機器來實時和大規模地處理資料。Apache Kafka®及其周邊的生態系統,包括Kafka Connect、Kafka Streams,已經成為整合和處理這類資料集的首選技術。
Kafka 已經被用於許多物聯網部署,包括消費者物聯網和工業物聯網(IIoT)。大多數場景都需要可靠、可伸縮和安全的端到端整合,從而支援實時的雙向通訊和資料處理。一些具體的用例是:
具體的實現架構如下圖所示:
生產者需要設定 request.required.acks = ALL
,伺服器端主節點寫成功且備節點同步成功才 返回 Response。
消費者接收訊息後,應先進行對應業務操作,隨後再進行 commit 標識訊息已被處理,通過這種處理方式可以確保一條訊息在業務處理失敗時,能夠重新被消費。注意消費者的 enable.auto.commit
引數需要設定為 False
,確保 commit 動作手工控制。
保障一條訊息最多投放一次,需要設定 request.required.acks = 0
,同時設定 retries = 0
。這裡的原理是生產者遇到任何異常都不重試,並且不考慮 broker 是否響應寫入成功。
保障一條訊息最多被消費一次,需要消費者在收到訊息後先進行 commit 標識訊息已被處理,隨後再進行對應業務操作。這裡的原理是消費者不需要管實際業務的處理結果,拿到訊息以後立刻 commit 告訴 broker 訊息處理成功。 注意消費者的 enable.auto.commit
引數需要設定為 False
,確保 commit 動作手工控制。
Kafka 0.11 版本起新增了冪等訊息的語意,通過設定 enable.idempotence=true
引數,可以實現單個分割區的訊息冪等。
如果 Topic 涉及多個分割區或者需要多條訊息封裝成一個事務保障冪等,則需要增加 Transaction 控制,樣例如下:
// 開啟冪等控制引數
producerProps.put("enbale.idempotence", "true");
// 初始化事務
producer.initTransactions();
// 設定事務 ID
producerProps.put("transactional.id", "id-001");
try{
// 開始事務,並在事務中傳送 2 條訊息
producer.beginTranscation();
producer.send(record0);
producer.send(record1);
// 提交事務
producer.commitTranscation();
} catch( Exception e ) {
producer.abortTransaction();
producer.close();
}
需要設定 isolation.level=read_committed
,並設定 enable.auto.commit = false
,確保消費者只消費生產者已經提交事務的訊息,消費者業務需要確保事務性避免重複處理訊息,比如說把訊息持久化到資料庫,然後向伺服器端提交 commit。
At Least Once 是最常用的語意,可確保訊息只多不少的傳送和消費,效能和可靠性上有較好的平衡,可以作為預設選用的模式。業務側也可以通過在訊息體加入唯一的業務主鍵自行保障冪等性,在消費側確保同一個業務主鍵的訊息只被處理一次。
Exactly Once 語意一般用絕對不容許重複的關鍵業務,典型案例是訂單和支付相關場景。
At Most Once 語意一般用在非關鍵業務,業務對於訊息丟失並不敏感,只需要儘量確保訊息成功生產消費即可。典型使用 At Most Once 語意的場景是訊息通知,出現少量遺漏訊息影響不大,相比之下重複傳送通知會造成較壞的使用者體驗。
以下彙總了通過 partition 調優效能建議考慮的維度,建議您根據理論分析配合壓力測試對系統整體效能進行調優。
考慮維度 | 說明 |
---|---|
吞吐量 | 增加 partition 的數量可以訊息消費的並行度,當系統瓶頸在於消費端,而消費端又可以水平擴充套件的時候,增加 partition 可以增加系統吞吐量。 在 Kafka 內部每個 Topic 下的每個 partition 都是一個獨立的訊息處理通道 , 一個 partition 內的訊息只能被同時被一個 consumer group 消費,當 consumer group 數量多於partition的數量時,多餘的 consumer group 會出現空閒。 |
訊息順序 | Kafka 可以保障一個 partition 內的訊息順序性,partition 之間的訊息順序無法保證,增加 partition 的時候需要考慮訊息順序對業務的影響。 |
範例 Partition 上限 | Partition 增加會消耗底層更多的記憶體,IO 和檔案控制程式碼等資源。在規劃 Topic 的 partition 數量時需要考慮 Kafka 叢集能支援的 partition 上限。 |
生產者,消費者與 partition 的關係說明。
如果 Topic 設定了多個分割區,生產者傳送訊息時需要先確認往哪個分割區傳送。在給同一個分割區傳送多條訊息時,Producer 使用者端會將相關訊息打包成一個 Batch,批次傳送到伺服器端。一般情況下,小 Batch 會導致 Producer 使用者端產生大量請求,造成請求佇列在使用者端和伺服器端的排隊,從而整體推高了訊息傳送和消費延遲。
一個合適的 batch 大小,可以減少傳送訊息時使用者端向伺服器端發起的請求次數,在整體上提高訊息傳送的吞吐和延遲。
Batch 引數說明如下:
引數 | 說明 |
---|---|
batch.size |
發往每個分割區(Partition)的訊息快取量(訊息內容的位元組數之和,不是條數)。達到設定的數值時,就會觸發一次網路請求,然後 Producer 使用者端把訊息批次發往伺服器。 |
linger.ms |
每條訊息在快取中的最長時間。若超過這個時間,Producer 使用者端就會忽略 batch.size 的限制,立即把訊息發往伺服器。 |
buffer.memory |
所有快取訊息的總體大小超過這個數值後,就會觸發把訊息發往伺服器,此時會忽略 batch.size 和 linger.ms 的限制。buffer.memory 的預設數值是 32MB,對於單個 Producer 而言,可以保證足夠的效能。 |
Batch 相關引數值的選擇並沒有通用的方法,建議針對效能敏感的業務場景進行壓測調優。
Kafka 生產者與伺服器端傳送訊息時有批次傳送的機制,只有傳送到相同 Partition 的訊息才會被放到同一個 Batch 中。在大批次傳送場景,如果訊息散落到多個 Partition 當中就可能會形成多個小 Batch,導致批次傳送機制失效而降低效能。
Kafka 預設選擇分割區的策略如下
場景 | 策略 |
---|---|
訊息指定 Key | 對訊息的 Key 進行雜湊,然後根據雜湊結果選擇分割區,保證相同 Key 的訊息會傳送到同一個分割區。 |
訊息沒有指定 Key | 預設策略是迴圈使用主題的所有分割區,將訊息以輪詢的方式傳送到每一個分割區上。 |
從預設機制可見 partition 的選擇隨機性很強,因此在大批次傳輸的場景下,推薦設定 partitioner.class
引數,指定自定義的分割區選擇演演算法實現 粘性分割區。
其中一種實現方法是在固定的時間段內使用同一個 partition,過一段時間切換到下一個分割區,避免資料散落到多個不同 partition。
Kafka 會在同一個 partition 內保障訊息順序,如果 Topic 存在多個 partition 則無法確保全域性順序。如果需要保障全域性順序,則需要控制 partition 數量為 1 個。
訊息佇列 Kafka 的訊息有 Key(訊息標識)和 Value(訊息內容)兩個欄位。為了便於追蹤,建議為訊息設定一個唯一的 Key。之後可以通過 Key 追蹤某訊息,列印傳送紀錄檔和消費紀錄檔,瞭解該訊息的生產和消費情況。
分散式環境下,由於網路等原因,訊息偶爾會出現傳送失敗的情況,其原因可能是訊息已經傳送成功但是 ACK 機制失敗或者訊息確實沒有傳送成功。預設的引數能滿足大部分場景,但可以根據業務需求,按需設定以下重試引數:
引數 | 說明 |
---|---|
retries |
重試次數,預設值為 3,但對於資料丟失零容忍的應用而言,請考慮設定為 Integer.MAX_VALUE (有效且最大)。 |
retry.backoff.ms |
重試間隔,建議設定為 1000。 |
❗ 注意:
如果希望實現 At Most Once 語意,重試需要關閉。
Spark Streaming 是 Spark Core 的一個擴充套件,用於高吞吐且容錯地處理持續性的資料,目前支援的外部輸入有 Kafka、Flume、HDFS/S3、Kinesis、Twitter 和 TCP socket。
Spark Streaming 將連續資料抽象成 DStream(Discretized Stream),而 DStream 由一系列連續的 RDD(彈性分散式資料集)組成,每個 RDD 是一定時間間隔內產生的資料。使用函數對 DStream 進行處理其實即為對這些 RDD 進行處理。
使用 Spark Streaming 作為 Kafka 的資料輸入時,可支援 Kafka 穩定版本與實驗版本:
Kafka Version | spark-streaming-kafka-0.8 | spark-streaming-kafka-0.10 |
---|---|---|
Broker Version | 0.8.2.1 or higher | 0.10.0 or higher |
Api Maturity | Deprecated | Stable |
Language Support | Scala、Java、Python | Scala、Java |
Receiver DStream | Yes | No |
Direct DStream | Yes | Yes |
SSL / TLS Support | No | Yes |
Offset Commit Api | No | Yes |
Dynamic Topic Subscription | No | Yes |
本次實踐使用 0.10.2.1 版本的 Kafka 依賴。
建立 Kafka 叢集的步驟略,再建立一個名為 test
的 Topic。
Centos6.8 系統
package | version |
---|---|
sbt | 0.13.16 |
hadoop | 2.7.3 |
spark | 2.1.0 |
protobuf | 2.5.0 |
ssh | CentOS 預設安裝 |
Java | 1.8 |
具體安裝步驟略,包括以下步驟:
這裡使用 0.10.2.1 版本的 Kafka 依賴。
build.sbt
新增依賴:name := "Producer Example"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.2.1"
設定 producer_example.scala
:
import java.util.Properties
import org.apache.kafka.clients.producer._
object ProducerExample extends App {
val props = new Properties()
props.put("bootstrap.servers", "172.0.0.1:9092") //範例資訊中的內網 IP 與埠
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
val TOPIC="test" //指定要生產的 Topic
for(i<- 1 to 50){
val record = new ProducerRecord(TOPIC, "key", s"hello $i") //生產 key 是"key",value 是 hello i 的訊息
producer.send(record)
}
val record = new ProducerRecord(TOPIC, "key", "the end "+new java.util.Date)
producer.send(record)
producer.close() //最後要斷開
}
更多有關 ProducerRecord 的用法請參考 ProducerRecord 檔案。
####### DirectStream
build.sbt
新增依賴:name := "Consumer Example"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.1.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0"
DirectStream_example.scala
:import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.OffsetRange
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import collection.JavaConversions._
import Array._
object Kafka {
def main(args: Array[String]) {
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "172.0.0.1:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "spark_stream_test1",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> "false"
)
val sparkConf = new SparkConf()
sparkConf.setMaster("local")
sparkConf.setAppName("Kafka")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val topics = Array("spark_test")
val offsets : Map[TopicPartition, Long] = Map()
for (i <- 0 until 3){
val tp = new TopicPartition("spark_test", i)
offsets.updated(tp , 0L)
}
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
println("directStream")
stream.foreachRDD{ rdd=>
//輸出獲得的訊息
rdd.foreach{iter =>
val i = iter.value
println(s"${i}")
}
//獲得offset
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition { iter =>
val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
}
// Start the computation
ssc.start()
ssc.awaitTermination()
}
}
####### RDD
build.sbt
(設定同上,單擊檢視)。RDD_example
:import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.OffsetRange
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import collection.JavaConversions._
import Array._
object Kafka {
def main(args: Array[String]) {
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "172.0.0.1:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "spark_stream",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val sc = new SparkContext("local", "Kafka", new SparkConf())
val java_kafkaParams : java.util.Map[String, Object] = kafkaParams
//按順序向 parition 拉取相應 offset 範圍的訊息,如果拉取不到則阻塞直到超過等待時間或者新生產訊息達到拉取的數量
val offsetRanges = Array[OffsetRange](
OffsetRange("spark_test", 0, 0, 5),
OffsetRange("spark_test", 1, 0, 5),
OffsetRange("spark_test", 2, 0, 5)
)
val range = KafkaUtils.createRDD[String, String](
sc,
java_kafkaParams,
offsetRanges,
PreferConsistent
)
range.foreach(rdd=>println(rdd.value))
sc.stop()
}
}
更多 kafkaParams
用法參考 kafkaParams 檔案。
Apache Flume 是一個分散式、可靠、高可用的紀錄檔收集系統,支援各種各樣的資料來源(如 HTTP、Log 檔案、JMS、監聽埠資料等),能將這些資料來源的海量紀錄檔資料進行高效收集、聚合、移動,最後儲存到指定儲存系統中(如 Kafka、分散式檔案系統、Solr 搜尋伺服器等)。
Flume 基本結構如下:
Flume 以 agent 為最小的獨立執行單位。一個 agent 就是一個 JVM,單個 agent 由 Source、Sink 和 Channel 三大元件構成。
Flume 與 Kafka
把資料儲存到 HDFS 或者 HBase 等下游儲存模組或者計算模組時需要考慮各種複雜的場景,例如並行寫入的量以及系統承載壓力、網路延遲等問題。Flume 作為靈活的分散式系統具有多種介面,同時提供可客製化化的管道。
在生產處理環節中,當生產與處理速度不一致時,Kafka 可以充當快取角色。Kafka 擁有 partition 結構以及採用 append 追加資料,使 Kafka 具有優秀的吞吐能力;同時其擁有 replication 結構,使 Kafka 具有很高的容錯性。
所以將 Flume 和 Kafka 結合起來,可以滿足生產環境中絕大多數要求。
Kafka 可作為 Source 或者 Sink 端對訊息進行匯入或者匯出。
設定 kafka 作為訊息來源,即將自己作為消費者,從 Kafka 中拉取資料傳入到指定 Sink 中。主要設定選項如下:
設定項 | 說明 |
---|---|
channels |
自己設定的 Channel |
type |
必須為:org.apache.flume.source.kafka.KafkaSource |
kafka.bootstrap.servers |
Kafka Broker 的伺服器地址 |
kafka.consumer.group.id |
作為 Kafka 消費端的 Group ID |
kafka.topics |
Kafka 中資料來源 Topic |
batchSize |
每次寫入 Channel 的大小 |
batchDurationMillis |
每次寫入最大間隔時間 |
範例:
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id
更多內容請參考 Apache Flume 官網。
設定 Kafka 作為內容接收方,即將自己作為生產者,推到 Kafka Server 中等待後續操作。主要設定選項如下:
設定項 | 說明 |
---|---|
channel |
自己設定的 Channel |
type |
必須為:org.apache.flume.sink.kafka.KafkaSink |
kafka.bootstrap.servers |
Kafka Broker 的伺服器 |
kafka.topics |
Kafka 中資料來源 Topic |
kafka.flumeBatchSize |
每次寫入的 Bacth 大小 |
kafka.producer.acks |
Kafka 生產者的生產策略 |
範例:
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
更多內容請參考 Apache Flume 官網。
Storm 是一個分散式實時計算框架,能夠對資料進行流式處理和提供通用性分散式 RPC 呼叫,可以實現處理事件亞秒級的延遲,適用於對延遲要求比較高的實時資料處理場景。
在 Storm 的叢集中有兩種節點,控制節點Master Node
和工作節點Worker Node
。Master Node
上執行Nimbus
程序,用於資源分配與狀態監控。Worker Node
上執行Supervisor
程序,監聽工作任務,啟動executor
執行。整個 Storm 叢集依賴zookeeper
負責公共資料存放、叢集狀態監聽、任務分配等功能。
使用者提交給 Storm 的資料處理程式稱為topology
,它處理的最小訊息單位是tuple
,一個任意物件的陣列。topology
由spout
和bolt
構成,spout
是產生tuple
的源頭,bolt
可以訂閱任意spout
或bolt
發出的tuple
進行處理。
Storm 可以把 Kafka 作為spout
,消費資料進行處理;也可以作為bolt
,存放經過處理後的資料提供給其它元件消費。
Centos6.8系統
package | version |
---|---|
maven | 3.5.0 |
storm | 2.1.0 |
ssh | 5.3 |
Java | 1.8 |
pom.xml 設定如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>storm</groupId>
<artifactId>storm</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>storm</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.2.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>ExclamationTopology</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
topology 程式碼:
//TopologyKafkaProducerSpout.java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;
import java.util.Properties;
public class TopologyKafkaProducerSpout {
//申請的kafka範例ip:port
private final static String BOOTSTRAP_SERVERS = "xx.xx.xx.xx:xxxx";
//指定要將訊息寫入的topic
private final static String TOPIC = "storm_test";
public static void main(String[] args) throws Exception {
//設定producer屬性
//函數參考:https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
//屬性參考:http://kafka.apache.org/0102/documentation.html
Properties properties = new Properties();
properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
properties.put("acks", "1");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//建立寫入kafka的bolt,預設使用fields("key" "message")作為生產訊息的key和message,也可以在FieldNameBasedTupleToKafkaMapper()中指定
KafkaBolt kafkaBolt = new KafkaBolt()
.withProducerProperties(properties)
.withTopicSelector(new DefaultTopicSelector(TOPIC))
.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
TopologyBuilder builder = new TopologyBuilder();
//一個順序生成訊息的spout類,輸出field是sentence
SerialSentenceSpout spout = new SerialSentenceSpout();
AddMessageKeyBolt bolt = new AddMessageKeyBolt();
builder.setSpout("kafka-spout", spout, 1);
//為tuple加上生產到kafka所需要的fields
builder.setBolt("add-key", bolt, 1).shuffleGrouping("kafka-spout");
//寫入kafka
builder.setBolt("sendToKafka", kafkaBolt, 8).shuffleGrouping("add-key");
Config config = new Config();
if (args != null && args.length > 0) {
//叢集模式,用於打包jar,並放到storm執行
config.setNumWorkers(1);
StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.createTopology());
} else {
//本地模式
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", config, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
}
}
}
建立一個順序生成訊息的 spout 類:
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Map;
import java.util.UUID;
public class SerialSentenceSpout extends BaseRichSpout {
private SpoutOutputCollector spoutOutputCollector;
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.spoutOutputCollector = spoutOutputCollector;
}
@Override
public void nextTuple() {
Utils.sleep(1000);
//生產一個UUID字串傳送給下一個元件
spoutOutputCollector.emit(new Values(UUID.randomUUID().toString()));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("sentence"));
}
}
為 tuple
加上 key、message 兩個欄位,當 key 為 null 時,生產的訊息均勻分配到各個 partition,指定了 key 後將按照 key 值 hash 到特定 partition 上:
//AddMessageKeyBolt.java
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
public class AddMessageKeyBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
//取出第一個filed值
String messae = tuple.getString(0);
//System.out.println(messae);
//傳送給下一個元件
basicOutputCollector.emit(new Values(null, messae));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
//建立傳送給下一個元件的schema
outputFieldsDeclarer.declare(new Fields("key", "message"));
}
}
使用 trident 類生成 topology:
//TopologyKafkaProducerTrident.java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.trident.TridentKafkaStateFactory;
import org.apache.storm.kafka.trident.TridentKafkaStateUpdater;
import org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.trident.selector.DefaultTopicSelector;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Properties;
public class TopologyKafkaProducerTrident {
//申請的kafka範例ip:port
private final static String BOOTSTRAP_SERVERS = "xx.xx.xx.xx:xxxx";
//指定要將訊息寫入的topic
private final static String TOPIC = "storm_test";
public static void main(String[] args) throws Exception {
//設定producer屬性
//函數參考:https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
//屬性參考:http://kafka.apache.org/0102/documentation.html
Properties properties = new Properties();
properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
properties.put("acks", "1");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//設定Trident
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
.withProducerProperties(properties)
.withKafkaTopicSelector(new DefaultTopicSelector(TOPIC))
//設定使用fields("key", "value")作為訊息寫入 不像FieldNameBasedTupleToKafkaMapper有預設值
.withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "value"));
TridentTopology builder = new TridentTopology();
//一個批次產生句子的spout,輸出field為sentence
builder.newStream("kafka-spout", new TridentSerialSentenceSpout(5))
.each(new Fields("sentence"), new AddMessageKey(), new Fields("key", "value"))
.partitionPersist(stateFactory, new Fields("key", "value"), new TridentKafkaStateUpdater(), new Fields());
Config config = new Config();
if (args != null && args.length > 0) {
//叢集模式,用於打包jar,並放到storm執行
config.setNumWorkers(1);
StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
} else {
//本地模式
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", config, builder.build());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
}
}
private static class AddMessageKey extends BaseFunction {
@Override
public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
//取出第一個filed值
String messae = tridentTuple.getString(0);
//System.out.println(messae);
//傳送給下一個元件
//tridentCollector.emit(new Values(Integer.toString(messae.hashCode()), messae));
tridentCollector.emit(new Values(null, messae));
}
}
}
建立一個批次生成訊息的 spout 類:
//TridentSerialSentenceSpout.java
import org.apache.storm.Config;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Map;
import java.util.UUID;
public class TridentSerialSentenceSpout implements IBatchSpout {
private final int batchCount;
public TridentSerialSentenceSpout(int batchCount) {
this.batchCount = batchCount;
}
@Override
public void open(Map map, TopologyContext topologyContext) {
}
@Override
public void emitBatch(long l, TridentCollector tridentCollector) {
Utils.sleep(1000);
for(int i = 0; i < batchCount; i++){
tridentCollector.emit(new Values(UUID.randomUUID().toString()));
}
}
@Override
public void ack(long l) {
}
@Override
public void close() {
}
@Override
public Map<String, Object> getComponentConfiguration() {
Config conf = new Config();
conf.setMaxTaskParallelism(1);
return conf;
}
@Override
public Fields getOutputFields() {
return new Fields("sentence");
}
}
//TopologyKafkaConsumerSpout.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.spout.*;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.HashMap;
import java.util.Map;
import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.LATEST;
public class TopologyKafkaConsumerSpout {
//申請的kafka範例ip:port
private final static String BOOTSTRAP_SERVERS = "xx.xx.xx.xx:xxxx";
//指定要將訊息寫入的topic
private final static String TOPIC = "storm_test";
public static void main(String[] args) throws Exception {
//設定重試策略
KafkaSpoutRetryService kafkaSpoutRetryService = new KafkaSpoutRetryExponentialBackoff(
KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),
KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2),
Integer.MAX_VALUE,
KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10)
);
ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
(r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
new Fields("topic", "partition", "offset", "key", "value"));
//設定consumer引數
//函數參考http://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html
//引數參考http://kafka.apache.org/0102/documentation.html
KafkaSpoutConfig spoutConfig = KafkaSpoutConfig.builder(BOOTSTRAP_SERVERS, TOPIC)
.setProp(new HashMap<String, Object>(){{
put(ConsumerConfig.GROUP_ID_CONFIG, "test-group1"); //設定group
put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000"); //設定session超時
put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000"); //設定請求超時
}})
.setOffsetCommitPeriodMs(10_000) //設定自動確認時間
.setFirstPollOffsetStrategy(LATEST) //設定拉取最新訊息
.setRetry(kafkaSpoutRetryService)
.setRecordTranslator(trans)
.build();
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
builder.setBolt("bolt", new BaseRichBolt(){
private OutputCollector outputCollector;
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.outputCollector = outputCollector;
}
@Override
public void execute(Tuple tuple) {
System.out.println(tuple.getStringByField("value"));
outputCollector.ack(tuple);
}
}, 1).shuffleGrouping("kafka-spout");
Config config = new Config();
config.setMaxSpoutPending(20);
if (args != null && args.length > 0) {
config.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.createTopology());
}
else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", config, builder.createTopology());
Utils.sleep(20000);
cluster.killTopology("test");
cluster.shutdown();
}
}
}
//TopologyKafkaConsumerTrident.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutConfig;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.HashMap;
import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.LATEST;
public class TopologyKafkaConsumerTrident {
//申請的kafka範例ip:port
private final static String BOOTSTRAP_SERVERS = "xx.xx.xx.xx:xxxx";
//指定要將訊息寫入的topic
private final static String TOPIC = "storm_test";
public static void main(String[] args) throws Exception {
ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
(r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
new Fields("topic", "partition", "offset", "key", "value"));
//設定consumer引數
//函數參考http://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html
//引數參考http://kafka.apache.org/0102/documentation.html
KafkaTridentSpoutConfig spoutConfig = KafkaTridentSpoutConfig.builder(BOOTSTRAP_SERVERS, TOPIC)
.setProp(new HashMap<String, Object>(){{
put(ConsumerConfig.GROUP_ID_CONFIG, "test-group1"); //設定group
put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); //設定自動確認
put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000"); //設定session超時
put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000"); //設定請求超時
}})
.setFirstPollOffsetStrategy(LATEST) //設定拉取最新訊息
.setRecordTranslator(trans)
.build();
TridentTopology builder = new TridentTopology();
// Stream spoutStream = builder.newStream("spout", new KafkaTridentSpoutTransactional(spoutConfig)); //事務型
Stream spoutStream = builder.newStream("spout", new KafkaTridentSpoutOpaque(spoutConfig));
spoutStream.each(spoutStream.getOutputFields(), new BaseFunction(){
@Override
public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
System.out.println(tridentTuple.getStringByField("value"));
tridentCollector.emit(new Values(tridentTuple.getStringByField("value")));
}
}, new Fields("message"));
Config conf = new Config();
conf.setMaxSpoutPending(20);conf.setNumWorkers(1);
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.build());
}
else {
StormTopology stormTopology = builder.build();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, stormTopology);
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();stormTopology.clear();
}
}
}
使用 mvn package
編譯後,可以提交到本地叢集進行 debug 測試,也可以提交到正式叢集進行執行。
storm jar your_jar_name.jar topology_name
storm jar your_jar_name.jar topology_name tast_name
Logstash 是一個開源的紀錄檔處理工具,可以從多個源頭收集資料、過濾收集的資料並對資料進行儲存作為其他用途。
Logstash 靈活性強,擁有強大的語法分析功能,外掛豐富,支援多種輸入和輸出源。Logstash 作為水平可伸縮的資料管道,與 Elasticsearch 和 Kibana 配合,在紀錄檔收集檢索方面功能強大。
Logstash 資料處理可以分為三個階段:inputs → filters → outputs。
同時 Logstash 支援編碼解碼,可以在 inputs 和 outputs 端指定格式。
❗ 注意:
Logstash 過濾消耗資源,如果部署在生產 server 上會影響其效能。
建立一個名為 logstash_test
的 Topic。
執行 bin/logstash-plugin list
,檢視已經支援的外掛是否含有 logstash-input-kafka
。
在 .bin/
目錄下編寫組態檔 input.conf
。
此處將標準輸出作為資料終點,將 Kafka 作為資料來源。
input {
kafka {
bootstrap_servers => "xx.xx.xx.xx:xxxx" // kafka 範例接入地址
group_id => "logstash_group" // kafka groupid 名稱
topics => ["logstash_test"] // kafka topic 名稱
consumer_threads => 3 // 消費執行緒數,一般與 kafka 分割區數一致
auto_offset_reset => "earliest"
}
}
output {
stdout{codec=>rubydebug}
}
執行以下命令啟動 Logstash,進行訊息消費。
./logstash -f input.conf
會看到剛才 Topic 中的資料被消費出來。
執行 bin/logstash-plugin list
,檢視已經支援的外掛是否含有 logstash-output-kafka
。
在.bin/
目錄下編寫組態檔 output.conf
。
此處將標準輸入作為資料來源,將 Kafka 作為資料目的地。
input {
input {
stdin{}
}
}
output {
kafka {
bootstrap_servers => "xx.xx.xx.xx:xxxx" // ckafka 範例接入地址
topic_id => "logstash_test" // ckafka topic 名稱
}
}
執行如下命令啟動 Logstash,向建立的 Topic 傳送訊息。
./logstash -f output.conf
啟動 Kafka 消費者,檢驗上一步的生產資料。
./kafka-console-consumer.sh --bootstrap-server 172.0.0.1:9092 --topic logstash_test --from-begging --new-consumer
Beats 平臺 集合了多種單一用途資料採集器。這些採集器安裝後可用作輕量型代理,從成百上千或成千上萬臺機器向目標傳送採集資料。
Beats 有多種採集器,您可以根據自身的需求下載對應的採集器。本文以 Filebeat(輕量型紀錄檔採集器)為例,向您介紹 Filebeat 接入 Kafka 的操作指方法,及接入後常見問題的解決方法。
建立一個名為 test
的 Topic。
進入 Filebeat 的安裝目錄,建立設定監控檔案 filebeat.yml。
##======= Filebeat prospectors ==========
filebeat.prospectors:
- input_type: log
## 此處為監聽檔案路徑
paths:
- /var/log/messages
##======= Outputs =========
##------------------ kafka -------------------------------------
output.kafka:
version:0.10.2 // 根據不同 Kafka 叢集版本設定
# 設定為Kafka範例的接入地址
hosts: ["xx.xx.xx.xx:xxxx"]
# 設定目標topic的名稱
topic: 'test'
partition.round_robin:
reachable_only: false
required_acks: 1
compression: none
max_message_bytes: 1000000
# SASL 需要設定下列資訊,如果不需要則下面兩個選項可不設定
username: "yourinstance#yourusername" //username 需要拼接範例ID和使用者名稱
password: "yourpassword"
執行如下命令啟動使用者端。
sudo ./filebeat -e -c filebeat.yml
為監控檔案增加資料(範例為寫入監聽的 testlog 檔案)。
echo ckafka1 >> testlog
echo ckafka2 >> testlog
echo ckafka3 >> testlog
開啟 Consumer 消費對應的 Topic,獲得以下資料。
{"@timestamp":"2017-09-29T10:01:27.936Z","beat":{"hostname":"10.193.9.26","name":"10.193.9.26","version":"5.6.2"},"input_type":"log","message":"ckafka1","offset":500,"source":"/data/ryanyyang/hcmq/beats/filebeat-5.6.2-linux-x86_64/testlog","type":"log"}
{"@timestamp":"2017-09-29T10:01:30.936Z","beat":{"hostname":"10.193.9.26","name":"10.193.9.26","version":"5.6.2"},"input_type":"log","message":"ckafka2","offset":508,"source":"/data/ryanyyang/hcmq/beats/filebeat-5.6.2-linux-x86_64/testlog","type":"log"}
{"@timestamp":"2017-09-29T10:01:33.937Z","beat":{"hostname":"10.193.9.26","name":"10.193.9.26","version":"5.6.2"},"input_type":"log","message":"ckafka3","offset":516,"source":"/data/ryanyyang/hcmq/beats/filebeat-5.6.2-linux-x86_64/testlog","type":"log"}
如果您需要進行 SALS/PLAINTEXT 設定,則需要設定使用者名稱與密碼。 在 Kafka 設定區域新增加 username 和 password 設定即可。
訊息佇列 CKafka - 檔案中心 - 騰訊雲 (tencent.com)