當一個服務擁有太多處理邏輯時,會導致程式碼結構異常的混亂,很難分辨一段邏輯是在哪個階段發揮作用的。
這時就可以引入狀態機模型,幫助程式碼結構變得清晰。
狀態機由一組狀態組成:
【初始狀態 -> 中間狀態 -> 最終狀態】。
在一個狀態機中,每個狀態會接收一組特定的事件,根據事件型別進行處理,並轉換到下一個狀態。當轉換到最終狀態時則退出。
狀態間轉換會有下面這三種型別:
在 Yarn 中提供了一個工廠類 StateMachineFactory
來幫助定義狀態機。如何使用,我們直接寫個 demo。
在上一篇文章《Yarn 服務庫和事件庫》案例基礎上進行擴充套件,增加狀態機庫的內容。如果還不瞭解服務庫和事件庫的同學,建議先學習下上一篇文章。
案例已上傳至 github,有幫助可以點個 ⭐️
https://github.com/Simon-Ace/hadoop-yarn-study-demo/tree/master/state-demo
狀態機實現,可以直接嵌入到上篇文章中的 AsyncDispatcher
使用。
這裡僅給出狀態機JobStateMachine
以及各種事件處理的程式碼。完整的程式碼專案執行,請到 github demo 中檢視。
import com.shuofxz.event.JobEvent;
import com.shuofxz.event.JobEventType;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.*;
import java.util.EnumSet;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/*
* 可參考 Yarn 中實現的狀態機物件:
* ResourceManager 中的 RMAppImpl、RMApp- AttemptImpl、RMContainerImpl 和 RMNodeImpl,
* NodeManager 中 的 ApplicationImpl、 ContainerImpl 和 LocalizedResource,
* MRAppMaster 中的 JobImpl、TaskImpl 和 TaskAttemptImpl 等
* */
@SuppressWarnings({"rawtypes", "unchecked"})
public class JobStateMachine implements EventHandler<JobEvent> {
private final String jobID;
private EventHandler eventHandler;
private final Lock writeLock;
private final Lock readLock;
// 定義狀態機
protected static final StateMachineFactory<JobStateMachine, JobStateInternal,
JobEventType, JobEvent>
stateMachineFactory = new StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>(JobStateInternal.NEW)
.addTransition(JobStateInternal.NEW, JobStateInternal.INITED, JobEventType.JOB_INIT, new InitTransition())
.addTransition(JobStateInternal.INITED, JobStateInternal.SETUP, JobEventType.JOB_START, new StartTransition())
.addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING, JobEventType.JOB_SETUP_COMPLETED, new SetupCompletedTransition())
.addTransition(JobStateInternal.RUNNING, EnumSet.of(JobStateInternal.KILLED, JobStateInternal.SUCCEEDED), JobEventType.JOB_COMPLETED, new JobTasksCompletedTransition())
.installTopology();
private final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine;
public JobStateMachine(String jobID, EventHandler eventHandler) {
this.jobID = jobID;
// 多執行緒非同步處理,state 有可能被同時讀寫,使用讀寫鎖來避免競爭
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
this.readLock = readWriteLock.readLock();
this.writeLock = readWriteLock.writeLock();
this.eventHandler = eventHandler;
stateMachine = stateMachineFactory.make(this);
}
protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
return stateMachine;
}
public static class InitTransition implements SingleArcTransition<JobStateMachine, JobEvent> {
@Override
public void transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {
System.out.println("Receiving event " + jobEvent);
// do something...
// 完成後傳送新的 Event —— JOB_START
jobStateMachine.eventHandler.handle(new JobEvent(jobStateMachine.jobID, JobEventType.JOB_START));
}
}
public static class StartTransition implements SingleArcTransition<JobStateMachine, JobEvent> {
@Override
public void transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {
System.out.println("Receiving event " + jobEvent);
jobStateMachine.eventHandler.handle(new JobEvent(jobStateMachine.jobID, JobEventType.JOB_SETUP_COMPLETED));
}
}
public static class SetupCompletedTransition implements SingleArcTransition<JobStateMachine, JobEvent> {
@Override
public void transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {
System.out.println("Receiving event " + jobEvent);
jobStateMachine.eventHandler.handle(new JobEvent(jobStateMachine.jobID, JobEventType.JOB_COMPLETED));
}
}
public static class JobTasksCompletedTransition implements MultipleArcTransition<JobStateMachine, JobEvent, JobStateInternal> {
@Override
public JobStateInternal transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {
System.out.println("Receiving event " + jobEvent);
// 這是多結果狀態部分,因此需要人為制定後續狀態
// 這裡整個流程結束,設定一下對應的狀態
boolean flag = true;
if (flag) {
return JobStateInternal.SUCCEEDED;
} else {
return JobStateInternal.KILLED;
}
}
}
@Override
public void handle(JobEvent jobEvent) {
try {
// 注意這裡為了避免靜態條件,使用了讀寫鎖
writeLock.lock();
JobStateInternal oldState = getInternalState();
try {
getStateMachine().doTransition(jobEvent.getType(), jobEvent);
} catch (InvalidStateTransitionException e) {
System.out.println("Can't handle this event at current state!");
}
if (oldState != getInternalState()) {
System.out.println("Job Transitioned from " + oldState + " to " + getInternalState());
}
} finally {
writeLock.unlock();
}
}
public JobStateInternal getInternalState() {
readLock.lock();
try {
return getStateMachine().getCurrentState();
} finally {
readLock.unlock();
}
}
public enum JobStateInternal {
NEW,
SETUP,
INITED,
RUNNING,
SUCCEEDED,
KILLED
}
}
hadoop 中提供了狀態機視覺化的工具類 VisualizeStateMachine.java
,可以拷貝到我們的工程中使用。
根據提示,執行需要三個引數:
Usage: %s <GraphName> <class[,class[,...]]> <OutputFile>%n
執行後會在專案根目錄生成圖檔案 jsm.gv
。
需要使用 graphviz
工具將 gv 檔案轉換成 png 檔案:
# linux 安裝
yum install graphviz
# mac 安裝
brew install graphviz
轉換:
dot -Tpng jsm.gv > jsm.png
視覺化狀態機展示:
再使用這個工具對 Yarn 中的 Application 狀態進行展示:
【思考】
如果不用狀態機,程式碼結構會是什麼樣呢?
下面這樣的程式碼,如果要增加或修改邏輯可能就是很痛苦的一件事情了。
// 一堆的函數呼叫
// 一堆的 if 巢狀
// 或者 switch case
本節對 Yarn 狀態機庫進行了介紹。實際使用時會結合事件庫、服務庫一同使用。
狀態機庫的使用幫助程式碼結構更加的清晰,新增狀態處理邏輯只需要增加一個狀態類別,或者增加一個方法處理對應型別的事件即可。將整個處理邏輯進行了拆分,便於編寫和維護。
參考文章:
原始碼|Yarn的事件驅動模型與狀態機