首先看下如下代码片段:
channle.exchangeDeclare("exchange_name", BuiltinExchangeType.DIRECT, true, false, null); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, "exchange_name", "routing_key", null);本文会围绕exchangeDeclare、queueDeclare、queueBind三个方法展开源码分析。
作用:声明一个交换机。
Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException; exchange:交换器的名字。type:交换器的类型。durable:是否持久化。服务器重启时,该交换器是否存活。autoDelete:是否自动删除。交换器不再使用时,是否自动删除该交换器。(自动删除的前提是至少有一个队列或者交换器与该交换器绑定,之后所有与该交换器绑定的队列或者交换器解绑时,进行自动删除该交换器)arguments:用于交换器的结构化参数。 Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException; internal:是否是内置的。如果是内置的,客户端程序无法直接发送消息到该交换器,只能通过交换器路由到该交换器这种方式。 Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException; void exchangeDeclareNoWait(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException; void exchangeDeclareNoWait(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;接下来围绕这个方法展开源码分析。
先来看下recordExchange方法的处理逻辑。
构建了一个RecordedExchange实例,用于保存交换器的一些属性参数。
可见,将交换器的名字与它的参数属性进行绑定,放到一个Map里。
接着看 delegate.exchangeDeclare(…)方法。
delegate:RecoveryAwareChannelN类型发起了一次伪RPC调用。
交给了AMQChannel进行处理。(后面有分析)
作用:声明一个队列。
有如下三种重载的方法:
exclusive:是否为排他。针对一个连接而言。(即使排他队列是持久化的,关联的连接一旦关闭或者客户端退出,该排他队列会自动被删除。适用于客户端同时负责发送、读取消息的场景。)接下来看下它的源码。
可以看出queueDeclare()方法声明的队列是非持久化、排他、自动删除的。
与exchangeDeclare方法的处理逻辑类似。这里不再展开分析。
作用:将队列绑定到交换器上。
同样,有如下几种重载的方法:
Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException; queue:队列的名字。exchange:交换器的名字。routingKey:用于绑定的路由键。 Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException; void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;接下来接着分析它的源码。
validateQueueNameLength(…):要求队列名字的长度<=255。具体的处理跟exchangeDeclare的方式类似。这里不再展开分析了。
发布一条消息。
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException; props:额外的属性。BasicProperties提供如下属性:
也支持builder链式方式传递参数。
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException; mandatory:true:表示交换器无法根据自身的类型和路由键找到一个符合条件的队列时,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。false:出现上述情形,则消息直接被丢弃。 void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException; immediate:true:交换器将消息路由到队列时,发现队列上并不存在消费者,那么这条消息也不会存入队列中。当与路由匹配的所有队列都没有消费者时,该消息会通过Basic.Return命令返回给生产者。接下来分析下源码。
委派给ChannelN。
构造一个AMQCommand实例,然后调用transmit方法。
确保AMQChannel处于开放状态。 然后发起一次伪RPC调用。
(这两个方法在下面的AMQChannel中都有分析到,这里不再详细分析)
对basicPublish(mandatory或者immediate参数设置为true)被调用时,对发送失败的通知。
ReturnListener接口定义如下:
另外一种是添加一个基于lambda的ReturnListener。
ReturnCallback接口定义如下:
Return提供的属性如下:
确保AMQChannel处于开放状态。
monitor:普通的object对象。shutdownCause:ShutdownSignalException。先来看enqueueRpc方法的逻辑。
对原有的RpcContinuation进行封装。然后调用doEnqueueRpc方法。
核心是Supplier#get()方法。
对Method对象进行封装。然后调用quiescingTransmit方法。
获取channel编号、关联的AMQConnection。
AMQCommand -> AMQChannel -> AMQConnection。
实质上是从CommandAssembler获取相关的Frame,然后写入到数据流中。
核心是connection.writeFrame(…)方法。
将Frame写入到流中。
写数据的细节实现。
更新活跃时间。
刷新写入的Frame。
对流进行刷新。