pentaho(keetle)使用手冊

2023-09-08 15:00:56

pentaho使用

先展示一下用途和效果

1. 環境準備

1.1 pentaho是什麼?

pentaho可讀作「彭塔湖」,原名keetlekeetlepentaho公司收購後改名而來。

pentaho是一款開源ETL工具,純java編寫的C/S模式的工具,可綠色免安裝,開箱即用。支援Windows、macOS、Linux平臺。

pentaho有2個核心設計,即轉換作業

轉換是一個包含輸入、邏輯處理、輸出的完整過程,即ETL。

作業是一個提供定時執行轉換的機制,即定時服務排程。

pentaho官網下載連結:Pentaho Community Edition Download | Hitachi Vantara

pentaho由主要四部分組成

  • Spoon.bat/Spoon.sh :勺子,是一個圖形化介面,可圖形化操作轉換和作業
  • Pan.bat/Pan.sh : 煎鍋,可用命令列方式呼叫轉換
  • Kitchen.bat/Kitchen.sh : 廚房,可用命令列方式呼叫作業
  • Carte.bat/Carte.sh : 選單,是一個輕量級web容器,可建立專用、遠端的ETL Server

1.2 pentaho安裝

Windows

由於是純java編寫,依賴jdk環境。所以需要先設定jdk環境,這裡省略。

從官網下載pentaho安裝包後,直接解壓。

MacOS

tar -zxvf 安裝包路徑 -C 目標路徑

Linux

tar -zxvf 安裝包路徑 -C 目標路徑

目錄結構

重點目錄以及執行檔案說明

  • lib目錄 : 這是依賴庫目錄,例如各個資料庫的jdbc驅動,都放在此目錄下
  • logs目錄 :這是轉換和作業執行的預設紀錄檔輸出目錄
  • simple-jndi目錄 :這是各個資料庫的JNDI連線資訊的全域性設定
  • Spoon.bat/Spoon.sh :勺子,是一個圖形化介面,可圖形化操作轉換和作業
  • Pan.bat/Pan.sh : 煎鍋,可用命令列方式呼叫轉換
  • Kitchen.bat/Kitchen.sh : 廚房,可用命令列方式呼叫作業
  • Carte.bat/Carte.sh : 選單,是一個輕量級web容器,可建立專用、遠端的ETL Server

在window上執行就用.bat格式指令碼,MacOS 或者 Linux 平臺上使用.sh格式指令碼


2. 開始使用

pentaho內建了豐富的資料處理元件,本章節主要對pentaho介面上各個功能元件作用進行說明。

2.1 啟動圖形化介面

Windows

執行 Spoon.bat 

MacOS

執行 Spoon.sh

Linux

執行 Spoon.sh

執行後會短暫沒有任何反應,等待會議,就會出現介面

主物件樹中有轉換作業

  • 轉換:所有的資料處理工作都在轉換中完成
  • 作業:這是一個任務

開始資料處理工作前,必需新建一個轉換,因為只有新建了之後,才能使用資料處理元件,此時的核心物件樹是空的。

2.2 轉換

在 「核心物件樹 –> 轉換 –> 右鍵 –> 新建」 或 在 「檔案 –> 新建 –> 轉換」 ,新建一個轉換,核心物件樹就會出現各類元件。依靠這些元件組合使用,完成資料處理工作。

2.2.1 主物件樹

一個轉換就是一個資料處理工作流程。這裡主要是轉換的設定,例如資料來源連線,執行設定等。

2.2.2 核心物件樹

包含各類資料處理元件。

2.3 資料處理元件

這裡對一些常用的元件進行說明

輸入

輸入元件,即各類資料來源,例如資料庫,json,xml等

輸出

輸出元件,將處理後的資料進行輸出儲存

轉換

這是資料轉換的核心,在這裡完成資料處理

應用

包含一些資料處理外的操作,例如傳送郵件,寫紀錄檔等

流程

用於控制資料處理流程,例如開始,結束,終止等

指令碼

當內建轉換元件完成不了資料處理的邏輯時,即可使用指令碼元件,用自定義程式碼的方式來完成處理邏輯

查詢

用於一些查詢請求,例如http請求,資料庫查詢某個表是否存在等

連線

可用於多表,單表處理完後,進行記錄合併


2.4 作業元件

在「檔案–>新建–>作業」建立一個作業。

主物件樹包含作業執行設定,DB連線設定等

核心物件樹包含作業的各類元件


通用

作業流程元件,有開始、轉換、成功、空處理等

郵件

傳送郵件

檔案管理

檔案操作,建立、刪除等

條件

條件處理,例如判斷某個檔案是否存在

指令碼

使用shell,js、sql等指令碼處理複雜作業邏輯

應用

作業處理,例如終止作業、寫紀錄檔等

檔案傳輸

定時作業來上傳、下載檔案


2.5 使用

上面介紹了各個元件用途,現在來完成一個完整的資料處理工作流程。

啟動應用


新建轉換

在 「核心物件樹 –> 轉換 –> 右鍵 –> 新建」 或 在 「檔案 –> 新建 –> 轉換」 ,新建一個轉換


設定DB連線

主物件樹中選擇DB連線,右鍵新建

注意:連線資料庫之前需要下載對應的jdbc驅動,例如連線pgsql則需要下載 postgresql-version.jar,r然後將驅動包放到安裝目錄下的\lib目錄


這裡以kingbase V8為例,因為這個踩了坑。經歷如下:

內建的資料來源裡有KingbaseES,本以為可以直接用,結果發現連不上,報驅動錯誤。可能是因為內建的驅動版本跟資料庫版本不一致,因為Kingbase V8的驅動不向前相容。更新驅動後,依然不行。

然後發現,內建還有Generic database選項,這個是用來自定義連線內建資料來源之外的資料庫的。使用jdbc方式連線,需要一個連線串,驅動包名(前提是下載了對應的驅動包),使用者名稱,密碼。然而,這種方式依然不行……

後來一想,乾脆用pgsql的方式來連線kingbase,沒想到連線成功!



選擇輸入

因為資料來源是資料庫,所以這裡從輸入元件中選擇表輸入,將其拖入到右側面板中


設定輸入

雙擊「表輸入」元件 或 右鍵選擇 「編輯步驟」

點選獲取SQL查詢語句,會彈出介面選擇資料表

選擇一個資料表後,提示

選擇「是」

這裡會自動填充獲取資料的sql,也可以在這裡加上各種where條件,獲取需要的資料

點選「確定」


設定輸出

如果是表結構一致,則可使用

因為目標資料來源也是資料庫,所以這裡選擇表輸出。從輸出元件選擇表輸出,拖入轉換檢視中

然後進行步驟連線

方式一:按住shift鍵,滑鼠左鍵點選「輸入步驟」,會出現箭頭,然後連線到「輸出步驟」

方式二:滑鼠左鍵框選輸入和輸出,然後右鍵,選擇」新建節點連線「,選擇」起始步驟「,」目標步驟「

點選「確定」

連線後如下:

雙擊「表輸出」或右鍵選擇「編輯步驟」

選擇目標資料庫中的資料表,然後點選」確定「

選擇表輸出,無法設定欄位對映,所以前提是表結構一致才可使用。如果是異構表,需要欄位對映的,則需要使用 插入/更新 元件

如果輸入表和輸出表結構不一致,即異構表,則需要使用插入/更新元件。從輸出中選擇插入/更新拖入轉換檢視中,然後進行步驟連線,進入輸出設定

注意:一定要正確連線步驟,否則這步無法獲取輸入欄位,輸出欄位

欄位對映設定好後如下

點選「確定」

然後點選轉換檢視中的按鈕,這個是執行

這個執行是執行一次,完成後就結束了。如果要定時執行,則需要作業

點選「啟動」 會彈出介面 儲存 當前轉換

輸入儲存的檔名稱,然後點選「Save」即可

每個步驟都顯示綠色的箭頭,說明沒有錯誤,正確的執行完了轉換。也可以在紀錄檔輸出檢視.

紀錄檔:完成處理 (I=1, O=1, R=1, W=1, U=0, E=0)中的 I=1 表示 輸入 1 行,O=1表示 輸出 1 行,R=1 表示 讀取 1 行,W=1 表示 寫入 1 行

然後看一下資料輸出結果

源表

目標表

定時作業

如果需要定時執行同步過程,那麼就需要引入作業。在「檔案–>新建–>作業」 建立一個作業。

在「通用」中選擇Start拖入作業檢視中

然後選擇轉換拖入檢視,並進行步驟連線。

雙擊「轉換」或右鍵選擇「編輯作業入口」

點選「確定」

然後選成功元件拖入檢視,並連線步驟

雙擊檢視中的Start元件或右鍵」編輯作業入口「,進行作業排程設定

點選執行檢視中的按鈕。一個定時作業即完成

定時作業排程期間,程式不能退出!程式退出,作業即停止

至此一個完整的資料處理作業完成了。

3. 案例

3.1 簡單同步

本部分對簡單同步進行說明。簡單同步是指不涉及複雜計算、轉換等同步工作。

3.1.1 單表

即一對一同步,A表資料同步到B表,A與B的欄位數量、型別、名稱可能都不一樣,因此需要一些欄位型別轉換,這都很容易。

處理過程詳見2.5章節


3.1.2 多表

即2個及以上的表往一個表同步,同樣也需要欄位對映、型別轉換等操作。

外來鍵關聯

這種通過某個欄位(外來鍵)關聯的表,處理思路是在獲取資料時,通過sql聯表查詢,獲取到全部需要的資料。然後用單表同步方式進行處理。

多表合併

如果是異構表的話,獲取到每個資料來源後,使用Multiway merge join多路合併元件處理

合併後的記錄可作為一個單表,然後進行單表同步的處理

合併是笛卡爾積,即A表n條記錄,B表n條記錄,結果就是n x n條記錄,欄位是A、B表全部欄位,這種方式不建議採用,會消耗更多記憶體資源。建議拆分成單表同步

如果是同構表的話,可拆分為多個單表同步處理。


3.2 複雜同步

本部分對涉及到資料計算、轉換的同步工作進行說明。有些複雜操作,無法直接使用元件進行處理,需要用到Script元件。這裡主要對如何使用指令碼元件完成資料處理進行說明。

這裡先展示一個實際案例

這個過程是多表同步到一個表、涉及到欄位型別轉換、補充欄位和值、資料計算、增補資料。由於計算和增補資料使用內建元件無法完成,因此這裡使用了java指令碼元件,自定義程式碼進行資料處理。

這裡對欄位型別轉換增加列給某列設定值java指令碼進行說明。

欄位型別轉換

例如 數位型別 轉為 字串,字串 轉為 日期時間……

轉換中選擇欄位選擇元件

雙擊「欄位選擇」或右鍵選擇「編輯步驟」

選擇「後設資料」,在「欄位名稱」列選擇欄位,然後在「型別」列選擇目標型別


增加列並設定亂數

輸入中找到生成亂數元件,拖入檢視並連線步驟

雙擊生成亂數或右鍵選擇「編輯步驟」

在「名稱」列輸入需要增加的欄位名,型別選擇生成亂數規則

點選「確定」後,執行轉換,在preview data處可預覽資料,可以看到增加了一列 uid 也有值


將列的值設定為常數

例如將上面亂數元件生成的值設定為常數1。在轉換中選擇將欄位設定為常數元件,並連線步驟

雙擊「將欄位設定為常數」或右鍵選擇「編輯步驟」

在「欄位」列選擇需要設定的欄位,這裡選擇上一步驟生成的「uid」欄位,在「值替換」列輸入值。

點選「確定」,執行轉換,然後預覽資料,可以看到uid的值被替換為1


java指令碼

指令碼有Java指令碼、JavaScript指令碼,SQL指令碼等。這裡使用Java指令碼,指令碼的目的是處理內建元件處理不了的邏輯。例如有10個地層,但是資料來源中只記錄了前9個地層,最後一個需要根據計算得到。

拖入Java指令碼元件到轉換檢視中並連線步驟

雙擊「Java指令碼」或右鍵選擇「編輯步驟」

然後展開Code Snippits\Common use

選擇Main拖入右側編輯區,Main是整個指令碼處理入口

其預設指令碼結構如下

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
  if (first) {
    first = false;
	// 程式碼邏輯區域
      
    /* TODO: Your code here. (Using info fields)

    FieldHelper infoField = get(Fields.Info, "info_field_name");

    RowSet infoStream = findInfoRowSet("info_stream_tag");

    Object[] infoRow = null;

    int infoRowCount = 0;

    // Read all rows from info step before calling getRow() method, which returns first row from any
    // input rowset. As rowMeta for info and input steps varies getRow() can lead to errors.
    while((infoRow = getRowFrom(infoStream)) != null){

      // do something with info data
      infoRowCount++;
    }
    */
  }

  Object[] r = getRow();

  if (r == null) {
    setOutputDone();
    return false;
  }

  // It is always safest to call createOutputRow() to ensure that your output row's Object[] is large
  // enough to handle any new fields you are creating in this step.
  r = createOutputRow(r, data.outputRowMeta.size());

  /* TODO: Your code here. (See Sample)

  // Get the value from an input field
  String foobar = get(Fields.In, "a_fieldname").getString(r);

  foobar += "bar";
    
  // Set a value in a new output field
  get(Fields.Out, "output_fieldname").setValue(r, foobar);

  */
  // Send the row on to the next step.
  putRow(data.outputRowMeta, r);

  return true;
}

TODO區域就是程式碼編輯區域,其它是預設指令碼函數

點選」確定「,然後再次開啟Java指令碼,就能看到輸入輸出欄位資訊了

完整實現地層計算並補充最後一層的Java指令碼程式碼邏輯如下

// 這裡是需要用的 java API 所匯入的包
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.lang.*;
import java.math.BigDecimal;
import java.util.*;

// 核心處理過程入口
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {

 if (first) {
    first = false;
	    logBasic("----------------------欄位-------------------------");
        String projectCount = "project_count";
        String knumber = "knumber";
        String depth = "depth";
        String dep = "dep";
        String layerorder = "layerorder";
        String id = "id";
 
		logBasic("----------------------獲取輸入流-------------------------");
        // 輸入資料流 input 是訊息步驟中設定的標籤名
        RowSet infoStream = findInfoRowSet("input"); 


 		Object[] infoRow = null;
    	int infoRowCount = 0;

		logBasic("----------------------遍歷資料流,將其載入到map-------------------------");
        // 遍歷資料流,將其載入到map,便於操作
        // 根據 專案索引+鑽孔索引 分組
        Map<String, ArrayList<Object[]>> groups = new HashMap<String, ArrayList<Object[]>>();
        while((infoRow = getRowFrom(infoStream)) != null){
            // 獲取欄位值
        	String prjCode = get(TransformClassBase.Fields.In, projectCount).getString(infoRow);
            String drillCode = get(TransformClassBase.Fields.In, knumber).getString(infoRow);
            String groupKey = prjCode + drillCode;

            if (!groups.containsKey(groupKey)) {
                logBasic("----------------------建立分組-------------------------");
                groups.put(groupKey,new ArrayList<Object[]>());
            }
			logBasic("----------------------新增資料到分組-------------------------");
            ArrayList<Object[]> objects = (ArrayList<Object[]>)groups.get(groupKey);
            objects.add(infoRow);

            logBasic("----------------------新增資料到輸出流-------------------------");
			// 將當前行拷貝一份
            Object[] row=infoRow;
            // 建立一個輸出行
             row = createOutputRow(infoRow, data.outputRowMeta.size());
            //putRow(infoStream.getRowMeta(), row);
            // 將輸出行新增到輸出資料集
            putRow(data.outputRowMeta, row);

      		infoRowCount++;
    	}
           
        logBasic("----------------------分組完成,處理最後一條資料-------------------------");
        // 將最後一條資料拷貝一份,場地分層索引+1,層底深度dep 賦值為 鑽孔深度 depth,然後將此行數資料新增
		Object[] keys = groups.keySet().toArray();
		for (int i = 0; i < keys.length; i++)  {
			 String s = keys[i].toString();
			 logBasic("----------------------當前分組-----------------------:"+ s);	
        	  ArrayList<Object[]> list = (ArrayList<Object[]>) groups.get(s);
		
			
              Object[] last = (Object[])list.get(list.size() - 1);         
              Object[] newLast=last;
            // 設定 layerorder 的值
           	  String layerorderVal = get(TransformClassBase.Fields.In, layerorder).getString(last);
			  BigDecimal v = new BigDecimal(layerorderVal);
              v = v.add(new BigDecimal(1));
              get(TransformClassBase.Fields.Out, layerorder).setValue(newLast, v);

            // 設定 dep 的值
              String layerDepVal = get(TransformClassBase.Fields.In, depth).getString(last);
         	  BigDecimal v2 = new BigDecimal(layerDepVal);
              get(TransformClassBase.Fields.Out, dep).setValue(newLast, v2);
            
			 // 設定id
              String idVal = UUID.randomUUID().toString();
              get(TransformClassBase.Fields.Out, id).setValue(newLast, idVal);
              
			  
			  logBasic("----------------------新增資料到輸出流-------------------------");
			  newLast=createOutputRow(newLast, data.outputRowMeta.size());
              // 將新的一行資料新增到輸出資料集
              putRow(data.outputRowMeta, newLast);

        }        
        



    /* TODO: Your code here. (Using info fields)

    FieldHelper infoField = get(Fields.Info, "info_field_name");

    RowSet infoStream = findInfoRowSet("info_stream_tag");

    Object[] infoRow = null;

    int infoRowCount = 0;

    // Read all rows from info step before calling getRow() method, which returns first row from any
    // input rowset. As rowMeta for info and input steps varies getRow() can lead to errors.
    while((infoRow = getRowFrom(infoStream)) != null){

      // do something with info data
      infoRowCount++;
    }
    */
  }


  Object[] r = getRow();
	 logBasic("----------------------getRow-----------------------:"+ r);	


  if (r == null) {
    setOutputDone();
    return false;
  }

  // It is always safest to call createOutputRow() to ensure that your output row's Object[] is large
  // enough to handle any new fields you are creating in this step.
  r = createOutputRow(r, data.outputRowMeta.size());
 logBasic("----------------------createOutputRow-----------------------:"+ r);	

  /* TODO: Your code here. (See Sample)

  // Get the value from an input field
  String foobar = get(Fields.In, "a_fieldname").getString(r);

  foobar += "bar";
    
  // Set a value in a new output field
  get(Fields.Out, "output_fieldname").setValue(r, foobar);

  */
  // Send the row on to the next step.
  putRow(data.outputRowMeta, r);

  return true;
}

至此 Java指令碼處理完成。

痛(坑)點總結:

1.指令碼編輯區是個文字編輯框,不能像IDEA一樣幫助寫程式碼,只能通過紀錄檔進行輸出驗證邏輯

2.建議通用的不涉及pentaho的java程式碼操作,可以在IDEA中完成,然後拷貝到指令碼編輯區。例如需要匯入的包就是在IDEA中通過智慧匯入,然後拷貝的

驗證一下資料,圖中標記的行,就是根據前2行資料計算而來,然後進行補充的。在資料來源中只記錄了前2行資料。