Flink SQL: MySQL Dimension Table Join


Requirement: consume data from Kafka, clean it, enrich missing fields via a dimension-table join, and write the result back to Kafka.

import java.time.Duration

import com.sm.function.udf._
import com.sm.common.conf.PropManager
import com.sm.constants.Constants
import com.sm.utils.FlinkUtils
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.environment.{CheckpointConfig, ExecutionCheckpointingOptions}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.log4j.Level
import org.slf4j.LoggerFactory

/**
 * Flink SQL dimension-table join:
 * consume from Kafka, join a MySQL dimension table, write the result back to Kafka.
 *
 * create by LiuJinHe 2020/10/22
 */
object LoginEtlKafka2Kafka {
  private var logger: org.slf4j.Logger = _

  def main(args: Array[String]): Unit = {
    logger = LoggerFactory.getLogger(this.getClass.getSimpleName)
    org.apache.log4j.Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    org.apache.log4j.Logger.getLogger("org.apache").setLevel(Level.INFO)

    // Initialize the stream environment.
    // For local testing with the web UI, the flink-runtime-web dependency is required:
    // val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Single parallelism for local testing
    // env.setParallelism(1)

    // val params = ParameterTool.fromArgs(args)
    // env.getConfig.setGlobalJobParameters(params)

    // Restart strategy: fixed delay, at most 10 attempts, 3 seconds between attempts
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.seconds(3)))
    // Processing time: event handling is driven by the system clock
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    // Create the StreamTableEnvironment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, FlinkUtils.getSettings)

    // Checkpoint settings
    val tableConfig = tableEnv.getConfig.getConfiguration
    tableConfig.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
    // Checkpoint interval: one checkpoint per minute; the sink commits only after a checkpoint completes
    tableConfig.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(60))
    // Checkpoint timeout: a checkpoint that does not finish within one minute is discarded
    tableConfig.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT, Duration.ofSeconds(60))
    // Minimum pause between two checkpoints: at least 30 seconds
    tableConfig.set(ExecutionCheckpointingOptions.MIN_PAUSE_BETWEEN_CHECKPOINTS, Duration.ofSeconds(30))
    // Only one checkpoint may be in flight at a time
    tableConfig.set(ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, Integer.valueOf(1))
    // Retain the checkpoint when the job is cancelled manually
    tableConfig.set(ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT,
      CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // Minimum and maximum idle state retention time
    tableEnv.getConfig.setIdleStateRetentionTime(Time.hours(12), Time.hours(24))
    tableConfig.setString("table.exec.mini-batch.enabled", "true")
    tableConfig.setString("table.exec.mini-batch.allow-latency", "2s")
    tableConfig.setString("table.exec.mini-batch.size", "5000")
    tableConfig.setString("table.optimizer.distinct-agg.split.enabled", "true")

    // Local test values
    // val odsLoginTopic = "ods_login"
    // val bootstrapServers = "192.168.100.39:9092"
    // val groupId = "flink-test-group"
    // val startMode = "group-offsets"
    // val dwdSdkUserLogin = "dwd_sdk_user_login"

    /**
     * Load configuration
     */
    val odsUserLoginTopic = PropManager.getProp(Constants.ODS_USER_LOGIN_TOPIC)
    val dwdUserLoginTopic = PropManager.getProp(Constants.DWD_USER_LOGIN_TOPIC)
    val bootstrapServers = PropManager.getProp(Constants.BOOTSTRAP_SERVERS)
    val ConsumerGroupId = PropManager.getProp(Constants.CONSUMER_GROUP_ID)
    val ProducerGroupId = PropManager.getProp(Constants.PRODUCER_GROUP_ID)
    val startMode = PropManager.getProp(Constants.STARTUP_MODE)

    /**
     * Kafka source table
     */
    tableEnv.executeSql(s"drop table if exists $odsUserLoginTopic")
    val kafkaSourceDDL =
      s"""
         |create table $odsUserLoginTopic (
         |  _game_id int,
         |  _login_name string,
         |  _time bigint,
         |  _time_server bigint,
         |  _device_key string,
         |  _os string,
         |  _imei string,
         |  ts as to_timestamp(from_unixtime(_time / 1000, 'yyyy-MM-dd HH:mm:ss')),
         |  proc_time as proctime()
         |) with (
         |  'connector' = 'kafka',
         |  'topic' = '$odsUserLoginTopic',
         |  'properties.bootstrap.servers' = '$bootstrapServers',
         |  'properties.group.id' = '$ConsumerGroupId',
         |  'scan.startup.mode' = '$startMode',
         |  'format' = 'json',
         |  'json.fail-on-missing-field' = 'false',
         |  'json.ignore-parse-errors' = 'true'
         |)
      """.stripMargin
    tableEnv.executeSql(kafkaSourceDDL)

    /**
     * Kafka sink table (dwd)
     */
    tableEnv.executeSql(s"drop table if exists $dwdUserLoginTopic")
    val kafkaSinkDDL =
      s"""
         |create table $dwdUserLoginTopic (
         |  cp_game_id int,
         |  game_id int,
         |  login_account string,
         |  `time` string,
         |  time_server string,
         |  device_key string,
         |  os string,
         |  primary key (`time`, game_id, login_account) not enforced
         |) with (
         |  'connector' = 'kafka',
         |  'topic' = '$dwdUserLoginTopic',
         |  'properties.bootstrap.servers' = '$bootstrapServers',
         |  'properties.group.id' = '$ProducerGroupId',
         |  'sink.partitioner' = 'round-robin',
         |  'format' = 'json',
         |  'json.fail-on-missing-field' = 'false',
         |  'json.ignore-parse-errors' = 'true'
         |)
      """.stripMargin
    tableEnv.executeSql(kafkaSinkDDL)

    /**
     * MySQL dimension table (DIM)
     */
    val dimSourceDDL =
      """
        |create table dim_mysql_games (
        |  ID bigint,
        |  CP_GAME_ID int
        |) with (
        |  'connector' = 'jdbc',
        |  'url' = 'jdbc:mysql://cdh-slave04:3306/game_platform?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&serverTimezone=UTC',
        |  'username' = 'root',
        |  'password' = '123456',
        |  'table-name' = 'games',
        |  'driver' = 'com.mysql.cj.jdbc.Driver',
        |  'scan.fetch-size' = '200',
        |  'lookup.cache.max-rows' = '2000',
        |  'lookup.cache.ttl' = '300s'
        |)
      """.stripMargin
    tableEnv.executeSql(dimSourceDDL)

    /**
     * Register custom functions
     */
    tableEnv.createTemporarySystemFunction("ParseDeviceCode", classOf[ParseDeviceCode])
    tableEnv.createTemporarySystemFunction("ParseDeviceKey", classOf[ParseDeviceKey])
    tableEnv.createTemporarySystemFunction("ParseOsVersion", classOf[ParseOsVersion])
    tableEnv.createTemporarySystemFunction("ParseDeviceId", classOf[ParseDeviceId])
    tableEnv.createTemporarySystemFunction("ParseMd5DeviceId", classOf[ParseMd5DeviceId])

    /**
     * Temporal join and insert
     */
    val queryDDL =
      s"""
         |insert into $dwdUserLoginTopic
         |select
         |  dim.CP_GAME_ID as cp_game_id,
         |  _game_id as game_id,
         |  coalesce(_login_name, '') as login_account,
         |  from_unixtime(_time / 1000, 'yyyy-MM-dd HH:mm:ss') as `time`,
         |  from_unixtime(_time_server / 1000, 'yyyy-MM-dd HH:mm:ss') as time_server,
         |  ParseDeviceKey(_device_key, _os, _imei) as device_key,
         |  _os as os
         |from $odsUserLoginTopic as login
         |  left join dim_mysql_games for system_time as of login.proc_time as dim
         |    on dim.ID = login._game_id
      """.stripMargin
    tableEnv.executeSql(queryDDL)
  }
}
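
The helper FlinkUtils.getSettings (imported from com.sm.utils above) is not shown in the post. For Flink 1.11 with the Blink planner in streaming mode, a minimal sketch of what it might return looks like this; the method body here is an assumption, the real project code may differ:

import org.apache.flink.table.api.EnvironmentSettings

// Hypothetical sketch of the helper referenced by StreamTableEnvironment.create above.
object FlinkUtils {
  // Blink planner, streaming mode
  def getSettings: EnvironmentSettings =
    EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
}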

 

Configuration loading

By default, application.properties under resources is loaded.

import com.typesafe.config.{Config, ConfigFactory}

/**
 * create by LiuJinHe 2020/3/11
 */
object PropManager {
  final val config: Config = ConfigFactory.load

  def getProp(key: String): String = config.getString(key).trim
}
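
The Constants object referenced by the job is not shown either. A hedged sketch of what it could look like; every key name below is an assumption, and each value would map to an entry with that name in application.properties:

// Hypothetical sketch of com.sm.constants.Constants; real key names are not shown in the post.
object Constants {
  val ODS_USER_LOGIN_TOPIC = "ods.user.login.topic"
  val DWD_USER_LOGIN_TOPIC = "dwd.user.login.topic"
  val BOOTSTRAP_SERVERS    = "bootstrap.servers"
  val CONSUMER_GROUP_ID    = "consumer.group.id"
  val PRODUCER_GROUP_ID    = "producer.group.id"
  val STARTUP_MODE         = "scan.startup.mode"
}

// Usage: PropManager.getProp(Constants.BOOTSTRAP_SERVERS) reads the value of the
// "bootstrap.servers" entry from application.properties on the classpath.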

 

Custom UDF

import com.mysql.cj.util.StringUtils
import com.sm.common.utils.MD5Utils
import org.apache.flink.table.functions.ScalarFunction

/**
 * Custom UDF that parses the device key.
 *
 * create by LiuJinHe 2020/9/22
 */
class ParseDeviceKey extends ScalarFunction {
  def eval(deviceKey: String, os: String, imei: String): String = {
    if (!StringUtils.isNullOrEmpty(deviceKey)) {
      deviceKey.toLowerCase
    } else if (os == "1") {
      ""
    } else {
      MD5Utils.MD5Encode(imei).toLowerCase
    }
  }
}
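
A quick local sanity check of the three branches is sketched below; the input values are made up, and the output of the last case depends on MD5Utils from the com.sm.common module:

// Exercises ParseDeviceKey outside Flink to verify the branch logic.
object ParseDeviceKeyCheck {
  def main(args: Array[String]): Unit = {
    val udf = new ParseDeviceKey
    println(udf.eval("ABC-DEF-123", "0", "860000000000001")) // non-empty key: returned lowercased
    println(udf.eval(null, "1", "860000000000001"))          // empty key and os == "1": empty string
    println(udf.eval("", "0", "860000000000001"))            // otherwise: lowercase MD5 of the imei
  }
}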

pom

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <scala.version>2.11.12</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <flink.version>1.11.2</flink.version>
    <hive.version>2.1.1-cdh6.2.0</hive.version>
    <hadoop.version>3.0.0-cdh6.2.0</hadoop.version>
</properties>

<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        <releases>
            <enabled>true</enabled>
        </releases>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>com.sm.common</groupId>
        <artifactId>common</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>

    <!-- Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>1.1.0</version>
        <scope>provided</scope>
    </dependency>

    <!-- Other dependencies -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.16</version>
        <!-- <scope>provided</scope> -->
    </dependency>
</dependencies>

<build>
    <plugins>
        <!-- Compiles the Scala sources to class files -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>4.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <!-- Bound to Maven's compile phase -->
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

 

To submit to the cluster, place the following jars under $FLINK_HOME/lib and restart the cluster; when packaging the job, mark all of these dependencies as <scope>provided</scope>.

flink-connector-jdbc_2.11-1.11.2.jar
flink-sql-connector-kafka_2.11-1.11.2.jar
flink-sql-connector-mysql-cdc-1.1.0.jar   # MySQL CDC, bundles mysql-connector-java

 
