Canal實時解析mysql binlog資料實戰

2022-07-29 06:00:58

一、說明

通過canal實時監聽mysql binlog紀錄檔檔案的變化,並將資料解析出來

二、環境準備

1、建立maven專案並修改pom.xml組態檔

 <dependencies>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>
    </dependencies>

 

2、嗦程式碼

 特別說明:在解析資料時,相當於程式是使用者端,使用者端在連線canal伺服器端時是不需要使用者名稱和密碼 

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.List;

public class CanalClient {
    public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {

    // 獲取連線
    CanalConnector canalConnector=CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.140.131",11111),
            "example","","");

    while(true)
    {
       // 連線
        canalConnector.connect();
        // 訂閱資料庫
        canalConnector.subscribe("CanalDb.*");
        // 獲取資料
        Message message = canalConnector.get(100);
        // 獲取Entry集合
        List<CanalEntry.Entry> entries=message.getEntries();
        // 判斷集合是否為空,如果為空,則執行緒等待2秒再拉取資料
        if (entries.size()<=0)
        {
            System.out.println("當次抓取沒有資料,休息一會兒。。。");
            Thread.sleep(2000);
        }
        else
        {
            // 遍歷entries,單條解析
            for (CanalEntry.Entry entry:entries)
            {
               // 1,獲取表名
                String tableName=entry.getHeader().getTableName();
                // 2,獲取型別
                CanalEntry.EntryType entryType=entry.getEntryType();
                // 3,獲取序列化後的資料
                ByteString storeValue=entry.getStoreValue();
                // 4.判斷當前entryType型別是否為ROWDATA
                if (CanalEntry.EntryType.ROWDATA.equals(entryType))
                {
                    //5.反序列化資料
                    CanalEntry.RowChange rowChange=CanalEntry.RowChange.parseFrom(storeValue);
                    //6.獲取當前事件的操作型別
                    CanalEntry.EventType eventType=rowChange.getEventType();
                    //7.獲取資料集
                    List<CanalEntry.RowData> rowDataList=rowChange.getRowDatasList();
                    //8.遍歷rowDataList並列印資料集
                    for(CanalEntry.RowData rowData:rowDataList)
                    {
                        JSONObject beforData=new JSONObject();
                        List<CanalEntry.Column> beforClountList=rowData.getBeforeColumnsList();
                        for (CanalEntry.Column column:beforClountList)
                        {
                            beforData.put(column.getName(),column.getValue());
                        }
                        JSONObject afterData=new JSONObject();
                        List<CanalEntry.Column> afterClountList=rowData.getAfterColumnsList();
                        for (CanalEntry.Column column:afterClountList)
                        {
                            afterData.put(column.getName(),column.getValue());
                        }
                        // 列印資料
                        System.out.println(""+tableName+
                                ",EventType:"+eventType+
                                ",Before:"+beforData+
                                ",After:"+afterData);
                    }

                }
                else
                {
                    System.out.println("當前操作型別為"+entryType);
                }
            }
        }
    }
  }
}

三、專案效果