分散式事務解決方案

2023-02-15 12:01:00

資料不會無緣無故丟失,也不會莫名其妙增加

一、概述

1、曾幾何時,知了在一家小公司做專案的時候,都是一個服務打天下,所以涉及到資料一致性的問題,都是直接用本地事務處理。

2、隨著時間的推移,使用者量增大了,發現一個Java服務扛不住了,於是技術大佬決定對於系統進行升級。根據系統的業務對於單體的一個服務進行拆分,然後對於開發人員也進行劃分,一個開發人員只開發和維護一個或幾個服務中的問題,大家各司其職,分工合作。

3、當然服務拆分不是一蹴而就的,這是一個耗時耗力的龐大工程,大多數系統都是進行多輪拆分,而後慢慢形成一個穩定的系統。遵守一個核心思想:先按總體業務進行一輪拆分,後面再根據拆分後的服務模組,進行一個細緻的拆分。
4、隨著服務拆分之後,使用者量是抗住了,但是發現資料都在不同的服務中存取,這就引出了一個新的問題:跨伺服器,如何保證資料的一致性?當然,跨服務的分散式系統中不僅僅這個問題,還有其他的一些列問題,如:服務可用性、服務容錯性、服務間呼叫的網路問題等等,這裡只討論資料一致性問題。
5、說到資料一致性,大致分為三種:強一致性、弱一致性、最終一致性。

  • 強一致性:資料一旦寫入,在任一時刻都能讀取到最新的值。
  • 弱一致性:當寫入一個資料的時候,其他地方去讀這些資料,可能查到的資料不是最新的
  • 最終一致性:它是弱一致性的一個變種,不追求系統任意時刻資料要達到一致,但是在一定時間後,資料最終要達到一致。

從這三種一致型的模型上來說,我們可以看到,弱一致性和最終一致性一般來說是非同步冗餘的,而強一致性是同步冗餘的,非同步處理帶來了更好的效能,但也需要處理資料的補償。同步意味著簡單,但也必然會降低系統的效能。

二、理論

上述說的資料一致性問題,其實也就是在說分散式事務的問題,現在有一些解決方案,相信大家多多少少都看到過,這裡帶大家回顧下。

2.1、二階段提交

2PC是一種強一致性設計方案,通過引入一個事務協調器來協調各個本地事務(也稱為事務參與者)的提交和回滾。
2PC主要分為2個階段:
1、第一階段:事務協調器會向每個事務參與者發起一個開啟事務的命令,每個事務參與者執行準備操作,然後再向事務協調器回覆是否準備完成。但是不會提交本地事務,但是這個階段資源是需要被鎖住的。
2、第二階段:事務協調器收到每個事務參與者的回覆後,統計每個參與者的回覆,如果每個參與者都回復「可以提交」,那麼事務協調器會傳送提交命令,參與者正式提交本地事務,釋放所有資源,結束全域性事務。但是有一個參與者回覆「拒絕提交」,那麼事務協調器傳送回滾命令,所有參與者都回滾本地事務,待全部回滾完成,釋放資源,取消全域性事務。
事務提交流程

事務回滾流程

當然2PC存在的問題這裡也提一下,一個是同步阻塞,這個會消耗效能。另一個是協調器故障問題,一旦協調器發生故障,那麼所有的參與者處理資源鎖定狀態,那麼所有參與者都會被阻塞。

2.2、三階段提交

3PC主要是在2PC的基礎上做了改進,主要為了解決2PC的阻塞問題。它主要是將2PC的第一階段分為2個步驟,先準備,再鎖定資源,並且引入了超時機制(這也意味著會造成資料不一致)。3PC的三個階段包括:CanCommitPreCommit DoCommit
具體細節就不展開贅述了,就一個核心觀點:在CanCommit的時候並不鎖定資源,除非所有參與者都同意了,才開始鎖資源

2.3、TCC柔性事務

相比較前面的2PC和3PC,TCC和那哥倆的本質區別就是它是業務層面的分散式事務,而2PC和3PC是資料庫層面的。TCC是三個單詞的縮寫:TryConfirmCancel,也分為這三個流程。
Try:嘗試,即嘗試預留資源,鎖定資源
Confirm:確認,即執行預留的資源,如果執行失敗會重試
Cancel:取消,復原預留的資源,如果執行失敗會重試

從上圖可知,TCC對於業務的侵入是很大的,而且緊緊的耦合在一起。TCC相比較2PC和3PC,試用範圍更廣,可實現跨庫,跨不同系統去實現分散式事務。缺點是要在業務程式碼中去開發大量的邏輯實現這三個步驟,需要和程式碼耦合在一起,提高開發成本。
事務紀錄檔:在TCC模式中,事務發起者和事務參與者都會去記錄事務紀錄檔(事務狀態、資訊等)。這個事務紀錄檔是整個分散式事務出現意外情況(宕機、重啟、網路中斷等),實現提交和回滾的關鍵。
冪等性:在TCC第二階段,confirm或者cancel的時候,這兩個操作都需要保證冪等性。一旦由於網路等原因導致執行失敗,就會發起不斷重試。
防懸掛:由於網路的不可靠性,有異常情況的時候,try請求可能比cancel請求更晚到達。cancel可能會執行空回滾,但是try請求被執行的時候也不會預留資源。

2.4、Seata

關於seata這裡就不多提了,用的最多的是AT模式,上回知了逐步分析過,設定完後只需要在事務發起的方法上新增@GlobalTransactional註解就可以開啟全域性事務,對於業務無侵入,低耦合。感興趣的話請參考之前討論Seata的內容。

三、應用場景

知了之前在一家公司遇到過這樣的業務場景;使用者通過頁面投保,提交一筆訂單過來,這個訂單通過上游服務,處理保單相關的業務邏輯,最後流入下游服務,處理業績、人員晉升、分潤處理等等業務。對於這個場景,兩邊處理的業務邏輯不在同一個服務中,接入的是不同的資料庫。涉及到資料一致性問題,需要用到分散式事務。

對於上面介紹的幾種方案,只是討論了理論和思路,下面我來總結下這個業務場景中運用的一種實現方案。採用了本地訊息表+MQ非同步訊息的方案實現了事務最終一致性,也符合當時的業務場景,相對強一致性,實現的效能較高。下面是該方案的思路圖

  1. 真實業務處理的狀態可能會有多種,因此需要明確哪種狀態需要定時任務補償
  2. 假如某條單據一直無法處理結束,定時任務也不能無限制下發,所以本地訊息表需要增加輪次的概念,重試多少次後告警,人工介入處理
  3. 因為MQ和定時任務的存在,難免會出現重複請求,因此下游要做好冪等防重,否則會出現重複資料,導致資料不一致

對於落地實現,話不多說,直接上程式碼。先定義兩張表tb_order和tb_notice_message,分別存訂單資訊和本地事務資訊

CREATE TABLE `tb_order` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主鍵id',
  `user_id` int(11) NOT NULL COMMENT '下單人id',
  `order_no` varchar(255) CHARACTER SET latin1 NOT NULL COMMENT '訂單編號',
  `insurance_amount` decimal(16,2) NOT NULL COMMENT '保額',
  `order_amount` decimal(16,2) DEFAULT NULL COMMENT '保費',
  `create_time` datetime DEFAULT NULL COMMENT '建立時間',
  `update_time` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新時間',
  `is_delete` tinyint(4) DEFAULT '0' COMMENT '刪除標識:0-不刪除;1-刪除',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4;
CREATE TABLE `tb_notice_message` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主鍵id',
  `type` tinyint(4) NOT NULL COMMENT '業務型別:1-下單',
  `status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '狀態:1-待處理,2-已處理,3-預警',
  `data` varchar(255) NOT NULL COMMENT '資訊',
  `retry_count` tinyint(4) DEFAULT '0' COMMENT '重試次數',
  `create_time` datetime NOT NULL COMMENT '建立時間',
  `update_time` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新時間',
  `is_delete` tinyint(4) NOT NULL DEFAULT '0' COMMENT '刪除標識:0-不刪除;1-刪除',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4;

處理訂單service,這裡可以用到我們之前說過的裝飾器模式,去裝飾這個service。把儲存本地事務,傳送mq訊息,交給裝飾器類去做,而service只需要關心業務邏輯即可,也符合開閉原則

/**
 * @author 往事如風
 * @version 1.0
 * @date 2022/12/13 10:58
 * @description
 */
@Service
@Slf4j
@AllArgsConstructor
public class OrderService implements BaseHandler<Object, Order> {

    private final OrderMapper orderMapper;

    /**
     * 訂單處理方法:只處理訂單關聯邏輯
     * @param o
     * @return
     */
    @Override
    public Order handle(Object o) {
        // 訂單資訊
        Order order = Order.builder()
                .orderNo("2345678")
                .createTime(LocalDateTime.now())
                .userId(1)
                .insuranceAmount(new BigDecimal(2000000))
                .orderAmount(new BigDecimal(5000))
                .build();
        orderMapper.insert(order);
        return order;
    }
}

新增OrderService的裝飾類OrderServiceDecorate,負責對訂單邏輯的擴充套件,這裡是新增本地事務訊息,以及傳送MQ資訊,擴充套件方法新增了Transactional註解,確保訂單邏輯和本地事務訊息的資料在同一個事務中進行,確保原子性。其中事務訊息標記處理中,待下游服務處理完業務邏輯,再更新處理完成。

/**
 * @author 往事如風
 * @version 1.0
 * @date 2022/12/14 18:48
 * @description
 */
@Slf4j
@AllArgsConstructor
@Decorate(scene = SceneConstants.ORDER, type = DecorateConstants.CREATE_ORDER)
public class OrderServiceDecorate extends AbstractHandler {

    private final NoticeMessageMapper noticeMessageMapper;

    private final RabbitTemplate rabbitTemplate;

    /**
     * 裝飾方法:對訂單處理邏輯進行擴充套件
     * @param o
     * @return
     */
    @Override
    @Transactional
    public Object handle(Object o) {
        // 呼叫service方法,實現保單邏輯
        Order order = (Order) service.handle(o);
        // 擴充套件:1、儲存事務訊息,2、傳送MQ訊息
        // 本地事務訊息
        String data = "{\"orderNo\":\"2345678\", \"userId\":1, \"insuranceAmount\":2000000, \"orderAmount\":5000}";
        NoticeMessage noticeMessage = NoticeMessage.builder()
                .retryCount(0)
                .data(data)
                .status(1)
                .type(1)
                .createTime(LocalDateTime.now())
                .build();
        noticeMessageMapper.insert(noticeMessage);
        // 傳送mq訊息
        log.info("傳送mq訊息....");
        rabbitTemplate.convertAndSend("trans", "trans.queue.key", JSONUtil.toJsonStr(noticeMessage));
        return null;
    }
}

關於這個裝飾者模式,之前有講到過,可以看下之前釋出的內容。

下游服務監聽訊息,處理完自己的業務邏輯後(如:業績、分潤、晉升等),需要傳送MQ,上游服務監聽訊息,更新本地事務狀態為已處理。這需要注意的是下游服務需要做冪等處理,防止異常情況下,上游服務資料的重試。

/**
 * @author 往事如風
 * @version 1.0
 * @date 2022/12/13 18:07
 * @description
 */
@Component
@Slf4j
@RabbitListener(queues = "trans.queue")
public class FenRunListener {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitHandler
    public void orderHandler(String msg) {
        log.info("監聽到訂單訊息:{}", msg);
        // 需要注意冪等,冪等邏輯
        log.info("下游服務業務邏輯。。。。。");
        JSONObject json = JSONUtil.parseObj(msg);
        rabbitTemplate.convertAndSend("trans", "trans.update.order.queue.key", json.getInt("id"));
    }
}

這裡插個題外話,關於冪等的處理,我這裡大致有兩種思路
1、比如根據訂單號查一下記錄是否存在,存在就直接返回成功。
2、redis存一個唯一的請求號,處理完再刪除,不存在請求號的直接返回成功,可以寫個AOP去處理,與業務隔離。
言歸正傳,上游服務訊息監聽,下游傳送MQ訊息,更新本地事務訊息為已處理,分散式事務流程結束。

/**
 * @author 往事如風
 * @version 1.0
 * @date 2022/12/13 18:29
 * @description
 */
@Component
@Slf4j
@RabbitListener(queues = "trans.update.order.queue")
public class OrderListener {

    @Autowired
    private NoticeMessageMapper noticeMessageMapper;

    @RabbitHandler
    public void updateOrder(Integer msgId) {
        log.info("監聽訊息,更新本地事務訊息,訊息id:{}", msgId);
        NoticeMessage msg = NoticeMessage.builder().status(2).id(msgId).updateTime(LocalDateTime.now()).build();
        noticeMessageMapper.updateById(msg);
    }
}

存在異常情況時,會通過定時任務,輪詢的往MQ中傳送訊息,盡最大努力去讓下游服務達到資料一致,當然重試也要設定上限;若達到上限以後還一直是失敗,那不得不考慮是下游服務自身存在問題了(有可能就是程式碼邏輯存在問題)。

/**
 * @author 往事如風
 * @version 1.0
 * @date 2022/12/14 10:25
 * @description
 */
@Configuration
@EnableScheduling
@AllArgsConstructor
@Slf4j
public class RetryOrderJob {

    private final RabbitTemplate rabbitTemplate;

    private final NoticeMessageMapper noticeMessageMapper;

    /**
     * 最大自動重試次數
     */
    private final Integer MAX_RETRY_COUNT = 5;

    @Scheduled(cron = "0/20 * * * * ? ")
    public void retry() {
        log.info("定時任務,重試異常訂單");
        LambdaQueryWrapper<NoticeMessage> wrapper = Wrappers.lambdaQuery(NoticeMessage.class);
        wrapper.eq(NoticeMessage::getStatus, 1);
        List<NoticeMessage> noticeMessages = noticeMessageMapper.selectList(wrapper);
        for (NoticeMessage noticeMessage : noticeMessages) {
            // 重新傳送mq訊息
            rabbitTemplate.convertAndSend("trans", "trans.queue.key", JSONUtil.toJsonStr(noticeMessage));
            // 重試次數+1
            noticeMessage.setRetryCount(noticeMessage.getRetryCount() + 1);
            noticeMessageMapper.updateById(noticeMessage);
            // 判斷重試次數,等於最長限制次數,直接更新為報警狀態
            if (MAX_RETRY_COUNT.equals(noticeMessage.getRetryCount())) {
                noticeMessage.setStatus(3);
                noticeMessageMapper.updateById(noticeMessage);
                // 傳送告警,通知對應人員
                // 告警邏輯(簡訊、郵件、企微群,等等)....
            }
        }
    }
}

其實這裡有個問題,一個上游服務對應多個下游服務的時候。這個時候往往不能存一條本地訊息記錄。

  1. 這裡可以在訊息表多加個欄位next_server_count,表示一個訂單發起方,需要呼叫的下游服務數量。上游服務監聽的時候,每次會與下游的回撥都減去1,直到數值是0的時候,再更新狀態是已處理。但是要控制並行,這個欄位是被多個下游服務共用的。
  2. 還有一種處理方案是為每個下游服務,都記錄一條事務訊息,用type欄位去區分,標記型別。實現上游和下游對於事務訊息的一對一關係。
  3. 最後,達到最大重試次數以後,可以將訊息加入到一個告警列表,這個告警列表可以展示在管理後臺或其他監控系統中,展示一些必要的資訊,去供公司內部人員去人工介入,處理這種異常的資料,使得資料達到最終一致性。

四、總結

其實分散式事務沒有一個完美的處理方案,只能說是儘量去滿足業務需求,滿足資料一致。如果程式不能處理了,最後由人工去兜底,做資料的補償方案。

五、參考原始碼

程式設計檔案:
https://gitee.com/cicadasmile/butte-java-note

應用倉庫:
https://gitee.com/cicadasmile/butte-flyer-parent