ELK技術-Logstash

2022-08-30 15:03:27

 1.背景

1.1 簡介

Logstash 是一個功能強大的工具,可與各種部署整合。 它提供了大量外掛,可幫助業務做解析,豐富,轉換和緩衝來自各種來源的資料。

Logstash 是一個資料流引擎

  • 它是用於資料物流的開源流式 ETL(Extract-Transform-Load)引擎
  • 在幾分鐘內建立資料流管道
  • 具有水平可延伸及韌性且具有自適應緩衝
  • 不可知的資料來源
  • 具有 200 多個整合和處理器的外掛生態系統
  • 使用 Elastic Stack 監視和管理部署

Logstash 幾乎可以攝入各種類別的資料

它可以攝入紀錄檔,檔案,指標或者網路真實資料。經過 Logstash 的處理,變為可以使用的 Web Apps 可以消耗的資料,也可以儲存於資料中心,或變為其它的流式資料。

Logstash 相關概念

  • Logstash 範例是一個正在執行的 Logstash 程序。建議在 Elasticsearch 的單獨主機上執行 Logstash,以確保兩個元件有足夠的計算資源可用。
  • 管道(pipeline)是設定為處理給定工作負載的外掛集合。一個 Logstash 範例可以執行多個管道。(彼此獨立)
  • 輸入外掛(input plugins)用於從給定的源系統中提取或接收資料。 Logstash 參考指南中提供了支援的輸入外掛列表:https://www.elastic.co/guide/en/logstash/current/input-plugins.html
  • 過濾器外掛(filter plugin)用於對傳入事件應用轉換和豐富。 Logstash 參考指南中提供了支援的過濾器外掛列表:Filter plugins | Logstash Reference [8.3] | Elastic
  • 輸出外掛(output plugin)用於將資料載入或傳送到給定的目標系統。 Logstash 參考指南中提供了支援的輸出外掛列表:https://www.elastic.co/guide/en/logstash/current/output-plugins.html
Logstash 包含3個主要部分: 輸入(inputs),過濾器(filters)和輸出(outputs)。 你必須定義這些過程的設定才能使用 Logstash,儘管不是每一個都必須的。在有些情況下,可以甚至沒有過濾器。在過濾器的部分,它可以對資料來源的資料進行分析,豐富,處理等。

1.2 學習參考

1.3 本例測試版本

[root@dev1613 study]# sudo -u logstash ../bin/logstash  --version
Using bundled JDK: /opt/logstash/jdk
logstash 7.12.1

2.功能應用

2.1 基礎測試

輸入測試命令,../bin為當前執行命令所在資料夾,與logstash安裝後bin的相對目錄位置。
sudo -u logstash ../bin/logstash -e 'input { stdin { } } output { stdout {} }'
執行命令後,輸出結果如圖:

2.2 Logstash解析紀錄檔檔案

最原始的 Log 資料,經過 Logstash 的處理,可以把非結構化的資料變成結構化的資料。甚至可以使用 Logstash 強大的 Filter 來對資料繼續加工。最終將加工後的資料儲存下來,用於分析和搜尋。

紀錄檔原始內容

2022-07-06 18:48:37.453 ERROR 14677 --- [ dispatcher 108] c.a.c.s.dashboard.metric.MetricFetcher   : Failed to fetch metric from <http://10.32.4.230:8719/metric?startTime=1657104506000&endTime=1657104512000&refetch=false>: socket timeout
2022-07-06 18:48:44.439 ERROR 14677 --- [ dispatcher 109] c.a.c.s.dashboard.metric.MetricFetcher   : Failed to fetch metric from <http://10.32.4.230:8719/metric?startTime=1657104513000&endTime=1657104519000&refetch=false>: socket timeout
2022-07-06 18:48:51.514 ERROR 14677 --- [ dispatcher 110] c.a.c.s.dashboard.metric.MetricFetcher   : Failed to fetch metric from <http://10.32.4.230:8719/metric?startTime=1657104520000&endTime=1657104526000&refetch=false>: socket timeout

Logstash組態檔

編寫紀錄檔解析組態檔,並解析時間,錯誤級別,錯誤行,錯誤資訊。提取出來變為結構化資料。編寫組態檔如下:
設定相關節點參考官方檔案:《plugins-inputs-file》
input {
    file {
        path => "/opt/logstash/study/outlog.log"
        start_position => "beginning"
        stat_interval => "3"
        type => "sentinel-log"        
    }
}

filter {
      grok {
        match => ["message","%{TIMESTAMP_ISO8601:datetime} %{LOGLEVEL:loglevel} %{NUMBER:textid} %{GREEDYDATA:errormsg}"]
    }
    json {
        source => "request"
    }
}
output {
    stdout { codec => rubydebug }
}

Grok紀錄檔解析線上測試

基於elastic線上網頁,可編寫解析紀錄檔測試demo。

紀錄檔解析結構化輸出

執行命令:sudo -u logstash ../bin/logstash -f study-file-es.conf
執行logstash載入組態檔命令,啟動測試輸出結構化內容如下:

2.3 Logstash-資料庫同步

本例將MySql資料表中的資料,基於修改時間同步到es資料儲存中心。

基礎資料內容

資料來源-mysql資料表建表語句:
CREATE TABLE `study_logstash_es` (
  `id` int NOT NULL AUTO_INCREMENT COMMENT '自增主鍵',
  `study_code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '編碼',
  `study_name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '名稱',
  `study_tag` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '標籤',
  `study_level` smallint NOT NULL DEFAULT '0' COMMENT '等級,如1,2,3',
  `is_delete` tinyint unsigned NOT NULL DEFAULT '0' COMMENT '0 未刪除  1 刪除',
  `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '建立時間',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新時間',
  `operate_user` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '操作人',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uniq_study_code` (`study_code`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='學習-logstash同步msql資料到es';
目標源-es索引建立指令碼:
PUT /study_logstash_es
{
  "settings": {
    "index": {
      "number_of_shards": 1,
      "number_of_replicas": 1 
    }
  },

  "mappings": {
      "properties": {
        "id": {
          "type": "integer"  
        },
         "study_code": {
          "type": "text"  
        },
        "study_name": {
          "type": "text"  
        },
         "operate_user": {
          "type": "text"  
        },
         "study_tag": {
          "type": "keyword"  
        },
        "is_delete": {
          "type": "integer"  
        },
         "study_level": {
          "type": "integer"  
        },
         "mark_time": {
          "type": "date",
          "format": "epoch_millis"
        },
        "update_time": {
          "type": "date"
        }
      }
    }
}

Logstash組態檔

本例測試的資料庫地址,es地址,已經基於xxx脫敏。更多jdbc的設定,請參考官方檔案:《plugins-inputs-jdbc》
jdbc_driver_library:為mysql連線包,可在Maven上下載,下載地址參考:《mysql-connector-java.jar 包下載》
input {
  jdbc {
    jdbc_driver_library => "/opt/logstash/study/mysql-connector-java-8.0.30.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://xxx.xxx.xx.x:3306/study_database?serverTimezone=Asia/Shanghai&allowMultiQueries=true&characterEncoding=utf-8"
    jdbc_user => "root"
    jdbc_password => "xxxxx"
    jdbc_paging_enabled => true
    jdbc_page_size => "2"
    use_column_value => true
    tracking_column => "mark_time"
    tracking_column_type => "numeric"
    schedule => "* * * * *"
    statement => "SELECT id,study_code,study_name,study_tag,study_level,operate_user,update_time,UNIX_TIMESTAMP(update_time) as mark_time from study_logstash_es where UNIX_TIMESTAMP(update_time)>:sql_last_value AND update_time < NOW()"
  }
}
output{
     elasticsearch{
         hosts => ["xxx.xxx.16.4:9200","xxx.xxx.16.xx:9200","192.xxx.xx.xx:9200"]
         index => "study_logstash_es"
         timeout => 300
         user => "xxx"
         password => "xxxxx"
     }
}

資料同步es

執行命令:sudo -u logstash ../bin/logstash -f study-mysql-es.conf
執行logstash載入組態檔命令,啟動執行紀錄檔,es同步的資料如下:
es資料查詢如下:

2.4 Logstash-kafka訊息同步

Logstash的輸入項可以監聽kafka訊息,消費訊息記錄。
input {
    kafka {
        bootstrap_servers => "xxx.xxx.xx.4:9092,xxx.xxx.16.4:9093,xxx.xxx.16.4:9094" #kafka伺服器地址
        topics => "xxxlog"
        # batch_size => 5
        codec => "json"
        group_id => "logstash"
        consumer_threads => 3
    }
}

filter {
    # 丟棄所有的header請求
   if [request][method] == "HEAD" {
           drop { }
    }
    # 因為[request][querystring]這個玩意中的欄位型別可能不一樣,所以全部幹成字串
   ruby {
    code => "event.set('[request][querystring]', event.get('[request][querystring]').to_s) if event.get('[request][querystring]')"
   } 
   if [request][uri] =~ "^/ucenter-admin-view/v3(.*)" {
        mutate {
            add_field => { "log_source" => "使用者中心管理後臺" }
            add_field => { "log_source_id" => "1" }
        }
    }
    else if [request][uri] =~ "^/ucenter-org-view/v3/(.*)" {
        mutate {
            add_field => { "log_source" => "使用者中心工作臺" }
            add_field => { "log_source_id" => "2" }
        }
    }
    else if [request][uri] =~ "^/safety-admin-api(.*)" {
        mutate {
                add_field => { "log_source" => "安全管理平臺" }
                add_field => { "log_source_id" => "3" }
            }
    }
    else{
        mutate {
            add_field => { "log_source" => "其他" }
            add_field => { "log_source_id" => "0" }
        }
    }
 
    grok {
        match => { "[request][uri]" => "%{URIPATH:[request][path]}" }
        named_captures_only => false
    }
 
}
output{
#    stdout {
 #   codec => json
  # }
     elasticsearch{
         hosts => ["xxx.xxx.xx.4:9200","xxx.xxx.16.13:9200","xxx.xxx.16.14:9200"]
         index => "apisixlog"
         timeout => 300
         user => "elastic"
         password => "HApn2xCJMuRlg0UOIV0P"
     }

3.總結

  1. Logstash基於 輸入(inputs),過濾器(filters)和輸出(outputs)可以方便快捷的處理資料,將一些非結構化資料,處理為結構化資料。Logstash支援資料中轉,資料同步等場景的應用。本例只是簡要測試,在實際業務使用時,可基於某一個輸入外掛/輸出外掛參考官方檔案,結合專案使用。
  1. 在做一些資料同步工作時,出於效能等各方面考慮,如同步資料表到es中,除了Logstash這種方案,也可以參考其他的方案,如alibaba/DataX
  1. Logstash收集大量紀錄檔時,存在耗記憶體的情況,建議參考官方推薦的FileBeat模式。詳情參考檔案:《開源紀錄檔管理方案 ELK 和 EFK 的區別》,《通過Filebeat把紀錄檔傳入到Elasticsearch》
  1. Logstash在組態檔調整後,啟動命令,可能出現如下報錯:
刪除掉Logstash/data檔案下的快取檔案,即可重新啟動成功。
  1. Logstash啟動命名常用如下:
sudo -u logstash ../bin/logstash -f study-file-es.conf  
表示當前視窗啟動,關閉或退出命令列時,logstash範例關閉。

sudo -u logstash ../bin/logstash -f study-file-es.conf --config.reload.automatic
表示當前視窗啟動,組態檔變化時,不用重新啟動範例,可自動載入。關閉或退出命令列時,logstash範例關閉。

sudo -u logstash ../bin/logstash -f study-mysql-es.conf & test.out --config.reload.automatic
表示後臺啟動,關閉退出命令,範例在後臺一直執行。

ps -ef|grep logstash 
 kill-9 程序號, 關閉對應的範例
  1. Logstash執行紀錄檔檢視
檢視cat logstash-plain.log 檔案,可檢視Logstash執行紀錄檔記錄。