Overview

Compression reduces the disk space and network bandwidth a MapReduce job consumes, at the cost of extra CPU time spent compressing and decompressing data.
Compression codecs supported by MR

| Compression format | Hadoop codec | File extension | Splittable |
|---|---|---|---|
| DEFLATE | org.apache.hadoop.io.compress.DefaultCodec | .deflate | No |
| Gzip | org.apache.hadoop.io.compress.GzipCodec | .gz | No |
| Bzip2 | org.apache.hadoop.io.compress.BZip2Codec | .bz2 | Yes |
| LZO | com.hadoop.compression.lzo.LzopCodec (installed separately) | .lzo | Yes, with an index |
| Snappy | org.apache.hadoop.io.compress.SnappyCodec | .snappy | No |
Choosing a compression method

- Gzip: good compression ratio and reasonable speed, but the files cannot be split, so it suits inputs of roughly one block or less.
- Bzip2: the highest compression ratio of the four and splittable, but compression and decompression are slow.
- Lzo: fast with a decent ratio, and splittable once an index file is built; it must be installed separately.
- Snappy: very fast compression and decompression with a lower ratio; not splittable.
Choosing where to compress

Compression can be enabled at any stage of a MapReduce job: on the input files, on the intermediate map output, or on the final reducer output.
Compression parameter configuration

| Parameter | Default | Stage | Recommendation |
|---|---|---|---|
| io.compression.codecs (set in core-site.xml) | – | Input | Hadoop uses the file extension to decide whether a codec is supported |
| mapreduce.map.output.compress (set in mapred-site.xml) | false | Mapper output | Set to true to enable compression |
| mapreduce.map.output.compress.codec (set in mapred-site.xml) | org.apache.hadoop.io.compress.DefaultCodec | Mapper output | Production clusters often use the LZO or Snappy codec to compress data at this stage |
| mapreduce.output.fileoutputformat.compress (set in mapred-site.xml) | false | Reducer output | Set to true to enable compression |
| mapreduce.output.fileoutputformat.compress.codec (set in mapred-site.xml) | org.apache.hadoop.io.compress.DefaultCodec | Reducer output | Use a standard tool or codec, such as gzip or bzip2 |
| mapreduce.output.fileoutputformat.compress.type (set in mapred-site.xml) | RECORD | Reducer output | Compression type used for SequenceFile output: NONE or BLOCK |
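These parameters can also be set per job on the job's Configuration rather than cluster-wide in the XML files; the driver at the end of this section does exactly that with conf.set. A minimal sketch of the typed setters, where the class name CompressionConfExample is our own and Snappy is just one possible codec choice (it requires the native Snappy library to be available):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SnappyCodec;

public class CompressionConfExample {
    // Builds a Configuration that compresses intermediate map output with Snappy.
    public static Configuration mapOutputCompressionConf() {
        Configuration conf = new Configuration();
        conf.setBoolean("mapreduce.map.output.compress", true);
        // setClass verifies that SnappyCodec actually implements CompressionCodec.
        conf.setClass("mapreduce.map.output.compress.codec",
                SnappyCodec.class, CompressionCodec.class);
        return conf;
    }
}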
Hands-on compression example
The Mapper is a standard WordCount mapper; compression is transparent to user code:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final Text outK = new Text();
    private final IntWritable outV = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split the line on spaces and emit (word, 1) for each word.
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            outK.set(word);
            context.write(outK, outV);
        }
    }
}
The Reducer sums the counts for each word:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable outV = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Add up all the 1s emitted for this word and write (word, total).
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        outV.set(sum);
        context.write(key, outV);
    }
}
First, compressing and decompressing a local file directly with a Hadoop codec. Note that testCompress writes JaneEyre.txt.deflate next to the source file, because the codec's default extension is appended to destFile:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;
import org.junit.Test;

import java.io.*;

public class TestCompress {

    @Test
    public void testCompress() throws IOException, ClassNotFoundException {
        String srcFile = "D:\\input\\inputWord\\JaneEyre.txt";
        // The codec's default extension (".deflate" for DefaultCodec) is appended below.
        String destFile = "D:\\input\\inputWord\\JaneEyre.txt";

        FileInputStream fis = new FileInputStream(new File(srcFile));

        // Instantiate the codec reflectively from its class name.
        String compressClassName = "org.apache.hadoop.io.compress.DefaultCodec";
        Class<?> compressClass = Class.forName(compressClassName);
        Configuration conf = new Configuration();
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(compressClass, conf);

        FileOutputStream fos = new FileOutputStream(new File(destFile + codec.getDefaultExtension()));
        // Wrap the raw stream so everything written through it is compressed.
        CompressionOutputStream codecOut = codec.createOutputStream(fos);

        IOUtils.copyBytes(fis, codecOut, conf);

        IOUtils.closeStream(fis);
        IOUtils.closeStream(codecOut);
    }

    @Test
    public void testDeCompress() throws IOException {
        String srcFile = "D:\\input\\inputWord\\JaneEyre.txt.deflate";
        String destFile = "D:\\input\\inputWord\\JaneEyre.txt";

        FileInputStream fis = new FileInputStream(new File(srcFile));

        // Let the factory pick the codec from the file extension (.deflate -> DefaultCodec).
        Configuration conf = new Configuration();
        CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(new Path(srcFile));
        CompressionInputStream codecIn = codec.createInputStream(fis);

        FileOutputStream fos = new FileOutputStream(new File(destFile));
        IOUtils.copyBytes(codecIn, fos, conf);

        IOUtils.closeStream(codecIn);
        IOUtils.closeStream(fos);
    }
}
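Besides matching on the file extension, CompressionCodecFactory can resolve a codec by name. A small sketch under the assumption that the input path arrives as a program argument (GzipCompressExample is our own class name):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

public class GzipCompressExample {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Resolve the codec by name instead of instantiating it reflectively.
        CompressionCodec codec = new CompressionCodecFactory(conf).getCodecByName("gzip");

        String srcFile = args[0]; // path to a local file
        FileInputStream fis = new FileInputStream(srcFile);
        // getDefaultExtension() is ".gz" for the gzip codec.
        FileOutputStream fos = new FileOutputStream(srcFile + codec.getDefaultExtension());
        CompressionOutputStream codecOut = codec.createOutputStream(fos);

        IOUtils.copyBytes(fis, codecOut, conf);
        IOUtils.closeStream(fis);
        IOUtils.closeStream(codecOut);
    }
}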
Finally, the driver enables compression for both the intermediate map output and the final job output via conf.set:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountDriver {

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        // Compress the intermediate map output.
        conf.set("mapreduce.map.output.compress", "true");
        conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.DefaultCodec");
        // Compress the final reducer output.
        conf.set("mapreduce.output.fileoutputformat.compress", "true");
        conf.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.DefaultCodec");

        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path("D:\\input\\inputWord"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\output4"));

        job.waitForCompletion(true);
    }
}
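The reducer-output settings also have typed helpers on FileOutputFormat, which avoid spelling out the property names by hand; a sketch, where OutputCompressionExample is our own wrapper and BZip2Codec is just one possible codec:

import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OutputCompressionExample {
    // Equivalent to the two mapreduce.output.fileoutputformat.compress* conf.set calls above.
    public static void enableOutputCompression(Job job) {
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
    }
}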