b.txt
atguigu pingping
atguigu pingping
pingping ss

c.txt
atguigu ss
atguigu pingping

(2) Expected output data
atguigu	c.txt-->2	b.txt-->2	a.txt-->3
pingping	c.txt-->1	b.txt-->3	a.txt-->1
ss	c.txt-->1	b.txt-->1	a.txt-->2
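Getting from those inputs to this output takes two MapReduce passes: the first counts each word per file (keyed on word--fileName), the second regroups those counts by word. A minimal plain-Java sketch of the same idea, with no Hadoop involved and the sample files hard-coded (the content of a.txt is reconstructed from the expected counts above, so treat it as illustrative):

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class InvertedIndexSketch {
    public static void main(String[] args) {
        // Sample input; a.txt is reconstructed from the expected counts above
        Map<String, String[]> files = new LinkedHashMap<>();
        files.put("a.txt", new String[]{"atguigu pingping", "atguigu ss", "atguigu ss"});
        files.put("b.txt", new String[]{"atguigu pingping", "atguigu pingping", "pingping ss"});
        files.put("c.txt", new String[]{"atguigu ss", "atguigu pingping"});

        // Pass 1: count occurrences of each <word--fileName>
        Map<String, Integer> pass1 = new TreeMap<>();
        files.forEach((name, lines) -> {
            for (String line : lines) {
                for (String word : line.split(" ")) {
                    pass1.merge(word + "--" + name, 1, Integer::sum);
                }
            }
        });

        // Pass 2: regroup by word, appending "fileName-->count" for each file
        Map<String, StringBuilder> pass2 = new TreeMap<>();
        pass1.forEach((key, count) -> {
            String[] parts = key.split("--");
            pass2.computeIfAbsent(parts[0], w -> new StringBuilder())
                 .append(parts[1]).append("-->").append(count).append("\t");
        });

        pass2.forEach((word, postings) -> System.out.println(word + "\t" + postings));
    }
}

The printed postings match the expected output above, up to the order in which the file entries appear.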
Requirement analysis

First pass

(1) First pass: write the OneIndexMapper class

package com.until.index;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class OneIndexMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    String name;
    Text k = new Text();
    IntWritable v = new IntWritable();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Get the name of the file this split comes from
        FileSplit split = (FileSplit) context.getInputSplit();
        name = split.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Read one line
        String line = value.toString();

        // 2. Split it into words
        String[] fields = line.split(" ");

        for (String word : fields) {
            // Concatenate word and file name, e.g. "atguigu--a.txt"
            k.set(word + "--" + name);
            v.set(1);

            // Emit <word--fileName, 1>
            context.write(k, v);
        }
    }
}

(2) First pass: write the OneIndexReducer class
package com.until.index;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class OneIndexReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum all occurrences of this <word--fileName> key
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        v.set(sum);

        context.write(key, v);
    }
}

(3) First pass: write the OneIndexDriver class
package com.until.index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class OneIndexDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Local paths used for testing; they override whatever was passed on the command line
        args = new String[]{"d:/input/inputoneindex", "d:/output3"};

        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(OneIndexDriver.class);
        job.setMapperClass(OneIndexMapper.class);
        job.setReducerClass(OneIndexReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}
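Because OneIndexReducer simply sums the counts, it can also act as a combiner so partial sums happen on the map side before the shuffle. This is an optional optimization not present in the driver above; it would be one extra line in the job setup:

// Optional: pre-aggregate <word--fileName, 1> pairs on the map side
job.setCombinerClass(OneIndexReducer.class);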
(4) Check the first-pass output

atguigu--a.txt	3
atguigu--b.txt	2
atguigu--c.txt	2
pingping--a.txt	1
pingping--b.txt	3
pingping--c.txt	1
ss--a.txt	2
ss--b.txt	1
ss--c.txt	1

Second pass

(1) Second pass: write the TwoIndexMapper class

package com.until.index;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class TwoIndexMapper extends Mapper<LongWritable, Text, Text, Text> {

    Text k = new Text();
    Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Input line looks like: atguigu--a.txt	3

        // 1. Read one line
        String line = value.toString();

        // 2. Split on "--": fields[0] is the word, fields[1] is "fileName\tcount"
        String[] fields = line.split("--");

        k.set(fields[0]);
        v.set(fields[1]);

        context.write(k, v);
    }
}
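Note that each first-pass record, for example "atguigu--a.txt\t3", is split only on "--", so fields[1] keeps the tab and the count ("a.txt\t3"); the reducer below relies on that when it replaces the tab with "-->". A quick standalone check:

public class SplitCheck {
    public static void main(String[] args) {
        String line = "atguigu--a.txt\t3";            // one record from the first-pass output
        String[] fields = line.split("--");

        System.out.println(fields[0]);                        // atguigu
        System.out.println(fields[1].replace("\t", "-->"));   // a.txt-->3
    }
}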
(2) Second pass: write the TwoIndexReducer class

package com.until.index;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class TwoIndexReducer extends Reducer<Text, Text, Text, Text> {

    Text v = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Goal: atguigu	a.txt-->3	b.txt-->2	...
        StringBuilder sb = new StringBuilder();
        for (Text value : values) {
            // Each value is "fileName\tcount"; turn the tab into "-->" and append
            sb.append(value.toString().replace("\t", "-->")).append("\t");
        }

        v.set(sb.toString());
        context.write(key, v);
    }
}

(3) Second pass: write the TwoIndexDriver class
package com.until.index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class TwoIndexDriver {

    static {
        try {
            // Workaround for running Hadoop locally on Windows: load the native library explicitly
            System.load("C:/mine/software/hadoop-2.7.2/bin/hadoop.dll");
        } catch (UnsatisfiedLinkError e) {
            System.err.println("Native code library failed to load.\n" + e);
            System.exit(1);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Local paths used for testing; the input directory should contain the first-pass output
        args = new String[]{"d:/input/inputtwoindex", "d:/output4"};

        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(TwoIndexDriver.class);
        job.setMapperClass(TwoIndexMapper.class);
        job.setReducerClass(TwoIndexReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
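The two passes are kept in separate drivers here. If you would rather launch them from a single entry point, the sketch below (the class name IndexChainDriver and the three-argument path layout are illustrative, not part of the original code) runs the first job and only starts the second one once it has succeeded:

package com.until.index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical combined driver: runs the first pass, then feeds its output to the second pass
public class IndexChainDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // First pass: <word--fileName, count>
        Job one = Job.getInstance(conf, "one-index");
        one.setJarByClass(IndexChainDriver.class);
        one.setMapperClass(OneIndexMapper.class);
        one.setReducerClass(OneIndexReducer.class);
        one.setMapOutputKeyClass(Text.class);
        one.setMapOutputValueClass(IntWritable.class);
        one.setOutputKeyClass(Text.class);
        one.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(one, new Path(args[0]));
        Path intermediate = new Path(args[1]);        // intermediate directory between the two jobs
        FileOutputFormat.setOutputPath(one, intermediate);

        if (!one.waitForCompletion(true)) {
            System.exit(1);                           // stop if the first pass failed
        }

        // Second pass: regroup the first-pass output by word
        Job two = Job.getInstance(conf, "two-index");
        two.setJarByClass(IndexChainDriver.class);
        two.setMapperClass(TwoIndexMapper.class);
        two.setReducerClass(TwoIndexReducer.class);
        two.setMapOutputKeyClass(Text.class);
        two.setMapOutputValueClass(Text.class);
        two.setOutputKeyClass(Text.class);
        two.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(two, intermediate);
        FileOutputFormat.setOutputPath(two, new Path(args[2]));

        System.exit(two.waitForCompletion(true) ? 0 : 1);
    }
}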
(4) Final result of the second pass

atguigu	c.txt-->2	b.txt-->2	a.txt-->3
pingping	c.txt-->1	b.txt-->3	a.txt-->1
ss	c.txt-->1	b.txt-->1	a.txt-->2

Top-N case: from the flow-statistics output, find the 10 users with the highest total traffic.

(2) Expected output data: part-r-00000.txt
13509468723	7335	110349	117684
13975057813	11058	48243	59301
13568436656	3597	25635	29232
13736230513	2481	24681	27162
18390173782	9531	2412	11943
13630577991	6960	690	7650
15043685818	3659	3538	7197
13992314666	3008	3720	6728
15910133277	3156	2936	6092
13560439638	918	4938	5856

Requirement analysis

Implementation

(1) Write the FlowBean class

package com.until.top;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean() {
        super();
    }

    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        upFlow = dataInput.readLong();
        downFlow = dataInput.readLong();
        sumFlow = dataInput.readLong();
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    // Note the parameter order: down flow first, then up flow
    public void set(long downFlow, long upFlow) {
        this.downFlow = downFlow;
        this.upFlow = upFlow;
        sumFlow = upFlow + downFlow;
    }

    // Sort by total flow in descending order: the bean with the larger sumFlow compares as "smaller"
    @Override
    public int compareTo(FlowBean bean) {
        int result;
        if (this.sumFlow > bean.getSumFlow()) {
            result = -1;
        } else if (this.sumFlow < bean.getSumFlow()) {
            result = 1;
        } else {
            result = 0;
        }
        return result;
    }
}
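FlowBean.compareTo returns -1 when this bean has the larger sumFlow, so a TreeMap keyed by FlowBean iterates from the largest total flow to the smallest, and lastKey() is always the current minimum; that is exactly what the top-10 pruning in the next two classes relies on. A quick standalone check, assuming it lives in the same com.until.top package as FlowBean (the class name is illustrative):

package com.until.top;

import java.util.TreeMap;

public class FlowBeanOrderCheck {
    public static void main(String[] args) {
        TreeMap<FlowBean, String> map = new TreeMap<>();

        FlowBean big = new FlowBean();
        big.set(110349, 7335);          // downFlow, upFlow -> sumFlow = 117684
        map.put(big, "13509468723");

        FlowBean small = new FlowBean();
        small.set(4938, 918);           // sumFlow = 5856
        map.put(small, "13560439638");

        // Iteration goes from the largest sumFlow to the smallest,
        // so lastKey() is the record that gets evicted once the map exceeds 10 entries.
        map.forEach((bean, phone) -> System.out.println(phone + "\t" + bean));
        System.out.println("lastKey sumFlow = " + map.lastKey().getSumFlow());
    }
}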
(2) Write the TopNMapper class

package com.until.top;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;

public class TopNMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    // Ordered by FlowBean.compareTo, i.e. by descending sumFlow
    private TreeMap<FlowBean, Text> flowMap = new TreeMap<>();
    private FlowBean kBean;

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // A new bean per record, because the TreeMap keeps a reference to it
        kBean = new FlowBean();
        Text v = new Text();

        // Input line: phone \t upFlow \t downFlow \t sumFlow
        String line = value.toString();
        String[] fields = line.split("\t");

        String phoneNum = fields[0];
        long upFlow = Long.parseLong(fields[1]);
        long downFlow = Long.parseLong(fields[2]);
        long sumFlow = Long.parseLong(fields[3]);

        kBean.setUpFlow(upFlow);
        kBean.setDownFlow(downFlow);
        kBean.setSumFlow(sumFlow);
        v.set(phoneNum);

        flowMap.put(kBean, v);

        // Keep only the 10 largest records: lastKey() is the one with the smallest sumFlow
        if (flowMap.size() > 10) {
            flowMap.remove(flowMap.lastKey());
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit this mapper's top 10 once all records have been seen
        Iterator<FlowBean> bean = flowMap.keySet().iterator();
        while (bean.hasNext()) {
            FlowBean k = bean.next();
            context.write(k, flowMap.get(k));
        }
    }
}

(3) Write the TopNReducer class
package com.until.top;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;

public class TopNReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    // Collects the candidates from all mappers and keeps only the global top 10
    TreeMap<FlowBean, Text> flowMap = new TreeMap<>();

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            // Copy the key: Hadoop reuses the key object between calls
            FlowBean bean = new FlowBean();
            bean.set(key.getDownFlow(), key.getUpFlow());

            flowMap.put(bean, new Text(value));

            if (flowMap.size() > 10) {
                flowMap.remove(flowMap.lastKey());
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Write the final top 10 as <phone, flows>, ordered by descending sumFlow
        Iterator<FlowBean> iterator = flowMap.keySet().iterator();
        while (iterator.hasNext()) {
            FlowBean v = iterator.next();
            context.write(new Text(flowMap.get(v)), v);
        }
    }
}
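One caveat the original code does not handle: TreeMap treats keys as equal whenever compareTo returns 0, so two phone numbers with exactly the same sumFlow would overwrite each other in the mapper's and reducer's maps and only one would survive. If that matters for your data, a possible adjustment (a sketch, not the original FlowBean method) is to break ties on another field so distinct beans rarely compare as equal:

// Sketch of a tie-breaking compareTo for FlowBean
@Override
public int compareTo(FlowBean bean) {
    if (this.sumFlow > bean.getSumFlow()) {
        return -1;          // larger total flow sorts first
    } else if (this.sumFlow < bean.getSumFlow()) {
        return 1;
    }
    // Same sumFlow: fall back to upFlow so most duplicates are kept apart
    return Long.compare(this.upFlow, bean.getUpFlow());
}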
(4) Write the TopNDriver class

package com.until.top;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class TopNDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(TopNDriver.class);
        job.setMapperClass(TopNMapper.class);
        job.setReducerClass(TopNReducer.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}
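Each mapper emits at most 10 candidates and the global winners are picked in the reducer's TreeMap, so the job must run with a single reduce task; that is already the Hadoop default, but if you want to state it explicitly in the driver above, one line does it:

// Send all candidates to one reducer so the top 10 is global, not per-partition
job.setNumReduceTasks(1);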
Shared-friends case

(2) First-pass Reducer class

package com.until.friends;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class OneShareFriendsReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Collect everyone who has this key as a friend, e.g. "A	I,K,C,B,G,F,H,O,D,"
        StringBuffer sb = new StringBuffer();
        for (Text person : values) {
            sb.append(person).append(",");
        }

        context.write(key, new Text(sb.toString()));
    }
}

(3) First-pass Driver class
package com.until.friends;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class OneShareFriendsDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(OneShareFriendsDriver.class);
        job.setMapperClass(OneShareFriendsMapper.class);
        job.setReducerClass(OneShareFriendsReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}

(4) Second-pass Mapper class
package com.until.friends;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.Arrays;

public class TwoShareFriendsMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Input line from the first pass, e.g.: A	I,K,C,B,G,F,H,O,D,
        // Format: friend \t person,person,...  (all the people who have this friend)
        String line = value.toString();
        String[] friend_persons = line.split("\t");

        String friend = friend_persons[0];
        String[] persons = friend_persons[1].split(",");

        // Sort so each pair is always written in the same order, e.g. "B-C" rather than "C-B"
        Arrays.sort(persons);

        for (int i = 0; i < persons.length; i++) {
            for (int j = i + 1; j < persons.length; j++) {
                // Emit <person-person, friend> so all common friends of the same pair reach the same reducer
                context.write(new Text(persons[i] + "-" + persons[j]), new Text(friend));
            }
        }
    }
}
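To see what the double loop actually emits, here is a small standalone check using the sample line from the comment above (plain Java, no Hadoop needed; the class name is illustrative):

import java.util.Arrays;

public class PairCheck {
    public static void main(String[] args) {
        String line = "A\tI,K,C,B,G,F,H,O,D,";
        String[] friendPersons = line.split("\t");
        String friend = friendPersons[0];
        String[] persons = friendPersons[1].split(",");

        // Sorting first guarantees a pair is always emitted as "B-C", never "C-B",
        // so both orderings land on the same reduce key.
        Arrays.sort(persons);
        for (int i = 0; i < persons.length; i++) {
            for (int j = i + 1; j < persons.length; j++) {
                System.out.println(persons[i] + "-" + persons[j] + "\t" + friend);
            }
        }
    }
}

It prints one line per pair of people who both have A as a friend, e.g. "B-C	A", "B-D	A", and so on.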
(5) Second-pass Reducer class

package com.until.friends;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class TwoShareFriendsReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Join all common friends of this person pair into one line
        StringBuffer sb = new StringBuffer();
        for (Text friend : values) {
            sb.append(friend).append(" ");
        }

        context.write(key, new Text(sb.toString()));
    }
}

(6) Second-pass Driver class
package com.until.friends;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class TwoShareFriendsDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(TwoShareFriendsDriver.class);
        job.setMapperClass(TwoShareFriendsMapper.class);
        job.setReducerClass(TwoShareFriendsReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}