通过rabbitMQ实现延时队列

it2023-06-20  71

基本流程

发送消息到死信队列死信队列中设置过期时间,并在过期后转发到普通队列消费者监听普通队列并进行消费

代码如下

消息发送者:

messageSemnder.send(orderNo);

messageSemnder:

@Component public class MessageSender { @Autowired private RabbitTemplate rabbitTemplate; public void send(String orderNo) { if (orderNo != null) { CorrelationData correlationData = new CorrelationData(); correlationData.setId(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("queue.cashDeposit.delay.exchange", "queue.cashDepositDel", orderNo, correlationData); } } }

CashDepositRabbitConfig(创建队列):

@Configuration public class CashDepositRabbitConfig { //创建死信队列 @Bean public Queue cashDepositDelQueue() { Map<String, Object> params = new HashMap<>(); params.put("x-message-ttl", 1000 * 60 * 30); // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称, params.put("x-dead-letter-exchange", "queue.cashDeposit.exchange"); // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。 params.put("x-dead-letter-routing-key", "queue.cashDeposit.del"); return new Queue("queue.cashDepositDel.delay", true, false, false, params); } //创建死信交换机 @Bean public DirectExchange cashDepositDelayExchange() { return new DirectExchange("queue.cashDeposit.delay.exchange"); } //绑定死信队列 @Bean public Binding cashDepositDelBindExchange(Queue cashDepositDelQueue, DirectExchange cashDepositDelayExchange) { //创建绑定规则,将队列和交换机通过路由键进行绑定 return BindingBuilder.bind(cashDepositDelQueue).to(cashDepositDelayExchange).with("queue.cashDepositDel"); } //创建普通队列 @Bean public Queue cashDepositQueue() { return new Queue("queue.cashDeposit.del", true); } //创建普通交换机 @Bean public TopicExchange cashDepositTopicExchange() { return new TopicExchange("queue.cashDeposit.exchange"); } //绑定普通队列 @Bean public Binding cashDepositBindExchange(Queue cashDepositQueue, TopicExchange cashDepositTopicExchange) { //创建绑定规则,将队列和交换机通过路由键进行绑定 return BindingBuilder.bind(cashDepositQueue).to(cashDepositTopicExchange).with("queue.cashDeposit.del"); } }

消息消费者(监听队列):

@Component public class Receive { @RabbitListener(queues = "queue.cashDeposit.del") @RabbitHandler public void receiveMessage(Message message, Channel channel, @Headers Map<String, Object> headers) throws IOException { String messageStr = new String(message.getBody()); String orderNo = JSON.parseObject(messageStr, String.class); // TODO: 处理逻辑... //手动签收消息 Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); channel.basicAck(deliveryTag, false); } }
最新回复(0)