常用限流算法

引言

在开发高并发系统时有三把利器用来保护系统:缓存、降级和限流。今天我们要聊的就是限流(Rate Limit),限流的目的很简单,就是为了保护系统不被瞬时大流量冲垮,

限流这个概念我其实很早之前就有去了解过,不过无奈之前工作所接触业务的并发量实在是谈不上限流。目前公司大促峰值QPS在2w往上,自然而然需要用到限流,特别是类似秒杀这种瞬时流量非常大但实际成单率低的业务场景。

目前比较常用的限流算法有三种

  • 计数器固定窗口算法

  • 计数器滑动窗口算法

  • 漏桶算法

  • 令牌桶算法

计数器固定窗口算法

计数器固定窗口算法是最简单的限流算法,实现方式也比较简单。就是通过维护一个单位时间内的计数值,每当一个请求通过时,就将计数值加1,当计数值超过预先设定的阈值时,就拒绝单位时间内的其他请求。如果单位时间已经结束,则将计数器清零,开启下一轮的计数。

但是这种实现会有一个问题,举个例子:

假设我们设定1秒内允许通过的请求阈值是200,如果有用户在时间窗口的最后几毫秒发送了200个请求,紧接着又在下一个时间窗口开始时发送了200个请求,那么这个用户其实在一秒内成功请求了400次,显然超过了阈值但并不会被限流。其实这就是临界值问题,那么临界值问题要怎么解决呢?

  • 代码实现 -- [CounterRateLimit.java](https://github.com/WangJunnan/learn/blob/master/algorithm/src/main/java/com/walm/learn/algorithm/ratelimit/CounterRateLimit.java)
  • package com.walm.learn.algorithm.ratelimit;
       
      import java.util.concurrent.Executors;
      import java.util.concurrent.ScheduledExecutorService;
      import java.util.concurrent.TimeUnit;
      import java.util.concurrent.atomic.AtomicInteger;
       
      /**
      * <p>CounterRateLimit</p>
      * <p>普通计数限流(单窗口)</p>
      *
      * @author wangjn
      * @since 2019-09-30
      */
      public class CounterRateLimit implements RateLimit , Runnable {
       
      /**
      * 阈值
      */
      private Integer limitCount;
       
      /**
      * 当前通过请求数
      */
      private AtomicInteger passCount;
       
      /**
      * 统计时间间隔
      */
      private long period;
      private TimeUnit timeUnit;
       
      private ScheduledExecutorService scheduledExecutorService;
       
       
      public CounterRateLimit(Integer limitCount) {
      this(limitCount, 1000, TimeUnit.MILLISECONDS);
      }
       
      public CounterRateLimit(Integer limitCount, long period, TimeUnit timeUnit) {
      this.limitCount = limitCount;
      this.period = period;
      this.timeUnit = timeUnit;
      passCount = new AtomicInteger(0);
      this.startResetTask();
      }
       
      @Override
      public boolean canPass() throws BlockException {
      if (passCount.incrementAndGet() > limitCount) {
      throw new BlockException();
      }
      return true;
      }
       
      private void startResetTask() {
      scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
      scheduledExecutorService.scheduleAtFixedRate(this, 0, period, timeUnit);
      }
       
      @Override
      public void run() {
      passCount.set(0);
      }
      }

计数器滑动窗口算法

计数器滑动窗口法就是为了解决上述固定窗口计数存在的问题而诞生,学过TCP协议的同学应该对滑动窗口不陌生,其实还是不太一样的,下文我们要说的滑动窗口是基于时间来划分窗口的。而TCP的滑动窗口指的是能够接受的字节数,并且大小是可变的(拥塞控制)

滑动窗口是怎么做的?

前面说了固定窗口存在临界值问题,要解决这种临界值问题,显然只用一个窗口是解决不了问题的。假设我们仍然设定1秒内允许通过的请求是200个,但是在这里我们需要把1秒的时间分成多格,假设分成5格(格数越多,流量过渡越平滑),每格窗口的时间大小是200毫秒,每过200毫秒,就将窗口向前移动一格。为了便于理解,可以看下图

常用限流算法

图中将窗口划为5份,每个小窗口中的数字表示在这个窗口中请求数,所以通过观察上图,可知在当前时间快(200毫秒)允许通过的请求数应该是20而不是200(只要超过20就会被限流),因为我们最终统计请求数时是需要把当前窗口的值进行累加,进而得到当前请求数来判断是不是需要进行限流。

那么滑动窗口限流法是完美的吗?

细心观察的我们应该能马上发现问题,滑动窗口限流法其实就是计数器固定窗口算法的一个变种。流量的过渡是否平滑依赖于我们设置的窗口格数也就是统计时间间隔,格数越多,统计越精确,但是具体要分多少格我们也说不上来呀...

  • 代码实现 -- [SlidingWindowRateLimit.java]
  • package com.walm.learn.algorithm.ratelimit;
       
      import lombok.Data;
      import lombok.extern.slf4j.Slf4j;
       
      import java.util.concurrent.Executors;
      import java.util.concurrent.ScheduledExecutorService;
      import java.util.concurrent.TimeUnit;
      import java.util.concurrent.atomic.AtomicInteger;
      import java.util.concurrent.locks.Lock;
      import java.util.concurrent.locks.ReentrantLock;
       
      /**
      * <p>SlidingWindowRateLimit</p>
      * <p>滑动窗口限流</p>
      *
      * @author wangjn
      * @since 2019-09-30
      */
      @Slf4j
      public class SlidingWindowRateLimit implements RateLimit, Runnable {
       
      /**
      * 阈值
      */
      private Integer limitCount;
       
      /**
      * 当前通过的请求数
      */
      private AtomicInteger passCount;
       
      /**
      * 窗口数
      */
      private Integer windowSize;
       
      /**
      * 每个窗口时间间隔大小
      */
      private long windowPeriod;
      private TimeUnit timeUnit;
       
       
      private Window[] windows;
      private volatile Integer windowIndex = 0;
       
      private Lock lock = new ReentrantLock();
      public SlidingWindowRateLimit(Integer limitCount) {
      // 默认统计qps, 窗口大小5
      this(limitCount, 5, 200, TimeUnit.MILLISECONDS);
      }
       
      /**
      * 统计总时间 = windowSize * windowPeriod
      */
      public SlidingWindowRateLimit(Integer limitCount, Integer windowSize, Integer windowPeriod, TimeUnit timeUnit) {
      this.limitCount = limitCount;
      this.windowSize = windowSize;
      this.windowPeriod = windowPeriod;
      this.timeUnit = timeUnit;
      this.passCount = new AtomicInteger(0);
      this.initWindows(windowSize);
      this.startResetTask();
      }
       
      @Override
      public boolean canPass() throws BlockException {
      lock.lock();
      if (passCount.get() > limitCount) {
      throw new BlockException();
      }
      windows[windowIndex].passCount.incrementAndGet();
      passCount.incrementAndGet();
      lock.unlock();
      return true;
      }
       
      private void initWindows(Integer windowSize) {
      windows = new Window[windowSize];
      for (int i = 0; i < windowSize; i++) {
      windows[i] = new Window();
      }
      }
       
      private ScheduledExecutorService scheduledExecutorService;
      private void startResetTask() {
      scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
      scheduledExecutorService.scheduleAtFixedRate(this, windowPeriod, windowPeriod, timeUnit);
      }
       
      @Override
      public void run() {
      // 获取当前窗口索引
      Integer curIndex = (windowIndex + 1) % windowSize;
      log.info("info_reset_task, curIndex = {}", curIndex);
      // 重置当前窗口索引通过数量,并获取上一次通过数量
      Integer count = windows[curIndex].passCount.getAndSet(0);
      windowIndex = curIndex;
      // 总通过数量 减去 当前窗口上次通过数量
      passCount.addAndGet(-count);
      log.info("info_reset_task, curOldCount = {}, passCount = {}, windows = {}", count, passCount, windows);
      }
       
      @Data
      class Window {
      private AtomicInteger passCount;
      public Window() {
      this.passCount = new AtomicInteger(0);
      }
      }
      }

漏桶算法

上面所介绍的两种算法都不能非常平滑的过渡,下面就是漏桶算法登场了

什么是漏桶算法?

漏桶算法以一个常量限制了出口流量速率,因此漏桶算法可以平滑突发的流量。其中漏桶作为流量容器我们可以看做一个FIFO的队列,当入口流量速率大于出口流量速率时,因为流量容器是有限的,当超出流量容器大小时,超出的流量会被丢弃。

下图比较形象的说明了漏桶算法的原理,其中水龙头是入口流量,漏桶是流量容器,匀速流出的水是出口流量。

常用限流算法

漏桶算法的特点

不过因为漏桶算法限制了流出速率是一个固定常量值,所以漏桶算法不支持出现突发流出流量。但是在实际情况下,流量往往是突发的。

令牌桶算法

令牌桶算法是漏桶算法的改进版,可以支持突发流量。不过与漏桶算法不同的是,令牌桶算法的漏桶中存放的是令牌而不是流量。

那么令牌桶算法是怎么突发流量的呢?

最开始,令牌桶是空的,我们以恒定速率往令牌桶里加入令牌,令牌桶被装满时,多余的令牌会被丢弃。当请求到来时,会先尝试从令牌桶获取令牌(相当于从令牌桶移除一个令牌),获取成功则请求被放行,获取失败则阻塞活拒绝请求。

常用限流算法

令牌桶算法的特点

  • 最多可以存发b个令牌。如果令牌到达时令牌桶已经满了,那么这个令牌会被丢弃

  • 请求到来时,如果令牌桶中少于n个令牌,那么不会删除令牌。该请求会被限流(阻塞活拒绝)

  • 算法允许最大b(令牌桶大小)个请求的突发

令牌桶算法限制的是平均流量,因此其允许突发流量(只要令牌桶中有令牌,就不会被限流)

  • 代码实现 -- [TokenBucketRateLimit.java]
    package com.walm.learn.algorithm.ratelimit;
       
      import lombok.extern.slf4j.Slf4j;
       
      import java.util.Objects;
      import java.util.concurrent.*;
      import java.util.concurrent.locks.LockSupport;
       
      /**
      * <p>LeakyBucketRateLimit</p>
      *
      * @author wangjn
      * @since 2019-10-08
      */
      @Slf4j
      public class LeakyBucketRateLimit implements RateLimit, Runnable {
       
      /**
      * 出口限制qps
      */
      private Integer limitSecond;
      /**
      * 漏桶队列
      */
      private BlockingQueue<Thread> leakyBucket;
       
      private ScheduledExecutorService scheduledExecutorService;
       
      public LeakyBucketRateLimit(Integer bucketSize, Integer limitSecond) {
      this.limitSecond = limitSecond;
      this.leakyBucket = new LinkedBlockingDeque<>(bucketSize);
       
      scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
      long interval = (1000 * 1000 * 1000) / limitSecond;
      scheduledExecutorService.scheduleAtFixedRate(this, 0, interval, TimeUnit.NANOSECONDS);
      }
       
      @Override
      public boolean canPass() throws BlockException {
      if (leakyBucket.remainingCapacity() == 0) {
      throw new BlockException();
      }
      leakyBucket.offer(Thread.currentThread());
      LockSupport.park();
      return true;
      }
       
      @Override
      public void run() {
      Thread thread = leakyBucket.poll();
      if (Objects.nonNull(thread)) {
      LockSupport.unpark(thread);
      }
      }
      }

总结

至此,基本把以上4种限流算法的原理都解释清楚了。每种限流算法都有其固定特点,及各自适用的场景,其中计数器算法是其中最简单的,相当于滑动窗口算法的简化版,令牌桶算法相比漏桶算法对资源的利用率更高(允许突发流量)