kubernetes叢集中的排程程式 kube-scheduler
會 watch
未分配節點的新建立的Pod,並未該Pod找到可執行的最佳(特定)節點。那麼這些動作或者說這些原理是怎麼實現的呢,讓我們往下剖析下。
對於新建立的 pod 或其他未排程的 pod來講,kube-scheduler 選擇一個最佳節點供它們執行。但是,Pod 中的每個容器對資源的要求都不同,每個 Pod 也有不同的要求。因此,需要根據具體的排程要求對現有節點進行過濾。
在Kubernetes叢集中,滿足 Pod 排程要求的節點稱為可行節點 ( feasible nodes
FN) 。如果沒有合適的節點,則 pod 將保持未排程狀態,直到排程程式能夠放置它。也就是說,當我們建立Pod時,如果長期處於 Pending
狀態,這個時候應該看你的叢集排程器是否因為某些問題沒有合適的節點了
排程器為 Pod 找到 FN 後,然後執行一組函數對 FN 進行評分,並在 FN 中找到得分最高的節點來執行 Pod。
排程策略在決策時需要考慮的因素包括個人和集體資源需求、硬體/軟體/策略約束 (constraints
)、親和性 (affinity
) 和反親和性( anti-affinity
)規範、資料區域性性、工作負載間干擾等。
kube-scheduler
為pod選擇節點會分位兩部:
Filtering
)Scoring
)過濾也被稱為預選 (Predicates
),該步驟會找到可排程的節點集,然後通過是否滿足特定資源的請求,例如通過 PodFitsResources
過濾器檢查候選節點是否有足夠的資源來滿足 Pod 資源的請求。這個步驟完成後會得到一個包含合適的節點的列表(通常為多個),如果列表為空,則Pod不可排程。
打分也被稱為優選(Priorities
),在該步驟中,會對上一個步驟的輸出進行打分,Scheduer 通過打分的規則為每個通過 Filtering
步驟的節點計算出一個分數。
完成上述兩個步驟之後,kube-scheduler
會將Pod分配給分數最高的 Node,如果存在多個相同分數的節點,會隨機選擇一個。
Kubernetes 1.21之前版本可以在程式碼 kubernetes\pkg\scheduler\algorithmprovider\registry.go 中看到對應的註冊模式,在1.22 scheduler 更換了其路徑,對於registry檔案更換到了kubernetes\pkg\scheduler\framework\plugins\registry.go ;對於kubernetes官方說法為,排程策略是用於「預選」 (Predicates
)或 過濾(filtering
) 和 用於 優選(Priorities
)或 評分 (scoring
)的
注:kubernetes官方沒有找到預選和優選的概念,而Predicates和filtering 是處於預選階段的動詞,而Priorities和scoring是優選階段的動詞。後面用PF和PS代替這個兩個詞。
上面也提到了,filtering
的目的是為了排除(過濾)掉不滿足 Pod 要求的節點。例如,某個節點上的閒置資源小於 Pod 所需資源,則該節點不會被考慮在內,即被過濾掉。在 「Predicates」 階段實現的 filtering 策略,包括:
NoDiskConflict
:評估是否有合適Pod請求的卷NoVolumeZoneConflict
:在給定zone限制情況下,評估Pod請求所需的卷在Node上是否可用PodFitsResources
:檢查空閒資源(CPU、記憶體)是否滿足Pod請求PodFitsHostPorts
:檢查Pod所需埠在Node上是否被佔用HostName
: 過濾除去,PodSpec
中 NodeName
欄位中指定的Node之外的所有Node。MatchNodeSelector
:檢查Node的 label 是否與 Pod 設定中 nodeSelector
欄位中指定的 label 匹配,並且從 Kubernetes v1.2 開始, 如果存在 nodeAffinity
也會匹配。CheckNodeMemoryPressure
:檢查是否可以在已出現記憶體壓力情況節點上排程 Pod。CheckNodeDiskPressure
:檢查是否可以在報告磁碟壓力情況的節點上排程 Pod具體對應得策略可以在 kubernetes\pkg\scheduler\framework\plugins\registry.go 看到
通過上面步驟過濾過得列表則是適合託管的Pod,這個結果通常來說是一個列表,如何選擇最優Node進行排程,則是接下來打分的步驟步驟。
例如:Kubernetes對剩餘節點進行優先順序排序,優先順序由一組函數計算;優先順序函數將為剩餘節點給出從0~10
的分數,10 表示最優,0 表示最差。每個優先順序函數由一個正數加權組成,每個Node的得分是通過將所有加權得分相加來計算的。設有兩個優先順序函數,priorityFunc1
和 priorityFunc2
加上權重因子 weight1
和weight2
,那麼這個Node的最終得分為:\(finalScore = (w1 \times priorityFunc1) + (w2 \times priorityFunc2)\)。計算完分數後,選擇最高分數的Node做為Pod的宿主機,存在多個相同分數Node情況下會隨機選擇一個Node。
目前kubernetes提供了一些在打分 Scoring 階段演演算法:
LeastRequestedPriority
:Node的優先順序基於Node的空閒部分\(\frac{capacity\ -\ Node上所有存在的Pod\ -\ 正在排程的Pod請求}{capacity}\),通過計算具有最高分數的Node是FNBalancedResourceAllocation
:該演演算法會將 Pod 放在一個Node上,使得在Pod 部署後 CPU 和記憶體的使用率為平衡的SelectorSpreadPriority
:通過最小化資源方式,將屬於同一種服務、控制器或同一Node上的Replica的 Pod的數量來分佈Pod。如果節點上存在Zone,則會調整優先順序,以便 pod可以分佈在Zone之上。CalculateAntiAffinityPriority
:根據label來分佈,按照相同service上相同label值的pod進行分配ImageLocalityPriority
:根據Node上映象進行打分,Node上存在Pod請求所需的映象優先順序較高。以 PodFitsHostPorts
演演算法為例,因為是Node類演演算法,在kubernetes\pkg\scheduler\framework\plugins\nodeports
排程框架 (scheduling framework
SF ) 是kubernetes為 scheduler設計的一個pluggable的架構。SF 將scheduler設計為 Plugin 式的 API,API將上一章中提到的一些列排程策略實現為 Plugin
。
在 SF 中,定義了一些擴充套件點 (extension points
EP ),而被實現為Plugin的排程程式將被註冊在一個或多個 EP 中,換句話來說,在這些 EP 的執行過程中如果註冊在多個 EP 中,將會在多個 EP 被呼叫。
每次排程都分為兩個階段,排程週期(Scheduling Cycel
)與繫結週期(Binding Cycle
)。
排程週期與繫結週期結合一起,被稱為排程上下文 (Scheduling Context
),下圖則是排程上下文的工作流
注:如果決策結果為Pod的排程結果無可用節點,或存在內部錯誤,則中止 SC 或 BC。Pod將重入佇列重試
擴充套件點(Extension points
)是指在排程上下文中的每個可延伸API,通過圖提現為[圖1]。其中 Filter
相當於 Predicate
而 Scoring
相當於 Priority
。
對於排程階段會通過以下擴充套件點:
Sort
:該外掛提供了排序功能,用於對在排程佇列中待處理 Pod 進行排序。一次只能啟用一個佇列排序。
preFilter
:該外掛用於在過濾之前預處理或檢查 Pod 或叢集的相關資訊。這裡會終止排程
filter
:該外掛相當於排程上下文中的 Predicates
,用於排除不能執行 Pod 的節點。Filter 會按設定的順序進行呼叫。如果有一個filter將節點標記位不可用,則將 Pod 標記為不可排程(即不會向下執行)。
postFilter
:當沒有為 pod 找到FN時,該外掛會按照設定的順序進行呼叫。如果任何postFilter
外掛將 Pod 標記為schedulable,則不會呼叫其餘外掛。即 filter
成功後不會進行這步驟
preScore
:可用於進行預Score工作(通知性的擴充套件點)。
score
:該外掛為每個通過 filter
階段的Node提供打分服務。然後Scheduler將選擇具有最高加權分數總和的Node。
reserve
:因為繫結事件時非同步發生的,該外掛是為了避免Pod在繫結到節點前時,排程到新的Pod,使節點使用資源超過可用資源情況。如果後續階段發生錯誤或失敗,將觸發 UnReserve
回滾(通知性擴充套件點)。這也是作為排程週期中最後一個狀態,要麼成功到 postBind
,要麼失敗觸發 UnReserve
。
permit
:該外掛可以阻止或延遲 Pod 的繫結,一般情況下這步驟會做三件事:
appove
:排程器繼續繫結過程Deny
:如果任何一個Premit拒絕了Pod與節點的繫結,那麼將觸發 UnReserve
,並重入佇列Wait
: 如果 Permit 外掛返回 Wait
,該 Pod 將保留在內部 Wait
Pod 列表中,直到被 Appove
。如果發生超時,wait
變為 deny
,將Pod放回至排程佇列中,並觸發 Unreserve
回滾 。preBind
:該外掛用於在 bind Pod 之前執行所需的前置工作。如,preBind
可能會提供一個網路卷並將其掛載到目標節點上。如果在該步驟中的任意外掛返回錯誤,則Pod 將被 deny
並放置到排程佇列中。
bind
:在所有的 preBind
完成後,該外掛將用於將Pod繫結到Node,並按順序呼叫繫結該步驟的外掛。如果有一個外掛處理了這個事件,那麼則忽略其餘所有外掛。
postBind
:該外掛在繫結 Pod 後呼叫,可用於清理相關資源(通知性的擴充套件點)。
multiPoint
:這是一個僅設定欄位,允許同時為所有適用的擴充套件點啟用或禁用外掛。
對於 kube-scheduler
元件的分析,包含 kube-scheduler
啟動流程,以及scheduler排程流程。這裡會主要針對啟動流程分析,後面演演算法及二次開發部分會切入排程分析。
對於我們部署時使用的 kube-scheduler
位於 cmd/kube-scheduler ,在 Alpha (1.16) 版本提供了排程框架的模式,到 Stable (1.19) ,從程式碼結構上是相似的;直到1.22後改變了程式碼風格。
首先看到的是 kube-scheduler
的入口 cmd/kube-scheduler ,這裡主要作為兩部分,構建引數與啟動server
,這裡嚴格來講 kube-scheduer
是作為一個server,而排程框架等部分是另外的。
func main() {
command := app.NewSchedulerCommand()
code := cli.Run(command)
os.Exit(code)
}
cli.Run
提供了cobra構成的命令列cli,紀錄檔將輸出為標準輸出
// 這裡是main中執行的Run
func Run(cmd *cobra.Command) int {
if logsInitialized, err := run(cmd); err != nil {
if !logsInitialized {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
} else {
klog.ErrorS(err, "command failed")
}
return 1
}
return 0
}
// 這個run作為
func run(cmd *cobra.Command) (logsInitialized bool, err error) {
rand.Seed(time.Now().UnixNano())
defer logs.FlushLogs()
cmd.SetGlobalNormalizationFunc(cliflag.WordSepNormalizeFunc)
if !cmd.SilenceUsage {
cmd.SilenceUsage = true
cmd.SetFlagErrorFunc(func(c *cobra.Command, err error) error {
// Re-enable usage printing.
c.SilenceUsage = false
return err
})
}
// In all cases error printing is done below.
cmd.SilenceErrors = true
// This is idempotent.
logs.AddFlags(cmd.PersistentFlags())
// Inject logs.InitLogs after command line parsing into one of the
// PersistentPre* functions.
switch {
case cmd.PersistentPreRun != nil:
pre := cmd.PersistentPreRun
cmd.PersistentPreRun = func(cmd *cobra.Command, args []string) {
logs.InitLogs()
logsInitialized = true
pre(cmd, args)
}
case cmd.PersistentPreRunE != nil:
pre := cmd.PersistentPreRunE
cmd.PersistentPreRunE = func(cmd *cobra.Command, args []string) error {
logs.InitLogs()
logsInitialized = true
return pre(cmd, args)
}
default:
cmd.PersistentPreRun = func(cmd *cobra.Command, args []string) {
logs.InitLogs()
logsInitialized = true
}
}
err = cmd.Execute()
return
}
可以看到最終是呼叫 command.Execute()
執行,這個是執行本身構建的命令,而真正被執行的則是上面的 app.NewSchedulerCommand()
,那麼來看看這個是什麼
app.NewSchedulerCommand() 構建了一個cobra.Commond物件, runCommand() 被封裝在內,這個是作為啟動scheduler的函數
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
opts := options.NewOptions()
cmd := &cobra.Command{
Use: "kube-scheduler",
Long: `The Kubernetes scheduler is a control plane process which assigns
Pods to Nodes. The scheduler determines which Nodes are valid placements for
each Pod in the scheduling queue according to constraints and available
resources. The scheduler then ranks each valid Node and binds the Pod to a
suitable Node. Multiple different schedulers may be used within a cluster;
kube-scheduler is the reference implementation.
See [scheduling](https://kubernetes.io/docs/concepts/scheduling-eviction/)
for more information about scheduling and the kube-scheduler component.`,
RunE: func(cmd *cobra.Command, args []string) error {
return runCommand(cmd, opts, registryOptions...)
},
Args: func(cmd *cobra.Command, args []string) error {
for _, arg := range args {
if len(arg) > 0 {
return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
}
}
return nil
},
}
nfs := opts.Flags
verflag.AddFlags(nfs.FlagSet("global"))
globalflag.AddGlobalFlags(nfs.FlagSet("global"), cmd.Name(), logs.SkipLoggingConfigurationFlags())
fs := cmd.Flags()
for _, f := range nfs.FlagSets {
fs.AddFlagSet(f)
}
cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
cliflag.SetUsageAndHelpFunc(cmd, *nfs, cols)
if err := cmd.MarkFlagFilename("config", "yaml", "yml", "json"); err != nil {
klog.ErrorS(err, "Failed to mark flag filename")
}
return cmd
}
下面來看下 runCommand() 在啟動 scheduler 時提供了什麼功能。
在新版中已經沒有 algorithmprovider
的概念,所以在 runCommand
中做的也就是僅僅啟動這個 scheduler
,而 scheduler 作為kubernetes元件,也是會watch等操作,自然少不了informer。其次作為和 controller-manager
相同的工作特性,kube-scheduler
也是 基於Leader選舉的。
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
// To help debugging, immediately log version
klog.InfoS("Starting Kubernetes Scheduler", "version", version.Get())
klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
// Configz registration.
if cz, err := configz.New("componentconfig"); err == nil {
cz.Set(cc.ComponentConfig)
} else {
return fmt.Errorf("unable to register configz: %s", err)
}
// Start events processing pipeline.
cc.EventBroadcaster.StartRecordingToSink(ctx.Done())
defer cc.EventBroadcaster.Shutdown()
// Setup healthz checks.
var checks []healthz.HealthChecker
if cc.ComponentConfig.LeaderElection.LeaderElect {
checks = append(checks, cc.LeaderElection.WatchDog)
}
waitingForLeader := make(chan struct{})
isLeader := func() bool {
select {
case _, ok := <-waitingForLeader:
// if channel is closed, we are leading
return !ok
default:
// channel is open, we are waiting for a leader
return false
}
}
// Start up the healthz server.
if cc.SecureServing != nil {
handler := buildHandlerChain(newHealthzAndMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
// TODO: handle stoppedCh and listenerStoppedCh returned by c.SecureServing.Serve
if _, _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
// fail early for secure handlers, removing the old error loop from above
return fmt.Errorf("failed to start secure server: %v", err)
}
}
// Start all informers.
cc.InformerFactory.Start(ctx.Done())
// DynInformerFactory can be nil in tests.
if cc.DynInformerFactory != nil {
cc.DynInformerFactory.Start(ctx.Done())
}
// Wait for all caches to sync before scheduling.
cc.InformerFactory.WaitForCacheSync(ctx.Done())
// DynInformerFactory can be nil in tests.
if cc.DynInformerFactory != nil {
cc.DynInformerFactory.WaitForCacheSync(ctx.Done())
}
// If leader election is enabled, runCommand via LeaderElector until done and exit.
if cc.LeaderElection != nil {
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
close(waitingForLeader)
sched.Run(ctx)
},
OnStoppedLeading: func() {
select {
case <-ctx.Done():
// We were asked to terminate. Exit 0.
klog.InfoS("Requested to terminate, exiting")
os.Exit(0)
default:
// We lost the lock.
klog.ErrorS(nil, "Leaderelection lost")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
},
}
leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
if err != nil {
return fmt.Errorf("couldn't create leader elector: %v", err)
}
leaderElector.Run(ctx)
return fmt.Errorf("lost lease")
}
// Leader election is disabled, so runCommand inline until done.
close(waitingForLeader)
sched.Run(ctx)
return fmt.Errorf("finished without leader elect")
}
上面看到了 runCommend
是作為啟動 scheduler 的工作,那麼通過引數構建一個 scheduler 則是在 Setup 中完成的。
// Setup creates a completed config and a scheduler based on the command args and options
func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
if cfg, err := latest.Default(); err != nil {
return nil, nil, err
} else {
opts.ComponentConfig = cfg
}
// 驗證引數
if errs := opts.Validate(); len(errs) > 0 {
return nil, nil, utilerrors.NewAggregate(errs)
}
// 構建一個config物件
c, err := opts.Config()
if err != nil {
return nil, nil, err
}
// 返回一個config物件,包含了scheduler所需的設定,如informer,leader selection
cc := c.Complete()
outOfTreeRegistry := make(runtime.Registry)
for _, option := range outOfTreeRegistryOptions {
if err := option(outOfTreeRegistry); err != nil {
return nil, nil, err
}
}
recorderFactory := getRecorderFactory(&cc)
completedProfiles := make([]kubeschedulerconfig.KubeSchedulerProfile, 0)
// 建立出來的scheduler
sched, err := scheduler.New(cc.Client,
cc.InformerFactory,
cc.DynInformerFactory,
recorderFactory,
ctx.Done(),
scheduler.WithComponentConfigVersion(cc.ComponentConfig.TypeMeta.APIVersion),
scheduler.WithKubeConfig(cc.KubeConfig),
scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
scheduler.WithPodMaxInUnschedulablePodsDuration(cc.PodMaxInUnschedulablePodsDuration),
scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
scheduler.WithParallelism(cc.ComponentConfig.Parallelism),
scheduler.WithBuildFrameworkCapturer(func(profile kubeschedulerconfig.KubeSchedulerProfile) {
// Profiles are processed during Framework instantiation to set default plugins and configurations. Capturing them for logging
completedProfiles = append(completedProfiles, profile)
}),
)
if err != nil {
return nil, nil, err
}
if err := options.LogOrWriteConfig(opts.WriteConfigTo, &cc.ComponentConfig, completedProfiles); err != nil {
return nil, nil, err
}
return &cc, sched, nil
}
上面瞭解到了 scheduler 是如何被構建出來的,下面就看看 構建時引數是如何傳遞進來的,而物件 option就是對應需要的設定結構,而 ApplyTo 則是將啟動時傳入的引數轉化為構建 scheduler 所需的設定。
對於Deprecated flags可以參考官方對於kube-scheduler啟動引數的說明 [5]
func (o *Options) ApplyTo(c *schedulerappconfig.Config) error {
if len(o.ConfigFile) == 0 {
// 在沒有指定 --config時會找到 Deprecated flags:引數
// 通過kube-scheduler --help可以看到這些被棄用的引數
o.ApplyDeprecated()
o.ApplyLeaderElectionTo(o.ComponentConfig)
c.ComponentConfig = *o.ComponentConfig
} else {
// 這裡就是指定了--config
cfg, err := loadConfigFromFile(o.ConfigFile)
if err != nil {
return err
}
// 這裡會將leader選舉的引數附加到設定中
o.ApplyLeaderElectionTo(cfg)
if err := validation.ValidateKubeSchedulerConfiguration(cfg); err != nil {
return err
}
c.ComponentConfig = *cfg
}
if err := o.SecureServing.ApplyTo(&c.SecureServing, &c.LoopbackClientConfig); err != nil {
return err
}
if o.SecureServing != nil && (o.SecureServing.BindPort != 0 || o.SecureServing.Listener != nil) {
if err := o.Authentication.ApplyTo(&c.Authentication, c.SecureServing, nil); err != nil {
return err
}
if err := o.Authorization.ApplyTo(&c.Authorization); err != nil {
return err
}
}
o.Metrics.Apply()
// Apply value independently instead of using ApplyDeprecated() because it can't be configured via ComponentConfig.
if o.Deprecated != nil {
c.PodMaxInUnschedulablePodsDuration = o.Deprecated.PodMaxInUnschedulablePodsDuration
}
return nil
}
Setup
後會new一個 schedueler
, New 則是這個動作,在裡面可以看出,會初始化一些informer與 Pod的list等操作。
func New(client clientset.Interface,
informerFactory informers.SharedInformerFactory,
dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
recorderFactory profile.RecorderFactory,
stopCh <-chan struct{},
opts ...Option) (*Scheduler, error) {
stopEverything := stopCh
if stopEverything == nil {
stopEverything = wait.NeverStop
}
options := defaultSchedulerOptions // 預設排程策略,如percentageOfNodesToScore
for _, opt := range opts {
opt(&options) // opt 是傳入的函數,會返回一個schedulerOptions即相應的一些設定
}
if options.applyDefaultProfile { // 這個是個bool型別,預設scheduler會到這裡
// Profile包含了排程器的名稱與排程器在兩個過程中使用的外掛
var versionedCfg v1beta3.KubeSchedulerConfiguration
scheme.Scheme.Default(&versionedCfg)
cfg := schedulerapi.KubeSchedulerConfiguration{} // 初始化一個設定,這個是--config傳入的型別。因為預設的排程策略會初始化
// convert 會將in轉為out即versionedCfg轉換為cfg
if err := scheme.Scheme.Convert(&versionedCfg, &cfg, nil); err != nil {
return nil, err
}
options.profiles = cfg.Profiles
}
registry := frameworkplugins.NewInTreeRegistry() // 排程框架的註冊
if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
return nil, err
}
metrics.Register() // 指標類
extenders, err := buildExtenders(options.extenders, options.profiles)
if err != nil {
return nil, fmt.Errorf("couldn't build extenders: %w", err)
}
podLister := informerFactory.Core().V1().Pods().Lister()
nodeLister := informerFactory.Core().V1().Nodes().Lister()
// The nominator will be passed all the way to framework instantiation.
nominator := internalqueue.NewPodNominator(podLister)
snapshot := internalcache.NewEmptySnapshot()
clusterEventMap := make(map[framework.ClusterEvent]sets.String)
profiles, err := profile.NewMap(options.profiles, registry, recorderFactory, stopCh,
frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
frameworkruntime.WithClientSet(client),
frameworkruntime.WithKubeConfig(options.kubeConfig),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithPodNominator(nominator),
frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
frameworkruntime.WithClusterEventMap(clusterEventMap),
frameworkruntime.WithParallelism(int(options.parallelism)),
frameworkruntime.WithExtenders(extenders),
)
if err != nil {
return nil, fmt.Errorf("initializing profiles: %v", err)
}
if len(profiles) == 0 {
return nil, errors.New("at least one profile is required")
}
podQueue := internalqueue.NewSchedulingQueue(
profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
informerFactory,
internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
internalqueue.WithPodNominator(nominator),
internalqueue.WithClusterEventMap(clusterEventMap),
internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
)
schedulerCache := internalcache.New(durationToExpireAssumedPod, stopEverything)
// Setup cache debugger.
debugger := cachedebugger.New(nodeLister, podLister, schedulerCache, podQueue)
debugger.ListenForSignal(stopEverything)
sched := newScheduler(
schedulerCache,
extenders,
internalqueue.MakeNextPodFunc(podQueue),
MakeDefaultErrorFunc(client, podLister, podQueue, schedulerCache),
stopEverything,
podQueue,
profiles,
client,
snapshot,
options.percentageOfNodesToScore,
)
// 這個就是controller中onAdd等那三個必須的事件函數
addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(clusterEventMap))
return sched, nil
}
接下來會啟動這個 scheduler, 在上面我們看到 NewSchedulerCommand 構建了一個cobra.Commond物件, runCommand() 最終會返回個 Run,而這個Run就是啟動這個 sche 的。
下面這個 run 是 sche 的執行,他執行並watch資源,直到上下文完成。
func (sched *Scheduler) Run(ctx context.Context) {
sched.SchedulingQueue.Run()
// We need to start scheduleOne loop in a dedicated goroutine,
// because scheduleOne function hangs on getting the next item
// from the SchedulingQueue.
// If there are no new pods to schedule, it will be hanging there
// and if done in this goroutine it will be blocking closing
// SchedulingQueue, in effect causing a deadlock on shutdown.
go wait.UntilWithContext(ctx, sched.scheduleOne, 0)
<-ctx.Done()
sched.SchedulingQueue.Close()
}
而呼叫這個 Run 的部分則是作為server的 kube-scheduler 中的 run
// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
// To help debugging, immediately log version
klog.InfoS("Starting Kubernetes Scheduler", "version", version.Get())
klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
// Configz registration.
if cz, err := configz.New("componentconfig"); err == nil {
cz.Set(cc.ComponentConfig)
} else {
return fmt.Errorf("unable to register configz: %s", err)
}
// Start events processing pipeline.
cc.EventBroadcaster.StartRecordingToSink(ctx.Done())
defer cc.EventBroadcaster.Shutdown()
// Setup healthz checks.
var checks []healthz.HealthChecker
if cc.ComponentConfig.LeaderElection.LeaderElect {
checks = append(checks, cc.LeaderElection.WatchDog)
}
waitingForLeader := make(chan struct{})
isLeader := func() bool {
select {
case _, ok := <-waitingForLeader:
// if channel is closed, we are leading
return !ok
default:
// channel is open, we are waiting for a leader
return false
}
}
// Start up the healthz server.
if cc.SecureServing != nil {
handler := buildHandlerChain(newHealthzAndMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
// TODO: handle stoppedCh and listenerStoppedCh returned by c.SecureServing.Serve
if _, _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
// fail early for secure handlers, removing the old error loop from above
return fmt.Errorf("failed to start secure server: %v", err)
}
}
// Start all informers.
cc.InformerFactory.Start(ctx.Done())
// DynInformerFactory can be nil in tests.
if cc.DynInformerFactory != nil {
cc.DynInformerFactory.Start(ctx.Done())
}
// Wait for all caches to sync before scheduling.
cc.InformerFactory.WaitForCacheSync(ctx.Done())
// DynInformerFactory can be nil in tests.
if cc.DynInformerFactory != nil {
cc.DynInformerFactory.WaitForCacheSync(ctx.Done())
}
// If leader election is enabled, runCommand via LeaderElector until done and exit.
if cc.LeaderElection != nil {
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
close(waitingForLeader)
sched.Run(ctx)
},
OnStoppedLeading: func() {
select {
case <-ctx.Done():
// We were asked to terminate. Exit 0.
klog.InfoS("Requested to terminate, exiting")
os.Exit(0)
default:
// We lost the lock.
klog.ErrorS(nil, "Leaderelection lost")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
},
}
leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
if err != nil {
return fmt.Errorf("couldn't create leader elector: %v", err)
}
leaderElector.Run(ctx)
return fmt.Errorf("lost lease")
}
// Leader election is disabled, so runCommand inline until done.
close(waitingForLeader)
sched.Run(ctx)
return fmt.Errorf("finished without leader elect")
}
而上面的 server.Run 會被 runCommand
也就是在 NewSchedulerCommand
時被返回,在 kube-scheduler
的入口檔案中被執行。
cc, sched, err := Setup(ctx, opts, registryOptions...)
if err != nil {
return err
}
return Run(ctx, cc, sched)
至此,整個 kube-scheduler
啟動流就分析完了,這個的流程可以用下圖表示
Reference
[1] kube scheduler
[2] Scheduler Algorithm in Kubernetes
[4] permit