Go語言爬取圖片小程式

2020-07-16 10:04:51
在本節中,我們的主要任務是使用網路爬蟲框架編寫一個可以下載目標網站中連結圖片的爬蟲程式。在這個過程中,我們會發現網路爬蟲框架的一些不足,並繼續為之添磚加瓦。這是一種反哺。在軟體開發的過程中,總是應該盡早地為程式編寫使用範例(測試程式也可以視為使用範例,而且能達到一舉多得的效果),並以此來檢查和驗證我們的程式。

概述

現在網際網路中有不少便捷工具可以自動下載(或者說爬取)小說網站的小說、圖片網站的圖片或視訊網站的視訊,這些工具有的以命令方式提供,有的有自己的圖形化使用者介面。下面就帶領大家編寫一個這樣的簡單工具,以起到拋磚引玉的作用。

把這個可以爬取圖片的小程式命名為 finder,並把它的程式碼放置在範例專案的 gopcp.v2/chapter6/webcrawler/examples/finder 程式碼包及其子包中。它以命令的方式為使用者提供功能。大家可以從我的網路硬碟中下載該程式碼包(連結:https://pan.baidu.com/s/1yzWHnK1t2jLDIcTPFMLPCA 提取碼:slm5)。

命令引數

命令引數是指使用者在使用 finder 命令時可以提供的引數。在 Go語言中,這類引數稱為 flag。通過 Go 標準庫中的 flag 程式碼包,可以讀取和解析這類引數。

finder 可以自動完成很多事情。但是,使用者還需要告知它一些必備的引數,包括:首次請求的 URL、目標 URL 的範圍限定(廣度和深度),以及爬取來的圖片檔案存放的目錄。這些必備引數的給定就需要通過 flag 來實現。

為了讓 finder 成為一個開箱即用的命令,這裡為每一個命令引數都提供了預設值。請看下面的程式碼:
//命令引數
var (
    firstURL string
    domains string
    depth uint
    dirPath string
),
func init() {
    flag.StringVar(&firstURL, "first", "http://zhihu.sogou.com/zhihu?query=golang+logo",
        "The first URL which you want to access.")
    flag.StringVar(&domains, "domains", "zhihu.com",
        "The primary domains which you accepted. "+
            "Please using comma-separated multiple domains.")
    flag.UintVar(&depth, "depth", 3,
        "The depth for crawling.")
    flag.StringVar(&dirPath, "dir", "./pictures",
        "The path which you want to save the image files.")
}
func Usage() {
    fmt.Fprintf(os.Stderr, "Usage of %s:n", os.Args[0])
    fmt.Fprintf(os.Stderr, "tfinder [flags] n")
    fmt.Fprintf(os.Stderr, "Flags:n")
    flag.PrintDefaults()
}
這些程式碼以及實現主流程的程式碼都包含在 finder 的主檔案 finder.go 中。

flag 包為了讓我們方便地定義命令引數,提供了很多函數,諸如上面的程式碼呼叫的函數 flag.StringVar 和 flag.UintVar。針對不同基本型別的引數值,flag 幾乎都有一個函數與之相對應。以對 flag.StringVar 函數的第一次呼叫為例,呼叫引數的值依次是存放首次請求 URL 的變數 firstURL 的指標、命令引數的名稱 first、first 的預設值以及 first 的文字說明。

這裡宣告一下,命令引數 first 的預設值並不要針對某些網站,而只是我發現的一個比較容易存取的 URL。該預設 URL 所屬的網站不需要登入,而且在其連結的頁面的原始碼中也比較容易找到圖片。也許它並不是你所知道的最適合的首次請求 URL,但這僅僅是個預設值而已。

我們需要在 main 函數的開始處呼叫 flag.Parse 函數。只有這樣,才能讓 firstURL 等變數與 finder 使用者給定的命令引數值係結。另外,為了讓 finder 命令更加友好,需要把上面的 Usage 函數賦給 flag.Usage 變數,以便使用者在敲入 finder --help 和回車之後能看到命令使用提示資訊。這個提示資訊會包含上面那個 init 函數中宣告的內容。

初始化排程器

我們通過操控排程器來使用網路爬蟲框架提供的各種功能。在初始化排程器之前,我們需要先準備好 3 類引數:請求相關引數、資料相關引數和元件相關引數。其中,請求相關引數可以由命令引數直接給定,資料相關引數也可以由我們評估後給出。至於元件相關引數,我們會利用網路爬蟲框架提供的各類元件的預設實現去建立。

很顯然,分析器處理資料的速度肯定是最快的。其次是條目處理管道和下載器。分析器只會利用 CPU 和記憶體資源做一些計算,條目處理管道會把圖片檔案儲存到計算機的檔案系統中,而下載器則需要通過網路存取和下載內容。

雖然網路 I/O 的速度可能會超過磁碟 I/O 的速度,但是條目處理管道需要處理的資料總是最少的,所以我們可以把條目緩衝池的容量設定得更小。

為了簡單直觀,我們暫時認為,分析器處理資料的總耗時是條目處理管道的 10%,同時是下載器的 1%。注意,這只是一個粗略估計的結果。可以在使用 finder 的時候根據實際情況進行調整。至此,我們可以像下面這樣組裝一個 DataArgs 型別的值,其中的識別符號 sched 代表程式碼包 gopcp.v2/chapter6/webcrawler/scheduler:
dataArgs := sched.DataArgs{
    ReqBufferCap:            50,
    ReqMaxBufferNumber:      1000,
    RespBufferCap:           50,
    RespMaxBufferNumber:     10,
    ItemBufferCap:           50,
    ItemMaxBufferNumber:     100,
    ErrorBufferCap:          50,
    ErrorMaxBufferNumber:    1,
}
或許可以依據這些排程器引數為 finder 新增一些命令引數,以便讓該程式更加靈活。

相比之下,元件相關引數的建立是最煩瑣的。為了給出下載器、分析器和條目處理管道的範例列表,我們需要分別自定義 HTTP 用戶端、HTTP 響應解析函數和條目處理常式。這部分程式碼在 gopcp.v2/chapter6/webcrawler/examples/finder/internal 程式碼包中,並在 finder 的主程式中為該包起個別名——lib。

之前說過,net/http 包中的 Client 型別是做 HTTP 用戶端程式的必選,並且有很多可客製化的地方。它有一個公開的欄位 Transport,是 http.RoundTripper 介面型別的,用於實施對單個 HTTP 請求的處理並輸出 HTTP 響應。我們可以不對它進行設定,而讓程式自動使用由變數 http.DefaultTransport 代表的預設值。實際上,你可以從 http.Default-Transport 的宣告中看到自定義 Transport 欄位的方法。

下面是為生成 HTTP 用戶端而宣告的函數:
//用於生成HTTP用戶端
fund genHTTPClient() *http.Client {
    return &http.Client{
        Transport: &http.Transport{
            Proxy: http.ProxyFromEnvironment,
            DialContext: (&net?Dialer{
                Timeout:   30 * time.Second,
                KeepAlive: 30 * time.Second,
                DualStack: true,
            }).DialContext,
            MaxIdleConns:          100,
            MaxIdleConnsPerHost:   5,
            IdleConnTimeout:       60  *  time.Second,
            TLSHandshakeTimeout:   10  *  time.Second,
            ExpectContinueTimeout: 1  *  time.Second,
        },
    }
}
http.Transport 結構體型別實現了 http.RoundTripper 介面,其中也有很多公開的欄位供我們設定。我想在這裡解釋的是 MaxIdleConns、MaxIdleConnsPerHost 和 IdleConnTimeout,它們與我們要爬取的目標網站以及對目標 URL 的範圍限定有很大關係。MaxIdleConns 是對空閒連結的最大數量進行設定。空閒的連結可以理解為已經沒有資料在傳輸但是還未斷開的連結。

MaxIdleConns 限制的是通過該 HTTP 用戶端存取的所有域名和 IP 地址的空閒連結總量。而 MaxIdleConnsPerHost 就不同了,它限制的是針對某一個域名或 IP 地址的空閒連結最大數量。

對於這兩個欄位的值,一般需要聯動地設定。當然,可能無法預知目標網站的二、三級域名有多少個,以及在爬取過程中會以怎樣的頻率存取到哪些域名。這顯然也需要一個調優的過程。

IdleConnTimeout 欄位的含義是指定空閒連結的生存時間。如果說 MaxIdleConns 和 MaxIdleConnsPerHost 設定的是什麼情況下應該關閉更多的空閒連結的話,那麼 IdleConnTimeout 設定的就是什麼時候應該進一步減少現有的空閒連結。

HTTP 響應解析函數用於解析 HTTP 響應並試圖找到出新的請求和條目。在 finder 中,我們需要兩個這樣的函數,一個用於查詢新請求,另一個用於查詢新條目,這裡所說的條目即圖片。

為了解析 HTML 格式的 HTTP 響應體,我們需要引入一個第三方程式碼包:github.com/PuerkitoBio/goquery,後面將其簡稱為 goquery。

類似地,宣告了一個 genResponseParsers 函數用於返回 HTTP 響應解析函數的列表:
//用於生成響應解析器
func genResponseParsers() [[module.PaiseResponse {
    parseLink := func(httpResp *http.Response, respDepth uint32) ([]module.Data, []error) {
        //省略部分程式碼
    }
    parselmg := func(httpResp *http.Response, respDepth uint32) ([[module.Data, []error) {
        //省略部分程式碼
        //生成條目
        item := make(map[string]interface{})
        item["reader"] = httpRespBody
        item["name"] = path.Base(reqURL.Path)
        item["ext"] = pictureFomat
        dataList = append(dataList, module.Item(item))
        return dataList, nil
    }
    return []module.ParseResponse{parseLink, parselmg}
}
可以看到, genResponseParsers 函數返回的列表中有兩個 HTTP 響應解析函數 parseLink 和 parseImg。parseLink 函數會利用 goquery 在 HTML 格式的 HTTP 響應體中查詢新的請求。具體流程如下。

1) 檢查響應。檢查響應本身以及後面會用到的各個部分的有效性。如果無效,就忽略後面的步驟,並直接返回空的資料列表和包含相應錯誤值的錯誤列表。

2) 檢查 HTTP 響應頭中的內容型別。檢查 HTTP 響應的 Header 部分中的 Content-Type 的值。如果它不以 text/html 開頭,就忽略後面的步驟,並直接返回空的資料列表和 nil。

3) 解析 HTTP 響應體。在響應體中查詢 a 標籤,並提取它的 href 屬性的值。如果該值是一個 URL,就將其封裝成請求並追加到資料列表。再在響應體中查詢 img 標籤,並提取它的 src 屬性的值。如果該值是一個 URL,就將其封裝成請求並追加到資料列表。如果在解析過程中發生錯誤,就把錯誤值追加到錯誤列表,最後返回資料列表和錯誤列表。

parseImg 函數也會先檢查響應和 HTTP 響應頭中的內容型別。不過,它只會繼續處理內容型別以 image 開頭的 HTTP 響應。一旦可以繼續處理,就說明 HTTP 響應體的內容是一個圖片的位元組序列,這時就可以生成條目了。

如前面的程式碼所示,在建立一個條目(實際上是一個字典)之後,會分別依據 HTTP 響應體、圖片主檔名以及圖片擴充套件名生成鍵 - 元素對並放入條目值。然後,把該條目追加到資料列表並返回。

相應地,在條目處理常式中,會根據條目中的這 3 個鍵 - 元素對在指定的目錄中建立一個圖片檔案。用於生成條目處理常式的 genltemProcessors 函數的宣告如下:
//用於生成條目處理器
func genItemProcessors(dirPath string) []module.ProcessItem {
    savePicture := func(item module.Item) (resuIt module.Item, err error) {
        //省略部分程式碼
        //生成新的條目
        result = make(map[string]interface{})
        for k, v := range item {
            result[k] = v
        }
        result["file_path"] = filePath
        fileInfo, err := file?Stat()
        if err != nil {
            return nil, err
        }
        result["file_size"] = fileInfo.Size()
        return result, nil
    }
    recordPicture := func(item module.Item) (result module.Item, err error) {
        //省略部分程式碼
    }
    return []module.ProcessItem{savePicture, recordPicture}
}
該函數返回的列表中有兩個條目處理常式 savePicture 和 recordPicture。savePicture 函數用於儲存圖片檔案。值得一提的是,在成功儲存檔案之後,savePicture 函數會生成新的條目。該條目中除了包含 savePicture 接受的那個引數條目中的所有鍵 - 元素對外,還放入了用於說明圖片檔案絕對路徑的尺寸的鍵 - 元素對。這樣,recordPicture 函數就可以把已存圖片檔案的資訊完整地記錄在紀錄檔中了。

有了 genHTTPClient,genResponseParsers 和 genItemProcessors 這 3 個函數,我們就可以著手準備各類元件範例的列表了。首先,來看看下載器列表的生成。

要建立一個下載器,光有 HTTP 用戶端是不夠的,還需要設定它的元件 ID 和元件評分計算器。不過,對於其餘兩個引數,gopcp.v2/chapter6/webcrawler/module 程式碼包已經給予了支援。

元件 ID 的生成需要元件型別、序列號和元件網路地址,這裡的元件 ID 不需要包含網路地址,因為我們要生成的元件範例與排程器處於同一進程內。為了統一生成序列號,這裡宣告了一個包級私有的全域性變數:

//元件序列號生成器
var snGen = module.NewSNGenertor(l, 0)

至於下載器的元件型別,我們可以直接用 module.TYPE_DOWNLOADERo

在 module 包中有一個極其簡單的元件評分計算函數 CalculateScoreSimple,這個函數原本是提供給測試函數使用的。不過對於 finder,我們可以直接使用這個函數,其程式碼如下:
//簡易的元件評分計算函數
func CalculateScoreSimple(counts Counts) uint64 {
    return counts.CalledCount +
        counts.AcceptedCount<<1 +
        counts.CompletedCount<<2 +
        counts.HandlingNumber<<4
}
有了上述準備,我們可以編寫一個用於生成下載器列表的函數。把這個函數命名為 GetDownloaders,並把它存在 gopcp.v2/chapter6/webcrawler/examples/finder/internal 中。注意,與前面描述的那些 internal 包的函數不同,它是公開的。該函數的程式碼如下:
//用於獲取下載器列表
func GetDownloaders(number uint8) ([]module.Downloader, error) {
    downloaders := []module.Downloader{}
    if number == 0 {
        return downloaders, nil
    }
    for i := uint8(0); i < number; i++ {
        mid, err := module.GenMID(
            module.TYPE_DOWNLOADER, snGen.Get(), nil)
        if err != nil{
            return downloaders, err
        }
        d, err := downloader.New(
            mid, genHTTPClient(), module.CalculateScoreSimple)
        if err != nil {
            return downloaders, err
        }
        downloaders = append(downloaders, d)
    }
    return downloaders, nil
}
該函數的功能其實就是根據引數值生成一定數量的下載器並返回給呼叫方,其中的識別符號 downloader 代表 gopcp.v2/chapter6/webcrawler/module/local/downloader 程式碼包。

用於生成並返回分析器列表的 GetAnalyzers 函數在流程上與 GetDownloaders 極其相似。在生成分析器的時候,它用到了 gopcp.v2/chapter6/webcrawler/module/local/analyzer 包的 New 函數和本包的 genResponseParsers 函數。

GetPipelines 函數的寫法也是類似的。不過要注意,由於我們前面編寫的條目處理常式需要把圖片存到指定目錄中,所以 GetPipelines 函數的引數除了 number 外,還有一個 string 型別的 dirPath。dirPath 指定圖片存放目錄。在呼叫 genItemProcessors 函數時,GetPipelines 函數會把 dirPath 直接傳入。

另一個需要注意的地方是,我們在生成一個條目處理管道後,還要決定它是否是快速失敗的。在這裡,如果 savePicture 函數沒能成功儲存圖片,那麼我們就沒必要再讓 recordPicture 函數去記錄紀錄檔了。因此,我們要通過呼叫條目處理管道的 SetFailFast 函數把它設定為快速失敗的。

好了,我們現在已經準備好了初始化排程器所需的所有引數。至此,對於 finder.go 中的 main 函數,我們已經可以完成大半了:
func main() {
    flag.Usage = Usage
    flag.Parse()
    //建立排程器
    scheduler := sched.NewScheduler()
    //準備排程器的初始化引數
    domainParts := strings.Split(domains,",")
    acceptedDomains := []string{}
    for _, domain := range domainParts {
        domain = strings.TrimSpace(domain)
        if domain != "" {
            acceptedDomains = append(acceptedDomains, domain)
        }
    }
    requestArgs := sched.RequestArgs{
        AcceptedDomains: acceptedDomains,
        MaxDepth:        uint32(depth),
    }
    dataArgs := sched.DataArgs{
        ReqBufferCap:       50,
        ReqMaxBufferNumber: 1000,
        RespBufferCap:      50,
        RespMaxBufferNumber:10,
        ItemBufferCap:      50,
        ItemMaxBufferNumber:100,
        ErrorBufferCap:     50,
        ErrorMaxBufferNumber:1,
    }
    downloaders, err := lib.GetDownloaders(1)
    if err != nil {
        logger.Fatalf("An error occurs when creating downloaders: %s", err)
    }
    analyzers, err := lib.GetAnalyzers(1)
    if err != nil {
        logger.Fatalf("An error occurs when creating analyzers: %s", err)
    }
    pipelines, err := lib.GetPipelines(1, dirPath)
    if err != nil {
        logger.Fatalf("An error occurs when creating pipelines: %s", err)
    }
    moduleArgs := sched.ModuleArgs{
        Downloaders: downloaders,
        Analyzers:   analyzers,
        Pipelines:   pipelines,
    }
    //初始化排程器
    err = scheduler.init(
        requestArgs,
        dataArgs,
        moduleArgs)
    if err != nil {
        logger.Fatalf("An error occurs when initializing scheduler: %s", err)
    }
    //省略部分程式碼
}
重申一下,其中的識別符號 lib 代表 gopcp.v2/chapter6/webcrawler/examples/finder/ internal 程式碼包。另外注意,logger.Fatalf 總會在列印紀錄檔之後使當前進程非正常終止。所以,一旦排程器初始化失敗,finder 就不會再做任何事了。

監控排程器

現在,我們已經可以啟動排程器了。不過,或許我們可以準備得更充分一些。任何持續執行的軟體服務都應該被監控,不論用什麼方式監控。這幾乎已經是網際網路公司的一條鐵律了。雖然 finder 只是一個可以查詢並儲存圖片的工具,但是由於它可能會為了搜遍目標網站而執行很長一段時間,所以我們還是應該提供一種監控方法。

我會把用於監控排程器的程式碼封裝在一個名為 Monitor 的函數里,並把它置於程式碼包 gopcp.v2/chapter6/webcrawler/examples/finder/monitor 中。

Monitor 函數的功能主要有如下 3 個。
  • 在適當的時候停止自身和排程器。
  • 實時監控排程器及其中的各個模組的執行狀況。
  • 一旦排程器及其模組在執行過程中發生錯誤,及時予以報告。

和其他部分一樣,這些功能可以客製化。

1) 確定引數

對於第一個功能,我們需要明確一點:只有在排程器空閒一段時間之後,才關閉它。所以,我們應該定時回圈地去呼叫排程器的 Idle 方法,以檢查它是否空閒。如果連續若干次的檢查結果均為 true,那麼就可以斷定再沒有新的資料需要處理了。

這時,關閉排程器就是安全的。這裡有兩個可以靈活掌握的環節:一個是檢查的間隔時間,另一個是 檢查結果連續為 true 的次數。只要給定了這兩個可變數,自動關閉排程器的策略就完全 確定了。我們把檢查的間隔時間與檢査結果連續為 true 的最大次數的乘積稱為最長持續 空閒時間,即:

最長持續空閒時間 = 檢查間隔時間 x 檢查結果連續為 true 的最大次數

一旦排程器空閒的時間達到了最長持續空閒時間,就可以關閉排程器了,不過這個決定應該由監控函數的使用方來做。

監控函數的第二個功能是對排程器的執行狀況進行監控。我們在前面編寫排程器以及相關模組的時候都留有摘要或統計介面,所以它實現起來並不難。這裡也存在兩個可變數:摘要獲取間隔時間和摘要記錄的方式。該函數的第三個功能也需要用到第二個可變數。

經過上述分析,我們已經可以確定排程器監控函數的簽名了:
// Monitor用於監控排程器。
//引數 scheduler 代表作為監控目標的排程器。
//引數 checkInterval 代表檢查間隔時間,單位:納秒。
//泰數 summarizeInterval 代表摘要獲取間隔時間,單位:納秒。
//引數 maxIdleCount 代表最大空閒計數。
//引數 autoStop 用來指示該方法是否在排程器空閒足夠長的時間之後自行停止排程器。
//引數 record 代表紀錄檔記錄函數。
//當監控結束之後,該方法會向作為唯一結果值的通道傳送一個代表空閒狀態檢查次數的數值
func Monitor(
    scheduler sched.Scheduler,
    checkInterval time.Duration,
    summarizeInterval time.Duration,
    maxIdleCount uint,
    autoStop bool,
    record Record) <-chan uint64
在 Monitor 函數的引數宣告列表中,record 就是使用方需要客製化的摘要的記錄方式。Record 型別的宣告如下:

// Record 代表紀錄檔記錄函數的型別。
//引數 level 代表紀錄檔級別。級別設定:0-普通;1-警告;2-錯誤
type Record func(level uint8, content string)

這個函數型別表達的含義是根據指定的紀錄檔級別對內容進行記錄。

另一方面,Monitor 函數會返回一個接收通道。在它執行結束之時,它還會向該通道傳送一個數值,這個數值表示它檢査排程器空閒狀態的實際次數。使用方可以通過這個實際次數計算出當次爬取流程的總執行時間。不過,這個通道更重要的作用是作為使用方安全關閉排程器的依據。

2) 制定監控流程

大家可能會發現,Monitor 函數的 3 個功能之間實際上並沒有交集。因此,我們可以在實現該函數的時候保持這 3 個功能的獨立性,以避免它們彼此干擾。Monitor 函數的完整宣告是這樣的:
func Monitor(
    scheduler sched.Scheduler,
    checkInterval time.Duration,
    summarizeInterval time.Duration,
    maxIdleCount uint,
    autoStop bool,
    record Record) <-chan uint64 {
    //防止排程器不可用
    if scheduler == nil {
        panic(errors.New("The scheduler is invalid!"))
    }
    //防止過小的檢查間隔時間對爬取流程造成不良影響
    if checkinterval < time.Millisecond*100 {
        checkinterval = time.Millisecond * 100
    }
    //防止過小的摘要獲取間隔時間對爬取流程造成不良影響
    if summarizeInterval < time.Second {
        summarizeInterval = time.Second
    }
    //防止過小的最大空閒計數造成排程器的過早停止
    if maxIdleCount < 10 {
        maxIdleCount = 10
    }
    logger.Infof("Monitor parameters: checkInterval: %s, summarizeInterval: %s,"+
        "maxIdleCount: %d, autoStop: %v",
        checkInterval, summarizeInterval, maxIdleCount, autoStop)
    //生成監控停止通知器
    stopNotifier, stopFunc := context.WithCancel(context.Background())
    //接收和報告錯誤
    reportError(scheduler, record, stopNotifier)
    //記錄摘要資訊
    recordsummary(scheduler, summarizeInterval, record, stopNotifier)
    //檢查計數通道
    checkCountChan := make(chan uint64, 2)
    //檢查空閒狀態
    checkStatus(scheduler,
        checkInterval,
        maxIdleCount,
        autoStop,
        checkCountChan,
        record,
        stopFunc)
    return checkCountChan
}
這裡簡要解釋一下它體現的監控流程。首先,監控函數必須對傳入函數的引數值進行檢査,其中最重要的是代表排程器範例的 scheduler。如果它為 nil,那麼這個監控流程就完全沒有執行的必要了。監控函數在發現此情況時,會把它視為一個致命的錯誤並引發一個執行時恐慌。

此外,監控函數還需要對檢查間隔時間、摘要獲取間隔時間和最大空閒計數的值進行檢查。這些值既不能是負數,也不能是過小的正數,因為過小的正數會影響到爬取流程的正常執行。所以,在這裡分別為它們設定了最小值。

讓實現那 3 個功能的程式碼並行執行。與排程器的實現類似,需要讓這些程式碼知道什麼時候需要停止。同樣,這裡使用一個可取消的 context.Context 型別值來傳遞停止信號,並由 stopNotifier 變數代表。同時,stopFunc 變數代表觸發停止信號的函數。

函數 reportError、recordSummary 和 checkStatus 分別表示 Monitor 函數需要實現的那 3 個功能,它們都會啟用一個 goroutine 來執行其中的程式碼。稍後會分別描述它們的實現細節。

對於 Monitor 函數的函數體,最後要說明的是變數 checkCountChan,它代表的就是用來傳遞檢查排程器空閒狀態的實際次數的通道。它的值會被傳入 checkStatus 函數,然後 由 Monitor 函數返回給它的呼叫方。

3) 報告錯誤

報告錯誤的功能由 reportError 函數負責實現。一旦排程器啟動,就應該通過呼叫它的 ErrorChan 方法獲取錯誤通道,並不斷地嘗試從中接收錯誤值。我們已經在前面詳述過這樣做的原因。

函數 reportError 接受 3 個引數。除了代表排程器的 scheduler 之外,還有代表紀錄檔記錄方式的 record,以及代表監控停止通知器的 stopNotifier。下面是它的完整宣告:
//用於接收和報告錯誤
func reportError(
    scheduler sched.Scheduler,
    record Record,
    stopNotifier context.Context) {
    go func() {
        //等待排程器開啟
        waitForSchedulerStart(scheduler)
        errorChan := scheduler.ErrorChan()
        for {
            //檢視監控停止通知器
            select {
            case <- stopNotifier.Done():
                return
            default:
            }
            err, ok := <- errorChan
            if ok {
                errMsg := fmt.Sprintf("Received an error from error channel: %s", err)
                record(2, errMsg)
            }
            time.Sleep(time.Microsecond)
        }
    }()
}
這個函數啟用了一個 goroutine 來執行其中的程式碼。在 go 函數中,它首先呼叫了 waitForSchedulerStart 函數。我們都知道,排程器有一個公開的方法 Status,該方法會返回一個可以表示排程器當前狀態的值。因此,該函數要做的就是不斷呼叫排程器的 Status 方法,直到該方法的結果值等於 sched.SCHED_STATUS_STARTED 為止。

當然,在對 Status 方法的多次呼叫之間,都需要有一個小小的停頓,這個停頓是通過 time.Sleep 函數實現的。這樣做是為了避免因 for 迴圈疊代得太過頻繁而可能帶來的一些問題,比如擠掉其他 goroutine 執行的機會、致使 CPU 的使用率過高,等等。這是一種保護措施,雖然那些問題不一定會發生。

go 函數中的第 2 條語句,是呼叫排程器的 ErrorChan 方法並獲取到錯誤通道。還記得 ErrorChan 方法是怎樣實現的嗎?它在每次呼叫時,都會建立一個錯誤通道,並持續從當前排程器持有的錯誤緩衝池向這個錯誤通道搬運錯誤值,直到錯誤緩衝池被關閉。正因為如此,我們不應該在 for 語句中呼叫它。

在 for 語句每次疊代開始,go 函數都會嘗試用 select 語句從 stopNotifier 獲取停止信號。一旦獲取到停止信號,它就馬上返回以結束當前流程的執行。select 語句中的 default case 意味著這個獲取操作只是嘗試一下而已。即使沒獲取到停止信號,select 語句的執行也會立即結束。

之後,go 函數就會試圖從錯誤通道那裡接收錯誤值。一旦接收到一個有效的錯誤值, 它就呼叫 record 函數記錄下這個錯誤值。注意,與嘗試獲取停止信號的方式不同,這裡的接收操作是阻塞式的。

在每次疊代的最後,go 函數也會通過呼叫 time.Sleep 函數實現一個小停頓。

4) 記錄摘要資訊

recordSummary 函數負責記錄摘要資訊,它的簽名如下:
//用於記錄摘要資訊
func recordSummary(
    scheduler sched.Scheduler,
    summarizeInterval time.Duration,
    record Record,
    stopNotifier context.Context)
可以看到,它接受 4 個引數,其中的 3 個引數也是 reportError 函數所接受的。多出的那個引數是 summarizeInterval,即摘要獲取間隔時間。

這個函數同樣啟用了一個 goroutine 來進行相關操作。與 reportError 函數相同,在一開始依然要先呼叫 waitForSchedulerStart 函數,以等待排程器完全啟動。一旦排程器已啟動,go 函數就要開始為摘要資訊的獲取、比對、組裝和記錄做準備了。這裡需要先宣告如下幾個變數:

var prevSchedSummaryStruct sched.SummaryStruct
var prevNumGoroutine int
var recordCount uint64 = 1
startTime := time.Now()

其中,變數 recordCount 和 startTime 的值會參與到最終的摘要資訊的組裝過程中去。前者代表了記錄的次數,而後者則代表開始準備記錄時的時間。在它們前面宣告的兩個變數 prevSchedSummaryStruct 和 prevNumGoroutine 的含義分別是前一次獲得的排程器摘要資訊和 goroutine 數量,它們是是否需要真正記錄當次摘要資訊的決定因素。

go 函數每次都會把當前獲取到的摘要資訊與前一次的做比對。只有確定它們不同,才會對當前的摘要資訊予以記錄,這主要是為了減少摘要資訊對其他紀錄檔的干擾。

go 函數應該在停下來之前定時且回圈地獲取和比對摘要資訊。因此,我把後面的程式碼都放到了一個 for 程式碼塊中。在每次疊代開始時,仍然需要通過 stopNotifier 檢查停止信號。如果停止信號還沒有發出,那麼就開始著手獲取摘要資訊的各個部分,即:goroutine 數量和排程器摘要資訊。goroutine 數量代表的是當前的 Go 執行時系統中活躍的 goroutine 的數量,而排程器摘要資訊則體現了排程器當前的狀態。獲取它們的方式如下:

//獲取 goroutine 數量和排程器摘要資訊
currNumGoroutine := runtime.NumGoroutine()
currSchedSummaryStruct := scheduler.Summary().Struct()

一旦得到它們,就分別把它們與變數 prevNumGoroutine 和 prevSchedSummaryStruct 的值進行比較。這裡的比較操作很簡單。變數 currNumGoroutine 及 prevNumGoroutine 都是 int 型別的,可以直接比較,而排程器摘要資訊的型別 sched.SummaryStruct 也提供了可判斷相同性的 Same 方法。如果它們兩兩相同,就不再進行後面的組裝和記錄操作了。否則,就開始組裝摘要資訊:
//比對前後兩份摘要資訊的一致性,只有不一致時才會記錄
if currNumGoroutine != prevNumGoroutine ||
    !currSchedSummaryStruct.Same(prevSchedSummaryStruct) {
    //記錄摘要資訊
    summay := summary{
        NumGoroutine: runtime.NumGoroutine(),
        SchedSummary: currSchedSummaryStruct,
        EscapedTime: time.Since(startTime).String(),
    }
    b, err := json.MarshalIndent(summay,"","    ")
    if err != nil {
        logger.Errorf("Occur error when generate scheduler summary: %sn", err)
        continue
    }
    msg := fmt.Sprintf("Monitor summary[%d]:n%s", recordCount, b)
    record(0, msg)
    prevNumGoroutine = currNumGoroutine
    prevSchedSummaryStruct = currSchedSummaryStruct
    recordCount++
}
組裝摘要資訊用到了當前包中宣告的結構體型別 summary,其定義如下:
//代表監控結果摘要的結構
type summary struct {
    // goroutine 的數量
    NumGoroutine int 'json:"goroutine_number"'
    //排程器的摘要資訊
    SchedSummary sched.SummaryStruct 'json:"sched_summary"'
    //從開始監控至今流逝的時間
    EscapedTime string 'json:"escaped_time"'
}
可以看到,該型別也為 JSON 格式的序列化做好了準備。使用 encoding/json 包中的 MarshalIndent 函數把該型別的值序列化為易讀的 JSON 格式字串,然後通過呼叫 record 函數記錄它們。

緊接著,對 prevNumGoroutine 和 pievSchedSummaryStruct 進行賦值,以便進行後續的比較操作。最後,遞增 recordCount 的值也非常必要,因為它是摘要資訊的重要組成部分。

函數 recordSummary 中的 go 函數所包含的 for 程式碼塊基本上就是如此。此外,為了讓這個 for 迴圈在疊代之間能有一個小小的停頓,把下面這條語句放在了 for 程式碼塊的最後:

time.Sleep(time.Microsecond)

實際上,與 reportError 函數相比,recordSummary 函數更有必要加上這條語句。

與錯誤的接收和報告一樣,對摘要資訊的獲取、比對、組裝和記錄也是獨立進行的。它由 Monitor 函數啟動,並會在接收到停止信號之後結束。傳送停止信號的程式碼存在於 checkstatus 函數中,因為只有它才知道什麼時候停止監控。

5) 檢查狀態

函數 checkstatus 的主要功能是定時檢查排程器是否空閒,並在它空閒持續一段時間之後停止監控和排程器。為此,該函數需要適時檢查各種計數值,並在必要時發出停止信號。checkstatus 是一個比較重要的輔助函數,職責也較多,它的簽名如下:
//用於檢查狀態,並在滿足持續空閒葉間的條件時採取必要措施
func checkstatus(
    scheduler sched.Scheduler,
    checkInterval time.Duration,
    maxIdleCount uint,
    autoStop bool,
    checkCountChan chan <- uint64,
    record Record,
    stopFunc context.CancelFunc)
其中的引數前面都介紹過。注意,引數 checkCountChan 的型別是一個傳送通道,這是為了限制 checkstatus 函數對它的操作。

checkstatus 函數也把所有程式碼都放入了 go 函數。go 函數中的第一條語句是:

var checkCount uint64

該變數代表的是檢查計數值。緊接著是一條 defer 語句:
defer func() {
    stopFunc()
    checkCountChan <- checkCount
}()
它的作用是保證在go函數執行即將結束時發出停止信號和傳送檢查計數值,這個時 機非常關鍵。這個go函數總會在排程器空閒的時間達到最長持續空閒時間時結束執行。
在等待排程器開啟之後,go函數首先要做的就是準確判定最長持續空閒時間是否到 達。為了讓這一判定有據可依,下面這個變數是必需的:

var idleCount uint

它的值將會代表監控函數連續發現排程器空閒的計數值。另外,為了記錄排程器持續空閒的時間,還需要宣告一個這樣的變數:

var firstIdleTime time.Time

注意,真實的持續空閒時間與理想的持續空閒時間(由引數 checkInterval 和 maxIdleCount 的值相乘得出的那個時間)之間肯定是有偏差的。並且,前者肯定會大於後者,因為執行判定的程式碼也是需要耗時的。

可以肯定的是,我們需要在一個 for 迴圈中進行與持續空閒時間判定有關的那些操作。由於這條 for 語句中的條件判斷比較多且複雜,所以我先貼出它們然後再進行解釋:
for {
    //檢查排程器的空閒狀態
    if scheduler.Idle() {
        idleCount++
        if idleCount == 1 {
            firstIdleTime = time.Now()
        }
        if idleCount >= maxIdleCount {
            msg :=    fmt.Sprintf(msgReachMaxIdleCount, time.Since(firstIdleTime).String())
            record(0, msg)
            //再次檢查排程器的空閒狀態,確保它已經可以停止
            if scheduler.Idle() {
                if autoStop {
                    var result string
                    if err := scheduler.Stop(); err == nil {
                        result = "success"    '
                    } else {
                        result = fmt.Sprintf("failing(%s)", err)
                    }
                    msg = fmt.Sprintf(msgStopScheduler, result)
                    record(0, msg)
                }
                break
            } else {
                if idleCount > 0 {
                    idleCount = 0
                }
            }
        }
    } else {
        if idleCount > 0 {
            idleCount = 0
        }
    }
    checkCount++
    time.Sleep(checkInterval)
}
可以看到,總是讓 checkCount 的值隨著疊代的進行而遞增,同時也會依據 checkInterval 讓每次疊代之間存在一定的時間間隔。

在此 for 程式碼塊的最開始,通過呼叫排程器的 Idle 方法來判斷它是否已經空閒。如果不是,就及時清零 idleCount 的值。因為一旦發現排程器未空閒,就要重新進行計數。

反過來講,如果發現排程器已空閒,就需要遞增 idleCount 的值。同時,如果發現重新計數剛剛開始,就會把 firstIdleTime 的值設定為當前時間。只有這樣才能在 idleCount 的值達到最大空閒計數時,根據 firstIdleTime 的值準確計算出真實的最長持續空閒時間。

在做好計數和起始時間的檢查和校正工作之後,會馬上把 idleCount 的值與最大空閒計數相比較。如果前者大於或等於後者,就可以初步判定排程器已經空閒了足夠長的時間,這時,會立刻記下一條基於模板 msgReachMaxIdleCount 生成的訊息。該模板的宣告如下:

//已達到最大空閒計數的訊息模板
var msgReachMaxIdleCount = "The scheduler has been idle for a period of time" +
    "(about %s)." + " Consider to stop it now."

這條訊息建議網路爬蟲框架的使用方關閉排程器。不過,使用方可以通過把引數 autoStop 的值設定為 true,讓排程器監控函數自動關閉排程器,這也是後面再次呼叫排程器的 Idle 方法的原因之一。

如果這裡的呼叫結果值和 autoStop 引數的值均為 true,那麼函數就幫助使用方停止排程器。如果呼叫結果值為 false,那麼 idleCount 變數的值也會被及時清零,對排程器空閒的計數將重新開始。這顯然是一種比較保守的做法,但卻可以有效地避免過早地停止排程器。

實際上,只要對 Idle 方法第二次呼叫的結果值為 true,不管 autoStop 引數的值是怎樣的,都會退出當前的 for 程式碼塊。for 程式碼塊執行結束就意味著 checkstatus 函數非同步執行結束。還記得嗎?在 checkstatus 中的 go 函數執行結束之際,它會發出停止信號,同時向通道 checkCountChan 傳送檢查計數值。

至此,已經展示和說明了 checkstatus 函數以及 Monitor 函數涉及的絕大多數程式碼。

6) 使用監控函數

有了 Monitor 函數,就可以在 finder 的 main 函數中這樣使用它來啟動對排程器的監控了:
//準備監控引數
checkInterval := time.Second
summarizeInterval := 100 * time.Millisecond
maxIdleCount := uint(5)
//開始監控
checkCountChan := monitor.Monitor(
    scheduler,
    checkInterval,
    summarizeInterval,
    maxIdleCount,
    tiue,
    lib.Record)
//省略部分程式碼
//等待監控結束
<-checkCountChan
把檢查間隔時間設定為 10 毫秒,並把最大空閒計數設定為 50 同時,讓 Monitor 函數在排程器的持續空閒時間達到最長持續空閒時間後自動關閉排程器。

呼叫 Monitor 函數的時機是在初始化排程器之後,以及啟動排程器之前。因為只有排程器被初始化過,它的大多數方法才能正常執行。另外,只有在排程器啟動之前開始監控,才能記錄下它啟動時的狀況。

給予 Monitor 函數的引數值 lib.Record 代表前面所說的紀錄檔記錄函數,它的宣告是這樣的:
//記錄紀錄檔
func Record(level byte, content string) {
    if content == "" {
        return
    }
    switch level {
        case 0:
            logger.Infoln(content)
        case 1:
            logger.Warnln(content)
        case 2:
            logger.Infoln(content)
    }
}
其中 logger 代表 gopcp.v2/helper/log/base 包下 MyLogger 型別的紀錄檔記錄器。

啟動排程器

現在,我們真的可以啟動排程器了。做了這麼多準備工作,只需區區幾行程式碼就可以啟動排程器了,這些程式碼在 finder 的 main 函數的最後面:
//準備排程器的啟動引數
firstHTTPReq, err := http.NewRequest("GET", firstURL, nil)
if err != nil {
    logger.Fatalln(err)
    return
}
//開啟排程器
err = scheduler.Start(firstHTTPReq)
if err != nil {
    logger.Fatalf("An error occurs when starting scheduler: %s", err)
}
//等待監控結束
<-checkCountChan
基於命令引數 firstURL,我們可以很容易地建立出首次請求。如果啟動排程器不成功,就記下一條嚴重錯誤級別的紀錄檔。還記得嗎?這會使當前進程非正常終止。縱觀 main 函數的程式碼你就會發現,它遇到任何錯誤都會這樣做。這是因為一旦主流程出錯,finder 就真的無法再執行下去了。

最後,為了讓主 goroutine 等待監控和排程器的停止,我們還加入了對檢查計數通道 checkCountChan 的接收操作。

到這裡,我們講述了圖片爬蟲程式 finder 涉及的幾乎所有流程的程式碼。強烈建議大家在自己的計算機上執行 finder,然後試著改變各種引數的值,再去執行它,並且多多試幾次。當然,也可以修改 finder 甚至網路爬蟲框架的程式碼。總之,不論在哪個階段,閱讀、理解、修改、試驗是學習程式設計的必經之路。