微服務拆分治理最佳實踐

2023-02-22 21:02:14

作者:京東零售 徐強 黃威 張均傑

背景

部門中維護了一個老系統,功能都耦合在一個單體應用中(300+介面),表也放在同一個庫中(200+表),導致系統存在很多風險和缺陷。經常出現問題:如資料庫的單點、效能問題,應用的擴充套件受限,複雜性高等問題。

從下圖可見。各業務相互耦合無明確邊界,呼叫阿關係錯綜複雜。

隨著業務快速發展,各種問題越來越明顯,急需對系統進行微服務改造優化。經過思考,整體改造將分為三個階段進行:

  • 資料庫拆分:資料庫按照業務垂直拆分。

  • 應用拆分:應用按照業務垂直拆分。

  • 資料存取許可權收口:資料許可權按照各自業務領域,歸屬到各自的應用,應用與資料庫一對一,禁止交叉存取。

資料庫拆分

單體資料庫的痛點:未進行業務隔離,一個慢SQL易導致系統整體出現問題;吞吐量高,讀寫壓力大,效能下降;

資料庫改造

根據業務劃分,我們計劃將資料庫拆分為9個業務庫。資料同步方式採用主從複製的方式,並且通過binlog過濾將對應的表和資料同步到對應的新資料庫中。

程式碼改造方案

如果一個介面中操作了多張表,之前這些表屬於同一個庫,資料庫拆分後可能會分屬於不同的庫。所以需要針對程式碼進行相應的改造。

目前存在問題的位置:

  • 資料來源選擇:系統之前是支援多資料來源切換的,在service上新增註解來選擇資料來源。資料庫拆分後出現的情況是同一個service中操作的多個mapper從屬於不同的庫。

  • 事務:事務註解目前是存在於service上的,並且事務會快取資料庫連結,一個事務內不支援同時操作多個資料庫。

改造點梳理:

  • 同時寫入多個庫,且是同一事務的介面6個:需改造資料來源,需改造事務,需要關注分散式事務;

  • 同時寫入多個庫,且不是同一事務的介面50+:需改造資料來源,需改造事務,無需關注分散式事務;

  • 同時讀取多個庫 或 讀取一個庫寫入另一個庫的介面200+:需改造資料來源,但無需關注事務;

  • 涉及多個庫的表的聯合查詢8個:需進行程式碼邏輯改造

梳理方式:

採用部門中的切面工具,抓取入口和表的呼叫關係(可識別表的讀/寫操作),找到一個介面中操作了多個表,並且多個表分屬於不同業務庫的情況;

分散式事務:

進行應用拆分和資料收口之後,是不存在分散式事務的問題的,因為操作第二個庫會呼叫對應系統的RPC介面進行操作。所以本次不會正式支援分散式事務,而是採用程式碼邏輯保證一致性的方式來解決;

方案一

將service中分別操作多個庫的mapper,抽取成多個Service。分別新增切換資料來源註解和事務註解。

問題:改動位置多,涉及改動的每個方法都需要梳理歷史業務;service存在很多巢狀呼叫的情況,有時難以理清邏輯;修改200+位置改動工作量大,風險高;

方案二


如圖所示,方案二將資料來源註解移動到Mapper上,並使用自定義的事務實現來處理事務。

將多資料來源註解放到Mapper上的好處是,不需要梳理程式碼邏輯,只需要在Mapper上新增對應資料來源名稱即可。但是這樣又有新的問題出現,

  • 問題1:如上圖,事務的是設定在Service層,當事務開啟時,資料來源的連線並沒有獲取到,因為真正的資料來源設定在Mapper上。所以會報錯,這個錯誤可以通過多資料來源元件的預設資料來源功能解決。

  • 問題2:mybatis的事務實現會快取資料庫連結。當第一次快取了資料庫連結後,後續設定在mapper上的資料來源註解並不會重新獲取資料庫連結,而是直接使用快取起來的資料庫連結。如果後續的mapper要操作其餘資料庫,會出現找不到表的情況。鑑於以上問題,我們開發了一個自定義的事務實現類,用來解決這個問題。

下面將對方案中出現的兩個元件進行簡要說明原理。

多資料來源元件

多資料來源元件是單個應用連線多個資料來源時使用的工具,其核心原理是通過組態檔將資料庫連結在程式啟動時初始化好,在執行到存在註解的方法時,通過切面獲取當前的資料來源名稱來切換資料來源,當一次呼叫涉及多個資料來源時,會利用棧的特性解決資料來源巢狀的問題。

/**
 * 切面方法
 */
public Object switchDataSourceAroundAdvice(ProceedingJoinPoint pjp) throws Throwable {
        //獲取資料來源的名字
        String dsName = getDataSourceName(pjp);
        boolean dataSourceSwitched = false;
        if (StringUtils.isNotEmpty(dsName)
                && !StringUtils.equals(dsName, StackRoutingDataSource.getCurrentTargetKey())) {
            // 見下一段程式碼
            StackRoutingDataSource.setTargetDs(dsName);
            dataSourceSwitched = true;
        }
        try {
            // 執行切面方法
            return pjp.proceed();
        } catch (Throwable e) {
            throw e;
        } finally {
            if (dataSourceSwitched) {
                StackRoutingDataSource.clear();
            }
        }
    }

public static void setTargetDs(String dbName) {
    if (dbName == null) {
	throw new NullPointerException();
    }
    if (contextHolder.get() == null) {
        contextHolder.set(new Stack<String>());
    }
    contextHolder.get().push(dbName);
    log.debug("set current datasource is " + dbName);
}

StackRoutingDataSource繼承 AbstractRoutingDataSource類,AbstractRoutingDataSource是spring-jdbc包提供的一個了AbstractDataSource的抽象類,它實現了DataSource介面的用於獲取資料庫連結的方法。

自定義事務實現

從方案二的圖中可以看到預設的事務實現使用的是mybatis的SpringManagedTransaction。


如上圖,Transaction和SpringManagedTransaction都是mybatis提供的類,他提供了介面供SqlSession使用,處理事務操作。
通過下邊的一段程式碼可以看到,事務物件中存在connection變數,首次獲得資料庫連結後,後續當前事務內的所有資料庫操作都不會重新獲取資料庫連結,而是會使用現有的資料庫連結,從而無法支援跨庫操作。


public class SpringManagedTransaction implements Transaction {

  private static final Log LOGGER = LogFactory.getLog(SpringManagedTransaction.class);

  private final DataSource dataSource;

  private Connection connection;

  private boolean isConnectionTransactional;

  private boolean autoCommit;

  public SpringManagedTransaction(DataSource dataSource) {
    notNull(dataSource, "No DataSource specified");
    this.dataSource = dataSource;
  }
  // 下略
}

MultiDataSourceManagedTransaction是我們自定義的事務實現,繼承自SpringManagedTransaction類,並在內部支援維護多個資料庫連結。每次執行資料庫操作時,會根據資料來源名稱判斷,如果當前資料來源沒有快取的連結則重新獲取連結。這樣,service上的事務註解其實控制了多個單庫事務,且作用域範圍相同,一起進行提交或回滾。

程式碼如下:

public class MultiDataSourceManagedTransaction extends SpringManagedTransaction {
    private DataSource dataSource;

    public ConcurrentHashMap<String, Connection> CON_MAP = new ConcurrentHashMap<>();


    public MultiDataSourceManagedTransaction(DataSource dataSource) {
        super(dataSource);
        this.dataSource = dataSource;
    }

    @Override
    public Connection getConnection() throws SQLException {
        Method getCurrentTargetKey;
        String dataSourceKey;
        try {
            getCurrentTargetKey = dataSource.getClass().getDeclaredMethod("getCurrentTargetKey");
            getCurrentTargetKey.setAccessible(true);
            dataSourceKey = (String) getCurrentTargetKey.invoke(dataSource);
        } catch (Exception e) {
            log.error("MultiDataSourceManagedTransaction invoke getCurrentTargetKey 異常", e);
            return null;
        }

        if (CON_MAP.get(dataSourceKey) == null) {
            Connection connection = dataSource.getConnection();
            if (!TransactionSynchronizationManager.isActualTransactionActive()) {
                connection.setAutoCommit(true);
            } else {
                connection.setAutoCommit(false);
            }
            CON_MAP.put(dataSourceKey, connection);
            return connection;
        }

        return CON_MAP.get(dataSourceKey);
    }

    @Override
    public void commit() throws SQLException {
        if (CON_MAP == null || CON_MAP.size() == 0) {
            return;
        }
        Set<Map.Entry<String, Connection>> entries = CON_MAP.entrySet();
        for (Map.Entry<String, Connection> entry : entries) {
            Connection value = entry.getValue();
            if (!value.isClosed() && !value.getAutoCommit()) {
                value.commit();
            }
        }
    }

    @Override
    public void rollback() throws SQLException {
        if (CON_MAP == null || CON_MAP.size() == 0) {
            return;
        }
        Set<Map.Entry<String, Connection>> entries = CON_MAP.entrySet();
        for (Map.Entry<String, Connection> entry : entries) {
            Connection value = entry.getValue();
            if (value == null) {
                continue;
            }
            if (!value.isClosed() && !value.getAutoCommit()) {
                entry.getValue().rollback();
            }
        }
    }

    @Override
    public void close() throws SQLException {
        if (CON_MAP == null || CON_MAP.size() == 0) {
            return;
        }
        Set<Map.Entry<String, Connection>> entries = CON_MAP.entrySet();
        for (Map.Entry<String, Connection> entry : entries) {
            DataSourceUtils.releaseConnection(entry.getValue(), this.dataSource);
        }
        CON_MAP.clear();
    }
}

注:上面並不是分散式事務。在資料收口之前,它只存在於同一個JVM中。如果專案允許,可以考慮使用Atomikos和Mybatis整合的方案。

資料安全性

本次進行了很多程式碼改造,如何保證資料安全,保證資料不丟失,我們的機制如下,分為三種情況進行討論:

  • 跨庫事務:6處,採用了程式碼保證一致性的改造方式;上線前經過重點測試,保證邏輯無問題;

  • 單庫事務:依賴於自定義事務實現,針對自定義事務實現這一個類進行充分測試即可,測試範圍小,安全性有保障;

  • 其餘單表操作:相關修改是在mapper上新增了資料來源切換註解,改動位置幾百處,幾乎是無腦改動,但也存在遺漏或錯改的可能;測試同學可以覆蓋到核心業務流程,但邊緣業務可能會遺漏;我們新增了線上監測機制,當出現找不到表的錯誤時(說明資料來源切換註解新增錯誤),記錄當前執行sql並報警,我們進行邏輯修復與資料處理。

綜上,通過對三種情況的處理來保證資料的安全性。

應用拆分

系統接近單體架構,存在以下風險:

  1. 系統性風險:一個元件缺陷會導致整個程序崩潰,如記憶體漏失、死鎖。

  2. 複雜性高:系統程式碼繁多,每次修改程式碼都心驚膽戰,任何一個bug都可能導致整個系統崩潰,不敢優化程式碼導致程式碼可讀性也越來越差。

  3. 測試環境衝突,測試效率低:業務都耦合在一個系統,只要有需求就會出現環境搶佔,需要額外拉分支合併程式碼。

拆分方案

與資料庫拆分相同,系統拆分也是根據業務劃分拆成9個新系統。

方案一:搭建空的新系統,然後將老系統的相關程式碼挪到新系統。

  • 優點:一步到位。

  • 缺點:需要主觀挑選程式碼,然後挪到新系統,可視為做了全量業務邏輯的變動,需要全量測試,風險高,週期長。

方案二:從老系統原樣複製出9個新系統,然後直接上線,通過流量路由將老系統流量轉發到新系統,後續再對新系統的冗餘程式碼做刪減。

  • 優點:拆分速度快, 首次上線前無業務邏輯改動,風險低;後續刪減程式碼時依據介面呼叫量情況來判定,也可視為無業務邏輯的改動,風險較低,並且各系統可各自進行,無需整體排期, 較為靈活。

  • 缺點:分為了兩步,拆分上線和刪減程式碼

我們在考慮拆分風險和拆分效率後,最終選擇了方案二。

拆分實踐

1、搭建新系統

直接複製老系統程式碼,修改系統名稱,部署即可

2、流量路由

路由器是拆分的核心,負責分發流量到新系統,同時需要支援識別測試流量,讓測試同學可以提前線上上測試新系統。我們這邊用filter來作為路由器的,原始碼見下方。

@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain filterChain) throws ServletException, IOException {
    HttpServletRequest servletRequest = (HttpServletRequest) request;
    HttpServletResponse servletResponse = (HttpServletResponse) response;

    // 路由開關(0-不路由, 1-根據指定請求頭路由, 2-全量路由)
    final int systemRouteSwitch = configUtils.getInteger("system_route_switch", 1);
    if (systemRouteSwitch == 0) {
        filterChain.doFilter(request, response);
        return;
    }
    // 只路由測試流量
    if (systemRouteSwitch == 1) {
        // 檢查請求頭是否包含測試流量標識 包含才進行路由
        String systemRoute = ((HttpServletRequest) request).getHeader("systemRoute");
        if (systemRoute == null || !systemRoute.equals("1")) {
            filterChain.doFilter(request, response);
            return;
        }
    }

    String systemRouteMapJsonStr = configUtils.getString("route.map", "");
    Map<String, String> map = JSONObject.parseObject(systemRouteMapJsonStr, Map.class);
    String rootUrl = map.get(servletRequest.getRequestURI());

    if (StringUtils.isEmpty(rootUrl)) {
        log.error("路由失敗,本地服務內部處理。原因:請求地址對映不到對應系統, uri : {}", servletRequest.getRequestURI());
        filterChain.doFilter(request, response);
        return;
    }

    String targetURL = rootUrl + servletRequest.getRequestURI();
    if (servletRequest.getQueryString() != null) {
        targetURL = targetURL + "?" + servletRequest.getQueryString();
    }
    RequestEntity<byte[]> requestEntity = null;
    try {
        log.info("路由開始 targetURL = {}", targetURL);
        requestEntity = createRequestEntity(servletRequest, targetURL);
        ResponseEntity responseEntity = restTemplate.exchange(requestEntity, byte[].class);
        if (requestEntity != null && requestEntity.getBody() != null && requestEntity.getBody().length > 0) {
            log.info("路由完成-請求資訊: requestEntity = {}, body = {}", requestEntity.toString(), new String(requestEntity.getBody()));
        } else {
            log.info("路由完成-請求資訊: requestEntity = {}", requestEntity != null ? requestEntity.toString() : targetURL);
        }

        HttpHeaders headers = responseEntity.getHeaders();
        String resp = null;
        if (responseEntity.getBody() != null && headers != null && headers.get("Content-Encoding") != null && headers.get("Content-Encoding").contains("gzip")) {
            byte[] bytes = new byte[30 * 1024];
            int len = new GZIPInputStream(new ByteArrayInputStream((byte[]) responseEntity.getBody())).read(bytes, 0, bytes.length);
            resp = new String(bytes, 0, len);
        }

        log.info("路由完成-響應資訊: targetURL = {}, headers = {}, resp = {}", targetURL, JSON.toJSONString(headers), resp);
        if (headers != null && headers.containsKey("Location") && CollectionUtils.isNotEmpty(headers.get("Location"))) {
            log.info("路由完成-需要重定向到 {}", headers.get("Location").get(0));
            ((HttpServletResponse) response).sendRedirect(headers.get("Location").get(0));
        }
        addResponseHeaders(servletRequest, servletResponse, responseEntity);
        writeResponse(servletResponse, responseEntity);
    } catch (Exception e) {
        if (requestEntity != null && requestEntity.getBody() != null && requestEntity.getBody().length > 0) {
            log.error("路由異常-請求資訊: requestEntity = {}, body = {}", requestEntity.toString(), new String(requestEntity.getBody()), e);
        } else {
            log.error("路由異常-請求資訊: requestEntity = {}", requestEntity != null ? requestEntity.toString() : targetURL, e);
        }
        response.setCharacterEncoding("UTF-8");
        ((HttpServletResponse) response).addHeader("Content-Type", "application/json");
        response.getWriter().write(JSON.toJSONString(ApiResponse.failed("9999", "網路繁忙哦~,請您稍後重試")));
    }
}

3、介面抓取&歸類

路由filter是根據介面路徑將請求分發到各個新系統的,所以需要抓取一份介面和新系統的對映關係。
我們這邊自定義了一個註解@TargetSystem,用註解標識介面應該路由到的目標系統域名,

@TargetSystem(value = "http://order.demo.com")
@GetMapping("/order/info")
public ApiResponse orderInfo(String orderId) {
    return ApiResponse.success();
}

然後遍歷獲取所有controller根據介面地址和註解生成路由對映關係map

    /**
     * 生成路由對映關係MAP
     * key:介面地址 ,value:路由到目標新系統的域名
     */
    public Map<String, String> generateRouteMap() {
        Map<RequestMappingInfo, HandlerMethod> handlerMethods = requestMappingHandlerMapping.getHandlerMethods();
        Set<Map.Entry<RequestMappingInfo, HandlerMethod>> entries = handlerMethods.entrySet();
        Map<String, String> map = new HashMap<>();
        for (Map.Entry<RequestMappingInfo, HandlerMethod> entry : entries) {
            RequestMappingInfo key = entry.getKey();
            HandlerMethod value = entry.getValue();
            Class declaringClass = value.getMethod().getDeclaringClass();
            TargetSystem targetSystem = (TargetSystem) declaringClass.getAnnotation(TargetSystem.class);
            String targetUrl = targetSystem.value();
            String s1 = key.getPatternsCondition().toString();
            String url = s1.substring(1, s1.length() - 1);
            map.put(url, targetUrl);
        }
        return map;
    }

4、測試流量識別

測試可以用利用抓包工具charles,為每個請求都新增固定的請求頭,也就是測試流量標識,路由器攔截請求後判斷請求頭內是否包含測試流量標,包含就路由到新系統,不包含就是線上流量留在老系統執行。

5、需求程式碼合併

執行系統拆分的過程中,還是有需求正在並行開發,並且需求程式碼是寫在老系統的,系統拆分完成上線後,需要將這部分需求的程式碼合併到新系統,同時要保證git版本記錄不能丟失,那應該怎麼做呢?

我們利用了git可以新增多個多個遠端倉庫來解決需求合併的痛點,命令:git remote add origin 倉庫地址,把新系統的git倉庫地址新增為老系統git的遠端倉庫,老系統的git變動就可以同時push到所有新系統的倉庫內,新系統pull下程式碼後進行合併。

6、上線風險

風險一:JOB在新老系統並行執行。新系統是複製的老系統,JOB也會複製過來,導致新老系統有相同的JOB,如果這時候上線新系統,新系統的JOB就會執行,老系統的JOB也一直在run,這樣一個JOB就會執行2次。新系統剛上線還沒經過測試驗證,這時候執行JOB是有可能失敗的。以上2種情況都會引起線上Bug,影響系統穩定性。

風險二:新系統提前消費MQ。和風險一一樣,新系統監聽和老系統一樣的topic,如果新系統直接上線,訊息是有可能被新系統消費的,新系統剛上線還沒經過測試驗證,消費訊息有可能會出異常,造成訊息丟失或其他問題,影響系統穩定性。

如何解決以上2個上線風險呢?

我們用「動態開關」解決了上述風險,為新老系統的JOB和MQ都加了開關,用開關控制JOB和MQ在新/老系統執行。上線後新系統的JOB和MQ都是關掉的,待QA測試通過後,把老系統的JOB和MQ關掉,把新系統的JOB和MQ開啟就可以了。

系統瘦身

拆分的時候已經梳理出了一份「入口對映關係map」,每個新系統只需要保留自己系統負責的介面、JOB、MQ程式碼就可以了,除此之外都可以進行刪除。

拆分帶來的好處

  1. 系統架構更合理,可用性更高:即使某個服務掛了也不會導致整個系統崩潰

  2. 複雜性可控:每個系統都是單一職責,系統邏輯清晰

  3. 系統效能提升上限大:可以針對每個系統做優化,如加快取

  4. 測試環境衝突的問題解決,不會因為多個系統需求並行而搶佔環境

資料存取許可權收口

問題介紹

資料存取許可權未收口:一個業務的資料庫被其餘業務應用直接存取,未通過rpc介面將資料存取許可權收口到資料擁有方自己的應用。資料存取邏輯分散,存在業務耦合,阻礙後續迭代和優化。

問題產生的背景:之前是單體應用和單體資料庫,未進行業務隔離。在進行資料庫拆分和系統拆分時,為解決系統穩定性的問題需快速上線,所以未優化拆分後跨業務存取資料庫的情況。本階段是對資料庫拆分和應用拆分的延伸和補充。

改造過程

  1. RPC介面統計(如圖一)

進行比對,如程式入口歸類和呼叫的業務DB歸類不一致,則認為Dao方法需提供RPC介面

經統計,應用存取非本業務資料庫的位置有260+。由於涉及位置多,人工改造成本高、效率較低,且有錯改和漏掉的風險,我們採用了開發工具,用工具進行程式碼生成和批次修改的方式進行改造。

  1. RPC介面生成(如圖二)
  • 讀取需要生成RPC介面的Dao檔案,進行解析

  • 獲取檔名稱,Dao方法列表,import導包列表等,放入ClassContext上下文

  • 匹配api、rpc檔案模板,從classContext內取值替換模板變數,通過package路徑生成java檔案到指定服務內

  • 批次將服務內Dao名稱字尾替換為Rpc服務名,減少人工改動風險,例:SettleRuleDao -> SettleRuleRpc

名詞解釋:

  • ftl:Freemarker模板的檔案字尾名,FreeMarker是一個模版引擎,一個基於文字的模板輸出工具。

  • interfaceName:用存放api檔名稱

  • className:用於存放serviceImpl檔名稱

  • methodList:用於存放方法列表,包含入參、出參、返回值等資訊

  • importList:用於存放api和impl檔案內其他參照實體的導包路徑

  • apiPackage:用於存放生成的Api介面類包名

  • implPackage:用於存放生成的Api實現類包名

  • rpcPackage:用於存放生成的rpc呼叫類包名

  1. 灰度方案(如圖三)
  • 資料操作統一走RPC層處理,初期階段RPC層兼顧RPC呼叫,也有之前的DAO呼叫,使用開關切換。

  • RPC層進行雙讀,進行Api層和Dao層返回結果的比對,前期優先返回Dao層結果,驗證無問題後,在全量返回RPC的結果,清除其他業務資料庫連線。

  • 支援開關一鍵切換,按流量進行灰度,降低資料收口風險

收益

  1. 業務資料解耦,資料操作統一由各自垂直系統進行,入口統一

  2. 方便後續在介面粒度上增加快取和降級處理

總結

以上,是我們對單體系統的改造過程,經過了三步優化、上線,將單體系統平滑過渡到了微服務結構。解決了資料庫的單點問題、效能問題,應用業務得到了簡化,更利於分工,迭代。並且可以針對各業務單獨進行優化升級,擴容、縮容,提升了資源的利用率。