规约Combiner
** 概念**
每一个 map 都可能会产生大量的本地输出,Combiner 的作用就是对 map 端的输出先做一次合并,以减少在 map 和 reduce 节点之间的数据传输量,以提高网络IO 性能,是 MapReduce 的一种优化手段之一
combiner 是 MR 程序中 Mapper 和 Reducer 之外的一种组件combiner 组件的父类就是 Reducercombiner 和 reducer 的区别在于运行的位置
Combiner 是在每一个 maptask 所在的节点运行Reducer 是接收全局所有 Mapper 的输出结果 combiner 的意义就是对每一个 maptask 的输出进行局部汇总,以减小网络传输量
实现步骤
自定义一个 combiner 继承 Reducer,重写 reduce 方法在 job 中设置 job.setCombinerClass(CustomCombiner.class)
combiner 能够应用的前提是不能影响最终的业务逻辑,而且,combiner 的输出 kv 应该跟 reducer 的输入 kv 类型要对应起来
为什么要有combiner?
通过word_count 案例实现规约
https://blog.csdn.net/qq_43751489/article/details/109149192
实现combiner
package word_count_conbiner
;
import org
.apache
.hadoop
.io
.LongWritable
;
import org
.apache
.hadoop
.io
.Text
;
import org
.apache
.hadoop
.mapreduce
.Reducer
;
import java
.io
.IOException
;
public class MyCombiner extends Reducer<Text, LongWritable,Text,LongWritable> {
@Override
protected void reduce(Text key
, Iterable
<LongWritable> values
, Context context
) throws IOException
, InterruptedException
{
long count
= 0;
for (LongWritable value
: values
) {
count
+= value
.get();
}
context
.write(key
,new LongWritable(count
));
}
}
package word_count_conbiner
;
import org
.apache
.hadoop
.conf
.Configuration
;
import org
.apache
.hadoop
.conf
.Configured
;
import org
.apache
.hadoop
.fs
.Path
;
import org
.apache
.hadoop
.io
.LongWritable
;
import org
.apache
.hadoop
.io
.Text
;
import org
.apache
.hadoop
.mapreduce
.Job
;
import org
.apache
.hadoop
.mapreduce
.lib
.input
.TextInputFormat
;
import org
.apache
.hadoop
.mapreduce
.lib
.output
.TextOutputFormat
;
import org
.apache
.hadoop
.util
.Tool
;
import org
.apache
.hadoop
.util
.ToolRunner
;
public class JobMain extends Configured implements Tool {
@Override
public int run(String
[] args
) throws Exception
{
Job job
= Job
.getInstance(super.getConf(), JobMain
.class.getSimpleName());
job
.setJarByClass(JobMain
.class);
job
.setInputFormatClass(TextInputFormat
.class);
TextInputFormat
.addInputPath(job
,new Path("file:///D:\\input\\combiner_input"));
job
.setMapperClass(WordCountMapper
.class);
job
.setMapOutputKeyClass(Text
.class);
job
.setMapOutputValueClass(LongWritable
.class);
job
.setCombinerClass(MyCombiner
.class);
job
.setReducerClass(WordCountReducer
.class);
job
.setOutputKeyClass(Text
.class);
job
.setOutputValueClass(LongWritable
.class);
job
.setOutputFormatClass(TextOutputFormat
.class);
TextOutputFormat
.setOutputPath(job
,new Path("file:///D:\\out\\combiner_out"));
boolean b
= job
.waitForCompletion(true);
return b
?0:1;
}
public static void main(String
[] args
) throws Exception
{
Configuration configuration
= new Configuration();
Tool tool
= new JobMain();
int run
= ToolRunner
.run(configuration
, tool
, args
);
System
.exit(run
);
}
}