【rocketMQ初级与进阶】2、docker安装rocketMQ(单机版)

it2026-03-19  2

因项目业务需要,加入消息队列技术。之前使用 activeMQ,但考虑再三,最后还是选择 阿里系的rocketMQ,比较强大,先更新安装,后期再更新rocket知识、应用场景、集群、rocketMQ源码分析等。

测试开发采取单机模式,后期将根据业务需求可能采取集群部署方式,使用docker docker-compose.yml方式进行安装。rocketMQ 镜像是使用 apacherocketmq/rocketmq:4.5.0官方镜像,rocketMQ可视化界面也是用apacherocketmq/rocketmq-console:2.0.0官方镜像。

 

1、环境版本

docker version

注意:启动 RocketMQ Server + Broker + Console 至少需要 2G 内存。但是我在文件里配置了

JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m;

 

创建文件:

/data

rocketmq

        broker

               logs

               store

config

       broker.conf

docker-compose.yml

 

2、docker-compose.yml 文件

version: '3.7' services: # Service for nameserver namesrv: image: apacherocketmq/rocketmq:4.5.0 # 官方镜像 container_name: rmqnamesrv ports: - 9876:9876 volumes: # 本地目录:指定一个容器内的路径,Docker会自动创建一个数据卷。 - /data/rocketmq/broker/logs:/home/rocketmq/rocketmq-4.5.0/logs - /data/rocketmq/broker/store:/home/rocketmq/rocketmq-4.5.0/store command: sh mqnamesrv # Service for broker broker: image: apacherocketmq/rocketmq:4.5.0 # 官方镜像 container_name: rmqbroker links: - namesrv ports: - 10909:10909 - 10911:10911 - 10912:10912 volumes: # 本地目录:指定一个容器内的路径,Docker会自动创建一个数据卷。 - /data/rocketmq/broker/logs:/home/rocketmq/rocketmq-4.5.0/logs - /data/rocketmq/broker/store:/home/rocketmq/rocketmq-4.5.0/store - /data/rocketmq/config/broker.conf:/home/rocketmq/rocketmq-4.5.0/conf/broker.conf command: sh mqbroker -n namesrv:9876 -c /home/rocketmq/rocketmq-4.5.0/conf/broker.conf autoCreateTopicEnable=true # 使用我们自己创建的broker.conf - Dcom.rocketmq.sendMessageWithVIPChannel=false environment: - NAMESRV_ADDR=namesrv:9876 - JAVA_HOME=/usr/lib/jvm/jre # Java home - JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m # rocketmq-console-ng console: image: apacherocketmq/rocketmq-console:2.0.0 # 官方镜像 container_name: rocketmq-console-ng restart: always ports: - 7100:8080 # 映射端口:实际端口 depends_on: - namesrv environment: - JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876 - Dcom.rocketmq.sendMessageWithVIPChannel=false

2、 broker.conf

# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. brokerClusterName = DefaultCluster brokerName = broker-b brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH autoCreateTopicEnable=true # Set self-defined brokerIP address (e.g. the host node's) brokerIP1=公网ip namesrvAddr=公网ip:9876

3、启动

docker-compose up -d

浏览器输入网址:公网ip:7100  , docker-compose.yml 文件中 rocketmq-console-ng 配置对外映射地址是7100,可改其他。

 

 

 注意!!!如果,docker-compose up启动后,修改配置文件。不能直接 docker rm 这样数据就没有了。我们可以这样操作:

 docker-compose build # 修改文件后,执行这条命令 docker-compose up -d # 再次重启,就生效了,而且之前的数据还在!

 

4、producer发送消息

public static void main(String[] args) throws Exception, MQBrokerException { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.指定Nameserver地址 producer.setNamesrvAddr("公网ip:9876"); //3.启动producer producer.start(); //4.创建消息对象,指定主题Topic、Tag和消息体 /** * 参数一:消息主题Topic * 参数二:消息Tag * 参数三:消息内容 */ Message msg = new Message("broker-b", "Tag3", ("Hello rockerMQ ,单向消息").getBytes()); //5.发送单向消息 producer.sendOneway(msg); //6.关闭生产者producer producer.shutdown(); }

5、  consumer消费

public static void main(String[] args) throws Exception { //1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.指定Nameserver地址 consumer.setNamesrvAddr("公网ip:9876"); //3.订阅主题Topic和Tag , || 表达式 consumer.subscribe("broker-b", "*"); //4.设置回调函数,处理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { //接受消息内容 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("Topic" + msg.getTopic() + ",consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5.启动消费者consumer consumer.start(); }

6、准备后期更新

rocketMQ 与 springboot 结合开发rocketMQ应用场景rocketMQ技术原理rocketMQ集群搭建rocketMQ源码分析与技术架构

 

 

最新回复(0)