業務視覺化-讓你的流程圖"Run"起來(3.分支選擇&跨語言分散式執行節點)

2022-07-25 15:00:44

前言

首先,感謝大家對上一篇文章[業務視覺化-讓你的流程圖"Run"起來(2.問題與改進)]的支援。

分享一下近期我對這個專案的一些改進。

1. 增加了分支選擇工程,可以根據節點的執行結果決定執行哪一個節點。

2. 增加了分散式執行節點功能,可以將流程節點部署到任意伺服器,通過佇列來排程節點,也就是說節點的執行將不在侷限於Java語言。

1. 如何讓流程圖「Run」起來

首先我們回顧一下前兩篇文章的知識,如何讓流程圖「Run」起來:

工程目錄[ html/network.html ]裡,提供了一個圖形化介面的流程設計器,可以通過拖拽的方式設計流程並生成Json檔案。

反之,也可以將Json檔案轉化為流程圖並進行編輯。

 

1.1 建立流程

建立流程過程如下圖所示:

節點和邊的ID自動生成,可以自己定義節點名和每個邊對應的節點返回值。

點選[ update json ]後,即可生成/更新流程圖對應的Json檔案。

 

1.2 節點與被執行的Java方法系結

我們需要寫一個Java類,繼承自FlowRunner。

然後在裡面寫每個節點對應的方法,用@Node註釋來實現與流程中節點的繫結。

同時將1.1中生成的Json檔案放到和Java類相同的目錄下。

TestFlow1.java

public class TestFlow1 extends FlowRunner {

	@Node(label = "a")
	public int process_a() {
		System.out.println("processing a");
		return 1;
	}

	@Node(label = "b")
	public void process_b() {
		System.out.println("processing b");
	}

	@Node(label = "c")
	public void process_c() {
		System.out.println("processing c");
	}

	@Node(label = "d")
	public void process_d() {
		System.out.println("processing d");
	}
}

TestFlow1.json

{
	"flowId": "your flow id",
	"nodes": [
		{
			"id": "e21eb7b6-2f23-4264-a50f-e42321dd295b",
			"label": "a",
			"readyCheck": 0
		},
		{
			"id": "f2a76819-b6a8-49db-af25-fab8274550f3",
			"label": "b",
			"readyCheck": 0
		},
		{
			"id": "73f8bd68-8454-4b02-9098-c0c7bb6ffdb2",
			"label": "c",
			"readyCheck": 0
		},
		{
			"id": "3553d1f7-e4c3-4e4b-a9ef-80b94ebbb8af",
			"label": "d",
			"readyCheck": 1
		}
	],
	"edges": [
		{
			"id": "36bdc526-f6ae-45de-9bb7-34c293b34006",
			"from": "e21eb7b6-2f23-4264-a50f-e42321dd295b",
			"to": "f2a76819-b6a8-49db-af25-fab8274550f3",
			"condition": "1",
			"arrows": "to"
		},
		{
			"id": "652b871d-338d-45f5-91a9-3a488ed9b6f4",
			"from": "e21eb7b6-2f23-4264-a50f-e42321dd295b",
			"to": "73f8bd68-8454-4b02-9098-c0c7bb6ffdb2",
			"condition": "2",
			"arrows": "to"
		},
		{
			"id": "2691b6fe-ede9-4d1c-8b49-82d2a4ef014a",
			"from": "f2a76819-b6a8-49db-af25-fab8274550f3",
			"to": "3553d1f7-e4c3-4e4b-a9ef-80b94ebbb8af",
			"arrows": "to"
		},
		{
			"id": "d8026555-7609-4d27-8689-fd3dbcfe11d7",
			"from": "73f8bd68-8454-4b02-9098-c0c7bb6ffdb2",
			"to": "3553d1f7-e4c3-4e4b-a9ef-80b94ebbb8af",
			"arrows": "to"
		}
	]
}

Test1.java

public class Test1 {

	public static void main(String[] args) {
		
		TestFlow1 testFlow = new TestFlow1();
		testFlow.startFlow(true);
		
	}
}

 

1.3 啟動流程

呼叫1.2中寫好的Java類的startFlow方法,即可啟動流程。

同步啟動

TestFlow1 testFlow = new TestFlow1();
testFlow.startFlow(true);

非同步啟動

TestFlow1 testFlow = new TestFlow1();
testFlow.startFlow(false);

1.4 關閉流程執行器

流程執行器會在第一個流程啟動的時候自動啟動,在整個系統關閉的時候,我們需要將流程執行器關閉,如下。

FlowStarter.shutdown();

1.5 流程執行結果確認

在流程執行完畢後,紀錄檔會輸出執行結果的json檔案,我們可以將這個檔案貼上到1.1介紹的工具裡,生成圖形化的執行結果來確認節點的執行狀況。

執行成功紀錄檔

Ready queue thread started.
Complete queue thread started.
json:
{"flowId":"123","nodes":[{"id":"1","label":"a"},{"id":"2","label":"b"},{"id":"0b5ba9df-b6c7-4752-94e2-debb6104015c","label":"c"},{"id":"29bc32c7-acd8-4893-9410-e9895da38b2e","label":"d"}],"edges":[{"id":"1","from":"1","to":"2","arrows":"to"},{"id":"078ffa82-5eff-4d33-974b-53890f2c9a18","from":"1","to":"0b5ba9df-b6c7-4752-94e2-debb6104015c","arrows":"to"},{"id":"90663193-7077-4aca-9011-55bc8745403f","from":"2","to":"29bc32c7-acd8-4893-9410-e9895da38b2e","arrows":"to"},{"id":"a6882e25-c07a-4abd-907e-e269d4eda0ec","from":"0b5ba9df-b6c7-4752-94e2-debb6104015c","to":"29bc32c7-acd8-4893-9410-e9895da38b2e","arrows":"to"}]}
execute:1
node name:a
processing a
execute:2
node name:b
processing b
execute:0b5ba9df-b6c7-4752-94e2-debb6104015c
node name:c
processing c
execute:29bc32c7-acd8-4893-9410-e9895da38b2e
node name:d
processing d
Complete success.
json:
{"nodes":[{"id": "1","label": "a" ,"color": "#36AE7C"},{"id": "2","label": "b" ,"color": "#36AE7C"},{"id": "0b5ba9df-b6c7-4752-94e2-debb6104015c","label": "c" ,"color": "#36AE7C"},{"id": "29bc32c7-acd8-4893-9410-e9895da38b2e","label": "d" ,"color": "#36AE7C"}],"edges":[{"id": "1","from": "1","to": "2","arrows": "to"},{"id": "078ffa82-5eff-4d33-974b-53890f2c9a18","from": "1","to": "0b5ba9df-b6c7-4752-94e2-debb6104015c","arrows": "to"},{"id": "90663193-7077-4aca-9011-55bc8745403f","from": "2","to": "29bc32c7-acd8-4893-9410-e9895da38b2e","arrows": "to"},{"id": "a6882e25-c07a-4abd-907e-e269d4eda0ec","from": "0b5ba9df-b6c7-4752-94e2-debb6104015c","to": "29bc32c7-acd8-4893-9410-e9895da38b2e","arrows": "to"}]}

流程執行結束後,會輸出執行結果和執行後的流程圖狀態。
可以直接將json貼到下面的位置,檢視看結果(綠色表示正常結束,紅色表示異常結束,白色表示等待執行)。


執行失敗紀錄檔

Ready queue thread started.
Complete queue thread started.
json:
{"flowId":"123","nodes":[{"id":"1","label":"a"},{"id":"2","label":"b"},{"id":"0b5ba9df-b6c7-4752-94e2-debb6104015c","label":"c"},{"id":"29bc32c7-acd8-4893-9410-e9895da38b2e","label":"d"}],"edges":[{"id":"1","from":"1","to":"2","arrows":"to"},{"id":"078ffa82-5eff-4d33-974b-53890f2c9a18","from":"1","to":"0b5ba9df-b6c7-4752-94e2-debb6104015c","arrows":"to"},{"id":"90663193-7077-4aca-9011-55bc8745403f","from":"2","to":"29bc32c7-acd8-4893-9410-e9895da38b2e","arrows":"to"},{"id":"a6882e25-c07a-4abd-907e-e269d4eda0ec","from":"0b5ba9df-b6c7-4752-94e2-debb6104015c","to":"29bc32c7-acd8-4893-9410-e9895da38b2e","arrows":"to"}]}
execute:1
node name:a
processing a
execute:2
node name:b
processing b
execute:0b5ba9df-b6c7-4752-94e2-debb6104015c
node name:c
processing c
java.lang.reflect.InvocationTargetException
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at io.github.nobuglady.network.fw.FlowRunner.execute(FlowRunner.java:49)
	at io.github.nobuglady.network.fw.executor.NodeRunner.run(NodeRunner.java:93)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: test
	at io.github.nobuglady.network.MyFlow1.process_b(MyFlow1.java:16)
	... 11 more
java.lang.reflect.InvocationTargetException
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at io.github.nobuglady.network.fw.FlowRunner.execute(FlowRunner.java:49)
	at io.github.nobuglady.network.fw.executor.NodeRunner.run(NodeRunner.java:93)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: test
	at io.github.nobuglady.network.MyFlow1.process_b(MyFlow1.java:16)
	... 11 more
Complete error.
json:
{"nodes":[{"id": "1","label": "a" ,"color": "#36AE7C"},{"id": "2","label": "b" ,"color": "#EB5353"},{"id": "0b5ba9df-b6c7-4752-94e2-debb6104015c","label": "c" ,"color": "#36AE7C"},{"id": "29bc32c7-acd8-4893-9410-e9895da38b2e","label": "d" ,"color": "#E8F9FD"}],"edges":[{"id": "1","from": "1","to": "2","arrows": "to"},{"id": "078ffa82-5eff-4d33-974b-53890f2c9a18","from": "1","to": "0b5ba9df-b6c7-4752-94e2-debb6104015c","arrows": "to"},{"id": "90663193-7077-4aca-9011-55bc8745403f","from": "2","to": "29bc32c7-acd8-4893-9410-e9895da38b2e","arrows": "to"},{"id": "a6882e25-c07a-4abd-907e-e269d4eda0ec","from": "0b5ba9df-b6c7-4752-94e2-debb6104015c","to": "29bc32c7-acd8-4893-9410-e9895da38b2e","arrows": "to"}]}

流程執行結束後,會輸出執行結果和執行後的流程圖狀態。
可以直接將json貼到下面的位置,檢視看結果(綠色表示正常結束,紅色表示異常結束,白色表示等待執行)。

2. 分支選擇的改進

2.1 分支選擇定義

我們可以在圖形介面中定義每條邊的值,流程執行時,對節點的返回值與後續邊的值進行比對,比對結果一直則執行該條邊對應的後續節點。

2.2 節點啟動條件定義

對於多條邊Join到一個節點的情況,我們需要定義該節點啟動的條件,如下:

1. 指向該節點的任意一條邊通過檢查,則啟動該節點

2. 指想該節點的所有邊都通過檢車後,啟動該節點

上圖表示節點b和節點c 任意一個節點完成後,執行節點d

 

2.3 節點返回值繫結

節點的返回值與Java的方法返回值自動繫結,流程執行後,

對於有返回值的方法,則會呼叫該返回值的toString方法作為該節點的返回值。

對於無返回值的方法,則預設空文字列為返回值。

比如,返回int值,則用返回的int值與後續邊的條件做對比。

@Node(label = "a")
	public int process_a() {
		System.out.println("processing a");
		return 1;
	}

返回String值,則用返回的String值與後續邊的條件做對比。

@Node(label = "a")
	public String process_a() {
		System.out.println("processing a");
		return "1";
	}

返回自定義Object等,則用返回的Ojbect值的toString()方法生成的字串與後續邊的條件做對比。

	@Node(label = "a")
	public MyObj process_a() {
		System.out.println("processing a");
		return new MyObj();
	}

2.4 節點間引數傳遞

目前還沒有對節點間引數傳遞做特別的處理,

可以通過類變數等方式進行節點間引數的傳遞。

3. 分散式執行的改進

3.1 系統結構

把單體的工程改進成分散式的工程,首先要明確系統結構和改進點。

目前的系統結構如下圖所示(黃色部分可以設定成分散式執行)

系統通過兩個佇列來進行節點間控制資訊的流轉。

1. 待啟動佇列

2. 完成佇列

3.1.1 待啟動佇列

生產者:流程管理器(FlowManager),流程啟動後,流程管理器將初始節點放入[待啟動佇列]中,等待消費。

消費者:流程執行器(NodeExecutor),流程執行器監聽[待啟動佇列],得到訊息後,根據節點資訊執行該節點,執行完成後,將節點的執行結果放入[完成佇列]中,等待消費。

3.1.2 完成佇列

生產者:流程執行器(NodeExecutor),流程執行器監聽[待啟動佇列],得到訊息後,根據節點資訊執行該節點,執行完成後,將節點的執行結果放入[完成佇列]中,等待消費。

消費者:流程管理器(FlowManager),流程管理器監聽[完成佇列],得到訊息後,根據完成節點的資訊,更新流程圖,然後將後續待啟動的節點放入[待啟動佇列]中,等待消費。

3.2 分散式系統改進

基於3.1介紹的系統結構,可以明顯的發現佇列是單機系統改進為分散式系統的改進點。

所以,把佇列變成可設定的佇列後,系統將可以通過組態檔選擇單機部署,或者分散式部署。

組態檔如下

node.executor.remote=false
queue.ready.manager=io.github.nobuglady.network.fw.queue.ready.ReadyQueueManager
queue.complete.manager=io.github.nobuglady.network.fw.queue.complete.CompleteQueueManager
node.executor=io.github.nobuglady.network.fw.executor.NodePool

 

node.executor

對節點的執行器的設定,系統預設提供了原生的執行器,可以通過Annotation對節點繫結的方法進行呼叫。

您可以設定自己的節點執行器,需要實現介面INodeExecutor

  onNodeReady:節點準備執行的時候,會呼叫這個方法

  這個方法裡需要寫節點執行的具體方法,並且在節點執行完畢後,將節點執行結果放入[完成佇列]。

  shutdown:系統關閉的時候,會呼叫這個方法

node.executor.remote

false:本地執行

true:遠端執行

本地執行時,會呼叫node.executor中設定的執行器,來執行節點的執行。

遠端執行時,則系統不會啟動 流程執行器(NodeExecutor)。也就是不會消費[待啟動佇列]中的訊息。

遠端執行時,目標系統監聽[待啟動佇列]的訊息,得到訊息後,根據節點資訊執行該節點,執行完成後,將節點的執行結果放入[完成佇列]中,等待消費。

所以[待啟動佇列]和[完成佇列]必須設定成遠端系統可以存取的佇列。

 

queue.ready.manager

待啟動佇列管理器

需要提供佇列的消費和生產的方法

注:設定成遠端執行節點時,系統不會呼叫此佇列的消費方法。(由遠端系統消費此佇列資訊)

 

queue.complete.manager

完成佇列管理器

需要提供佇列的消費和生產的方法

注:設定成遠端執行節點時,系統不會呼叫此佇列的生產方法。(由遠端系統生產此佇列資訊)

 

5. 本地執行和分散式執行設定例

下面介紹以RabbiMQ作為遠端佇列,進行分散式呼叫的設定,選擇其他的遠端佇列可以酌情修改。

工程裡的test1-6分別對應如下6種啟動方式,Test1-6.java為啟動類。

每次啟動之前,需要修改ladybugflow1-6.properties為ladybugflow.properties

1.  預設設定:通過流程類啟動流程

啟動程式碼

TestFlow1 testFlow = new TestFlow1();
testFlow.startFlow(true);
FlowStarter.shutdown();

2. 本地節點:自定義【待啟動佇列】和【完成佇列】

啟動程式碼

TestFlow2 testFlow = new TestFlow2();
testFlow.startFlow(true);
FlowStarter.shutdown();

3. 本地節點:自定義【節點執行器】

啟動程式碼

TestFlow3 testFlow = new TestFlow3();
testFlow.startFlow(true);
FlowStarter.shutdown();

4. 遠端節點:通過流程類啟動流程

啟動程式碼

TestFlow4 testFlow = new TestFlow4();
testFlow.startFlow(true);
FlowStarter.shutdown();

5. 預設設定:通過指定Json檔案來啟動流程

啟動程式碼

FlowRunner flowRunner = new FlowRunner(new TestFlow5());
flowRunner.startFlowFromJson("io/github/nobuglady/network/demo/test5/TestFlow5.json", true);
FlowStarter.shutdown();

6. 遠端節點:通過指定Json檔案來啟動流程

啟動程式碼

FlowRunner flowRunner = new FlowRunner();
flowRunner.startFlowFromJson("io/github/nobuglady/network/demo/test5/TestFlow5.json", true);
FlowStarter.shutdown();

感謝您看文章讀到這裡。

最後

原始碼:https://github.com/nobuglady/ladybugflow

執行例原始碼:https://github.com/nobuglady/ladybugflow-demo

執行例原始碼(遠端節點):https://github.com/nobuglady/ladybugflow-demo-remote-app