1. Waterdrop: syncing from MongoDB to a ClickHouse cluster (configuration)
spark {
  spark.app.name = "Waterdrop0923"
  spark.executor.instances = 40
  spark.executor.cores = 2
  spark.executor.memory = "3g"
}
input {
  mongodb {
    readconfig.uri = "mongodb://root:root@ip:port/ONLINE?authSource=admin"    # MongoDB connection URI
    readconfig.database = "ONLINE"          # equivalent of a database in an RDBMS
    readconfig.collection = "tablename"     # equivalent of a table in an RDBMS
    readconfig.password = "root"            # MongoDB password. Yes, it is already in the URI above, but leaving it out here raises an error -- a pitfall we hit, worth remembering
    readconfig.spark.mongodb.input.partitioner = "MongoShardedPartitioner"        # class name of the partitioner used to split the data (honestly, not fully clear on this one)
    readconfig.spark.mongodb.input.partitionerOptions.shardkey = "TRADESN"        # shard key
    # spark.mongodb.input.partitioner: class name of the partitioner used to split the data.
    # Default: MongoDefaultPartitioner. Other values:
    #   MongoSamplePartitioner: uses the collection's average document size and random sampling to determine suitable partitions.
    #   MongoShardedPartitioner: partitions the collection based on its chunks; requires read access to the config database.
    #   MongoSplitVectorPartitioner: uses the splitVector command on a standalone or on the primary to determine partitions; requires the privilege to run splitVector.
    #   MongoPaginateByCountPartitioner: creates a specific number of partitions; requires a query per partition.
    #   MongoPaginateBySizePartitioner: creates partitions based on data size; requires a query per partition.
    result_table_name = "tasks"             # alias for the data read in, used by later stages
    # num_partitions = 500
  }
}
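If the source collection is not sharded, the sample partitioner listed above can be used instead of MongoShardedPartitioner. A minimal sketch of the two partitioner lines inside the mongodb block, assuming the MongoDB Spark connector's MongoSamplePartitioner options partitionKey and partitionSizeMB (the values are placeholders):

    # Alternative for non-sharded collections (option names per the MongoDB Spark connector; values are placeholders)
    readconfig.spark.mongodb.input.partitioner = "MongoSamplePartitioner"
    readconfig.spark.mongodb.input.partitionerOptions.partitionKey = "_id"       # field to split on
    readconfig.spark.mongodb.input.partitionerOptions.partitionSizeMB = "64"     # target partition size in MB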
filter {
  sql {
    # Spark SQL; ${trandate} is substituted at submit time
    sql = "select tradesn,trandate from tasks where trandate = '"${trandate}"'"
    # num_partitions = 500
  }
  # repartition
  repartition {
    num_partitions = 500
  }
  # type conversions
  convert { source_field = "orderamount", new_type = "double" }
  convert { source_field = "payee_amount", new_type = "double" }
  convert { source_field = "accountamount", new_type = "double" }
  convert { source_field = "channelamount", new_type = "double" }
  convert { source_field = "refundamount", new_type = "double" }
  convert { source_field = "cashamount", new_type = "double" }
  convert { source_field = "uncashamount", new_type = "double" }
  convert { source_field = "creditamount", new_type = "double" }
  convert { source_field = "src_fee", new_type = "double" }
  convert { source_field = "payee_fee", new_type = "double" }
  convert { source_field = "payer_fee", new_type = "double" }
  convert { source_field = "payer_recharge_fee", new_type = "double" }
  convert { source_field = "payer_oncededuct_amount", new_type = "double" }
  convert { source_field = "settlement_amount", new_type = "double" }
  convert { source_field = "calcfeeamount", new_type = "double" }
}
output {
  clickhouse {
    clickhouse.socket_timeout = 50000
    host = "IP:8123,ip:8123"
    database = "ysdw"
    table = "ods"
    fields = ["tradesn","trandate"]
    username = "default"
    password = "clickhouse"
  }
}
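The filter SQL above references ${trandate}, which has to be supplied when the job is submitted. A minimal submit sketch, assuming Waterdrop 1.x's start-waterdrop.sh script and its -i/--variable option; the config path and the date value are placeholders:

    # Submit on YARN and inject the trandate variable used in the filter SQL
    ./bin/start-waterdrop.sh --master yarn --deploy-mode client \
        --config ./config/mongo_to_clickhouse.conf \
        -i trandate=20200923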
2. Oracle to ClickHouse cluster

spark {
  spark.app.name = "orcleToHive"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}
input {
  jdbc {
    driver = "oracle.jdbc.driver.OracleDriver"
    url = "jdbc:oracle:thin:@IP:1521:DC"
    # there is no separate database option here, so the table name can carry a schema prefix
    # (handy when one server hosts several schemas)
    table = "seq"
    user = "123"
    password = "123"
    result_table_name = "seq_temp"
  }
}
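A single JDBC connection reads the Oracle table on one thread. If parallel reads are needed, the Waterdrop jdbc input documents passing extra Spark JDBC reader options with a jdbc. prefix (e.g. jdbc.fetchsize); a sketch under that assumption, with a hypothetical split column and bounds, to be verified against your Waterdrop version:

    # Hypothetical parallel-read options forwarded to Spark's JDBC reader via the jdbc. prefix
    jdbc.partitionColumn = "ACCOUNTID"    # numeric column to split on (placeholder)
    jdbc.lowerBound = "0"
    jdbc.upperBound = "1000000"
    jdbc.numPartitions = "8"
    jdbc.fetchsize = "10000"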
filter {
  sql {
    table_name = "seq"
    # query
    sql = "select sn,accountid,org_no from seq a where a.sn = '20130130000000393006'"
  }
  repartition {
    num_partitions = 500
  }
  convert { source_field = "preamount", new_type = "double" }
  convert { source_field = "amount", new_type = "double" }
  convert { source_field = "cash_amount", new_type = "double" }
  convert { source_field = "uncash_amount", new_type = "double" }
  convert { source_field = "credit_amount", new_type = "double" }
  convert { source_field = "createtime", new_type = "string" }
}
output {
  clickhouse {
    clickhouse.socket_timeout = 50000
    host = "ip:8123,ip:8123"
    database = "dw"
    table = "seq"
    fields = ["sn","accountid","undotype","zy","note","custtype","org_no"]
    username = "default"
    password = "clickhouse"
  }
}
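The clickhouse output only inserts rows; the target table has to exist beforehand on the nodes listed in host, with column names matching the fields list. A minimal DDL sketch for dw.seq -- the engine, column types and ORDER BY are assumptions, adjust them to the real schema (and use a Replicated/Distributed setup if the cluster requires it):

    CREATE TABLE dw.seq
    (
        sn        String,
        accountid String,
        undotype  String,
        zy        String,
        note      String,
        custtype  String,
        org_no    String
    )
    ENGINE = MergeTree
    ORDER BY sn;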
3. Hive to ClickHouse cluster
spark {
  spark.sql.catalogImplementation = "hive"
  spark.app.name = "Waterdrop"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}
input {
  hive {
    pre_sql = "select cast(last_up_time as long),sort_id,trandate from ods_trade.paybill where trandate = '20200923'"
    result_table_name = "mcc_cd"
  }
}
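The original note stops at the input; the matching filter/output blocks are not shown. A sketch of the output, following the same pattern as sections 1 and 2 -- the database, table and fields are placeholders, and the cast(...) column in pre_sql would need an alias (e.g. cast(last_up_time as long) as last_up_time) for its field name to line up:

    output {
      clickhouse {
        clickhouse.socket_timeout = 50000
        host = "ip:8123,ip:8123"
        database = "dw"                                     # placeholder target database
        table = "paybill"                                   # placeholder target table
        fields = ["last_up_time","sort_id","trandate"]      # must match the columns produced by pre_sql
        username = "default"
        password = "clickhouse"
      }
    }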
4. Exporting data from MongoDB into a collection in another database

mongoexport -h IP:50000 -u root -p root -d YSDW -c table1 --authenticationDatabase=admin -q "{AC_DT:'$dateq'}" | mongoimport -h IP:50000 -u root -p root -d YSDM -c table2 --authenticationDatabase=admin --numInsertionWorkers 1 --writeConcern='{w:1}'
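The -q filter references $dateq, so the variable has to be set in the same shell before running the pipe. A minimal sketch, assuming AC_DT is stored as a yyyyMMdd string and GNU date is available (both are assumptions):

    # Hypothetical: export yesterday's AC_DT partition
    dateq=$(date -d "yesterday" +%Y%m%d)
    echo "$dateq"    # e.g. 20200922, then run the mongoexport | mongoimport pipe above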