顺序消息

it2025-06-05  6

1.顺序消息

1.1消息队列负载

RocketMQ 首先需要通过 RebalanceService 线程实现消息队列的负载, 集群模式下同一个消 费组 内的消 费者共同承担其订阅主题下消息队列的消费, 同一个消息消费队列在同一时刻只会被消费组内一个消费者消费, 一个消费者同一时刻可以分配多个消费队列

List<PullRequest> pullRequestList = new ArrayList<PullRequest>(); for (MessageQueue mq : mqSet) { if (!this.processQueueTable.containsKey(mq)) { if (isOrder && !this.lock(mq)) { log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); continue; } this.removeDirtyOffset(mq); ProcessQueue pq = new ProcessQueue(); long nextOffset = this.computePullFromWhere(mq); if (nextOffset >= 0) { ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); if (pre != null) { log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); } else { log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true; } } else { log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); } } }

如果经过消息队列重新负载(分配)后 ,分配到新的消息 队列时 ,首先需要尝试Broker 发起锁定该消息队列的请求,如果返回加锁成功则创建该消息队列的拉取任务,否则将跳过 ,等待其他消费者释放该消息 队列的锁 ,然后在下一次队列重新负载时再尝试加锁

1.2消息拉取

RocketMQ 消息拉取由 PullMessageService 线程负责,根据消息拉取任务循环拉取消息

if (processQueue.isLocked()) { if (!pullRequest.isLockedFirst()) { final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue()); boolean brokerBusy = offset < pullRequest.getNextOffset(); log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", pullRequest, offset, brokerBusy); if (brokerBusy) { log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}", pullRequest, offset); } pullRequest.setLockedFirst(true); pullRequest.setNextOffset(offset); } } else { this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); log.info("pull message later because not locked in broker, {}", pullRequest); return; }

如果消息处理队列未被锁定,则延迟 3s 后再将 PullRequest 对象放入到拉取任务中,

如果该处 队列是第一次拉取 ,则 先计算拉取偏移量,然后 向消息服务端拉取消息

1.3消息消费

顺序消息消费的实现类 org.apache.rocketmq.client.impl consumer.ConsumeMessageOrderlyService

this.consumeExecutor = new ThreadPoolExecutor( this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 1000 * 60, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl("ConsumeMessageThread_"));

初始化实例参数,这里的关键是消息任务队列为 LinkedB!ockingQueue ,消息消费线程最大运行时线程个数为 onsumeThreadMin consumeThreadMa 参数将失效

public void start() { if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { ConsumeMessageOrderlyService.this.lockMQPeriodically(); } }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS); } }

如果消费模式为集群模式,启动定时任务,默认每隔 20s 执行 一次锁定分配给自 己的消息消费队列 通过 Drocketmq. client.rebalance. locklnterval =20000 设置间隔,该值建议与一次消息负载频率设置相同, 从上文可知,集群模式下顺序消息消费在创建拉取任务时并

未将 ProcessQu ue locked 态设置为 true 在未锁定消息队列之前无法执行消息拉取任务, ConsumeM ssageOrderlyService 0s 频率对分配给自己的消息队列进行自动锁操作,从而消费加锁成功的消息消费队列

private void submitConsumeRequestLater( final ProcessQueue processQueue, final MessageQueue messageQueue, final long suspendTimeMillis ) { long timeMillis = suspendTimeMillis; if (timeMillis == -1) { timeMillis = this.defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis(); } if (timeMillis < 10) { timeMillis = 10; } else if (timeMillis > 30000) { timeMillis = 30000; } this.scheduledExecutorService.schedule(new Runnable() { @Override public void run() { ConsumeMessageOrderlyService.this.submitConsumeRequest(null, processQueue, messageQueue, true); } }, timeMillis, TimeUnit.MILLISECONDS); } @Override public void submitConsumeRequest( final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispathToConsume) { if (dispathToConsume) { ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue); this.consumeExecutor.submit(consumeRequest); } }

构建消费任务 ConsumeRequest 并提交到消费线程池

if (this.processQueue.isDropped()) { log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); return; }

如果消息处理队列为丢弃, 停止本次消费任务

final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue); synchronized (objLock) {

根据消息队列获取 一个对象 然后消息消费时先申请独占 objLock,顺序消息消费的并发度为消息队列 ,也就是一个消息消费队列同 时刻只会被一个消 费线程池中一个线程消费

if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) { if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) && !this.processQueue.isLocked()) { log.warn("the message queue not locked, so consume later, {}", this.messageQueue); ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10); break; } } else { if (this.processQueue.isDropped()) { log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue); return; } ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100); }

如果是广播模式的话 ,直接进入消费,无须锁定处理队列,因为相互直接无竞争; 如果是集群模式,消息消费的前提条件 proceessQueue被锁定并且锁未超时 ,思考下,会不会出现当消息队列重新负载时,原先由自己处理的消息队列被另外一个消费者分配,此时如果还未来得及将 ProceeQueue 解除锁定,就被另外 一个消费者添加进去, 此时会存储多个消 息消费者同时消费 个消息队列?答案是不会 ,因为当 一个新的消费队列分配给消费者时, 在添加其拉取任务之前必须先向 Broker 发送对该消息队列加锁请求,只加锁成功后,才能添加拉取消息,否则到下一次负载后,只有消费队列被原先占有的消费者释放后,才能开始新的拉取任务 集群模式下,如果未锁定处理队列,则延迟该队列的消息消费

最新回复(0)