分散式事務系列文章
分散式事務 | 使用DTM 的Saga 模式
分散式事務 | 使用 dotnetcore/CAP 的本地訊息表模式
分散式事務 | 基於MassTransit的StateMachine實現Saga編排式分散式事務
分散式事務 | 基於MassTransit Courier實現Saga 編排式分散式事務
前面章節提及的MassTransit
、dotnetcore/CAP
都提供了分散式事務的處理能力,但也僅侷限於Saga和本地訊息表模式的實現。那有沒有一個獨立的分散式事務解決方案,涵蓋多種分散式事務處理模式,如Saga
、TCC
、XA
模式等。有,目前業界主要有兩種開源方案,其一是阿里開源的Seata
,另一個就是DTM
。其中Seata
僅支援Java、Go和Python語言,因此不在.NET 的選擇範圍。DTM
則通過提供簡單易用的HTTP和gRPC介面,遮蔽了語言的無關性,因此支援任何開發語言接入,目前提供了Go、Python、NodeJs、Ruby、Java和C#等語言的SDK。
DTM,全稱Distributed Transaction Manager,是一個分散式事務管理器,解決跨資料庫、跨服務、跨語言更新資料的一致性問題。它提供了Saga、TCC、 XA和二階段訊息模式以滿足不同應用場景的需求,同時其首創的子事務屏障技術可以有效解決冪等、懸掛和空補償等異常問題。
那DTM是如何處理分散式事務的呢?以一個經典的跨行轉賬業務為例來看下事務處理過程。對於跨行轉賬業務而言,很顯然是跨庫跨服務的應用場景,不能簡單通過本地事務解決,可以使用Saga模式,以下是基於DTM提供的Saga事務模式成功轉賬的的時序圖:
從以上時序圖可以看出,DTM整個全域性事務分為如下幾步:
基於以上這個時序圖的基礎上,再來看下DTM的架構:
整個DTM架構中,一共有三個角色,分別承擔了不同的職責:
總體而言,AP-應用程式充當全域性事務編排器的角色通過DTM提供的開箱即用的SDK進行全域性事務和子事務的註冊。TM-事務管理器接收到註冊的全域性事務和子事務後,負責呼叫RM-資源管理器來執行對應的事務分支,TM-事務管理器根據事務分支的執行結果決定是否提及或回滾事務。
百聞不如一見,接下來就來實際上手體驗下如何基於DTM來實際應用Saga進行分散式跨行轉賬事務的處理。
接下來就來建立一個範例專案:
dotnet new webapi -n DtmDemo.Webapi
建立範例專案。Dtmcli
和Pomelo.EntityFrameworkCore.MySql
。{
"dtm": {
"DtmUrl": "http://localhost:36789",
"DtmTimeout": 10000,
"BranchTimeout": 10000,
"DBType": "mysql",
"BarrierTableName": "dtm_barrier.barrier",
}
}
BankAccount
實體類:namespace DtmDemo.WebApi.Models
{
public class BankAccount
{
public int Id { get; set; }
public decimal Balance { get; set; }
}
}
DtmDemoWebApiContext
資料庫上下文:using Microsoft.EntityFrameworkCore;
namespace DtmDemo.WebApi.Data
{
public class DtmDemoWebApiContext : DbContext
{
public DtmDemoWebApiContext (DbContextOptions<DtmDemoWebApiContext> options)
: base(options)
{
}
public DbSet<DtmDemo.WebApi.Models.BankAccount> BankAccount { get; set; } = default!;
}
}
using Microsoft.EntityFrameworkCore;
using DtmDemo.WebApi.Data;
using Dtmcli;
var builder = WebApplication.CreateBuilder(args);
var connectionStr = builder.Configuration.GetConnectionString("DtmDemoWebApiContext");
// 註冊DbContext
builder.Services.AddDbContext<DtmDemoWebApiContext>(options =>
{
options.UseMySql(connectionStr, ServerVersion.AutoDetect(connectionStr));
});
// 註冊DTM
builder.Services.AddDtmcli(builder.Configuration, "dtm");
dotnet ef migrations add 'Initial'
建立遷移。BankAccountController
如下,其中PostBankAccount
介面新增了await _context.Database.MigrateAsync();
用於自動應用遷移。using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using DtmDemo.WebApi.Data;
using DtmDemo.WebApi.Models;
using Dtmcli;
namespace DtmDemo.WebApi.Controllers
{
[Route("api/[controller]")]
[ApiController]
public class BankAccountsController : ControllerBase
{
private readonly DtmDemoWebApiContext _context;
public BankAccountsController(DtmDemoWebApiContext context)
{
_context = context;
}
[HttpGet]
public async Task<ActionResult<IEnumerable<BankAccount>>> GetBankAccount()
{
return await _context.BankAccount.ToListAsync();
}
[HttpPost]
public async Task<ActionResult<BankAccount>> PostBankAccount(BankAccount bankAccount)
{
await _context.Database.MigrateAsync();
_context.BankAccount.Add(bankAccount);
await _context.SaveChangesAsync();
return Ok(bankAccount);
}
}
接下來定義SagaDemoController
來使用DTM的Saga模式來模擬跨行轉賬分散式事務:
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using DtmDemo.WebApi.Data;
using DtmDemo.WebApi.Models;
using Dtmcli;
using DtmCommon;
namespace DtmDemo.WebApi.Controllers
{
[Route("api/[controller]")]
[ApiController]
public class SagaDemoController : ControllerBase
{
private readonly DtmDemoWebApiContext _context;
private readonly IConfiguration _configuration;
private readonly IDtmClient _dtmClient;
private readonly IDtmTransFactory _transFactory;
private readonly IBranchBarrierFactory _barrierFactory;
private readonly ILogger<BankAccountsController> _logger;
public SagaDemoController(DtmDemoWebApiContext context, IConfiguration configuration, IDtmClient dtmClient, IDtmTransFactory transFactory, ILogger<BankAccountsController> logger, IBranchBarrierFactory barrierFactory)
{
this._context = context;
this._configuration = configuration;
this._dtmClient = dtmClient;
this._transFactory = transFactory;
this._logger = logger;
this._barrierFactory = barrierFactory;
}
}
對於跨行轉賬業務,使用DTM的Saga模式,首先要進行事務拆分,可以拆分為以下4個子事務,並分別實現:
[HttpPost("TransferOut")]
public async Task<IActionResult> TransferOut([FromBody] TransferRequest request)
{
var msg = $"使用者{request.UserId}轉出{request.Amount}元";
_logger.LogInformation($"轉出子事務-啟動:{msg}");
// 1. 建立子事務屏障
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
try
{
using (var conn = _context.Database.GetDbConnection())
{
// 2. 在子事務屏障內執行事務操作
await branchBarrier.Call(conn, async (tx) =>
{
_logger.LogInformation($"轉出子事務-執行:{msg}");
await _context.Database.UseTransactionAsync(tx);
var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
if (bankAccount == null || bankAccount.Balance < request.Amount)
throw new InvalidDataException("賬戶不存在或餘額不足!");
bankAccount.Balance -= request.Amount;
await _context.SaveChangesAsync();
});
}
}
catch (InvalidDataException ex)
{
_logger.LogInformation($"轉出子事務-失敗:{ex.Message}");
// 3. 按照介面協定,返回409,以表示子事務失敗
return new StatusCodeResult(StatusCodes.Status409Conflict);
}
_logger.LogInformation($"轉出子事務-成功:{msg}");
return Ok();
}
以上程式碼中有幾點需要額外注意:
_barrierFactory.CreateBranchBarrier(Request.Query)
,其中Request.Query
中的引數由DTM 生成,類似:?branch_id=01&gid=XTzKHgxemLyL8EXtMTLvzK&op=action&trans_type=saga
,主要包含四個引數:
branchBarrier.Call(conn, async (tx) =>{}
**409**
狀態碼以告知DTM 子事務失敗。409
狀態碼。在外圍捕獲異常時切忌放大異常捕獲,比如直接catch(Exception)
,如此會捕獲由於網路等其他原因導致的異常,而導致DTM 不再自動處理該異常,比如業務異常時的自動重試。轉出補償,就是回滾轉出操作,進行賬戶餘額歸還,實現如下:
[HttpPost("TransferOut_Compensate")]
public async Task<IActionResult> TransferOut_Compensate([FromBody] TransferRequest request)
{
var msg = $"使用者{request.UserId}回滾轉出{request.Amount}元";
_logger.LogInformation($"轉出補償子事務-啟動:{msg}");
// 1. 建立子事務屏障
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
using (var conn = _context.Database.GetDbConnection())
{
// 在子事務屏障內執行事務操作
await branchBarrier.Call(conn, async (tx) =>
{
_logger.LogInformation($"轉出補償子事務-執行:{msg}");
await _context.Database.UseTransactionAsync(tx);
var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
if (bankAccount == null)
return; //對於補償操作,可直接返回,中斷後續操作
bankAccount.Balance += request.Amount;
await _context.SaveChangesAsync();
});
}
_logger.LogInformation($"轉出補償子事務-成功!");
// 2. 因補償操作必須成功,所以必須返回200。
return Ok();
}
由於DTM設計為總是執行補償,也就是說即使正向操作子事務失敗時,DTM 仍舊會執行補償邏輯。但子事務屏障會在執行時判斷正向操作的執行狀態,當子事務失敗時,並不會執行補償邏輯。
另外DTM的補償操作,是要求最終成功的,只要還沒成功,就會不斷進行重試,直到成功。因此在補償子事務中,即使補償子事務中出現業務失敗時,也必須返回**200**
。因此當出現bankAccount==null
時可以直接 return。
轉入子事務和轉出子事務的實現基本類似,都是開啟子事務屏障後,在branchBarrier.Call(conn, async tx => {}
中實現事務邏輯,並通過拋異常的方式並最終返回409
狀態碼來顯式告知DTM 子事務執行失敗。
[HttpPost("TransferIn")]
public async Task<IActionResult> TransferIn([FromBody] TransferRequest request)
{
var msg = $"使用者{request.UserId}轉入{request.Amount}元";
_logger.LogInformation($"轉入子事務-啟動:{msg}");
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
try
{
using (var conn = _context.Database.GetDbConnection())
{
await branchBarrier.Call(conn, async (tx) =>
{
_logger.LogInformation($"轉入子事務-執行:{msg}");
await _context.Database.UseTransactionAsync(tx);
var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
if (bankAccount == null)
throw new InvalidDataException("賬戶不存在!");
bankAccount.Balance += request.Amount;
await _context.SaveChangesAsync();
});
}
}
catch (InvalidDataException ex)
{
_logger.LogInformation($"轉入子事務-失敗:{ex.Message}");
return new StatusCodeResult(StatusCodes.Status409Conflict);
}
_logger.LogInformation($"轉入子事務-成功:{msg}");
return Ok();
}
轉入補償子事務和轉出補償子事務的實現也基本類似,都是開啟子事務屏障後,在branchBarrier.Call(conn, async tx => {}
中實現事務邏輯,並最終返回200
狀態碼來告知DTM 補償子事務執行成功。
[HttpPost("TransferIn_Compensate")]
public async Task<IActionResult> TransferIn_Compensate([FromBody] TransferRequest request)
{
var msg = "使用者{request.UserId}回滾轉入{request.Amount}元";
_logger.LogInformation($"轉入補償子事務-啟動:{msg}");
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
using (var conn = _context.Database.GetDbConnection())
{
await branchBarrier.Call(conn, async (tx) =>
{
_logger.LogInformation($"轉入補償子事務-執行:{msg}");
await _context.Database.UseTransactionAsync(tx);
var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
if (bankAccount == null) return;
bankAccount.Balance -= request.Amount;
await _context.SaveChangesAsync();
});
}
_logger.LogInformation($"轉入補償子事務-成功!");
return Ok();
}
拆分完子事務,最後就可以進行Saga事務編排了,其程式碼如下所示:
[HttpPost("Transfer")]
public async Task<IActionResult> Transfer(int fromUserId, int toUserId, decimal amount,
CancellationToken cancellationToken)
{
try
{
_logger.LogInformation($"轉賬事務-啟動:使用者{fromUserId}轉賬{amount}元到使用者{toUserId}");
//1. 生成全域性事務ID
var gid = await _dtmClient.GenGid(cancellationToken);
var bizUrl = _configuration.GetValue<string>("TransferBaseURL");
//2. 建立Saga
var saga = _transFactory.NewSaga(gid);
//3. 新增子事務
saga.Add(bizUrl + "/TransferOut", bizUrl + "/TransferOut_Compensate",
new TransferRequest(fromUserId, amount))
.Add(bizUrl + "/TransferIn", bizUrl + "/TransferIn_Compensate",
new TransferRequest(toUserId, amount))
.EnableWaitResult(); // 4. 按需啟用是否等待事務執行結果
//5. 提交Saga事務
await saga.Submit(cancellationToken);
}
catch (DtmException ex) // 6. 如果開啟了`EnableWaitResult()`,則可通過捕獲異常的方式,捕獲事務失敗的結果。
{
_logger.LogError($"轉賬事務-失敗:使用者{fromUserId}轉賬{amount}元到使用者{toUserId}失敗!");
return new BadRequestObjectResult($"轉賬失敗:{ex.Message}");
}
_logger.LogError($"轉賬事務-完成:使用者{fromUserId}轉賬{amount}元到使用者{toUserId}成功!");
return Ok($"轉賬事務-完成:使用者{fromUserId}轉賬{amount}元到使用者{toUserId}成功!");
}
主要步驟如下:
var gid =await _dtmClient.GenGid(cancellationToken);
_transFactory.NewSaga(gid);
saga.Add(string action, string compensate, object postData);
包含正向和反向子事務。EnableWaitResult()
開啟事務結果等待。saga.Submit(cancellationToken);
try...catch..
來捕獲DtmExcepiton
異常來獲取事務執行異常資訊。既然DTM作為一個獨立的服務存在,其負責通過HTTP
或gRPC
協定發起子事務的呼叫,因此首先需要啟動一個DTM範例,又由於本專案依賴MySQL,因此我們採用Docker Compose的方式來啟動專案。在Visual Studio中通過右鍵專案->Add->Docker Support->Linux
即可新增Dockerfile
如下所示:
FROM mcr.microsoft.com/dotnet/aspnet:6.0 AS base
WORKDIR /app
EXPOSE 80
EXPOSE 443
FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build
WORKDIR /src
COPY ["DtmDemo.WebApi/DtmDemo.WebApi.csproj", "DtmDemo.WebApi/"]
RUN dotnet restore "DtmDemo.WebApi/DtmDemo.WebApi.csproj"
COPY . .
WORKDIR "/src/DtmDemo.WebApi"
RUN dotnet build "DtmDemo.WebApi.csproj" -c Release -o /app/build
FROM build AS publish
RUN dotnet publish "DtmDemo.WebApi.csproj" -c Release -o /app/publish
FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "DtmDemo.WebApi.dll"]
在Visual Studio中通過右鍵專案->Add Container Orchestrator Support->Docker Compose
即可新增docker-compose.yml
,由於整個專案依賴mysql
和DTM
,修改docker-compose.yml
如下所示,其中定義了三個服務:db,dtm和dtmdemo.webapi。
version: '3.4'
services:
db:
image: 'mysql:5.7'
container_name: dtm-mysql
environment:
MYSQL_ROOT_PASSWORD: 123456 # 指定MySQL初始密碼
volumes:
- ./docker/mysql/scripts:/docker-entrypoint-initdb.d # 掛載用於初始化資料庫的指令碼
ports:
- '3306:3306'
dtm:
depends_on: ["db"]
image: 'yedf/dtm:latest'
container_name: dtm-svc
environment:
IS_DOCKER: '1'
STORE_DRIVER: mysql # 指定使用MySQL持久化DTM事務資料
STORE_HOST: db # 指定MySQL服務名,這裡是db
STORE_USER: root
STORE_PASSWORD: '123456'
STORE_PORT: 3306
STORE_DB: "dtm" # 指定DTM 資料庫名
ports:
- '36789:36789' # DTM HTTP 埠
- '36790:36790' # DTM gRPC 埠
dtmdemo.webapi:
depends_on: ["dtm", "db"]
image: ${DOCKER_REGISTRY-}dtmdemowebapi
environment:
ASPNETCORE_ENVIRONMENT: docker # 設定啟動環境為docker
container_name: dtm-webapi-demo
build:
context: .
dockerfile: DtmDemo.WebApi/Dockerfile
ports:
- '31293:80' # 對映Demo:80埠到本地31293埠
- '31294:443' # 對映Demo:443埠到本地31294埠
其中dtmdemo.webapi
服務通過ASPNETCORE_ENVIRONMENT: docker
指定啟動環境為docker
,因此需要在專案下新增appsettings.docker.json
以設定應用引數:
{
"ConnectionStrings": {
"DtmDemoWebApiContext": "Server=db;port=3306;database=dtm_barrier;user id=root;password=123456;AllowLoadLocalInfile=true"
},
"TransferBaseURL": "http://dtmdemo.webapi/api/SagaDemo",
"dtm": {
"DtmUrl": "http://dtm:36789",
"DtmTimeout": 10000,
"BranchTimeout": 10000,
"DBType": "mysql",
"BarrierTableName": "dtm_barrier.barrier"
}
}
另外db
服務中通過volumes: ["./docker/mysql/scripts:/docker-entrypoint-initdb.d"]
來掛載初始化指令碼,以建立DTM依賴的MySQL 儲存資料庫dtm
和範例專案使用子事務屏障需要的barrier
資料表。指令碼如下:
CREATE DATABASE IF NOT EXISTS dtm
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
;
drop table IF EXISTS dtm.trans_global;
CREATE TABLE if not EXISTS dtm.trans_global (
`id` bigint(22) NOT NULL AUTO_INCREMENT,
`gid` varchar(128) NOT NULL COMMENT 'global transaction id',
`trans_type` varchar(45) not null COMMENT 'transaction type: saga | xa | tcc | msg',
`status` varchar(12) NOT NULL COMMENT 'tranaction status: prepared | submitted | aborting | finished | rollbacked',
`query_prepared` varchar(1024) NOT NULL COMMENT 'url to check for msg|workflow',
`protocol` varchar(45) not null comment 'protocol: http | grpc | json-rpc',
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
`finish_time` datetime DEFAULT NULL,
`rollback_time` datetime DEFAULT NULL,
`options` varchar(1024) DEFAULT 'options for transaction like: TimeoutToFail, RequestTimeout',
`custom_data` varchar(1024) DEFAULT '' COMMENT 'custom data for transaction',
`next_cron_interval` int(11) default null comment 'next cron interval. for use of cron job',
`next_cron_time` datetime default null comment 'next time to process this trans. for use of cron job',
`owner` varchar(128) not null default '' comment 'who is locking this trans',
`ext_data` TEXT comment 'result for this trans. currently used in workflow pattern',
`result` varchar(1024) DEFAULT '' COMMENT 'rollback reason for transaction',
`rollback_reason` varchar(1024) DEFAULT '' COMMENT 'rollback reason for transaction',
PRIMARY KEY (`id`),
UNIQUE KEY `gid` (`gid`),
key `owner`(`owner`),
key `status_next_cron_time` (`status`, `next_cron_time`) comment 'cron job will use this index to query trans'
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
drop table IF EXISTS dtm.trans_branch_op;
CREATE TABLE IF NOT EXISTS dtm.trans_branch_op (
`id` bigint(22) NOT NULL AUTO_INCREMENT,
`gid` varchar(128) NOT NULL COMMENT 'global transaction id',
`url` varchar(1024) NOT NULL COMMENT 'the url of this op',
`data` TEXT COMMENT 'request body, depreceated',
`bin_data` BLOB COMMENT 'request body',
`branch_id` VARCHAR(128) NOT NULL COMMENT 'transaction branch ID',
`op` varchar(45) NOT NULL COMMENT 'transaction operation type like: action | compensate | try | confirm | cancel',
`status` varchar(45) NOT NULL COMMENT 'transaction op status: prepared | succeed | failed',
`finish_time` datetime DEFAULT NULL,
`rollback_time` datetime DEFAULT NULL,
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `gid_uniq` (`gid`, `branch_id`, `op`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
drop table IF EXISTS dtm.kv;
CREATE TABLE IF NOT EXISTS dtm.kv (
`id` bigint(22) NOT NULL AUTO_INCREMENT,
`cat` varchar(45) NOT NULL COMMENT 'the category of this data',
`k` varchar(128) NOT NULL,
`v` TEXT,
`version` bigint(22) default 1 COMMENT 'version of the value',
create_time datetime default NULL,
update_time datetime DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE key `uniq_k`(`cat`, `k`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
create database if not exists dtm_barrier
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
;
drop table if exists dtm_barrier.barrier;
create table if not exists dtm_barrier.barrier(
id bigint(22) PRIMARY KEY AUTO_INCREMENT,
trans_type varchar(45) default '',
gid varchar(128) default '',
branch_id varchar(128) default '',
op varchar(45) default '',
barrier_id varchar(45) default '',
reason varchar(45) default '' comment 'the branch type who insert this record',
create_time datetime DEFAULT now(),
update_time datetime DEFAULT now(),
key(create_time),
key(update_time),
UNIQUE key(gid, branch_id, op, barrier_id)
);
準備完畢,即可通過點選Visual Studio工具列的Docker Compose
的啟動按鈕,啟動後可以在Containers
視窗看到啟動了dtm-mysql
、dtm-svc
和dtm-webapi-demo
三個容器,並在瀏覽器中開啟了 http://localhost:31293/swagger/index.html Swagger 網頁。該種方式啟動專案是支援斷點偵錯專案,如下圖所示:
通過BankAccouts
控制器的POST
介面,初始化使用者1和使用者2各100元。再通過SagaDemo
控制器的/api/Transfer
介面,進行Saga事務測試。
由於使用者1和使用者2已存在,且使用者1餘額足夠, 因此該筆轉賬合法因此會成功,其執行路徑為:轉出(成功)->轉入(成功)-> 事務完成,執行紀錄檔如下圖所示:
由於使用者3不存在,因此執行路徑為:轉出(失敗)->轉出補償(成功)->事務完成。從下圖的執行紀錄檔可以看出,轉出子事務失敗,還是會呼叫對應的轉出補償操作,但子事務屏障會過進行過濾,因此實際上並不會執行真正的轉出補償邏輯,其中紅線框住的部分就是證明。
由於使用者3不存在,因此執行路徑為:轉出(成功)->轉入(失敗)->轉入補償(成功)->轉出補償(成功)->事務完成。從下圖的執行紀錄檔可以看出,轉入子事務失敗,還是會呼叫對應的轉入補償操作,但子事務屏障會過進行過濾,因此實際上並不會執行真正的轉入補償邏輯,其中紅線框住的部分就是證明。
在以上的範例中,重複提及子事務屏障,那子事務屏障具體是什麼,這裡有必要重點說明下。以上面使用者1轉賬10元到使用者3為例,整個事務流轉過程中,即轉出(成功)->轉入(失敗)->轉入補償(成功)->轉出補償(成功)->事務完成。
在提交事務之後,首先是全域性事務的落庫,主要由DTM 服務負責,主要包括兩張表:trans_global
和trans_branch_op
,DTM 依此進行子事務分支的協調。其中trans_global
會插入一條全域性事務記錄,用於記錄全域性事務的狀態資訊,如下圖1所示。trans_branch_op
表為trans_global
的子表,記錄四條子事務分支資料,如下圖2所示:
具體的服務再接收到來自Dtm的子事務分支呼叫時,每次都會往子事務屏障表barrier
中插入一條資料,如下圖所示。業務服務就是依賴此表來完成子事務的控制。
而子事務屏障的核心就是子事務屏障表唯一鍵的設計,以gid
、branch_id
、op
和barrier_id
為唯一索引,利用唯一索引,「以改代查」來避免競態條件。在跨行轉賬的Saga
範例中,子事務分支的執行步驟如下所示:
inster ignore into barrier(trans_type, gid, branch_id, op, barrier_id, reason)
向子事務屏障表插入一條資料,有幾種情況:
每個子事務分支通過以上步驟,即可實現下圖的效果:
本文主要介紹了DTM的Saga模式的應用,基於DTM 首創的子事務屏障技術,使得開發者基於DTM 提供的SDK能夠輕鬆開發出更可靠的分散式應用,徹底將開發人員從網路異常的處理中解放出來,再也不用擔心空補償、防懸掛、冪等等分散式問題。如果要進行分散式事務框架的選型,DTM 將是不二之選。
關注我的公眾號『微服務知多少』,我們微信不見不散。
閱罷此文,如果您覺得本文不錯並有所收穫,請【打賞】或【推薦】,也可【評論】留下您的問題或建議與我交流。 你的支援是我不斷創作和分享的不竭動力!