PairWritable.java
PairWritable is used as its own generic type parameter. The class stores the fields that will later be compared:

public class PairWritable implements WritableComparable<PairWritable>
private String first;
private int second;

Getters and setters:
public String getFirst() { return first; }
public void setFirst(String first) { this.first = first; }
public int getSecond() { return second; }
public void setSecond(int second) { this.second = second; }

Override compareTo to implement the comparison logic and define the sort order:
public int compareTo(PairWritable o) {
    // Compare first; only if the first fields are equal, fall back to second
    int comp = this.first.compareTo(o.first);
    if (comp == 0) {
        // For ints, plain subtraction is enough for the comparison
        return this.second - o.second;
    }
    return comp;
}

One thing to note here: a NullWritable is needed as a placeholder for the output value, because the actual sorting is already done for us by the WritableComparable implementation:
for (Text value : values) {
    context.write(key, NullWritable.get());
}
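One caveat with the subtraction trick in compareTo above: this.second - o.second can overflow when the two values have opposite signs near the int limits. For word counts this rarely matters, but a minimal overflow-safe variant (a sketch, not part of the original code) would use Integer.compare:

@Override
public int compareTo(PairWritable o) {
    int comp = this.first.compareTo(o.first);
    // Integer.compare avoids the overflow that raw subtraction can hit
    return comp != 0 ? comp : Integer.compare(this.second, o.second);
}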
MyMapper.java

package org;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Created by Zhao Wen on 2020/10/20
 */
// <k1,v1> -> (byte offset of the line, the content of each line); this is the part I had always misunderstood
public class MyMapper extends Mapper<LongWritable, Text, PairWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] splitString = value.toString().split("\t");
        PairWritable pairWritable = new PairWritable();
        pairWritable.setFirst(splitString[0]);
        pairWritable.setSecond(Integer.parseInt(splitString[1]));
        context.write(pairWritable, value);
    }
}
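Given how the mapper splits on "\t" and parses the second field as an int, each input line is assumed to be a tab-separated pair of the form word<TAB>count, e.g. a line like hadoop<TAB>3 (a made-up sample, not from the original data set). For that line the mapper would emit the composite key PairWritable{first="hadoop", second=3} with the original line as the value.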
MyReducer.java

package org;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created by Zhao Wen on 2020/10/20
 */
// The output uses a NullWritable as a placeholder for the value
public class MyReducer extends Reducer<PairWritable, Text, PairWritable, NullWritable> {
    @Override
    protected void reduce(PairWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            context.write(key, NullWritable.get()); // duplicate values imply duplicate keys, so duplicates are safe to emit
        }
    }
}
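Why write the key once per value instead of once per reduce call? If the same (first, second) pair appears multiple times in the input, Hadoop groups all of those records into a single reduce call; emitting the key once per value keeps each duplicate line in the sorted output instead of collapsing them.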
MyJob.java

package org;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Created by Zhao Wen on 2020/10/20
 */
public class MyJob {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // Required so the job can locate its classes when run on a Hadoop cluster
        job.setJarByClass(MyJob.class);
        // Set the Mapper and the Reducer
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        // The map output value type (Text) differs from the final output value type
        // (NullWritable), so the map output types must be set explicitly
        job.setMapOutputKeyClass(PairWritable.class);
        job.setMapOutputValueClass(Text.class);
        // Set the final output key/value types
        job.setOutputKeyClass(PairWritable.class);
        job.setOutputValueClass(NullWritable.class);
        // Set the input and output paths
        FileInputFormat.addInputPath(job, new Path("hdfs:/Initial_Data/"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs:/Clean_Data/"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
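To run this on a cluster, package the three classes plus PairWritable into a jar and launch it with hadoop jar your-job.jar org.MyJob (the jar name here is hypothetical); the input and output paths are hardcoded above, so no arguments are needed. Note that the output directory must not already exist, or FileOutputFormat will refuse to start the job.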
PairWritable.java

package org;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Created by Zhao Wen on 2020/10/20
 */
public class PairWritable implements WritableComparable<PairWritable> {
    private String first; // the string field
    private int second;

    public String getFirst() { return first; }

    public void setFirst(String first) { this.first = first; }

    public int getSecond() { return second; }

    public void setSecond(int second) { this.second = second; }

    @Override
    public int compareTo(PairWritable o) {
        int result = this.first.compareTo(o.first);
        if (result == 0) {
            return this.second - o.second;
        }
        return result;
    }

    /**
     * Serialization
     * @param dataOutput
     * @throws IOException
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(first);
        dataOutput.writeInt(second);
    }

    /**
     * Deserialization: fields must be read in the same order they were written
     * @param dataInput
     * @throws IOException
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.first = dataInput.readUTF();
        this.second = dataInput.readInt();
    }

    @Override
    public String toString() {
        return first + '\t' + second;
    }
}
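Since WritableComparable extends Comparable, the sort order can be sanity-checked locally by sorting a few instances in plain Java. This is a minimal sketch, not part of the original job; the class name PairWritableSortCheck and the sample values are made up, and it needs PairWritable plus the Hadoop client jars on the classpath:

package org;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class PairWritableSortCheck {
    public static void main(String[] args) {
        List<PairWritable> pairs = new ArrayList<>();
        pairs.add(make("banana", 2));
        pairs.add(make("apple", 5));
        pairs.add(make("apple", 1));

        // Collections.sort uses PairWritable.compareTo, i.e. the same
        // ordering the shuffle phase applies to the map output keys
        Collections.sort(pairs);

        // Expected order: apple 1, apple 5, banana 2 (one per line)
        for (PairWritable p : pairs) {
            System.out.println(p);
        }
    }

    private static PairWritable make(String first, int second) {
        PairWritable p = new PairWritable();
        p.setFirst(first);
        p.setSecond(second);
        return p;
    }
}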
Bonus knowledge: FileInputFormat and TextInputFormat in MapReduce development
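The job above never calls job.setInputFormatClass, so it falls back to the default, TextInputFormat, which is a subclass of FileInputFormat. That is exactly why the mapper receives <LongWritable, Text> pairs: the key is the byte offset of each line and the value is the line itself. Setting it explicitly is a one-liner (shown here only for illustration):

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// TextInputFormat is already the default input format;
// setting it explicitly just documents the intent.
job.setInputFormatClass(TextInputFormat.class);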