Shuffle会对Map阶段传过来的<k, v>按k排序,然后按k分组。
Shuffle会经历3次排序
1)每一个MapTask都会有一个环形缓冲区。<k, v>进入环形缓冲区按k快排生成有序文件。
2)由于数据量大,内存放不下,所以MapTask的数据只能分多次进入环形缓冲区得到多个有序文件,接着把这多个有序文件归并排序成一个有序文件。
3)一个切片就会有一个MapTask,所以多个切片产生的有序文件还需要归并排序成一个有序文件。
Shuffle的分组
得到有序文件后,会把相同的key组成一组,然后交给Reducer。所以可以看见reduce方法中两个参数是key和value的迭代器,代表一个组。
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException排序时如果k是自定义Xxx类型,需实现WritableComparable<XxxBean>接口的compareTo方法。
public class XxxBean implements WritableComparable<XxxBean>{ //实现compareTo方法 @Override public int compareTo(XxxBean o) { } //实现序列化写 @Override public void write(DataOutput dataOutput) throws IOException { } //实现序列化读 @Override public void readFields(DataInput dataInput) throws IOException { } }在Driver类中设置分组比较器
job.setGroupingComparatorClass(GroupComparator.class);1)Map阶段中可以进行切片,每个切片开启一个MapTask,于是就能并行工作。同样的,Reduce阶段也可以开启多个ReduceTask,进行并行工作。
2)Map阶段的切片是根据块大小决定的,而分区机制则是根据业务由用户自定义的。譬如分2个区统计单词,Reducer1统计所有切片中A-N打头的单词,Reducer2统计所有切片中O-Z打头的单词。
3)切片是发生在Map阶段的,分区必须提前发生在Shuffle阶段。第一次快排后就会发生分区,之后的两次归并排序以及分组都按分区进行,然后多个ReduceTask处理各自分区。
4)切片是真的把文件从物理上切开,而分区只是从逻辑上把文件切开。
5)分区要避免数据倾斜(某个ReduceTask任务过多,其他ReduceTask任务过少)。
1)自定义分区类
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class MyPartition extends Partitioner<Text, LongWritable> { //每一条<k, v>,返回分区号 public int getPartition(Text text, LongWritable longWritable, int i) { char initial = text.toString().charAt(0); //获取首字母 if (('a' <= initial && initial <= 'g') || ('A' <= initial && initial <= 'G')) { return 0; } else if (('h' <= initial && initial <= 'q') || ('H' <= initial && initial <= 'Q')) { return 1; } else if (('r' <= initial && initial <= 'z') || ('R' <= initial && initial <= 'Z')) { return 2; } else { return 3; } } }2)修改Driver类
//设置Shuffle //设置3个分区,默认只有1个 job.setPartitionerClass(MyPartition.class); job.setNumReduceTasks(4);1)以wordcount为例,开启分区后,某个区统计到10个hello,那么就有10个<hello, 1>,效果和<hello, 10>一样,但占用资源多,所以引入Combiner机制。
2)Combiner功能和Reducer一样,只是Combiner只对一个MapTask产生的数据汇总,而Reducer对全局汇总。开启Combiner目的是减少IO。
3)Combiner发生在数据刚出环形缓冲区以及第一次归并排序后,总共两次。
4)Combiner默认是不开启的,主要是根据业务决定。譬如统计单词就应该开启,而计算平均值则不能开。总之能开尽量要开。