bucket4j对Spring API限流

速率限制是一种API访问策略,限制客户端在指定时间内对API的调用次数。用于防止API遭无意和恶意的过度调用。

速率限制通常通过跟踪IP地址或特定于业务的方式(如API密钥或访问令牌)应用于API。

作为API开发人员,可以选择在客户端达到限制时使用以下方式做出响应:

排队请求,直到剩余时间段已过。

允许请求,但对此请求收取额外费用。

或者,最常见的是拒绝请求(HTTP 429 - 请求过多)。

Bucket4j速率限制

Bucket4j是基于令牌桶算法的Java速率限制库,是一个线程安全的库,能在独立的JVM应用程序或集群环境中使用。通过JCache(JSR107)规范支持内存或分布式缓存。

令牌桶算法

假设有一个存储桶,其容量定义为它能够容纳的令牌数量。

当消费者想要访问API端点时,必须从bucket中获取令牌。我们会从存储桶中删除令牌(如果有)并接受请求。

另一方面,如果存储桶没有任何令牌,我们将拒绝请求。

当请求正在消耗令牌时,我们还将以一定的固定速率对其进行补充,保持不超过存储桶的容量。

假如一个速率限制为每分钟100个请求的API。可以创建一个容量为100的存储桶,每分钟的重新补充速率为100个令牌。

如果我们收到70个请求,少于给定分钟内可用令牌的数量,则我们将在下一分钟开始时仅添加30个令牌以使存储桶达到最大容量。

另一方面,如果我们在40秒内用完所有令牌,我们将等待20秒以重新补充存储桶。

Bucket4j入门

Maven配置:

<dependency>
    <groupId>com.github.vladimir-bukhtoyarov</groupId>
    <artifactId>bucket4j-core</artifactId>
    <version>4.10.0</version>
</dependency>

在介绍如何使用Bucket4j之前,让我们简要讨论一些核心类,以及它们如何表示令牌桶算法模型中的不同元素。

Bucket接口表示令牌桶,它提供了诸如tryConsume和 tryConsumeAndReturnRemaining之类的方法来使用令牌。

如果请求符合限制,并且令牌已使用,则这些方法将返回 true。

Bandwidth它定义了桶的极限,用它来配置存储桶的容量和填充速度。

Refill类用于定义在令牌添加到桶中的固定速率。我们可以将速率配置为在给定时间段内要添加的令牌数量。如,每秒10个存储桶或每5分钟200个令牌,依此类推。

Bucket中的tryConsumeAndReturnRemaining方法返回ConsumptionProbe。ConsumptionProbe包含消耗的结果以及存储区的状态,如,剩余的令牌,或者直到请求的令牌再次在存储区中可用前的剩余时间。

基本用法

对于每分钟10个请求的速率限制,创建一个容量为10的存储桶,每分钟的重新补充速率为10个令牌:

Refill refill = Refill.intervally(10, Duration.ofMinutes(1));
Bandwidth limit = Bandwidth.classic(10, refill);
Bucket bucket = Bucket4j.builder()
    .addLimit(limit)
    .build();
for (int i = 1; i <= 10; i++) {
    assertTrue(bucket.tryConsume(1));
}
assertFalse(bucket.tryConsume(1));

Refill.interval在窗口的开始时有间隔地补充存储桶,这种情况下,每分钟开始时填充 10个令牌。

接下来,让我们看看重新补充的动作。

将重新补充速率设置为每2秒1个令牌,并限制请求以遵守速率限制:

Bandwidth limit = Bandwidth.classic(1, Refill.intervally(1, Duration.ofSeconds(2)));
Bucket bucket = Bucket4j.builder()
    .addLimit(limit)
    .build();
assertTrue(bucket.tryConsume(1));     // first request
Executors.newScheduledThreadPool(1)   // schedule another request for 2 seconds later
    .schedule(() -> assertTrue(bucket.tryConsume(1)), 2, TimeUnit.SECONDS); 

假设我们的速率限制为每分钟10个请求。同时,我们可能希望避免在前5秒内耗尽所有令牌的峰值。

Bucket4j允许我们在同一个存储桶上设置多个限制(带宽)。让我们添加另一个限制,该限制在20秒的时间窗口内仅允许5个请求:

Bucket bucket = Bucket4j.builder()
    .addLimit(Bandwidth.classic(10, Refill.intervally(10, Duration.ofMinutes(1))))
    .addLimit(Bandwidth.classic(5, Refill.intervally(5, Duration.ofSeconds(20))))
    .build();
for (int i = 1; i <= 5; i++) {
    assertTrue(bucket.tryConsume(1));
}
assertFalse(bucket.tryConsume(1));

Bucket4j对Spring API进行速率限制

使用Bucket4j在Spring REST API中限制速率。

一个简单的REST API,根据给定的尺寸计算并返回矩形的面积:

@RestController
class AreaCalculationController {
@PostMapping(value = "/api/v1/area/rectangle")
public ResponseEntity<AreaV1> rectangle(@RequestBody RectangleDimensionsV1 dimensions) {
 return ResponseEntity.ok(new AreaV1("rectangle", dimensions.getLength() * dimensions.getWidth()));
}
}

确保API已启动:

$ curl -X POST http://localhost:9001/api/v1/area/rectangle \
    -H "Content-Type: application/json" \
    -d '{ "length": 10, "width": 12 }'
{ "shape":"rectangle","area":120.0 }

应用速率限制

现在引入一个朴素的速率限制,每分钟允许20个请求。换句话说,如果API在1分钟内收到20个请求,则拒绝下个请求。

@RestController
class AreaCalculationController {
private final Bucket bucket;
 
public AreaCalculationController() {
 Bandwidth limit = Bandwidth.classic(20, Refill.greedy(20, Duration.ofMinutes(1)));
 this.bucket = Bucket4j.builder()
   .addLimit(limit)
   .build();
}
//..
}

使用tryConsume方法检查是否允许请求。如果达到限制,通过以HTTP 429状态进行响应:

public ResponseEntity<AreaV1> rectangle(@RequestBody RectangleDimensionsV1 dimensions) {
    if (bucket.tryConsume(1)) {
        return ResponseEntity.ok(new AreaV1("rectangle", dimensions.getLength() * dimensions.getWidth()));
    }
return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).build();
}

API客户端和定价计划

接下来,我们为业务为中心引入定价计划。

免费:每个API客户端每小时20个请求。

基本:每个API客户端每小时40个请求。

专业:每个API客户端每小时100个请求。

每个API客户端都有一个唯一的API密钥,必须随着请求一起发送。以确定API客户端链接的定价计划。

让我们为每个定价计划定义速率限制(带宽):

enum PricingPlan {
    FREE {
        Bandwidth getLimit() {
            return Bandwidth.classic(20, Refill.intervally(20, Duration.ofHours(1)));
        }
    },
    BASIC {
        Bandwidth getLimit() {
            return Bandwidth.classic(40, Refill.intervally(40, Duration.ofHours(1)));
        }
    },
    PROFESSIONAL {
        Bandwidth getLimit() {
            return Bandwidth.classic(100, Refill.intervally(100, Duration.ofHours(1)));
        }
    };
    //..

接下来,添加一种方法来根据给定的API密钥解析定价计划:

enum PricingPlan {
     
static PricingPlan resolvePlanFromApiKey(String apiKey) {
 if (apiKey == null || apiKey.isEmpty()) {
   return FREE;
 } else if (apiKey.startsWith("PX001-")) {
   return PROFESSIONAL;
 } else if (apiKey.startsWith("BX001-")) {
   return BASIC;
 }
 return FREE;
}
//..
}

接下来,为每个API密钥存储存储桶,并检索存储桶以进行速率限制:

class PricingPlanService {
private final Map<String, Bucket> cache = new ConcurrentHashMap<>();
 
public Bucket resolveBucket(String apiKey) {
 return cache.computeIfAbsent(apiKey, this::newBucket);
}
 
private Bucket newBucket(String apiKey) {
 PricingPlan pricingPlan = PricingPlan.resolvePlanFromApiKey(apiKey);
 return Bucket4j.builder()
   .addLimit(pricingPlan.getLimit())
   .build();
}
}

现在在每个API密钥的内存中都有一个存储区。让我们修改控制器以使用:

@RestController
class AreaCalculationController {
private PricingPlanService pricingPlanService;
 
public ResponseEntity<AreaV1> rectangle(@RequestHeader(value = "X-api-key") String apiKey,
 @RequestBody RectangleDimensionsV1 dimensions) {
 
 Bucket bucket = pricingPlanService.resolveBucket(apiKey);
 ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1);
 if (probe.isConsumed()) {
   return ResponseEntity.ok()
     .header("X-Rate-Limit-Remaining", Long.toString(probe.getRemainingTokens()))
     .body(new AreaV1("rectangle", dimensions.getLength() * dimensions.getWidth()));
 }
  
 long waitForRefill = probe.getNanosToWaitForRefill() / 1_000_000_000;
 return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS)
   .header("X-Rate-Limit-Retry-After-Seconds", String.valueOf(waitForRefill))
   .build();
}
}

让我们来看一下这些变化。API客户端发送带有X-api-key请求标头的API密钥。我们使用 PricingPlanService获取此API密钥的存储桶,并通过使用存储桶中的令牌来检查是否允许该请求。

为了增强API的客户端体验,我们将使用以下附加响应标头发送有关速率限制的信息:

X-Rate-Limit-Remaining:当前时间窗口中剩余的令牌数量。

X速率限制重试秒后:剩余时间(以秒为单位),直到重新补充存储桶为止。

通过调用ConsumptionProbe的getRemainingTokens和getNanosToWaitForRefill来分别获取存储桶中剩余令牌的数量和到下一次重新补充前的剩余时间。如果我们能够成功地消耗令牌getNanosToWaitForRefill方法返回0。

使用Spring MVC拦截器

到目前为止,一切都很好!假设我们现在必须添加一个新的API端点,该端点计算并返回给定三角形的高度和底数的面积:

@PostMapping(value = "/triangle")
public ResponseEntity<AreaV1> triangle(@RequestBody TriangleDimensionsV1 dimensions) {
    return ResponseEntity.ok(new AreaV1("triangle", 0.5d * dimensions.getHeight() * dimensions.getBase()));
}

我们还需要对新端点进行速率限制,可以简单地从先前的端点复制并粘贴速率限制代码。

或者,用Spring MVC的HandlerInterceptor将速率限制代码与业务代码分离。

创建一个RateLimitInterceptor并在preHandle方法中实现速率限制代码:

public class RateLimitInterceptor implements HandlerInterceptor {
private PricingPlanService pricingPlanService;
 
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) 
 throws Exception {
 String apiKey = request.getHeader("X-api-key");
 if (apiKey == null || apiKey.isEmpty()) {
   response.sendError(HttpStatus.BAD_REQUEST.value(), "Missing Header: X-api-key");
   return false;
 }
 
 Bucket tokenBucket = pricingPlanService.resolveBucket(apiKey);
 ConsumptionProbe probe = tokenBucket.tryConsumeAndReturnRemaining(1);
 if (probe.isConsumed()) {
   response.addHeader("X-Rate-Limit-Remaining", String.valueOf(probe.getRemainingTokens()));
   return true;
 } else {
   long waitForRefill = probe.getNanosToWaitForRefill() / 1_000_000_000;
   response.addHeader("X-Rate-Limit-Retry-After-Seconds", String.valueOf(waitForRefill));
   response.sendError(HttpStatus.TOO_MANY_REQUESTS.value(),
    "You have exhausted your API Request Quota"); 
   return false;
 }
}

}

最后,将拦截器添加到InterceptorRegistry中:

public class AppConfig implements WebMvcConfigurer {
     
private RateLimitInterceptor interceptor;
 
@Override
public void addInterceptors(InterceptorRegistry registry) {
 registry.addInterceptor(interceptor)
   .addPathPatterns("/api/v1/area/**");
}
}

声明式API速率限制

Bucket4j自动配置:

https://github.com/MarcGiffing/bucket4j-spring-boot-starter

示例中,使用X-api-key请求标头的值作为标识和应用速率限制的键。

Bucket4j Spring Boot Starter提供了几种预定义的配置定义速率限制密钥:

naive的速率限制过滤器,默认设置。

按IP地址过滤。

基于表达式的过滤器。

基于表达式的过滤器使用Spring Expression Language(SpEL)。SpEL提供对根对象的访问,例如HttpServletRequest,可用在IP地址(getRemoteAddr()),请求标头(getHeader('X-api-key'))上构建过滤器表达式等等。

首先,将bucket4j-spring-boot-starter依赖项添加到我们的pom.xml中:

<dependency>
    <groupId>com.giffing.bucket4j.spring.boot.starter</groupId>
    <artifactId>bucket4j-spring-boot-starter</artifactId>
    <version>0.2.0</version>
</dependency>

在早期的实现中使用内存映射来存储每个API密钥(使用者)的存储桶。在这里,可以使用Spring的缓存抽象配置内存存储,例如Caffeine或Guava。

添加缓存依赖项:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-cache</artifactId>
</dependency>
<dependency>
    <groupId>javax.cache</groupId>
    <artifactId>cache-api</artifactId>
</dependency>
<dependency>
    <groupId>com.github.ben-manes.caffeine</groupId>
    <artifactId>caffeine</artifactId>
    <version>2.8.2</version>
</dependency>
<dependency>
    <groupId>com.github.ben-manes.caffeine</groupId>
    <artifactId>jcache</artifactId>
    <version>2.8.2</version>
</dependency>

注:我们还添加了jcache 依赖项,以符合Bucket4j的缓存支持。

应用配置

让我们将应用程序配置为使用Bucket4j入门库。

首先,我们将配置Caffeine缓存以在内存中存储API密钥和存储桶:

spring:
  cache:
    cache-names:
    - rate-limit-buckets
    caffeine:
      spec: maximumSize=100000,expireAfterAccess=3600s

配置 Bucket4j:

bucket4j:
  enabled: true
  filters:
  - cache-name: rate-limit-buckets
    url: /api/v1/area.*
    strategy: first
    http-response-body: "{ \"status\": 429, \"error\": \"Too Many Requests\", \"message\": \"You have exhausted your API Request Quota\" }"
    rate-limits:
    - expression: "getHeader('X-api-key')"
      execute-condition: "getHeader('X-api-key').startsWith('PX001-')"
      bandwidths:
      - capacity: 100
        time: 1
        unit: hours
    - expression: "getHeader('X-api-key')"
      execute-condition: "getHeader('X-api-key').startsWith('BX001-')"
      bandwidths:
      - capacity: 40
        time: 1
        unit: hours
    - expression: "getHeader('X-api-key')"
      bandwidths:
      - capacity: 20
        time: 1
        unit: hours

说明:

bucket4j.enabled = true –启用Bucket4j自动配置

bucket4j.filters.cache-name – 从缓存中获取 API密钥的存储桶

bucket4j.filters.url –指示应用速率限制的路径表达式

bucket4j.filters.strategy = first –在第一个匹配速率限制配置处停止

bucket4j.filters.rate-limits.expression –使用Spring Expression Language(SpEL)检索密钥

bucket4j.filters.rate-limits.execute-condition –使用SpEL决定是否执行速率限制

bucket4j.filters.rate-limits.bandwidths –定义Bucket4j速率限制参数

将PricingPlanService和RateLimitInterceptor替换为顺序评估的速率限制配置列表。

 curl -v -X POST http://localhost:9000/api/v1/area/triangle \
    -H "Content-Type: application/json" -H "X-api-key:FX001-99999" \
    -d '{ "height": 20, "base": 7 }'

curl -v -X POST http://localhost:9000/api/v1/area/triangle \
    -H "Content-Type: application/json" -H "X-api-key:FX001-99999" \
    -d '{ "height": 7, "base": 20 }'