上一篇文章對 ResourceManager 整體架構和功能進行了講述。本篇將對 RM 中管理 Application Master 的部分進行深入的講解。
下面將會介紹 RM 與 AM 整體通訊執行流程,並對 RM 中涉及的對應服務進行具體講解。
為了更好的學習本篇知識,建議先熟悉以下知識點,不瞭解的部分可翻到前面對應的文章進行學習:
使用者端提交任務到 RM 後,啟動 AM 到任務完成的流程如下所示:
各個步驟具體執行操作請對應下面各服務講解。
ApplictionMaster 管理部分主要由三個服務構成,它們共同管理應用程式的 AM 的生存週期。
(以下服務均能根據名稱找到原始碼中對應的類,可以看其具體的實現邏輯)
handle
方法收到 AM 事件後建立 Runnable 物件,之後會放到 masterEvents
阻塞佇列中,launcherHandlingThread
不斷從佇列中取出事件,提交到執行緒池 launcherPool
中處理。(流程圖如下所示)AbstractLivelinessMonitor
,在抽象類中已經實現好 live 檢查邏輯,在一段時間內未彙報心跳資訊,則任務其掛了。AMLivelinessMonitor
只需定義當 AM 被認為掛了(expire)時,需要處理的邏輯。RMAppAttemptEvent
EXPIRE 事件。抽象類 AbstractLivelinessMonitor
簡要介紹:
public abstract class AbstractLivelinessMonitor<O> extends AbstractService {
// 裡面最重要的檢查函數
// 定期遍歷記錄的 list,看是否有超時的
// 檢查週期預設為超時時間的 1/3
private class PingChecker implements Runnable {
@Override
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
synchronized (AbstractLivelinessMonitor.this) {
Iterator<Map.Entry<O, Long>> iterator =
running.entrySet().iterator();
//avoid calculating current time everytime in loop
long currentTime = clock.getTime();
while (iterator.hasNext()) {
Map.Entry<O, Long> entry = iterator.next();
if (currentTime > entry.getValue() + expireInterval) {
iterator.remove();
expire(entry.getKey());
LOG.info("Expired:" + entry.getKey().toString() +
" Timed out after " + expireInterval/1000 + " secs");
}
}
}
try {
Thread.sleep(monitorInterval);
} catch (InterruptedException e) {
LOG.info(getName() + " thread interrupted");
break;
}
}
}
}
ApplicationMasterProtocol
的實現類。ApplicationMasterProtocol#allocate
方法定期呼叫實現,主要作用:
本篇主要介紹了 RM 中對 AM 的管理部分。首先介紹了 RM 相關元件與 AM 互動流程,之後對各服務執行邏輯、RPC 呼叫等進行了詳細的介紹。本篇中僅對 ApplicationMasterLauncher 元件進行了詳細講解,並繪圖說明,其餘部分各位同學感興趣可自行梳理。
在學習這部分知識時,建議對照原始碼進行梳理,可以更好的瞭解其中的流程。