作者:京東零售 徐強 黃威 張均傑
部門中維護了一個老系統,功能都耦合在一個單體應用中(300+介面),表也放在同一個庫中(200+表),導致系統存在很多風險和缺陷。經常出現問題:如資料庫的單點、效能問題,應用的擴充套件受限,複雜性高等問題。
從下圖可見。各業務相互耦合無明確邊界,呼叫阿關係錯綜複雜。
隨著業務快速發展,各種問題越來越明顯,急需對系統進行微服務改造優化。經過思考,整體改造將分為三個階段進行:
資料庫拆分:資料庫按照業務垂直拆分。
應用拆分:應用按照業務垂直拆分。
資料存取許可權收口:資料許可權按照各自業務領域,歸屬到各自的應用,應用與資料庫一對一,禁止交叉存取。
單體資料庫的痛點:未進行業務隔離,一個慢SQL易導致系統整體出現問題;吞吐量高,讀寫壓力大,效能下降;
根據業務劃分,我們計劃將資料庫拆分為9個業務庫。資料同步方式採用主從複製的方式,並且通過binlog過濾將對應的表和資料同步到對應的新資料庫中。
如果一個介面中操作了多張表,之前這些表屬於同一個庫,資料庫拆分後可能會分屬於不同的庫。所以需要針對程式碼進行相應的改造。
目前存在問題的位置:
資料來源選擇:系統之前是支援多資料來源切換的,在service上新增註解來選擇資料來源。資料庫拆分後出現的情況是同一個service中操作的多個mapper從屬於不同的庫。
事務:事務註解目前是存在於service上的,並且事務會快取資料庫連結,一個事務內不支援同時操作多個資料庫。
改造點梳理:
同時寫入多個庫,且是同一事務的介面6個:需改造資料來源,需改造事務,需要關注分散式事務;
同時寫入多個庫,且不是同一事務的介面50+:需改造資料來源,需改造事務,無需關注分散式事務;
同時讀取多個庫 或 讀取一個庫寫入另一個庫的介面200+:需改造資料來源,但無需關注事務;
涉及多個庫的表的聯合查詢8個:需進行程式碼邏輯改造
梳理方式:
採用部門中的切面工具,抓取入口和表的呼叫關係(可識別表的讀/寫操作),找到一個介面中操作了多個表,並且多個表分屬於不同業務庫的情況;
分散式事務:
進行應用拆分和資料收口之後,是不存在分散式事務的問題的,因為操作第二個庫會呼叫對應系統的RPC介面進行操作。所以本次不會正式支援分散式事務,而是採用程式碼邏輯保證一致性的方式來解決;
將service中分別操作多個庫的mapper,抽取成多個Service。分別新增切換資料來源註解和事務註解。
問題:改動位置多,涉及改動的每個方法都需要梳理歷史業務;service存在很多巢狀呼叫的情況,有時難以理清邏輯;修改200+位置改動工作量大,風險高;
如圖所示,方案二將資料來源註解移動到Mapper上,並使用自定義的事務實現來處理事務。
將多資料來源註解放到Mapper上的好處是,不需要梳理程式碼邏輯,只需要在Mapper上新增對應資料來源名稱即可。但是這樣又有新的問題出現,
問題1:如上圖,事務的是設定在Service層,當事務開啟時,資料來源的連線並沒有獲取到,因為真正的資料來源設定在Mapper上。所以會報錯,這個錯誤可以通過多資料來源元件的預設資料來源功能解決。
問題2:mybatis的事務實現會快取資料庫連結。當第一次快取了資料庫連結後,後續設定在mapper上的資料來源註解並不會重新獲取資料庫連結,而是直接使用快取起來的資料庫連結。如果後續的mapper要操作其餘資料庫,會出現找不到表的情況。鑑於以上問題,我們開發了一個自定義的事務實現類,用來解決這個問題。
下面將對方案中出現的兩個元件進行簡要說明原理。
多資料來源元件是單個應用連線多個資料來源時使用的工具,其核心原理是通過組態檔將資料庫連結在程式啟動時初始化好,在執行到存在註解的方法時,通過切面獲取當前的資料來源名稱來切換資料來源,當一次呼叫涉及多個資料來源時,會利用棧的特性解決資料來源巢狀的問題。
/**
* 切面方法
*/
public Object switchDataSourceAroundAdvice(ProceedingJoinPoint pjp) throws Throwable {
//獲取資料來源的名字
String dsName = getDataSourceName(pjp);
boolean dataSourceSwitched = false;
if (StringUtils.isNotEmpty(dsName)
&& !StringUtils.equals(dsName, StackRoutingDataSource.getCurrentTargetKey())) {
// 見下一段程式碼
StackRoutingDataSource.setTargetDs(dsName);
dataSourceSwitched = true;
}
try {
// 執行切面方法
return pjp.proceed();
} catch (Throwable e) {
throw e;
} finally {
if (dataSourceSwitched) {
StackRoutingDataSource.clear();
}
}
}
public static void setTargetDs(String dbName) {
if (dbName == null) {
throw new NullPointerException();
}
if (contextHolder.get() == null) {
contextHolder.set(new Stack<String>());
}
contextHolder.get().push(dbName);
log.debug("set current datasource is " + dbName);
}
StackRoutingDataSource繼承 AbstractRoutingDataSource類,AbstractRoutingDataSource是spring-jdbc包提供的一個了AbstractDataSource的抽象類,它實現了DataSource介面的用於獲取資料庫連結的方法。
從方案二的圖中可以看到預設的事務實現使用的是mybatis的SpringManagedTransaction。
如上圖,Transaction和SpringManagedTransaction都是mybatis提供的類,他提供了介面供SqlSession使用,處理事務操作。
通過下邊的一段程式碼可以看到,事務物件中存在connection變數,首次獲得資料庫連結後,後續當前事務內的所有資料庫操作都不會重新獲取資料庫連結,而是會使用現有的資料庫連結,從而無法支援跨庫操作。
public class SpringManagedTransaction implements Transaction {
private static final Log LOGGER = LogFactory.getLog(SpringManagedTransaction.class);
private final DataSource dataSource;
private Connection connection;
private boolean isConnectionTransactional;
private boolean autoCommit;
public SpringManagedTransaction(DataSource dataSource) {
notNull(dataSource, "No DataSource specified");
this.dataSource = dataSource;
}
// 下略
}
MultiDataSourceManagedTransaction是我們自定義的事務實現,繼承自SpringManagedTransaction類,並在內部支援維護多個資料庫連結。每次執行資料庫操作時,會根據資料來源名稱判斷,如果當前資料來源沒有快取的連結則重新獲取連結。這樣,service上的事務註解其實控制了多個單庫事務,且作用域範圍相同,一起進行提交或回滾。
程式碼如下:
public class MultiDataSourceManagedTransaction extends SpringManagedTransaction {
private DataSource dataSource;
public ConcurrentHashMap<String, Connection> CON_MAP = new ConcurrentHashMap<>();
public MultiDataSourceManagedTransaction(DataSource dataSource) {
super(dataSource);
this.dataSource = dataSource;
}
@Override
public Connection getConnection() throws SQLException {
Method getCurrentTargetKey;
String dataSourceKey;
try {
getCurrentTargetKey = dataSource.getClass().getDeclaredMethod("getCurrentTargetKey");
getCurrentTargetKey.setAccessible(true);
dataSourceKey = (String) getCurrentTargetKey.invoke(dataSource);
} catch (Exception e) {
log.error("MultiDataSourceManagedTransaction invoke getCurrentTargetKey 異常", e);
return null;
}
if (CON_MAP.get(dataSourceKey) == null) {
Connection connection = dataSource.getConnection();
if (!TransactionSynchronizationManager.isActualTransactionActive()) {
connection.setAutoCommit(true);
} else {
connection.setAutoCommit(false);
}
CON_MAP.put(dataSourceKey, connection);
return connection;
}
return CON_MAP.get(dataSourceKey);
}
@Override
public void commit() throws SQLException {
if (CON_MAP == null || CON_MAP.size() == 0) {
return;
}
Set<Map.Entry<String, Connection>> entries = CON_MAP.entrySet();
for (Map.Entry<String, Connection> entry : entries) {
Connection value = entry.getValue();
if (!value.isClosed() && !value.getAutoCommit()) {
value.commit();
}
}
}
@Override
public void rollback() throws SQLException {
if (CON_MAP == null || CON_MAP.size() == 0) {
return;
}
Set<Map.Entry<String, Connection>> entries = CON_MAP.entrySet();
for (Map.Entry<String, Connection> entry : entries) {
Connection value = entry.getValue();
if (value == null) {
continue;
}
if (!value.isClosed() && !value.getAutoCommit()) {
entry.getValue().rollback();
}
}
}
@Override
public void close() throws SQLException {
if (CON_MAP == null || CON_MAP.size() == 0) {
return;
}
Set<Map.Entry<String, Connection>> entries = CON_MAP.entrySet();
for (Map.Entry<String, Connection> entry : entries) {
DataSourceUtils.releaseConnection(entry.getValue(), this.dataSource);
}
CON_MAP.clear();
}
}
注:上面並不是分散式事務。在資料收口之前,它只存在於同一個JVM中。如果專案允許,可以考慮使用Atomikos和Mybatis整合的方案。
本次進行了很多程式碼改造,如何保證資料安全,保證資料不丟失,我們的機制如下,分為三種情況進行討論:
跨庫事務:6處,採用了程式碼保證一致性的改造方式;上線前經過重點測試,保證邏輯無問題;
單庫事務:依賴於自定義事務實現,針對自定義事務實現這一個類進行充分測試即可,測試範圍小,安全性有保障;
其餘單表操作:相關修改是在mapper上新增了資料來源切換註解,改動位置幾百處,幾乎是無腦改動,但也存在遺漏或錯改的可能;測試同學可以覆蓋到核心業務流程,但邊緣業務可能會遺漏;我們新增了線上監測機制,當出現找不到表的錯誤時(說明資料來源切換註解新增錯誤),記錄當前執行sql並報警,我們進行邏輯修復與資料處理。
綜上,通過對三種情況的處理來保證資料的安全性。
系統接近單體架構,存在以下風險:
系統性風險:一個元件缺陷會導致整個程序崩潰,如記憶體漏失、死鎖。
複雜性高:系統程式碼繁多,每次修改程式碼都心驚膽戰,任何一個bug都可能導致整個系統崩潰,不敢優化程式碼導致程式碼可讀性也越來越差。
測試環境衝突,測試效率低:業務都耦合在一個系統,只要有需求就會出現環境搶佔,需要額外拉分支合併程式碼。
與資料庫拆分相同,系統拆分也是根據業務劃分拆成9個新系統。
方案一:搭建空的新系統,然後將老系統的相關程式碼挪到新系統。
優點:一步到位。
缺點:需要主觀挑選程式碼,然後挪到新系統,可視為做了全量業務邏輯的變動,需要全量測試,風險高,週期長。
方案二:從老系統原樣複製出9個新系統,然後直接上線,通過流量路由將老系統流量轉發到新系統,後續再對新系統的冗餘程式碼做刪減。
優點:拆分速度快, 首次上線前無業務邏輯改動,風險低;後續刪減程式碼時依據介面呼叫量情況來判定,也可視為無業務邏輯的改動,風險較低,並且各系統可各自進行,無需整體排期, 較為靈活。
缺點:分為了兩步,拆分上線和刪減程式碼
我們在考慮拆分風險和拆分效率後,最終選擇了方案二。
1、搭建新系統
直接複製老系統程式碼,修改系統名稱,部署即可
2、流量路由
路由器是拆分的核心,負責分發流量到新系統,同時需要支援識別測試流量,讓測試同學可以提前線上上測試新系統。我們這邊用filter來作為路由器的,原始碼見下方。
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain filterChain) throws ServletException, IOException {
HttpServletRequest servletRequest = (HttpServletRequest) request;
HttpServletResponse servletResponse = (HttpServletResponse) response;
// 路由開關(0-不路由, 1-根據指定請求頭路由, 2-全量路由)
final int systemRouteSwitch = configUtils.getInteger("system_route_switch", 1);
if (systemRouteSwitch == 0) {
filterChain.doFilter(request, response);
return;
}
// 只路由測試流量
if (systemRouteSwitch == 1) {
// 檢查請求頭是否包含測試流量標識 包含才進行路由
String systemRoute = ((HttpServletRequest) request).getHeader("systemRoute");
if (systemRoute == null || !systemRoute.equals("1")) {
filterChain.doFilter(request, response);
return;
}
}
String systemRouteMapJsonStr = configUtils.getString("route.map", "");
Map<String, String> map = JSONObject.parseObject(systemRouteMapJsonStr, Map.class);
String rootUrl = map.get(servletRequest.getRequestURI());
if (StringUtils.isEmpty(rootUrl)) {
log.error("路由失敗,本地服務內部處理。原因:請求地址對映不到對應系統, uri : {}", servletRequest.getRequestURI());
filterChain.doFilter(request, response);
return;
}
String targetURL = rootUrl + servletRequest.getRequestURI();
if (servletRequest.getQueryString() != null) {
targetURL = targetURL + "?" + servletRequest.getQueryString();
}
RequestEntity<byte[]> requestEntity = null;
try {
log.info("路由開始 targetURL = {}", targetURL);
requestEntity = createRequestEntity(servletRequest, targetURL);
ResponseEntity responseEntity = restTemplate.exchange(requestEntity, byte[].class);
if (requestEntity != null && requestEntity.getBody() != null && requestEntity.getBody().length > 0) {
log.info("路由完成-請求資訊: requestEntity = {}, body = {}", requestEntity.toString(), new String(requestEntity.getBody()));
} else {
log.info("路由完成-請求資訊: requestEntity = {}", requestEntity != null ? requestEntity.toString() : targetURL);
}
HttpHeaders headers = responseEntity.getHeaders();
String resp = null;
if (responseEntity.getBody() != null && headers != null && headers.get("Content-Encoding") != null && headers.get("Content-Encoding").contains("gzip")) {
byte[] bytes = new byte[30 * 1024];
int len = new GZIPInputStream(new ByteArrayInputStream((byte[]) responseEntity.getBody())).read(bytes, 0, bytes.length);
resp = new String(bytes, 0, len);
}
log.info("路由完成-響應資訊: targetURL = {}, headers = {}, resp = {}", targetURL, JSON.toJSONString(headers), resp);
if (headers != null && headers.containsKey("Location") && CollectionUtils.isNotEmpty(headers.get("Location"))) {
log.info("路由完成-需要重定向到 {}", headers.get("Location").get(0));
((HttpServletResponse) response).sendRedirect(headers.get("Location").get(0));
}
addResponseHeaders(servletRequest, servletResponse, responseEntity);
writeResponse(servletResponse, responseEntity);
} catch (Exception e) {
if (requestEntity != null && requestEntity.getBody() != null && requestEntity.getBody().length > 0) {
log.error("路由異常-請求資訊: requestEntity = {}, body = {}", requestEntity.toString(), new String(requestEntity.getBody()), e);
} else {
log.error("路由異常-請求資訊: requestEntity = {}", requestEntity != null ? requestEntity.toString() : targetURL, e);
}
response.setCharacterEncoding("UTF-8");
((HttpServletResponse) response).addHeader("Content-Type", "application/json");
response.getWriter().write(JSON.toJSONString(ApiResponse.failed("9999", "網路繁忙哦~,請您稍後重試")));
}
}
3、介面抓取&歸類
路由filter是根據介面路徑將請求分發到各個新系統的,所以需要抓取一份介面和新系統的對映關係。
我們這邊自定義了一個註解@TargetSystem,用註解標識介面應該路由到的目標系統域名,
@TargetSystem(value = "http://order.demo.com")
@GetMapping("/order/info")
public ApiResponse orderInfo(String orderId) {
return ApiResponse.success();
}
然後遍歷獲取所有controller根據介面地址和註解生成路由對映關係map
/**
* 生成路由對映關係MAP
* key:介面地址 ,value:路由到目標新系統的域名
*/
public Map<String, String> generateRouteMap() {
Map<RequestMappingInfo, HandlerMethod> handlerMethods = requestMappingHandlerMapping.getHandlerMethods();
Set<Map.Entry<RequestMappingInfo, HandlerMethod>> entries = handlerMethods.entrySet();
Map<String, String> map = new HashMap<>();
for (Map.Entry<RequestMappingInfo, HandlerMethod> entry : entries) {
RequestMappingInfo key = entry.getKey();
HandlerMethod value = entry.getValue();
Class declaringClass = value.getMethod().getDeclaringClass();
TargetSystem targetSystem = (TargetSystem) declaringClass.getAnnotation(TargetSystem.class);
String targetUrl = targetSystem.value();
String s1 = key.getPatternsCondition().toString();
String url = s1.substring(1, s1.length() - 1);
map.put(url, targetUrl);
}
return map;
}
4、測試流量識別
測試可以用利用抓包工具charles,為每個請求都新增固定的請求頭,也就是測試流量標識,路由器攔截請求後判斷請求頭內是否包含測試流量標,包含就路由到新系統,不包含就是線上流量留在老系統執行。
5、需求程式碼合併
執行系統拆分的過程中,還是有需求正在並行開發,並且需求程式碼是寫在老系統的,系統拆分完成上線後,需要將這部分需求的程式碼合併到新系統,同時要保證git版本記錄不能丟失,那應該怎麼做呢?
我們利用了git可以新增多個多個遠端倉庫來解決需求合併的痛點,命令:git remote add origin 倉庫地址,把新系統的git倉庫地址新增為老系統git的遠端倉庫,老系統的git變動就可以同時push到所有新系統的倉庫內,新系統pull下程式碼後進行合併。
6、上線風險
風險一:JOB在新老系統並行執行。新系統是複製的老系統,JOB也會複製過來,導致新老系統有相同的JOB,如果這時候上線新系統,新系統的JOB就會執行,老系統的JOB也一直在run,這樣一個JOB就會執行2次。新系統剛上線還沒經過測試驗證,這時候執行JOB是有可能失敗的。以上2種情況都會引起線上Bug,影響系統穩定性。
風險二:新系統提前消費MQ。和風險一一樣,新系統監聽和老系統一樣的topic,如果新系統直接上線,訊息是有可能被新系統消費的,新系統剛上線還沒經過測試驗證,消費訊息有可能會出異常,造成訊息丟失或其他問題,影響系統穩定性。
如何解決以上2個上線風險呢?
我們用「動態開關」解決了上述風險,為新老系統的JOB和MQ都加了開關,用開關控制JOB和MQ在新/老系統執行。上線後新系統的JOB和MQ都是關掉的,待QA測試通過後,把老系統的JOB和MQ關掉,把新系統的JOB和MQ開啟就可以了。
拆分的時候已經梳理出了一份「入口對映關係map」,每個新系統只需要保留自己系統負責的介面、JOB、MQ程式碼就可以了,除此之外都可以進行刪除。
系統架構更合理,可用性更高:即使某個服務掛了也不會導致整個系統崩潰
複雜性可控:每個系統都是單一職責,系統邏輯清晰
系統效能提升上限大:可以針對每個系統做優化,如加快取
測試環境衝突的問題解決,不會因為多個系統需求並行而搶佔環境
資料存取許可權未收口:一個業務的資料庫被其餘業務應用直接存取,未通過rpc介面將資料存取許可權收口到資料擁有方自己的應用。資料存取邏輯分散,存在業務耦合,阻礙後續迭代和優化。
問題產生的背景:之前是單體應用和單體資料庫,未進行業務隔離。在進行資料庫拆分和系統拆分時,為解決系統穩定性的問題需快速上線,所以未優化拆分後跨業務存取資料庫的情況。本階段是對資料庫拆分和應用拆分的延伸和補充。
進行比對,如程式入口歸類和呼叫的業務DB歸類不一致,則認為Dao方法需提供RPC介面
經統計,應用存取非本業務資料庫的位置有260+。由於涉及位置多,人工改造成本高、效率較低,且有錯改和漏掉的風險,我們採用了開發工具,用工具進行程式碼生成和批次修改的方式進行改造。
讀取需要生成RPC介面的Dao檔案,進行解析
獲取檔名稱,Dao方法列表,import導包列表等,放入ClassContext上下文
匹配api、rpc檔案模板,從classContext內取值替換模板變數,通過package路徑生成java檔案到指定服務內
批次將服務內Dao名稱字尾替換為Rpc服務名,減少人工改動風險,例:SettleRuleDao -> SettleRuleRpc
名詞解釋:
ftl:Freemarker模板的檔案字尾名,FreeMarker是一個模版引擎,一個基於文字的模板輸出工具。
interfaceName:用存放api檔名稱
className:用於存放serviceImpl檔名稱
methodList:用於存放方法列表,包含入參、出參、返回值等資訊
importList:用於存放api和impl檔案內其他參照實體的導包路徑
apiPackage:用於存放生成的Api介面類包名
implPackage:用於存放生成的Api實現類包名
rpcPackage:用於存放生成的rpc呼叫類包名
資料操作統一走RPC層處理,初期階段RPC層兼顧RPC呼叫,也有之前的DAO呼叫,使用開關切換。
RPC層進行雙讀,進行Api層和Dao層返回結果的比對,前期優先返回Dao層結果,驗證無問題後,在全量返回RPC的結果,清除其他業務資料庫連線。
支援開關一鍵切換,按流量進行灰度,降低資料收口風險
業務資料解耦,資料操作統一由各自垂直系統進行,入口統一
方便後續在介面粒度上增加快取和降級處理
以上,是我們對單體系統的改造過程,經過了三步優化、上線,將單體系統平滑過渡到了微服務結構。解決了資料庫的單點問題、效能問題,應用業務得到了簡化,更利於分工,迭代。並且可以針對各業務單獨進行優化升級,擴容、縮容,提升了資源的利用率。