Doris寫入資料異常提示actual column number in csv file is less than schema column number

2023-07-14 21:00:29

版本資訊:

  • Flink 1.17.1
  • Doris 1.2.3
  • Flink Doris Connector 1.4.0

寫入方式

採用 String 資料流,依照社群網站的樣例程式碼,在sink之前將資料轉換為DataStream,分隔符采用"\t"。

執行異常

通過Stream Load返回結果json中的ErrorUrl可以看到如題的異常

Reason: actual column number in csv file is less than schema column number. actual number: 10, ..., schema column number: 11; src line: [...]

資料庫表明明只有10個欄位,提示schema column number卻是11個。是自己眼花數錯欄位了嗎?經過反覆確認及同事確認,沒有錯,目標表就是10個欄位,我寫入的也是10個欄位,是Flink Doris Connector 的bug嗎?

分析過程

既然懷疑是bug,那就去扒程式碼。
實際資料寫入邏輯封裝在org.apache.doris.flink.sink.writer.DorisWriter,該類實現了org.apache.flink.api.connector.sink.SinkWriter介面。檢視該類發現,寫入Doris的過程實際是使用微批寫入的。

    @Override
    public void write(IN in, Context context) throws IOException {
        checkLoadException();
        byte[] serialize = serializer.serialize(in);
        if(Objects.isNull(serialize)){
            //ddl record
            return;
        }
        if(!loading) {
            //Start streamload only when there has data
            dorisStreamLoad.startLoad(currentLabel);
            loading = true;
        }
        dorisStreamLoad.writeRecord(serialize);
    }
    @Override
    public List<DorisCommittable> prepareCommit(boolean flush) throws IOException {
        if(!loading){
            //There is no data during the entire checkpoint period
            return Collections.emptyList();
        }
        // disable exception checker before stop load.
        loading = false;
        Preconditions.checkState(dorisStreamLoad != null);
        RespContent respContent = dorisStreamLoad.stopLoad(currentLabel);
        if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
            String errMsg = String.format("stream load error: %s, see more in %s", respContent.getMessage(), respContent.getErrorURL());
            throw new DorisRuntimeException(errMsg);
        }
        if (!executionOptions.enabled2PC()) {
            return Collections.emptyList();
        }
        long txnId = respContent.getTxnId();
        return ImmutableList.of(new DorisCommittable(dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId));
    }

每一條記錄都會觸發write操作,從上述程式碼可以看到根據boolean變數loading的值,程式將會觸發dorisStreamLoad.startLoad(currentLabel);,而loading的狀態在preCommit方法中進行修改,而preCommit是在checkpoint時觸發,所以資料提交動作是通過checkpoint觸發的。檢視startLoad原始碼


    /**
     * start write data for new checkpoint.
     * @param label
     * @throws IOException
     */
    public void startLoad(String label) throws IOException{
        loadBatchFirstRecord = true;
        HttpPutBuilder putBuilder = new HttpPutBuilder();
        recordStream.startInput();
        LOG.info("stream load started for {} on host {}", label, hostPort);
        try {
            InputStreamEntity entity = new InputStreamEntity(recordStream);
            putBuilder.setUrl(loadUrlStr)
                    .baseAuth(user, passwd)
                    .addCommonHeader()
                    .addHiddenColumns(enableDelete)
                    .setLabel(label)
                    .setEntity(entity)
                    .addProperties(streamLoadProp);
            if (enable2PC) {
               putBuilder.enable2PC();
            }
            pendingLoadFuture = executorService.submit(() -> {
                LOG.info("start execute load");
                return httpClient.execute(putBuilder.build());
            });
        } catch (Exception e) {
            String err = "failed to stream load data with label: " + label;
            LOG.warn(err, e);
            throw e;
        }
    }

DorisStreamLoad類負責將資料實際寫入Doris,在上面的程式碼中我看到了一個陌生的詞彙HiddenColumns,「隱藏列」,什麼是隱藏列?.addHiddenColumns(enableDelete)的引數enableDelete 是一個boolean值,繼續扒程式碼發現,預設值enableDelete = true;,addHiddenColumn(true)?是否意味著我的put運算元據中必須包含隱藏列?繼續扒

    public HttpPutBuilder addHiddenColumns(boolean add) {
        if(add){
            header.put("hidden_columns", LoadConstants.DORIS_DELETE_SIGN);
        }
        return this;
    }

在http請求header中新增了一個設定,似乎是指明瞭"hidden_columns"="DORIS_DELETE_SIGN",看著好像是一個列名稱,使用IDEA的跟蹤呼叫功能,檢視下哪裡用到了這個變數。

跟蹤這些程式碼更確信,這是一個列名稱。我的10列加上這一列就是11列啊,設定enableDelete = false,是否意味著我的put操作不再包含這一隱含列?

解決方案

修改構造DorisSink的程式碼新增.setDeletable(false);

        DorisExecutionOptions.Builder  executionBuilder = DorisExecutionOptions.builder();
        executionBuilder.setLabelPrefix(labelPrefix) //streamload label prefix
                .setDeletable(false);

重新執行程式碼,寫入成功,問題解決。

總結

出現該異常是因為,Flink Doris Connector 在構造Sink時預設使用者寫入資料中包含了隱藏列__DORIS_DELETE_SIGN__
儘管問題解決了,但是還是有很多疑問,什麼是隱藏列,__DORIS_DELETE_SIGN__這個隱藏列是什麼意思,從前面的程式碼中可以看出其取值為0或1,匯入資料時為什麼預設需要傳遞該列,該列在最前面還是在最後面?不傳遞該列是否會有問題?