nifi從入門到實戰(保姆級教學)——flow

2022-07-03 18:02:07

本文章首發於部落格園,轉載請標明出處

經過前兩篇文章(環境篇身份驗證),我們已經有了nifi可以執行的基礎,今天就來實現一個案例吧。
假設我們要從ftp上獲取一個zip包,裡面有兩個csv檔案,一個是manufacture.csv,一個是brand.csv.然後要把這兩個檔案匯入到sqlserver資料庫中。其中brand是manufacture的下一級,但是brand裡沒有manufacture的主鍵,必須要通過一些關鍵欄位的匹配來找出它們。
在實現這個場景之前,我們需要認識一下nifi中的幾個重要元件。
Processor : 主要用來處理flowfile,也就是我們的資料。nifi提供了上百個不同功能的processor,一般的需求都能滿足。當然它也支援自定義processor,需要用java自行開發。
Processor Group :簡單地理解就是把processor的流程組合成一個整體。只有Processor Group有version,所以它對於後續流程的遷移很重要。
Input Port,Output Port : 這兩個主要是用於聯接group.
有這些瞭解後就開始吧!
先看看流程的整體吧

  1. 首先拖拽一個group在畫布中,併為這個group命名為Import,如下圖

    雙擊group進入。再建一個group,命名為getfiles.這個group主要負責從ftp上獲取檔案,並解壓。

    GetFTP:主要填以下幾個屬性。


    Delete Origianl預設為true,會刪除ftp上的檔案,所以最好設定為false.類似的Processor還有getfile,使用時一定要注意。
    因為我們獲取的是一個zip包,所以需要解壓。這個比較簡單,預設就行了。如果壓縮檔案有密碼,設定一下password屬性就好了。

    接下來就有點複雜了。因為我們的manufacture和brand是要進不同的表,所以就要路由了。這裡就要用到route的processor,我用的是RouteText,也可以用RouteOnAttribute,只是一些設定不同。後面我也會用到。

    新增了兩個路由屬性:fabricantes,modelos.這個名字你可以隨便取。如果filename包含manufacture就走fabricantes分支,包含brand就走modelos分支。
    後面我做了一個延時,大家可以根據實際情況自由選擇。這裡我也介紹一下。

    先用UpdateAttribute新增一個屬性delay,值為當前時間加20s.

    再用RouteOnAttribute來在規定時間內死迴圈,直到當前時間大於規定時間。
    最後用兩個output port結束當前group.
  2. 將brand的資料儲存到SQL SERVER的一張臨時表裡。

    建立一個group,名為tmp_barnd.這個group一開始必須是input port,用於接收上一個group傳出的資料。
    SplitRecord:

    這裡用到兩個controller service: CSVReader,JsonRecordSetWriter.

    根據實際情況修改一下相應屬性。我覺得比較重要的是Value Separator(預設是","但是很自定義的csv可能是";""),Character Set(預設是UTF-8,比如我的檔案裡有特殊符號,用的是ISO-8859-1)。

    因為是進資料庫,所以為了防止SQL隱碼攻擊,需要先做一些準備工作。
    經過上一步,資料已經被拆分成一條條的json,現在就用EvaluateJsonPath提取相應的欄位

    再用UpateAttribute組裝成Sql語句需要的引數。關於sql.args.[*].type的值,請參考java.sql.Types

    最後就是執行SQL語句了。這裡有很多選擇,可以用PutSQL,ExcuteSQL等。

    SQL Statement是這樣的:
點選檢視程式碼
if not EXISTS (SELECT 1 FROM tmp_modelos
WHERE MODELO=? AND FABRICANTE=? AND DESCRIPCION=? AND TIPO_VEHICULO=?)
INSERT INTO tmp_modelos (MODELO, FABRICANTE, DESCRIPCION, DESCRIPCION_ADDICIONAL, TIPO_VEHICULO) VALUES (?, ?, ?, ?, ?);

?代表引數,有多少?,sql.args屬性就相對有幾個,否則執行時會報引數不匹配。
DBCPConnectionPool的設定如下:

整個流程上要用的processor和controller service差不多就是上面這些,剩下的就是大家按需求組合了。
我剩下兩個group裡的流程是這樣的。

還有一個很重要的,就是nifi所用的表示式,大家可以參考一下官方檔案

好了,至此,我們的流程就已經畫完了。接下來就是執行偵錯了。下篇再見!