RocketMQ-如何保证消息顺序消费

it2023-09-03  69

1.实现顺序消费

    RocketMQ支持局部顺序消费,但不支持全局,换句话说针对Topic中的每个queue是可以按照FIFO进行消费。     要保证一个订单有关的消息顺序消费,有两点需要注意,一是将订单有关的消息发送到相关Topic中同一个queue里,二是消费者按照先进先出的原则进行消费。

2.消息发送到指定queue中

    在消息发送时,需指定对应的MessageQueueSelector,此时我们只需通过订单号与queue进行关联,代码如下。send中的参数arg即为select中的arg,将订单号作为参数传入,同一订单号的相关消息则可以保证在同一queue中。

send(Message msg, MessageQueueSelector selector, Object arg)   private MessageQueueSelector messageQueueSelector = new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { try { int id = arg.hashCode(); int index = Math.abs(id) % mqs.size(); return mqs.get(index); } catch (Exception e) { log.error("MessageQueueSelector issue: " + e.getClass().getName() + " " + e.getLocalizedMessage()); return mqs.get(0); } } };

3.顺序消费

    如果使用MessageListenerConcurrently的话,必须保证是单线程才能顺序消费,但生产环境下,我们一般 都是多线程的形成,这样则需要使用MessageListenerOrderly。

consumer.setConsumeThreadMin(1); consumer.setConsumeThreadMax(1); ... consumer.registerMessageListener( new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { ... } });

    MessageListenerOrderly使用代码如下。

consumer.setConsumeThreadMin(4); consumer.setConsumeThreadMax(8); consumer.registerMessageListener( new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { ... } });
最新回复(0)