1、jkd 8+
2、mysql 5.7+
3、Elasticsearch 7+
4、kibana 7+
5、canal.adapter 1.1.5
SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for UserInfo -- ---------------------------- DROP TABLE IF EXISTS `UserInfo`; CREATE TABLE `UserInfo` ( `id` int(11) NOT NULL AUTO_INCREMENT, `user_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL, `phone` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL, `age` int(11) DEFAULT NULL, PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic; SET FOREIGN_KEY_CHECKS = 1;
PUT canal_product { "mappings": { "properties": { "user_name": { "type": "text" }, "phone": { "type": "text" }, "age": { "type": "integer" } } } }
github:https://github.com/alibaba/canal/releases/tag/canal-1.1.5
額外需要下載v1.1.5-alpha-2快照版本的canal.adapter-1.1.5.tar.gz(release1.1.5版本的jar包有bug)
分別解壓縮後,將v1.1.5-alpha-2解壓縮資料夾下plugin資料夾中的 client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar 替換掉release版本的plugin檔案的 client-adapter.es7x-1.1.5-jar-with-dependencies.jar,並重新命名,再將該jar賦予許可權 chmod 777 client-adapter.es7x-1.1.5-jar-with-dependencies.jar
只需要修改特定的幾處即可,關於各節點說明可參考官方說明:https://help.aliyun.com/document_detail/135297.html
srcDataSources: defaultDS: url: jdbc:mysql://127.0.0.1:3306/CanalDb?useUnicode=true&characterEncoding=utf-8&useSSL=false username: canal password: canal
- name: es7 hosts: 127.0.0.1:9200 # 127.0.0.1:9200 for rest mode ,127.0.0.1:9003 for transport mode properties: mode: rest #transport or rest security.auth: es:22222 # only used for rest mode cluster.name: elasticsearch # es叢集節點名稱
# 啟動服務
./bin/startup.sh
cat logs/adapter/adapter.log
如圖所示
向資料庫中插入一條資料
INSERT INTO `CanalDb`.`UserInfo`( `user_name`, `phone`, `age`) VALUES ('張三', '10086', 99);
檢視紀錄檔
kibana檢視索引資料
GET canal_product/_search
dataSourceKey: defaultDS # 源資料來源的key, 對應上面設定的srcDataSources中的值
destination: example # canal的instance或者MQ的topic
groupId: g1 # 對應MQ模式下的groupId, 只會同步對應groupId的資料
esMapping:
_index: canal_product # es 的索引名稱
_id: _id # es 的_id, 如果不設定該項必須設定下面的pk項_id則會由es自動分配
sql: "SELECT
p.id as _id,
p.user_name,
p.phone,
p.age
FROM
UserInfo p " # sql對映
etlCondition: "where p.id>={}" #etl的條件引數
commitBatch: 3000 # 提交批大小
curl -X POST http://127.0.0.1:8081/etl/es7/mytest_user.yml
學習連結:https://help.aliyun.com/document_detail/135297.html