場景之多資料來源查詢及資料下載問題

2022-07-31 06:01:18

前言:本文將介紹常用後臺功能中的資料獲取以及下載的一些注意事項和實現。

承接上文資料分頁查詢
當通過分頁查詢到資料之後,接著還會遇到其他需求:

  • 繼續其他資料來源查詢:分頁查詢到的資料並非全部需要的資料,這個時候主要欄位查出來了,需要去其他表或者其他服務呼叫再去獲取資訊。
  • 資料獲取整合之後進行下載

一、繼續查詢

1、需求

比如根據資料庫查詢出來商品的id,商品名等主要資訊。但是要去查詢商品的購買量以及原始售價,這些與商品基本資訊不太相關的資料資訊,通常考慮到業務資料量,系統中已經進行了分庫或者分表,甚至一些成交資料是維護在數倉等其他業務服務中。

商品id | 商品名 | 商品主圖 | 商品售價 | 商品購買量 | 商品售價 | .....

這個時候如果順序的再去執行第三方服務和去查其他庫,顯然會導致請求的時延很大。既然商品基本資訊已經批次查詢出來了,並且購買量和原始售價分數不同的資料儲存,就可以開執行緒或者協程去一塊查詢其他資料,最後合併彙總。或者本身第一次查詢的資料量,這個時候可以分批次的去查第三方服務資料。
因此分為幾種情況

  • (1)同一批資料再去分別查詢不同的資料來源
  • (2)去查詢相同的資料來源,但是原始資料太大,需要分批查詢
  • (3)對於從不同資料來源二次查詢到的資料,需要作進一步規則計算

下面來看下對於不同的情況的做法以及go語言實現和一些注意事項。

2、實現

2.1 封裝waitgroup和業務物件

首先為了通用性,對sync.WaitGroup做一下簡單封裝,方便協程使用。

type waitGroup struct {
	wg *sync.WaitGroup
}

func WaitGroup() *waitGroup {
	return &waitGroup{&sync.WaitGroup{}}
}

func (wg *waitGroup) NewGoroutineStart(f func()) {
	wg.wg.Add(1)
	go func() {
		defer func() {
			if err := recover(); err != nil {
                		// 為了保證系統安全通常要做異常捕獲,保證goroutine是有recover
				fmt.Println("紀錄檔列印Goroutine處理失敗")
			}
			wg.wg.Done()
		}()
		f()
	}()
}

func (wg *waitGroup) Wait() {
	wg.wg.Wait()
}

這裡通常會對協程呼叫中可能出現的panic,進行異常捕獲recover,為了保證系統的安全執行。

type Product struct {
	ID   int
	Name string
}

type ProductRes struct {
	ID    int
	Name  string
	Order int
	Price int
}

同時為了更好的展示效果,定義了一個商品類Product,和一個商品ProductRes結果類用於整合資料。

2.2 第一種情況

用分頁查詢出來的資料整批再去其他多個資料來源分別讀取資料,比如用商品的id,去獲取商品的成交量order以及商品的售價。

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// 商品類定義 與 waitgroup定義省略

func main() {

	var data []*Product
        // 模擬從分頁查詢出來的商品
	for i := 1; i <= 20; i++ {
		data = append(data, &Product{
			i,
			fmt.Sprintf("%d th-Product", i),
		})
	}

        // 用於儲存商品id和商品單價以及商品訂單關係
	productPriceMp := make(map[int]int, 5)
	productOrderMp := make(map[int]int, 5)

	wg := WaitGroup()
	wg.NewGoroutineStart(func() {
		//模擬讀取資料來源1資料
		fmt.Println("開始讀取資料來源1資料")
		time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
		for i := 0; i < len(data); i++ {
			productPriceMp[data[i].ID] = rand.Intn(100)
		}
                fmt.Println("資料來源1資料獲取成功")
	})

	wg.NewGoroutineStart(func() {
		//模擬讀取資料來源2資料
		fmt.Println("開始讀取資料來源2資料")
		time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
		for i := 0; i < len(data); i++ {
			productOrderMp[data[i].ID] = rand.Intn(100)
		}
                fmt.Println("資料來源2資料獲取成功")
	})
	// 阻塞等待
	wg.Wait()

	fmt.Println("資料同步讀取完成,開始合併")
	var res []*ProductRes
	for _, product := range data {
		res = append(res, &ProductRes{
			product.ID,
			product.Name,
			productOrderMp[product.ID],
			productPriceMp[product.ID],
		})
	}

        // 展示商品資料
	for _, productRes := range res {
		fmt.Println(productRes)
	}
}

說明:

  • 這裡首先模擬生成20條分頁查詢得到的商品記錄,存放在data切片中
  • 定義兩個map分別儲存商品與訂單,商品與售價之間的關係,每個map設定為10,是一個預估容量,一般為了防止map在實際增加資料的時候容量上成倍擴增,可以根據情況初始化一個比較小的值。
  • 然後用封裝好得waitgroup開啟兩個協程去不同的資料來源獲取資料,這裡我們用sleep以及rand來模擬
  • 最後將整合好的結果放在res切片中,一般到這裡在框架中的servers層,邏輯就處理完了,將res就可作為返回結果

來看下實際的執行結果

可以看到同時從兩個資料來源獲取資料,並且整合成功,列印出來。

2.2 第二種情況

以上是所有分頁查詢得到的資料去不同資料來源二次獲取資料,下面這種情況更加常見,對於第一次分頁查詢的資料很多,要分批再去做其他資料來源查詢。因此就涉及到多個協程之間去讀取統一切片,也就是並行資料讀取問題。首先來看個錯誤demo。

func main() {
	var data []*Product
	for i := 1; i <= 1000; i++ {
		data = append(data, &Product{
			i,
			fmt.Sprintf("%d th-Product", i),
		})
	}

	wg := WaitGroup()
	var newData []*Product
	var startInd, endInd = 0, 0
	for i := 0; i < len(data); i += 100 {
		if i+100 < len(data) {
			startInd, endInd = i, i+100
		} else {
			startInd, endInd = i, len(data)
		}
		newData = data[startInd:endInd]
		wg.NewGoroutineStart(func() {
			fmt.Println("開始分批去其他資料來源獲取")
			time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
			fmt.Println(newData[0])
		})
	}
	// 阻塞等待
	wg.Wait()
}

說明:

  • 用data切片模擬第一次查詢出來的很多資料記錄
  • 為了分批傳入go程,預先定義了一個newData切片,
  • 用封裝好得waitgroup開啟go程,每100條記錄,新開一個go程用time和rand模擬新資料來源獲取
  • fmt.PrintLn(newData[0])來作為紀錄檔,列印goroutine處理的資料

來猜一下紀錄檔會輸出什麼

問題:從紀錄檔列印可以看出,雖然開啟了10個go程,並且按照邏輯應該是每個go分別用的是從1-100,101-200.....分批次的資料,而實際是所有的go程式用的都是同一批資料也就是最後一批從901下標開始的資料。也就是說出現了多個執行單元使用的都是同一批資料。
原因就在於:newData作為一個切片是參照型別,而由於newData但是預先定義在for迴圈之外的,所有的goroutine用的同一個newData指標,也因此都是同一批資料。
鑑於上述原因,可以在每次迴圈內部重新定義一個新的newData作為協程的傳入,這樣就不會有資料衝突的問題發生。作如下修改

	wg := WaitGroup()
	var startInd, endInd = 0, 0
	for i := 0; i < len(data); i += 100 {
		if i+100 < len(data) {
			startInd, endInd = i, i+100
		} else {
			startInd, endInd = i, len(data)
		}
                // 新定義newData作為部分資料切片,傳入goroutine
		newData := data[startInd:endInd]
		wg.NewGoroutineStart(func() {
			fmt.Println("開始分批去其他資料來源獲取")
			time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
			fmt.Println(newData[0])
		})
	}

以上是基於已經封裝好的waitgroup的使用,因為封裝的時候是無引數的,如果使用go func的方式開啟,也可傳入引數避免以上問題。
接下來再來看一個demo,會有類似的問題。

	for i := 0; i <= 9; i++ {
		go func() {
			time.Sleep(3 * time.Second)
			fmt.Println("i =", i)
		}()
	}

	time.Sleep(5 * time.Second)

說明:

  • 連續開啟10個協程執行不同的任務,fmt.Println模擬紀錄檔,列印處理的資料
  • 用time.sleep阻塞等待所有的協程執行完畢

來看下結果

結果列印所有的i值都為10
原因是:同上面的多協程讀取統一資料類似的,協程內讀取到的是變數的指標,也就是統一值,而最後i的值變為10for迴圈才會結束,因此所有的協程內部紀錄檔列印的也都是10。

2.3 二次查詢資料重新計算

2.2中實現了巨量資料量中不同批次的資料二次查詢,如果有一個全域性指標,比如商品的總成交量,而每個商品的成交量是開啟多個goroutine二次查詢的,二次查詢之後還要做規則計算。也就是涉及到並行資料寫入的問題。

func main() {

	var data []*Product
	for i := 1; i <= 1000; i++ {
		data = append(data, &Product{
			i,
			fmt.Sprintf("%d th-Product", i),
		})
	}

	wg := WaitGroup()
	dataLock := sync.Mutex{}

	var totalOrder int
	var startInd, endInd = 0, 0
	for i := 0; i < len(data); i += 100 {
		if i+100 < len(data) {
			startInd, endInd = i, i+100
		} else {
			startInd, endInd = i, len(data)
		}
		newData := data[startInd:endInd]
		wg.NewGoroutineStart(func() {
			fmt.Println("開始分批去其他資料來源獲取")
			time.Sleep(time.Second * time.Duration(rand.Intn(3)))
			var newDataOrder []int
			for _, pro := range newData {
				newDataOrder = append(newDataOrder, rand.Intn(10)*(pro.ID+1))
			}

			dataLock.Lock()
			defer dataLock.Unlock()
			for _, order := range newDataOrder {
				totalOrder += order
			}
		})
	}
	// 阻塞等待
	wg.Wait()
	fmt.Println("總成交量:", totalOrder)
}

說明:

  • 用data切片模擬第一次查詢出來的很多資料記錄,總共1000條
  • 為了分批傳入go程,為了避免goroutine讀取資料問題,每個for迴圈開啟goroutine之前預先定義一個新的newData切片
  • 用封裝好得waitgroup開啟go程,為了保證資料同步的問題,預先互斥鎖mutex
  • 用time和rand模擬從其他資料來源獲取訂單資料
  • rand.Intn(10)*(pro.ID+1)表示用商品id+1乘以一個亂數模擬商品對應的訂單量
  • 用互斥鎖控制並行流程下資料totalOrder的寫入,並且使用defer保證了每次goroutine釋放鎖

執行結果如下

二、資料下載

雖然從底表分頁查詢,並且多資料來源也合併到資料,有時候對於一些後臺運營,是希望將這些資料以表格的形式匯出。

1、資料匯出實現

這裡針對excel表格下載,可以使用開源工具
https://github.com/360EntSecGroup-Skylar/excelize
基本的包括

  • excel格式定義
  • 設定表名
  • 設定第一行表頭
  • 資料寫入寫入檔案
    具體的步驟指南,不再做贅述,可以直接github檢視。

2、資料下載限制

伺服器端資料以表格格式返回二進位制格式表格資料,如果資料量很大的話,肯定需要做一些限制。

  • 根據日期限制最近幾個月生成的記錄,入參的時候做校驗。
  • 對於下載資料的記錄條數做限制。

但是兩種限制條件都不夠靈活,根據日期和記錄數做限制無法適用於所有的使用者,對於伺服器端我們希望的是對下載頻率的限制而不僅僅是對下載資料量的限制。通用的方法可以在後臺設定一把鎖,這個鎖是有失效時間,比如3秒,每傳送一次下載請求就新增一把鎖,使得規定時間限制之內的請求都無法獲取到鎖,也因此無法下載。

這個鎖可以用redis中的setnx實現,setnx的意思就是指定的 key 不存在時,為 key 設定指定的值。
邏輯可以寫成如下

lock := redis.setnx(key, value)
if !lock {
    // 上鎖失敗,流程終止
    return
}
// 設定失效時間
redis.expire(key, 3)
// 以下下載流程
download

說明

  • 首先用setnx設定一把鎖,key在業務中可以用專案名+使用者id來實現,比如"project-product-download:user_id",因為對於下載操作來說,肯定是隻暴露給已經認證登入的使用者,value的設定可以不做限制
  • 如果lock==1,說明設定成功,否則設定上鎖失敗,退出
  • 為了控制下載頻率,用expire對key設定有效時間為3s,3s之內進來的下載請求都會獲取鎖失敗。

以上邏輯可以完成功能,但是在極端情況下確實可能出現問題,就是setnx獲取鎖和expire不是原子性操作,假設有一極端情況,一個請求發進來setnx獲取到鎖,還沒來得及執行expire設定鎖的過期時間,服務就宕機了,那是不是鎖永遠不會到失效時間而永遠存在?使用者也就無法再進行下載,這個問題可以使用set命令解決,我們先來看一下這個命令的語法

SET key value [EX seconds] [PX milliseconds] [NX|XX]

從 Redis 2.6.12 版本開始, SET 命令的行為可以通過一系列引數來修改:

  • EX seconds : 將鍵的過期時間設定為 seconds 秒。 執行 SET key value EX seconds 的效果等同於執行 SETEX key seconds value 。
  • PX milliseconds : 將鍵的過期時間設定為 milliseconds 毫秒。 執行 SET key value PX milliseconds 的效果等同於執行 PSETEX key milliseconds value 。
  • NX : 只在鍵不存在時, 才對鍵進行設定操作。 執行 SET key value NX 的效果等同於執行 SETNX key value 。
  • XX : 只在鍵已經存在時, 才對鍵進行設定操作。

那麼在邏輯中,用set代替setnx如下

lock := redis.set(key,value,"NX","EX",3)
if !lock {
    // 上鎖失敗,流程終止
    return
}
// 以下為下載流程
download

也就是當redis中key不存在的時候,才能設定鎖成功,如果設定成功,則設定失效時間為3s。

問題:有的場景下set可以作為分散式鎖實現,多使用者之間共用資源的問題,那麼這裡怎麼解決鎖被誤刪問題?
答:這裡對於下載請求的時候,首先是隻對登入過的使用者開放,設定鎖的時候是根據使用者的id進行設定的key,不同的使用者設定的也就是不同的key,因此不存在誤刪的問題。

三、總結

  • 首先,在上篇文中通用的管理功能分頁查詢中,介紹了sql語句的實現和可能的查詢優化方案。
  • 在本文中,對分頁首次查詢獲取到資料之後,接著進行二次查詢的情況進行了介紹,涉及到了對鎖的使用和waitgroup的封裝,並行讀資料和並行寫資料的操作和一些注意事項。
  • 接著對於資料整合之後的資料下載需求,限制使用者操作頻率,通過使用redis鎖給出了實現。