FlinkSQL MySQL CDC

it2026-02-19  5

flink-cdc-connectors

 

package com.sm.job

import com.sm.function.udf.{ParseDeviceCode, ParsePopularize}
import com.sm.utils.FlinkUtils
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.log4j.Level
import org.slf4j.LoggerFactory

/**
 * Flink MySQL CDC.
 * Synchronizes a dimension table in real time.
 *
 * The job declares a `mysql-cdc` source table over `game_platform.packages`,
 * declares a `print` sink table, and runs an `insert into ... select` that
 * copies every source column to the sink.
 *
 * create by LiuJinHe 2020/10/22
 */
object FlinkMySqlCdc {
  // Initialised once at object construction instead of a null `var` assigned in main.
  private val logger: org.slf4j.Logger = LoggerFactory.getLogger(this.getClass.getSimpleName)

  /**
   * Entry point: configures log levels, builds a local stream/table environment
   * and executes the three SQL statements (source DDL, sink DDL, insert).
   *
   * @param args command-line arguments; not read by this job
   */
  def main(args: Array[String]): Unit = {
    org.apache.log4j.Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    org.apache.log4j.Logger.getLogger("org.apache").setLevel(Level.INFO)
    org.apache.log4j.Logger.getLogger("io.debezium").setLevel(Level.INFO)

    // Initialise the stream environment.
    // Local testing: requires the flink-runtime-web dependency.
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    // val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Restart on failure with a fixed delay, at most 10 attempts.
    // NOTE(review): the original comment says "every 3 seconds"; confirm the unit of the
    // second argument before enabling (the numeric overload is believed to be milliseconds).
    // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 3))

    // Parallelism 1 for local testing.
    // env.setParallelism(1)

    // Processing time, i.e. determined by the system clock.
    // env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    // Create the stream table environment.
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, FlinkUtils.getSettings)
    // tableConfig.setIdleStateRetentionTime(Time.hours(12), Time.hours(24))

    // ---- Checkpoint settings (disabled) ------------------------------------------------
    // NOTE(review): in the collapsed source the CHECKPOINTING_MODE and CHECKPOINTING_INTERVAL
    // calls appeared without a leading `//`. They are kept disabled here because the
    // `tableConfig` definition is commented out and ExecutionCheckpointingOptions,
    // CheckpointingMode and Duration are not imported. To enable checkpointing, uncomment
    // the definition together with the calls and add the corresponding imports.
    // val tableConfig = tableEnv.getConfig.getConfiguration
    // Enable checkpointing.
    // tableConfig.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
    // Checkpoint interval: one checkpoint per minute (per the author, the sink only runs
    // after each checkpoint completes).
    // tableConfig.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(60))
    // Checkpoint timeout: a checkpoint not finished within one minute is discarded.
    // tableConfig.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT, Duration.ofSeconds(60))
    // Minimum pause: at least 30 seconds between two checkpoints.
    // tableConfig.set(ExecutionCheckpointingOptions.MIN_PAUSE_BETWEEN_CHECKPOINTS, Duration.ofSeconds(30))
    // Allow only one checkpoint in flight at a time.
    // tableConfig.set(ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, Integer.valueOf(1))
    // Whether to retain the checkpoint when the job is cancelled manually.
    // tableConfig.set(ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT, CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    // Minimum and maximum idle state retention time.
    // tableEnv.getConfig.setIdleStateRetentionTime(Time.hours(12), Time.hours(24))

    // Catalog
    // val memoryCatalog = new GenericInMemoryCatalog("kafkaSourceTable", "memory")

    // Drop records that contain nulls.
    // tableEnv.getConfig.getConfiguration.setString("table.exec.sink.not-null-enforcer", "drop")

    // Source table backed by the mysql-cdc connector. ADD_TIME and proctime are
    // computed columns, not columns read from MySQL.
    val packageSourceSql =
      """
        |create table packageSourceTable (
        |  ID int,
        |  PACKAGE_ID int,
        |  GAME_ID int,
        |  CHANNEL_ID int,
        |  PLATFORM int,
        |  POPULARIZE int,
        |  COMPANY_ID int,
        |  ADD_TIME as localtimestamp,
        |  proctime as proctime()
        |) with (
        |  'connector' = 'mysql-cdc',
        |  'hostname' = 'xxx',
        |  'port' = '3306',
        |  'username' = 'xxx',
        |  'password' = 'xxx',
        |  'database-name' = 'game_platform',
        |  'table-name' = 'packages',
        |  'server-time-zone' = 'Asia/Shanghai'
        |)
      """.stripMargin

    // Sink table using the print connector.
    val printSinkSql =
      """
        |create table printSinkTable (
        |  ID int,
        |  PACKAGE_ID int,
        |  GAME_ID int,
        |  CHANNEL_ID int,
        |  PLATFORM int,
        |  POPULARIZE int,
        |  COMPANY_ID int,
        |  ADD_TIME timestamp
        |) with (
        |  'connector' = 'print'
        |)
      """.stripMargin

    // Copy every source column (except proctime) into the print sink.
    val insertSql =
      """
        |insert into printSinkTable
        |select
        |  ID,
        |  PACKAGE_ID,
        |  GAME_ID,
        |  CHANNEL_ID,
        |  PLATFORM,
        |  POPULARIZE,
        |  COMPANY_ID,
        |  ADD_TIME
        |from packageSourceTable
      """.stripMargin

    tableEnv.executeSql(packageSourceSql)
    tableEnv.executeSql(printSinkSql)
    // NOTE(review): presumably the INSERT is submitted asynchronously; confirm whether a
    // local run needs to block on the returned TableResult to keep the job alive.
    tableEnv.executeSql(insertSql)
  }
}

 

DataStream API

// DataStream variant: builds a MySQL CDC source function, adds it to the stream
// environment and prints each change record.
// `env` is defined outside this snippet (presumably a StreamExecutionEnvironment).
//
// The explicit [String] type argument is required in Scala: without it the builder's
// type parameter is inferred as Nothing, and the String deserializer below does not
// type-check against it.
//
// NOTE(review): "game_platfrom" differs from 'game_platform' used in the SQL example;
// confirm the real database name before running.
val mysqlSource = MySQLSource.builder[String]()
  .hostname("cdh-slave04")
  .port(3306)
  .username("root")
  .password("123456")
  .databaseList("game_platfrom")
  .tableList("game_platfrom.games")
  .deserializer(new StringDebeziumDeserializationSchema)
  .build()

env.addSource(mysqlSource).print()
env.execute("flink mysql cdc")

 

最新回复(0)