上圖中是一個水壩洩洪的圖,那麼,對於軟體系統,如何使用最方便的可程式化的方式增加服務限流能力呢?
下面我結合一個常規的springCloud專案實踐了一把,希望他山之石可以攻玉。
簡單使用jmeter,壓20個並行,存取 列表查詢介面 /worksheet/findInfo, 對應的服務崩潰。【apprun,common】
架構複雜度的一個種類是: 保護API和伺服器端點免受攻擊,
比如:拒絕服務,級聯失敗,或者 超額使用資源。
限流是一種技術,來控制API或者服務的消費速度,在分散式系統中,沒有比集中式的設定和管理API的消費速度更好的選擇,
只有這些請求在限定的速度記憶體取,才能保證API的正常,更多的將會產生Http的 請求頻繁錯誤。
互動模型圖:
SpringCloudGateway是一個簡單和輕量級的元件,也是一種管理限制API的消費速度有效的方式。
springCloudGateway的限流模型:
當前企業600人,按照兩倍估算,即1200人使用,高頻介面秒並行限制為20, 即有20個人同時使用同一個介面運算元據。
需要增加限流和熔斷的點:
元件 | 增加限制 | 業務說明 |
---|---|---|
openresty | 限流,熔斷 【統一】 | 保證流量再nginx的處理閾值,參考資料:5W/S |
gateway | 限流,熔斷 【統一】 | 保證每個API的存取速度在20/S 峰值40 ; |
apprun | 高頻介面限流,每個介面統一分類客製化熔斷邏輯 | 限流可以複用封裝的元件,熔斷採用最簡單的hystix ; |
devops | 高頻介面限流,每個介面統一分類客製化熔斷邏輯 | 限流可以複用封裝的元件,熔斷採用最簡單的hystix ; |
common | 高頻介面限流,每個介面統一分類客製化熔斷邏輯,feign客製化熔斷邏輯 | 限流可以複用封裝的元件,熔斷採用最簡單的hystix ; |
job | 高頻介面限流,每個介面統一分類客製化熔斷邏輯,feign客製化熔斷邏輯 | 限流可以複用封裝的元件,熔斷採用最簡單的hystix ; |
閘道器做整體限制,介面由業務來增加限流。
RequestRateLimiter GatewayFilter工廠使用了RateLimiter實現來決定當前的並行請求是否允許處理,
如果不能處理,預設返回狀態碼 429 - 太多請求;
這個過濾器採用了可選的KeyResolver引數和對於速度限制的特殊引數,下面會介紹。
keyResolver是一個實體實現了KeyResolver介面,設定指向一個bean的名字,
使用SpEL表示式。 #{@myKeyResolver} 是一個SPEL表示式指向了一個叫做myKeyResolver的bean,下面展示了 KeyResolver介面;
public interface KeyResolver {
Mono<String> resolve(ServerWebExchange exchange);
}
keyResolver介面是的外掛策略驅動請求限制,再未來的里程碑版本,將會由一些KeyResolver的實現。
預設實現KeyResolver的類是 PrincipalNameKeyResolver, 會接受ServerWebExchange的Principal引數, 並且會呼叫 Principal.getName()方法。
預設的,如果KeyResolver沒有找到key, 請求會被拒絕,你可以設定這個行為。
spring.cloud.gateway.filter.request-rate-limiter.deny-empty-key=true
spring.cloud.gateway.filter.request-rate-limiter.empty-key-status-code=xxxx
注意: RequestRateLimiter沒有設定短註解,下面的例子是非法的。
spring.cloud.gateway.routes[0].filters[0]=RequestRateLimiter=2, 2, #{@userkeyresolver}
Redis實現是基於Stripe . 它需要使用 spring-boot-starter-data-redis-reactive 這個starter ;
演演算法使用的是令牌桶。
key | 業務含義 | 用途 |
---|---|---|
redis-rate-limiter.replenishRate | 一個使用者每秒多少請求數,不包含丟棄的請求,這個速度就是令牌桶的數量。 | 補充速度 |
redis-rate-limiter.burstCapacity | 使用者每秒允許最大的請求數量,這個令牌數量就是令牌桶可以持有的數量,設定為0標識阻塞所有請求 | 突增容量 |
redis-rate-limiter.requestedTokens | 單個請求消耗多少令牌,這個數量就是從令牌桶中每個請求獲取令牌的數量,預設是1 | 請求消耗令牌數量 |
如果你把 replenishRate 和 burstCapacity值設定為一樣,則完成了一個穩定的速度設定。
臨時突增流量可以允許設定 burstCapacity > replenishRate ,
這種場景下,RateLimiter需要允許一些時間在 burstCapacity和 replenishRate 之間 。
兩種連續的徒增會導致丟棄請求,下面的例子設定了一個 redis-rate-limit.
速度限制在1個請求每秒, replenishRate=1, requestedTokens=60,burstCapacity=60 ;
spring:
cloud:
gateway:
routes:
- id: requestratelimiter_route
uri: https://example.org
filters:
- name: RequestRateLimiter
args:
redis-rate-limiter.replenishRate: 10
redis-rate-limiter.burstCapacity: 20
redis-rate-limiter.requestedTokens: 1
上面的設定補充令牌的速度是10, 突增容量是20,但是在下一秒,只有10個請求是可以進入的;
下面的例子設定了一個KeyResolver。簡單的從請求引數中獲取user(在生產環境不推薦使用),
@Bean
KeyResolver userKeyResolver() {
return exchange -> Mono.just(exchange.getRequest().getQueryParams().getFirst("user"));
}
你也可以定義自己的RateLImiter,作為一個bean,實現RateLimiter介面即可,
在下面的設定中。你可以參照一個bean通過名字,使用SpEL表示式。
下面的例子定義了一個rateLimite並且使用自定義的KeyResolver.
spring:
cloud:
gateway:
routes:
- id: requestratelimiter_route
uri: https://example.org
filters:
- name: RequestRateLimiter
args:
rate-limiter: "#{@myRateLimiter}"
key-resolver: "#{@userKeyResolver}"
對所有的請求,限制如下。
key | value | 設定值原因 |
---|---|---|
replenishRate | 20 | 每個使用者每秒處理請求速度 為20 |
burstCapacity | 40 | 40,每秒處理請求數量突增容量 ; |
requestedTokens | 1 | 每個連線耗費1個令牌; |
原始碼分析: RequestRateLimiterGatewayFilterFactory
public GatewayFilter apply(Config config) {
KeyResolver resolver = getOrDefault(config.keyResolver, defaultKeyResolver);
RateLimiter<Object> limiter = getOrDefault(config.rateLimiter, defaultRateLimiter);
boolean denyEmpty = getOrDefault(config.denyEmptyKey, this.denyEmptyKey);
HttpStatusHolder emptyKeyStatus = HttpStatusHolder
.parse(getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode));
return (exchange, chain) -> resolver.resolve(exchange).defaultIfEmpty(EMPTY_KEY).flatMap(key -> {
if (EMPTY_KEY.equals(key)) {
if (denyEmpty) {
setResponseStatus(exchange, emptyKeyStatus);
return exchange.getResponse().setComplete();
}
return chain.filter(exchange);
}
String routeId = config.getRouteId();
if (routeId == null) {
Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
routeId = route.getId();
}
return limiter.isAllowed(routeId, key).flatMap(response -> {
for (Map.Entry<String, String> header : response.getHeaders().entrySet()) {
exchange.getResponse().getHeaders().add(header.getKey(), header.getValue());
}
if (response.isAllowed()) {
return chain.filter(exchange);
}
setResponseStatus(exchange, config.getStatusCode());
return exchange.getResponse().setComplete();
});
});
}
處理流程如下:
單個路由的限流設定:
spring:
cloud:
gateway:
routes:
- id: account-service
uri: http://localhost:8090
predicates:
- Path=/account/**
filters:
- RewritePath=/account/(?<path>.*), /$\{path}
- name: RequestRateLimiter
args:
redis-rate-limiter.replenishRate: 1
redis-rate-limiter.burstCapacity: 60
redis-rate-limiter.requestedTokens: 15
重寫429的返回值。
package com.zengame.cycube.api.gateway.rest.aspect;
import cn.hutool.json.JSONUtil;
import com.zengame.cycube.api.lib.common.bean.R;
import com.zengame.cycube.api.lib.common.util.UUIDUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.RequestRateLimiterGatewayFilterFactory;
import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver;
import org.springframework.cloud.gateway.filter.ratelimit.RateLimiter;
import org.springframework.cloud.gateway.route.Route;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.stream.Stream;
/**
* 魔方自定義限流
* @author Carter.li
* @createtime 2022/8/1 17:30
*/
@Slf4j
@Component
public class CubeRequestLimiterGatewayFilterFactory extends RequestRateLimiterGatewayFilterFactory {
private final RateLimiter redisRateLimiter;
private final KeyResolver keyResolver;
private final boolean denyEmptyKey = true;
private static final String EMPTY_KEY = "____EMPTY_KEY__";
public CubeRequestLimiterGatewayFilterFactory(RateLimiter redisRateLimiter, KeyResolver keyResolver) {
super(redisRateLimiter, keyResolver);
this.redisRateLimiter = redisRateLimiter;
this.keyResolver = keyResolver;
}
@Override
public GatewayFilter apply(Config config) {
KeyResolver resolver = getOrDefault(config.getKeyResolver(), keyResolver);
RateLimiter<Object> limiter = getOrDefault(config.getRateLimiter(), redisRateLimiter);
boolean denyEmpty = getOrDefault(config.getDenyEmptyKey(), this.denyEmptyKey);
return (exchange, chain) -> resolver.resolve(exchange).defaultIfEmpty(EMPTY_KEY).flatMap(key -> {
if (EMPTY_KEY.equals(key)) {
if (denyEmpty) {
return TokenCheckGatewayFilterFactory.generateJson(exchange, R.error(9998, "請求key為空"));
}
return chain.filter(exchange);
}
String routeId = config.getRouteId();
if (routeId == null) {
Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
routeId = route.getId();
}
return limiter.isAllowed(routeId, key).flatMap(response -> {
for (Map.Entry<String, String> header : response.getHeaders().entrySet()) {
exchange.getResponse().getHeaders().add(header.getKey(), header.getValue());
}
if (response.isAllowed()) {
return chain.filter(exchange);
}
R<String> r = R.error(9998, "請求太頻繁");
r.setData(key);
r.setGuid("請控制請求速度");
r.setTraceId(Stream.of(exchange.getRequest().getHeaders().getFirst("requestId"), exchange.getRequest().getQueryParams().getFirst("requestId")).filter(StringUtils::isNotBlank).findFirst().orElse(UUIDUtils.uuid()));
log.warn("too many requests: {}", JSONUtil.toJsonStr(r));
return TokenCheckGatewayFilterFactory.generateJson(exchange, r);
});
});
}
private <T> T getOrDefault(T configValue, T defaultValue) {
return (configValue != null) ? configValue : defaultValue;
}
}
jmeter指令碼
執行緒設定:
介面設定:
經過測試,對高頻介面增加了限流能力,而且限流能力是可以設定的。
在閘道器新增了最低限度的保護限流策略。
企業使用者數量有限,可以使用最小的資源滿足軟體系統的需求;
原創不易,關注誠可貴,轉發價更高!轉載請註明出處,讓我們互通有無,共同進步,歡迎溝通交流。