Reading the Flink Source Code: The FileSystem Connector


The code lives in the flink-table-runtime-blink module; for usage, see the user guide on the official Flink website.
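For context, before diving into the code, this is roughly what using the connector looks like on the SQL side. A minimal, hypothetical example (the table name, schema, and path are invented; the option keys follow the documented filesystem connector options):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FilesystemTableExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Hypothetical partitioned filesystem sink table.
        tEnv.executeSql(
                "CREATE TABLE fs_sink (" +
                "  user_id STRING," +
                "  amount DOUBLE," +
                "  dt STRING," +
                "  `hour` STRING" +
                ") PARTITIONED BY (dt, `hour`) WITH (" +
                "  'connector' = 'filesystem'," +
                "  'path' = 'hdfs:///tmp/fs_sink'," +
                "  'format' = 'parquet'," +
                "  'sink.partition-commit.trigger' = 'partition-time'," +
                "  'sink.partition-commit.delay' = '1 h'," +
                "  'partition.time-extractor.timestamp-pattern' = '$dt $hour:00:00'," +
                "  'sink.partition-commit.policy.kind' = 'success-file'" +
                ")");
    }
}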

This is still the old implementation; it is planned to be reimplemented against the FLIP-95 interfaces (tracked in FLINK-19336).

The entry point is FileSystemTableFactory. How factory discovery works was covered in an earlier post, so it is not repeated here.
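As a quick refresher, factory discovery relies on the JDK SPI mechanism: factory implementations are listed under META-INF/services and loaded via ServiceLoader, after which Flink filters them by their properties. The sketch below shows only the underlying JDK mechanism, not Flink's actual discovery code:

import java.util.ServiceLoader;
import org.apache.flink.table.factories.TableFactory;

public class FactoryDiscoverySketch {
    public static void main(String[] args) {
        // Every jar on the classpath can register factories in
        // META-INF/services/org.apache.flink.table.factories.TableFactory;
        // ServiceLoader instantiates them all, and Flink then matches them
        // against the table's properties.
        for (TableFactory factory : ServiceLoader.load(TableFactory.class)) {
            System.out.println(factory.getClass().getName());
        }
    }
}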

Sink

createTableSink constructs a FileSystemTableSink object, passing in the relevant properties:

public TableSink<RowData> createTableSink(TableSinkFactory.Context context) {
    Configuration conf = new Configuration();
    context.getTable().getOptions().forEach(conf::setString);

    return new FileSystemTableSink(
            context.getObjectIdentifier(),          // connector identifier
            context.isBounded(),                    // bounded or unbounded stream
            context.getTable().getSchema(),         // table schema
            getPath(conf),                          // file path
            context.getTable().getPartitionKeys(),  // partition keys
            conf.get(PARTITION_DEFAULT_NAME),       // default partition name
            context.getTable().getOptions());       // options
}

FileSystemTableSink then builds a DataStreamSink on top of the input DataStream.

consumeDataStream mainly does the following:

- Build a RowDataPartitionComputer, which splits the field indexes and types into partition fields and non-partition fields.
- Create an EmptyMetaStoreFactory, an empty metastore implementation.
- Generate the part-file prefix from a UUID.
- Create a FileSystemFactory implementation.
- Branch into different handling depending on whether the stream is bounded.

public final DataStreamSink<RowData> consumeDataStream(DataStream<RowData> dataStream) {
    RowDataPartitionComputer computer = new RowDataPartitionComputer(
            defaultPartName,
            schema.getFieldNames(),
            schema.getFieldDataTypes(),
            partitionKeys.toArray(new String[0]));

    EmptyMetaStoreFactory metaStoreFactory = new EmptyMetaStoreFactory(path);
    OutputFileConfig outputFileConfig = OutputFileConfig.builder()
            .withPartPrefix("part-" + UUID.randomUUID().toString())
            .build();
    FileSystemFactory fsFactory = FileSystem::get;

    if (isBounded) {
        FileSystemOutputFormat.Builder<RowData> builder = new FileSystemOutputFormat.Builder<>();
        builder.setPartitionComputer(computer);
        builder.setDynamicGrouped(dynamicGrouping);
        builder.setPartitionColumns(partitionKeys.toArray(new String[0]));
        builder.setFormatFactory(createOutputFormatFactory());
        builder.setMetaStoreFactory(metaStoreFactory);
        builder.setFileSystemFactory(fsFactory);
        builder.setOverwrite(overwrite);
        builder.setStaticPartitions(staticPartitions);
        builder.setTempPath(toStagingPath());
        builder.setOutputFileConfig(outputFileConfig);
        return dataStream.writeUsingOutputFormat(builder.build())
                .setParallelism(dataStream.getParallelism());
    } else {
        Configuration conf = new Configuration();
        properties.forEach(conf::setString);
        Object writer = createWriter();
        TableBucketAssigner assigner = new TableBucketAssigner(computer);
        TableRollingPolicy rollingPolicy = new TableRollingPolicy(
                !(writer instanceof Encoder),
                conf.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
                conf.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());

        BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilder;
        if (writer instanceof Encoder) {
            //noinspection unchecked
            bucketsBuilder = StreamingFileSink.forRowFormat(
                    path, new ProjectionEncoder((Encoder<RowData>) writer, computer))
                    .withBucketAssigner(assigner)
                    .withOutputFileConfig(outputFileConfig)
                    .withRollingPolicy(rollingPolicy);
        } else {
            //noinspection unchecked
            bucketsBuilder = StreamingFileSink.forBulkFormat(
                    path, new ProjectionBulkFactory((BulkWriter.Factory<RowData>) writer, computer))
                    .withBucketAssigner(assigner)
                    .withOutputFileConfig(outputFileConfig)
                    .withRollingPolicy(rollingPolicy);
        }
        return createStreamingSink(
                conf,
                path,
                partitionKeys,
                tableIdentifier,
                overwrite,
                dataStream,
                bucketsBuilder,
                metaStoreFactory,
                fsFactory,
                conf.get(SINK_ROLLING_POLICY_CHECK_INTERVAL).toMillis());
    }
}

Streaming jobs are normally unbounded, so execution takes the else branch:

- Create a writer object according to the format; for parquet, for example, the writer is created from a BulkWriter.
- Wrap the RowDataPartitionComputer in a TableBucketAssigner.
- Build a TableRollingPolicy, which controls when part files are rolled; with a BulkWriter, files are rolled on checkpoints (a rolling-policy sketch follows this list).
- Build the BucketsBuilder object.
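For comparison, the DataStream API exposes the same size/interval based rolling through DefaultRollingPolicy. A minimal sketch of what TableRollingPolicy does for row formats (the thresholds are arbitrary examples, assuming the long-based builder methods available in this Flink version):

import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public class RollingPolicySketch {
    public static DefaultRollingPolicy<Object, String> sizeAndTimeBasedPolicy() {
        // Roughly the behaviour TableRollingPolicy applies to row formats (Encoder):
        // roll the in-progress file once it exceeds a size threshold or has been open
        // longer than a rollover interval. Bulk formats ignore these thresholds and
        // roll on every checkpoint instead. The values below are arbitrary examples.
        return DefaultRollingPolicy.builder()
                .withMaxPartSize(128 * 1024 * 1024)                   // ~ sink.rolling-policy.file-size
                .withRolloverInterval(TimeUnit.MINUTES.toMillis(30))  // ~ sink.rolling-policy.rollover-interval
                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                .build();
    }
}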

createStreamingSink

- The BucketsBuilder is wrapped into a StreamingFileWriter. This is an operator (it extends AbstractStreamOperator) appended after the input stream, and the main processing logic lives there.
- If sink.partition-commit.policy.kind is configured, a commit step is added, for example registering the partition in the metastore or writing a _SUCCESS file; this is again an extra operator.
- Finally the data is discarded with a DiscardingSink function, since it has already been fully handled by the operators above.

public static DataStreamSink<RowData> createStreamingSink(
        Configuration conf,
        Path path,
        List<String> partitionKeys,
        ObjectIdentifier tableIdentifier,
        boolean overwrite,
        DataStream<RowData> inputStream,
        BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilder,
        TableMetaStoreFactory msFactory,
        FileSystemFactory fsFactory,
        long rollingCheckInterval) {
    if (overwrite) {
        throw new IllegalStateException("Streaming mode not support overwrite.");
    }

    StreamingFileWriter fileWriter = new StreamingFileWriter(
            rollingCheckInterval,
            bucketsBuilder);
    DataStream<CommitMessage> writerStream = inputStream.transform(
            StreamingFileWriter.class.getSimpleName(),
            TypeExtractor.createTypeInfo(CommitMessage.class),
            fileWriter).setParallelism(inputStream.getParallelism());

    DataStream<?> returnStream = writerStream;

    // save committer when we don't need it.
    if (partitionKeys.size() > 0 && conf.contains(SINK_PARTITION_COMMIT_POLICY_KIND)) {
        StreamingFileCommitter committer = new StreamingFileCommitter(
                path, tableIdentifier, partitionKeys, msFactory, fsFactory, conf);
        returnStream = writerStream
                .transform(StreamingFileCommitter.class.getSimpleName(), Types.VOID, committer)
                .setParallelism(1)
                .setMaxParallelism(1);
    }
    //noinspection unchecked
    return returnStream.addSink(new DiscardingSink()).setParallelism(1);
}

PS: note the Java 8 functional-interface style here, which can be confusing the first time you see it. An interface with exactly one abstract method is a functional interface, and it can be implemented in several ways: most commonly with an anonymous inner class, or with a lambda expression or a method reference. For example:

FileSystemFactory fsFactory = FileSystem::get;

// equivalent anonymous class
FileSystemFactory fileSystemFactory = new FileSystemFactory() {
    public FileSystem create(URI fsUri) throws IOException {
        return FileSystem.get(fsUri);
    }
};

// equivalent lambda
FileSystemFactory fileSystemFactory = uri -> FileSystem.get(uri);

Writing data to the filesystem

Records are processed in StreamingFileWriter#processElement:

public void processElement(StreamRecord<RowData> element) throws Exception {
    helper.onElement(
            element.getValue(),
            getProcessingTimeService().getCurrentProcessingTime(),
            element.hasTimestamp() ? element.getTimestamp() : null,
            currentWatermark);
}

Before that, initializeState creates the Buckets via the BucketsBuilder and wraps them in a StreamingFileSinkHelper:

@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);
    buckets = bucketsBuilder.createBuckets(getRuntimeContext().getIndexOfThisSubtask());

    // Set listener before the initialization of Buckets.
    inactivePartitions = new HashSet<>();
    buckets.setBucketLifeCycleListener(new BucketLifeCycleListener<RowData, String>() {

        @Override
        public void bucketCreated(Bucket<RowData, String> bucket) {
        }

        @Override
        public void bucketInactive(Bucket<RowData, String> bucket) {
            inactivePartitions.add(bucket.getBucketId());
        }
    });

    helper = new StreamingFileSinkHelper<>(
            buckets,
            context.isRestored(),
            context.getOperatorStateStore(),
            getRuntimeContext().getProcessingTimeService(),
            bucketCheckInterval);
    currentWatermark = Long.MIN_VALUE;
}

Back in processElement: following the call chain, the record is eventually written to the file by Bucket#write.

void write(IN element, long currentTime) throws IOException {
    // If there is no in-progress part file, or the rolling policy says to roll, start a new one.
    if (inProgressPart == null || rollingPolicy.shouldRollOnEvent(inProgressPart, element)) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.",
                    subtaskIndex, bucketId, element);
        }
        inProgressPart = rollPartFile(currentTime);
    }
    inProgressPart.write(element, currentTime);
}

The actual bytes are written to the filesystem through the write methods of the underlying third-party libraries, such as Hadoop, Hive, Parquet, or ORC.
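For example, for Parquet the connector's createWriter() ends up producing a BulkWriter.Factory. The same kind of factory can be used directly with StreamingFileSink in the DataStream API; a hedged sketch (the Event POJO and the path are invented, and it assumes the flink-parquet dependency is available):

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class ParquetSinkSketch {

    // Hypothetical record type used only for illustration.
    public static class Event {
        public String userId;
        public double amount;
    }

    public static void attachSink(DataStream<Event> stream) {
        // A bulk format: part files are only finalized on checkpoints, which matches
        // the roll-on-checkpoint behaviour the table sink enforces for non-Encoder writers.
        StreamingFileSink<Event> sink = StreamingFileSink
                .forBulkFormat(new Path("hdfs:///tmp/events"),
                        ParquetAvroWriters.forReflectRecord(Event.class))
                .build();
        stream.addSink(sink);
    }
}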

Checkpoint

Checkpointing is done in the snapshotState method; the main logic lives in the Buckets class.

public void snapshotState(
        final long checkpointId,
        final ListState<byte[]> bucketStatesContainer,
        final ListState<Long> partCounterStateContainer) throws Exception {

    Preconditions.checkState(
            bucketWriter != null && bucketStateSerializer != null,
            "sink has not been initialized");

    LOG.info("Subtask {} checkpointing for checkpoint with id={} (max part counter={}).",
            subtaskIndex, checkpointId, maxPartCounter);

    bucketStatesContainer.clear();
    partCounterStateContainer.clear();

    snapshotActiveBuckets(checkpointId, bucketStatesContainer);
    partCounterStateContainer.add(maxPartCounter);
}

private void snapshotActiveBuckets(
        final long checkpointId,
        final ListState<byte[]> bucketStatesContainer) throws Exception {

    for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
        final BucketState<BucketID> bucketState = bucket.onReceptionOfCheckpoint(checkpointId);

        final byte[] serializedBucketState = SimpleVersionedSerialization
                .writeVersionAndSerialize(bucketStateSerializer, bucketState);

        bucketStatesContainer.add(serializedBucketState);

        if (LOG.isDebugEnabled()) {
            LOG.debug("Subtask {} checkpointing: {}", subtaskIndex, bucketState);
        }
    }
}

Each active Bucket is snapshotted here:

BucketState<BucketID> onReceptionOfCheckpoint(long checkpointId) throws IOException {
    prepareBucketForCheckpointing(checkpointId);

    InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = null;
    long inProgressFileCreationTime = Long.MAX_VALUE;

    if (inProgressPart != null) {
        inProgressFileRecoverable = inProgressPart.persist();
        inProgressFileCreationTime = inProgressPart.getCreationTime();
        this.inProgressFileRecoverablesPerCheckpoint.put(checkpointId, inProgressFileRecoverable);
    }

    // Return the BucketState, which will be serialized into operator state.
    return new BucketState<>(
            bucketId,
            bucketPath,
            inProgressFileCreationTime,
            inProgressFileRecoverable,
            pendingFileRecoverablesPerCheckpoint);
}

private void prepareBucketForCheckpointing(long checkpointId) throws IOException {
    if (inProgressPart != null && rollingPolicy.shouldRollOnCheckpoint(inProgressPart)) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Subtask {} closing in-progress part file for bucket id={} on checkpoint.",
                    subtaskIndex, bucketId);
        }
        closePartFile();
    }

    if (!pendingFileRecoverablesForCurrentCheckpoint.isEmpty()) {
        pendingFileRecoverablesPerCheckpoint.put(checkpointId, pendingFileRecoverablesForCurrentCheckpoint);
        // Reset the per-checkpoint list.
        pendingFileRecoverablesForCurrentCheckpoint = new ArrayList<>();
    }
}

The core logic is in closePartFile: the in-progress file is closed and its buffered data is flushed from memory to the filesystem, producing a PendingFileRecoverable that is stored in the pendingFileRecoverablesForCurrentCheckpoint list in preparation for the snapshot.

private InProgressFileWriter.PendingFileRecoverable closePartFile() throws IOException {
    InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = null;
    if (inProgressPart != null) {
        pendingFileRecoverable = inProgressPart.closeForCommit();
        pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
        inProgressPart = null; // reset to null
    }
    return pendingFileRecoverable;
}

A file that is still being written is in the in-progress state and cannot be read yet; when it becomes visible to downstream readers depends on when it is committed. The previous step wrote the data to the file, but it has not been formally committed. Recall the phases of a checkpoint (covered in an earlier post if you need a refresher): in the final phase, the CheckpointCoordinator calls notifyCheckpointComplete on each operator.

public void notifyCheckpointComplete(long checkpointId) throws Exception {
    super.notifyCheckpointComplete(checkpointId);
    commitUpToCheckpoint(checkpointId);
}

public void commitUpToCheckpoint(final long checkpointId) throws IOException {
    final Iterator<Map.Entry<BucketID, Bucket<IN, BucketID>>> activeBucketIt =
            activeBuckets.entrySet().iterator();

    LOG.info("Subtask {} received completion notification for checkpoint with id={}.",
            subtaskIndex, checkpointId);

    while (activeBucketIt.hasNext()) {
        final Bucket<IN, BucketID> bucket = activeBucketIt.next().getValue();
        bucket.onSuccessfulCompletionOfCheckpoint(checkpointId);

        // After the cleanup steps above, the bucket will no longer be active.
        if (!bucket.isActive()) {
            // We've dealt with all the pending files and the writer for this bucket is not currently open.
            // Therefore this bucket is currently inactive and we can remove it from our state.
            activeBucketIt.remove();
            notifyBucketInactive(bucket);
        }
    }
}

Files are committed in Bucket#onSuccessfulCompletionOfCheckpoint:

void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException {
    checkNotNull(bucketWriter);

    Iterator<Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>>> it =
            pendingFileRecoverablesPerCheckpoint.headMap(checkpointId, true)
                    .entrySet().iterator();

    while (it.hasNext()) {
        Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>> entry = it.next();

        for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable : entry.getValue()) {
            bucketWriter.recoverPendingFile(pendingFileRecoverable).commit();
        }
        it.remove();
    }

    cleanupInProgressFileRecoverables(checkpointId);
}

The commit method renames the file so that it becomes readable downstream. Here, for example, is the Hadoop implementation of commit:

@Override
public void commit() throws IOException {
    final Path src = recoverable.tempFile();
    final Path dest = recoverable.targetFile();
    final long expectedLength = recoverable.offset();

    final FileStatus srcStatus;
    try {
        srcStatus = fs.getFileStatus(src);
    } catch (IOException e) {
        throw new IOException("Cannot clean commit: Staging file does not exist.");
    }

    if (srcStatus.getLen() != expectedLength) {
        // something was done to this file since the committer was created.
        // this is not the "clean" case
        throw new IOException("Cannot clean commit: File has trailing junk data.");
    }

    try {
        fs.rename(src, dest);
    } catch (IOException e) {
        throw new IOException("Committing file by rename failed: " + src + " to " + dest, e);
    }
}

Finally, some of the in-progress file state is cleaned up:

private void cleanupInProgressFileRecoverables(long checkpointId) throws IOException {
    Iterator<Map.Entry<Long, InProgressFileWriter.InProgressFileRecoverable>> it =
            inProgressFileRecoverablesPerCheckpoint.headMap(checkpointId, false)
                    .entrySet().iterator();

    while (it.hasNext()) {
        final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = it.next().getValue();

        // this check is redundant, as we only put entries in the inProgressFileRecoverablesPerCheckpoint map
        // list when the requiresCleanupOfInProgressFileRecoverableState() returns true, but having it makes
        // the code more readable.

        // Returns false for all filesystems except S3.
        final boolean successfullyDeleted = bucketWriter.cleanupInProgressFileRecoverable(inProgressFileRecoverable);
        if (LOG.isDebugEnabled() && successfullyDeleted) {
            LOG.debug("Subtask {} successfully deleted incomplete part for bucket id={}.", subtaskIndex, bucketId);
        }
        it.remove(); // clean up
    }
}

Partition commit

This part covers when a partition commit is triggered and which commit policies are applied. There are two trigger types: process-time and partition-time. With process-time, each partition that the current checkpoint needs to commit is registered in the pendingPartitions map together with the current system time; at commit time, a partition is committed if its registration time plus the delay is earlier than the current system time, and if delay = 0 it is committed immediately. So with delay = 0 partitions are committed right away, which can commit a partition too early when data arrives late; if the delay equals the partition duration, the partitions collected by the previous checkpoint are committed roughly one checkpoint interval plus delay later.
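A small numeric illustration of that check, with made-up values (the source of the process-time trigger follows):

import java.time.Duration;

public class ProcTimeTriggerExample {
    public static void main(String[] args) {
        // The partition was registered at creationTime; it becomes committable once the
        // current processing time exceeds creationTime + delay (or immediately if delay == 0).
        long creationTime = 1_600_000_000_000L;                // registration time in ms (made up)
        long commitDelay = Duration.ofMinutes(30).toMillis();  // sink.partition-commit.delay
        long currentProcTime = creationTime + Duration.ofMinutes(31).toMillis();

        boolean committable = commitDelay == 0 || currentProcTime > creationTime + commitDelay;
        System.out.println(committable); // true: 31 minutes have passed, the delay is 30 minutes
    }
}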

@Override
public void addPartition(String partition) {
    if (!StringUtils.isNullOrWhitespaceOnly(partition)) {
        this.pendingPartitions.putIfAbsent(partition, procTimeService.getCurrentProcessingTime());
    }
}

@Override
public List<String> committablePartitions(long checkpointId) {
    List<String> needCommit = new ArrayList<>();
    long currentProcTime = procTimeService.getCurrentProcessingTime();
    Iterator<Map.Entry<String, Long>> iter = pendingPartitions.entrySet().iterator();
    while (iter.hasNext()) {
        Map.Entry<String, Long> entry = iter.next();
        long creationTime = entry.getValue();
        if (commitDelay == 0 || currentProcTime > creationTime + commitDelay) {
            needCommit.add(entry.getKey());
            iter.remove();
        }
    }
    return needCommit;
}

With partition-time, the decision is based on whether the watermark has reached the partition time plus the delay.

@Override
public void addPartition(String partition) {
    if (!StringUtils.isNullOrWhitespaceOnly(partition)) {
        this.pendingPartitions.add(partition);
    }
}

@Override
public List<String> committablePartitions(long checkpointId) {
    if (!watermarks.containsKey(checkpointId)) {
        throw new IllegalArgumentException(String.format(
                "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
                checkpointId, watermarks));
    }

    long watermark = watermarks.get(checkpointId);
    watermarks.headMap(checkpointId, true).clear();

    List<String> needCommit = new ArrayList<>();
    Iterator<String> iter = pendingPartitions.iterator();
    while (iter.hasNext()) {
        String partition = iter.next();
        // Extract the time from the path, e.g. partition='day=2020-12-01/hour=11/minute=11'
        // is converted to 2020-12-01 11:11:00.
        LocalDateTime partTime = extractor.extract(
                partitionKeys, extractPartitionValues(new Path(partition)));
        if (watermark > toMills(partTime) + commitDelay) {
            needCommit.add(partition);
            iter.remove();
        }
    }
    return needCommit;
}
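A small worked example of this check with hypothetical values (note that the real toMills may convert with a different time zone than the one assumed here):

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class PartitionTimeTriggerExample {
    public static void main(String[] args) {
        // Partition "day=2020-12-01/hour=11" is extracted to 2020-12-01 11:00:00;
        // with a 1 hour delay it becomes committable once the watermark passes
        // 2020-12-01 12:00:00 (in the time zone assumed for the conversion).
        LocalDateTime partTime = LocalDateTime.of(2020, 12, 1, 11, 0, 0);
        long commitDelay = Duration.ofHours(1).toMillis();
        long partTimeMillis = partTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();

        long watermark = partTimeMillis + Duration.ofMinutes(61).toMillis();
        boolean committable = watermark > partTimeMillis + commitDelay;
        System.out.println(committable); // true: the watermark is 61 minutes past the partition time
    }
}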

Source

Reading data is simpler than writing it.

First, a FileSystemTableSource object is created:

public TableSource<RowData> createTableSource(TableSourceFactory.Context context) {
    Configuration conf = new Configuration();
    context.getTable().getOptions().forEach(conf::setString);

    return new FileSystemTableSource(
            context.getTable().getSchema(),
            getPath(conf),
            context.getTable().getPartitionKeys(),
            conf.get(PARTITION_DEFAULT_NAME),
            context.getTable().getProperties());
}

Then the source function is built, passing in the InputFormat that reads the source data.

public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
    @SuppressWarnings("unchecked")
    TypeInformation<RowData> typeInfo =
            (TypeInformation<RowData>) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
    // Avoid using ContinuousFileMonitoringFunction
    InputFormatSourceFunction<RowData> func = new InputFormatSourceFunction<>(getInputFormat(), typeInfo);
    DataStreamSource<RowData> source = execEnv.addSource(func, explainSource(), typeInfo);
    return source.name(explainSource());
}

In the run method, records are read in a loop and emitted to the downstream operators:

public void run(SourceContext<OUT> ctx) throws Exception {
    try {
        Counter completedSplitsCounter = getRuntimeContext().getMetricGroup().counter("numSplitsProcessed");
        if (isRunning && format instanceof RichInputFormat) {
            ((RichInputFormat) format).openInputFormat();
        }

        OUT nextElement = serializer.createInstance();
        while (isRunning) {
            format.open(splitIterator.next());

            // for each element we also check if cancel
            // was called by checking the isRunning flag

            while (isRunning && !format.reachedEnd()) {
                nextElement = format.nextRecord(nextElement);
                if (nextElement != null) {
                    ctx.collect(nextElement);
                } else {
                    break;
                }
            }
            format.close();
            completedSplitsCounter.inc();

            if (isRunning) {
                isRunning = splitIterator.hasNext();
            }
        }
    } finally {
        format.close();
        if (format instanceof RichInputFormat) {
            ((RichInputFormat) format).closeInputFormat();
        }
        isRunning = false;
    }
}