採用 String 資料流,依照社群網站的樣例程式碼,在sink之前將資料轉換為DataStream
通過Stream Load返回結果json中的ErrorUrl可以看到如題的異常
Reason: actual column number in csv file is less than schema column number. actual number: 10, ..., schema column number: 11; src line: [...]
資料庫表明明只有10個欄位,提示schema column number卻是11個。是自己眼花數錯欄位了嗎?經過反覆確認及同事確認,沒有錯,目標表就是10個欄位,我寫入的也是10個欄位,是Flink Doris Connector 的bug嗎?
既然懷疑是bug,那就去扒程式碼。
實際資料寫入邏輯封裝在org.apache.doris.flink.sink.writer.DorisWriter
,該類實現了org.apache.flink.api.connector.sink.SinkWriter
介面。檢視該類發現,寫入Doris的過程實際是使用微批寫入的。
@Override
public void write(IN in, Context context) throws IOException {
checkLoadException();
byte[] serialize = serializer.serialize(in);
if(Objects.isNull(serialize)){
//ddl record
return;
}
if(!loading) {
//Start streamload only when there has data
dorisStreamLoad.startLoad(currentLabel);
loading = true;
}
dorisStreamLoad.writeRecord(serialize);
}
@Override
public List<DorisCommittable> prepareCommit(boolean flush) throws IOException {
if(!loading){
//There is no data during the entire checkpoint period
return Collections.emptyList();
}
// disable exception checker before stop load.
loading = false;
Preconditions.checkState(dorisStreamLoad != null);
RespContent respContent = dorisStreamLoad.stopLoad(currentLabel);
if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
String errMsg = String.format("stream load error: %s, see more in %s", respContent.getMessage(), respContent.getErrorURL());
throw new DorisRuntimeException(errMsg);
}
if (!executionOptions.enabled2PC()) {
return Collections.emptyList();
}
long txnId = respContent.getTxnId();
return ImmutableList.of(new DorisCommittable(dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId));
}
每一條記錄都會觸發write操作,從上述程式碼可以看到根據boolean變數loading的值,程式將會觸發dorisStreamLoad.startLoad(currentLabel);
,而loading的狀態在preCommit方法中進行修改,而preCommit是在checkpoint時觸發,所以資料提交動作是通過checkpoint觸發的。檢視startLoad原始碼
/**
* start write data for new checkpoint.
* @param label
* @throws IOException
*/
public void startLoad(String label) throws IOException{
loadBatchFirstRecord = true;
HttpPutBuilder putBuilder = new HttpPutBuilder();
recordStream.startInput();
LOG.info("stream load started for {} on host {}", label, hostPort);
try {
InputStreamEntity entity = new InputStreamEntity(recordStream);
putBuilder.setUrl(loadUrlStr)
.baseAuth(user, passwd)
.addCommonHeader()
.addHiddenColumns(enableDelete)
.setLabel(label)
.setEntity(entity)
.addProperties(streamLoadProp);
if (enable2PC) {
putBuilder.enable2PC();
}
pendingLoadFuture = executorService.submit(() -> {
LOG.info("start execute load");
return httpClient.execute(putBuilder.build());
});
} catch (Exception e) {
String err = "failed to stream load data with label: " + label;
LOG.warn(err, e);
throw e;
}
}
DorisStreamLoad類負責將資料實際寫入Doris,在上面的程式碼中我看到了一個陌生的詞彙HiddenColumns
,「隱藏列」,什麼是隱藏列?.addHiddenColumns(enableDelete)
的引數enableDelete
是一個boolean值,繼續扒程式碼發現,預設值enableDelete = true;
,addHiddenColumn(true)?是否意味著我的put運算元據中必須包含隱藏列?繼續扒
public HttpPutBuilder addHiddenColumns(boolean add) {
if(add){
header.put("hidden_columns", LoadConstants.DORIS_DELETE_SIGN);
}
return this;
}
在http請求header中新增了一個設定,似乎是指明瞭"hidden_columns"="DORIS_DELETE_SIGN",看著好像是一個列名稱,使用IDEA的跟蹤呼叫功能,檢視下哪裡用到了這個變數。
跟蹤這些程式碼更確信,這是一個列名稱。我的10列加上這一列就是11列啊,設定enableDelete = false
,是否意味著我的put操作不再包含這一隱含列?
修改構造DorisSink的程式碼新增.setDeletable(false);
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix(labelPrefix) //streamload label prefix
.setDeletable(false);
重新執行程式碼,寫入成功,問題解決。
出現該異常是因為,Flink Doris Connector 在構造Sink時預設使用者寫入資料中包含了隱藏列__DORIS_DELETE_SIGN__
。
儘管問題解決了,但是還是有很多疑問,什麼是隱藏列,__DORIS_DELETE_SIGN__
這個隱藏列是什麼意思,從前面的程式碼中可以看出其取值為0或1,匯入資料時為什麼預設需要傳遞該列,該列在最前面還是在最後面?不傳遞該列是否會有問題?