hadoop-MapReduce案例流量统计

it2025-03-15  20

MapReduce案例-流量统计

需求一: 统计求和

统计每个手机号的上行数据包总和、下行数据包总和、上行总流量之和、下行总流量之和。分析：以手机号码作为key，以上行数据包数、下行数据包数、上行总流量、下行总流量这四个字段作为value；这组key/value既是map阶段的输出，也是reduce阶段的输入。

Step 1: 自定义map的输出value对象FlowBean

package flow_count_demo01; import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class FlowBean implements Writable { private Integer upFlow; //上行数据包数 private Integer downFlow; //下行数据包数 private Integer upCountFlow; //上行流量总和 private Integer downCountFlow;//下行流量总和 public Integer getUpFlow() { return upFlow; } public void setUpFlow(Integer upFlow) { this.upFlow = upFlow; } public Integer getDownFlow() { return downFlow; } public void setDownFlow(Integer downFlow) { this.downFlow = downFlow; } public Integer getUpCountFlow() { return upCountFlow; } public void setUpCountFlow(Integer upCountFlow) { this.upCountFlow = upCountFlow; } public Integer getDownCountFlow() { return downCountFlow; } public void setDownCountFlow(Integer downCountFlow) { this.downCountFlow = downCountFlow; } @Override public String toString() { return upFlow +"\t"+ downFlow +"\t"+ upCountFlow +"\t"+ downCountFlow; } @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeInt(this.upFlow); dataOutput.writeInt(this.downFlow); dataOutput.writeInt(this.downCountFlow); dataOutput.writeInt(this.upCountFlow); } @Override public void readFields(DataInput dataInput) throws IOException { this.upFlow=dataInput.readInt(); this.downFlow=dataInput.readInt(); this.upCountFlow=dataInput.readInt(); this.downCountFlow=dataInput.readInt(); } }

Step 2: 定义FlowCountMapper类

package flow_count_demo01; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class FlowCountMapper extends Mapper<LongWritable, Text,Text,FlowBean> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //1.拆分行文本数据,得到手机号 //2.创建FlowBean对象,并从行文本数据拆分出流量的四个字段,并赋值 String[] spplit= value.toString().split("\t"); String phone=spplit[1]; FlowBean flowBean=new FlowBean(); flowBean.setUpFlow(Integer.parseInt(spplit[6])); flowBean.setDownFlow(Integer.parseInt(spplit[7])); flowBean.setUpCountFlow(Integer.parseInt(spplit[8])); flowBean.setDownCountFlow(Integer.parseInt(spplit[9])); //第三步 :将k2和v2 写入上下文中 context.write(new Text(phone),flowBean); } }

Step 3: 定义FlowCountReducer类

public class FlowCountReducer extends Reducer<Text,FlowBean,Text,FlowBean> { @Override protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException { //1:遍历集合,并将集合中的对应的四个字段累计 Integer upFlow = 0; //上行数据包数 Integer downFlow = 0; //下行数据包数 Integer upCountFlow = 0; //上行流量总和 Integer downCountFlow = 0;//下行流量总和 for (FlowBean value : values) { upFlow += value.getUpFlow(); downFlow += value.getDownFlow(); upCountFlow += value.getUpCountFlow(); downCountFlow += value.getDownCountFlow(); } //2:创建FlowBean对象,并给对象赋值 V3 FlowBean flowBean = new FlowBean(); flowBean.setUpFlow(upFlow); flowBean.setDownFlow(downFlow); flowBean.setUpCountFlow(upCountFlow); flowBean.setDownCountFlow(downCountFlow); //3:将K3和V3下入上下文中 context.write(key, flowBean); } }

Step 4: 程序main函数入口JobMain

public class JobMain extends Configured implements Tool { //该方法用于指定一个job任务 @Override public int run(String[] args) throws Exception { //1:创建一个job任务对象 Job job = Job.getInstance(super.getConf(), "mapreduce_flowcount"); //如果打包运行出错,则需要加该配置 job.setJarByClass(JobMain.class); //2:配置job任务对象(八个步骤) //第一步:指定文件的读取方式和读取路径 job.setInputFormatClass(TextInputFormat.class); //TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/wordcount")); TextInputFormat.addInputPath(job, new Path("file:///D:\\input\\flowcount_input")); //第二步:指定Map阶段的处理方式和数据类型 job.setMapperClass(FlowCountMapper.class); //设置Map阶段K2的类型 job.setMapOutputKeyClass(Text.class); //设置Map阶段V2的类型 job.setMapOutputValueClass(FlowBean.class); //第三(分区),四 (排序) //第五步: 规约(Combiner) //第六步 分组 //第七步:指定Reduce阶段的处理方式和数据类型 job.setReducerClass(FlowCountReducer.class); //设置K3的类型 job.setOutputKeyClass(Text.class); //设置V3的类型 job.setOutputValueClass(FlowBean.class); //第八步: 设置输出类型 job.setOutputFormatClass(TextOutputFormat.class); //设置输出的路径 TextOutputFormat.setOutputPath(job, new Path("file:///D:\\out\\flowcount_out")); //等待任务结束 boolean bl = job.waitForCompletion(true); return bl ? 0:1; } public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); //启动job任务 int run = ToolRunner.run(configuration, new JobMain(), args); System.exit(run); } }
最新回复(0)