目录
0 引 言
1 生产常见的问题
(1)Sqoop 空值问题
(2)Sqoop 数据一致性问题
(3)ADS层数据向Mysql同步数据存储为orc或parquet问题
(4)数据倾斜问题
(5)Map task并行度设置大于1的问题
2 小 结
sqoop作为一种重要的数据同步工具,在大数据中具有重要地位。本文对Sqoop生产中遇到的常见问题进行总结并给出具体的解决方案。
Hive中的Null在底层是以“\N”来存储,而MySQL中的Null在底层就是Null,这就导致了两边同步数据时存储不一致问题。Sqoop在同步的时候应该严格保证两端的数据格式、数据类型一致,否则会带来异常。
方案 1:依赖自身参数
1)导出数据时采用--input-null-string和--input-null-non-string两个参数。
2)导入数据时采用--null-string和--null-non-string。
方案2 :建表时修改hive底层存储,修改为''(空串)
在hive导出时,给需要导出的表创建一张临时表,该表和Mysql同步的表、字段、类型等严格一致,将需要导出的数据插入到该表中,在建立该临时表的时候将hive中Null底层存储“/N”修改为''(空串)。具体可添加下面一句话
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' with serdeproperties('serialization.null.format' = '')示例 如下:
drop table $output_table; CREATE TABLE IF NOT EXISTS $output_table( gw_id STRING, sensor_id STRING, alarm_level STRING, alarm_state STRING, alarm_type STRING, alarm_scene STRING, dyear string, dmonth string, count BIGINT, compute_month BIGINT) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' with serdeproperties('serialization.null.format' = '') location '/apps/hive/warehouse/phmdwdb.db/$log_dir'; 然后将需要导出的数据插入到该临时表中最后采用sqoop导出命令将数据导出到Mysql导入数据同理 ,不再叙述。
生产中比较推荐第二种方案,虽然比较麻烦,但是能够减少sqoop带来的一些不必要的麻烦,而且也比较容易定位问题,逻辑清晰,sqoop导出的时候只需要写基本的导出命令即可,这样sqoop很容易做成一个通用的脚本来调度。1)场景1:如Sqoop在导出到Mysql时,使用4个Map任务,过程中有2个任务失败,那此时MySQL中存储了另外两个Map任务导入的数据,此时老板正好看到了这个报表数据。而开发工程师发现任务失败后,会调试问题并最终将全部数据正确的导入MySQL,那后面老板再次看报表数据,发现本次看到的数据与之前的不一致,这在生产环境是不允许的。
官网:http://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html
Since Sqoop breaks down export process into multiple transactions, it is possible that a failed export job may result in partial data being committed to the database. This can further lead to subsequent jobs failing due to insert collisions in some cases, or lead to duplicated data in others. You can overcome this problem by specifying a staging table via the --staging-table option which acts as an auxiliary table that is used to stage exported data. The staged data is finally moved to the destination table in a single transaction.
由于Sqoop将导出过程分解为多个事务,因此失败的导出作业可能会导致将部分数据提交到数据库。 这可能进一步导致后续作业由于某些情况下的插入冲突而失败,或导致其他作业中的重复数据。 您可以通过--staging-table选项指定登台表来解决此问题,该选项充当用于暂存导出数据的辅助表。 分阶段数据最终在单个事务中移动到目标表。--staging-table方式
(建立临时表,通过sqoop导入到临时表,成功之后再把临时表的数据通过事务导入到mysql的业务数据表,Sqoop在导入导出数据时,通过建立临时表可以解决好多问题,所以要学会巧用临时表)使用--staging-table选项,将hdfs中的数据先导入到临时表中,当hdfs中的数据导出成功后,临时表中的数据在一个事务中导出到目标表中(也就是说这个过程要不完全成功,要不完全失败)。为了能够使用staging这个选项,staging表在运行任务前或者是空的,要不就使用—clear-staging-table配置,如果staging表中有数据,并且使用了--clear-staging-table选项,sqoop执行导出任务前会删除staging表中所有的数据。注意:–direct导入时staging方式是不可用的,使用了—update-key选项时staging方式也不能用。
sqoop export --connect jdbc:mysql://192.168.137.10:3306/user_behavior --username root \ --password 123456 \ --table app_cource_study_report \ --columns watch_video_cnt,complete_video_cnt,dt \ --fields-terminated-by "\t" \ --export-dir "/user/hive/warehouse/tmp.db/app_cource_study_analysis_${day}" \ --staging-table app_cource_study_report_tmp \ --clear-staging-table \ --input-null-string '\N' \2)场景2:设置map数量为1个(不推荐)
多个Map任务时,采用–staging-table方式,仍然可以解决数据一致性问题。
Sqoop导出时,如果导出的表存储为orc或parquet就会报错。具体错误如下:
实际上错误并不明显,到yarn上查看具体日志:
2020-04-22 11:24:47,814 FATAL [IPC Server handler 5 on 43129] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Task: attempt_1586702868362_6282_m_000003_0 - exited : java.io.IOException: Can't export data, please check failed map task logs at org.apache.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:122) at org.apache.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:39) at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146) at org.apache.sqoop.mapreduce.AutoProgressMapper.run(AutoProgressMapper.java:64) at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341) at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:170) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866) at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:164) Caused by: java.lang.RuntimeException: Can't parse input data: '�aҩ;����%�G8��}�_yd@rd�yd�$�����瑑 g�7V!o���+��O��s�4���R�v�p)�ћȜB��X�'���F!� �!_@�^�,��ȃ�|�|�YX�~?����>�a�i��6�=���g��?��r��- �љ�ɪ���șk���ȅȥJȕߑk� �+ wS �. �Cw' at appv_phm_switch_master_min_orc.__loadFromFields(appv_phm_switch_master_min_orc.java:1345) at appv_phm_switch_master_min_orc.parse(appv_phm_switch_master_min_orc.java:1178) at org.apache.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:89) ... 10 more Caused by: java.util.NoSuchElementException at java.util.ArrayList$Itr.next(ArrayList.java:854) at appv_phm_switch_master_min_orc.__loadFromFields(appv_phm_switch_master_min_orc.java:1230) ... 12 more可以看出sqoop不能解析ORC格式的文件 。
解决方法:
(1)通过 Sqoop-HCatalog 集成解决 Sqoop 不支持 ORC 的问题(比较麻烦)。
(2)保守做法:(推荐方法)
建议tmp表对接mysql的改成默认texfile形式
tmp表修改成原来的textfile形式执行成功。
对于dws层或ADS层由sqoop导入到Mysql的表构建临时表tmp表,该表与Mysql的表保持严格一致,主要用来对接Mysql,该表保持默认策略。(textfile形式)
sqoop的数据分割策略不够优秀导致的数据倾斜
sqoop 抽数的并行化主要涉及到两个参数:num-mappers:启动N个map来并行导入数据,默认4个;split-by:按照某一列来切分表的工作单元。
要避免数据倾斜,对split-by指定字段的要求是 int类型同时数据分布均匀,满足这样的要求的表只有极少数的有自增主键的表才能满足。核心思想就是自己生成一个有序且均匀的自增ID,然后作为map的切分轴,这样每个map就可以分到均匀的数据,可以通过设置map的个数来提高吞吐量。建议:数据量500w 以下使用4个map即可。 数据量500w以上8即可,太多会对数据库加压,造成其他场景使用性能降低。 如果是为了专门导数据,和下游计算的并行度,可以适当调大。
引出场景: 通常可以指定split-by 对应的zizengID 列,然后使用–num-mappers或者-m指定map的个数,即并发的抽取进程数量。但是有时候会碰到很多的表没有添加自增ID或者,整数型的主键,或者 主键分布不均,反而会拖慢整个job的进程。
根据sqoop源码的设计,我们可以使用–query语句中添加自增ID,作为split-by的参数,与此同时通过设置的自增ID的范围可以设置boundary。核心语法如下:
```bash --query 方式:涉及参数 --query、--split-by、--boundary-query --query: select col1、 col2、 coln3、 columnN from (select ROWNUM() OVER() AS INC_ID, T.* from table T where xxx ) where $CONDITIONS --split-by: INC_ID --boundary-query: select 1 as MIN , sum(1) as MAX from table where xxx完整示例语法: password-file通过 echo -n “password content” > passsword-file 方式得到,这样不会包含异常字符。
sqoop import --connect $yourJdbConnectURL \ --username $yourUserName --password-file file;///localpasswordFile or hdfs relative Path --query "" \ --split-by "" \ -m 8 \ -boundary-query “select 1 as min , sum(1) as max from table where xx” \ --other parames参考链接:https://blog.csdn.net/qq_27882063/article/details/108352356?utm_medium=distribute.pc_relevant_t0.none-task-blog-BlogCommendFromMachineLearnPai2-1.channel_param&depth_1-utm_source=distribute.pc_relevant_t0.none-task-blog-BlogCommendFromMachineLearnPai2-1.channel_param
并行度导入数据的 时候 需要指定根据哪个字段进行切分 该字段通常是主键或者是自增长不重复的数值类型字段,否则会报下面的错误。
Import failed: No primary key could be found for table. Please specify one with --split-by or perform a sequential import with ‘-m 1’.那么就是说当map task并行度大于1时,下面两个参数要同时使用
–split-by id 指定根据id字段进行切分
–m n 指定map并行度n个
本文对Sqoop生产中遇到的常见问题进行总结并给出具体的解决方案。