Table of Contents
1 Partitioning in MapReduce and the number of ReduceTasks
  1.1 Concept
  1.2 Code implementation
2 Sorting and serialization
  2.1 Overview
  2.2 Code implementation
3 Counters
4 Combiner
  4.1 Concept
  4.2 Comprehensive MapReduce exercise: network traffic statistics
1 Partitioning in MapReduce and the number of ReduceTasks
1.1 Concept
Partitioning in MapReduce follows the idea of "birds of a feather flock together": records with the same key are sent to the same reduce. The number of ReduceTasks defaults to one and can be set explicitly, e.g. job.setNumReduceTasks(3). The partition determines which ReduceTask each record is sent to.
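For reference, this is how Hadoop's built-in HashPartitioner assigns a record to a reduce task when no custom partitioner is set; the class name below is only an illustration and is not part of this example, but the formula mirrors the default behaviour and explains why identical keys always land on the same reduce:

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Illustrative only: the same hash-and-modulo formula the default HashPartitioner uses,
// so records with equal keys always map to the same reduce task.
public class DefaultLikePartitioner extends Partitioner<Text, NullWritable> {
    @Override
    public int getPartition(Text key, NullWritable value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}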
1.2 Code implementation
Program entry point (main method)
package cn.itcast.mr.demo1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class PartitionMain extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new PartitionMain(), args);
        System.exit(run);
    }

    @Override
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(super.getConf(), "myPartition");
        job.setJarByClass(PartitionMain.class);

        // Input: read text lines from HDFS
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path("hdfs://node01:8020/partitionin"));

        // Mapper and its output types
        job.setMapperClass(PartitionMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Custom partitioner decides which reduce task each key goes to
        job.setPartitionerClass(PartitionOwn.class);

        // Reducer and final output types
        job.setReducerClass(PartitionReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Two reduce tasks, matching the two partitions returned by PartitionOwn
        job.setNumReduceTasks(2);

        // Output: write text files to HDFS
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://node01:8020/partition_out"));

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }
}
Custom Partitioner
package cn.itcast.mr.demo1;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class PartitionOwn extends Partitioner<Text, NullWritable> {
    @Override
    public int getPartition(Text text, NullWritable nullWritable, int numReduceTask) {
        // The sixth tab-separated field of the line decides the partition
        String result = text.toString().split("\t")[5];
        System.out.println(result);
        if (Integer.parseInt(result) > 15) {
            return 1;
        } else {
            return 0;
        }
    }
}
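Note on matching partitions to reduce tasks: the driver must configure at least as many reduce tasks as the largest partition number plus one. PartitionOwn returns 0 or 1, so job.setNumReduceTasks(2) is the minimum; configuring more reduce tasks only yields extra empty output files, while returning a partition number that is greater than or equal to the configured reduce count makes the map tasks fail with an "Illegal partition" error (except in the single-reduce case, where the custom partitioner is effectively bypassed).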
Define a Mapper class
package cn.itcast.mr.demo1;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class PartitionMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Pass each input line through unchanged; the partitioner does the real work
        context.write(value, NullWritable.get());
    }
}
Define a Reducer class
package cn.itcast.mr.demo1;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class PartitionReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Write each key straight to the output of its reduce task
        context.write(key, NullWritable.get());
    }
}
2 Sorting and serialization
2.1 Overview
Serialization is the process of turning a structured object into a byte stream. Deserialization is the reverse process: turning a byte stream back into a structured object. MapReduce sorts the values of k2 in dictionary (lexicographic) order. Hadoop does not reuse Java's built-in serialization mechanism; it provides its own Writable interface instead. Implementing Writable gives you serialization, implementing Comparable gives you sorting, and if you need both you can implement Writable and Comparable together, or simply implement WritableComparable. If a whole line of text is used as k2, a secondary sort on two fields is not possible; in that case the two fields can be wrapped in a JavaBean and the bean used as k2.
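A small hypothetical illustration of why the wrapper bean is needed (the data below is made up for this note, not taken from the exercise): suppose each input line has two tab-separated columns, "a	9", "a	10", "b	3", and we want to sort by the first column and, within equal first columns, by the second column numerically. Sorting the raw lines as Text puts "a	10" before "a	9", because string comparison proceeds character by character and '1' sorts before '9'. Wrapping the two fields in a WritableComparable bean whose compareTo compares the first field as a string and the second as an integer, as K2Bean does below, produces the intended order a 9, a 10, b 3.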
2.2 Code implementation
Program entry point (main method)
package cn.itcast.mr.demo2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SortMain extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int run = ToolRunner.run(configuration, new SortMain(), args);
        System.exit(run);
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), "sort");

        // Input: local directory with the two-column text files (backslashes escaped for Java)
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path("file:///G:\\BigData\\6.大数据hadoop离线\\第四天\\4、大数据离线第四天\\排序\\input"));

        // Mapper emits K2Bean as the key, so the framework sorts by its compareTo
        job.setMapperClass(SortMapper.class);
        job.setMapOutputKeyClass(K2Bean.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(K2Bean.class);
        job.setOutputValueClass(NullWritable.class);

        // Output directory must not exist before the job runs
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("file:///G:\\BigData\\6.大数据hadoop离线\\第四天\\4、大数据离线第四天\\排序\\output"));

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }
}
Custom JavaBean overriding compareTo
package cn.itcast.mr.demo2;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class K2Bean implements WritableComparable<K2Bean> {

    private String first;
    private int second;

    // Sort by the first field; when equal, fall back to the second field compared numerically
    @Override
    public int compareTo(K2Bean o) {
        int i = this.first.compareTo(o.first);
        if (i == 0) {
            return Integer.valueOf(this.second).compareTo(Integer.valueOf(o.second));
        } else {
            return i;
        }
    }

    // Serialization: write the fields in a fixed order
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(first);
        out.writeInt(second);
    }

    // Deserialization: read the fields back in the same order
    @Override
    public void readFields(DataInput in) throws IOException {
        this.first = in.readUTF();
        this.second = in.readInt();
    }

    @Override
    public String toString() {
        return first + '\t' + second;
    }

    public String getFirst() {
        return first;
    }

    public void setFirst(String first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }
}
Define a Mapper class
package cn.itcast.mr.demo2;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SortMapper extends Mapper<LongWritable, Text, K2Bean, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Wrap the two tab-separated fields in a K2Bean and emit it as the key
        String[] split = value.toString().split("\t");
        K2Bean k2Bean = new K2Bean();
        k2Bean.setFirst(split[0]);
        k2Bean.setSecond(Integer.parseInt(split[1]));
        context.write(k2Bean, NullWritable.get());
    }
}
Define a Reducer class
package cn.itcast.mr.demo2;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SortReducer extends Reducer<K2Bean, NullWritable, K2Bean, NullWritable> {
    @Override
    protected void reduce(K2Bean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Write the key once per value so duplicate records survive in the output
        for (NullWritable value : values) {
            context.write(key, NullWritable.get());
        }
    }
}
3 Counters
Obtaining a counter through the context object
package cn.itcast.mr.demo2;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SortMapper extends Mapper<LongWritable, Text, K2Bean, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Count every record the mapper receives, under the group "MR_COUNT"
        Counter counter = context.getCounter("MR_COUNT", "MapRecordCounter");
        counter.increment(1L);

        String[] split = value.toString().split("\t");
        K2Bean k2Bean = new K2Bean();
        k2Bean.setFirst(split[0]);
        k2Bean.setSecond(Integer.parseInt(split[1]));
        context.write(k2Bean, NullWritable.get());
    }
}
Defining counters with an enum type
package cn.itcast.mr.demo2;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SortReducer extends Reducer<K2Bean, NullWritable, K2Bean, NullWritable> {

    // Each enum constant becomes a named counter in the job's counter output
    public static enum Counter {
        REDUCE_INPUT_RECORDS,
        REDUCE_INPUT_VAL_NUMS,
    }

    @Override
    protected void reduce(K2Bean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // One increment per distinct key
        context.getCounter(Counter.REDUCE_INPUT_RECORDS).increment(1L);
        for (NullWritable value : values) {
            // One increment per value belonging to that key
            context.getCounter(Counter.REDUCE_INPUT_VAL_NUMS).increment(1L);
            context.write(key, NullWritable.get());
        }
    }
}
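Counter values are printed with the job's console output when waitForCompletion(true) is used. They can also be read programmatically once the job has finished; a minimal sketch, assuming the driver still holds the Job reference after waitForCompletion has returned:

// In the driver, after job.waitForCompletion(true):
org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
long reduceRecords = counters.findCounter(SortReducer.Counter.REDUCE_INPUT_RECORDS).getValue();
long mapRecords = counters.findCounter("MR_COUNT", "MapRecordCounter").getValue();
System.out.println("reduce input records = " + reduceRecords + ", map records = " + mapRecords);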
4 Combiner
4.1 Concept
A combiner merges records that share the same k2 before they leave the map side, reducing the number of k2 records sent to the reduce stage; the benefit is saved network bandwidth. A combiner is really just a reducer class, but with a special constraint on its types: its input is k2, v2 and its output is again k2, v2. A combiner must not change the result of the computation; it only exists to cut down the amount of data shipped to the reduce side (a minimal sketch is shown after the note below).
Note: averages cannot be computed with a combiner, because an average of partial averages is not the overall average.
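A minimal sketch of what such a class looks like and how it is registered. The SumCombiner name and its Text/LongWritable types are hypothetical, chosen only to show the required shape; they are not part of the exercise below:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical combiner: partially sums the counts emitted by a mapper.
// Its input types (Text, LongWritable) equal its output types, as a combiner requires,
// and summation is associative, so the final reduce result is unchanged.
public class SumCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        context.write(key, new LongWritable(sum));
    }
}

// Registered in the driver with: job.setCombinerClass(SumCombiner.class);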
4.2 Comprehensive MapReduce exercise: network traffic statistics
Program entry point (main method)
package cn.itcast.mr.demo3;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FlowNumMain extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), "flowSum");

        // Input: local directory with the traffic records (backslashes escaped for Java)
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path("file:///G:\\BigData\\6.大数据hadoop离线\\第四天\\4、大数据离线第四天\\排序\\input"));

        // Mapper and its output types
        job.setMapperClass(FlowNumMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowNum.class);

        // Reducer and final output types
        job.setReducerClass(FlowNumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowNum.class);

        // Output directory must not exist before the job runs
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("file:///G:\\BigData\\6.大数据hadoop离线\\第四天\\4、大数据离线第四天\\排序\\output"));

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int run = ToolRunner.run(configuration, new FlowNumMain(), args);
        System.exit(run);
    }
}
Custom JavaBean (FlowNum)
package cn.itcast.mr.demo3;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowNum implements Writable {

    private Integer upload;
    private Integer download;
    private Integer uploadSum;
    private Integer downloadSum;

    // Serialization: write the four fields in a fixed order
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(upload);
        out.writeInt(download);
        out.writeInt(uploadSum);
        out.writeInt(downloadSum);
    }

    // Deserialization: read them back in the same order
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upload = in.readInt();
        this.download = in.readInt();
        this.uploadSum = in.readInt();
        this.downloadSum = in.readInt();
    }

    public Integer getUpload() {
        return upload;
    }

    public void setUpload(Integer upload) {
        this.upload = upload;
    }

    public Integer getDownload() {
        return download;
    }

    public void setDownload(Integer download) {
        this.download = download;
    }

    public Integer getUploadSum() {
        return uploadSum;
    }

    public void setUploadSum(Integer uploadSum) {
        this.uploadSum = uploadSum;
    }

    public Integer getDownloadSum() {
        return downloadSum;
    }

    public void setDownloadSum(Integer downloadSum) {
        this.downloadSum = downloadSum;
    }

    @Override
    public String toString() {
        return upload + "\t" + download + "\t" + uploadSum + "\t" + downloadSum;
    }
}
Define a Mapper class
package cn.itcast.mr.demo3;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowNumMapper extends Mapper<LongWritable, Text, Text, FlowNum> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split("\t");

        // Field 1 is the phone number; fields 6 to 9 are the four traffic columns
        String phoneNum = split[1];
        Integer upload = Integer.parseInt(split[6]);
        Integer download = Integer.parseInt(split[7]);
        Integer uploadSum = Integer.parseInt(split[8]);
        Integer downloadSum = Integer.parseInt(split[9]);

        FlowNum flowNum = new FlowNum();
        flowNum.setUpload(upload);
        flowNum.setDownload(download);
        flowNum.setUploadSum(uploadSum);
        flowNum.setDownloadSum(downloadSum);

        context.write(new Text(phoneNum), flowNum);
    }
}
Define the Reducer class
package cn.itcast.mr.demo3;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowNumReducer extends Reducer<Text, FlowNum, Text, FlowNum> {
    @Override
    protected void reduce(Text key, Iterable<FlowNum> values, Context context) throws IOException, InterruptedException {
        // Sum each of the four traffic columns across all records for this phone number
        int upload = 0;
        int download = 0;
        int uploadSum = 0;
        int downloadSum = 0;
        for (FlowNum value : values) {
            upload += value.getUpload();
            download += value.getDownload();
            uploadSum += value.getUploadSum();
            downloadSum += value.getDownloadSum();
        }

        FlowNum flowNum = new FlowNum();
        flowNum.setUpload(upload);
        flowNum.setDownload(download);
        flowNum.setUploadSum(uploadSum);
        flowNum.setDownloadSum(downloadSum);
        context.write(key, flowNum);
    }
}
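Because FlowNumReducer only sums its inputs and its input and output types are both (Text, FlowNum), it already satisfies the combiner contract described in section 4.1. It could therefore also be registered as the combiner in FlowNumMain with one extra driver line, which is not present in the original example: job.setCombinerClass(FlowNumReducer.class);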