大家好,我是陶朱公Boy,又跟大家見面了。
今天跟大家分享一款基於「生產者消費者模式」下實現的元件。
該元件是作者偶然在翻閱公司一中介軟體原始碼的時候碰到的,覺得設計的非常精美、巧妙,花了點時間整理成文分享給大家。
生產者和消費者彼此之間不進行通訊,中間通過一個容器(如阻塞佇列)來解決強解耦問題。
阻塞佇列起到了一定的資料緩衝作用,平衡了生產者和消費者對資料的處理能力。by—《Java並行程式設計的藝術》
該元件基於生產者消費者模式來編碼實現,是一款在地化解決流量削峰、解耦、非同步的利器。
此元件由以下知識點構成:執行緒池、阻塞佇列、LockSupport、Executor框架、final、volatile。此外你還能接觸到hash取模演演算法、介面回撥等機制。
元件本身程式碼量並不大,但知識點比較密集,所以希望大家能花一點時間認真看完。我將從適用場景、架構設計、原始碼解析這三個角度給大家講介紹這款元件。
現在很多後臺下載功能,普適的做法是先篩選轉換資料,然後對接雲端儲存平臺進行儲存,最後生成一個可存取的檔案地址,整個過程非常耗時。
其實完全可以生產者傳送一個下載請求就結束響應,伺服器端非同步的去消費這個任務請求,處理完生成地址後,再進行通知(比如更新對應資料庫檔案欄位)這是一種非同步體現,也解耦了生產者與消費者原來的同步互動方式,整體效率會更高。
有些應用它的QPS非常高,產生的資料本身並不是特別重要比如埋點的紀錄檔,如果實時呼叫埋點平臺可能給平臺側造成非常大的存取壓力。所以這個時候中間的阻塞佇列就起到了一定的緩衝作用,等一段時間或佇列資料量達到一定量(參賽可動態設定)再一次性拿出來轉換後,最後批次傳遞出去。
《Java並行程式設計的藝術》作者方騰飛有分享過他們基於生產者消費者模式實現的一個案例。
他們團隊早期有一個習慣,大家如果在平時工作當中遇到比較好的文章,會通過郵件轉發到專屬郵箱進行內部分享,這樣其他成員就能看到這篇文章,甚至大家會在底部評論、回覆、交流。
但期間遇到一個問題:一旦時間一長,以前的文章很難被檢閱。郵寄清單的視覺化太差,也不能進行歸類,有些新入職員工也看不到以往其他成員分享過的文章。
基於這些問題,有幾個小夥伴自發的趁業餘時間開發了一個簡易工具--yana。該工具功能就是:生產者執行緒會先往郵箱裡將所有分享的郵件下載下來(包括附件、圖片、郵件回覆等內容),下載完成後,通過confluence的Web Service介面,把文章儲存到confluence中去。這樣不僅好維護,而且留存問題也得到了解決。
不過隨著這款工具在其他部門的推廣,發現系統響應時間越來越長。只要單位時間內積累郵件一多,一次處理完可能就要花費幾分鐘。
於是他們升級了方案,把架構演進到了V2.0版本。整體思路是使用了生產者消費者模式來處理。
思路如下:生產者執行緒去郵件系統下載完郵件後,不會立即呼叫confluence的web service介面,而是選擇把下載的內容放入阻塞佇列後立即返回。而消費者啟動CPU*2個執行緒數來並行處理佇列中的郵件,從之前的單執行緒演變成了多執行緒處理,生產者和消費者實現了非同步、解耦。經過觀察,比起V1.0同步處理,速度比之前要快好了幾倍。
...
該元件支援「多生產者多消費者」場景,在多核時代充分利用CPU多核機制,消費者多執行緒並行處理阻塞佇列中的資料,加快任務處理速度。
該元件內部持有一個工作執行緒物件陣列,當生產者提交資料的時候,會先經過一個route元件(採用hash取模演演算法),動態路由到其中一個執行緒物件內的阻塞佇列中儲存起來。等到滿足一定條件,工作執行緒就會將自身執行緒物件內阻塞佇列中的資料轉換成指定容量的List物件(BlockQueue的drainto方法有支援),然後呼叫已經註冊的回撥函數把資料傳遞出去。
我們一起來看下這張工作執行緒內部執行流程圖:
首選我們說此元件物件內部持有一個工作執行緒物件陣列,每個工作執行緒物件內部持有一個有界阻塞佇列範例物件(ArrayBlockingQueue),方法有run(),add(),timeout()方法。
生產者呼叫元件自身的add方法後,add方法內部通過hash取模演演算法動態路由到某個工作執行緒物件內部的blockingQueue中去。
timeout方法是這款元件設計的一個亮點(容錯性設計)