4.1 延时队列介绍 延时队列即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。 那么,为什么需要延迟消费呢?我们来看以下的场景
网上商城下订单后30分钟后没有完成支付,取消订单(如:淘宝、去哪儿网) 系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会 系统中的业务失败之后,需要重试 这些场景都非常常见,我们可以思考,比如第二个需求,系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会。那么一天之中肯定是会有很多个预约的,时间也是不一定的,假设现在有1点 2点 3点 三个预约,如何让系统知道在当前时间等于0点 1点 2点给用户发送信息呢,是不是需要一个轮询,一直去查看所有的预约,比对当前的系统时间和预约提前一小时的时间是否相等呢?这样做非常浪费资源而且轮询的时间间隔不好控制。如果我们使用延时消息队列呢,我们在创建时把需要通知的预约放入消息中间件中,并且设置该消息的过期时间,等过期时间到达时再取出消费即可。
Rabbitmq实现延时队列一般而言有两种形式: 第一种方式:利用两个特性: Time To Live(TTL)、Dead Letter Exchanges(DLX)[A队列过期->转发给B队列]
第二种方式:利用rabbitmq中的插件x-delay-message
4.2 TTL DLX实现延时队列 4.2.1 TTL DLX介绍 TTL RabbitMQ可以针对队列设置x-expires(则队列中所有的消息都有相同的过期时间)或者针对Message设置x-message-ttl(对消息进行单独设置,每条消息TTL可以不同),来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)
Dead Letter Exchanges(DLX) RabbitMQ的Queue可以配置x-dead-letter-exchange和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。 x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送
demo搭建
applicaton.yml中配置
spring: application: name: springboot-demo rabbitmq: host: 192.168.213.129 port: 5672 password: guest username: guest创建延时队列
@Configuration public class QueueConfig { /** 短信发送队列 */ public static final String QUEUE_MESSAGE = "queue.message"; /** 交换机 */ public static final String DLX_EXCHANGE = "dlx.exchange"; /** 短信发送队列 延迟缓冲(按消息) */ public static final String QUEUE_MESSAGE_DELAY = "queue.message.delay"; /** * 短信发送队列 * @return */ @Bean public Queue messageQueue() { return new Queue(QUEUE_MESSAGE, true); } /** * 短信发送队列 * @return */ @Bean public Queue delayMessageQueue() { return QueueBuilder.durable(QUEUE_MESSAGE_DELAY) .withArgument("x-dead-letter-exchange", DLX_EXCHANGE) // 消息超时进入死信队列,绑定死信队列交换机 .withArgument("x-dead-letter-routing-key", QUEUE_MESSAGE) // 绑定指定的routing-key .build(); } /*** * 创建交换机 * @return */ @Bean public DirectExchange directExchange(){ return new DirectExchange(DLX_EXCHANGE); } /*** * 交换机与队列绑定 * @param messageQueue * @param directExchange * @return */ @Bean public Binding basicBinding(Queue messageQueue, DirectExchange directExchange) { return BindingBuilder.bind(messageQueue) .to(directExchange) .with(QUEUE_MESSAGE); } }创建监听
@Component @RabbitListener(queues = QueueConfig.QUEUE_MESSAGE) public class MessageListener { /*** * 监听消息 * @param msg */ @RabbitHandler public void msg(@Payload Object msg){ SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("当前时间:"+dateFormat.format(new Date())); System.out.println("收到信息:"+msg); } }编写测试类
@SpringBootTest @RunWith(SpringRunner.class) public class RabbitMQTest { @Autowired private RabbitTemplate rabbitTemplate; /*** * 发送消息 */ @Test public void sendMessage() throws InterruptedException, IOException { SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("发送当前时间:"+dateFormat.format(new Date())); Map<String,String> message = new HashMap<>(); message.put("name","szitheima"); //设置延迟队列的过期时间 rabbitTemplate.convertAndSend(QueueConfig.QUEUE_MESSAGE_DELAY, message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("10000"); return message; } }); System.in.read(); } }