消息队列-RabbitMQ应用场景和消息确认机制

it2025-07-24  7

.1 RabbitMQ 消息队列

1.1 使用场景


1.2 概述

2.0 RabbitMQ概念

2.1 安装RabbitMQ

docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

2.2 Exchange 类型

如果下面这个是topic 类型的Exchange绑定的队列模式, 发送Routing key: atguigu.news ,则这四个队列都会收到消息.

3. 整合RabbitMQ

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.virtual-host=/ // 1. 创建 Exchange,Queue , 如何Bings, 如何收发消息 // 1) . AmqpAdmin 进行创建 @Autowired AmqpAdmin amqpAdmin; @Autowired RabbitTemplate rabbitTemplate; @Test void sendMessage() { Order order = new Order(); order.setId(1L); order.setName("健健"); rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",order); } @Test void createExchange() { amqpAdmin.declareExchange(new DirectExchange("hello-java-exchange", true, false)); } @Test void createQueue() { //exclusive: false 不具有排他性 amqpAdmin.declareQueue(new Queue("hello-java-queue", true, false, false)); } @Test void createBings() { amqpAdmin.declareBinding(new Binding("hello-java-queue", Binding.DestinationType.QUEUE, "hello-java-exchange", "hello.java", null)); } @Data class Order{ private Long id; private String name; } @Configuration public class MyRabbitConfig { @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }

@RabbitListener: 类+方法上 都可以 @RabbitHandler: 方法上 重载 区分不同的消息 场景 1). queue,同一个消息,只能有一个客户端收到 2). 只有一个消息完全处理完,方法运行结束,就可以接收到下一个消息.

4.0 RabbitMQ 可靠投递

4.1 消息确认机制–可靠抵达

# 开启发送端确认 spring.rabbitmq.publisher-confirm-type=correlated /** * 定制RabbitTemplate */ @PostConstruct //MyRabbitConfig对象创建以后,执行这个方法 public void initRabbitTemplate() { rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * 1. 只要抵达Broker 服务器就行 * @param correlationData 当前消息的唯一关联数据(这个是消息的唯一id) * @param b 消息是否成功收到 * @param s 失败的原因 */ @Override public void confirm(CorrelationData correlationData, boolean b, String s) { System.out.println("confirm...correlationData["+correlationData+"]==>是否失败:"+b+"失败原因:"+s); } }); //设置消息抵达队列的确认回调, 消息没有投递到指定的队列,就触发这个回调方法 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * * @param message 投递失败的消息详细信息 * @param i 回复的状态码 * @param s 回复的文本内容 * @param s1 当时这个消息发送给那个交换机 * @param s2 当时这个消息用哪个路由键 */ @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { System.out.println(message+","+i+","+s+","+s1+","+s2); } }); }

4.2 消息确认机制–可靠抵达 ack消息确认机制

#手动ack 消息, 只要我们没有明确告诉mq,货物被签收,没有ack回复, # 消息就一直unacked状态.即使Consumer宕机,消息也不会清理,会重新变为ready, # 下次Consumer连接进来,就发给他 spring.rabbitmq.listener.simple.acknowledge-mode=manual

如何签收:

1.starter 启动依赖 2. @configuration

@Bean //容器中的Binding,Queue,Exchange 都会自动创建(RabbitMq没有情况) public Queue orderDelayQueue(){ //队列名字,是否持久化,排他,自动删除 Map<String,Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange","order-event-exchange"); arguments.put("x-dead-letter-routing-key","order-release-order"); arguments.put("x-message-ttl",60000); return new Queue("order.delay.queue",true,false,false,arguments ); } @Bean public Queue orderReleaseOrderQueue(){ return new Queue("order.release.order.queue",true,false,false); } @Bean public Exchange orderEventExchange(){ return new TopicExchange("order-event-exchange",true,false) } @Bean public Binding orderCreateOrderBinding(){ new Binding("order.delay.queue",Binding.DestinationType.Queue,"order-event-exchange","order.create.order"); } @Bean public Binding orderReleaseOrderBinding(){ }
最新回复(0)