本博客首先讲述了MapReduce的基础知识,随后使用MapReduce完成了WordCount任务。是慕课网教程:https://coding.imooc.com/class/301.html 的学习笔记。
1 MapReduce概述
官方网站上的定义:Hadoop MapReduce是一个软件框架(分布式计算框架),目的是为了在廉价机器组成的大集群(几千个节点)上以可靠,容错的方式,比较容易的编写处理海量数据(TB量级数据集)的并行程序。
源自于谷歌的MapReduce论文,发表于2004年12月Hadoop MapReduce 是Google MapReduce的克隆版MapReduce的优点:(1)海量数据的离线处理(2)容易开发(3)容易运行。MapReduce的缺点:实时流式计算
2 MapReduce编程模型
这里以WordCount词频统计分析为例子介绍MapReduce 的编程模型。
2.1 图示
上图显示了使用MapReduce进行词频统计的流程,整个流程实际上需要我们开发的只有两个,分别是map过程和reduce过程。 map过程就是通过一个分隔符将文本分割,并添加上频数1,reduce过程就是将相同的单词放在一起,同时将频数相加。split和shuffling在编程模型中是不体现出来的。
2.2 MapReduce 编程模型之执行步骤
准备mao处理的输入数据Mapper处理ShuffleReduce处理(合并,归并)结果输出
流程图: 备注:RR:RecordReader类,Partisioner:分区,加上sort过程就相当于shuffle的过程。上图中的Node1和Node2是相同的,可以理解为同一个作业同时在两个机器上运行。 图中InputFormat是用来读数据的,读取数据之后就切片,通过RecordReader将切片读取进来,然后进行map处理,map处理之后输出一个中间的键值对结果,然后交给shuffle过程处理,之后使用reduce方法生成最终的键值对,最后通过OutputFormat来将结果写到HDFS或者本地文件系统中。
3.MapReduce 编程示例
下面的例子展示了使用MapReduce进行词频统计的方法,需要分析的文件在HDFS上,将结果也要输出到HDFS上。具体思路是定义Map类别,Reduce类,然后使用一个主类来对Map和Reduce进行调用。
3.1 定义Mapper类
package com
.xjtu
.mr
.wc
;
import org
.apache
.hadoop
.io
.IntWritable
;
import org
.apache
.hadoop
.io
.LongWritable
;
import org
.apache
.hadoop
.io
.Text
;
import org
.apache
.hadoop
.mapreduce
.Mapper
;
import java
.io
.IOException
;
public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable>{
@Override
protected void map(LongWritable key
, Text value
, Context context
) throws IOException
, InterruptedException
{
String
[] words
=value
.toString().split(" ");
for(String word
:words
){
context
.write(new Text(word
),new IntWritable(1));
}
}
}
3.2 定义Reducer类
package com
.xjtu
.mr
.wc
;
import org
.apache
.hadoop
.io
.IntWritable
;
import org
.apache
.hadoop
.io
.Text
;
import org
.apache
.hadoop
.mapreduce
.Reducer
;
import java
.io
.IOException
;
import java
.util
.Iterator
;
public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
@Override
protected void reduce(Text key
, Iterable
<IntWritable> values
, Context context
) throws IOException
, InterruptedException
{
Iterator
<IntWritable> iterator
=values
.iterator();
int count
=0;
while (iterator
.hasNext()){
IntWritable value
=iterator
.next();
count
+=value
.get();
}
context
.write(key
,new IntWritable(count
));
}
}
3.3 设置主类使用Mapper和Reducer进行词频统计
package com
.xjtu
.mr
.wc
;
import org
.apache
.hadoop
.conf
.Configuration
;
import org
.apache
.hadoop
.fs
.FileSystem
;
import org
.apache
.hadoop
.fs
.Path
;
import org
.apache
.hadoop
.io
.IntWritable
;
import org
.apache
.hadoop
.io
.Text
;
import org
.apache
.hadoop
.mapreduce
.Job
;
import org
.apache
.hadoop
.mapreduce
.lib
.input
.FileInputFormat
;
import org
.apache
.hadoop
.mapreduce
.lib
.output
.FileOutputFormat
;
import java
.io
.IOException
;
import java
.net
.URI
;
import java
.net
.URISyntaxException
;
public class WordCountApp {
public static void main(String
[] args
) throws IOException
, ClassNotFoundException
, InterruptedException
, URISyntaxException
{
System
.setProperty("HADOOP_USER_NAME","root");
Configuration conf
=new Configuration();
conf
.set("fs.defaultFS","hdfs://192.168.31.25:8020");
Job job
=Job
.getInstance(conf
);
job
.setJarByClass(WordCountApp
.class);
job
.setMapperClass(WordCountMapper
.class);
job
.setReducerClass(WordCountReducer
.class);
job
.setMapOutputKeyClass(Text
.class);
job
.setMapOutputValueClass(IntWritable
.class);
job
.setOutputKeyClass(Text
.class);
job
.setOutputValueClass(IntWritable
.class);
FileInputFormat
.setInputPaths(job
,new Path("/wordcount/input"));
Path outputPath
=new Path("/wordcount/output");
FileSystem fileSystem
=FileSystem
.get(new URI("hdfs://192.168.31.25:8020"),conf
,"root");
if(fileSystem
.exists(outputPath
)){
fileSystem
.delete(outputPath
,true);
}
FileOutputFormat
.setOutputPath(job
,outputPath
);
boolean result
=job
.waitForCompletion(true);
System
.exit(result
?0:1);
}
}