Contents

1 MapReduce Partitioning and the Number of ReduceTasks
    1.1 Concept
    1.2 Code Implementation
2 Sorting and Serialization
    2.1 Overview
    2.2 Code Implementation
3 Counters
4 Combiner
    4.1 Concept
    4.2 MapReduce Exercise: Internet Traffic Statistics

1 MapReduce Partitioning and the Number of ReduceTasks
 
1.1 Concept
 
Partitioning in MapReduce groups like with like: records that share the same key are sent to the same reduce. The number of ReduceTasks defaults to one and can be set explicitly, e.g. job.setNumReduceTasks(3). The partition decides which ReduceTask each record is sent to.
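When no custom partitioner is configured, Hadoop falls back to HashPartitioner. The body of its getPartition method is essentially the sketch below, which is why records with the same key always land on the same ReduceTask:

// core logic of org.apache.hadoop.mapreduce.lib.partition.HashPartitioner:
// same key -> same hashCode -> same partition -> same ReduceTask
public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}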
 
1.2 Code Implementation
 
Program main entry point
package cn.itcast.mr.demo1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class PartitionMain extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new PartitionMain(), args);
        System.exit(run);
    }

    @Override
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // create the job and give it a name
        Job job = Job.getInstance(super.getConf(), "myPartition");
        job.setJarByClass(PartitionMain.class);

        // input format and input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path("hdfs://node01:8020/partitionin"));

        // mapper and its output key/value types
        job.setMapperClass(PartitionMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // custom partitioner
        job.setPartitionerClass(PartitionOwn.class);

        // reducer and its output key/value types
        job.setReducerClass(PartitionReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // the number of reduce tasks must match the partitions the partitioner can return
        job.setNumReduceTasks(2);

        // output format and output path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://node01:8020/partition_out"));

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }
}
 
Custom Partitioner
package cn.itcast.mr.demo1;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class PartitionOwn extends Partitioner<Text, NullWritable> {

    @Override
    public int getPartition(Text text, NullWritable nullWritable, int numReduceTask) {
        // the sixth tab-separated field of the line decides the partition
        String result = text.toString().split("\t")[5];
        System.out.println(result);
        if (Integer.parseInt(result) > 15) {
            return 1;
        } else {
            return 0;
        }
    }
}
 
Define a Mapper class
package cn.itcast.mr.demo1;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class PartitionMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // emit the whole line as the key; the partitioner decides where it goes
        context.write(value, NullWritable.get());
    }
}
 
Define a Reducer class
package cn.itcast.mr.demo1;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class PartitionReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}
 
2 Sorting and Serialization
 
2.1 Overview
 
Serialization turns a structured object into a byte stream. Deserialization is the reverse process: it turns a byte stream back into a structured object. MapReduce sorts the k2 keys in dictionary order. Hadoop does not reuse Java's built-in serialization; it defines its own Writable interface instead. Implementing Writable gives you serialization and implementing Comparable gives you sorting; if you need both, implement both interfaces, or simply implement WritableComparable. If a whole line of text is used as k2, secondary sorting is not possible; in that case, wrap the two fields in a JavaBean and use that bean as k2.
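To make the byte-stream idea concrete, here is a minimal sketch (not part of the original walkthrough; the class name SerializationDemo and the field values are made up) that serializes the K2Bean defined below into a byte array with plain java.io streams and reads it back:

package cn.itcast.mr.demo2;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class SerializationDemo {
    public static void main(String[] args) throws IOException {
        // write(): the Writable serializes itself into a byte stream
        K2Bean original = new K2Bean();
        original.setFirst("hello");
        original.setSecond(7);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // readFields(): a fresh object rebuilds its state from those bytes
        K2Bean copy = new K2Bean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy);   // prints the first and second field separated by a tab
    }
}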
 
2.2 Code Implementation
 
Program main entry point
package cn.itcast.mr.demo2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SortMain extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int run = ToolRunner.run(configuration, new SortMain(), args);
        System.exit(run);
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), "sort");

        // input format and input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path("file:///G:\\BigData\\6.大数据hadoop离线\\第四天\\4、大数据离线第四天\\排序\\input"));

        // mapper: the K2Bean is the key, so the shuffle sorts on it
        job.setMapperClass(SortMapper.class);
        job.setMapOutputKeyClass(K2Bean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // reducer and its output key/value types
        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(K2Bean.class);
        job.setOutputValueClass(NullWritable.class);

        // output format and output path (the output directory must not exist yet)
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("file:///G:\\BigData\\6.大数据hadoop离线\\第四天\\4、大数据离线第四天\\排序\\output"));

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }
}
 
Custom JavaBean overriding compareTo
package cn.itcast.mr.demo2;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class K2Bean implements WritableComparable<K2Bean> {

    private String first;
    private int second;

    @Override
    public int compareTo(K2Bean o) {
        // compare on the first field; only on a tie compare the second field
        int i = this.first.compareTo(o.first);
        if (i == 0) {
            return Integer.valueOf(this.second).compareTo(Integer.valueOf(o.second));
        } else {
            return i;
        }
    }

    // serialization: write the fields to the byte stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(first);
        out.writeInt(second);
    }

    // deserialization: read the fields back in the same order
    @Override
    public void readFields(DataInput in) throws IOException {
        this.first = in.readUTF();
        this.second = in.readInt();
    }

    @Override
    public String toString() {
        return first + '\t' + second;
    }

    public String getFirst() {
        return first;
    }

    public void setFirst(String first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }
}
 
Define a Mapper class
package cn.itcast.mr.demo2;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SortMapper extends Mapper<LongWritable, Text, K2Bean, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // split the line and wrap the two fields in a K2Bean, which becomes k2
        String[] split = value.toString().split("\t");
        K2Bean k2Bean = new K2Bean();
        k2Bean.setFirst(split[0]);
        k2Bean.setSecond(Integer.parseInt(split[1]));
        context.write(k2Bean, NullWritable.get());
    }
}
 
Define a Reducer class
package cn.itcast.mr.demo2;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SortReducer extends Reducer<K2Bean, NullWritable, K2Bean, NullWritable> {
    @Override
    protected void reduce(K2Bean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // write the key once per value so duplicate records are preserved
        for (NullWritable value : values) {
            context.write(key, NullWritable.get());
        }
    }
}
 
3 Counters
 
 
Obtain a counter through the context object
package cn.itcast.mr.demo2;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SortMapper extends Mapper<LongWritable, Text, K2Bean, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // counter identified by a group name and a counter name; counts every map input record
        Counter counter = context.getCounter("MR_COUNT", "MapRecordCounter");
        counter.increment(1L);

        String[] split = value.toString().split("\t");
        K2Bean k2Bean = new K2Bean();
        k2Bean.setFirst(split[0]);
        k2Bean.setSecond(Integer.parseInt(split[1]));
        context.write(k2Bean, NullWritable.get());
    }
}
 
Define counters with an enum type
package cn.itcast.mr.demo2;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SortReducer extends Reducer<K2Bean, NullWritable, K2Bean, NullWritable> {

    public static enum Counter {
        REDUCE_INPUT_RECORDS, REDUCE_INPUT_VAL_NUMS,
    }

    @Override
    protected void reduce(K2Bean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // one increment per distinct key reaching the reducer
        context.getCounter(Counter.REDUCE_INPUT_RECORDS).increment(1L);
        for (NullWritable value : values) {
            // one increment per value belonging to that key
            context.getCounter(Counter.REDUCE_INPUT_VAL_NUMS).increment(1L);
            context.write(key, NullWritable.get());
        }
    }
}
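Counter values are printed with the job summary once the job finishes and can also be read programmatically. A minimal sketch, assuming it is placed in the driver's run() method after job.waitForCompletion(true) has returned:

// group/name counter incremented in SortMapper
long mapRecords = job.getCounters()
        .findCounter("MR_COUNT", "MapRecordCounter").getValue();

// enum counter incremented in SortReducer
long reduceKeys = job.getCounters()
        .findCounter(SortReducer.Counter.REDUCE_INPUT_RECORDS).getValue();

System.out.println(mapRecords + " map records, " + reduceKeys + " keys reached the reducer");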
 
4 Combiner
 
4.1 Concept
 
A combiner merges values for the same k2 on the map side first, which reduces the number of k2 records sent to the reduce phase; the benefit is saved network bandwidth. A combiner is really just a reducer class, but a special one: its input is k2,v2 and its output is still k2,v2. A combiner must not change the final result; it only exists to cut down the amount of data shipped to the reduce side.
 
 
Note: a combiner cannot be used when computing averages.
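For example, the flow-statistics reducer in section 4.2 below only sums values, and its input and output types are both (Text, FlowNum), so it could also be registered as a combiner. A sketch of the single extra driver line (the code in 4.2 does not actually set one):

// runs on the map side and pre-sums FlowNum records that share a phone number
job.setCombinerClass(FlowNumReducer.class);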
 
 
4.2 MapReduce Exercise: Internet Traffic Statistics
 
Program main entry point
package cn.itcast.mr.demo3;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FlowNumMain extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), "flowSum");

        // input format and input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path("file:///G:\\BigData\\6.大数据hadoop离线\\第四天\\4、大数据离线第四天\\排序\\input"));

        // mapper: k2 is the phone number, v2 is the FlowNum bean
        job.setMapperClass(FlowNumMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowNum.class);

        // reducer and its output key/value types
        job.setReducerClass(FlowNumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowNum.class);

        // output format and output path (the output directory must not exist yet)
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("file:///G:\\BigData\\6.大数据hadoop离线\\第四天\\4、大数据离线第四天\\排序\\output"));

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int run = ToolRunner.run(configuration, new FlowNumMain(), args);
        System.exit(run);
    }
}
 
Custom JavaBean (FlowNum)
package cn.itcast.mr.demo3;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowNum implements Writable {

    private Integer upload;
    private Integer download;
    private Integer uploadSum;
    private Integer downloadSum;

    // serialization: write the four fields to the byte stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(upload);
        out.writeInt(download);
        out.writeInt(uploadSum);
        out.writeInt(downloadSum);
    }

    // deserialization: read the four fields back in the same order
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upload = in.readInt();
        this.download = in.readInt();
        this.uploadSum = in.readInt();
        this.downloadSum = in.readInt();
    }

    public Integer getUpload() {
        return upload;
    }

    public void setUpload(Integer upload) {
        this.upload = upload;
    }

    public Integer getDownload() {
        return download;
    }

    public void setDownload(Integer download) {
        this.download = download;
    }

    public Integer getUploadSum() {
        return uploadSum;
    }

    public void setUploadSum(Integer uploadSum) {
        this.uploadSum = uploadSum;
    }

    public Integer getDownloadSum() {
        return downloadSum;
    }

    public void setDownloadSum(Integer downloadSum) {
        this.downloadSum = downloadSum;
    }

    @Override
    public String toString() {
        return upload + "\t" + download + "\t" + uploadSum + "\t" + downloadSum;
    }
}
 
Define a Mapper class
package cn.itcast.mr.demo3;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowNumMapper extends Mapper<LongWritable, Text, Text, FlowNum> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split("\t");

        // field 1 is the phone number; fields 6 to 9 hold the four traffic values
        String phoneNum = split[1];
        Integer upload = Integer.parseInt(split[6]);
        Integer download = Integer.parseInt(split[7]);
        Integer uploadSum = Integer.parseInt(split[8]);
        Integer downloadSum = Integer.parseInt(split[9]);

        FlowNum flowNum = new FlowNum();
        flowNum.setUpload(upload);
        flowNum.setDownload(download);
        flowNum.setUploadSum(uploadSum);
        flowNum.setDownloadSum(downloadSum);

        // k2 is the phone number, v2 is the traffic bean
        context.write(new Text(phoneNum), flowNum);
    }
}
 
Define a Reducer class
package cn.itcast.mr.demo3;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowNumReducer extends Reducer<Text, FlowNum, Text, FlowNum> {
    @Override
    protected void reduce(Text key, Iterable<FlowNum> values, Context context) throws IOException, InterruptedException {
        int upload = 0;
        int download = 0;
        int uploadSum = 0;
        int downloadSum = 0;

        // accumulate all traffic records belonging to this phone number
        for (FlowNum value : values) {
            upload += value.getUpload();
            download += value.getDownload();
            uploadSum += value.getUploadSum();
            downloadSum += value.getDownloadSum();
        }

        FlowNum flowNum = new FlowNum();
        flowNum.setUpload(upload);
        flowNum.setDownload(download);
        flowNum.setUploadSum(uploadSum);
        flowNum.setDownloadSum(downloadSum);

        context.write(key, flowNum);
    }
}