好好學習,天天向上
本文已收錄至我的Github倉庫DayDayUP:github.com/RobodLee/DayDayUP,歡迎Star
回顧一下上一篇文章中講到的下單的流程。當使用者點選下單之後,使用者名稱和商品id就會組裝成一個SeckillStatus物件存入Redis佇列中等待被處理,這個過程叫做排隊。所以說,只要使用者點選了一次下單後不論最後是否下單成功,他都會進入到排隊的狀態。如果使用者重複點選下單,那麼Redis佇列中就會有很多個相同的SeckillStatus物件,也就是一個使用者排隊多次,這顯然是不符合邏輯的,一個使用者應該只能排隊一次。
為了避免使用者重複排隊的情況,可以為每個使用者在Redis中設定一個自增值,每次排隊的時候加1,如果大於1,說明重複排隊了,那麼直接丟擲異常,告訴使用者重複排隊了。
//SeckillOrderServiceImpl
@Override
public boolean add(Long id, String time, String username) {
Long increment = redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_QUEUE_COUNT).increment(username, 1);
if (increment>1) { //記錄指定hashkey的增量,大於1說明排隊次數超過1次,重複排隊
throw new RuntimeException("重複排隊");
}
…………
}
這段程式碼中,新增了對使用者重複排隊的判斷,先自增1,再進行判斷。這裡的key設定的是username,因為一個使用者只能下單一件商品,如果去下單其它商品,同樣也是重複排隊。
測試了一下,是成功的。但是有一個問題:如果使用者在這裡排隊未成功,該怎麼清理排隊資訊呢?這個下一步就會說,接著往下看👇
現在的程式碼看似很完美,但是漏洞百出,比如就存在並行超賣的問題。為什麼這麼說,看程式碼說話:
這個是多執行緒下單的方法,流程是查庫存——>下單——>減庫存。假如現在有件商品還剩1件,正好有多個執行緒同時走到了查詢庫存這一步,結果查出來都是一件,然後這三個執行緒就可以往下接著走,最後三個執行緒都成功下單了,不就多賣了兩件嘛。所以這段程式碼還存在問題,那怎麼解決呢?可不可以採用加鎖的方法,不可以。因為如果是在叢集環境下,一臺機器上多個執行緒走到了同一步確實可以鎖住防止超賣,但是不同機器上的執行緒走到了同一部就鎖不住了。
所以可以採用Redis佇列的方式去解決。
給每個sku建立一個佇列,比如id為4399的商品數量為4,那麼就在4399的佇列裡放入4件商品。然後每次查詢就從佇列裡去取,假如現在有五個執行緒去查庫存,因為只有4件商品,所以5個執行緒只有4個執行緒能夠查詢出庫存。因為Redis是單執行緒的,所以不會出現多個執行緒同時存取資料出錯的情況,這樣就可以避免並行超賣的問題。
之前在SeckillGoodsPushTask中只是將商品存入Redis中,現在再加一步,為每個sku都建立一個佇列並存入庫存數量的資料到佇列中。
//定時將秒殺商品載入到redis中
@Scheduled(cron = "0/5 * * * * ?")
public void loadGoodsPushRedis() {
…………
for (SeckillGoods seckillGood : seckillGoods) {
boundHashOperations.put(seckillGood.getId(),seckillGood); //把商品存入到redis
redisTemplate.boundListOps(SystemConstants.SEC_KILL_GOODS_COUNT_LIST + seckillGood.getId())
.leftPushAll(getGoodsNumber(seckillGood.getNum())); //存到Redis佇列
}
}
}
//獲取秒殺商品數量的陣列
public Byte[] getGoodsNumber(int num) {
Byte[] arr = new Byte[num];
for (int i = 0; i < num; i++) {
arr[i] = '0';
}
return arr;
}
佇列的內容就是商品數量的Byte,視訊中用的是商品id,但是商品id是Long型的,Byte比Long要省空間,而且放什麼無所謂關鍵是放幾個,所以我就放了對應數量的Byte進去。
接下來就該在下單之前獲取庫存的資訊:
@Async
public void createOrder() {
…………
//從秒殺商品佇列中獲取資料,如果獲取不到則說明已經賣完了,清除掉排隊資訊
Object o = redisTemplate.boundListOps(SystemConstants.SEC_KILL_GOODS_COUNT_LIST + seckillGoods.getId())
.rightPop();
if (o == null) {
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_QUEUE_COUNT).delete(seckillStatus.getUsername()); //清除排隊佇列
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_STATUS_KEY).delete(seckillStatus.getUsername()); //排隊狀態佇列
return;
}
//建立秒殺訂單
…………
}
如果商品庫存不足,那麼應該清除掉排隊的資訊,否則使用者該商品下不了單還不能下單其它商品。這裡將排隊的佇列以及查詢狀態的佇列清除了。
解決了並行超賣的問題之後,還有一個庫存數量不精準的問題。這個問題出現的原因和超賣問題類似,假如現在同時有兩個執行緒下單完成了開始遞減庫存,A執行緒查詢出庫存有3個,B執行緒也查詢出庫存有3個,然後它們同時遞減,都是2個,寫到了資料庫中。其實此時庫存應該還剩一個。
解決的辦法也很簡單,因為現在是呼叫**seckillGoods.getStockCount()**查詢出的庫存,那我們就不用這個查詢,直接用上一節中的佇列,佇列中剩餘多少就說明現在的庫存是多少,絕對準確。
@Async
public void createOrder() {
…………
//減庫存,如果庫存沒了就從redis中刪除,並將庫存資料寫到MySQL中
//seckillGoods.setStockCount(seckillGoods.getStockCount()-1);
Long size = redisTemplate.boundListOps(SystemConstants.SEC_KILL_GOODS_COUNT_LIST + seckillGoods.getId()).size();//獲取庫存
//if (seckillGoods.getStockCount() <= 0) {
seckillGoods.setNum(size.intValue());
if (size <= 0) {
seckillGoodsBoundHashOps.delete(seckillStatus.getGoodsId());
seckillGoodsMapper.updateByPrimaryKeySelective(seckillGoods);
} else {
seckillGoodsBoundHashOps.put(seckillStatus.getGoodsId(),seckillGoods);
}
//建立秒殺訂單
…………
}
秒殺支付的流程和之前做的類似,只不過現在秒殺訂單的支付狀態傳送到Queue2中,普通訂單還是傳送到queue1中,但是我們怎麼知道該將訂單的支付狀態傳送給queue1還是queue2呢?如果微信伺服器可以將MQ佇列的exchange和routingKey返回給我們就好了,這樣我們就可以動態地指定要傳送的MQ了。
從微信支付的官方檔案中我們可以知道在建立二維條碼和接收支付結果的引數中都有一個attach引數
這個是自定義的資料,也就是說我們在建立二維條碼的時候傳送給微信伺服器什麼,返回支付結果的時候就會返回給我們什麼。所以在建立二維條碼的時候由前端將指定的exchange和routingKey傳送給後端,然後再新增到attach引數中。就可以實現將不同的訂單動態地傳送到指定的佇列了。
普通訂單:exchange:exchange.order routingKey:routing.order
秒殺訂單:exchange:exchange.seckill_order routingKey:routing.seckill_order
由於之前寫的createNative方法是接收一個order物件,所以在Order裡面新增兩個欄位:
private String exchange; //mq交換機的名稱
private String routingKey; //mq的路由鍵
修改之前**createNative()**的程式碼,新增attach引數,
@Override
public Map<String, String> createNative(Order order) {
…………
//獲取exchange和routingKey,封裝程map集合,新增到attach引數中
String exchange = order.getExchange();
String routingKey = order.getRoutingKey();
Map<String,String> attachMap = new HashMap<>(2);
attachMap.put("exchange",exchange);
attachMap.put("routingKey",routingKey);
String attach = JSON.toJSONString(attachMap);
map.put("attach",attach);
…………
}
然後再修改**WeChatPayController.notifyUrl()**方法,從伺服器返回的Map集合中獲取attach,並從attach中獲取exchange和routingKey。
@RequestMapping("/notify/url")
public String notifyUrl(HttpServletRequest request) throws Exception {
…………
Map<String, String> xmlMap = WXPayUtil.xmlToMap(xmlString);
String attach = xmlMap.get("attach");
Map<String, String> attachMap = JSONObject.parseObject(attach, Map.class);
//將java物件轉換成amqp訊息傳送出去,呼叫的是send方法
//rabbitTemplate.convertAndSend("exchange.order","routing.order", xmlString);
rabbitTemplate.convertAndSend(attachMap.get("exchange"),attachMap.get("routingKey"), xmlString);
…………
}
前面已經將訊息傳送到訊息佇列中了現在就可以去監聽訊息佇列了。
從流程圖中可以看到,在寫監聽的方法之前,需要有兩個方法:改訂單狀態和刪除訂單。
SeckillOrderServiceImpl.updatePayStatus
public void updatePayStatus(String username, String transactionId, String endTime) {
//從Redis中將訂單資訊查詢出來
SeckillOrder order = (SeckillOrder) redisTemplate
.boundHashOps(SystemConstants.SEC_KILL_ORDER_KEY)
.get(username);
if (order != null) {
try {
order.setStatus("1");
order.setTransactionId(transactionId);
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
order.setPayTime(simpleDateFormat.parse(endTime));
seckillOrderMapper.insertSelective(order); //將訂單資訊存到mysql中
//刪除redis中的訂單資訊
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_ORDER_KEY).delete(username);
//刪除使用者的排隊資訊
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_QUEUE_COUNT).delete(username); //清除排隊佇列
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_STATUS_KEY).delete(username); //排隊狀態佇列
} catch (ParseException e) {
e.printStackTrace();
}
}
}
首先將訂單資訊從Redis中查詢出來,將訂單狀態改為已支付,然後將交易流水號和支付時間補充完整存入MySQL。這時候交易已經完成了,可以將訂單資訊從Redis中刪除,並將使用者的排隊資訊也一併刪除。
SeckillOrderServiceImpl.deleteOrder
public void deleteOrder(String username) {
//刪除Redis中的訂單
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_ORDER_KEY).delete(username);
//刪除使用者的排隊資訊
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_QUEUE_COUNT).delete(username); //清除排隊佇列
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_STATUS_KEY).delete(username); //排隊狀態佇列
//查詢出秒殺的狀態資訊
SeckillStatus seckillStatus = (SeckillStatus) redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_STATUS_KEY)
.get(username);
//回滾庫存
SeckillGoods seckillGoods = (SeckillGoods) redisTemplate
.boundHashOps(SystemConstants.SEC_KILL_GOODS_PREFIX + seckillStatus.getTime())
.get(seckillStatus.getGoodsId());
if (seckillGoods == null) {
seckillGoodsMapper.selectByPrimaryKey(seckillGoods.getId());
seckillGoods.setStockCount(seckillGoods.getStockCount()+1);
seckillGoodsMapper.updateByPrimaryKeySelective(seckillGoods);
} else {
seckillGoods.setStockCount(seckillGoods.getStockCount()+1);
}
redisTemplate
.boundHashOps(SystemConstants.SEC_KILL_GOODS_PREFIX + seckillStatus.getTime())
.put(seckillGoods.getId(),seckillGoods);
//將商品放入佇列
redisTemplate.boundListOps(SystemConstants.SEC_KILL_GOODS_COUNT_LIST + seckillGoods.getId())
.leftPush("0");
}
支付失敗了,應該將訂單刪除掉。首先將Redis中的訂單刪除,然後刪除使用者的排隊資訊。接著回滾庫存,如果Redis中沒有則說明已經賣完了,就從MySQL中查詢出來然後將商品數量加1再存入MySQL;如果Redis中有資料就將Redis中的商品數量加1即可。上面講防止並行超賣的時候不是為每個商品都在Redis佇列中存放了一下麼,所以最後將商品放回到佇列中。
SeckillMessageListener
@Component
@RabbitListener(queues = "queue.seckillorder")
public class SeckillMessageListener {
@Autowired
private SeckillOrderService seckillOrderService;
//https://pay.weixin.qq.com/wiki/doc/api/native.php?chapter=9_7&index=8
@RabbitHandler
public void getMessage(String message) {
try {
Map<String, String> resultMap = JSON.parseObject(message,Map.class);
String returnCode = resultMap.get("return_code"); //狀態碼
if ("SUCCESS".equals(returnCode)) {
String resultCode = resultMap.get("result_code"); //業務結果
String attach = resultMap.get("attach");
Map<String,String> attachMap = JSON.parseObject(attach,Map.class);
if ("SUCCESS".equals(resultCode)) {
//改訂單狀態
seckillOrderService.updatePayStatus(attachMap.get("username"),
resultMap.get("transaction_id"),resultMap.get("time_end"));
} else {
//刪除訂單
seckillOrderService.deleteOrder(attachMap.get("username"));
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
這個方法使用來監聽秒殺佇列的訊息的,exchange和queue需要我們手動地在RabbitMQ的網頁中建立並進行繫結
在該方法中,首先去讀取狀態碼和業務結果,如果都為「SUCCESS」的話則說明訂單支付成功,修改訂單的狀態。反之訂單支付失敗,刪除訂單。
文章鴿了快一個月了,終於補上了,主要是上篇文章寫完後就在做個小東西,然後就是國慶節放假,在家待著有點懶。回校後又在參加電賽,沒時間。所以一路鴿到現在。
這篇文章主要是將之前的秒殺流程進行一個完善,實現了防止秒殺重複排隊,解決並行超賣的問題,並解決了同步庫存不精準的問題。最後實現了秒殺支付。
碼字不易,可以的話,給我來個
點贊
,收藏
,關注
如果你喜歡我的文章,歡迎關注微信公眾號 『 R o b o d 』
程式碼:https://github.com/RobodLee/changgou
本文已收錄至我的Github倉庫DayDayUP:github.com/RobodLee/DayDayUP,歡迎Star