MapReduce 排序和序列化
序列化 (Serialization) 是指把结构化对象转化为字节流
反序列化 (Deserialization) 是序列化的逆过程. 把字节流转为结构化对象. 当要在进程间传递对象或持久化对象的时候, 就需要序列化对象成字节流, 反之当要将接收到或从磁盘读取的字节流转换为对象, 就要进行反序列化Java 的序列化 (Serializable) 是一个重量级序列化框架, 一个对象被序列化后, 会附带很多额外的信息 (各种校验信息, header, 继承体系等), 不便于在网络中高效传输. 所以, Hadoop 自己开发了一套序列化机制(Writable), 精简高效. 不用像 Java 对象类一样传输多层的父子关系, 需要哪个属性就传输哪个属性值, 大大的减少网络传输的开销Writable 是 Hadoop 的序列化格式, Hadoop 定义了这样一个 Writable 接口. 一个类要支持可序列化只需实现这个接口即可另外 Writable 有一个子接口是 WritableComparable, WritableComparable 是既可实现序列化, 也可以对key进行比较, 我们这里可以通过自定义 Key 实现 WritableComparable 来实现我们的排序功能
数据格式如下
a 1 a 1
a 9 a 5
b 3 a 7
a 7 a 9
b 8 b 3
b 10 b 8
a 5 b 10
要求:
第一列按照字典顺序进行排列第一列相同的时候, 第二列按照升序进行排列
解决思路:
将 Map 端输出的 <key,value> 中的 key 和 value 组合成一个新的 key (newKey), value值不变
这里就变成 <(key,value),value>, 在针对 newKey 排序的时候, 如果 key 相同, 就再对value进行排序
Step 1. 自定义类型和比较器
package mapreduce_sort
;
import org
.apache
.hadoop
.io
.WritableComparable
;
import java
.io
.DataInput
;
import java
.io
.DataOutput
;
import java
.io
.IOException
;
public class SortBean implements WritableComparable <SortBean>{
private String word
;
private int num
;
public int getNum() {
return num
;
}
public void setNum(int num
) {
this.num
= num
;
}
public String
getWord() {
return word
;
}
public void setWord(String word
) {
this.word
= word
;
}
@Override
public String
toString() {
return word
+ '\t' + num
;
}
@Override
public int compareTo(SortBean sortBean
) {
int result
= this.word
.compareTo(sortBean
.word
);
if(result
==0) {
return this.num
- sortBean
.num
;
}
return result
;
}
@Override
public void write(DataOutput dataOutput
) throws IOException
{
dataOutput
.writeUTF(word
);
dataOutput
.writeInt(num
);
}
@Override
public void readFields(DataInput dataInput
) throws IOException
{
this.word
=dataInput
.readUTF();
this.num
=dataInput
.readInt();
}
}
Step 2. Mapper
package mapreduce_sort
;
import org
.apache
.hadoop
.io
.LongWritable
;
import org
.apache
.hadoop
.io
.NullWritable
;
import org
.apache
.hadoop
.io
.Text
;
import org
.apache
.hadoop
.mapreduce
.Mapper
;
import java
.io
.IOException
;
public class SortMapper extends Mapper<LongWritable, Text,SortBean, NullWritable> {
@Override
protected void map(LongWritable key
, Text value
, Context context
) throws IOException
, InterruptedException
{
String
[] split
=value
.toString().split("\t");
SortBean sortBean
=new SortBean();
sortBean
.setWord(split
[0]);
sortBean
.setNum(Integer
.parseInt(split
[1]));
context
.write(sortBean
,NullWritable
.get());
}
}
Step 3. Reducer
package mapreduce_sort
;
import org
.apache
.hadoop
.io
.NullWritable
;
import org
.apache
.hadoop
.mapreduce
.Reducer
;
import java
.io
.IOException
;
public class SortReducer extends Reducer<SortBean, NullWritable,SortBean,NullWritable> {
@Override
protected void reduce(SortBean key
, Iterable
<NullWritable> values
, Context context
) throws IOException
, InterruptedException
{
context
.write(key
,NullWritable
.get());
}
}
Step 4. Main 入口
package mapreduce_sort
;
import org
.apache
.hadoop
.conf
.Configuration
;
import org
.apache
.hadoop
.conf
.Configured
;
import org
.apache
.hadoop
.fs
.Path
;
import org
.apache
.hadoop
.io
.NullWritable
;
import org
.apache
.hadoop
.mapreduce
.Job
;
import org
.apache
.hadoop
.mapreduce
.lib
.input
.TextInputFormat
;
import org
.apache
.hadoop
.mapreduce
.lib
.output
.TextOutputFormat
;
import org
.apache
.hadoop
.util
.Tool
;
import org
.apache
.hadoop
.util
.ToolRunner
;
public class JobMain extends Configured implements Tool {
@Override
public int run(String
[] strings
) throws Exception
{
Job job
= Job
.getInstance(super.getConf(), "mapreduce_sort");
job
.setJarByClass(JobMain
.class);
job
.setInputFormatClass(TextInputFormat
.class);
TextInputFormat
.addInputPath(job
,new Path("hdfs://node01:8020/input/sort_input"));
job
.setMapperClass(SortMapper
.class);
job
.setMapOutputKeyClass(SortBean
.class);
job
.setMapOutputValueClass(NullWritable
.class);
job
.setReducerClass(SortReducer
.class);
job
.setOutputKeyClass(SortBean
.class);
job
.setOutputValueClass(NullWritable
.class);
job
.setOutputFormatClass(TextOutputFormat
.class);
TextOutputFormat
.setOutputPath(job
,new Path("hdfs://node01:8020/out/sort_out"));
boolean wait
= job
.waitForCompletion(true);
return wait
?0:1;
}
public static void main(String
[] args
) throws Exception
{
Configuration configuration
=new Configuration();
int run
= ToolRunner
.run(configuration
, new JobMain(), args
);
System
.exit(run
);
}
}