<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: root
    password: admin
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class DelayConfig { /** * 延时交换机 --- 交换机用于重新分配队列(接收死信队列中的过期消息,将其转发到需要延迟消息的模块队列) * @return */ @Bean public DirectExchange exchange() { return new DirectExchange(DelayQueueContent.DELAY_EXCHANGE); } /** * 实际消费队列 * 用于延时消费的队列 */ @Bean public Queue repeatTradeQueue() { Queue queue = new Queue(DelayQueueContent.DELAYMSG_RECEIVE_QUEUE_NAME, true,false,false); return queue; } /** * 绑定交换机并指定routing key(死信队列绑定延迟交换机和实际消费队列绑定延迟交换机的路由键一致) * @return */ @Bean public Binding repeatTradeBinding() { return BindingBuilder.bind(repeatTradeQueue()) .to(exchange()) .with(DelayQueueContent.DELAY_KEY); } //死信队列 @Bean public Queue deadLetterQueue() { Map<String,Object> args = new HashMap<>(); args.put("x-message-ttl", DelayQueueContent.EXPERI_TIME); args.put("x-dead-letter-exchange", DelayQueueContent.DELAY_EXCHANGE); args.put("x-dead-letter-routing-key", DelayQueueContent.DELAY_KEY); return new Queue(DelayQueueContent.DELAY_QUEUE_NAME, true, false, false, args); } }
/**
 * Names and settings shared by the delayed-message RabbitMQ setup.
 */
public class DelayQueueContent {

    /** Name of the TTL (delay) exchange. */
    public static final String DELAY_EXCHANGE = "message.ttl.exchange";

    /** Name of the TTL (delay) queue. */
    public static final String DELAY_QUEUE_NAME = "message.ttl.queue";

    /** Name of the DLX (dead-letter) queue. */
    public static final String DELAYMSG_RECEIVE_QUEUE_NAME = "message.dlx.queue";

    /** Binding key. */
    public static final String DELAY_KEY = "message.dlx.routing";

    /** Milliseconds in one hour; kept private so the public surface is unchanged. */
    private static final int MILLIS_PER_HOUR = 60 * 60 * 1000;

    /** TTL validity period in milliseconds: 3 hours. */
    public static final int EXPERI_TIME = 3 * MILLIS_PER_HOUR;
}
import org.springframework.amqp.rabbit.core.RabbitTemplate; @RestController @RequestMapping("/sendMqMessage") public class SendMqMessageController { @Autowired private RabbitTemplate rabbitTemplate; /** * 发送延迟消息 * @return */ @GetMapping("/send") public String sendDelayMsg(){ rabbitTemplate.convertAndSend(DelayQueueContent.DELAY_QUEUE_NAME, "holle,this is message!"); log.info("发送时间:"+ LocalDateTime.now()); return "success"; } }
import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.stereotype.Component; import org.springframework.amqp.core.Message; import java.time.LocalDateTime; import java.util.Map; /** * 队列延时消费 */ @Slf4j @Component public class DelayListener { /** * 接收延迟消息 * @param channel * @param json * @param message * @param map */ @RabbitHandler @RabbitListener(queues = DelayQueueContent.DELAYMSG_RECEIVE_QUEUE_NAME) public void receiveDelayMsg(Channel channel, String json, Message message, @Headers Map<String,Object> map){ try { log.info("接收到的消息: {}", json); log.info("接收时间:{}" ,LocalDateTime.now()); // 在这里实现具体的逻辑 // todo…… //代码为在消费者中开启消息接收确认的手动ack channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("消息消费成功!",json); } catch (Exception e) { log.error("消息消费失败!",json); e.printStackTrace(); } } }
.