MQ全称为Message Queue,即消息队列,RabbitMQ是由erlang语言开发,基于AMQP协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统中应用非常广泛。
RabbitMQ官方地址:http://www.rabbitmq.com/
erlang:erlang语言是专门针对于开放高并发程序
AMQP:Advanced Message Queue 高级消息队列协议
开发中消息队列通常有如下应用场景
将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理,提高了应用程序的响应时间
MQ相当于一个中介,生产方通过MQ与消费方交换,它将应用程序关系进行解耦合
ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ,Redis
1、使用简单,功能强大
2、基于AMQP协议
3、社区活跃,文档完善
4、高并发性能好,这主要益于erlang语言
5、Spring Boot默认继承了RabbitMQ
RabbitMQ的基本结构
组成部分说明如下:
Broker:消息队列服务进程,此进程包括两个部分:Exchange和QueueExchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过滤Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方Producer:消息生产者,即生产方客户端,生产方客户端将消费发送到MQConsumer:消息消费者,即消费方客户端,接收MQ转发的消息connection:连接channel:会话通道消息发布接收流程:
发送消息:
1、生产者和Broker建立TCP连接
2、生产者和Broker建立管道
3、生产者通过管道消息发送Broker,由Exchange将消息进行转发
4、Exchange将消息转发到指定的Queue(队列)
接收消息:
1、消费者和Broker建立TCP连接
2、消费者和Broker建立管道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者
5、消费者接收消息
使用Docker安装RabbitMQ
# 试试rabbitmq版本,该版本包含了web控制页面 docker search rabbitmq:management # 拉去 docker pull rabbitmq:management # 配置镜像 docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management # 启用镜像 docker start rabbitmqRabbitMQ环境就搭建好了后,访问http://localhost:15672/#/查看控制页面;默认的用户名和密码都是guest,点击登录您将看到如下界面:
创建SpringBoot项目,在pom.xml文件中引入对应依赖
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> </dependency>发送端操作流程:
1、创建连接
2、创建通道
3、声明队列
4、发送消息
package com.xuecheng.rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @Author ScholarTang * @Date 2020/10/9 3:09 下午 * @Desc rabbitmq入门程序 -- 消息生产者1 */ public class Producer { //定义队列 private static final String QUEUE = "HelloWorld"; public static void main(String[] args) throws Exception { //通过连接工厂创建新的连接与MQ建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); //配置连接参数 connectionFactory.setHost("127.0.0.1");//ip connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest");//用户名 connectionFactory.setPassword("guest");//密码 connectionFactory.setVirtualHost("/");//虚拟机。一个mq服务可以有多个虚拟机,每个虚拟机都是一个独立的mq //建立新连接 Connection connection = connectionFactory.newConnection(); //创建会话通道 Channel channel = connection.createChannel(); /** * 声明队列(如果队列在mq中没有则会被创建) * * 参数信息:String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments * 参数明细: * 1、queue:队列名称 * 2、durable:是否持久化;如果持久化,在mq重启后队列还在 * 3、exclusive:是否排他(是否独占连接),队列只允许在该连接中访问,如果连接关闭后队列也会被自动删除。如果将此参数设置为true,可用于临时队列 * 4、autoDelete:自动删除,队列不再使用时是否删除队列,如果将此参数和exclusive设置为true的话,可以实现临时队列(临时队列:队列不再使用时自动删除) * 5、arguments:参数,用于设置队列的扩展参数,比如:存活时间 */ channel.queueDeclare(QUEUE, true, false, false, null); /** * 发送消息 * * 参数信息:String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body * 参数明细: * 1、exchange:交换机,如果不指向使用的是mq默认的交换机("" 为空字符串表示不指向) * 2、routingKey:路由key,交换机根据路由key将消息发送到指定的队列。如果使用默认交换机,路由key则使用队列的名称 * 3、props:消息属性 * 4、body:消息内容 */ String message = "hello,这是一个消息"; channel.basicPublish("", QUEUE, null, message.getBytes()); System.out.println("send to mq:" + message); //关闭连接 connection.close(); } }在执行main方法后,控制打印send to mq:hello,这是一个消息消息生产者会在指定的队列中生成一个待消费的消息,如下图所示
点击队列名称,查看队列中的消息
消费端操作流程:
1.创建连接
2.创建通道
3.声明队列
4.监听队列
package com.xuecheng.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; /** * @Author ScholarTang * @Date 2020/10/10 12:35 下午 * @Desc rabbitmq入门程序 -- 消息消费端 */ public class Consumer { //定义队列名称 private static final String QUEUE = "HelloWorld"; public static void main(String[] args) throws Exception{ //通过连接工厂创建新的连接,与MQ建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接参数 connectionFactory.setHost("127.0.0.1");//主机ip。mq服务所在的设备的ip地址 connectionFactory.setPort(5672);//端口 connectionFactory.setVirtualHost("/");//虚拟机ip connectionFactory.setUsername("guest");//用户名 connectionFactory.setPassword("guest");//密码 //新建连接 Connection connection = connectionFactory.newConnection(); //创建会话通道 <生产者和mp服务的所有通信都是在channel通道中完成> Channel channel = connection.createChannel(); //实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * 当监听到消息后,此方法会被调用 * @param consumerTag 消费者标签,用于标识消费者,在监听队列的时候可以设置 * @param envelope 信封,可以通过信封获取交换机,消息ID等(消息ID,mq在channel通道中用来标识消息的ID,可以用于消息是否被接收) * @param properties 消息属性(发送端中设置的消息属性) * @param body 消息内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到的消息内容为:" + new String(body, "utf-8")); } }; /** * 监听队列 * 参数信息:String queue, boolean autoAck, Consumer callback * 参数明细: * 1、queue:队列 * 2、autoAck:自动回复,当消费者接收到消息后告诉mq消息已接收,如果将此参数设置为true表示自动回复,如果设置为false需要编程实现回复 * 3、callback:消费方法,当消费者接收到消息需要执行的方法 */ channel.basicConsume(QUEUE,true,defaultConsumer); /** * 消费者端是否需要关闭连接? * 不需要,消费者端需要保持连接,一直监听队列,并消费队列中的消息。 */ } }执行main方法后控制台会打印监听到的消息内容接收到的消息内容为:hello,这是一个消息。再看RabbitMQ管理页面查询状态,如下图所示:
如上图我们可以看到,刚刚生产者生产的消息已经被消费者消费掉了。
RabbitMQ的工作模式可以分为六种,详细如下:
1.WorkQueues:工作模式
2.Publish/subscribe:发布订阅模式
3.Routing:路由模式
4.Topics:通配符模式
5.Header:header转发器
6.RPC:远程过程调用
Work Queues模式与入门程序相比,多了一个消费者,两个消费端共同消费同一个队列中的消息,它的特点如下:
一个生产者将消息发送给一个队列,多个消费者共同监听一个队列的消息,消息不能被重复消费,RabbitMQ采用轮询的方式将消息发给每一个消费者。
Work Queues模式体验,我们可以在RabbitMQ入门程序上进行测试体验,只需要同时开启多个消费者端监听队列消费消息。
比方说现在开启了3个消费者端:
C1、C2、C3
当消息生产者第一次发送消息时,消息则会被C1消费掉(C2、C3没有接收到消息)
第二次则是C2消费掉了消息(C1、C3枚接收到消息),第三次时C3(C1,C2没有接收到消息)…依次轮询。
发布订阅模式: 1、一个生产者将消息发给交换机
2、与交换机绑定的有多个队列,每个消费者监听自己的队列
3、生产者将消息发给交换机,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息
4、如果消息发给没有绑定队列的交换机上消息将丢失
1、publish/subscribe可以定义一个交换机绑定多个队列,一个消息可以发送给多个队列,被多个消费者消费
2、work queues无需定义交换机,一个消息一次只能发送给一个队列
3、publish/subscribe比work queues的功能强大,publish/subscribe也可以将多个消费者监听同一个队列实现work queues的功能
用一个案例来体现发布与定义模式的应用场景,案例:
用户通知,当用户充值成功或者转账完成系统通知用户,通知方式有短信、邮件多种方法。
案例最终达到的效果:
消息生产者生产一个消息,这个消息就相当于用户充值或者转账成功。接着消息被发送到交换机,交换机将消息转发到与其绑定的两个队列中(发送邮件的队列、发送短信的队列)当监听队列的消费者监听到消息则执行对应的业务代码。
code
消息生产者代码
package com.xuecheng.rabbitmq; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @Author ScholarTang * @Date 2020/10/9 3:09 下午 * @Desc rabbitmq-publish/subscribe工作模式 消息生产者 */ public class ProducerPublish { //定义队列名称,表示用于接收电子邮件消息 private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; //定义队列名称,表示用于接收短信消息 private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; //定义交换机名称 private static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform"; public static void main(String[] args) throws Exception { //通过连接工厂创建新的连接与MQ建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); //配置连接参数 connectionFactory.setHost("127.0.0.1");//主机ip。mq服务所在的设备的ip地址 connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest");//用户名 connectionFactory.setPassword("guest");//密码 connectionFactory.setVirtualHost("/");//虚拟机。一个mq服务可以有多个虚拟机,每个虚拟机都是一个独立的mq //建立新连接 Connection connection = connectionFactory.newConnection(); //创建会话通道 Channel channel = connection.createChannel(); /** * 声明通知电子邮件队列、声明通知短信队列 * 参数信息:String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments * 参数明细: * 1、queue:队列名称 * 2、durable:是否持久化;如果持久化,在mq重启后队列还在 * 3、exclusive:是否排他(是否独占连接),队列只允许在该连接中访问,如果连接关闭后队列也会被自动删除。如果将此参数设置为true,可用于临时队列 * 4、autoDelete:自动删除,队列不再使用时是否删除队列,如果将此参数和exclusive设置为true的话,可以实现临时队列(临时队列:队列不再使用时自动删除) * 5、arguments:参数,用于设置队列的扩展参数,比如:存活时间 */ channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); /** * 定义换换机 * * 方法参数说明: * String exchange, BuiltinExchangeType type * 1、exchange:交换机名称 * 2、type:交换机类型 * DIRECT:对应的是Routing工作模式 * FANOUT :对应的rabbitmq的工作类型是 publish/subscribe工作模式 * TOPIC:对应的是Topics工作模式 * HEADERS:对应的是Header工模式 */ channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT); /** * 交换机和队列绑定 * 方法参数说明: * String queue, String exchange, String routingKey * 1、queue:队列名称 * 2、exchange:交换机名称 * 3、routingKey:路由key,它的作用是交换机的根据路由可以转发到指定的队列,在发布订阅模式中设置空字符串 */ channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_FANOUT_INFORM, ""); channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_FANOUT_INFORM, ""); /** * 发送消息 * * 参数信息:String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body * 参数明细: * 1、exchange:交换机,如果不指向使用的是mq默认的交换机("" 为空字符串表示不指向) * 2、routingKey:路由key,交换机根据路由key将消息发送到指定的队列。如果使用默认交换机,路由key则使用队列的名称 * 3、props:消息属性 * 4、body:消息内容 */ for (int i = 0; i < 10; i++) { String message = "hello,这是一个消息" + i; channel.basicPublish(EXCHANGE_FANOUT_INFORM, "", null, message.getBytes()); System.out.println("send to mq:" + message); } //关闭会话通道 channel.close(); //关闭连接 connection.close(); } }消息消费者_email,在本业务场景中是用于监听和消费发送邮件的队列的消息
package com.xuecheng.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; /** * @Author ScholarTang * @Date 2020/10/10 12:35 下午 * @Desc rabbitmq-publish/subscribe工作模式 消息消费者端 */ public class ConsumerSubscribeEmail { //定义队列名称,表示用于接收电子邮件消息 private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; //定义交换机名称 private static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform"; public static void main(String[] args) throws Exception{ //通过连接工厂创建新的连接,与MQ建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接参数 connectionFactory.setHost("127.0.0.1");//主机ip。mq服务所在的设备的ip地址 connectionFactory.setPort(5672);//端口 connectionFactory.setVirtualHost("/");//虚拟机ip connectionFactory.setUsername("guest");//用户名 connectionFactory.setPassword("guest");//密码 //新建连接 Connection connection = connectionFactory.newConnection(); //创建会话通道 <生产者和mp服务的所有通信都是在channel通道中完成> Channel channel = connection.createChannel(); /** * 创建队列 */ channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); /** * 创建交换机 */ channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT); /** * 将队列绑定到交换机 */ channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,""); //实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * 当监听到消息后,此方法会被调用 * @param consumerTag 消费者标签,用于标识消费者,在监听队列的时候可以设置 * @param envelope 信封,可以通过信封获取交换机,消息ID等(消息ID,mq在channel通道中用来标识消息的ID,可以用于消息是否被接收) * @param properties 消息属性(发送端中设置的消息属性) * @param body 消息内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到的消息内容为:" + new String(body, "utf-8")); } }; /** * 监听队列 * 参数信息:String queue, boolean autoAck, Consumer callback * 参数明细: * 1、queue:队列 * 2、autoAck:自动回复,当消费者接收到消息后告诉mq消息已接收,如果将此参数设置为true表示自动回复,如果设置为false需要编程实现回复 * 3、callback:消费方法,当消费者接收到消息需要执行的方法 */ channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer); } }消息消费者_sms,在本业务场景中是用于监听和消费发送短信的队列中的消息
package com.xuecheng.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; /** * @Author ScholarTang * @Date 2020/10/10 12:35 下午 * @Desc rabbitmq入门程序 -- 消息消费端 */ public class ConsumerSubscribeSms { //定义队列名称,表示用于接收短信消息 private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; //定义交换机名称 private static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform"; public static void main(String[] args) throws Exception{ //通过连接工厂创建新的连接,与MQ建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接参数 connectionFactory.setHost("127.0.0.1");//主机ip。mq服务所在的设备的ip地址 connectionFactory.setPort(5672);//端口 connectionFactory.setVirtualHost("/");//虚拟机ip connectionFactory.setUsername("guest");//用户名 connectionFactory.setPassword("guest");//密码 //新建连接 Connection connection = connectionFactory.newConnection(); //创建会话通道 <生产者和mp服务的所有通信都是在channel通道中完成> Channel channel = connection.createChannel(); /** * 创建队列 */ channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); /** * 创建交换机 */ channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT); /** * 将队列绑定到交换机 */ channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,""); //实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * 当监听到消息后,此方法会被调用 * @param consumerTag 消费者标签,用于标识消费者,在监听队列的时候可以设置 * @param envelope 信封,可以通过信封获取交换机,消息ID等(消息ID,mq在channel通道中用来标识消息的ID,可以用于消息是否被接收) * @param properties 消息属性(发送端中设置的消息属性) * @param body 消息内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到的消息内容为:" + new String(body, "utf-8")); } }; /** * 监听队列 * 参数信息:String queue, boolean autoAck, Consumer callback * 参数明细: * 1、queue:队列 * 2、autoAck:自动回复,当消费者接收到消息后告诉mq消息已接收,如果将此参数设置为true表示自动回复,如果设置为false需要编程实现回复 * 3、callback:消费方法,当消费者接收到消息需要执行的方法 */ channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer); } }依次执行消息生产者端、消息消费者端_email、消息消费者端_sms的main方法,您将会在控制台看到如下这一幕:
// 消息生产者向交换机中发送了10条消息 send to mq:hello,这是一个消息0 send to mq:hello,这是一个消息1 send to mq:hello,这是一个消息2 send to mq:hello,这是一个消息3 send to mq:hello,这是一个消息4 send to mq:hello,这是一个消息5 send to mq:hello,这是一个消息6 send to mq:hello,这是一个消息7 send to mq:hello,这是一个消息8 send to mq:hello,这是一个消息9 //交换机将消息转发到与之绑定的队列中,这里有QUEUE_INFORM_EMAIL、QUEUE_INFORM_SMS两个队列 //监听QUEUE_INFORM_EMAIL队列的消费者消费了队列中的10条消息 接收到的消息内容为:hello,这是一个消息0 接收到的消息内容为:hello,这是一个消息1 接收到的消息内容为:hello,这是一个消息2 接收到的消息内容为:hello,这是一个消息3 接收到的消息内容为:hello,这是一个消息4 接收到的消息内容为:hello,这是一个消息5 接收到的消息内容为:hello,这是一个消息6 接收到的消息内容为:hello,这是一个消息7 接收到的消息内容为:hello,这是一个消息8 接收到的消息内容为:hello,这是一个消息9 //监听QUEUE_INFORM_SMS队列的消费者消费者了队列中的10条消息 接收到的消息内容为:hello,这是一个消息0 接收到的消息内容为:hello,这是一个消息1 接收到的消息内容为:hello,这是一个消息2 接收到的消息内容为:hello,这是一个消息3 接收到的消息内容为:hello,这是一个消息4 接收到的消息内容为:hello,这是一个消息5 接收到的消息内容为:hello,这是一个消息6 接收到的消息内容为:hello,这是一个消息7 接收到的消息内容为:hello,这是一个消息8 接收到的消息内容为:hello,这是一个消息9再打开RabbitMQ管理页面查看
1.查看连接通道
2.查看交换机
1.点击交换机(Exchanges)查看所有的交换机列表
2.根据交换机名称找到刚刚创建的交换机,点击进去查看交换机详情
3.点开Bindings隐藏项查看与该交换机绑定的队列信息
注意:
这时候会有一种特殊的情况。如上案例,如果有一个队列被多个消费者监听呢?
那这个队列的消息将会被这些消费者轮询消费掉。
路由模式:
1、一个交换机绑定多个队列,每个队列设置routingKey,并且一个队列可以设置多个routingKey
2、每个消费者监听自己的队列
3、生产者将消息发给交换机,发送消息时需要指定routingKey的值,交换机来判断改routingKey的值和那个队列的routingKey相等,如果相等则将消息转发给该队列。
1、publish/subscribe模式在绑定交换机时不需要指定routingKey,消息会发送到每一个绑定交换机的队列
2、Routing模式要去在绑定交换机时需要指定routingKey(这就是队列的routingKey),发送消息时指定消息的routingKey,交换机将消息转发到对应routingKey的队列中
3、Routing模式完全可以实现Publish/subscribe发布订阅模式的功能,它比发布订阅模式更加强大。
还是用户通知这个需求为例:当用户充值成功或者转账完成系统通知用户,通知方式有短信、邮件多种方法。
使用Routing工作模式去实现,最终达到的效果:
根据业务需求,在该场景中会有两个队列,分别用来接收和暂存短信通知和邮件通知的消息。在队列与交换机进行绑定的时候为每个队列绑定routingKey,当消息发送者在发送消息时指定消息的routingKey,消息被发送到交换机,交换机根据消息的routingKey与其绑定的队列的routingKey进行匹配,如果相等则将消息转发到对应的队列,这时候监听队列的消费者,监听到消息就执行相关的业务代码
代码部分
消息发送者
package com.xuecheng.rabbitmq; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @Author ScholarTang * @Date 2020/10/9 3:09 下午 * @Desc rabbitmq-Routing工作模式 消息生产者 */ public class ProducerRouting { //接收邮件消息的队列名称 private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; //邮件消息队列与交换机绑定时指定的第一个routingKey:routing_key_email_one private static final String QUEUE_INFORM_EMAIL_ROUTING_KEY_ONE = "routing_key_email_one"; //邮件消息队列与交换机绑定时指定的第二个routingKey:routing_key_email_two private static final String QUEUE_INFORM_EMAIL_ROUTING_KEY_TWO = "routing_key_email_two"; //接收短信消息的队列名称 private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; //短信消息队列与交换机绑定时指定的routingKey:routing_key_sms private static final String QUEUE_INFORM_SMS_ROUTING_KEY = "routing_key_sms"; //定义交换机名称 private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform"; public static void main(String[] args) throws Exception { //通过连接工厂创建新的连接与MQ建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); //配置连接参数 connectionFactory.setHost("127.0.0.1");//主机ip。mq服务所在的设备的ip地址 connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest");//用户名 connectionFactory.setPassword("guest");//密码 connectionFactory.setVirtualHost("/");//虚拟机。一个mq服务可以有多个虚拟机,每个虚拟机都是一个独立的mq //建立新连接 Connection connection = connectionFactory.newConnection(); //创建会话通道 Channel channel = connection.createChannel(); /** * 声明通知电子邮件队列、声明通知短信队列 * 参数信息:String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments * 参数明细: * 1、queue:队列名称 * 2、durable:是否持久化;如果持久化,在mq重启后队列还在 * 3、exclusive:是否排他(是否独占连接),队列只允许在该连接中访问,如果连接关闭后队列也会被自动删除。如果将此参数设置为true,可用于临时队列 * 4、autoDelete:自动删除,队列不再使用时是否删除队列,如果将此参数和exclusive设置为true的话,可以实现临时队列(临时队列:队列不再使用时自动删除) * 5、arguments:参数,用于设置队列的扩展参数,比如:存活时间 */ channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); /** * 定义换换机 * * 方法参数说明: * String exchange, BuiltinExchangeType type * 1、exchange:交换机名称 * 2、type:交换机类型 * DIRECT:对应的是Routing工作模式 * FANOUT :对应的rabbitmq的工作类型是 publish/subscribe工作模式 * TOPIC:对应的是Topics工作模式 * HEADERS:对应的是Header工模式 */ channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); /** * 队列和交换机绑定,在绑定时可以设置其routingKey(一个队列可以设置对歌routingKey) * 方法参数说明: * String queue, String exchange, String routingKey * 1、queue:队列名称 * 2、exchange:交换机名称 * 3、routingKey:路由key,它的作用是交换机的根据路由可以转发到指定的队列,在发布订阅模式中设置空字符串 */ channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_EMAIL_ROUTING_KEY_ONE); channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_EMAIL_ROUTING_KEY_TWO); channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS_ROUTING_KEY); /** * 发送消息 * * 参数信息:String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body * 参数明细: * 1、exchange:交换机,如果不指向使用的是mq默认的交换机("" 为空字符串表示不指向) * 2、routingKey:路由key,交换机根据路由key将消息发送到指定的队列。如果使用默认交换机,路由key则使用队列的名称 * 3、props:消息属性 * 4、body:消息内容 */ for (int i = 0; i < 10; i++) { String message = "hello,这是一个消息" + i; channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_EMAIL_ROUTING_KEY_ONE, null, message.getBytes()); channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS_ROUTING_KEY, null, message.getBytes()); System.out.println("send to mq:" + message); } //关闭会话通道 channel.close(); //关闭连接 connection.close(); } }执行消息发送端的main方法,idea控制台打印发送的消息内容。我们再看一下RabbitMQ管理页面,如下图所示:
如图所示:
1.点击交换机(Exchanges)查看所有的交换机列表
2.根据交换机名称找到刚刚创建的交换机,点击进去查看交换机详情
3.点开Bindings隐藏项查看与该交换机绑定的队列信息
4.可以看到队列有指定的routingKey(一个队列可以有多个routingKey)
再点击Queue按钮,看一下队列信息,如下图所示:
其实不难看出,在消息发送端执行后,交换机根据消息的routingKey与其绑定的队列的routingKey进行匹配,消息转发到了对应的队列中,这就得到queue_inform_email和queue_inform_sms队列中各有10条消息,符合结果预期,此时就差消费者端来消费消息了,编写消费者端。
消费者端_email,用于监听和消费邮件通知队列中的消息
package com.xuecheng.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; /** * @Author ScholarTang * @Date 2020/10/21 12:35 下午 * @Desc 消费者端_email,用于监听和消费邮件通知队列中的消息 */ public class ConsumerRoutingEmail { //接收邮件消息的队列名称 private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; //邮件消息队列与交换机绑定时指定的第一个routingKey:routing_key_email_one private static final String QUEUE_INFORM_EMAIL_ROUTING_KEY_ONE = "routing_key_email_one"; //邮件消息队列与交换机绑定时指定的第二个routingKey:routing_key_email_two private static final String QUEUE_INFORM_EMAIL_ROUTING_KEY_TWO = "routing_key_email_two"; //定义交换机名称 private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform"; public static void main(String[] args) throws Exception{ //通过连接工厂创建新的连接,与MQ建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接参数 connectionFactory.setHost("127.0.0.1");//主机ip。mq服务所在的设备的ip地址 connectionFactory.setPort(5672);//端口 connectionFactory.setVirtualHost("/");//虚拟机ip connectionFactory.setUsername("guest");//用户名 connectionFactory.setPassword("guest");//密码 //新建连接 Connection connection = connectionFactory.newConnection(); //创建会话通道 <生产者和mp服务的所有通信都是在channel通道中完成> Channel channel = connection.createChannel(); /** * 创建队列 */ channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); /** * 创建交换机 */ channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); /** * 将队列绑定到交换机 */ channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL_ROUTING_KEY_ONE); channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL_ROUTING_KEY_TWO); //实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * 当监听到消息后,此方法会被调用 * @param consumerTag 消费者标签,用于标识消费者,在监听队列的时候可以设置 * @param envelope 信封,可以通过信封获取交换机,消息ID等(消息ID,mq在channel通道中用来标识消息的ID,可以用于消息是否被接收) * @param properties 消息属性(发送端中设置的消息属性) * @param body 消息内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到的消息内容为:" + new String(body, "utf-8")); } }; /** * 监听队列 * 参数信息:String queue, boolean autoAck, Consumer callback * 参数明细: * 1、queue:队列 * 2、autoAck:自动回复,当消费者接收到消息后告诉mq消息已接收,如果将此参数设置为true表示自动回复,如果设置为false需要编程实现回复 * 3、callback:消费方法,当消费者接收到消息需要执行的方法 */ channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer); } }消息消费者_sms,监听及消费短信通知队列中的消息
package com.xuecheng.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; /** * @Author ScholarTang * @Date 2020/10/21 12:35 下午 * @Desc 消息消费者_sms,监听及消费短信通知队列中的消息 */ public class ConsumerRoutingSms { //接收短信消息的队列名称 private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; //短信消息队列与交换机绑定时指定的routingKey:routing_key_sms private static final String QUEUE_INFORM_SMS_ROUTING_KEY = "routing_key_sms"; //定义交换机名称 private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform"; public static void main(String[] args) throws Exception{ //通过连接工厂创建新的连接,与MQ建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接参数 connectionFactory.setHost("127.0.0.1");//主机ip。mq服务所在的设备的ip地址 connectionFactory.setPort(5672);//端口 connectionFactory.setVirtualHost("/");//虚拟机ip connectionFactory.setUsername("guest");//用户名 connectionFactory.setPassword("guest");//密码 //新建连接 Connection connection = connectionFactory.newConnection(); //创建会话通道 <生产者和mp服务的所有通信都是在channel通道中完成> Channel channel = connection.createChannel(); /** * 创建队列 */ channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); /** * 创建交换机 */ channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); /** * 将队列绑定到交换机 */ channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_SMS_ROUTING_KEY); //实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * 当监听到消息后,此方法会被调用 * @param consumerTag 消费者标签,用于标识消费者,在监听队列的时候可以设置 * @param envelope 信封,可以通过信封获取交换机,消息ID等(消息ID,mq在channel通道中用来标识消息的ID,可以用于消息是否被接收) * @param properties 消息属性(发送端中设置的消息属性) * @param body 消息内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到的消息内容为:" + new String(body, "utf-8")); } }; /** * 监听队列 * 参数信息:String queue, boolean autoAck, Consumer callback * 参数明细: * 1、queue:队列 * 2、autoAck:自动回复,当消费者接收到消息后告诉mq消息已接收,如果将此参数设置为true表示自动回复,如果设置为false需要编程实现回复 * 3、callback:消费方法,当消费者接收到消息需要执行的方法 */ channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer); } }执行两个消费者段的main方法,消费各自监听的队列中的消息。
控制台打印
//ConsumerRoutingEmail 接收到的消息内容为:hello,这是一个消息0 接收到的消息内容为:hello,这是一个消息1 接收到的消息内容为:hello,这是一个消息2 接收到的消息内容为:hello,这是一个消息3 接收到的消息内容为:hello,这是一个消息4 接收到的消息内容为:hello,这是一个消息5 接收到的消息内容为:hello,这是一个消息6 接收到的消息内容为:hello,这是一个消息7 接收到的消息内容为:hello,这是一个消息8 接收到的消息内容为:hello,这是一个消息9 //ConsumerRoutingSms 接收到的消息内容为:hello,这是一个消息0 接收到的消息内容为:hello,这是一个消息1 接收到的消息内容为:hello,这是一个消息2 接收到的消息内容为:hello,这是一个消息3 接收到的消息内容为:hello,这是一个消息4 接收到的消息内容为:hello,这是一个消息5 接收到的消息内容为:hello,这是一个消息6 接收到的消息内容为:hello,这是一个消息7 接收到的消息内容为:hello,这是一个消息8 接收到的消息内容为:hello,这是一个消息9再看RabbitMQ管理页面,如下图所示,两个队列中的消息都已经被消费掉了
Topics通配符工作模式: 1、一个交换机可以绑定多个队列,每个队列可以设置一个或多个通配符routingKey
2、生产者将消息发给交换机,交换机根据routingKey的值来匹配队列,匹配时采用通配符方式,匹配成功将消息转发到指定的队列
Topics和Routing的基本原理相同,即:生产者将消息发给交换机,交换机根据routingKey将消息转发给与routingKey匹配的队列
不同点的是:routingKey的匹配方式。Routing模式是相等匹配,Topics模式是通配符匹配
//通配符模式匹配 1、符号 # :匹配一个或者多个词,比如inform.#,可以匹配inform.sms、inform.email、inform.email.sms 2、符号 * :只能匹配一个词,比如inform.*,可以匹配inform.sms、inform.email案例: 根据用户的通知去设置去通知用户,设置接收Email的用户只能接收Email,设置接收sms的用户只能接收sms,设置两种通知类型都接收的则两种通知都有效
code
消息生产者程序代码
package com.xuecheng.rabbitmq; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @Author ScholarTang * @Date 2020/10/21 3:09 下午 * @Desc rabbitmq-topics工作模式 消息生产者 */ public class ProducerTopics { //接收邮件消息的队列名称 private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; //接收短信消息的队列名称 private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; //邮件消息队列与交换机绑定时指定的通routingKey配符 private static final String QUEUE_INFORM_EMAIL_ROUTING_KEY = "inform.#.email.#"; //短信消息队列与交换机绑定时指定的routingKey通配符 private static final String QUEUE_INFORM_SMS_ROUTING_KEY = "inform.#.sms.#"; //定义交换机名称 private static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform"; public static void main(String[] args) throws Exception { //通过连接工厂创建新的连接与MQ建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); //配置连接参数 connectionFactory.setHost("127.0.0.1");//主机ip。mq服务所在的设备的ip地址 connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest");//用户名 connectionFactory.setPassword("guest");//密码 connectionFactory.setVirtualHost("/");//虚拟机。一个mq服务可以有多个虚拟机,每个虚拟机都是一个独立的mq //建立新连接 Connection connection = connectionFactory.newConnection(); //创建会话通道 Channel channel = connection.createChannel(); /** * 声明通知电子邮件队列、声明通知短信队列 * 参数信息:String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments * 参数明细: * 1、queue:队列名称 * 2、durable:是否持久化;如果持久化,在mq重启后队列还在 * 3、exclusive:是否排他(是否独占连接),队列只允许在该连接中访问,如果连接关闭后队列也会被自动删除。如果将此参数设置为true,可用于临时队列 * 4、autoDelete:自动删除,队列不再使用时是否删除队列,如果将此参数和exclusive设置为true的话,可以实现临时队列(临时队列:队列不再使用时自动删除) * 5、arguments:参数,用于设置队列的扩展参数,比如:存活时间 */ channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); /** * 定义换换机 * * 方法参数说明: * String exchange, BuiltinExchangeType type * 1、exchange:交换机名称 * 2、type:交换机类型 * DIRECT:对应的是Routing工作模式 * FANOUT :对应的rabbitmq的工作类型是 publish/subscribe工作模式 * TOPIC:对应的是Topics工作模式 * HEADERS:对应的是Header工模式 */ channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC); /** * 队列和交换机绑定,在绑定时可以设置其routingKey(一个队列可以设置对歌routingKey) * 方法参数说明: * String queue, String exchange, String routingKey * 1、queue:队列名称 * 2、exchange:交换机名称 * 3、routingKey:路由key,它的作用是交换机的根据路由可以转发到指定的队列,在发布订阅模式中设置空字符串 */ channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_TOPICS_INFORM, QUEUE_INFORM_EMAIL_ROUTING_KEY); channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_TOPICS_INFORM, QUEUE_INFORM_SMS_ROUTING_KEY); /** * 发送消息 * * 参数信息:String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body * 参数明细: * 1、exchange:交换机,如果不指向使用的是mq默认的交换机("" 为空字符串表示不指向) * 2、routingKey:路由key,交换机根据路由key将消息发送到指定的队列。如果使用默认交换机,路由key则使用队列的名称 * 3、props:消息属性 * 4、body:消息内容 */ //只发送邮件消息 // for (int i = 0; i < 10; i++) { // String message = "这是一个邮件消息" + i; // channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.email", null, message.getBytes()); // System.out.println("send to mq:" + message); // } //只发送短信消息 // for (int i = 0; i < 10; i++) { // String message = "这是一个短信消息" + i; // channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms", null, message.getBytes()); // System.out.println("send to mq:" + message); // } //发送邮件和短信消息 for (int i = 0; i < 10; i++) { String message = "同时发送了邮件和短信消息:" + i; channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.email.sms", null, message.getBytes()); System.out.println("send to mq:" + message); } //关闭会话通道 channel.close(); //关闭连接 connection.close(); } }同样的运行消息生产者程序,发送的消息内容会在控制台打印,再回到RabbitMQ管理页面查看,根据代码中定义的交换机名字找到被创建的的交换机,点进去可以看到交换机绑定了两个队列,队列分别制定了不同的routingKey通配符规则,如下图所示:
回到代码中再来看,在发送消息的时候指定消息的routingKey,当消息被发送到交换机后,交换机会根据消息的routingKey与其绑定的队列的routingKey进行匹配,匹配上了则转发到这个队列中。然后,队列中的消息有监听队列的消费者消费掉
消费者端_email程序代码
package com.xuecheng.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; /** * @Author ScholarTang * @Date 2020/10/10 12:35 下午 * @Desc 消息消费端_email */ public class ConsumerTopicsEmail { //定义队列名称,用于通知电子邮件 private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; //定义交换机名称 private static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform"; //邮件消息队列与交换机绑定时指定的通routingKey配符 private static final String QUEUE_INFORM_EMAIL_ROUTING_KEY = "inform.#.email.#"; public static void main(String[] args) throws Exception{ //通过连接工厂创建新的连接,与MQ建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接参数 connectionFactory.setHost("127.0.0.1");//主机ip。mq服务所在的设备的ip地址 connectionFactory.setPort(5672);//端口 connectionFactory.setVirtualHost("/");//虚拟机ip connectionFactory.setUsername("guest");//用户名 connectionFactory.setPassword("guest");//密码 //新建连接 Connection connection = connectionFactory.newConnection(); //创建会话通道 <生产者和mp服务的所有通信都是在channel通道中完成> Channel channel = connection.createChannel(); /** * 创建队列 */ channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); /** * 创建交换机 */ channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC); /** * 将队列绑定到交换机 */ channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,QUEUE_INFORM_EMAIL_ROUTING_KEY); //实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * 当监听到消息后,此方法会被调用 * @param consumerTag 消费者标签,用于标识消费者,在监听队列的时候可以设置 * @param envelope 信封,可以通过信封获取交换机,消息ID等(消息ID,mq在channel通道中用来标识消息的ID,可以用于消息是否被接收) * @param properties 消息属性(发送端中设置的消息属性) * @param body 消息内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到的消息内容为:" + new String(body, "utf-8")); } }; /** * 监听队列 * 参数信息:String queue, boolean autoAck, Consumer callback * 参数明细: * 1、queue:队列 * 2、autoAck:自动回复,当消费者接收到消息后告诉mq消息已接收,如果将此参数设置为true表示自动回复,如果设置为false需要编程实现回复 * 3、callback:消费方法,当消费者接收到消息需要执行的方法 */ channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer); } }消费者端_sms程序代码
package com.xuecheng.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; /** * @Author ScholarTang * @Date 2020/10/10 12:35 下午 * @Desc 消费者端_sms */ public class ConsumerTopicsSms { //定义队列名称,用于通知电子邮件 private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; //定义交换机名称 private static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform"; //邮件消息队列与交换机绑定时指定的通routingKey配符 private static final String QUEUE_INFORM_SMS_ROUTING_KEY = "inform.#.sms.#"; public static void main(String[] args) throws Exception{ //通过连接工厂创建新的连接,与MQ建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接参数 connectionFactory.setHost("127.0.0.1");//主机ip。mq服务所在的设备的ip地址 connectionFactory.setPort(5672);//端口 connectionFactory.setVirtualHost("/");//虚拟机ip connectionFactory.setUsername("guest");//用户名 connectionFactory.setPassword("guest");//密码 //新建连接 Connection connection = connectionFactory.newConnection(); //创建会话通道 <生产者和mp服务的所有通信都是在channel通道中完成> Channel channel = connection.createChannel(); /** * 创建队列 */ channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); /** * 创建交换机 */ channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC); /** * 将队列绑定到交换机 */ channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,QUEUE_INFORM_SMS_ROUTING_KEY); //实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * 当监听到消息后,此方法会被调用 * @param consumerTag 消费者标签,用于标识消费者,在监听队列的时候可以设置 * @param envelope 信封,可以通过信封获取交换机,消息ID等(消息ID,mq在channel通道中用来标识消息的ID,可以用于消息是否被接收) * @param properties 消息属性(发送端中设置的消息属性) * @param body 消息内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到的消息内容为:" + new String(body, "utf-8")); } }; /** * 监听队列 * 参数信息:String queue, boolean autoAck, Consumer callback * 参数明细: * 1、queue:队列 * 2、autoAck:自动回复,当消费者接收到消息后告诉mq消息已接收,如果将此参数设置为true表示自动回复,如果设置为false需要编程实现回复 * 3、callback:消费方法,当消费者接收到消息需要执行的方法 */ channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer); } }运行两个消费者程序,如果消息生产者发送的消息是邮件消息,则消息会被监听邮件的消费者消费掉。
或者消息生产者发送的消息是短信消息,对应的消息会被监听短信的消费者消费掉。
如果消息生产者同时发送(消息的routingKey可以同时被两个队列的routingKey通配符匹配)了邮件和短信消息,那么两个队列中都有有一样的消息,消息由各自监听的消费者进行消费。
Header模式与routing路由模式不同的地方在于,header模式取消了routingKey,使用header中的key/value(键值对)匹配队列。
还是用topics工作模式的样例,header工作模式与routing、topics工作模式之间也是换汤不换药的操作,routing模式时在交换机中拿消息的routingKey和队列的routingKey进行比较,相等则转发到队列。topics是在routing之上加了一个匹配规则,而header再弃用了routingKey使用了一个键值对的形式,在发送消息的时候为消息设置一个键值对,交换机根据消息的键值对去与其绑定的队列进行匹配,相等则转发到这个队列中,最后监听队列的消费者将队列中的消息消费掉。
案例: 根据用户的通知去设置去通知用户,设置接收Email的用户只能接收Email,设置接收sms的用户只能接收sms,设置两种通知类型都接收的则两种通知都有效
Code
消息生产者程序代码
package com.xuecheng.rabbitmq; import com.rabbitmq.client.*; import java.util.HashMap; import java.util.Map; /** * @Author ScholarTang * @Date 2020/10/21 3:09 下午 * @Desc rabbitmq-header工作模式 消息生产者 */ public class ProducerHeader { //接收邮件消息的队列名称 private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; //接收短信消息的队列名称 private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; //定义交换机名称 private static final String EXCHANGE_HEADER_INFORM = "exchange_header_inform"; public static void main(String[] args) throws Exception { //通过连接工厂创建新的连接与MQ建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); //配置连接参数 connectionFactory.setHost("127.0.0.1");//主机ip。mq服务所在的设备的ip地址 connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest");//用户名 connectionFactory.setPassword("guest");//密码 connectionFactory.setVirtualHost("/");//虚拟机。一个mq服务可以有多个虚拟机,每个虚拟机都是一个独立的mq //建立新连接 Connection connection = connectionFactory.newConnection(); //创建会话通道 Channel channel = connection.createChannel(); /** * 声明通知电子邮件队列、声明通知短信队列 * 参数信息:String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments * 参数明细: * 1、queue:队列名称 * 2、durable:是否持久化;如果持久化,在mq重启后队列还在 * 3、exclusive:是否排他(是否独占连接),队列只允许在该连接中访问,如果连接关闭后队列也会被自动删除。如果将此参数设置为true,可用于临时队列 * 4、autoDelete:自动删除,队列不再使用时是否删除队列,如果将此参数和exclusive设置为true的话,可以实现临时队列(临时队列:队列不再使用时自动删除) * 5、arguments:参数,用于设置队列的扩展参数,比如:存活时间 */ channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); /** * 定义换换机 * * 方法参数说明: * String exchange, BuiltinExchangeType type * 1、exchange:交换机名称 * 2、type:交换机类型 * DIRECT:对应的是Routing工作模式 * FANOUT :对应的rabbitmq的工作类型是 publish/subscribe工作模式 * TOPIC:对应的是Topics工作模式 * HEADERS:对应的是Header工模式 */ channel.exchangeDeclare(EXCHANGE_HEADER_INFORM, BuiltinExchangeType.HEADERS); /** * 队列和交换机绑定,在绑定时可以设置其routingKey(一个队列可以设置对歌routingKey) * 方法参数说明: * String queue, String exchange, String routingKey,Map<String, Object> arguments * 1、queue:队列名称 * 2、exchange:交换机名称 * 3、routingKey:路由key,它的作用是交换机的根据路由可以转发到指定的队列,在发布订阅模式中设置空字符串 * 4、arguments:参数key/value键值对 */ Map<String, Object> emailMap = new HashMap<>(); emailMap.put("inform.email","email"); Map<String, Object> smsMap = new HashMap<>(); smsMap.put("inform.sms","sms"); channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_HEADER_INFORM, "",emailMap); channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_HEADER_INFORM, "",smsMap); /** * 发送消息 * * 参数信息:String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body * 参数明细: * 1、exchange:交换机,如果不指向使用的是mq默认的交换机("" 为空字符串表示不指向) * 2、routingKey:路由key,交换机根据路由key将消息发送到指定的队列。如果使用默认交换机,路由key则使用队列的名称 * 3、props:消息属性 * 4、body:消息内容 */ //发送邮件消息 for (int i = 0; i < 10; i++) { Map<String, Object> map = new HashMap<>(); map.put("inform.email","email"); String message = "发送邮件消息:" + i; AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder(); properties.headers(map); channel.basicPublish(EXCHANGE_HEADER_INFORM, "", properties.build(), message.getBytes()); System.out.println("send to mq:" + message); } //发送短信消息 for (int i = 0; i < 10; i++) { Map<String, Object> map = new HashMap<>(); map.put("inform.sms","sms"); String message = "发送短信消息:" + i; AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder(); properties.headers(map); channel.basicPublish(EXCHANGE_HEADER_INFORM, "", properties.build(), message.getBytes()); System.out.println("send to mq:" + message); } //发送邮件和短信消息 for (int i = 0; i < 10; i++) { Map<String, Object> map = new HashMap<>(); map.put("inform.email","email"); map.put("inform.sms","sms"); String message = "同时发送了邮件和短信消息:" + i; AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder(); properties.headers(map); channel.basicPublish(EXCHANGE_HEADER_INFORM, "", properties.build(), message.getBytes()); System.out.println("send to mq:" + message); } //关闭会话通道 channel.close(); //关闭连接 connection.close(); } }消费者端_email程序代码
package com.xuecheng.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.HashMap; import java.util.Map; /** * @Author ScholarTang * @Date 2020/10/10 12:35 下午 * @Desc 消费者端_email */ public class ConsumerHeaderEmail { //定义队列名称,用于通知电子邮件 private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; //定义交换机名称 private static final String EXCHANGE_HEADER_INFORM = "exchange_header_inform"; public static void main(String[] args) throws Exception{ //通过连接工厂创建新的连接,与MQ建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接参数 connectionFactory.setHost("127.0.0.1");//主机ip。mq服务所在的设备的ip地址 connectionFactory.setPort(5672);//端口 connectionFactory.setVirtualHost("/");//虚拟机ip connectionFactory.setUsername("guest");//用户名 connectionFactory.setPassword("guest");//密码 //新建连接 Connection connection = connectionFactory.newConnection(); //创建会话通道 <生产者和mp服务的所有通信都是在channel通道中完成> Channel channel = connection.createChannel(); /** * 创建队列 */ channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); /** * 创建交换机 */ channel.exchangeDeclare(EXCHANGE_HEADER_INFORM, BuiltinExchangeType.HEADERS); /** * 将队列绑定到交换机 */ Map<String, Object> smsMap = new HashMap<>(); smsMap.put("inform.email","email"); channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADER_INFORM,"",smsMap); //实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * 当监听到消息后,此方法会被调用 * @param consumerTag 消费者标签,用于标识消费者,在监听队列的时候可以设置 * @param envelope 信封,可以通过信封获取交换机,消息ID等(消息ID,mq在channel通道中用来标识消息的ID,可以用于消息是否被接收) * @param properties 消息属性(发送端中设置的消息属性) * @param body 消息内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到的消息内容为:" + new String(body, "utf-8")); } }; /** * 监听队列 * 参数信息:String queue, boolean autoAck, Consumer callback * 参数明细: * 1、queue:队列 * 2、autoAck:自动回复,当消费者接收到消息后告诉mq消息已接收,如果将此参数设置为true表示自动回复,如果设置为false需要编程实现回复 * 3、callback:消费方法,当消费者接收到消息需要执行的方法 */ channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer); } }消费者端_sms程序代码
package com.xuecheng.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.HashMap; import java.util.Map; /** * @Author ScholarTang * @Date 2020/10/10 12:35 下午 * @Desc 消费者端_sms */ public class ConsumerHeaderSms { //接收短信消息的队列名称 private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; //定义交换机名称 private static final String EXCHANGE_HEADER_INFORM = "exchange_header_inform"; public static void main(String[] args) throws Exception{ //通过连接工厂创建新的连接,与MQ建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接参数 connectionFactory.setHost("127.0.0.1");//主机ip。mq服务所在的设备的ip地址 connectionFactory.setPort(5672);//端口 connectionFactory.setVirtualHost("/");//虚拟机ip connectionFactory.setUsername("guest");//用户名 connectionFactory.setPassword("guest");//密码 //新建连接 Connection connection = connectionFactory.newConnection(); //创建会话通道 <生产者和mp服务的所有通信都是在channel通道中完成> Channel channel = connection.createChannel(); /** * 创建队列 */ channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); /** * 创建交换机 */ channel.exchangeDeclare(EXCHANGE_HEADER_INFORM, BuiltinExchangeType.HEADERS); /** * 将队列绑定到交换机 */ Map<String, Object> smsMap = new HashMap<>(); smsMap.put("inform.sms","sms"); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_HEADER_INFORM,"",smsMap); //实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * 当监听到消息后,此方法会被调用 * @param consumerTag 消费者标签,用于标识消费者,在监听队列的时候可以设置 * @param envelope 信封,可以通过信封获取交换机,消息ID等(消息ID,mq在channel通道中用来标识消息的ID,可以用于消息是否被接收) * @param properties 消息属性(发送端中设置的消息属性) * @param body 消息内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到的消息内容为:" + new String(body, "utf-8")); } }; /** * 监听队列 * 参数信息:String queue, boolean autoAck, Consumer callback * 参数明细: * 1、queue:队列 * 2、autoAck:自动回复,当消费者接收到消息后告诉mq消息已接收,如果将此参数设置为true表示自动回复,如果设置为false需要编程实现回复 * 3、callback:消费方法,当消费者接收到消息需要执行的方法 */ channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer); } }RPC工作模式: RPC即客户端远程调用服务端的方法,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下: 1、客户端即是生产者也是消费者,向RPC请求队列中发送RPC调用消息,同时监听RPC响应队列
2、服务端监听RPC请求队列消息,收到消息后执行服务端的方法,得到方法的返回结果
3、服务器将RPC方法的结果发送到RPC响应队列
案例:
就以结构图为例,模拟一个客户端给服务端啊发送消息,服务端接收消息并回复
Code
RpcClient
package com.xuecheng.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.Scanner; /** * @Author ScholarTang * @Date 2020/10/21 8:18 下午 * @Desc rabbitmq_rpc模式 客户端 */ public class RpcClient { //定义rcp队列名称 private static final String QUEUE_INFORM_RPC = "queue_inform_rpc"; //定义rpc响应队列名称 private static final String QUEUE_INFORM_RPC_RETURN = "queue_inform_rpc_return"; public static void main(String[] args) throws Exception{ //通过连接工厂创建新的连接与MQ建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); //配置连接参数 connectionFactory.setHost("127.0.0.1");//主机ip。mq服务所在的设备的ip地址 connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest");//用户名 connectionFactory.setPassword("guest");//密码 connectionFactory.setVirtualHost("/");//虚拟机。一个mq服务可以有多个虚拟机,每个虚拟机都是一个独立的mq //建立新连接 Connection connection = connectionFactory.newConnection(); //创建会话通道 Channel channel = connection.createChannel(); //创建新的通道,用这个通道与rpc响应队列保存连接 Channel channelByRpcReturn = connection.createChannel(); //声明rpc队列 channel.queueDeclare(QUEUE_INFORM_RPC, true, false, false, null); //声明rpc响应队列,这个操作是保证这个队列是存在的 channelByRpcReturn.queueDeclare(QUEUE_INFORM_RPC_RETURN, true, false, false, null); sendMessage(channel,QUEUE_INFORM_RPC); //实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channelByRpcReturn){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("服务端响应:" + new String(body, "utf-8")); System.out.println(); sendMessage(channel,QUEUE_INFORM_RPC); } }; //消费rpc响应队列中的消息 channelByRpcReturn.basicConsume(QUEUE_INFORM_RPC_RETURN,true,defaultConsumer); } public static void sendMessage(Channel channel,String queue) throws IOException { System.out.print("给服务器端发送消息:"); Scanner scanner = new Scanner(System.in); String message = scanner.next(); System.out.println("客户端:" + message); //向rpc队列发送消息 channel.basicPublish("",queue,null,message.getBytes()); } }RpcServer
package com.xuecheng.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.Scanner; /** * @Author ScholarTang * @Date 2020/10/21 8:38 下午 * @Desc rabbitmq_rpc模式 服务端 */ public class RpcServer { //定义rcp队列名称 private static final String QUEUE_INFORM_RPC = "queue_inform_rpc"; //定义rpc响应队列名称 private static final String QUEUE_INFORM_RPC_RETURN = "queue_inform_rpc_return"; public static void main(String[] args) throws Exception { //通过连接工厂创建新的连接与MQ建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); //配置连接参数 connectionFactory.setHost("127.0.0.1");//主机ip。mq服务所在的设备的ip地址 connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest");//用户名 connectionFactory.setPassword("guest");//密码 connectionFactory.setVirtualHost("/");//虚拟机。一个mq服务可以有多个虚拟机,每个虚拟机都是一个独立的mq //建立新连接 Connection connection = connectionFactory.newConnection(); //创建会话通道 Channel channel = connection.createChannel(); //创建新的通道,用这个通道与与rpc响应队列保存联系 Channel channelByRpcReturn = connection.createChannel(); //声明rpc队列 channel.queueDeclare(QUEUE_INFORM_RPC, true, false, false, null); //声明rpc响应队列,这个操作是保证这个队列是存在的 channelByRpcReturn.queueDeclare(QUEUE_INFORM_RPC_RETURN, true, false, false, null); //实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("收到客户端发过来的消息:" + new String(body, "utf-8")); sendMessage(channelByRpcReturn,QUEUE_INFORM_RPC_RETURN); } }; //消费rpc响应队列中的消息 channelByRpcReturn.basicConsume(QUEUE_INFORM_RPC, true, defaultConsumer); } public static void sendMessage(Channel channel,String queue) throws IOException { System.out.print("给客户端响应消息:"); Scanner scanner = new Scanner(System.in); String message = scanner.next(); System.out.println("服务端:" + message); //向rpc队列发送消息 channel.basicPublish("",queue,null,message.getBytes()); } }最终达到的效果:
为了测试的时候更直观的看到消息,所有在搭建工程的时候同样的也是需要搭建一个消息消费端和消息生产端
我们在消息生产端和消息消费端都使用这个配置类,这样可以确保无论是那一方先起来,这些队列和交换机都会被创建
RabbitMqConfig.java
package com.xuecheng.test.rabbitmq.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Author ScholarTang * @Date 2020/10/22 10:36 上午 * @Desc rabbitmq配置类 */ @Configuration public class RabbitMqConfig { //TODO 通常这些信息都是写在配置内中的,这里为了方便直接定义成静态常量 //接收邮件消息的队列名称 public static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; //接收短信消息的队列名称 public static final String QUEUE_INFORM_SMS = "queue_inform_sms"; //邮件消息队列与交换机绑定时指定的通routingKey配符 public static final String QUEUE_INFORM_EMAIL_ROUTING_KEY = "inform.#.email.#"; //短信消息队列与交换机绑定时指定的routingKey通配符 public static final String QUEUE_INFORM_SMS_ROUTING_KEY = "inform.#.sms.#"; //定义交换机名称 public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform"; /** * 声明交换机 * @return */ @Bean(EXCHANGE_TOPICS_INFORM) public Exchange EXCHANGE_TOPICS_INFORM(){ return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); } /** * 声明队列-邮箱队列 * @return */ @Bean(QUEUE_INFORM_EMAIL) public Queue QUEUE_INFORM_EMAIL(){ return new Queue(QUEUE_INFORM_EMAIL); } /** * 声明队列-短信队列 * @return */ @Bean(QUEUE_INFORM_SMS) public Queue QUEUE_INFORM_SMS(){ return new Queue(QUEUE_INFORM_EMAIL); } /** * 队列绑定交换机-邮件队列绑定交换机 * @param queue 队列实例 * @param exchange 交换机实例 * @return */ @Bean public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(QUEUE_INFORM_EMAIL_ROUTING_KEY).noargs(); } /** * 队列绑定交换机 * @param queue * @param exchange * @return */ @Bean public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(QUEUE_INFORM_SMS_ROUTING_KEY).noargs(); } }我们使用RabbitTemplate来发送消息,这里我直接在SpringBoot测试类中来完成发送消息的操作
/** * @Author ScholarTang * @Date 2020/10/22 11:00 上午 * @Desc SpringBoot程序测试类 */ @RunWith(SpringRunner.class) @SpringBootTest public class TestRabbitMqApplicationTest { /** * 注入rabbitTemplate,使用rabbitTemplate来发送消息 */ @Autowired private RabbitTemplate rabbitTemplate; @Test public void test() { String message = "这是一个邮件消息"; /** * 发送消息 * 方法参数说明:String exchange, String routingKey, Object message * 1、exchange:交换机 * 2、routingKey:路由Key * 3、message:消息内容 */ rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_TOPICS_INFORM, RabbitMqConfig.QUEUE_INFORM_EMAIL_ROUTING_KEY, message); } }这里我将消费者端定义成了一个Bean在SpringBoot程序启动时就会被Spring容器管理起来,在类中使用一个注解来监听队列中的消息,再消费消息
package com.xuecheng.test.rabbitmq.mq; import com.xuecheng.test.rabbitmq.config.RabbitMqConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @Author ScholarTang * @Date 2020/10/22 11:38 上午 * @Desc 消息消费者 */ @Slf4j @Component public class ReceiveHandler { /** * @RabbitListener 注解用来监听队列 * @param message */ @RabbitListener(queues = {RabbitMqConfig.QUEUE_INFORM_EMAIL}) public void consumer(String message){ log.info("接收到了消息:{}", message); } }1、先启动消息消费端的SpringBoot程序,SpringBoot扫描bean将其加载到Spring容器,此时会创建队列和交换机,并监听队列,当队列中有消息就会被消费
2、运行消息生产端的测试方法发送消息
3、消息消费端接收到消息
