原創/朱季謙
曾經在SpringCloudAlibaba的Seata分散式事務搭建過程中,跨節點通過openfeign呼叫不同服務時,發現全域性事務XID在當前節點也就是TM處,是正常能通過RootContext.getXID()獲取到分散式全域性事務XID的,但在下游節點就出現獲取為NULL的情況,導致全域性事務失效,出現異常時無法正常回滾。
當時看了一遍原始碼,才知道問題所在,故而把這個過程瞭解到的分散式事務XID是如何跨節點傳輸的原理記錄下來。
本文預設是使用Seata的AT模式。
在那一次的搭建過程中,我設定了三個節點,分別是訂單節點order,商品庫存節點product,賬戶餘額節點account,模擬購買下單邏輯,在分散式環境下,生成一份訂單時,通過openfeign遠端扣減庫存,最後同樣通過openfeign去扣減賬戶(當然,實際場景遠不止這些,這裡只是簡單模擬這個過程)。
正常情況下,其中有一步出錯,整個全域性分散式事務就會進行回滾。
這三個節點在Seata AT模式下,流程圖是這樣的,order充當TM/RM角色,product和充當RM角色,按照在Linux伺服器上的Seats Service就充當TC角色。
首先是最初呼叫訂單節點order業務邏輯——
@Override
@Transactional
@GlobalTransactional(name = "zjq-create-order",rollbackFor = Exception.class)
public RestResponse createOrder(Orders order) {
log.info("當前的XID:"+ RootContext.getXID());
log.info("------>開始新建訂單");
//1、新建訂單
orderMapper.insert(order);
//2、扣減庫存
productService.decrease(order.getProductId(),order.getCount());
//3、扣減賬戶
accountService.decrease(order.getUserId(),order.getMoney());
......
}
在Seata,order充當了TM角色,負責生成一個全域性事務註冊到TC,TC會返回一個全域性事務ID給TM。
在該全域性事務流程裡,每一個分支模組理應都能獲取到這一個共同的全域性事務ID,在該全域性事務ID統籌下,完成分支事務的提交或者回滾。
通過RootContext.getXID()獲取到一個全域性事務ID為:192.168.1.152:8091:458311058765479936
建立訂單成功後,就會執行扣減庫存操作productService.decrease(order.getProductId(),order.getCount())。
在該程式碼案例裡,productService.decrease()內部是通過openfeign遠端去呼叫的——
@FeignClient(contextId = "remoteProductService",value = "zjq-product",fallbackFactory = RemoteProductServiceFallbackFactory.class)
public interface RemoteProductService {
@PostMapping(value = "/product/decrease")
RestResponse decrease(@RequestParam("productId")Long productId, @RequestParam("count") Integer count);
}
最終decrease的服務層虛擬碼大概如下——
@Transactional(propagation = Propagation.REQUIRES_NEW)
public int decrease(Long productId, Integer count) {
log.info("當前的XID:"+ RootContext.getXID());
log.info("---------->開始查詢商品是否存在");
log.info("---------->開始扣減庫存");
......
}
然而,到這一步,發現了一個問題,這裡獲取的全域性事務ID為null——
這說明了一個問題,TM開啟了一個全域性事務後,已經從TC那裡獲取到了一個全域性事務ID,但遠端傳送給product這個RM資源管理器後,沒有傳送成功,同理,另一個分支事務account模組的,同樣獲取到的全域性事務ID為null。
基於這樣一個現象,我就開始嘗試研究了一下全域性事務是如何在Openfeign跨節點環境進行傳輸和獲取的,主要分為TM節點的全域性事務ID傳送和遠端RM節點的接收。
通過debug程式碼去閱讀,在呼叫 productService.decrease(order.getProductId(),order.getCount())時,內部做了反射呼叫,執行了一系列方案呼叫,呼叫核心過程如下——
本文只需要關注在整個HTTP呼叫過程,全域性事務ID是如何放進來的,這個呼叫鏈涉及的類及方法,在後續學習中再進一步研究。
最終在SeataFeignClient的execute方法裡,可以看到以下原始碼——
public Response execute(Request request, Request.Options options) throws IOException {
Request modifiedRequest = this.getModifyRequest(request);
return this.delegate.execute(modifiedRequest, options);
}
其中,在Request modifiedRequest = this.getModifyRequest(request)這行程式碼裡,對請求頭做了一些補充操作。
private Request getModifyRequest(Request request) {
String xid = RootContext.getXID();
if (StringUtils.isEmpty(xid)) {
return request;
} else {
Map<String, Collection<String>> headers = new HashMap(16);
headers.putAll(request.headers());
List<String> seataXid = new ArrayList();
seataXid.add(xid);
headers.put("TX_XID", seataXid);
return Request.create(request.method(), request.url(), headers, request.body(), request.charset());
}
}
debug到這裡,可以看到,這裡將一個全域性事務ID儲存到了headers裡——
這個headers其實是HTTP組裝的請求頭,可以看到,這裡是將全域性事務ID放到了HTTP請求頭裡,傳送給了遠端機器。
Request(HttpMethod method, String url, Map<String, Collection<String>> headers, Body body, RequestTemplate requestTemplate) {
this.httpMethod = (HttpMethod)Util.checkNotNull(method, "httpMethod of %s", new Object[]{method.name()});
this.url = (String)Util.checkNotNull(url, "url", new Object[0]);
this.headers = (Map)Util.checkNotNull(headers, "headers of %s %s", new Object[]{method, url});
this.body = body;
this.requestTemplate = requestTemplate;
}
通過debug,可以發現,在HTTP組裝過程中,已經將全域性事務ID放到了請求頭裡,說明在HTTP發生成功後,是會攜帶全域性事務到遠端product模組的,但是為何product模組列印RootContext.getXID()得到的是null呢?
HTTP請求傳送到遠端product模組後,在呼叫具體的Controller前,會流轉到MVC進行攔截轉發,在這過程當中,涉及到seata分散式事務時,理應會有這樣一個叫TransactionPropagationInterceptor的攔截器,用來處理分散式事務的傳播,有兩個方法,分別是preHandle()和afterCompletion(),暫時只需要關注preHandle方法即可:
在處理遠端請求之前被呼叫,在該方法中,通過RootContext.getXID()獲取到當前執行緒上下文中的全域性事務ID和通過request.getHeader("TX_XID")獲取HTTP請求頭中的事務ID。這裡的請求頭裡的事務ID,正是前面傳送HTTP時放到請求頭裡的。
若RootContext.getXID()獲取到當前執行緒上下文中的全域性事務ID為空並且HTTP請求頭的事務ID不為空,就會將該HTTP請求頭裡的事務ID繫結到該執行緒上下文當中,用於確保全域性事務的傳播和關聯。
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
String xid = RootContext.getXID();
String rpcXid = request.getHeader("TX_XID");
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("xid in RootContext[{}] xid in HttpContext[{}]", xid, rpcXid);
}
if (StringUtils.isBlank(xid) && StringUtils.isNotBlank(rpcXid)) {
RootContext.bind(rpcXid);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind[{}] to RootContext", rpcXid);
}
}
return true;
}
進入到bind方法當中,可以看到,這裡是HTTP請求頭裡的事務ID快取到了 CONTEXT_HOLDER.put("TX_XID", xid),它本質其實是一個ThreadLocal,可以儲存執行緒隔離的變數。
public static void bind(@Nonnull String xid) {
if (StringUtils.isBlank(xid)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("xid is blank, switch to unbind operation!");
}
unbind();
} else {
MDC.put("X-TX-XID", xid);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind {}", xid);
}
CONTEXT_HOLDER.put("TX_XID", xid);
}
}
快取成功後,下一次通過 RootContext.getXID()就能獲取到該執行緒快取的全域性事務ID了, RootContext.getXID()本質就是——
public static String getXID() {
return (String)CONTEXT_HOLDER.get("TX_XID");
}
在本次搭建seata環境中,發現該TransactionPropagationInterceptor過濾器當中的preHandle方法一直沒有執行,這就造成全域性事務當中,遠端跨環境的分支事務節點一直無法獲取到全域性事務ID。
於是,我嘗試手動將該TransactionPropagationInterceptor攔截器加入到Spring MVC流程中——
@Configuration
public class WebMvcInterceptorsConfig extends WebMvcConfigurationSupport {
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new TransactionPropagationInterceptor());
}
@Bean
public ServerCodecConfigurer serverCodecConfigurer() {
return ServerCodecConfigurer.create();
}
}
重新執行後,這次攔截器TransactionPropagationInterceptor終於生效裡,可以debug到了preHandle方法裡,將HTTP請求頭的全域性事務ID取出,然後通過RootContext.bind(rpcXid)快取到執行緒上下文當中——
這時,product節點終於能拿到從TM遠端傳送過來的全域性事務ID了——
最後總結一下,全域性事務ID在SpringCloudAlibaba Seata在Openfeign跨節點環境裡的傳送方式,是將該全域性事務ID放入到HTTP請求頭當中,遠端傳送給分支事務節點,各分支事務節點會在TransactionPropagationInterceptor攔截器當中,取出HTTP請求頭大全域性事務ID,通過RootContext.bind(rpcXid)將全域性事務ID快取到執行緒上下文裡,這樣,分支事務就可以在其執行過程當中,獲取到全域性事務ID啦。