在高並行系統中,出於系統保護角度考慮,通常會對流量進行限流。
限流*的目的是在遇到流量高峰期或者流量突增(流量尖刺)時,通過對流量速率進行限制,當達到限制速率時,可以拒絕服務(定向到錯誤頁或告知資源沒有了)、排隊或等待(比如秒殺、評論、下單)、降級(返回兜底資料或預設資料,如商品詳情頁庫存預設有貨)
筆者碰到這樣的一道題,有下面的介面,需要實現1s秒只可以申請max次資源,需要考慮到並行效能
。
顯然這是一個限流器,網上有許多博主使用鎖
來實現,筆者將使用juc中的原子類,使用自旋+cas
實現。並不是說自旋+cas 一定優於鎖(自旋是cpu操作,在並行很高的時候會浪費cpu資源),但是相對於鎖可以減少使用者態到核心態的切換(這是由於java執行緒和作業系統執行緒是1比1的模型,上鎖是需要切換到核心態的,儘管synchronized 具備鎖升級,但是如果並行高了,最終還是重量級鎖,依舊具備開銷)筆者的cas都沒有自定義cas次數,實際使用應該是自旋若干次就放棄,或者切換為使用鎖的方式。並且這裡都是單機的限流,分散式的限流需要使用其他中介軟體,如redis實現。
而且在並行很高,很極限的情況下,這種cas的方式存在bug,所以成熟的實現如gauva的限流器,是使用synchronized實現的。
當前介面限定1s只可以提供n次服務,對於此需求,我們很直觀的可以想到使用屬性記錄當前是第幾秒,並且記錄這一秒內請求了多少次。
在指定週期內累加存取次數,當存取次數達到設定的閾值時,觸發限流策略,當進入下一個時間週期時會將存取次數清零
優點:簡單直接
缺點:臨界問題
如果在0.51s內請求max次,從1s1.5s請求max次,這樣可以實現在0.5~1.5這一秒內請求來到2max次。
首先我們要記錄 在x秒
請求了y次
,並且這兩個值必須是原子的。
這裡我們使用AtomicStampedReference
(JUC原始碼學習筆記4——原子類,CAS,Volatile記憶體屏障,快取偽共用與UnSafe相關方法中講到過),此類本意是為了防止cas的ABA問題,其中會記錄值和對應的版本
,並且其compareAndSet
方法可以同時更新值和對應的版本,這裡我們使用版本表示當前是多少秒,請求多少次使用AtomicInteger
記錄,方便我們在同一秒的時候更新請求數量。
public class CountRateLimiter implements RateLimiter {
/**
* 一秒可以接受多少個請求
*/
private final int numAcceptablePerSecond;
/***
* 版本號對應秒數
* 裡面的 AtomicInteger 記錄這一秒範圍內的請求數量
*/
private final AtomicStampedReference<AtomicInteger> helper;
public CountRateLimiter(int numAcceptablePerSecond) {
this.numAcceptablePerSecond = numAcceptablePerSecond;
this.helper =
new AtomicStampedReference<>(new AtomicInteger(numAcceptablePerSecond), -1);
}
@Override
public boolean acquire(int n) {
//不要太過分了
if (n > numAcceptablePerSecond) {
return false;
}
//上一次請求是多少秒的請求
int oldSeconds = helper.getStamp();
//當前多少秒
int currentSeconds = currentSeconds();
//不是同一秒的請求
//如果和當前不是一個版本(意味著不是同一秒) 那麼cas 修改版本並重置許可數量
if (oldSeconds != currentSeconds) {
//原剩餘的許可數量
AtomicInteger oldPermits = helper.getReference();
//cas 修改 同時修改版本,並且扣減數量
if (helper.compareAndSet(oldPermits,
//新許可的數量為 numAcceptablePerSecond - n
new AtomicInteger(numAcceptablePerSecond - n), oldSeconds, currentSeconds)) {
//cas 成功 那麼說明成功 拿到令牌
return true;
}
}
//到這裡說明 是同一秒(oldSeconds == currentSeconds)
//或者上面的if存在多執行緒競爭當前執行緒競爭失敗 其他執行緒重置了計數器 ==> 那麼cas 減少許可數量
//這裡判斷了一下 當前秒還是相等的,避免由於gc在第一個if中停留太久,比如第一秒執行緒A和B進入到第一個if,執行緒B成功了,但是執行緒A失敗了,並且暫停了2s,出來的時候時間已經是3s了,我們不能讓1s的請求佔用3s時候的令牌數
return currentSeconds() == currentSeconds &&
//最後這裡存在問題 如果在0.99999s的請求來到這裡,但是時間來到1s,這個cas才成功,那麼0.99999s的請求將打過來。導致1s的qps大於max
helper.getReference().addAndGet(-n) >= 0;
}
private static int currentSeconds() {
return (int) ((System.currentTimeMillis() / 1000) % Integer.MAX_VALUE);
}
}
為了避免計數器中的臨界問題,讓限制更加平滑,將固定視窗中分割出多個小時間視窗,分別在每個小的時間視窗中記錄存取次數,然後根據時間將視窗往前滑動並刪除過期的小時間視窗。
計數器演演算法,可以看做只有兩個視窗,因此在兩個視窗邊界的時候會出現臨界問題。而滑動視窗統計當前時間處於的1s內產生了多少次請求,避免了臨界問題
/**
* 滑動視窗限流器
* <p>
* 假設指定視窗總時長 為 1s 可以接受 10個請求,視窗分為5格
* 說明單格時間長度為200毫秒
* |_____|_____|_____|_____|_____|
* 0 200 400 600 800 1000
* <p>
* 當前時間為 500毫秒 那麼落在 (500/200)%5 也就是第二格
* 那麼500 毫秒是否可以接受請求 需要統計所有格子中的數量
* <p>
* 當時間來到 1500 毫秒,落在 (1500/200)%5 也是第二格
* |_____|_____|_____|_____|_____|_____|_____|_____|
* 0 200 400 600 800 1000 1200 1400 1600
* 從500到1500才是我們需要記錄的,視窗陣列大小是不變的
*
* 500的視窗版本是 500/1000 = 0
* 1500的視窗版本是 1500/1000 = 1
*
* 根據視窗版本來統計 哪些格子我們是要統計的,如果舊視窗版本小於當前視窗版本 不要計數
* (這裡的版本 可以理解為沒過 1000秒 版本加1,版本不同意味著是一秒前的資料)
*
* @author cuzz
* @version 1.0
**/
public class SlidingWindowRateLimiter implements RateLimiter {
//滑動視窗中的一個元素
private static class WindowElement {
/***
* 版本
*/
private volatile long version;
/**
* 計數
*/
private final AtomicInteger counter;
private WindowElement(long version, AtomicInteger counter) {
this.version = version;
this.counter = counter;
}
private void changeVersion(long newVersion) {
this.version = newVersion;
}
private void reset(int n) {
counter.set(n);
}
void add(int n) {
counter.addAndGet(n);
}
}
/**
* 整個視窗的大小,比如一秒 只能接受100個請求 那麼此值設定為1000(毫秒)
*/
private final long windowTimeMillions = 1000;
/***
* 視窗的長度,視窗的長度,視窗越長,越能防止臨界問題
*/
private final int windowLength;
/***
* 視窗陣列
*/
private final AtomicReferenceArray<WindowElement> slidWindow;
/***
* 一秒接受 100個請求 那麼此值設定為 100
*/
private final int canAcceptRequestTimes;
/**
* 記錄 視窗每一個元素 對應的時間跨度
* 1秒接受100個請求 那麼此處為 1000(毫秒)/100 = 10毫秒
*/
private final int millionsEachOne;
/**
* @param windowLength 指定視窗數量
* @param canAcceptRequestTimes 在 1s 內可以接受多少個請求
*/
public SlidingWindowRateLimiter(int windowLength,
int canAcceptRequestTimes) {
this.windowLength = windowLength;
this.canAcceptRequestTimes = canAcceptRequestTimes;
slidWindow = new AtomicReferenceArray<>(new WindowElement[windowLength]);
millionsEachOne = (int) (windowTimeMillions / windowLength);
}
@Override
public boolean acquire(int n) {
//1s分為5格 那麼 一格200ms
//當前時間為 500毫秒 那麼落在 (500/200)%5 也就是第二格
long currentTimeMillis = System.currentTimeMillis();
//這次請求 落在 哪個桶
int index = (int) ((currentTimeMillis / millionsEachOne) % windowLength);
//當前這次請求的 version 即當前是多少秒
long version = (currentTimeMillis - currentTimeMillis % windowTimeMillions);
//1. 拿到當前當前的計數
//1.1 如果計數為空 說明從來沒有其他請求設定元素,這時,我們需要cas初始化結束計數
//1.2 如果計數不為空
// 1.2.1 是相同的版本 那麼自增計數
// 1.2.3 如果不是相同的版本(之前版本小於當前版本),那麼更新版本
// 1.2.4 如果不是相同的版本(之前版本大於當前版本),基本上不可能,因為時間是不會倒流的
//操作這次請求落下的桶
WindowElement currentIndex = slidWindow.accumulateAndGet(index,
new WindowElement(version, new AtomicInteger(n)), (old, now) -> {
//計數為空 說明從來沒有其他請求設定元素,這時,我們需要cas初始化結束計數
if (old == null) {
return now;
}
//當前請求的次數
int currentRequest = now.counter.get();
//是同一秒 那麼自增
if (old.version == now.version) {
old.add(now.counter.get());
} else {
//如果不是相同的版本(之前版本小於當前版本),那麼更新版本 更新技術
old.reset(currentRequest);
old.changeVersion(now.version);
}
return old;
});
//大於最大數量返回false 這一瞬間對應的元素 就已經超出了我們的預期 那麼返回false
if (currentIndex.counter.get() > canAcceptRequestTimes) {
return false;
}
//統計視窗內所有請求數
long sum = 0;
//下面這一段 不具備瞬時一致性
for (int i = 0; i < windowLength; i++) {
WindowElement e = slidWindow.get(i);
if (e != null && e.version == version) {
sum += e.counter.get();
if (sum > canAcceptRequestTimes) {
return false;
}
}
}
//小於等於才可以
return sum <= canAcceptRequestTimes;
}
}
感覺滑動視窗,不使用鎖,是比較難實現的,因為上面的增加次數 和下面的 統計總數,不具備原子性。
漏桶限流演演算法的核心就是, 不管上面的水流速度有多塊, 漏桶水滴的流出速度始終保持不變。
這裡的水流就是我們的請求,漏水的速度始終不變,是指我們業務執行緒處理的速度不變,每隔一段時間從任務佇列中拿一個請求進行處理
漏桶演演算法,我覺得可以用於閘道器層,每次請求來了放在阻塞佇列,然後閘道器執行緒,每隔一段執行緒拿出一個請求去轉發執行,但是轉發請求,拿到返回進行響應的時間是不固定,所以閘道器執行緒需要使用其他的執行緒去非同步處理,閘道器執行緒只負責定時拿請求分配給其他執行緒非同步處理。(本題目中介面要求立馬返回false 或者true,不太契合就沒寫了)
大體的思路就是請求來了塞到任務佇列,定時執行緒每隔一段時間取一個請求,交給另外的執行緒非同步處理。如果請求太多,那麼阻塞佇列塞不進去,直接返回false。
令牌桶演演算法:請求執行作為消費者,每個請求都需要去桶中拿取一個令牌,取到令牌則繼續執行;如果桶中無令牌可取,就觸發拒絕策略,可以是超時等待,也可以是直接拒絕本次請求,由此達到限流目的。當桶中令牌數大於最大數量的時候,將不再新增。它可以適應流量突發,N 個請求到來只需要從桶中獲取 N 個令牌就可以繼續處理。
Guava中的RateLimiter 使用此演演算法,且提供了預熱模式,推薦使用
我們可以使用一個定時任務每隔一段時間想桶中生成令牌,記錄令牌的數量使用原子類。
這樣實現非常簡單,但是每一個限流器需要一個執行緒去生成,如果我們存在100個介面單獨限流,那麼需要100個執行緒
我們需要記錄上一次請求的時間,和桶中剩餘的令牌數,並且桶中的數量最好為double型別,因為此次請求和上一次請求的間隔時間,生成的令牌數可以為小數。所以我實現一個Helper內部類,實現這兩個值的原子更新
static class Helper {
//同時記錄上一次請求時間 和 剩餘數量
private static class Pair {
public Pair(long time, double count) {
this.time = time;
this.count = count;
}
final long time;
final double count;
}
private double count() {
return pair.count;
}
private long time() {
return pair.time;
}
//同時記錄上一次請求時間 和 剩餘數量
private volatile Pair pair;
//反射獲取unsafe進行cas操作
private static final Unsafe UNSAFE = getUnsafe();
Helper(double count) {
pair = new Pair(-1, count);
}
//pair 欄位的偏移,cas需要當期更改物件的地址
private final static long OFFSET_OF_PAIR;
static {
try {
OFFSET_OF_PAIR = UNSAFE.objectFieldOffset(Helper.class.getDeclaredField("pair"));
} catch (NoSuchFieldException e) {
throw new RuntimeException(e);
}
}
//cas修改,一次是,上一次的時間 和上一次剩餘數量,和當前時間,當前數量
boolean cas(long oldTime, double oldCount, long newTime, double newCount) {
final Pair current = pair;
return oldTime == current.time &&
//小於0.00001視為 數量相同,這裡需要根據並行度進行設定
Math.abs(oldCount - current.count) < 0.00001 &&
casPair(current, new Pair(newTime, newCount));
}
//cas修改 呼叫unsafe 實現
boolean casPair(Pair old, Pair newPair) {
return UNSAFE.compareAndSwapObject(this, OFFSET_OF_PAIR, old, newPair);
}
private static Unsafe getUnsafe() {
try {
Constructor<Unsafe> constructor = Unsafe.class.getDeclaredConstructor();
constructor.setAccessible(true);
return constructor.newInstance();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
public class TokenRateLimiter3 implements RateLimiter {
//一秒可以接受多少請求
private final int tokenPerSeconds;
//記錄token數量和上次請求時間
private final Helper tokenCount;
public TokenRateLimiter3(int tokenPerSeconds) {
this.tokenPerSeconds = tokenPerSeconds;
tokenCount = new Helper(0);
}
@Override
public boolean acquire(int n) {
if (n > tokenPerSeconds) {
return false;
}
//當前請求時間
long currentTimeMillis = System.currentTimeMillis();
while (true) {
//當前token 有多少個
double token = tokenCount.count();
//上一期請求時間
long preRequestTime = tokenCount.time();
//這段時間可以生成多少令牌
double canGenerate = (((double) (currentTimeMillis - preRequestTime) / 1000.0)
* tokenPerSeconds);
//桶中可以儲存的令牌的數量,最大不過 tokenPerSeconds
double canStore = Math.min(token + canGenerate, tokenPerSeconds);
//小於請求的令牌數
if (canStore < (double) n) {
return false;
}
//剩餘多少令牌
double release = canStore - n;
//cas修改
if (tokenCount.cas(preRequestTime,
token, currentTimeMillis, release)) {
return true;
}
}
}
}