前面已經給大家講過flume和Kafka的簡介以及安裝,今天就給大家講講二者如何關聯使用。
本文主要就是講解如何使用flume採集日誌資訊後把數據寫入kafka中,由於時間關係,這裏就暫時用僞數據,把存放僞數據的檔案放到專門用於flume監聽檔案的目錄中就是前面提到過的/opt/soft/datas下。
先新建組態檔用於關聯kafka
還是在/opt/flumeconf下建立properties檔案,並新增以下設定
cd /opt/flumeconf
vi conf_0812_kafka.properties
a5.channels=c5
a5.sources=s5
a5.sinks=k5
a5.sources.s5.type=spooldir
a5.sources.s5.spoolDir=/opt/soft/datas
a5.sources.s5.interceptors=head_filter
#正則攔截器
a5.sources.s5.interceptors.head_filter.type=regex_filter
a5.sources.s5.interceptors.head_filter.regex=^event_id.*
a5.sources.s5.interceptors.head_filter.excludeEvents=true
#用來關聯kafka
a5.sinks.k5.type=org.apache.flume.sink.kafka.KafkaSink
a5.sinks.k5.kafka.bootstrap.servers=192.168.5.150:9092
#topic的主題名要一致
a5.sinks.k5.kafka.topic=msgEvent
a5.channels.c5.type=memory
a5.channels.c5.capacity=10000
a5.channels.c5.transactionCapacity=10000
a5.sinks.k5.channel=c5
a5.sources.s5.channels=c5
zkServer.sh start
cd /opt/soft/kafka211/bin
./kafka-server-start.sh /opt/soft/kafka211/config/server.properties
使用jps驗證服務是否啓動!
kafka-topics.sh --create --zookeeper 192.168.5.150:2181 --replication-factor 1 --partitions 1 --topic msgEvent
注意:
建立的topic 和 組態檔裡的sinks元件中的topic必須要一致
kafka-console-consumer.sh --bootstrap-server 192.168.5.150:9092 --topic msgEvent --from-beginning
kafka-console-producer.sh --broker-list 192.168.5.150:9092 --topic msgEvent
flume-ng agent -n a5 -c conf -f /opt/flumeconf/conf_0812_kafka.properties -Dflume.root.logger=INFO,console
這樣就開啓了日誌採集 可能要等一段時間, 日誌採集完畢之後 flume會提示檔案的後綴名會有COMPLETED,說明檔案按已採集完畢!如下圖所示:
檔案會寫入到kafka中 具體路徑是kafka組態檔中server.properties裏面Log Basics的設定 如下圖所示:
檢視檔案資訊,是否已經寫入到kafka的日誌目錄中。
如下圖所示:
megEvent已出現子datas日誌目錄中,接下來檢視檔案中是否有日誌資訊,如下圖所示:
這樣數據就寫入到了kafka中!