简答:
异步处理 - 相比于传统的串行、并行方式,提高了系统吞吐量。应用解耦 - 系统间通过消息通信,不用关心其他系统的处理。流量削锋 - 可以通过消息队列长度控制请求量;可以缓解短时间内的高并发请求。日志处理 - 解决大量日志传输。消息通讯 - 消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等。详答:
主要是:解耦、异步、削峰。
解耦:A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果 C 系统现在不需要了呢?A 系统负责人几乎崩溃…A 系统跟其它各种乱七八糟的系统严重耦合,A 系统产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。如果使用 MQ,A 系统产生一条数据,发送到 MQ 里面去,哪个系统需要数据自己去 MQ 里面消费。如果新系统需要数据,直接从 MQ 里消费即可;如果某个系统不需要这条数据了,就取消对 MQ 消息的消费即可。这样下来,A 系统压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超时等情况。
就是一个系统或者一个模块,调用了多个系统或者模块,互相之间的调用很复杂,维护起来很麻烦。但是其实这个调用是不需要直接同步调用接口的,如果用 MQ 给它异步化解耦。
异步:A 系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写库,自己本地写库要 3ms,BCD 三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是 3 + 300 + 450 + 200 = 953ms,接近 1s,用户感觉搞个什么东西,慢死了慢死了。用户通过浏览器发起请求。如果使用 MQ,那么 A 系统连续发送 3 条消息到 MQ 队列中,假如耗时 5ms,A 系统从接受一个请求到返回响应给用户,总时长是 3 + 5 = 8ms。
削峰:减少高峰时期对服务器压力。
系统可用性降低
本来系统运行好好的,现在你非要加入个消息队列进去,那消息队列挂了,你的系统不是呵呵了。因此,系统可用性会降低;
系统复杂度提高
加入了消息队列,要多考虑很多方面的问题,比如:一致性问题、如何保证消息不被重复消费、如何保证消息可靠性传输等。因此,需要考虑的东西更多,复杂性增大。
一致性问题
A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致了。
所以消息队列实际是一种非常复杂的架构,你引入它有很多好处,但是也得针对它带来的坏处做各种额外的技术方案和架构来规避掉,做好之后,你会发现,妈呀,系统复杂度提升了一个数量级,也许是复杂了 10 倍。但是关键时刻,用,还是得用的。
MQ 的常见问题有:
消息的顺序问题消息的重复问题1)、消息的顺序问题
消息有序指的是可以按照消息的发送顺序来消费。
假如生产者产生了 2 条消息:M1、M2,假定 M1 发送到 S1,M2 发送到 S2,如果要保证 M1 先于 M2 被消费,怎么做?
解决方案:
(1)保证生产者 - MQServer - 消费者是一对一对一的关系
缺陷:
并行度就会成为消息系统的瓶颈(吞吐量不够)更多的异常处理,比如:只要消费端出现问题,就会导致整个处理流程阻塞,我们不得不花费更多的精力来解决阻塞的问题。(2)通过合理的设计或者将问题分解来规避
不关注乱序的应用实际大量存在队列无序并不意味着消息无序,所以从业务层面来保证消息的顺序而不仅仅是依赖于消息系统,是一种更合理的方式。2)、消息的重复问题
造成消息重复的根本原因是:网络不可达。所以解决这个问题的办法就是绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理?
解决的办法就是:消费端处理消息的业务逻辑应该保持幂等性。只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。
关于接口幂等性问题可以参考我的另一篇博客:【编程开发】之接口的幂等性
1)、消息服务中两个重要概念
消息代理(message broker)和目的地(destination);
当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。
2)、消息队列主要的两种形式
队列(queue):点对点式 –消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列–消息只有唯一的发送者和接受者,但并不是说只能有一个接收者 主题(topic):发布订阅式 –发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时收到消息3)、JMS(Java Message Service)JAVA消息服务
基于JVM消息代理的规范。ActiveMQ、HornetMQ 是 JMS 实现。
JMS 是由 Java Api 定义,不跨语言也不跨平台,提供两种消息模型:点对点(Peer-2-Peer)和发布订阅(Pub/sub)。
支持多种消息类型:TextMessage、MapMessage、BytesMessage、StreamMessage、ObjectMessage、Message (只有消息头和属性)。
总结:JMS 定义了 JAVA API 层面的标准;在 java 体系中,多个 client 均可以通过JMS进行交互,不需要应用修改代码,但是其对跨平台的支持较差。
4)、AMQP(Advanced Message Queuing Protocol)
高级消息队列协议,也是一个消息代理的规范,兼容 JMS,RabbitMQ 是 AMQP 的实现。
AMQP 由网络线级协议定义,即跨语言又跨平台,提供了五种消息模型:
(1)、direct exchange(2)、fanout exchange(3)、topic change(4)、headers exchange(5)、system exchange本质来讲,后四种和JMS的pub/sub模型没有太大差别,仅是在路由机制上做了更详细的划分。
但消息类型只有一种:byte[],当实际应用时,有复杂的消息,可以将消息序列化后发送。
总结:AMQP 定义了 wire-level 层的协议标准;天然具有跨平台、跨语言特性。
RabbitMQ 是一个由 erlang 开发的 AMQP(Advanved Message Queue Protocol) 的开源实现。
Exchange 分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键, headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:
1)、Direct Exchange
消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。
2)、Fanout Exchange
每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。
3)、Topic Exchange
topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号 # 和符号 *。# 匹配 0 个或多个单词,* 匹配一个单词。
1)、Simple 模式(最简单的的收发模式)
消息产生消息,将消息放入队列消息的消费者监听消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除隐患:消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失,这里可以设置成手动的 ack,但如果设置成手动 ack,处理完后要及时发送 ack 消息给队列,否则会造成内存溢出。
2)、Work 模式(资源的竞争)
消息产生者将消息放入队列,多个消费者同时监听同一个队列,共同争抢当前的消息队列内容,谁先拿到谁负责消费消息。
隐患:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize) 保证一条消息只能被一个消费者使用。
3)、Publish/Subscribe 发布订阅模式(共享资源)
每个消费者监听自己的队列;生产者将消息发给 broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。4)、Routing 路由模式
消息生产者将消息发送给交换机,交换机按照路由键匹配把消息发送给对应的消息队列;可以根据业务场景定义路由字符串。5)、Topic 主题模式(路由模式的一种)
主题模式中的路由功能添加了模糊匹配:
* 号匹配一个单词# 号匹配 0 个或多个单词工作流程如下:
消息产生者产生消息,把消息交给交换机交换机根据 key 的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费1)、消息的顺序性
RabbitMQ 保证消息的顺序性的方法为:
1、拆分多个 queue,每个 queue 一个 consumer,这种方法会产生比较多的 queue,比较麻烦点;2、一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。2)、解决消息的丢失问题
消息丢失分为:生产者丢失消息、消息列表丢失消息、消费者丢失消息;
生产者丢失消息:生产者开启确认模式,配置确认回调和回退回调消息列表丢失消息:开启消息持久化,创建队列的时候设置 durable 为 true,发送消息的时候设置 deliveryMode 为 2消费者丢失消息:设置为手动 Ack 应答模式3)、消息的重复消费问题
保证消费消息的幂等性!
1)、引入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>2)、添加配置
# 配置 RabbitMQ 服务器 IP 和端口 spring.rabbitmq.host=192.168.56.10 spring.rabbitmq.port=5672 # 虚拟主机配置 spring.rabbitmq.virtual-host=/ # 开启发送端消息抵达Broker确认 spring.rabbitmq.publisher-confirms=true # 开启发送端消息抵达Queue确认 spring.rabbitmq.publisher-returns=true # 只要消息抵达Queue,就会异步发送优先回调returnfirm spring.rabbitmq.template.mandatory=true # 手动ack消息,不使用默认的消费端确认 spring.rabbitmq.listener.simple.acknowledge-mode=manual3)、开启 RabbitMQ 功能
1、引入了 amqp 场景启动器后,RabbitAutoConfiguration 就会自动生效2、SpringBoot 给容器自动配置了:RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate;3、在自动配置中,SpringBoot 使用的是 @ConfigurationProperties(prefix = "spring.rabbitmq",所以,我们可以给配置文件中配置以 spring.rabbitmq 开头的 rabbitmq 相关信息;4、在主启动类中使用 @EnableRabbit 注解来开启 RabbitMQ 功能。要使用 RabbitMQ,我们需要创建交换机 Exchange、队列 Queue 和绑定 Binding 这三个对象,我们可以使用 AmqpAdmin 进行创建这三个对象。
1)、创建 Exchange、Queue 、Binding
@Autowired private AmqpAdmin amqpAdmin; // 创建 Exchange public void createExchange() { // 创建一个 DirectExchange,参数为:交换机的名称、是否持久化、是否自动删除 Exchange directExchange = new DirectExchange("hello-java-exchange", true, false); amqpAdmin.declareExchange(directExchange); log.info("Exchange[{}]创建成功:","hello-java-exchange"); } // 创建 Queue public void testCreateQueue() { // 队列参数为:队列名称、是否持久化、是否排他,是否自动删除 Queue queue = new Queue("hello-java-queue", true, false, false); amqpAdmin.declareQueue(queue); log.info("Queue[{}]创建成功:","hello-java-queue"); } // 创建 Binding public void createBinding() { // destination【目的地】,如果【目的地类型】为队列则填写队列名称 // destinationType【目的地类型】 // exchange【交换机】 // routingKey【路由键】 // argument【自定义参数】 // 功能:将交换机 exchange 和 destination 目的地进行绑定,使用 routingKey 作为指定的路由键 Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE, "hello-java-exchange", "hello.java", null); amqpAdmin.declareBinding(binding); log.info("Binding[{}]创建成功:","hello-java-binding"); }2)、发送消息
发送消息使用的是 SpringBoot 给我们提供的 RabbitTemplate 来操作消息的发送:
@Autowired private RabbitTemplate rabbitTemplate; public void sendMessageTest() { // 发送字符串消息 String msg = "Hello World"; // 发送消息类型为自定义Java对象 OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity(); reasonEntity.setId(1L); reasonEntity.setCreateTime(new Date()); reasonEntity.setName("reason"); reasonEntity.setStatus(1); reasonEntity.setSort(2); // 发送消息 rabbitTemplate.convertAndSend("hello-java-exchange", // 交换机 "hello.java", // 路由键 reasonEntity, // 消息类型为对象 new CorrelationData(UUID.randomUUID().toString())); log.info("消息发送完成:{}",reasonEntity); }发送消息,既可以发送字符串消息也可以发送 Java 对象,如果发送的消息是个对象,会使用序列化机制,将对象写出去,对象必须实现 Serializable 接口。
如果想要把发送的对象转换为 JSON,我们可以自定消息转换器,通过自定义消息转换器可以把对象消息转换为 JSON。
@Configuration public class MyRabbitConfig { // 自定义消息转换器 @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } }配置好后,再次调用 rabbitTemplate.convertAndSend() 方法时就会把消息对象转换成 JSON 对象。
3)、接收消息
RabbitMQ 接收消息使用的是 @RabbitListener 注解和 @RabbitHandler 注解,并且要使用 @EnableRabbit 注解开启 RabbitMQ 功能。
/** * @RabbitListener 属性: * queues:声明需要监听的所有队列 * * 接收消息函数参数类型: * 1、Message message:原生的消息详细消息:消息头 + 消息体 * 2、可以指定消息内容对应的对象,spring 帮我们自动转换:T<发送消息的类型> * 3、channel:当前传输数据的通道 */ @RabbitListener(queues = {"hello-java-queue"}) public void revieveMessage(Message message, OrderReturnReasonEntity content, Channel channel) { // 拿到主体内容 byte[] body = message.getBody(); // 拿到的消息头属性信息 MessageProperties messageProperties = message.getMessageProperties(); System.out.println("接受到的消息...内容" + message + "===内容:" + content); }@RabbitListener 注解可以标注在类和方法上,而 @RabbitHandler 注解只能标注在方法上,所以我们一般使用 @RabbitListener 注解标注在类上表明监听的队列,而 @RabbitHandler 注解标注在方法上就可以进行方法并实现监听同一个队列的不同类型的消息:
@RabbitListener(queues = {"hello-java-queue"}) public class OrderService { @RabbitHandler public void revieveMessage(Message message, OrderReturnReasonEntity content) { System.out.println("接受到的消息...内容" + message + "===内容:" + content); } @RabbitHandler public void revieveMessage2(Message message, OrderEntity content) { System.out.println("接受到的消息...内容" + message + "===内容:" + content); } }由于网络的卡顿原因,或者服务宕机了都可能会导致的消息丢失,消息的丢失包括生产者丢失消息、消息列表丢失消息、消费者丢失消息。为了保证我们发送的消息不丢失、可靠抵达我们以前可以使用事务消息来解决,但这种方式性能下降严重,所以就引入了消息确认机制。
由于生产者发送的消息涉及两步:
1、生产者发送消息给 Broker;2、Broker 收到消息后把消息交给 Exchange,并由 Exchange 根据路由键交给指定的 Queue。所以生产者端的确认机制涉及两种:
1、生产者到 Broker 的 ConfirmCallback 确认模式;2、Exchange 到 Queue 到 ReturnCallback 回退模式。而消费者使用的是 Ack 应答机制。
1)、生产者确认机制
为了实现生产者的确认机制,我需要自定义 RabbitTemplate :
@Configuration public class MyRabbitConfig { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct // MyRabbitConfig 对象创建完成以后,执行这个方法 public void initRabbitTemplate() { /** * 设置确认回调,只要消息抵达Broker就ack=true * correlationData:当前消息的唯一关联数据(这个是消息的唯一id) * ack:消息是否成功收到 * cause:失败的原因 */ rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { System.out.println("确认回调:" + correlationData + ack + cause); }); /** * 设置 Return 回调,只要消息没有投递给指定的队列,就触发这个失败回调 * message:投递失败的消息详细信息 * replyCode:回复的状态码 * replyText:回复的文本内容 * exchange:当时这个消息发给哪个交换机 * routingKey:当时这个消息用哪个路邮键 */ rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> { System.out.println("Return 回调:" + message + replyCode + replyText + exchange + routingKey); }); } }并且需要在配置文件中开启相关设置:
# 开启发送端消息抵达Broker确认 spring.rabbitmq.publisher-confirms=true # 开启发送端消息抵达Queue确认 spring.rabbitmq.publisher-returns=true # 只要消息抵达Queue,就会异步发送优先回调returnfirm spring.rabbitmq.template.mandatory=true2)、消费者确认机制
RabbitMQ 消费者默认是自动确认的,只要消息收到,客户端会自动确认,所以,不管你消息有没有处理成功都会把消息从消息队列中删除。
所以为了消费者的消息可靠,我们需要修改消息为手动确认,修改为手动确认需要修改配置文件内容:
# 手动ack消息,不使用默认的消费端确认 spring.rabbitmq.listener.simple.acknowledge-mode=manual设置为手动确认模式后,只要我们没有明确告诉 MQ 消息被签收,即没有 Ack 时,消息就是一直 unacked 状态,即使 Consumer 宕机了,消息也不会丢失,会重新变为 Ready 状态,下一次有新 客户端连接时再返回给客户端。
消费者消息签收确认代码如下:
@RabbitListener(queues = {"hello-java-queue"}) public void revieveMessage(Message message, OrderReturnReasonEntity content, Channel channel) { System.out.println("受到的消息..." + content); // deliveryTag:消息投递标签,为一个 channel 内按顺序自增数字 long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { // 处理成功则签收:1、签收的 deliveryTag;2、是否批量签收 channel.basicAck(deliveryTag, false); } catch (Exception e) { // 处理失败退签:1、退签的 deliveryTag;2、是否批量操作;3、是否把消息返回给服务器 channel.basicNack(deliveryTag, false, true) } }购物下单等待支付的订单,超过一定时间后,系统就会自动取消订单并释放占有的资源。
常用的解决方案是:使用 Spring 的 Schedule 定时任务轮询数据库。但缺点很大:消耗系统内存、增加了数据库的压力、存在较大的时间误差。
较好的解决方案是使用 RabbitMQ 的延时队列:RabbitMQ 消息的 TTL 和死信 Exchange 结合。
消息的 TTL 就是消息的存活时间。RabbitMQ 可以对队列和消息分别设置 TTL:
对队列设置 TTL 就是队列没有消费者连着的保留时间,也可以对每个消息单独设置。超过了这个时间,我们就认为这个消息死了,这种消息称之为死信,而保存死信的队列就被称之为死信队列。如果同时对队列和消息设置,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息的死亡时间有可能不一样。在这里,只有设置单个消息的 TTL 才是实现延时任务的关键。可以通过设置消息的 expiration 字段或者 x-message-ttl 属性来设置时间,两者效果一样的。
一个消息在满足如下条件时会被放入到死信路由中(什么是死信):
一个消息被 Consumer 拒收了,并且 reject 方法的参数里 requque 是 false,也就是说不会被再次放入到队列中被其它消费者消费;消息的 TTL 到了,消息过期了;队列的长度限制满了,排在前面的消息会被丢弃或者扔到私信路由上。Dead Letter Exchange 其实就是一种普通的 exchange,和其他的 exchange 没有两样,只是在某一个设置 Dead Letter Exchange 的队列中有消息过期了,会自动触发消息的转发,发送到 Dead Letter Exchange 中去。
我们既可以控制消息在一段时间后变成私信,又可以控制死信的信息被路由到某一个指定的交换机中,结合二者就可以实现一个延时队列。
其工作原理和流程如下:
延时队列 order-delay-queue 和消费者监听的队列 order-release-queue 同时和 order-event-exchange 交换机绑定,延时队列和交换机的路由键为 order-create,消费者队列的路由键为 order-release-queue;生产者发送消息到 exchange 中,路由键设置为 order-create,然后交换机把消息投递到延时队列中;由于消息设置了超时时间 x-message-ttl 为 60 秒,死信队列路由交换机为 order-event-exchange,死信路由键为 order-release,所以当存放的消息到达指定的 60 秒后,就会变成私信并投递到指定的死信路由交换机中,这样,消息又重新回到了的了 order-event-exchange 交换机中;由于死信路由键已经重新设置为了 order-release,所以,交换机再次根据该路由把消息投递到 order-release-queue 队列中,最后交给消费者处理。在整个流程中,我们服用了 order-event-exchange 交换机,而延时队列只作为存放延时消息的队列不被任何消费者监听。
根据上面的原理和流程,下面我们来编写一个延时队列的案例:
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.Exchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; @Configuration public class MyRabbitMQConfig { // 把 Queue、Exchange、Binding 对象的创建交给 Spring 管理 // 创建交换机 @Bean public Exchange orderEventExchange() { /* 交换机类型为 TopicExchange * String name, * boolean durable, * boolean autoDelete, * Map<String, Object> arguments */ return new TopicExchange("order-event-exchange", true, false); } // 创建死信队列 @Bean public Queue orderDelayQueue() { /* * Queue(String name, 队列名字 * boolean durable, 是否持久化 * boolean exclusive, 是否排他 * boolean autoDelete, 是否自动删除 * Map<String, Object> arguments) 属性 */ HashMap<String, Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", "order-event-exchange"); arguments.put("x-dead-letter-routing-key", "order.release"); arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟 Queue queue = new Queue("order.delay.queue", true, false, false, arguments); return queue; } // 消费者队列 @Bean public Queue orderReleaseQueue() { Queue queue = new Queue("order.release.queue", true, false, false); return queue; } // 绑定延时队列和交换机 @Bean public Binding orderCreateBinding() { /* * String destination, 目的地(队列名或者交换机名字) * DestinationType destinationType, 目的地类型(Queue、Exhcange) * String exchange, * String routingKey, * Map<String, Object> arguments * */ return new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create", null); } // 绑定消费者队列和交换机 @Bean public Binding orderReleaseBinding() { return new Binding("order.release.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.order", null); } }