從原始碼解析Go exec timeout 實現機制

2023-03-29 06:01:03

1. 背景

環境:golang 1.9,darwin(macOS)
測試使用 golang os/exec 執行命令,並設定超時時間,測試程式碼如下。
現象:執行測試後,到達超時時間時程式並未超時退出,反而阻塞住了。

// TestExecWithTimeout reproduces the hang: the context expires after 5s and
// sh is killed, yet CombinedOutput only returns after about 10s, once the
// orphaned `sleep 10` exits and releases the output pipe.
func TestExecWithTimeout(t *testing.T) {
	deadlineCtx, stop := context.WithTimeout(context.Background(), 5*time.Second)
	defer stop()

	began := time.Now()
	script := "echo start && sleep 10 && echo end."
	combined, runErr := exec.CommandContext(deadlineCtx, "sh", "-c", script).CombinedOutput()
	fmt.Printf("error: [%v]\n", runErr)
	fmt.Printf("out: [%v]\n", string(combined))

	// Once a context's Err is non-nil it never changes, so reading it once is enough.
	if ctxErr := deadlineCtx.Err(); ctxErr == context.DeadlineExceeded {
		fmt.Printf("ctx.Err: [%v]\n", ctxErr)
	}

	elapsed := time.Since(began)
	fmt.Printf("process end: %v", elapsed)
}
error: [signal: killed]
out: [start
]
ctx.Err: [context deadline exceeded]
process end: 10.010193583sPASS

再用 ps 檢視程序狀態(下面的 ps 輸出來自 sleep 20 的一次執行),發現超時時間到達後,sh 程序就被 kill 了,而由 sh 啟動的 sleep 命令還在執行中,且該程序被 1 號程序接管,變成了孤兒程序(orphan process;它仍在執行,並不是已結束、等待回收的殭屍程序)。當我們手動 kill 掉這個 sleep 程序後,程式才退出。

~/workpro//mkskit ❯ ps -ef | grep "sleep"
  501 85600 85597   0  3:34下午 ??         0:00.00 sh -c echo start && sleep 20 && echo end.
  501 85601 85600   0  3:34下午 ??         0:00.00 sleep 20
  501 85608 17687   0  3:34下午 ttys004    0:00.00 grep --color=auto --exclude-dir=.bzr --exclude-dir=CVS --exclude-dir=.git --exclude-dir=.hg --exclude-dir=.svn --exclude-dir=.idea --exclude-dir=.tox sleep
~/workpro//mkskit ❯ ps -ef | grep "sleep"
  501 85601     1   0  3:34下午 ??         0:00.00 sleep 20
  501 85652 17687   0  3:35下午 ttys004    0:00.00 grep --color=auto --exclude-dir=.bzr --exclude-dir=CVS --exclude-dir=.git --exclude-dir=.hg --exclude-dir=.svn --exclude-dir=.idea --exclude-dir=.tox sleep

2. 原因分析

從現象推測,這條命令一共產生了兩個程序(sh 及其子程序 sleep)。超時後 sh 程序被 kill 而退出,但 sleep 程序仍在執行並成為孤兒程序;在 sleep 退出之前,整個 Go 主程序一直阻塞。

2.1 原始碼-分析

// CombinedOutput runs the command and returns its combined standard
// output and standard error.
func (c *Cmd) CombinedOutput() ([]byte, error) {
	if c.Stdout != nil {
		return nil, errors.New("exec: Stdout already set")
	}
	if c.Stderr != nil {
		return nil, errors.New("exec: Stderr already set")
	}
	var b bytes.Buffer
	// Both streams go to the same *bytes.Buffer. It is not an *os.File, so
	// Start wires it up through an os.Pipe plus a copying goroutine
	// (writerDescriptor, case 3); because Stderr and Stdout are equal,
	// stderr reuses the stdout fd instead of getting a second pipe.
	c.Stdout = &b
	c.Stderr = &b
	// Run = Start + Wait. The buffer contents are returned even when err is
	// non-nil, so partial output survives a killed command.
	err := c.Run()
	return b.Bytes(), err
}

// Run starts the command and waits for it to complete.
// An error from Start is returned immediately; otherwise Run returns the
// result of Wait.
func (c *Cmd) Run() error {
	if err := c.Start(); err != nil {
		return err
	}
	return c.Wait()
}

CombinedOutput 方法會初始化一個位元組緩衝區,並將標準輸出和標準錯誤都導向該緩衝區,然後呼叫 Run 執行命令。
Run方法中呼叫Start和Wait方法:

  • Start方法用於啟動子程序,啟動後立即返回
  • Wait方法則阻塞,等待子程序結束並回收資源

2.1.1 Wait 方法

// Wait waits for the command to exit and waits for any copying to
// stdin or copying from stdout or stderr to complete.
//
// The command must have been started by Start.
//
// The returned error is nil if the command runs, has no problems
// copying stdin, stdout, and stderr, and exits with a zero exit
// status.
//
// If the command fails to run or doesn't complete successfully, the
// error is of type *ExitError. Other error types may be
// returned for I/O problems.
//
// If any of c.Stdin, c.Stdout or c.Stderr are not an *os.File, Wait also waits
// for the respective I/O loop copying to or from the process to complete.
//
// Wait releases any resources associated with the Cmd.
func (c *Cmd) Wait() error {
	if c.Process == nil {
		return errors.New("exec: not started")
	}
	if c.ProcessState != nil {
		return errors.New("exec: Wait was already called")
	}
	state, err := c.Process.Wait() // waits only for c.Process itself (sh in the demo), not for processes it spawned
	if err == nil && !state.Success() {
		err = &ExitError{ProcessState: state}
	}
	c.ProcessState = state

	// Wait for the pipe-copying goroutines to complete.
	var copyError error
	for range c.goroutine {
		if err := <-c.goroutineErrs; err != nil && copyError == nil { // blocks until the next io.Copy goroutine started in Start returns
			copyError = err
		}
	}
	c.goroutine = nil // Allow the goroutines' closures to be GC'd.

	if c.ctxErr != nil {
		interruptErr := <-c.ctxErr // result of the watchCtx goroutine: nil, ctx.Err(), or a kill error
		// If c.Process.Wait returned an error, prefer that.
		// Otherwise, report any error from the interrupt goroutine.
		if interruptErr != nil && err == nil {
			err = interruptErr
		}
	}
	// Report errors from the copying goroutines only if the program otherwise
	// exited normally on its own. Otherwise, the copying error may be due to the
	// abnormal termination.
	if err == nil {
		err = copyError
	}

	c.closeDescriptors(c.closeAfterWait)
	c.closeAfterWait = nil

	return err
}

根據 debug 可知,阻塞發生在上面程式碼的第 34 行,即 `err := <-c.goroutineErrs` 這句:它從 goroutineErrs 中依次讀取各個複製 goroutine 的錯誤,並記錄第一個錯誤。從通道接收資料(`<-ch`)會阻塞,只可能是因為傳送方還沒有傳送資料。那麼 goroutineErrs 對應的傳送方是誰呢?

2.1.2 Start 方法

// Start starts the specified command but does not wait for it to complete.
//
// If Start returns successfully, the c.Process field will be set.
//
// After a successful call to Start the Wait method must be called in
// order to release associated system resources.
func (c *Cmd) Start() error {
	if c.Path == "" && c.Err == nil && c.lookPathErr == nil {
		c.Err = errors.New("exec: no command")
	}
	if c.Err != nil || c.lookPathErr != nil {
		c.closeDescriptors(c.closeAfterStart)
		c.closeDescriptors(c.closeAfterWait)
		if c.lookPathErr != nil {
			return c.lookPathErr
		}
		return c.Err
	}
	if runtime.GOOS == "windows" {
		lp, err := lookExtensions(c.Path, c.Dir)
		if err != nil {
			c.closeDescriptors(c.closeAfterStart)
			c.closeDescriptors(c.closeAfterWait)
			return err
		}
		c.Path = lp
	}
	if c.Process != nil {
		return errors.New("exec: already started")
	}
	if c.ctx != nil {
		select {
		case <-c.ctx.Done():
			c.closeDescriptors(c.closeAfterStart)
			c.closeDescriptors(c.closeAfterWait)
			return c.ctx.Err()
		default:
		}
	}

	c.childFiles = make([]*os.File, 0, 3+len(c.ExtraFiles))
	type F func(*Cmd) (*os.File, error) // local type: a Cmd method expression that yields one child fd
	for _, setupFd := range []F{(*Cmd).stdin, (*Cmd).stdout, (*Cmd).stderr} {
		fd, err := setupFd(c)
		if err != nil {
			c.closeDescriptors(c.closeAfterStart)
			c.closeDescriptors(c.closeAfterWait)
			return err
		}
		c.childFiles = append(c.childFiles, fd)
	}
	c.childFiles = append(c.childFiles, c.ExtraFiles...)

	env, err := c.environ()
	if err != nil {
		return err
	}

	c.Process, err = os.StartProcess(c.Path, c.argv(), &os.ProcAttr{ // the child inherits c.childFiles (stdin, stdout, stderr, extras) in order
		Dir:   c.Dir,
		Files: c.childFiles,
		Env:   env,
		Sys:   c.SysProcAttr,
	})
	if err != nil {
		c.closeDescriptors(c.closeAfterStart)
		c.closeDescriptors(c.closeAfterWait)
		return err
	}

	c.closeDescriptors(c.closeAfterStart) // close the parent's copies of the child's ends (e.g. pipe write ends); the child keeps its own

	// Don't allocate the goroutineErrs channel unless there are goroutines to fire.
	if len(c.goroutine) > 0 {
		errc := make(chan error, len(c.goroutine)) // buffered: each goroutine can deliver its result without waiting for Wait
		c.goroutineErrs = errc
		for _, fn := range c.goroutine {
			go func(fn func() error) {
				errc <- fn()
			}(fn)
		}
	}

	c.ctxErr = c.watchCtx() // kills c.Process if ctx is done before Wait collects the result

	return nil
}

從第 74–82 行可以看到,Start 建立了一個容量為 len(c.goroutine) 的 channel,然後為 c.goroutine 中的每個函數啟動一個 goroutine,並把 fn() 的返回值寫入 errc。結合 Wait 的阻塞現象可知,應該是某個 fn() 呼叫一直沒有返回。
繼續追蹤 c.goroutine 是在哪裡被賦值的,答案同樣在 Start 方法中。
主要看第 42–43 行(順便學到:函數內部也能定義型別,如 `type F func(*Cmd) (*os.File, error)`)。這裡依序呼叫了三個函數來初始化 stdin、stdout、stderr。

這裡主要先看下stdout、stderr

func (c *Cmd) stdout() (f *os.File, err error) {
	return c.writerDescriptor(c.Stdout) // becomes the child's stdout (childFiles[1])
}

func (c *Cmd) stderr() (f *os.File, err error) {
	// If Stderr and Stdout are the same writer, reuse stdout's fd (childFiles[1]) instead of creating a second one.
	if c.Stderr != nil && interfaceEqual(c.Stderr, c.Stdout) {
		return c.childFiles[1], nil
	}
	return c.writerDescriptor(c.Stderr) // otherwise Stderr gets its own fd
}

func (c *Cmd) writerDescriptor(w io.Writer) (f *os.File, err error) {
	// case1: no writer configured -> the child writes to os.DevNull.
	if w == nil {
		f, err = os.OpenFile(os.DevNull, os.O_WRONLY, 0)
		if err != nil {
			return
		}
		c.closeAfterStart = append(c.closeAfterStart, f)
		return
	}

	// case2: w is already an *os.File -> hand it to the child directly; no copying goroutine.
	if f, ok := w.(*os.File); ok {
		return f, nil
	}

	// case3: any other io.Writer -> create a pipe; the child gets the write end.
	pr, pw, err := os.Pipe()
	if err != nil {
		return
	}

	c.closeAfterStart = append(c.closeAfterStart, pw) // parent's write end is closed right after Start
	c.closeAfterWait = append(c.closeAfterWait, pr)   // read end is closed at the end of Wait
	c.goroutine = append(c.goroutine, func() error {
		_, err := io.Copy(w, pr) // returns only at EOF (every write end closed) or on error
		pr.Close() // in case io.Copy stopped due to write error
		return err
	})
	return pw, nil
}

兩個函數都呼叫了 writerDescriptor,看程式碼主要有三個分支邏輯:

  • case1:如果沒有指定 stderr 或 stdout,就直接寫入 os.DevNull,即 /dev/null
  • case2:如果指定的 stderr 或 stdout 是 *os.File 型別,也直接返回該檔案,子程序會把輸出直接寫入檔案
  • case3:其餘情況下建立一個管道並返回寫端;在 Start 函數的第 59 行(os.StartProcess),子程序繼承這個寫端作為自己的 stdout/stderr。關鍵在 `c.goroutine = append(c.goroutine, func() error {...})` 這段:它登記了一個函數,由 Start 啟動的 goroutine 執行,從管道讀端讀取資料並 io.Copy 到指定的 Stdout 或 Stderr(二者都實現了 io.Writer)。
// Writer is the interface that wraps the basic Write method
// (excerpt of io.Writer; most of its doc comment is omitted here).
//
// Implementations must not retain p.
type Writer interface {
	Write(p []byte) (n int, err error)
}

io.Copy 會一直讀取,直到讀端遇到 EOF(或發生錯誤)才返回。而管道的讀端只有在所有寫端都被關閉後才會讀到 EOF,這裡的寫端包括子程序以及子程序再 fork 出的子子程序所繼承的副本。這就是產生阻塞的原因。從 Wait 方法的註釋中也可以得知這一點:

// (Excerpt: only the end of Wait's doc comment is repeated here.)
//
// If any of c.Stdin, c.Stdout or c.Stderr are not an *os.File, Wait also waits
// for the respective I/O loop copying to or from the process to complete.
//
// Wait releases any resources associated with the Cmd.
func (c *Cmd) Wait() error { /* body elided; the full listing is in section 2.1.1 */ }

到這裡瞭解了阻塞的根本原因。那麼超時後子程序被 kill 是在哪裡觸發的呢?看 Start 方法時可以發現,方法末尾有一個 watchCtx 呼叫:

// watchCtx conditionally starts a goroutine that waits until either c.ctx is
// done or c.Process.Wait has completed (called from Wait).
// If c.ctx is done first, the goroutine terminates c.Process.
//
// If a goroutine was started, watchCtx returns a channel on which its result
// must be received.
func (c *Cmd) watchCtx() <-chan error {
	if c.ctx == nil {
		return nil
	}

	// Unbuffered: the goroutine's first send can only complete when Wait
	// receives from c.ctxErr.
	errc := make(chan error)
	go func() {
		select {
		case errc <- nil:
			// Wait received first: the process ended on its own, nothing to kill.
			return
		case <-c.ctx.Done():
		}

		var err error
		// Kill signals only c.Process (the direct child, e.g. sh in the demo).
		// Processes it spawned are not signaled and may keep the output pipes open.
		if killErr := c.Process.Kill(); killErr == nil {
			// We appear to have successfully delivered a kill signal, so any
			// program behavior from this point may be due to ctx.
			err = c.ctx.Err()
		} else if !errors.Is(killErr, os.ErrProcessDone) {
			err = wrappedError{
				prefix: "exec: error sending signal to Cmd",
				err:    killErr,
			}
		}
		errc <- err
	}()

	return errc
}

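可以看到,watchCtx 啟動了一個 goroutine 並返回一個 error channel。goroutine 用 select 同時監聽兩件事:
- 如果 Wait 先從該 channel 接收(表示程序已自行結束),就傳送 nil 並返回。
- 如果 ctx 先超時或被 cancel,就呼叫 c.Process.Kill() 殺掉子程序,再把 ctx.Err()(或 kill 失敗時的錯誤)傳送出去。

正常情況下,ctx 超時後子程序被 kill,子程序持有的管道寫端隨之關閉。io.Copy 讀到 EOF 後正常返回,複製 goroutine 向 c.goroutineErrs 傳送 nil,Wait 從 c.goroutineErrs 中讀到 nil。再看 Wait 方法:
- 第 26–27 行:被 kill 的程序 state.Success() 為 false,因此 err 被賦值為 &ExitError{ProcessState: state}。
- 第 40–44 行:由於 err 已經不為 nil,watchCtx 傳回的錯誤不會覆蓋它。
所以最終返回的是 `signal: killed`,與開頭的輸出一致。

但是在開頭的 demo 中,sh 子程序又 fork 了一個 sleep 程序(Go 程序的孫程序)。Kill 只作用於 c.Process,也就是 sh。context 超時後,sleep 程序依舊在執行,並持有從 sh 繼承來的管道寫端,導致 io.Copy 一直阻塞,直到 sleep 結束。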

2.2 相關流程

3. 解決方案

根據上述分析,只有同時滿足兩個條件才會阻塞:一是進入 case3(輸出經由管道加複製 goroutine);二是子程序又產生了子子程序,並繼承了管道寫端。因此只要破壞其中任意一個條件即可。

3.1 使用*os.File型別接收輸入輸出

可避免阻塞,但存在以下問題:

  • 需要額外處理輸出,如:從檔案讀取並寫入到需要的地方
  • 子子程序不會被 kill:Run 返回後它仍繼續執行,並被 1 號程序接管成為孤兒程序(不是殭屍程序)
// TestExecUseFileOutput avoids the hang by sending stdout and stderr to an
// *os.File. The child writes to the file directly and no copying goroutine is
// started, so Wait returns as soon as sh is killed at the deadline. The
// orphaned sleep keeps running (and keeps the file open) after that.
func TestExecUseFileOutput(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 8*time.Second)
	defer cancel()

	cmd := exec.CommandContext(ctx, "sh", "-c", "echo start && sleep 15 && echo end.")
	combinedOutput, err := ioutil.TempFile("", "stdouterr")
	if err != nil {
		fmt.Println(err)
		return
	}
	// Close the parent's handle before removing the file so the descriptor
	// is not leaked.
	defer func() {
		_ = combinedOutput.Close()
		_ = os.Remove(combinedOutput.Name())
	}()

	cmd.Stdout = combinedOutput
	cmd.Stderr = combinedOutput
	// Keep Run's error separately. On timeout it is non-nil ("signal: killed"),
	// and the output written before the kill should still be shown.
	runErr := cmd.Run()

	// Rewind before reading: the child's writes moved the file offset.
	if _, err := combinedOutput.Seek(0, io.SeekStart); err != nil {
		fmt.Println(err)
		return
	}
	var b bytes.Buffer
	if _, err := io.Copy(&b, combinedOutput); err != nil {
		fmt.Println(err)
		return
	}

	fmt.Println("output:", b.String())
	fmt.Printf("ctx.Err: [%v]\n", ctx.Err())
	fmt.Printf("error: [%v]\n", runErr)
}

3.2 避免產生子程序

linux執行指令碼的5種方式

  • 使用絕對路徑執行:/root/sleep.sh
  • 使用相對路徑:./sleep.sh 需要x許可權
  • 使用sh或者bash命令來執行:bash /root/sleep.sh
  • 使用.(空格)指令碼名稱來執行: . ./sleep.sh
  • 使用source來執行(一般用來生效組態檔): source /root/sleep.sh

前三種方式都會產生 bash 程序和 sleep 程序;後兩種則在當前 shell 中執行腳本,只會產生 sleep 程序。
但 Go 程式本身沒有 shell 環境,要執行複雜的命令只能透過 bash 或 sh 來執行,必然會產生一個新的 shell 程序,所以這個方法無效。

3.3 sh -c

用 bash -c command / sh -c command 方式執行單條命令時,shell 有相關優化,不會產生多個程序。因此把 demo 中的複雜命令或腳本拆成多條單獨的命令分別執行也能解決問題,但這種方式不夠優雅。
原理:
單條命令時:bash 啟動後發現 -c 後面只是一條簡單命令,便不 fork 新程序,而是直接呼叫 exec,用 sleep 取代 shell 程序本身(pid 不變)。
多條命令時:shell 需要處理 && 操作符。它必須等待第一個命令結束(收到 SIGCHLD),再根據其 exit status 決定是否執行第二個命令,因此 shell 程序必須保留,不能被 sleep 取代。

具體見 shell.c 第 1370 行

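3.4 kill -pgid:殺死整個行程群組

Linux kill(2) 的 pid 參數為負數時,會向 ID 為 -pid 的行程群組中的所有程序傳送訊號。具體做法如下:
- 先透過 SysProcAttr.Setpgid 讓 sh 成為一個新行程群組的組長,群組 ID 等於 sh 的 pid。
- sh fork 出的 sleep 也會屬於該群組。
- 超時後對 -pid 傳送 SIGKILL,即可連同子子程序一併殺掉。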

If pid is less than -1, then sig is sent to every process in the        
process group whose ID is -pid.
// TestExecuteInOnPgid runs the script in its own process group and, when ctx
// expires, kills the whole group with kill(-pgid, SIGKILL). The sleep
// grandchild is in the same group, so it dies too. Every write end of the
// output pipe is then closed, and Wait returns at the timeout instead of
// after the sleep finishes.
func TestExecuteInOnPgid(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 8*time.Second)
	defer cancel()

	cmd := exec.CommandContext(ctx, "sh", "-c", "echo start && sleep 15 && echo end.")
	// Setpgid with Pgid == 0 makes sh the leader of a new process group whose
	// ID equals its pid; processes it forks (sleep) inherit that group.
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

	var output bytes.Buffer
	cmd.Stdout = &output
	cmd.Stderr = &output
	if err := cmd.Start(); err != nil {
		fmt.Println(err)
		return
	}

	// Read the pid only after Start has set cmd.Process. Reading cmd.Process
	// from a goroutine that races with Start is a data race, and the field
	// may still be nil.
	pgid := cmd.Process.Pid
	done := make(chan struct{})
	go func() {
		select {
		case <-ctx.Done():
			if err := syscall.Kill(-pgid, syscall.SIGKILL); err != nil {
				fmt.Println("kill failed: ", err)
			}
		case <-done:
			// The command finished first; do not kill a group that has
			// already been reaped (its ID could be reused).
		}
	}()

	err := cmd.Wait()
	close(done)

	// Report the output and error in every case, including the timeout.
	fmt.Println("output:", output.String())
	fmt.Printf("ctx.Err : [%v]\n", ctx.Err())
	fmt.Printf("error   : [%v]\n", err)
}

3.5 社群提案

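該問題其實很早就存在了,最早可以追溯到這個 2017 年的 Issue #23019。不過為了保持向後相容,在方案上一直沒有達成共識。最新提案見 Issue #50436,根據 #53400 中的最新訊息,該提案可能會在 Go 1.20 中實現。
提案的大致方案有兩部分:
- 在 exec.Cmd 中新增 Interrupt(os.Signal)欄位,在 context 超時後將這個訊號傳送給子程序。
- 新增 WaitDelay 欄位,在程序退出或 context 結束後再等待一段時間,就強制關閉 I/O 管道,避免被仍持有管道的子子程序拖住。
下面是提案中的欄位定義。(註:Go 1.20 最終發佈的 API 是 `Cmd.Cancel func() error` 與 `Cmd.WaitDelay time.Duration`。Cancel 取代了 Interrupt,預設呼叫 Process.Kill,也可以自訂,例如 kill 整個行程群組。詳見 os/exec 文件。)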

	// NOTE(review): excerpt of the exec.Cmd fields proposed in golang/go#50436.
	// As far as we know, the released Go 1.20 API kept WaitDelay but exposed
	// neither Context nor Interrupt; it added `Cancel func() error` instead.
	// Confirm against the os/exec documentation before relying on the text below.

	// Context is the context that controls the lifetime of the command
	// (typically the one passed to CommandContext).
	Context context.Context

	// If Interrupt is non-nil, Context must also be non-nil and Interrupt will be
	// sent to the child process when Context is done.
	//
	// If the command exits with a success code after the Interrupt signal has
	// been sent, Wait and similar methods will return Context.Err()
	// instead of nil.
	//
	// If the Interrupt signal is not supported on the current platform
	// (for example, if it is os.Interrupt on Windows), Start may fail
	// (and return a non-nil error).
	Interrupt os.Signal

	// If WaitDelay is non-zero, the command's I/O pipes will be closed after
	// WaitDelay has elapsed after either the command's process has exited or
	// (if Context is non-nil) Context is done, whichever occurs first.
	// If the command's process is still running after WaitDelay has elapsed,
	// it will be terminated with os.Kill before the pipes are closed.
	//
	// If the command exits with a success code after pipes are closed due to
	// WaitDelay and no Interrupt signal has been sent, Wait and similar methods
	// will return ErrWaitDelay instead of nil.
	//
	// If WaitDelay is zero (the default), I/O pipes will be read until EOF,
	// which might not occur until orphaned subprocesses of the command have
	// also closed their descriptors for the pipes.
	WaitDelay time.Duration

4. 總結

現象

使用 os/exec 執行 shell 指令碼並設定超時時間,然後到超時時間之後程式並未超時退出,反而一直阻塞。

原因

os/exec 包執行命令時會建立子程序,通過管道連線子程序以收集命令執行結果,goroutine 從管道中讀取命令輸出,超時後會 kill 掉子程序,從而關閉管道,管道被關閉後 goroutine 則自動退出。
如果存在子子程序,佔有管道則會導致 kill 掉子程序後管道依舊未能釋放,讀取輸出的 goroutine 被阻塞,最終導致程式超時後也無法返回。

解決

package ssh

import "syscall"

// Cancel implements cancellation by sending SIGKILL to the process group
// -e.pid and then clearing e.pid, so a second call does nothing.
// This assumes the command was started with Setpgid (see newSysProcAttr), so
// that the group ID equals the pid. The kill is best-effort: its error is
// deliberately ignored.
func (e *SomeExecuteCmdForward) Cancel() {
	pgid := -e.pid
	if pgid >= 0 {
		// e.pid is not a positive pid (e.g. 0 after an earlier Cancel): nothing to kill.
		return
	}
	_ = syscall.Kill(pgid, syscall.SIGKILL)
	e.pid = 0
}

// newSysProcAttr returns process attributes that start the command in a new
// process group. Pgid is left at zero, so the group ID becomes the child's own
// pid, and kill(-pid, sig) reaches the child plus everything it spawns (unless
// they move to another group).
//
// Usage: cmd.SysProcAttr = newSysProcAttr()
func newSysProcAttr() *syscall.SysProcAttr {
	attr := new(syscall.SysProcAttr)
	attr.Setpgid = true
	return attr
}

對應的 shell 命令:kill -- -pid(`--` 防止負數 pid 被當成命令選項)

5. 思考

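為什麼在 Start 中程序啟動之後,就用 c.closeDescriptors(c.closeAfterStart) 把檔案描述符 close 了,管道卻還可以繼續讀寫?
因為在 close 之前,這些描述符已經在 os.StartProcess(fork/exec)時被子程序繼承:
- 父子程序各自的描述符指向核心中同一個「開啟檔案描述」(open file description),核心為它維護參照計數。
- Start 中關閉的只是父程序自己持有的那一份,核心參照計數減 1。
- 子程序(以及它 fork 出的子子程序)仍然持有管道寫端。
只有所有程序中指向管道寫端的描述符都被關閉後,讀端才會讀到 EOF,io.Copy 才會結束。這也正是前面阻塞問題的根源。
追蹤 closeDescriptors 的底層呼叫,可以看到 Go 的 FD.Close 也有參照計數(fd.fdmu),計數歸零時才真正關閉 sysfd。但要注意,這是 Go 在同一程序內部的計數,用來保證還有 goroutine 在使用該 fd 時不會提前關閉它,與上面所說的跨程序核心參照計數是兩回事。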

// Close closes the FD. The underlying file descriptor is closed by the
// destroy method when there are no remaining references.
//
// NOTE: the references counted here live in fd.fdmu, a field of this Go value
// inside the current process. They track in-process users of the FD and are
// unrelated to copies of the descriptor that child processes inherited.
func (fd *FD) Close() error {
	if !fd.fdmu.increfAndClose() {
		return errClosing(fd.isFile)
	}

	// Unblock any I/O.  Once it all unblocks and returns,
	// so that it cannot be referring to fd.sysfd anymore,
	// the final decref will close fd.sysfd. This should happen
	// fairly quickly, since all the I/O is non-blocking, and any
	// attempts to block in the pollDesc will return errClosing(fd.isFile).
	fd.pd.evict()

	// Key line:
	// The call to decref will call destroy if there are no other
	// references.
	err := fd.decref()

	// Wait until the descriptor is closed. If this was the only
	// reference, it is already closed. Only wait if the file has
	// not been set to blocking mode, as otherwise any current I/O
	// may be blocking, and that would block the Close.
	// No need for an atomic read of isBlocking, increfAndClose means
	// we have exclusive access to fd.
	if fd.isBlocking == 0 {
		runtime_Semacquire(&fd.csema)
	}

	return err
}

// decref removes a reference from fd.
// It also closes fd when the state of fd is set to closed and there
// is no remaining reference.
//
// The count is kept in fd.fdmu inside this process; it has nothing to do
// with copies of the descriptor held by other (child) processes.
func (fd *FD) decref() error {
	if fd.fdmu.decref() {
		return fd.destroy()
	}
	return nil
}


https://stackoverflow.com/questions/70175281/what-is-the-purpose-of-closeafterstart-in-exec

6. exec案例

6.1 實時讀取標準輸出

// TestReadStdoutRealTime prints the command's stdout line by line while the
// command is still running.
//
// All reads from the StdoutPipe finish before cmd.Wait is called. Wait closes
// the pipe after the process exits, so reading first and waiting afterwards
// is the required order.
func TestReadStdoutRealTime(t *testing.T) {
	cmd := exec.Command("ping", "-c", "5", "192.168.0.1")
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		fmt.Println("create stdout pipe failed", err)
		return
	}
	if err := cmd.Start(); err != nil {
		fmt.Println("start exec failed", err)
		return
	}

	collect := func(output io.Reader) error {
		// bufio.Scanner splits on lines by default.
		scanner := bufio.NewScanner(output)
		for scanner.Scan() {
			fmt.Println(scanner.Text())
		}
		// Err must be checked after the loop. It is nil while Scan keeps
		// returning true, so a check inside the loop never fires.
		return scanner.Err()
	}

	if err := collect(stdout); err != nil {
		fmt.Println("collect stdout failed", err)
	}

	if err := cmd.Wait(); err != nil {
		fmt.Println("wait exec failed", err.Error())
	}
}

6.2 實時讀取標準輸出(錯誤用法)

// collector prints output line by line until EOF, a read error, or ctx being
// done, whichever comes first.
//
// It returns nil on EOF, ctx.Err() if ctx is done, and any other read error
// as-is. A final line without a trailing newline is still printed. ctx is only
// checked between reads, so a read that is already blocked is not interrupted.
func collector(ctx context.Context, output io.Reader) error {
	reader := bufio.NewReader(output)
	for {
		// When collection is slow but the command exits quickly, a concurrent
		// cmd.Wait closes the pipe and the next read fails with
		// "read |0: file already closed". Uncomment to reproduce:
		// time.Sleep(2 * time.Second)
		select {
		case <-ctx.Done():
			// Err is always non-nil once Done is closed.
			return ctx.Err()
		default:
		}

		line, readErr := reader.ReadString('\n')
		// ReadString can return data together with an error (e.g. a last line
		// without '\n' followed by io.EOF), so print before checking readErr.
		if line != "" {
			// Drop the delimiter: Println adds its own newline.
			if n := len(line); line[n-1] == '\n' {
				line = line[:n-1]
			}
			fmt.Println(line)
		}
		if readErr == io.EOF {
			return nil
		}
		if readErr != nil {
			return readErr
		}
	}
}

// TestReadStdoutWrongExample is an intentionally incorrect usage, kept to
// show a failure mode.
//
// The bug: collector reads stdout in a separate goroutine while this function
// calls cmd.Wait concurrently. Wait closes the pipe once the command has
// exited. A collector that falls behind therefore fails with
// "read |0: file already closed", and the rest of the output is lost. The
// os/exec documentation of StdoutPipe says Wait must not be called before all
// reads from the pipe have completed: read first, then Wait, as
// TestReadStdoutRealTime does.
//
// It also ignores the errors from StdoutPipe and Start, and never waits for
// the collector goroutine, so the test can return before everything is printed.
func TestReadStdoutWrongExample(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()
	cmd := exec.CommandContext(ctx, "ping", "-c", "5", "192.168.0.1")

	// Errors from StdoutPipe and Start are discarded here.
	stdout, _ := cmd.StdoutPipe()
	cmd.Start()

	// This reader goroutine races with cmd.Wait below.
	go func() {
		err := collector(ctx, stdout)
		if err != nil {
			fmt.Println(err)
		}
	}()

	// Wrong: Wait may close stdout while the goroutine above is still reading it.
	if err := cmd.Wait(); err != nil {
		fmt.Println("wait exec failed", err)
	}
}

7. 參考

Go exec 包執行命令超時失效問題分析及解決方案