分为两点: 1.MapReduce是一个分布式运算计算框架,是用户开发基于hadoop数据分析的应用的核心框架。 2.MapReduce核心功能是将用户编写的业务逻辑和自带默认组件整合成一个完整的分布式运算程序,并将它运行在hadoop集群上。
2.1.1 MapReduce易于编程 只要简单实现一些接口就可以完成一个分布式程序,这个分布式程序可以分布到大量的廉价计算机上运行,跟写一个简单的串行程序是一样的,因为这个原因MapReduce程序变得非常流行。 2.1.2良好的扩展性 当前计算机计算资源不能满足需求时,可以简单扩展机器数量来增强计算能力。 2.1.3 高容错性 MapReduce设计初衷是能够部署在廉价的pc机器上,这就要求它有很高的容错性,比如其中一台机器挂了,这台机器上的任务可以快速地转移到另一台机器上的节点上运行,不至于这个任务失败,而且这个过程不需要人参与,它完全是由hadoop内部完成的。
2.1.4 MapReduce适合PB以上的海量数据的离线处理 可以实现上千台服务器集群并发工作,提供数据处理能力
2.2.1 不擅长实时运算 无法像MapReduce一样毫秒级或者秒级内返回结果 2.2.2不擅长流式的运算 流式计算的输入数据是动态的,MapReduce本身设计决定了只能计算静态数据,输入的数据源必须是静态的,不能动态变化。
2.3.1MapTask如何工作? MapReduce运算程序一般分为两个阶段: 以单词统计为例子,MapTask一次读取一行单词数据,然后把每行单词通过空格或者逗号分割成一个个的单词,每个单词作为key,单词个数1作为value生成key value键值对,输出一个个(k,v)键值对给ReduceTask继续处理。Map阶段,MapTask完全并发运行,互不干扰。 2.3.2ReduceTask如何工作? ReduceTask接收到(k,v)键值对后,会根据k值的单词,统计,汇总,合并v,将合并后的(k,v)数据生成到输出结果文件当中。 Reduce阶段并发ReduceTask,完全互不相干,他们的输入数据依赖于上一个任务的MapTask的并发实例的输出。 2.3.3MapTask如何控制分区,排序? 如果设计任务有两个ReduceTask,MapTask会根据单词首字母情况,将单词按照字典顺序,分成两个分区,溢写到磁盘当中。 2.3.4MapTask和ReduceTask之间如何衔接? MapTask处理完成后将数据溢写到磁盘当中,ReduceTask从磁盘中获取MapTask运行结果的数据。
2.4.1.MrAppMaster MrAppMaster负责整个程序的过程调度和状态协调 2.4.2MapTask 负责Map阶段整个数据处理流程 2.4.3ReduceTask 负责整个Reduce阶段数据处理流程
3.1.1切片与MapTask并行度决定机制 (1)问题引出 MapTask任务并行度影响Map阶段任务处理并发速度,进而影响job的处理速度。 (2)一个Job的Map阶段并行度,由提交job时的切片数决定,默认情况下切片大小=blocksize,每一个切片分配给一个MapTask并行实例处理,切片时不考虑数据整体,而是对单独的文件进行切片。 3.2.2CombineTextInputFormat切片机制 框架默认的TextInputFormat切片机制是对任务按照文件来切片,不管文件多小,都会将该切片分配到一个MapTask中,如果有大量小文件,就会产生大量的MapTask,处理效率极其低下,CombineTextInputFormat用于过多小文件场景,它会将多个小文件按照逻辑划分到一个切片中,这样众多的小文件就会由一个MapTask处理了。 setMaxInputSplitSize按照小文件的实际大小确认具体的值。 切片过程: (a)判断虚拟存储的文件大小是否大于setMaxInputStream的值,大于则单独形成一个切片。 (b)如果小于setMaxInputStream的值,就继续寻找下一个虚拟存储的文件,并进行合并,共同形成一个切片 虚拟存储过程: 将输入目录下所有文件大小与setMaxInputStream的值做比较,如果不大于设置的最大值,则逻辑上划分一个块。如果大于设置的最大值且大于两倍以上,则以最大值划分为一块,另外一块超过最大值且不超过最大值2倍,此时文件被均分为2个虚拟存储块。 3.2.3TextInputFormat的KV TextInputFormat时FileInputFormat的实现类,按行读取每条记录,键是存储该行在整个文件中的起始字节偏移量,LongWritable类型。值是这行的内容,不包括任何终止符(换行符和回车符)
4.1全流程 (其中7-16为shuffle的逻辑) 1.传入待处理文件 /user/input/ss.txt (大小200m) 2.客户端submit前,获取待处理数据的信息,然后根据参数配置,形成一个任务分配的规划。 ss.txt 0~128 ss.txt 128~200 3.客户端提交信息给yarn RM(ResourceManager) Job.split wc.jar Jo.xml 4.计算出MapTask的的数量 Mr appmaster NodeManager 本例子为: MapTask1(ss.txt 0~128)和MapTask2(ss.txt 128~200)
5.默认使用inputFormat实现类TextInputFormat的reader()方法读取每一行数据,每一行生成一个(k,v)。 6.经过MapTask的逻辑运算后,用context.write(k,v)将生成的新的数据写入到outputCollector。 7.向环形缓冲区写入(k,v)数据。 8.分区,并进行区内的排序。 9.溢出到文件(分区且区内有序的) 10.Merge 归并排序 区内按照字典顺序 11.合并(用到Combiner)再归并排序 12.所有MapTask任务完成后,启动相印数量的ReduceTask 并告知ReduceTask处理数据范围,并告知ReduceTask处理数据范围。 13.将上一步的处理结果下载到ReduceTask本地磁盘,合并文件,归并排序。 14.ReduceTask1和ReduceTask2一次读取一组,Reduce(k,v) Context.write(kv). 15.分组,GroupingComparator(k,knext) 16.默认使用TextOutputFormat的RecordWriter的Write(k,v)将数据写入对应的分区当中(part-r-000000)和(part-r-000001)。 4.2Shuffle过程详解 (1)MapTask收集我们map()方法输出的kv对,放到内存缓冲区中 (2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件 (3)多个溢出文件会被合并成大的溢出文件 (4)在溢出过程及合并过程中,都要调用partioner进行分区,和针对key进行排序 (5)ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据 (6)ReduceTask会取到同一个分区的来自不同的MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序) (7)合并成大文件后,Shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程(从文件中取出一个一个键值对Group,调用用户自定义的reduce()方法) 注意: (1)shuffle中缓冲区大小会影响到MapReduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。 (2)缓冲区的大小可以通过参数调整,参数:io.sort.mb 默认 100M。 (3)源码解析过程。 4.3Shuffle机制 Map方法之后,Reduce方法之前处理数据的过程叫做shuffle。 注意:Map方法输出的数据会先输出到内存缓冲区,内存不够就会溢出到磁盘当中,这时候shuffle就出场了,会把磁盘的小文件合并成大的溢出文件,在溢出和合并过程当中,都会有分区的操作, shuffle主要是对每个map来的数据做归并排序,按照相同key分组,然后传入到reduce方法中,shuffle过程中的Combiner为可选流程,Combiner做的也是分区合并的工作. 4.4partition分区 4.4.1partition分区是按照条件分区到不同的文件中,比如将统计结果按照手机号归属地不同省份输出到不同的文件当中(分区)。 4.4.2默认Partitioner分区是根据key的hashcode对reducetask取模得到的,用户无法控制哪个key存储到哪个分区。 4.5排序概述 对于MapTask,它会将处理的结果暂时放入环形缓冲区中,当环形缓冲区使用率达到一定的阈值过后,再对环形缓冲区数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有的文件进行归并排序。 对于ReduceTask,它会从每个MapTask上远程拷贝相印的数据文件,如果1.文件大小超过一定阈值,则溢写到磁盘上,否则储存在内存中。如果2.磁盘上的文件数目达到一定的阈值,则再进行一次归并排序以生成一个更大的文件;如果3.内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。 4.6排序分类 (1)部分排序 MapReduce根据输入记录的键对数据集排序,保证输出的每个文件内部有序 (2)全排序 最终输出结果只有一个文件,且文件内部有序。实现方式是ReduceTask。但该方法在处理大型文件是效率极低,因为一台机器处理所有文件,完全丧失了MapTask所提供的并行架构 (3)辅助排序 (GroupingComparator分组) 在Reducer端对key进行分组。应用于在接收的key为bean对象时,想让一个或者几个字段相同(全部字段比较不相同)的key进入同一个reduce方法时,可以采用分组排序。 (4) 二次排序 在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。 4.7Combiner合并 1.Combiner是MR程序中Mapper和Reducer之外的一种组件 2.Combiner组件的父类就是Reducer。 3.Combiner和Reducer的区别在与运行的位置 Combiner是在每个Maptask所在的节点运行; Reducer是接收全局所有Mapper的输出结果; 4.Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量。 5.Combiner能够应用的前提是不能影响最终的业务逻辑,而且Combiner的输出kv应该跟Reducer的输入kv类型要对应起来。 4.8MapTask工作机制 (1)Read阶段:MapTask通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。 (2)Map阶段:该节点主要是将解析出的key/value交给用户编写的map()函数处理,并生成一系列的key/value。 (3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector。collect()输出结果。在该函数内部,将会生成key/value分区(调用partitioner),并写入一个环形内存缓冲区中。 (4)Spill阶段,即溢写,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并,压缩等操作。 溢写阶段详情: 步骤1:利用快速排序算法对缓冲区内的数据进行排序,排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,在同一分区内所有数据按照key排序。 步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。 步骤3:将分区数据的元信息写到内存索引的数据结构SpillRecord中,其中每个分区的元信息包括在轮式文件中的偏移量,压缩前的数据大小和压缩后的数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.Out.index中。 (5)Combine阶段:当所有数据处理完成后,MapTask对所有的临时文件进行一次合并,以确保最终只会生成一个数据文件。 当所有数据处理完成后,Maptask会将所有临时文件合成一个大文件,并保存到output/file.out中,同时生成相应的索引文件output/file.out.index。 在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归的合并方式。每轮合并i o.sort.factor(默认10)个文件,并将生成的文件重新加入合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。 让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取的开销。 4.9ReduceTask工作机制 (1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。 (2)Merge阶段:在远程拷贝数据同时,ReduceTask启动了两个后台线程对磁盘和内存中的文件进行合并,以防止内存使用过多,或者磁盘文件过多。 (3)Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据,为了将key相同的数据聚集在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现了对自己处理的结果进行了局部排序,因此ReduceTask只需对所有数据进行一次归并排序即可。 (4)Reduce阶段:reduce()函数将计算结果写到HDFS上。 OutputFormat接口实现类: a .TextOutputFormat为默认输出格式,它的键和值可以是任意类型,因为TextOutputFormat方法调用toString()方法把他们转化为字符串。 b.SequenceFileOutputFormat输出作为后续MapReduce任务的输入,这便是一种最好的输出格式,因为它的格式紧凑,很容易被压缩。 c .自定义输出格式,自定义实现输出,自定义OutputFormat为了实现控制最终文件的输出路经和输出格式 4.10数据清洗(ETL) 在运行MapReduce任务之前,往往要进行数据清洗,将不符合用户要求的数据清理掉,清理的过程往往1只需要运行Mapper程序,不需要运行Reduce程序。 // 1 获取1行数据 String line = value.toString();
// 2 解析日志 boolean result = parseLog(line,context); // 3 日志不合法退出 if (!result) { return; } // 4 设置key **4.10MapReduce开发总结** 1.输入数据接口:inputFormat 默认实现类:TextinputFormat 逻辑是一次读一行文本,然后将该行的起始偏移量设置为key,行内容设置为value返回。 CombineTextInputFormat可以把多个小文件合并成一个切片处理,提高处理效率。 2.逻辑处理接口:Mapper 根据业务需求实现其中三个方法: map() setup() cleanup() 3.Partioner分区 (1)有默认实现HashPartioner,逻辑是根据key的哈希值和numReduces来返回一个区号;key.hashCode()&Integer.MaxValue%numReduces (2) 如果业务上,有别的需求,可以自定义分区。 4.Comparable排序 (1)当我们用自定义的对象作为key来输出时,就必须实现WritableComparable接口,重写其中的CompareTo()方法. (2)部分排序:对最终输出的每一个文件内部排序 (3):全排序,对所有数据进行排序,通常只有一个Reduce。 (4):二次排序:排序的条件有两个。 (5):Combiner合并 Combiner合并可以提高执行效率,较少IO传输。但是使用时必须不能影响原有的业务处理结果。 (6):逻辑处理接口:Reducer 用户根据业务需求实现其中三个方法:reduce(),setup() (7):输出数据接口:OutputFormat (1):默认实现类时TextOutputFormat,功能逻辑是:将每一个kv对,向每个目标文本文件输出一行。 (2)将sequenceOutputFormat输出作为后续MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很容易被压缩。 (3)用户可以自定义OutputFormat。Yarn主要由ResourceManager,NodeManager,ApplicationMaster和Container等组件组成。 Yarn架构 (1):ResourceManager(RM)的主要作用如下 (a)处理客户端的请求 (b)监控NodeManager (c)启动或监控ApplicationMaster (d)资源的分配和调度 (2):NodeManager(NM)的主要作用如下 (a)管理单个节点上的资源 (b)处理来自ResourceManager的命令 (c)处理来自ApplicationMaster的命令 (3):ApplicationMaster(AM)作用如下 (a)负责数据的切分 (b)为应用程序申请资源并分配给内部的任务 (c)任务的监控与容错 (4)Container Container是YARN中的资源抽象,它封装了某个节点上的多维度资源,如内存,CPU,磁盘,网络等。 4.1Yarn工作机制 0.Mr程序提交到客户端所在的节点 1.YarnRunner向ResourManager申请一个Application 2.RM 向ResourceManager申请一个Application。RM将应用程序资源路径 (hdfs://…/staging以及application—id)返还给YarnRunner. 3.YarnRunner提交job运行所需资源到HDFS上 4.资源提交完毕,申请运行mrAppMaster 5.RM将用户的请求初始化成一个Task 6.其中一个NodeManager 领取到Task任务 7. NodeManager创建容器Container,并生成MRAppmaster 8. Container从HDFS上拷贝资源到本地。 9. MRAppmaster向RM申请运行MapTask的资源 10.领取到任务,创建容器 11.RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。 12.MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManger分别启动MapTask,MapTask对数据分区排序。 13.MrAppmaster等待所有的MapTask运行完毕后,向RM申请容器,运行ReduceTask。 14. RedeceTask向MapTask获取相印分区的数据。 15. 程序运行完毕后,MR会向RM申请注销自己。 4.2作业提交过程 (1).作业提交 第1步:Client调用job.waitforCompletion方法,向整个集群提交MapReduce作业。 第2步:Client向RM申请一个作业id。 第3步:RM给Client返回该job资源的提交路径和作业id。 第4步:Client提交jar包,切片信息和配置文件到指定的资源到指定的资源提交路径。 第5步:Client提交完资源后,向RM申请运行MrAppMaster (2)作业初始化 第6步:当RM收到Client请求后,将该job添加到容量调度器中。 第7步:某一个空闲的NM领取到该job。 第8步:该NM创建了Container,并产生MRAppmaster 第9步:下载Client提交的资源到本地。 (3)任务分配 第10步:MrAppMaster向RM申请运行多个MapTask任务资源。 第11步:RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。 (4)任务运行 第12步:MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。 第13步:MrAppmaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。 第14步:ReduceTask向MapTask获取相印分区数据。 第15步:程序运行完毕后,MR会向RM申请注销自己。 (5)进度状态更新 YARN中的任务将其进度和状态(包括Counter)返回给应用管理器,客户端每秒(通过mapreduce.client.progressmonitor.pollinterval设置)向应用管理器请求进度更新,展示给用户。 (6)作业完成 除了向应用管理器请求作业进度外,客户端每5秒都会调用waitForCompletion()来检查检查作业是否完成。时间间隔可以通过mapreduce.client.completion.pollinterval来设置。作业完成之后,应用管理器和Container会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户检查。
