RabbitMQ进阶系列--SpringBoot整合(进阶)

it2024-10-12  40

其他网址

RabbitMQ:@RabbitListener 与 @RabbitHandler 及 消息序列化 - 简书

消息确认

其他网址

用了 springboot + rabbitmq 消息确认机制,我感觉掉坑里了SpringBoot 中使用RabbitMQ(二)消息确认SpringBoot+RabbitMq的几种姿势

生产者消息确认

简介

流程

发送消息确认:用来确认消息从 producer发送到 broker 然后broker 的 exchange 到 queue过程中,消息是否成功投递。 如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出;如果是镜像队列,所有镜像接受成功后发确认消息。

如果消息没有到exchange,则confirm回调,ack=false如果消息到达exchange,则confirm回调,ack=trueexchange到queue成功,则不回调returnexchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)

配置

application.yml

# 发送者开启 confirm 确认机制 spring.rabbitmq.publisher-confirms=true # 发送者开启 return 确认机制 spring.rabbitmq.publisher-returns=true

ConfirmCallback

ConfirmCallback:消息只要被 RabbitMQ broker 接收到就会触发confirm方法。

@Slf4j @Component public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { log.error("confirm==>发送到broker失败\r\n" + "correlationData={}\r\n" + "ack={}\r\n" + "cause={}", correlationData, ack, cause); } else { log.info("confirm==>发送到broker成功\r\n" + "correlationData={}\r\n" + "ack={}\r\n" + "cause={}", correlationData, ack, cause); } } }

correlationData:对象内部有id (消息的唯一性)和Message。           (若ack为false,则Message不为null,可将Message数据 重新投递;若ack是true,则correlationData为null) ack:消息投递到exchange 的状态,true表示成功。 cause:表示投递失败的原因。 (若ack为false,则cause不为null;若ack是true,则cause为null)

给每一条信息添加一个dataId,放在CorrelationData,这样在RabbitConfirmCallback返回失败时可以知道哪个消息失败。

public void send(String dataId, String exchangeName, String rountingKey, String message){ CorrelationData correlationData = new CorrelationData(); correlationData.setId(dataId); rabbitTemplate.convertAndSend(exchangeName, rountingKey, message, correlationData); } public String receive(String queueName){ return String.valueOf(rabbitTemplate.receiveAndConvert(queueName)); }

        2.1版本开始,CorrelationData对象具有ListenableFuture,可用于获取结果,而不是在rabbitTemplate上使用ConfirmCallback。

CorrelationData cd1 = new CorrelationData(); this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1); assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());

ReturnCallback

ReturnCallback:如果消息未能投递到目标 queue 里将触发returnedMessage方法。 若向 queue 投递消息未成功,可记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。

注意:需要rabbitTemplate.setMandatory(true);

当mandatory设置为true时,若exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者。当mandatory设置为false时,出现上述情况broker会直接将消息丢弃。

@Slf4j @Component public class ReturnCallbackService implements RabbitTemplate.ReturnCallback { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("returnedMessage==> \r\n" + "message={}\r\n" + "replyCode={}\r\n" + "replyText={}\r\n" + "exchange={}\r\n" + "routingKey={}", message, replyCode, replyText, exchange, routingKey); } }

message(消息体)、replyCode(响应code)、replyText(响应内容)、exchange(交换机)、routingKey(队列)。 

注册ConfirmCallback和ReturnCallback

整合后的写法

package com.example.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; @Slf4j @Configuration public class RabbitCallbackConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Bean RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); return rabbitTemplate; } // 下边这样写也可以 // @Autowired // private RabbitTemplate rabbitTemplate; // @PostConstruct // public void init() { // rabbitTemplate.setMandatory(true); // rabbitTemplate.setReturnCallback(this); // rabbitTemplate.setConfirmCallback(this); // } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { log.error("confirm==>发送到broker失败\r\n" + "correlationData={}\r\n" + "ack={}\r\n" + "cause={}", correlationData, ack, cause); } else { log.info("confirm==>发送到broker成功\r\n" + "correlationData={}\r\n" + "ack={}\r\n" + "cause={}", correlationData, ack, cause); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("returnedMessage==> \r\n" + "message={}\r\n" + "replyCode={}\r\n" + "replyText={}\r\n" + "exchange={}\r\n" + "routingKey={}", message, replyCode, replyText, exchange, routingKey); } }

消费者消息确认

简介

确认方式简介详述auto(默认)根据消息消费的情况,智能判定

若消费者抛出异常,则mq不会收到确认消息,mq会一直此消息发出去。

若消费者没有抛出异常,则mq会收到确认消息,mq不会再次将此消息发出去。

若消费者在消费时所在服务挂了,mq不会再次将此消息发出去。

nonemq发出消息后直接确认消息 manual消费端手动确认消息

        消费者调用 ack、nack、reject 几种方法进行确认,可以在业务失败后进行一些操作,如果消息未被 ACK 则消息还会存在于MQ,mq会一直将此消息发出去。。

        如果某个服务忘记 ACK 了,则 RabbitMQ 不会再发送数据给它,因为 RabbitMQ 认为该服务的处理能力有限。

只要消息没有被消费者确认(包括没有自动确认),会导致消息一直被失败消费,死循环导致消耗大量资源。正确的处理方式是:发生异常,将消息记录到db,再通过补偿机制来补偿消息,或者记录消息的重复次数,进行重试,超过几次后再放到db中。

消息确认三种方式配置方法

spring.rabbitmq.listener.simple.acknowledge-mode=manual spring.rabbitmq.listener.direct.acknowledge-mode=manual

手动确认三种方式(basicAck,basicNack,basicReject)

basicAck        //表示成功确认,使用此回执方法后,消息会被RabbitMQ broker 删除。

函数原型:void basicAck(long deliveryTag, boolean multiple) 

deliveryTag:消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。 multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。 示例: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。

实例:

@RabbitHandler public void process(String content, Channel channel, Message message){ channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }

basicNack     //表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。

函数原型:void basicNack(long deliveryTag, boolean multiple, boolean requeue)

deliveryTag:表示消息投递序号。 multiple:是否批量确认。 requeue:值为 true 消息将重新入队列。

basicReject     //拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。

函数原型:void basicReject(long deliveryTag, boolean requeue)

deliveryTag:表示消息投递序号。 requeue:值为 true 消息将重新入队列。

备份交换机

其他网址

【RabbitMq 篇三】-备份交换器_晏霖/胖虎的博客-博客RabbitMQ 如何进行消息可靠投递 | 码农家园

简介

        生产者发送消息,如果路由错误不能到达指定队列,就路由到备胎队列消费,这样做可以保证未被路由的消息不会丢失。其实保证消息不会丢失还可以通过消息的回调方法,添加ReturnListener的编程逻辑,但是这样做生产者的代码会复杂写,所以我们使用备份交换器实现。

实例

application.yml

server: # port: 9100 port: 9101 spring: application: # name: demo-rabbitmq-sender name: demo-rabbitmq-receiver rabbitmq: host: localhost port: 5672 username: admin password: 123456 # virtualHost: / publisher-confirms: true publisher-returns: true # listener: # simple: # acknowledge-mode: manual # direct: # acknowledge-mode: manual

MQ配置

package com.example.config; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.ExchangeBuilder; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitRouterConfig { public static final String QUEUE_HELLO = "Queue@hello"; public static final String QUEUE_HI = "Queue@hi"; public static final String QUEUE_UNROUTE = "Queue@unroute"; public static final String EXCHANGE_TOPIC_WELCOME = "Exchange@topic.welcome"; public static final String EXCHANGE_FANOUT_UNROUTE = "Exchange@fanout.unroute"; public static final String ROUTINGKEY_HELLOS = "hello.#"; @Autowired AmqpAdmin amqpAdmin; @Bean Object initBindingTest() { amqpAdmin.declareExchange(ExchangeBuilder.fanoutExchange(EXCHANGE_FANOUT_UNROUTE).durable(true).build()); amqpAdmin.declareExchange(ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_WELCOME).durable(true) .withArgument("alternate-exchange", EXCHANGE_FANOUT_UNROUTE).build()); amqpAdmin.declareQueue(new Queue(QUEUE_HI, true)); amqpAdmin.declareQueue(new Queue(QUEUE_HELLO, true)); amqpAdmin.declareQueue(new Queue(QUEUE_UNROUTE, true)); amqpAdmin.declareBinding(new Binding(QUEUE_HELLO, Binding.DestinationType.QUEUE, EXCHANGE_TOPIC_WELCOME, ROUTINGKEY_HELLOS, null)); amqpAdmin.declareBinding(new Binding(QUEUE_UNROUTE, Binding.DestinationType.QUEUE, EXCHANGE_FANOUT_UNROUTE, "", null)); return new Object(); } }

控制器

package com.example.controller; import com.example.config.RabbitRouterConfig; import com.example.mq.Sender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; @RestController public class HelloController { @Autowired private Sender sender; @PostMapping("/hi") public void hi() { sender.send(RabbitRouterConfig.QUEUE_HI, "hi1 message:" + LocalDateTime.now()); } @PostMapping("/hello1") public void hello1() { sender.send("hello.a", "hello1 message:" + LocalDateTime.now()); } @PostMapping("/hello2") public void hello2() { sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "hello.b", "hello2 message:" + LocalDateTime.now()); } @PostMapping("/ae") public void aeTest() { sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "nonono", "ae message:" + LocalDateTime.now()); } }

发送器

package com.example.mq; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; @Component public class Sender { @Autowired private AmqpTemplate rabbitTemplate; public void send(String routingKey, String message) { this.rabbitTemplate.convertAndSend(routingKey, message); } public void send(String exchange, String routingKey, String message) { this.rabbitTemplate.convertAndSend(exchange, routingKey, message); } }

接收器

package com.example.mq; import com.example.config.RabbitRouterConfig; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; @Component public class Receiver { @RabbitListener(queues = RabbitRouterConfig.QUEUE_HI) public void hi(String payload) { System.out.println ("Receiver(hi) : " + payload); } @RabbitListener(queues = RabbitRouterConfig.QUEUE_HELLO) public void hello(String hello) throws InterruptedException { System.out.println ("Receiver(hello) : " + hello); Thread.sleep(5 * 1000); System.out.println("(hello):sleep over"); } @RabbitListener(queues = RabbitRouterConfig.QUEUE_UNROUTE) public void unroute(String hello) throws InterruptedException { System.out.println ("Receiver(unroute) : " + hello); Thread.sleep(5 * 1000); System.out.println("(unroute):sleep over"); } // @RabbitListener(queues = RabbitRouterConfig.QUEUE_HI) // public void hiAll(@Payload String payload, Message message, Channel channel) { // System.out.println("Receiver(hi):"); // System.out.println("payload:" + payload); // System.out.println("message:" + message); // System.out.println("channel:" + channel); // } // @RabbitListener(queues = RabbitRouterConfig.QUEUE_HELLO) // public void helloAll(@Payload String payload, Message message, Channel channel) { // System.out.println("Receiver(hello):"); // System.out.println("payload:" + payload); // System.out.println("message:" + message); // System.out.println("channel:" + channel); // } }

测试

分别启动发送者和接收者,然后访问:http://localhost:9100/ae

结果:

接收者打印:

Receiver(unroute) : ae message:2020-11-23T17:47:13.198 (unroute):sleep over

死信/延迟 队列

其他网址

详细介绍Spring Boot + RabbitMQ实现延迟队列

《RabbitMQ实战指南》=> 第4章 RabbitMQ进阶

简介

默认不会超时。 

实例代码

路由配置

package com.example.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitRouterConfig { public static final String EXCHANGE_TOPIC_WELCOME = "Exchange@topic.welcome"; public static final String EXCHANGE_FANOUT_UNROUTE = "Exchange@fanout.unroute"; public static final String EXCHANGE_TOPIC_DELAY = "Exchange@topic.delay"; public static final String ROUTINGKEY_HELLOS = "hello.#"; public static final String ROUTINGKEY_DELAY = "delay.#"; public static final String QUEUE_HELLO = "Queue@hello"; public static final String QUEUE_HI = "Queue@hi"; public static final String QUEUE_UNROUTE = "Queue@unroute"; public static final String QUEUE_DELAY = "Queue@delay"; public static final Integer TTL_QUEUE_MESSAGE = 5000; @Autowired AmqpAdmin amqpAdmin; @Bean Object initBindingTest() { amqpAdmin.declareExchange(ExchangeBuilder.fanoutExchange(EXCHANGE_FANOUT_UNROUTE).durable(true).autoDelete().build()); amqpAdmin.declareExchange(ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_DELAY).durable(true).autoDelete().build()); amqpAdmin.declareExchange(ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_WELCOME) .durable(true) .autoDelete() .withArgument("alternate-exchange", EXCHANGE_FANOUT_UNROUTE) .build()); amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_HI).build()); amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_HELLO) .withArgument("x-dead-letter-exchange", EXCHANGE_TOPIC_DELAY) .withArgument("x-dead-letter-routing-key", ROUTINGKEY_DELAY) .withArgument("x-message-ttl", TTL_QUEUE_MESSAGE) .build()); amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_UNROUTE).build()); amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_DELAY).build()); amqpAdmin.declareBinding(new Binding(QUEUE_HELLO, Binding.DestinationType.QUEUE, EXCHANGE_TOPIC_WELCOME, ROUTINGKEY_HELLOS, null)); amqpAdmin.declareBinding(new Binding(QUEUE_UNROUTE, Binding.DestinationType.QUEUE, EXCHANGE_FANOUT_UNROUTE, "", null)); amqpAdmin.declareBinding(new Binding(QUEUE_DELAY, Binding.DestinationType.QUEUE, EXCHANGE_TOPIC_DELAY, ROUTINGKEY_DELAY, null)); return new Object(); } }

控制器

package com.example.controller; import com.example.config.RabbitRouterConfig; import com.example.mq.Sender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; @RestController public class HelloController { @Autowired private Sender sender; @PostMapping("/hi") public void hi() { sender.send(RabbitRouterConfig.QUEUE_HI, "hi1 message:" + LocalDateTime.now()); } @PostMapping("/hello1") public void hello1() { sender.send("hello.a", "hello1 message:" + LocalDateTime.now()); } @PostMapping("/hello2") public void hello2() { sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "hello.b", "hello2 message:" + LocalDateTime.now()); } @PostMapping("/ae") public void aeTest() { sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "nonono", "ae message:" + LocalDateTime.now()); } }

发送器

package com.example.mq; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; @Component public class Sender { @Autowired private AmqpTemplate rabbitTemplate; public void send(String routingKey, String message) { this.rabbitTemplate.convertAndSend(routingKey, message); } public void send(String exchange, String routingKey, String message) { this.rabbitTemplate.convertAndSend(exchange, routingKey, message); } }

接收器

package com.example.mq; import com.example.config.RabbitRouterConfig; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class Receiver { @RabbitListener(queues = RabbitRouterConfig.QUEUE_HI) public void hi(String payload) { System.out.println ("Receiver(hi) : " + payload); } // @RabbitListener(queues = RabbitRouterConfig.QUEUE_HELLO) // public void hello(String hello) throws InterruptedException { // System.out.println ("Receiver(hello) : " + hello); // Thread.sleep(5 * 1000); // System.out.println("(hello):sleep over"); // } // // @RabbitListener(queues = RabbitRouterConfig.QUEUE_UNROUTE) // public void unroute(String hello) throws InterruptedException { // System.out.println ("Receiver(unroute) : " + hello); // Thread.sleep(5 * 1000); // System.out.println("(unroute):sleep over"); // } @RabbitListener(queues = RabbitRouterConfig.QUEUE_DELAY) public void delay(String hello) throws InterruptedException { System.out.println ("Receiver(delay) : " + hello); Thread.sleep(5 * 1000); System.out.println("(delay):sleep over"); } }

application.yml

server: # port: 9100 port: 9101 spring: application: # name: demo-rabbitmq-sender name: demo-rabbitmq-receiver rabbitmq: host: localhost port: 5672 username: admin password: 123456 # virtualHost: / publisher-confirms: true publisher-returns: true # listener: # simple: # acknowledge-mode: manual # direct: # acknowledge-mode: manual

实例测试

分别启动发送者和接收者。

访问:http://localhost:9100/hello2

五秒钟后输出:

Receiver(delay) : hello2 message:2020-11-27T09:30:51.548 (delay):sleep over

重试

其他网址

RabbitMQ 重试机制和消息幂等性_大数据_charming的专栏-博客rabbitMQ 重试 - JAVA-ANDROID - 博客园RabbitMQ实现重试次数方法一-SpringRetry - 简书

简介

        默认情况下,如果消费者程序出现异常情况, Rabbitmq 会自动实现补偿机制(也就是重试机制)。

        @RabbitListener底层使用AOP进行拦截,如果程序没有抛出异常,自动提交事务。 如果Aop使用异常通知拦截获取异常信息的话 , 自动实现补偿机制,该消息会一直缓存在Rabbitmq服务器端进行重放,一直重试到不抛出异常为准。

一般来说默认5s重试一次,可以修改重试策略,消费者配置(重试5次,不行就放弃):

listener: simple: retry: # 开启消费者重试(默认开启) enabled: true # 最大重试次数(默认无数次) max-attempts: 5 # 重试间隔次数 initial-interval: 3000

长度

其他网址

rabbitmq官网:最大长度

RabbitMQ 队列消息的条数限制、队列字节长度限制、队列溢出行为方式_From Zero To Hero-博客_rabbitmq 队列数量上限rabbitmq 限制队列长度和总字节数 | 大专栏

队列长度限制

队列的最大长度可以限制为一组消息数或一组字节数(忽略消息属性和其他开销的所有消息体长度总和),或者两者兼有。默认情况下,rabbitmq中的queue的最大长度和总字节数不受限制的(仅受全局内存,磁盘阈值的影响)。对于任何给定的队列,最大长度(任一类型)可以由客户端使用队列的参数来定义,也可以在服务器中使用配置策略(policies)来定义。在策略和参数都指定最大长度的情况下,将应用两个值中的较小值。队列长度可以使用 operator policies 强制设置。在所有情况下,都使用 就绪 消息的数量;未确认的消息不计入限制。rabbitmqctl list_queues 中的字段 messages_ready, message_bytes_ready 以及管理 API 展示的即为被限制的值。

默认最大队列长度限制行为

当设置了最大队列长度或大小并达到最大值时,RabbitMQ 的默认行为是从队列前面丢弃或 dead-letter 消息(即队列中最早的消息)。要修改这种行为,请使用下面描述的 overflow 设置。

队列溢出行为

使用溢出设置来配置队列溢出行为。如果 overflow 设置为 reject-publish,则最近发布的消息将被丢弃。此外,如果 发布者确认 已启用,将通过 basic.nack 消息对发布者进行拒绝通知。如果一条消息被路由到多个队列并被其中至少一个队列拒绝,该信道将通过 basic.nack 通知发布者。该消息仍将被发布到可以将其排队的所有其他队列。

使用配置定义最大队列长度

要使用配置指定最大长度,请将关键词 max-length 和 / 或 max-length-bytes 添加到配置定义中。例如:

typevaluerabbitmqctlrabbitmqctl set_policy my-pol “^one-meg$” \  ‘{“max-length-bytes”:1048576}’ \  --apply-to queuesrabbitmqctl on Windowsrabbitmqctl.bat set_policy my-pol “^one-meg$” ^ "{"“max-length-bytes”":1048576}" ^ --apply-to queues

my-pol 策略确保 one-meg 队列包含不超过 1MB 的消息数据。当达到1mB的限制时,最早的消息将从队列头中丢弃。

要定义溢出行为-是从头上删除消息还是拒绝新发布,需要将关键词 overflow 添加到策略定义中。例如:

rabbitmqctlrabbitmqctl set_policy my-pol "^two-messages$" \   '{"max-length":2,"overflow":"reject-publish"}' \   --apply-to queuesrabbitmqctl on Windowsrabbitmqctl.bat set_policy my-pol "^two-messages$" ^   "{""max-length"":2,""overflow"":""reject-publish""}" ^   --apply-to queues

my-pol 策略确保 two-messages 队列包含的消息不超过 2 条,并且所有其他发布都是基本发送的。只要队列包含 2 条消息并且发布者确认启用的情况下,其他发送的消息都会得到 basic.nack 响应。

策略配置也可以通过管理插件定义。详细请看 相关文档

在声明队列期间使用 x-arguments 定义最大队列长度

1)为队列声明参数 x-max-length 提供一个非负整数值来设置最大消息条数。

2)声明参数 x-max-length-bytes 提供一个非负整数值,设置最大字节长度。如果设置了两个参数,那么两个参数都将适用;无论先达到哪个限制,都将强制执行。

3)溢出行为可以通过向队列声明参数 x-overflow 提供字符串值来设置。可能的值是:     drop-head (默认值):从队列前面丢弃或 dead-letter 消息,保存后n条消息     reject-publish:最近发布的消息将被丢弃,即保存前n条消息。

下面 Java 中的这个示例声明了一个最大长度为10条消息的队列:

//创建队列 HashMap<String, Object> map = new HashMap<>(); //设置队列最大的条数 10条 map.put("x-max-length",10 ); //设置队列溢出方式    保留前10条 map.put("x-overflow","reject-publish" ); channel.queueDeclare(queueName,false,false,false,map);

MessageConvert

其他网址

RabbitMQ:@RabbitListener 与 @RabbitHandler 及 消息序列化 - 简书

简介

RabbitMQ 的序列化是指 Message 的 body 属性,即我们真正需要传输的内容,RabbitMQ 抽象出一个 MessageConvert 接口处理消息的序列化,其实现有 SimpleMessageConverter(默认)、Jackson2JsonMessageConverter 等。当调用了 convertAndSend 方法时会使用 MessageConvert 进行消息的序列化。SimpleMessageConverter 对于要发送的消息体 body 为 byte[] 时不进行处理;若是 String 则转成字节数组;若是 Java 对象,则使用 jdk 序列化将消息转成字节数组,转出来的结果较大,含class类名,类相应方法等信息。因此性能较差。当使用 RabbitMQ 作为中间件时,数据量比较大,此时就要考虑使用类似 Jackson2JsonMessageConverter 等序列化形式以此提高性能。当然,也可以发送端手动将数据转为JSON格式,接收端序列化为指定对象。

使用 JSON 序列化与反序列化

        RabbitMQ 提供了 Jackson2JsonMessageConverter 来支持消息内容 JSON 序列化与反序列化。被序列化对象应提供一个无参的构造函数,否则会抛出异常。

消息发送者:设置 MessageConverter 为 Jackson2JsonMessageConverter:

rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

消息消费者:配置 MessageConverter 为 Jackson2JsonMessageConverter:

@Configuration public class RabbitMQConfig { @Bean public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory( ConnectionFactory connectionFactory){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; } }

自定义序列化

 

最新回复(0)