//命令引數 var ( firstURL string domains string depth uint dirPath string ), func init() { flag.StringVar(&firstURL, "first", "http://zhihu.sogou.com/zhihu?query=golang+logo", "The first URL which you want to access.") flag.StringVar(&domains, "domains", "zhihu.com", "The primary domains which you accepted. "+ "Please using comma-separated multiple domains.") flag.UintVar(&depth, "depth", 3, "The depth for crawling.") flag.StringVar(&dirPath, "dir", "./pictures", "The path which you want to save the image files.") } func Usage() { fmt.Fprintf(os.Stderr, "Usage of %s:n", os.Args[0]) fmt.Fprintf(os.Stderr, "tfinder [flags] n") fmt.Fprintf(os.Stderr, "Flags:n") flag.PrintDefaults() }這些程式碼以及實現主流程的程式碼都包含在 finder 的主檔案 finder.go 中。
dataArgs := sched.DataArgs{ ReqBufferCap: 50, ReqMaxBufferNumber: 1000, RespBufferCap: 50, RespMaxBufferNumber: 10, ItemBufferCap: 50, ItemMaxBufferNumber: 100, ErrorBufferCap: 50, ErrorMaxBufferNumber: 1, }或許可以依據這些排程器引數為 finder 新增一些命令引數,以便讓該程式更加靈活。
//用於生成HTTP用戶端 fund genHTTPClient() *http.Client { return &http.Client{ Transport: &http.Transport{ Proxy: http.ProxyFromEnvironment, DialContext: (&net?Dialer{ Timeout: 30 * time.Second, KeepAlive: 30 * time.Second, DualStack: true, }).DialContext, MaxIdleConns: 100, MaxIdleConnsPerHost: 5, IdleConnTimeout: 60 * time.Second, TLSHandshakeTimeout: 10 * time.Second, ExpectContinueTimeout: 1 * time.Second, }, } }http.Transport 結構體型別實現了 http.RoundTripper 介面,其中也有很多公開的欄位供我們設定。我想在這裡解釋的是 MaxIdleConns、MaxIdleConnsPerHost 和 IdleConnTimeout,它們與我們要爬取的目標網站以及對目標 URL 的範圍限定有很大關係。MaxIdleConns 是對空閒連結的最大數量進行設定。空閒的連結可以理解為已經沒有資料在傳輸但是還未斷開的連結。
//用於生成響應解析器 func genResponseParsers() [[module.PaiseResponse { parseLink := func(httpResp *http.Response, respDepth uint32) ([]module.Data, []error) { //省略部分程式碼 } parselmg := func(httpResp *http.Response, respDepth uint32) ([[module.Data, []error) { //省略部分程式碼 //生成條目 item := make(map[string]interface{}) item["reader"] = httpRespBody item["name"] = path.Base(reqURL.Path) item["ext"] = pictureFomat dataList = append(dataList, module.Item(item)) return dataList, nil } return []module.ParseResponse{parseLink, parselmg} }可以看到, genResponseParsers 函數返回的列表中有兩個 HTTP 響應解析函數 parseLink 和 parseImg。parseLink 函數會利用 goquery 在 HTML 格式的 HTTP 響應體中查詢新的請求。具體流程如下。
//用於生成條目處理器 func genItemProcessors(dirPath string) []module.ProcessItem { savePicture := func(item module.Item) (resuIt module.Item, err error) { //省略部分程式碼 //生成新的條目 result = make(map[string]interface{}) for k, v := range item { result[k] = v } result["file_path"] = filePath fileInfo, err := file?Stat() if err != nil { return nil, err } result["file_size"] = fileInfo.Size() return result, nil } recordPicture := func(item module.Item) (result module.Item, err error) { //省略部分程式碼 } return []module.ProcessItem{savePicture, recordPicture} }該函數返回的列表中有兩個條目處理常式 savePicture 和 recordPicture。savePicture 函數用於儲存圖片檔案。值得一提的是,在成功儲存檔案之後,savePicture 函數會生成新的條目。該條目中除了包含 savePicture 接受的那個引數條目中的所有鍵 - 元素對外,還放入了用於說明圖片檔案絕對路徑的尺寸的鍵 - 元素對。這樣,recordPicture 函數就可以把已存圖片檔案的資訊完整地記錄在紀錄檔中了。
//元件序列號生成器
var snGen = module.NewSNGenertor(l, 0)
//簡易的元件評分計算函數 func CalculateScoreSimple(counts Counts) uint64 { return counts.CalledCount + counts.AcceptedCount<<1 + counts.CompletedCount<<2 + counts.HandlingNumber<<4 }有了上述準備,我們可以編寫一個用於生成下載器列表的函數。把這個函數命名為 GetDownloaders,並把它存在 gopcp.v2/chapter6/webcrawler/examples/finder/internal 中。注意,與前面描述的那些 internal 包的函數不同,它是公開的。該函數的程式碼如下:
//用於獲取下載器列表 func GetDownloaders(number uint8) ([]module.Downloader, error) { downloaders := []module.Downloader{} if number == 0 { return downloaders, nil } for i := uint8(0); i < number; i++ { mid, err := module.GenMID( module.TYPE_DOWNLOADER, snGen.Get(), nil) if err != nil{ return downloaders, err } d, err := downloader.New( mid, genHTTPClient(), module.CalculateScoreSimple) if err != nil { return downloaders, err } downloaders = append(downloaders, d) } return downloaders, nil }該函數的功能其實就是根據引數值生成一定數量的下載器並返回給呼叫方,其中的識別符號 downloader 代表 gopcp.v2/chapter6/webcrawler/module/local/downloader 程式碼包。
func main() { flag.Usage = Usage flag.Parse() //建立排程器 scheduler := sched.NewScheduler() //準備排程器的初始化引數 domainParts := strings.Split(domains,",") acceptedDomains := []string{} for _, domain := range domainParts { domain = strings.TrimSpace(domain) if domain != "" { acceptedDomains = append(acceptedDomains, domain) } } requestArgs := sched.RequestArgs{ AcceptedDomains: acceptedDomains, MaxDepth: uint32(depth), } dataArgs := sched.DataArgs{ ReqBufferCap: 50, ReqMaxBufferNumber: 1000, RespBufferCap: 50, RespMaxBufferNumber:10, ItemBufferCap: 50, ItemMaxBufferNumber:100, ErrorBufferCap: 50, ErrorMaxBufferNumber:1, } downloaders, err := lib.GetDownloaders(1) if err != nil { logger.Fatalf("An error occurs when creating downloaders: %s", err) } analyzers, err := lib.GetAnalyzers(1) if err != nil { logger.Fatalf("An error occurs when creating analyzers: %s", err) } pipelines, err := lib.GetPipelines(1, dirPath) if err != nil { logger.Fatalf("An error occurs when creating pipelines: %s", err) } moduleArgs := sched.ModuleArgs{ Downloaders: downloaders, Analyzers: analyzers, Pipelines: pipelines, } //初始化排程器 err = scheduler.init( requestArgs, dataArgs, moduleArgs) if err != nil { logger.Fatalf("An error occurs when initializing scheduler: %s", err) } //省略部分程式碼 }重申一下,其中的識別符號 lib 代表 gopcp.v2/chapter6/webcrawler/examples/finder/ internal 程式碼包。另外注意,logger.Fatalf 總會在列印紀錄檔之後使當前進程非正常終止。所以,一旦排程器初始化失敗,finder 就不會再做任何事了。
// Monitor用於監控排程器。 //引數 scheduler 代表作為監控目標的排程器。 //引數 checkInterval 代表檢查間隔時間,單位:納秒。 //泰數 summarizeInterval 代表摘要獲取間隔時間,單位:納秒。 //引數 maxIdleCount 代表最大空閒計數。 //引數 autoStop 用來指示該方法是否在排程器空閒足夠長的時間之後自行停止排程器。 //引數 record 代表紀錄檔記錄函數。 //當監控結束之後,該方法會向作為唯一結果值的通道傳送一個代表空閒狀態檢查次數的數值 func Monitor( scheduler sched.Scheduler, checkInterval time.Duration, summarizeInterval time.Duration, maxIdleCount uint, autoStop bool, record Record) <-chan uint64在 Monitor 函數的引數宣告列表中,record 就是使用方需要客製化的摘要的記錄方式。Record 型別的宣告如下:
// Record 代表紀錄檔記錄函數的型別。
//引數 level 代表紀錄檔級別。級別設定:0-普通;1-警告;2-錯誤
type Record func(level uint8, content string)
func Monitor( scheduler sched.Scheduler, checkInterval time.Duration, summarizeInterval time.Duration, maxIdleCount uint, autoStop bool, record Record) <-chan uint64 { //防止排程器不可用 if scheduler == nil { panic(errors.New("The scheduler is invalid!")) } //防止過小的檢查間隔時間對爬取流程造成不良影響 if checkinterval < time.Millisecond*100 { checkinterval = time.Millisecond * 100 } //防止過小的摘要獲取間隔時間對爬取流程造成不良影響 if summarizeInterval < time.Second { summarizeInterval = time.Second } //防止過小的最大空閒計數造成排程器的過早停止 if maxIdleCount < 10 { maxIdleCount = 10 } logger.Infof("Monitor parameters: checkInterval: %s, summarizeInterval: %s,"+ "maxIdleCount: %d, autoStop: %v", checkInterval, summarizeInterval, maxIdleCount, autoStop) //生成監控停止通知器 stopNotifier, stopFunc := context.WithCancel(context.Background()) //接收和報告錯誤 reportError(scheduler, record, stopNotifier) //記錄摘要資訊 recordsummary(scheduler, summarizeInterval, record, stopNotifier) //檢查計數通道 checkCountChan := make(chan uint64, 2) //檢查空閒狀態 checkStatus(scheduler, checkInterval, maxIdleCount, autoStop, checkCountChan, record, stopFunc) return checkCountChan }這裡簡要解釋一下它體現的監控流程。首先,監控函數必須對傳入函數的引數值進行檢査,其中最重要的是代表排程器範例的 scheduler。如果它為 nil,那麼這個監控流程就完全沒有執行的必要了。監控函數在發現此情況時,會把它視為一個致命的錯誤並引發一個執行時恐慌。
//用於接收和報告錯誤 func reportError( scheduler sched.Scheduler, record Record, stopNotifier context.Context) { go func() { //等待排程器開啟 waitForSchedulerStart(scheduler) errorChan := scheduler.ErrorChan() for { //檢視監控停止通知器 select { case <- stopNotifier.Done(): return default: } err, ok := <- errorChan if ok { errMsg := fmt.Sprintf("Received an error from error channel: %s", err) record(2, errMsg) } time.Sleep(time.Microsecond) } }() }這個函數啟用了一個 goroutine 來執行其中的程式碼。在 go 函數中,它首先呼叫了 waitForSchedulerStart 函數。我們都知道,排程器有一個公開的方法 Status,該方法會返回一個可以表示排程器當前狀態的值。因此,該函數要做的就是不斷呼叫排程器的 Status 方法,直到該方法的結果值等於 sched.SCHED_STATUS_STARTED 為止。
//用於記錄摘要資訊 func recordSummary( scheduler sched.Scheduler, summarizeInterval time.Duration, record Record, stopNotifier context.Context)可以看到,它接受 4 個引數,其中的 3 個引數也是 reportError 函數所接受的。多出的那個引數是 summarizeInterval,即摘要獲取間隔時間。
var prevSchedSummaryStruct sched.SummaryStruct
var prevNumGoroutine int
var recordCount uint64 = 1
startTime := time.Now()
//獲取 goroutine 數量和排程器摘要資訊
currNumGoroutine := runtime.NumGoroutine()
currSchedSummaryStruct := scheduler.Summary().Struct()
//比對前後兩份摘要資訊的一致性,只有不一致時才會記錄 if currNumGoroutine != prevNumGoroutine || !currSchedSummaryStruct.Same(prevSchedSummaryStruct) { //記錄摘要資訊 summay := summary{ NumGoroutine: runtime.NumGoroutine(), SchedSummary: currSchedSummaryStruct, EscapedTime: time.Since(startTime).String(), } b, err := json.MarshalIndent(summay,""," ") if err != nil { logger.Errorf("Occur error when generate scheduler summary: %sn", err) continue } msg := fmt.Sprintf("Monitor summary[%d]:n%s", recordCount, b) record(0, msg) prevNumGoroutine = currNumGoroutine prevSchedSummaryStruct = currSchedSummaryStruct recordCount++ }組裝摘要資訊用到了當前包中宣告的結構體型別 summary,其定義如下:
//代表監控結果摘要的結構 type summary struct { // goroutine 的數量 NumGoroutine int 'json:"goroutine_number"' //排程器的摘要資訊 SchedSummary sched.SummaryStruct 'json:"sched_summary"' //從開始監控至今流逝的時間 EscapedTime string 'json:"escaped_time"' }可以看到,該型別也為 JSON 格式的序列化做好了準備。使用 encoding/json 包中的 MarshalIndent 函數把該型別的值序列化為易讀的 JSON 格式字串,然後通過呼叫 record 函數記錄它們。
time.Sleep(time.Microsecond)
實際上,與 reportError 函數相比,recordSummary 函數更有必要加上這條語句。//用於檢查狀態,並在滿足持續空閒葉間的條件時採取必要措施 func checkstatus( scheduler sched.Scheduler, checkInterval time.Duration, maxIdleCount uint, autoStop bool, checkCountChan chan <- uint64, record Record, stopFunc context.CancelFunc)其中的引數前面都介紹過。注意,引數 checkCountChan 的型別是一個傳送通道,這是為了限制 checkstatus 函數對它的操作。
var checkCount uint64
該變數代表的是檢查計數值。緊接著是一條 defer 語句:defer func() { stopFunc() checkCountChan <- checkCount }()它的作用是保證在go函數執行即將結束時發出停止信號和傳送檢查計數值,這個時 機非常關鍵。這個go函數總會在排程器空閒的時間達到最長持續空閒時間時結束執行。
var idleCount uint
它的值將會代表監控函數連續發現排程器空閒的計數值。另外,為了記錄排程器持續空閒的時間,還需要宣告一個這樣的變數:var firstIdleTime time.Time
注意,真實的持續空閒時間與理想的持續空閒時間(由引數 checkInterval 和 maxIdleCount 的值相乘得出的那個時間)之間肯定是有偏差的。並且,前者肯定會大於後者,因為執行判定的程式碼也是需要耗時的。for { //檢查排程器的空閒狀態 if scheduler.Idle() { idleCount++ if idleCount == 1 { firstIdleTime = time.Now() } if idleCount >= maxIdleCount { msg := fmt.Sprintf(msgReachMaxIdleCount, time.Since(firstIdleTime).String()) record(0, msg) //再次檢查排程器的空閒狀態,確保它已經可以停止 if scheduler.Idle() { if autoStop { var result string if err := scheduler.Stop(); err == nil { result = "success" ' } else { result = fmt.Sprintf("failing(%s)", err) } msg = fmt.Sprintf(msgStopScheduler, result) record(0, msg) } break } else { if idleCount > 0 { idleCount = 0 } } } } else { if idleCount > 0 { idleCount = 0 } } checkCount++ time.Sleep(checkInterval) }可以看到,總是讓 checkCount 的值隨著疊代的進行而遞增,同時也會依據 checkInterval 讓每次疊代之間存在一定的時間間隔。
//已達到最大空閒計數的訊息模板
var msgReachMaxIdleCount = "The scheduler has been idle for a period of time" +
"(about %s)." + " Consider to stop it now."
//準備監控引數 checkInterval := time.Second summarizeInterval := 100 * time.Millisecond maxIdleCount := uint(5) //開始監控 checkCountChan := monitor.Monitor( scheduler, checkInterval, summarizeInterval, maxIdleCount, tiue, lib.Record) //省略部分程式碼 //等待監控結束 <-checkCountChan把檢查間隔時間設定為 10 毫秒,並把最大空閒計數設定為 50 同時,讓 Monitor 函數在排程器的持續空閒時間達到最長持續空閒時間後自動關閉排程器。
//記錄紀錄檔 func Record(level byte, content string) { if content == "" { return } switch level { case 0: logger.Infoln(content) case 1: logger.Warnln(content) case 2: logger.Infoln(content) } }其中 logger 代表 gopcp.v2/helper/log/base 包下 MyLogger 型別的紀錄檔記錄器。
//準備排程器的啟動引數 firstHTTPReq, err := http.NewRequest("GET", firstURL, nil) if err != nil { logger.Fatalln(err) return } //開啟排程器 err = scheduler.Start(firstHTTPReq) if err != nil { logger.Fatalf("An error occurs when starting scheduler: %s", err) } //等待監控結束 <-checkCountChan基於命令引數 firstURL,我們可以很容易地建立出首次請求。如果啟動排程器不成功,就記下一條嚴重錯誤級別的紀錄檔。還記得嗎?這會使當前進程非正常終止。縱觀 main 函數的程式碼你就會發現,它遇到任何錯誤都會這樣做。這是因為一旦主流程出錯,finder 就真的無法再執行下去了。