刷盘的入口方法在CommitLog#putMessage
// Flush entry point: called with the append result, the put result to update, and the message itself.
handleDiskFlush(result, putMessageResult, msg);
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { // Synchronization flush if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; if (messageExt.isWaitStoreMsgOK()) { GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); CompletableFuture<PutMessageStatus> flushOkFuture = request.future(); PutMessageStatus flushStatus = null; try { flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(), TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { //flushOK=false; } if (flushStatus != PutMessageStatus.PUT_OK) { log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); } } else { service.wakeup(); } } // Asynchronous flush else { if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { flushCommitLogService.wakeup(); } else { commitLogService.wakeup(); } } }同步刷盘,生成一个GroupCommitRequest请求,并putRequest到列表中,然后等待请求处理完成后返回。
// The request targets the offset just past the appended data (wrote offset + wrote bytes).
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
// Queue the request with the group-commit service.
service.putRequest(request);
// Future the caller waits on to learn the flush status.
CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
异步刷盘,只是wakeup唤醒刷盘操作,直接返回。
刷盘服务flushCommitLogService具体使用哪一个实现类?
public CommitLog(final DefaultMessageStore defaultMessageStore) { this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(), defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService()); this.defaultMessageStore = defaultMessageStore; if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { this.flushCommitLogService = new GroupCommitService(); } else { this.flushCommitLogService = new FlushRealTimeService(); } this.commitLogService = new CommitRealTimeService(); this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() { @Override protected MessageExtBatchEncoder initialValue() { return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); } }; this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock(); }由上述代码可知: 同步刷盘,对应的service是GroupCommitService.异步刷盘,对应的service是FlushRealTimeService。下面我们分别来查看GroupCommitService和FlushRealTimeService的源码。
首先我们看下公共基类ServiceThread
/**
 * Base class for a service whose {@link #run()} body executes on its own thread.
 *
 * <p>It provides start/shutdown handling plus {@link #waitForRunning(long)}, a timed wait that
 * {@link #wakeup()} can end early. Subclasses supply {@link #getServiceName()} and the
 * {@code run()} loop, and may override {@link #onWaitEnd()}.
 */
public abstract class ServiceThread implements Runnable {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
    // Default value returned by getJointime(), which shutdown() passes to Thread.join (milliseconds).
    private static final long JOIN_TIME = 90 * 1000;
    // Thread created by start(); null until the first start.
    private Thread thread;
    // Latch the service thread awaits in waitForRunning(); counted down to wake it.
    protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
    // True when a wake-up has been requested and not yet consumed by waitForRunning().
    protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
    // Set by shutdown()/stop()/makeStop(); read through isStopped().
    protected volatile boolean stopped = false;
    // Daemon flag applied to the thread when start() creates it.
    protected boolean isDaemon = false;
    //Make it able to restart the thread
    private final AtomicBoolean started = new AtomicBoolean(false);

    public ServiceThread() {
    }

    /**
     * @return the service name; used as the thread name and in log messages
     */
    public abstract String getServiceName();

    /**
     * Starts the service thread. Does nothing if the service is already started.
     */
    public void start() {
        log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
        // Only the caller that flips started from false to true creates the thread.
        if (!started.compareAndSet(false, true)) {
            return;
        }
        stopped = false;
        this.thread = new Thread(this, getServiceName());
        this.thread.setDaemon(isDaemon);
        this.thread.start();
    }

    /**
     * Shuts the service down without interrupting its thread.
     */
    public void shutdown() {
        this.shutdown(false);
    }

    /**
     * Marks the service stopped, wakes the thread if it is waiting, and joins a non-daemon
     * thread for at most {@link #getJointime()} milliseconds. Does nothing if not started.
     *
     * @param interrupt whether to also interrupt the service thread
     */
    public void shutdown(final boolean interrupt) {
        log.info("Try to shutdown service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
        // Only the caller that flips started from true to false performs the shutdown.
        if (!started.compareAndSet(true, false)) {
            return;
        }
        this.stopped = true;
        log.info("shutdown thread " + this.getServiceName() + " interrupt " + interrupt);
        if (hasNotified.compareAndSet(false, true)) {
            waitPoint.countDown(); // notify
        }
        try {
            if (interrupt) {
                this.thread.interrupt();
            }
            long beginTime = System.currentTimeMillis();
            // Daemon threads are not joined.
            if (!this.thread.isDaemon()) {
                this.thread.join(this.getJointime());
            }
            long elapsedTime = System.currentTimeMillis() - beginTime;
            log.info("join thread " + this.getServiceName() + " elapsed time(ms) " + elapsedTime + " " + this.getJointime());
        } catch (InterruptedException e) {
            // NOTE(review): the caller's interrupt status is not restored here, only logged.
            log.error("Interrupted", e);
        }
    }

    /**
     * @return maximum time in milliseconds that {@link #shutdown(boolean)} waits in join
     */
    public long getJointime() {
        return JOIN_TIME;
    }

    /**
     * Stops the service without interrupting its thread.
     */
    @Deprecated
    public void stop() {
        this.stop(false);
    }

    /**
     * Marks the service stopped and wakes the thread if it is waiting. Unlike
     * {@link #shutdown(boolean)} it neither joins the thread nor clears the started flag.
     *
     * @param interrupt whether to also interrupt the service thread
     */
    @Deprecated
    public void stop(final boolean interrupt) {
        if (!started.get()) {
            return;
        }
        this.stopped = true;
        log.info("stop thread " + this.getServiceName() + " interrupt " + interrupt);
        if (hasNotified.compareAndSet(false, true)) {
            waitPoint.countDown(); // notify
        }
        if (interrupt) {
            this.thread.interrupt();
        }
    }

    /**
     * Sets the stopped flag only; the thread is not woken, interrupted or joined.
     */
    public void makeStop() {
        if (!started.get()) {
            return;
        }
        this.stopped = true;
        log.info("makestop thread " + this.getServiceName());
    }

    /**
     * Requests a wake-up of the service thread. Calls made while a request is already pending
     * have no further effect.
     */
    public void wakeup() {
        if (hasNotified.compareAndSet(false, true)) {
            waitPoint.countDown(); // notify
        }
    }

    /**
     * Waits for up to {@code interval} milliseconds, or returns at once if a wake-up is already
     * pending. {@link #onWaitEnd()} is called on every return path.
     *
     * @param interval maximum wait time in milliseconds
     */
    protected void waitForRunning(long interval) {
        // A wake-up arrived before we started waiting: consume it and skip the wait.
        if (hasNotified.compareAndSet(true, false)) {
            this.onWaitEnd();
            return;
        }
        //entry to wait
        waitPoint.reset();
        try {
            waitPoint.await(interval, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("Interrupted", e);
        } finally {
            // Clear any pending notification whether the wait timed out, was woken or was interrupted.
            hasNotified.set(false);
            this.onWaitEnd();
        }
    }

    /**
     * Hook invoked each time {@link #waitForRunning(long)} returns; empty by default.
     */
    protected void onWaitEnd() {
    }

    public boolean isStopped() {
        return stopped;
    }

    public boolean isDaemon() {
        return isDaemon;
    }

    public void setDaemon(boolean daemon) {
        isDaemon = daemon;
    }
}
ServiceThread的简洁用法(子类)如下:
/**
 * Service loop: until the service is stopped, waits and then runs one rebalance pass.
 */
@Override
public void run() {
    // Upper bound, in milliseconds, on the wait before each rebalance pass.
    final long maxWaitMillis = 10000;

    log.info(this.getServiceName() + " service started");

    while (true) {
        if (this.isStopped()) {
            break;
        }
        // Returns after maxWaitMillis at the latest, or earlier when wakeup() is called.
        this.waitForRunning(maxWaitMillis);
        this.rebalanceImpl.doRebalance();
    }

    log.info(this.getServiceName() + " service end");
}
然后我们来看下同步刷盘服务GroupCommitService
class GroupCommitService extends FlushCommitLogService { private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>(); private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>(); public synchronized void putRequest(final GroupCommitRequest request) { synchronized (this.requestsWrite) { this.requestsWrite.add(request); } this.wakeup(); } private void swapRequests() { List<GroupCommitRequest> tmp = this.requestsWrite; this.requestsWrite = this.requestsRead; this.requestsRead = tmp; } private void doCommit() { synchronized (this.requestsRead) { if (!this.requestsRead.isEmpty()) { for (GroupCommitRequest req : this.requestsRead) { // There may be a message in the next file, so a maximum of // two times the flush boolean flushOK = false; for (int i = 0; i < 2 && !flushOK; i++) { flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); if (!flushOK) { CommitLog.this.mappedFileQueue.flush(0); } } req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT); } long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } this.requestsRead.clear(); } else { // Because of individual messages is set to not sync flush, it // will come to this process CommitLog.this.mappedFileQueue.flush(0); } } } public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { this.waitForRunning(10); this.doCommit(); } catch (Exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. 
", e); } } // Under normal circumstances shutdown, wait for the arrival of the // request, and then flush try { Thread.sleep(10); } catch (InterruptedException e) { CommitLog.log.warn("GroupCommitService Exception, ", e); } synchronized (this) { this.swapRequests(); } this.doCommit(); CommitLog.log.info(this.getServiceName() + " service end"); }重点看下putRequest方法,这里会唤醒服务线程。中断run方法中this.waitForRunning(10);直接作doCommit操作。在这里我们要特别注意RocketMQ 的同步刷盘是一次消息写入就只将一条消息刷写到磁盘吗?显然不是,而一次性全写。是通过fulushedWhere与nextOffset来比较,以确定一次GroupCommitRequest有没有刷盘成功。没有成功,flush。成功处理下一个GroupCommitRequest。
// The request is satisfied once the queue's flushed position has reached the request's next offset.
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
if (!flushOK) {
    // Not reached yet: flush. The argument 0 is presumably the "least pages" threshold — confirm.
    CommitLog.this.mappedFileQueue.flush(0);
}
我们再来看下异步刷盘服务FlushRealTimeService的源码。
class FlushRealTimeService extends FlushCommitLogService { private long lastFlushTimestamp = 0; private long printTimes = 0; public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed(); int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog(); int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages(); int flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval(); boolean printFlushProgress = false; // Print flush progress long currentTimeMillis = System.currentTimeMillis(); if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) { this.lastFlushTimestamp = currentTimeMillis; flushPhysicQueueLeastPages = 0; printFlushProgress = (printTimes++ % 10) == 0; } try { if (flushCommitLogTimed) { Thread.sleep(interval); } else { this.waitForRunning(interval); } if (printFlushProgress) { this.printFlushProgress(); } long begin = System.currentTimeMillis(); CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages); long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } long past = System.currentTimeMillis() - begin; if (past > 500) { log.info("Flush data to disk costs {} ms", past); } } catch (Throwable e) { CommitLog.log.warn(this.getServiceName() + " service has exception. 
", e); this.printFlushProgress(); } } // Normal shutdown, to ensure that all the flush before exit boolean result = false; for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { result = CommitLog.this.mappedFileQueue.flush(0); CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); } this.printFlushProgress(); CommitLog.log.info(this.getServiceName() + " service end"); } @Override public String getServiceName() { return FlushRealTimeService.class.getSimpleName(); } private void printFlushProgress() { // CommitLog.log.info("how much disk fall behind memory, " // + CommitLog.this.mappedFileQueue.howMuchFallBehind()); } @Override public long getJointime() { return 1000 * 60 * 5; } }从本文开篇的handleDiskFlush方法,可以知晓每异步写入一条消息,都会触发flushCommitLogService.wakeup();直接中断this.waitForRunning(interval);。因此异步刷盘并非想当然的每隔500ms flush一盘。而是如果没有新的消息写入,会休眠 500ms,但收到了新的消息后,可以被唤醒,做到消息及时被刷盘,而不是一定要等 500 ms。