canal-1.1.5實時同步MySQL資料到Elasticsearch

2022-07-31 06:00:59

一、環境準備

1、jkd 8+

2、mysql 5.7+

3、Elasticsearch 7+

4、kibana 7+

5、canal.adapter 1.1.5 

二、部署

一、建立資料庫CanalDb和表UserInfo

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for UserInfo
-- ----------------------------
DROP TABLE IF EXISTS `UserInfo`;
CREATE TABLE `UserInfo`  (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
  `phone` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;

 

 

 

二、kibana建立索引

PUT canal_product 
{
  "mappings": {
    "properties": {
      "user_name": {
        "type": "text"
      },
      "phone": {
        "type": "text"
      },
      "age": {
        "type": "integer"
      }
    }
  }
}

 

 

 

 

 

 

 

三、下載安裝canal.adapter

github:https://github.com/alibaba/canal/releases/tag/canal-1.1.5

額外需要下載v1.1.5-alpha-2快照版本的canal.adapter-1.1.5.tar.gz(release1.1.5版本的jar包有bug)

分別解壓縮後,將v1.1.5-alpha-2解壓縮資料夾下plugin資料夾中的 client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar 替換掉release版本的plugin檔案的 client-adapter.es7x-1.1.5-jar-with-dependencies.jar,並重新命名,再將該jar賦予許可權 chmod 777 client-adapter.es7x-1.1.5-jar-with-dependencies.jar

 

 

 

1、解壓並修改組態檔 conf/application.yml

只需要修改特定的幾處即可,關於各節點說明可參考官方說明:https://help.aliyun.com/document_detail/135297.html

 

srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/CanalDb?useUnicode=true&characterEncoding=utf-8&useSSL=false
      username: canal
      password: canal

 

- name: es7
        hosts: 127.0.0.1:9200 # 127.0.0.1:9200 for rest mode ,127.0.0.1:9003 for transport mode 
        properties:
          mode: rest #transport  or rest 
          security.auth: es:22222 #  only used for rest mode
          cluster.name: elasticsearch  # es叢集節點名稱

 

 

 

 

 2、啟動服務

# 啟動服務
./bin/startup.sh

3、檢視紀錄檔是否啟動成功

cat logs/adapter/adapter.log

如圖所示

 

 4、實時同步

向資料庫中插入一條資料

INSERT INTO `CanalDb`.`UserInfo`( `user_name`, `phone`, `age`) VALUES ('張三', '10086', 99);

檢視紀錄檔

 

kibana檢視索引資料

GET canal_product/_search

 

 

5、全量同步,修改conf/es7/mytest_user.yml組態檔,或者新建一個yml檔案也可

dataSourceKey: defaultDS # 源資料來源的key, 對應上面設定的srcDataSources中的值
destination: example  # canal的instance或者MQ的topic
groupId: g1 # 對應MQ模式下的groupId, 只會同步對應groupId的資料
esMapping:
  _index: canal_product # es 的索引名稱
  _id: _id  # es 的_id, 如果不設定該項必須設定下面的pk項_id則會由es自動分配
  sql: "SELECT
         p.id as _id,
         p.user_name,
         p.phone,
         p.age
        FROM
         UserInfo p "        # sql對映
  etlCondition: "where p.id>={}"   #etl的條件引數
  commitBatch: 3000   # 提交批大小

 

 

 

curl -X POST http://127.0.0.1:8081/etl/es7/mytest_user.yml

 

 

 

 

 

 

 學習連結:https://help.aliyun.com/document_detail/135297.html

                   https://blog.csdn.net/zh1998wx/article/details/123101442?spm=1001.2101.3001.6650.1&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-1-123101442-blog-125808233.pc_relevant_aa&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-1-123101442-blog-125808233.pc_relevant_aa&utm_relevant_index=1