MappeReduce简介和使用MapReduce进行WordCount操作方法介绍

it2024-01-20  72

本博客首先讲述了MapReduce的基础知识,随后使用MapReduce完成了WordCount任务。是慕课网教程:https://coding.imooc.com/class/301.html 的学习笔记。

1 MapReduce概述

官方网站上的定义:Hadoop MapReduce是一个软件框架(分布式计算框架),目的是为了在廉价机器组成的大集群(几千个节点)上以可靠,容错的方式,比较容易的编写处理海量数据(TB量级数据集)的并行程序。

源自于谷歌的MapReduce论文,发表于2004年12月Hadoop MapReduce 是Google MapReduce的克隆版MapReduce的优点:(1)海量数据的离线处理(2)容易开发(3)容易运行。MapReduce的缺点:实时流式计算

2 MapReduce编程模型

这里以WordCount词频统计分析为例子介绍MapReduce 的编程模型。

2.1 图示

上图显示了使用MapReduce进行词频统计的流程,整个流程实际上需要我们开发的只有两个,分别是map过程和reduce过程。 map过程就是通过一个分隔符将文本分割,并添加上频数1,reduce过程就是将相同的单词放在一起,同时将频数相加。split和shuffling在编程模型中是不体现出来的。

2.2 MapReduce 编程模型之执行步骤

准备mao处理的输入数据Mapper处理ShuffleReduce处理(合并,归并)结果输出

流程图: 备注:RR:RecordReader类,Partisioner:分区,加上sort过程就相当于shuffle的过程。上图中的Node1和Node2是相同的,可以理解为同一个作业同时在两个机器上运行。 图中InputFormat是用来读数据的,读取数据之后就切片,通过RecordReader将切片读取进来,然后进行map处理,map处理之后输出一个中间的键值对结果,然后交给shuffle过程处理,之后使用reduce方法生成最终的键值对,最后通过OutputFormat来将结果写到HDFS或者本地文件系统中。

3.MapReduce 编程示例

下面的例子展示了使用MapReduce进行词频统计的方法,需要分析的文件在HDFS上,将结果也要输出到HDFS上。具体思路是定义Map类别,Reduce类,然后使用一个主类来对Map和Reduce进行调用。

3.1 定义Mapper类

package com.xjtu.mr.wc; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> * KEYIN: map任务读数据的key类型,一般来说是offset,是每行数据起始位置的偏移量。 * VALUEIN:map任务读数据的value类型,其实就是一行行的字符串 * KEYOUT:map方法自定义实现输出key的类型。 * VALUEOUT:map方法自定义实现输出的value的类型 */ public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 这里每一个单词的分隔符是空格,根据不同的文件来决定分隔符是什么 String[] words=value.toString().split(" "); for(String word:words){ context.write(new Text(word),new IntWritable(1)); } } }

3.2 定义Reducer类

package com.xjtu.mr.wc; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.Iterator; /** * Reducer 的输入是Mapper的输出,输出形式不变,只是单词的计数发生改变 */ public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> { /** * 例子: * (hello,1) (world,1) * (hello,1) (world,1) * (hello,1) (world,1) * (welcome,1) * map的输出到reduce端,是按照相同的key分发到一个reduce上去执行的 * 比如上面的例子,分配情况如下: * reduce1: (hello,1)(hello,1)(hello,1) ==> (hello,<1,1,1>) * reduce2: (world,1)(world,1)(world,1) ==> (world,<1>) * reduce3: (welcome,1) * @param key Mapper输出的key * @param values 在wordcount案例中,value指的是单词出现的次数 * @param context 上下文对象 * @throws IOException * @throws InterruptedException */ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //由上面的例子可知,values是一个可迭代的对象,迭代values,累加即可得到最终结果 Iterator<IntWritable> iterator=values.iterator(); int count=0; while (iterator.hasNext()){ IntWritable value=iterator.next(); count+=value.get(); } context.write(key,new IntWritable(count)); } }

3.3 设置主类使用Mapper和Reducer进行词频统计

package com.xjtu.mr.wc; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; /** * 用于使用Mapper和Reducer完成wordcount功能 */ public class WordCountApp { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException { //设置用户 System.setProperty("HADOOP_USER_NAME","root"); // 设置配置类,下面HDFS的地址要换成你自己的 Configuration conf=new Configuration(); conf.set("fs.defaultFS","hdfs://192.168.31.25:8020"); // 定义一个Job对象 Job job=Job.getInstance(conf); // 设置Job对象运行依托的主类 job.setJarByClass(WordCountApp.class); // 设置Job作业运行需要的Mapper和Reducer类 job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); // 设置Mapper的输出对应的类别 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 设置最终输出(Reducer的输出)对应的类别 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 定义输入的路径 FileInputFormat.setInputPaths(job,new Path("/wordcount/input")); // 检查是否已经存在目标文件夹,如果已经存在,则删除它,防止报错。 Path outputPath=new Path("/wordcount/output"); FileSystem fileSystem=FileSystem.get(new URI("hdfs://192.168.31.25:8020"),conf,"root"); if(fileSystem.exists(outputPath)){ fileSystem.delete(outputPath,true); } //定义输出的路径 FileOutputFormat.setOutputPath(job,outputPath); // 提交job boolean result=job.waitForCompletion(true); System.exit(result?0:1); } }
最新回复(0)