canal server实例只负责解析MySQL binlog日志并推送至kafka,可通过java程序对接进行数据同步,这里使用canal自带的adapter进行实时数据同步。
下载canal adapter https://github.com/alibaba/canal/releases解压下载的压缩文件 tar xvf canal.adapter-1.1.4.tar.gz修改canal adapter基础配置文件 vim conf/application.yml
server: port: 8081 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_null canal.conf: mode: kafka # kafka rocketMQ #需根据实际指定 # canalServerHost: 127.0.0.1:11111 # zookeeperHosts: slave1:2181 mqServers: 1.1.1.1:9092 #or rocketmq #需根据实际指定 flatMessage: true batchSize: 500 syncBatchSize: 1000 retries: 0 timeout: accessKey: secretKey: srcDataSources: defaultDS: #需根据实际指定 url: jdbc:mysql://1.1.1.1:3306/dbName?useUnicode=true&characterEncoding=UTF-8 username: userName #需根据实际指定 password: password #需根据实际指定 canalAdapters: - instance: mysqlEs #需根据实际指定 前面配置的topic和实例名称 groups: - groupId: g1 outerAdapters: - name: logger - name: es hosts: 1.1.1.1:9200 # 127.0.0.1:9200 for rest mode #需根据实际指定 properties: mode: rest security.auth: username:password # username 和password 需根据实际指定 cluster.name: dev-es6-cluster # ES集群名称 #需根据实际指定 修改同步表配置文件: es/tsp_search.yml tsp_search.yam为根据自己需求新建的文件 dataSourceKey: defaultDS destination: mysqlEs groupId: g1 esMapping: _index: index #根据实际指定 _type: doc #根据实际指定 _id: _id upsert: true # pk: id sql: "SELECT.........." # objFields: # _labels: array:; etlCondition: "where a.c_time>={}" commitBatch: 3000需要同步的表根据以上示例填写,并放置在相同的目录下,参数说明,参考:https://github.com/alibaba/canal/wiki/Sync-ES
全量同步: curl http://1.1.1.1:8081/etl/es/tsp_search.yml -X POST
