Leaky Bucket and Token Bucket Algorithms
1 Background
2 Common Approaches
The token bucket (Token Bucket) and the leaky bucket (Leaky Bucket) are the two most commonly used rate-limiting algorithms.
2.1 Leaky Bucket Algorithm
The idea behind the leaky bucket (Leaky Bucket) algorithm is simple: water (requests) first flows into the bucket, and the bucket leaks water at a fixed rate (the rate at which the interface responds). When water pours in faster than it leaks out (the request rate exceeds the interface's response rate), the bucket overflows and the excess requests are rejected. The leaky bucket therefore enforces a hard limit on the transmission rate.
There are two parameters here: the size of the bucket, i.e. how much water can be stored when traffic bursts (burst), and the size of the hole through which the bucket leaks (rate). In some situations the leaky bucket cannot use network resources efficiently: because the leak rate is a fixed parameter, a single flow cannot burst up to the port rate even when there is no resource contention (no congestion) in the network. The leaky bucket algorithm is therefore inefficient for bursty traffic, whereas the token bucket algorithm handles such bursts well. In practice the two algorithms are often combined to give finer control over network traffic.
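As a toy illustration (not part of the original post), a leaky bucket built around exactly these two parameters, burst capacity and leak rate, might look like the sketch below; the class and method names are hypothetical:

```java
/** Minimal illustrative leaky bucket: capacity = burst, leakRatePerSecond = rate. */
public class LeakyBucket {
    private final double capacity;          // how much "water" (pending work) the bucket may hold
    private final double leakRatePerSecond; // fixed outflow rate
    private double water = 0.0;             // current water level
    private long lastLeakNanos = System.nanoTime();

    public LeakyBucket(double capacity, double leakRatePerSecond) {
        this.capacity = capacity;
        this.leakRatePerSecond = leakRatePerSecond;
    }

    /** Returns true if the request fits into the bucket, false if it would overflow. */
    public synchronized boolean tryAdmit() {
        leak();
        if (water + 1.0 <= capacity) {
            water += 1.0;
            return true;
        }
        return false; // overflow: reject the request
    }

    // Drain water according to the time elapsed since the last call.
    private void leak() {
        long now = System.nanoTime();
        water = Math.max(0.0, water - (now - lastLeakNanos) / 1_000_000_000.0 * leakRatePerSecond);
        lastLeakNanos = now;
    }
}
```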
2.2 Token Bucket Algorithm
The token bucket works the other way around: tokens are added to the bucket at a fixed rate up to the bucket's capacity, and every request must take a token before it is served; when no token is available the request is rejected or delayed. Because unused tokens accumulate up to the capacity, the algorithm tolerates bursts while still bounding the average rate.
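A corresponding toy sketch of a token bucket (again illustrative only, with hypothetical names):

```java
/** Minimal illustrative token bucket: tokens refill at ratePerSecond up to capacity. */
public class TokenBucket {
    private final double capacity;      // maximum number of tokens, i.e. the allowed burst size
    private final double ratePerSecond; // refill rate
    private double tokens;              // currently available tokens
    private long lastRefillNanos = System.nanoTime();

    public TokenBucket(double capacity, double ratePerSecond) {
        this.capacity = capacity;
        this.ratePerSecond = ratePerSecond;
        this.tokens = capacity; // start full so an initial burst is allowed
    }

    /** Consumes one token and returns true if one is available, otherwise false. */
    public synchronized boolean tryAcquire() {
        refill();
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }

    // Add the tokens earned since the last refill, capped at capacity.
    private void refill() {
        long now = System.nanoTime();
        tokens = Math.min(capacity, tokens + (now - lastRefillNanos) / 1_000_000_000.0 * ratePerSecond);
        lastRefillNanos = now;
    }
}
```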
3 Guava RateLimiter
3.1 RateLimiter Overview
Google's open-source utility library Guava provides the rate-limiting class RateLimiter, which implements limiting based on the token bucket algorithm (Token Bucket) and is very easy to use. RateLimiter is commonly used to limit the rate of access to physical or logical resources. It offers two ways to obtain permits: one returns false immediately if a permit cannot be obtained, while the other blocks for a while to see whether a permit becomes available.
Compared with Semaphore, which limits the number of concurrent accesses rather than the rate of use (though concurrency and rate are closely related), a RateLimiter is defined primarily by the rate at which permits are issued. In the default configuration, permits are distributed at a fixed rate, expressed in permits per second. To maintain the configured rate, permits are handed out smoothly, with the delay between individual permits adjusted as needed.
A RateLimiter can also be configured with a warmup period, during which the number of permits issued per second grows steadily until it reaches the stable rate.
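A minimal usage sketch of this public API (illustrative only; the printed messages are made up):

```java
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.RateLimiter;

public class RateLimiterQuickStart {
    public static void main(String[] args) {
        // Smoothly issues at most 5 permits per second.
        RateLimiter limiter = RateLimiter.create(5.0);

        // Non-blocking style: returns false immediately if no permit is available.
        if (limiter.tryAcquire()) {
            System.out.println("request admitted");
        } else {
            System.out.println("request rejected");
        }

        // Blocking style: waits as long as needed and returns the time spent sleeping, in seconds.
        double waited = limiter.acquire();
        System.out.println("waited " + waited + "s for a permit");

        // With a warmup period: the issue rate ramps up to 5 permits/second over 3 seconds.
        RateLimiter warmingUp = RateLimiter.create(5.0, 3, TimeUnit.SECONDS);
        warmingUp.acquire();
    }
}
```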
3.2 Code
When building a rate-based, single-node flow-control framework, RateLimiter is a solid core component. A demo follows:
```java
import java.util.concurrent.ConcurrentMap;

import com.google.common.collect.Maps;
import com.google.common.util.concurrent.RateLimiter;

public class TrafficShaper {

    // key -> value: (service, qps)
    private static final ConcurrentMap<String, Double> resourceMap = Maps.newConcurrentMap();

    // (service + userKey) -> limiter
    private static final ConcurrentMap<String, RateLimiter> userResourceLimiterMap = Maps.newConcurrentMap();

    static {
        // init
        resourceMap.put("aaa", 50.0);
    }

    public static void updateResourceQps(String resource, double qps) {
        resourceMap.put(resource, qps);
    }

    public static void removeResource(String resource) {
        resourceMap.remove(resource);
    }

    public static int enter(String resource, String userKey) {
        long t1 = System.currentTimeMillis();
        Double qps = resourceMap.get(resource);
        // unknown service or qps == 0: not rate limited
        if (qps == null || qps == 0.0) {
            return 0;
        }
        String key = resource + userKey;
        RateLimiter limiter = userResourceLimiterMap.get(key);
        // lazily create the limiter for this (resource, user) pair
        if (limiter == null) {
            limiter = RateLimiter.create(qps);
            RateLimiter putByOtherThread = userResourceLimiterMap.putIfAbsent(key, limiter);
            if (putByOtherThread != null) {
                limiter = putByOtherThread;
            }
            limiter.setRate(qps);
        }
        // try to acquire a permit without blocking
        if (!limiter.tryAcquire()) {
            System.out.println("use:" + (System.currentTimeMillis() - t1) + "ms;" + resource
                    + " visited too frequently by key:" + userKey);
            return 99;
        } else {
            System.out.println("use:" + (System.currentTimeMillis() - t1) + "ms;");
            return 0;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int i = 0;
        while (true) {
            i++;
            long t2 = System.currentTimeMillis();
            System.out.println(t2 + ":qq:" + i);
            int res = TrafficShaper.enter("aaa", "qq");
            System.out.println((System.currentTimeMillis() - t2) + ":qq:" + i);
            if (res == 99) {
                i = 0;
                Thread.sleep(1000);
            }
        }
    }
}
```
Run result: the demo admits roughly 50 requests per second for key "qq"; once tryAcquire fails, enter() returns 99 and the main loop sleeps for one second before retrying.
3.3 API
Modifier and Type | Method and Description
---|---
double | acquire() Acquires a single permit from this RateLimiter, blocking until the request can be granted
double | acquire(int permits) Acquires the given number of permits from this RateLimiter, blocking until the request can be granted
static RateLimiter | create(double permitsPerSecond) Creates a RateLimiter with the specified stable throughput, given as permits per second (commonly referred to as QPS, queries per second)
static RateLimiter | create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) Creates a RateLimiter with the specified stable throughput (permits per second, commonly QPS) and a warmup period, during which the number of permits issued per second grows steadily until it reaches the maximum rate at the end of the period (provided there are enough requests to saturate it)
double | getRate() Returns the stable rate, in permits per second, with which this RateLimiter is configured
void | setRate(double permitsPerSecond) Updates the stable rate of this RateLimiter; the argument permitsPerSecond has the same meaning as in the factory method that created it
String | toString() Returns a string representation of this object
boolean | tryAcquire() Acquires a permit from this RateLimiter if it can be obtained immediately without delay
boolean | tryAcquire(int permits) Acquires the given number of permits if they can be obtained immediately without delay
boolean | tryAcquire(int permits, long timeout, TimeUnit unit) Acquires the given number of permits if they can be obtained within the given timeout, otherwise returns false immediately (without waiting) once it is clear they cannot be obtained before the timeout expires
boolean | tryAcquire(long timeout, TimeUnit unit) Acquires a permit if it can be obtained within the given timeout, otherwise returns false immediately (without waiting) once it is clear it cannot be obtained before the timeout expires
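A short sketch exercising the timeout variants from the table above (illustrative only):

```java
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.RateLimiter;

public class TryAcquireDemo {
    public static void main(String[] args) {
        RateLimiter limiter = RateLimiter.create(2.0); // 2 permits per second

        // Wait up to 100 ms for a single permit.
        boolean gotOne = limiter.tryAcquire(100, TimeUnit.MILLISECONDS);

        // Wait up to 1 second for 3 permits; false comes back immediately
        // if they cannot become available within the timeout.
        boolean gotThree = limiter.tryAcquire(3, 1, TimeUnit.SECONDS);

        System.out.println("gotOne=" + gotOne + ", gotThree=" + gotThree);
    }
}
```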
3.4 Source Code Analysis
- `public static RateLimiter create(double permitsPerSecond)` creates a RateLimiter of type SmoothBursty.
- `public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)` creates a RateLimiter of type SmoothWarmingUp. Its API documentation is fairly long: it creates a RateLimiter with the specified stable throughput, given as permits per second (commonly QPS), and a warmup period during which the permits issued per second grow steadily until the maximum rate is reached at the end of the period (as long as there are enough requests to saturate it). Likewise, if the RateLimiter is left unused for warmupPeriod, it gradually returns to its cold state, i.e. it goes through the same warmup it went through when first created. The returned RateLimiter is meant for resources that themselves need a warmup period before they can serve requests at full speed (for example a remote service), rather than resources that can be accessed at the stable (maximum) rate immediately. The returned RateLimiter starts in the cold state (the warmup period happens first) and returns to the cold state if it is left idle long enough.
```java
public double acquire() {
    return acquire(1);
}

public double acquire(int permits) {
    long microsToWait = reserve(permits);
    // sleep; when the limit has not been reached, microsToWait is 0
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return 1.0 * microsToWait / SECONDS.toMicros(1L);
}

final long reserve(int permits) {
    checkPermits(permits); // argument check: must be > 0
    synchronized (mutex()) { // serialize concurrent callers
        return reserveAndGetWaitLength(permits, stopwatch.readMicros()); // how long to wait
    }
}

final long reserveAndGetWaitLength(int permits, long nowMicros) {
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    return max(momentAvailable - nowMicros, 0);
}

abstract long reserveEarliestAvailable(int permits, long nowMicros);

@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    resync(nowMicros); // refill tokens
    long returnValue = nextFreeTicketMicros;
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // stored tokens spent by this request
    double freshPermits = requiredPermits - storedPermitsToSpend;
    long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
        + (long) (freshPermits * stableIntervalMicros);
    this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros; // compute the next time a permit is free
    this.storedPermits -= storedPermitsToSpend; // consume tokens
    return returnValue;
}

private void resync(long nowMicros) { // refill tokens and update the next-free-ticket timestamp
    // if nextFreeTicket is in the past, resync to now
    if (nowMicros > nextFreeTicketMicros) {
        storedPermits = min(maxPermits,
            storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);
        nextFreeTicketMicros = nowMicros;
    }
}
```
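To make the bookkeeping above concrete, here is a hypothetical trace (not from the original post), assuming a SmoothBursty limiter at 10 permits per second (stableIntervalMicros = 100,000) with no stored permits:

```java
// State before the call: storedPermits = 0, nextFreeTicketMicros = now
// acquire(3):
//   resync(now)            -> nextFreeTicketMicros is not in the past, nothing to refill
//   returnValue            = nextFreeTicketMicros = now   -> this call waits 0 µs
//   storedPermitsToSpend   = min(3, 0) = 0
//   freshPermits           = 3
//   waitMicros             = 0 + 3 * 100_000 = 300_000 µs (storedPermitsToWaitTime is 0 for SmoothBursty)
//   nextFreeTicketMicros   = now + 300_000
// The current request returns immediately; the next caller pays the 300 ms debt.
```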
storedPermitsToWaitTime is an abstract method; its Javadoc reads:
```java
/**
 * Translates a specified portion of our currently stored permits which we want to
 * spend/acquire, into a throttling time. Conceptually, this evaluates the integral
 * of the underlying function we use, for the range of
 * [(storedPermits - permitsToTake), storedPermits].
 *
 * <p>This always holds: {@code 0 <= permitsToTake <= storedPermits}
 */
abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake);
```
RateLimiter in fact has two implementation strategies; see SmoothBursty and SmoothWarmingUp respectively.
3.4.1 SmoothBursty
```java
static final class SmoothBursty extends SmoothRateLimiter {
    /** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */
    final double maxBurstSeconds;

    SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
        super(stopwatch);
        this.maxBurstSeconds = maxBurstSeconds;
    }

    @Override
    void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
        double oldMaxPermits = this.maxPermits;
        maxPermits = maxBurstSeconds * permitsPerSecond;
        if (oldMaxPermits == Double.POSITIVE_INFINITY) {
            // if we don't special-case this, we would get storedPermits == NaN, below
            storedPermits = maxPermits;
        } else {
            storedPermits = (oldMaxPermits == 0.0)
                ? 0.0 // initial state
                : storedPermits * maxPermits / oldMaxPermits;
        }
    }

    @Override
    long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
        return 0L;
    }
}
```
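Because storedPermitsToWaitTime always returns 0 here, stored permits are handed out with no extra wait, which is exactly what allows bursts. RateLimiter.create(double) builds a SmoothBursty with maxBurstSeconds = 1.0, so at most one second's worth of unused permits can be saved up. An illustrative sketch (hypothetical timings and counts, not from the original post):

```java
import com.google.common.util.concurrent.RateLimiter;

public class BurstDemo {
    public static void main(String[] args) throws InterruptedException {
        RateLimiter limiter = RateLimiter.create(5.0); // SmoothBursty with maxBurstSeconds = 1.0

        Thread.sleep(3000); // idle for 3 s, but at most 1 s worth of permits (5) is stored

        int admitted = 0;
        for (int i = 0; i < 20; i++) {
            if (limiter.tryAcquire()) { // non-blocking
                admitted++;
            }
        }
        // Expect roughly 6 immediate successes: the 5 stored permits plus one more that
        // SmoothRateLimiter hands out by pushing nextFreeTicketMicros into the future.
        System.out.println("admitted without waiting: " + admitted);
    }
}
```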
3.4.2 SmoothWarmingUp

```java
static final class SmoothWarmingUp extends SmoothRateLimiter {
    private final long warmupPeriodMicros;
    /**
     * The slope of the line from the stable interval (when permits == 0), to the cold interval
     * (when permits == maxPermits)
     */
    private double slope;
    private double halfPermits;

    SmoothWarmingUp(SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit) {
        super(stopwatch);
        this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod);
    }

    @Override
    void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
        double oldMaxPermits = maxPermits;
        maxPermits = warmupPeriodMicros / stableIntervalMicros;
        halfPermits = maxPermits / 2.0;
        // Stable interval is x, cold is 3x, so on average it's 2x. Double the time -> halve the rate
        double coldIntervalMicros = stableIntervalMicros * 3.0;
        slope = (coldIntervalMicros - stableIntervalMicros) / halfPermits;
        if (oldMaxPermits == Double.POSITIVE_INFINITY) {
            // if we don't special-case this, we would get storedPermits == NaN, below
            storedPermits = 0.0;
        } else {
            storedPermits = (oldMaxPermits == 0.0)
                ? maxPermits // initial state is cold
                : storedPermits * maxPermits / oldMaxPermits;
        }
    }

    @Override
    long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
        double availablePermitsAboveHalf = storedPermits - halfPermits;
        long micros = 0;
        // measuring the integral on the right part of the function (the climbing line)
        if (availablePermitsAboveHalf > 0.0) {
            double permitsAboveHalfToTake = min(availablePermitsAboveHalf, permitsToTake);
            micros = (long) (permitsAboveHalfToTake * (permitsToTime(availablePermitsAboveHalf)
                + permitsToTime(availablePermitsAboveHalf - permitsAboveHalfToTake)) / 2.0);
            permitsToTake -= permitsAboveHalfToTake;
        }
        // measuring the integral on the left part of the function (the horizontal line)
        micros += (stableIntervalMicros * permitsToTake);
        return micros;
    }

    private double permitsToTime(double permits) {
        return stableIntervalMicros + permits * slope;
    }
}
```
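To make the doSetRate arithmetic concrete, here is a worked example (hypothetical parameters, based on the older Guava source quoted above) for a limiter created with RateLimiter.create(5.0, 4, TimeUnit.SECONDS):

```java
// stableIntervalMicros = 1_000_000 / 5            = 200_000 µs between permits at the stable rate
// warmupPeriodMicros   = 4 s                      = 4_000_000 µs
// maxPermits           = 4_000_000 / 200_000      = 20 permits
// halfPermits          = 20 / 2                   = 10 permits
// coldIntervalMicros   = 3 * 200_000              = 600_000 µs between permits when fully cold
// slope                = (600_000 - 200_000) / 10 = 40_000 µs per stored permit
//
// Starting cold (storedPermits == maxPermits == 20), each stored permit above halfPermits costs
// between 600_000 µs and 200_000 µs (the trapezoid integral in storedPermitsToWaitTime), and the
// remaining permits cost the stable 200_000 µs each, so the effective rate climbs toward
// 5 permits/second as the stored permits drain away.
```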
4 Other Common Implementations
4.1 Proxy-layer implementations, limiting the access rate of specific URLs or API endpoints
Nginx module
```nginx
limit_req_zone $binary_remote_addr zone=one:10m rate=1r/s;

server {
    location /search/ {
        limit_req zone=one burst=5;
    }
}
```
For details, see: ngx_http_limit_req_module
Features provided by HAProxy
For details, see: the HAProxy Rate limit module
4.2 Redis-based implementations
The Redis official documentation describes this implementation in detail. It works for essentially any kind of application, e.g. PHP, Python, and so on. The Redis approach supports centralized rate control for distributed services, and it is also suitable for scenarios where the application itself cannot keep state.
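As an illustration (not a verbatim copy of the pattern in the Redis documentation), a simple fixed-window counter can be built from INCR and EXPIRE. The sketch below assumes the Jedis client; the class and key naming are hypothetical:

```java
import redis.clients.jedis.Jedis;

public class RedisRateLimiter {
    private final Jedis jedis;
    private final int maxPerSecond;

    public RedisRateLimiter(Jedis jedis, int maxPerSecond) {
        this.jedis = jedis;
        this.maxPerSecond = maxPerSecond;
    }

    /** Allows at most maxPerSecond calls per user within each one-second window. */
    public boolean isAllowed(String userKey) {
        String key = "rate:" + userKey + ":" + (System.currentTimeMillis() / 1000);
        long count = jedis.incr(key);  // atomically count this request in the current window
        if (count == 1) {
            jedis.expire(key, 2);      // let the per-second key clean itself up
        }
        return count <= maxPerSecond;
    }
}
```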
Reposted from: https://blog.****.net/bohu83/article/details/51596346