RabbitMQ 02- 整合Spring Boot 实现消息手动Ack

it2025-08-12  7

本篇整合了SpringBoot和RabbitMQ的使用案例,对常用的三种交换机类型direct、fanout、topic的使用均有整合。

文章目录

一、整合步骤1.引入pom2.rabbitmq配置3.参数读取4.回调配置5.配置生产者1)定义交换机枚举2)定义队列枚举3)定义路由键枚举4)将交换机、队列、路由key分别绑定5)封装发送消息方法 6.配置消费者7.测试8.运行结果 二、参数配置详解1.基础信息2.SSL3.缓存cache4.Listener5.Template

一、整合步骤

1.引入pom

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.3.4.RELEASE</version> </dependency>

2.rabbitmq配置

application.properties文件:记住改成自己的ip及相关信息

## rabbitmq配置 spring.rabbitmq.addresses=yourip:5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/

3.参数读取

rabbitmq配置文件读取类

@Configuration public class RabbitConfig { @Value("${spring.rabbitmq.addresses}") private String addresses; @Value("${spring.rabbitmq.username}") private String userName; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.virtual-host}") private String virtualHost; /** * 构建mq实例工厂 * @return */ @Bean public ConnectionFactory rabbitConnectionFactory() { //rabbitmq连接 CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(); cachingConnectionFactory.setAddresses(addresses); cachingConnectionFactory.setUsername(userName); cachingConnectionFactory.setPassword(password); cachingConnectionFactory.setVirtualHost(virtualHost); cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); cachingConnectionFactory.setPublisherReturns(true); return cachingConnectionFactory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory rabbitConnectionFactory){ return new RabbitAdmin(rabbitConnectionFactory); } /** * 生成模板对象 * 因为要设置回调类,所以应是prototype类型,如果是singleton类型,则回调类为最后一次设置 * @return */ @Bean @Scope("prototype") public RabbitTemplate rabbitTemplate(){ RabbitTemplate template=new RabbitTemplate(rabbitConnectionFactory()); //消息发送到交换器触发回调 template.setConfirmCallback(new ConfirmCallBackHandler()); //mandatory参数说明: 交换器无法根据自身类型和路由键找到一个符合条件的队列时的处理方式; true表示会调用Basic.Return命令返回给生产者; false表示丢弃消息 template.setMandatory(true); //交换器发送消息到队列失败时回调 template.setReturnCallback(new ReturnCallBackHandler()); //序列化 template.setMessageConverter(new ContentTypeDelegatingMessageConverter(new Jackson2JsonMessageConverter())); return template; } /** * RabbitMQ监听容器 * @param rabbitConnectionFactory * @return */ @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory rabbitConnectionFactory){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(rabbitConnectionFactory); //设置并发 factory.setConcurrentConsumers(1); SimpleMessageListenerContainer s=new SimpleMessageListenerContainer(); //最大并发 factory.setMaxConcurrentConsumers(1); //消息接收——手动确认 factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置超时 factory.setReceiveTimeout(2000L); //设置重试间隔 factory.setFailedDeclarationRetryInterval(3000L); return factory; } }

4.回调配置

/** * 通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调。 */ public class ConfirmCallBackHandler implements RabbitTemplate.ConfirmCallback { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("消息唯一标识:"+correlationData); System.out.println("确认结果:"+ack); if (!ack){ System.out.println("失败原因:"+cause); } } } /** * 通过实现 ReturnCallback 接口,启动消息失败返回,比如路由不到队列时触发回调 */ public class ReturnCallBackHandler implements RabbitTemplate.ReturnCallback { @Override public void returnedMessage(Message message,int replyCode, String replyText, String exchange, String routingKey) { System.out.println("消息主体:"+message); System.out.println("应答码:"+replyCode); System.out.println("原因描述:"+replyText); System.out.println("交换机:"+exchange); System.out.println("消息使用的路由键:"+routingKey); } }

5.配置生产者

1)定义交换机枚举
/** * 交换机枚举 */ public enum ExchangeEnum { DIRECT_EXCHANGE("DIRECT_EXCHANGE", "直连模式"), TOPIC_EXCHANGE("TOPIC_EXCHANGE", "主题模式"), FANOUT_EXCHANGE("FANOUT_EXCHANGE", "广播模式"); private String code; private String name; ExchangeEnum(String code, String name) { this.code = code; this.name = name; } public String getCode() { return code; } public String getName() { return name; } }
2)定义队列枚举
/** * 队列枚举 */ public enum QueueEnum { DIRECT_QUEUE1("DIRECT_QUEUE1", "direct测试队列1"), TOPIC_QUEUE1("TOPIC_QUEUE1", "topic测试队列1"), TOPIC_QUEUE2("TOPIC_QUEUE2", "topic测试队列2"), FANOUT_QUEUE1("FANOUT_QUEUE1","fanout测试队列1"), FANOUT_QUEUE2("FANOUT_QUEUE2","fanout测试队列2"); private String code; private String name; QueueEnum(String code, String name) { this.code = code; this.name = name; } public String getCode() { return code; } public String getName() { return name; } }
3)定义路由键枚举
/** * 路由key枚举 */ public enum RoutingKeyEnum { DIRECT_KEY1("DIRECT_KEY1", "direct测试routingkey1"), TOPIC_KEY1("*.TOPIC.*", "topic测试routingkey1"), TOPIC_KEY2("#.cc", "topic测试routingkey2"); private String code; private String name; RoutingKeyEnum(String code, String name) { this.code = code; this.name = name; } public String getCode() { return code; } public String getName() { return name; } }
4)将交换机、队列、路由key分别绑定
@Configuration @AutoConfigureAfter(com.kathy.demo.config.RabbitConfig.class) public class RabbitExchangeBinding { private static final Logger LOGGER= LoggerFactory.getLogger(RabbitExchangeBinding.class); @Autowired private RabbitAdmin rabbitAdmin; /** * 直连型交换机 */ @Bean DirectExchange contractDirectExchange() { DirectExchange contractDirectExchange = new DirectExchange(ExchangeEnum.DIRECT_EXCHANGE.getCode()); rabbitAdmin.declareExchange(contractDirectExchange); LOGGER.info("direct交换机默认实例创建成功"); return contractDirectExchange; } /** * 主题模式交换机 * @return */ @Bean TopicExchange contractTopicExchange(){ TopicExchange contractTopicExchange = new TopicExchange(ExchangeEnum.TOPIC_EXCHANGE.getCode()); rabbitAdmin.declareExchange(contractTopicExchange); LOGGER.info("topic交换机默认实例创建成功"); return contractTopicExchange; } /** * 广播模式交换机 * @return */ @Bean FanoutExchange contractFanoutExchange(){ FanoutExchange contractFanoutExchange = new FanoutExchange(ExchangeEnum.FANOUT_EXCHANGE.getCode()); rabbitAdmin.declareExchange(contractFanoutExchange); LOGGER.info("fanout交换机默认实例创建成功"); return contractFanoutExchange; } @Bean Queue directQueue1(){ Queue queue = new Queue(QueueEnum.DIRECT_QUEUE1.getCode()); rabbitAdmin.declareQueue(queue); LOGGER.debug("direct测试队列-1实例化成功"); return queue; } @Bean Queue topicQueue1(){ Queue queue = new Queue(QueueEnum.TOPIC_QUEUE1.getCode()); rabbitAdmin.declareQueue(queue); LOGGER.debug("topic测试队列-1实例化成功"); return queue; } @Bean Queue topicQueue2(){ Queue queue = new Queue(QueueEnum.TOPIC_QUEUE2.getCode()); rabbitAdmin.declareQueue(queue); LOGGER.debug("topic测试队列-2实例化成功"); return queue; } @Bean Queue fanoutQueue1(){ Queue queue = new Queue(QueueEnum.FANOUT_QUEUE1.getCode()); rabbitAdmin.declareQueue(queue); LOGGER.debug("fanout测试队列-1实例化成功"); return queue; } @Bean Queue fanoutQueue2(){ Queue queue = new Queue(QueueEnum.FANOUT_QUEUE2.getCode()); rabbitAdmin.declareQueue(queue); LOGGER.debug("fanout测试队列-2实例化成功"); return queue; } @Bean Binding bindingDirectQueue1(Queue directQueue1,DirectExchange exchange){ //绑定结构:队列-交换机-路由key Binding binding = BindingBuilder.bind(directQueue1).to(exchange).with(RoutingKeyEnum.DIRECT_KEY1.getCode()); rabbitAdmin.declareBinding(binding); LOGGER.debug("direct队列1/交换机绑定成功"); return binding; } @Bean Binding bindingTopicQueue1(Queue topicQueue1,TopicExchange exchange){ Binding binding = BindingBuilder.bind(topicQueue1).to(exchange).with(RoutingKeyEnum.TOPIC_KEY1.getCode()); rabbitAdmin.declareBinding(binding); LOGGER.debug("topic队列-1/交换机绑定成功"); return binding; } @Bean Binding bindingTopicQueue2(Queue topicQueue2,TopicExchange exchange){ Binding binding = BindingBuilder.bind(topicQueue2).to(exchange).with(RoutingKeyEnum.TOPIC_KEY2.getCode()); rabbitAdmin.declareBinding(binding); LOGGER.debug("topic队列-2/交换机绑定成功"); return binding; } @Bean Binding bindingFanoutQueue1(Queue fanoutQueue1,FanoutExchange exchange){ Binding binding = BindingBuilder.bind(fanoutQueue1).to(exchange); rabbitAdmin.declareBinding(binding); LOGGER.debug("fanout队列-1/交换机绑定成功"); return binding; } @Bean Binding bindingFanoutQueue2(Queue fanoutQueue2,FanoutExchange exchange){ Binding binding = BindingBuilder.bind(fanoutQueue2).to(exchange); rabbitAdmin.declareBinding(binding); LOGGER.debug("fanout队列-2/交换机绑定成功"); return binding; } }
5)封装发送消息方法
@Component public class RabbitMQProducer { private static final Logger logger= LoggerFactory.getLogger(RabbitMQProducer.class); private RabbitTemplate rabbitTemplate; @Autowired public RabbitMQProducer(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } /** * 发送到指定routekey的指定queue * @param routeKey 路由键 * @param obj 消息内容 */ public void sendRabbitmqDirect(String routeKey,Object obj) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); logger.info("【消息发送者】发送消息到direct交换机,消息内容为: {"+obj+"}>>>>>>>>>>>>send:"+correlationData.getId()+""); this.rabbitTemplate.convertAndSend(ExchangeEnum.DIRECT_EXCHANGE.getCode(), routeKey , obj, correlationData); } /** * 所有发送到Topic Exchange的消息被转发到所有关系RouteKey中指定Topic的Queue上 * @param routeKey 路由键 * @param obj 消息内容 */ public void sendRabbitmqTopic(String routeKey,Object obj) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); logger.info("【消息发送者】发送消息到topic交换机,消息内容为: {"+obj+"}>>>>>>>>>>>>send:"+correlationData.getId()+""); this.rabbitTemplate.convertAndSend(ExchangeEnum.TOPIC_EXCHANGE.getCode(), routeKey , obj, correlationData); } /** * 将消息分发到所有的绑定队列,无routingkey概念 * @param obj 消息内容 */ public void sendRabbitmqFanout(Object obj) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); logger.info("【消息发送者】发送消息到fanout交换机,消息内容为: {"+obj+"}>>>>>>>>>>>>send:"+correlationData.getId()+""); this.rabbitTemplate.convertAndSend(ExchangeEnum.FANOUT_EXCHANGE.getCode() ,"", obj,correlationData); } }

6.配置消费者

这里需要注意下,以上的配置采用的是手动确认消息的机制,即如果不手动确认的话,消息会一直发送,直到有消费者确认,rabbitmq默认是auto即自动确认机制。以下对三种交换机模式分别写了消费者类,为了测试我们配置的手动确认是否成功,代码中有方法采用手动确认,有的没有进行确认。

在实际的使用中用DirectConsumer中的确认代码即可,务必对消息进行确认。

@Component public class DirectConsumer { private static final Logger logger= LoggerFactory.getLogger(DirectConsumer.class); enum Action{ // 处理成功 ACCEPT, // 可以重试的错误 RETRY, // 无需重试的错误 REJECT; } /** * 手动确认模式 * @param content * @param message * @param channel */ @RabbitListener(queues = "DIRECT_QUEUE1") public void process(String content, Message message, Channel channel){ Action action=Action.ACCEPT; long tag=message.getMessageProperties().getDeliveryTag(); try{ logger.info("处理器-1接收处理队列A中的消息:"+content); }catch (Exception e){ //根据异常种类决定是ACCEPT、RETRY还是 REJECT action=Action.RETRY; e.printStackTrace(); }finally { try { if (action==Action.ACCEPT){ // 确认收到消息,消息将被队列移除;false只确认当前consumer一个消息收到,true确认所有consumer获得的消息。 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }else if(action==Action.RETRY){ //确认否定消息,第一个boolean表示一个consumer还是所有,第二个boolean表示requeue是否重新回到队列,true重新入队 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); }else { //拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列。 channel.basicNack(tag,false,false); } } catch (IOException e) { //异常处理 e.printStackTrace();; } } } } @Component public class TopicConsumer { private static final Logger logger= LoggerFactory.getLogger(TopicConsumer.class); //未手动确认 @RabbitListener(queues = "TOPIC_QUEUE1") public void process(String content, Message message, Channel channel) throws IOException { logger.info("主题模式-接收处理队列A当中的消息: " + content); } //未手动确认 @RabbitListener(queues = "TOPIC_QUEUE2") public void process2(String content, Message message, Channel channel) throws IOException { logger.info("主题模式-接收处理队列B当中的消息: " +content); } } @Component public class FanoutConsumer { private static final Logger logger= LoggerFactory.getLogger(FanoutConsumer.class); //不进行消息确认,查看rabbitmq界面是否有未确认的消息 @RabbitListener(queues = "FANOUT_QUEUE1") public void process(String content, Message message, Channel channel) throws IOException { logger.info("广播模式-接收处理队列A当中的消息: " + content+"=========="); } //手动确认 @RabbitListener(queues = "FANOUT_QUEUE2") public void process2(String content, Message message, Channel channel) throws IOException { logger.info("广播模式-接收处理队列B当中的消息: " + content+"=========="); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }

7.测试

User类

@Data public class User implements Serializable { private String userName; private Integer userAge; private String userSchool; }

便于测试json数据引入pom

<dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.73</version> </dependency>

测试controller

@RestController public class RabbitTestController { @Autowired private RabbitTemplate rabbitTemplate; /** * 订阅模式测试 */ @RequestMapping("/testDirect") public String testSendMsg(){ RabbitMQProducer producer=new RabbitMQProducer(rabbitTemplate); producer.sendRabbitmqDirect(RoutingKeyEnum.DIRECT_KEY1.getCode(), "I'm Direct!"); return "successs"; } /** * 主题模式测试 * */ @RequestMapping("/testTopic") public String topicSendMsg (){ RabbitMQProducer producer=new RabbitMQProducer(rabbitTemplate); /*producer.sendRabbitmqTopic("1.TOPIC.1", "测试 1.TOPIC.1" ); producer.sendRabbitmqTopic("TOPIC.1", "测试 TOPIC.1" ); producer.sendRabbitmqTopic("cc", "测试 cc" );*/ //测试是否执行returncallback producer.sendRabbitmqTopic("1.1", "测试1.1" ); producer.sendRabbitmqTopic("2.2", "测试2.2" ); return "successs"; } /** * 广播模式测试 */ @RequestMapping("/testFanout") public String fanoutSendMsg(){ RabbitMQProducer producer=new RabbitMQProducer(rabbitTemplate); //发送对象 User user=new User(); user.setUserName("李四"); user.setUserAge(20); user.setUserSchool("测试学校"); producer.sendRabbitmqFanout(user); //发送json数据 JSONObject json=new JSONObject(); json.put("aa","i'm aa"); json.put("bb","i'm bb"); json.put("cc","i'm cc"); JSONObject jsonSon=new JSONObject(); jsonSon.put("dd","i'm dd"); json.put("jsonSon",jsonSon); producer.sendRabbitmqFanout(json); return "successs"; } }

8.运行结果

http://localhost:8080/testDirect执行结果 因为我们对消息进行了确认,所以在管理界面查看时unacked数量为0。

http://localhost:8080/testTopic 执行结果 我们未对消息进行确认,所以在管理界面查看时unacked数量为2,并且当每次重启应用的时候消息会一直发送,直到ack。

查看管理界面有两条消息未确认

对以下代码进行执行,测试returncallback是否配置成功 producer.sendRabbitmqTopic("1.1", "测试1.1" ); producer.sendRabbitmqTopic("2.2", "测试2.2" ); 因为匹配到符合routingkey的消费者,所以回调方法被调用,说明配置成功。

http://localhost:8080/testFanout 执行结果 测试代码中连续发送了两条消息,队列1没有手动ack,所以查看登录界面有两条消息未确认。

二、参数配置详解

1.基础信息

spring.rabbitmq.host: 默认localhost spring.rabbitmq.port: 默认5672 spring.rabbitmq.username: 用户名 spring.rabbitmq.password: 密码 spring.rabbitmq.virtual-host: 连接到代理时用的虚拟主机 spring.rabbitmq.addresses: 连接到server的地址列表(以逗号分隔),先addresses后host spring.rabbitmq.requested-heartbeat: 请求心跳超时时间,0为不指定,如果不指定时间单位默认为妙 spring.rabbitmq.publisher-confirms: 是否启用【发布确认】,默认false spring.rabbitmq.publisher-returns: 是否启用【发布返回】,默认false spring.rabbitmq.connection-timeout: 连接超时时间,单位毫秒,0表示永不超时

2.SSL

spring.rabbitmq.ssl.enabled: 是否支持ssl,默认false spring.rabbitmq.ssl.key-store: 持有SSL certificate的key store的路径 spring.rabbitmq.ssl.key-store-password: 访问key store的密码 spring.rabbitmq.ssl.trust-store: 持有SSL certificates的Trust store spring.rabbitmq.ssl.trust-store-password: 访问trust store的密码 spring.rabbitmq.ssl.trust-store-type=JKS:Trust store 类型. spring.rabbitmq.ssl.algorithm: ssl使用的算法,默认由rabiitClient配置 spring.rabbitmq.ssl.validate-server-certificate=true:是否启用服务端证书验证 spring.rabbitmq.ssl.verify-hostname=true 是否启用主机验证

3.缓存cache

spring.rabbitmq.cache.channel.size: 缓存中保持的channel数量 spring.rabbitmq.cache.channel.checkout-timeout: 当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel spring.rabbitmq.cache.connection.size: 缓存的channel数,只有是CONNECTION模式时生效 spring.rabbitmq.cache.connection.mode=channel: 连接工厂缓存模式:channel 和 connection

4.Listener

spring.rabbitmq.listener.type=simple: 容器类型.simple或direct spring.rabbitmq.listener.simple.auto-startup=true: 是否启动时自动启动容器 spring.rabbitmq.listener.simple.acknowledge-mode: 表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto spring.rabbitmq.listener.simple.concurrency: 最小的消费者数量 spring.rabbitmq.listener.simple.max-concurrency: 最大的消费者数量 spring.rabbitmq.listener.simple.prefetch: 一个消费者最多可处理的nack消息数量,如果有事务的话,必须大于等于transaction数量. spring.rabbitmq.listener.simple.transaction-size: 当ack模式为auto时,一个事务(ack间)处理的消息数量,最好是小于等于prefetch的数量.若大于prefetch, 则prefetch将增加到这个值 spring.rabbitmq.listener.simple.default-requeue-rejected: 决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系) spring.rabbitmq.listener.simple.missing-queues-fatal=true 若容器声明的队列在代理上不可用,是否失败; 或者运行时一个多多个队列被删除,是否停止容器 spring.rabbitmq.listener.simple.idle-event-interval: 发布空闲容器的时间间隔,单位毫秒 spring.rabbitmq.listener.simple.retry.enabled=false: 监听重试是否可用 spring.rabbitmq.listener.simple.retry.max-attempts=3: 最大重试次数 spring.rabbitmq.listener.simple.retry.max-interval=10000ms: 最大重试时间间隔 spring.rabbitmq.listener.simple.retry.initial-interval=1000ms:第一次和第二次尝试传递消息的时间间隔 spring.rabbitmq.listener.simple.retry.multiplier=1: 应用于上一重试间隔的乘数 spring.rabbitmq.listener.simple.retry.stateless=true: 重试时有状态or无状态 spring.rabbitmq.listener.direct.acknowledge-mode= ack模式 spring.rabbitmq.listener.direct.auto-startup=true 是否在启动时自动启动容器 spring.rabbitmq.listener.direct.consumers-per-queue= 每个队列消费者数量. spring.rabbitmq.listener.direct.default-requeue-rejected= 默认是否将拒绝传送的消息重新入队. spring.rabbitmq.listener.direct.idle-event-interval= 空闲容器事件发布时间间隔. spring.rabbitmq.listener.direct.missing-queues-fatal=false若容器声明的队列在代理上不可用,是否失败. spring.rabbitmq.listener.direct.prefetch= 每个消费者可最大处理的nack消息数量. spring.rabbitmq.listener.direct.retry.enabled=false 是否启用发布重试机制. spring.rabbitmq.listener.direct.retry.initial-interval=1000ms # Duration between the first and second attempt to deliver a message. spring.rabbitmq.listener.direct.retry.max-attempts=3 # Maximum number of attempts to deliver a message. spring.rabbitmq.listener.direct.retry.max-interval=10000ms # Maximum duration between attempts. spring.rabbitmq.listener.direct.retry.multiplier=1 # Multiplier to apply to the previous retry interval. spring.rabbitmq.listener.direct.retry.stateless=true # Whether retries are stateless or stateful.

5.Template

spring.rabbitmq.template.mandatory: 启用强制信息;默认false spring.rabbitmq.template.receive-timeout: receive() 操作的超时时间 spring.rabbitmq.template.reply-timeout: sendAndReceive() 操作的超时时间 spring.rabbitmq.template.retry.enabled=false: 发送重试是否可用 spring.rabbitmq.template.retry.max-attempts=3: 最大重试次数 spring.rabbitmq.template.retry.initial-interva=1000msl: 第一次和第二次尝试发布或传递消息之间的间隔 spring.rabbitmq.template.retry.multiplier=1: 应用于上一重试间隔的乘数 spring.rabbitmq.template.retry.max-interval=10000: 最大重试时间间隔

参考 https://blog.csdn.net/girlgolden/article/details/97915368 https://segmentfault.com/a/1190000011797667 https://blog.csdn.net/scdncby/article/details/96482473

关注微信公众号【程序媛琬淇】,专注分享Java干货,给你意想不到的收获。

最新回复(0)