RocketMQ 是一款開源的分散式訊息系統,基於高可用分散式叢集技術,提供低延時、高可靠的訊息釋出與訂閱服務。
這篇文章,筆者整理了 RocketMQ 原始碼中建立執行緒的幾點技巧,希望大家讀完之後,能夠有所收穫。
首先我們先溫習下常用的建立單執行緒的兩種方式:
▍一、實現 Runnable 介面
圖中,MyRunnable 類實現了 Runnable 介面的 run 方法,run 方法中定義具體的任務程式碼或處理邏輯,而Runnable 物件是作為執行緒建構函式的引數。
▍二、 繼承 Thread 類
執行緒實現類直接繼承 Thread ,本質上也是實現 Runnable 介面的 run 方法。
建立單執行緒的兩種方式都很簡單,但每次建立執行緒程式碼顯得有點冗餘,於是 RocketMQ 裡實現了一個抽象類 ServiceThread 。
我們可以看到抽象類中包含了如下核心方法:
下圖展示了 RocketMQ 眾多的單執行緒實現類。
實現類的程式設計模版類似 :
我們僅僅需要繼承抽象類,並實現 getServiceName 和 run 方法即可。啟動的時候,呼叫 start 方法 , 關閉的時候呼叫 shutdown 方法。
執行緒池是一種基於池化思想管理執行緒的工具,執行緒池維護著多個執行緒,等待著監督管理者分配可並行執行的任務。這避免了在處理短時間任務時建立與銷燬執行緒的代價。執行緒池不僅能夠保證核心的充分利用,還能防止過分排程。
JDK中提供的 ThreadPoolExecutor 類,是我們最常使用的執行緒池類。
引數名 | 作用 |
---|---|
corePoolSize | 佇列沒滿時,執行緒最大並行數 |
maximumPoolSizes | 佇列滿後執行緒能夠達到的最大並行數 |
keepAliveTime | 空閒執行緒過多久被回收的時間限制 |
unit | keepAliveTime 的時間單位 |
workQueue | 阻塞的佇列型別 |
threadPoolFactory | 改變執行緒的名稱、執行緒組、優先順序、守護行程狀態 |
RejectedExecutionHandler | 超出 maximumPoolSizes + workQueue 時,任務會交給RejectedExecutionHandler來處理 |
任務的排程通過執行 execute方法完成,方法的核心流程如下:
在 RocketMQ 裡 ,網路請求都會攜帶命令編碼,每種命令對映對應的處理器,而處理器又會註冊對應的執行緒池。
當伺服器端 Broker 接收到傳送訊息命令時,都會有單獨的執行緒池 sendMessageExecutor 來處理這種命令請求。
基於 ThreadPoolExecutor 做了一個簡單的封裝 ,BrokerFixedThreadPoolExecutor 建構函式包含六個核心引數:
RocketMQ 實現了一個簡單的執行緒工廠:ThreadFactoryImpl,執行緒工廠可以定義執行緒名稱,以及是否是守護執行緒 。
開源專案 Cobar ,Xmemcached,Metamorphosis 中都有類似執行緒工廠的實現 。
執行緒名很重要,執行緒名很重要,執行緒名很重要 ,重要的事情說三遍。
我們看到 RocketMQ 中,無論是單執行緒抽象類還是多執行緒的封裝都會設定執行緒名 ,因為通過執行緒名,非常容易定位問題,從而大大提升解決問題的效率。
定位的媒介常見有兩種:紀錄檔檔案和堆疊記錄。
▍一、紀錄檔檔案
經常處理業務問題的同學,一定都經常與紀錄檔打交道。
▍二、堆疊記錄
jstack 是 java 虛擬機器器自帶的一種堆疊跟蹤工具 ,主要用來檢視 Java 執行緒的呼叫堆疊,執行緒快照包含當前 java 虛擬機器器內每一條執行緒正在執行的方法堆疊的集合,可以用來分析執行緒問題。
jstack -l 程序pid
筆者檢視執行緒堆疊,一般關注如下幾點:
本文是RocketMQ 系列文章的開篇,和朋友們簡單聊聊 RocketMQ 原始碼裡建立執行緒的技巧。
單執行緒抽象類 ServiceThread
使用者只需要實現業務邏輯以及定義執行緒名即可 ,不需要寫冗餘的程式碼。
執行緒池封裝
適當封裝,定義執行緒工廠,併合理設定執行緒池引數。
執行緒名很重要
檔案紀錄檔,堆疊記錄配合執行緒名能大大提升解決問題的效率。
RocketMQ 的多執行緒程式設計技巧很多,比如執行緒通訊,並行控制,執行緒模型等等,後續的文章會一一為大家展現。
如果我的文章對你有所幫助,還請幫忙點贊、在看、轉發一下,你的支援會激勵我輸出更高質量的文章,非常感謝!