先展示一下用途和效果
pentaho
可讀作「彭塔湖」,原名keetle
在keetle
被pentaho公司收購後改名而來。
pentaho
是一款開源ETL
工具,純java編寫的C/S模式的工具,可綠色免安裝,開箱即用。支援Windows、macOS、Linux平臺。
pentaho
有2個核心設計,即轉換
和作業
。
轉換
是一個包含輸入、邏輯處理、輸出的完整過程,即ETL。
作業
是一個提供定時執行轉換的機制,即定時服務排程。
pentaho
官網下載連結:Pentaho Community Edition Download | Hitachi Vantara
pentaho
由主要四部分組成
由於是純java編寫,依賴jdk環境。所以需要先設定jdk環境,這裡省略。
從官網下載pentaho
安裝包後,直接解壓。
tar -zxvf 安裝包路徑 -C 目標路徑
tar -zxvf 安裝包路徑 -C 目標路徑
重點目錄以及執行檔案說明
在window上執行就用
.bat
格式指令碼,MacOS 或者 Linux 平臺上使用.sh
格式指令碼
pentaho
內建了豐富的資料處理元件,本章節主要對pentaho
介面上各個功能元件作用進行說明。
Windows
執行 Spoon.bat
MacOS
執行 Spoon.sh
Linux
執行 Spoon.sh
執行後會短暫沒有任何反應,等待會議,就會出現介面
主物件樹
中有轉換
和作業
開始資料處理工作前,必需新建一個轉換
,因為只有新建了之後,才能使用資料處理元件,此時的核心物件
樹是空的。
在 「核心物件樹 –> 轉換 –> 右鍵 –> 新建」 或 在 「檔案 –> 新建 –> 轉換」 ,新建一個轉換,核心物件
樹就會出現各類元件。依靠這些元件組合使用,完成資料處理工作。
一個轉換就是一個資料處理工作流程。這裡主要是轉換的設定,例如資料來源連線,執行設定等。
包含各類資料處理元件。
這裡對一些常用的元件進行說明
輸入元件,即各類資料來源,例如資料庫,json,xml等
輸出元件,將處理後的資料進行輸出儲存
這是資料轉換的核心,在這裡完成資料處理
包含一些資料處理外的操作,例如傳送郵件,寫紀錄檔等
用於控制資料處理流程,例如開始,結束,終止等
當內建轉換元件完成不了資料處理的邏輯時,即可使用指令碼元件,用自定義程式碼的方式來完成處理邏輯
用於一些查詢請求,例如http請求,資料庫查詢某個表是否存在等
可用於多表,單表處理完後,進行記錄合併
在「檔案–>新建–>作業」建立一個作業。
主物件樹包含作業執行設定,DB連線設定等
核心物件樹包含作業的各類元件
作業流程元件,有開始、轉換、成功、空處理等
傳送郵件
檔案操作,建立、刪除等
條件處理,例如判斷某個檔案是否存在
使用shell,js、sql等指令碼處理複雜作業邏輯
作業處理,例如終止作業、寫紀錄檔等
定時作業來上傳、下載檔案
上面介紹了各個元件用途,現在來完成一個完整的資料處理工作流程。
略
在 「核心物件樹 –> 轉換 –> 右鍵 –> 新建」 或 在 「檔案 –> 新建 –> 轉換」 ,新建一個轉換
在主物件樹
中選擇DB連線
,右鍵新建
注意:連線資料庫之前需要下載對應的
jdbc驅動
,例如連線pgsql
則需要下載postgresql-version.jar
,r然後將驅動包放到安裝目錄下的\lib
目錄
這裡以kingbase V8
為例,因為這個踩了坑。經歷如下:
內建的資料來源裡有KingbaseES
,本以為可以直接用,結果發現連不上,報驅動錯誤。可能是因為內建的驅動版本跟資料庫版本不一致,因為Kingbase V8
的驅動不向前相容。更新驅動後,依然不行。
然後發現,內建還有Generic database
選項,這個是用來自定義連線內建資料來源之外的資料庫的。使用jdbc
方式連線,需要一個連線串,驅動包名(前提是下載了對應的驅動包),使用者名稱,密碼。然而,這種方式依然不行……
後來一想,乾脆用pgsql
的方式來連線kingbase
,沒想到連線成功!
因為資料來源是資料庫,所以這裡從輸入元件中選擇表輸入
,將其拖入到右側面板中
雙擊「表輸入」元件 或 右鍵選擇 「編輯步驟」
點選獲取SQL查詢語句
,會彈出介面選擇資料表
選擇一個資料表後,提示
選擇「是」
這裡會自動填充獲取資料的sql,也可以在這裡加上各種where條件,獲取需要的資料
點選「確定」
如果是表結構一致,則可使用
因為目標資料來源也是資料庫,所以這裡選擇表輸出
。從輸出元件
選擇表輸出
,拖入轉換檢視中
然後進行步驟連線。
方式一:按住shift
鍵,滑鼠左鍵點選「輸入步驟」,會出現箭頭,然後連線到「輸出步驟」
方式二:滑鼠左鍵框選輸入和輸出,然後右鍵,選擇」新建節點連線「,選擇」起始步驟「,」目標步驟「
點選「確定」
連線後如下:
雙擊「表輸出」或右鍵選擇「編輯步驟」
選擇目標資料庫中的資料表,然後點選」確定「
選擇表輸出,無法設定欄位對映,所以前提是表結構一致才可使用。如果是異構表,需要欄位對映的,則需要使用 插入/更新 元件
如果輸入表和輸出表結構不一致,即異構表,則需要使用插入/更新
元件。從輸出
中選擇插入/更新
拖入轉換檢視中,然後進行步驟連線,進入輸出設定
注意:一定要正確連線步驟,否則這步無法獲取輸入欄位,輸出欄位
欄位對映設定好後如下
點選「確定」
或
然後點選轉換檢視中的按鈕,這個是執行
這個執行是執行一次,完成後就結束了。如果要定時執行,則需要
作業
。
點選「啟動」 會彈出介面 儲存 當前轉換
輸入儲存的檔名稱,然後點選「Save」即可
每個步驟都顯示綠色的箭頭,說明沒有錯誤,正確的執行完了轉換。也可以在紀錄檔輸出檢視.
紀錄檔:完成處理 (I=1, O=1, R=1, W=1, U=0, E=0)
中的 I=1 表示 輸入 1 行,O=1表示 輸出 1 行,R=1 表示 讀取 1 行,W=1 表示 寫入 1 行
然後看一下資料輸出結果
源表
目標表
如果需要定時執行同步過程,那麼就需要引入作業
。在「檔案–>新建–>作業」 建立一個作業。
在「通用」中選擇Start
拖入作業檢視中
然後選擇轉換
拖入檢視,並進行步驟連線。
雙擊「轉換」或右鍵選擇「編輯作業入口」
點選「確定」
然後選成功
元件拖入檢視,並連線步驟
雙擊檢視中的Start
元件或右鍵」編輯作業入口「,進行作業排程設定
點選執行檢視中的按鈕。一個定時作業即完成
定時作業排程期間,程式不能退出!程式退出,作業即停止
至此一個完整的資料處理作業完成了。
本部分對簡單同步
進行說明。簡單同步
是指不涉及複雜計算、轉換等同步工作。
即一對一同步,A表資料同步到B表,A與B的欄位數量、型別、名稱可能都不一樣,因此需要一些欄位型別轉換,這都很容易。
處理過程詳見2.5章節
即2個及以上的表往一個表同步,同樣也需要欄位對映、型別轉換等操作。
這種通過某個欄位(外來鍵)關聯的表,處理思路是在獲取資料時,通過sql聯表查詢,獲取到全部需要的資料。然後用單表同步方式進行處理。
如果是異構表的話,獲取到每個資料來源後,使用Multiway merge join
多路合併元件處理
合併後的記錄可作為一個單表,然後進行單表同步的處理
合併是笛卡爾積,即A表n條記錄,B表n條記錄,結果就是n x n條記錄,欄位是A、B表全部欄位,這種方式不建議採用,會消耗更多記憶體資源。建議拆分成單表同步
如果是同構表的話,可拆分為多個單表同步處理。
本部分對涉及到資料計算、轉換的同步工作進行說明。有些複雜操作,無法直接使用元件進行處理,需要用到Script
元件。這裡主要對如何使用指令碼元件完成資料處理進行說明。
這裡先展示一個實際案例
這個過程是多表同步到一個表、涉及到欄位型別轉換、補充欄位和值、資料計算、增補資料。由於計算和增補資料使用內建元件無法完成,因此這裡使用了java指令碼
元件,自定義程式碼進行資料處理。
這裡對欄位型別轉換
、增加列
、給某列設定值
、java指令碼
進行說明。
例如 數位型別 轉為 字串,字串 轉為 日期時間……
從轉換
中選擇欄位選擇
元件
雙擊「欄位選擇」或右鍵選擇「編輯步驟」
選擇「後設資料」,在「欄位名稱」列選擇欄位,然後在「型別」列選擇目標型別
在輸入
中找到生成亂數
元件,拖入檢視並連線步驟
雙擊生成亂數
或右鍵選擇「編輯步驟」
在「名稱」列輸入需要增加的欄位名,型別選擇生成亂數規則
點選「確定」後,執行轉換,在preview data
處可預覽資料,可以看到增加了一列 uid 也有值
例如將上面亂數元件生成的值設定為常數1。在轉換
中選擇將欄位設定為常數
元件,並連線步驟
雙擊「將欄位設定為常數」或右鍵選擇「編輯步驟」
在「欄位」列選擇需要設定的欄位,這裡選擇上一步驟生成的「uid」欄位,在「值替換」列輸入值。
點選「確定」,執行轉換,然後預覽資料,可以看到uid的值被替換為1
指令碼
有Java指令碼、JavaScript指令碼,SQL指令碼等。這裡使用Java指令碼,指令碼的目的是處理內建元件處理不了的邏輯。例如有10個地層,但是資料來源中只記錄了前9個地層,最後一個需要根據計算得到。
拖入Java指令碼
元件到轉換檢視中並連線步驟
雙擊「Java指令碼」或右鍵選擇「編輯步驟」
然後展開Code Snippits\Common use
選擇Main
拖入右側編輯區,Main是整個指令碼處理入口
其預設指令碼結構如下
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
if (first) {
first = false;
// 程式碼邏輯區域
/* TODO: Your code here. (Using info fields)
FieldHelper infoField = get(Fields.Info, "info_field_name");
RowSet infoStream = findInfoRowSet("info_stream_tag");
Object[] infoRow = null;
int infoRowCount = 0;
// Read all rows from info step before calling getRow() method, which returns first row from any
// input rowset. As rowMeta for info and input steps varies getRow() can lead to errors.
while((infoRow = getRowFrom(infoStream)) != null){
// do something with info data
infoRowCount++;
}
*/
}
Object[] r = getRow();
if (r == null) {
setOutputDone();
return false;
}
// It is always safest to call createOutputRow() to ensure that your output row's Object[] is large
// enough to handle any new fields you are creating in this step.
r = createOutputRow(r, data.outputRowMeta.size());
/* TODO: Your code here. (See Sample)
// Get the value from an input field
String foobar = get(Fields.In, "a_fieldname").getString(r);
foobar += "bar";
// Set a value in a new output field
get(Fields.Out, "output_fieldname").setValue(r, foobar);
*/
// Send the row on to the next step.
putRow(data.outputRowMeta, r);
return true;
}
TODO
區域就是程式碼編輯區域,其它是預設指令碼函數
點選」確定「,然後再次開啟Java指令碼
,就能看到輸入輸出欄位資訊了
完整實現地層計算並補充最後一層的Java指令碼
程式碼邏輯如下
// 這裡是需要用的 java API 所匯入的包
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.lang.*;
import java.math.BigDecimal;
import java.util.*;
// 核心處理過程入口
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
if (first) {
first = false;
logBasic("----------------------欄位-------------------------");
String projectCount = "project_count";
String knumber = "knumber";
String depth = "depth";
String dep = "dep";
String layerorder = "layerorder";
String id = "id";
logBasic("----------------------獲取輸入流-------------------------");
// 輸入資料流 input 是訊息步驟中設定的標籤名
RowSet infoStream = findInfoRowSet("input");
Object[] infoRow = null;
int infoRowCount = 0;
logBasic("----------------------遍歷資料流,將其載入到map-------------------------");
// 遍歷資料流,將其載入到map,便於操作
// 根據 專案索引+鑽孔索引 分組
Map<String, ArrayList<Object[]>> groups = new HashMap<String, ArrayList<Object[]>>();
while((infoRow = getRowFrom(infoStream)) != null){
// 獲取欄位值
String prjCode = get(TransformClassBase.Fields.In, projectCount).getString(infoRow);
String drillCode = get(TransformClassBase.Fields.In, knumber).getString(infoRow);
String groupKey = prjCode + drillCode;
if (!groups.containsKey(groupKey)) {
logBasic("----------------------建立分組-------------------------");
groups.put(groupKey,new ArrayList<Object[]>());
}
logBasic("----------------------新增資料到分組-------------------------");
ArrayList<Object[]> objects = (ArrayList<Object[]>)groups.get(groupKey);
objects.add(infoRow);
logBasic("----------------------新增資料到輸出流-------------------------");
// 將當前行拷貝一份
Object[] row=infoRow;
// 建立一個輸出行
row = createOutputRow(infoRow, data.outputRowMeta.size());
//putRow(infoStream.getRowMeta(), row);
// 將輸出行新增到輸出資料集
putRow(data.outputRowMeta, row);
infoRowCount++;
}
logBasic("----------------------分組完成,處理最後一條資料-------------------------");
// 將最後一條資料拷貝一份,場地分層索引+1,層底深度dep 賦值為 鑽孔深度 depth,然後將此行數資料新增
Object[] keys = groups.keySet().toArray();
for (int i = 0; i < keys.length; i++) {
String s = keys[i].toString();
logBasic("----------------------當前分組-----------------------:"+ s);
ArrayList<Object[]> list = (ArrayList<Object[]>) groups.get(s);
Object[] last = (Object[])list.get(list.size() - 1);
Object[] newLast=last;
// 設定 layerorder 的值
String layerorderVal = get(TransformClassBase.Fields.In, layerorder).getString(last);
BigDecimal v = new BigDecimal(layerorderVal);
v = v.add(new BigDecimal(1));
get(TransformClassBase.Fields.Out, layerorder).setValue(newLast, v);
// 設定 dep 的值
String layerDepVal = get(TransformClassBase.Fields.In, depth).getString(last);
BigDecimal v2 = new BigDecimal(layerDepVal);
get(TransformClassBase.Fields.Out, dep).setValue(newLast, v2);
// 設定id
String idVal = UUID.randomUUID().toString();
get(TransformClassBase.Fields.Out, id).setValue(newLast, idVal);
logBasic("----------------------新增資料到輸出流-------------------------");
newLast=createOutputRow(newLast, data.outputRowMeta.size());
// 將新的一行資料新增到輸出資料集
putRow(data.outputRowMeta, newLast);
}
/* TODO: Your code here. (Using info fields)
FieldHelper infoField = get(Fields.Info, "info_field_name");
RowSet infoStream = findInfoRowSet("info_stream_tag");
Object[] infoRow = null;
int infoRowCount = 0;
// Read all rows from info step before calling getRow() method, which returns first row from any
// input rowset. As rowMeta for info and input steps varies getRow() can lead to errors.
while((infoRow = getRowFrom(infoStream)) != null){
// do something with info data
infoRowCount++;
}
*/
}
Object[] r = getRow();
logBasic("----------------------getRow-----------------------:"+ r);
if (r == null) {
setOutputDone();
return false;
}
// It is always safest to call createOutputRow() to ensure that your output row's Object[] is large
// enough to handle any new fields you are creating in this step.
r = createOutputRow(r, data.outputRowMeta.size());
logBasic("----------------------createOutputRow-----------------------:"+ r);
/* TODO: Your code here. (See Sample)
// Get the value from an input field
String foobar = get(Fields.In, "a_fieldname").getString(r);
foobar += "bar";
// Set a value in a new output field
get(Fields.Out, "output_fieldname").setValue(r, foobar);
*/
// Send the row on to the next step.
putRow(data.outputRowMeta, r);
return true;
}
至此 Java指令碼
處理完成。
痛(坑)點總結:
1.指令碼編輯區是個文字編輯框,不能像IDEA一樣幫助寫程式碼,只能通過紀錄檔進行輸出驗證邏輯
2.建議通用的不涉及pentaho的java程式碼操作,可以在IDEA中完成,然後拷貝到指令碼編輯區。例如需要匯入的包就是在IDEA中通過智慧匯入,然後拷貝的
驗證一下資料,圖中標記的行,就是根據前2行資料計算而來,然後進行補充的。在資料來源中只記錄了前2行資料。
本文來自部落格園,作者:宣君{https://www.nhit.icu/},轉載請註明原文連結:https://www.cnblogs.com/ycit/p/17687558.html