從零開始閱讀gitlab-runner原始碼 003 worker 和 runner

2020-09-19 16:00:08

前言

gitlab-runner是當前炙手可熱的一個CICD工具,和Gitlab整合非常好。之前在工作中應用了一下,效果相當理想。

為了進一步瞭解gitlab-runner的執行原理,打算學習一下gitlab-runner原始碼。

由於是零基礎,所以打算把整個過程記錄下來,包括每一步的思路,以便覆盤;貽笑大方之處,歡迎評論區打臉。

gitlab聯調

上一章大致看了一下gitlab-runner如何將runner註冊到gitlab。因為比較簡單,而且邏輯重點都在gitlab中,所以講得比較粗糙。

本章驗證一下gitlab-runner使用docker executor工作的原理,在接到gitlab的開始job通知後,如何開始啟動docker container,並完成job。

本地安裝一個gitlab

我們建立一個gitlab,設定一個runner,然後執行pipeline,跟蹤gitlab-runner原始碼。

參考資料:gitlab 安裝指南 docker版

本地註冊一個runner

參考資料:gitlab-runner使用docker executor

找到commands/register.go,使用logrus輸出組態檔地址:

func (s *RegisterCommand) Execute(context *cli.Context) {
    ......
    err = s.saveConfig()
    ......
}
//修改s.saveConfig,輸出地址
func (c *configOptions) saveConfig() error {
    logrus.Printf("saveConfig at" + c.ConfigFile)
    return c.config.SaveConfig(c.ConfigFile)
}

跟蹤後,發現config地址:~/.gitlab-runner/config.toml,如下圖:

config location

如果我們要設定runner屬性,就不用執行程式碼了,直接改檔案就可以了。

啟動pipeline

  • 為專案配個.gitlab-ci.yml,隨意配下即可,後面根據具體的測試內容再調整。
  • 執行專案,等待pipeline接入。下面是debug用到的launch.json
        {
            "name": "gitlab run",
            "type": "go",
            "request": "launch",
            "mode": "debug",
            "program": "${workspaceFolder}",
            "args": ["run"]
        },
  • 斷點打在commands/multi.gofunc (mr *RunCommand) run() {}
  • 執行pipeline,可以看到斷點被點亮,三個紅框分別代表:監工(排程器),工人,等待結束。

go routine

  • 跟蹤程式碼,可以看到multi.go的執行過程大致如下:
// Run命令被執行
func (mr *RunCommand) Execute(_ *cli.Context) {
    ......
        // 起了一個服務
        err = svc.Run() //跳轉下面
    ......
}
func (mr *RunCommand) Start(_ service.Service) error {
    ......
    // 執行命令
    go mr.run() //跳轉下面
    ......
}
// RunCommand執行
func (mr *RunCommand) run() {
    // 建立一些監控方面的服務
    mr.setupMetricsAndDebugServer()
    mr.setupSessionServer()

    // 按照config.toml建立通道runners
    runners := make(chan *common.RunnerConfig)
    // 不斷把設定好的runner放入(feed)通道
    go mr.feedRunners(runners)
    // 系統訊號接收
    signal.Notify(mr.stopSignals, syscall.SIGQUIT, syscall.SIGTERM, os.Interrupt)
    signal.Notify(mr.reloadSignal, syscall.SIGHUP)
    // 開啟一個管理工人的協程
    startWorker := make(chan int)
    stopWorker := make(chan bool)
    // 每次 startWorker 收到編號,就會啟動一個 processRunners
    go mr.startWorkers(startWorker, stopWorker, runners)

    workerIndex := 0

    for mr.stopSignal == nil {
        // 控制worker數量,多退少補
        // 注意方法中,對 startWorker 的處理
        signaled := mr.updateWorkers(&workerIndex, startWorker, stopWorker)
        if signaled != nil {
            break
        }

        // 更新config檔案
        signaled = mr.updateConfig()
        if signaled != nil {
            break
        }
    }

    // 系統關閉處理
    for mr.currentWorkers > 0 {
        stopWorker <- true
        mr.currentWorkers--
    }
    mr.log().Info("All workers stopped. Can exit now")
    close(mr.runFinished)
}
  • 由以上程式碼可以知道,系統主要依靠以下不間斷的方法或協程:
    • feedRunners
    • startWorkers
    • updateWorkers
    • updateConfig

下面我們對這幾部分程式碼進行一些簡單的閱讀和分析。

feedRunners

主要功能,將config.runners放到chan *common.RunnerConfig

func (mr *RunCommand) feedRunners(runners chan *common.RunnerConfig) {
    for mr.stopSignal == nil {
        ......//wait
        //將設定好的runner,依次放入通道,放一個,歇一會
        for _, runner := range config.Runners {
            // 該方法只是給runner做了個健康檢查
            mr.feedRunner(runner, runners)
            time.Sleep(interval)
        }
    }
    ......
}

startWorkers

程式碼不多,大致意思就是拿到通道中的startworker(工號),啟動這個工人。

func (mr *RunCommand) startWorkers(startWorker chan int, stopWorker chan bool, runners chan *common.RunnerConfig) {
    for mr.stopSignal == nil {
        id := <-startWorker
        // 這是個非常關鍵的方法,後面有重點介紹
        go mr.processRunners(id, stopWorker, runners)
    }
}

updateWorkers

這個功能也比較簡單,和系統設定的並行數比較一下,多退少補。增加的方式是使用通道(channel)傳遞值workerIndex。減少的模式是stopWorker=true

func (mr *RunCommand) updateWorkers(workerIndex *int, startWorker chan int, stopWorker chan bool) os.Signal {
    concurrentLimit := mr.config.Concurrent
    if concurrentLimit < 1 {
        mr.log().Fatalln("Concurrent is less than 1 - no jobs will be processed")
    }
    // 太多
    for mr.currentWorkers > concurrentLimit {
        select {
        case stopWorker <- true:
        case signaled := <-mr.runSignal:
            return signaled
        }
        mr.currentWorkers--
    }
    // 太少
    for mr.currentWorkers < concurrentLimit {
        select {
        case startWorker <- *workerIndex:
        case signaled := <-mr.runSignal:
            return signaled
        }
        mr.currentWorkers++
        *workerIndex++
    }
    return nil
}

updateConfig

比較簡單,不贅述,就是兩點:

  1. 根據指令讀取設定
  2. 定時讀取設定
func (mr *RunCommand) updateConfig() os.Signal {
    select {
    case <-time.After(common.ReloadConfigInterval * time.Second):
        err := mr.checkConfig()
        if err != nil {
            mr.log().Errorln("Failed to load config", err)
        }

    case <-mr.reloadSignal:
        err := mr.loadConfig()
        if err != nil {
            mr.log().Errorln("Failed to load config", err)
        }

    case signaled := <-mr.runSignal:
        return signaled
    }
    return nil
}

processRunners

如果之前的feedRunners將一個runner設定,放進了通道;那麼下面的程式碼會把runner放進worker
如果之前的updateWorkers要關閉一個worker;那麼也是這裡處理。

func (mr *RunCommand) processRunners(id int, stopWorker chan bool, runners chan *common.RunnerConfig) {
    mr.log().WithField("worker", id).Debugln("Starting worker")

    for mr.stopSignal == nil {
        select {
        case runner := <-runners:
            // 將runner放進worker,後面再細看
            err := mr.processRunner(id, runner, runners)
            if err != nil {
                mr.log().
                    WithFields(logrus.Fields{
                        "runner":   runner.ShortDescription(),
                        "executor": runner.Executor,
                    }).
                    WithError(err).
                    Warn("Failed to process runner")
            }

            // force GC cycle after processing build
            runtime.GC()

        case <-stopWorker:
            // 啥都不幹,關閉
            mr.log().
                WithField("worker", id).
                Debugln("Stopping worker")
            return
        }
    }
    <-stopWorker
}

這裡把processRunner拿出來再細看一下,有點複雜,下一章跟蹤偵錯一下,😉

func (mr *RunCommand) processRunner(    id int,    runner *common.RunnerConfig,    runners chan *common.RunnerConfig,) (err error) {
    provider := common.GetExecutorProvider(runner.Executor)
    if provider == nil {
        return
    }

    executorData, err := provider.Acquire(runner)
    if err != nil {
        return fmt.Errorf("failed to update executor: %w", err)
    }
    defer provider.Release(runner, executorData)

    if !mr.buildsHelper.acquireBuild(runner) {
        logrus.WithFields(logrus.Fields{
            "runner": runner.ShortDescription(),
            "worker": id,
        }).Debug("Failed to request job, runner limit met")
        return
    }
    defer mr.buildsHelper.releaseBuild(runner)

    buildSession, sessionInfo, err := mr.createSession(provider)
    if err != nil {
        return
    }

    // Receive a new build
    trace, jobData, err := mr.requestJob(runner, sessionInfo)
    if err != nil || jobData == nil {
        return
    }
    defer func() { mr.traceOutcome(trace, err) }()

    // Create a new build
    build, err := common.NewBuild(*jobData, runner, mr.abortBuilds, executorData)
    if err != nil {
        return
    }
    build.Session = buildSession
    build.ArtifactUploader = mr.network.UploadRawArtifacts

    // Add build to list of builds to assign numbers
    mr.buildsHelper.addBuild(build)
    defer mr.buildsHelper.removeBuild(build)

    // Process the same runner by different worker again
    // to speed up taking the builds
    mr.requeueRunner(runner, runners)

    // Process a build
    return build.Run(mr.config, trace)
}

完畢。