高並行下如何避免產生重複資料?

2022-06-14 06:06:11

前言

最近測試給我提了一個bug,說我之前提供的一個批次複製商品的介面,產生了重複的商品資料。

追查原因之後發現,這個事情沒想象中簡單,可以說一波多折。

1. 需求

產品有個需求:使用者選擇一些品牌,點選確定按鈕之後,系統需要基於一份預設品牌的商品資料,複製出一批的商品。

拿到這個需求時覺得太簡單了,三下五除二就搞定。

我提供了一個複製商品的基礎介面,給商城系統呼叫。

當時的流程圖如下:

如果每次複製的商品數量不多,使用同步介面呼叫的方案問題也不大。

2. 效能優化

但由於每次需要複製的商品數量比較多,可能有幾千。

如果每次都是用同步介面的方式複製商品,可能會有效能問題。

因此,後來我把複製商品的邏輯改成使用mq非同步處理。

改造之後的流程圖:

複製商品的結果還需要通知商城系統:

這個方案看起來,挺不錯的。

但後來出現問題了。

3. 出問題了

測試給我們提了一個bug,說我之前提供的一個批次複製商品的介面,產生了重複的商品資料。

經過追查之後發現,商城系統為了效能考慮,也改成非同步了。

他們沒有在介面中直接呼叫基礎系統的複製商品介面,而是在job中呼叫的。

站在他們的視角流程圖是這樣的:

使用者呼叫商城的介面,他們會往請求記錄表中寫入一條資料,然後在另外一個job中,非同步呼叫基礎系統的介面去複製商品。

但實際情況是這樣的:商城系統內部出現了bug,在請求記錄表中,同一條請求產生了重複的資料。這樣導致的結果是,在job中呼叫基礎系統複製商品介面時,傳送了重複的請求。

剛好基礎系統現在是使用RocketMQ非同步處理的。由於商城的job一次會取一批資料(比如:20條記錄),在極短的時間內(其實就是在一個for迴圈中)多次呼叫介面,可能存在相同的請求引數連續呼叫複製商品介面情況。於是,出現了並行插入重複資料的問題。

為什麼會出現這個問題呢?

4. 多執行緒消費

RocketMQ的消費者,為了效能考慮,預設是用多執行緒並行消費的,最大支援64個執行緒。

例如:

@RocketMQMessageListener(topic = "${com.susan.topic:PRODUCT_TOPIC}",
        consumerGroup = "${com.susan.group:PRODUCT_TOPIC_GROUP}")
@Service
public class MessageReceiver implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt message) {
        String message = new String(message.getBody(), StandardCharsets.UTF_8);
        doSamething(message);
    }
}

也就是說,如果在極短的時間內,連續傳送重複的訊息,就會被不同的執行緒消費。

即使在程式碼中有這樣的判斷:

Product oldProduct = query(hashCode);
if(oldProduct == null) {
    productMapper.insert(product);
}

在插入資料之前,先判斷該資料是否已經存在,只有不存在才會插入。

但由於在並行情況下,不同的執行緒都判斷商品資料不存在,於是同時進行了插入操作,所以就產生了重複資料

如下圖所示:

5. 順序消費

為了解決上述並行消費重複訊息的問題,我們從兩方面著手:

  1. 商城系統修復產生重複記錄的bug。
  2. 基礎系統將訊息改成單執行緒順序消費

我仔細思考了一下,如果只靠商城系統修復bug,以後很難避免不出現類似的重複商品問題,比如:如果使用者在極短的時間內點選建立商品按鈕多次,或者商城系統主動發起重試。

所以,基礎系統還需進一步處理。

其實RocketMQ本身是支援順序消費的,需要訊息的生產者和消費者一起改。

生產者改為:

rocketMQTemplate.asyncSendOrderly(topic, message, hashKey, new SendCallback() {
  @Override
  public void onSuccess(SendResult sendResult) {
      log.info("sendMessage success");
  }

  @Override
  public void onException(Throwable e) {
      log.error("sendMessage failed!");
  }
});

重點是要呼叫rocketMQTemplate物件的asyncSendOrderly方法,傳送順序訊息。

消費者改為:

@RocketMQMessageListener(topic = "${com.susan.topic:PRODUCT_TOPIC}",
        consumeMode = ConsumeMode.ORDERLY,
        consumerGroup = "${com.susan.group:PRODUCT_TOPIC_GROUP}")
@Service
public class MessageReceiver implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt message) {
        String message = new String(message.getBody(), StandardCharsets.UTF_8);
        doSamething(message);
    }
}

接收訊息的重點是RocketMQMessageListener註解中的consumeMode引數,要設定成ConsumeMode.ORDERLY,這樣就能順序消費訊息了。

修改後關鍵流程圖如下:

兩邊都修改之後,複製商品這一塊就沒有再出現重複商品的問題了。

But,修完bug之後,我又思考了良久。

複製商品只是建立商品的其中一個入口,如果有其他入口,跟複製商品功能同時建立新商品呢?

不也會出現重複商品問題?

雖說,這種概率非常非常小。

但如果一旦出現重複商品問題,後續涉及到要合併商品的資料,非常麻煩。

經過這一次的教訓,一定要防微杜漸。

不管是使用者,還是自己的內部系統,從不同的入口建立商品,都需要解決重複商品建立問題。

那麼,如何解決這個問題呢?

6. 唯一索引

解決重複商品資料問題,最快成本最低最有效的辦法是:給表建唯一索引

想法是好的,但我們這邊有個規範就是:業務表必須都是邏輯刪除

而我們都知道,要刪除表的某條記錄的話,如果用delete語句操作的話。

例如:

delete from product where id=123;

這種delete操作是物理刪除,即該記錄被刪除之後,後續通過sql語句基本查不出來。(不過通過其他技術手段可以找回,那是後話了)

還有另外一種是邏輯刪除,主要是通過update語句操作的。

例如:

update product set delete_status=1,edit_time=now(3) 
where id=123;

邏輯刪除需要在表中額外增加一個刪除狀態列位,用於記錄資料是否被刪除。在所有的業務查詢的地方,都需要過濾掉已經刪除的資料。

通過這種方式刪除資料之後,資料任然還在表中,只是從邏輯上過濾了刪除狀態的資料而已。

其實對於這種邏輯刪除的表,是沒法加唯一索引的。

為什麼呢?

假設之前給商品表中的name和model加了唯一索引,如果使用者把某條記錄刪除了,delete_status設定成1了。後來,該使用者發現不對,又重新新增了一模一樣的商品。

由於唯一索引的存在,該使用者第二次新增商品會失敗,即使該商品已經被刪除了,也沒法再新增了。

這個問題顯然有點嚴重。

有人可能會說:把name、model和delete_status三個欄位同時做成唯一索引不就行了?

答:這樣做確實可以解決使用者邏輯刪除了某個商品,後來又重新新增相同的商品時,新增不了的問題。但如果第二次新增的商品,又被刪除了。該使用者第三次新增相同的商品,不也出現問題了?

由此可見,如果表中有邏輯刪除功能,是不方便建立唯一索引的。

5. 分散式鎖

接下來,你想到的第二種解決資料重複問題的辦法可能是:加分散式鎖

目前最常用的效能最高的分散式鎖,可能是redis分散式鎖了。

使用redis分散式鎖的虛擬碼如下:

try{
  String result = jedis.set(lockKey, requestId, "NX", "PX", expireTime);
  if ("OK".equals(result)) {
      doSamething();
      return true;
  }
  return false;
} finally {
    unlock(lockKey,requestId);
}  

不過需要在finally程式碼塊中釋放鎖

其中lockKey是由商品表中的name和model組合而成的,requestId是每次請求的唯一標識,以便於它每次都能正確得釋放鎖。還需要設定一個過期時間expireTime,防止釋放鎖失敗,鎖一直存在,導致後面的請求沒法獲取鎖。

如果只是單個商品,或者少量的商品需要複製新增,則加分散式鎖沒啥問題。

主要流程如下:

可以在複製新增商品之前,先嚐試加鎖。如果加鎖成功,則在查詢商品是否存在,如果不存在,則新增商品。此外,在該流程中如果加鎖失敗,或者查詢商品時不存在,則直接返回。

加分散式鎖的目的是:保證查詢商品和新增商品的兩個操作是原子性的操作。

但現在的問題是,我們這次需要複製新增的商品數量很多,如果每新增一個商品都要加分散式鎖的話,會非常影響效能。

顯然對於批次介面,加redis分散式鎖,不是一個理想的方案。

6. 統一mq非同步處理

前面我們已經聊過,在批次複製商品的介面,我們是通過RocketMQ的順序訊息,單執行緒非同步複製新增商品的,可以暫時解決商品重複的問題。

但那隻改了一個新增商品的入口,還有其他新增商品的入口。

能不能把新增商品的底層邏輯統一一下,最終都呼叫同一段程式碼。然後通過RocketMQ的順序訊息,單執行緒非同步新增商品。

主要流程如下圖所示:

這樣確實能夠解決重複商品的問題。

但同時也帶來了另外兩個問題:

  1. 現在所有的新增商品功能都改成非同步了,之前同步新增商品的介面如何返回資料呢?這就需要修改前端互動,否則會影響使用者體驗。
  2. 之前不同的新增商品入口,是多執行緒新增商品的,現在改成只能由一個執行緒新增商品,這樣修改的結果導致新增商品的整體效率降低了。

由此,綜合考慮了一下各方面因素,這個方案最終被否定了。

7. insert on duplicate key update

其實,在mysql中存在這樣的語法,即:insert on duplicate key update

在新增資料時,mysql發現資料不存在,則直接insert。如果發現資料已經存在了,則做update操作。

不過要求表中存在唯一索引PRIMARY KEY,這樣當這兩個值相同時,才會觸發更新操作,否則是插入。

現在的問題是PRIMARY KEY是商品表的主鍵,是根據雪花演演算法提前生成的,不可能產生重複的資料。

但由於商品表有邏輯刪除功能,導致唯一索引在商品表中建立不了。

由此,insert on duplicate key update這套方案,暫時也沒法用。

此外,insert on duplicate key update在高並行的情況下,可能會產生死鎖問題,需要特別注意一下。

感興趣的小夥伴,也可以找我私聊。

其實insert on duplicate key update的實戰,我在另一篇文章《我用kafka兩年踩過的一些非比尋常的坑》中介紹過的,感興趣的小夥伴,可以看看。

8. insert ignore

在mysql中還存在這樣的語法,即:insert ... ignore

在insert語句執行的過程中:mysql發現如果資料重複了,就忽略,否則就會插入。

它主要是用來忽略,插入重複資料產生的Duplicate entry 'XXX' for key 'XXXX'異常的。

不過也要求表中存在唯一索引PRIMARY KEY

但由於商品表有邏輯刪除功能,導致唯一索引在商品表中建立不了。

由此可見,這個方案也不行。

溫馨的提醒一下,使用insert ... ignore也有可能會導致死鎖

9. 防重表

之前聊過,因為有邏輯刪除功能,給商品表加唯一索引,行不通。

後面又說了加分散式鎖,或者通過mq單執行緒非同步新增商品,影響建立商品的效能。

那麼,如何解決問題呢?

我們能否換一種思路,加一張防重表,在防重表中增加商品表的name和model欄位作為唯一索引。

例如:

CREATE TABLE `product_unique` (
  `id` bigint(20) NOT NULL COMMENT 'id',
  `name` varchar(130) DEFAULT NULL COMMENT '名稱',
  `model` varchar(255)  NOT NULL COMMENT '規格',
  `user_id` bigint(20) unsigned NOT NULL COMMENT '建立使用者id',
  `user_name` varchar(30)  NOT NULL COMMENT '建立使用者名稱稱',
  `create_date` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT '建立時間',
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_name_model` (`name`,`model`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品防重表';

其中表中的id可以用商品表的id,表中的name和model就是商品表的name和model,不過在這張防重表中增加了這兩個欄位的唯一索引。

視野一下子被開啟了。

在新增商品資料之前,先新增防重表。如果新增成功,則說明可以正常新增商品,如果新增失敗,則說明有重複資料。

防重表新增失敗,後續的業務處理,要根據實際業務需求而定。

如果業務上允許新增一批商品時,發現有重複的,直接拋異常,則可以提示使用者:系統檢測到重複的商品,請重新整理頁面重試。

例如:

try {
  transactionTemplate.execute((status) -> {
      productUniqueMapper.batchInsert(productUniqueList);
      productMapper.batchInsert(productList);
  return Boolean.TRUE;
  });
} catch(DuplicateKeyException e) {
   throw new BusinessException("系統檢測到重複的商品,請重新整理頁面重試");
}

在批次插入資料時,如果出現了重複資料,捕獲DuplicateKeyException異常,轉換成BusinessException這樣執行時的業務異常。

還有一種業務場景,要求即使出現了重複的商品,也不拋異常,讓業務流程也能夠正常走下去。

例如:

try {
  transactionTemplate.execute((status) -> {
      productUniqueMapper.insert(productUnique);
      productMapper.insert(product);
  return Boolean.TRUE;
  });
} catch(DuplicateKeyException e) {
   product = productMapper.query(product);
}

在插入資料時,如果出現了重複資料,則捕獲DuplicateKeyException,在catch程式碼塊中再查詢一次商品資料,將資料庫已有的商品直接返回。

如果呼叫了同步新增商品的介面,這裡非常關鍵的一點,是要返回已有資料的id,業務系統做後續操作,要拿這個id操作。

當然在執行execute之前,還是需要先查一下商品資料是否存在,如果已經存在,則直接返回已有資料,如果不存在,才執行execute方法。這一步千萬不能少。

例如:

Product oldProduct = productMapper.query(product);
if(Objects.nonNull(oldProduct)) {
    return oldProduct;
}

try {
  transactionTemplate.execute((status) -> {
      productUniqueMapper.insert(productUnique);
      productMapper.insert(product);
  return Boolean.TRUE;
  });
} catch(DuplicateKeyException e) {
   product = productMapper.query(product);
}
return product;

千萬注意:防重表和新增商品的操作必須要在同一個事務中,否則會出問題。

順便說一下,還需要對商品的刪除功能做特殊處理一下,在邏輯刪除商品表的同時,要物理刪除防重表。用商品表id作為查詢條件即可。

說實話,解決重複資料問題的方案挺多的,沒有最好的方案,只有最適合業務場景的,最優的方案。

此外,如果你對重複資料衍生出的冪等性問題感興趣的話,可以看看我的另一篇文章《高並行下如何保證介面的冪等性?》,裡面有非常詳細的介紹。

¥¥ 最後說一句(求關注,別白嫖我)
如果這篇文章對您有所幫助,或者有所啟發的話,幫忙掃描下發二維條碼關注一下,您的支援是我堅持寫作最大的動力。

求一鍵三連:點贊、轉發、在看。

關注公眾號:【蘇三說技術】,在公眾號中回覆:面試、程式碼神器、開發手冊、時間管理有超讚的粉絲福利,另外回覆:加群,可以跟很多BAT大廠的前輩交流和學習。