MapReduce总结

it2025-10-17  7

一、MapReduce是什么?

MapReduce是一个分布式计算程序的编程框架,是用户开发基于Hadoop数据分析应用的核心框架。

二、MapReduce有什么用?

1、核心功能

MapReduce核心功能是:将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。

2. 优缺点

优点

1)易于编程:简单的实现一些接口,就可以完成一个分布式程序; 2)良好的扩展性:计算资源不足,可以简单的增加机器来扩展它的计算能力 3)高容错性:一台机器挂了,它可以把任务转移到另外一个节点上运行,不至于任务失败,而且这个过程不需要人工参与,是Hadoop内部完成的; 4)适合PB级以上海量数据离线处理:可以实现上千台服务器集群并发工作。

缺点

1)不擅长实时计算:无法在毫秒或者秒级返回结果 2)不擅长流式计算:流式计算的输入数据是动态的,而MapReduce的数据源必须是静态的 3)不擅长DAG(有向无环图)计算: MapReduce的输出结果都会写入到磁盘,会造成大量的磁盘IO,性能底;

三、MapReduce怎么用?

1、自定义一个类继承Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>类,重写map()方法,将源数据处理为业务需求要求的(K,V);

public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private Text text = new Text(); private IntWritable one = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] words = value.toString().split(" "); for (String word : words) { text.set(word); context.write(text, one); } } }

2、自定义一个类继承Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>类,重写reducer()方法,将Mapper处理过的(K,V)汇总;

public class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable value = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // 1. 累加 int sum = 0; for (IntWritable value : values) { sum += value.get(); } // 2. 输出 this.value.set(sum); context.write(key, this.value); } }

3、自定义一个XxxDriver类,相当于Yarn客户端,用于提交整个程序到Yarn集群,提交的是封装了 MapReduce程序相关运行参数的job对象;代码编写步骤: 1)获取配置信息及Job对象 2)关联Driver的jar 3)关联Mapper和Reducer的jar 4)设置Mapper输出和最终输出的kv类型 5)设置文件输入输出的路径 6)提交Job

public class WcDriver { /** * 使用Idea本地模式测试 * * @param args * @throws IOException * @throws ClassNotFoundException * @throws InterruptedException */ /*public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 1. 获取配置信息以及Job对象 Job job = Job.getInstance(new Configuration()); // 2. 关联本Driver程序的jar job.setJarByClass(WcDriver.class); // 3. 关联Mapper和Reducer的jar job.setMapperClass(WcMapper.class); job.setReducerClass(WcReducer.class); // 4. 设置mapper输出的kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 5. 设置最终输出kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 6. 设置输入和输出路径 // 注意:FileInputFormat和FileOutputFormat导包别导错, // 应为:import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 7. 提交Job boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); }*/ /** * 集群模式测试 * * @param args * @throws IOException * @throws ClassNotFoundException * @throws InterruptedException */ public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 1. 获取配置信息及Job对象 Configuration configuration = new Configuration(); configuration.set("fs.defaultFS", "hdfs://Hadoop2:9820"); configuration.set("mapreduce.framework.name", "yarn"); configuration.set("mapreduce.app-submission.cross-platform", "true"); configuration.set("yarn.resourcemanager.hostname","Hadoop3"); Job job = Job.getInstance(configuration); // 2. 关联Driver的jar job.setJar("D:\\IdeaProjects\\hadoop\\mapreduce-wc\\target\\mapreduce-1.0-SNAPSHOT.jar"); // 3. 关联Mapper和Reducer的jar job.setMapperClass(WcMapper.class); job.setReducerClass(WcReducer.class); // 4. 设置Mapper输出的kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 5. 设置最终输出的kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 6. 设置文件输入输出的路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 7. 提交Job boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }

四、Hadoop序列化

1、什么是序列化?

序列化:将内存中的对象转化为字节序列的过程,以便于存储到磁盘和网络传输。 可以将对象序列化到磁盘、内存 、网络、剪贴板、流; 反序列化:将收到的字节序列或者磁盘持久化的数据转换为内存中的原对象的过程。

2、为什么要使用序列化?

为了将对象发送到远程计算机

3、为什么不适用java序列化?

Java序列化框架是一个重量级序列化框架,不便于在网络中高效传输;

4、Hadoop序列化的特点:

1)紧凑:高效使用存储空间 2)快速:读写数据额外开销小 3)可扩展:可随着通信协议的升级而升级 4)互操作:支持多语言交互

5、自定义bean对象实现序列化接口(Writable)

在Hadoop框架内部传递一个bean对象,那么该对象就需要实现序列化接口。 具体实现bean对象序列化步骤如下7步。 1)必须实现Writable接口 2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造

public FlowBean() { super(); }

3)重写序列化方法

@Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); }

4)重写反序列化方法

@Override public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); }

5)注意反序列化的顺序和序列化的顺序完全一致 6)要想把结果显示在文件中,需要重写toString(),可用”\t”分开,方便后续用。 7)如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,框架要求实现WritableComparable接口。因为MapReduce框中的Shuffle过程会对key排序。

@Override public int compareTo(FlowBean o) { // 倒序排列,从大到小 return this.sumFlow > o.getSumFlow() ? -1 : 1; }
最新回复(0)