便捷的統計訂單收益(一)

2020-10-21 18:00:31
欄目帶大家瞭解如何便捷的統計訂單收益。

引言

統計訂單收益是做電商型別的APP老生常談的問題.常規需求大致有使用者收益日報/月報/年報.這些報表型的資料對錶設計和程式設計有著不小的挑戰.常規的聚合查詢語句的查詢時間會隨著收益表資料日漸龐大而逐漸變長.這時候就需要思考如何設計收益表可以更高效的查詢?怎樣的設計才可以讓統計收益變得簡單?

需求

效果圖

具體需求

  • 收益型別分為:自購訂單收益,分享訂單收益,分銷收益,活動收益
  • 統計當日收益,當月收益
  • 根據篩選的時間統計出時間段的收益.

思考

設計思路

訂單表是肯定需要的.在寫入或者修改訂單表的時候同步寫入修改收益表.只有自購和分享訂單會記錄到訂單表中,分銷以及活動贈送收益只在特殊業務中寫入收益表.再以日為維度,建立一張使用者收益日報表.單行記錄寫入使用者當天收益情況.降低查詢使用者日/月/年收益統計時的資料量.以單使用者為例,通過拆分使用者一個月只會產生最多31條資料.屬於可控增長速度.如果沿用收益表,因為收益表的資料量跟使用者下單的數量一一對應,如果使用者下單量多那麼表會非常龐大.在前期使用者量初見增長時,可用此方法規避大的資料量統計,後期如果使用者量增大導致日報表資料變多可以再考慮分表.

可見問題

  • 同步收益日報表的時機問題,因為原本訂單的操作就很複雜需要同步寫入收益和計算寫入收益日報資料,程式碼耦合度太高.有沒有什麼方法通過收益表異構出收益日報表呢?
  • 雖然收益被寫入到了日報表中,但是要滿足效果圖要求的效果,可能需要多次查詢SQL語句,有沒有辦法在不影響程式效率的情況下儘量少些一些聚合SQL呢?

實現

總結出上面這些問題.我開始了資料收集.最終採用canal+RocketMQ做為異構方案.

技術棧

簡單介紹下這兩款技術框架:

  • canal:主要用途是基於 MySQL 資料庫增量紀錄檔解析,提供增量資料訂閱和消費
  • RocketMQ:一款開源的分散式訊息系統,基於高可用分散式叢集技術,提供低延時的、高可靠的訊息釋出與訂閱服務。

注:我用的aliyun的全家桶,MQ和mysql都是阿里雲的,如果是自建伺服器的可能有區別,我在後面儘量標出

方案流程

  1. 在寫入或修改收益表的同時通過canal監控mysql收益表的binlog紀錄檔.
  2. canal檢測到變更,組裝變更的JSON報文,傳送RocketMQ中事先定義好的TOPIC.
  3. 程式消費該TOPIC,異構收益日報表.

canal設定部分

canal的安裝請參考官方檔案 解壓後可得到一個canal資料夾,包含三個目錄

  • bin:存放啟動重新啟動指令碼
  • conf:存放核心組態檔
  • lib:存放核心jar包

我們需要重點關注conf資料夾裡的conf/canal.properties核心組態檔以及conf/example/instance.properties單個監控節點組態檔

conf/canal.properties

# tcp, kafka, RocketMQ,這裡預設是tcp讀取模式,採用RocketMQ需要將其改變為RocketMQ模式
canal.serverMode = RocketMQ
# 如果是aliyun的RocketMQ需要設定以下兩個KEY,ak/sk
canal.aliyun.accessKey =xxxxxxx
canal.aliyun.secretKey =xxxxxxx
# 監控的節點名稱.這個預設就是example如果有多節點可以逗號隔開,如下方的例子
canal.destinations = example,sign
# 如果是aliyun的RocketMQ需要修改canal.mq.accessChannel為cloud預設為local
canal.mq.accessChannel = cloud
#MQ的地址,需要注意這裡是不帶http://,但是需要帶埠號
canal.mq.servers = 
#rocketmq範例id
canal.mq.namespace =

conf/example/instance.properties

#mysql地址
canal.instance.master.address=
#以下兩個引數需要在開啟資料庫binlog紀錄檔後得到,在資料庫查詢介面輸入查詢語句`show master status`,canal.instance.master.journal.name對應File引數,canal.instance.master.position對應Position引數
canal.instance.master.journal.name=
canal.instance.master.position=
#資料庫的賬號密碼
canal.instance.dbUsername=
canal.instance.dbPassword=
#需要監控變動的表
canal.instance.filter.regex=xxx.t_user_order,xxx.t_user_cash_out
#定義傳送的mq生產組
canal.mq.producerGroup = 
#定義傳送到mq的指定主題
canal.mq.topic=

注:監控表的書寫規則格式參照監控表書寫規則

啟動

cd /canal/bin
./start.sh

這時候會發現canal目錄中多了一個log檔案,進入可以看到canal主紀錄檔檔案和example節點啟動紀錄檔.

canal紀錄檔中出現
 the canal server is running now ......
example紀錄檔中出現
 init table filter : ^tablename
 xxxxxxxxx , the next step is binlog dump

表示你已經成功了一大步,canal監控已正常執行.

RocketMQ部分

如果用的aliyun的RocketMQ,設定程式碼部分直接可參考檔案 自建的RocketMQ也可參照簡單的消費例子監控對應的TOPIC即可 消費Canal發來的資料,格式如下:

{
    "data":[
        {
            //單個修改後表資料,如果同一時間有多個表變動會有多個該JSON物件        }
    ],
    "database":"監控的表所在資料庫",
    "es":表變動時間,
    "id":canal生成的id,
    "isDdl":Boolean型別,表示是否DDL語句,
    "mysqlType":{
        表結構
    },
    "old":如果是修改型別會填充修改前的值,
    "pkNames":[
        該表的主鍵,如"id"
    ],
    "sql":"執行的SQL",
    "sqlType":{
        欄位對應的sqlType,一般使用mysqlType即可
    },
    "table":"監控的表名",
    "ts":canal記錄傳送時間,
    "type":"表的修改型別,入INSERT,UPDATE,DELETE"
}

MQ消費程式碼主要用了反射,對映到對應的表

//這裡的body就是Canal發來的資料
public Action process(String body) {
        boolean result = Boolean.FALSE;
        JSONObject data = JSONObject.parseObject(body);
        log.info("資料庫操作紀錄檔記錄:data:{}",data.toString());
        Class c = null;
        try {
            //這裡監控了訂單和收益表分別做訂單統計和收益日報統計
            c = Class.forName(getClassName(data.getString("table")));
        } catch (ClassNotFoundException e) {
            log.error("error {}",e);
        }
        if (null != c) {
            JSONArray dataArray = data.getJSONArray("data");
            if (dataArray != null) {
                //把獲取到的data部分轉換為反射後的實體集合
                List list = dataArray.toJavaList(c);
                if (CollUtil.isNotEmpty(list)) {
                    //對修改和寫入操作分別進行邏輯操作
                    String type = data.getString("type");
                    if ("UPDATE".equals(type)) {
                        result = uppHistory(list);
                    } else if ("INSERT".equals(type)) {
                        result = saveHistory(list);
                    }
                }
            }
        }
        return result ? Action.CommitMessage : Action.ReconsumeLater;
    }
    
    /**
     * @description: 獲取反射ClassName
     * @author: chenyunxuan
     */
    private String getClassName(String tableName) {
        StringBuilder sb = new StringBuilder();
        //判斷是哪張表的資料
        if (tableName.equals("t_user_income_detail")) {
            sb.append("cn.mc.core.model.order");
        } else if (tableName.equals("t_user_cash_out")) {
            sb.append("cn.mc.sync.model");
        }
        String className = StrUtil.toCamelCase(tableName).substring(1);
        return sb.append(".").append(className).toString();
    }
    
    /**
     * @description: 寫入對應型別的統計表
     * @author: chenyunxuan
     */
    private <T> Boolean saveHistory(List<T> orderList) {
        boolean result = Boolean.FALSE;
        Object dataType = orderList.get(0);
        //用instanceof判斷型別進入不同的邏輯處理程式碼
        if (dataType instanceof TUserIncomeDetail) {
            result = userOrderHistoryService.saveIncomeDaily(orderList);
        } else if (dataType instanceof UserCashOut) {
            result = userCashOutHistoryService.delSaveHistoryList(orderList);
        }
        return result;
    }

saveIncomeDaily虛擬碼

  public synchronized Boolean saveIncomeDaily(List orderList) {
    //迴圈收益明細記錄
    .......
    //通過建立時間和使用者id查詢收益日報表中是否有當日資料
    if(不存在當日資料){
        //建立當日的收益日報表記錄
        .....
    }
    //因為不存在當日記錄也會立即寫入當日的空資料,所以下面的流程都是走更新流程
    //更新當日資料
    .......
    return Boolean.TRUE;
    }

注:程式碼中應該多打一些紀錄檔,方便產生異常收益資料後的校對

後記

至此一個基於canal+RocketMQ的收益日報統計異構方案就完成了,下一篇會圍繞本文提到的第二個問題減少聚合SQL的產生展開.敬請關注.

相關免費學習推薦:

以上就是便捷的統計訂單收益(一)的詳細內容,更多請關注TW511.COM其它相關文章!