Rocketmq在顺序消费时,为了保证消息消费的有序性,使用了加锁机制,即对messageQueue进行全局加锁,其实现原理可参考源码分析RocketMQ顺序消息消费实现原理,本节不作源码分析,重点在解答为什么要全局加锁。不少文章给出的原因是:当topic的队列个数小于消费组中消费者的个数,会导致多个消费者消费同一个queue,存在竞争。但是查看AllocateMessageQueueStrategy的实现源码,发现topic的队列个数小于消费组中消费者的个数时,会导致超出个数的消费者无队列可消费,即负载不均衡,但并不会出现多个消费者争抢消费同一个消费队列的情况。具体可参考AllocateMessageQueueAveragely的实现。
@Override public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { if (currentCID == null || currentCID.length() < 1) { throw new IllegalArgumentException("currentCID is empty"); } if (mqAll == null || mqAll.isEmpty()) { throw new IllegalArgumentException("mqAll is null or mqAll empty"); } if (cidAll == null || cidAll.isEmpty()) { throw new IllegalArgumentException("cidAll is null or cidAll empty"); } List<MessageQueue> result = new ArrayList<MessageQueue>(); if (!cidAll.contains(currentCID)) { log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", consumerGroup, currentCID, cidAll); return result; } int index = cidAll.indexOf(currentCID); int mod = mqAll.size() % cidAll.size(); int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size()); int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; int range = Math.min(averageSize, mqAll.size() - startIndex); for (int i = 0; i < range; i++) { result.add(mqAll.get((startIndex + i) % mqAll.size())); } return result; }那顺序消费(重新分配队列,拉取消息,消费消息)时为什么要全局加锁呢?
新的consumer上线触发了rebalance(每隔30s重新负载均衡),在不加锁的情况下queue有可能被直接分配给别的consumer了,而原来的老的consumer可能还没有触发rebalance,导致在某一时刻某个队列同时存在两个consumer。如果不加锁,在这一段时间内存在并发,可能会出现重复消费,如消费者2消费到了第10条消息(当然已经消费了第6条消息),而消费者1才消费到第6条消息,形成可能的的乱序。
