前言:本文將介紹常用後臺功能中的資料獲取以及下載的一些注意事項和實現。
承接上文資料分頁查詢
當通過分頁查詢到資料之後,接著還會遇到其他需求:
比如根據資料庫查詢出來商品的id,商品名等主要資訊。但是要去查詢商品的購買量以及原始售價,這些與商品基本資訊不太相關的資料資訊,通常考慮到業務資料量,系統中已經進行了分庫或者分表,甚至一些成交資料是維護在數倉等其他業務服務中。
商品id | 商品名 | 商品主圖 | 商品售價 | 商品購買量 | 商品售價 | .....
這個時候如果順序的再去執行第三方服務和去查其他庫,顯然會導致請求的時延很大。既然商品基本資訊已經批次查詢出來了,並且購買量和原始售價分數不同的資料儲存,就可以開執行緒或者協程去一塊查詢其他資料,最後合併彙總。或者本身第一次查詢的資料量,這個時候可以分批次的去查第三方服務資料。
因此分為幾種情況
下面來看下對於不同的情況的做法以及go語言實現和一些注意事項。
首先為了通用性,對sync.WaitGroup做一下簡單封裝,方便協程使用。
type waitGroup struct {
wg *sync.WaitGroup
}
func WaitGroup() *waitGroup {
return &waitGroup{&sync.WaitGroup{}}
}
func (wg *waitGroup) NewGoroutineStart(f func()) {
wg.wg.Add(1)
go func() {
defer func() {
if err := recover(); err != nil {
// 為了保證系統安全通常要做異常捕獲,保證goroutine是有recover
fmt.Println("紀錄檔列印Goroutine處理失敗")
}
wg.wg.Done()
}()
f()
}()
}
func (wg *waitGroup) Wait() {
wg.wg.Wait()
}
這裡通常會對協程呼叫中可能出現的panic,進行異常捕獲recover,為了保證系統的安全執行。
type Product struct {
ID int
Name string
}
type ProductRes struct {
ID int
Name string
Order int
Price int
}
同時為了更好的展示效果,定義了一個商品類Product,和一個商品ProductRes結果類用於整合資料。
用分頁查詢出來的資料整批再去其他多個資料來源分別讀取資料,比如用商品的id,去獲取商品的成交量order以及商品的售價。
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// 商品類定義 與 waitgroup定義省略
func main() {
var data []*Product
// 模擬從分頁查詢出來的商品
for i := 1; i <= 20; i++ {
data = append(data, &Product{
i,
fmt.Sprintf("%d th-Product", i),
})
}
// 用於儲存商品id和商品單價以及商品訂單關係
productPriceMp := make(map[int]int, 5)
productOrderMp := make(map[int]int, 5)
wg := WaitGroup()
wg.NewGoroutineStart(func() {
//模擬讀取資料來源1資料
fmt.Println("開始讀取資料來源1資料")
time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
for i := 0; i < len(data); i++ {
productPriceMp[data[i].ID] = rand.Intn(100)
}
fmt.Println("資料來源1資料獲取成功")
})
wg.NewGoroutineStart(func() {
//模擬讀取資料來源2資料
fmt.Println("開始讀取資料來源2資料")
time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
for i := 0; i < len(data); i++ {
productOrderMp[data[i].ID] = rand.Intn(100)
}
fmt.Println("資料來源2資料獲取成功")
})
// 阻塞等待
wg.Wait()
fmt.Println("資料同步讀取完成,開始合併")
var res []*ProductRes
for _, product := range data {
res = append(res, &ProductRes{
product.ID,
product.Name,
productOrderMp[product.ID],
productPriceMp[product.ID],
})
}
// 展示商品資料
for _, productRes := range res {
fmt.Println(productRes)
}
}
說明:
來看下實際的執行結果
可以看到同時從兩個資料來源獲取資料,並且整合成功,列印出來。
以上是所有分頁查詢得到的資料去不同資料來源二次獲取資料,下面這種情況更加常見,對於第一次分頁查詢的資料很多,要分批再去做其他資料來源查詢。因此就涉及到多個協程之間去讀取統一切片,也就是並行資料讀取問題。首先來看個錯誤demo。
func main() {
var data []*Product
for i := 1; i <= 1000; i++ {
data = append(data, &Product{
i,
fmt.Sprintf("%d th-Product", i),
})
}
wg := WaitGroup()
var newData []*Product
var startInd, endInd = 0, 0
for i := 0; i < len(data); i += 100 {
if i+100 < len(data) {
startInd, endInd = i, i+100
} else {
startInd, endInd = i, len(data)
}
newData = data[startInd:endInd]
wg.NewGoroutineStart(func() {
fmt.Println("開始分批去其他資料來源獲取")
time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
fmt.Println(newData[0])
})
}
// 阻塞等待
wg.Wait()
}
說明:
來猜一下紀錄檔會輸出什麼
問題:從紀錄檔列印可以看出,雖然開啟了10個go程,並且按照邏輯應該是每個go分別用的是從1-100,101-200.....分批次的資料,而實際是所有的go程式用的都是同一批資料也就是最後一批從901下標開始的資料。也就是說出現了多個執行單元使用的都是同一批資料。
原因就在於:newData作為一個切片是參照型別,而由於newData但是預先定義在for迴圈之外的,所有的goroutine用的同一個newData指標,也因此都是同一批資料。
鑑於上述原因,可以在每次迴圈內部重新定義一個新的newData作為協程的傳入,這樣就不會有資料衝突的問題發生。作如下修改
wg := WaitGroup()
var startInd, endInd = 0, 0
for i := 0; i < len(data); i += 100 {
if i+100 < len(data) {
startInd, endInd = i, i+100
} else {
startInd, endInd = i, len(data)
}
// 新定義newData作為部分資料切片,傳入goroutine
newData := data[startInd:endInd]
wg.NewGoroutineStart(func() {
fmt.Println("開始分批去其他資料來源獲取")
time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
fmt.Println(newData[0])
})
}
以上是基於已經封裝好的waitgroup的使用,因為封裝的時候是無引數的,如果使用go func的方式開啟,也可傳入引數避免以上問題。
接下來再來看一個demo,會有類似的問題。
for i := 0; i <= 9; i++ {
go func() {
time.Sleep(3 * time.Second)
fmt.Println("i =", i)
}()
}
time.Sleep(5 * time.Second)
說明:
來看下結果
結果列印所有的i值都為10
原因是:同上面的多協程讀取統一資料類似的,協程內讀取到的是變數的指標,也就是統一值,而最後i的值變為10for迴圈才會結束,因此所有的協程內部紀錄檔列印的也都是10。
2.2中實現了巨量資料量中不同批次的資料二次查詢,如果有一個全域性指標,比如商品的總成交量,而每個商品的成交量是開啟多個goroutine二次查詢的,二次查詢之後還要做規則計算。也就是涉及到並行資料寫入的問題。
func main() {
var data []*Product
for i := 1; i <= 1000; i++ {
data = append(data, &Product{
i,
fmt.Sprintf("%d th-Product", i),
})
}
wg := WaitGroup()
dataLock := sync.Mutex{}
var totalOrder int
var startInd, endInd = 0, 0
for i := 0; i < len(data); i += 100 {
if i+100 < len(data) {
startInd, endInd = i, i+100
} else {
startInd, endInd = i, len(data)
}
newData := data[startInd:endInd]
wg.NewGoroutineStart(func() {
fmt.Println("開始分批去其他資料來源獲取")
time.Sleep(time.Second * time.Duration(rand.Intn(3)))
var newDataOrder []int
for _, pro := range newData {
newDataOrder = append(newDataOrder, rand.Intn(10)*(pro.ID+1))
}
dataLock.Lock()
defer dataLock.Unlock()
for _, order := range newDataOrder {
totalOrder += order
}
})
}
// 阻塞等待
wg.Wait()
fmt.Println("總成交量:", totalOrder)
}
說明:
執行結果如下
雖然從底表分頁查詢,並且多資料來源也合併到資料,有時候對於一些後臺運營,是希望將這些資料以表格的形式匯出。
這裡針對excel表格下載,可以使用開源工具
https://github.com/360EntSecGroup-Skylar/excelize
基本的包括
伺服器端資料以表格格式返回二進位制格式表格資料,如果資料量很大的話,肯定需要做一些限制。
但是兩種限制條件都不夠靈活,根據日期和記錄數做限制無法適用於所有的使用者,對於伺服器端我們希望的是對下載頻率的限制而不僅僅是對下載資料量的限制。通用的方法可以在後臺設定一把鎖,這個鎖是有失效時間,比如3秒,每傳送一次下載請求就新增一把鎖,使得規定時間限制之內的請求都無法獲取到鎖,也因此無法下載。
這個鎖可以用redis中的setnx實現,setnx的意思就是指定的 key 不存在時,為 key 設定指定的值。
邏輯可以寫成如下
lock := redis.setnx(key, value)
if !lock {
// 上鎖失敗,流程終止
return
}
// 設定失效時間
redis.expire(key, 3)
// 以下下載流程
download
說明
以上邏輯可以完成功能,但是在極端情況下確實可能出現問題,就是setnx獲取鎖和expire不是原子性操作,假設有一極端情況,一個請求發進來setnx獲取到鎖,還沒來得及執行expire設定鎖的過期時間,服務就宕機了,那是不是鎖永遠不會到失效時間而永遠存在?使用者也就無法再進行下載,這個問題可以使用set命令解決,我們先來看一下這個命令的語法
SET key value [EX seconds] [PX milliseconds] [NX|XX]
從 Redis 2.6.12 版本開始, SET 命令的行為可以通過一系列引數來修改:
那麼在邏輯中,用set代替setnx如下
lock := redis.set(key,value,"NX","EX",3)
if !lock {
// 上鎖失敗,流程終止
return
}
// 以下為下載流程
download
也就是當redis中key不存在的時候,才能設定鎖成功,如果設定成功,則設定失效時間為3s。
問題:有的場景下set可以作為分散式鎖實現,多使用者之間共用資源的問題,那麼這裡怎麼解決鎖被誤刪問題?
答:這裡對於下載請求的時候,首先是隻對登入過的使用者開放,設定鎖的時候是根據使用者的id進行設定的key,不同的使用者設定的也就是不同的key,因此不存在誤刪的問題。