Hadoop Data Compression


Overview

Compression codecs supported by MapReduce

Choosing a compression format

Gzip compression

Bzip2 compression

LZO compression

Snappy compression

Choosing where to compress

Compression can be enabled at any stage of a MapReduce job: on the input data, on the intermediate map output, and on the final reducer output. Nothing needs to be configured per job for the input stage, because Hadoop picks a codec based on the file extension, as the sketch below illustrates.
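Below is a minimal sketch of that extension-to-codec lookup; the class name and the file path are illustrative and not part of the original material.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CodecLookup {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);

        // Hypothetical path; only the ".bz2" extension matters for the lookup.
        CompressionCodec codec = factory.getCodec(new Path("/input/words.txt.bz2"));

        if (codec == null) {
            // no registered codec matches the extension, so the file is read as-is
            System.out.println("No codec found; reading the file uncompressed.");
        } else {
            // for ".bz2" this prints org.apache.hadoop.io.compress.BZip2Codec
            System.out.println("Codec: " + codec.getClass().getName());
        }
    }
}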

Compression parameter configuration

Parameter (where configured) | Default | Stage | Recommendation
io.compression.codecs (core-site.xml) | | input compression | Hadoop uses the file extension to decide whether a given codec is supported
mapreduce.map.output.compress (mapred-site.xml) | false | mapper output | set this to true to enable compression
mapreduce.map.output.compress.codec (mapred-site.xml) | org.apache.hadoop.io.compress.DefaultCodec | mapper output | production clusters commonly use the LZO or Snappy codec to compress data at this stage
mapreduce.output.fileoutputformat.compress (mapred-site.xml) | false | reducer output | set this to true to enable compression
mapreduce.output.fileoutputformat.compress.codec (mapred-site.xml) | org.apache.hadoop.io.compress.DefaultCodec | reducer output | use a standard tool or codec, such as gzip or bzip2
mapreduce.output.fileoutputformat.compress.type (mapred-site.xml) | RECORD | reducer output | compression type used for SequenceFile output: NONE and BLOCK
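The reducer-output parameters in the last three rows can also be applied through typed helpers instead of raw property strings. The sketch below assumes a job that writes SequenceFile output; the helper class name and the BZip2Codec choice are illustrative, not part of the original example.

import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class OutputCompressionSetup {
    // Hypothetical helper: apply output-compression settings to an existing Job.
    public static void apply(Job job) {
        // same effect as mapreduce.output.fileoutputformat.compress=true
        FileOutputFormat.setCompressOutput(job, true);

        // same effect as mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.BZip2Codec
        FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);

        // only meaningful for SequenceFile output;
        // same effect as mapreduce.output.fileoutputformat.compress.type=BLOCK
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
    }
}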

Hands-on compression examples

WordCountMapper.java:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * 1. A custom Mapper must extend Hadoop's Mapper class.
 * 2. Specify the 4 generic types (2 kv pairs):
 *    Input kv:
 *      key:   offset (the position in the file where the line is read)  LongWritable
 *      value: one line of text                                          Text
 *    Output kv:
 *      key:   a single word            Text
 *      value: the word occurred once   IntWritable
 * 3. Override the map method.
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // output key
    Text outK = new Text();
    // output value
    IntWritable outV = new IntWritable(1);

    /**
     * @param key     input key
     * @param value   input value
     * @param context the Mapper context object, which drives the mapper's execution
     *
     * map() is called once for every input kv pair.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // read one line, e.g.
        // atguigu atguigu
        String line = value.toString();

        // split the line, e.g.
        // [atguigu, atguigu]
        String[] words = line.split(" ");

        // emit one kv pair per word
        for (String word : words) {
            // set the key
            outK.set(word);
            // write out
            context.write(outK, outV);
        }
    }
}

WordCountReducer.java:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * 1. A custom Reducer must extend Hadoop's Reducer class.
 * 2. Specify the 4 generic types (2 kv pairs):
 *    Input kv: matches the mapper's output kv
 *      key:   a word
 *      value: the word occurred once
 *    Output kv:
 *      key:   a word                        Text
 *      value: total count for that word     IntWritable
 * 3. Override the reduce method.
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    // output value
    IntWritable outV = new IntWritable();

    /**
     * @param key     a word
     * @param values  all the values that share this key
     * @param context the Reducer context object, which drives the reducer's execution
     *
     * reduce() is called once per group of kv pairs that share the same key.
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        // set the output value
        outV.set(sum);
        // write out
        context.write(key, outV);
    }
}

TestCompress.java:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;
import org.junit.Test;

import java.io.*;

public class TestCompress {

    /**
     * Compression: write data to a file through a compression-aware output stream.
     */
    @Test
    public void testCompress() throws IOException, ClassNotFoundException {
        // file to compress
        String srcFile = "D:\\input\\inputWord\\JaneEyre.txt";
        // compressed file (the codec's default extension is appended below)
        String destFile = "D:\\input\\inputWord\\JaneEyre.txt";

        // input stream
        FileInputStream fis = new FileInputStream(new File(srcFile));

        // compression format to use
        String compressClassName = "org.apache.hadoop.io.compress.DefaultCodec";
        Class<?> compressClass = Class.forName(compressClassName);

        // codec object
        Configuration conf = new Configuration();
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(compressClass, conf);

        // output stream wrapped with the codec
        FileOutputStream fos = new FileOutputStream(new File(destFile + codec.getDefaultExtension()));
        CompressionOutputStream codecOut = codec.createOutputStream(fos);

        // copy the data
        IOUtils.copyBytes(fis, codecOut, conf);

        // close the streams
        IOUtils.closeStream(fis);
        IOUtils.closeStream(codecOut);
    }

    /**
     * Decompression: read data from a compressed file through a decompression-aware input stream.
     */
    @Test
    public void testDeCompress() throws IOException {
        // file to decompress
        String srcFile = "D:\\input\\inputWord\\JaneEyre.txt.deflate";
        // decompressed file
        String destFile = "D:\\input\\inputWord\\JaneEyre.txt";

        // input stream
        FileInputStream fis = new FileInputStream(new File(srcFile));

        // codec object, resolved from the file extension
        Configuration conf = new Configuration();
        CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(new Path(srcFile));
        CompressionInputStream codecIn = codec.createInputStream(fis);

        // output stream
        FileOutputStream fos = new FileOutputStream(new File(destFile));

        // copy the data
        IOUtils.copyBytes(codecIn, fos, conf);

        // close the streams
        IOUtils.closeStream(codecIn);
        IOUtils.closeStream(fos);
    }
}

WordCountDriver.java:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Create a Job object
        Configuration conf = new Configuration();

        // enable compression of the map output
        conf.set("mapreduce.map.output.compress", "true");
        conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.DefaultCodec");

        // enable compression of the reduce output
        conf.set("mapreduce.output.fileoutputformat.compress", "true");
        conf.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.DefaultCodec");

        Job job = Job.getInstance(conf);

        // 2. Associate the jar
        job.setJarByClass(WordCountDriver.class);

        // 3. Associate the mapper and reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 4. Set the mapper output kv types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5. Set the final output kv types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6. Set the input and output paths (local)
        FileInputFormat.setInputPaths(job, new Path("D:\\input\\inputWord"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\output4"));
        // cluster
        // FileInputFormat.setInputPaths(job, new Path(args[0]));
        // FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7. Submit the job
        job.waitForCompletion(true);
    }
}
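With DefaultCodec enabled as above, the job output under D:\output4 should be written as part-r-00000.deflate (alongside the _SUCCESS marker), and the intermediate map output is compressed transparently during the shuffle. The testDeCompress method shown earlier can be pointed at that .deflate file to read the word counts back.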