InputFormat有两项任务:切片(getSplits)和生成键值对(createRecorderReader)
1)切片大小 = 块大小
2)若有多个文件,切片不考虑整体,只考虑单个文件
3)切成几个片,就有几个MapTask并行执行
InputFormat有以下实现类,它们的切片和生成键值对规则各不相同。
1)切片规则
使用默认规则
2)键值对生成规则
<偏移量(LongWritable),每行的内容(Text)>
1)切片规则
使用默认规则
2)键值对规则
<每行分隔符左边(Text),每行分隔符右边(Text)>
注:默认分隔符为\t,若有多个分隔符以第一个为基准
3)在Driver类中修改使用的InputFormat
//把InputFormat设为KeyValueTextInputFormat,以空格为分隔符 Configuration conf = new Configuration(); conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " "); Job job = Job.getInstance(conf); job.setInputFormatClass(KeyValueTextInputFormat.class);1)切片规则
每n行形成一个切片
2)键值对生成规则
和TextInputFormat一样
3)在Driver类中修改使用的InputFormat
//把InputFormat设为NLineInputFormat,每3行形成一个切片 NLineInputFormat.setNumLinesPerSplit(job, 3); job.setInputFormatClass(NLineInputFormat.class);1)切片规则
设setMaxInputSplitSize值为4M
文件大小虚拟存储切片大小1.txt2M2M(2+2.5)M2.txt5M2.5M+2.5M(2.5+3)M3.txt3M3M(3+3)M4.txt6M3M+3M2)键值对生成规则
与TextInputFormat一样
3)在Driver类中修改使用的InputFormat
//把InputFormat设为CombineTextInputFormat,虚拟存储最大值为4MB CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); job.setInputFormatClass(CombineTextInputFormat.class);1)切片规则
使用默认规则
2)键值对规则
<文件路径(Text),文件内容(BytesWritable)>
3)在Driver类中修改使用的InputFormat
job.setInputFormatClass(SequenceFileOutputFormat.class);1)MyInputFormat类
//生成键值对类型为<T1, T2> public class MyInputFormat extends FileInputFormat<T1, T2> { public RecordReader<T1, T2> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { //返回一个RecordReader的实现类对象 return new MyRecordReader(); } }2)MyRecordReader类
public class MyRecordReader extends RecordReader<T1, T2> { private T1 k = new T1(); //key private T2 v = new T2(); //value private FSDataInputStream fsDataInputStream = null; //HDFS的输入流 private FileSplit fileSplit = null; //输入文件 //初始化,用于开输入流 public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { FileSystem fileSystem = FileSystem.get(taskAttemptContext.getConfiguration()); //获取当前HDFS fileSplit = (FileSplit) inputSplit; //获取当前的输入文件 fsDataInputStream = fileSystem.open(fileSplit.getPath()); //打开HDFS的输入流 } //自定义键值对生成规则,类似与java迭代器的next方法 public boolean nextKeyValue() throws IOException, InterruptedException { } //获取当前key public T1 getCurrentKey() throws IOException, InterruptedException { return k; } //获取当前value public T2 getCurrentValue() throws IOException, InterruptedException { return v; } //显示进度 public float getProgress() throws IOException, InterruptedException { } //关流 public void close() throws IOException { IOUtils.closeStream(fsDataInputStream); } }3)修改对应Mapper输入类型和Driver使用的InputFormat流