在ActiveMQ的配置文件中默认支持五种传输协议,分别是:1. openwire(tcp) 2.amqp 3.stomp 4.mqtt 5.ws。在更改协议时,必须保证ActiveMQ没有运行。
TCP传输允许客户端使用TCP套接字段,远程连接到ActiveMQ代理,这些配置了JMS的客户端的连接URL字符串或在代理的传输连接器URI上调整客户端上的底层TCP传输。
TCP是默认的broker配置,TCP的Client监听端口61616,在网络传输协议前,必须要序列化数据,消息是通过一个叫 wire protocol的来序列化成字节流,默认情况下ActiveMQ把wire protocol叫做OpenWire,它的目的是促使网络上的效率和数据快速交互。
优点:
高效性:字节流方式传递有效性、可用性:应用广泛、支持任何平台。与常规的TCP传输相似。它使用的是NIO API 实现,可以帮助提高扩展性,不用将Java NIO包与IBM的AIO4J包混淆。
更加侧重于底层的访问操作,允许开发人员对同一资源可有更多的client调用和服务端有更多的负载。
配置NIO:
打开配置文件 conf/activemq.xml,找到添加:
<transprotConnectot name="nio" uri="nio://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>这里要注意端口不能和其它的协议的端口冲突。
传输时只需要将要传输到的地址前面的tcp改为nio即可。
ActiveMQ的持久化机制包括JDBC、KahaDB、LevelDB。
打开配置文件 conf/activemq.xml。
默认使用的是KahaDB。
KahaDB是基于文件的本地数据库存储形式,速度没有ActiveMQ快,但是有很强的扩展性,恢复的时间比ActiveMQ短很多,并且支持事务。
带**“ * ”**的重要属性
在ActiveMQ5.6版本之后之后,又推出了LavalDB的持久化引擎,持久性高于KahaDB,但是暂时并不推荐使用。
把消息的相关信息存到MySQL中。
需要先把MySQL的jar包放在ActiveMQ的lib目录下。
进入配置文件conf/activemq.xml,修改为:
<persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds"/> </persistenceAdapter>配置MySQL连接池(在broker标签外部):
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destory-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost:3306/mydata?useSSL=false"/> <property name="username" value="root"/> <property name="password" value="root"/> <property name="poolPreparedStatements" value="true"/> </bean>activemq_acks:用于存储订阅关系,如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存。
字段:
container:消息的destinationsub_dest:如果是static集群,这个字段会有其它集群系统的信息client_id:订阅者的唯一IDsub_name:订阅者名称selector:选择器,可以选择满足条件的消息last_acked_id:记录消费过消息的idactive_lock:在集群环境中才有用,只有一个broker可以获得消息,称为Master Broker,其它的只能作为备份等待Master Broker不可用,才可以成为下一个Master Broker,这个表用于记录哪个Broker是当前的Master Broker。
active_msgs: 用于存储消息,包括Topic和Queue。
字段
id:自增数据库主键container:消息的destinationmsgid_prod:消息发送者的主键msg_seq:消息发送的顺序,msgid_prod+msg_seq可以组成jms的messageIdexpiration:消息过期时间msg:消息本体的Java序列化对象的二进制数据priority:优先级 0-9 越大越优先activemq_acks:用于存储订阅关系,如果是持久化topic,订阅者和服务器的订阅关系在这个表保存。一旦消息被消费就会在MySQL中删除。
不会立即存储到数据库中,如果7-10分钟之后消息还存在(没有被消费)再存到数据库中。提高了工作效率。
配置(conf/activemq.xml):
<persistenceFactory> <journalPersistenceAdapterFactory journalLogFiles="4" journalLogFileSize="32768" useJournal="true" useQuickJournal="true" dataSource="#mysql-ds" dataDirectory="activemq-data"/> </persistenceFactory>再ActiveMQ5.9之后推荐使用。
用来保证高可用+负载均衡。
官方推荐使用Zookeeper集群搭建消息队列集群。由于zookeeper内部的选举机制推荐至少三台的奇数台消息队列服务器。
即使集群之后也只有一个消息队列在使用,其它的都是处于待命状态,等待第一个宕机之后再顶替它的位置。
在发送非事务持久化消息的时候默认使用的是同步发送模式。send方法将会阻塞,直到broker发送一个确认信息给生产者,这个确认消息暗示生产者broker已经成功将它发送的消息路由到目标目的并把消息保存到二级存储中。能够保证消息不被丢失。
使用不同的模式对send方法的反应时间有巨大的影响,可以提高系统的性能,默认情况下是以异步发送的,除非在没有使用事务的情况下。
开启异步发送设置:
ActiveMQConneciontFactory factory = new ActiveMQConnectionFactory("ACTIVEMQ_UROL"); factory.setUseAsynSend(true);可能会丢失数据的场景:在持续发送过程中服务器突然宕机,尚未被发送到MQ的消息都会丢失,正确的异步发送是需要回调的。
回调:需要带上AsyncCallback的方法,并且需要重写onSuccess方法和onException方法。
onSuccess:表示消息成功的发送到MQ上,并且接收到了MQ持久化的回调。onException:一个入MQ队异常的回执。测试:
/** * 异步发送生产者 * @date 2020/10/11 10:00 */ public class TestAsyncProducer { private static String brokerUrl = "tcp://106.12.51.117:61617"; private static String queueName = "First-Message"; public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl); // 开启异步发送 factory.setUseAsyncSend(true); factory.setTrustAllPackages(true); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(queueName); ActiveMQMessageProducer producer = (ActiveMQMessageProducer)session.createProducer(queue); for (int i = 0; i < 10; i++) { final TextMessage textMessage = session.createTextMessage(); textMessage.setStringProperty("msgId","Id--"+i); textMessage.setText("Hello ActiveMQ"+i); producer.send(textMessage, new AsyncCallback() { public void onException(JMSException exception) { try { System.out.println("出现异常"+textMessage.getStringProperty("msgId")); } catch (JMSException e) { e.printStackTrace(); } } public void onSuccess() { try { System.out.println("消息发送成功"+textMessage.getStringProperty("msgId")); } catch (JMSException e) { e.printStackTrace(); } } }); } producer.close(); session.close(); connection.close(); } }通过在配置文件中将broker标签的schedulerSupport属性设置为true可以启用延时投递。
/** * 延时发送生产者 * @date 2020/10/11 10:27 */ public class TestSchedulerProducer3 { private static String brokerUrl = "tcp://106.12.51.117:61617"; private static String queueName = "Message-scheduler"; public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl); // 开启异步发送 factory.setUseAsyncSend(true); factory.setTrustAllPackages(true); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(queueName); ActiveMQMessageProducer producer = (ActiveMQMessageProducer)session.createProducer(queue); TextMessage textMessage = session.createTextMessage(); // 设置延时投递(10秒) textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,10*1000); // 投递间隔 5 秒 textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD,5*1000); // 重复投递次数 10 次 textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT,10); // 设置cron表达式 textMessage.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON,""); textMessage.setText("demo-scheduler"); producer.send(textMessage); producer.close(); session.close(); connection.close(); System.out.println("发送成功"); } }表达式类型:是系统的定时表达式类型,并不是Spring的。
指的是消息可以重新分派给消费者,不一定是之前的消费者,重发消息之后,消费者可以重新消费,重发的情况:
事务会话中,还没有进行session.commit()之前,进行session.rollback(),还没有提交的消息都会重新发送。使用客户端手动确认的方式,还未进行确认并且执行session.recover(),那么还没有acknowledge的消息都会重新发送。未ack的消息,当进行session.closed()关闭事务,所有还没有ack的消息broker端都会马上进行重发。消息被消费者拉取之后,超时没有响应ack,消息会被broker重新发送。1和2的情况消息会重发给原来的消费者,3和4的情况可以转发消息给别的消费者,超过最大重发次数的消息都会进入死信队列。默认最大重发次数是6次。
属性:
普通项目配置:
// 设置重发规则 RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); // 是否每次尝试发送失败后,增长等待时间 redeliveryPolicy.setUseExponentialBackOff(true); // 重发次数 redeliveryPolicy.setMaximumRedeliveries(10); // 重发间隔单位为毫秒 redeliveryPolicy.setInitialRedeliveryDelay(1000); // 第一次失败后重新发送之前等待500毫秒,第二次失败等待n*500毫秒。 redeliveryPolicy.setBackOffMultiplier(2); // 最大延时 redeliveryPolicy.setMaximumRedeliveryDelay(1000);Spring:
SpringBoot:
当消息被重发了多次(超过了最大重发次数,默认为6),之后就会被移入“死信队列”。
死信队列,用来存储处理失败或过期的消息。
一般在开发都会设置两个队列,一个是核心队列另一个是死信队列。
配置死信队列,进入conf/activemq.xml,添加
<policyEntry queue=">"> <deadLerrerStrategy> <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessage="true"/> <shareDeadLetterStrategy processExpired="false"/> </deadLerrerStrategy> </policyEntry>配置说明:
queuePrefix代表死信队列的前缀,死信队列的名称为:DLQ.队列名称。useQueueForTopicMessage表示是否把topic的DeadLetter保存到Queue中,默认为true。processExpired表示是否把过期的消息放入死信队列,默认为true。网络延时传输中,会造成MQ的重试,在重试过程中,可能会触发重复消费问题。
如果消息是做数据库插入操作,给这个消息做一个唯一主键,那么就算出现重复消费的情况,也会有主键冲突,避免数据库出现脏数据。
也可以准备一个第三方来做消费记录,给消息分配一个全局的id,当开始消费前,先查询有没有消费记录。
用户注册后,需要发送注册邮件和注册短信,传统做法:
并行:发送注册邮件的同时发送注册短信;等到任务都完成返回客户端,这样消耗的时间稍微短。串行:发送完毕注册邮件之后再发送注册短信;等待任务完成返回客户端,这样就需要双倍的时间。但是这样的方式性能会有瓶颈。可以引入消息队列,将邮件,短信直接写入消息队列。速度会快很多。
普遍的引用会在两个系统之间互相调用,如果被调用的系统无法访问则失败。
引入消息队列:
将成功的消息写入消息队列,返回用户成功。另一个系统读取写入的信息,再次进行操作。
秒杀活动:在前端引入消息队列,可以控制活动访问的人数,可以缓解短时间内高流量压垮应用。
用户的请求,服务器接收后,首先写入消息队列。假如队列长度超过最大数量,则直接抛弃用户请求或跳转到错误界面。
秒杀业务根据消息队列中的请求消息,再做后续处理。
将消息队列用在日志处理中,解决大量日志传输的问题。
消息队列一般都内置了高效的通信机制,因此也可以用在消息通信中,实现点对点的消息队列或者聊天室等。
