MapReduce案例-流量统计
需求一: 统计求和
统计每个手机号的上行数据包总和、下行数据包总和、上行总流量之和、下行总流量之和。分析:以手机号码作为key,以上行数据包数、下行数据包数、上行总流量、下行总流量四个字段作为value,将这组key和value作为map阶段的输出、reduce阶段的输入。
Step 1: 自定义map的输出value对象FlowBean
package flow_count_demo01
;
import org
.apache
.hadoop
.io
.Writable
;
import java
.io
.DataInput
;
import java
.io
.DataOutput
;
import java
.io
.IOException
;
/**
 * Value object carrying four integer traffic counters for one record (or one
 * aggregated phone number).
 *
 * <p>The counters are serialized as four consecutive 32-bit ints in the fixed
 * order {@code upFlow, downFlow, upCountFlow, downCountFlow}. {@link #write}
 * and {@link #readFields} must agree on that order, otherwise fields are
 * silently swapped whenever the bean is serialized and read back.
 *
 * <p>All four fields must be set before {@link #write} is called; a
 * {@code null} field causes a {@link NullPointerException} on unboxing.
 */
public class FlowBean implements Writable {
    // NOTE(review): the exact meaning/units of each counter depend on the input
    // file's column layout, which is not visible here — confirm against the data.
    private Integer upFlow;
    private Integer downFlow;
    private Integer upCountFlow;
    private Integer downCountFlow;

    /** @return the {@code upFlow} counter */
    public Integer getUpFlow() {
        return upFlow;
    }

    /** @param upFlow new value of the {@code upFlow} counter */
    public void setUpFlow(Integer upFlow) {
        this.upFlow = upFlow;
    }

    /** @return the {@code downFlow} counter */
    public Integer getDownFlow() {
        return downFlow;
    }

    /** @param downFlow new value of the {@code downFlow} counter */
    public void setDownFlow(Integer downFlow) {
        this.downFlow = downFlow;
    }

    /** @return the {@code upCountFlow} counter */
    public Integer getUpCountFlow() {
        return upCountFlow;
    }

    /** @param upCountFlow new value of the {@code upCountFlow} counter */
    public void setUpCountFlow(Integer upCountFlow) {
        this.upCountFlow = upCountFlow;
    }

    /** @return the {@code downCountFlow} counter */
    public Integer getDownCountFlow() {
        return downCountFlow;
    }

    /** @param downCountFlow new value of the {@code downCountFlow} counter */
    public void setDownCountFlow(Integer downCountFlow) {
        this.downCountFlow = downCountFlow;
    }

    /**
     * Renders the four counters separated by tab characters, in the order
     * {@code upFlow, downFlow, upCountFlow, downCountFlow}.
     */
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + upCountFlow + "\t" + downCountFlow;
    }

    /**
     * Serializes the four counters as ints.
     *
     * @param dataOutput sink to write to
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        // Order must mirror readFields(): up, down, upCount, downCount.
        dataOutput.writeInt(this.upFlow);
        dataOutput.writeInt(this.downFlow);
        dataOutput.writeInt(this.upCountFlow);
        dataOutput.writeInt(this.downCountFlow);
    }

    /**
     * Restores the four counters from ints previously produced by {@link #write}.
     *
     * @param dataInput source to read from
     * @throws IOException if the underlying stream fails or ends early
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // Order must mirror write(): up, down, upCount, downCount.
        this.upFlow = dataInput.readInt();
        this.downFlow = dataInput.readInt();
        this.upCountFlow = dataInput.readInt();
        this.downCountFlow = dataInput.readInt();
    }
}
Step 2: 定义FlowCountMapper类
package flow_count_demo01
;
import org
.apache
.hadoop
.io
.LongWritable
;
import org
.apache
.hadoop
.io
.Text
;
import org
.apache
.hadoop
.mapreduce
.Mapper
;
import java
.io
.IOException
;
/**
 * Mapper that converts one tab-separated input line into a
 * (phone number, {@link FlowBean}) pair.
 */
public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    /**
     * Splits the line on tab characters, takes column 1 as the output key and
     * parses columns 6 through 9 as the four integer counters of the output value.
     *
     * @param key     input key supplied by the framework (not used here)
     * @param value   one line of input text
     * @param context sink for the emitted key/value pair
     * @throws IOException          if emitting the pair fails
     * @throws InterruptedException if the task is interrupted while emitting
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        final String line = value.toString();
        final String[] columns = line.split("\t");
        final String phoneNumber = columns[1];

        final FlowBean counters = new FlowBean();
        counters.setUpFlow(Integer.valueOf(columns[6]));
        counters.setDownFlow(Integer.valueOf(columns[7]));
        counters.setUpCountFlow(Integer.valueOf(columns[8]));
        counters.setDownCountFlow(Integer.valueOf(columns[9]));

        final Text outputKey = new Text(phoneNumber);
        context.write(outputKey, counters);
    }
}
Step 3: 定义FlowCountReducer类
/**
 * Reducer that sums the four counters of every {@link FlowBean} received for a
 * key and emits a single bean holding the totals.
 */
public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    /**
     * Adds up each counter across {@code values} and writes one
     * ({@code key}, totals) pair.
     *
     * @param key     the grouping key shared by all {@code values}
     * @param values  the beans emitted for this key
     * @param context sink for the emitted key/value pair
     * @throws IOException          if emitting the pair fails
     * @throws InterruptedException if the task is interrupted while emitting
     */
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        // Primitive accumulators: boxed Integer would allocate/unbox on every "+=".
        int upFlow = 0;
        int downFlow = 0;
        int upCountFlow = 0;
        int downCountFlow = 0;

        for (FlowBean value : values) {
            upFlow += value.getUpFlow();
            downFlow += value.getDownFlow();
            upCountFlow += value.getUpCountFlow();
            downCountFlow += value.getDownCountFlow();
        }

        FlowBean flowBean = new FlowBean();
        flowBean.setUpFlow(upFlow);
        flowBean.setDownFlow(downFlow);
        flowBean.setUpCountFlow(upCountFlow);
        flowBean.setDownCountFlow(downCountFlow);

        context.write(key, flowBean);
    }
}
Step 4: 程序main函数入口JobMain
/**
 * Driver that configures and submits the flow-count job: text input,
 * {@code FlowCountMapper}, {@code FlowCountReducer}, text output.
 */
public class JobMain extends Configured implements Tool {

    /** Name the job is submitted under. */
    private static final String JOB_NAME = "mapreduce_flowcount";

    /** Input location read by the job. */
    private static final String INPUT_LOCATION = "file:///D:\\input\\flowcount_input";

    /** Output location written by the job. */
    private static final String OUTPUT_LOCATION = "file:///D:\\out\\flowcount_out";

    /**
     * Builds the job, waits for it to finish and reports the outcome.
     *
     * @param args command-line arguments (not used here)
     * @return {@code 0} if the job completed successfully, {@code 1} otherwise
     * @throws Exception if job construction or execution fails
     */
    @Override
    public int run(String[] args) throws Exception {
        final Job flowJob = Job.getInstance(super.getConf(), JOB_NAME);
        flowJob.setJarByClass(JobMain.class);

        // Input side.
        flowJob.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(flowJob, new Path(INPUT_LOCATION));

        // Map phase and its intermediate key/value types.
        flowJob.setMapperClass(FlowCountMapper.class);
        flowJob.setMapOutputKeyClass(Text.class);
        flowJob.setMapOutputValueClass(FlowBean.class);

        // Reduce phase and the final key/value types.
        flowJob.setReducerClass(FlowCountReducer.class);
        flowJob.setOutputKeyClass(Text.class);
        flowJob.setOutputValueClass(FlowBean.class);

        // Output side.
        flowJob.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(flowJob, new Path(OUTPUT_LOCATION));

        final boolean succeeded = flowJob.waitForCompletion(true);
        if (succeeded) {
            return 0;
        }
        return 1;
    }

    /**
     * Program entry point: runs this tool through {@code ToolRunner} with a
     * fresh {@code Configuration} and exits with the code returned by
     * {@link #run}.
     *
     * @param args command-line arguments forwarded to {@code ToolRunner}
     * @throws Exception if the tool fails
     */
    public static void main(String[] args) throws Exception {
        final int exitCode = ToolRunner.run(new Configuration(), new JobMain(), args);
        System.exit(exitCode);
    }
}