Canal+kafka同步mysql数据到ElastcSearch(二) 部署Canal Adapter端

it2026-03-05  4

1.canal adapter部署

    canal server实例只负责解析MySQL binlog日志并推送至kafka,可通过java程序对接进行数据同步,这里使用canal自带的adapter进行实时数据同步。

下载canal adapter  https://github.com/alibaba/canal/releases解压下载的压缩文件 tar xvf canal.adapter-1.1.4.tar.gz

修改canal adapter基础配置文件 vim conf/application.yml

server: port: 8081 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_null canal.conf: mode: kafka # kafka rocketMQ #需根据实际指定 # canalServerHost: 127.0.0.1:11111 # zookeeperHosts: slave1:2181 mqServers: 1.1.1.1:9092 #or rocketmq #需根据实际指定 flatMessage: true batchSize: 500 syncBatchSize: 1000 retries: 0 timeout: accessKey: secretKey: srcDataSources: defaultDS: #需根据实际指定 url: jdbc:mysql://1.1.1.1:3306/dbName?useUnicode=true&characterEncoding=UTF-8 username: userName #需根据实际指定 password: password #需根据实际指定 canalAdapters: - instance: mysqlEs #需根据实际指定 前面配置的topic和实例名称 groups: - groupId: g1 outerAdapters: - name: logger - name: es hosts: 1.1.1.1:9200 # 127.0.0.1:9200 for rest mode #需根据实际指定 properties: mode: rest security.auth: username:password # username 和password 需根据实际指定 cluster.name: dev-es6-cluster # ES集群名称 #需根据实际指定 修改同步表配置文件: es/tsp_search.yml tsp_search.yam为根据自己需求新建的文件 dataSourceKey: defaultDS destination: mysqlEs groupId: g1 esMapping: _index: index #根据实际指定 _type: doc #根据实际指定 _id: _id upsert: true # pk: id sql: "SELECT.........." # objFields: # _labels: array:; etlCondition: "where a.c_time>={}" commitBatch: 3000

需要同步的表根据以上示例填写,并放置在相同的目录下,参数说明,参考:https://github.com/alibaba/canal/wiki/Sync-ES

全量同步: curl http://1.1.1.1:8081/etl/es/tsp_search.yml -X POST

2.整体注意事项

canal配置前请保证ES中对应索引已建立,mapping映射和sql中查询语句一致JDK11环境时,需要修改server和adapter的启动脚本startup.sh  去掉过时的JVM参数 :-XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods
最新回复(0)