需求:日活明细分析,需要保存日活明细数据。通过对登录数据去重得到日活;由于数据乱序到达,同一用户当天最早的客户端时间可能会随后到的数据而发生变化。
HBase SQL Connector
1. Flink 窗口排序去重,写入HBase
import java.time.Duration

import com.sm.common.conf.PropManager
import com.sm.constants.Constants
import com.sm.utils.FlinkUtils
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.environment.{CheckpointConfig, ExecutionCheckpointingOptions}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.log4j.Level
import org.slf4j.LoggerFactory

/**
 * Flink SQL job: daily-active-user (DAU) detail analysis.
 *
 * Reads login events from a Kafka source table, keeps the record with the
 * earliest client timestamp per (date, account, package) using a
 * `row_number()` ranking, and upserts the result into an HBase table.
 *
 * create by LiuJinHe 2020/10/12
 */
object UserActiveDailyAnalysis {
  private val logger: org.slf4j.Logger = LoggerFactory.getLogger(this.getClass.getSimpleName)

  /**
   * Job entry point: configures the environment, registers the Kafka source
   * and HBase sink tables, then submits the deduplicating INSERT statement.
   *
   * @param args command-line arguments (currently unused)
   */
  def main(args: Array[String]): Unit = {
    org.apache.log4j.Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    org.apache.log4j.Logger.getLogger("org.apache").setLevel(Level.INFO)

    // Initialise the stream environment.
    // For local testing (requires the flink-runtime-web dependency):
    //    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Local testing with a single thread:
    //    env.setParallelism(1)

    //    val params = ParameterTool.fromArgs(args)
    //    env.getConfig.setGlobalJobParameters(params)

    // Fixed-delay restart: up to 10 attempts, 3 seconds apart.
    // (The (Int, Long) overload takes the delay in milliseconds, so the
    // previous `fixedDelayRestart(1, 3)` meant one attempt after 3 ms.)
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.seconds(3)))

    // Use processing time (system clock) as the stream time characteristic.
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    // Create the stream table environment.
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, FlinkUtils.getSettings)

    // Checkpoint settings.
    val tableConfig = tableEnv.getConfig.getConfiguration
    tableConfig.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
    // Trigger a checkpoint every 60 seconds.
    tableConfig.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(60))
    // A checkpoint that has not completed within 60 seconds is discarded.
    tableConfig.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT, Duration.ofSeconds(60))
    // At least 30 seconds between the end of one checkpoint and the start of the next.
    tableConfig.set(ExecutionCheckpointingOptions.MIN_PAUSE_BETWEEN_CHECKPOINTS, Duration.ofSeconds(30))
    // Only one checkpoint may be in progress at a time.
    tableConfig.set(ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, Integer.valueOf(1))
    // Keep the externalized checkpoint when the job is cancelled manually.
    tableConfig.set(ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT,
      CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    // Minimum / maximum idle state retention, i.e. how long state of an idle key is kept.
    tableEnv.getConfig.setIdleStateRetentionTime(Time.hours(12), Time.hours(24))

    //    tableConfig.setString("table.exec.mini-batch.enabled", "true")
    //    tableConfig.setString("table.exec.mini-batch.allow-latency", "2s")
    //    tableConfig.setString("table.exec.mini-batch.size", "5000")
    //    tableConfig.setString("table.optimizer.distinct-agg.split.enabled", "true")

    // Load the Kafka configuration.
    // NOTE: the capitalised names ConsumerGroupId / ProducerGroupId are kept as-is
    // because the follow-up snippets in this document interpolate them by name.
    val dwdUserLoginTopic = PropManager.getProp(Constants.DWD_USER_LOGIN_TOPIC)
    val dwsUserDailyActive = PropManager.getProp(Constants.DWS_DAILY_USER_ACTIVE_TOPIC)
    val bootstrapServers = PropManager.getProp(Constants.BOOTSTRAP_SERVERS)
    val ConsumerGroupId = PropManager.getProp(Constants.CONSUMER_GROUP_ID)
    val ProducerGroupId = PropManager.getProp(Constants.PRODUCER_GROUP_ID)
    val startMode = PropManager.getProp(Constants.STARTUP_MODE)

    // Kafka source table.
    // Drop the table that is actually created below (it is named after the topic).
    tableEnv.executeSql(s"drop table if exists $dwdUserLoginTopic")

    // Column notes (kept outside the SQL text; `#` is not a SQL comment marker):
    //   `time` - client time as a string, format yyyy-MM-dd HH:mm:ss
    //   ts     - client timestamp parsed from `time`
    //   `date` - processing-time calendar day, used as the DAU day
    // If `time` were an epoch-millis long, ts could instead be declared as:
    //   ts as to_timestamp(from_unixtime(`time` / 1000, 'yyyy-MM-dd HH:mm:ss'))
    val kafkaSourceDDL =
      s"""
         |create table $dwdUserLoginTopic (
         | cp_game_id int,
         | game_id int,
         | package_id int,
         | core_account string,
         | `time` string,
         | time_server string,
         | device_id string,
         | md5_device_id string,
         | device_code string,
         | ts as to_timestamp(`time`),
         | local_time as localtimestamp,
         | `date` as date_format(localtimestamp, 'yyyy-MM-dd')
         |) with (
         | 'connector' = 'kafka',
         | 'topic' = '$dwdUserLoginTopic',
         | 'properties.bootstrap.servers' = '$bootstrapServers',
         | 'properties.group.id' = '$ConsumerGroupId',
         | 'scan.startup.mode' = '$startMode',
         | 'format' = 'json',
         | 'json.fail-on-missing-field' = 'false',
         | 'json.ignore-parse-errors' = 'true'
         |)
      """.stripMargin
    tableEnv.executeSql(kafkaSourceDDL)

    // Load the HBase table configuration.
    val zookeeperQuorum = PropManager.getProp(Constants.HBASE_ZOOKEEPER_QUORUM)
    val zookeeperZnodeParent = PropManager.getProp(Constants.HBASE_ZOOKEEPER_ZNODE_PARENT)
    val sinkBufferFlushMaxSize = PropManager.getProp(Constants.HBASE_SINK_BUFFER_FLUSH_MAX_SIZE)
    val sinkBufferFlushMaxRows = PropManager.getProp(Constants.HBASE_SINK_BUFFER_FLUSH_MAX_ROWS)
    val sinkBufferFlushInterval = PropManager.getProp(Constants.HBASE_SINK_BUFFER_FLUSH_INTERVAL)

    // HBase sink table: one column family `cf` holding every detail column as a string.
    tableEnv.executeSql(s"drop table if exists $dwsUserDailyActive")
    val hbaseSinkDDL =
      s"""
         |create table $dwsUserDailyActive (
         | rowkey string,
         | cf Row(core_account string, cp_game_id string, game_id string, package_id string, `time` string,
         |    time_server string, device_id string, md5_device_id string, device_code string, `date` string),
         | primary key (rowkey) not enforced
         |) with (
         | 'connector' = 'hbase-1.4',
         | 'table-name' = '$dwsUserDailyActive',
         | 'zookeeper.quorum' = '$zookeeperQuorum',
         | 'zookeeper.znode.parent' = '$zookeeperZnodeParent',
         | 'sink.buffer-flush.max-size' = '$sinkBufferFlushMaxSize',
         | 'sink.buffer-flush.max-rows' = '$sinkBufferFlushMaxRows',
         | 'sink.buffer-flush.interval' = '$sinkBufferFlushInterval'
         |)
      """.stripMargin
    tableEnv.executeSql(hbaseSinkDDL)

    // 1. Deduplicate with row_number() and write the result to HBase.
    // Rows are ranked by the client timestamp ts and only the first one is kept.
    // Because events arrive out of order the "first" row can change later, so the
    // result is an updating stream and cannot be written straight to Kafka.
    // `date` is part of the partition key so that every day gets its own first
    // row, matching the (date, package_id, core_account) rowkey; without it a
    // returning user whose state has not expired yet would produce no row for
    // the new day.
    // The casts are done in the inner query so that ROW(...) only receives
    // plain column references.
    val insertSql =
      s"""
         |insert into $dwsUserDailyActive
         |select
         | concat(`date`, package_id, core_account) as rowkey,
         | Row(core_account,
         |  cp_game_id,
         |  game_id,
         |  package_id,
         |  `time`,
         |  time_server,
         |  device_id,
         |  md5_device_id,
         |  device_code,
         |  `date`) as cf
         |from (
         |select
         |  core_account,
         |  cast(cp_game_id as string) as cp_game_id,
         |  cast(game_id as string) as game_id,
         |  cast(package_id as string) as package_id,
         |  `time`,
         |  time_server,
         |  device_id,
         |  md5_device_id,
         |  device_code,
         |  `date`,
         |  row_number() over(partition by `date`, core_account, package_id order by ts) as row_num
         | from $dwdUserLoginTopic
         | where date_format(`time`, 'yyyy-MM-dd') = `date`
         |) where row_num = 1
      """.stripMargin
    tableEnv.executeSql(insertSql)
  }
}
2. 关联HBase维表去重,写入HBase
/**
 * 2. Deduplicate with a dimension-table (lookup) join.
 *
 * The already-seen users are stored in HBase and probed by rowkey; a login
 * record is kept only when no matching row exists yet. The result could be
 * written to HBase directly as in the first approach; the extra temporary
 * view is only here for demonstration.
 *
 * A lookup join must be declared `FOR SYSTEM_TIME AS OF` a processing-time
 * attribute of the probe side, so the source table has to declare
 * `proc_time as proctime()`; the computed timestamp column `ts` is not a
 * time attribute and is rejected by the planner.
 *
 * NOTE(review): the join probes `dimHbaseNewUser` while the insert below
 * writes to `$hbaseDwsUserDailyActive` - confirm that both refer to the same
 * HBase table, otherwise rows written here are never seen by the join.
 */
val queryDDL =
  s"""
     |create temporary view resultTmpTable as
     |select
     | core_account,
     | cp_game_id,
     | game_id,
     | package_id,
     | login.`time`,
     | time_server,
     | device_id,
     | md5_device_id,
     | device_code,
     | `date`
     |from $dwdUserLoginTopic as login
     | left join dimHbaseNewUser for system_time as of login.proc_time as dua
     | on concat(login.`date`, cast(login.package_id as string), login.core_account) = dua.rowkey
     | where dua.rowkey is null
  """.stripMargin
tableEnv.executeSql(queryDDL)

// Write to HBase.
// The rowkey is built from the record's own `date` column (as in the first
// approach) so that it always agrees with cf.`date` and with the join key
// above, instead of re-evaluating localtimestamp at a later point in time.
// The casts are done in the inner query so that ROW(...) only receives plain
// column references.
val insertSQL =
  s"""
     |insert into $hbaseDwsUserDailyActive
     |select
     | concat(`date`, package_id, core_account) as rowkey,
     | Row(
     |  core_account,
     |  cp_game_id,
     |  game_id,
     |  package_id,
     |  `time`,
     |  time_server,
     |  device_id,
     |  md5_device_id,
     |  device_code,
     |  `date`) as cf
     |from (
     |select
     |  core_account,
     |  cast(cp_game_id as string) as cp_game_id,
     |  cast(game_id as string) as game_id,
     |  cast(package_id as string) as package_id,
     |  `time`,
     |  time_server,
     |  device_id,
     |  md5_device_id,
     |  device_code,
     |  `date`
     | from resultTmpTable
     | )
  """.stripMargin
tableEnv.executeSql(insertSQL)
3. 关联HBase去重,写入Kafka
Kafka 只支持 append-only 写入,已经发送的数据无法再更新,所以这里按处理时间 proc_time 关联,只取第一条接收到的数据。
// Kafka sink table (append-only): one JSON record per first-seen login.
val kafkaSinkDDL =
  s"""
     |create table $kafkaDwsUserDailyActive (
     | core_account string,
     | cp_game_id int,
     | game_id int,
     | package_id int,
     | `time` string,
     | time_server string,
     | device_id string,
     | md5_device_id string,
     | device_code string,
     | `date` string
     |) with (
     | 'connector' = 'kafka',
     | 'topic' = '$kafkaDwsUserDailyActive',
     | 'properties.bootstrap.servers' = '$bootstrapServers',
     | 'properties.group.id' = '$ProducerGroupId',
     | 'sink.partitioner' = 'round-robin',
     | 'format' = 'json',
     | 'json.fail-on-missing-field' = 'false',
     | 'json.ignore-parse-errors' = 'true'
     |)
  """.stripMargin
tableEnv.executeSql(kafkaSinkDDL)

// Keep only logins whose rowkey is not yet present in the HBase dimension
// table and append them to Kafka. The lookup join is driven by the
// processing-time attribute proc_time, so the first record received wins.
val queryDDL =
  s"""
     |insert into $kafkaDwsUserDailyActive
     |select
     | core_account,
     | cp_game_id,
     | game_id,
     | package_id,
     | login.`time`,
     | time_server,
     | device_id,
     | md5_device_id,
     | device_code,
     | `date`
     |from $dwdUserLoginTopic as login
     | left join dimHbaseNewUser for system_time as of login.proc_time as dua
     | on concat(date_format(localtimestamp, 'yyyy-MM-dd'), cast(package_id as string), core_account) = dua.rowkey
     | where dua.rowkey is null
  """.stripMargin
// An INSERT statement has to be submitted with executeSql; sqlQuery only
// accepts queries (SELECT / UNION / VALUES ...) and rejects it.
tableEnv.executeSql(queryDDL)
}
4. 变化数据写入Kafka
如果按客户端时间 ts 取第一条,结果会随乱序数据而变化,无法直接写入 Kafka。可以将 table 转成 DataStream 后写入,或者再转回 table,用 Table API 或 SQL 方式写入。
但已经发送到 Kafka 的数据不会变化(无法撤回或更新),因此可能会造成数据重复。
// Kafka sink table (append-only): one JSON record per first-seen login.
val kafkaSinkDDL =
  s"""
     |create table $kafkaDwsUserDailyActive (
     | core_account string,
     | cp_game_id int,
     | game_id int,
     | package_id int,
     | `time` string,
     | time_server string,
     | device_id string,
     | md5_device_id string,
     | device_code string,
     | `date` string
     |) with (
     | 'connector' = 'kafka',
     | 'topic' = '$kafkaDwsUserDailyActive',
     | 'properties.bootstrap.servers' = '$bootstrapServers',
     | 'properties.group.id' = '$ProducerGroupId',
     | 'sink.partitioner' = 'round-robin',
     | 'format' = 'json',
     | 'json.fail-on-missing-field' = 'false',
     | 'json.ignore-parse-errors' = 'true'
     |)
  """.stripMargin
tableEnv.executeSql(kafkaSinkDDL)

// Logins whose rowkey is not yet present in the HBase dimension table.
// A lookup join must be declared FOR SYSTEM_TIME AS OF a processing-time
// attribute of the probe side (proc_time, declared as proctime() on the
// source table); the computed timestamp column ts is not a time attribute.
val queryDDL =
  s"""
     |select
     | core_account,
     | cp_game_id,
     | game_id,
     | package_id,
     | login.`time`,
     | time_server,
     | device_id,
     | md5_device_id,
     | device_code,
     | `date`
     |from $dwdUserLoginTopic as login
     | left join dimHbaseNewUser for system_time as of login.proc_time as dua
     | on concat(date_format(localtimestamp, 'yyyy-MM-dd'), cast(package_id as string), core_account) = dua.rowkey
     | where dua.rowkey is null
  """.stripMargin
val resultTable = tableEnv.sqlQuery(queryDDL)

// JSON mapper used by objectParse (handy for debugging output).
mapper = new ObjectMapper()

// Convert to a retract stream and keep only the accumulate (flag = true)
// messages: Kafka is append-only, so a retraction must not be emitted as if
// it were another record.
// NOTE(review): assumes SdkUserActive declares the ten selected columns in the
// same order and with the same types as the Kafka sink table - confirm.
val userActiveDS = tableEnv
  .toRetractStream[SdkUserActive](resultTable)
  .filter(_._1)
  .map(_._2)

// The typed stream (not a JSON string stream) is converted back to a table so
// that its columns line up with the ten-column sink; the sink's 'json' format
// does the serialisation. A DataStream[String] would become a single-column
// table and be rejected by the sink's schema validation.

// 1. Table API
val table = tableEnv.fromDataStream(userActiveDS)
table.executeInsert(s"$kafkaDwsUserDailyActive")

// 2. SQL - an alternative to (1). Enable only one of the two; running both
// submits two insert jobs and writes every record to Kafka twice.
//    tableEnv.createTemporaryView("resultTable", userActiveDS)
//    val kafkaInsertSql =
//      s"""
//         |insert into $kafkaDwsUserDailyActive
//         |select * from resultTable
//      """.stripMargin
//    tableEnv.executeSql(kafkaInsertSql)

//    userActiveDS.map(objectParse(_)).print()
//    env.execute("dua")
}

/**
 * Serialises an object to its JSON string form using the shared `mapper`.
 *
 * @param obj the object to serialise
 * @return the JSON text produced by `mapper.writeValueAsString`
 */
def objectParse(obj: Object): String = {
  mapper.writeValueAsString(obj)
}
HBase配置
# HBase connection settings (ZooKeeper quorum and parent znode).
hbase.zookeeper.quorum = cdh-master:2181,cdh-slave01:2181,cdh-slave02:2181,cdh-slave03:2181,cdh-slave04:2181
hbase.zookeeper.znode.parent = /hbase
# Sink buffering: flush when the buffered size, the buffered row count or the
# interval is reached. These values feed the 'sink.buffer-flush.*' options of
# the HBase sink table.
hbase.sink.buffer-flush.max-size = 30000
hbase.sink.buffer-flush.max-rows = 1000
hbase.sink.buffer-flush.interval = 2s
pom
<!-- Kafka SQL connector (source and sink tables with 'connector' = 'kafka'). -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <!-- <scope>provided</scope>-->
</dependency>
<!-- HBase connector (tables with 'connector' = 'hbase-1.4'). The Scala suffix
     uses the same ${scala.binary.version} property as the Kafka connector so
     both artifacts are always built for the same Scala version. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hbase_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <!-- <scope>provided</scope>-->
</dependency>
