k8s驅逐篇(7)-kube-controller-manager驅逐-taintManager原始碼分析

2023-06-25 12:01:38

概述

taintManager的主要功能為:當某個node被打上NoExecute汙點後,其上面的pod如果不能容忍該汙點,則taintManager將會驅逐這些pod,而新建的pod也需要容忍該汙點才能排程到該node上;

通過kcm啟動引數--enable-taint-manager來確定是否啟動taintManagertrue時啟動(啟動引數預設值為true);

kcm啟動引數--feature-gates=TaintBasedEvictions=xxx,預設值true,配合--enable-taint-manager共同作用,兩者均為true,才會開啟汙點驅逐;

kcm汙點驅逐

當node出現NoExecute汙點時,判斷node上的pod是否能容忍node的汙點,不能容忍的pod,會被立即刪除,能容忍所有汙點的pod,則等待所有汙點的容忍時間裡最小值後,pod被刪除;

原始碼分析

1.結構體分析

1.1 NoExecuteTaintManager結構體分析

NoExecuteTaintManager結構體為taintManager的主要結構體,其主要屬性有:
(1)taintEvictionQueue:不能容忍node上NoExecute的汙點的pod,會被加入到該佇列中,然後pod會被刪除;
(2)taintedNodes:記錄了每個node的taint;
(3)nodeUpdateQueue:當node物件發生add、delete、update(新舊node物件的taint不相同)事件時,node會進入該佇列;
(4)podUpdateQueue:當pod物件發生add、delete、update(新舊pod物件的NodeNameTolerations不相同)事件時,pod會進入該佇列;
(5)nodeUpdateChannelsnodeUpdateChannels即8個nodeUpdateItem型別的channel,有worker負責消費nodeUpdateQueue佇列,然後根據node name計算出index,把node放入其中1個nodeUpdateItem型別的channel中;
(6)podUpdateChannelspodUpdateChannels即8個podUpdateItem型別的channel,有worker負責消費podUpdateQueue佇列,然後根據pod的node name計算出index,把pod放入其中1個podUpdateItem型別的channel中;

// pkg/controller/nodelifecycle/scheduler/taint_manager.go
type NoExecuteTaintManager struct {
	client                clientset.Interface
	recorder              record.EventRecorder
	getPod                GetPodFunc
	getNode               GetNodeFunc
	getPodsAssignedToNode GetPodsByNodeNameFunc

	taintEvictionQueue *TimedWorkerQueue
	// keeps a map from nodeName to all noExecute taints on that Node
	taintedNodesLock sync.Mutex
	taintedNodes     map[string][]v1.Taint

	nodeUpdateChannels []chan nodeUpdateItem
	podUpdateChannels  []chan podUpdateItem

	nodeUpdateQueue workqueue.Interface
	podUpdateQueue  workqueue.Interface
}

1.2 taintEvictionQueue分析

taintEvictionQueue屬性是一個TimedWorkerQueue型別的佇列,呼叫tc.taintEvictionQueue.AddWork,會將pod新增到該佇列中,會新增一個定時器,然後到期之後會自動執行workFunc,初始化taintEvictionQueue時,傳入的workFuncdeletePodHandler函數,作用是刪除pod;

所以進入taintEvictionQueue中的pod,會在設定好的時間,被刪除;

1.3 pod.Spec.Tolerations分析

pod.Spec.Tolerations設定的是pod的汙點容忍資訊;

// vendor/k8s.io/api/core/v1/types.go
type Toleration struct {
	Key string `json:"key,omitempty" protobuf:"bytes,1,opt,name=key"`
	Operator TolerationOperator `json:"operator,omitempty" protobuf:"bytes,2,opt,name=operator,casttype=TolerationOperator"`
	Value string `json:"value,omitempty" protobuf:"bytes,3,opt,name=value"`
	Effect TaintEffect `json:"effect,omitempty" protobuf:"bytes,4,opt,name=effect,casttype=TaintEffect"`
	TolerationSeconds *int64 `json:"tolerationSeconds,omitempty" protobuf:"varint,5,opt,name=tolerationSeconds"`
}

Tolerations的屬性值解析如下:
(1)Key:匹配node汙點的Key;
(2)Operator:表示Tolerations中Key與node汙點的Key相同時,其Value與node汙點的Value的關係,預設值Equal,代表相等,Exists則代表Tolerations中Key與node汙點的Key相同即可,不用比較其Value值;
(3)Value:匹配node汙點的Value;
(4)Effect:匹配node汙點的Effect;
(5)TolerationSeconds:node汙點容忍時間;

設定範例:

tolerations:
- key: "key1"
  operator: "Equal"
  value: "value1"
  effect: "NoExecute"
  tolerationSeconds: 3600

上述設定表示如果該pod正在執行,同時一個匹配的汙點被新增到其所在的node節點上,那麼該pod還將繼續在節點上執行3600秒,然後會被驅逐(如果在此之前其匹配的node汙點被刪除了,則該pod不會被驅逐);

2.初始化分析

2.1 NewNodeLifecycleController

NewNodeLifecycleControllerNodeLifecycleController的初始化函數,裡面給taintManager註冊了pod與node的EventHandlerAddUpdateDelete事件都會呼叫taintManagerPodUpdatedNodeUpdated方法來做處理;

// pkg/controller/nodelifecycle/node_lifecycle_controller.go
func NewNodeLifecycleController(
    ...
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			...
			if nc.taintManager != nil {
				nc.taintManager.PodUpdated(nil, pod)
			}
		},
		UpdateFunc: func(prev, obj interface{}) {
			...
			if nc.taintManager != nil {
				nc.taintManager.PodUpdated(prevPod, newPod)
			}
		},
		DeleteFunc: func(obj interface{}) {
			...
			if nc.taintManager != nil {
				nc.taintManager.PodUpdated(pod, nil)
			}
		},
	})
    ...
    if nc.runTaintManager {
		podGetter := func(name, namespace string) (*v1.Pod, error) { return nc.podLister.Pods(namespace).Get(name) }
		nodeLister := nodeInformer.Lister()
		nodeGetter := func(name string) (*v1.Node, error) { return nodeLister.Get(name) }
		nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient, podGetter, nodeGetter, nc.getPodsAssignedToNode)
		nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
			AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
				nc.taintManager.NodeUpdated(nil, node)
				return nil
			}),
			UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error {
				nc.taintManager.NodeUpdated(oldNode, newNode)
				return nil
			}),
			DeleteFunc: nodeutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
				nc.taintManager.NodeUpdated(node, nil)
				return nil
			}),
		})
	}
	...
}

2.1.1 tc.NodeUpdated

tc.NodeUpdated方法會判斷新舊node物件的taint是否相同,不相同則呼叫tc.nodeUpdateQueue.Add,將該node放入到nodeUpdateQueue佇列中;

// pkg/controller/nodelifecycle/scheduler/taint_manager.go
func (tc *NoExecuteTaintManager) NodeUpdated(oldNode *v1.Node, newNode *v1.Node) {
	nodeName := ""
	oldTaints := []v1.Taint{}
	if oldNode != nil {
		nodeName = oldNode.Name
		oldTaints = getNoExecuteTaints(oldNode.Spec.Taints)
	}

	newTaints := []v1.Taint{}
	if newNode != nil {
		nodeName = newNode.Name
		newTaints = getNoExecuteTaints(newNode.Spec.Taints)
	}

	if oldNode != nil && newNode != nil && helper.Semantic.DeepEqual(oldTaints, newTaints) {
		return
	}
	updateItem := nodeUpdateItem{
		nodeName: nodeName,
	}

	tc.nodeUpdateQueue.Add(updateItem)
}

2.1.2 tc.PodUpdated

tc.PodUpdated方法會判斷新舊pod物件的NodeNameTolerations是否相同,不相同則呼叫tc.podUpdateQueue.Add,將該pod放入到podUpdateQueue佇列中;

// pkg/controller/nodelifecycle/scheduler/taint_manager.go
func (tc *NoExecuteTaintManager) PodUpdated(oldPod *v1.Pod, newPod *v1.Pod) {
	podName := ""
	podNamespace := ""
	nodeName := ""
	oldTolerations := []v1.Toleration{}
	if oldPod != nil {
		podName = oldPod.Name
		podNamespace = oldPod.Namespace
		nodeName = oldPod.Spec.NodeName
		oldTolerations = oldPod.Spec.Tolerations
	}
	newTolerations := []v1.Toleration{}
	if newPod != nil {
		podName = newPod.Name
		podNamespace = newPod.Namespace
		nodeName = newPod.Spec.NodeName
		newTolerations = newPod.Spec.Tolerations
	}

	if oldPod != nil && newPod != nil && helper.Semantic.DeepEqual(oldTolerations, newTolerations) && oldPod.Spec.NodeName == newPod.Spec.NodeName {
		return
	}
	updateItem := podUpdateItem{
		podName:      podName,
		podNamespace: podNamespace,
		nodeName:     nodeName,
	}

	tc.podUpdateQueue.Add(updateItem)
}

2.2 taintEvictionQueue

看到TaintManager的初始化方法NewNoExecuteTaintManager中,呼叫CreateWorkerQueuetaintEvictionQueue做了初始化;

// pkg/controller/nodelifecycle/scheduler/taint_manager.go
func NewNoExecuteTaintManager(...) ... {
    ...
    tm.taintEvictionQueue = CreateWorkerQueue(deletePodHandler(c, tm.emitPodDeletionEvent))
    ...
}

CreateWorkerQueue函數初始化並返回TimedWorkerQueue結構體;

// pkg/controller/nodelifecycle/scheduler/timed_workers.go
func CreateWorkerQueue(f func(args *WorkArgs) error) *TimedWorkerQueue {
	return &TimedWorkerQueue{
		workers:  make(map[string]*TimedWorker),
		workFunc: f,
	}
}

2.2.1 deletePodHandler

初始化taintEvictionQueue時傳入了deletePodHandler作為佇列中元素的處理方法;deletePodHandler函數的主要邏輯是請求apiserver,刪除pod物件,所以說,被放入到taintEvictionQueue佇列中的pod,會被刪除;

// pkg/controller/nodelifecycle/scheduler/taint_manager.go
func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName)) func(args *WorkArgs) error {
	return func(args *WorkArgs) error {
		ns := args.NamespacedName.Namespace
		name := args.NamespacedName.Name
		klog.V(0).Infof("NoExecuteTaintManager is deleting Pod: %v", args.NamespacedName.String())
		if emitEventFunc != nil {
			emitEventFunc(args.NamespacedName)
		}
		var err error
		for i := 0; i < retries; i++ {
			err = c.CoreV1().Pods(ns).Delete(name, &metav1.DeleteOptions{})
			if err == nil {
				break
			}
			time.Sleep(10 * time.Millisecond)
		}
		return err
	}
}

2.2.2 tc.taintEvictionQueue.AddWork

再來看一下tc.taintEvictionQueue.AddWork方法,作用是新增pod進入taintEvictionQueue佇列,即呼叫CreateWorker給該pod建立一個worker來刪除該pod;

// pkg/controller/nodelifecycle/scheduler/timed_workers.go
func (q *TimedWorkerQueue) AddWork(args *WorkArgs, createdAt time.Time, fireAt time.Time) {
	key := args.KeyFromWorkArgs()
	klog.V(4).Infof("Adding TimedWorkerQueue item %v at %v to be fired at %v", key, createdAt, fireAt)

	q.Lock()
	defer q.Unlock()
	if _, exists := q.workers[key]; exists {
		klog.Warningf("Trying to add already existing work for %+v. Skipping.", args)
		return
	}
	worker := CreateWorker(args, createdAt, fireAt, q.getWrappedWorkerFunc(key))
	q.workers[key] = worker
}

CreateWorker函數會先判斷是否應該立即執行workFunc,是的話立即拉起一個goroutine來執行workFunc並返回,否則定義一個timer定時器,到時間後自動拉起一個goroutine執行workFunc

// pkg/controller/nodelifecycle/scheduler/timed_workers.go
func CreateWorker(args *WorkArgs, createdAt time.Time, fireAt time.Time, f func(args *WorkArgs) error) *TimedWorker {
	delay := fireAt.Sub(createdAt)
	if delay <= 0 {
		go f(args)
		return nil
	}
	timer := time.AfterFunc(delay, func() { f(args) })
	return &TimedWorker{
		WorkItem:  args,
		CreatedAt: createdAt,
		FireAt:    fireAt,
		Timer:     timer,
	}
}

2.2.3 tc.taintEvictionQueue.Cancel

tc.taintEvictionQueue.AddWork方法,作用是停止對應的pod的timer,即停止執行對應pod的workFunc(不刪除pod);

// pkg/controller/nodelifecycle/scheduler/timed_workers.go
func (w *TimedWorker) Cancel() {
	if w != nil {
		w.Timer.Stop()
	}
}

3.核心處理邏輯分析

nc.taintManager.Run

nc.taintManager.RuntaintManager的啟動方法,處理邏輯都在這,主要是判斷node上的pod是否能容忍node的NoExecute汙點,不能容忍的pod,會被刪除,能容忍所有汙點的pod,則等待所有汙點的容忍時間裡最小值後,被刪除;

主要邏輯:
(1)建立8個型別為nodeUpdateItem的channel(緩衝區大小10),並賦值給tc.nodeUpdateChannels
建立8個型別為podUpdateItem的channel(緩衝區大小1),並賦值給podUpdateChannels

(2)消費tc.nodeUpdateQueue佇列,根據node name計算hash,將node放入對應的tc.nodeUpdateChannels[hash]中;

(3)消費tc.podUpdateQueue佇列,根據pod的node name計算hash,將node放入對應的tc.podUpdateChannels[hash]中;

(4)啟動8個goroutine,呼叫tc.worker對其中一個tc.nodeUpdateChannelstc.podUpdateChannels做處理,判斷node上的pod是否能容忍node的NoExecute汙點,不能容忍的pod,會被刪除,能容忍所有汙點的pod,則等待所有汙點的容忍時間裡最小值後,被刪除;

// pkg/controller/nodelifecycle/scheduler/taint_manager.go
func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
	klog.V(0).Infof("Starting NoExecuteTaintManager")

	for i := 0; i < UpdateWorkerSize; i++ {
		tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize))
		tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize))
	}

	// Functions that are responsible for taking work items out of the workqueues and putting them
	// into channels.
	go func(stopCh <-chan struct{}) {
		for {
			item, shutdown := tc.nodeUpdateQueue.Get()
			if shutdown {
				break
			}
			nodeUpdate := item.(nodeUpdateItem)
			hash := hash(nodeUpdate.nodeName, UpdateWorkerSize)
			select {
			case <-stopCh:
				tc.nodeUpdateQueue.Done(item)
				return
			case tc.nodeUpdateChannels[hash] <- nodeUpdate:
				// tc.nodeUpdateQueue.Done is called by the nodeUpdateChannels worker
			}
		}
	}(stopCh)

	go func(stopCh <-chan struct{}) {
		for {
			item, shutdown := tc.podUpdateQueue.Get()
			if shutdown {
				break
			}
			// The fact that pods are processed by the same worker as nodes is used to avoid races
			// between node worker setting tc.taintedNodes and pod worker reading this to decide
			// whether to delete pod.
			// It's possible that even without this assumption this code is still correct.
			podUpdate := item.(podUpdateItem)
			hash := hash(podUpdate.nodeName, UpdateWorkerSize)
			select {
			case <-stopCh:
				tc.podUpdateQueue.Done(item)
				return
			case tc.podUpdateChannels[hash] <- podUpdate:
				// tc.podUpdateQueue.Done is called by the podUpdateChannels worker
			}
		}
	}(stopCh)

	wg := sync.WaitGroup{}
	wg.Add(UpdateWorkerSize)
	for i := 0; i < UpdateWorkerSize; i++ {
		go tc.worker(i, wg.Done, stopCh)
	}
	wg.Wait()
}

tc.worker

tc.worker方法負責消費nodeUpdateChannelspodUpdateChannels,分別呼叫tc.handleNodeUpdatetc.handlePodUpdate方法做進一步處理;

// pkg/controller/nodelifecycle/scheduler/taint_manager.go
func (tc *NoExecuteTaintManager) worker(worker int, done func(), stopCh <-chan struct{}) {
	defer done()

	// When processing events we want to prioritize Node updates over Pod updates,
	// as NodeUpdates that interest NoExecuteTaintManager should be handled as soon as possible -
	// we don't want user (or system) to wait until PodUpdate queue is drained before it can
	// start evicting Pods from tainted Nodes.
	for {
		select {
		case <-stopCh:
			return
		case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
			tc.handleNodeUpdate(nodeUpdate)
			tc.nodeUpdateQueue.Done(nodeUpdate)
		case podUpdate := <-tc.podUpdateChannels[worker]:
			// If we found a Pod update we need to empty Node queue first.
		priority:
			for {
				select {
				case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
					tc.handleNodeUpdate(nodeUpdate)
					tc.nodeUpdateQueue.Done(nodeUpdate)
				default:
					break priority
				}
			}
			// After Node queue is emptied we process podUpdate.
			tc.handlePodUpdate(podUpdate)
			tc.podUpdateQueue.Done(podUpdate)
		}
	}
}

3.1 tc.handleNodeUpdate

tc.handleNodeUpdate方法主要是判斷node上的pod是否能容忍node的NoExecute汙點,不能容忍的pod,會被刪除,能容忍所有汙點的pod,則等待所有汙點的容忍時間裡最小值後,被刪除;

主要邏輯:
(1)從informer本地快取中獲取node物件;
(2)從node.Spec.Taints中獲取NoExecutetaints
(3)將該node的NoExecutetaints更新到tc.taintedNodes中;
(4)呼叫tc.getPodsAssignedToNode,獲取該node上的所有pod,如果pod數量為0,直接return;
(5)如果node的NoExecutetaints數量為0,則遍歷該node上所有pod,呼叫tc.cancelWorkWithEvent,將該pod從taintEvictionQueue佇列中移除,然後直接return;
(6)遍歷該node上所有pod,呼叫tc.processPodOnNode,對pod做進一步處理;

// pkg/controller/nodelifecycle/scheduler/taint_manager.go
func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate nodeUpdateItem) {
	node, err := tc.getNode(nodeUpdate.nodeName)
	if err != nil {
		if apierrors.IsNotFound(err) {
			// Delete
			klog.V(4).Infof("Noticed node deletion: %#v", nodeUpdate.nodeName)
			tc.taintedNodesLock.Lock()
			defer tc.taintedNodesLock.Unlock()
			delete(tc.taintedNodes, nodeUpdate.nodeName)
			return
		}
		utilruntime.HandleError(fmt.Errorf("cannot get node %s: %v", nodeUpdate.nodeName, err))
		return
	}

	// Create or Update
	klog.V(4).Infof("Noticed node update: %#v", nodeUpdate)
	taints := getNoExecuteTaints(node.Spec.Taints)
	func() {
		tc.taintedNodesLock.Lock()
		defer tc.taintedNodesLock.Unlock()
		klog.V(4).Infof("Updating known taints on node %v: %v", node.Name, taints)
		if len(taints) == 0 {
			delete(tc.taintedNodes, node.Name)
		} else {
			tc.taintedNodes[node.Name] = taints
		}
	}()

	// This is critical that we update tc.taintedNodes before we call getPodsAssignedToNode:
	// getPodsAssignedToNode can be delayed as long as all future updates to pods will call
	// tc.PodUpdated which will use tc.taintedNodes to potentially delete delayed pods.
	pods, err := tc.getPodsAssignedToNode(node.Name)
	if err != nil {
		klog.Errorf(err.Error())
		return
	}
	if len(pods) == 0 {
		return
	}
	// Short circuit, to make this controller a bit faster.
	if len(taints) == 0 {
		klog.V(4).Infof("All taints were removed from the Node %v. Cancelling all evictions...", node.Name)
		for i := range pods {
			tc.cancelWorkWithEvent(types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name})
		}
		return
	}

	now := time.Now()
	for _, pod := range pods {
		podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
		tc.processPodOnNode(podNamespacedName, node.Name, pod.Spec.Tolerations, taints, now)
	}
}

3.1.1 tc.processPodOnNode

tc.processPodOnNode方法主要作用是判斷pod是否能容忍node上所有的NoExecute的汙點,如果不能,則將該pod加到taintEvictionQueue佇列中,能容忍所有汙點的pod,則等待所有汙點的容忍時間裡最小值後,加到taintEvictionQueue佇列中;

主要邏輯:
(1)如果node的NoExecutetaints數量為0,則呼叫tc.cancelWorkWithEvent,將該pod從taintEvictionQueue佇列中移除;
(2)呼叫v1helper.GetMatchingTolerations,判斷pod是否容忍node上所有的NoExecute的taints,以及獲取能容忍taints的容忍列表;
(3)如果不能容忍所有汙點,則呼叫tc.taintEvictionQueue.AddWork,將該pod加到taintEvictionQueue佇列中;
(4)如果能容忍所有汙點,則等待所有汙點的容忍時間裡最小值後,再呼叫tc.taintEvictionQueue.AddWork,將該pod加到taintEvictionQueue佇列中;

// pkg/controller/nodelifecycle/scheduler/taint_manager.go
func (tc *NoExecuteTaintManager) processPodOnNode(
	podNamespacedName types.NamespacedName,
	nodeName string,
	tolerations []v1.Toleration,
	taints []v1.Taint,
	now time.Time,
) {
	if len(taints) == 0 {
		tc.cancelWorkWithEvent(podNamespacedName)
	}
	allTolerated, usedTolerations := v1helper.GetMatchingTolerations(taints, tolerations)
	if !allTolerated {
		klog.V(2).Infof("Not all taints are tolerated after update for Pod %v on %v", podNamespacedName.String(), nodeName)
		// We're canceling scheduled work (if any), as we're going to delete the Pod right away.
		tc.cancelWorkWithEvent(podNamespacedName)
		tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), time.Now(), time.Now())
		return
	}
	minTolerationTime := getMinTolerationTime(usedTolerations)
	// getMinTolerationTime returns negative value to denote infinite toleration.
	if minTolerationTime < 0 {
		klog.V(4).Infof("New tolerations for %v tolerate forever. Scheduled deletion won't be cancelled if already scheduled.", podNamespacedName.String())
		return
	}

	startTime := now
	triggerTime := startTime.Add(minTolerationTime)
	scheduledEviction := tc.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String())
	if scheduledEviction != nil {
		startTime = scheduledEviction.CreatedAt
		if startTime.Add(minTolerationTime).Before(triggerTime) {
			return
		}
		tc.cancelWorkWithEvent(podNamespacedName)
	}
	tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)
}

3.2 tc.handlePodUpdate

tc.handlePodUpdate方法最終也是呼叫了tc.processPodOnNode對pod做進一步處理;

tc.processPodOnNode方法在上面已經分析過了,這裡不再進行分析;

主要邏輯:
(1)從informer本地快取中獲取pod物件;
(2)獲取pod的node name,如果為空,直接return;
(3)根據node name從tc.taintedNodes中獲取node的汙點,如果汙點為空,直接return;
(4)呼叫tc.processPodOnNode對pod做進一步處理;

// pkg/controller/nodelifecycle/scheduler/taint_manager.go
func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate podUpdateItem) {
	pod, err := tc.getPod(podUpdate.podName, podUpdate.podNamespace)
	if err != nil {
		if apierrors.IsNotFound(err) {
			// Delete
			podNamespacedName := types.NamespacedName{Namespace: podUpdate.podNamespace, Name: podUpdate.podName}
			klog.V(4).Infof("Noticed pod deletion: %#v", podNamespacedName)
			tc.cancelWorkWithEvent(podNamespacedName)
			return
		}
		utilruntime.HandleError(fmt.Errorf("could not get pod %s/%s: %v", podUpdate.podName, podUpdate.podNamespace, err))
		return
	}

	// We key the workqueue and shard workers by nodeName. If we don't match the current state we should not be the one processing the current object.
	if pod.Spec.NodeName != podUpdate.nodeName {
		return
	}

	// Create or Update
	podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
	klog.V(4).Infof("Noticed pod update: %#v", podNamespacedName)
	nodeName := pod.Spec.NodeName
	if nodeName == "" {
		return
	}
	taints, ok := func() ([]v1.Taint, bool) {
		tc.taintedNodesLock.Lock()
		defer tc.taintedNodesLock.Unlock()
		taints, ok := tc.taintedNodes[nodeName]
		return taints, ok
	}()
	// It's possible that Node was deleted, or Taints were removed before, which triggered
	// eviction cancelling if it was needed.
	if !ok {
		return
	}
	tc.processPodOnNode(podNamespacedName, nodeName, pod.Spec.Tolerations, taints, time.Now())
}

總結

taintManager的主要功能為:當某個node被打上NoExecute汙點後,其上面的pod如果不能容忍該汙點,則taintManager將會驅逐這些pod,而新建的pod也需要容忍該汙點才能排程到該node上;

通過kcm啟動引數--enable-taint-manager來確定是否啟動taintManagertrue時啟動(啟動引數預設值為true);

kcm啟動引數--feature-gates=TaintBasedEvictions=xxx,預設值true,配合--enable-taint-manager共同作用,兩者均為true,才會開啟汙點驅逐;

kcm汙點驅逐

當node出現NoExecute汙點時,判斷node上的pod是否能容忍node的汙點,不能容忍的pod,會被立即刪除,能容忍所有汙點的pod,則等待所有汙點的容忍時間裡最小值後,pod被刪除;