面向消息的中间件,提供可靠的消息传递机制进行与平台无关的数据交流,基于数据通信来进行分布式系统的集成。
通过提供消息传递和消息排队模型在分布式环境下提供应用解耦,弹性收缩,冗余存储,流量削峰,异步通信,数据同步等。
发送者把消息发送给服务器,消息服务器把消息存放在若干队列/主题中,在合适的时候转发给接收者。发送和接收是异步的,无需等待的,发送者的接收者的声明周期也没有关系。在发布/订阅的模式下,可以完成一对多的通信,可以让一条消息有多个接收者(类似于微信公众号)。
用来解应用于应用之间的耦合,Spring是用来解Java类于类的耦合。
是apache出品的,完全支持JMS1.1和J2EE1.4规范的。
特点:
多种语言和协议编写客户端:Java、C、C++、C#、Ruby、Perl、Python、PHP。应用协议:Stomp、OpenWire、REST、WS Notification、XMPP、AMQP对Spring支持,很容易的于Spring项目整合。支持JDBC高速存储。支持异步(ajax)高性能集群,服务器-客户端,点对点。下载地址:apache官网下载,这里下载的是5.15.10版本。下载大的那个,大概60M。
安装:
必须要JDK1.7以上版本,并且配置环境变量。解压完成之后,进入64文件夹(32位系统就进入32位),然后点击activemq.bat启动。启动端口为8161,默认和用户名都是admin。建议直接上docker。
docker search activemq docker pull webcenter/activemq61616 为 容器使用的端口,8161为web界面管理端口,对外为8162。
访问的时候访问8362端口。
docker run -d --name activemq -p 61617:61616 -p 8162:8161 webcenter/activemq点对点的模式:
一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。生产者不需要在接收者接收该消息时处于运行状态,接收者也不需要在生产者发送是处于运行状态,
发布者/订阅者模式:
支持向一个特定的消息主题发布消息,订阅者接收来自特定主题的消息,发布者和消费者都不知道对方。分为临时订阅和持久订阅。临时:订阅者下线就会消失;持久订阅:在订阅者未连接时发布的消息将在订阅者重新连接之后重新发布。
因为是普通的项目(非web项目)所有暂时没有使用Spring相关。
<dependencies> <!-- activeMQ--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.5</version> </dependency> <!-- slf4j--> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.30</version> </dependency> <!-- lombok--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> </dependency> </dependencies>使用异步线程
/** * 异步的队列消费者 * @date 2020/10/7 14:41 */ public class TestSyncConsumer { // 服务端的ip和端口号 private static String brokerUrl = "tcp://106.12.51.117:61617"; // 消息队列的名称 private static String queueName = "First-Message"; public static void main(String[] args) throws JMSException, IOException { // 创建ConnectionFactory对象,并且指定服务端的ip和端口号。 ConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl); // 使用factory对象创建一个connection对象 Connection connection = factory.createConnection(); // 开启连接 connection.start(); // 创建session对象,第一个参数为是否开启事务,第二个为接收者的签收状态。 Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); // 使用session对象创建destination对象 Queue queue = session.createQueue(queueName); // 创建消费者对象 MessageConsumer consumer = session.createConsumer(queue); // 异步消息监听 consumer.setMessageListener(new MessageListener() { @SneakyThrows public void onMessage(Message message) { if (message instanceof TextMessage){ TextMessage textMessage = (TextMessage) message; System.out.println(textMessage.getText()); } } }); // 不让程序结束 System.in.read(); // 关机资源 consumer.close(); session.close(); connection.close(); System.out.println("消息接收成功"); } }每个消费只能有一个消费者,1对1的关系;没有时间上的关联性,无论消费者还是生产者发送消息的时候是否处于运行状态,都可以提取消息。被消费之后在队列中不会再存储,所以消费者不会再消费到已经被消费的消息。