A MapReduce Sorting Example


Contents

1. Source Data
2. Implement the WritableComparable Interface
3. Define the Two Fields first and second
4. Override the toString() Method
5. Override the compareTo() Method
6. Override the write() Method to Implement Serialization
7. Override the readFields() Method to Implement Deserialization
8. Pack the Data Into the Key's first and second Fields in map()
9. Write the Data Out in reduce()
10. Complete Code

1. Source Data

a	1
a	9
b	3
a	7
b	8
b	10
a	5
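For reference, here is the output the job should produce; this is my derivation from the compareTo() rule defined below (first ascending, then second ascending), not something shown in the original post:

a	1
a	5
a	7
a	9
b	3
b	8
b	10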

Sections 2 through 7 below build the composite key class, PairWritable.java.

2. Implement the WritableComparable Interface

Use PairWritable itself as the generic type parameter:

public class PairWritable implements WritableComparable<PairWritable>

3. Define the Two Fields first and second

These hold the values that will be compared later:

private String first;
private int second;

Getters and setters:

public String getFirst() {
    return first;
}

public void setFirst(String first) {
    this.first = first;
}

public int getSecond() {
    return second;
}

public void setSecond(int second) {
    this.second = second;
}

4. Override the toString() Method

@Override
public String toString() {
    return this.first + "\t" + this.second;
}

5. Override the compareTo() Method

This defines the comparison logic, i.e. the sort rule: order by first, and by second when the firsts are equal.

public int compareTo(PairWritable o) {
    // compare first; fall back to second only when the firsts are equal
    int comp = this.first.compareTo(o.first);
    if (comp == 0) {
        // Integer.compare avoids the overflow risk of plain subtraction
        return Integer.compare(this.second, o.second);
    }
    return comp;
}
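As a quick sanity check of this ordering, a minimal sketch follows; the harness class and the pair() helper are my additions (they rely only on the setters shown above), not part of the original post:

package org;  // same package as PairWritable

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class PairWritableSortCheck {
    public static void main(String[] args) {
        List<PairWritable> pairs = new ArrayList<>();
        pairs.add(pair("b", 3));
        pairs.add(pair("a", 9));
        pairs.add(pair("a", 1));
        Collections.sort(pairs);        // ordering comes from compareTo()
        for (PairWritable p : pairs) {
            System.out.println(p);      // a 1, a 9, b 3 (tab-separated)
        }
    }

    // hypothetical helper built on the setters shown above
    private static PairWritable pair(String first, int second) {
        PairWritable p = new PairWritable();
        p.setFirst(first);
        p.setSecond(second);
        return p;
    }
}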

6. Override the write() Method to Implement Serialization

@Override
public void write(DataOutput output) throws IOException {
    output.writeUTF(first);   // write one string
    output.writeInt(second);  // write one int
}

7. Override the readFields() Method to Implement Deserialization

@Override
public void readFields(DataInput input) throws IOException {
    // fields must be read back in the same order they were written
    this.first = input.readUTF();
    this.second = input.readInt();
}
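To confirm that write() and readFields() agree, here is a round-trip check through plain java.io streams; this test harness is my addition, not from the original post:

package org;  // same package as PairWritable

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class PairWritableRoundTrip {
    public static void main(String[] args) throws IOException {
        PairWritable before = new PairWritable();
        before.setFirst("a");
        before.setSecond(7);

        // serialize with write()
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        before.write(new DataOutputStream(buffer));

        // deserialize into a fresh object with readFields()
        PairWritable after = new PairWritable();
        after.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(after);  // prints: a	7
    }
}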

8. Pack the Data Into the Key's first and second Fields in map()

String[] split = value.toString().split("\t");
PairWritable pairWritable = new PairWritable();
pairWritable.setFirst(split[0]);
pairWritable.setSecond(Integer.parseInt(split[1]));
context.write(pairWritable, value);

9. Write the Data Out in reduce()

Note that a NullWritable is needed here as a placeholder for the output value: the actual sorting has already been done for us during the shuffle, driven by the key's WritableComparable implementation, so reduce() only has to write the keys out.

for (Text value : values) {
    // one output line per value, so duplicate keys are preserved
    context.write(key, NullWritable.get());
}

10. Complete Code

MyMapper.java

package org;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Created by Zhao Wen on 2020/10/20
 */
// <k1,v1> -> line offset, the contents of each line (a point I had long misunderstood)
public class MyMapper extends Mapper<LongWritable, Text, PairWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] splitString = value.toString().split("\t");
        PairWritable pairWritable = new PairWritable();
        pairWritable.setFirst(splitString[0]);
        pairWritable.setSecond(Integer.parseInt(splitString[1]));
        context.write(pairWritable, value);
    }
}

MyReducer.java

package org;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created by Zhao Wen on 2020/10/20
 */
// NullWritable serves as a placeholder for the output value
public class MyReducer extends Reducer<PairWritable, Text, PairWritable, NullWritable> {
    @Override
    protected void reduce(PairWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            // duplicate values mean duplicate keys, so writing once per value keeps them all
            context.write(key, NullWritable.get());
        }
    }
}

MyJob.java

package org;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Created by Zhao Wen on 2020/10/20
 */
public class MyJob {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // required so the job's classes can be located when run on a Hadoop cluster
        job.setJarByClass(MyJob.class);
        // set the Mapper and Reducer
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        // the map output value type (Text) differs from the final output value type,
        // so the map output types must be set explicitly
        job.setMapOutputKeyClass(PairWritable.class);
        job.setMapOutputValueClass(Text.class);
        // set the final output types
        job.setOutputKeyClass(PairWritable.class);
        job.setOutputValueClass(NullWritable.class);
        // set the input and output paths
        FileInputFormat.addInputPath(job, new Path("hdfs:/Initial_Data/"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs:/Clean_Data/"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

PairWritable.java

package org;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Created by Zhao Wen on 2020/10/20
 */
public class PairWritable implements WritableComparable<PairWritable> {
    private String first;  // the string part of the key
    private int second;

    public String getFirst() {
        return first;
    }

    public void setFirst(String first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    @Override
    public int compareTo(PairWritable o) {
        int result = this.first.compareTo(o.first);
        if (result == 0) {
            return Integer.compare(this.second, o.second);
        }
        return result;
    }

    /**
     * Serialization
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(first);
        dataOutput.writeInt(second);
    }

    /**
     * Deserialization: read fields in the same order they were written
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.first = dataInput.readUTF();
        this.second = dataInput.readInt();
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }
}
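One caveat, which is my addition rather than something in the original post: if the job is ever run with more than one reduce task, Hadoop's default HashPartitioner routes keys by hashCode(), which PairWritable does not override, so logically equal keys could land on different reducers. A sketch of overrides consistent with compareTo():

// candidate additions inside PairWritable
@Override
public int hashCode() {
    return java.util.Objects.hash(first, second);
}

@Override
public boolean equals(Object obj) {
    if (this == obj) return true;
    if (!(obj instanceof PairWritable)) return false;
    PairWritable other = (PairWritable) obj;
    return this.second == other.second
            && java.util.Objects.equals(this.first, other.first);
}

With the default single reduce task, as in this example, the output is one globally sorted file either way.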

