Go語言並行目錄遍歷

2020-07-16 10:05:17
在本節中,我們將構建一個程式,根據命令列指定的輸入,報告一個或多個目錄的磁碟使用情況,類似於 UNIX 的du命令。該程式大多數工作是由下面的 walkDir 函數完成,它使用 dirents 輔助函數來列舉目錄中的條目,如下所示:
// wakjDir 遞回地遍歷以 dir 為根目錄的整個檔案樹,並在 filesizes 上傳送每個已找到檔案的大小
func walkDir(dir string, fileSizes chan<- int64) {
    for _, entry := range dirents(dir) {
        if entry.IsDir() {
            subdir := filepath.Join(dir, entry.Name())
            walkDir(subdir, fileSizes)
        } else {
            fileSizes <- entry.Size()
        }
    }
}

// dirents 返回 dir 目錄中的條目
func dirents(dir string) []os.FileInfo {
    entries, err := ioutil.ReadDir(dir)
    if err != nil {
        fmt.Fprintf(os.Stderr, "du1: %vn", err)
        return nil
    }
    return entries
}
ioutil.ReadDir 函數返回一個 os.FileInfo 型別的 slice,針對單個檔案同樣的資訊可以通過呼叫 os.Stat 函數來返回。對每一個子目錄,walkDir 遞回呼叫它自己,對於每一個檔案,walkDir 傳送一條訊息到 fileSizes 通道,訊息的內容為檔案所占用的位元組數。

程式的完整程式碼如下所示,程式碼中 main 函數使用兩個 goroutine,後台 goroutine 呼叫 walkDir 遍歷命令列上指定的每一個目錄,最後關閉 fileSizes 通道;主 goroutine 計算從通道中接收的檔案的大小的和,最後輸出總數。
package main

import (
    "flag"
    "fmt"
    "io/ioutil"
    "os"
    "path/filepath"
)

func main() {
    // 確定初始目錄
    flag.Parse()
    roots := flag.Args()
    if len(roots) == 0 {
        roots = []string{"."}
    }

    // 遍歷檔案樹
    fileSizes := make(chan int64)
    go func() {
        for _, root := range roots {
            walkDir(root, fileSizes)
        }
        close(fileSizes)
    }()

    // 輸出結果
    var nfiles, nbytes int64
    for size := range fileSizes {
        nfiles++
        nbytes += size
    }
    printDiskUsage(nfiles, nbytes)
}

func printDiskUsage(nfiles, nbytes int64) {
    fmt.Printf("%d files  %.1f GBn", nfiles, float64(nbytes)/1e9)
}

// wakjDir 遞回地遍歷以 dir 為根目錄的整個檔案樹,並在 filesizes 上傳送每個已找到的檔案的大小
func walkDir(dir string, fileSizes chan<- int64) {
    for _, entry := range dirents(dir) {
        if entry.IsDir() {
            subdir := filepath.Join(dir, entry.Name())
            walkDir(subdir, fileSizes)
        } else {
            fileSizes <- entry.Size()
        }
    }
}

// dirents 返回 dir 目錄中的條目
func dirents(dir string) []os.FileInfo {
    entries, err := ioutil.ReadDir(dir)
    if err != nil {
        fmt.Fprintf(os.Stderr, "du1: %vn", err)
        return nil
    }
    return entries
}
在輸出結果前,程式等待較長時間:

go run main.go D:/code
18681 files  0.5 GB

如果程式可以通知它的進度,將會更友好,但是僅把 printDiskUsage 呼叫移動到回圈內部會使它輸出數千行結果,所以這裡對上面的程式進行一些調整,在有-v標識的時候周期性的輸出當前目錄的總和,如果只想看到最終的結果省略-v即可。
package main

import (
    "flag"
    "fmt"
    "io/ioutil"
    "os"
    "path/filepath"
    "time"
)

var verbose = flag.Bool("v", false, "顯示詳細進度")

func main() {
    // ...啟動後台 goroutine...
    // 確定初始目錄
    flag.Parse()
    roots := flag.Args()
    if len(roots) == 0 {
        roots = []string{"."}
    }
    // 遍歷檔案樹
    fileSizes := make(chan int64)
    go func() {
        for _, root := range roots {
            walkDir(root, fileSizes)
        }
        close(fileSizes)
    }()
    // 定期列印結果
    var tick <-chan time.Time
    if *verbose {
        tick = time.Tick(500 * time.Millisecond)
    }
    var nfiles, nbytes int64
loop:
    for {
        select {
        case size, ok := <-fileSizes:
            if !ok {
                break loop // fileSizes 關閉
            }
            nfiles++
            nbytes += size
        case <-tick:
            printDiskUsage(nfiles, nbytes)
        }
    }
    printDiskUsage(nfiles, nbytes) // 最終總數
}

func printDiskUsage(nfiles, nbytes int64) {
    fmt.Printf("%d files  %.1f GBn", nfiles, float64(nbytes)/1e9)
}

// wakjDir 遞回地遍歷以 dir 為根目錄的整個檔案樹,並在 filesizes 上傳送每個已找到的檔案的大小
func walkDir(dir string, fileSizes chan<- int64) {
    for _, entry := range dirents(dir) {
        if entry.IsDir() {
            subdir := filepath.Join(dir, entry.Name())
            walkDir(subdir, fileSizes)
        } else {
            fileSizes <- entry.Size()
        }
    }
}

// dirents 返回 dir 目錄中的條目
func dirents(dir string) []os.FileInfo {
    entries, err := ioutil.ReadDir(dir)
    if err != nil {
        fmt.Fprintf(os.Stderr, "du1: %vn", err)
        return nil
    }
    return entries
}
因為這個程式沒有使用 range 迴圈,所以第一個 select 情況必須顯式判斷 fileSizes 通道是否已經關閉,使用兩個返回值的形式進行接收操作。如果通道已經關閉,程式退出迴圈。標籤化的 break 語句將跳出 select 和 for 迴圈的邏輯。沒有標籤的 break 只能跳出 select 的邏輯,導致迴圈的下一次疊代。

執行結果如下所示:

go run main.go -v D:
296077 files  57.9 GB
302142 files  58.0 GB
306669 files  58.1 GB
314725 files  58.2 GB
320050 files  58.3 GB
341713 files  58.6 GB
346102 files  64.2 GB

此程式的弊端也很明顯,它依然會耗費太長的時間。

所以,下面為每一個 walkDir 的呼叫建立一個新的 goroutine。它使用 sync.WaitGroup 來為當前存活的 walkDir 呼叫計數,一個 goroutine 在計數器減為 0 的時候關閉 fileSizes 通道。
package main

import (
    "flag"
    "fmt"
    "io/ioutil"
    "os"
    "path/filepath"
    "sync"
    "time"
)

var verbose = flag.Bool("v", false, "顯示詳細進度")

func main() {
    // ...確定根目錄...
    flag.Parse()
    // 確定初始目錄
    roots := flag.Args()
    if len(roots) == 0 {
        roots = []string{"."}
    }
    // 並行遍歷每一個檔案樹
    fileSizes := make(chan int64)
    var n sync.WaitGroup
    for _, root := range roots {
        n.Add(1)
        go walkDir(root, &n, fileSizes)
    }
    go func() {
        n.Wait()
        close(fileSizes)
    }()
    // 定期列印結果
    var tick <-chan time.Time
    if *verbose {
        tick = time.Tick(500 * time.Millisecond)
    }
    var nfiles, nbytes int64
loop:
    for {
        select {
        case size, ok := <-fileSizes:
            if !ok {
                break loop // fileSizes 關閉
            }
            nfiles++
            nbytes += size
        case <-tick:
            printDiskUsage(nfiles, nbytes)
        }
    }
    printDiskUsage(nfiles, nbytes) // 最終總數
}

func printDiskUsage(nfiles, nbytes int64) {
    fmt.Printf("%d files  %.1f GBn", nfiles, float64(nbytes)/1e9)
}

func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
    defer n.Done()
    for _, entry := range dirents(dir) {
        if entry.IsDir() {
            n.Add(1)
            subdir := filepath.Join(dir, entry.Name())
            go walkDir(subdir, n, fileSizes)
        } else {
            fileSizes <- entry.Size()
        }
    }
}

// sema是一個用於限制目錄並行數的計數號誌
var sema = make(chan struct{}, 20)

// dirents返回directory目錄中的條目
func dirents(dir string) []os.FileInfo {
    sema <- struct{}{}        // 獲取令牌
    defer func() { <-sema }() // 釋放令牌
    entries, err := ioutil.ReadDir(dir)
    if err != nil {
        fmt.Fprintf(os.Stderr, "du: %vn", err)
        return nil
    }
    return entries
}
儘管系統與系統之間有很多的不同,但是這個版本的速度比前一個版本快幾倍。