架構高可用之限流-抽刀斷水水更流

2022-12-13 12:01:02

上圖中是一個水壩洩洪的圖,那麼,對於軟體系統,如何使用最方便的可程式化的方式增加服務限流能力呢?

下面我結合一個常規的springCloud專案實踐了一把,希望他山之石可以攻玉。

背景

簡單使用jmeter,壓20個並行,存取 列表查詢介面 /worksheet/findInfo, 對應的服務崩潰。【apprun,common】

架構複雜度的一個種類是: 保護API和伺服器端點免受攻擊,

比如:拒絕服務,級聯失敗,或者 超額使用資源。

限流是一種技術,來控制API或者服務的消費速度,在分散式系統中,沒有比集中式的設定和管理API的消費速度更好的選擇,

只有這些請求在限定的速度記憶體取,才能保證API的正常,更多的將會產生Http的 請求頻繁錯誤。

互動模型圖:

SpringCloudGateway是一個簡單和輕量級的元件,也是一種管理限制API的消費速度有效的方式。

springCloudGateway的限流模型:

目標

當前企業600人,按照兩倍估算,即1200人使用,高頻介面秒並行限制為20, 即有20個人同時使用同一個介面運算元據。

需要增加限流和熔斷的點:

元件 增加限制 業務說明
openresty 限流,熔斷 【統一】 保證流量再nginx的處理閾值,參考資料:5W/S
gateway 限流,熔斷 【統一】 保證每個API的存取速度在20/S 峰值40 ;
apprun 高頻介面限流,每個介面統一分類客製化熔斷邏輯 限流可以複用封裝的元件,熔斷採用最簡單的hystix ;
devops 高頻介面限流,每個介面統一分類客製化熔斷邏輯 限流可以複用封裝的元件,熔斷採用最簡單的hystix ;
common 高頻介面限流,每個介面統一分類客製化熔斷邏輯,feign客製化熔斷邏輯 限流可以複用封裝的元件,熔斷採用最簡單的hystix ;
job 高頻介面限流,每個介面統一分類客製化熔斷邏輯,feign客製化熔斷邏輯 限流可以複用封裝的元件,熔斷採用最簡單的hystix ;

實現路徑

閘道器做整體限制,介面由業務來增加限流。

gateway

gateway自帶過濾器

RequestRateLimiter GatewayFilter工廠使用了RateLimiter實現來決定當前的並行請求是否允許處理,

如果不能處理,預設返回狀態碼 429 - 太多請求;

這個過濾器採用了可選的KeyResolver引數和對於速度限制的特殊引數,下面會介紹。

keyResolver是一個實體實現了KeyResolver介面,設定指向一個bean的名字,

使用SpEL表示式。 #{@myKeyResolver} 是一個SPEL表示式指向了一個叫做myKeyResolver的bean,下面展示了 KeyResolver介面;

public interface KeyResolver {
    Mono<String> resolve(ServerWebExchange exchange);
}

keyResolver介面是的外掛策略驅動請求限制,再未來的里程碑版本,將會由一些KeyResolver的實現。

預設實現KeyResolver的類是 PrincipalNameKeyResolver, 會接受ServerWebExchange的Principal引數, 並且會呼叫 Principal.getName()方法。

預設的,如果KeyResolver沒有找到key, 請求會被拒絕,你可以設定這個行為。

spring.cloud.gateway.filter.request-rate-limiter.deny-empty-key=true
spring.cloud.gateway.filter.request-rate-limiter.empty-key-status-code=xxxx

注意: RequestRateLimiter沒有設定短註解,下面的例子是非法的。

spring.cloud.gateway.routes[0].filters[0]=RequestRateLimiter=2, 2, #{@userkeyresolver}

RedisLimiter介紹

Redis實現是基於Stripe . 它需要使用 spring-boot-starter-data-redis-reactive 這個starter ;

演演算法使用的是令牌桶。

key 業務含義 用途
redis-rate-limiter.replenishRate 一個使用者每秒多少請求數,不包含丟棄的請求,這個速度就是令牌桶的數量。 補充速度
redis-rate-limiter.burstCapacity 使用者每秒允許最大的請求數量,這個令牌數量就是令牌桶可以持有的數量,設定為0標識阻塞所有請求 突增容量
redis-rate-limiter.requestedTokens 單個請求消耗多少令牌,這個數量就是從令牌桶中每個請求獲取令牌的數量,預設是1 請求消耗令牌數量

如果你把 replenishRate 和 burstCapacity值設定為一樣,則完成了一個穩定的速度設定。

臨時突增流量可以允許設定 burstCapacity > replenishRate ,

這種場景下,RateLimiter需要允許一些時間在 burstCapacity和 replenishRate 之間 。

兩種連續的徒增會導致丟棄請求,下面的例子設定了一個 redis-rate-limit.

速度限制在1個請求每秒, replenishRate=1, requestedTokens=60,burstCapacity=60 ;

spring:
  cloud:
    gateway:
      routes:
      - id: requestratelimiter_route
        uri: https://example.org
        filters:
        - name: RequestRateLimiter
          args:
            redis-rate-limiter.replenishRate: 10
            redis-rate-limiter.burstCapacity: 20
            redis-rate-limiter.requestedTokens: 1

上面的設定補充令牌的速度是10, 突增容量是20,但是在下一秒,只有10個請求是可以進入的;

下面的例子設定了一個KeyResolver。簡單的從請求引數中獲取user(在生產環境不推薦使用),

@Bean
KeyResolver userKeyResolver() {
    return exchange -> Mono.just(exchange.getRequest().getQueryParams().getFirst("user"));
}

你也可以定義自己的RateLImiter,作為一個bean,實現RateLimiter介面即可,

在下面的設定中。你可以參照一個bean通過名字,使用SpEL表示式。

{@myRateLimiter} 是一個表示式,參照了一個名字叫做 myRateLimiter的bean ,

下面的例子定義了一個rateLimite並且使用自定義的KeyResolver.

spring:
  cloud:
    gateway:
      routes:
      - id: requestratelimiter_route
        uri: https://example.org
        filters:
        - name: RequestRateLimiter
          args:
            rate-limiter: "#{@myRateLimiter}"
            key-resolver: "#{@userKeyResolver}"

魔方的限流設定

對所有的請求,限制如下。

key value 設定值原因
replenishRate 20 每個使用者每秒處理請求速度 為20
burstCapacity 40 40,每秒處理請求數量突增容量 ;
requestedTokens 1 每個連線耗費1個令牌;

原始碼分析: RequestRateLimiterGatewayFilterFactory

public GatewayFilter apply(Config config) {
		KeyResolver resolver = getOrDefault(config.keyResolver, defaultKeyResolver);
		RateLimiter<Object> limiter = getOrDefault(config.rateLimiter, defaultRateLimiter);
		boolean denyEmpty = getOrDefault(config.denyEmptyKey, this.denyEmptyKey);
		HttpStatusHolder emptyKeyStatus = HttpStatusHolder
				.parse(getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode));

		return (exchange, chain) -> resolver.resolve(exchange).defaultIfEmpty(EMPTY_KEY).flatMap(key -> {
			if (EMPTY_KEY.equals(key)) {
				if (denyEmpty) {
					setResponseStatus(exchange, emptyKeyStatus);
					return exchange.getResponse().setComplete();
				}
				return chain.filter(exchange);
			}
			String routeId = config.getRouteId();
			if (routeId == null) {
				Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
				routeId = route.getId();
			}
			return limiter.isAllowed(routeId, key).flatMap(response -> {

				for (Map.Entry<String, String> header : response.getHeaders().entrySet()) {
					exchange.getResponse().getHeaders().add(header.getKey(), header.getValue());
				}

				if (response.isAllowed()) {
					return chain.filter(exchange);
				}

				setResponseStatus(exchange, config.getStatusCode());
				return exchange.getResponse().setComplete();
			});
		});
	}

處理流程如下:

單個路由的限流設定:

spring:
  cloud:
    gateway:
      routes:
        - id: account-service
          uri: http://localhost:8090
          predicates:
            - Path=/account/**
          filters:
            - RewritePath=/account/(?<path>.*), /$\{path}
            - name: RequestRateLimiter
              args:
                redis-rate-limiter.replenishRate: 1
                redis-rate-limiter.burstCapacity: 60
                redis-rate-limiter.requestedTokens: 15

重寫429的返回值。

package com.zengame.cycube.api.gateway.rest.aspect;

import cn.hutool.json.JSONUtil;
import com.zengame.cycube.api.lib.common.bean.R;
import com.zengame.cycube.api.lib.common.util.UUIDUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.RequestRateLimiterGatewayFilterFactory;
import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver;
import org.springframework.cloud.gateway.filter.ratelimit.RateLimiter;
import org.springframework.cloud.gateway.route.Route;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.stream.Stream;

/**
 * 魔方自定義限流
 * @author Carter.li
 * @createtime 2022/8/1 17:30
 */
@Slf4j
@Component
public class CubeRequestLimiterGatewayFilterFactory extends RequestRateLimiterGatewayFilterFactory {

    private final RateLimiter redisRateLimiter;
    private final KeyResolver keyResolver;
    private final boolean denyEmptyKey = true;
    private static final String EMPTY_KEY = "____EMPTY_KEY__";


    public CubeRequestLimiterGatewayFilterFactory(RateLimiter redisRateLimiter, KeyResolver keyResolver) {
        super(redisRateLimiter, keyResolver);
        this.redisRateLimiter = redisRateLimiter;
        this.keyResolver = keyResolver;
    }

    @Override
    public GatewayFilter apply(Config config) {
        KeyResolver resolver = getOrDefault(config.getKeyResolver(), keyResolver);
        RateLimiter<Object> limiter = getOrDefault(config.getRateLimiter(), redisRateLimiter);
        boolean denyEmpty = getOrDefault(config.getDenyEmptyKey(), this.denyEmptyKey);

        return (exchange, chain) -> resolver.resolve(exchange).defaultIfEmpty(EMPTY_KEY).flatMap(key -> {
            if (EMPTY_KEY.equals(key)) {
                if (denyEmpty) {
                    return TokenCheckGatewayFilterFactory.generateJson(exchange, R.error(9998, "請求key為空"));
                }
                return chain.filter(exchange);
            }
            String routeId = config.getRouteId();
            if (routeId == null) {
                Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
                routeId = route.getId();
            }
            return limiter.isAllowed(routeId, key).flatMap(response -> {

                for (Map.Entry<String, String> header : response.getHeaders().entrySet()) {
                    exchange.getResponse().getHeaders().add(header.getKey(), header.getValue());
                }

                if (response.isAllowed()) {
                    return chain.filter(exchange);
                }

                R<String> r = R.error(9998, "請求太頻繁");
                r.setData(key);
                r.setGuid("請控制請求速度");
                r.setTraceId(Stream.of(exchange.getRequest().getHeaders().getFirst("requestId"), exchange.getRequest().getQueryParams().getFirst("requestId")).filter(StringUtils::isNotBlank).findFirst().orElse(UUIDUtils.uuid()));
                log.warn("too many requests: {}", JSONUtil.toJsonStr(r));
                return TokenCheckGatewayFilterFactory.generateJson(exchange, r);

            });
        });
    }

    private <T> T getOrDefault(T configValue, T defaultValue) {
        return (configValue != null) ? configValue : defaultValue;
    }
}

測試

jmeter指令碼

執行緒設定:

介面設定:

經過測試,對高頻介面增加了限流能力,而且限流能力是可以設定的。

小結

在閘道器新增了最低限度的保護限流策略。

企業使用者數量有限,可以使用最小的資源滿足軟體系統的需求;

原創不易,關注誠可貴,轉發價更高!轉載請註明出處,讓我們互通有無,共同進步,歡迎溝通交流。