一、整体模块构成
canal有两种使用方式:1、独立部署 2、内嵌到应用中。 deployer模块主要用于独立部署canal server。关于这两种方式的区别,请参见server模块源码分析。
deployer模块源码目录结构如下所示,包括启动和停止脚本,服务的相关配置文件等。
deployer模块主要完成以下功能:
1、读取canal,properties配置文件
2、启动canal server,监听canal client的请求
3、启动canal instance,连接mysql数据库,伪装成slave,解析binlog
4、在canal的运行过程中,监听配置文件的变化
二、核心类
1、CanalLauncher类(canal独立版本启动的入口类)
这个类是模块中最关键的入口类,
public static void main(String[] args) { try { logger.info("## set default uncaught exception handler"); setGlobalUncaughtExceptionHandler(); logger.info("## load canal configurations"); //1、读取canal.properties文件中配置,默认读取classpath下的canal.properties String conf = System.getProperty("canal.conf", "classpath:canal.properties"); Properties properties = new Properties(); if (conf.startsWith(CLASSPATH_URL_PREFIX)) { conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX); properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf)); } else { properties.load(new FileInputStream(conf)); } //2、启动canal,首先将properties对象传递给CanalStarter ,然后调用其start方法启动 final CanalStarter canalStater = new CanalStarter(properties); String managerAddress = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER); if (StringUtils.isNotEmpty(managerAddress)) { String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER); String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD); String adminPort = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT, "11110"); boolean autoRegister = BooleanUtils.toBoolean(CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_AUTO_REGISTER)); String autoCluster = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_AUTO_CLUSTER); String registerIp = CanalController.getProperty(properties, CanalConstants.CANAL_REGISTER_IP); if (StringUtils.isEmpty(registerIp)) { registerIp = AddressUtils.getHostIp(); } final PlainCanalConfigClient configClient = new PlainCanalConfigClient(managerAddress, user, passwd, registerIp, Integer.parseInt(adminPort), autoRegister, autoCluster); PlainCanal canalConfig = configClient.findServer(null); if (canalConfig == null) { throw new IllegalArgumentException("managerAddress:" + managerAddress + " can't not found config for [" + registerIp + ":" + adminPort + "]"); } Properties managerProperties = canalConfig.getProperties(); // merge local managerProperties.putAll(properties); int scanIntervalInSecond = Integer.valueOf(CanalController.getProperty(managerProperties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL, "5")); executor.scheduleWithFixedDelay(new Runnable() { private PlainCanal lastCanalConfig; public void run() { try { if (lastCanalConfig == null) { lastCanalConfig = configClient.findServer(null); } else { PlainCanal newCanalConfig = configClient.findServer(lastCanalConfig.getMd5()); if (newCanalConfig != null) { // 远程配置canal.properties修改重新加载整个应用 canalStater.stop(); Properties managerProperties = newCanalConfig.getProperties(); // merge local managerProperties.putAll(properties); canalStater.setProperties(managerProperties); //启动canal server,但这还不是最终具体实现的类 canalStater.start(); lastCanalConfig = newCanalConfig; } } } catch (Throwable e) { logger.error("scan failed", e); } } }, 0, scanIntervalInSecond, TimeUnit.SECONDS); canalStater.setProperties(managerProperties); } else { canalStater.setProperties(properties); } canalStater.start(); runningLatch.await(); executor.shutdownNow(); } catch (Throwable e) { logger.error("## Something goes wrong when starting up the canal Server:", e); } }可以看到,CanalLauncher实际上只是负责读取canal.properties配置文件,然后构造CanalStarter 对象,并通过其start和stop方法来开启和停止canal。
2、CanalStarter类(Canal server 启动类)
这个类是模块中最关键的启动类
/** * 启动方法 * * @throws Throwable */ //同步操作,防止重复启动同一个实例 public synchronized void start() throws Throwable { String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE); if (!“tcp”.equalsIgnoreCase(serverMode)) { ExtensionLoader loader = ExtensionLoader.getExtensionLoader(CanalMQProducer.class); canalMQProducer = loader .getExtension(serverMode.toLowerCase(), CONNECTOR_SPI_DIR, CONNECTOR_STANDBY_SPI_DIR); //初始化Kafka if (canalMQProducer != null) { ClassLoader cl = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(canalMQProducer.getClass().getClassLoader()); canalMQProducer.init(properties); Thread.currentThread().setContextClassLoader(cl); } }
if (canalMQProducer != null) { MQProperties mqProperties = canalMQProducer.getMqProperties(); // disable netty System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true"); if (mqProperties.isFlatMessage()) { // 设置为raw避免ByteString->Entry的二次解析 System.setProperty("canal.instance.memory.rawEntry", "false"); } } logger.info("## start the canal server."); //将属性传给CanalController类,这是canal调度控制器,真正的启动是这个类在做 controller = new CanalController(properties); //启动 controller.start(); logger.info("## the canal server is running now ......"); shutdownThread = new Thread(() -> { try { logger.info("## stop the canal server"); controller.stop(); CanalLauncher.runningLatch.countDown(); } catch (Throwable e) { logger.warn("##something goes wrong when stopping canal Server:", e); } finally { logger.info("## canal server is down."); } }); //关闭canal,通过添加JVM的钩子( 这个方法的意思就是在jvm中增加一个关闭的钩子,当jvm关闭的时候,会执行系统中已经设置的所有通过方法addShutdownHook添加的钩子,当系统执行完这些钩子后,jvm才会关闭。 所以这些钩子可以在jvm关闭的时候进行内存清理、对象销毁、关闭连接等操作。),JVM停止前会回调run方法,其内部调用controller.stop()方法进行停止 Runtime.getRuntime().addShutdownHook(shutdownThread); //启动kafka if (canalMQProducer != null) { canalMQStarter = new CanalMQStarter(canalMQProducer); String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS); canalMQStarter.start(destinations); controller.setCanalMQStarter(canalMQStarter); } // start canalAdmin String port = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT); //CanalAdminWithNetty,基于netty网络服务的server实现 if (canalAdmin == null && StringUtils.isNotEmpty(port)) { String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER); String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD); CanalAdminController canalAdmin = new CanalAdminController(this); canalAdmin.setUser(user); canalAdmin.setPasswd(passwd); String ip = CanalController.getProperty(properties, CanalConstants.CANAL_IP); logger.debug("canal admin port:{}, canal admin user:{}, canal admin password: {}, canal ip:{}", port, user, passwd, ip); CanalAdminWithNetty canalAdminWithNetty = CanalAdminWithNetty.instance(); canalAdminWithNetty.setCanalAdmin(canalAdmin); canalAdminWithNetty.setPort(Integer.parseInt(port)); canalAdminWithNetty.setIp(ip); canalAdminWithNetty.start(); this.canalAdmin = canalAdminWithNetty; } running = true; }可以看到,CanalStarter是负责初始化Kafka,启动kafka,启动基于Netty的server服务,调用CanalController的启动实例方法。
3、CanalController类(canal调度控制器类)
这个类是模块中canal调度控制器
public CanalController(final Properties properties){ managerClients = MigrateMap.makeComputingMap(this::getManagerClient);
// 初始化全局参数设置 globalInstanceConfig = initGlobalConfig(properties); //这里利用Google Guava框架的MapMaker创建Map实例并赋值给instanceConfigs instanceConfigs = new MapMaker().makeMap(); // 初始化instance config initInstanceConfig(properties); // init socketChannel String socketChannel = getProperty(properties, CanalConstants.CANAL_SOCKETCHANNEL); if (StringUtils.isNotEmpty(socketChannel)) { System.setProperty(CanalConstants.CANAL_SOCKETCHANNEL, socketChannel); } // 兼容1.1.0版本的ak/sk参数名 String accesskey = getProperty(properties, "canal.instance.rds.accesskey"); String secretkey = getProperty(properties, "canal.instance.rds.secretkey"); if (StringUtils.isNotEmpty(accesskey)) { System.setProperty(CanalConstants.CANAL_ALIYUN_ACCESSKEY, accesskey); } if (StringUtils.isNotEmpty(secretkey)) { System.setProperty(CanalConstants.CANAL_ALIYUN_SECRETKEY, secretkey); } // 准备canal server ip = getProperty(properties, CanalConstants.CANAL_IP); registerIp = getProperty(properties, CanalConstants.CANAL_REGISTER_IP); port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT, "11111")); adminPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_ADMIN_PORT, "11110")); //CanalServerWithEmbedded是嵌入式版本实现的canal server embededCanalServer = CanalServerWithEmbedded.instance(); embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT, "11112")); embededCanalServer.setMetricsPort(metricsPort); this.adminUser = getProperty(properties, CanalConstants.CANAL_ADMIN_USER); this.adminPasswd = getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD); embededCanalServer.setUser(getProperty(properties, CanalConstants.CANAL_USER)); embededCanalServer.setPasswd(getProperty(properties, CanalConstants.CANAL_PASSWD)); String canalWithoutNetty = getProperty(properties, CanalConstants.CANAL_WITHOUT_NETTY); if (canalWithoutNetty == null || "false".equals(canalWithoutNetty)) { //设置CanalServerWithNetty canalServer = CanalServerWithNetty.instance(); canalServer.setIp(ip); canalServer.setPort(port); } // 处理下ip为空,默认使用hostIp暴露到zk中 if (StringUtils.isEmpty(ip) && StringUtils.isEmpty(registerIp)) { ip = registerIp = AddressUtils.getHostIp(); } if (StringUtils.isEmpty(ip)) { ip = AddressUtils.getHostIp(); } if (StringUtils.isEmpty(registerIp)) { registerIp = ip; // 兼容以前配置 } //zookeeper的部分配置 final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS); if (StringUtils.isNotEmpty(zkServers)) { zkclientx = ZkClientx.getZkClient(zkServers); // 初始化系统目录 zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true); zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true); } // CanalInstance运行状态监控 final ServerRunningData serverData = new ServerRunningData(registerIp + ":" + port); ServerRunningMonitors.setServerData(serverData); ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap((Function<String, ServerRunningMonitor>) destination -> { ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData); runningMonitor.setDestination(destination); runningMonitor.setListener(new ServerRunningListener() { //触发现在轮到自己做为active,需要载入上一个active的上下文数据 public void processActiveEnter() { try { MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination)); embededCanalServer.start(destination); if (canalMQStarter != null) { canalMQStarter.startDestination(destination); } } finally { MDC.remove(CanalConstants.MDC_DESTINATION); } } //触发一下当前active模式失败 public void processActiveExit() { try { MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination)); if (canalMQStarter != null) { canalMQStarter.stopDestination(destination); } embededCanalServer.stop(destination); } finally { MDC.remove(CanalConstants.MDC_DESTINATION); } } //启动时回调做点事情 public void processStart() { try { if (zkclientx != null) { final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, registerIp + ":" + port); initCid(path); zkclientx.subscribeStateChanges(new IZkStateListener() { public void handleStateChanged(KeeperState state) throws Exception { } public void handleNewSession() throws Exception { initCid(path); } @Override public void handleSessionEstablishmentError(Throwable error) throws Exception { logger.error("failed to connect to zookeeper", error); } }); } } finally { MDC.remove(CanalConstants.MDC_DESTINATION); } } //关闭时回调做点事情 public void processStop() { try { MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination)); if (zkclientx != null) { final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, registerIp + ":" + port); releaseCid(path); } } finally { MDC.remove(CanalConstants.MDC_DESTINATION); } } }); if (zkclientx != null) { runningMonitor.setZkClient(zkclientx); } // 触发创建一下cid节点 runningMonitor.init(); return runningMonitor; })); // 初始化monitor机制 autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN)); if (autoScan) { defaultAction = new InstanceAction() { public void start(String destination) { InstanceConfig config = instanceConfigs.get(destination); if (config == null) { // 重新读取一下instance config config = parseInstanceConfig(properties, destination); instanceConfigs.put(destination, config); } if (!embededCanalServer.isStart(destination)) { // HA机制启动 ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination); if (!config.getLazy() && !runningMonitor.isStart()) { runningMonitor.start(); } } logger.info("auto notify start {} successful.", destination); } public void stop(String destination) { // 此处的stop,代表强制退出,非HA机制,所以需要退出HA的monitor和配置信息 InstanceConfig config = instanceConfigs.remove(destination); if (config != null) { embededCanalServer.stop(destination); ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination); if (runningMonitor.isStart()) { runningMonitor.stop(); } } logger.info("auto notify stop {} successful.", destination); } public void reload(String destination) { // 目前任何配置变化,直接重启,简单处理 stop(destination); start(destination); logger.info("auto notify reload {} successful.", destination); } @Override public void release(String destination) { // 此处的release,代表强制释放,主要针对HA机制释放运行,让给其他机器抢占 InstanceConfig config = instanceConfigs.get(destination); if (config != null) { //ServerRunningMonitor 是针对server的running节点控制 ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination); if (runningMonitor.isStart()) { boolean release = runningMonitor.release(); if (!release) { // 如果是单机模式,则直接清除配置 instanceConfigs.remove(destination); // 停掉服务 runningMonitor.stop(); if (instanceConfigMonitors.containsKey(InstanceConfig.InstanceMode.MANAGER)) { ManagerInstanceConfigMonitor monitor = (ManagerInstanceConfigMonitor) instanceConfigMonitors.get(InstanceConfig.InstanceMode.MANAGER); Map<String, InstanceAction> instanceActions = monitor.getActions(); if (instanceActions.containsKey(destination)) { // 清除内存中的autoScan cache monitor.release(destination); } } } } } logger.info("auto notify release {} successful.", destination); } }; //实例化配置 instanceConfigMonitors = MigrateMap.makeComputingMap(mode -> { int scanInterval = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL, "5")); //spring 模式 if (mode.isSpring()) { SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor(); monitor.setScanIntervalInSecond(scanInterval); monitor.setDefaultAction(defaultAction); // 设置conf目录,默认是user.dir + conf目录组成 String rootDir = getProperty(properties, CanalConstants.CANAL_CONF_DIR); if (StringUtils.isEmpty(rootDir)) { rootDir = "../conf"; } if (StringUtils.equals("otter-canal", System.getProperty("appName"))) { monitor.setRootConf(rootDir); } else { // eclipse debug模式 monitor.setRootConf("src/main/resources/"); } return monitor; } else if (mode.isManager()) { //管理模式 ManagerInstanceConfigMonitor monitor = new ManagerInstanceConfigMonitor(); monitor.setScanIntervalInSecond(scanInterval); monitor.setDefaultAction(defaultAction); String managerAddress = getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER); monitor.setConfigClient(getManagerClient(managerAddress)); return monitor; } else { throw new UnsupportedOperationException("unknow mode :" + mode + " for monitor"); } }); } }3.1.1配置解析相关代码
启动的相关配置类InstanceConfig
public class InstanceConfig {
private InstanceConfig globalConfig; private InstanceMode mode; private Boolean lazy; private String managerAddress; private String springXml;这个类对应canal.properties文件类的
具体实例化方法:
private InstanceConfig initGlobalConfig(Properties properties) { String adminManagerAddress = getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER); InstanceConfig globalConfig = new InstanceConfig(); //读取canal.instance.global.mode String modeStr = getProperty(properties, CanalConstants.getInstanceModeKey(CanalConstants.GLOBAL_NAME)); if (StringUtils.isNotEmpty(adminManagerAddress)) { // 如果指定了manager地址,则强制适用manager globalConfig.setMode(InstanceMode.MANAGER); } else if (StringUtils.isNotEmpty(modeStr)) { globalConfig.setMode(InstanceMode.valueOf(StringUtils.upperCase(modeStr))); } //读取canal.instance.global.lazy String lazyStr = getProperty(properties, CanalConstants.getInstancLazyKey(CanalConstants.GLOBAL_NAME)); if (StringUtils.isNotEmpty(lazyStr)) { globalConfig.setLazy(Boolean.valueOf(lazyStr)); } //读取canal.instance.global.manager.address String managerAddress = getProperty(properties, CanalConstants.getInstanceManagerAddressKey(CanalConstants.GLOBAL_NAME)); if (StringUtils.isNotEmpty(managerAddress)) { if (StringUtils.equals(managerAddress, “${canal.admin.manager}”)) { managerAddress = adminManagerAddress; }
globalConfig.setManagerAddress(managerAddress); } //读取canal.instance.global.spring.xml String springXml = getProperty(properties, CanalConstants.getInstancSpringXmlKey(CanalConstants.GLOBAL_NAME)); if (StringUtils.isNotEmpty(springXml)) { globalConfig.setSpringXml(springXml); } //初始化instanceGenerator instanceGenerator = destination -> { InstanceConfig config = instanceConfigs.get(destination); if (config == null) { throw new CanalServerException("can't find destination:" + destination); } //基于manager生成对应的canal实例 if (config.getMode().isManager()) { PlainCanalInstanceGenerator instanceGenerator = new PlainCanalInstanceGenerator(properties); instanceGenerator.setCanalConfigClient(managerClients.get(config.getManagerAddress())); instanceGenerator.setSpringXml(config.getSpringXml()); return instanceGenerator.generate(destination); } else if (config.getMode().isSpring()) { //基于Spring生成对应的canal实例 SpringCanalInstanceGenerator instanceGenerator = new SpringCanalInstanceGenerator(); instanceGenerator.setSpringXml(config.getSpringXml()); return instanceGenerator.generate(destination); } else { throw new UnsupportedOperationException("unknow mode :" + config.getMode()); } }; return globalConfig; }其中canal.instance.global.mode用于确定canal instance的全局配置加载方式,其取值范围有2个:spring、manager。我们知道一个canal server中可以启动多个canal instance,每个instance都有各自的配置。instance的配置也可以放在本地,也可以放在远程配置中心里。我们可以自定义每个canal instance配置文件存储的位置,如果所有canal instance的配置都在本地或者远程,此时我们就可以通过canal.instance.global.mode这个配置项,来统一的指定配置文件的位置,避免为每个canal instance单独指定。
其中:
spring方式:
表示所有的canal instance的配置文件位于本地。此时,我们必须提供配置项canal.instance.global.spring.xml指定spring配置文件的路径。canal提供了多个spring配置文件:file-instance.xml、default-instance.xml、memory-instance.xml、local-instance.xml、group-instance.xml。这么多配置文件主要是为了支持canal instance不同的工作方式。
manager方式:
表示所有的canal instance的配置文件位于远程配置中心,此时我们必须提供配置项 canal.instance.global.manager.address来指定远程配置中心的地址。目前alibaba内部配置使用这种方式。开发者可以自己实现CanalConfigClient,连接各自的管理系统,完成接入。
3.1.2instanceConfigs字段
类型为Map<String, InstanceConfig>。前面提到初始化instanceGenerator后,当其generate方法被调用时,会尝试从instanceConfigs根据一个destination获取对应的InstanceConfig,现在分析instanceConfigs的相关初始化代码。
我们知道globalInstanceConfig定义全局的配置加载方式。如果需要把部分CanalInstance配置放于本地,另外一部分CanalIntance配置放于远程配置中心,则只通过全局方式配置,无法达到这个要求。虽然这种情况很少见,但是为了提供最大的灵活性,canal支持每个CanalIntance自己来定义自己的加载方式,来覆盖默认的全局配置加载方式。而每个destination对应的InstanceConfig配置就存放于instanceConfigs字段中。
举例来说:
//当前server上部署的instance列表 canal.destinations=instance1,instance2
//instance配置全局加载方式 canal.instance.global.mode = spring canal.instance.global.lazy = false canal.instance.global.spring.xml = classpath:spring/file-instance.xml
//instance1覆盖全局加载方式 canal.instance.instance1.mode = manager canal.instance.instance1.manager.address = 127.0.0.1:1099 canal.instance.instance1.lazy = tue 这段配置中,设置了instance的全局加载方式为spring,instance1覆盖了全局配置,使用manager方式加载配置。而instance2没有覆盖配置,因此默认使用spring加载方式。
具体的实例化配置方法
private void initInstanceConfig(Properties properties) { //读取配置项canal.destinations String destinationStr = getProperty(properties, CanalConstants.CANAL_DESTINATIONS); //以","分割canal.destinations,得到一个数组形式的destination String[] destinations = StringUtils.split(destinationStr, CanalConstants.CANAL_DESTINATION_SPLIT); //为每一个destination生成一个InstanceConfig实例 for (String destination : destinations) { //将destination对应的InstanceConfig放入instanceConfigs中 InstanceConfig config = parseInstanceConfig(properties, destination); InstanceConfig oldConfig = instanceConfigs.put(destination, config);
if (oldConfig != null) { logger.warn("destination:{} old config:{} has replace by new config:{}", destination, oldConfig, config); } } }上面代码片段中,首先解析canal.destinations配置项,可以理解一个destination就对应要初始化一个canal instance。针对每个destination会创建各自的InstanceConfig,最终都会放到instanceConfigs这个Map中。
3.1.3 准备canal server
// 准备canal server ip = getProperty(properties, CanalConstants.CANAL_IP); registerIp = getProperty(properties, CanalConstants.CANAL_REGISTER_IP); port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT, “11111”)); adminPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_ADMIN_PORT, “11110”)); embededCanalServer = CanalServerWithEmbedded.instance(); embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT, “11112”)); embededCanalServer.setMetricsPort(metricsPort);
this.adminUser = getProperty(properties, CanalConstants.CANAL_ADMIN_USER); this.adminPasswd = getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD); embededCanalServer.setUser(getProperty(properties, CanalConstants.CANAL_USER)); embededCanalServer.setPasswd(getProperty(properties, CanalConstants.CANAL_PASSWD)); String canalWithoutNetty = getProperty(properties, CanalConstants.CANAL_WITHOUT_NETTY); if (canalWithoutNetty == null || "false".equals(canalWithoutNetty)) { canalServer = CanalServerWithNetty.instance(); canalServer.setIp(ip); canalServer.setPort(port); }ip:String,对应canal.properties文件中的canal.ip,canal server监听的ip。
port:int,对应canal.properties文件中的canal.port,canal server监听的端口
embededCanalServer:类型为CanalServerWithEmbedded
canalServer:类型为CanalServerWithNetty
CanalServerWithEmbedded 和 CanalServerWithNetty都实现了CanalServer接口,且都实现了单例模式,通过静态方法instance获取实例。
关于这两种类型的实现,canal官方文档有以下描述:
说白了,就是我们可以不必独立部署canal server。在应用直接使用CanalServerWithEmbedded直连mysql数据库。如果觉得自己的技术hold不住相关代码,就独立部署一个canal server,使用canal提供的客户端,连接canal server获取binlog解析后数据。而CanalServerWithNetty是在CanalServerWithEmbedded的基础上做的一层封装,用于与客户端通信。
在独立部署canal server时,Canal客户端发送的所有请求都交给CanalServerWithNetty处理解析,解析完成之后委派给了交给CanalServerWithEmbedded进行处理。因此CanalServerWithNetty就是一个马甲而已。CanalServerWithEmbedded才是核心。
因此,在上述代码中,我们看到,用于生成CanalInstance实例的instanceGenerator被设置到了CanalServerWithEmbedded中,而ip和port被设置到CanalServerWithNetty中。
3.1.4启动方法
public void start() throws Throwable { logger.info("## start the canal server[{}({}):{}]", ip, registerIp, port); // 创建整个canal的工作节点 /构建临时节点的路径:/otter/canal/destinations/{0}/running,其中占位符{0}会被destination替换。 在集群模式下,可能会有多个canal server共同处理同一个destination, 在某一时刻,只能由一个canal server进行处理,处理这个destination的canal server进入running状态,其他canal server进入standby状态。/ final String path = ZookeeperPathUtils.getCanalClusterNode(registerIp + “:” + port); initCid(path); if (zkclientx != null) { /对destination对应的running节点进行监听,一旦发生了变化,则说明可能其他处理相同destination的canal server可能出现了异常, 此时需要尝试自己进入running状态。/ this.zkclientx.subscribeStateChanges(new IZkStateListener() {
public void handleStateChanged(KeeperState state) throws Exception { } public void handleNewSession() throws Exception { initCid(path); } @Override public void handleSessionEstablishmentError(Throwable error) throws Exception { logger.error("failed to connect to zookeeper", error); } }); } // 优先启动embeded服务 embededCanalServer.start(); // 尝试启动一下非lazy状态的通道 for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) { final String destination = entry.getKey(); InstanceConfig config = entry.getValue(); // 创建destination的工作节点 if (!embededCanalServer.isStart(destination)) { // HA机制启动 ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination); if (!config.getLazy() && !runningMonitor.isStart()) { runningMonitor.start(); } } if (autoScan) { instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction); } } if (autoScan) { instanceConfigMonitors.get(globalInstanceConfig.getMode()).start(); for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) { if (!monitor.isStart()) { monitor.start(); } } } // 启动网络接口 if (canalServer != null) { canalServer.start(); } }HA机制启动机制
runningMonitor.start();
public synchronized void start() { super.start(); try { processStart(); if (zkClient != null) { // 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start String path = ZookeeperPathUtils.getDestinationServerRunning(destination); zkClient.subscribeDataChanges(path, dataListener);
initRunning(); } else { processActiveEnter();// 没有zk,直接启动 } } catch (Exception e) { logger.error("start failed", e); // 没有正常启动,重置一下状态,避免干扰下一次start stop(); } }private void initRunning() { if (!isStart()) { return; } //构建临时节点的路径:/otter/canal/destinations/{0}/running,其中占位符{0}会被destination替换 String path = ZookeeperPathUtils.getDestinationServerRunning(destination); // 序列化 //构建临时节点的数据,标记当前destination由哪一个canal server处理 byte[] bytes = JsonUtils.marshalToByte(serverData); try { mutex.set(false); //尝试创建临时节点。如果节点已经存在,说明是其他的canal server已经启动了这个canal instance。 //此时会抛出ZkNodeExistsException,进入catch代码块。 zkClient.create(path, bytes, CreateMode.EPHEMERAL); activeData = serverData; //如果创建成功,触发一下事件,内部调用ServerRunningListener的processActiveEnter方法 processActiveEnter();// 触发一下事件 mutex.set(true); release = false; } catch (ZkNodeExistsException e) { //创建节点失败,则根据path从zk中获取当前是哪一个canal server创建了当前canal instance的相关信息。 //第二个参数true,表示的是,如果这个path不存在,则返回null。 bytes = zkClient.readData(path, true); if (bytes == null) {// 如果不存在节点,立即尝试一次 initRunning(); } else { //如果的确存在,则将创建该canal instance实例信息存入activeData中。 activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class); } } catch (ZkNoNodeException e) { zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 尝试创建父节点 initRunning(); } }
可以看到,initRunning方法内部只有在尝试在zk中创建节点成功后,才会去调用listener的processActiveEnter方法来真正启动destination对应的canal instance,这是canal HA方式启动的核心。canal官方文档中介绍了CanalServer HA机制启动的流程,如下:
事实上,这个说明的前两步,都是在initRunning方法中实现的。从上面的代码中,我们可以看出,在HA机启动的情况下,initRunning方法不一定能走到processActiveEnter()方法,因为创建临时节点可能会出错。
此外,根据官方文档说明,如果出错,那么当前canal instance则进入standBy状态。也就是另外一个canal instance出现异常时,当前canal instance顶上去。
3.1.5autoScan机制相关代码
关于autoscan,官方文档有以下介绍:
autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN)); if (autoScan) { defaultAction = new InstanceAction() {//…};
instanceConfigMonitors = //.... }可以看到,autoScan是否需要自动扫描的开关,只有当autoScan为true时,才会初始化defaultAction字段和instanceConfigMonitors字段。其中:
其中:
defaultAction:其作用是如果配置发生了变更,默认应该采取什么样的操作。其实现了InstanceAction接口定义的三个抽象方法:start、stop和reload。当新增一个destination配置时,需要调用start方法来启动;当移除一个destination配置时,需要调用stop方法来停止;当某个destination配置发生变更时,需要调用reload方法来进行重启。 instanceConfigMonitors:类型为Map<InstanceMode, InstanceConfigMonitor>。defaultAction字段只是定义了配置发生变化默认应该采取的操作,那么总该有一个类来监听配置是否发生了变化,这就是InstanceConfigMonitor的作用。官方文档中,只提到了对canal.conf.dir配置项指定的目录的监听,这指的是通过spring方式加载配置。显然的,通过manager方式加载配置,配置中心的内容也是可能发生变化的,也需要进行监听。此时可以理解为什么instanceConfigMonitors的类型是一个Map,key为InstanceMode,就是为了对这两种方式的配置加载方式都进行监听。defaultAction字段初始化源码如下所示:
defaultAction = new InstanceAction() {
public void start(String destination) { InstanceConfig config = instanceConfigs.get(destination); if (config == null) { // 重新读取一下instance config config = parseInstanceConfig(properties, destination); instanceConfigs.put(destination, config); } if (!embededCanalServer.isStart(destination)) { // HA机制启动 ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination); if (!config.getLazy() && !runningMonitor.isStart()) { runningMonitor.start(); } } } public void stop(String destination) { // 此处的stop,代表强制退出,非HA机制,所以需要退出HA的monitor和配置信息 InstanceConfig config = instanceConfigs.remove(destination); if (config != null) { embededCanalServer.stop(destination); ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination); if (runningMonitor.isStart()) { runningMonitor.stop(); } } } public void reload(String destination) { // 目前任何配置变化,直接重启,简单处理 stop(destination); start(destination); }}; instanceConfigMonitors字段初始化源码如下所示:
instanceConfigMonitors = MigrateMap.makeComputingMap(new Function<InstanceMode, InstanceConfigMonitor>() { public InstanceConfigMonitor apply(InstanceMode mode) { int scanInterval = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL)); if (mode.isSpring()) {//如果加载方式是spring,返回SpringInstanceConfigMonitor SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor(); monitor.setScanIntervalInSecond(scanInterval); monitor.setDefaultAction(defaultAction); // 设置conf目录,默认是user.dir + conf目录组成 String rootDir = getProperty(properties, CanalConstants.CANAL_CONF_DIR); if (StringUtils.isEmpty(rootDir)) { rootDir = “…/conf”; } if (StringUtils.equals(“otter-canal”, System.getProperty(“appName”))) { monitor.setRootConf(rootDir); } else { // eclipse debug模式 monitor.setRootConf(“src/main/resources/”); } return monitor; } else if (mode.isManager()) {//如果加载方式是manager,返回ManagerInstanceConfigMonitor return new ManagerInstanceConfigMonitor(); } else { throw new UnsupportedOperationException(“unknow mode :” + mode + " for monitor"); } } });
3.1.6 启动实例方法
public void start() throws Throwable { logger.info("## start the canal server[{}:{}]", ip, port); // 创建整个canal的工作节点 :/otter/canal/cluster/{0} final String path = ZookeeperPathUtils.getCanalClusterNode(ip + “:” + port); initCid(path); if (zkclientx != null) { this.zkclientx.subscribeStateChanges(new IZkStateListener() { public void handleStateChanged(KeeperState state) throws Exception { } public void handleNewSession() throws Exception { initCid(path); } }); } // 优先启动embeded服务 embededCanalServer.start(); //启动不是lazy模式的CanalInstance,通过迭代instanceConfigs,根据destination获取对应的ServerRunningMonitor,然后逐一启动 for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) { final String destination = entry.getKey(); InstanceConfig config = entry.getValue(); // 如果destination对应的CanalInstance没有启动,则进行启动 if (!embededCanalServer.isStart(destination)) { ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination); //如果不是lazy,lazy模式需要等到第一次有客户端请求才会启动 if (!config.getLazy() && !runningMonitor.isStart()) { runningMonitor.start(); } } if (autoScan) { instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction); } } if (autoScan) {//启动配置文件自动检测机制 instanceConfigMonitors.get(globalInstanceConfig.getMode()).start(); for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) { if (!monitor.isStart()) { monitor.start();//启动monitor } } } // 启动网络接口,监听客户端请求 canalServer.start(); }
4、总结
deployer模块的主要作用:
1、读取canal.properties,确定canal instance的配置加载方式
2、确定canal instance的启动方式:独立启动或者集群方式启动
3、监听canal instance的配置的变化,动态停止、启动或新增
4、启动canal server,监听客户端请求