漏桶算法思路很简单,水(也就是请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,然后就拒绝请求,可以看出漏桶算法能强行限制数据的传输速率。 示意图(来源网络)如下:
令牌桶算法和漏桶算法效果一样但方向相反的算法,更加容易理解。随着时间流逝,系统会按恒定1/QPS时间间隔(如果QPS=100,则间隔是10ms)往桶里加入令牌(想象和漏洞漏水相反,有个水龙头在不断的加水),如果桶已经满了就不再加了。新请求来临时,会各自拿走一个令牌,如果没有令牌可拿了就阻塞或者拒绝服务。示意图(来源网络)如下:
类似guava的ratelimit
/** * 使用计数器限速,guava的RateLimiter没用过研究后再说 * <p> * 单机1秒限速1000 */ public class RateLimiterutils { private final static long DELTA_MILLIS = 1000; private final static long PERMITS_PER_SECOND = 888; private static int counter = 0; private static long timestamp = System.currentTimeMillis(); public synchronized static void tryAcquire(int delta) { if (delta > PERMITS_PER_SECOND) { throw new IllegalArgumentException(String.format("delta gather than permitsPerSecond, delta:%d, permitsPerSecond:%d", delta, PERMITS_PER_SECOND)); } long now = System.currentTimeMillis(); if (now - timestamp < DELTA_MILLIS) { if (counter + delta <= PERMITS_PER_SECOND) { counter += delta; } else { try { Thread.sleep(timestamp + PERMITS_PER_SECOND - now); counter = delta; timestamp = timestamp + PERMITS_PER_SECOND; } catch (InterruptedException e) { e.printStackTrace(); } } } else { counter = delta; timestamp = now; } } }这种思想类似于Java集合框架的数组扩容,设置一个阈值,只有当超过该临界值时,才会触发异步调用。其余存取令牌的操作与本地限流无二。虽然该方案依旧存在误差,但误差最大也就一批次令牌数而已。
lua脚本的注释不允许使用中文,这点要注意,不然报错 令牌桶初始化,因为令牌的控制脚本需要用到前置参数
-- curr_timestamp: redis current timestamp(Unit of second) redis.replicate_commands() local curr_timestamp = tonumber(redis.call('TIME')[1]) --[[ last_second: token bucket last update timestamp(Unit of second) curr_permits: token bucket last permit amount max_burst: token bucket max amount --]] local result = 1 redis.call("HMSET", KEYS[1], "last_second", curr_timestamp, "curr_permits", ARGV[1], "max_burst", ARGV[2]) return result令牌桶控制脚本
-- curr_timestamp: redis current timestamp(Unit of second) redis.replicate_commands() local time = redis.call('time') local curr_timestamp = tonumber(time[1]) local curr_microseconds = tonumber(time[2]) local require_permits = tonumber(ARGV[1]) local result = {} -- result[1]: minimum time from next refresh(Unit of microsecond) result[1] = (1000000 - curr_microseconds) / 1000 local ratelimit_info = redis.call("HMGET", KEYS[1], "last_second", "curr_permits", "max_burst") local last_second = tonumber(ratelimit_info[1]) local curr_permits = tonumber(ratelimit_info[2]) local max_burst = tonumber(ratelimit_info[3]) -- If the last time has passed, update the token bucket if (curr_timestamp > last_second) then curr_permits = max_burst redis.call("HMSET", KEYS[1], "last_second", curr_timestamp) end -- Update the last permit amount, base on the curr_permits change if (curr_permits > require_permits) then redis.call("HMSET", KEYS[1], "curr_permits", curr_permits - require_permits) result[2] = require_permits return result end if (curr_permits > 0) then redis.call("HMSET", KEYS[1], "curr_permits", 0) end result[2] = curr_permits return result先初始化
@Bean("ratelimitLua") public DefaultRedisScript getRedisScript() { DefaultRedisScript redisScript = new DefaultRedisScript(); redisScript.setLocation(new ClassPathResource("limit/ratelimit.lua")); redisScript.setResultType(java.util.List.class); return redisScript; } @Bean("ratelimitInitLua") public DefaultRedisScript getInitRedisScript() { DefaultRedisScript redisScript = new DefaultRedisScript(); redisScript.setLocation(new ClassPathResource("limit/ratelimitInit.lua")); redisScript.setResultType(java.lang.Long.class); return redisScript; }再定义ratelimit方法控制入口
/** * 利用Redis进行限流,解决分布式、高TPS的问题 */ @Service public class RateLimitClient { private final static long PERMITS_PER_SECOND = 3250; @Autowired private StringRedisTemplate stringRedisTemplate; @Qualifier("getRedisScript") @Resource private RedisScript<List> ratelimitLua; @Qualifier("getInitRedisScript") @Resource private RedisScript<Long> ratelimitInitLua; /** * 初始化令牌桶!!! * * @param key * @return */ public Token initToken(String key) { Token token = Token.SUCCESS; if (stringRedisTemplate.hasKey(getKey(key))) { return token; } Long acquire = stringRedisTemplate.execute(ratelimitInitLua, Collections.singletonList(getKey(key)), String.valueOf(PERMITS_PER_SECOND), String.valueOf(PERMITS_PER_SECOND)); if (acquire == 1) { token = Token.SUCCESS; } else if (acquire == 0) { token = Token.SUCCESS; } else { token = Token.FAILED; } return token; } /** * 根据请求值去令牌桶获取,之所以用Long是因为lua脚本返回值用Integer接收有问题 * lua要不直接返回json然后转map,为了省事直接返回list * * list.get(0) 距离下一次刷新的最小时间间隔,单位:微秒 * list.get(1) 获取到的令牌数 * * @param key * @param permits * @return */ public List<Long> acquireIntervalAndToken(String key, Integer permits){ List intervalAndToken = stringRedisTemplate.execute(ratelimitLua, Collections.singletonList(getKey(key)), permits.toString()); return intervalAndToken; } public String getKey(String key) { return Constants.RATE_LIMIT_KEY + key; } }在多线程环境下验证
@Autowired private RateLimitClient rateLimitClient; @Test public void redisLuaScriptTest() throws Exception { Random random = new Random(); for (int i = 0; i < 100; i++) { // 令牌桶的线程安全验证 consumerExecutor.execute(new Runnable() { @Override public void run() { int requireToken = random.nextInt(1000); LOGGER.info("requireToken:{}", requireToken); List<Long> intervalAndToken = rateLimitClient.acquireIntervalAndToken(MessageProto.Platform.HUAWEI.name(), requireToken); LOGGER.info("requireToken:{}, acquireToken:{} ", requireToken, intervalAndToken); } }); } }