redis实现延时队列

it2024-07-06  44

思路: 1.使用redis中zset数据结构    2.使用score排序,score为过期时间点    3.启动线程不断取出排序第一个,比较score和当前时间点,如果score小于或等于当前时间,说明此数据过期,需要处理  4.处理完毕在zset中移除 //初始化jedis private static JedisPool jedisPool = new JedisPool(new GenericObjectPoolConfig(), "127.0.0.1", 6380, 10000, "mima", 1); public static Jedis getJedis() { return jedisPool.getResource(); } @Async @Scheduled(cron = "0 * * * * ?") public void WODeposit() throws InterruptedException { List<String> woList = new ArrayList<String>(); woList.add("00 01"); woList.add("00 02"); woList.add("00 03"); woList.add("00 04"); for (int i = 0; i < woList.size(); i++) { //延迟5秒 Calendar cal1 = Calendar.getInstance(); cal1.add(Calendar.SECOND, 5); int second3later = (int) (cal1.getTimeInMillis() / 1000); Jedis jedis = RedisDelayTask.getJedis(); jedis.zadd("worderList", second3later, woList.get(i)); System.out.println(new Date() + "存入命令:" + woList.get(i)); //关闭jedis,存在低版本无法关闭的情况 jedis.close(); } } //消费命令 @Async @Scheduled(cron = "0/10 * * * * ?") public void WOTakeOut() throws InterruptedException { Jedis jedis = RedisDelayTask.getJedis(); Set<Tuple> items = jedis.zrangeWithScores("worderList", 0, 1); if (items == null || items.isEmpty()) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } else { int score = (int) ((Tuple) items.toArray()[0]).getScore(); Calendar cal = Calendar.getInstance(); int nowSecond = (int) (cal.getTimeInMillis() / 1000); if (nowSecond >= score) { String orderId = ((Tuple) items.toArray()[0]).getElement(); Long num = jedis.zrem("worderList", orderId); if (num != null && num > 0) { System.out.println(new Date() + "取出命令:" + orderId); } } } //关闭jedis jedis.close(); }

关键点:jedis.close(); jedis的版本依赖,低版本存在无法close的情况

<!-- 添加redis依赖 --> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.3</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.3.2</version> </dependency>
最新回复(0)