這篇文章,我們聊聊訊息佇列中非常重要的最佳實踐之一:消費冪等。
消費冪等是指:當出現 RocketMQ 消費者對某條訊息重複消費的情況時,重複消費的結果與消費一次的結果是相同的,並且多次消費並未對業務系統產生任何負面影響。
例如,在支付場景下,消費者消費扣款訊息,對一筆訂單執行扣款操作,扣款金額為100元。
如果因網路不穩定等原因導致扣款訊息重複投遞,消費者重複消費了該扣款訊息,但最終的業務結果是隻扣款一次,扣費100元,且使用者的扣款記錄中對應的訂單隻有一條扣款流水,不會多次扣除費用。那麼這次扣款操作是符合要求的,整個消費過程實現了消費冪等。
RocketMQ 訊息重複的場景如下:
傳送時訊息重複
當一條訊息已被成功傳送到伺服器端並完成持久化,此時出現了網路閃斷或者使用者端宕機,導致伺服器端對使用者端應答失敗。
如果此時生產者意識到訊息傳送失敗並嘗試再次傳送訊息,消費者後續會收到兩條內容相同但 Message ID 不同的訊息。
投遞時訊息重複
訊息消費的場景下,訊息已投遞到消費者並完成業務處理,當用戶端給伺服器端反饋應答的時候網路閃斷。為了保證訊息至少被消費一次,Broker 伺服器端將在網路恢復後再次嘗試投遞之前已被處理過的訊息,消費者後續會收到兩條內容相同並且 Message ID 也相同的訊息。
負載均衡時訊息重複(包括但不限於網路抖動、Broker 重啟以及消費者應用重啟)
Broker 端或使用者端重啟、擴容或縮容時,會觸發 Rebalance ,此時消費者可能會收到少量重複訊息。
因為不同的 Message ID 對應的訊息內容可能相同,有可能出現衝突(重複)的情況,所以真正安全的冪等處理,不建議以 Message ID 作為處理依據。
最好的方式是以業務唯一標識作為冪等處理的關鍵依據,訊息必須攜帶業務唯一標識。
訊息攜帶業務唯一標識一般來講有兩種方式:
Message msg = new Message(TOPIC /* Topic */,
TAG /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
message.setKey("ORDERID_100"); // 訂單編號
SendResult sendResult = producer.send(message);
Message msg = new Message(TOPIC /* Topic */,
TAG /* Tag */,
(JSON.toJSONString(orderDTO)).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
message.setKey("ORDERID_100"); // 訂單編號
SendResult sendResult = producer.send(message);
消費者收到訊息時,從訊息中獲取訂單號來實現訊息冪等 :
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt message : msgs) {
// 方法1: 根據業務唯一標識的Key做冪等處理
String orderId = message.getKeys();
// 方法2: 從訊息body體重解析出訂單號
String orderJSON = new String(messageExt.getBody(), "UTF-8");
OrderPO orderPO = JSON.parseObject(orderJSON, OrderPO.class);
String orderId = orderPO.getId();
// TODO 業務處理邏輯
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
為了保證冪等,一定要做業務邏輯判斷,筆者認為這是保證冪等的首要條件。
筆者曾經服務於神州專車,乘客在使用者端點選立即叫車,訂單服務建立訂單,首先儲存到資料庫後,然後將訂單資訊同步儲存到快取中。
在訂單的載客生命週期裡,訂單的修改操作先修改快取,然後傳送訊息到 MetaQ ,訂單落盤服務消費訊息,並判斷訂單資訊是否正常(比如有無亂序),若訂單資料無誤,則儲存到資料庫中。
訂單狀態機按順序分別是:建立、已分配司機、司機已出發、司機已到達、司機已接到乘客、已到達。
這種設計是為了快速提升系統效能,由於網路問題有非常小的概率,消費者會收到亂序的訊息。
當訂單狀態是司機已到達時,消費者可能會收到司機已出發的訊息,也就是先發的訊息因為網路原因被延遲消費了。
此時,消費者需要判斷當前的專車訂單狀態機,儲存最合理的訂單資料,就可以忽略舊的訊息,列印相關紀錄檔即可。
資料庫去重表有兩個要點 :
舉一個電商場景的例子:使用者購物車結算時,系統會建立支付訂單。使用者支付成功後支付訂單的狀態會由未支付修改為支付成功,然後系統給使用者增加積分。
我們可以使用 RocketMQ 事務訊息的方案,該方案能夠發揮 MQ 的優勢:非同步和解耦,以及事務的最終一致性的特性。
在消費監聽器邏輯裡,冪等非常重要
。積分表 SQL 如下:
CREATE TABLE `t_points` (
`id` bigint(20) NOT NULL COMMENT '主鍵',
`user_id` bigint(20) NOT NULL COMMENT '使用者id',
`order_id` bigint(20) NOT NULL COMMENT '訂單編號',
`points` int(4) NOT NULL COMMENT '積分',
`remarks` varchar(128) COLLATE utf8mb4_bin NOT NULL COMMENT '備註',
`create_time` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `unique_order_Id` (`order_id`) USING BTREE COMMENT '訂單唯一'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
當收到訂單資訊後,首先判斷該訂單是否有積分記錄,若沒有記錄,才插入積分記錄。
就算出現極端並行場景下,訂單編號也是唯一鍵,資料庫中也必然不會存在相同訂單的多條積分記錄。
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt messageExt : msgs) {
String orderJSON = new String(messageExt.getBody(), "UTF-8");
logger.info("orderJSON:" + orderJSON);
OrderPO orderPO = JSON.parseObject(orderJSON, OrderPO.class);
// 首先查詢是否處理完成
PointsPO pointsPO = pointsMapper.getByOrderId(orderPO.getId());
if (pointsPO == null) {
Long id = SnowFlakeIdGenerator.getUniqueId(1023, 0);
pointsPO = new PointsPO();
pointsPO.setId(id);
pointsPO.setOrderId(orderPO.getId());
pointsPO.setUserId(orderPO.getUserId());
// 新增積分數 30
pointsPO.setPoints(30);
pointsPO.setCreateTime(new Date());
pointsPO.setRemarks("新增積分數 30");
pointsMapper.insert(pointsPO);
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
logger.error("consumeMessage error: ", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
在消費者接收到訊息後,首先判斷 Redis 中是否存在該業務主鍵的標誌位,若存在標誌位,則認為消費成功,否則,則執行業務邏輯,執行完成後,在快取中新增標誌位。
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt messageExt : msgs) {
String bizKey = messageExt.getKeys(); // 唯一業務主鍵
//1. 判斷是否存在標誌
if(redisTemplate.hasKey(RedisKeyConstants.WAITING_SEND_LOCK + bizKey)) {
continue;
}
//2. 執行業務邏輯
//TODO do business
//3. 設定標誌位
redisTemplate.opsForValue().set(RedisKeyConstants.WAITING_SEND_LOCK + bizKey, "1", 72, TimeUnit.HOURS);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
logger.error("consumeMessage error: ", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
僅僅有業務邏輯判斷是不夠的,為了應對並行場景,我們可以使用分散式鎖。
分散式鎖一般有三種方案:
資料樂觀鎖假設認為資料一般情況下不會造成衝突,所以在資料進行提交更新的時候,才會正式對資料的衝突與否進行檢測,如果發現衝突了,則讓返回使用者錯誤的資訊,讓使用者決定如何去做。
由於樂觀鎖沒有了鎖等待,提高了吞吐量,所以樂觀鎖適合讀多寫少的場景。
實現樂觀鎖:一般是在資料表中加上一個資料版本號 version
欄位,表示資料被修改的次數,當資料被修改時,version 值會加一。
當執行緒 A 要更新資料值時,在讀取資料的同時也會讀取version
值,在提交更新時,若剛才讀取到的 version
值為當前資料庫中的 version
值相等時才更新,否則重試更新操作,直到更新成功。
步驟 1 : 查詢出條目資料
select version from my_table where id = #{id}
步驟 2 :修改條目資料,傳遞版本引數
update my_table set n = n + 1, version = version + 1 where id=#{id} and version = #{version};
從樂觀鎖的實現角度來講,樂觀鎖非常容易實現,但它有兩個缺點:
消費端演示程式碼如下:
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt messageExt : msgs) {
String orderJSON = new String(messageExt.getBody(), "UTF-8");
OrderPO orderPO = JSON.parseObject(orderJSON, OrderPO.class);
Long version = orderMapper.selectVersionByOrderId(orderPO.getId()); //版本
orderPO.setVersion(version);
// 對應 SQL:update t_order t set version = version + 1 , status = #{status} where id = #{id}
// and version = #{version}
int affectedCount = orderMapper.updateOrder(orderPO);
if(affectedCount == 0) {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
logger.error("consumeMessage error: ", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
當我們要對一個資料庫中的一條資料進行修改的時候,為了避免同時被其他人修改,最好的辦法就是直接對該資料進行加鎖以防止並行。
這種藉助資料庫鎖機制在修改資料之前先鎖定,再修改的方式被稱之為悲觀並行控制(又名「悲觀鎖」,Pessimistic Concurrency Control,縮寫「PCC」)。
之所以叫做悲觀鎖,是因為這是一種對資料的修改抱有悲觀態度的並行控制方式。我們一般認為資料被並行修改的概率比較大,所以需要在修改之前先加鎖。
悲觀並行控制實際上是「先取鎖再存取」的保守策略,為資料處理的安全提供了保證。
MySQL 悲觀鎖的使用方法如下:
begin;
-- 讀取資料並加鎖
select ... for update;
-- 修改資料
update ...;
commit;
例如,以下程式碼將讀取 t_order
表中 id
為 1 的記錄,並將該記錄的 status
欄位修改為 3
:
begin;
select * from t_order where id = 1 for update;
update t_order set status = '3' where id = 1;
commit;
如果 t_order
表中 id
為 1 的記錄正在被其他事務修改,則上述程式碼會等待該記錄被釋放鎖後才能繼續執行。
消費端演示程式碼如下:
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt messageExt : msgs) {
String orderJSON = new String(messageExt.getBody(), "UTF-8");
OrderPO orderPO = JSON.parseObject(orderJSON, OrderPO.class);
Long orderId = orderPo.getId();
//呼叫service的修改訂單資訊,該方法事務加鎖, 當修改訂單記錄時,該其他執行緒會等待該記錄被釋放才能繼續執行
orderService.updateOrderForUpdate(orderId ,orderPO);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
logger.error("consumeMessage error: ", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
使用資料庫鎖是非常重的一個操作,我們可以使用更輕量級的 Redis 鎖來替換,因為 Redis 效能高,同時有非常豐富的生態(類庫)支援不同型別的分散式鎖。
我們選擇 Redisson 框架提供的分散式鎖功能,簡化的範例程式碼如下:
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt messageExt : msgs) {
String orderJSON = new String(messageExt.getBody(), "UTF-8");
OrderPO orderPO = JSON.parseObject(orderJSON, OrderPO.class);
Long orderId = orderPo.getId();
RLock lock = redissonClient.getLock("order-lock-" + orderId);
rLock.lock(10, TimeUnit.SECONDS);
// TODO 業務邏輯
rLock.unlock();
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
logger.error("consumeMessage error: ", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
這篇文章,我們詳細剖析瞭如何實現 RocketMQ 消費冪等。
1、消費冪等:當出現 RocketMQ 消費者對某條訊息重複消費的情況時,重複消費的結果與消費一次的結果是相同的,並且多次消費並未對業務系統產生任何負面影響。
2、適用場景:傳送時訊息重複、投遞時訊息重複、負載均衡時訊息重複
3、業務唯一標識:以業務唯一標識作為冪等處理的關鍵依據,訊息必須攜帶業務唯一標識。
4、冪等策略:業務邏輯程式碼中需要判斷業務狀態機,同時根據實際條件選擇全域性處理標識和分散式鎖兩種方式處理。
如果我的文章對你有所幫助,還請幫忙點贊、在看、轉發一下,你的支援會激勵我輸出更高質量的文章,非常感謝!