延时队列的使用

it2026-02-11  14

延时队列实现的包装类 其中addQueue负责往队列中添加延时任务 添加后会把任务对象加入RDelayedQueue中,让监听器监听

getQueue负责获得任务以及监听 //当队列为空时挂起当前线程,不会消耗CPU //take去除队列中首位对象并获取这个对象,如果Queue为空则等待 //然后让监听器执行这个对象的任务

@Component public class RedisDelayedQueue { private final static Logger LOGGER = LoggerFactory.getLogger(RedisDelayedQueue.class); @Autowired private RedissonClient redissonClient; /** * 任务回调监听 * * @param <T> */ public abstract static class BaseDelayTaskEventListener<T> { /** * 执行方法 * * @param t */ public abstract void invoke(T t); } /** * 添加队列 * * @param t DTO传输类 * @param delay 时间数量 * @param timeUnit 时间单位 * @param <T> 泛型 */ public <T> void addQueue(T t, long delay, TimeUnit timeUnit) { RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(t.getClass().getName()); RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue); delayedQueue.offer(t, delay, timeUnit); LOGGER.info("发送延时消息<<=====>>队列:{},延时:{} 秒,消息内容:{}", t.getClass().getName(), delay, t.toString()); //delayedQueue.destroy(); } /** * 获取队列 * * @param zClass DTO泛型 * @param taskEventListener 任务回调监听 * @param <T> 泛型 * @return */ public <T> void getQueue(Class zClass, BaseDelayTaskEventListener taskEventListener) { RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(zClass.getName()); //由于此线程需要常驻,可以新建线程,不用交给线程池管理 new Thread(() -> { LOGGER.warn("开启监听延时队列[{}]的消息......", zClass.getName()); while (true) { try { //当队列为空时挂起当前线程,不会消耗CPU //take去除首位对象并获取,如果Queue为空则等待 //然后让监听器执行这个对象的任务 T t = blockingFairQueue.take(); taskEventListener.invoke(t); } catch (InterruptedException e) { LOGGER.error("监听消息出错:{}", e.getMessage()); } } }).start(); } }

业务逻辑 用配置好的threadpoolexecutor执行延时任务 handleDelayMessage会在整个项目启动前被@Order(0)作为前置任务启动

创建监听器 并且把invoke函数复写为业务逻辑代码,让之后getQueue监听时执行

@Component public class DelayTaskHandler extends AbstractDelayedTaskHandler { private final static Logger LOGGER = LoggerFactory.getLogger(DelayTaskHandler.class); @Autowired private RedisDelayedQueue redisDelayedQueue; @Resource @Qualifier("delayAsyncExecutor") private Executor threadPoolExecutor; @Resource ApplicationContext applicationContext; //handleDelayMessage会在整个项目启动前被@Order(0)作为前置任务启动 @Override public void handleDelayMessage() { //监听延迟队列 //创建监听器 RedisDelayedQueue.BaseDelayTaskEventListener<DelayedTaskDTO> taskEventListener = new RedisDelayedQueue.BaseDelayTaskEventListener<DelayedTaskDTO>() { //复写需要执行的方法 @Override public void invoke(DelayedTaskDTO taskBodyDTO) { threadPoolExecutor.execute(new Task(taskBodyDTO, applicationContext)); } }; //这里会启动不间断的任务监听 threadPoolExecutor.execute(() -> redisDelayedQueue.getQueue(DelayedTaskDTO.class, taskEventListener)); } private class Task implements Runnable { private DelayedTaskDTO delayTask; private ApplicationContext applicationContext; private Task(DelayedTaskDTO delayTask, ApplicationContext applicationContext) { this.delayTask = delayTask; this.applicationContext = applicationContext; } //业务逻辑任务的调用 @Override public void run() { if (Objects.nonNull(delayTask)) { DelayJobExecutor delayJobExecutor = (DelayJobExecutor) applicationContext.getBean(delayTask.getClazz()); delayJobExecutor.executor(delayTask.getMessage()); } } } }

创建延时任务,并加入队列 Clazz是Class类,这样获得了DelayJobExecutor实现类的Class类, 这样传入delayTask后,在执行Task类前 会把这个类在Spring容器中对应的DelayJobExecutor实例Bean传给Task里的run(){}任务,并使用这个Bean来执行业务方法

DelayedTaskDTO delayTask = new DelayedTaskDTO(); delayTask.setClazz(AutoCancelOrderServiceImpl.class); delayTask.setMessage(JSON.toJSONString(message)); delayedQueue.addQueue(delayTask, orderDelayCloseInterval, TimeUnit.MINUTES);

业务类接口

public interface DelayJobExecutor { void executor(String message); }

业务实现类

@Slf4j @Service public class AutoCancelOrderServiceImpl implements DelayJobExecutor { @Resource private WxTradeMapper wxTradeMapper; @Resource private WxOrderMapper wxOrderMapper; @Autowired private ITradeService tradeService; @Override @Transactional public void executor(String message) { JSONObject obj = JSON.parseObject(message); Example wxTradeExample = new Example(WxOrder.class); String tid = obj.getString("tid"); wxTradeExample.createCriteria().andEqualTo("tid", tid); WxTrade wxTrade = wxTradeMapper.selectOneByExample(wxTradeExample); if(wxTrade.getStatus().equals(OrderStatusEnum.UNPAID.type)){ log.info("延时队列: 订单{}到期未付款,做关闭处理", tid); //更新trade表状态 wxTrade.setUpdateTime(LocalDateTime.now()); wxTrade.setStatus(OrderStatusEnum.TRADE_CLOSE.type); wxTrade.setEndTime(LocalDateTime.now()); tradeService.updateTrade(wxTrade, wxTradeExample); Example wxOrderExample = new Example(WxOrder.class); wxOrderExample.createCriteria().andEqualTo("tid", tid); List<WxOrder> wxOrders = wxOrderMapper.selectByExample(wxOrderExample); wxOrders.forEach(wxOrder -> { wxOrder.setStatus(OrderStatusEnum.TRADE_CLOSE.type); wxOrder.setEndTime(LocalDateTime.now()); wxOrder.setUpdateTime(LocalDateTime.now()); }); wxOrderMapper.updateBatchByPrimaryKeySelective(wxOrders); log.info("延时队列: 订单{}已关闭处理", tid); } } }
最新回复(0)