MapReduce Getting Started Example: WordCount


Running the Official Example

1) Create a file named words and fill it with some arbitrary words.
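For example (the exact content doesn't matter; any whitespace-separated text will do):

echo "hello world hello hadoop world" > words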

2) Upload it to HDFS

hdfs dfs -put words /

3) Submit the job to YARN

cd /opt/module/hadoop3.1.3/share/hadoop/mapreduce

yarn jar hadoop-mapreduce-examples-3.1.3.jar wordcount /words /output

Note: the output directory must not already exist; to write to /output again, delete it first.
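The deletion can be done with:

hdfs dfs -rm -r /output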


Writing a WordCount Modeled on the Source

1) Create a Maven project

Configure the dependencies:

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>2.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client-api</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client-runtime</artifactId>
        <version>3.1.3</version>
    </dependency>
</dependencies>
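The original post doesn't pin a compiler level. If your local JDK differs from the cluster's Java version, a minimal maven-compiler-plugin sketch (assuming JDK 8) can also be added to the pom:

<build>
    <plugins>
        <!-- Assumption: the cluster runs Java 8, so compile for 1.8 -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>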

Create log4j2.xml under the resources directory:

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="error" strict="true" name="XMLConfig">
    <Appenders>
        <!-- Appender of type Console; the name attribute is required -->
        <Appender type="Console" name="STDOUT">
            <!-- PatternLayout producing output such as
                 [INFO] [2018-01-22 17:34:01][org.test.Console]I'm here -->
            <Layout type="PatternLayout"
                    pattern="[%p] [%d{yyyy-MM-dd HH:mm:ss}][%c{10}]%m%n" />
        </Appender>
    </Appenders>
    <Loggers>
        <!-- additivity is set to false -->
        <Logger name="test" level="info" additivity="false">
            <AppenderRef ref="STDOUT" />
        </Logger>
        <!-- root logger configuration -->
        <Root level="info">
            <AppenderRef ref="STDOUT" />
        </Root>
    </Loggers>
</Configuration>

2) The WCMapper class

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * The Mapper receives <k, v> pairs from the InputFormat, emits <k, v> pairs,
 * and hands them to the Reducer after the Shuffle phase.
 * The InputFormat's k is the byte offset of the line (LongWritable); v is the line's content (Text).
 * The Mapper's output k is a word (Text); v is a count (LongWritable).
 */
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    // Member variables instead of locals avoid re-allocating objects for every record
    private final Text k = new Text();
    private static final LongWritable v = new LongWritable(1);

    // Override map() to implement the business logic
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split the line into words
        String line = value.toString();
        String[] words = line.split(" ");
        // Write a <k, v> pair for each word
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}

3) The WCReducer class

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * The Reducer receives the shuffled <k, v> pairs and emits processed <k, v> pairs.
 * The Reducer's input k is a word (Text); v is a count (LongWritable); the output types are the same.
 */
public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    // A member variable instead of a local avoids re-allocating an object for every key
    private final LongWritable v = new LongWritable();

    // Override reduce() to implement the business logic
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // The <k, v> pairs from Shuffle are already sorted and grouped;
        // sum up the occurrences of each word
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        v.set(sum);
        context.write(key, v);
    }
}

4) The WCDriver class

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WCDriver {
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get a Job instance
        Job job = Job.getInstance(new Configuration());

        // 2. Set the jar
        job.setJarByClass(WCDriver.class);

        // 3. Set the InputFormat (TextInputFormat is used by default)

        // 4. Set the Mapper
        job.setMapperClass(WCMapper.class);
        // Key/value types of the Mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 5. Configure Shuffle (defaults are used here)

        // 6. Set the Reducer
        job.setReducerClass(WCReducer.class);
        // Key/value types of the Reducer output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 7. Set the OutputFormat (TextOutputFormat is used by default)

        // 8. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 9. Submit the job and wait for completion
        boolean flag = job.waitForCompletion(true);
        System.exit(flag ? 0 : 1);
    }
}
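One optional tweak that is not in the original driver: because WCReducer's input and output <k, v> types are identical, it can double as a Combiner for step 5, pre-aggregating counts on the map side before Shuffle:

// Optional: pre-aggregate counts on the map side; WCReducer works as a
// Combiner here because its input and output <k, v> types are the same
job.setCombinerClass(WCReducer.class);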

5) Package the project and upload the jar to /opt/module/hadoop3.1.3/share/hadoop/mapreduce

6) Run

cd /opt/module/hadoop3.1.3/share/hadoop/mapreduce

yarn jar <jar name> <fully qualified name of WCDriver> /words /output
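For instance, assuming a hypothetical jar named wordcount-1.0-SNAPSHOT.jar and the classes living in a hypothetical package com.example.mr, the command would look like:

yarn jar wordcount-1.0-SNAPSHOT.jar com.example.mr.WCDriver /words /output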

Note: the output directory must not already exist.
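After the job succeeds, the counts land in the output directory, typically in a file named part-r-00000, which can be inspected with:

hdfs dfs -cat /output/part-r-00000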
