RabbitMq

it2025-01-26  12

RabbitMq 消息队列: 用来存放消息的队列

安装,管理后台

1 安装 下载otp + rabbitmq

2 先安装otp, 再安装rabbitmq

3 访问http://127.0.0.1:15672

​ 如果无法访问:

​ 3.1 检查任务管理器中的服务, rabbitmq是否开启

​ 3.2 调整rabbit管理后台的配置

​ 在cmd模式下,进入rabbitmq的安装根目录切入sbin目录

​ 执行命令: rabbitmq-plugins enable rabbitmq_management

4 使用游客账号登录: guest guest

添加用户

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1GYXpJ11-1603278679953)(file:///C:\Users\admin\AppData\Roaming\Tencent\Users\1125400943\QQ\WinTemp\RichOle\6IDSYSJS1P}RT$TV4BH}OZG.png)]

RabbitMQ角色

RabbitMQ的用户角色分类 none、management、 policymaker、 monitoring、 administrator none 不能访问management plugin

management

1.用户可以通过AMQP做的任何事外加: 2.列出自己可以通过AMQP登入的virtual hosts 3.查看自己的virtual hosts中的queues, exchanges和bindings 4.查看和关闭自己的channels和connections 5.查看有关自己的virtual hosts的 “全局”的统计信息,包含其他用户在这些virtual, hosts中的活动。

administrator

policymaker和monitoring可以做的任何事外加: 创建和删除virtual hosts 查看、创建和删除users 查看创建和删除permissions 关闭其他用户的connections

端口

RabbitMQ会绑定一-些端口,安装完后,需要将这些端口添加至防火墙。 25672 用于RabbitMQ节点间和CLII具通信,配合4369使用。 15672 HTTP_ _API端口,管理员用户才能访问,用于管理RabbitMQ,需要启用management插件。 15674 基于WebSocket的STOMP客户端端口(当 插件Web STOMP启用的时候打开) 15675 基于WebSocket的MQTT客户端端口(当 插件Web MQTT启用的时候打开)

消息队列的好处

1 解耦

​ 技术层面的解耦: 下单 在同一个service中写, 商品,订单,消费记录. - service… 代码简洁,结构清晰了,可维护性更高了

​ 业务层面的解耦: 下单 – 暂停 – 系统处理能力有限 , 我有条不紊的

​ 事务 – 锁 – 数据库中的记录

MQ编码:

1 引入mq的jar

​ 添加rabbitmq相关的系统配置

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

系统配置文件:

spring: rabbitmq: host: 127.0.0.1 port: 5672 username: lyj password: lyj

2 写mq的配置类

​ 创建队列 : Queue

​ 创建交换机 : DirectExchange(直连型交换机)

​ 将消息队列与交换机进行绑定 : Binding

新建mq的包 新建DirectRabbitConfig类, 一定要加上@Configuration标识为配置类 package com.bdqn.order.util.mq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DirectRabbitConfig { private static Logger log = LoggerFactory.getLogger(DirectRabbitConfig.class); @Autowired private CachingConnectionFactory connectionFactory; //队列 起名:operLogQueue @Bean public Queue operLogQueue() { return new Queue("operLogQueue",true); //true 是否持久 } //Direct交换机 起名:OperLogExchange @Bean DirectExchange OperLogExchange() { return new DirectExchange("OperLogExchange"); } //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting @Bean Binding bindingDirect() { return BindingBuilder.bind(operLogQueue()).to(OperLogExchange()).with("OperLogRouting"); } @Bean public RabbitTemplate rabbitTemplate(){ //若使用confirm-callback或return-callback,必须要配置publisherConfirms或publisherReturns为true //每个rabbitTemplate只能有一个confirm-callback和return-callback,如果这里配置了,那么写生产者的时候不能再写confirm-callback和return-callback //使用return-callback时必须设置mandatory为true,或者在配置中设置mandatory-expression的值为true connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMandatory(true); // /** // * 如果消息没有到exchange,则confirm回调,ack=false // * 如果消息到达exchange,则confirm回调,ack=true // * exchange到queue成功,则不回调return // * exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了) // */ rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause); }else{ log.info("消息发送失败:correlationData({}),ack({}),cause({})",correlationData,ack,cause); } } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message); } }); return rabbitTemplate; } }

3 发送消息

​ 消息的生产者, 发送一条新消息

@Autowired RabbitTemplate rabbitTemplate; rabbitTemplate.convertAndSend("OperLogExchange","OperLogRouting", map); 三个参数: 1 : 交换机 2 消息队列绑定的路由键 3 消息的内容

发送消息的内容如果包含了自己定的封装类,如 UserInfo (实体, pojo),必须实现 implements Serializable接口

4 获取消息

​ 消息的消费者, 监听消息队列, 有新消息就获取, 并删除已获取的消息

// 在某个service上加上注解, 将某个service标识为消息队列的消费者 @RabbitListener(queues = "operLogQueue") // 在这个service中还需绑定消息的处理方法,当消息队列中有新的消息时,将调用绑定的方法 @RabbitHandler ---------------------------------------------------- @Service @RabbitListener(queues = "operLogQueue") public class OperLogServiceImpl implements OperLogService { @Resource OperLogMapper operLogMapper; @Override public int addOperLog(OperLog log) { return operLogMapper.insertSelective(log); } @RabbitHandler public void addMqOperLog(Map map) { UserInfo u = (UserInfo)map.get("user"); OperLog log = new OperLog(); log.setOprType("登录"); log.setOprName(u.getUserName() + "进行登陆"); log.setCreateTime(new Date()); operLogMapper.insertSelective(log); } }

5 消息在消费过程中,出现异常了怎么办

5核心对象

1.Queue: 真正储存数据的地方

2.Exchange:接收请求,转存数据

3.Bind:收到请求后储存到哪里

4.消息生产者: 发送数据的应用

5.消息消费者:去除数据处理的应用

最新回复(0)