依赖:
<!-- Version properties shared by the dependencies below. -->
<properties>
    <activemq.version>5.15.5</activemq.version>
    <slf4j.version>1.7.30</slf4j.version>
    <lombok.version>1.18.12</lombok.version>
    <spring.version>5.2.7.RELEASE</spring.version>
</properties>
<dependencies>
    <!-- spring-jms -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <!-- lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
    </dependency>
    <!-- activemq-pool: connection pool -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-pool</artifactId>
        <version>${activemq.version}</version>
    </dependency>
    <!-- activemq-all -->
    <!-- NOTE(review): activemq-all is presumably an aggregate jar; confirm it does not
         overlap with activemq-pool / slf4j-api declared here. -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>${activemq.version}</version>
    </dependency>
    <!-- slf4j -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
</dependencies>
队列配置:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring context for the queue example: broker connection, queue destination and JmsTemplate. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.1.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
    <!-- Connection factory for the broker -->
    <amq:connectionFactory id="jmsFactory" brokerURL="tcp://106.12.51.117:61617"/>
    <!-- Connection pool wrapping the connection factory -->
    <!-- NOTE(review): this pool is declared, but jmsTemplate below references jmsFactory
         directly, so nothing in this file uses the pool. Confirm whether that is intended. -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="connectionFactory" ref="jmsFactory"/>
        <!-- Maximum number of connections -->
        <property name="maxConnections" value="10"/>
    </bean>
    <!-- Destination -->
    <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- Queue name injected through the constructor argument -->
        <constructor-arg name="name" value="hello-spring-queue"/>
    </bean>
    <!-- Object used to operate on the MQ -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- Connection factory -->
        <property name="connectionFactory" ref="jmsFactory"/>
        <!-- Default destination for sent messages -->
        <property name="defaultDestination" ref="destinationQueue"/>
        <!-- Message converter -->
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>
</beans>
队列消息生产者:
/** * 生产者 * @date 2020/10/8 14:44 */ public class TestProducer { public static void main(String[] args) { ApplicationContext context = new ClassPathXmlApplicationContext("classpath:application-queue.xml"); JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class); // 发送到默认的目的地(在Xml中配置的) jmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage("mySpringMQ"); textMessage.setStringProperty("flag","加急"); return textMessage; } }); System.out.println("发送完成"); } }队列消息消费者:
/** * 消息接收(消费)者 * @date 2020/10/8 15:02 */ public class TestConsumer { public static void main(String[] args) { ApplicationContext context = new ClassPathXmlApplicationContext("classpath:application-queue.xml"); JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class); // 重新指定队列名称;不指定为xml中设置的默认的。 jmsTemplate.setDefaultDestination(new ActiveMQQueue("new Queue")); // 接收消息 Object object = jmsTemplate.receiveAndConvert(); System.out.println(object); } }application-topic.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring context for the topic example: broker connection, topic destination and JmsTemplate. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.1.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
    <!-- Connection factory for the broker -->
    <amq:connectionFactory id="jmsFactory" brokerURL="tcp://106.12.51.117:61617"/>
    <!-- Connection pool wrapping the connection factory -->
    <!-- NOTE(review): this pool is declared, but jmsTemplate below references jmsFactory
         directly, so nothing in this file uses the pool. Confirm whether that is intended. -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="connectionFactory" ref="jmsFactory"/>
        <!-- Maximum number of connections -->
        <property name="maxConnections" value="10"/>
    </bean>
    <!-- Destination -->
    <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <!-- Topic name injected through the constructor argument -->
        <constructor-arg name="name" value="hello-spring-topic"/>
    </bean>
    <!-- Object used to operate on the MQ -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- Connection factory -->
        <property name="connectionFactory" ref="jmsFactory"/>
        <!-- Default destination for sent messages -->
        <property name="defaultDestination" ref="destinationTopic"/>
        <!-- Message converter -->
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>
</beans>
主题发送(生产者)者:
/** * 生产者 * @date 2020/10/8 14:44 */ public class TestProducer { public static void main(String[] args) { ApplicationContext context = new ClassPathXmlApplicationContext("classpath:application-topic.xml"); JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class); // 发送到默认的目的地(在Xml中配置的) jmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage("mySpringMQ-topic"); textMessage.setStringProperty("flag","加急"); return textMessage; } }); System.out.println("发送完成"); } }主题消费者:
/**
 * Topic consumer: loads {@code application-topic.xml}, receives one message from the
 * template's default destination and prints its text.
 *
 * <p>NOTE(review): presumably this consumer has to be running before the producer publishes,
 * since no durable subscription is configured here; confirm against the broker setup.
 *
 * @date 2020/10/8 15:02
 */
public class TestConsumer {
    /**
     * @param args unused
     * @throws JMSException if the text of the received message cannot be read
     */
    public static void main(String[] args) throws JMSException {
        // try-with-resources closes the context once the message has been printed.
        try (ClassPathXmlApplicationContext context =
                     new ClassPathXmlApplicationContext("classpath:application-topic.xml")) {
            JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
            TextMessage textMessage = (TextMessage) jmsTemplate.receive();
            System.out.println(textMessage.getText());
        }
    }
}
配置文件(在第一个队列的基础上添加了后面关于监听器的配置)
<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring context for the listener example: the queue setup plus a message listener container. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.1.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
    <!-- Connection factory for the broker -->
    <amq:connectionFactory id="jmsFactory" brokerURL="tcp://106.12.51.117:61617"/>
    <!-- Connection pool wrapping the connection factory -->
    <!-- NOTE(review): this pool is declared, but jmsTemplate and the listener container below
         reference jmsFactory directly, so nothing in this file uses the pool. Confirm intent. -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="connectionFactory" ref="jmsFactory"/>
        <!-- Maximum number of connections -->
        <property name="maxConnections" value="10"/>
    </bean>
    <!-- Destination -->
    <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- Queue name injected through the constructor argument -->
        <constructor-arg name="name" value="hello-spring-queue"/>
    </bean>
    <!-- Object used to operate on the MQ -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- Connection factory -->
        <property name="connectionFactory" ref="jmsFactory"/>
        <!-- Default destination for sent messages -->
        <property name="defaultDestination" ref="destinationQueue"/>
        <!-- Message converter -->
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>
    <!-- Listener object -->
    <bean id="myListener" class="com.spring.activemq.test.queue.MyListener"/>
    <!-- Queue listener container: wires myListener to destinationQueue -->
    <bean id="queueListenerContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsFactory"/>
        <property name="destination" ref="destinationQueue"/>
        <property name="messageListener" ref="myListener"/>
    </bean>
</beans>
实现的监听器对象:
/**
 * Message listener that prints the body of every text message it is handed.
 * Messages of any other type are ignored.
 *
 * @date 2020/10/8 15:21
 */
public class MyListener implements MessageListener {
    /**
     * Prints the text of {@code message} when it is a {@link TextMessage}.
     *
     * @param message the incoming JMS message
     */
    @Override
    public void onMessage(Message message) {
        // Only text messages are handled.
        if (!(message instanceof TextMessage)) {
            return;
        }
        final TextMessage incoming = (TextMessage) message;
        try {
            final String body = incoming.getText();
            System.out.println("监听器收到消息:" + body);
        } catch (JMSException ex) {
            // Reading the body failed: the stack trace is printed and the message is dropped.
            ex.printStackTrace();
        }
    }
}
生产者:
/** * 生产者 * 异步 * @date 2020/10/8 14:44 */ public class TestProducerSync { public static void main(String[] args) { ApplicationContext context = new ClassPathXmlApplicationContext("classpath:application-queue-listener.xml"); JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class); // 发送到默认的目的地(在Xml中配置的) jmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage("mySpringMQ"); textMessage.setStringProperty("flag","加急"); return textMessage; } }); System.out.println("发送完成"); } }依赖:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Build descriptor for the Spring Boot ActiveMQ demo. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.spring-boot.activemq</groupId>
    <artifactId>activemq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>activemq</name>
    <description>Demo project for Spring Boot</description>

    <!-- Single source of truth for the Java level and the Spring Boot version. -->
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.3.0.RELEASE</spring-boot.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <!-- Dependency versions come from the imported Spring Boot BOM. -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <!-- Reuse the java.version property instead of repeating the literal. -->
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <!-- Same version as the imported BOM, so the two cannot drift apart. -->
                <version>${spring-boot.version}</version>
                <configuration>
                    <mainClass>com.springboot.activemq.ActivemqApplication</mainClass>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
配置文件:
# Broker connection and JMS settings for the Spring Boot demo.
spring:
  activemq:
    broker-url: tcp://106.12.51.117:61617
    user: admin
    password: admin
    pool:
      # NOTE(review): presumably pool settings only apply when spring.activemq.pool.enabled
      # is true and a pooling library is on the classpath; confirm against the Boot docs.
      max-connections: 100
  jms:
    # false means queues, true means topics. Switch this value to send to a topic (or a queue);
    # with this single setting the two cannot be used at the same time.
    pub-sub-domain: false
## Queue name
myqueue: my-SpringBoot-queue
# Topic name
mytopic: my-SpringBoot-topic
在启动文件上开启JMS
/**
 * Application entry point; {@code @EnableJms} is declared here so that
 * {@code @JmsListener} methods are picked up.
 */
@SpringBootApplication
@EnableJms
public class ActivemqApplication {
    public static void main(String[] args) {
        SpringApplication.run(ActivemqApplication.class, args);
    }
}
消费者
/**
 * Consumer that listens on the destination named by the {@code myqueue} property
 * and prints each text message it receives.
 *
 * @date 2020/10/8 15:46
 */
@Component
public class QueueConsumer {

    /**
     * Prints the body of a received message.
     *
     * @param textMessage the message delivered by the listener container
     */
    @JmsListener(destination = "${myqueue}")
    public void receive(TextMessage textMessage) throws JmsException {
        try {
            final String body = textMessage.getText();
            System.out.println("接收到消息:" + body);
        } catch (JMSException ex) {
            // Reading the body failed: the stack trace is printed and the message is dropped.
            ex.printStackTrace();
        }
    }
}
生产者
/**
 * Producer that sends a timestamped text payload to the injected {@link Queue}.
 *
 * <p>NOTE(review): the {@code Queue} bean is injected but its definition is not shown
 * alongside this class; confirm that a configuration class provides it.
 *
 * @date 2020/10/8 15:46
 */
@Component
public class QueueProducer {
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Autowired
    private Queue queue;

    /**
     * Sends {@code "message:" + <current epoch millis>} to the queue.
     */
    public void produceMsg() {
        // System.currentTimeMillis() yields the same value as new Date().getTime()
        // without allocating a Date.
        jmsMessagingTemplate.convertAndSend(queue, "message:" + System.currentTimeMillis());
    }
}
这里在测试类中直接启动生产者就行,消费者的监听器会立即收到消息。
/**
 * Queue test: triggers the producer once inside a Spring Boot test context.
 *
 * @date 2020/10/8 15:52
 */
@SpringBootTest
public class QueueConsumerTest {
    @Autowired
    QueueProducer producer;

    /** Sends a single message through {@link QueueProducer#produceMsg()}. */
    @Test
    public void demo(){
        producer.produceMsg();
    }
}
需要在主启动类添加注解:
@EnableScheduling定时发送:
/** * 定时发送生产者 * @date 2020/10/8 15:59 */ @Component public class TimingQueueProducer { @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Autowired private Queue queue; @SuppressWarnings("depercation") // 延时发送单位为毫秒 @Scheduled(fixedDelay = 2000) public void produceMsg(){ jmsMessagingTemplate.convertAndSend(queue,"定时发送的message:"+new Date().getTime()); } }然后启动项目,会每隔2秒发送一次。
修改yml
jms:
  # false means queues, true means topics. Switch this value to send to a topic (or a queue);
  # with this single setting the two cannot be used at the same time.
  pub-sub-domain: true
配置:
/**
 * Topic configuration. {@code @EnableJms} has to be enabled on the application's
 * startup class.
 *
 * @date 2020/10/8 15:43
 */
@Component
public class TopicConfig {

    /** Topic name read from the {@code mytopic} property. */
    @Value("${mytopic}")
    private String topicName;

    /**
     * Exposes the topic destination as a bean.
     *
     * @return an {@code ActiveMQTopic} built from the configured name
     */
    @Bean
    public Topic topic() {
        final Topic destination = new ActiveMQTopic(topicName);
        return destination;
    }
}
消费者:
/**
 * Consumer that listens on the destination named by the {@code mytopic} property
 * and prints each text message it receives.
 *
 * @date 2020/10/8 15:46
 */
@Component
public class TopicConsumer {

    /**
     * Prints the body of a received message.
     *
     * @param textMessage the message delivered by the listener container
     */
    @JmsListener(destination = "${mytopic}")
    public void receive(TextMessage textMessage) throws JmsException {
        try {
            final String body = textMessage.getText();
            System.out.println("接收到消息-myTopic:" + body);
        } catch (JMSException ex) {
            // Reading the body failed: the stack trace is printed and the message is dropped.
            ex.printStackTrace();
        }
    }
}
生产者
/**
 * Producer that sends a timestamped text payload to the injected {@link Topic}.
 *
 * @date 2020/10/8 16:05
 */
@Component
public class TopicProducer {
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Autowired
    private Topic topic;

    /**
     * Sends {@code "myTopicMessage:" + <current epoch millis>} to the topic.
     */
    public void produceMsg() {
        // System.currentTimeMillis() yields the same value as new Date().getTime()
        // without allocating a Date.
        jmsMessagingTemplate.convertAndSend(topic, "myTopicMessage:" + System.currentTimeMillis());
    }
}
测试类:
/**
 * Topic test: triggers the topic producer once inside a Spring Boot test context.
 *
 * @date 2020/10/8 15:52
 */
@SpringBootTest
public class TopicConsumerTest {
    @Autowired
    TopicProducer producer;

    /** Sends a single message through {@link TopicProducer#produceMsg()}. */
    @Test
    public void demo(){
        producer.produceMsg();
    }
}
如果要进行测试,需要先启动整个项目再运行测试类。
其实这是一个web端发送到普通项目的小demo,但是基本没啥区别(除了使用JmsTemplate),所以这里就只粘贴代码了。
生产者(web端项目):
package com.activemq.web.webproducer.controller;

import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import java.util.HashMap;
import java.util.Map;

/**
 * REST endpoints that publish a fixed text message to a queue or to a topic.
 *
 * <p>The destination is passed to each {@code send} call instead of being stored on the
 * shared {@link JmsTemplate} with {@code setDefaultDestination}: the template is a single
 * shared instance, so mutating it per request would let concurrent requests overwrite each
 * other's destination.
 *
 * @date 2020/10/8 17:03
 */
@RestController
@RequestMapping("msg")
public class SendController {
    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * Sends {@code "hello-jar-queue"} to the queue {@code queue-hello-web}.
     *
     * @return a map with a single {@code success} entry
     */
    @RequestMapping("sendQueue")
    public Map<String, Object> sendQueue() {
        jmsTemplate.send(new ActiveMQQueue("queue-hello-web"),
                session -> session.createTextMessage("hello-jar-queue"));
        return successResponse();
    }

    /**
     * Sends {@code "hello-jar-topic"} to the topic {@code topic-hello-web}.
     *
     * @return a map with a single {@code success} entry
     */
    @RequestMapping("sendTopic")
    public Map<String, Object> sendTopic() {
        // The destination must be an ActiveMQTopic, not an ActiveMQTempTopic.
        jmsTemplate.send(new ActiveMQTopic("topic-hello-web"),
                session -> session.createTextMessage("hello-jar-topic"));
        return successResponse();
    }

    /** Builds the response body shared by both endpoints. */
    private Map<String, Object> successResponse() {
        HashMap<String, Object> map = new HashMap<String, Object>();
        map.put("success", "发送成功");
        return map;
    }
}
消费者(普通项目):
这里还是只能接收一种(topic或者是queue),具体是哪一种取决于配置文件中 pub-sub-domain 的取值。
/**
 * Consumer with one listener method for {@code queue-hello-web} and one for
 * {@code topic-hello-web}; each prints the text of the message it receives.
 *
 * @date 2020/10/8 17:11
 */
@Component
public class MsgConsumer {

    /**
     * Prints a message received on {@code queue-hello-web}.
     *
     * @param textMessage the message delivered by the listener container
     */
    @JmsListener(destination = "queue-hello-web")
    public void receiveQueue(TextMessage textMessage) throws JmsException {
        try {
            final String body = textMessage.getText();
            System.out.println("接收到队列消息:" + body);
        } catch (JMSException ex) {
            // Reading the body failed: the stack trace is printed and the message is dropped.
            ex.printStackTrace();
        }
    }

    /**
     * Prints a message received on {@code topic-hello-web}.
     *
     * @param textMessage the message delivered by the listener container
     */
    @JmsListener(destination = "topic-hello-web")
    public void receiveTopic(TextMessage textMessage) throws JmsException {
        try {
            final String body = textMessage.getText();
            System.out.println("接收到主题消息:" + body);
        } catch (JMSException ex) {
            // Reading the body failed: the stack trace is printed and the message is dropped.
            ex.printStackTrace();
        }
    }
}
生产者没有变换(基于上方的JmsTemplate项目代码)。
消费者:
需要自己配置一个监听器容器工厂(DefaultJmsListenerContainerFactory),用来监听topic发送的信息。
监听器容器工厂:
package com.activemq.jar.activemqjar.config; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.listener.DefaultMessageListenerContainer; import javax.jms.ConnectionFactory; /** * @date 2020/10/8 18:36 */ @Configuration public class JmsContainerFactoryConfig { @Bean("jmsTopicContainerFactory") // 参数这里 有可能会爆红,但是并不影响运行。 public DefaultJmsListenerContainerFactory jmsTopicContainerFactory(ConnectionFactory connectionFactory){ DefaultJmsListenerContainerFactory jmsFactory = new DefaultJmsListenerContainerFactory(); jmsFactory.setConnectionFactory(connectionFactory); // 设置对主题的监听 jmsFactory.setPubSubDomain(true); return jmsFactory; } }监听的方法:
/**
 * Consumer with a listener for the queue {@code queue-hello-web} and a listener for the
 * topic {@code topic-hello-web}; the topic listener names its own container factory.
 */
@Component
public class MsgConsumer {

    /**
     * Prints a message received on {@code queue-hello-web}.
     *
     * @param textMessage the message delivered by the listener container
     * @throws JMSException if the message text cannot be read
     */
    @JmsListener(destination = "queue-hello-web")
    public void receiveQueue(TextMessage textMessage) throws JMSException {
        final String body = textMessage.getText();
        System.out.println("接收到队列消息:" + body);
    }

    /**
     * Prints a message received on {@code topic-hello-web}. The second annotation attribute
     * selects the container factory bean named {@code jmsTopicContainerFactory}.
     *
     * @param textMessage the message delivered by the listener container
     * @throws JMSException if the message text cannot be read
     */
    @JmsListener(destination = "topic-hello-web", containerFactory = "jmsTopicContainerFactory")
    public void receiveTopic(TextMessage textMessage) throws JMSException {
        final String body = textMessage.getText();
        System.out.println("接收到主题消息:" + body);
    }
}
这样在消费者的配置文件中,设置为false就行,好像默认也是false。
jms:
  # Use this to switch between listening on topics and on queues.
  pub-sub-domain: false