1 安装 下载otp + rabbitmq
2 先安装otp, 再安装rabbitmq
3 访问http://127.0.0.1:15672
如果无法访问:
3.1 检查任务管理器中的服务, rabbitmq是否开启
3.2 调整rabbit管理后台的配置
在cmd模式下,进入rabbitmq的安装根目录切入sbin目录
执行命令: rabbitmq-plugins enable rabbitmq_management
4 使用游客账号登录: guest guest
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1GYXpJ11-1603278679953)(file:///C:\Users\admin\AppData\Roaming\Tencent\Users\1125400943\QQ\WinTemp\RichOle\6IDSYSJS1P}RT$TV4BH}OZG.png)]
RabbitMQ的用户角色分类 none、management、 policymaker、 monitoring、 administrator none 不能访问management plugin
1.用户可以通过AMQP做的任何事外加: 2.列出自己可以通过AMQP登入的virtual hosts 3.查看自己的virtual hosts中的queues, exchanges和bindings 4.查看和关闭自己的channels和connections 5.查看有关自己的virtual hosts的 “全局”的统计信息,包含其他用户在这些virtual, hosts中的活动。
policymaker和monitoring可以做的任何事外加: 创建和删除virtual hosts 查看、创建和删除users 查看创建和删除permissions 关闭其他用户的connections
RabbitMQ会绑定一-些端口,安装完后,需要将这些端口添加至防火墙。 25672 用于RabbitMQ节点间和CLII具通信,配合4369使用。 15672 HTTP_ _API端口,管理员用户才能访问,用于管理RabbitMQ,需要启用management插件。 15674 基于WebSocket的STOMP客户端端口(当 插件Web STOMP启用的时候打开) 15675 基于WebSocket的MQTT客户端端口(当 插件Web MQTT启用的时候打开)
1 解耦
技术层面的解耦: 下单 在同一个service中写, 商品,订单,消费记录. - service… 代码简洁,结构清晰了,可维护性更高了
业务层面的解耦: 下单 – 暂停 – 系统处理能力有限 , 我有条不紊的
事务 – 锁 – 数据库中的记录
添加rabbitmq相关的系统配置
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>系统配置文件:
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: lyj password: lyj 创建队列 : Queue
创建交换机 : DirectExchange(直连型交换机)
将消息队列与交换机进行绑定 : Binding
新建mq的包 新建DirectRabbitConfig类, 一定要加上@Configuration标识为配置类 package com.bdqn.order.util.mq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DirectRabbitConfig { private static Logger log = LoggerFactory.getLogger(DirectRabbitConfig.class); @Autowired private CachingConnectionFactory connectionFactory; //队列 起名:operLogQueue @Bean public Queue operLogQueue() { return new Queue("operLogQueue",true); //true 是否持久 } //Direct交换机 起名:OperLogExchange @Bean DirectExchange OperLogExchange() { return new DirectExchange("OperLogExchange"); } //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting @Bean Binding bindingDirect() { return BindingBuilder.bind(operLogQueue()).to(OperLogExchange()).with("OperLogRouting"); } @Bean public RabbitTemplate rabbitTemplate(){ //若使用confirm-callback或return-callback,必须要配置publisherConfirms或publisherReturns为true //每个rabbitTemplate只能有一个confirm-callback和return-callback,如果这里配置了,那么写生产者的时候不能再写confirm-callback和return-callback //使用return-callback时必须设置mandatory为true,或者在配置中设置mandatory-expression的值为true connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMandatory(true); // /** // * 如果消息没有到exchange,则confirm回调,ack=false // * 如果消息到达exchange,则confirm回调,ack=true // * exchange到queue成功,则不回调return // * exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了) // */ rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause); }else{ log.info("消息发送失败:correlationData({}),ack({}),cause({})",correlationData,ack,cause); } } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message); } }); return rabbitTemplate; } } 消息的生产者, 发送一条新消息
@Autowired RabbitTemplate rabbitTemplate; rabbitTemplate.convertAndSend("OperLogExchange","OperLogRouting", map); 三个参数: 1 : 交换机 2 消息队列绑定的路由键 3 消息的内容发送消息的内容如果包含了自己定的封装类,如 UserInfo (实体, pojo),必须实现 implements Serializable接口
消息的消费者, 监听消息队列, 有新消息就获取, 并删除已获取的消息
// 在某个service上加上注解, 将某个service标识为消息队列的消费者 @RabbitListener(queues = "operLogQueue") // 在这个service中还需绑定消息的处理方法,当消息队列中有新的消息时,将调用绑定的方法 @RabbitHandler ---------------------------------------------------- @Service @RabbitListener(queues = "operLogQueue") public class OperLogServiceImpl implements OperLogService { @Resource OperLogMapper operLogMapper; @Override public int addOperLog(OperLog log) { return operLogMapper.insertSelective(log); } @RabbitHandler public void addMqOperLog(Map map) { UserInfo u = (UserInfo)map.get("user"); OperLog log = new OperLog(); log.setOprType("登录"); log.setOprName(u.getUserName() + "进行登陆"); log.setCreateTime(new Date()); operLogMapper.insertSelective(log); } }5 消息在消费过程中,出现异常了怎么办
1.Queue: 真正储存数据的地方
2.Exchange:接收请求,转存数据
3.Bind:收到请求后储存到哪里
4.消息生产者: 发送数据的应用
5.消息消费者:去除数据处理的应用