Seata 包括 Server端和Client端。Seata中有三種角色:TC、TM、RM,其中,Server端就是TC,TM和RM屬Client端。Client端的原始碼學習上一篇已講過,詳見 《Seata 1.5.2原始碼學習》,今天來學習Server端的原始碼。
原始碼下載地址:https://github.com/seata/seata
啟動類 ServerApplication 沒什麼好說的,重點是ServerRunner
ServerRunner 是一個 CommandLineRunner 範例,因此在Spring Boot啟動完成後會回撥其run()方法。而在ServerRunner的run()方法中呼叫了Server.start()方法。
在Server#start()方法中,初始化了包括id生成器在內的很多元件,我們先不管這些,重點關注以下幾行程式碼:
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);
coordinator.init();
nettyRemotingServer.setHandler(coordinator);
DefaultCoordinator是一個單例Bean,在整個應用中只有一個DefaultCoordinator範例
DefaultCoordinator 實現了 TransactionMessageHandler
NettyRemotingServer#setHandler()設定的正是TransactionMessageHandler
DefaultCoordinator#onRequest()
重點是這三行:
AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
transactionRequest.setTCInboundHandler(this);
transactionRequest.handle(context);
DefaultCoordinator實現了TCInboundHandler介面,所以它不僅是一個TransactionMessageHandler,還是一個TCInboundHandler
這裡transactionRequest.setTCInboundHandler(this),就是指定AbstractTransactionRequestToTC中的TCInboundHandler設定為DefaultCoordinator
AbstractTransactionRequest#handle()
不同的請求分發給對應的處理器去處理
現在請求和對應的處理器都有了,下面具體看一下每種請求都是如何被處理的
1. 開啟全域性事務
開啟事務直接呼叫子類DefaultCoordinator#doGlobalBegin(),同時放在一個處理模板中執行
在doGlobalBean()中呼叫DefaultCore#begin()並返回全域性事務ID(xid)
new GlobalSession()
新增一個SessionManager作為Session的監聽器
Core
總結一下,開啟事務:
開啟事務,建立一個全域性事務,如果是seata.store.mode=db的話,向global_table表插入一條記錄
2. 分支事務註冊
DefaultCore#branchRegister()
如果是AT模式,這裡呼叫的就是ATCore#branchSessionLock()
ATCore#branchSessionLock()檢查是否拿到鎖了
具體每種鎖的實現就不往下看了,挑其中一個看下,就RedisLocker吧
總之,分支註冊的時候需要檢查鎖,拿到本次事務中所涉及的所有需要加鎖的行的鎖才能註冊成功
所有行都加鎖成功,分支註冊才算成功,才會返回true
再回到AbstractCore#branchRegister(),整個方法是放在SessionHolder#lockAndExecute()中執行的
總結一下,分支註冊:
分支註冊,建立BranchSession,獲取全域性鎖成功後將branchSession加入globalSession
3. 提交全域性事務
首先判斷全域性事務狀態是否為begin,如果不是則不應該提交。如果是,則將事務active置為false,釋放全域性鎖,判斷是否可以非同步提交。分支型別是AT的都可以非同步提交,因此AT模式,預設是非同步提交。如果不能非同步提交,則採取同步提交。
3.1. 同步提交
遍歷所有已註冊的分支事務,向分支傳送同步請求,告訴它全域性事務開始提交了,不出意外的情況下返回分支狀態是二階段提交成功。當所有分支都提交成功,則返回true,於是全域性事務提交成功,返回全域性事務狀態為已提交。如果有分支提交失敗,則返回false,全域性事務提交失敗,返回全域性狀態為提交失敗。如果拋異常了,則會有定時任務稍後重試提交。
3.2. 非同步提交
非同步提交只是將全域性狀態置為非同步提交中,剩下的事情交給定時任務去執行
啟動的時候呼叫了DefaultCoordinator#init()方法,啟動定時任務
每次,定時任務執行前,要先獲取一把分散式鎖,這個鎖是io.seata.core.store.DistributedLocker,不是分支註冊時的那把鎖io.seata.core.lock.Locker
非同步提交首先將全域性狀態設定為AsyncCommitting,返回返回全域性狀態Committed。後臺有定時任務掃描,找到所有狀態為AsyncCommitting的全域性事務,迴圈遍歷。對於每個全域性事務提交,呼叫DefaultCore#doGlobalCommit(),遍歷所有已註冊的分支事務,向分支事務發請求,通知其提交事務,分支事務返回二階段提交成功,表示該分支事務提交成功,當所有分支事務都二階段提交成功,則全域性事務提交成功。
4. 回滾全域性事務
回滾,首先檢查全域性事務狀態是否為Begin,不是的話直接結束。遍歷當前全域性事務中已註冊的分支事務,依次給每個分支發請求,告訴分支事務需要回滾。如果所有分支返回回滾成功,則全域性回滾成功。如果有分支回滾失敗且不重試,則直接直接結束。如果失敗且可重試,或者執行過程中拋異常,則稍後會有定時任務重試回滾操作。
5. 分支上報
RM向TC報告分支事務狀態
只是更新一下分支狀態及相關資料
6. 查詢全域性事務狀態
7. 查詢全域性鎖
挑RedisLocker看一下吧
關於Seata Server 的原始碼學習就先到這裡,歡迎交流,多謝點贊 (^_^)