最近在研究消息中间件,看了好多资料,个人建议大家应该好好学一下,推荐看阿里推出的RocketMQ,好处不用说了,阿里出品,必属精品。在这里为啥推荐呢?主要是因为消息中间件无论你是做啥项目,做什么业务,消息中间件的身影随处可见。解耦,异步,削峰流量限制,等等一系列的好处。而且最主要的是这是国内开源的交由apache组织的顶级国内开源项目,源代码是java所写,这里就不介绍Rocket相比其他中间件的优劣势。放在以后,以后Rocketmq我会作为一个栏目持续更新。当然,前提是有我摸鱼时间,最近太忙了,摸鱼时间都少了,好了废话不多说了,开搞!
RocketMq 三大角色 1.nameserver 称为路由中心。(可以理解注册中心,zk的作用,RocketMQ之前的版本也是用的zk,后面由于架构等方面的原因,而放弃了,后面会说道具体原因) 2.broker 消息的处理者。 3.Topic 一个虚拟概念,区别去AMQ中的topic概念。 ☺生产者,消费者我就不啰嗦了
消息中间件的设计思路一般基于主题的订阅发布机制,消息生产者(Producer)发送某一主题的消息到消息服务器(Broker),消息服务器负责该消息的持久化存储,消息消费者(Consumer)订阅感兴趣的主题,消息服务器根据订阅信息(路由信息)将消息推送到消费者(Push模式,其实Rocketmq 无论是pull,push 其实最终都是pull模式,采用长轮询机制,什么是长轮询后面有时间会将下,区别于长连接,短连接)或者消息消费者主动向消息服务器拉取消息(Pull模式),从而实现生产者消费者之间的解耦。 通过上面的解释,大家很容易的就理解了整个消息的发送过程,但是上面的结构是脆弱的,在如今高并发的情况下,单点故障的处理措施是必不可少的。所以就衍生出多个消息服务器来承担消息的存储,用来保证服务的高可用,消息的零丢失等各种消息中间件的情况。但是一旦引入了这么多服务器,那么服务器的状态,信息之间是如何通信的?而他又是如何让生产者,消费者感知到的呢?如果其中的一台消息服务器宕机了,生产者和消费者又是如何知道的呢?NameServer就是用来解决上述的问题的。
RocketMQ的逻辑部署图如上图所示。在Broker消息服务器启动时向所有的NameServer注册,消息生产者(Producer)在发送消息之前先从NameServer获取服务器地址列表,然后根据负载算法从列表中选择一个消息服务器进行消息的发送。NameServer与每个Broker服务器保持长连接,并间隔10s检测Broker是否存活,如果检测Broker宕机,则从路由注册表中将其剔除。但是路由变化不会马上通知消息生产者?为什么,主要是降低了复杂度,保证了高可用,也就是AP(这就是后面不用zk,因为zk在一致性上做的还是相当不错的),而且namserver之间是无状态的,彼此互相不通信,所以需要broker集群依次向NameServer集群注册各自的节点信息,由于nameServer中无维护了broker等相关的信息,为了避免线程安全问题,内部引入了读写锁,用来保证当前这种读多写少的情况,将写请求串行化,当然会存在消息不一致,但是RocketMQ保证了消息至少被消费一次,所以如果当前消息路由信息没找到,会自动触发重试!
从上面的代码大致可以看出NameServer启动就是讲NameServerConfig,NettyServer-Config 的相关参数通过配置文件,或者启动命令行的参数填充,然后初始化NameServerController
加载相关配置后,创建NettyServer对象,然后开启两个定时任务,也就是心跳检测。 定时任务1:NameServer每隔10s扫描一次Broker,判断Broker上次放松心跳的时间是否距当前时间已经超出,是则剔除 定时任务2:NameServer每个10分钟打印一次配置信息
public boolean initialize() { this.kvConfigManager.load(); this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); this.registerProcessor(); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES); if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) { // Register a listener to reload SslContext try { fileWatchService = new FileWatchService( new String[] { TlsSystemConfig.tlsServerCertPath, TlsSystemConfig.tlsServerKeyPath, TlsSystemConfig.tlsServerTrustCertPath }, new FileWatchService.Listener() { boolean certChanged, keyChanged = false; @Override public void onChanged(String path) { if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) { log.info("The trust certificate changed, reload the ssl context"); reloadServerSslContext(); } if (path.equals(TlsSystemConfig.tlsServerCertPath)) { certChanged = true; } if (path.equals(TlsSystemConfig.tlsServerKeyPath)) { keyChanged = true; } if (certChanged && keyChanged) { log.info("The certificate and private key changed, reload the ssl context"); certChanged = keyChanged = false; reloadServerSslContext(); } } private void reloadServerSslContext() { ((NettyRemotingServer) remotingServer).loadSslContext(); } }); } catch (Exception e) { log.warn("FileWatchService created error, can't load the certificate dynamically"); } } return true; }NameServer主要的作用是为消息生产者和消息的消费者提供关于主题Topic的路由信息,所以NameServer就需要存储路由的基础信息,还有能够管理Broker的节点,包括路由注册,路由删除等功能
补充相关概念: RocketMQ 基于订阅发布机制,一个Topic可以拥有多个消息队列,一个Broker为每一主题默认创建4个读队列,4个写队列。多个Broker组成一个机器,BrokerName相同的多台Broker组成M-S架构,brokerId 0 为 主,大于0 为从。
RocketMQ路由注册是通过Broker与NameServer的心跳功能实现的。Broker启动时向集群中所有的NameServer发送心跳语句,每个30s向NameServer发送心跳包,NameServer收到Broker心跳包时会更新BrokerLiveTable中的lastUpdateTimestamp,然后NameServer 每隔10s扫描brokerLiveTable,如果连续120s没有收到心跳包,就是剔除该broker的路由信息,并且关闭socket连接。
发送心跳包,注册broker相关数据
路由注册需要加写锁,防止并发修改RouteInfoManager中的路由表。首先判断Broker所属集群是否存在,如果不存在,则创建,然后将Broker加入到集合中
根据上面的介绍,相信大家已经知道路由删除是什么情况了,这里就简单介绍下,不列举代码了。Broker每隔30s向NameServer发送一个心跳包,心跳包中包含BrokerId,Broker地址,Broker名称,Broker所属集群名称,Broker关联的FilterServer列表。但是如果Broker宕机了,NameServer无法收到心跳包,此时NameServer就会剔除失效的Broker.NameServer会每隔10s扫描BrokerLiveTable状态表,如果BrokerLivede LastUpdateTimeStamp的时间戳距当前时间超过120是,则认为Broker失效,移除该Broker,并更新TopicQueueTbale,brokerAddrTable,brokerLiveTable,filterServerTable 这几个集合中的关联数据。
RocketMQ的路由发现是非实时的,当Topic路由出现变化后,NameServer不主动推送给客户端,而是由客户端定时拉去主题最新的路由。
主要介绍了NameServer路由功能,路由注册,路由发现,删除,已经很晚了,得睡觉了,要不明天又得迟到了,文中提到的不对的点,或者有疑问没补充到的欢迎留言,未完待续 QAQ
欢迎关注我的微信公众号 【猿之村】 来聊聊Java面试 加我的微信进一步交流和学习,微信手动搜索 【codeyuanzhicunup】添加即可 如有相关技术问题欢迎留言探讨,公众号主要用于技术分享,包括常见面试题剖析、以及源码解读、微服务框架、技术热点等。