gorm操作sqlite3,高並行讀寫如何避免鎖庫?

2023-03-09 12:01:10

1. 場景

這兩天一直被這個sqlit3困擾,起因是專案中需要有這樣一箇中間,中介軟體承擔著API角色和流量轉發的角色,需要接收來自至少300個agent的請求資料,和健康檢測的請求。 所以當即想到用go來實現,因為資料教訓,不考慮使用pg大型資料庫,所以就選擇了輕量化的sqlite資料庫。程式很快就開發完了。上線,執行幾個節點,資料讀寫都未發生異常,但是當測試資料到達一定量級後,會出現database is locked錯誤。 查了些資料,大意是sqlite並行讀支援不錯,但是並行寫就不太友好,所以有了此次的實踐。

ps: 部分程式碼來自於chatGPT,不得不說chatGPT太香了。

在 Gorm 中操作 SQLite3資料庫時,由於 SQLite3 的寫鎖機制是針對整個資料庫而不是單個表或行,因此高並行讀寫可能會導致鎖庫的情況。


2. 如何避免

為了避免鎖庫問題,可以採用以下幾種方法:

  1. 使用 WAL 模式

    使用 SQLite3 的 WAL(Write-Ahead Logging)模式可以顯著降低鎖庫的概率。在 WAL 模式下,讀操作不會阻塞寫操作,寫操作也不會阻塞讀操作,因此可以實現高並行的讀寫操作。

    可以在 Gorm 中使用以下程式碼開啟 WAL 模式:

    import "gorm.io/driver/sqlite"
     db, err := gorm.Open(sqlite.Open("test.db"), &gorm.Config{
         DSN: "mode=wal",
     })
     
     // 上面這種引數設定方式已經不適用新的設定方法如下
     if Inst, err = gorm.Open(sqlite.Open(dsn), gormConfig); err == nil {
                 // 啟用 WAL 模式
                 _ = Inst.Exec("PRAGMA journal_mode=WAL;")
                 //_ = Inst.Exec("PRAGMA journal_size_limit=104857600;")
               //_ = Inst.Exec("PRAGMA busy_timeout=999999;")
     }
    
  2. 合理控制事務範圍

    在進行高並行讀寫操作時,需要注意事務範圍的控制,儘可能縮小事務的範圍,減少寫鎖的佔用時間。例如,在進行批次寫入操作時,可以將每次寫入拆分為多個事務,以減少寫鎖的佔用時間。

  3. 使用快取

    使用快取可以減少對資料庫的讀操作,從而減少鎖庫的概率。可以使用第三方快取庫(如 Redis)來實現快取功能。

  4. 增加資料庫連線數

    增加資料庫連線數可以提高資料庫的並行處理能力,減少鎖庫的概率。可以在 Gorm 中使用以下程式碼來增加資料庫連線數:

    import "gorm.io/driver/sqlite"
    
    db, err := gorm.Open(sqlite.Open("test.db"), &gorm.Config{})
    
    sqlDB, dbError := db.DB()
    if dbError != nil {
        return nil, fmt.Errorf("failed to create sqlDB")
    }
    
    // SetMaxIdleConns 設定空閒連線池中連線的最大數量
    sqlDB.SetMaxIdleConns(10)
    
    // SetMaxOpenConns 設定開啟資料庫連線的最大數量。
    sqlDB.SetMaxOpenConns(100)'
    

    需要注意的是,增加連線數也會增加伺服器的負載,因此需要根據實際情況進行調整。

    綜上所述,通過採用合適的鎖機制、事務控制、快取和連線數設定等措施,可以有效避免 SQLite3 資料庫的鎖庫問題。


3. 完整的程式碼範例

  1. 範例1:

    下面是一個完整的 Gorm 操作 SQLite3 資料庫的程式碼範例,其中包括開啟 WAL 模式、控制事務範圍、使用快取和增加資料庫連線數等措施,以避免鎖庫問題。

    import (
        "gorm.io/driver/sqlite"
        "gorm.io/gorm"
        "time"
    )
    
    // 定義模型結構體
    type User struct {
        ID        uint
        Name      string
        Age       uint8
        CreatedAt time.Time
        UpdatedAt time.Time
    }
    
    // 初始化資料庫連線
    func InitDB() (*gorm.DB, error) {
        db, err := gorm.Open(sqlite.Open("test.db"), &gorm.Config{
            // 開啟 WAL 模式
            DSN: "mode=wal",
            // 增加最大連線數為 100
            MaxOpenConns: 100,
        })
        if err != nil {
            return nil, err
        }
        // 設定資料庫連線池引數
        sqlDB, err := db.DB()
        if err != nil {
            return nil, err
        }
        sqlDB.SetMaxIdleConns(10)
        sqlDB.SetMaxOpenConns(100)
        sqlDB.SetConnMaxLifetime(time.Hour)
    
        return db, nil
    }
    
    // 定義批次寫入函數
    func BatchInsertUsers(db *gorm.DB, users []User) error {
        // 每次寫入 1000 條資料
        batchSize := 1000
        batchCount := (len(users) + batchSize - 1) / batchSize
        for i := 0; i < batchCount; i++ {
            start := i * batchSize
            end := (i + 1) * batchSize
            if end > len(users) {
                end = len(users)
            }
            batch := users[start:end]
            // 啟用事務
            tx := db.Begin()
            if err := tx.Error; err != nil {
                return err
            }
            if err := tx.Create(&batch).Error; err != nil {
                tx.Rollback()
                return err
            }
            // 提交事務
            if err := tx.Commit().Error; err != nil {
                return err
            }
        }
        return nil
    }
    
    // 查詢使用者資訊
    func GetUsers(db *gorm.DB) ([]User, error) {
        var users []User
        // 使用快取,減少對資料庫的讀操作
        err := db.Cache(&users).Find(&users).Error
        if err != nil {
            return nil, err
        }
        return users, nil
    }
    
    // 範例程式碼
    func main() {
        // 初始化資料庫連線
        db, err := InitDB()
        if err != nil {
            panic(err)
        }
        defer db.Close()
    
        // 批次插入資料
        users := []User{}
        for i := 0; i < 100000; i++ {
            user := User{
                Name:      "user_" + string(i),
                Age:       uint8(i % 100),
                CreatedAt: time.Now(),
                UpdatedAt: time.Now(),
            }
            users = append(users, user)
        }
        err = BatchInsertUsers(db, users)
        if err != nil {
            panic(err)
        }
    
        // 查詢資料
        users, err = GetUsers(db)
        if err != nil {
            panic(err)
        }
        for _, user := range users {
            fmt.Println(user)
        }
    }
    
  2. 範例2:使用 WAL 模式和事務控制來避免鎖庫問題:

    package main
    
    import (
        "fmt"
        "gorm.io/driver/sqlite"
        "gorm.io/gorm"
    )
    
    type User struct {
        ID   uint
        Name string
    }
    
    func main() {
        // 建立 SQLite3 資料庫連線
        db, err := gorm.Open(sqlite.Open("test.db"), &gorm.Config{
            // 開啟 WAL 模式
            DSN: "mode=wal",
        })
        if err != nil {
            panic("failed to connect database")
        }
        // 設定連線池大小
        sqlDB, err := db.DB()
        if err != nil {
            panic("failed to set database pool size")
        }
        sqlDB.SetMaxIdleConns(10)
        sqlDB.SetMaxOpenConns(100)
    
        // 自動遷移 User 模型對應的表
        err = db.AutoMigrate(&User{})
        if err != nil {
            panic("failed to migrate table")
        }
    
        // 並行寫入 1000 條資料
        for i := 0; i < 1000; i++ {
            go func(i int) {
                err := db.Transaction(func(tx *gorm.DB) error {
                    user := User{Name: fmt.Sprintf("user_%d", i)}
                    result := tx.Create(&user)
                    return result.Error
                })
                if err != nil {
                    fmt.Printf("failed to write data: %v\n", err)
                }
            }(i)
        }
    
        // 並行讀取資料
        for i := 0; i < 1000; i++ {
            go func() {
                var users []User
                err := db.Transaction(func(tx *gorm.DB) error {
                    result := tx.Find(&users)
                    return result.Error
                })
                if err != nil {
                    fmt.Printf("failed to read data: %v\n", err)
                } else {
                    fmt.Printf("read %d records\n", len(users))
                }
            }()
        }
    
        // 等待 10 秒鐘,以便所有的寫入和讀取操作都完成
        time.Sleep(10 * time.Second)
    }   
    

    在這個程式碼範例中,我們首先使用 gorm.Open 函數建立了一個 SQLite3 資料庫連線,並設定了連線池大小和 WAL 模式。然後,我們使用 d b.AutoMigrate 函數自動遷移 User 模型對應的表。

    接著,我們在迴圈中並行地寫入 1000 條資料,並使用事務控制來控制事務的範圍。每個寫入操作都會建立一個 User 物件,並使用 tx.Create 函數將其寫入資料庫。

    然後,我們在另一個迴圈中並行地讀取資料,並使用事務控制來控制事務的範圍。每個讀取操作都會使用 tx.Find 函數從資料庫中讀取所有的 User 記錄,並列印出讀取的記錄數。

    最後,我們等待 10 秒鐘,以便所有的寫入和讀取操作都完成。在這個範例中,我們使用了並行的寫入和讀取