最新發現一個比較有意思的庫ksniff,它是一個kubectl 外掛,使用tcpdump來遠端捕獲Kubernetes叢集中的pod流量並儲存到檔案或輸出到wireshark中,釋出網路問題定位。使用方式如下:
kubectl sniff hello-minikube-7c77b68cff-qbvsd -c hello-minikube
要知道很多pod中其實是沒有tcpdump這個可執行檔案的,那它是如何在Kubernetes叢集的Pod中遠端執行tcpdump命令的?又是如何倒出Pod的tcpdump的輸出並將輸出直接傳遞給wireshark的?下面分析一下該工具的實現方式。
ksniff有兩種執行模式:特權模式和非特權模式。首先看下非特權模式。
非特權模式的執行邏輯為:
ksniff使用tar命令對tcpdump可執行檔案進行打包,然後通過client-go的remotecommand
庫將其解壓到pod中,最後執行tcpdump命令即可:
fileContent, err := ioutil.ReadFile(req.Src) //讀取tcpdump可執行檔案
if err != nil {
return 0, err
}
tarFile, err := WrapAsTar(destFileName, fileContent)//將使用tar命令對tcpdump進行打包
if err != nil {
return 0, err
}
stdIn := bytes.NewReader(tarFile) //通過標準輸入傳遞給容器
tarCmd := []string{"tar", "-xf", "-"} //構建解壓命令
destDir := path.Dir(req.Dst)
if len(destDir) > 0 {
tarCmd = append(tarCmd, "-C", destDir)
}
execTarRequest := ExecCommandRequest{
KubeRequest: KubeRequest{
Clientset: req.Clientset,
RestConfig: req.RestConfig,
Namespace: req.Namespace,
Pod: req.Pod,
Container: req.Container,
},
Command: tarCmd,
StdIn: stdIn,
StdOut: stdOut,
StdErr: stdErr,
}
exitCode, err := PodExecuteCommand(execTarRequest)
tar打包的實現如下:
func WrapAsTar(fileNameOnTar string, fileContent []byte) ([]byte, error) {
var buf bytes.Buffer
tw := tar.NewWriter(&buf)
hdr := &tar.Header{
Name: fileNameOnTar,
Mode: 0755,
Size: int64(len(fileContent)),
}
if err := tw.WriteHeader(hdr); err != nil {
return nil, err
}
if _, err := tw.Write(fileContent); err != nil {
return nil, err
}
if err := tw.Close(); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
下面是遠端在pod中執行命令的程式碼,是client-go remotecommand
庫的標準用法,沒有什麼特別之處:
func (k *KubernetesApiServiceImpl) ExecuteCommand(podName string, containerName string, command []string, stdOut io.Writer) (int, error) {
log.Infof("executing command: '%s' on container: '%s', pod: '%s', namespace: '%s'", command, containerName, podName, k.targetNamespace)
stdErr := new(Writer)
executeTcpdumpRequest := ExecCommandRequest{
KubeRequest: KubeRequest{
Clientset: k.clientset,
RestConfig: k.restConfig,
Namespace: k.targetNamespace,
Pod: podName,
Container: containerName,
},
Command: command,
StdErr: stdErr,
StdOut: stdOut,
}
exitCode, err := PodExecuteCommand(executeTcpdumpRequest)
if err != nil {
log.WithError(err).Errorf("failed executing command: '%s', exitCode: '%d', stdErr: '%s'",
command, exitCode, stdErr.Output)
return exitCode, err
}
log.Infof("command: '%s' executing successfully exitCode: '%d', stdErr :'%s'", command, exitCode, stdErr.Output)
return exitCode, err
}
func PodExecuteCommand(req ExecCommandRequest) (int, error) {
execRequest := req.Clientset.CoreV1().RESTClient().Post().
Resource("pods").
Name(req.Pod).
Namespace(req.Namespace).
SubResource("exec")
execRequest.VersionedParams(&corev1.PodExecOptions{
Container: req.Container,
Command: req.Command,
Stdin: req.StdIn != nil,
Stdout: req.StdOut != nil,
Stderr: req.StdErr != nil,
TTY: false,
}, scheme.ParameterCodec)
exec, err := remotecommand.NewSPDYExecutor(req.RestConfig, "POST", execRequest.URL())
if err != nil {
return 0, err
}
err = exec.Stream(remotecommand.StreamOptions{
Stdin: req.StdIn,
Stdout: req.StdOut, //重定向的輸出,可以是檔案或wireshark
Stderr: req.StdErr,
Tty: false,
})
var exitCode = 0
if err != nil {
if exitErr, ok := err.(utilexec.ExitError); ok && exitErr.Exited() {
exitCode = exitErr.ExitStatus()
err = nil
}
}
return exitCode, err
}
該步驟就是組裝遠端命令,並在目標pod中執行即可:
func (u *StaticTcpdumpSnifferService) Start(stdOut io.Writer) error {
log.Info("start sniffing on remote container")
command := []string{u.settings.UserSpecifiedRemoteTcpdumpPath, "-i", u.settings.UserSpecifiedInterface,
"-U", "-w", "-", u.settings.UserSpecifiedFilter}
exitCode, err := u.kubernetesApiService.ExecuteCommand(u.settings.UserSpecifiedPodName, u.settings.UserSpecifiedContainer, command, stdOut)
if err != nil || exitCode != 0 {
return errors.Errorf("executing sniffer failed, exit code: '%d'", exitCode)
}
log.Infof("done sniffing on remote container")
return nil
}
wireshark庫支援輸入重定向,使用o.wireshark.StdinPipe()
建立出輸入之後,將其作為遠端呼叫tcpdump命令的StreamOptions.Stdout
的引數即可將pod的輸出重定向到wireshark中:
title := fmt.Sprintf("gui.window_title:%s/%s/%s", o.resultingContext.Namespace, o.settings.UserSpecifiedPodName, o.settings.UserSpecifiedContainer)
o.wireshark = exec.Command("wireshark", "-k", "-i", "-", "-o", title)
stdinWriter, err := o.wireshark.StdinPipe() //建立輸入
if err != nil {
return err
}
go func() {
err := o.snifferService.Start(stdinWriter)//將wireshark建立的輸入作為pod的輸出
if err != nil {
log.WithError(err).Errorf("failed to start remote sniffing, stopping wireshark")
_ = o.wireshark.Process.Kill()
}
}()
err = o.wireshark.Run()
特權模式的處理有一些複雜,該模式下,ksniff會在目標pod所在的node節點(通過目標pod的pod.Spec.NodeName
欄位獲取)上建立一個許可權為privileged
的pod,並掛載主機的/
目錄和預設的容器socket,然後在特權pod內呼叫對應的容器執行時命令來執行tcpdump命令。ksniff支援三種常見的容器執行時:docker
、cri-o
和containerd
,對應的容器執行時的預設目錄如下:
/var/run/docker.sock
/var/run/crio/crio.sock
/run/containerd/containerd.sock
由於特權模式可能會建立一個新的pod,因此在命令執行完之後需要清理掉新建的pod。
特權模式下會呼叫目標節點上的容器執行時命令,不同容器執行時的命令是不同的,那麼ksniff是如何區分不同的容器執行時呢?
ksniff會通過kubernetes clientset來獲取目標pod資訊,通過pod.status.containerStatuses.containerID
欄位來確定所使用的CRI,如下例,其CRI為containerd
,containerId為0f76ee399228ed02f8ba13a6bbec6bb8b696f4f1997176882b309edbe3a56ee1
:
status:
containerStatuses:
- containerID: containerd://0f76ee399228ed02f8ba13a6bbec6bb8b696f4f1997176882b309edbe3a56ee1
....
容器執行時和ContainerId的獲取方式如下:
func (o *Ksniff) findContainerId(pod *corev1.Pod) error {
for _, containerStatus := range pod.Status.ContainerStatuses {
if o.settings.UserSpecifiedContainer == containerStatus.Name {
result := strings.Split(containerStatus.ContainerID, "://")
if len(result) != 2 {
break
}
o.settings.DetectedContainerRuntime = result[0] //獲取容器執行時
o.settings.DetectedContainerId = result[1] //獲取containerID
return nil
}
}
return errors.Errorf("couldn't find container: '%s' in pod: '%s'", o.settings.UserSpecifiedContainer, o.settings.UserSpecifiedPodName)
}
下面看下不同執行時是如何執行tcpdump命令的。
Containerd會在特權pod內通過crictl pull
來拉取tcpdump映象並啟動tcpdump容器,使其和目標容器(containerId
)共用相同的網路名稱空間,這樣就可以使用tcpdump抓取目標容器的報文。在命令執行完之後需要清理建立出來的tcpdump容器。
func (d *ContainerdBridge) BuildTcpdumpCommand(containerId *string, netInterface string, filter string, pid *string, socketPath string, tcpdumpImage string) []string {
d.tcpdumpContainerName = "ksniff-container-" + utils.GenerateRandomString(8)
d.socketPath = socketPath
tcpdumpCommand := fmt.Sprintf("tcpdump -i %s -U -w - %s", netInterface, filter)
shellScript := fmt.Sprintf(`
set -ex
export CONTAINERD_SOCKET="%s"
export CONTAINERD_NAMESPACE="k8s.io"
export CONTAINER_RUNTIME_ENDPOINT="unix:///host${CONTAINERD_SOCKET}"
export IMAGE_SERVICE_ENDPOINT=${CONTAINER_RUNTIME_ENDPOINT}
crictl pull %s >/dev/null
netns=$(crictl inspect %s | jq '.info.runtimeSpec.linux.namespaces[] | select(.type == "network") | .path' | tr -d '"')
exec chroot /host ctr -a ${CONTAINERD_SOCKET} run --rm --with-ns "network:${netns}" %s %s %s
`, d.socketPath, tcpdumpImage, *containerId, tcpdumpImage, d.tcpdumpContainerName, tcpdumpCommand)
command := []string{"/bin/sh", "-c", shellScript}
return command
}
func (d *ContainerdBridge) BuildCleanupCommand() []string {
shellScript := fmt.Sprintf(`
set -ex
export CONTAINERD_SOCKET="%s"
export CONTAINERD_NAMESPACE="k8s.io"
export CONTAINER_ID="%s"
chroot /host ctr -a ${CONTAINERD_SOCKET} task kill -s SIGKILL ${CONTAINER_ID}
`, d.socketPath, d.tcpdumpContainerName)
command := []string{"/bin/sh", "-c", shellScript}
return command
}
Cri-o通過nsenter指定目標容器的程序進入目標網路名稱空間來執行tcpdump命令,由於它沒有使用tcpdump映象,因此要求目標節點上需要存在tcpdump可執行檔案:
func (c *CrioBridge) BuildTcpdumpCommand(containerId *string, netInterface string, filter string, pid *string, socketPath string, tcpdumpImage string) []string {
return []string{"nsenter", "-n", "-t", *pid, "--", "tcpdump", "-i", netInterface, "-U", "-w", "-", filter}
}
這種方式下沒有在特權pod內部建立容器,因此不需要清理環境。
docker的處理方式和containerd類似,也是通過啟動tcpdump容器,並和目標容器共用網路名稱空間實現的:
func (d *DockerBridge) BuildTcpdumpCommand(containerId *string, netInterface string, filter string, pid *string, socketPath string, tcpdumpImage string) []string {
d.tcpdumpContainerName = "ksniff-container-" + utils.GenerateRandomString(8)
containerNameFlag := fmt.Sprintf("--name=%s", d.tcpdumpContainerName)
command := []string{"docker", "--host", "unix://" + socketPath,
"run", "--rm", "--log-driver", "none", containerNameFlag,
fmt.Sprintf("--net=container:%s", *containerId), tcpdumpImage, "-i",
netInterface, "-U", "-w", "-", filter}
d.cleanupCommand = []string{"docker", "--host", "unix://" + socketPath,
"rm", "-f", d.tcpdumpContainerName}
return command
}
func (d *DockerBridge) BuildCleanupCommand() []string {
return d.cleanupCommand
}
由於特權模式下建立了特權pod,containerd和docker還會在特權pod內建立tcpdump容器,因此在進行環境清理時需要清理掉建立出來的tcpdump容器,然後再清理掉特權pod:
func (p *PrivilegedPodSnifferService) Cleanup() error {
command := p.runtimeBridge.BuildCleanupCommand()
if command != nil {
log.Infof("removing privileged container: '%s'", p.privilegedContainerName)
exitCode, err := p.kubernetesApiService.ExecuteCommand(p.privilegedPod.Name, p.privilegedContainerName, command, &kube.NopWriter{})
if err != nil {
log.WithError(err).Errorf("failed to remove privileged container: '%s', exit code: '%d', "+
"please manually remove it", p.privilegedContainerName, exitCode)
} else {
log.Infof("privileged container: '%s' removed successfully", p.privilegedContainerName)
}
}
if p.privilegedPod != nil {
log.Infof("removing pod: '%s'", p.privilegedPod.Name)
err := p.kubernetesApiService.DeletePod(p.privilegedPod.Name)
if err != nil {
log.WithError(err).Errorf("failed to remove pod: '%s", p.privilegedPod.Name)
return err
}
log.Infof("pod: '%s' removed successfully", p.privilegedPod.Name)
}
return nil
}
非特權模式的實現比較簡單,不需要考慮容器執行時的問題,但它也有一個缺點,就是需要考慮目標容器的執行環境,比如32位元/64位元、amd/arm等,可能需要在本地準備多套tcpdump來滿足不同的容器執行環境。
特權模式的實現相對比較複雜,如果還有其他的執行時,就需要對ksniff進行功能擴充套件。且有些叢集節點上可能會禁用特權pod,導致該方法行不通。
儘管存在一些使用上的限制,但本文在檔案上傳以及對不同容器執行時方面的處理還是很值得借鑑的。
本文來自部落格園,作者:charlieroro,轉載請註明原文連結:https://www.cnblogs.com/charlieroro/p/17109943.html