Seata Server 1.5.2 原始碼學習

2022-11-11 21:01:03

Seata 包括 Server端和Client端。Seata中有三種角色:TC、TM、RM,其中,Server端就是TC,TM和RM屬Client端。Client端的原始碼學習上一篇已講過,詳見 《Seata 1.5.2原始碼學習》,今天來學習Server端的原始碼。

原始碼下載地址:https://github.com/seata/seata

啟動類 ServerApplication 沒什麼好說的,重點是ServerRunner

ServerRunner 是一個 CommandLineRunner 範例,因此在Spring Boot啟動完成後會回撥其run()方法。而在ServerRunner的run()方法中呼叫了Server.start()方法。

在Server#start()方法中,初始化了包括id生成器在內的很多元件,我們先不管這些,重點關注以下幾行程式碼:

NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);
coordinator.init();
nettyRemotingServer.setHandler(coordinator);

DefaultCoordinator是一個單例Bean,在整個應用中只有一個DefaultCoordinator範例

DefaultCoordinator 實現了 TransactionMessageHandler

NettyRemotingServer#setHandler()設定的正是TransactionMessageHandler

DefaultCoordinator#onRequest()

重點是這三行:

AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
transactionRequest.setTCInboundHandler(this);
transactionRequest.handle(context);

DefaultCoordinator實現了TCInboundHandler介面,所以它不僅是一個TransactionMessageHandler,還是一個TCInboundHandler

這裡transactionRequest.setTCInboundHandler(this),就是指定AbstractTransactionRequestToTC中的TCInboundHandler設定為DefaultCoordinator

AbstractTransactionRequest#handle()

不同的請求分發給對應的處理器去處理

現在請求和對應的處理器都有了,下面具體看一下每種請求都是如何被處理的

1. 開啟全域性事務

開啟事務直接呼叫子類DefaultCoordinator#doGlobalBegin(),同時放在一個處理模板中執行

在doGlobalBean()中呼叫DefaultCore#begin()並返回全域性事務ID(xid)

new GlobalSession()

新增一個SessionManager作為Session的監聽器

Core

總結一下,開啟事務:

  1. 建立一個GlobalSession
  2. 給GlobalSession新增一個監聽器SessionManager
  3. session.begin()

開啟事務,建立一個全域性事務,如果是seata.store.mode=db的話,向global_table表插入一條記錄

2. 分支事務註冊

DefaultCore#branchRegister()

如果是AT模式,這裡呼叫的就是ATCore#branchSessionLock()

ATCore#branchSessionLock()檢查是否拿到鎖了

具體每種鎖的實現就不往下看了,挑其中一個看下,就RedisLocker吧

總之,分支註冊的時候需要檢查鎖,拿到本次事務中所涉及的所有需要加鎖的行的鎖才能註冊成功

所有行都加鎖成功,分支註冊才算成功,才會返回true

再回到AbstractCore#branchRegister(),整個方法是放在SessionHolder#lockAndExecute()中執行的

總結一下,分支註冊:

  1. 建立一個BranchSession
  2. 加鎖,獲取所有行的鎖
  3. 將BranchSession加到GlobalSession中
  4. 返回branchId

分支註冊,建立BranchSession,獲取全域性鎖成功後將branchSession加入globalSession

3. 提交全域性事務

首先判斷全域性事務狀態是否為begin,如果不是則不應該提交。如果是,則將事務active置為false,釋放全域性鎖,判斷是否可以非同步提交。分支型別是AT的都可以非同步提交,因此AT模式,預設是非同步提交。如果不能非同步提交,則採取同步提交。

3.1. 同步提交

遍歷所有已註冊的分支事務,向分支傳送同步請求,告訴它全域性事務開始提交了,不出意外的情況下返回分支狀態是二階段提交成功。當所有分支都提交成功,則返回true,於是全域性事務提交成功,返回全域性事務狀態為已提交。如果有分支提交失敗,則返回false,全域性事務提交失敗,返回全域性狀態為提交失敗。如果拋異常了,則會有定時任務稍後重試提交。

3.2. 非同步提交

非同步提交只是將全域性狀態置為非同步提交中,剩下的事情交給定時任務去執行

啟動的時候呼叫了DefaultCoordinator#init()方法,啟動定時任務

每次,定時任務執行前,要先獲取一把分散式鎖,這個鎖是io.seata.core.store.DistributedLocker,不是分支註冊時的那把鎖io.seata.core.lock.Locker

非同步提交首先將全域性狀態設定為AsyncCommitting,返回返回全域性狀態Committed。後臺有定時任務掃描,找到所有狀態為AsyncCommitting的全域性事務,迴圈遍歷。對於每個全域性事務提交,呼叫DefaultCore#doGlobalCommit(),遍歷所有已註冊的分支事務,向分支事務發請求,通知其提交事務,分支事務返回二階段提交成功,表示該分支事務提交成功,當所有分支事務都二階段提交成功,則全域性事務提交成功。

4. 回滾全域性事務

回滾,首先檢查全域性事務狀態是否為Begin,不是的話直接結束。遍歷當前全域性事務中已註冊的分支事務,依次給每個分支發請求,告訴分支事務需要回滾。如果所有分支返回回滾成功,則全域性回滾成功。如果有分支回滾失敗且不重試,則直接直接結束。如果失敗且可重試,或者執行過程中拋異常,則稍後會有定時任務重試回滾操作。

5. 分支上報

RM向TC報告分支事務狀態

只是更新一下分支狀態及相關資料

6. 查詢全域性事務狀態

7. 查詢全域性鎖

挑RedisLocker看一下吧

關於Seata Server 的原始碼學習就先到這裡,歡迎交流,多謝點贊 (^_^)