通常的,在Web应用中,客户端发起HTTP请求,访问服务接口,服务端进行处理,并反馈消息,通信即告结束。 但是,当服务端处理过程较长时,情况就会变得复杂。例如,客户端会考虑添加超时机制,即超过指定时间还没有得到服务端反馈时,将会强制终止连接,并认为远程服务不可用。 消息队列(Message Queue)通常作为中间件,通过异步的机制,来解决上述这些实时性要求不高、且耗时较长的接口通信问题。 其优势是,大大缩短客户端的通信等待时间,且能够将请求稳妥地交付给服务端,保证请求的正常执行。 在用法上,通常会独立地创建一个消息队列服务,所有的生产者(客户端和Web应用)都可以往队列中发送消息,同时所有的消费者(Web应用)都可以侦听并读取消息,并进行后续处理。 在这种模式下,消息队列是一个公共的消息中转站,多个应用之间可以实现数据的互通。这种方式下,各个应用与消息队列服务之间是通过TCP协议建立长连接,而并非HTTP。 相对于公共的消息队列服务,在有些场合,也需要采用异步机制,但无需在多个应用之间互通。例如:导出数据记录并保存为excel、执行图片识别任务等。在这些场合下,是否可以考虑不部署单独的消息队列服务,而是嵌入到当前应用中。
官方参考:http://activemq.apache.org/how-do-i-embed-a-broker-inside-a-connection.html。
源代码:WEB-INF/mq-broker.xml,该文件在web.xml中被引用。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.11.0.xsd"> <!-- Create an embedded ActiveMQ Broker --> <!-- http://activemq.apache.org/how-do-i-embed-a-broker-inside-a-connection.html --> <amq:broker brokerName="embedded" dataDirectory="./data" useJmx="false" persistent="true"> <amq:transportConnectors> <amq:transportConnector name="openwire" uri="tcp://localhost:12345" /> </amq:transportConnectors> <amq:plugins> <amq:simpleAuthenticationPlugin> <amq:users> <amq:authenticationUser username="admin" password="774rfv4" groups="admins,publishers,consumers"/> <amq:authenticationUser username="publisher" password="883edc3" groups="publishers,consumers"/> <amq:authenticationUser username="consumer" password="992wsx2" groups="consumers"/> <amq:authenticationUser username="guest" password="001qaz1" groups="guests"/> </amq:users> </amq:simpleAuthenticationPlugin> </amq:plugins> </amq:broker> <import resource="mq.xml"/> </beans>其中需要注意的是, ① 消息队列代理服务由框架来创建,无需代码。 ② 在配置中指定不同的账户和群组,而不是采用系统默认的admin/admin。 ③ 该消息队列支持持久化。
源代码:WEB-INF/mq.xml,该文件在mq-broker.xml中被引用,用来建立与消息队列服务的连接。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd"> <bean id="connectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean"> <property name="jndiName" value="java:comp/env/myJMS/ConnectionFactory"></property> </bean> <!-- P2P模式 --> <bean id="messageQueue" class="org.springframework.jndi.JndiObjectFactoryBean"> <property name="jndiName" value="java:comp/env/myJMS/MessageQueue"></property> </bean> <!-- ### 当代码结构改变时需修改此处类路径 ###--> <bean id="messageDispatcher" class="com.foo.app1.integration.MessageDispatcher"> </bean> <bean id="receiveMessageListener" class="com.foo.app1.integration.ReceiveMessageListener"> <property name="messageDispatcher" ref="messageDispatcher"></property> </bean> <bean id="listenerContainerQueue" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"></property> <property name="destination" ref="messageQueue"></property> <property name="messageListener" ref="receiveMessageListener"></property> </bean> <!-- P2P模式 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"></property> <property name="defaultDestination" ref="messageQueue"></property> </bean> </beans>其中, ① 使用JNDI定义的消息队列连接工厂(connectionFactory)和消息队列(messageQueue)。 ② 定义了一个消息侦听器ReceiveMessageListener, ③ 定义的消息发送模板jmsTemplate。
①中的JNDI资源定义如下: 源代码:conf/server.xml,其中的连接信息和账户信息与创建消息队列代理服务(mq-brokder.xml)的一致。
<Context> …… <Resource name="myJMS/ConnectionFactory" auth="Container" type="org.apache.activemq.ActiveMQConnectionFactory" description="JMS Connection Factory" factory="org.apache.activemq.jndi.JNDIReferenceFactory" brokerURL="tcp://localhost:12345" brokerName="embedded" userName="consumer" password="992wsx2"/> <Resource name="myJMS/MessageQueue" auth="Container" type="org.apache.activemq.command.ActiveMQQueue" description="Foo Queue" factory="org.apache.activemq.jndi.JNDIReferenceFactory" physicalName="Foo.Queue"/> </Context>消息发送的思路是调用消息发送模板jmsTemplate的发送方法(“send”)即可。 源代码:MessageSenderService.java
this.jmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage(msg.toString(TAG0)); return textMessage; } });消息接收的思路是通过消息侦听器ReceiveMessageListener来捕获。 源代码:ReceiveMessageListener.java
public class ReceiveMessageListener implements MessageListener { …… @Override public void onMessage(Message message) { final String TAG0 = Module_Tag+"::onMessage()"; if (message instanceof TextMessage) { //文本消息 final TextMessage msg = (TextMessage) message; String text = null; try { text = msg.getText(); } catch(JMSException e) { this.logger.E(TAG0, e.getMessage() ); } …… } } …… };源代码:pom.xml
<!-- 消息队列组件 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.11.4</version> <type>jar</type> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-jaas</artifactId> <version>5.11.4</version> </dependency>Spring框架的版本为:4.3.25.RELEASE。
虽然官方的指导中并没有涉及在SpringBoot框架下如何实现嵌入式的消息队列代理服务, 但是按照 Spring MVC—>Spring Boot 可以遵照“XML配置代码化”的基本原则,姑且试试。
对于Bean组件,Spring Boot中可以通过配置组件(“@Configuration”注解)来创建。但是明显的,消息队列服务的创建过程不是一个Bean就完事了。 在Spring MVC框架中,通过XML来实现创建,而在Spring Boot中则可以在main函数中动心思。
源代码:Main.java
public static void main(String[] args) throws UnknownHostException { ConfigurableApplicationContext appCtx = SpringApplication.run(Main.class, args); …… try { startActiveMQBroker("tcp://localhost:12345"); } catch (Exception e) { //e.printStackTrace(); System.out.println("启动嵌入式消息队列代理服务异常 ==> " + e.getMessage() ); } }其中“startActiveMQBroker”即是创建消息队列代理服务的方法。
/** * 启动ActiveMQ嵌入式的消息队列代理服务 * @param uri 服务URI * @throws Exception * @see http://activemq.apache.org/how-do-i-embed-a-broker-inside-a-connection.html */ public static void startActiveMQBroker(String uri) throws Exception { BrokerService broker = new BrokerService(); broker.setBrokerName("embedded"); broker.setUseJmx(false); //是否使用持久化,如果要持久化还需设置数据存储适配器(kahadb)和数据目录 broker.setPersistent(true); broker.setDataDirectory("mq-temp-data"); //设置通道连接器 TransportConnector connector = new TransportConnector(); connector.setUri(new URI(uri)); broker.addConnector(connector); //通过设置插件来设置账户和群组 List<AuthenticationUser> users = new ArrayList<AuthenticationUser>(); users.add(new AuthenticationUser("admin", "774rfv4", "admins,publishers,consumers") ); users.add(new AuthenticationUser("publisher", "883edc3", "publishers,consumers") ); users.add(new AuthenticationUser("consumer", "992wsx2", "consumers") ); users.add(new AuthenticationUser("guest", "001qaz1", "guests") ); SimpleAuthenticationPlugin plugin = new SimpleAuthenticationPlugin(); plugin.setUsers(users); broker.setPlugins(new SimpleAuthenticationPlugin[] { plugin }); //启动代理服务 broker.start(); }其实现结果与Spring MVC框架下的相同。
至于客户端的设置,使用Spring Boot的脚手架(spring-boot-starter-activemq)即可。 相关配置:
源代码:application.properties
##消息队列 spring.activemq.broker-url = tcp://localhost:12345 spring.activemq.user = consumer spring.activemq.password = 992wsx2 #目的地类型,false为Queue,true为Topic,默认为false spring.jms.pub-sub-domain = false ##自定义消息队列名称 queue-foo = foo虽然消息队列代理服务创建了,但是目标消息队列还没有创建。 消息队列的创建通过初始化一个Bean组件即可。
源代码:ActiveMQConfig.java
@Configuration public class ActiveMQConfig { /** 队列名称 */ @Value("${queue-foo}") private String queueName; @Bean(name = "fooQueue") public Queue fooQueue() { return new ActiveMQQueue(queueName); } };消息发送的思路和Spring MVC的基本一致, 默认引入的组件既可以是jmsTemplate也可以是jmsMessagingTemplate。 源代码:MessageSenderService.java
@Autowired //private JmsMessagingTemplate jmsMessagingTemplate; private JmsTemplate jmsTemplate; @Resource private Queue fooQueue; …… //this.jmsMessagingTemplate.convertAndSend(fooQueue, msg.toString(TAG0) ); this.jmsTemplate.convertAndSend(fooQueue, msg.toString(TAG0) );其中发送的目标即是(4)中所创建的队列。
消息接收的思路是通过注解@JmsListener来指定捕获JMS消息的方法。 源代码:ReceiveMessageListener.java
@JmsListener(destination = "${queue-foo}") public void receive(TextMessage msg) throws JMSException { final String TAG0 = Module_Tag+"::onMessage()"; String text = null; try { text = msg.getText(); } catch(JMSException e) { this.logger.E(TAG0, e.getMessage() ); } …… }其中通过注解的“destination”参数可以指定监听的队列,这个要比Spring MVC的XML要简化多了。
源代码:pom.xml
<dependency> <!-- 消息队列 --> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <dependency> <!-- ActiveMQ代理服务 --> <groupId>org.apache.activemq</groupId> <artifactId>activemq-broker</artifactId> </dependency> <dependency> <!-- ActiveMQ JAAS配置 --> <groupId>org.apache.activemq</groupId> <artifactId>activemq-jaas</artifactId> </dependency> <dependency> <!-- ActiveMQ持久化存储 --> <groupId>org.apache.activemq</groupId> <artifactId>activemq-kahadb-store</artifactId> </dependency>Spring Boot框架的版本为2.3.0.RELEASE。 相关ActiveMQ的jar文件版本为5.15.12。
嵌入式的消息队列服务适用于接口异步操作的场合,例如:数据记录导出、图像生成、调用第三方的工具服务等。 嵌入模式的优势在于,既不考虑网络通信(哪怕是LAN),又能使用消息队列的功能特性。 出于稳定安全考虑,消息队列建议支持持久化(哪怕是Web应用重启),且采用一定安全机制(ActiveMQ支持的是JAAS)。
(1)有关ActiveMQ jar文件版本导致Spring MVC框架代码冲突的问题,可参见:https://blog.csdn.net/paulorwys/article/details/80368068。 【全文完】