kubernetes:kube-apiserver
系列文章:
上幾篇文章介紹了 kubernetes
的核心資料結構 scheme
和 Kubernetes:kube-apiserver
的啟動流程。在啟動流程篇中重點關注的是啟動的核心邏輯,並沒有關注 kube-apiserver
和外部元件的互動。
而,互動是非常必要的,其定義了邊界和依賴。
從 Kubernetes
架構圖可以看出,kube-apiserver
是唯一和 etcd
互動的元件。因此,這裡將 kube-apiserver
和 etcd
互動的部分單獨拿出來加以介紹,做到知其然,知其所以然。
既然是互動,首先需要了解的是怎麼用互動的元件。這裡同 kube-apiserver
互動的是大名鼎鼎的 etcd
,不需要多介紹它。
僅給出範例:
package main
import (
"context"
"fmt"
"log"
"time"
"go.etcd.io/etcd/clientv3"
)
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: time.Second * 5,
})
if err != nil {
log.Fatal(err)
}
fmt.Println("connect to etcd success.")
defer cli.Close()
// lease with 5 second
resp, err := cli.Grant(context.TODO(), 5)
if err != nil {
log.Fatal(err)
}
// delete key:name after expire of lease
_, err = cli.Put(context.TODO(), "name", "hxia", clientv3.WithLease(resp.ID))
if err != nil {
log.Fatal(err)
}
}
詳細內容可參考 go-by-example: etcd 和 Quickstart。
順序看 kube-apiserver
和 etcd
的互動是非常複雜的,容易頭暈。這裡,逆序的看 kube-apiserver
和 etcd
的互動。首先,找到它們在哪裡互動的,接著從這一點開始發散,摸清整體脈絡。
那麼,它們在哪裡互動的呢?這個問題不難回答,在 handler
。作為 RESTful API
的處理單元,handler
內定義了 kube-apiserver
和 etcd
的互動。
以處理 GET
的 handler
為例:
# kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/installer.go
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
switch action.Verb {
case "GET": // Get a resource.
var handler restful.RouteFunction
if isGetterWithOptions {
handler = restfulGetResourceWithOptions(getterWithOptions, reqScope, isSubresource)
} else {
handler = restfulGetResource(getter, reqScope)
}
route := ws.GET(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
Returns(http.StatusOK, "OK", producedObject).
Writes(producedObject)
addParams(route, action.Params)
routes = append(routes, route)
}
}
進入 restfulGetResource
看 handler
是怎麼建立的。
func restfulGetResource(r rest.Getter, scope handlers.RequestScope) restful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {
handlers.GetResource(r, &scope)(res.ResponseWriter, req.Request)
}
}
// GetResource returns a function that handles retrieving a single resource from a rest.Storage object.
func GetResource(r rest.Getter, scope *RequestScope) http.HandlerFunc {
return getResourceHandler(scope,
func(ctx context.Context, name string, req *http.Request) (runtime.Object, error) {
...
return r.Get(ctx, name, &options)
})
}
// Getter is an object that can retrieve a named RESTful resource.
type Getter interface {
// Get finds a resource in the storage by name and returns it.
// Although it can return an arbitrary error value, IsNotFound(err) is true for the
// returned error value err when the specified resource is not found.
Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error)
}
可以看到:
restfulGetResource
返回一個路由函數,路由函數內包含傳遞給 restfulGetResource
的 getter
物件。getter
的 Get
方法獲取資源物件 runtime.Object
。這裡的 getter
是實現 Getter
介面的物件。基於上述分析,現在重點就變成 getter
呼叫的 Get
具體做了什麼。通過逐級向上追溯,找到了 Getter
介面的範例物件 customResourceDefinitionStorage
。
# kubernetes/vendor/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
if resource := "customresourcedefinitions"; apiResourceConfig.ResourceEnabled(v1.SchemeGroupVersion.WithResource(resource)) {
// 呼叫 NetREST 建立資源實體 customResourceDefinitionStorage
customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
if err != nil {
return nil, err
}
storage[resource] = customResourceDefinitionStorage
storage[resource+"/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)
}
}
func NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) (*REST, error) {
strategy := NewStrategy(scheme)
store := &genericregistry.Store{
...
}
options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: GetAttrs}
if err := store.CompleteWithOptions(options); err != nil {
return nil, err
}
return &REST{store}, nil
}
介面的範例物件找到了,繼續看範例物件的 Get
做了什麼。
# kubernetes/vendor/k8s.io/apiextensions-apiserver/pkg/registry/customresourcedefinition/etcd.go
// rest implements a RESTStorage for API services against etcd
type REST struct {
*genericregistry.Store
}
# kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go
type Store struct {
Storage DryRunnableStorage
}
# kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/dryrun.go
type DryRunnableStorage struct {
Storage storage.Interface
Codec runtime.Codec
}
# kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go
// Get retrieves the item from storage.
func (e *Store) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
obj := e.NewFunc()
key, err := e.KeyFunc(ctx, name)
if err != nil {
return nil, err
}
if err := e.Storage.Get(ctx, key, storage.GetOptions{ResourceVersion: options.ResourceVersion}, obj); err != nil {
return nil, storeerr.InterpretGetError(err, e.qualifiedResourceFromContext(ctx), name)
}
if e.Decorator != nil {
e.Decorator(obj)
}
return obj, nil
}
# kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/dryrun.go
func (s *DryRunnableStorage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
return s.Storage.Get(ctx, key, opts, objPtr)
}
REST
物件包含 *genericregistry.Store
,其繼承了 Store
的 Get
方法。在 Store.Get
方法內,通過 e.Storage.Get
呼叫 DryRunnableStorage
的 Get
方法。實際是通過 DryRunnableStorage
內的 Storage
儲存介面呼叫 Get
方法,從而存取 etcd
。
DryRunnableStorage.Storage
是一個介面,它的實體物件是什麼呢?
還是從資源實體入手,看 REST{store}
是如何範例化的。
// NewREST returns a RESTStorage object that will work against API services.
func NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) (*REST, error) {
strategy := NewStrategy(scheme)
store := &genericregistry.Store{
...
}
options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: GetAttrs}
// 進入 CompleteWithOptions
if err := store.CompleteWithOptions(options); err != nil {
return nil, err
}
return &REST{store}, nil
}
func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
if e.Storage.Storage == nil {
e.Storage.Storage, e.DestroyFunc, err = opts.Decorator(
opts.StorageConfig,
prefix,
keyFunc,
e.NewFunc,
e.NewListFunc,
attrFunc,
options.TriggerFunc,
options.Indexers,
)
}
}
看到這裡,已經知道哪裡範例化的 storage.Interface
物件了。這裡的 opts.Decorator
是一個裝飾函數。接著,繼續探案,看這個裝飾函數幹了什麼,知道它幹了什麼就能挖出來最關鍵的一環,儲存介面是怎麼存取 etcd
的。
# kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go
func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
// 通過 options.RESTOptions.GetRESTOptions 範例化 opts
// options.RESTOptions 是滿足 RESTOptionsGetter 介面的範例
opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)
if err != nil {
return err
}
}
# kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/options.go
type RESTOptionsGetter interface {
GetRESTOptions(resource schema.GroupResource) (RESTOptions, error)
}
func NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) (*REST, error) {
strategy := NewStrategy(scheme)
store := &genericregistry.Store{
...
}
// 建立 options
options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: GetAttrs}
// 將 options 作為引數傳遞給 Store.CompleteWithOptions
if err := store.CompleteWithOptions(options); err != nil {
return nil, err
}
return &REST{store}, nil
}
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
if resource := "customresourcedefinitions"; apiResourceConfig.ResourceEnabled(v1.SchemeGroupVersion.WithResource(resource)) {
customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
if err != nil {
return nil, err
}
storage[resource] = customResourceDefinitionStorage
storage[resource+"/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)
}
}
可以看到,c.GenericConfig.RESTOptionsGetter
即為 optsGetter
,呼叫 c.GenericConfig.RESTOptionsGetter
的 GetRESTOptions
得到 RESTOptions
。
c.GenericConfig.RESTOptionsGetter
在哪裡範例化的呢?
還記得前面建立通用設定的 BuildGenericConfig
嗎?在該函數內,範例化了 c.GenericConfig.RESTOptionsGetter
。
# kubernetes/pkg/controlplane/apiserver/config.go
func BuildGenericConfig(
s controlplaneapiserver.CompletedOptions,
schemes []*runtime.Scheme,
getOpenAPIDefinitions func(ref openapicommon.ReferenceCallback) map[string]openapicommon.OpenAPIDefinition,
){
storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
storageFactory, lastErr = storageFactoryConfig.Complete(s.Etcd).New()
if lastErr != nil {
return
}
if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
return
}
}
func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
c.RESTOptionsGetter = s.CreateRESTOptionsGetter(factory, c.ResourceTransformers)
return nil
}
func (s *EtcdOptions) CreateRESTOptionsGetter(factory serverstorage.StorageFactory, resourceTransformers storagevalue.ResourceTransformers) generic.RESTOptionsGetter {
if resourceTransformers != nil {
factory = &transformerStorageFactory{
delegate: factory,
resourceTransformers: resourceTransformers,
}
}
return &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
}
過程也不復雜,可以看到,RESTOptionsGetter
介面的範例化物件是 &StorageFactoryRestOptionsFactory
。
呼叫 c.GenericConfig.RESTOptionsGetter
的 GetRESTOptions
實際呼叫的是 StorageFactoryRestOptionsFactory.GetRESTOptions
。
func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
ret := generic.RESTOptions{
StorageConfig: storageConfig,
Decorator: generic.UndecoratedStorage,
DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
EnableGarbageCollection: f.Options.EnableGarbageCollection,
ResourcePrefix: f.StorageFactory.ResourcePrefix(resource),
CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod,
StorageObjectCountTracker: f.Options.StorageConfig.StorageObjectCountTracker,
}
return ret, nil
}
RESTOptions
中包含了 Decorator
的建立,這裡我們的重點是 Decorator
,進入 generic.UndecoratedStorage
看它是怎麼一個函數。
# kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/storage_decorator.go
func UndecoratedStorage(
config *storagebackend.ConfigForResource,
resourcePrefix string,
keyFunc func(obj runtime.Object) (string, error),
newFunc func() runtime.Object,
newListFunc func() runtime.Object,
getAttrsFunc storage.AttrFunc,
trigger storage.IndexerFuncs,
indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
return NewRawStorage(config, newFunc, newListFunc, resourcePrefix)
}
func NewRawStorage(config *storagebackend.ConfigForResource, newFunc, newListFunc func() runtime.Object, resourcePrefix string) (storage.Interface, factory.DestroyFunc, error) {
return factory.Create(*config, newFunc, newListFunc, resourcePrefix)
}
# kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go
func Create(c storagebackend.ConfigForResource, newFunc, newListFunc func() runtime.Object, resourcePrefix string) (storage.Interface, DestroyFunc, error) {
switch c.Type {
case storagebackend.StorageTypeETCD2:
return nil, nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
return newETCD3Storage(c, newFunc, newListFunc, resourcePrefix)
default:
return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
}
# kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go
func newETCD3Storage(c storagebackend.ConfigForResource, newFunc, newListFunc func() runtime.Object, resourcePrefix string) (storage.Interface, DestroyFunc, error) {
client, err := newETCD3Client(c.Transport)
if err != nil {
stopCompactor()
return nil, nil, err
}
client.KV = etcd3.NewETCDLatencyTracker(client.KV)
return etcd3.New(client, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil
}
// New returns an etcd3 implementation of storage.Interface.
func New(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) storage.Interface {
return newStore(c, codec, newFunc, newListFunc, prefix, resourcePrefix, groupResource, transformer, pagingEnabled, leaseManagerConfig)
}
func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) *store {
s := &store{
client: c,
codec: codec,
versioner: versioner,
transformer: transformer,
pagingEnabled: pagingEnabled,
pathPrefix: pathPrefix,
groupResource: groupResource,
groupResourceString: groupResource.String(),
watcher: w,
leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),
}
return s
}
上述程式碼基本都是函數的順序呼叫,不用介紹太多。
可以看到 opts.Decorator
做的事情是範例化了一個存取 etcd
的介面範例 store
。store
中儲存了存取 etcd
的 client
,client
是通過 newETCD3Client(c.Transport)
建立的。
到這裡,基本破案了。存取 etcd
實際是通過 store
和 etcd
進行互動。這裡的 store
只是儲存了 client
並沒有實際存取,實際存取在 handler
。
再回頭看 DryRunnableStorage.Get
方法內的 s.Storage.Get
即可知道其呼叫的是 store
的 Get
方法。
# kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/dryrun.go
func (s *DryRunnableStorage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
return s.Storage.Get(ctx, key, opts, objPtr)
}
# kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go
// Get implements storage.Interface.Get.
func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, out runtime.Object) error {
preparedKey, err := s.prepareKey(key)
if err != nil {
return err
}
startTime := time.Now()
// 通過 client 存取 key 對應的 value
getResp, err := s.client.KV.Get(ctx, preparedKey)
kv := getResp.Kvs[0]
data, _, err := s.transformer.TransformFromStorage(ctx, kv.Value, authenticatedDataString(preparedKey))
if err != nil {
return storage.NewInternalError(err.Error())
}
err = decode(s.codec, s.versioner, data, out, kv.ModRevision)
if err != nil {
recordDecodeError(s.groupResourceString, preparedKey)
return err
}
return nil
}
最後,通過本文介紹了 kube-apiserver
和 etcd
的互動。下一步將重點介紹 kube-apiserver
是怎麼做鑑權,認證和准入機制的。