大規模佇列的核心訴求,不僅需要「快」,還需要兼顧「公平」。
HTTP是一種常用的通訊協定,除了常見網站存取、上傳下載,HTTP協定還經常被用在訊息推播場景上。
設想你搭建了一個電商平臺,有很多大型商家入駐了該電商平臺並售賣各類商品,在消費者購買某個商品後,平臺會通過HTTP協定將消費者購買商品的資訊通知商家,商家則會在後臺接收平臺推播過來的訊息。
一般情況下,所有的系統都正常工作。但突然有一天,A商家出現了爆款產品,使得銷售量迅速上升,由於A商家的後臺服務處理能力是有限的,便會出現平臺一直在給A商家推播商品售賣資訊,而將其他商家的訊息都排在後面,這便導致大量其他商家不能及時知道商品的售賣情況。
這種情況也會發生在某個大客戶系統異常、響應急劇變慢,導致平臺推播能力下降影響其他客戶。因此,實現不同客戶訊息推播的隔離與控制就顯得十分重要。
除了訊息推播場景,類似的需求也發生在平臺型的任務處理場景和資源排程場景。
在任務處理場景,很多客戶會使用平臺來處理任務,比如:通過通訊平臺傳送語音通知,每個客戶都有大量的語音通知業務請求需要處理。
由於資源是有限的,所以需要給每個客戶配額一定的處理能力,當客戶提交的請求大於配額的時候,平臺會按最高配額的處理速度來處理客戶的請求,並將超過配額的部分積壓延後處理,這樣會避免因為某些頭部客戶的請求量過大導致其他客戶的請求長時間無法處理的情況。
在資源排程場景,假設平臺有很多資源用於處理客戶的請求,雖然每個資源都能處理某些型別的任務,但是資源的實時處理能力是受限的。
比如:資源只能實時處理100QPS的請求,這時需要建立一套機制,將對應資源能處理的任務選擇出來,並按資源的實際處理能力提交給對應的資源進行處理,保證所有資源都能滿負荷執行。
上面三個場景看似不同,但背後其實隱藏的是同樣的問題模型:
| 系統處理能力受限,或者系統能承諾處理能力受限。
| 實際的請求量可能在某個時間點遠大於系統的處理能力。
| 個體與個體之間是獨立且存在差異的,平臺上有不同的客戶,不同客戶對時效的要求是不一樣的,不同客戶的任務規模也是不一樣的。
| 超高的並行,比如十萬甚至百萬QPS的HTTP訊息推播請求。
對於這種型別的問題,我們如何解決呢?
其實,不管是資源還是客戶,都可以抽象為一個實時處理能力受限的佇列。
對於訊息推播場景,可以為每個客戶建立一個佇列,把需要推播的訊息放到對應的客戶佇列裡,並按客戶最大設定流量輪流進行推播。
對於任務接收場景,每個客戶都可以被當作是一個佇列,只要能控制每個佇列的最大消費速度,就能保證不會因為頭部客戶的突發流量導致其他客戶被影響。
對於資源排程場景,可以為每個資源建立一個佇列,然後將對應資源能處理的任務放在佇列裡面,再按照佇列的實時處理能力,消費裡面的資料。
此外,即使同個客戶或者同個資源裡,業務內部也是有優先順序的,所以佇列內部也需要支援業務的優先順序。
因此,這類佇列模型於普通的生產消費者模型存在顯著的區別:
佇列數量非常多,佇列的消費速度需要滿足上限,支援優先順序。
如何構建這類面向百萬並行、支援優先順序的大規模佇列的生產消費系統?
提到生產消費模型,很自然會想到一些成熟的訊息中介軟體,如METAQ,KAFKA等。但是經過調研發現:當佇列數量的量級非常大,達到千級甚至萬級的時候,這些中介軟體還是存在較大瓶頸的。
以METAQ為例,由於METAQ是一個執行緒池模式,一個TOPIC就有一個執行緒池,所以當TOPIC非常多的時候,機器上需要開非常多的執行緒,這顯然是不可能的。通過分析發現,METAQ的問題主要是實現機制的問題,所以另一個思路是:基於開源的METAQ原始碼,對其消費端進行二次開發。
但這也會存在一系列的問題……
首先,METAQ的程式碼本身非常龐大,熟悉裡面的細節就需要投入非常大的成本。此外,METAQ的設計思路與面向大規模佇列的場景有著本質區別,METAQ核心設計思路是「快」。
然而,大規模佇列的核心訴求不僅需要「快」,還需要兼顧"公平",即保證所有的佇列都能達到自己的效能目標。
這就導致METAQ裡面有大量的邏輯其實並不匹配大規模佇列的生產消費模型。同時,考慮到後續METAQ的版本迭代等的穩定性風險也是非常難以控制的。
不管是METAQ還是KAFFA,在佇列優先順序的支援上比較弱,這些中介軟體在設計的時候,並非主要面向多優先順序的訊息。因此,支援這個特性也非常難,改造的成本也非常高。
通過綜合評估,基於分散式基礎佇列進行自建會更穩定、可靠、可落地。通過系統調研發現阿里雲的LINDORM和REDIS都提供基礎的佇列操作,LINDORM提供的STRONG CONSISTENCY(SC)級別的資料一致效能力,可以支援資料寫入後100%被立即讀出。而REDIS主要採用的是一種非同步備份的機制,所以從資料的高可靠考慮,選擇LINDORM是更可靠的方案。
LINDORM是一個支援多模型的NOSQL系統,相容HBASE/CASSANDRA、OPENTSDB、SOLR、SQL、HDFS等多種開源標準介面,支援的模型包括:KV,WIDECOLUMN,TABULAR和佇列模型等,這裡使用的就是它的佇列模型。
雖然LINDORM也叫佇列模型,但是它跟METAQ訊息佇列不一樣,他核心的主要只有兩個操作介面: 一個是PUT,把資料放入到某個佇列的隊尾,成功後會返回訊息對應的偏移,另一個是GET(I),從某個偏移地址獲取對應的資料,且單佇列最大隻支援150QPS。
到這裡便可以發現理想與現實的巨大鴻溝,我們生產消費系統的目標是要支援十萬、百萬並行,並且希望能自動解決消費進度管理、異常的恢復等問題,以LINDORM目前的狀況來看都是沒有的。
通過前文分析發現LINDORM只提供了插入資料及獲取資料兩個基礎操作,單佇列只支援150QPS,而且沒有消費進度管理和異常灰度機制,這些問題該如何解決?
這裡將構建百萬並行、支援多優先順序的大規模佇列生產消費系統稱為EMQ(ENORMOUSE SCALE MESSAGE QUEUE )。EMQ系統主要分為6個模組:佇列拆分、佇列分配、佇列消費、容量控制、消費進度管理、容錯機制。
為了便於理解,將之前提到的客戶對應的佇列及資源對應的佇列統一稱之為邏輯佇列,將LINDORM的佇列稱之為物理佇列。
LINDORM單佇列只支援150QPS,且任何物理佇列都存在容量限制。但是,我們系統設計的目標是一百萬QPS(儘管這個一百萬QPS是所有邏輯佇列的總和)。
單個邏輯佇列超過1000QPS在實際情況中非常常見,且從設計角度來講,邏輯佇列的QPS也十分難控制。因此,必須把邏輯佇列拆分成一個個150QPS以內的物理佇列。
在佇列拆分完後,系統需要消費這些物理佇列,因此需要把佇列合理的分配到應用叢集的機器上。
每臺機器上需要啟動對應的使用者端去消費各佇列裡面的資料,因為把一個支援1000QPS的佇列拆分成了20個小的物理佇列,所以每個佇列支援50QPS。
這裡通過單佇列容量50QPS乘以總的物理佇列數等於1000QPS來實現邏輯佇列支援1000QPS的目標,但是從邏輯上如果存在資料傾斜的時候,會存在總容量不滿1000PQS的情況。
考慮到該系統主要面向的是一個海量資料場景,因此從概率上來講,這是可以接受的。
佇列分配完後,還需要構建一個支援高效能的消費使用者端。該使用者端的高效能主要表現在:實現避免網路IO存取導致的效能下降;能快速處理本臺機器上的所有佇列,保證既不會輪不到,又能滿負荷處理;同時,在消費完訊息後能快速的執行業務系統的任務。
當完成佇列消費後,仍需要構建一個消費進度的管理模組,即管理當前佇列生產的點位和已經消費的資料的點位,這樣子才能清楚地知道下一個需要消費的資料以及佇列的積壓量。
系統的容錯機制主要包括三個方面:首先,如果某個偏移量沒有資料的時候,需要能發現並跳過對應的偏移;其次,因為消費完的資料需要提交各業務層進行處理,如果業務層處理失敗後我們應該有一定的異常恢復機制,如果業務層持續失敗的時候我們需要有一定的兜底機制;此外,當機器因為異常宕機的時候,在原來機器上消費的佇列需要平滑遷移到其他機器,從而實現無失真恢復。
如下圖為EMQ的佇列模型:
ROOT節點下分兩個節點:一是ONLINE節點,主要是面向生產環境,二是SANBOX,主要面向生產前的測試,這能保證系統在更新某個功能的時候可以先進行充分的測試然後再同步到生產環境。
在ONLINE節點下面是一個個TOPIC,這裡的TOPIC就是我們之前說的邏輯佇列,也就是分配給客戶的佇列或者為每個資源分配的佇列(後文使用TOPIC代指邏輯佇列)。每個TOPIC有一定的容量,也就是我們說的QPS。
每個TOPIC下有若干個GROUP,每個GROUP有獨立的容量,其值為TOPIC的容量除以總的GROUP數,並且要求這個值需要小於LINDORM物理佇列支援的最大QPS。
每個TOPIC下面有分優先順序的QUEUE,該設計主要是為了支援優先順序能力設計的。本文為了描述方便,會以高中低三個優先順序為例介紹。這三個優先順序的QUEUE是共用GROUP的容量,也就是說如果GROUP支援50QPS,那麼三個QUEUE的總QPS是50。這裡QUEUE才是真正對應LINDORM的物理佇列,這也是為什麼要求GROUP的容量需要小於LINDROM物理佇列支援的最大QPS。
對於資源排程場景,假設有一個資源的QPS是500QPS。那麼,他會對應一個TOPIC,這個TOPIC下面有10個GROUP,每個GROUP有3個優先順序,也就是它會生產3*10 = 30個LINDORM佇列。
假設每個GROUP的QPS為50,那麼對於100萬並行的系統將有約6萬個物理佇列,如何將這麼大數量級的佇列分配到機器上去?佇列分配應該滿足哪些原則?
首先,儘可能將佇列平均分配到每臺機器上,避免出現某個機器消費佇列資料量太多產生效能問題;其次,當機器下線、宕機或置換的時候,機器上消費的佇列儘可能不要發生大面積的遷移;最後,當佇列新增或者刪除的時候,機器上消費的佇列也儘可能得不要發生大面積的遷移。
基於這些原則,設計瞭如下圖所示的佇列分配模型。
首先,引入一個ZOOKEEPER叢集,在主節點下面建立兩個節點,一個是RUNNING節點,用於儲存機器的心跳資訊,在機器上線的時候會建立一個以機器IP為名字的臨時節點,在機器下線的時候會銷燬對應節點。二是SERVERLIST節點,該節點儲存的是所有消費的機器IP為名字的子節點,而在子節點裡儲存的是機器消費的所有佇列。
現在有一個佇列結集合和有一個機器列表集合,需要把佇列儘可能平均的分配到機器上。一個簡單的方法就是取所有的佇列除以機器總數,平均分配到所有機器。這看似簡單又完美的方法其實存在一些問題,當機器下線的時候,這個計算的過程就要重新來一把,可能導致大量的機器消費的佇列遷移。
如果不重新計算而是在第一次取平均,即在機器下線的時候把這個機器上的佇列平均分配到其他機器,機器上線的時候把其他機器上佇列抽取一部分過來,這種方案在邏輯上是可行的。
但是,如果有佇列新增的時候要執行佇列的設定,在佇列刪除的時候要重新平衡機器的消費佇列,這個無疑是非常複雜的。最為重要的是,這種增量變更的方式如果在其中某次分配存在問題,那麼後面可能一直無法挽回。
綜合考慮下,採用了一致性HASH的方案,考慮到一致性HASH的平衡性,能保證所有機器分配的佇列數較為接近,同時,由於一致性HASH的單調性,不管是機器變更或者佇列變更,不會導致大量的佇列機器關係發生變化。
在引入一箇中心計算任務後,當機器發生變化或消費的佇列發生變化時,都會全量的重新計算每臺機器消費的佇列。如果機器消費的佇列有新增,那麼它會新增消費對應的佇列,如果有減少,就會取消對應佇列的消費。
經過前面的一系列設計,已經完成了佇列的拆分,並且將佇列分配到叢集機器上。那還有最重要的一件事情,就是構建一個高效可靠消費使用者端。
保證能準確無誤高效能地消費佇列的裡面的資料,保證在佇列有資料的情況下按佇列配額的最大容量進行消費,以及當佇列裡的資料比較少的時候所有資料都快速被消費。
在原型機驗證環節,設計目標按照在8核16G的機器上,單機3000佇列的時候支援1000QPS並行的處理。如下圖,是EMQ的單機模型圖,主要包括分散式物理佇列、遠端資料拉取模組、本地緩衝處理模組、緩衝佇列分發&速度控制模組、訊息任務處理模組以及消費進度管理模組。
分散式物理隊使用的是LINDORM的佇列模型,考慮到後續的擴充套件,通過對LINDORM的操作做了一層抽象,只要實現適配層的方法,便可以快速支援其他基礎佇列模型,比如:SWIFT,REDIS等。
遠端資料拉取模組主要包括IO任務孵化器,核心是一個執行緒池會週期性地孵化一些任務,將遠端佇列裡面的資料拉取到本地,保障本地佇列緩衝區裡面的資料達到一定閾值。它的結束條件是本地緩衝區裡的資料滿足未來一段時間內的處理要求,或者所有遠端的資料都已經拉取到本地緩衝區。
本地緩衝區是一系列的本地佇列,與這臺機器上消費的LINDORM上物理佇列是一一對應的。也就是說:在遠端這臺機器有多少要消費的LINDORM佇列,本地就有多少個對應的佇列。
緩衝佇列分發與速度控制主要包括一個緩衝任務孵化器,它的核心職責是孵化一些佇列任務以及消費本地緩衝區裡面的資料,直到到達當前佇列的QPS上限設定,或者緩衝區的資料空了。
當消費完成一個新的資料之後,會更新對應通道的消費進度的點位,下次再消費的時候從新的點位開始消費,這樣保證消費進度不斷向前推移。同時,還會將消費進度的資訊週期性的範例化到資料庫,保證如果機器發生異常或者遷移的時候,能重新恢復之前的消費點位開始消費,因為這個備份是非同步且有延時的,這便於所有的訊息中介軟體一樣,一個訊息是可能重推的,需要業務處理的時候支援冪等操作。
這裡再重點介紹一下,消費速度控制,要單機消費幾千個佇列,但是每臺機器的執行緒是有限的,所以一定採用的是執行緒池的方案,如下圖:
每個佇列都有一個獨立的消費計數器,每秒鐘會執行若干個LOOP,每個LOOP會為每個佇列生成一個消費的任務,這個任務包含目標佇列和消費的最大的任務數。
每次執行拉取的時候會先對當前佇列的消費計數器加一,提前預佔,然後去消費佇列裡面的資料,如果成功了,那麼流程結束,如果失敗了會將計數器減一,實現回滾的操作。當越到後面的時候,有些佇列的當前秒需要拉取的資料已經足夠了,就無需再繼續拉取了。
在完成EMQ叢集模型和單機模型的設計之後,已經能夠實現面向大規模佇列百萬並行的生產消費系統能力,但是在很多業務場景下我們的任務都是存在一定優先順序的。
比如以簡訊傳送場景為例,簡訊分為通知業務、行銷業務、驗證碼業務,一個資源如果既能處理通知業務,也能處理行銷和驗證碼業務,在正常情況我們肯定是希望驗證碼的任務能優先被處理,然後再處理通知業務,最後才去處理去處理行銷業務。
也就是在資源排程場景,我們為每個資源建立了一個邏輯佇列,在EMQ裡面也就是一個TOPIC,這個佇列是需要能支援優先順序排程的,如果驗證碼的任務最後進入到佇列,它裡面已經堆積了大量的行銷業務請求,我們也希望這個驗證碼的請求能優先於其他行銷型別的請求被處理。
如果對應通用的佇列機制是不現實的,通用的佇列核心的邏輯就是先進先出。
那我們現在要實現優先順序搶佔,必須要在佇列設計上做文章,如下圖:
我們需要將一個佇列拆分成N個佇列,N是需要支援的優先順序個數。以三個優先順序為例,我們會構建高,中,低三個優先順序的佇列,這個三個優先順序佇列組成一個GROUP,共用這個GROUP的容量。也就是說如果這個GROUP的QPS是50,那麼在一秒鐘消費高中低三個優先順序佇列的總QPS不能超過50。
在消費佇列訊息的時候,會先消費高優先順序的佇列裡面的資料,然後再消費中優先順序佇列裡面的資料,最後才消費低優先順序佇列裡面的資料。這樣子就保證,高優先順序裡面的資料一定會先於中優先順序裡面的資料被處理。中優先順序裡面佇列的資料也會先於低優先順序裡面的資料被處理。
本文重點介紹瞭如何快速、低成本地構建面向百萬並行多優先順序的大規模佇列生產消費系統。在擁有基礎能力以後,在上面做各種複雜的業務能力設計便十分容易。比如:文章最開始提到的HTTP推播場景,那麼假設某個客戶突然有超10萬QPS的訊息需要推播,系統只支援10萬QPS推播能力,如果按先進先出,那麼可能其他客戶的訊息都無法推播了。但是,如果基於EMQ構建生產者消費者模型,為每個客戶(或客戶組)建立一個佇列,並且設定這個佇列支援的上限推播的QPS,訊息在傳送前先推播到EMQ佇列,按設定的限額消費。那麼,即使客戶瞬間有很大的資訊推播請求,也不會影響到其他客戶的正常業務處理。