【MapReduce】7. Extended MapReduce Case Studies


Contents

1. Inverted Index Case (Multi-Job Chaining)
2. TopN Case
3. Common Blog Friends Case

1. Inverted Index Case (Multi-Job Chaining)

Requirement: given a large amount of text (documents, web pages), build a search (inverted) index.

(1) Input data

a.txt
atguigu pingping atguigu ss atguigu ss

b.txt
atguigu pingping atguigu pingping pingping ss

c.txt
atguigu ss atguigu pingping

(2) Expected output

atguigu	c.txt-->2 b.txt-->2 a.txt-->3
pingping	c.txt-->1 b.txt-->3 a.txt-->1
ss	c.txt-->1 b.txt-->1 a.txt-->2

Analysis: the work is split into two MapReduce passes. The first pass counts how many times each word appears in each file (key = word--fileName); the second pass regroups those counts by word to produce the inverted index.

First pass

(1) First pass: write the OneIndexMapper class

package com.until.index;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class OneIndexMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    String name;
    Text k = new Text();
    IntWritable v = new IntWritable();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Get the name of the file this split belongs to
        FileSplit split = (FileSplit) context.getInputSplit();
        name = split.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Read one line
        String line = value.toString();

        // 2. Split it into words
        String[] fields = line.split(" ");

        for (String word : fields) {
            // Concatenate word and file name as the output key
            k.set(word + "--" + name);
            v.set(1);

            // Emit <word--fileName, 1>
            context.write(k, v);
        }
    }
}

(2) First pass: write the OneIndexReducer class

package com.until.index;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class OneIndexReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        v.set(sum);
        context.write(key, v);
    }
}

(3) First pass: write the OneIndexDriver class

package com.until.index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class OneIndexDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Local test paths; drop this line to use the real command-line arguments
        args = new String[]{"d:/input/inputoneindex", "d:/output3"};

        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(OneIndexDriver.class);
        job.setMapperClass(OneIndexMapper.class);
        job.setReducerClass(OneIndexReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}

(4) First-pass output

atguigu--a.txt	3
atguigu--b.txt	2
atguigu--c.txt	2
pingping--a.txt	1
pingping--b.txt	3
pingping--c.txt	1
ss--a.txt	2
ss--b.txt	1
ss--c.txt	1

Second pass

(1) Second pass: write the TwoIndexMapper class

package com.until.index;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class TwoIndexMapper extends Mapper<LongWritable, Text, Text, Text> {

    Text k = new Text();
    Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Input line looks like: atguigu--a.txt	3

        // 1. Read one line
        String line = value.toString();

        // 2. Split on "--": fields[0] is the word, fields[1] is "fileName<TAB>count"
        String[] fields = line.split("--");

        k.set(fields[0]);
        v.set(fields[1]);

        context.write(k, v);
    }
}

(2) Second pass: write the TwoIndexReducer class

package com.until.index;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class TwoIndexReducer extends Reducer<Text, Text, Text, Text> {

    Text v = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Target output line, e.g.: atguigu	c.txt-->2	b.txt-->2	a.txt-->3
        StringBuilder sb = new StringBuilder();
        for (Text value : values) {
            sb.append(value.toString().replace("\t", "-->")).append("\t");
        }
        v.set(sb.toString());
        context.write(key, v);
    }
}

(3) Second pass: write the TwoIndexDriver class

package com.until.index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class TwoIndexDriver {

    static {
        try {
            // Load the native Hadoop library for local Windows runs (path is environment-specific)
            System.load("C:/mine/software/hadoop-2.7.2/bin/hadoop.dll");
        } catch (UnsatisfiedLinkError e) {
            System.err.println("Native code library failed to load.\n" + e);
            System.exit(1);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Local test paths: the input directory holds the output of the first job
        args = new String[]{"d:/input/inputtwoindex", "d:/output4"};

        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(TwoIndexDriver.class);
        job.setMapperClass(TwoIndexMapper.class);
        job.setReducerClass(TwoIndexReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

(4) Final output after the second pass

atguigu	c.txt-->2 b.txt-->2 a.txt-->3
pingping	c.txt-->1 b.txt-->3 a.txt-->1
ss	c.txt-->1 b.txt-->1 a.txt-->2
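The two drivers above run the passes as separate programs. Since this case is about chaining multiple jobs, they can also be submitted from a single driver that starts the second job only after the first one succeeds. Below is a minimal sketch under that assumption; the class name IndexChainDriver and the local paths are illustrative, and it reuses the Mapper/Reducer classes defined above.

package com.until.index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class IndexChainDriver {

    public static void main(String[] args) throws Exception {
        // Hypothetical local paths: raw input, intermediate output, final output
        Path input = new Path("d:/input/inputoneindex");
        Path temp = new Path("d:/output3");
        Path output = new Path("d:/output4");

        Configuration conf = new Configuration();

        // Job 1: count <word--fileName, occurrences>
        Job job1 = Job.getInstance(conf, "one-index");
        job1.setJarByClass(IndexChainDriver.class);
        job1.setMapperClass(OneIndexMapper.class);
        job1.setReducerClass(OneIndexReducer.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(IntWritable.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job1, input);
        FileOutputFormat.setOutputPath(job1, temp);

        // Only start the second job if the first one succeeded
        if (!job1.waitForCompletion(true)) {
            System.exit(1);
        }

        // Job 2: regroup by word into the final inverted index
        Job job2 = Job.getInstance(conf, "two-index");
        job2.setJarByClass(IndexChainDriver.class);
        job2.setMapperClass(TwoIndexMapper.class);
        job2.setReducerClass(TwoIndexReducer.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job2, temp);
        FileOutputFormat.setOutputPath(job2, output);

        System.exit(job2.waitForCompletion(true) ? 0 : 1);
    }
}

Hadoop's org.apache.hadoop.mapreduce.lib.jobcontrol package (JobControl, ControlledJob) can express the same dependency declaratively, but for a linear two-job chain the sequential waitForCompletion calls are usually enough.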

2. TopN Case

Requirement: take the output of requirement 2.3 (per-phone traffic totals) and output the records of the 10 users with the highest total traffic.

(1) Input data

top10input.txt

13470253144	180	180	360
13509468723	7335	110349	117684
13560439638	918	4938	5856
13568436656	3597	25635	29232
13590439668	1116	954	2070
13630577991	6960	690	7650
13682846555	1938	2910	4848
13729199489	240	0	240
13736230513	2481	24681	27162
13768778790	120	120	240
13846544121	264	0	264
13956435636	132	1512	1644
13966251146	240	0	240
13975057813	11058	48243	59301
13992314666	3008	3720	6728
15043685818	3659	3538	7197
15910133277	3156	2936	6092
15959002129	1938	180	2118
18271575951	1527	2106	3633
18390173782	9531	2412	11943
84188413	4116	1432	5548

(2) Output data: part-r-00000.txt

13509468723	7335	110349	117684
13975057813	11058	48243	59301
13568436656	3597	25635	29232
13736230513	2481	24681	27162
18390173782	9531	2412	11943
13630577991	6960	690	7650
15043685818	3659	3538	7197
13992314666	3008	3720	6728
15910133277	3156	2936	6092
13560439638	918	4938	5856

Analysis: make FlowBean comparable by total flow in descending order, keep at most 10 records in a TreeMap inside each mapper, and merge the per-mapper candidates in the reducer, which again keeps only the 10 largest.

Implementation

(1) Write the FlowBean class

package com.until.top;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean() {
        super();
    }

    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        upFlow = dataInput.readLong();
        downFlow = dataInput.readLong();
        sumFlow = dataInput.readLong();
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public void set(long downFlow, long upFlow) {
        this.downFlow = downFlow;
        this.upFlow = upFlow;
        sumFlow = upFlow + downFlow;
    }

    @Override
    public int compareTo(FlowBean bean) {
        // Sort by total flow in descending order
        int result;
        if (this.sumFlow > bean.getSumFlow()) {
            result = -1;
        } else if (this.sumFlow < bean.getSumFlow()) {
            result = 1;
        } else {
            result = 0;
        }
        return result;
    }
}
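To see why the descending compareTo makes the TopN trick work, here is a minimal local sketch (the TreeMapDemo class is hypothetical and runs outside Hadoop, using a few records from the input above): a TreeMap keyed by FlowBean sorts its entries largest-sum first, so evicting lastKey() whenever the map exceeds N keeps exactly the N largest records.

package com.until.top;

import java.util.Map;
import java.util.TreeMap;

public class TreeMapDemo {

    public static void main(String[] args) {
        TreeMap<FlowBean, String> top = new TreeMap<>();
        // {upFlow, downFlow} pairs taken from top10input.txt
        long[][] flows = {{180, 180}, {7335, 110349}, {6960, 690}, {240, 0}};
        String[] phones = {"13470253144", "13509468723", "13630577991", "13729199489"};
        int n = 2; // keep only the top 2 by total flow

        for (int i = 0; i < flows.length; i++) {
            FlowBean bean = new FlowBean();
            bean.set(flows[i][1], flows[i][0]); // set(downFlow, upFlow) also computes sumFlow
            top.put(bean, phones[i]);
            if (top.size() > n) {
                top.remove(top.lastKey()); // evict the entry with the smallest sumFlow
            }
        }

        for (Map.Entry<FlowBean, String> e : top.entrySet()) {
            System.out.println(e.getValue() + "\t" + e.getKey());
        }
    }
}

Note that two beans with the same sumFlow compare as equal, so the TreeMap keeps only one of them; the mapper and reducer below inherit the same behaviour.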

(2) Write the TopNMapper class

package com.until.top;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;

public class TopNMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    // Buffer of at most 10 records, ordered by total flow (descending)
    private TreeMap<FlowBean, Text> flowMap = new TreeMap<>();
    private FlowBean kBean;

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        kBean = new FlowBean();
        Text v = new Text();

        String line = value.toString();
        String[] fields = line.split("\t");

        String phoneNum = fields[0];
        long upFlow = Long.parseLong(fields[1]);
        long downFlow = Long.parseLong(fields[2]);
        long sumFlow = Long.parseLong(fields[3]);

        kBean.setUpFlow(upFlow);
        kBean.setDownFlow(downFlow);
        kBean.setSumFlow(sumFlow);
        v.set(phoneNum);

        flowMap.put(kBean, v);

        // Keep only the 10 largest records seen so far
        if (flowMap.size() > 10) {
            flowMap.remove(flowMap.lastKey());
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit the surviving top-10 candidates of this map task
        Iterator<FlowBean> bean = flowMap.keySet().iterator();
        while (bean.hasNext()) {
            FlowBean k = bean.next();
            context.write(k, flowMap.get(k));
        }
    }
}

(3) Write the TopNReducer class

package com.until.top;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;

public class TopNReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    // Global buffer of at most 10 records across all map tasks
    TreeMap<FlowBean, Text> flowMap = new TreeMap<>();

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            FlowBean bean = new FlowBean();
            bean.set(key.getDownFlow(), key.getUpFlow());

            flowMap.put(bean, new Text(value));

            if (flowMap.size() > 10) {
                flowMap.remove(flowMap.lastKey());
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Write the final top 10 as <phone, FlowBean>
        Iterator<FlowBean> iterator = flowMap.keySet().iterator();
        while (iterator.hasNext()) {
            FlowBean v = iterator.next();
            context.write(new Text(flowMap.get(v)), v);
        }
    }
}

(4) Write the TopNDriver class

package com.until.top;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class TopNDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(TopNDriver.class);
        job.setMapperClass(TopNMapper.class);
        job.setReducerClass(TopNReducer.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}

3. Common Blog Friends Case

Requirement: below is friend-list data from a blog. The user before the colon lists all of their friends after the colon (the friendship in this data is one-directional). Find every pair of users that shares common friends, and for each pair list who those common friends are.

(1) Input data

friends.txt

A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J

Analysis: the first job works out, for each person A, B, C, ..., which users list that person as a friend; the second job takes the first job's output, pairs up those users, and collects the shared friend for every pair.

Implementation

(1) First-pass Mapper class

package com.until.friends;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class OneShareFriendsMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Input line looks like: A:B,C,D,F,E,O
        String line = value.toString();
        String[] fields = line.split(":");

        String person = fields[0];
        String[] friends = fields[1].split(",");

        for (String friend : friends) {
            // Invert the relation: emit <friend, person>
            context.write(new Text(friend), new Text(person));
        }
    }
}

(2) First-pass Reducer class

package com.until.friends;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class OneShareFriendsReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        StringBuffer sb = new StringBuffer();
        for (Text person : values) {
            sb.append(person).append(",");
        }
        // Emit <friend, list of people who have that friend>
        context.write(key, new Text(sb.toString()));
    }
}

(3) First-pass Driver class

package com.until.friends;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class OneShareFriendsDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(OneShareFriendsDriver.class);
        job.setMapperClass(OneShareFriendsMapper.class);
        job.setReducerClass(OneShareFriendsReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}

(4) Second-pass Mapper class

package com.until.friends;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.Arrays;

public class TwoShareFriendsMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Input line looks like: A	I,K,C,B,G,F,H,O,D,
        // i.e. friend, then the people who have that friend
        String line = value.toString();
        String[] friend_persons = line.split("\t");

        String friend = friend_persons[0];
        String[] persons = friend_persons[1].split(",");

        // Sort so that every pair is always emitted in the same order (see the sketch below)
        Arrays.sort(persons);

        for (int i = 0; i < persons.length; i++) {
            for (int j = i + 1; j < persons.length; j++) {
                // Emit <person-person, friend> so that all friends of the same
                // person-person pair end up in the same reduce call
                context.write(new Text(persons[i] + "-" + persons[j]), new Text(friend));
            }
        }
    }
}
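The Arrays.sort call matters: without it, one record could produce the key C-A while another produces A-C for the same pair, and those two keys would land in different reduce groups. A minimal stand-alone sketch (the PairEmitDemo class is hypothetical and not part of the job) illustrating the difference:

package com.until.friends;

import java.util.Arrays;

public class PairEmitDemo {

    public static void main(String[] args) {
        // The same set of people, as it might appear in two different first-pass records
        String[] unsorted = {"C", "A", "B"};
        String[] sorted = unsorted.clone();
        Arrays.sort(sorted);

        // Without sorting the nested loops emit C-A, C-B, A-B ...
        emitPairs(unsorted);
        // ... after sorting they always emit A-B, A-C, B-C
        emitPairs(sorted);
    }

    static void emitPairs(String[] persons) {
        for (int i = 0; i < persons.length; i++) {
            for (int j = i + 1; j < persons.length; j++) {
                System.out.println(persons[i] + "-" + persons[j]);
            }
        }
        System.out.println("---");
    }
}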

(5) Second-pass Reducer class

package com.until.friends;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class TwoShareFriendsReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Collect all common friends of this person-person pair
        StringBuffer sb = new StringBuffer();
        for (Text friend : values) {
            sb.append(friend).append(" ");
        }
        context.write(key, new Text(sb.toString()));
    }
}

(6) Second-pass Driver class

package com.until.friends;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class TwoShareFriendsDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(TwoShareFriendsDriver.class);
        job.setMapperClass(TwoShareFriendsMapper.class);
        job.setReducerClass(TwoShareFriendsReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}
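As a quick sanity check on what one line of the final output should contain, the common friends of a pair are simply the intersection of the two users' friend lists from friends.txt. A small stand-alone sketch (the CommonFriendsCheck class is hypothetical), using the records for A and B from the input above:

package com.until.friends;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class CommonFriendsCheck {

    public static void main(String[] args) {
        // Two records copied from friends.txt
        Set<String> friendsOfA = new HashSet<>(Arrays.asList("B,C,D,F,E,O".split(",")));
        Set<String> friendsOfB = new HashSet<>(Arrays.asList("A,C,E,K".split(",")));

        // The two-job pipeline above reduces the A-B key to exactly this intersection
        friendsOfA.retainAll(friendsOfB);
        System.out.println("A-B\t" + friendsOfA); // expected to contain C and E
    }
}

So the line for the A-B key in the second job's output should list C and E as the common friends.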