MapReduce之InputFormat

it2023-10-21  68

InputFormat有两项任务:切片(getSplits)和生成键值对(createRecorderReader)


默认的切片规则

1)切片大小 = 块大小

2)若有多个文件,切片不考虑整体,只考虑单个文件

3)切成几个片,就有几个MapTask并行执行


InputFormat有以下实现类,它们的切片和生成键值对规则各不相同。


TextInputFormat(默认InputFormat)

1)切片规则

使用默认规则

2)键值对生成规则

<偏移量(LongWritable),每行的内容(Text)>


KeyValueTextInputFormat

1)切片规则

使用默认规则

2)键值对规则

<每行分隔符左边(Text),每行分隔符右边(Text)>

注:默认分隔符为\t,若有多个分隔符以第一个为基准

3)在Driver类中修改使用的InputFormat

//把InputFormat设为KeyValueTextInputFormat,以空格为分隔符 Configuration conf = new Configuration(); conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " "); Job job = Job.getInstance(conf); job.setInputFormatClass(KeyValueTextInputFormat.class);

NLineInputFormat

1)切片规则

每n行形成一个切片

2)键值对生成规则

和TextInputFormat一样

3)在Driver类中修改使用的InputFormat

//把InputFormat设为NLineInputFormat,每3行形成一个切片 NLineInputFormat.setNumLinesPerSplit(job, 3); job.setInputFormatClass(NLineInputFormat.class);

ConbineTextInputFormat(适用于小文件)

1)切片规则

设setMaxInputSplitSize值为4M

文件大小虚拟存储切片大小1.txt2M2M(2+2.5)M2.txt5M2.5M+2.5M(2.5+3)M3.txt3M3M(3+3)M4.txt6M3M+3M 

2)键值对生成规则

与TextInputFormat一样

3)在Driver类中修改使用的InputFormat

//把InputFormat设为CombineTextInputFormat,虚拟存储最大值为4MB CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); job.setInputFormatClass(CombineTextInputFormat.class);

SequenceFileOutputFormat(使用小文件)

1)切片规则

使用默认规则

2)键值对规则

<文件路径(Text),文件内容(BytesWritable)>

3)在Driver类中修改使用的InputFormat

job.setInputFormatClass(SequenceFileOutputFormat.class);

自定义InputFormat

1)MyInputFormat类

//生成键值对类型为<T1, T2> public class MyInputFormat extends FileInputFormat<T1, T2> { public RecordReader<T1, T2> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { //返回一个RecordReader的实现类对象 return new MyRecordReader(); } }

2)MyRecordReader类

public class MyRecordReader extends RecordReader<T1, T2> { private T1 k = new T1(); //key private T2 v = new T2(); //value private FSDataInputStream fsDataInputStream = null; //HDFS的输入流 private FileSplit fileSplit = null; //输入文件 //初始化,用于开输入流 public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { FileSystem fileSystem = FileSystem.get(taskAttemptContext.getConfiguration()); //获取当前HDFS fileSplit = (FileSplit) inputSplit; //获取当前的输入文件 fsDataInputStream = fileSystem.open(fileSplit.getPath()); //打开HDFS的输入流 } //自定义键值对生成规则,类似与java迭代器的next方法 public boolean nextKeyValue() throws IOException, InterruptedException { } //获取当前key public T1 getCurrentKey() throws IOException, InterruptedException { return k; } //获取当前value public T2 getCurrentValue() throws IOException, InterruptedException { return v; } //显示进度 public float getProgress() throws IOException, InterruptedException { } //关流 public void close() throws IOException { IOUtils.closeStream(fsDataInputStream); } }

3)修改对应Mapper输入类型和Driver使用的InputFormat流

最新回复(0)