通過canal實時監聽mysql binlog紀錄檔檔案的變化,並將資料解析出來
<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
</dependencies>
特別說明:在解析資料時,相當於程式是使用者端,使用者端在連線canal伺服器端時是不需要使用者名稱和密碼
import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import java.net.InetSocketAddress; import java.util.List; public class CanalClient { public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException { // 獲取連線 CanalConnector canalConnector=CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.140.131",11111), "example","",""); while(true) { // 連線 canalConnector.connect(); // 訂閱資料庫 canalConnector.subscribe("CanalDb.*"); // 獲取資料 Message message = canalConnector.get(100); // 獲取Entry集合 List<CanalEntry.Entry> entries=message.getEntries(); // 判斷集合是否為空,如果為空,則執行緒等待2秒再拉取資料 if (entries.size()<=0) { System.out.println("當次抓取沒有資料,休息一會兒。。。"); Thread.sleep(2000); } else { // 遍歷entries,單條解析 for (CanalEntry.Entry entry:entries) { // 1,獲取表名 String tableName=entry.getHeader().getTableName(); // 2,獲取型別 CanalEntry.EntryType entryType=entry.getEntryType(); // 3,獲取序列化後的資料 ByteString storeValue=entry.getStoreValue(); // 4.判斷當前entryType型別是否為ROWDATA if (CanalEntry.EntryType.ROWDATA.equals(entryType)) { //5.反序列化資料 CanalEntry.RowChange rowChange=CanalEntry.RowChange.parseFrom(storeValue); //6.獲取當前事件的操作型別 CanalEntry.EventType eventType=rowChange.getEventType(); //7.獲取資料集 List<CanalEntry.RowData> rowDataList=rowChange.getRowDatasList(); //8.遍歷rowDataList並列印資料集 for(CanalEntry.RowData rowData:rowDataList) { JSONObject beforData=new JSONObject(); List<CanalEntry.Column> beforClountList=rowData.getBeforeColumnsList(); for (CanalEntry.Column column:beforClountList) { beforData.put(column.getName(),column.getValue()); } JSONObject afterData=new JSONObject(); List<CanalEntry.Column> afterClountList=rowData.getAfterColumnsList(); for (CanalEntry.Column column:afterClountList) { afterData.put(column.getName(),column.getValue()); } // 列印資料 System.out.println(""+tableName+ ",EventType:"+eventType+ ",Before:"+beforData+ ",After:"+afterData); } } else { System.out.println("當前操作型別為"+entryType); } } } } } }