FlinkSQL 读写 MySQL

it2026-01-30  3

FlinkSQL读取MySQL大多用作维表关联, 聚合结果写入MySQL,简单记录一下用法。

JDBC SQL Connector

 

package com.sm.job import com.sm.utils.FlinkUtils import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment import org.apache.log4j.Level import org.slf4j.LoggerFactory /** * flinkSQL 读写 mysql * * create by LiuJinHe 2020/10/22 */ object FlinkJdbcConnector { private var logger: org.slf4j.Logger = _ def main(args: Array[String]): Unit = { logger = LoggerFactory.getLogger(this.getClass.getSimpleName) org.apache.log4j.Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) org.apache.log4j.Logger.getLogger("org.apache").setLevel(Level.INFO) org.apache.log4j.Logger.getLogger("io.debezium").setLevel(Level.INFO) // 初始化 stream 环境 // 本地测试,需要 flink-runtime-web 依赖 val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI() // val env = StreamExecutionEnvironment.getExecutionEnvironment // 失败重启,固定间隔,每隔3秒重启1次,总尝试重启10次 // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 3)) // 本地测试线程 1 env.setParallelism(1) // 事件处理的时间,由系统时间决定 // env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // 创建 streamTable 环境 val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, FlinkUtils.getSettings) // checkpoint 设置 val tableConfig = tableEnv.getConfig.getConfiguration // 开启checkpoint tableConfig.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE) // checkpoint的超时时间周期,1 分钟做一次checkpoint, 每次checkpoint 完成后 sink 才会执行 tableConfig.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(60)) // checkpoint的超时时间, 检查点一分钟内没有完成将被丢弃 tableConfig.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT, Duration.ofSeconds(60)) // checkpoint 最小间隔,两个检查点之间至少间隔 30 秒 tableConfig.set(ExecutionCheckpointingOptions.MIN_PAUSE_BETWEEN_CHECKPOINTS, Duration.ofSeconds(30)) // 同一时间只允许进行一个检查点 tableConfig.set(ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, Integer.valueOf(1)) // 手动cancel时是否保留checkpoint tableConfig.set(ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT, CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) // 手动cancel时是否保留checkpoint checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) // Catalog // val memoryCatalog = new GenericInMemoryCatalog("kafkaSourceTable", "memory") /** * mysql 源表 */ val mysqlSourceSql = """ |create table mysqlSourceTable ( | ID bigint, | NAME string, | DEVELOPER_ID bigint, | DEVELOPER_SHARE decimal(11,2), | STATE tinyint, | WEB_OPEN tinyint, | IS_MULTIPLE tinyint, | CP_GAME_NAME string, | SM_GAME_NAME string, | REMARK string, | CP_GAME_ID int, | UPDATE_TIME TIMESTAMP |) with ( | 'connector' = 'jdbc', | 'url' = 'jdbc:mysql://192.168.100.39:3306/game_platform?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&serverTimezone=UTC', | 'username' = 'root', | 'password' = '123456', | 'table-name' = 'games', | 'driver' = 'com.mysql.cj.jdbc.Driver', | 'scan.fetch-size' = '200' |) """.stripMargin /** * mysql sink */ val printSinkSql = """ |create table printSinkTable ( | ID bigint, | NAME string, | DEVELOPER_ID bigint, | DEVELOPER_SHARE decimal(11,2), | STATE tinyint, | WEB_OPEN tinyint, | IS_MULTIPLE tinyint, | CP_GAME_NAME string, | SM_GAME_NAME string, | REMARK string, | CP_GAME_ID int, | UPDATE_TIME TIMESTAMP |) with ( | 'connector' = 'print' |) """.stripMargin // val printSinkSql = // """ // |create table printSinkTable ( // | ID bigint, // | NAME string, // | DEVELOPER_ID bigint, // | DEVELOPER_SHARE decimal(11,2), // | STATE tinyint, // | WEB_OPEN tinyint, // | IS_MULTIPLE tinyint, // | CP_GAME_NAME string, // | SM_GAME_NAME string, // | REMARK string, // | CP_GAME_ID int, // | UPDATE_TIME TIMESTAMP // |) with ( // | 'connector' = 'jdbc', // | 'url' = 'jdbc:mysql://localhost:3306/test_flink?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&serverTimezone=UTC', // | 'username' = 'root', // | 'password' = '863863', // | 'table-name' = 'test-flink', // | 'driver' = 'com.mysql.cj.jdbc.Driver', // | 'sink.buffer-flush.interval' = '3s', // | 'sink.buffer-flush.max-rows' = '1', // | 'sink.max-retries' = '5' // |) val insertSql = """ |insert into printSinkTable |select * from mysqlSourceTable """.stripMargin tableEnv.executeSql(mysqlSourceSql) tableEnv.executeSql(printSinkSql) tableEnv.executeSql(insertSql) } }

依赖

<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId> <version>${flink.version}</version> <!-- <scope>provided</scope>--> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.16</version> <!-- <scope>provided</scope>--> </dependency>

 

最新回复(0)