解決庫存扣減及訂單建立時防止並行死鎖的問題

2022-05-26 18:00:51

解決庫存扣減及訂單建立時防止並行死鎖的問題

在我們日常開發的過程可有會遇到以下錯誤

事務(程序 ID 82)與另一個程序被死鎖在 鎖 資源上,並且已被選作死鎖犧牲品。請重新執行該事務

很多開發人員對於這個問題的排查起來是比較困難的,而生產生的原因多種多樣,很多人認是因為表中的資料太多了同時操作的人多人才會產生這種錯誤,下面我們來還原一下死鎖的過程。

我們看一下以下sql程式碼(該樣例程式碼測試環境為SqlServer)

1. 第一先建立一個測試表H_Test

複製以下程式碼

  SET ANSI_NULLS ON
  GO

  SET QUOTED_IDENTIFIER ON
  GO

  CREATE TABLE [dbo].[H_TEST](
    [Id] [int] IDENTITY(1,1) NOT NULL,
    [DID] [int] NULL,
    [UNAME] [nvarchar](50) NULL,
    [UNAME2] [nvarchar](50) NULL,
  CONSTRAINT [PK_H_TEST_3994ceeb-a4b8-41e1-b06b-1e59a2e51d8c] PRIMARY KEY CLUSTERED 
  (
    [Id] ASC
  )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
  ) ON [PRIMARY]
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'自增主鍵' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_TEST', @level2type=N'COLUMN',@level2name=N'Id'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'DID' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_TEST', @level2type=N'COLUMN',@level2name=N'DID'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'UNAME' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_TEST', @level2type=N'COLUMN',@level2name=N'UNAME'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'UNAME2' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_TEST', @level2type=N'COLUMN',@level2name=N'UNAME2'
  GO

  insert [dbo].[H_TEST](DID,UNAME,UNAME2) VALUES(1,'HI','HI2');
  insert [dbo].[H_TEST](DID,UNAME,UNAME2) VALUES(2,'HISQL','HISQL2');

2. 開啟兩個查詢視窗

在視窗1中複製以下程式碼

  begin tran 
  update dbo.H_TEST 
  set UNAME='d1' 
  where dID=1 
  waitfor delay '00:00:10' 
  update H_TEST 
  set UNAME='d2' 
  where dID=2
  commit tran 

在視窗2中複製以下程式碼

  begin tran 
  update H_TEST 
  set UNAME='d2' 
  where dID=2
  waitfor delay '00:00:10' 


  update dbo.H_TEST 
  set UNAME='d1' 
  where dID=1
  commit tran 

3. 執行程式碼

同時執行視窗1和視窗2的程式碼,在等待一段時間後你就可以看到以下錯誤如下所示

死鎖圖

通過以上的測試就還原了產生死鎖的過程,剛才的測試表H_Test中只有兩條資料,其實產生死鎖與資料大小沒有很大的關係,其實與整個事務的執行長短有關係,兩個業務都在操作同一條資料,且一個事務中包含非常複雜的處理邏輯且執行時間比較長那麼在並行或相對較多的業務操作時就會產生死鎖。

那麼怎樣解決死鎖?

1. 減少事務的執行時間。

優化程式碼將不需要包在事務的邏輯分離出來以減少鎖的佔用時間.可以減少一部分的死鎖,但在高並行操作時依然會產生死鎖

2. 業務鎖

日常我們用到的鎖都是高度依賴於資料來鎖定來保證資料的原子性問題,但這樣有一個很大的BUG就是對資料庫的效能壓力非常大,在出現高並行時可能應用扛得住資料庫扛不住的情況

下面介紹的就是基於HiSql 的業務鎖機制解決死鎖問題,我們模擬一種場景 扣減庫存並生成訂單那麼我們模擬建立兩張表 庫存表H_Stock 及訂單表H_Order 表建立的sql如下

HiSql怎樣使用 請參照hisql快速上手

庫存表sql程式碼


  CREATE TABLE [dbo].[H_Stock](
    [Batch] [varchar](20) NOT NULL,
    [Material] [varchar](20) NOT NULL,
    [Location] [varchar](5) NULL,
    [st_kc] [decimal](18, 2) NULL,
    [CreateTime] [datetime] NULL,
    [CreateName] [nvarchar](50) NULL,
    [ModiTime] [datetime] NULL,
    [ModiName] [nvarchar](50) NULL,
  CONSTRAINT [PK_H_Stock] PRIMARY KEY CLUSTERED 
  (
    [Batch] ASC,
    [Material] ASC
  )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
  ) ON [PRIMARY]
  GO

  ALTER TABLE [dbo].[H_Stock] ADD  CONSTRAINT [DF_H_Stock_st_kc]  DEFAULT ((0)) FOR [st_kc]
  GO

  ALTER TABLE [dbo].[H_Stock] ADD  CONSTRAINT [DF_H_Stock_CreateTime]  DEFAULT (getdate()) FOR [CreateTime]
  GO

  ALTER TABLE [dbo].[H_Stock] ADD  CONSTRAINT [DF_H_Stock_CreateName]  DEFAULT ('') FOR [CreateName]
  GO

  ALTER TABLE [dbo].[H_Stock] ADD  CONSTRAINT [DF_H_Stock_ModiTime]  DEFAULT (getdate()) FOR [ModiTime]
  GO

  ALTER TABLE [dbo].[H_Stock] ADD  CONSTRAINT [DF_H_Stock_ModiName]  DEFAULT ('') FOR [ModiName]
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'批次號' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Stock', @level2type=N'COLUMN',@level2name=N'Batch'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'款號' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Stock', @level2type=N'COLUMN',@level2name=N'Material'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'庫位' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Stock', @level2type=N'COLUMN',@level2name=N'Location'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'庫存數' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Stock', @level2type=N'COLUMN',@level2name=N'st_kc'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'建立時間' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Stock', @level2type=N'COLUMN',@level2name=N'CreateTime'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'建立人' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Stock', @level2type=N'COLUMN',@level2name=N'CreateName'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'修改時間' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Stock', @level2type=N'COLUMN',@level2name=N'ModiTime'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'修改人' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Stock', @level2type=N'COLUMN',@level2name=N'ModiName'
  GO


訂單表sql


  CREATE TABLE [dbo].[H_Order](
    [OrderId] [bigint] NOT NULL,
    [Batch] [varchar](20) NOT NULL,
    [Material] [varchar](20) NOT NULL,
    [Shop] [varchar](5) NULL,
    [Location] [varchar](5) NULL,
    [SalesNum] [decimal](18, 2) NULL,
    [CreateTime] [datetime] NULL,
    [CreateName] [nvarchar](50) NULL,
    [ModiTime] [datetime] NULL,
    [ModiName] [nvarchar](50) NULL,
  CONSTRAINT [PK_H_Order] PRIMARY KEY CLUSTERED 
  (
    [OrderId] ASC,
    [Batch] ASC,
    [Material] ASC
  )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
  ) ON [PRIMARY]
  GO

  ALTER TABLE [dbo].[H_Order] ADD  CONSTRAINT [DF_H_Order_SalesNum]  DEFAULT ((0)) FOR [SalesNum]
  GO

  ALTER TABLE [dbo].[H_Order] ADD  CONSTRAINT [DF_H_Order_CreateTime]  DEFAULT (getdate()) FOR [CreateTime]
  GO

  ALTER TABLE [dbo].[H_Order] ADD  CONSTRAINT [DF_H_Order_CreateName]  DEFAULT ('') FOR [CreateName]
  GO

  ALTER TABLE [dbo].[H_Order] ADD  CONSTRAINT [DF_H_Order_ModiTime]  DEFAULT (getdate()) FOR [ModiTime]
  GO

  ALTER TABLE [dbo].[H_Order] ADD  CONSTRAINT [DF_H_Order_ModiName]  DEFAULT ('') FOR [ModiName]
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'批次號' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Order', @level2type=N'COLUMN',@level2name=N'Batch'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'款號' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Order', @level2type=N'COLUMN',@level2name=N'Material'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'門店' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Order', @level2type=N'COLUMN',@level2name=N'Shop'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'出庫庫位' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Order', @level2type=N'COLUMN',@level2name=N'Location'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'銷售數量' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Order', @level2type=N'COLUMN',@level2name=N'SalesNum'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'建立時間' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Order', @level2type=N'COLUMN',@level2name=N'CreateTime'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'建立人' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Order', @level2type=N'COLUMN',@level2name=N'CreateName'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'修改時間' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Order', @level2type=N'COLUMN',@level2name=N'ModiTime'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'修改人' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Order', @level2type=N'COLUMN',@level2name=N'ModiName'
  GO



測試場景

開啟多個執行緒隨機產生不同的訂單(一個訂單中有不同批次和數量)直至庫存扣減完成並檢測是否有鎖產生,且庫存有沒有少扣和超扣,如果達到這兩個目標說明測試是成功的

c# 程式碼

class Program
  {
    static void Main(string[] args)
    {
        Console.WriteLine("測試!");
        StockThread();
        var s = Console.ReadLine();
    }

    static void StockThread()
    {

        //如果有安裝redis可以啟用以下測試一下
        //HiSql.Global.RedisOn = true;//開啟redis快取
        //HiSql.Global.RedisOptions = new RedisOptions { Host = "172.16.80.178", PassWord = "pwd123", Port = 6379, CacheRegion = "TST", Database = 0 };
        HiSqlClient sqlClient = Demo_Init.GetSqlClient();

        //清除庫存表和訂單表資料
        sqlClient.CodeFirst.Truncate("H_Stock");
        sqlClient.CodeFirst.Truncate("H_Order");

        //初始化庫存資料
        sqlClient.Modi("H_Stock", new List<object> {
            new { Batch="9000112112",Material="ST0021",Location="A001",st_kc=5000},
            new { Batch="8000252241",Material="ST0080",Location="A001",st_kc=1000},
            new { Batch="7000252241",Material="ST0026",Location="A001",st_kc=1500}

        }).ExecCommand();

        //第一種場景 一個訂單中只有一個批次
        string[] grp_arr1 = new string[] { "9000112112" };

        //第二種場景 一個訂單中有兩個批次
        string[] grp_arr2 = new string[] {  "8000252241" , "9000112112"   };

        //第三中場景一個訂單中有三個批次
        string[] grp_arr3 = new string[] { "8000252241", "9000112112", "7000252241" };


        Random random = new Random();

        HiSqlClient _sqlClient = Demo_Init.GetSqlClient();


        //表結構快取預熱
        var _dt1= _sqlClient.HiSql("select * from H_Order").Take(1).Skip(1).ToTable();
        var _dt2 = _sqlClient.HiSql("select * from H_Stock").Take(1).Skip(1).ToTable();


        //開啟10個執行緒執行
        Parallel.For(0, 10, (index, y) => {

            
          int grpidx = index % 3;

          string[] grparr = null;
          if (grpidx == 0)
              grparr = grp_arr1;
          else if (grpidx == 1)
              grparr = grp_arr2;
          else
              grparr = grp_arr3;

          //Thread.Sleep(random.Next(10) * 200);

          Console.WriteLine($" {index}執行緒Id:{Thread.CurrentThread.ManagedThreadId}\t{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")}");
          //執行訂單建立
          var rtn = CreateSale(grparr);

          Console.WriteLine(rtn.Item2);

        });
    }




    static Tuple<bool, string> CreateSale(string[] grparr)
    {
        Random random = new Random();
        HiSqlClient _sqlClient = Demo_Init.GetSqlClient();
        bool _flag = true;

        Tuple<bool, string> rtn = new Tuple<bool, string>(true, "執行");

        //指定雪花ID使用的引擎 (可以不指定)
        HiSql.Snowflake.SnowType = SnowType.IdWorker;
        //產生一個唯一的訂單號
        Int64 orderid = HiSql.Snowflake.NextId();

        //加鎖並執行 將一個訂單的批次都加鎖防止同一時間被其它業務修改
        var _rtn = HiSql.Lock.LockOnExecute(grparr, () =>
        {

            //能執行到此說明已經加鎖成功(注:非資料庫級加鎖)

            DataTable dt = _sqlClient.HiSql($"select Batch,Material,Location,st_kc from H_Stock where  Batch in ({grparr.ToSqlIn()}) and st_kc>0").ToTable();

            if (dt.Rows.Count > 0)
            {
                List<object> lstorder = new List<object>();
                
                Console.WriteLine($"雪花ID{orderid}");
                string _shop = "4301";//門店編號
                _sqlClient.BeginTran();
                foreach (string n in grparr)
                {
                    int s = random.Next(1,10);
                    int v = _sqlClient.Update("H_Stock", new { st_kc = $"`st_kc`-{s}" }).Where($"Batch='{n}' and st_kc>={s}").ExecCommand();
                    if (v == 0)
                    {
                        _flag = false;
                        Console.WriteLine($"批次:[{n}]扣減[{s}]失敗");
                        rtn = new Tuple<bool, string>(false, $"批次:[{n}]庫存已經不足");
                        _sqlClient.RollBackTran();
                        break;
                    }
                    else
                    {
                        DataRow _drow = dt.AsEnumerable().Where(s => s.Field<string>("Batch").Equals(n)).FirstOrDefault();
                        if (_drow != null)
                        {
                            lstorder.Add(
                                new
                                {
                                    OrderId = orderid,
                                    Batch = _drow["Batch"].ToString(),
                                    Material = _drow["Material"].ToString(),
                                    Shop = _shop,
                                    Location = _drow["Location"].ToString(),
                                    SalesNum = s,
                                }

                                );


                        }
                        else
                        {
                            _flag = false;
                            Console.WriteLine($"批次:[{n}]扣減[{s}]失敗 未找到庫存");
                            _sqlClient.RollBackTran();
                            break;

                        }

                    }
                }
                if (_flag)
                {
                    //生成訂單
                    if (lstorder.Count > 0)
                        _sqlClient.Insert("H_Order", lstorder).ExecCommand();
                    _sqlClient.CommitTran();
                }
            }
            else
            {
                Console.WriteLine($"庫存不足...");
                rtn = new Tuple<bool, string>(false, "庫存已經不足");
            }



        }, new LckInfo
        {
            UName = "tanar",
            Ip = "127.0.0.1"


        }, 20, 10);//加鎖超時時間設定
        _sqlClient.Close();

        Console.WriteLine(_rtn.Item2);

        //可以註釋執行緒等待
        //Thread.Sleep(random.Next(1,10)*100);


        if (rtn.Item1)
            return CreateSale(grparr);
        else
            return rtn;


    }
  }

資料庫連線設定

  internal class Demo_Init
  {
      public static HiSqlClient GetSqlClient()
      {


          HiSqlClient sqlclient = new HiSqlClient(
                    new ConnectionConfig()
                    {
                        DbType = DBType.SqlServer,
                        DbServer = "local-HoneBI",
                        ConnectionString = "server=(local);uid=sa;pwd=Hone@123;database=HiSql;Encrypt=True; TrustServerCertificate=True;",//; MultipleActiveResultSets = true;
                        User = "tansar",//可以指定登陸使用者的帳號
                        
                        Schema = "dbo",
                        IsEncrypt = true,
                        IsAutoClose = true,
                        SqlExecTimeOut = 60000,
                        AppEvents = new AopEvent()
                        {
                            OnDbDecryptEvent = (connstr) =>
                            {
                                //解密連線欄位
                                //Console.WriteLine($"資料庫連線:{connstr}");
                                return connstr;
                            },
                            OnLogSqlExecuting = (sql, param) =>
                            {
                                //sql執行前 紀錄檔記錄 (非同步)

                                //Console.WriteLine($"sql執行前記錄{sql} time:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss ffff")}");
                            },
                            OnLogSqlExecuted = (sql, param) =>
                            {
                                //sql執行後 紀錄檔記錄 (非同步)
                                //Console.WriteLine($"sql執行後記錄{sql} time:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss ffff")}");
                            },
                            OnSqlError = (sqlEx) =>
                            {
                                //sql執行錯誤後 紀錄檔記錄 (非同步)
                                Console.WriteLine(sqlEx.Message.ToString());
                            },
                            OnTimeOut = (int timer) =>
                            {
                                //Console.WriteLine($"執行SQL語句超過[{timer.ToString()}]毫秒...");
                            }
                        }
                    }
                    );


          //sqlclient.CodeFirst.InstallHisql();

          return sqlclient;
      }
  }

通過查詢庫存和訂單資訊核對庫存是否扣減正常

select * from H_Stock 
select batch,sum(salesnum) as salesnum from H_Order group by batch
select orderid,sum(salesnum) as salesnum from H_Order group by orderid
select * from H_Order

核驗結果

核驗明細

通過測試過程可以發現 不會產生死鎖也不會造成庫存扣減異常保證了資料的一致性