Seata原始碼分析(一). AT模式底層實現

2022-05-25 12:00:54


以AT為例,我們使用Seata時只需要在全域性事務的方法上加上@GlobalTransactional,就開啟了全域性事務的支援。那麼Seata的底層到底是怎麼實現的呢?

首先我們知道,Seata也是一個SpringBoot專案,如果對Seata原始碼無從下手,那麼不妨從Spring切入:

GlobalTransactionScanner

在Seata中有一個重要的bean元件:GlobalTranscationScanner:

全域性事務掃描器,它繼承了AbstractAutoProxyCreator, InitializingBean, ApplicationContextAware, DisposableBean介面。這四個都是spring的類,所以想要知道這個GlobalTransactionScanner做了什麼工作,我們首先得介紹一下spring知識:

  • AbstractAutoProxyCreator: 是spring的AOP的一個核心類。

  • InitializingBean:此介面為Bean提供了初始化方法的方式,只包含afterPropertiesSet方法

  • DisposableBean:這個介面和InitializingBean是一組的.它只包含destroy方法,作用是為Bean生命週期結束前做一些收尾工作。

  • ApplicationContextAware:實現了這個介面的類可以方便地獲得ApplicationContext中的Bean。 可以簡單理解為它就是一個spring容器

可以看出來,GlobalTransactionScanner繼承了AOP,可以對Bean進行增強,同時還相當於一個spring容器。接下我們來看它做了實現這些介面都做了什麼事。

繼承AbstractAutoProxyCreator

實現AOP後,我們重點來看下被重寫的wrapIfNecessary, 它是AOP中核心的方法

1.wrapIfNecessary()

 @Override
    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        // do checkers
        try {
            synchronized (PROXYED_SET) {
                if (PROXYED_SET.contains(beanName)) {
                    return bean;
                }
                interceptor = null;
                //check TCC proxy:檢查是否為TCC(這裡我們研究AT,就不看TCC了)
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                  //...
                } else { 
				// 不是TCC模式:
                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
		//判斷是否有相關事務的註解,如GlobalTransactional,如果沒有就不會代理,直接返回bean
                    if (!existsAnnotation(new Class[]{serviceInterface})
                        && !existsAnnotation(interfacesIfJdk)) {
                        return bean;
                    }
				//發現存在事務相關注解的bean,則新增攔截器———GlobalTransactionalInterceptor
                    if (globalTransactionalInterceptor == null) {
                        globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                        ConfigurationCache.addConfigListener(
                                ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                                (ConfigurationChangeListener)globalTransactionalInterceptor);
                    }
                    interceptor = globalTransactionalInterceptor;
                }
				// ...
                PROXYED_SET.add(beanName);
                return bean;
            }
        }
    }

說明: 此方法對被全域性事務註解的方法Bean進行了增強。具體實現是將攔截器織入代理物件。

2.invoke()

globalTransactionalInterceptor實現了MethodInterceptor這個介面,此介面只有一個方法#invoke():

   @Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
	// 獲取註解標註的執行方法
        Class<?> targetClass =
            methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
            final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
			// 獲取全域性事務GlobalTransactional的後設資料
            final GlobalTransactional globalTransactionalAnnotation =
                getAnnotation(method, targetClass, GlobalTransactional.class);
			// 獲取全域性鎖GlobalLock的後設資料.全域性鎖會將本地事務的執行納入到seata的管理,一起競爭全域性鎖,
			//保證全域性事務在執行時,不會收到本地其他事務的影響。(隔離性)
            final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
            boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
             if (globalTransactionalAnnotation != null) {
			 		//執行全域性事務
                    return handleGlobalTransaction(methodInvocation, transactional);
                } else if (globalLockAnnotation != null) {
				//執行全域性鎖
                    return handleGlobalLock(methodInvocation, globalLockAnnotation);
                }
            }
        }
        return methodInvocation.proceed();
    }

說明:

3.handleGlobalTransaction()
我們重點關注執行全域性事務的方法handleGlobalTransaction().它的作用是獲取事務資訊並且執行全域性事務:

 Object handleGlobalTransaction(final MethodInvocation methodInvocation,
        final AspectTransactional aspectTransactional) throws Throwable {
        boolean succeed = true;
        try {
			// 呼叫execute方法,執行全域性事務
            return transactionalTemplate.execute(new TransactionalExecutor() {
                @Override
                public Object execute() throws Throwable {
                    return methodInvocation.proceed();
                }
				// 獲取事務名稱
                public String name() {
                    String name = aspectTransactional.getName();
                    if (!StringUtils.isNullOrEmpty(name)) {
                        return name;
                    }
                    return formatMethod(methodInvocation.getMethod());
                }
				//獲取事務資訊,並封裝成TransactionInfo物件
                @Override
                public TransactionInfo getTransactionInfo() {
                    // reset the value of timeout
                    int timeout = aspectTransactional.getTimeoutMills();
                    if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
                        timeout = defaultGlobalTransactionTimeout;
                    }
                    TransactionInfo transactionInfo = new TransactionInfo();
                    transactionInfo.setTimeOut(timeout);
                    transactionInfo.setName(name());
                    transactionInfo.setPropagation(aspectTransactional.getPropagation());
                    transactionInfo.setLockRetryInterval(aspectTransactional.getLockRetryInterval());
                    transactionInfo.setLockRetryTimes(aspectTransactional.getLockRetryTimes());
                    Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                    for (Class<?> rbRule : aspectTransactional.getRollbackFor()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (String rbRule : aspectTransactional.getRollbackForClassName()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (Class<?> rbRule : aspectTransactional.getNoRollbackFor()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    for (String rbRule : aspectTransactional.getNoRollbackForClassName()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    transactionInfo.setRollbackRules(rollbackRules);
                    return transactionInfo;
                }
            });
        } catch (TransactionalExecutor.ExecutionException e) {
             //...
            }
        } finally {
          //...
        }
    }

說明:

  1. 這個方法的主要工作是獲取事務的名稱和資訊,並開啟全域性事務.
  2. 全域性事務的開啟呼叫了transactionalTemplate中的execute()方法.下面繼續進入execute方法:

4.execute()

 public Object execute(TransactionalExecutor business) throws Throwable {
        // 1. Get transactionInfo 獲取事務資訊
        TransactionInfo txInfo = business.getTransactionInfo();
        // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
		// 獲取當前事務,主要獲取XId
        GlobalTransaction tx = GlobalTransactionContext.getCurrent();

        // 1.2 Handle the transaction propagation. 設定不同事務的傳播行為,執行不同邏輯.
        Propagation propagation = txInfo.getPropagation();
        SuspendedResourcesHolder suspendedResourcesHolder = null;
        try {
		//Spring事務的7種傳播行為
            switch (propagation) {
                case NOT_SUPPORTED:
                    // If transaction is existing, suspend it.
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                    }
                    // Execute without transaction and return.
                    return business.execute();
                case REQUIRES_NEW:
                    // If transaction is existing, suspend it, and then begin new transaction.
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                        tx = GlobalTransactionContext.createNew();
                    }
                    // Continue and execute with new transaction
                    break;
                case SUPPORTS:
                    // If transaction is not existing, execute without transaction.
                    if (notExistingTransaction(tx)) {
                        return business.execute();
                    }
                    // Continue and execute with new transaction
                    break;
                case REQUIRED:
                    // If current transaction is existing, execute with current transaction,
                    // else continue and execute with new transaction.
                    break;
                case NEVER:
                    // If transaction is existing, throw exception.
                    if (existingTransaction(tx)) {
                        throw new TransactionException(
                            String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                                    , tx.getXid()));
                    } else {
                        // Execute without transaction and return.
                        return business.execute();
                    }
                case MANDATORY:
                    // If transaction is not existing, throw exception.
                    if (notExistingTransaction(tx)) {
                        throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                    }
                    // Continue and execute with current transaction.
                    break;
                default:
                    throw new TransactionException("Not Supported Propagation:" + propagation);
            }

            // 1.3 如果當前事務未空,則新建立一個.
            if (tx == null) {
                tx = GlobalTransactionContext.createNew();
            }
            try {
                // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
                //    else do nothing. Of course, the hooks will still be triggered.
				// 開啟全域性事務.
                beginTransaction(txInfo, tx);

                Object rs;
                try {
                    // Do Your Business
                    rs = business.execute();
                } catch (Throwable ex) {
                    // 3. The needed business exception to rollback.
					// 發生異常,全域性回滾.各個資料根據undo_log進行補償.
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }
				// 如果沒有異常發生,則提交全域性事務
                // 4. everything is fine, commit.
                commitTransaction(tx);

                return rs;
            } finally {
                //5. clear
                resumeGlobalLockConfig(previousConfig);
                triggerAfterCompletion();
                cleanUp();
            }
        }
}

說明:

  1. 看到這裡可能有些似曾相似,這個流程下來不就是AT模式的2個階段嘛.我們探究到了AT模式的具體實現!

  2. 在此方法中,我們終於看到了開啟全域性事務的關鍵方法: beginTransaction(). 不過,我們知道TM要開啟全域性事務,首先得向TC發起請求. 說明我們還得進入beginTransaction()方法中一探究竟,這裡面還呼叫了不少方法,大家看的時候可以只看註釋的一行往下推進:

4.1 beginTransaction()

   private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            triggerBeforeBegin();
			//對TC發起請求
            tx.begin(txInfo.getTimeOut(), txInfo.getName());
            triggerAfterBegin();
        } //...
    }


   @Override
    public void begin(int timeout, String name) throws TransactionException {
		// 判斷事務的發起者是不是TM,如果不是拋異常 
        if (role != GlobalTransactionRole.Launcher) {
            assertXIDNotNull();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNull();
        String currentXid = RootContext.getXID();  //獲取Xid
        if (currentXid != null) {
            throw new IllegalStateException("Global transaction already exists," +
                " can't begin a new global transaction, currentXid = " + currentXid);
        }
		//呼叫transactionManager.begin()
        xid = transactionManager.begin(null, null, name, timeout);
        status = GlobalStatus.Begin;
        RootContext.bind(xid);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Begin new global transaction [{}]", xid);
        }
    }

	@Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
        GlobalBeginRequest request = new GlobalBeginRequest();
        request.setTransactionName(name);
        request.setTimeout(timeout);
		// 關鍵:syncCall 同步請求
        GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
        if (response.getResultCode() == ResultCode.Failed) {
            throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
        }
        return response.getXid();
    }

	 private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
        try {
		// 通過Netty發起請求
            return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
        } catch (TimeoutException toe) {
            throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
        }
    }

實現InitializingBean介面

InitializingBean介面只有一個方法afterPropertiesSet(),GlobalTransactionScanner對它進行了重寫:

說明: 呼叫了initCLient方法:初始化了TM和RM

   private void initClient() {
      	 // ....
        //init TM
        TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
      	// 紀錄檔
        //init RM
        RMClient.init(applicationId, txServiceGroup);
        // 紀錄檔
        registerSpringShutdownHook();

    }

初始化TM:

    public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {
	// 獲取TMRpc使用者端範例
        TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup, accessKey, secretKey);
		//初始化 TM Client
        tmNettyRemotingClient.init();
    }

呼叫 TmRpcClient.getInstance() 方法會獲取一個 TM 使用者端範例.
在獲取過程中,會建立 Netty 使用者端組態檔物件,以及建立 messageExecutor 執行緒池,該執行緒池用於在處理各種與伺服器端的訊息互動,在建立 TmRpcClient 範例時,建立 ClientBootstrap,用於管理 Netty 服務的啟停,以及 ClientChannelManager,它是專門用於管理 Netty 使用者端物件池.

初始化TM使用者端:
To Be Continue...

寫在最後

博主也是剛開始學習Seata,程式設計功力不太深,很多程式碼的精妙之處也看不出來.

參考連結:
Seata AT 模式啟動原始碼分析
視訊,本文很多都是從這裡整理的