Waterdrop: Exporting HDFS Data to ClickHouse (text, csv, json)


First, create a table in Hive (this is just a convenient way to generate HDFS files for the demo; in practice a Hive table would be exported by integrating Spark and writing the SQL directly):

CREATE TABLE test.hdfs2ch2 (
  id          int,
  name        string,
  create_time timestamp
);

insert into hdfs2ch2 values(1, 'zhangsan', '2020-01-01 01:01:01.000001');
insert into hdfs2ch2 values(2, 'lisi', '2020-01-01 01:01:01.000002');
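Hive stores this table as plain text files under its warehouse directory, which is exactly the path the Waterdrop input reads later. A quick way to eyeball what landed on HDFS (not part of the original post; the path is taken from the Waterdrop config below, adjust it to your environment):

hdfs dfs -ls /user/hive/warehouse/test.db/hdfs2ch2
# Hive's default text-table field delimiter is \001 (Ctrl-A), which is why
# the split filter below uses delimiter = "\\001"; cat -A makes it visible as ^A
hdfs dfs -cat /user/hive/warehouse/test.db/hdfs2ch2/* | cat -A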

The reason for using values like '2020-01-01 01:01:01.000002' is to demonstrate this somewhat unusual type: ClickHouse's DateTime only has second precision, so the fractional seconds have to be dropped during conversion, which the date filter below takes care of. The ClickHouse table:

CREATE TABLE mydatabase.hdfs2ch2 (
  `id`          Int64,
  `name`        String,
  `create_time` DateTime
) ENGINE = MergeTree()
ORDER BY id
SETTINGS index_granularity = 8192;

Here is the Waterdrop script:

spark {
  # application name
  spark.app.name = "Waterdrop"
  # number of executors (increase for larger data volumes)
  spark.executor.instances = 1
  # cores per executor (parallelism; for large volumes raise it up to about half the server's cores, but try not to impact ClickHouse)
  spark.executor.cores = 1
  # memory per executor (must not be below 512m)
  spark.executor.memory = "1g"
}

input {
  hdfs {
    result_table_name = "test_source"
    # HDFS path of the table created in Hive
    path = "hdfs://node01:8020/user/hive/warehouse/test.db/hdfs2ch2"
    format = "text"
  }
}

filter {
  split {
    # column names assigned to the fields after splitting on the delimiter
    fields = ["id", "name", "create_time"]
    # Hive's field delimiter; without it the line cannot be split
    delimiter = "\\001"
  }
  convert {
    # right after splitting every field is a string; without conversion the load fails
    # convertible types: string, integer, long, float, double and boolean
    source_field = "id"
    new_type = "long"
  }
  date {
    # source field to convert
    source_field = "create_time"
    # field name after conversion (must be specified)
    target_field = "create_time"
    # capital S denotes the fractional-second part; if the pattern is wrong the
    # conversion fails and the current time is written instead
    source_time_format = "yyyy-MM-dd HH:mm:ss.SSSSSS"
    target_time_format = "yyyy-MM-dd HH:mm:ss"
  }
}

output {
  # print to the console
  stdout {
    # limit the output to two rows
    limit = 2
  }
  clickhouse {
    host = "node01:8123"
    clickhouse.socket_timeout = 50000
    database = "mydatabase"
    table = "hdfs2ch2"
    fields = ["id", "name", "create_time"]
    username = ""
    password = ""
    bulk_size = 20000
  }
}

Run it:

./bin/start-waterdrop.sh --master yarn --deploy-mode client --config ./config/hdfs-clickhouse2.conf

Check the data:

node01.hadoop.com :) select * from hdfs2ch2;

SELECT * FROM hdfs2ch2

┌─id─┬─name─────┬─────────create_time─┐
│  1 │ zhangsan │ 2020-01-01 01:01:01 │
└────┴──────────┴─────────────────────┘
┌─id─┬─name─┬─────────create_time─┐
│  2 │ lisi │ 2020-01-01 01:01:01 │
└────┴──────┴─────────────────────┘

2 rows in set. Elapsed: 0.009 sec.

CSV: If the file is in CSV format and the header row is not the field names, import it exactly as in the text example above; the only change is setting delimiter = "," in the split filter, everything else stays the same (see the sketch just below).
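A minimal sketch of that headerless-CSV variant (only the split filter is shown; the field names here are illustrative, and the rest of the config is identical to the text example above):

split {
  # assign a name to each column produced by splitting on the comma
  fields = ["id", "name", "age"]
  # CSV field separator instead of Hive's \001
  delimiter = ","
}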

If the header row does contain the field names, declare it in the input section instead:

input {
  hdfs {
    result_table_name = "test_source"
    path = "hdfs://node01:8020/user/hive/warehouse/test.db/hdfs2ch3"
    format = "csv"
    # the header row holds the field names
    options.header = "true"
  }
}

With the header in place, you no longer need the split filter to cut the line and assign field names. Every field is still read as a string, though, so any field whose type differs from the ClickHouse column still has to be converted. A complete CSV example:

xxx.csv:

id,name,age
1,zhangsan,18

spark {
  # application name
  spark.app.name = "Waterdrop"
  # number of executors (increase for larger data volumes)
  spark.executor.instances = 1
  # cores per executor (parallelism; for large volumes raise it up to about half the server's cores, but try not to impact ClickHouse)
  spark.executor.cores = 1
  # memory per executor (must not be below 512m)
  spark.executor.memory = "1g"
}

input {
  hdfs {
    result_table_name = "test_source"
    path = "hdfs://node01:8020/user/hive/warehouse/test.db/hdfs2ch3"
    format = "csv"
    options.header = "true"
  }
}

filter {
  convert {
    source_field = "id"
    new_type = "integer"
  }
  convert {
    source_field = "age"
    new_type = "integer"
  }
  #date {
  #  source_field = "create_time"
  #  target_field = "create_time"
  #  source_time_format = "yyyy-MM-dd HH:mm:ss.SSSSSS"
  #  target_time_format = "yyyy-MM-dd HH:mm:ss"
  #}
}

output {
  stdout {
    limit = 2
  }
  clickhouse {
    host = "node01:8123"
    clickhouse.socket_timeout = 50000
    database = "mydatabase"
    table = "hdfs2ch3"
    fields = ["id", "name", "age"]
    username = ""
    password = ""
    bulk_size = 20000
  }
}
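The original post does not show the DDL for the target table mydatabase.hdfs2ch3. A minimal sketch, with column types assumed to match the convert filters above, might look like this:

CREATE TABLE mydatabase.hdfs2ch3 (
  `id`   Int32,
  `name` String,
  `age`  Int32
) ENGINE = MergeTree()
ORDER BY id
SETTINGS index_granularity = 8192;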

JSON: If the data is in JSON format, it works just like CSV, except that JSON already carries its own structure, and numeric fields are read as long.

{"id":1,"name":"zhangsan","age":18}

spark {
  # application name
  spark.app.name = "Waterdrop"
  # number of executors (increase for larger data volumes)
  spark.executor.instances = 1
  # cores per executor (parallelism; for large volumes raise it up to about half the server's cores, but try not to impact ClickHouse)
  spark.executor.cores = 1
  # memory per executor (must not be below 512m)
  spark.executor.memory = "1g"
}

input {
  hdfs {
    result_table_name = "test_source"
    path = "hdfs://node01:8020/user/hive/warehouse/test.db/hdfs2ch4"
    format = "json"
  }
}

filter {
  convert {
    source_field = "id"
    new_type = "integer"
  }
  convert {
    source_field = "age"
    new_type = "integer"
  }
  #date {
  #  source_field = "create_time"
  #  target_field = "create_time"
  #  source_time_format = "yyyy-MM-dd HH:mm:ss.SSSSSS"
  #  target_time_format = "yyyy-MM-dd HH:mm:ss"
  #}
}

output {
  stdout {
    limit = 2
  }
  clickhouse {
    host = "node01:8123"
    clickhouse.socket_timeout = 50000
    database = "mydatabase"
    table = "hdfs2ch3"
    fields = ["id", "name", "age"]
    username = ""
    password = ""
    bulk_size = 20000
  }
}
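As with the text table, the result can be checked in clickhouse-client (database and table name taken from the output block above):

select * from mydatabase.hdfs2ch3;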