Hadoop Offline


Contents

1 Partitioning in MapReduce and the number of ReduceTasks
    1.1 Concepts
    1.2 Code implementation
2 Sorting and serialization
    2.1 Overview
    2.2 Code implementation
3 Counters
4 Combiner
    4.1 Concepts
    4.2 MapReduce exercise: Internet traffic statistics

1 Partitioning in MapReduce and the number of ReduceTasks

1.1 Concepts

Partitioning in MapReduce follows the idea of "birds of a feather flock together": records with the same key are sent to the same reducer. The number of ReduceTasks defaults to 1 and can be set explicitly, e.g. job.setNumReduceTasks(3). The partition decides which ReduceTask each record is sent to.
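For reference, the default partitioner is Hadoop's HashPartitioner, which is why equal keys always land in the same ReduceTask. The class below is only a sketch of that logic for illustration; it is not the custom partitioner used in the example that follows.

// Minimal sketch of the default HashPartitioner behaviour (for illustration only;
// the real class is org.apache.hadoop.mapreduce.lib.partition.HashPartitioner).
import org.apache.hadoop.mapreduce.Partitioner;

public class DefaultStylePartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // Mask the sign bit so the result is non-negative, then take the remainder
        // by the number of reducers: equal keys always map to the same partition.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}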

1.2 Code implementation

Main entry point of the program

package cn.itcast.mr.demo1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class PartitionMain extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new PartitionMain(), args);
        System.exit(run);
    }

    @Override
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Get the job object that wraps our job definition
        Job job = Job.getInstance(super.getConf(), "myPartition");
        // Required when the job is packaged as a jar and run on the cluster
        job.setJarByClass(PartitionMain.class);

        // Step 1: read the file and parse it into k1, v1
        job.setInputFormatClass(TextInputFormat.class); // set the input format
        TextInputFormat.setInputPaths(job, new Path("hdfs://node01:8020/partitionin"));

        // Step 2: custom map logic, receive k1, v1 and convert them to k2, v2
        job.setMapperClass(PartitionMapper.class);
        // Set the types of k2, v2
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Step 3: partitioning — records with the same key are sent to the same reducer
        job.setPartitionerClass(PartitionOwn.class);

        // Steps 4 to 6 are omitted

        // Step 7: custom reduce logic
        job.setReducerClass(PartitionReducer.class);
        // Set the types of k3, v3
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Set the number of ReduceTasks.
        // If there are more ReduceTasks than partitions, some output files will be empty.
        // If there are fewer ReduceTasks than partitions, some reducers will process more than one partition's data.
        job.setNumReduceTasks(2);

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://node01:8020/partition_out"));

        // Submit the job
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }
}

Custom Partitioner

package cn.itcast.mr.demo1;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class PartitionOwn extends Partitioner<Text, NullWritable> {

    /**
     * This method decides which reducer each record goes to.
     *
     * @param text          k2
     * @param nullWritable  v2
     * @param numReduceTask number of ReduceTasks
     * @return partition number
     */
    @Override
    public int getPartition(Text text, NullWritable nullWritable, int numReduceTask) {
        // Split k2 by "\t" and take the sixth field
        String result = text.toString().split("\t")[5];
        System.out.println(result);
        // Values greater than 15 go to one partition, the rest go to the other
        if (Integer.parseInt(result) > 15) {
            return 1;
        } else {
            return 0;
        }
    }
}

Mapper class

package cn.itcast.mr.demo1;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class PartitionMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Emit k2, v2: k2 is the whole line of text, v2 is NullWritable
        context.write(value, NullWritable.get());
    }
}

Reducer class

package cn.itcast.mr.demo1;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class PartitionReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}
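Once the program is packaged as a jar (which is why job.setJarByClass is called above), it can be submitted to the cluster from the command line. The jar file name below is only a placeholder:

hadoop jar mr-demo.jar cn.itcast.mr.demo1.PartitionMain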

2 Sorting and serialization

2.1 Overview

Serialization is the process of turning a structured object into a byte stream. Deserialization is the reverse: turning a byte stream back into a structured object. MapReduce sorts the k2 values in dictionary (lexicographic) order. Hadoop does not reuse Java's built-in serialization mechanism; instead it provides its own Writable interface. Implementing Writable gives you serialization, and implementing Comparable gives you sorting; if you need both serialization and sorting, you can implement Writable and Comparable together, or simply implement WritableComparable. If a whole line of text is used as k2, a secondary sort (sorting on two fields) is not possible; in that case the two fields can be wrapped into a JavaBean and used as k2.
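To make the byte-stream idea concrete, here is a minimal sketch (not part of the original notes) that serializes a Hadoop Writable into an in-memory buffer and reads it back, using the same write/readFields contract that the custom bean below implements:

// Round-trip a Writable through Hadoop's in-memory buffers (illustrative sketch).
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;

import java.io.IOException;

public class WritableRoundTrip {
    public static void main(String[] args) throws IOException {
        // Serialization: write the Writable into a byte buffer
        Text original = new Text("hello hadoop");
        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);

        // Deserialization: read the bytes back into a fresh object
        Text copy = new Text();
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        copy.readFields(in);

        System.out.println(copy); // prints: hello hadoop
    }
}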

2.2 Code implementation

Main entry point of the program

package cn.itcast.mr.demo2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SortMain extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        // Create the Configuration object
        Configuration configuration = new Configuration();
        // Run the Tool through ToolRunner and get an int status code back
        int run = ToolRunner.run(configuration, new SortMain(), args);
        // Exit with that status code
        System.exit(run);
    }

    @Override
    public int run(String[] args) throws Exception {
        // Create the job object
        Job job = Job.getInstance(super.getConf(), "sort");

        // Input data: set the input format and input path
        // (backslashes in the Windows path must be escaped in a Java string literal)
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path("file:///G:\\BigData\\6.大数据hadoop离线\\第四天\\4、大数据离线第四天\\排序\\input"));

        // Custom map logic
        job.setMapperClass(SortMapper.class);
        job.setMapOutputKeyClass(K2Bean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Partitioning, sorting, combining and grouping are left at their defaults

        // Custom reduce logic
        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(K2Bean.class);
        job.setOutputValueClass(NullWritable.class);

        // Output data: set the output format and output path
        // (the output directory must not exist yet, so it cannot be the input directory)
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("file:///G:\\BigData\\6.大数据hadoop离线\\第四天\\4、大数据离线第四天\\排序\\output"));

        // Submit the job
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }
}

Custom JavaBean with compareTo overridden

package cn.itcast.mr.demo2;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class K2Bean implements WritableComparable<K2Bean> {

    // The letter field of each record goes into first, the number field into second
    private String first;
    private int second;

    /**
     * compareTo decides how records are ordered during the sort.
     *
     * @param o the other K2Bean
     * @return comparison result
     */
    @Override
    public int compareTo(K2Bean o) {
        // Compare the first field first
        int i = this.first.compareTo(o.first);
        if (i == 0) {
            // First fields are equal, so compare the second field
            int i1 = Integer.valueOf(this.second).compareTo(Integer.valueOf(o.second));
            return i1; // returning -i1 instead would sort the second field in descending order
        } else {
            // First fields differ, so the result is already decided
            return i; // returning -i instead would sort the first field in descending order
        }
    }

    /**
     * Writable serialization method.
     *
     * @param out output stream
     * @throws IOException on write failure
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(first);
        out.writeInt(second);
    }

    /**
     * Deserialization method.
     *
     * @param in input stream
     * @throws IOException on read failure
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.first = in.readUTF();
        this.second = in.readInt();
    }

    // toString method
    @Override
    public String toString() {
        return first + '\t' + second;
    }

    // getters and setters
    public String getFirst() {
        return first;
    }

    public void setFirst(String first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }
}

Mapper class

package cn.itcast.mr.demo2;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SortMapper extends Mapper<LongWritable, Text, K2Bean, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the incoming line into its fields
        String[] split = value.toString().split("\t");
        // Wrap the fields in a K2Bean
        K2Bean k2Bean = new K2Bean();
        k2Bean.setFirst(split[0]);
        k2Bean.setSecond(Integer.parseInt(split[1]));
        // Emit k2, v2
        context.write(k2Bean, NullWritable.get());
    }
}

Reducer class

package cn.itcast.mr.demo2;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SortReducer extends Reducer<K2Bean, NullWritable, K2Bean, NullWritable> {
    @Override
    protected void reduce(K2Bean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Looping over the values writes duplicate k2 values out once per occurrence
        for (NullWritable value : values) {
            context.write(key, NullWritable.get());
        }
    }
}

3 Counters

Getting a counter from the context object

package cn.itcast.mr.demo2;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SortMapper extends Mapper<LongWritable, Text, K2Bean, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Custom counter: here it counts the number of records seen by the map phase
        Counter counter = context.getCounter("MR_COUNT", "MapRecordCounter");
        counter.increment(1L);

        // Split the incoming line into its fields
        String[] split = value.toString().split("\t");
        // Wrap the fields in a K2Bean
        K2Bean k2Bean = new K2Bean();
        k2Bean.setFirst(split[0]);
        k2Bean.setSecond(Integer.parseInt(split[1]));
        // Emit k2, v2
        context.write(k2Bean, NullWritable.get());
    }
}

Defining counters with an enum

package cn.itcast.mr.demo2;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SortReducer extends Reducer<K2Bean, NullWritable, K2Bean, NullWritable> {

    public static enum Counter {
        REDUCE_INPUT_RECORDS,
        REDUCE_INPUT_VAL_NUMS,
    }

    @Override
    protected void reduce(K2Bean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Count how many distinct keys reach the reducer
        context.getCounter(Counter.REDUCE_INPUT_RECORDS).increment(1L);
        // Looping over the values writes duplicate k2 values out once per occurrence
        for (NullWritable value : values) {
            // Count how many values are processed in total
            context.getCounter(Counter.REDUCE_INPUT_VAL_NUMS).increment(1L);
            context.write(key, NullWritable.get());
        }
    }
}
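When the job finishes, Hadoop prints all counters (built-in and custom) in the job summary. They can also be read programmatically in the driver. The snippet below is a minimal sketch, assuming it is placed inside SortMain.run() from section 2.2 after job.waitForCompletion(true) has returned:

// Read the counters back from the finished job (sketch; assumes the "job" variable
// from SortMain.run() and that the job has already completed).
org.apache.hadoop.mapreduce.Counters counters = job.getCounters();

// The counter created in the mapper via context.getCounter("MR_COUNT", "MapRecordCounter")
long mapRecords = counters.findCounter("MR_COUNT", "MapRecordCounter").getValue();

// The enum-based counter defined in SortReducer
long reduceKeys = counters.findCounter(SortReducer.Counter.REDUCE_INPUT_RECORDS).getValue();

System.out.println("map records: " + mapRecords + ", reduce input keys: " + reduceKeys);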

4 Combiner

4.1 Concepts

A combiner merges records with the same k2 on the map side first, which reduces the number of k2 records sent to the reduce phase and thereby saves network bandwidth. A combiner is really just a Reducer class with a special constraint on its types: its input is k2, v2 and its output is still k2, v2. A combiner must not change the final result; it only reduces the amount of data sent to the reduce side.

Note: averages cannot be computed with a combiner, because the average of partial averages is not, in general, the overall average.
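As an example of where a combiner is safe, the traffic-sum job in section 4.2 below only adds values, and its reducer reads and writes the same (Text, FlowNum) types, so that reducer could also be registered as the combiner. The extra driver line below is not part of the original example; it is only a sketch of how it would be wired up:

// Sketch (not in the original job): reuse the sum reducer as a combiner.
// This is safe here because the combiner's input and output are both (Text, FlowNum)
// and summing partial flows per phone number does not change the final totals.
job.setCombinerClass(FlowNumReducer.class);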

4.2 MapReduce exercise: Internet traffic statistics

Main entry point of the program

package cn.itcast.mr.demo3;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FlowNumMain extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // Get the job object
        Job job = Job.getInstance(super.getConf(), "flowSum");

        // Input data: set the input format and input path
        // (backslashes in the Windows path must be escaped in a Java string literal)
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path("file:///G:\\BigData\\6.大数据hadoop离线\\第四天\\4、大数据离线第四天\\排序\\input"));

        // Custom map logic
        job.setMapperClass(FlowNumMapper.class);
        // Set the types of k2, v2
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowNum.class);

        // Custom reduce logic
        job.setReducerClass(FlowNumReducer.class);
        // Set the types of k3, v3
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowNum.class);

        // Output data: set the output format and output path
        // (the output directory must not exist yet, so it cannot be the input directory)
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("file:///G:\\BigData\\6.大数据hadoop离线\\第四天\\4、大数据离线第四天\\排序\\output"));

        // Submit the job
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // Get the Configuration object
        Configuration configuration = new Configuration();
        // Run the Tool through ToolRunner and get a status code back
        int run = ToolRunner.run(configuration, new FlowNumMain(), args);
        // Exit with that status code
        System.exit(run);
    }
}

Custom JavaBean (FlowNum)

package cn.itcast.mr.demo3;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowNum implements Writable {

    // Upload, download, total upload and total download traffic
    private Integer upload;
    private Integer download;
    private Integer uploadSum;
    private Integer downloadSum;

    /**
     * Serialization.
     *
     * @param out output stream
     * @throws IOException on write failure
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(upload);
        out.writeInt(download);
        out.writeInt(uploadSum);
        out.writeInt(downloadSum);
    }

    /**
     * Deserialization.
     *
     * @param in input stream
     * @throws IOException on read failure
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upload = in.readInt();
        this.download = in.readInt();
        this.uploadSum = in.readInt();
        this.downloadSum = in.readInt();
    }

    public Integer getUpload() {
        return upload;
    }

    public void setUpload(Integer upload) {
        this.upload = upload;
    }

    public Integer getDownload() {
        return download;
    }

    public void setDownload(Integer download) {
        this.download = download;
    }

    public Integer getUploadSum() {
        return uploadSum;
    }

    public void setUploadSum(Integer uploadSum) {
        this.uploadSum = uploadSum;
    }

    public Integer getDownloadSum() {
        return downloadSum;
    }

    public void setDownloadSum(Integer downloadSum) {
        this.downloadSum = downloadSum;
    }

    @Override
    public String toString() {
        return upload + "\t" + download + "\t" + uploadSum + "\t" + downloadSum;
    }
}

Mapper class

package cn.itcast.mr.demo3;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowNumMapper extends Mapper<LongWritable, Text, Text, FlowNum> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the line by "\t"
        String[] split = value.toString().split("\t");
        // The phone number is the second field
        String phoneNum = split[1];
        // Indexes 6, 7, 8 and 9 hold upload, download, total upload and total download traffic
        Integer upload = Integer.parseInt(split[6]);
        Integer download = Integer.parseInt(split[7]);
        Integer uploadSum = Integer.parseInt(split[8]);
        Integer downloadSum = Integer.parseInt(split[9]);
        FlowNum flowNum = new FlowNum();
        flowNum.setUpload(upload);
        flowNum.setDownload(download);
        flowNum.setUploadSum(uploadSum);
        flowNum.setDownloadSum(downloadSum);
        // Emit k2, v2
        context.write(new Text(phoneNum), flowNum);
    }
}

Reducer class

package cn.itcast.mr.demo3;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowNumReducer extends Reducer<Text, FlowNum, Text, FlowNum> {
    @Override
    protected void reduce(Text key, Iterable<FlowNum> values, Context context) throws IOException, InterruptedException {
        // Initialize accumulators for upload, download, total upload and total download traffic
        int upload = 0;
        int download = 0;
        int uploadSum = 0;
        int downloadSum = 0;
        // Accumulate the traffic over all records for this phone number
        for (FlowNum value : values) {
            upload += value.getUpload();
            download += value.getDownload();
            uploadSum += value.getUploadSum();
            downloadSum += value.getDownloadSum();
        }
        FlowNum flowNum = new FlowNum();
        flowNum.setUpload(upload);
        flowNum.setDownload(download);
        flowNum.setUploadSum(uploadSum);
        flowNum.setDownloadSum(downloadSum);
        // Emit k3, v3
        context.write(key, flowNum);
    }
}