The basic structure of the data warehouse is as follows. In the layered table design of the warehouse, different write strategies are used for different tables depending on business requirements:
Some tables hold content that normally never changes. The warehouse treats these as special tables: they are imported once when the warehouse is initialized and never loaded again. Examples: the province table and the region table.
Province table (special):

drop table if exists ods_base_province;
create external table ods_base_province (
    `id` bigint COMMENT '编号',
    `name` string COMMENT '省份名称',
    `region_id` string COMMENT '地区ID',
    `area_code` string COMMENT '地区编码',
    `iso_code` string COMMENT 'iso编码,superset可视化使用'
) COMMENT '省份表'
row format delimited fields terminated by '\t'
STORED AS
    INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_base_province/';

Region table (special):

drop table if exists ods_base_region;
create external table ods_base_region (
    `id` string COMMENT '编号',
    `region_name` string COMMENT '地区名称'
) COMMENT '地区表'
row format delimited fields terminated by '\t'
STORED AS
    INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_base_region/';
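The one-time import itself is just a file move into the table location. A minimal sketch, assuming the upstream export has been staged to HDFS under an /origin_data directory (the path and date are illustrative):

-- run once at warehouse initialization; never re-imported afterwards
-- files are expected to be LZO-compressed to match the declared input format
load data inpath '/origin_data/gmall/db/base_province/2020-06-14'
overwrite into table ods_base_province;
load data inpath '/origin_data/gmall/db/base_region/2020-06-14'
overwrite into table ods_base_region;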
For tables with little data but frequent changes, use full import: create a partitioned table in the warehouse and load the complete contents of the source table into it once a day. Examples: the SKU table, the SPU table, the brand table, and the activity table.

drop table if exists ods_sku_info;
create external table ods_sku_info(
    `id` string COMMENT 'skuId',
    `spu_id` string COMMENT 'spuid',
    `price` decimal(16,2) COMMENT '价格',
    `sku_name` string COMMENT '商品名称',
    `sku_desc` string COMMENT '商品描述',
    `weight` string COMMENT '重量',
    `tm_id` string COMMENT '品牌id',
    `category3_id` string COMMENT '品类id',
    `create_time` string COMMENT '创建时间'
) COMMENT 'SKU商品表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
STORED AS
    INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_sku_info/';
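Each day the full extract then overwrites that day's partition. A minimal sketch (the staging path and the $do_date variable are illustrative):

-- daily full load: the whole source table lands in today's partition
load data inpath '/origin_data/gmall/db/sku_info/$do_date'
overwrite into table ods_sku_info partition(dt='$do_date');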
For very large tables that are mostly append-only and rarely modified, create a partitioned table and import only each day's newly added rows. Examples: the order detail table, the payment flow table, and the product comment table.

drop table if exists ods_payment_info;
create external table ods_payment_info(
    `id` bigint COMMENT '编号',
    `out_trade_no` string COMMENT '对外业务编号',
    `order_id` string COMMENT '订单编号',
    `user_id` string COMMENT '用户编号',
    `alipay_trade_no` string COMMENT '支付宝交易流水编号',
    `total_amount` decimal(16,2) COMMENT '支付金额',
    `subject` string COMMENT '交易内容',
    `payment_type` string COMMENT '支付类型',
    `payment_time` string COMMENT '支付时间'
) COMMENT '支付流水表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
STORED AS
    INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_payment_info/';
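For the incremental extract, the upstream query only needs to filter on the creation time. A minimal sketch of the source-database selection (the payment_info table and column names are assumptions about the upstream schema):

-- incremental sync: pull only the rows created on $do_date
select * from payment_info
where date_format(create_time, '%Y-%m-%d') = '$do_date';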
For tables with many inserts and frequent updates, such as the order table, the user table, and the coupon usage table, each daily import contains both the rows added that day and the rows whose content changed compared with earlier data.

drop table if exists ods_order_info;
create external table ods_order_info (
    `id` string COMMENT '订单号',
    `final_total_amount` decimal(16,2) COMMENT '订单金额',
    `order_status` string COMMENT '订单状态',
    `user_id` string COMMENT '用户id',
    `out_trade_no` string COMMENT '支付流水号',
    `create_time` string COMMENT '创建时间',
    `operate_time` string COMMENT '操作时间',
    `province_id` string COMMENT '省份ID',
    `benefit_reduce_amount` decimal(16,2) COMMENT '优惠金额',
    `original_total_amount` decimal(16,2) COMMENT '原价金额',
    `feight_fee` decimal(16,2) COMMENT '运费'
) COMMENT '订单表'
PARTITIONED BY (`dt` string) -- partition by date
row format delimited fields terminated by '\t' -- fields are delimited by \t
STORED AS -- storage format: read with LzoTextInputFormat, write with TextOutputFormat
    INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_order_info/' -- where the data lives on HDFS
;
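The new-and-changed extract filters on both the creation time and the last operation time. A minimal sketch of the source-database selection (the order_info table and column names are assumptions about the upstream schema):

-- new-and-changed sync: rows created or modified on $do_date
select * from order_info
where date_format(create_time, '%Y-%m-%d') = '$do_date'
   or date_format(operate_time, '%Y-%m-%d') = '$do_date';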
The DWD layer builds a dimensional model from the relational model of the ODS layer. For dimension tables whose information never needs updating, such as the region dimension and the time dimension:
DROP TABLE IF EXISTS `dwd_dim_base_province`;
CREATE EXTERNAL TABLE `dwd_dim_base_province` (
    `id` string COMMENT 'id',
    `province_name` string COMMENT '省市名称',
    `area_code` string COMMENT '地区编码',
    `iso_code` string COMMENT 'ISO编码',
    `region_id` string COMMENT '地区id',
    `region_name` string COMMENT '地区名称'
) COMMENT '地区维度表'
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_base_province/'
tblproperties ("parquet.compression"="lzo");
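Loading this dimension is a one-time join of the two ODS tables shown earlier. A minimal sketch:

insert overwrite table dwd_dim_base_province
select
    bp.id,
    bp.name,        -- province_name
    bp.area_code,
    bp.iso_code,
    bp.region_id,
    br.region_name
from ods_base_province bp
join ods_base_region br on bp.region_id = br.id;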
In dimensional modeling, several ODS tables are denormalized into a single dimension table, loaded in full into a new partition every day. Example: the SKU (product) dimension table.

DROP TABLE IF EXISTS `dwd_dim_sku_info`;
CREATE EXTERNAL TABLE `dwd_dim_sku_info` (
    `id` string COMMENT '商品id',
    `spu_id` string COMMENT 'spuid',
    `price` decimal(16,2) COMMENT '商品价格',
    `sku_name` string COMMENT '商品名称',
    `sku_desc` string COMMENT '商品描述',
    `weight` decimal(16,2) COMMENT '重量',
    `tm_id` string COMMENT '品牌id',
    `tm_name` string COMMENT '品牌名称',
    `category3_id` string COMMENT '三级分类id',
    `category2_id` string COMMENT '二级分类id',
    `category1_id` string COMMENT '一级分类id',
    `category3_name` string COMMENT '三级分类名称',
    `category2_name` string COMMENT '二级分类名称',
    `category1_name` string COMMENT '一级分类名称',
    `spu_name` string COMMENT 'spu名称',
    `create_time` string COMMENT '创建时间'
) COMMENT '商品维度表'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_sku_info/'
tblproperties ("parquet.compression"="lzo");
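Building this dimension joins the day's SKU partition with its related ODS tables. A minimal sketch, assuming ods_spu_info, ods_base_trademark, and ods_base_category1/2/3 tables exist alongside ods_sku_info (these names follow the same gmall-style schema; verify them against your own ODS layer):

insert overwrite table dwd_dim_sku_info partition(dt='$do_date')
select
    sku.id,
    sku.spu_id,
    sku.price,
    sku.sku_name,
    sku.sku_desc,
    sku.weight,
    sku.tm_id,
    tm.tm_name,
    sku.category3_id,
    c2.id,          -- category2_id
    c1.id,          -- category1_id
    c3.name,        -- category3_name
    c2.name,        -- category2_name
    c1.name,        -- category1_name
    spu.spu_name,
    sku.create_time
from (select * from ods_sku_info where dt='$do_date') sku
join (select * from ods_base_trademark where dt='$do_date') tm on sku.tm_id = tm.tm_id
join (select * from ods_spu_info where dt='$do_date') spu on sku.spu_id = spu.id
join (select * from ods_base_category3 where dt='$do_date') c3 on sku.category3_id = c3.id
join (select * from ods_base_category2 where dt='$do_date') c2 on c3.category2_id = c2.id
join (select * from ods_base_category1 where dt='$do_date') c1 on c2.category1_id = c1.id;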
Transactional fact tables are updated incrementally every day, so when inserting into one, write the day's incremental data into that day's partition. Example: the payment fact table draws on two tables: the ODS payment table (incremental sync) and the ODS order table (new-and-changed sync), so we only need to query the current day's partition of each.

drop table if exists dwd_fact_payment_info;
create external table dwd_fact_payment_info (
    `id` string COMMENT 'id',
    `out_trade_no` string COMMENT '对外业务编号',
    `order_id` string COMMENT '订单编号',
    `user_id` string COMMENT '用户编号',
    `alipay_trade_no` string COMMENT '支付宝交易流水编号',
    `payment_amount` decimal(16,2) COMMENT '支付金额',
    `subject` string COMMENT '交易内容',
    `payment_type` string COMMENT '支付类型',
    `payment_time` string COMMENT '支付时间',
    `province_id` string COMMENT '省份ID'
) COMMENT '支付事实表'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_payment_info/'
tblproperties ("parquet.compression"="lzo");
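The daily load then joins the two day-partitions, taking province_id from the order side. A minimal sketch:

insert overwrite table dwd_fact_payment_info partition(dt='$do_date')
select
    pi.id,
    pi.out_trade_no,
    pi.order_id,
    pi.user_id,
    pi.alipay_trade_no,
    pi.total_amount,    -- payment_amount
    pi.subject,
    pi.payment_type,
    pi.payment_time,
    oi.province_id
from (select * from ods_payment_info where dt='$do_date') pi
join (select id, province_id from ods_order_info where dt='$do_date') oi
on pi.order_id = oi.id;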
A periodic snapshot fact table takes one snapshot per time period, and each import is a full load. The variants differ only in whether the corresponding ODS table is synced in full or as new-and-changed rows. Example: the cart fact table:

drop table if exists dwd_fact_cart_info;
create external table dwd_fact_cart_info(
    `id` string COMMENT '编号',
    `user_id` string COMMENT '用户id',
    `sku_id` string COMMENT 'skuid',
    `cart_price` string COMMENT '放入购物车时价格',
    `sku_num` string COMMENT '数量',
    `sku_name` string COMMENT 'sku名称 (冗余)',
    `create_time` string COMMENT '创建时间',
    `operate_time` string COMMENT '修改时间',
    `is_ordered` string COMMENT '是否已经下单。1为已下单;0为未下单',
    `order_time` string COMMENT '下单时间',
    `source_type` string COMMENT '来源类型',
    `source_id` string COMMENT '来源编号'
) COMMENT '加购事实表'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_cart_info/'
tblproperties ("parquet.compression"="lzo");

When the corresponding ODS table is a full sync:
SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_fact_cart_info partition(dt='$do_date')
select
    id,
    user_id,
    sku_id,
    cart_price,
    sku_num,
    sku_name,
    create_time,
    operate_time,
    is_ordered,
    order_time,
    source_type,
    source_id
from ods_cart_info
where dt='$do_date';

When the corresponding ODS table is synced as new-and-changed rows, the day's data must be reconciled against the previous day's:
insert overwrite table dwd_fact_cart_info partition(dt='$do_date')
select
    nvl(new.id,old.id),
    nvl(new.user_id,old.user_id),
    nvl(new.sku_id,old.sku_id),
    nvl(new.cart_price,old.cart_price),
    nvl(new.sku_num,old.sku_num),
    nvl(new.sku_name,old.sku_name),
    nvl(new.create_time,old.create_time),
    nvl(new.operate_time,old.operate_time),
    nvl(new.is_ordered,old.is_ordered),
    nvl(new.order_time,old.order_time),
    nvl(new.source_type,old.source_type),
    nvl(new.source_id,old.source_id)
from
(
    select * from ods_cart_info where dt='$do_date'
) new
full outer join
(
    select * from ods_cart_info where dt=date_add('$do_date',-1)
) old
on new.id = old.id;

Accumulating snapshot fact tables are used when a fact has a well-defined life cycle and newly arriving data must update that life cycle; the corresponding ODS data is synced as new-and-changed. Example: the order fact table. Order life cycle: creation time → payment time → cancellation time → completion time → refund time → refund completion time.
drop table if exists dwd_fact_order_info;
create external table dwd_fact_order_info (
    `id` string COMMENT '订单编号',
    `order_status` string COMMENT '订单状态',
    `user_id` string COMMENT '用户id',
    `out_trade_no` string COMMENT '支付流水号',
    `create_time` string COMMENT '创建时间(未支付状态)',
    `payment_time` string COMMENT '支付时间(已支付状态)',
    `cancel_time` string COMMENT '取消时间(已取消状态)',
    `finish_time` string COMMENT '完成时间(已完成状态)',
    `refund_time` string COMMENT '退款时间(退款中状态)',
    `refund_finish_time` string COMMENT '退款完成时间(退款完成状态)',
    `province_id` string COMMENT '省份ID',
    `activity_id` string COMMENT '活动ID',
    `original_total_amount` decimal(16,2) COMMENT '原价金额',
    `benefit_reduce_amount` decimal(16,2) COMMENT '优惠金额',
    `feight_fee` decimal(16,2) COMMENT '运费',
    `final_total_amount` decimal(16,2) COMMENT '订单金额'
) COMMENT '订单事实表'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_order_info/'
tblproperties ("parquet.compression"="lzo");

The table is partitioned by the first time in the life cycle. When inserting, read every partition that may need modification and compare old against new: rows untouched by the new data are kept as-is, rows that changed are overwritten via their unique ID, and finally all the partitions that were read are rewritten with overwrite (using dynamic partitioning):
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_fact_order_info partition(dt)
select
    if(new.id is null,old.id,new.id),
    if(new.order_status is null,old.order_status,new.order_status),
    if(new.user_id is null,old.user_id,new.user_id),
    if(new.out_trade_no is null,old.out_trade_no,new.out_trade_no),
    if(new.tms['1001'] is null,old.create_time,new.tms['1001']), -- 1001 = unpaid status
    if(new.tms['1002'] is null,old.payment_time,new.tms['1002']),
    if(new.tms['1003'] is null,old.cancel_time,new.tms['1003']),
    if(new.tms['1004'] is null,old.finish_time,new.tms['1004']),
    if(new.tms['1005'] is null,old.refund_time,new.tms['1005']),
    if(new.tms['1006'] is null,old.refund_finish_time,new.tms['1006']),
    if(new.province_id is null,old.province_id,new.province_id),
    if(new.activity_id is null,old.activity_id,new.activity_id),
    if(new.original_total_amount is null,old.original_total_amount,new.original_total_amount),
    if(new.benefit_reduce_amount is null,old.benefit_reduce_amount,new.benefit_reduce_amount),
    if(new.feight_fee is null,old.feight_fee,new.feight_fee),
    if(new.final_total_amount is null,old.final_total_amount,new.final_total_amount),
    date_format(if(new.tms['1001'] is null,old.create_time,new.tms['1001']),'yyyy-MM-dd')
from
(
    select
        id, order_status, user_id, out_trade_no,
        create_time, payment_time, cancel_time, finish_time,
        refund_time, refund_finish_time,
        province_id, activity_id,
        original_total_amount, benefit_reduce_amount, feight_fee, final_total_amount
    from dwd_fact_order_info
    where dt in
    (
        select date_format(create_time,'yyyy-MM-dd')
        from ods_order_info
        where dt='$do_date'
    )
) old
full outer join
(
    select
        info.id,
        info.order_status,
        info.user_id,
        info.out_trade_no,
        info.province_id,
        act.activity_id,
        log.tms,
        info.original_total_amount,
        info.benefit_reduce_amount,
        info.feight_fee,
        info.final_total_amount
    from
    (
        select
            order_id,
            str_to_map(concat_ws(',',collect_set(concat(order_status,'=',operate_time))),',','=') tms
        from ods_order_status_log
        where dt='$do_date'
        group by order_id
    ) log
    join
    (
        select * from ods_order_info where dt='$do_date'
    ) info
    on log.order_id = info.id
    left join
    (
        select * from ods_activity_order where dt='$do_date'
    ) act
    on log.order_id = act.order_id
) new
on old.id = new.id;

For tables without a life cycle but with heavy updates and inserts, a zipper table (slowly changing dimension) is a good fit. Example: the user dimension table. A zipper table uses a validity start date and end date to mark whether a record is current:
Create the zipper table:

drop table if exists dwd_dim_user_info_his;
create external table dwd_dim_user_info_his(
    `id` string COMMENT '用户id',
    `name` string COMMENT '姓名',
    `birthday` string COMMENT '生日',
    `gender` string COMMENT '性别',
    `email` string COMMENT '邮箱',
    `user_level` string COMMENT '用户等级',
    `create_time` string COMMENT '创建时间',
    `operate_time` string COMMENT '操作时间',
    `start_date` string COMMENT '有效开始日期',
    `end_date` string COMMENT '有效结束日期'
) COMMENT '用户拉链表'
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_user_info_his/'
tblproperties ("parquet.compression"="lzo");

Initialize the zipper table:

SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_dim_user_info_his
select
    id,
    name,
    birthday,
    gender,
    email,
    user_level,
    create_time,
    operate_time,
    '$do_date',
    '9999-99-99'
from ods_user_info oi
where oi.dt='$do_date';

In later use, because the zipper table is not partitioned, it is best to rewrite it in two steps: load into a temporary table first, then write from the temporary table back into the zipper table:
drop table if exists dwd_dim_user_info_his_tmp;
create external table dwd_dim_user_info_his_tmp(
    `id` string COMMENT '用户id',
    `name` string COMMENT '姓名',
    `birthday` string COMMENT '生日',
    `gender` string COMMENT '性别',
    `email` string COMMENT '邮箱',
    `user_level` string COMMENT '用户等级',
    `create_time` string COMMENT '创建时间',
    `operate_time` string COMMENT '操作时间',
    `start_date` string COMMENT '有效开始日期',
    `end_date` string COMMENT '有效结束日期'
) COMMENT '用户拉链临时表'
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_user_info_his_tmp/'
tblproperties ("parquet.compression"="lzo");

The zipper table is loaded by comparing old with new and rewriting the whole table:
SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_dim_user_info_his_tmp
select * from
(
    select
        id,
        name,
        birthday,
        gender,
        email,
        user_level,
        create_time,
        operate_time,
        '2020-06-15' start_date,
        '9999-99-99' end_date
    from ods_user_info
    where dt='2020-06-15'
    union all
    select
        old.id,
        old.name,
        old.birthday,
        old.gender,
        old.email,
        old.user_level,
        old.create_time,
        old.operate_time,
        old.start_date,
        if(new.id is not null and old.end_date='9999-99-99', date_add(new.dt,-1), old.end_date) end_date
    from
    (
        select * from dwd_dim_user_info_his where start_date<'2020-06-15'
    ) old
    left join
    (
        select * from ods_user_info where dt='2020-06-15'
    ) new
    on old.id = new.id
) his
order by cast(his.id as bigint), his.start_date;
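At this point the temporary table holds the complete merged history, so the second step of the two-step flow described above simply writes it back over the zipper table. A minimal sketch:

-- step 2: overwrite the zipper table from the temporary table
insert overwrite table dwd_dim_user_info_his
select * from dwd_dim_user_info_his_tmp;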
The DWS and DWT layers both belong to the wide-table layer and are organized by subject: a DWS table captures subject behavior within one time period, while a DWT table captures cumulative subject behavior. Examples: DWS (daily device behavior, daily member behavior); DWT (device subject wide table, member subject wide table).

Daily device behavior:

drop table if exists dws_uv_detail_daycount;
create external table dws_uv_detail_daycount (
    `mid_id` string COMMENT '设备id',
    `brand` string COMMENT '手机品牌',
    `model` string COMMENT '手机型号',
    `login_count` bigint COMMENT '活跃次数',
    `page_stats` array<struct<page_id:string,page_count:bigint>> COMMENT '页面访问统计'
) COMMENT '每日设备行为表'
partitioned by(dt string)
stored as parquet
location '/warehouse/gmall/dws/dws_uv_detail_daycount'
tblproperties ("parquet.compression"="lzo");

Device subject wide table:

drop table if exists dwt_uv_topic;
create external table dwt_uv_topic (
    `mid_id` string comment '设备id',
    `brand` string comment '手机品牌',
    `model` string comment '手机型号',
    `login_date_first` string comment '首次活跃时间',
    `login_date_last` string comment '末次活跃时间',
    `login_day_count` bigint comment '当日活跃次数',
    `login_count` bigint comment '累积活跃天数'
) COMMENT '设备主题宽表'
stored as parquet
location '/warehouse/gmall/dwt/dwt_uv_topic'
tblproperties ("parquet.compression"="lzo");

Note: DWS tables are partitioned, with one partition per time period (per day) recording that day's behavior, while DWT tables are not partitioned and hold the cumulative results.
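The cumulative DWT table can then be refreshed each day by merging its current contents with the day's DWS partition. A minimal sketch of the common merge pattern (column semantics follow the DDL above; verify against your own tables):

insert overwrite table dwt_uv_topic
select
    nvl(new.mid_id, old.mid_id),
    nvl(new.brand, old.brand),
    nvl(new.model, old.model),
    if(old.mid_id is null, '$do_date', old.login_date_first),    -- first active date
    if(new.mid_id is not null, '$do_date', old.login_date_last), -- last active date
    nvl(new.login_count, 0),                                     -- today's login count
    nvl(old.login_count, 0) + if(new.mid_id is not null, 1, 0)   -- cumulative active days
from dwt_uv_topic old
full outer join
(
    select * from dws_uv_detail_daycount where dt='$do_date'
) new
on old.mid_id = new.mid_id;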
The ADS layer holds tables that analyze the key metrics, for example: device subject: daily active, monthly active, daily new devices, silent users, churned users, user retention rate; member subject: member information, funnel analysis.
drop table if exists ads_uv_count;
create external table ads_uv_count(
    `dt` string COMMENT '统计日期',
    `day_count` bigint COMMENT '当日用户数量',
    `wk_count` bigint COMMENT '当周用户数量',
    `mn_count` bigint COMMENT '当月用户数量',
    `is_weekend` string COMMENT 'Y,N是否是周末,用于得到本周最终结果',
    `is_monthend` string COMMENT 'Y,N是否是月末,用于得到本月最终结果'
) COMMENT '活跃设备数'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_uv_count/';

These tables are not partitioned; each day's result is inserted as a single row, or a few rows in the case of user retention rate:
insert into table ads_uv_count
select
    '$do_date' dt,
    daycount.ct,
    wkcount.ct,
    mncount.ct,
    if(date_add(next_day('$do_date','MO'),-1)='$do_date','Y','N'),
    if(last_day('$do_date')='$do_date','Y','N')
from
(
    select '$do_date' dt, count(*) ct
    from dwt_uv_topic
    where login_date_last='$do_date'
) daycount
join
(
    select '$do_date' dt, count(*) ct
    from dwt_uv_topic
    where login_date_last>=date_add(next_day('$do_date','MO'),-7)
      and login_date_last<=date_add(next_day('$do_date','MO'),-1)
) wkcount on daycount.dt=wkcount.dt
join
(
    select '$do_date' dt, count(*) ct
    from dwt_uv_topic
    where date_format(login_date_last,'yyyy-MM')=date_format('$do_date','yyyy-MM')
) mncount on daycount.dt=mncount.dt;

The small-file problem: because Hive stores its data on HDFS, the approach above produces a large number of small files; every insert of a single row lands as its own small file. Solution: each time the day's row is inserted, query the table's entire current content and rewrite everything together with the new row (union deduplicates; use union all if deduplication is not wanted):
set mapreduce.job.queuename=hive;
insert overwrite table ${APP}.ads_uv_count
(select * from ${APP}.ads_uv_count)
union
(select
    '$do_date' dt,
    daycount.ct_d,
    wkcount.ct_w,
    mncount.ct_m,
    if(date_add(next_day('$do_date','MO'),-1)='$do_date','Y','N'),
    if(last_day('$do_date')='$do_date','Y','N')
from
(
    select '$do_date' dt, count(*) ct_d
    from ${APP}.dwt_uv_topic
    where login_date_last='$do_date'
) daycount
join
(
    select '$do_date' dt, count(*) ct_w
    from ${APP}.dwt_uv_topic
    where login_date_last>=date_add(next_day('$do_date','MO'),-7)
      and login_date_last<=date_add(next_day('$do_date','MO'),-1)
) wkcount on daycount.dt=wkcount.dt
join
(
    select '$do_date' dt, count(*) ct_m
    from ${APP}.dwt_uv_topic
    where date_format(login_date_last,'yyyy-MM')=date_format('$do_date','yyyy-MM')
) mncount on daycount.dt=mncount.dt);
The union type problem: when a date function is used in a computation, the result is a date-typed value that cannot be unioned directly with strings. Both branches of the union must be converted to a common string format before they can be merged and inserted, for example:
insert overwrite table ${APP}.ads_user_topic
(select * from ${APP}.ads_user_topic)
union
(select
    '$do_date',
    string(sum(if(login_date_last='$do_date',1,0))),
    string(sum(if(login_date_first='$do_date',1,0))),
    string(sum(if(payment_date_first='$do_date',1,0))),
    string(sum(if(payment_count>0,1,0))),
    string(count(*)),
    sum(if(login_date_last='$do_date',1,0))/count(*),
    sum(if(payment_count>0,1,0))/count(*),
    sum(if(login_date_first='$do_date',1,0))/sum(if(login_date_last='$do_date',1,0))
from ${APP}.dwt_user_topic);