基于 CDH 平台(Hive 2.1.1-CDH6.2.0),使用 Flink SQL 将 Kafka 数据写入 Hive 的简单示例。
Hive Streaming
import java.time.Duration

import com.sm.common.conf.PropManager
import com.sm.constants.Constants
import com.sm.utils.FlinkUtils
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.environment.{CheckpointConfig, ExecutionCheckpointingOptions}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.SqlDialect
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.log4j.Level
import org.slf4j.LoggerFactory

/**
 * Flink SQL job that reads JSON records from Kafka and streams them into a
 * partitioned Hive table.
 *
 * The Kafka source table and the Hive sink table are both registered in a
 * Hive catalog; the catalog name, the databases and the Hive conf directory
 * are read through `PropManager`.
 *
 * create by LiuJinHe 2020/10/22
 */
object FlinkSqlKafka2Hive {
  private val logger: org.slf4j.Logger = LoggerFactory.getLogger(this.getClass.getSimpleName)

  /**
   * Builds the local stream/table environments, (re)creates the Kafka source
   * table, creates the Hive sink table if it is missing, and submits the
   * continuous `insert into` statement.
   *
   * @param args command-line arguments (currently unused)
   */
  def main(args: Array[String]): Unit = {
    org.apache.log4j.Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    org.apache.log4j.Logger.getLogger("org.apache").setLevel(Level.INFO)

    // Initialise the stream environment.
    // Local testing; requires the flink-runtime-web dependency.
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    // val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Read parameters
    // val params = ParameterTool.fromArgs(args)
    // env.getConfig.setGlobalJobParameters(params)

    // Restart on failure: fixed delay, 10 attempts
    // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 3))

    // Single thread for local testing
    env.setParallelism(1)

    // Processing time: event handling time is taken from the system clock
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    // Create the stream table environment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, FlinkUtils.getSettings)

    // Checkpoint settings
    val tableConfig = tableEnv.getConfig.getConfiguration
    tableConfig.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
    // Checkpoint interval: one checkpoint every 30 seconds
    tableConfig.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(30))
    // Checkpoint timeout: a checkpoint not completed within one minute is discarded
    // tableConfig.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT, Duration.ofSeconds(60))
    // Minimum pause: at least 30 seconds between two checkpoints
    // tableConfig.set(ExecutionCheckpointingOptions.MIN_PAUSE_BETWEEN_CHECKPOINTS, Duration.ofSeconds(30))
    // Only one checkpoint may be in progress at a time
    // tableConfig.set(ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, Integer.valueOf(1))
    // Whether to retain the checkpoint when the job is cancelled manually
    // tableConfig.set(ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT,
    //   CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    // Minimum and maximum idle state retention time
    // tableEnv.getConfig.setIdleStateRetentionTime(Time.hours(12), Time.hours(24))

    // Load configuration
    val catalogName = PropManager.getProp(Constants.CATALOG_NAME)
    val database = PropManager.getProp(Constants.DEFAULT_DATABASE)
    val schemaDataBase = PropManager.getProp(Constants.SCHEMA_DATABASE)

    // Build and register the Hive catalog
    val hiveCatalog = new HiveCatalog(
      catalogName,
      database,
      PropManager.getProp(Constants.HIVE_CONF_DIR)
    )
    tableEnv.registerCatalog(catalogName, hiveCatalog)
    tableEnv.useCatalog(catalogName)

    // The Kafka source is declared with the DEFAULT dialect
    tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
    tableEnv.useDatabase(schemaDataBase)

    // Drop a previously registered Kafka source table first. The name must
    // match the one used in the DDL below, otherwise the `create table`
    // fails on every run after the first.
    tableEnv.executeSql("drop table if exists kafka_source_Table")

    // Kafka source table
    val kafkaSourceDDL =
      """
        |create table kafka_source_Table (
        |  user_name string,
        |  user_id bigint,
        |  `time` bigint,
        |  ts as to_timestamp(from_unixtime(`time` / 1000, 'yyyy-MM-dd HH:mm:ss')),
        |  rt as proctime()
        |) with (
        |  'connector' = 'kafka',
        |  'topic' = 'flink_test',
        |  'properties.bootstrap.servers' = '192.168.100.39:9092',
        |  'properties.group.id' = 'flink-test-group',
        |  'format' = 'json',
        |  'scan.startup.mode' = 'group-offsets'
        |)
      """.stripMargin
    tableEnv.executeSql(kafkaSourceDDL)

    // The Hive sink is declared with the HIVE dialect
    tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
    tableEnv.useDatabase(database)

    // Dropping the table is not needed when the Hive table already exists
    // tableEnv.executeSql("drop table if exists test_hive_table")

    // Hive sink table.
    // Data is really written to Hive (one file is produced) only after each
    // checkpoint completes.
    // The partition column `date` is declared only in `partitioned by`:
    // Hive rejects a column that also appears in the regular column list.
    // `if not exists` keeps the job restartable against an existing table.
    val hiveSinkDDL =
      """
        |create table if not exists hive_sink_table (
        |  user_name string,
        |  user_id bigint,
        |  `time` bigint
        |)
        |partitioned by (`date` string)
        |row format delimited fields terminated by '\t'
        |stored as orc
        |location 'hdfs://BigdataCluster/user/hive/warehouse/test_data.db/test/test_hive_table'
        |tblproperties (
        |  'orc.compress'='SNAPPY',
        |  'partition.time-extractor.timestamp-pattern' = '$date 00:00:00',
        |  'sink.partition-commit.trigger' = 'process-time',
        |  'sink.partition-commit.policy.kind' = 'metastore,success-file'
        |)
      """.stripMargin
    tableEnv.executeSql(hiveSinkDDL)
    // 'sink.partition-commit.delay' = '10 s',

    // Write the data. The source table is qualified with the configured
    // schema database, i.e. the database it was created in above.
    val insertSql =
      s"""
         |insert into hive_sink_table
         |select
         |  user_name,
         |  user_id,
         |  `time`,
         |  date_format(ts, 'yyyy-MM-dd') as `date`
         |from $schemaDataBase.kafka_source_Table
      """.stripMargin
    logger.info("Submitting insert from {}.kafka_source_Table into hive_sink_table", schemaDataBase)
    tableEnv.executeSql(insertSql)

    // Print the input for testing: 1. use connector=print; 2. convert to a DataStream and print
    // val querySql =
    //   """
    //     |select
    //     |  user_name,
    //     |  user_id,
    //     |  date_format(ts, 'yyyy-MM-dd') as `date`,
    //     |  date_format(ts, 'HH'),
    //     |  date_format(ts, 'mm')
    //     |from kafkaSourceTable
    //   """.stripMargin
    // val table = tableEnv.sqlQuery(querySql)
    // table.printSchema()
    //
    // val resultDStream = tableEnv.toAppendStream[Row](table)
    // resultDStream.print()

    // env.execute("flink sql kafka 2 hive")
  }
}
pom.xml 依赖配置
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Flink SQL Kafka-to-Hive test job (Flink 1.11.2, Scala 2.11, CDH 6.2.0). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- NOTE(review): the groupId repeats "com.sm" - confirm this is intended. -->
    <groupId>com.sm.com.sm.job</groupId>
    <artifactId>realtime-flink-platform-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <flink.version>1.11.2</flink.version>
        <hive.version>2.1.1-cdh6.2.0</hive.version>
        <hadoop.version>3.0.0-cdh6.2.0</hadoop.version>
    </properties>

    <!-- The CDH builds of Hive and Hadoop are resolved from the Cloudera repository. -->
    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>com.sm.common</groupId>
            <artifactId>common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <!-- <scope>provided</scope>-->
        </dependency>

        <!-- Flink Dependency -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-orc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-hive-2.2.0_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope>-->
        </dependency>

        <!-- Hive Dependency -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-serde</artifactId>
            <version>${hive.version}</version>
            <!-- <scope>provided</scope>-->
            <exclusions>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-slf4j-impl</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hive</groupId>
                    <artifactId>hive-common</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hive</groupId>
                    <artifactId>hive-service-rpc</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.parquet</groupId>
                    <artifactId>parquet-hadoop-bundle</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
            <!-- <scope>provided</scope>-->
            <exclusions>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-slf4j-impl</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hive</groupId>
                    <artifactId>hive-llap-tez</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- Hadoop Dependency -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <!-- <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- This plugin compiles the Scala sources into class files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <!-- Bound to Maven's compile phase -->
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Builds a jar-with-dependencies during the package phase -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
提交到集群运行时,需要将以下 jar 包放入 $FLINK_HOME/lib 目录:
flink-sql-connector-hive-2.2.0_2.11-1.11.2.jar
