文章目录
1 前置知识
2 分布式事务
2.1 生产者
2.1.1 引入依赖
2.1.2 yml 文件添加配置
2.1.3 定义交换机
2.1.4 生产者的回调方法(保证消息 100% 投递)
2.1.5 补偿机制(保证消息的 100% 投递)
2.1.6 controller
2.1.6 service
2.2 消费者
2.2.1 引入依赖
2.2.2 yml 文件添加配置
2.2.3 定义交换机、路由、死信队列
2.2.4 消费者监听(幂等性处理)
1 前置知识
1四种交换机的说明
2主流程的讲解
2 分布式事务
2.1 生产者
2.1.1 引入依赖
<!-- Spring Boot starter for AMQP (RabbitMQ) messaging -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.1.2 yml 文件添加配置
# Configuration of the producer service (biz-publisher).
# NOTE(review): the nesting below is reconstructed from key names; confirm it
# against the real application.yml.
server:
  port: 8071

spring:
  application:
    name: biz-publisher

  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driverClassName: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/publisher?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC
    username: root
    password: 123456
    initialSize: 1
    minIdle: 3
    maxActive: 20
    # Maximum time to wait when acquiring a connection
    maxWait: 60000
    # Interval between checks for idle connections that should be closed, in milliseconds
    timeBetweenEvictionRunsMillis: 60000
    # Minimum time a connection stays alive in the pool, in milliseconds
    minEvictableIdleTimeMillis: 30000
    validationQuery: select 'x'
    testWhileIdle: true
    testOnBorrow: false
    testOnReturn: false
    # Enable PSCache and set its size per connection
    poolPreparedStatements: true
    maxPoolPreparedStatementPerConnectionSize: 20
    # Filters used for monitoring statistics; without them the monitoring UI
    # cannot collect SQL statistics. 'wall' is the SQL firewall filter.
    filters: stat,wall,slf4j
    # Enable mergeSql and slow-SQL logging through connectionProperties
    connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
    # Merge the monitoring data of multiple DruidDataSource instances
    #useGlobalDataSourceStat: true

  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    # Confirm that the message has been delivered to the exchange (Exchange)
    # Confirm that the message has been delivered to the queue (Queue)
    publisher-returns: true
    # publisher-confirm-type: correlated
    publisher-confirms: true

#  cloud:
#    stream:
#      rocketmq:
#        binder:
#          name-server: 127.0.0.1:9876
#        bindings:
#          ChannelTrans:
#            producer:
#              transactional: true
#              group: groupTrans
#      bindings:
#        ChannelTrans:
#          destination: topicTrans

mybatis:
  configuration:
    map-underscore-to-camel-case: true
  # NOTE(review): this pattern may have lost path separators (e.g. mybatis/**/*Mapper.xml) — confirm.
  mapper-locations: mybatis*Mapper.xml

#logging:
#  level:
#    root: debug
2.1.3 定义交换机
package fastwave
.cloud
.demo
.publisher
.config
;
import org
.springframework
.amqp
.core
.*
;
import org
.springframework
.context
.annotation
.Bean
;
import org
.springframework
.context
.annotation
.Configuration
;
/**
 * Producer-side RabbitMQ declarations for the transfer flow.
 *
 * <p>This class declares only the exchange; it declares no queue and no binding.
 */
@Configuration
public class TransConfig {

    /** Name of the direct exchange declared by {@link #TransExchange()}. */
    public static final String TRANSFER_EXCHANGE_NAME = "TransExchange";

    /** Routing key constant for transfer messages. */
    public static final String TRANSFER_ROUTING_KEY = "TransRoutingKey";

    /**
     * Declares the direct exchange for transfer messages.
     *
     * @return a {@link DirectExchange} named {@link #TRANSFER_EXCHANGE_NAME}
     */
    @Bean
    public DirectExchange TransExchange() {
        final DirectExchange transferExchange = new DirectExchange(TRANSFER_EXCHANGE_NAME);
        return transferExchange;
    }
}
2.1.4 生产者的回调方法(保证消息100%投递)
package fastwave
.cloud
.demo
.publisher
.services
;
import com
.alibaba
.fastjson
.JSON
;
import fastwave
.cloud
.demo
.publisher
.config
.TransConfig
;
import fastwave
.cloud
.demo
.publisher
.domain
.MsgLogDO
;
import org
.springframework
.amqp
.core
.Message
;
import org
.springframework
.amqp
.core
.MessageBuilder
;
import org
.springframework
.amqp
.core
.MessageDeliveryMode
;
import org
.springframework
.amqp
.rabbit
.connection
.CorrelationData
;
import org
.springframework
.amqp
.rabbit
.core
.RabbitTemplate
;
import org
.springframework
.beans
.factory
.annotation
.Autowired
;
import org
.springframework
.stereotype
.Service
;
import javax
.annotation
.PostConstruct
;
import javax
.annotation
.Resource
;
import java
.util
.Date
;
import java
.util
.Map
;
import java
.util
.UUID
;
/**
 * Publishes transfer messages through the {@code TemplateTrans} {@link RabbitTemplate} and
 * records broker feedback in the message log.
 *
 * <p>Two callbacks are installed on the template after injection:
 * <ul>
 *   <li>a confirm callback that, on a positive ack, updates the message-log row whose id equals
 *       the correlation id to status {@code 1};</li>
 *   <li>a return callback that stores a new message-log row with status {@code 4} for every
 *       message the broker hands back.</li>
 * </ul>
 * NOTE(review): the meaning of the numeric status codes is defined outside this class —
 * presumably 1 = confirmed and 4 = returned/unroutable; confirm against the MsgLog schema.
 */
@Service
public class TransferMQService {

    @Resource(name = "TemplateTrans")
    private RabbitTemplate template;

    @Autowired
    MsgLogService msgLogService;

    /** Registers the confirm and return callbacks on the injected template. */
    @PostConstruct
    private void initRabbitTemplate() {
        template.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause) -> {
            // A nack is not handled here, and without correlation data there is no
            // message id to update, so both cases leave the message log untouched.
            if (!ack || correlationData == null) {
                return;
            }
            MsgLogDO msgLogDO = new MsgLogDO();
            msgLogDO.setMsgId(correlationData.getId());
            msgLogDO.setStatus(1);
            msgLogService.update(msgLogDO);
        });

        template.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText,
                                        String exchange, String routingKey) {
                Date curTime = new Date();
                MsgLogDO msgLogDO = new MsgLogDO();
                // The returned message is stored under a fresh id, not under its original message id.
                msgLogDO.setMsgId(UUID.randomUUID().toString());
                msgLogDO.setMsg(message.toString());
                msgLogDO.setExchange(exchange);
                msgLogDO.setRoutingKey(routingKey);
                msgLogDO.setStatus(4);
                msgLogDO.setTryCount(0);
                msgLogDO.setCreateTime(curTime);
                msgLogDO.setUpdateTime(curTime);
                msgLogService.save(msgLogDO);
            }
        });
    }

    /**
     * Serializes {@code params} to JSON and publishes it as a persistent message to
     * {@link TransConfig#TRANSFER_EXCHANGE_NAME} with {@link TransConfig#TRANSFER_ROUTING_KEY}.
     *
     * <p>The value of {@code "transId"} is used both as the AMQP message id and as the
     * correlation id handed to the confirm callback.
     *
     * @param params transfer parameters; must contain a non-null {@code "transId"} entry
     * @throws IllegalArgumentException if {@code "transId"} is missing or null
     */
    public void sendMessage(Map<String, Object> params) {
        Object transIdValue = params.get("transId");
        if (transIdValue == null) {
            throw new IllegalArgumentException("params must contain a non-null \"transId\"");
        }
        String transId = transIdValue.toString();
        String msg = JSON.toJSONString(params);

        // Encode explicitly as UTF-8 so the payload does not depend on the JVM default charset.
        Message message = MessageBuilder
                .withBody(msg.getBytes(java.nio.charset.StandardCharsets.UTF_8))
                .setMessageId(transId)
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .build();

        template.convertAndSend(
                TransConfig.TRANSFER_EXCHANGE_NAME,
                TransConfig.TRANSFER_ROUTING_KEY,
                message,
                new CorrelationData(transId));
    }
}
2.1.5 补偿机制(保证消息的100%投递)
package fastwave
.cloud
.demo
.publisher
.job
;
import fastwave
.cloud
.demo
.publisher
.domain
.MsgLogDO
;
import fastwave
.cloud
.demo
.publisher
.services
.MsgLogService
;
import org
.slf4j
.Logger
;
import org
.slf4j
.LoggerFactory
;
import org
.springframework
.amqp
.core
.Message
;
import org
.springframework
.amqp
.core
.MessageBuilder
;
import org
.springframework
.amqp
.rabbit
.connection
.CorrelationData
;
import org
.springframework
.amqp
.rabbit
.core
.RabbitTemplate
;
import org
.springframework
.beans
.factory
.annotation
.Autowired
;
import org
.springframework
.scheduling
.annotation
.EnableScheduling
;
import org
.springframework
.scheduling
.annotation
.Scheduled
;
import org
.springframework
.stereotype
.Component
;
import javax
.annotation
.Resource
;
import java
.util
.HashMap
;
import java
.util
.List
;
import java
.util
.Map
;
/**
 * Scheduled compensation job that re-publishes messages found in the message log.
 *
 * <p>On each run it queries {@link MsgLogService#list} with {@code status = -1} and
 * {@code tryCount = 3}, bumps the try count of every returned row and sends the stored
 * payload again through the {@code TemplateTrans} template.
 * NOTE(review): how the {@code tryCount} search parameter is interpreted (presumably as an
 * upper bound on retries) is decided by the mapper, which is not visible here — confirm.
 */
@Component
@EnableScheduling
public class MQMessageJob {

    private static final Logger logger = LoggerFactory.getLogger(MQMessageJob.class);

    @Autowired
    private MsgLogService msgLogService;

    @Resource(name = "TemplateTrans")
    private RabbitTemplate reliableTemplate;

    /**
     * Scans the message log and re-sends every matching message.
     *
     * <p>Any exception ends the current run; it is logged with its stack trace and not rethrown.
     */
    @Scheduled(cron = "10/10 * * * * ?")
    public void scanNoConfirmMsg() {
        Map<String, Object> searchParams = new HashMap<String, Object>();
        searchParams.put("status", -1);
        searchParams.put("tryCount", 3);
        try {
            List<MsgLogDO> list = msgLogService.list(searchParams);
            for (MsgLogDO item : list) {
                // Record the attempt before sending.
                MsgLogDO msgLogDO = new MsgLogDO();
                msgLogDO.setMsgId(item.getMsgId());
                msgLogDO.setTryCount(item.getTryCount() + 1);
                msgLogService.update(msgLogDO);

                // Encode explicitly as UTF-8 so the payload does not depend on the JVM default charset.
                Message message = MessageBuilder
                        .withBody(item.getMsg().getBytes(java.nio.charset.StandardCharsets.UTF_8))
                        .setMessageId(item.getMsgId())
                        .build();
                reliableTemplate.convertAndSend(
                        item.getExchange(),
                        item.getRoutingKey(),
                        message,
                        new CorrelationData(item.getMsgId()));
            }
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is kept in the log.
            logger.error("扫描作业出错,信息:" + e.getMessage(), e);
        }
    }
}
2.1.6 controller
package fastwave
.cloud
.demo
.publisher
.controller
;
import fastwave
.cloud
.demo
.publisher
.services
.AccountService
;
import org
.springframework
.beans
.factory
.annotation
.Autowired
;
import org
.springframework
.web
.bind
.annotation
.GetMapping
;
import org
.springframework
.web
.bind
.annotation
.RequestMapping
;
import org
.springframework
.web
.bind
.annotation
.RequestParam
;
import org
.springframework
.web
.bind
.annotation
.RestController
;
import java
.util
.Map
;
/**
 * REST entry point for transfers, mapped under {@code trans}.
 */
@RestController
@RequestMapping("trans")
public class TransController {

    @Autowired
    AccountService accountService;

    /**
     * Delegates the request parameters to {@link AccountService#transfer} and reports the
     * outcome as plain text.
     *
     * @param params all request parameters of the call
     * @return {@code "转账成功"} when the service call completes, {@code "转账失败"} when it
     *         throws any {@link Exception}
     */
    @GetMapping("/transfer")
    public String transfer(@RequestParam Map<String, Object> params) {
        String outcome;
        try {
            accountService.transfer(params);
            outcome = "转账成功";
        } catch (Exception ignored) {
            // The exception is discarded; the caller only sees the failure text.
            outcome = "转账失败";
        }
        return outcome;
    }
}
2.1.6 service
/**
 * Debits the source account, stores the transfer record and the outgoing message-log row,
 * then publishes the transfer message.
 *
 * <p>Reads {@code "id1"} (source account), {@code "id2"} (target account) and
 * {@code "amount"} from {@code params}, and adds a freshly generated {@code "transId"}
 * to {@code params} before serializing it.
 *
 * <p>NOTE(review): the message is published while the database transaction is still open,
 * so a rollback after the send does not recall the message — confirm this is acceptable.
 * NOTE(review): the balance is computed with {@code Float}; no check for sufficient funds
 * is made here.
 *
 * @param params request parameters; must contain {@code "id1"}, {@code "id2"} and {@code "amount"}
 * @return the fixed text {@code "转账成功"}
 * @throws IllegalArgumentException if a required parameter is missing or the source account
 *         does not exist
 * @throws NumberFormatException if an id or the amount is not numeric
 */
@Override
@Transactional(rollbackFor = Exception.class)
public String transfer(Map<String, Object> params) {
    Integer id1 = Integer.parseInt(requiredParam(params, "id1"));
    // The target id is not used on this side; parsing it rejects malformed requests early.
    Integer.parseInt(requiredParam(params, "id2"));
    Float amount = Float.parseFloat(requiredParam(params, "amount"));

    String transId = UUID.randomUUID().toString();
    params.put("transId", transId);

    AccountDO user1 = accountDao.get(id1);
    if (user1 == null) {
        throw new IllegalArgumentException("source account not found: id1=" + id1);
    }
    user1.setBalance(user1.getBalance() - amount);
    accountDao.update(user1);

    // The same JSON payload is stored in the transfer record and in the message log.
    String message = JSON.toJSONString(params);

    TransDO transDO = new TransDO();
    transDO.setId(transId);
    transDO.setMessage(message);
    transDao.save(transDO);

    Date curTime = new Date();
    MsgLogDO msgLogDO = new MsgLogDO();
    msgLogDO.setMsgId(transId);
    msgLogDO.setMsg(message);
    msgLogDO.setExchange(TransConfig.TRANSFER_EXCHANGE_NAME);
    msgLogDO.setRoutingKey(TransConfig.TRANSFER_ROUTING_KEY);
    msgLogDO.setStatus(-1);
    msgLogDO.setTryCount(0);
    msgLogDO.setCreateTime(curTime);
    msgLogDO.setUpdateTime(curTime);
    msgLogDao.save(msgLogDO);

    rabbitMQService.sendMessage(params);
    return "转账成功";
}

/** Returns the string form of a required request parameter, rejecting missing or null values. */
private static String requiredParam(Map<String, Object> params, String key) {
    Object value = params.get(key);
    if (value == null) {
        throw new IllegalArgumentException("missing required parameter: " + key);
    }
    return value.toString();
}
2.2 消费者
2.2.1 引入依赖
<!-- Spring Boot starter for AMQP (RabbitMQ) messaging -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.2.2 yml 文件添加配置
# Configuration of the consumer service (biz-subscriber).
# NOTE(review): the nesting below is reconstructed from key names; confirm it
# against the real application.yml.
server:
  port: 8072

spring:
  application:
    name: biz-subscriber

  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driverClassName: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/subscriber?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC
    username: root
    password: 123456
    initialSize: 1
    minIdle: 3
    maxActive: 20
    # Maximum time to wait when acquiring a connection
    maxWait: 60000
    # Interval between checks for idle connections that should be closed, in milliseconds
    timeBetweenEvictionRunsMillis: 60000
    # Minimum time a connection stays alive in the pool, in milliseconds
    minEvictableIdleTimeMillis: 30000
    validationQuery: select 'x'
    testWhileIdle: true
    testOnBorrow: false
    testOnReturn: false
    # Enable PSCache and set its size per connection
    poolPreparedStatements: true
    maxPoolPreparedStatementPerConnectionSize: 20
    # Filters used for monitoring statistics; without them the monitoring UI
    # cannot collect SQL statistics. 'wall' is the SQL firewall filter.
    filters: stat,wall,slf4j
    # Enable mergeSql and slow-SQL logging through connectionProperties
    connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
    # Merge the monitoring data of multiple DruidDataSource instances
    #useGlobalDataSourceStat: true

  redis:
    host: localhost
    port: 6379
    # Connection timeout, in milliseconds
    timeout: 10000
    jedis:
      pool:
        # Maximum number of idle connections in the pool
        max-idle: 8
        # Minimum number of idle connections in the pool
        # NOTE(review): min-idle (10) is larger than max-idle (8) — confirm the intended values.
        min-idle: 10
        # Maximum number of connections in the pool (a negative value means no limit)
        max-active: 100
        # Maximum time to block waiting for a connection (a negative value means no limit)
        max-wait: -1

  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
        retry:
          enabled: true
          max-attempts: 3
          initial-interval: 3000

mybatis:
  configuration:
    map-underscore-to-camel-case: true
  # NOTE(review): this pattern may have lost path separators (e.g. mybatis/**/*Mapper.xml) — confirm.
  mapper-locations: mybatis*Mapper.xml
2.2.3 定义交换机、路由、死信队列
package fastwave
.cloud
.demo
.subscriber
.config
;
import org
.springframework
.amqp
.core
.*
;
import org
.springframework
.context
.annotation
.Bean
;
import org
.springframework
.context
.annotation
.Configuration
;
/**
 * Consumer-side RabbitMQ topology for the transfer flow: the business exchange and queue,
 * plus the dead-letter exchange and queue that the business queue points to.
 */
@Configuration
public class TransConfig {

    /** Name of the business direct exchange. */
    public static final String TRANSFER_EXCHANGE_NAME = "TransExchange";

    /** Routing key that binds the business queue to the business exchange. */
    public static final String TRANSFER_ROUTING_KEY = "TransRoutingKey";

    /** Name of the business queue. */
    public static final String TRANSFER_QUEUE_NAME = "TransDirectQueue";

    /** Name of the dead-letter direct exchange. */
    public static final String TRANSFER_DL_EXCHANGE_NAME = "TransDlExchange";

    /** Routing key that binds the dead-letter queue to the dead-letter exchange. */
    public static final String TRANSFER_DL_ROUTING_KEY = "TransDlRoutingKey";

    /** Name of the dead-letter queue. */
    public static final String TRANSFER_DL_QUEUE_NAME = "TransDlDirectQueue";

    /** Declares the business direct exchange. */
    @Bean
    public DirectExchange TransExchange() {
        return new DirectExchange(TRANSFER_EXCHANGE_NAME);
    }

    /**
     * Declares the durable business queue and configures its dead-letter exchange and
     * dead-letter routing key through queue arguments.
     */
    @Bean
    public Queue TransQueue() {
        return QueueBuilder.durable(TRANSFER_QUEUE_NAME)
                .withArgument("x-dead-letter-exchange", TRANSFER_DL_EXCHANGE_NAME)
                .withArgument("x-dead-letter-routing-key", TRANSFER_DL_ROUTING_KEY)
                .build();
    }

    /** Binds the business queue to the business exchange. */
    @Bean
    Binding bindingTrans() {
        return BindingBuilder.bind(TransQueue()).to(TransExchange()).with(TRANSFER_ROUTING_KEY);
    }

    /** Declares the dead-letter direct exchange. */
    @Bean
    public DirectExchange TransDlExchange() {
        return new DirectExchange(TRANSFER_DL_EXCHANGE_NAME);
    }

    /** Declares the durable dead-letter queue. */
    @Bean
    public Queue TransDlQueue() {
        return new Queue(TRANSFER_DL_QUEUE_NAME, true);
    }

    /**
     * Binds the dead-letter queue to the dead-letter exchange.
     *
     * <p>The binding must use the dead-letter queue and exchange: the business queue
     * dead-letters to {@link #TRANSFER_DL_EXCHANGE_NAME} with
     * {@link #TRANSFER_DL_ROUTING_KEY}, so that is where a queue has to be listening.
     */
    @Bean
    Binding bindingDlTrans() {
        return BindingBuilder.bind(TransDlQueue()).to(TransDlExchange()).with(TRANSFER_DL_ROUTING_KEY);
    }
}
2.2.4 消费者监听(幂等性处理)
package fastwave
.cloud
.demo
.subscriber
.controller
;
import com
.alibaba
.fastjson
.JSONObject
;
import com
.rabbitmq
.client
.Channel
;
import fastwave
.cloud
.demo
.subscriber
.common
.CacheUtil
;
import fastwave
.cloud
.demo
.subscriber
.config
.TransConfig
;
import fastwave
.cloud
.demo
.subscriber
.services
.AccountService
;
import org
.springframework
.amqp
.core
.Message
;
import org
.springframework
.amqp
.rabbit
.annotation
.RabbitListener
;
import org
.springframework
.amqp
.support
.AmqpHeaders
;
import org
.springframework
.beans
.factory
.annotation
.Autowired
;
import org
.springframework
.messaging
.handler
.annotation
.Header
;
import org
.springframework
.stereotype
.Component
;
import java
.io
.IOException
;
import java
.util
.Map
;
/**
 * Listens on {@link TransConfig#TRANSFER_QUEUE_NAME} and applies each transfer message to the
 * target account, using the cache to skip message ids that were already handled.
 */
@Component
public class TransReceiver {

    @Autowired
    CacheUtil cacheUtil;

    @Autowired
    AccountService accountService;

    /**
     * Handles one transfer message.
     *
     * <p>The message is processed only when it carries a message id that is not yet present
     * in the cache; the id is written to the cache after {@code accountService.transfer}
     * returns, so a failed transfer leaves the id unmarked. Every other message — including
     * one without a message id — is skipped with a console notice.
     * NOTE(review): the exists-then-set sequence is two separate cache calls; whether
     * concurrent deliveries of the same id can both pass the check depends on the listener
     * concurrency and on CacheUtil — confirm.
     *
     * @param message the AMQP message whose body is a JSON object with {@code "id2"} and
     *                {@code "amount"} entries
     * @throws IOException declared by the signature; nothing in this body throws it directly
     */
    @RabbitListener(queues = TransConfig.TRANSFER_QUEUE_NAME)
    public void HandlerMessage(Message message) throws IOException {
        String messageId = message.getMessageProperties().getMessageId();
        if (messageId != null && !cacheUtil.exists(messageId)) {
            // Decode explicitly as UTF-8 so parsing does not depend on the JVM default charset.
            String msg = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);

            // Parsing into the raw Map class is inherently unchecked; the payload is a JSON object.
            @SuppressWarnings("unchecked")
            Map<String, Object> map = JSONObject.parseObject(msg, Map.class);

            Integer id2 = Integer.parseInt(map.get("id2").toString());
            Float amount = Float.parseFloat(map.get("amount").toString());
            accountService.transfer(id2, amount);

            cacheUtil.set(messageId, true);
        } else {
            System.out.println("已消费,不需要重复消费");
        }
    }
}