在本章中,我們將學習如何將Kafka與Apache Storm整合。
Storm最初是由Nathan Marz和BackType團隊建立的。 在很短的時間內,Apache Storm成為分散式實時處理系統的標準,用於處理巨量資料。 Storm速度非常快,每個節點每秒處理超過一百萬個元組的基準時鐘。 Apache Storm持續執行,從組態的源(Spouts)中消耗資料並將資料傳遞到處理管道(Bolts)。 組合 Spouts 和 Bolts 構成一個拓撲。
Kafka和Storm自然而然地相互補充,它們強大的合作能夠實現快速移動巨量資料的實時流式分析。 Kafka和Storm的整合使得開發者更容易從Storm拓撲中獲取和發布資料流。
概念流程
噴口(spout)是流的來源。 例如,spout可能會讀取卡夫卡主題中的元組並將其作為流傳送。 Bolts消耗輸入流,處理並可能發射新的流。 Bolts可以做任何事情,從執行功能,過濾元組,流聚合,流式連線,與資料庫互動等等。 Storm拓撲中的每個節點並行執行。 一個拓撲無限期地執行,直到終止它。 Storm會自動重新分配任何失敗的任務。 此外,即使機器停機並且資訊丟失,Storm也可以保證不會丟失資料。
下面來看看Kafka-Storm整合API。 有三個主要類將Kafka和Storm結合在一起。 他們如下 -
BrokerHosts - ZkHosts&StaticHosts
BrokerHosts
是一個介面,ZkHosts
和StaticHosts
是它的兩個主要實現。 ZkHosts用於通過在ZooKeeper中維護詳細資訊來動態跟蹤Kafka經紀人,而StaticHosts
用於手動/靜態設定Kafka經紀人及其詳細資訊。 ZkHosts是存取Kafka經紀人的簡單而快捷的方式。
ZkHosts的簽名如下 -
public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)
其中brokerZkStr
是ZooKeeper主機,brokerZkPath
是維護Kafka代理細節的ZooKeeper路徑。
public KafkaConfig(BrokerHosts hosts, string topic)
引數
SpoutConfig API
Spoutconfig是KafkaConfig的擴充套件,支援額外的ZooKeeper資訊。
public SpoutConfig(BrokerHosts hosts, string topic, string zkRoot, string id)
引數
hosts
- BrokerHosts可以是BrokerHosts介面的任何實現topic
- 主題名稱。zkRoot
- ZooKeeper根路徑。id
- spout
儲存在Zookeeper中消耗的偏移量的狀態。該ID應該唯一標識的spout。SchemeAsMultiScheme
SchemeAsMultiScheme是一個介面,它規定了從Kafka消耗的ByteBuffer如何轉換為 storm 元組。它來自MultiScheme並接受Scheme類的實現。Scheme類有很多實現,一個這樣的實現是StringScheme,它將位元組解析為一個簡單的字串。 它還控制輸出欄位的命名。 簽名定義如下。
public SchemeAsMultiScheme(Scheme scheme)
引數
scheme
- 從kafka消耗的位元組緩衝區。KafkaSpout API
KafkaSpout
是spout實現,它將與Storm整合。 它從kafka主題獲取訊息並將其作為元組傳送到Storm生態系統中。 KafkaSpout從SpoutConfig獲取組態細節。
以下是建立一個簡單的kafka spout的範例程式碼。
// ZooKeeper connection string
BrokerHosts hosts = new ZkHosts(zkConnString);
//Creating SpoutConfig Object
SpoutConfig spoutConfig = new SpoutConfig(hosts,
topicName, "/" + topicName UUID.randomUUID().toString());
//convert the ByteBuffer to String.
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
//Assign SpoutConfig to KafkaSpout.
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
Bolt是一個將元組作為輸入,處理元組並生成新的元組作為輸出的元件。 Bolts將實現IRichBolt
介面。 在這個程式中,使用兩個類 - WordSplitter-Bolt
和WordCounterBolt
來執行操作。
IRichBolt
介面有以下方法 -
prepare
- 為 bolt 提供執行的環境。 執行者將執行此方法來初始化spout
。prepare
- 處理輸入的單個元組。prepare
- 當bolt即將關閉時呼叫。declareOutputFields
- 宣告元組的輸出模式。下面建立一個Java檔案:SplitBolt.java
,它實現了將句子分成單詞;CountBolt.java
它實現了邏輯來分離唯一的單詞並計算它的出現次數。
SplitBolt.java
import java.util.Map;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;
public class SplitBolt implements IRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String sentence = input.getString(0);
String[] words = sentence.split(" ");
for(String word: words) {
word = word.trim();
if(!word.isEmpty()) {
word = word.toLowerCase();
collector.emit(new Values(word));
}
}
collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public void cleanup() {}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
檔案:CountBolt.java -
import java.util.Map;
import java.util.HashMap;
import backtype.storm.tuple.Tuple;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;
public class CountBolt implements IRichBolt{
Map<String, Integer> counters;
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.counters = new HashMap<String, Integer>();
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String str = input.getString(0);
if(!counters.containsKey(str)){
counters.put(str, 1);
}else {
Integer c = counters.get(str) +1;
counters.put(str, c);
}
collector.ack(input);
}
@Override
public void cleanup() {
for(Map.Entry<String, Integer> entry:counters.entrySet()){
System.out.println(entry.getKey()+" : " + entry.getValue());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Storm拓撲基本上是一個Thrift結構。 TopologyBuilder類提供了簡單而簡單的方法來建立複雜的拓撲。 TopologyBuilder類具有設定spout (setSpout)和設定bolt(setBolt)的方法。 最後,TopologyBuilder使用createTopology()
來建立拓樸學。 shuffleGrouping
和fieldsGrouping
方法有助於設定spout
和bolt
的流分組。
本地群集 - 出於開發目的,我們可以使用LocalCluster物件建立本地群集,然後使用LocalCluster類的submitTopology方法提交拓撲。
檔案:KafkaStormSample.java -
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.trident.GlobalPartitionInformation;
import storm.kafka.ZkHosts;
import storm.kafka.Broker;
import storm.kafka.StaticHosts;
import storm.kafka.BrokerHosts;
import storm.kafka.SpoutConfig;
import storm.kafka.KafkaConfig;
import storm.kafka.KafkaSpout;
import storm.kafka.StringScheme;
public class KafkaStormSample {
public static void main(String[] args) throws Exception{
Config config = new Config();
config.setDebug(true);
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
String zkConnString = "localhost:2181";
String topic = "my-first-topic";
BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, topic, "/" + topic,
UUID.randomUUID().toString());
kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
kafkaSpoutConfig.forceFromStart = true;
kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutCon-fig));
builder.setBolt("word-spitter", new SplitBolt()).shuffleGroup-ing("kafka-spout");
builder.setBolt("word-counter", new CountBolt()).shuffleGroup-ing("word-spitter");
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("KafkaStormSample", config, builder.create-Topology());
Thread.sleep(10000);
cluster.shutdown();
}
}
在移動編譯之前,Kakfa-Storm整合需要館長ZooKeeper用戶端java庫。 ZooKeeper 版本2.9.1支援Apache Storm 0.9.5版本(在本教學中使用)。 下載下面指定的jar檔案並將其放在java類路徑中。
curator-client-2.9.1.jar
curator-framework-2.9.1.jar
在包含依賴檔案後,使用以下命令編譯程式,
javac -cp "/path/to/Kafka/apache-storm-0.9.5/lib/*" *.java
執行
啟動Kafka Producer CLI(在上一章中介紹),建立一個名為my-first-topic
的新主題,並提供一些範例訊息,如下所示 -
hello
kafka
storm
spark
test message
another test message
現在使用以下命令執行應用程式 -
java -cp 「/path/to/Kafka/apache-storm-0.9.5/lib/*」:. KafkaStormSample
此應用程式的輸出範例如下所示 -
storm : 1
test : 2
spark : 1
another : 1
kafka : 1
hello : 1
message : 2