從0到1構造自定義限流元件

2023-06-20 18:00:12

一 背景

在系統高可用設計中,介面限流是一個非常重要環節,一方面是出於對自身伺服器資源的保護,另一方面也是對依賴資源的一種保護措施。比如對於 Web 應用,我限制單機只能處理每秒 1000 次的請求,超過的部分直接返回錯誤給使用者端。雖然這種做法損害了使用者的使用體驗,但是它是在極端並行下的無奈之舉,是短暫的行為,因此是可以接受的。

二 設計思路

常見的限流有2種思路

  • 第一種是限制總量,也就是限制某個指標的累積上限,常見的是限制當前系統服務的使用者總量,例如:某個搶購活動商品數量只有 100 個,限制參與搶購的使用者上限為 1 萬個,1 萬以後的使用者直接拒絕。

  • 第二種是限制時間量,也就是限制一段時間內某個指標的上限,例如 1 分鐘內只允許 10000 個使用者存取;每秒請求峰值最高為 10 萬。

三 限流演演算法

目前實現限流演演算法主要分為3類,這裡不詳細展開介紹:

1)時間視窗

固定時間視窗演演算法是最簡單的限流演演算法,它的實現原理就是控制單位時間內請求的數量,但是這個演演算法有個缺點就是臨界值問題。
為了解決臨界值的問題,又推出滑動時間視窗演演算法,其實現原理大致上是將時間分為一個一個小格子,在統計請求數量的時候,是通過統計滑動時間週期內的請求數量。

2)漏斗演演算法

漏斗演演算法的核心是控制總量,請求流入的速率不確定,超過流量部分益出,該演演算法比較適用於針對突發流量,想要儘可能的接收全部請求的場景。其缺點也比較明顯,這個總量怎麼評估,大小怎麼設定,而且一旦初始化也沒法動態調整。

3)令牌桶演演算法

令牌桶演演算法的核心是控制速率,令牌產生的速度是關鍵,不斷的請求獲取令牌,獲取不到就丟棄。該演演算法比較適用於針對突發流量,以保護自身服務資源以及依賴資源為主,支援動態調整速率。缺點的話實現比較複雜,而且會丟棄很多請求。

四 實現步驟

我們自定義的這套限流元件有是基於guava RateLimiter封裝的,採用令牌桶演演算法以控制速率為主,支援DUCC動態設定,同時支援限流後的降級措施。接下來看一下整體實現方案

1、自定義RateLimiter Annotation標籤

這裡主要對限流相關屬性的一個定義,包括每秒產生的令牌數、獲取令牌超時時間、降級邏輯實現以及限流開關等內容

@Documented
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface SysRateLimit {

    /**
     * 每秒產生的令牌數 預設500
     *
     * @return
     */
    double permitsPerSecond() default 500D;

    /**
     * 獲取令牌超時時間 預設100
     *
     * @return
     */
    long timeout() default 100;

    /**
     * 獲取令牌超時時間單位 預設毫秒
     *
     * @return
     */
    TimeUnit timeUnit() default TimeUnit.MILLISECONDS;

    /**
     * 服務降級方法名稱 Spring bean id
     *
     * @return
     */
    String fallbackBeanId() default "";

    /**
     * 限流key 唯一
     *
     * @return
     */
    String limitKey() default "";
}

2、基於Spring Aspect 構造切面

首先就是我們需要構造一個Aspect切面用於掃描我們自定義的SysRateLimit標籤

@Slf4j
@EnableAspectJAutoProxy
@Aspect
public class SysRateLimitAspect {
    
    /**
     * 自定義切入點
     */
    @Pointcut("@annotation(com.jd.smb.service.ratelimiter.annotation.SysRateLimit)")
    public void pointCut() {

    }

    /**
     * 方法前執行限流方案
     *
     * @param joinPoint
     * @return
     * @throws Throwable
     */
    @Around("pointCut()")
    public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        // 如果未獲取到物件,直接執行方法
        if (signature == null) {
            return joinPoint.proceed();
        }

        try {
            Method method = joinPoint.getTarget().getClass().getDeclaredMethod(signature.getName(), signature.getMethod().getParameterTypes());
            // 獲取註解物件
            SysRateLimit sysRateLimit = method.getAnnotation(SysRateLimit.class);
            if (sysRateLimit == null) {
                return joinPoint.proceed();
            }
            
        } catch (Exception e) {
            // todo log
        }
        return joinPoint.proceed();
    }
}

獲取自定義SysRateLimit標籤的各種屬性

 // 限流key
String limitKey = sysRateLimit.limitKey();
if (StringUtils.isBlank(limitKey)) {
    return joinPoint.proceed();
}
// 令牌桶數量
double permitsPerSecond = sysRateLimit.permitsPerSecond();
// 獲取令牌超時時間
long timeout = sysRateLimit.timeout();
// 獲取令牌超時時間單位
TimeUnit timeUnit = sysRateLimit.timeUnit();

將我們自定義的SysRateLimiter 和 Guava RateLimiter 進行整合

  1. 首先我們需要構造一個全域性Map,用於儲存我們開啟限流的方法,key就是我們定義的limitKey, value就是我們轉換後的Guava RateLimiter
 /**
 * 儲存RateLimiter(key: limitKey value:RateLimiter )
 */
private static final Map<String, RateLimiter> LIMITER_MAP = new ConcurrentHashMap<>();
  1. 接著就是核心邏輯:這裡首先從我們建立的Map中獲取Guava RateLimiter,獲取不到就建立RateLimiter.create(permitsPerSecond) ;然後呼叫RateLimiter.tryAcquire()嘗試獲取令牌桶,獲取成功則執行後續的邏輯,這裡重點獲取失敗後,我們需要執行我們的降級方法。(注意:Guava RateLimiter 有很多API,這裡我們不展開討論,後續會針對Guava限流的原始碼進行詳細的解析)
RateLimiter rateLimiter;
// Map中是否存在 存在直接獲取
if (LIMITER_MAP.containsKey(limitKey)) {
    rateLimiter = LIMITER_MAP.get(limitKey);
} else {
    // 不存在建立後放到Map中
    rateLimiter = RateLimiter.create(permitsPerSecond);
    LIMITER_MAP.put(limitKey, rateLimiter);
}
// 嘗試獲取令牌
if (!rateLimiter.tryAcquire(timeout, timeUnit)) {
    // todo 限流後降級措施
    return this.fallBack(sysRateLimit, joinPoint, signature);
}

降級方案執行

上面我們在獲取令牌桶超時後,需要執行我們的降級邏輯,怎麼做呢?也很簡單,我們在定義SysRateLimiter的時候有個fallBackBeanId,這個就是我們執行降級邏輯的bean物件Id,需要我們提前進行建立。接著我們看一下是怎麼實現的。

    /**
     * 執行降級邏輯
     *
     * @param sysRateLimit
     * @param joinPoint
     * @param signature
     * @return
     */
    private Object fallBack(SysRateLimit sysRateLimit, ProceedingJoinPoint joinPoint, MethodSignature signature) {
        String fallbackBeanId = sysRateLimit.fallbackBeanId();
        // 當沒有設定具體的降級實現方案的時候 可以結合業務世紀情況設定限流錯誤碼
        if (StringUtils.isBlank(fallbackBeanId)) {
            // 自定義的 可以結合自己系統裡的進行設定
            return ApiResult.error(ResultCode.REACH_RATE_LIMIT);
        }

        try {
            // SpringContext中通過BeanId獲取物件 SpringUtils只是獲取bean物件的工具類 有多種實現方式 可自行百度
            Object bean = SpringUtils.getBean(fallbackBeanId);
            Method method = bean.getClass().getMethod(signature.getName(), signature.getParameterTypes());
            // 執行對應的方法
            return method.invoke(bean, joinPoint.getArgs());
        } catch (Exception e) {
            // todo error log
        }
        return ApiResult.error(ResultCode.REACH_RATE_LIMIT);
    }

這樣我們大概的一個架子就弄好了。 接下來我們看看實際該如何使用

3、具體應用

在方法入口引入SysRateLimiter標籤

@Slf4j
@RestController
@RequestMapping("/api/user")
@RequiredArgsConstructor
public class UserQueryController extends AbstractController {

    /**
     * 查詢使用者資訊
     *
     * @param request
     * @return
     */
    @GetMapping("/info/{id}")
    @SysRateLimit(permitsPerSecond = 500, limitKey = "UserQueryController.info", fallbackBeanId = "userQueryControllerFallBack",
            timeout = 100, timeUnit = TimeUnit.MILLISECONDS)
    public ApiResult<UserInfo> info(@PathVariable Long id, HttpServletRequest request) {
        // todo 業務邏輯查詢 這裡不展開
        return ApiResult.success();
    }
}

設定降級方法

@Service
public class UserQueryControllerFallBack {

    /**
     * 降級後執行的邏輯
     *
     * @param request
     * @return
     */
    public ApiResult<UserInfo> info(Long id, HttpServletRequest request) {
        // todo 編寫限流降級後的邏輯 可以是降級碼 也可以是預設物件
        return ApiResult.success(null);
    }
}

當請求進來的時候,會結合我們設定的閾值進行令牌桶的獲取,獲取失敗後會執行限流,這裡我們進行了限流後的降級處理。其實到這裡我們完成限流元件的簡單封裝和使用,但是仍有一些點需要我們進行處理,例如如何動態設定令牌的數量,接下來我們就看一下如何實現令牌的動態設定。

4、動態設定令牌數量

通過DUCC設定令牌數量 我們需要定義一個DUCC設定,這裡面內容很簡單,設定我們設定limitKey的令牌數量

@Data
@Slf4j
@Component
public class RateLimitConfig {

    /**
     * 設定config key: limitKey value: 數量
     */
    private Map<String, Integer> limitConfig;

    /**
     * 監聽ducc設定
     *
     * @param json
     */
    @LafValue(key = "rate.limit.conf")
    public void setConfig(String json) {
        if (StringUtils.isBlank(json)) {
            return;
        }
        Map<String, Integer> map = JsonModelUtils.getModel(json, Map.class, null);
        if (map != null) {
            Wrapper.wrapperBean(map, this, true);
        }
    }
}

通過DUCC設定獲取指定limitKey的令牌數量,獲取失敗則採用方法設定預設數量,這樣我們後面設定令牌數量就可以通過DUCC動態的設定了

 /**
     * 獲取令牌桶數量
     *
     * @param sysRateLimit
     * @return
     */
    private double getPermitsPerSecond(SysRateLimit sysRateLimit) {
        // 方法預設令牌數量
        double defaultValue = sysRateLimit.permitsPerSecond();
        if (rateLimitConfig == null || rateLimitConfig.getLimitConfig() == null) {
            return defaultValue;
        }
        // 設定的令牌數量
        Integer value = rateLimitConfig.getLimitConfig().get(sysRateLimit.limitKey());
        if (value == null) {
            return defaultValue;
        }
        return value;
    }

5、後續其他設定

其實後續我們的其他屬性都可以通過DUCC動態化的來設定,這裡呢因為和令牌桶數量類似,就不再展開描述了。感興趣的小夥伴可以自行設定,根據我們的使用,使用預設設定即可。

作者:京東零售 王磊

來源:京東雲開發者社群