如何編寫 Pipeline 指令碼

2022-10-14 18:01:42

前言

Pipeline 編寫較為麻煩,為此,DataKit 中內建了簡單的偵錯工具,用以輔助大家來編寫 Pipeline 指令碼。

偵錯 grok 和 pipeline

指定 pipeline 指令碼名稱,輸入一段文字即可判斷提取是否成功

Pipeline 指令碼必須放在 /pipeline 目錄下。

$ datakit pipeline your_pipeline.p -T '2021-01-11T17:43:51.887+0800  DEBUG io  io/io.go:458  post cost 6.87021ms'
Extracted data(cost: 421.705µs): # 表示切割成功
{    
"code"   : "io/io.go: 458",       # 對應程式碼位置    
"level"  : "DEBUG",               # 對應紀錄檔等級    
"module" : "io",                  # 對應程式碼模組    
"msg"    : "post cost 6.87021ms", # 純紀錄檔內容    
"time"   : 1610358231887000000    # 紀錄檔時間(Unix 納秒時間戳)    "message": "2021-01-11T17:43:51.887+0800  DEBUG io  io/io.g o:458  post cost 6.87021ms"
}

提取失敗範例(只有 message 留下了,說明其它欄位並未提取出來):

$ datakit pipeline other_pipeline.p -T '2021-01-11T17:43:51.887+0800  DEBUG io  io/io.g o:458  post cost 6.87021ms'
{    
"message": "2021-01-11T17:43:51.887+0800  DEBUG io  io/io.g o:458  post cost 6.87021ms"
} 

如果偵錯文字比較複雜,可以將它們寫入一個檔案(sample.log),用如下方式偵錯:

$ datakit pipeline your_pipeline.p -F sample.log

更多 Pipeline 偵錯命令,參見 datakit help pipeline。

Grok 通配搜尋

由於 Grok pattern 數量繁多,人工匹配較為麻煩。DataKit 提供了互動式的命令列工具 grokq(grok query):

datakit tool --grokq
grokq > Mon Jan 25 19:41:17 CST 2021   # 此處輸入你希望匹配的文字        
2 %{DATESTAMP_OTHER: ?}        # 工具會給出對應對的建議,越靠前匹配月精確(權重也越大)。前面的數位表明權重。        
0 %{GREEDYDATA: ?}

grokq > 2021-01-25T18:37:22.016+0800        
4 %{TIMESTAMP_ISO8601: ?}      # 此處的 ? 表示你需要用一個欄位來命名匹配到的文字        
0 %{NOTSPACE: ?}       
0 %{PROG: ?}        
0 %{SYSLOGPROG: ?}        
0 %{GREEDYDATA: ?}             # 像 GREEDYDATA 這種範圍很廣的 pattern,權重都較低                                       # 權重越高,匹配的精確度越大
grokq > Q                              # Q 或 exit 退出
Bye!

Windows 下,請在 Powershell 中執行偵錯。

多行如何處理

在處理一些呼叫棧相關的紀錄檔時,由於其紀錄檔行數不固定,直接用 GREEDYDATA 這個 pattern 無法處理如下情況的紀錄檔:

 

 1 2022-02-10 16:27:36.116 ERROR 1629881 --- [scheduling-1] o.s.s.s.TaskUtils$LoggingErrorHandler    : Unexpected error occurred in scheduled task
 2 
 3     java.lang.NullPointerException: null    
 4 
 5 at com.xxxxx.xxxxxxxxxxx.xxxxxxx.impl.SxxxUpSxxxxxxImpl.isSimilarPrize(xxxxxxxxxxxxxxxxx.java:442)    
 6 
 7 at com.xxxxx.xxxxxxxxxxx.xxxxxxx.impl.SxxxUpSxxxxxxImpl.lambda$getSimilarPrizeSnapUpDo$0(xxxxxxxxxxxxxxxxx.java:595)    
 8 
 9 at java.util.stream.ReferencePipeline$3$1.accept(xxxxxxxxxxxxxxxxx.java:193)    
10 
11 at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(xxxxxxxxx.java:1382)    
12 
13 at java.util.stream.AbstractPipeline.copyInto(xxxxxxxxxxxxxxxx.java:481)    
14 
15 at java.util.stream.AbstractPipeline.wrapAndCopyInto(xxxxxxxxxxxxxxxx.java:471)    
16 
17 at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(xxxxxxxxx.java:708)    
18 
19 at java.util.stream.AbstractPipeline.evaluate(xxxxxxxxxxxxxxxx.java:234)    
20 
21 at java.util.stream.ReferencePipeline.collect(xxxxxxxxxxxxxxxxx.java:499)
22  
此處可以使用 GREEDYLINES 規則來通配,如(/usr/local/datakit/pipeline/test.p):
add_pattern('_dklog_date', '%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND}%{INT}')
grok(_, '%{_dklog_date:log_time}\\s+%{LOGLEVEL:Level}\\s+%{NUMBER:Level_value}\\s+---\\s+\\[%{NOTSPACE:thread_name}\\]\\s+%{GREEDYDATA:Logger_name}\\s+(\\n)?(%{GREEDYLINES:stack_trace})'

# 此處移除 message 欄位便於偵錯
drop_origin_data()

將上述多行紀錄檔存為 multi-line.log,偵錯一下:

$ datakit --pl test.p --txt "$(<multi-line.log)" 

得到如下切割結果:

{  
"Level": "ERROR",  "Level_value": "1629881",  
"Logger_name": "o.s.s.s.TaskUtils$LoggingErrorHandler    : Unexpected error occurred in scheduled task",  
"log_time": "2022-02-10 16:27:36.116",  
"stack_trace": "java.lang.NullPointerException: null\n\tat com.xxxxx.xxxxxxxxxxx.xxxxxxx.impl.SxxxUpSxxxxxxImpl.isSimilarPrize(xxxxxxxxxxxxxxxxx.java:442)\n\tat com.xxxxx.xxxxxxxxxxx.xxxxxxx.impl.SxxxUpSxxxxxxImpl.lambda$getSimilarPrizeSnapUpDo$0(xxxxxxxxxxxxxxxxx.java:595)\n\tat java.util.stream.ReferencePipeline$3$1.accept(xxxxxxxxxxxxxxxxx.java:193)\n\tat java.util.ArrayList$ArrayListSpliterator.forEachRemaining(xxxxxxxxx.java:1382)\n\tat java.util.stream.AbstractPipeline.copyInto(xxxxxxxxxxxxxxxx.java:481)\n\tat java.util.stream.AbstractPipeline.wrapAndCopyInto(xxxxxxxxxxxxxxxx.java:471)\n\tat java.util.stream.ReduceOps$ReduceOp.evaluateSequential(xxxxxxxxx.java:708)\n\tat java.util.stream.AbstractPipeline.evaluate(xxxxxxxxxxxxxxxx.java:234)\n\tat java.util.stream.ReferencePipeline.collect(xxxxxxxxxxxxxxxxx.java:499)",  

"thread_name": "scheduling-1"
}

Pipeline 欄位命名注意事項

在所有 Pipeline 切割出來的欄位中,它們都是指標(field)而不是標籤(tag)。由於行協定約束,我們不應該切割出任何跟 tag 同名的欄位。這些 Tag 包含如下幾類:

  • DataKit 中的全域性 Tag

  • 紀錄檔採集器中自定義的 Tag

另外,所有采集上來的紀錄檔,均存在如下多個保留欄位。我們不應該去覆蓋這些欄位,否則可能導致資料在檢視器頁面顯示不正常。

欄位名型別說明
source string(tag) 紀錄檔來源
service string(tag) 紀錄檔對應的服務,預設跟 service 一樣
status string(tag) 紀錄檔對應的等級
message string(field) 原始紀錄檔
time int 紀錄檔對應的時間戳

 

當然我們可以通過特定的 Pipeline 函數覆蓋上面這些 tag 的值。

一旦 Pipeline 切割出來的欄位跟已有 Tag 重名(大小寫敏感),都會導致如下資料包錯。故建議在 Pipeline 切割中,繞開這些欄位命名。

# 該錯誤在 DataKit monitor 中能看到
same key xxx in tag and field

完整 Pipeline 範例

這裡以 DataKit 自身的紀錄檔切割為例。DataKit 自身的紀錄檔形式如下:

2021-01-11T17:43:51.887+0800  DEBUG io  io/io.go:458  post cost 6.87021ms 

編寫對應 pipeline:

# pipeline for datakit log
# Mon Jan 11 10:42:41 CST 2021
# auth: tanb

grok(_, '%{_dklog_date:log_time}%{SPACE}%{_dklog_level:level}%{SPACE}%{_dklog_mod:module}%{SPACE}%{_dklog_source_file:code}%{SPACE}%{_dklog_msg:msg}')
rename("time", log_time) # 將 log_time 重名命名為 time
default_time(time)       # 將 time 欄位作為輸出資料的時間戳
drop_origin_data()       # 丟棄原始紀錄檔文字(不建議這麼做)

這裡參照了幾個使用者自定義的 pattern,如 _dklog_date、_dklog_level。我們將這些規則存放 <datakit安裝目錄>/pipeline/pattern 下。

 

注意,使用者自定義 pattern 如果需要==全域性生效==(即在其它 Pipeline 指令碼中應用),必須放置在 <DataKit安裝目錄/pipeline/pattern/> 目錄下):

$ cat pipeline/pattern/datakit
# 注意:自定義的這些 pattern,命名最好加上特定的字首,以免跟內建的命名衝突(內建 pattern 名稱不允許覆蓋)
# 自定義 pattern 格式為:
#    <pattern-name><空格><具體 pattern 組合>
_dklog_date %{YEAR}-%{MONTHNUM}-%{MONTHDAY}T%{HOUR}:%{MINUTE}:%{SECOND}%{INT}
_dklog_level (DEBUG|INFO|WARN|ERROR|FATAL)
_dklog_mod %{WORD}
_dklog_source_file (/?[\w_%!$@:.,-]?/?)(\S+)?
_dklog_msg %{GREEDYDATA}

現在 pipeline 以及其參照的 pattern 都有了,就能通過 DataKit 內建的 pipeline 偵錯工具,對這一行紀錄檔進行切割:

# 提取成功範例
$ ./datakit --pl dklog_pl.p --txt '2021-01-11T17:43:51.887+0800  DEBUG io  io/io.go:458  post cost 6.87021ms'
Extracted data(cost: 421.705µs):
{    
"code": "io/io.go:458",    
"level": "DEBUG",    
"module": "io",    
"msg": "post cost 6.87021ms",    
"time": 1610358231887000000
}

FAQ

Pipeline 偵錯時,為什麼變數無法參照?

Pipeline 為:

json(_, message, "message")
json(_, thread_name, "thread")
json(_, level, "status")
json(_, @timestamp, "time")

其報錯如下:

[E] new piepline failed: 4:8 parse error: unexpected character: '@' 

A: 對於有特殊字元的變數,需將其用兩個 ` 修飾一下:

json(_, `@timestamp`, "time") 

參見【 Pipeline 的基本語法規則 】https://docs.guance.com/developers/pipeline/

 

Pipeline 偵錯時,為什麼找不到對應的 Pipeline 指令碼?

命令如下:

$ datakit pipeline test.p -T "..."
[E] get pipeline failed: stat /usr/local/datakit/pipeline/test.p: no such file or directory

A: 偵錯用的 Pipeline 指令碼,需將其放置到 /pipeline 目錄下。

 

如何在一個 Pipeline 中切割多種不同格式的紀錄檔?

在日常的紀錄檔中,因為業務的不同,紀錄檔會呈現出多種形態,此時,需寫多個 Grok 切割,為提高 Grok 的執行效率,可根據紀錄檔出現的頻率高低,優先匹配出現頻率更高的那個 Grok,這樣,大概率紀錄檔在前面幾個 Grok 中就匹配上了,避免了無效的匹配。

 

在紀錄檔切割中,Grok 匹配是效能開銷最大的部分,故避免重複的 Grok 匹配,能極大的提高 Grok 的切割效能。

grok(_, "%{NOTSPACE:client_ip} %{NOTSPACE:http_ident} ...")
if client_ip != nil {    
# 證明此時上面的 grok 已經匹配上了,那麼就按照該紀錄檔來繼續後續處理    
...
} else {    
# 這裡說明是不同的紀錄檔來了,上面的 grok 沒有匹配上當前的紀錄檔    
grok(_, "%{date2:time} \\[%{LOGLEVEL:status}\\] %{GREEDYDATA:msg} ...")

    if status != nil {       
 # 此處可再檢查上面的 grok 是否匹配上...    
} else {        
# 未識別的紀錄檔,或者,在此可再加一個 grok 來處理,如此層層遞進    
}
}

如何丟棄欄位切割

在某些情況下,我們需要的只是紀錄檔==中間的幾個欄位==,但不好跳過前面的部分,比如

200 356 1 0 44 30032 other messages

其中,我們只需要 44 這個值,它可能程式碼響應延遲,那麼可以這樣切割(即 Grok 中不附帶 :some_field 這個部分):

grok(_, "%{INT} %{INT} %{INT} %{INT:response_time} %{GREEDYDATA}") 

add_pattern() 跳脫問題

大家在使用 add_pattern() 新增區域性模式時,容易陷入跳脫問題,比如如下這個 pattern(用來通配檔案路徑以及檔名):

(/?[\w_%!$@:.,-]?/?)(\S+)? 

如果我們將其放到全域性 pattern 目錄下(即 pipeline/pattern 目錄),可這麼寫:

# my-testsource_file (/?[\w_%!$@:.,-]?/?)(\S+)?

  

 

如果使用 add_pattern(),就需寫成這樣:

 

# my-test.padd_pattern('source_file', '(/?[\\w_%!$@:.,-]?/?)(\\S+)?')

 

即這裡面反斜槓需要跳脫。