springboot整合rabbitmq实现分布式事务)(3)

it2023-02-03  53

文章目录

1 前置知识2 分布式事务2.1 生产者2.1.1 引入依赖2.1.2yml文件添加配置2.1.3 定义交换机2.1.4 生产者的回调方法(保证消息100%投递)2.1.5 补偿机制(保证消息的100%投递)2.1.6 controller2.1.6 service 2.2 消费者2.2.1 引入依赖2.2.2yml文件添加配置2.2.3定义交换机,路由,死信队列2.2.4消费者监听(幂等性处理)

1 前置知识

1四种交换机的说明

2主流程的讲解

2 分布式事务

2.1 生产者

2.1.1 引入依赖

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

2.1.2yml文件添加配置

server: port: 8071 spring: application: name: biz-publisher datasource: type: com.alibaba.druid.pool.DruidDataSource driverClassName: com.mysql.cj.jdbc.Driver url: jdbc:mysql://127.0.0.1:3306/publisher?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC username: root password: 123456 initialSize: 1 minIdle: 3 maxActive: 20 # 配置获取连接等待超时的时间 maxWait: 60000 # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 timeBetweenEvictionRunsMillis: 60000 # 配置一个连接在池中最小生存的时间,单位是毫秒 minEvictableIdleTimeMillis: 30000 validationQuery: select 'x' testWhileIdle: true testOnBorrow: false testOnReturn: false # 打开PSCache,并且指定每个连接上PSCache的大小 poolPreparedStatements: true maxPoolPreparedStatementPerConnectionSize: 20 # 配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙 filters: stat,wall,slf4j # 通过connectProperties属性来打开mergeSql功能;慢SQL记录 connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000 # 合并多个DruidDataSource的监控数据 #useGlobalDataSourceStat: true rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest #确认消息已发送到交换机(Exchange) #确认消息已发送到队列(Queue) publisher-returns: true # publisher-confirm-type: correlated publisher-confirms: true # cloud: # stream: # rocketmq: # binder: # name-server: 127.0.0.1:9876 # bindings: # ChannelTrans: # producer: # transactional: true # group: groupTrans # bindings: # ChannelTrans: # destination: topicTrans mybatis: configuration: map-underscore-to-camel-case: true mapper-locations: mybatis/**/*Mapper.xml #logging: # level: # root: debug

2.1.3 定义交换机

package fastwave.cloud.demo.publisher.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class TransConfig { public static final String TRANSFER_EXCHANGE_NAME = "TransExchange"; public static final String TRANSFER_ROUTING_KEY = "TransRoutingKey"; @Bean public DirectExchange TransExchange(){ return new DirectExchange(TRANSFER_EXCHANGE_NAME); } }

2.1.4 生产者的回调方法(保证消息100%投递)

package fastwave.cloud.demo.publisher.services; import com.alibaba.fastjson.JSON; import fastwave.cloud.demo.publisher.config.TransConfig; import fastwave.cloud.demo.publisher.domain.MsgLogDO; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.Date; import java.util.Map; import java.util.UUID; @Service public class TransferMQService { @Resource(name = "TemplateTrans") private RabbitTemplate template; @Autowired MsgLogService msgLogService; @PostConstruct private void initRabbitTemplate(){ //设置消息发送确认回调,发送成功后更新消息表状态 template.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause) -> { if(ack){ MsgLogDO msgLogDO = new MsgLogDO(); msgLogDO.setMsgId(correlationData.getId()); msgLogDO.setStatus(1); msgLogService.update(msgLogDO); } }); template.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { Date curTime = new Date(); MsgLogDO msgLogDO = new MsgLogDO(); msgLogDO.setMsgId(UUID.randomUUID().toString()); msgLogDO.setMsg(message.toString()); msgLogDO.setExchange(exchange); msgLogDO.setRoutingKey(routingKey); msgLogDO.setStatus(4); msgLogDO.setTryCount(0); msgLogDO.setCreateTime(curTime); msgLogDO.setUpdateTime(curTime); msgLogDO.setCreateTime(curTime); msgLogService.save(msgLogDO); } }); } public void sendMessage(Map<String, Object> params){ String msg = JSON.toJSONString(params); String transId = params.get("transId").toString(); Message message = MessageBuilder.withBody(msg.getBytes()) .setMessageId(transId) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .build(); template.convertAndSend(TransConfig.TRANSFER_EXCHANGE_NAME,TransConfig.TRANSFER_ROUTING_KEY, message, new CorrelationData(transId)); } }

2.1.5 补偿机制(保证消息的100%投递)

package fastwave.cloud.demo.publisher.job; import fastwave.cloud.demo.publisher.domain.MsgLogDO; import fastwave.cloud.demo.publisher.services.MsgLogService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.HashMap; import java.util.List; import java.util.Map; @Component @EnableScheduling public class MQMessageJob { @Autowired private MsgLogService msgLogService; @Resource(name = "TemplateTrans") private RabbitTemplate reliableTemplate; private static Logger logger = LoggerFactory.getLogger(MQMessageJob.class); //定时扫描记录表,将发送状态为-1且未超重发次数的消息再次发送,超过重发次数,必要时人工干预,生产环境中可以单独部署定时任务 @Scheduled(cron ="10/10 * * * * ?" ) public void scanNoConfirmMsg(){ Map<String, Object> searchParams = new HashMap<String, Object>(); searchParams.put("status", -1); searchParams.put("tryCount", 3); try { List<MsgLogDO> list = msgLogService.list(searchParams); for(MsgLogDO item : list) { MsgLogDO msgLogDO = new MsgLogDO(); msgLogDO.setMsgId(item.getMsgId()); msgLogDO.setTryCount(item.getTryCount() + 1); msgLogService.update(msgLogDO); Message message = MessageBuilder.withBody(item.getMsg().getBytes()).setMessageId(item.getMsgId()).build(); reliableTemplate.convertAndSend(item.getExchange(), item.getRoutingKey(), message, new CorrelationData(item.getMsgId())); } } catch(Exception e) { logger.error("扫描作业出错,信息:" + e.getMessage()); } } }

2.1.6 controller

package fastwave.cloud.demo.publisher.controller; import fastwave.cloud.demo.publisher.services.AccountService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import java.util.Map; @RestController @RequestMapping("trans") public class TransController { @Autowired AccountService accountService; @GetMapping("/transfer") public String transfer(@RequestParam Map<String, Object> params){ try { accountService.transfer(params); } catch (Exception e) { return "转账失败"; } return "转账成功"; } }

2.1.6 service

@Override @Transactional(rollbackFor = Exception.class) public String transfer(Map<String, Object> params) { Integer id1 = Integer.parseInt(params.get("id1").toString()); Integer id2 = Integer.parseInt(params.get("id2").toString()); Float amount = Float.parseFloat(params.get("amount").toString()); String transId = UUID.randomUUID().toString(); params.put("transId", transId); // 1.扣款操作 AccountDO user1 = accountDao.get(id1); user1.setBalance(user1.getBalance() - amount); accountDao.update(user1); // 2.扣款日志 TransDO transDO = new TransDO(); transDO.setId(transId); String message = JSON.toJSONString(params); transDO.setMessage(message); transDao.save(transDO); // 3.保存事务消息,防止发送失败后,可以在作业中重试,另外还可以手工处理 String msg = JSON.toJSONString(params); Date curTime = new Date(); MsgLogDO msgLogDO = new MsgLogDO(); msgLogDO.setMsgId(transId); msgLogDO.setMsg(msg); msgLogDO.setExchange(TransConfig.TRANSFER_EXCHANGE_NAME); msgLogDO.setRoutingKey(TransConfig.TRANSFER_ROUTING_KEY); msgLogDO.setStatus(-1); msgLogDO.setTryCount(0); msgLogDO.setCreateTime(curTime); msgLogDO.setUpdateTime(curTime); msgLogDO.setCreateTime(curTime); msgLogDao.save(msgLogDO); //4.发送消息 rabbitMQService.sendMessage(params); return "转账成功"; }

2.2 消费者

2.2.1 引入依赖

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

2.2.2yml文件添加配置

server: port: 8072 spring: application: name: biz-subscriber datasource: type: com.alibaba.druid.pool.DruidDataSource driverClassName: com.mysql.cj.jdbc.Driver url: jdbc:mysql://127.0.0.1:3306/subscriber?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC username: root password: 123456 initialSize: 1 minIdle: 3 maxActive: 20 # 配置获取连接等待超时的时间 maxWait: 60000 # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 timeBetweenEvictionRunsMillis: 60000 # 配置一个连接在池中最小生存的时间,单位是毫秒 minEvictableIdleTimeMillis: 30000 validationQuery: select 'x' testWhileIdle: true testOnBorrow: false testOnReturn: false # 打开PSCache,并且指定每个连接上PSCache的大小 poolPreparedStatements: true maxPoolPreparedStatementPerConnectionSize: 20 # 配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙 filters: stat,wall,slf4j # 通过connectProperties属性来打开mergeSql功能;慢SQL记录 connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000 # 合并多个DruidDataSource的监控数据 #useGlobalDataSourceStat: true redis: host: localhost port: 6379 # 连接超时时间(毫秒) timeout: 10000 jedis: pool: # 连接池中的最大空闲连接 max-idle: 8 # 连接池中的最小空闲连接 min-idle: 10 # 连接池最大连接数(使用负值表示没有限制) max-active: 100 # 连接池最大阻塞等待时间(使用负值表示没有限制) max-wait: -1 rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest listener: simple: retry: enabled: true max-attempts: 3 initial-interval: 3000 mybatis: configuration: map-underscore-to-camel-case: true mapper-locations: mybatis/**/*Mapper.xml

2.2.3定义交换机,路由,死信队列

package fastwave.cloud.demo.subscriber.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class TransConfig { public static final String TRANSFER_EXCHANGE_NAME = "TransExchange"; public static final String TRANSFER_ROUTING_KEY = "TransRoutingKey"; public static final String TRANSFER_QUEUE_NAME = "TransDirectQueue"; public static final String TRANSFER_DL_EXCHANGE_NAME = "TransDlExchange"; public static final String TRANSFER_DL_ROUTING_KEY = "TransDlRoutingKey"; public static final String TRANSFER_DL_QUEUE_NAME = "TransDlDirectQueue"; @Bean public DirectExchange TransExchange(){ return new DirectExchange(TRANSFER_EXCHANGE_NAME); } @Bean public Queue TransQueue() { return QueueBuilder.durable(TRANSFER_QUEUE_NAME) //配置死信 .withArgument("x-dead-letter-exchange",TRANSFER_DL_EXCHANGE_NAME) .withArgument("x-dead-letter-routing-key",TRANSFER_DL_ROUTING_KEY) .build(); } @Bean Binding bindingTrans() { return BindingBuilder.bind(TransQueue()).to(TransExchange()).with(TRANSFER_ROUTING_KEY); } @Bean public DirectExchange TransDlExchange(){ return new DirectExchange(TRANSFER_DL_EXCHANGE_NAME); } @Bean public Queue TransDlQueue() { return new Queue(TRANSFER_DL_QUEUE_NAME,true); //true 是否持久 } @Bean Binding bindingDlTrans() { return BindingBuilder.bind(TransQueue()).to(TransExchange()).with(TRANSFER_DL_ROUTING_KEY); } }

2.2.4消费者监听(幂等性处理)

package fastwave.cloud.demo.subscriber.controller; import com.alibaba.fastjson.JSONObject; import com.rabbitmq.client.Channel; import fastwave.cloud.demo.subscriber.common.CacheUtil; import fastwave.cloud.demo.subscriber.config.TransConfig; import fastwave.cloud.demo.subscriber.services.AccountService; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.Map; @Component public class TransReceiver { @Autowired CacheUtil cacheUtil; @Autowired AccountService accountService; @RabbitListener(queues = TransConfig.TRANSFER_QUEUE_NAME) public void HandlerMessage(Message message) throws IOException { String messageId = message.getMessageProperties().getMessageId(); if(messageId != null && !cacheUtil.exists(messageId)) { String msg = new String(message.getBody()); Map<String, Object> map = JSONObject.parseObject(msg, Map.class); Integer id2 = Integer.parseInt(map.get("id2").toString()); Float amount = Float.parseFloat(map.get("amount").toString()); accountService.transfer(id2, amount); cacheUtil.set(messageId, true); } else { System.out.println("已消费,不需要重复消费"); } } }
最新回复(0)