首先,感謝大家對上一篇文章[業務視覺化-讓你的流程圖"Run"起來(2.問題與改進)]的支援。
分享一下近期我對這個專案的一些改進。
1. 增加了分支選擇工程,可以根據節點的執行結果決定執行哪一個節點。
2. 增加了分散式執行節點功能,可以將流程節點部署到任意伺服器,通過佇列來排程節點,也就是說節點的執行將不在侷限於Java語言。
工程目錄[ html/network.html ]裡,提供了一個圖形化介面的流程設計器,可以通過拖拽的方式設計流程並生成Json檔案。
反之,也可以將Json檔案轉化為流程圖並進行編輯。
建立流程過程如下圖所示:
節點和邊的ID自動生成,可以自己定義節點名和每個邊對應的節點返回值。
點選[ update json ]後,即可生成/更新流程圖對應的Json檔案。
我們需要寫一個Java類,繼承自FlowRunner。
然後在裡面寫每個節點對應的方法,用@Node註釋來實現與流程中節點的繫結。
同時將1.1中生成的Json檔案放到和Java類相同的目錄下。
TestFlow1.java
public class TestFlow1 extends FlowRunner {
@Node(label = "a")
public int process_a() {
System.out.println("processing a");
return 1;
}
@Node(label = "b")
public void process_b() {
System.out.println("processing b");
}
@Node(label = "c")
public void process_c() {
System.out.println("processing c");
}
@Node(label = "d")
public void process_d() {
System.out.println("processing d");
}
}
TestFlow1.json
{
"flowId": "your flow id",
"nodes": [
{
"id": "e21eb7b6-2f23-4264-a50f-e42321dd295b",
"label": "a",
"readyCheck": 0
},
{
"id": "f2a76819-b6a8-49db-af25-fab8274550f3",
"label": "b",
"readyCheck": 0
},
{
"id": "73f8bd68-8454-4b02-9098-c0c7bb6ffdb2",
"label": "c",
"readyCheck": 0
},
{
"id": "3553d1f7-e4c3-4e4b-a9ef-80b94ebbb8af",
"label": "d",
"readyCheck": 1
}
],
"edges": [
{
"id": "36bdc526-f6ae-45de-9bb7-34c293b34006",
"from": "e21eb7b6-2f23-4264-a50f-e42321dd295b",
"to": "f2a76819-b6a8-49db-af25-fab8274550f3",
"condition": "1",
"arrows": "to"
},
{
"id": "652b871d-338d-45f5-91a9-3a488ed9b6f4",
"from": "e21eb7b6-2f23-4264-a50f-e42321dd295b",
"to": "73f8bd68-8454-4b02-9098-c0c7bb6ffdb2",
"condition": "2",
"arrows": "to"
},
{
"id": "2691b6fe-ede9-4d1c-8b49-82d2a4ef014a",
"from": "f2a76819-b6a8-49db-af25-fab8274550f3",
"to": "3553d1f7-e4c3-4e4b-a9ef-80b94ebbb8af",
"arrows": "to"
},
{
"id": "d8026555-7609-4d27-8689-fd3dbcfe11d7",
"from": "73f8bd68-8454-4b02-9098-c0c7bb6ffdb2",
"to": "3553d1f7-e4c3-4e4b-a9ef-80b94ebbb8af",
"arrows": "to"
}
]
}
Test1.java
public class Test1 {
public static void main(String[] args) {
TestFlow1 testFlow = new TestFlow1();
testFlow.startFlow(true);
}
}
呼叫1.2中寫好的Java類的startFlow方法,即可啟動流程。
同步啟動
TestFlow1 testFlow = new TestFlow1();
testFlow.startFlow(true);
非同步啟動
TestFlow1 testFlow = new TestFlow1();
testFlow.startFlow(false);
流程執行器會在第一個流程啟動的時候自動啟動,在整個系統關閉的時候,我們需要將流程執行器關閉,如下。
FlowStarter.shutdown();
在流程執行完畢後,紀錄檔會輸出執行結果的json檔案,我們可以將這個檔案貼上到1.1介紹的工具裡,生成圖形化的執行結果來確認節點的執行狀況。
執行成功紀錄檔
Ready queue thread started.
Complete queue thread started.
json:
{"flowId":"123","nodes":[{"id":"1","label":"a"},{"id":"2","label":"b"},{"id":"0b5ba9df-b6c7-4752-94e2-debb6104015c","label":"c"},{"id":"29bc32c7-acd8-4893-9410-e9895da38b2e","label":"d"}],"edges":[{"id":"1","from":"1","to":"2","arrows":"to"},{"id":"078ffa82-5eff-4d33-974b-53890f2c9a18","from":"1","to":"0b5ba9df-b6c7-4752-94e2-debb6104015c","arrows":"to"},{"id":"90663193-7077-4aca-9011-55bc8745403f","from":"2","to":"29bc32c7-acd8-4893-9410-e9895da38b2e","arrows":"to"},{"id":"a6882e25-c07a-4abd-907e-e269d4eda0ec","from":"0b5ba9df-b6c7-4752-94e2-debb6104015c","to":"29bc32c7-acd8-4893-9410-e9895da38b2e","arrows":"to"}]}
execute:1
node name:a
processing a
execute:2
node name:b
processing b
execute:0b5ba9df-b6c7-4752-94e2-debb6104015c
node name:c
processing c
execute:29bc32c7-acd8-4893-9410-e9895da38b2e
node name:d
processing d
Complete success.
json:
{"nodes":[{"id": "1","label": "a" ,"color": "#36AE7C"},{"id": "2","label": "b" ,"color": "#36AE7C"},{"id": "0b5ba9df-b6c7-4752-94e2-debb6104015c","label": "c" ,"color": "#36AE7C"},{"id": "29bc32c7-acd8-4893-9410-e9895da38b2e","label": "d" ,"color": "#36AE7C"}],"edges":[{"id": "1","from": "1","to": "2","arrows": "to"},{"id": "078ffa82-5eff-4d33-974b-53890f2c9a18","from": "1","to": "0b5ba9df-b6c7-4752-94e2-debb6104015c","arrows": "to"},{"id": "90663193-7077-4aca-9011-55bc8745403f","from": "2","to": "29bc32c7-acd8-4893-9410-e9895da38b2e","arrows": "to"},{"id": "a6882e25-c07a-4abd-907e-e269d4eda0ec","from": "0b5ba9df-b6c7-4752-94e2-debb6104015c","to": "29bc32c7-acd8-4893-9410-e9895da38b2e","arrows": "to"}]}
流程執行結束後,會輸出執行結果和執行後的流程圖狀態。
可以直接將json貼到下面的位置,檢視看結果(綠色表示正常結束,紅色表示異常結束,白色表示等待執行)。
執行失敗紀錄檔
Ready queue thread started.
Complete queue thread started.
json:
{"flowId":"123","nodes":[{"id":"1","label":"a"},{"id":"2","label":"b"},{"id":"0b5ba9df-b6c7-4752-94e2-debb6104015c","label":"c"},{"id":"29bc32c7-acd8-4893-9410-e9895da38b2e","label":"d"}],"edges":[{"id":"1","from":"1","to":"2","arrows":"to"},{"id":"078ffa82-5eff-4d33-974b-53890f2c9a18","from":"1","to":"0b5ba9df-b6c7-4752-94e2-debb6104015c","arrows":"to"},{"id":"90663193-7077-4aca-9011-55bc8745403f","from":"2","to":"29bc32c7-acd8-4893-9410-e9895da38b2e","arrows":"to"},{"id":"a6882e25-c07a-4abd-907e-e269d4eda0ec","from":"0b5ba9df-b6c7-4752-94e2-debb6104015c","to":"29bc32c7-acd8-4893-9410-e9895da38b2e","arrows":"to"}]}
execute:1
node name:a
processing a
execute:2
node name:b
processing b
execute:0b5ba9df-b6c7-4752-94e2-debb6104015c
node name:c
processing c
java.lang.reflect.InvocationTargetException
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at io.github.nobuglady.network.fw.FlowRunner.execute(FlowRunner.java:49)
at io.github.nobuglady.network.fw.executor.NodeRunner.run(NodeRunner.java:93)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: test
at io.github.nobuglady.network.MyFlow1.process_b(MyFlow1.java:16)
... 11 more
java.lang.reflect.InvocationTargetException
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at io.github.nobuglady.network.fw.FlowRunner.execute(FlowRunner.java:49)
at io.github.nobuglady.network.fw.executor.NodeRunner.run(NodeRunner.java:93)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: test
at io.github.nobuglady.network.MyFlow1.process_b(MyFlow1.java:16)
... 11 more
Complete error.
json:
{"nodes":[{"id": "1","label": "a" ,"color": "#36AE7C"},{"id": "2","label": "b" ,"color": "#EB5353"},{"id": "0b5ba9df-b6c7-4752-94e2-debb6104015c","label": "c" ,"color": "#36AE7C"},{"id": "29bc32c7-acd8-4893-9410-e9895da38b2e","label": "d" ,"color": "#E8F9FD"}],"edges":[{"id": "1","from": "1","to": "2","arrows": "to"},{"id": "078ffa82-5eff-4d33-974b-53890f2c9a18","from": "1","to": "0b5ba9df-b6c7-4752-94e2-debb6104015c","arrows": "to"},{"id": "90663193-7077-4aca-9011-55bc8745403f","from": "2","to": "29bc32c7-acd8-4893-9410-e9895da38b2e","arrows": "to"},{"id": "a6882e25-c07a-4abd-907e-e269d4eda0ec","from": "0b5ba9df-b6c7-4752-94e2-debb6104015c","to": "29bc32c7-acd8-4893-9410-e9895da38b2e","arrows": "to"}]}
流程執行結束後,會輸出執行結果和執行後的流程圖狀態。
可以直接將json貼到下面的位置,檢視看結果(綠色表示正常結束,紅色表示異常結束,白色表示等待執行)。
我們可以在圖形介面中定義每條邊的值,流程執行時,對節點的返回值與後續邊的值進行比對,比對結果一直則執行該條邊對應的後續節點。
對於多條邊Join到一個節點的情況,我們需要定義該節點啟動的條件,如下:
1. 指向該節點的任意一條邊通過檢查,則啟動該節點
2. 指想該節點的所有邊都通過檢車後,啟動該節點
上圖表示節點b和節點c 任意一個節點完成後,執行節點d
節點的返回值與Java的方法返回值自動繫結,流程執行後,
對於有返回值的方法,則會呼叫該返回值的toString方法作為該節點的返回值。
對於無返回值的方法,則預設空文字列為返回值。
比如,返回int值,則用返回的int值與後續邊的條件做對比。
@Node(label = "a")
public int process_a() {
System.out.println("processing a");
return 1;
}
返回String值,則用返回的String值與後續邊的條件做對比。
@Node(label = "a")
public String process_a() {
System.out.println("processing a");
return "1";
}
返回自定義Object等,則用返回的Ojbect值的toString()方法生成的字串與後續邊的條件做對比。
@Node(label = "a")
public MyObj process_a() {
System.out.println("processing a");
return new MyObj();
}
目前還沒有對節點間引數傳遞做特別的處理,
可以通過類變數等方式進行節點間引數的傳遞。
把單體的工程改進成分散式的工程,首先要明確系統結構和改進點。
目前的系統結構如下圖所示(黃色部分可以設定成分散式執行)
系統通過兩個佇列來進行節點間控制資訊的流轉。
1. 待啟動佇列
2. 完成佇列
3.1.1 待啟動佇列
生產者:流程管理器(FlowManager),流程啟動後,流程管理器將初始節點放入[待啟動佇列]中,等待消費。
消費者:流程執行器(NodeExecutor),流程執行器監聽[待啟動佇列],得到訊息後,根據節點資訊執行該節點,執行完成後,將節點的執行結果放入[完成佇列]中,等待消費。
3.1.2 完成佇列
生產者:流程執行器(NodeExecutor),流程執行器監聽[待啟動佇列],得到訊息後,根據節點資訊執行該節點,執行完成後,將節點的執行結果放入[完成佇列]中,等待消費。
消費者:流程管理器(FlowManager),流程管理器監聽[完成佇列],得到訊息後,根據完成節點的資訊,更新流程圖,然後將後續待啟動的節點放入[待啟動佇列]中,等待消費。
基於3.1介紹的系統結構,可以明顯的發現佇列是單機系統改進為分散式系統的改進點。
所以,把佇列變成可設定的佇列後,系統將可以通過組態檔選擇單機部署,或者分散式部署。
組態檔如下
node.executor.remote=false
queue.ready.manager=io.github.nobuglady.network.fw.queue.ready.ReadyQueueManager
queue.complete.manager=io.github.nobuglady.network.fw.queue.complete.CompleteQueueManager
node.executor=io.github.nobuglady.network.fw.executor.NodePool
對節點的執行器的設定,系統預設提供了原生的執行器,可以通過Annotation對節點繫結的方法進行呼叫。
您可以設定自己的節點執行器,需要實現介面INodeExecutor
onNodeReady:節點準備執行的時候,會呼叫這個方法
這個方法裡需要寫節點執行的具體方法,並且在節點執行完畢後,將節點執行結果放入[完成佇列]。
shutdown:系統關閉的時候,會呼叫這個方法
false:本地執行
true:遠端執行
本地執行時,會呼叫node.executor中設定的執行器,來執行節點的執行。
遠端執行時,則系統不會啟動 流程執行器(NodeExecutor)。也就是不會消費[待啟動佇列]中的訊息。
遠端執行時,目標系統監聽[待啟動佇列]的訊息,得到訊息後,根據節點資訊執行該節點,執行完成後,將節點的執行結果放入[完成佇列]中,等待消費。
所以[待啟動佇列]和[完成佇列]必須設定成遠端系統可以存取的佇列。
待啟動佇列管理器
需要提供佇列的消費和生產的方法
注:設定成遠端執行節點時,系統不會呼叫此佇列的消費方法。(由遠端系統消費此佇列資訊)
完成佇列管理器
需要提供佇列的消費和生產的方法
注:設定成遠端執行節點時,系統不會呼叫此佇列的生產方法。(由遠端系統生產此佇列資訊)
下面介紹以RabbiMQ作為遠端佇列,進行分散式呼叫的設定,選擇其他的遠端佇列可以酌情修改。
工程裡的test1-6分別對應如下6種啟動方式,Test1-6.java為啟動類。
每次啟動之前,需要修改ladybugflow1-6.properties為ladybugflow.properties
1. 預設設定:通過流程類啟動流程
啟動程式碼
TestFlow1 testFlow = new TestFlow1();
testFlow.startFlow(true);
FlowStarter.shutdown();
2. 本地節點:自定義【待啟動佇列】和【完成佇列】
啟動程式碼
TestFlow2 testFlow = new TestFlow2();
testFlow.startFlow(true);
FlowStarter.shutdown();
3. 本地節點:自定義【節點執行器】
啟動程式碼
TestFlow3 testFlow = new TestFlow3();
testFlow.startFlow(true);
FlowStarter.shutdown();
4. 遠端節點:通過流程類啟動流程
啟動程式碼
TestFlow4 testFlow = new TestFlow4();
testFlow.startFlow(true);
FlowStarter.shutdown();
5. 預設設定:通過指定Json檔案來啟動流程
啟動程式碼
FlowRunner flowRunner = new FlowRunner(new TestFlow5());
flowRunner.startFlowFromJson("io/github/nobuglady/network/demo/test5/TestFlow5.json", true);
FlowStarter.shutdown();
6. 遠端節點:通過指定Json檔案來啟動流程
啟動程式碼
FlowRunner flowRunner = new FlowRunner();
flowRunner.startFlowFromJson("io/github/nobuglady/network/demo/test5/TestFlow5.json", true);
FlowStarter.shutdown();
感謝您看文章讀到這裡。
原始碼:https://github.com/nobuglady/ladybugflow
執行例原始碼:https://github.com/nobuglady/ladybugflow-demo
執行例原始碼(遠端節點):https://github.com/nobuglady/ladybugflow-demo-remote-app