gitlab-runner是當前炙手可熱的一個CICD工具,和Gitlab整合非常好。之前在工作中應用了一下,效果相當理想。
為了進一步瞭解gitlab-runner的執行原理,打算學習一下gitlab-runner原始碼。
由於是零基礎,所以打算把整個過程記錄下來,包括每一步的思路,以便覆盤;貽笑大方之處,歡迎評論區打臉。
上一章大致看了一下gitlab-runner
如何將runner
註冊到gitlab
。因為比較簡單,而且邏輯重點都在gitlab
中,所以講得比較粗糙。
本章驗證一下gitlab-runner
使用docker executor
工作的原理,在接到gitlab
的開始job通知後,如何開始啟動docker container
,並完成job。
我們建立一個gitlab
,設定一個runner
,然後執行pipeline,跟蹤gitlab-runner
原始碼。
參考資料:gitlab 安裝指南 docker版
參考資料:gitlab-runner使用docker executor
找到commands/register.go
,使用logrus
輸出組態檔地址:
func (s *RegisterCommand) Execute(context *cli.Context) {
......
err = s.saveConfig()
......
}
//修改s.saveConfig,輸出地址
func (c *configOptions) saveConfig() error {
logrus.Printf("saveConfig at" + c.ConfigFile)
return c.config.SaveConfig(c.ConfigFile)
}
跟蹤後,發現config地址:~/.gitlab-runner/config.toml
,如下圖:
如果我們要設定runner
屬性,就不用執行程式碼了,直接改檔案就可以了。
.gitlab-ci.yml
,隨意配下即可,後面根據具體的測試內容再調整。 {
"name": "gitlab run",
"type": "go",
"request": "launch",
"mode": "debug",
"program": "${workspaceFolder}",
"args": ["run"]
},
commands/multi.go
的func (mr *RunCommand) run() {}
。multi.go
的執行過程大致如下:// Run命令被執行
func (mr *RunCommand) Execute(_ *cli.Context) {
......
// 起了一個服務
err = svc.Run() //跳轉下面
......
}
func (mr *RunCommand) Start(_ service.Service) error {
......
// 執行命令
go mr.run() //跳轉下面
......
}
// RunCommand執行
func (mr *RunCommand) run() {
// 建立一些監控方面的服務
mr.setupMetricsAndDebugServer()
mr.setupSessionServer()
// 按照config.toml建立通道runners
runners := make(chan *common.RunnerConfig)
// 不斷把設定好的runner放入(feed)通道
go mr.feedRunners(runners)
// 系統訊號接收
signal.Notify(mr.stopSignals, syscall.SIGQUIT, syscall.SIGTERM, os.Interrupt)
signal.Notify(mr.reloadSignal, syscall.SIGHUP)
// 開啟一個管理工人的協程
startWorker := make(chan int)
stopWorker := make(chan bool)
// 每次 startWorker 收到編號,就會啟動一個 processRunners
go mr.startWorkers(startWorker, stopWorker, runners)
workerIndex := 0
for mr.stopSignal == nil {
// 控制worker數量,多退少補
// 注意方法中,對 startWorker 的處理
signaled := mr.updateWorkers(&workerIndex, startWorker, stopWorker)
if signaled != nil {
break
}
// 更新config檔案
signaled = mr.updateConfig()
if signaled != nil {
break
}
}
// 系統關閉處理
for mr.currentWorkers > 0 {
stopWorker <- true
mr.currentWorkers--
}
mr.log().Info("All workers stopped. Can exit now")
close(mr.runFinished)
}
下面我們對這幾部分程式碼進行一些簡單的閱讀和分析。
主要功能,將config.runners
放到chan *common.RunnerConfig
。
func (mr *RunCommand) feedRunners(runners chan *common.RunnerConfig) {
for mr.stopSignal == nil {
......//wait
//將設定好的runner,依次放入通道,放一個,歇一會
for _, runner := range config.Runners {
// 該方法只是給runner做了個健康檢查
mr.feedRunner(runner, runners)
time.Sleep(interval)
}
}
......
}
程式碼不多,大致意思就是拿到通道中的startworker
(工號),啟動這個工人。
func (mr *RunCommand) startWorkers(startWorker chan int, stopWorker chan bool, runners chan *common.RunnerConfig) {
for mr.stopSignal == nil {
id := <-startWorker
// 這是個非常關鍵的方法,後面有重點介紹
go mr.processRunners(id, stopWorker, runners)
}
}
這個功能也比較簡單,和系統設定的並行數比較一下,多退少補。增加的方式是使用通道(channel
)傳遞值workerIndex
。減少的模式是stopWorker=true
。
func (mr *RunCommand) updateWorkers(workerIndex *int, startWorker chan int, stopWorker chan bool) os.Signal {
concurrentLimit := mr.config.Concurrent
if concurrentLimit < 1 {
mr.log().Fatalln("Concurrent is less than 1 - no jobs will be processed")
}
// 太多
for mr.currentWorkers > concurrentLimit {
select {
case stopWorker <- true:
case signaled := <-mr.runSignal:
return signaled
}
mr.currentWorkers--
}
// 太少
for mr.currentWorkers < concurrentLimit {
select {
case startWorker <- *workerIndex:
case signaled := <-mr.runSignal:
return signaled
}
mr.currentWorkers++
*workerIndex++
}
return nil
}
比較簡單,不贅述,就是兩點:
func (mr *RunCommand) updateConfig() os.Signal {
select {
case <-time.After(common.ReloadConfigInterval * time.Second):
err := mr.checkConfig()
if err != nil {
mr.log().Errorln("Failed to load config", err)
}
case <-mr.reloadSignal:
err := mr.loadConfig()
if err != nil {
mr.log().Errorln("Failed to load config", err)
}
case signaled := <-mr.runSignal:
return signaled
}
return nil
}
如果之前的feedRunners
將一個runner
設定,放進了通道;那麼下面的程式碼會把runner
放進worker
。
如果之前的updateWorkers
要關閉一個worker
;那麼也是這裡處理。
func (mr *RunCommand) processRunners(id int, stopWorker chan bool, runners chan *common.RunnerConfig) {
mr.log().WithField("worker", id).Debugln("Starting worker")
for mr.stopSignal == nil {
select {
case runner := <-runners:
// 將runner放進worker,後面再細看
err := mr.processRunner(id, runner, runners)
if err != nil {
mr.log().
WithFields(logrus.Fields{
"runner": runner.ShortDescription(),
"executor": runner.Executor,
}).
WithError(err).
Warn("Failed to process runner")
}
// force GC cycle after processing build
runtime.GC()
case <-stopWorker:
// 啥都不幹,關閉
mr.log().
WithField("worker", id).
Debugln("Stopping worker")
return
}
}
<-stopWorker
}
這裡把processRunner
拿出來再細看一下,有點複雜,下一章跟蹤偵錯一下,😉
func (mr *RunCommand) processRunner( id int, runner *common.RunnerConfig, runners chan *common.RunnerConfig,) (err error) {
provider := common.GetExecutorProvider(runner.Executor)
if provider == nil {
return
}
executorData, err := provider.Acquire(runner)
if err != nil {
return fmt.Errorf("failed to update executor: %w", err)
}
defer provider.Release(runner, executorData)
if !mr.buildsHelper.acquireBuild(runner) {
logrus.WithFields(logrus.Fields{
"runner": runner.ShortDescription(),
"worker": id,
}).Debug("Failed to request job, runner limit met")
return
}
defer mr.buildsHelper.releaseBuild(runner)
buildSession, sessionInfo, err := mr.createSession(provider)
if err != nil {
return
}
// Receive a new build
trace, jobData, err := mr.requestJob(runner, sessionInfo)
if err != nil || jobData == nil {
return
}
defer func() { mr.traceOutcome(trace, err) }()
// Create a new build
build, err := common.NewBuild(*jobData, runner, mr.abortBuilds, executorData)
if err != nil {
return
}
build.Session = buildSession
build.ArtifactUploader = mr.network.UploadRawArtifacts
// Add build to list of builds to assign numbers
mr.buildsHelper.addBuild(build)
defer mr.buildsHelper.removeBuild(build)
// Process the same runner by different worker again
// to speed up taking the builds
mr.requeueRunner(runner, runners)
// Process a build
return build.Run(mr.config, trace)
}
完畢。