企業級logstash簡單使用(ELK)

2023-07-05 15:00:45

企業級logstash簡單使用(ELK)

要使用logstash收集到Elasticsearch的方式,需確保logstash版本與es版本一致。

由於我也是剛剛研究使用,所以本文暫不會出現原理性的東西。

Logstash

介紹

Logstash是具有實時流水線能力的開源的資料收集引擎。Logstash可以動態統一不同來源的資料,並將資料標準化到您選擇的目標輸出。它提供了大量外掛,可幫助我們解析,豐富,轉換和緩衝任何型別的資料。

inputs(輸入階段)

會生成事件。包括:file、kafka、beats等

filters(過濾器階段)

可以將過濾器和條件語句結合使用對事件進行處理。包括:grok、mutate等

outputs(輸出階段)

將事件資料傳送到特定的目的地,完成了所以輸出處理,改事件就完成了執行。如:elasticsearch、file等

使用方式

下載地址 :https://www.elastic.co/fr/downloads/logstash

下載之後隨便解壓到某個目錄,會得到以下這些目錄和檔案,我們需要注意的就三個目錄,bin、config、logs,下面一個一個說。

先來看config資料夾,進入後會有這幾個檔案:

檢視logstash-sample.conf組態檔

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  #輸入外掛beats,輕量化
  beats { 
    #監聽埠
    port => 5044
  }
}

output {
  #es連線地址及索引設定
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    #user => "elastic"
    #password => "changeme"
  }
}

下來稍微修改一下,我們啟動試試。

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  beats {
    port => 5044
  }
  #新增file外掛
  file {
    #測試環境中我們一般是nohup後臺啟動jar包,預設紀錄檔追加到nohup檔案中,這裡我們用外掛讀取這個紀錄檔傳送到es上試試
    path => "/home/mm/mmm/nohup.out"
    mode => "read"
  }
}

output {
  elasticsearch {
    #設定自己的es連線,這裡是使用es預設模板
    hosts => ["http://localhost:9200"]
    index => "ceshi"
    #user => "elastic"
    #password => "changeme"
  }
}

退回到bin目錄下,啟動

./logstash -f /自己路徑下的組態檔/logstash/config/logstash-sample.conf

這樣子就是啟動成功了。

下面就是我們收集到的紀錄檔,大家可以看看預設都有什麼欄位。

     {        
        "_index" : "console-analysis",
        "_type" : "_doc",
        "_id" : "_DDNH4kBVvgVIOGHRiop",
        "_score" : 1.0,
        "_source" : {
          "port" : 57910,
          "thread_name" : "main",
          "host" : "172.17.0.5",
          "logger_name" : "com.alibaba.nacos.client.naming",
          "@version" : "1",
          "level_value" : 20000,
          "message" : "[BEAT] adding beat: BeatInfo{port=17007, ip='192.168.1.59', weight=1.0, serviceName='DEFAULT_GROUP@@amcp-analysis', cluster='DEFAULT', metadata={preserved.register.source=SPRING_CLOUD}, scheduled=false, period=5000, stopped=false} to beat map.",
          "level" : "INFO",
          "logHost" : "192.168.1.59:5044",
          "appname" : "analysis"
        }

Springboot整合logstash+elasticsearch

加入依賴

        <!-- https://mvnrepository.com/artifact/net.logstash.logback/logstash-logback-encoder -->
        <dependency>
            <groupId>net.logstash.logback</groupId>
            <artifactId>logstash-logback-encoder</artifactId>
            <version>7.1.1</version>
        </dependency>

設定

在resources目錄下建立一個logback-spring.xml的xml檔案。

如果是使用nacos來獲取設定的話,檔案名字不能是logback-spring.xml,因為會導致logback-spring.xml檔案被載入兩次,這樣在logback-spring.xml檔案中如果想讀取nacos上的設定的話是拿不到的。

在yml或者properties檔案中新增設定

logging:
 config: classpath:logback-nacos.xml

logstash:
 host: localhost:5044
logback-spring.xml組態檔
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>
	<!-- 獲取設定中的值 -->
    <springProperty scope="context" name="logHost" source="logstash.host"/>

    <!-- 新增logstash連線設定 -->
    <appender name="logstash" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <destination>${logHost}</destination> <!-- Logstash的主機和埠,可以設定多個 -->
        <!-- 其他屬性 -->
        <connectionTimeout>5000</connectionTimeout>
        <reconnectionDelay>5000</reconnectionDelay>
        <encoder charset="UTF-8"
                 class="net.logstash.logback.encoder.LogstashEncoder">
            <!-- 自定義欄位,這裡我是用來區分收集的是哪個程式的紀錄檔,後面logstash設定我們可以看下它的作用 -->
            <customFields>{"appname":"analysis"}</customFields>
        </encoder>

    </appender>

    <!-- 控制檯輸出設定,不新增的話紀錄檔不會在控制檯輸出 -->
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}  %-5level --- [%thread] %logger{36} : %msg%n</pattern>
        </encoder>
    </appender>
	
    <!-- level設定收集紀錄檔級別,可以設定多個appender,這個組態檔中也可以直接新增filter過濾標籤,這個大家自己嘗試一下 -->
    <root level="info">
        <appender-ref ref="console" /> <!-- 新增控制檯appender -->
        <appender-ref ref="logstash" />
    </root>

</configuration>

logstash設定

logstash-sample.conf組態檔
input {
  #設定監聽埠,也就是destination
  tcp {
    mode => "server"
    host => "localhost"
    port => 5044
    codec => json_lines
  }
}

filter {
  #判斷appname,在設定中宣告一個變數,不同的appname賦予不同的值,這裡其實就是根據我自定義的欄位來給不同的es索引名稱
  if [appname] == "analysis"{
    mutate {
      add_field => {
        "[@metadata][index]" => "console-analysis"
      }
    }
  }

  #判斷紀錄檔級別,收集到的紀錄檔預設欄位level會記錄紀錄檔級別,這個時候我們可以根據需要對紀錄檔進行操作。下面這個操作是將紀錄檔級別,紀錄檔記錄到自定義欄位,以及將記錄的時間進行轉換記錄到time欄位,預設記錄的時間是帶時區的
  if [level] =~ /DEBUG/ {
    mutate {
      add_field => {
        "type" => "DEBUG"
        "details" => "%{message}"
      }
    }
    ruby {
      code => "
        event.set('time', event.timestamp.time.localtime.strftime('%Y-%m-%d %H:%M:%S'))
      "
    }
    mutate {
     remove_field => ["[@timestamp]"]
    }
  }
}

output {
 #這裡判斷message欄位中如果不包含HiddenHorzOCR就記錄,只是演示一下這裡面也可以進行邏輯判斷
 if !([message] =~ /HiddenHorzOCR/) {
   if [@metadata][index] {
     elasticsearch {
       hosts => ["http://192.168.1.59:9200"]
       #這個索引就是我們在filter中判斷appname時賦的值
       index => "%{[@metadata][index]}"
       #指定es要使用的模板,也可以使用預設的
       template => "/home/collect.json"
       #模板名稱
       template_name => "collect"
       #載入模板是否覆蓋之前的模板
       template_overwrite => true
     }
   }
 }
}
es模板
{
  "index_patterns": ["console*"],
  "settings": {
    "number_of_shards": 5,
    "max_result_window": "500000000"
  },
  "mappings": {
    //自定義幾個欄位
    "properties": {
      "type": { "type": "keyword" },
      "details": { "type": "text" },
      "time": { 
                "type": "keyword" 
      }
    }
  }
}

所有設定新增完成之後,啟動logstash和自己的應用程式,這個時候就可以上es或者kibana上檢視建立出的索引以及收集到的紀錄檔。

這是指定模板後收集到的紀錄檔。

      {
        "_index" : "console-analysis",
        "_type" : "_doc",
        "_id" : "_jDNH4kBVvgVIOGHRiop",
        "_score" : 1.0,
        "_source" : {
          "port" : 57910,
          "details" : "Scanning for api listing references",
          "type" : "INFO",
          "time" : "2023-07-04 15:28:13",
          "thread_name" : "main",
          "host" : "172.17.0.5",
          "logger_name" : "springfox.documentation.spring.web.scanners.ApiListingReferenceScanner",
          "@version" : "1",
          "level_value" : 20000,
          "message" : "Scanning for api listing references",
          "level" : "INFO",
          "logHost" : "192.168.1.59:5044",
          "appname" : "analysis"
        }
      }