要线启动消费者,再启动生产者发送消息。
默认创建的方式就是非持久的订阅者。在客户端重启之后就不会与主题有联系。
持久的订阅者,需要指定ClientID,在客户端重启之后也会保持订阅。
/** * 主题的消费者 * 持久订阅者 * @date 2020/10/7 15:26 */ public class TestDurableConsumer { private static String brokerURL = "tcp://106.12.51.117:61617"; private static String topicName = "topic-Hello"; public static void main(String[] args) throws Exception{ ConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL); Connection connection = factory.createConnection(); // 设置订阅者的id connection.setClientID("yanzu"); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(topicName); // 使用session创建持久的订阅者 TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "myName"); topicSubscriber.setMessageListener(new MessageListener() { @SneakyThrows public void onMessage(Message message) { if (message instanceof TextMessage){ TextMessage textMessage = (TextMessage) message; System.out.println(textMessage.getText()); } } }); System.in.read(); session.close(); connection.close(); System.out.println("主题消息接收完成"); } }JMS的全称是Java Message Service,即Java消息服务,用于在两个应用程序之间,或者分布式系统中发送消息,进行异步通信。
StreamMessage:Java原始的数据流
MapMessage:一套名称:键值对
TestMessage:一个字符串对象
ObjectMessage:一个序列化的Java对象(发送自己的对象必须要放行所有包的安全检查)
factory.setTrustAllPackages(true);BytesMessage:一个字节的数据流
发送和接收消息是 一 一 对应的(发送的是什么类型的,接受时就要转为什么类型的)。
是一种加强的API,如果需要发送除了消息头意外的值,可以使用消息属性。用于识别、去重、重点标记等操作。就像可以分配给一个消息的附加消息头一样,允许开发者添加有关消息不透明的附加信息,它们还用于暴露消息选择器在消息过滤时使用的数据。
xxMessage.setTypeProperty();开启持久化就是指,在重启了activeMQ客户端之后消息还在。
开启持久化方式:
producer.setDeliveryMode(DeliveryMode.PERSISTENT);非持久性:
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);但是对与topic持久化是没有意义的,因为发布订阅模式是先启动再生产,消息已经被消费了,如果再启动生产者后启动订阅者,消息会被当做作废消息。
将第一个参数改为true就表示开启事务,消费者接收消息之后消费完成,队列中的消息没有被标记为消费,再次启动消费者还可以接收到消息。
如果第一个参数为false:消费者接收到消息之后直接完成消费,队列中不再有为消费的信息。
第二个参数为签收方式,指定是手都应答还是自动应答。
如果第一个为true,那么第二个就意义不大。
如果开启就必须提交事务消息才会到MQ里面去。
开启事务
Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE)提交事务,保证全部的消息都发送成功才会调用,应该在全部的消息都发送结束之后调用(再释放资源之前)。
session.commit();事务回滚,应该在抛出异常时调用。
session.rollback();手动签收方法
xxMessage.acknowledge(); 非事务模式下签收: 生产者签收设置为自动,消费者签收设置为自动——正常生产者签收设置为自动,消费者签收设置为手动——重复消费生产者签收设置为手动,消费者签收设置为自动——正常 有事务模式下签收: 生产者签收设置为自动,消费者签收设置为自动,已提交——会出现重复消费生产者签收设置为自动,消费者签收设置为自动,未提交——出现重复消费生产者签收设置为自动,消费者签收设置为手动,未提交,已ACK——出现重复消费生产者签收设置为自动,消费者签收设置为手动,未提交,未ACK——正常生产者签收设置为手动,消费者签收设置为自动,已提交——正常生产者签收设置为手动,消费者签收设置为自动,未提交——出现重复消费生产者签收设置为手动,消费者签收设置为自动,未提交,未ACK——出现重复消费生产者签收设置为手动,消费者签收设置为自动,未提交,已ACK——出现重复消费生产者签收设置为手动,消费者签收设置为手动,已提交,未ACK——正常生产者签收设置为手动,消费者签收设置为手动,已提交,已ACK——正常在事务性会话中,当一个事务被成功提交则消费被自动签收。
如果事务回滚,则消息会被再次传递。
非事务性会话中,消息何时被确认取决于创建会话时的签收模式。
一般使用方法:
生产者为false,要发送多条消为了保证消息同时到达消息队列时才会用true;
消费者一般也为false,除非需要手动签收。
手动签收消费者设置:
Session session = connection.createSession(true,Session.CLIENT_ACKNOWLEDGE); .... // 拿到消息之后 xxMessage.acknowledge();配置文件就是类似于一个Spring的容器配置文件,使用的是jetty服务器,如果要修改端口可用去jetty.xml下进行修改。
相当于一个ActiveMQ的服务器实例。
内嵌服务器:
public static void main(String[] args) { BrokerService brokerService = new BrokerService(); // 设置使用的消息协议 brokerService.setUseJmx(true); // 设置协议形式 try { brokerService.addConnector("tcp://127.0.0.1:61616"); brokerService.start(); } catch (Exception e) { e.printStackTrace(); } System.out.println("内嵌MQ成功"); }