資料不會無緣無故丟失,也不會莫名其妙增加
1、曾幾何時,知了在一家小公司做專案的時候,都是一個服務打天下,所以涉及到資料一致性的問題,都是直接用本地事務處理。
2、隨著時間的推移,使用者量增大了,發現一個Java服務扛不住了,於是技術大佬決定對於系統進行升級。根據系統的業務對於單體的一個服務進行拆分,然後對於開發人員也進行劃分,一個開發人員只開發和維護一個或幾個服務中的問題,大家各司其職,分工合作。
3、當然服務拆分不是一蹴而就的,這是一個耗時耗力的龐大工程,大多數系統都是進行多輪拆分,而後慢慢形成一個穩定的系統。遵守一個核心思想:先按總體業務進行一輪拆分,後面再根據拆分後的服務模組,進行一個細緻的拆分。
4、隨著服務拆分之後,使用者量是抗住了,但是發現資料都在不同的服務中存取,這就引出了一個新的問題:跨伺服器,如何保證資料的一致性?當然,跨服務的分散式系統中不僅僅這個問題,還有其他的一些列問題,如:服務可用性、服務容錯性、服務間呼叫的網路問題等等,這裡只討論資料一致性問題。
5、說到資料一致性,大致分為三種:強一致性、弱一致性、最終一致性。
從這三種一致型的模型上來說,我們可以看到,弱一致性和最終一致性一般來說是非同步冗餘的,而強一致性是同步冗餘的,非同步處理帶來了更好的效能,但也需要處理資料的補償。同步意味著簡單,但也必然會降低系統的效能。
上述說的資料一致性問題,其實也就是在說分散式事務的問題,現在有一些解決方案,相信大家多多少少都看到過,這裡帶大家回顧下。
2PC是一種強一致性設計方案,通過引入一個事務協調器來協調各個本地事務(也稱為事務參與者)的提交和回滾。
2PC主要分為2個階段:
1、第一階段:事務協調器會向每個事務參與者發起一個開啟事務的命令,每個事務參與者執行準備操作,然後再向事務協調器回覆是否準備完成。但是不會提交本地事務,但是這個階段資源是需要被鎖住的。
2、第二階段:事務協調器收到每個事務參與者的回覆後,統計每個參與者的回覆,如果每個參與者都回復「可以提交」,那麼事務協調器會傳送提交命令,參與者正式提交本地事務,釋放所有資源,結束全域性事務。但是有一個參與者回覆「拒絕提交」,那麼事務協調器傳送回滾命令,所有參與者都回滾本地事務,待全部回滾完成,釋放資源,取消全域性事務。
事務提交流程
事務回滾流程
當然2PC存在的問題這裡也提一下,一個是同步阻塞,這個會消耗效能。另一個是協調器故障問題,一旦協調器發生故障,那麼所有的參與者處理資源鎖定狀態,那麼所有參與者都會被阻塞。
3PC主要是在2PC的基礎上做了改進,主要為了解決2PC的阻塞問題。它主要是將2PC的第一階段分為2個步驟,先準備,再鎖定資源,並且引入了超時機制(這也意味著會造成資料不一致)。3PC的三個階段包括:CanCommit
、PreCommit
和 DoCommit
具體細節就不展開贅述了,就一個核心觀點:在CanCommit的時候並不鎖定資源,除非所有參與者都同意了,才開始鎖資源。
相比較前面的2PC和3PC,TCC和那哥倆的本質區別就是它是業務層面的分散式事務,而2PC和3PC是資料庫層面的。TCC是三個單詞的縮寫:Try
、Confirm
、Cancel
,也分為這三個流程。
Try:嘗試,即嘗試預留資源,鎖定資源
Confirm:確認,即執行預留的資源,如果執行失敗會重試
Cancel:取消,復原預留的資源,如果執行失敗會重試
從上圖可知,TCC對於業務的侵入是很大的,而且緊緊的耦合在一起。TCC相比較2PC和3PC,試用範圍更廣,可實現跨庫,跨不同系統去實現分散式事務。缺點是要在業務程式碼中去開發大量的邏輯實現這三個步驟,需要和程式碼耦合在一起,提高開發成本。
事務紀錄檔:在TCC模式中,事務發起者和事務參與者都會去記錄事務紀錄檔(事務狀態、資訊等)。這個事務紀錄檔是整個分散式事務出現意外情況(宕機、重啟、網路中斷等),實現提交和回滾的關鍵。
冪等性:在TCC第二階段,confirm或者cancel的時候,這兩個操作都需要保證冪等性。一旦由於網路等原因導致執行失敗,就會發起不斷重試。
防懸掛:由於網路的不可靠性,有異常情況的時候,try請求可能比cancel請求更晚到達。cancel可能會執行空回滾,但是try請求被執行的時候也不會預留資源。
關於seata這裡就不多提了,用的最多的是AT模式,上回知了逐步分析過,設定完後只需要在事務發起的方法上新增@GlobalTransactional
註解就可以開啟全域性事務,對於業務無侵入,低耦合。感興趣的話請參考之前討論Seata的內容。
知了之前在一家公司遇到過這樣的業務場景;使用者通過頁面投保,提交一筆訂單過來,這個訂單通過上游服務,處理保單相關的業務邏輯,最後流入下游服務,處理業績、人員晉升、分潤處理等等業務。對於這個場景,兩邊處理的業務邏輯不在同一個服務中,接入的是不同的資料庫。涉及到資料一致性問題,需要用到分散式事務。
對於上面介紹的幾種方案,只是討論了理論和思路,下面我來總結下這個業務場景中運用的一種實現方案。採用了本地訊息表+MQ非同步訊息的方案實現了事務最終一致性,也符合當時的業務場景,相對強一致性,實現的效能較高。下面是該方案的思路圖
對於落地實現,話不多說,直接上程式碼。先定義兩張表tb_order和tb_notice_message,分別存訂單資訊和本地事務資訊
CREATE TABLE `tb_order` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主鍵id',
`user_id` int(11) NOT NULL COMMENT '下單人id',
`order_no` varchar(255) CHARACTER SET latin1 NOT NULL COMMENT '訂單編號',
`insurance_amount` decimal(16,2) NOT NULL COMMENT '保額',
`order_amount` decimal(16,2) DEFAULT NULL COMMENT '保費',
`create_time` datetime DEFAULT NULL COMMENT '建立時間',
`update_time` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新時間',
`is_delete` tinyint(4) DEFAULT '0' COMMENT '刪除標識:0-不刪除;1-刪除',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4;
CREATE TABLE `tb_notice_message` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主鍵id',
`type` tinyint(4) NOT NULL COMMENT '業務型別:1-下單',
`status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '狀態:1-待處理,2-已處理,3-預警',
`data` varchar(255) NOT NULL COMMENT '資訊',
`retry_count` tinyint(4) DEFAULT '0' COMMENT '重試次數',
`create_time` datetime NOT NULL COMMENT '建立時間',
`update_time` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新時間',
`is_delete` tinyint(4) NOT NULL DEFAULT '0' COMMENT '刪除標識:0-不刪除;1-刪除',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4;
處理訂單service,這裡可以用到我們之前說過的裝飾器模式,去裝飾這個service。把儲存本地事務,傳送mq訊息,交給裝飾器類去做,而service只需要關心業務邏輯即可,也符合開閉原則。
/**
* @author 往事如風
* @version 1.0
* @date 2022/12/13 10:58
* @description
*/
@Service
@Slf4j
@AllArgsConstructor
public class OrderService implements BaseHandler<Object, Order> {
private final OrderMapper orderMapper;
/**
* 訂單處理方法:只處理訂單關聯邏輯
* @param o
* @return
*/
@Override
public Order handle(Object o) {
// 訂單資訊
Order order = Order.builder()
.orderNo("2345678")
.createTime(LocalDateTime.now())
.userId(1)
.insuranceAmount(new BigDecimal(2000000))
.orderAmount(new BigDecimal(5000))
.build();
orderMapper.insert(order);
return order;
}
}
新增OrderService
的裝飾類OrderServiceDecorate
,負責對訂單邏輯的擴充套件,這裡是新增本地事務訊息,以及傳送MQ資訊,擴充套件方法新增了Transactional
註解,確保訂單邏輯和本地事務訊息的資料在同一個事務中進行,確保原子性。其中事務訊息標記處理中,待下游服務處理完業務邏輯,再更新處理完成。
/**
* @author 往事如風
* @version 1.0
* @date 2022/12/14 18:48
* @description
*/
@Slf4j
@AllArgsConstructor
@Decorate(scene = SceneConstants.ORDER, type = DecorateConstants.CREATE_ORDER)
public class OrderServiceDecorate extends AbstractHandler {
private final NoticeMessageMapper noticeMessageMapper;
private final RabbitTemplate rabbitTemplate;
/**
* 裝飾方法:對訂單處理邏輯進行擴充套件
* @param o
* @return
*/
@Override
@Transactional
public Object handle(Object o) {
// 呼叫service方法,實現保單邏輯
Order order = (Order) service.handle(o);
// 擴充套件:1、儲存事務訊息,2、傳送MQ訊息
// 本地事務訊息
String data = "{\"orderNo\":\"2345678\", \"userId\":1, \"insuranceAmount\":2000000, \"orderAmount\":5000}";
NoticeMessage noticeMessage = NoticeMessage.builder()
.retryCount(0)
.data(data)
.status(1)
.type(1)
.createTime(LocalDateTime.now())
.build();
noticeMessageMapper.insert(noticeMessage);
// 傳送mq訊息
log.info("傳送mq訊息....");
rabbitTemplate.convertAndSend("trans", "trans.queue.key", JSONUtil.toJsonStr(noticeMessage));
return null;
}
}
關於這個裝飾者模式,之前有講到過,可以看下之前釋出的內容。
下游服務監聽訊息,處理完自己的業務邏輯後(如:業績、分潤、晉升等),需要傳送MQ,上游服務監聽訊息,更新本地事務狀態為已處理。這需要注意的是下游服務需要做冪等處理,防止異常情況下,上游服務資料的重試。
/**
* @author 往事如風
* @version 1.0
* @date 2022/12/13 18:07
* @description
*/
@Component
@Slf4j
@RabbitListener(queues = "trans.queue")
public class FenRunListener {
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitHandler
public void orderHandler(String msg) {
log.info("監聽到訂單訊息:{}", msg);
// 需要注意冪等,冪等邏輯
log.info("下游服務業務邏輯。。。。。");
JSONObject json = JSONUtil.parseObj(msg);
rabbitTemplate.convertAndSend("trans", "trans.update.order.queue.key", json.getInt("id"));
}
}
這裡插個題外話,關於冪等的處理,我這裡大致有兩種思路
1、比如根據訂單號查一下記錄是否存在,存在就直接返回成功。
2、redis存一個唯一的請求號,處理完再刪除,不存在請求號的直接返回成功,可以寫個AOP去處理,與業務隔離。
言歸正傳,上游服務訊息監聽,下游傳送MQ訊息,更新本地事務訊息為已處理,分散式事務流程結束。
/**
* @author 往事如風
* @version 1.0
* @date 2022/12/13 18:29
* @description
*/
@Component
@Slf4j
@RabbitListener(queues = "trans.update.order.queue")
public class OrderListener {
@Autowired
private NoticeMessageMapper noticeMessageMapper;
@RabbitHandler
public void updateOrder(Integer msgId) {
log.info("監聽訊息,更新本地事務訊息,訊息id:{}", msgId);
NoticeMessage msg = NoticeMessage.builder().status(2).id(msgId).updateTime(LocalDateTime.now()).build();
noticeMessageMapper.updateById(msg);
}
}
存在異常情況時,會通過定時任務,輪詢的往MQ中傳送訊息,盡最大努力去讓下游服務達到資料一致,當然重試也要設定上限;若達到上限以後還一直是失敗,那不得不考慮是下游服務自身存在問題了(有可能就是程式碼邏輯存在問題)。
/**
* @author 往事如風
* @version 1.0
* @date 2022/12/14 10:25
* @description
*/
@Configuration
@EnableScheduling
@AllArgsConstructor
@Slf4j
public class RetryOrderJob {
private final RabbitTemplate rabbitTemplate;
private final NoticeMessageMapper noticeMessageMapper;
/**
* 最大自動重試次數
*/
private final Integer MAX_RETRY_COUNT = 5;
@Scheduled(cron = "0/20 * * * * ? ")
public void retry() {
log.info("定時任務,重試異常訂單");
LambdaQueryWrapper<NoticeMessage> wrapper = Wrappers.lambdaQuery(NoticeMessage.class);
wrapper.eq(NoticeMessage::getStatus, 1);
List<NoticeMessage> noticeMessages = noticeMessageMapper.selectList(wrapper);
for (NoticeMessage noticeMessage : noticeMessages) {
// 重新傳送mq訊息
rabbitTemplate.convertAndSend("trans", "trans.queue.key", JSONUtil.toJsonStr(noticeMessage));
// 重試次數+1
noticeMessage.setRetryCount(noticeMessage.getRetryCount() + 1);
noticeMessageMapper.updateById(noticeMessage);
// 判斷重試次數,等於最長限制次數,直接更新為報警狀態
if (MAX_RETRY_COUNT.equals(noticeMessage.getRetryCount())) {
noticeMessage.setStatus(3);
noticeMessageMapper.updateById(noticeMessage);
// 傳送告警,通知對應人員
// 告警邏輯(簡訊、郵件、企微群,等等)....
}
}
}
}
其實這裡有個問題,一個上游服務對應多個下游服務的時候。這個時候往往不能存一條本地訊息記錄。
其實分散式事務沒有一個完美的處理方案,只能說是儘量去滿足業務需求,滿足資料一致。如果程式不能處理了,最後由人工去兜底,做資料的補償方案。
程式設計檔案:
https://gitee.com/cicadasmile/butte-java-note
應用倉庫:
https://gitee.com/cicadasmile/butte-flyer-parent