ActiveMQ——Day02

it2025-09-25  4

8. 发布/订阅模式

8.1 主题的生产者

/** * 主题的发送者 * @date 2020/10/7 15:18 */ public class TestProducer { private static String brokerURL = "tcp://106.12.51.117:61617"; private static String topicName = "topic-Hello"; public static void main(String[] args) throws JMSException { ConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(topicName); MessageProducer producer = session.createProducer(topic); for (int i = 0; i <= 10 ; i++) { TextMessage textMessage = session.createTextMessage("topic-message:" + i); producer.send(textMessage); } producer.close(); session.close(); connection.close(); System.out.println("主题消息发送完成"); } }

8.2 主题的消费者

/** * 主题的消费者 * @date 2020/10/7 15:26 */ public class TestConsumer { private static String brokerURL = "tcp://106.12.51.117:61617"; private static String topicName = "topic-Hello"; public static void main(String[] args) throws Exception{ ConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(topicName); MessageConsumer consumer = session.createConsumer(topic); System.out.println("1号消费者"); consumer.setMessageListener(new MessageListener() { @SneakyThrows public void onMessage(Message message) { if (message instanceof TextMessage){ TextMessage textMessage = (TextMessage) message; System.out.println(textMessage.getText()); } } }); System.in.read(); session.close(); connection.close(); System.out.println("主题消息接收完成"); } }

8.3 注意点

要线启动消费者,再启动生产者发送消息。

8.4 与点对点模式的比较

8.5 订阅者

8.5.1 非持久订阅者

默认创建的方式就是非持久的订阅者。在客户端重启之后就不会与主题有联系。

8.5.2 持久订阅者

持久的订阅者,需要指定ClientID,在客户端重启之后也会保持订阅。

/** * 主题的消费者 * 持久订阅者 * @date 2020/10/7 15:26 */ public class TestDurableConsumer { private static String brokerURL = "tcp://106.12.51.117:61617"; private static String topicName = "topic-Hello"; public static void main(String[] args) throws Exception{ ConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL); Connection connection = factory.createConnection(); // 设置订阅者的id connection.setClientID("yanzu"); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(topicName); // 使用session创建持久的订阅者 TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "myName"); topicSubscriber.setMessageListener(new MessageListener() { @SneakyThrows public void onMessage(Message message) { if (message instanceof TextMessage){ TextMessage textMessage = (TextMessage) message; System.out.println(textMessage.getText()); } } }); System.in.read(); session.close(); connection.close(); System.out.println("主题消息接收完成"); } }

9. JMS

JMS的全称是Java Message Service,即Java消息服务,用于在两个应用程序之间,或者分布式系统中发送消息,进行异步通信。

9.3 JMS的组成和特点

JMS Provier 作用:实现JMS接口规范的消息中间件。JMS Producer 作用:消息生产者,创建与发送JMS消息的客户端应用。JMS Consumer 作用:消息消费者,接收和处理JMS消息的客户端应用。JMS Message 作用:消息的载体。

9.4 JMS Message

9.4.1 消息头

JMSDesination:消息发送的目的地,主要指Queue和TopicJMSDeliveyMode:消息的持久模式和非持久模式: 一条持久性的消息应该被传送“仅仅一次”,意味着如果JMS提供者出现问题故障,该消息不会丢失,它会在服务器恢复之后再次传递。一条非持久的消息:最多会传送一次,意味着如果服务器出现故障,这条消息将消失。 JMSExpiration:消息的过期设置,默认为永远不过期。如果消息在过期时间之后还没有到达目的地,那么将会被清除。JMSPriority:消息优先级,0-9个级别;0-4为普通级别,5-9是加急消息。默认为4级。JMSMessageID:唯一识别每个消息的表示;由MQ生成。

9.4.2 消息的载体

StreamMessage:Java原始的数据流

MapMessage:一套名称:键值对

TestMessage:一个字符串对象

ObjectMessage:一个序列化的Java对象(发送自己的对象必须要放行所有包的安全检查)

factory.setTrustAllPackages(true);

BytesMessage:一个字节的数据流

发送和接收消息是 一 一 对应的(发送的是什么类型的,接受时就要转为什么类型的)。

9.4.3 消息属性

是一种加强的API,如果需要发送除了消息头意外的值,可以使用消息属性。用于识别、去重、重点标记等操作。就像可以分配给一个消息的附加消息头一样,允许开发者添加有关消息不透明的附加信息,它们还用于暴露消息选择器在消息过滤时使用的数据。

xxMessage.setTypeProperty();

10. 消息可靠性及持久化

10.1 持久设置

开启持久化就是指,在重启了activeMQ客户端之后消息还在。

开启持久化方式:

producer.setDeliveryMode(DeliveryMode.PERSISTENT);

非持久性:

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

但是对与topic持久化是没有意义的,因为发布订阅模式是先启动再生产,消息已经被消费了,如果再启动生产者后启动订阅者,消息会被当做作废消息。

11. 事务签收说明

11.1 事务

将第一个参数改为true就表示开启事务,消费者接收消息之后消费完成,队列中的消息没有被标记为消费,再次启动消费者还可以接收到消息。

如果第一个参数为false:消费者接收到消息之后直接完成消费,队列中不再有为消费的信息。

第二个参数为签收方式,指定是手都应答还是自动应答。

如果第一个为true,那么第二个就意义不大。

如果开启就必须提交事务消息才会到MQ里面去。

开启事务

Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE)

提交事务,保证全部的消息都发送成功才会调用,应该在全部的消息都发送结束之后调用(再释放资源之前)。

session.commit();

事务回滚,应该在抛出异常时调用。

session.rollback();

11.2 签收

手动签收方法

xxMessage.acknowledge(); 非事务模式下签收: 生产者签收设置为自动,消费者签收设置为自动——正常生产者签收设置为自动,消费者签收设置为手动——重复消费生产者签收设置为手动,消费者签收设置为自动——正常 有事务模式下签收: 生产者签收设置为自动,消费者签收设置为自动,已提交——会出现重复消费生产者签收设置为自动,消费者签收设置为自动,未提交——出现重复消费生产者签收设置为自动,消费者签收设置为手动,未提交,已ACK——出现重复消费生产者签收设置为自动,消费者签收设置为手动,未提交,未ACK——正常生产者签收设置为手动,消费者签收设置为自动,已提交——正常生产者签收设置为手动,消费者签收设置为自动,未提交——出现重复消费生产者签收设置为手动,消费者签收设置为自动,未提交,未ACK——出现重复消费生产者签收设置为手动,消费者签收设置为自动,未提交,已ACK——出现重复消费生产者签收设置为手动,消费者签收设置为手动,已提交,未ACK——正常生产者签收设置为手动,消费者签收设置为手动,已提交,已ACK——正常

11.3 签署和事务的关系

在事务性会话中,当一个事务被成功提交则消费被自动签收。

如果事务回滚,则消息会被再次传递。

非事务性会话中,消息何时被确认取决于创建会话时的签收模式。

一般使用方法:

生产者为false,要发送多条消为了保证消息同时到达消息队列时才会用true;

消费者一般也为false,除非需要手动签收。

手动签收消费者设置:

Session session = connection.createSession(true,Session.CLIENT_ACKNOWLEDGE); .... // 拿到消息之后 xxMessage.acknowledge();

12. 配置文件与broker

配置文件就是类似于一个Spring的容器配置文件,使用的是jetty服务器,如果要修改端口可用去jetty.xml下进行修改。

12.1 broker多配置文件

相当于一个ActiveMQ的服务器实例。

内嵌服务器:

public static void main(String[] args) { BrokerService brokerService = new BrokerService(); // 设置使用的消息协议 brokerService.setUseJmx(true); // 设置协议形式 try { brokerService.addConnector("tcp://127.0.0.1:61616"); brokerService.start(); } catch (Exception e) { e.printStackTrace(); } System.out.println("内嵌MQ成功"); }
最新回复(0)