要使用logstash收集到Elasticsearch的方式,需確保logstash版本與es版本一致。
由於我也是剛剛研究使用,所以本文暫不會出現原理性的東西。
Logstash是具有實時流水線能力的開源的資料收集引擎。Logstash可以動態統一不同來源的資料,並將資料標準化到您選擇的目標輸出。它提供了大量外掛,可幫助我們解析,豐富,轉換和緩衝任何型別的資料。
會生成事件。包括:file、kafka、beats等
可以將過濾器和條件語句結合使用對事件進行處理。包括:grok、mutate等
將事件資料傳送到特定的目的地,完成了所以輸出處理,改事件就完成了執行。如:elasticsearch、file等
下載地址 :https://www.elastic.co/fr/downloads/logstash
下載之後隨便解壓到某個目錄,會得到以下這些目錄和檔案,我們需要注意的就三個目錄,bin、config、logs,下面一個一個說。
先來看config資料夾,進入後會有這幾個檔案:
檢視logstash-sample.conf組態檔
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
#輸入外掛beats,輕量化
beats {
#監聽埠
port => 5044
}
}
output {
#es連線地址及索引設定
elasticsearch {
hosts => ["http://localhost:9200"]
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
#user => "elastic"
#password => "changeme"
}
}
下來稍微修改一下,我們啟動試試。
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
beats {
port => 5044
}
#新增file外掛
file {
#測試環境中我們一般是nohup後臺啟動jar包,預設紀錄檔追加到nohup檔案中,這裡我們用外掛讀取這個紀錄檔傳送到es上試試
path => "/home/mm/mmm/nohup.out"
mode => "read"
}
}
output {
elasticsearch {
#設定自己的es連線,這裡是使用es預設模板
hosts => ["http://localhost:9200"]
index => "ceshi"
#user => "elastic"
#password => "changeme"
}
}
退回到bin目錄下,啟動
./logstash -f /自己路徑下的組態檔/logstash/config/logstash-sample.conf
這樣子就是啟動成功了。
下面就是我們收集到的紀錄檔,大家可以看看預設都有什麼欄位。
{
"_index" : "console-analysis",
"_type" : "_doc",
"_id" : "_DDNH4kBVvgVIOGHRiop",
"_score" : 1.0,
"_source" : {
"port" : 57910,
"thread_name" : "main",
"host" : "172.17.0.5",
"logger_name" : "com.alibaba.nacos.client.naming",
"@version" : "1",
"level_value" : 20000,
"message" : "[BEAT] adding beat: BeatInfo{port=17007, ip='192.168.1.59', weight=1.0, serviceName='DEFAULT_GROUP@@amcp-analysis', cluster='DEFAULT', metadata={preserved.register.source=SPRING_CLOUD}, scheduled=false, period=5000, stopped=false} to beat map.",
"level" : "INFO",
"logHost" : "192.168.1.59:5044",
"appname" : "analysis"
}
<!-- https://mvnrepository.com/artifact/net.logstash.logback/logstash-logback-encoder -->
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>7.1.1</version>
</dependency>
在resources目錄下建立一個logback-spring.xml的xml檔案。
如果是使用nacos來獲取設定的話,檔案名字不能是logback-spring.xml,因為會導致logback-spring.xml檔案被載入兩次,這樣在logback-spring.xml檔案中如果想讀取nacos上的設定的話是拿不到的。
在yml或者properties檔案中新增設定
logging:
config: classpath:logback-nacos.xml
logstash:
host: localhost:5044
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<include resource="org/springframework/boot/logging/logback/defaults.xml"/>
<!-- 獲取設定中的值 -->
<springProperty scope="context" name="logHost" source="logstash.host"/>
<!-- 新增logstash連線設定 -->
<appender name="logstash" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<destination>${logHost}</destination> <!-- Logstash的主機和埠,可以設定多個 -->
<!-- 其他屬性 -->
<connectionTimeout>5000</connectionTimeout>
<reconnectionDelay>5000</reconnectionDelay>
<encoder charset="UTF-8"
class="net.logstash.logback.encoder.LogstashEncoder">
<!-- 自定義欄位,這裡我是用來區分收集的是哪個程式的紀錄檔,後面logstash設定我們可以看下它的作用 -->
<customFields>{"appname":"analysis"}</customFields>
</encoder>
</appender>
<!-- 控制檯輸出設定,不新增的話紀錄檔不會在控制檯輸出 -->
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level --- [%thread] %logger{36} : %msg%n</pattern>
</encoder>
</appender>
<!-- level設定收集紀錄檔級別,可以設定多個appender,這個組態檔中也可以直接新增filter過濾標籤,這個大家自己嘗試一下 -->
<root level="info">
<appender-ref ref="console" /> <!-- 新增控制檯appender -->
<appender-ref ref="logstash" />
</root>
</configuration>
input {
#設定監聽埠,也就是destination
tcp {
mode => "server"
host => "localhost"
port => 5044
codec => json_lines
}
}
filter {
#判斷appname,在設定中宣告一個變數,不同的appname賦予不同的值,這裡其實就是根據我自定義的欄位來給不同的es索引名稱
if [appname] == "analysis"{
mutate {
add_field => {
"[@metadata][index]" => "console-analysis"
}
}
}
#判斷紀錄檔級別,收集到的紀錄檔預設欄位level會記錄紀錄檔級別,這個時候我們可以根據需要對紀錄檔進行操作。下面這個操作是將紀錄檔級別,紀錄檔記錄到自定義欄位,以及將記錄的時間進行轉換記錄到time欄位,預設記錄的時間是帶時區的
if [level] =~ /DEBUG/ {
mutate {
add_field => {
"type" => "DEBUG"
"details" => "%{message}"
}
}
ruby {
code => "
event.set('time', event.timestamp.time.localtime.strftime('%Y-%m-%d %H:%M:%S'))
"
}
mutate {
remove_field => ["[@timestamp]"]
}
}
}
output {
#這裡判斷message欄位中如果不包含HiddenHorzOCR就記錄,只是演示一下這裡面也可以進行邏輯判斷
if !([message] =~ /HiddenHorzOCR/) {
if [@metadata][index] {
elasticsearch {
hosts => ["http://192.168.1.59:9200"]
#這個索引就是我們在filter中判斷appname時賦的值
index => "%{[@metadata][index]}"
#指定es要使用的模板,也可以使用預設的
template => "/home/collect.json"
#模板名稱
template_name => "collect"
#載入模板是否覆蓋之前的模板
template_overwrite => true
}
}
}
}
{
"index_patterns": ["console*"],
"settings": {
"number_of_shards": 5,
"max_result_window": "500000000"
},
"mappings": {
//自定義幾個欄位
"properties": {
"type": { "type": "keyword" },
"details": { "type": "text" },
"time": {
"type": "keyword"
}
}
}
}
所有設定新增完成之後,啟動logstash和自己的應用程式,這個時候就可以上es或者kibana上檢視建立出的索引以及收集到的紀錄檔。
這是指定模板後收集到的紀錄檔。
{
"_index" : "console-analysis",
"_type" : "_doc",
"_id" : "_jDNH4kBVvgVIOGHRiop",
"_score" : 1.0,
"_source" : {
"port" : 57910,
"details" : "Scanning for api listing references",
"type" : "INFO",
"time" : "2023-07-04 15:28:13",
"thread_name" : "main",
"host" : "172.17.0.5",
"logger_name" : "springfox.documentation.spring.web.scanners.ApiListingReferenceScanner",
"@version" : "1",
"level_value" : 20000,
"message" : "Scanning for api listing references",
"level" : "INFO",
"logHost" : "192.168.1.59:5044",
"appname" : "analysis"
}
}