教學 | Datavines 自定義資料質量檢查規則(Metric)

2023-06-27 09:00:53

Metric 是 Datavines 中一個核心概念,一個 Metric 表示一個資料質量檢查規則,比如空值檢查和錶行數檢查都是一個規則。Metric 採用外掛化設計,使用者可以根據自己的需求來實現一個 Metric。下面我們來詳細講解一下如何自定義Metric

第一步

我們先了解下幾個介面和抽象類,它們是實現自定義 Metric 的關鍵。

SqlMetric 介面

SqlMetric介面中定義了規則的各種屬性和操作的介面。

@SPI
public interface SqlMetric {
    // 中文名
    String getName();
    // 英文名
    String getZhName();
    // 根據系統的語言進行名字返回
    default String getNameByLanguage(boolean isEn) {
        return isEn ? getName() : getZhName();
    }
    // 規則屬於哪個維度,比如準確性、唯一性等等
    MetricDimension getDimension();
    // 規則的型別,包括單表檢查、單表自定義檢查
    MetricType getType();
    // 規則的級別,比如表級別、列級別
    default MetricLevel getLevel() {
        return MetricLevel.NONE;
    }
    // 是否支援錯誤資料輸出
    boolean isInvalidateItemsCanOutput();

    /**
     * 獲取不符合規則的資料的SQL語句
     * @return ExecuteSql
     */
    ExecuteSql getInvalidateItems(String uniqueKey);

    /**
     * 計算實際值的SQL語句
     * @return ExecuteSql
     */
    ExecuteSql getActualValue(String uniqueKey);

    /**
     * 實際值的欄位名
     */
    default String getActualName() {
        return "actual_value";
    }
    // 實際值的型別,比如數位,百分比或者列表
    default String getActualValueType() {
        return MetricActualValueType.COUNT.getDescription();
    }
    // 對引數進行檢查並輸出檢查結果
    CheckResult validateConfig(Map<String,Object> config);
    //規則所需要的引數
    Map<String, ConfigItem> getConfigMap();
    //構造規則前需要做的檢查
    void prepare(Map<String,String> config);

    default String getIssue() {
        return "";
    }
    // 適合哪些欄位型別
    List<DataVinesDataType> suitableType();
    // 是否支援多選,比如錶行數檢查支援多張表
    default boolean supportMultiple() {
        return false;
    }
    // 對規則引數的重新構造,配合錶行數多張表檢查
    default List<Map<String,Object>> getMetricParameter(Map<String,Object> metricParameter) {
        return Collections.singletonList(metricParameter);
    }
}

BaseSingleTable 抽象類

BaseSingleTable是實現了 SqlMetric 介面的抽象類,實現了表級別檢查規則中所需要引數的新增、錯誤資料SQL語句構造和實際值計算SQL語句構造和對過濾條件的處理等。

  • 這裡定義了獲取不符合規則的資料的基礎SQL語句,判斷型別的規則比如正規表示式檢查和列舉值檢查,只需要在基礎SQL語句後面新增過濾條件即可。
    protected StringBuilder invalidateItemsSql = new StringBuilder("select * from ${table}");
  • 實際值計算SQL語句預設是計算不符合規則資料的行數
String actualValueSql = "select count(1) as actual_value_"+ uniqueKey +" from ${invalidate_items_table}"; 
  • 計算平均值、彙總值等統計型別的規則需要重新實現getActualValue()中的ExecuteSql
public abstract class BaseSingleTable implements SqlMetric {
    // 這裡定義了獲取不符合規則的資料的基礎 SQL 語句,判斷類的規則比如正規表示式和列舉值檢查,只需要在基礎SQL後面新增過濾條件即可。
    protected StringBuilder invalidateItemsSql = new StringBuilder("select * from ${table}");

    protected List<String> filters = new ArrayList<>();

    protected HashMap<String,ConfigItem> configMap = new HashMap<>();

    protected Set<String> requiredOptions = new HashSet<>();

    public BaseSingleTable() {
        configMap.put("table",new ConfigItem("table", "表名", "table"));
        configMap.put("filter",new ConfigItem("filter", "過濾條件", "filter"));

        requiredOptions.add("table");
    }

    @Override
    public ExecuteSql getInvalidateItems(String uniqueKey) {
        ExecuteSql executeSql = new ExecuteSql();
        executeSql.setResultTable("invalidate_items_" + uniqueKey);
        executeSql.setSql(invalidateItemsSql.toString());
        executeSql.setErrorOutput(isInvalidateItemsCanOutput());
        return executeSql;
    }

    @Override
    public ExecuteSql getActualValue(String uniqueKey) {
        ExecuteSql executeSql = new ExecuteSql();
        executeSql.setResultTable("invalidate_count_" + uniqueKey);
        String actualValueSql = "select count(1) as actual_value_"+ uniqueKey +" from ${invalidate_items_table}";
        executeSql.setSql(actualValueSql);
        executeSql.setErrorOutput(false);
        return executeSql;
    }

    @Override
    public CheckResult validateConfig(Map<String, Object> config) {
        return ConfigChecker.checkConfig(config, requiredOptions);
    }

    @Override
    public void prepare(Map<String, String> config) {
        if (config.containsKey("filter")) {
            filters.add(config.get("filter"));
        }

        addFiltersIntoInvalidateItemsSql();
    }

    private void addFiltersIntoInvalidateItemsSql() {
        if (filters.size() > 0) {
            invalidateItemsSql.append(" where ").append(String.join(" and ", filters));
        }
    }

    @Override
    public MetricLevel getLevel() {
        return MetricLevel.TABLE;
    }
}

BaseSingleTableColumn 抽象類

BaseSingleTableColumn是列級別的抽象實現類,主要是新增列級別規則的通用引數。

public abstract class BaseSingleTableColumn extends BaseSingleTable {

    public BaseSingleTableColumn() {
        super();
        configMap.put("column",new ConfigItem("column", "列名", "column"));
        requiredOptions.add("column");
    }

    @Override
    public Map<String, ConfigItem> getConfigMap() {
        return configMap;
    }

    @Override
    public MetricLevel getLevel() {
        return MetricLevel.COLUMN;
    }

    @Override
    public boolean isInvalidateItemsCanOutput() {
        return false;
    }
}

第二步

瞭解完上面的三個基礎類以後,自定義一個Metric就變得格外簡單了。

基礎工作

在 datavines-metric-plugins 下建立一個新規則的 module

在 pom.xml 中新增

 <dependency>
     <groupId>io.datavines</groupId>
     <artifactId>datavines-metric-base</artifactId>
     <version>${project.version}</version>
 </dependency>

以 列舉值檢查 規則為例來講解

  • 判斷要實現的規則的級別,因為列舉值檢查是列級別,所以繼承 BaseSingleTableColumn 即可。
  • 在建構函式中的configMap新增enum_list引數用於返回給前端進行展示,在requiredOptions新增enum_list用於引數的檢查。
  • 實現英文名、中文名、規則維度、規則型別這些基礎的屬性。
  • 因為列舉值檢查規則是為了找出在列舉值列表中的資料,所以只需要在fileters這個陣列裡面加入(${column} in ( ${enum_list} ))prepare()方法會自動進行不符合規則的SQL語句構造。
  • 實現suitableType()方法新增規則適用的欄位型別。
public class ColumnInEnums extends BaseSingleTableColumn {

    public ColumnInEnums(){
        super();
        configMap.put("enum_list",new ConfigItem("enum_list", "列舉值列表", "enum_list"));
        requiredOptions.add("enum_list");
    }

    @Override
    public String getName() {
        return "column_in_enums";
    }

    @Override
    public String getZhName() {
        return "列舉值檢查";
    }

    @Override
    public MetricDimension getDimension() {
        return MetricDimension.EFFECTIVENESS;
    }

    @Override
    public MetricType getType() {
        return MetricType.SINGLE_TABLE;
    }

    @Override
    public boolean isInvalidateItemsCanOutput() {
        return true;
    }

    @Override
    public void prepare(Map<String, String> config) {
        if (config.containsKey("enum_list") && config.containsKey("column")) {
            filters.add(" (${column} in ( ${enum_list} )) ");
        }
        super.prepare(config);
    }

    @Override
    public List<DataVinesDataType> suitableType() {
        return Arrays.asList(DataVinesDataType.NUMERIC_TYPE, DataVinesDataType.STRING_TYPE, DataVinesDataType.DATE_TIME_TYPE);
    }
}

第三步

非常重要的一步

  • 在 resources 目錄下建立META-INF/plugins目錄。
  • 在 plugins 目錄下建立檔案並且命名為io.datavines.metric.api.SqlMetric
  • 在檔案中新增column_in_enums=io.datavines.metric.plugin.ColumnInEnums

第四步

打包成jar放到 datavines 目錄下的libs目錄下即可。

收工!自定義 Metric 就這樣輕鬆搞定了。

加入我們

Datavines 的目標是成為更好的資料可觀測性領域的開源專案,為更多的使用者去解決後設資料管理和資料質量管理中遇到的問題。在此我們真誠歡迎更多的貢獻者參與到社群建設中來,和我們一起成長,攜手共建更好的社群。

關於Datavane

Datavane 是一個專注於巨量資料領域的開源組織(社群),由一群巨量資料領域優秀的開源專案作者共同建立,旨在幫助開源專案作者更好的建設專案、為大眾提供高質量的開源軟體,宗旨是:只為做一個好軟體。目前已經聚集了一批優質的開源專案,涉及到資料整合、巨量資料元件管理、資料質量等。

Datavane 社群中,所有的專案都是開源開放的,程式碼質量和架構設計優質的潛力專案。社群保持開放中立、共同作業創造、堅持精品,鼓勵所有的開發者、使用者和貢獻者積极參與我們的社群、共同合作,創新創造,建設一個更加強大的開源社群。

官 網: http://www.datavane.org/
Github : https://github.com/datavane