hadoop-规约(combiner)

it2025-03-02  26

规约Combiner

** 概念**

每一个 map 都可能会产生大量的本地输出,Combiner 的作用就是对 map 端的输出先做一次合并,以减少在 map 和 reduce 节点之间的数据传输量,以提高网络IO 性能,是 MapReduce 的一种优化手段之一

combiner 是 MR 程序中 Mapper 和 Reducer 之外的一种组件combiner 组件的父类就是 Reducercombiner 和 reducer 的区别在于运行的位置 Combiner 是在每一个 maptask 所在的节点运行Reducer 是接收全局所有 Mapper 的输出结果 combiner 的意义就是对每一个 maptask 的输出进行局部汇总,以减小网络传输量
实现步骤
自定义一个 combiner 继承 Reducer,重写 reduce 方法在 job 中设置 job.setCombinerClass(CustomCombiner.class)

combiner 能够应用的前提是不能影响最终的业务逻辑,而且,combiner 的输出 kv 应该跟 reducer 的输入 kv 类型要对应起来

为什么要有combiner?

通过word_count 案例实现规约

https://blog.csdn.net/qq_43751489/article/details/109149192

实现combiner

package word_count_conbiner; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class MyCombiner extends Reducer<Text, LongWritable,Text,LongWritable> { @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long count = 0; for (LongWritable value : values) { count += value.get(); } context.write(key,new LongWritable(count)); } } package word_count_conbiner; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class JobMain extends Configured implements Tool { @Override public int run(String[] args) throws Exception { Job job = Job.getInstance(super.getConf(), JobMain.class.getSimpleName()); //打包到集群上面运行时候,必须要添加以下配置,指定程序的main函数 job.setJarByClass(JobMain.class); //第一步:读取输入文件解析成key,value对 job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job,new Path("file:///D:\\input\\combiner_input")); //第二步:设置我们的mapper类 job.setMapperClass(WordCountMapper.class); //设置我们map阶段完成之后的输出类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); //第三步分区,第四步排序,第六步分组,都默认 //第五步(规约) 进行操作 job.setCombinerClass(MyCombiner.class); //第七步:设置我们的reduce类 job.setReducerClass(WordCountReducer.class); //设置我们reduce阶段完成之后的输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //第八步:设置输出类以及输出路径 job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job,new Path("file:///D:\\out\\combiner_out")); boolean b = job.waitForCompletion(true); return b?0:1; } /** * 程序main函数的入口类 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); Tool tool = new JobMain(); int run = ToolRunner.run(configuration, tool, args); System.exit(run); } }
最新回复(0)