.1 RabbitMQ 消息队列
1.1 使用场景
1.2 概述
2.0 RabbitMQ概念
2.1 安装RabbitMQ
docker run
-d
--name rabbitmq
-p
5671:5671 -p
5672:5672 -p
4369:4369 -p
25672:25672 -p
15671:15671 -p
15672:15672 rabbitmq
:management
2.2 Exchange 类型
如果下面这个是topic 类型的Exchange绑定的队列模式, 发送Routing key: atguigu.news ,则这四个队列都会收到消息.
3. 整合RabbitMQ
<dependency>
<groupId>org
.springframework
.boot
</groupId
>
<artifactId>spring
-boot
-starter
-amqp
</artifactId
>
</dependency
>
spring
.rabbitmq
.host
=127.0.0.1
spring
.rabbitmq
.port
=5672
spring
.rabbitmq
.virtual
-host
=/
@Autowired
AmqpAdmin amqpAdmin
;
@Autowired
RabbitTemplate rabbitTemplate
;
@Test
void sendMessage() {
Order order
= new Order();
order
.setId(1L
);
order
.setName("健健");
rabbitTemplate
.convertAndSend("hello-java-exchange","hello.java",order
);
}
@Test
void createExchange() {
amqpAdmin
.declareExchange(new DirectExchange("hello-java-exchange", true, false));
}
@Test
void createQueue() {
amqpAdmin
.declareQueue(new Queue("hello-java-queue", true, false, false));
}
@Test
void createBings() {
amqpAdmin
.declareBinding(new Binding("hello-java-queue", Binding
.DestinationType
.QUEUE
,
"hello-java-exchange",
"hello.java", null
));
}
@Data
class Order{
private Long id
;
private String name
;
}
@Configuration
public class MyRabbitConfig {
@Bean
public MessageConverter
messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
@RabbitListener: 类+方法上 都可以 @RabbitHandler: 方法上 重载 区分不同的消息 场景 1). queue,同一个消息,只能有一个客户端收到 2). 只有一个消息完全处理完,方法运行结束,就可以接收到下一个消息.
4.0 RabbitMQ 可靠投递
4.1 消息确认机制–可靠抵达
# 开启发送端确认
spring
.rabbitmq
.publisher
-confirm
-type
=correlated
@PostConstruct
public void initRabbitTemplate() {
rabbitTemplate
.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData
, boolean b
, String s
) {
System
.out
.println("confirm...correlationData["+correlationData
+"]==>是否失败:"+b
+"失败原因:"+s
);
}
});
rabbitTemplate
.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message
, int i
, String s
, String s1
, String s2
) {
System
.out
.println(message
+","+i
+","+s
+","+s1
+","+s2
);
}
});
}
4.2 消息确认机制–可靠抵达 ack消息确认机制
#手动ack 消息
, 只要我们没有明确告诉mq
,货物被签收
,没有ack回复
,
# 消息就一直unacked状态
.即使Consumer宕机
,消息也不会清理
,会重新变为ready
,
# 下次Consumer连接进来
,就发给他
spring
.rabbitmq
.listener
.simple
.acknowledge
-mode
=manual
如何签收:
1.starter 启动依赖 2. @configuration
@Bean
public Queue
orderDelayQueue(){
Map
<String,Object> arguments
= new HashMap<>();
arguments
.put("x-dead-letter-exchange","order-event-exchange");
arguments
.put("x-dead-letter-routing-key","order-release-order");
arguments
.put("x-message-ttl",60000);
return new Queue("order.delay.queue",true,false,false,arguments
);
}
@Bean
public Queue
orderReleaseOrderQueue(){
return new Queue("order.release.order.queue",true,false,false);
}
@Bean
public Exchange
orderEventExchange(){
return new TopicExchange("order-event-exchange",true,false)
}
@Bean
public Binding
orderCreateOrderBinding(){
new Binding("order.delay.queue",Binding
.DestinationType
.Queue
,"order-event-exchange","order.create.order");
}
@Bean
public Binding
orderReleaseOrderBinding(){
}