diff --git a/k8s/clients.go b/k8s/clients.go new file mode 100644 index 0000000000000000000000000000000000000000..751f1671e250d338a159f42186c2a92afe0bdf54 --- /dev/null +++ b/k8s/clients.go @@ -0,0 +1,103 @@ +package k8s + +import ( + "context" + + "go.acides.org/pekahi" + core "k8s.io/api/core/v1" + "k8s.io/client-go/discovery" + "k8s.io/client-go/discovery/cached/memory" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/metadata" + "k8s.io/client-go/rest" + "k8s.io/client-go/restmapper" + "k8s.io/client-go/tools/events" +) + +type Clients struct { + KubeConfig *rest.Config + Client *kubernetes.Clientset + DynClient *dynamic.DynamicClient + EventClient *kubernetes.Clientset + MetadataClient metadata.Interface + CachedClient discovery.CachedDiscoveryInterface + RESTMapper *restmapper.DeferredDiscoveryRESTMapper + Broadcaster events.EventBroadcasterAdapter + Informer informers.SharedInformerFactory + DynInformer dynamicinformer.DynamicSharedInformerFactory +} + +// Make clients authenticating with a ceritficate (typically for kubelets) +func NewCertsClient(host string, ca *pekahi.Certificate, cert *pekahi.Certificate) (*Clients, error) { + kc := newKC(host, ca) + kc.TLSClientConfig.CertFile = cert.CertPath() + kc.TLSClientConfig.KeyFile = cert.KeyPath() + return newClientsForKC(kc) +} + +// Make clients from the master context itself +func NewTokenClients(host string, ca *pekahi.Certificate, token string) (*Clients, error) { + kc := newKC(host, ca) + kc.BearerToken = token + return newClientsForKC(kc) +} + +// Make a k8s client config for connecting to the master +func newKC(host string, ca *pekahi.Certificate) *rest.Config { + return &rest.Config{ + Host: host, + TLSClientConfig: rest.TLSClientConfig{ + CAFile: ca.CertPath(), + }, + } +} + +func newClientsForKC(kc *rest.Config) (*Clients, error) { + baseClient, err := kubernetes.NewForConfig(kc) + if err != nil { + return nil, err + } + eventsClient, err := kubernetes.NewForConfig(kc) + if err != nil { + return nil, err + } + dynClient, err := dynamic.NewForConfig(kc) + if err != nil { + return nil, err + } + metadataClient, err := metadata.NewForConfig(kc) + if err != nil { + return nil, err + } + cachedClient := memory.NewMemCacheClient(baseClient) + restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedClient) + broadcaster := events.NewEventBroadcasterAdapter(eventsClient) + informers := informers.NewSharedInformerFactory(baseClient, 0) + dynInformers := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, 0, core.NamespaceAll, nil) + return &Clients{ + KubeConfig: kc, + Client: baseClient, + DynClient: dynClient, + EventClient: eventsClient, + MetadataClient: metadataClient, + RESTMapper: restMapper, + Broadcaster: broadcaster, + Informer: informers, + DynInformer: dynInformers, + }, nil +} + +func (c *Clients) Start(ctx context.Context) { + c.Broadcaster.StartRecordingToSink(ctx.Done()) + c.Informer.Start(ctx.Done()) + c.DynInformer.Start(ctx.Done()) + c.Informer.WaitForCacheSync(ctx.Done()) + c.DynInformer.WaitForCacheSync(ctx.Done()) +} + +func (c *Clients) Stop() { + c.Broadcaster.Shutdown() +} diff --git a/k8s/storage.go b/k8s/storage.go new file mode 100644 index 0000000000000000000000000000000000000000..6bbc8f832fd6abebdbd99a22c8f41adcd0dd3a64 --- /dev/null +++ b/k8s/storage.go @@ -0,0 +1,64 @@ +package k8s + +import ( + "fmt" + "time" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/registry/generic" + "k8s.io/apiserver/pkg/server/storage" + "k8s.io/apiserver/pkg/storage/storagebackend" + "k8s.io/apiserver/pkg/util/flowcontrol/request" +) + +// Reimplement the rest options factory to stop depending on option parsers, this instantiate the +// storage factory at the right location and prefix for a given resource +type RestOptionsFactory struct { + StorageFactory storage.StorageFactory + StorageObjectCountTracker request.StorageObjectCountTracker +} + +func (f *RestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) { + storageConfig, err := f.StorageFactory.NewConfig(resource) + if err != nil { + return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error()) + } + return generic.RESTOptions{ + StorageConfig: storageConfig, + Decorator: generic.UndecoratedStorage, + DeleteCollectionWorkers: 1, + EnableGarbageCollection: true, + ResourcePrefix: f.StorageFactory.ResourcePrefix(resource), + CountMetricPollPeriod: time.Minute, + StorageObjectCountTracker: f.StorageObjectCountTracker, + }, nil +} + +func PrepareStorage(codecs runtime.StorageSerializer, scheme *runtime.Scheme, resources storage.APIResourceConfigSource) *RestOptionsFactory { + // Etcd backend supports being created with a specific codec, however later created storage + // factories override that codec at runtime thanks to using DefaultStorageFactory instead + // of vanilla SimpleStorageFactory, so we create the backend with no pre-configured codec + etcdConfig := storagebackend.NewDefaultConfig("/registry", nil) + etcdConfig.Transport.ServerList = []string{"http://[::1]:2379"} + + // The rest options getter abstracts all storage for the server, by specifying the scheme and codecs + // on top of the storage backend. Vanilla code sets a special multi-group versionner to avoid issues + // with large objects (cee cmd/kube-apiserver/app/apiextensions.go), which we ignore here + return &RestOptionsFactory{ + // TODO: be careful about the default factory using the resourcename as prefix, which might + // cause collisions in the future + StorageFactory: storage.NewDefaultStorageFactory( + *etcdConfig, // copied as value, so the factory may alter some fields + runtime.ContentTypeJSON, // media type, somewhat inefficient but good enough + codecs, + storage.NewDefaultResourceEncodingConfig(scheme), + resources, + // We always use an empty list of prefix overrides: no prefix override, contrary to + // vanilla code which has many overrides for backward compatibility against a given storage target + // This means hepto is not compatible with vanilla when using the same etcd backend, and we might + // want to be careful when upgrading to avoid unexpected prefix changes + make(map[schema.GroupResource]string), + ), + } +} diff --git a/services/apiserver.go b/services/apiserver.go index c19a55dfef42c229b7c4bc192013569b0e28e7c3..71035026f2578560e65fed0948fd896c02df9744 100644 --- a/services/apiserver.go +++ b/services/apiserver.go @@ -8,10 +8,9 @@ import ( "os" "time" + "go.acides.org/hepto/k8s" extensions "k8s.io/apiextensions-apiserver/pkg/apiserver" meta "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/admission/initializer" @@ -22,14 +21,10 @@ import ( "k8s.io/apiserver/pkg/admission/plugin/webhook/validating" apifilters "k8s.io/apiserver/pkg/endpoints/filters" openapinamer "k8s.io/apiserver/pkg/endpoints/openapi" - "k8s.io/apiserver/pkg/registry/generic" "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/dynamiccertificates" "k8s.io/apiserver/pkg/server/filters" - "k8s.io/apiserver/pkg/server/storage" - "k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/apiserver/pkg/util/feature" - "k8s.io/apiserver/pkg/util/flowcontrol/request" "k8s.io/apiserver/pkg/util/notfoundhandler" "k8s.io/apiserver/pkg/util/openapi" "k8s.io/client-go/tools/clientcmd" @@ -46,7 +41,6 @@ import ( "k8s.io/kubernetes/pkg/controlplane" "k8s.io/kubernetes/pkg/controlplane/reconcilers" generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi" - "k8s.io/kubernetes/pkg/kubeapiserver" kubeadmission "k8s.io/kubernetes/pkg/kubeapiserver/admission" "k8s.io/kubernetes/pkg/kubeapiserver/authenticator" "k8s.io/kubernetes/pkg/kubeapiserver/authorizer" @@ -63,30 +57,8 @@ import ( const audience = "https://kubernetes.default.svc.cluster.local" const apiserverPort = 6443 -// Reimplement the rest options factory to stop depending on option parsers -type RestOptionsFactory struct { - StorageFactory storage.StorageFactory - StorageObjectCountTracker request.StorageObjectCountTracker -} - -func (f *RestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) { - storageConfig, err := f.StorageFactory.NewConfig(resource) - if err != nil { - return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error()) - } - return generic.RESTOptions{ - StorageConfig: storageConfig, - Decorator: generic.UndecoratedStorage, - DeleteCollectionWorkers: 1, - EnableGarbageCollection: true, - ResourcePrefix: f.StorageFactory.ResourcePrefix(resource), - CountMetricPollPeriod: time.Minute, - StorageObjectCountTracker: f.StorageObjectCountTracker, - }, nil -} - // Make a service resolver for a given client config -func getResolver(clients *Clients) apiserver.ServiceResolver { +func getResolver(clients *k8s.Clients) apiserver.ServiceResolver { host, _ := url.Parse(clients.KubeConfig.Host) return apiserver.NewLoopbackServiceResolver(apiserver.NewEndpointServiceResolver( clients.Informer.Core().V1().Services().Lister(), @@ -96,7 +68,7 @@ func getResolver(clients *Clients) apiserver.ServiceResolver { // Build a generic apiserver config, that is used and tweaked for various instanciated servers (native, extensions, etc.) // Be careful, config is returned as a pointer, so it must be explicitely shallow copied before tweaking -func buildConfig(c *Cluster) (config *server.Config, clients *Clients, err error) { +func buildConfig(c *Cluster) (config *server.Config, clients *k8s.Clients, err error) { // Initialize a basic configuration object ver := version.Get() config = server.NewConfig(legacyscheme.Codecs) @@ -141,7 +113,7 @@ func buildConfig(c *Cluster) (config *server.Config, clients *Clients, err error } // Setup loopback clients (no authorization at this point, handled later) - clients, err = newLoopbackClients(c) + clients, err = k8s.NewTokenClients(c.masterUrl, c.pki.TLS, c.loopbackToken) if err != nil { err = fmt.Errorf("could not setup loopback clients: %w", err) return @@ -252,54 +224,19 @@ var kubeApiserver = &Unit{ // Allow privileged pods capabilities.Setup(true, 0) - // Etcd backend supports being created with a specific codec, however later created storage - // factories override that codec at runtime thanks to using DefaultStorageFactory instead - // of vanilla SimpleStorageFactory, so we create the backend with no pre-configured codec - // and use it for all factories - // TODO: be more ware about resource prefixes in etcd and provide a sane, generic working implentation, - // not taking backward compatibility with vanilla into account - etcdConfig := storagebackend.NewDefaultConfig("/registry", nil) - etcdConfig.Transport.ServerList = []string{"http://[::1]:2379"} - // Servers are created in reverse orders for proper delegation - // First notFound is the default handler, that basically returns a 404, - // then the extensions serves CRD for unknown kinds, finally - // the default apiserver is first in the chain and serves all default kinds - // Note that no API aggregation is setup, which means that k8s API aggregation - // is not supported by hepto. - notFound := notfoundhandler.New(config.Serializer, apifilters.NoMuxAndDiscoveryIncompleteKey) - // Tweak the configuration and build the extension server - // The generic config is an instance of RecommendedConfig, which is composed of Config, by - // first copying it we can later change some fields per instance + // Build the extensions server (create then customize the configuration) extensionsConfig := extensions.Config{ GenericConfig: &server.RecommendedConfig{ - Config: *config, // This is the common config being copied + Config: *config, // Copy by value to later change fields SharedInformerFactory: clients.Informer, }, ExtraConfig: extensions.ExtraConfig{}, } - // Then we customize some specific fields - // The resource config is used as-is with no api enablement specification, meaning ga and beta - // are enabled extensionsConfig.GenericConfig.MergedResourceConfig = extensions.DefaultAPIResourceConfigSource() - // The rest options getter abstracts all storage for the server, by specifying the scheme and codecs - // on top of the storage backend. Vanilla code sets a special multi-group versionner to avoid issues - // with large objects (cee cmd/kube-apiserver/app/apiextensions.go), which we ignore here - extensionsConfig.GenericConfig.RESTOptionsGetter = &RestOptionsFactory{ - // TODO: using DefaultStorageFactory might be an issue compared to SimpleStorageFactory from vanilla, - // since it does provide a different resource prefix than SimpleStorageFactory (resource name - // instead of resource group and name) - StorageFactory: storage.NewDefaultStorageFactory( - *etcdConfig, // copied as value, so the factory may alter some fields - runtime.ContentTypeJSON, // media type, somewhat inefficient but good enough - extensions.Codecs, // serializer - storage.NewDefaultResourceEncodingConfig(extensions.Scheme), - extensionsConfig.GenericConfig.MergedResourceConfig, // api resources - map[schema.GroupResource]string{}, // no override for extensions - ), - } + extensionsConfig.GenericConfig.RESTOptionsGetter = k8s.PrepareStorage(extensions.Codecs, extensions.Scheme, extensionsConfig.GenericConfig.MergedResourceConfig) // TODO This might be an issue, since vanilla sets a special unstructured encoding for CRD extensionsConfig.ExtraConfig.CRDRESTOptionsGetter = extensionsConfig.GenericConfig.RESTOptionsGetter extensionServer, err := extensionsConfig.Complete().New(server.NewEmptyDelegateWithCustomHandler(notFound)) @@ -338,16 +275,8 @@ var kubeApiserver = &Unit{ apiConfig.ExtraConfig.ClusterAuthenticationInfo.ClientCA = config.SecureServing.ClientCA apiConfig.GenericConfig.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource() apiConfig.ExtraConfig.APIResourceConfigSource = apiConfig.GenericConfig.MergedResourceConfig - restOptionsGetter := &RestOptionsFactory{ - StorageFactory: storage.NewDefaultStorageFactory( - *etcdConfig, - runtime.ContentTypeJSON, - legacyscheme.Codecs, - storage.NewDefaultResourceEncodingConfig(legacyscheme.Scheme), - apiConfig.GenericConfig.MergedResourceConfig, - kubeapiserver.SpecialDefaultResourcePrefixes, - ), - } + // TODO add prefix overrides + restOptionsGetter := k8s.PrepareStorage(legacyscheme.Codecs, legacyscheme.Scheme, apiConfig.GenericConfig.MergedResourceConfig) apiConfig.GenericConfig.RESTOptionsGetter = restOptionsGetter apiConfig.ExtraConfig.StorageFactory = restOptionsGetter.StorageFactory openapiFactory := openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions) @@ -369,16 +298,7 @@ var kubeApiserver = &Unit{ }, } aggregatorConfig.GenericConfig.MergedResourceConfig = aggregator.DefaultAPIResourceConfigSource() - aggregatorConfig.GenericConfig.RESTOptionsGetter = &RestOptionsFactory{ - StorageFactory: storage.NewDefaultStorageFactory( - *etcdConfig, - runtime.ContentTypeJSON, - aggregatorscheme.Codecs, - storage.NewDefaultResourceEncodingConfig(aggregatorscheme.Scheme), - aggregatorConfig.GenericConfig.MergedResourceConfig, - map[schema.GroupResource]string{}, - ), - } + aggregatorConfig.GenericConfig.RESTOptionsGetter = k8s.PrepareStorage(aggregatorscheme.Codecs, aggregatorscheme.Scheme, aggregatorConfig.GenericConfig.MergedResourceConfig) aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(apiServer.GenericAPIServer) if err != nil { return fmt.Errorf("could not initialize aggregator: %w", err) @@ -391,7 +311,7 @@ var kubeApiserver = &Unit{ }, Ready: func(u *Unit, c *Cluster) bool { u.Logger.Info("checking if apiserver is ready") - clients, err := newLoopbackClients(c) + clients, err := k8s.NewTokenClients(c.masterUrl, c.pki.TLS, c.loopbackToken) if err != nil { return false } @@ -403,15 +323,11 @@ var kubeApiserver = &Unit{ }, } +// Write a loopback config (different for every start) var loopback = &Unit{ Name: "kube-loopback", Dependencies: []*Unit{kubeApiserver}, Start: func(u *Unit, c *Cluster, ctx context.Context) error { - clients, err := newLoopbackClients(c) - if err != nil { - return fmt.Errorf("could not get loopback config: %w", err) - } - // Write a loopback config (different for every start) name := "loopback" clientConfig := api.Config{ APIVersion: "v1", @@ -422,14 +338,14 @@ var loopback = &Unit{ AuthInfo: name, }}, Clusters: map[string]*api.Cluster{name: { - Server: clients.KubeConfig.Host, - CertificateAuthority: clients.KubeConfig.TLSClientConfig.CAFile, + Server: c.masterUrl, + CertificateAuthority: c.pki.TLS.CertPath(), }}, AuthInfos: map[string]*api.AuthInfo{name: { - Token: clients.KubeConfig.BearerToken, + Token: c.loopbackToken, }}, } - err = os.MkdirAll("/root/.kube", 0755) + err := os.MkdirAll("/root/.kube", 0755) if err != nil { return fmt.Errorf("could not create kubeconfig dir: %w", err) } diff --git a/services/cm.go b/services/cm.go index a048ed2514478a225e19e1b0acd95d98e0b366aa..70b57ef85e0ede66debb2e57365ac1c3f7fd44b8 100644 --- a/services/cm.go +++ b/services/cm.go @@ -7,6 +7,7 @@ import ( "net" "time" + "go.acides.org/hepto/k8s" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apiserver/pkg/quota/v1/generic" @@ -40,7 +41,7 @@ var kubeControllerManager = &Unit{ // Used as a replacement for InformersStarted in vanilla code allReady := make(chan struct{}) - clients, err := newLoopbackClients(c) + clients, err := k8s.NewTokenClients(c.masterUrl, c.pki.TLS, c.loopbackToken) if err != nil { return err } diff --git a/services/discovery.go b/services/discovery.go index 741b3458600d4492363681fff6814eeeb1fd4802..dd4988d05e4e9cd326bb12487ec58593687fd916 100644 --- a/services/discovery.go +++ b/services/discovery.go @@ -2,6 +2,7 @@ package services import ( "context" + "fmt" "go.acides.org/sml" ) @@ -36,6 +37,7 @@ var memberlist = &Unit{ if node.Role == "master" || node.Role == "full" { u.Manager.Logger.Info("found remote master", "name", node.Name) c.masterNode = node + c.masterUrl = fmt.Sprintf("https://[%s]:%d", node.VpnIP.String(), apiserverPort) u.Manager.Trigger() } } diff --git a/services/k8s.go b/services/k8s.go index 3ee392463545ee04167211cafe8d176865dc5af8..0dd6eeceae2a5e59c977ee33854194e8cce43590 100644 --- a/services/k8s.go +++ b/services/k8s.go @@ -4,18 +4,6 @@ import ( "context" "fmt" - "go.acides.org/pekahi" - core "k8s.io/api/core/v1" - "k8s.io/client-go/discovery" - "k8s.io/client-go/discovery/cached/memory" - "k8s.io/client-go/dynamic" - "k8s.io/client-go/dynamic/dynamicinformer" - "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/metadata" - "k8s.io/client-go/rest" - "k8s.io/client-go/restmapper" - "k8s.io/client-go/tools/events" "k8s.io/klog/v2" ) @@ -31,88 +19,3 @@ var kubeLogger = &Unit{ return nil }, } - -type Clients struct { - KubeConfig *rest.Config - Client *kubernetes.Clientset - DynClient *dynamic.DynamicClient - EventClient *kubernetes.Clientset - MetadataClient metadata.Interface - CachedClient discovery.CachedDiscoveryInterface - RESTMapper *restmapper.DeferredDiscoveryRESTMapper - Broadcaster events.EventBroadcasterAdapter - Informer informers.SharedInformerFactory - DynInformer dynamicinformer.DynamicSharedInformerFactory -} - -// Make a k8s client config for connecting to the master -func newKC(c *Cluster) *rest.Config { - return &rest.Config{ - Host: fmt.Sprintf("https://[%s]:%d", c.masterNode.VpnIP.String(), apiserverPort), - TLSClientConfig: rest.TLSClientConfig{ - CAFile: c.pki.TLS.CertPath(), - }, - } -} - -// Make clients authenticating with a ceritficate (typically for kubelets) -func newCertClients(c *Cluster, cert *pekahi.Certificate) (*Clients, error) { - kc := newKC(c) - kc.TLSClientConfig.CertFile = cert.CertPath() - kc.TLSClientConfig.KeyFile = cert.KeyPath() - return newClientsForKC(kc) -} - -// Make clients from the master context itself -func newLoopbackClients(c *Cluster) (*Clients, error) { - kc := newKC(c) - kc.BearerToken = c.loopbackToken - return newClientsForKC(kc) -} - -func newClientsForKC(kc *rest.Config) (*Clients, error) { - baseClient, err := kubernetes.NewForConfig(kc) - if err != nil { - return nil, err - } - eventsClient, err := kubernetes.NewForConfig(kc) - if err != nil { - return nil, err - } - dynClient, err := dynamic.NewForConfig(kc) - if err != nil { - return nil, err - } - metadataClient, err := metadata.NewForConfig(kc) - if err != nil { - return nil, err - } - cachedClient := memory.NewMemCacheClient(baseClient) - restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedClient) - broadcaster := events.NewEventBroadcasterAdapter(eventsClient) - informers := informers.NewSharedInformerFactory(baseClient, 0) - dynInformers := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, 0, core.NamespaceAll, nil) - return &Clients{ - KubeConfig: kc, - Client: baseClient, - DynClient: dynClient, - EventClient: eventsClient, - MetadataClient: metadataClient, - RESTMapper: restMapper, - Broadcaster: broadcaster, - Informer: informers, - DynInformer: dynInformers, - }, nil -} - -func (c *Clients) Start(ctx context.Context) { - c.Broadcaster.StartRecordingToSink(ctx.Done()) - c.Informer.Start(ctx.Done()) - c.DynInformer.Start(ctx.Done()) - c.Informer.WaitForCacheSync(ctx.Done()) - c.DynInformer.WaitForCacheSync(ctx.Done()) -} - -func (c *Clients) Stop() { - c.Broadcaster.Shutdown() -} diff --git a/services/kubelet.go b/services/kubelet.go index 3de6c530818e531b30ac9bf25f2f0090e8136758..e13005a5dd0ff3504cd9752b99cf20ebc88f69c4 100644 --- a/services/kubelet.go +++ b/services/kubelet.go @@ -7,6 +7,7 @@ import ( "path" "time" + "go.acides.org/hepto/k8s" "go.opentelemetry.io/otel/trace" core "k8s.io/api/core/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -44,7 +45,7 @@ var kubeKubelet = &Unit{ // (very difficult to check otherwise) time.Sleep(10 * time.Second) kubeletRoot := path.Join(c.settings.DataDir, "kubelet") - clients, err := newCertClients(c, c.certs.API) + clients, err := k8s.NewCertsClient(c.masterUrl, c.certs.TLS, c.certs.API) if err != nil { return fmt.Errorf("could not create clients: %w", err) } diff --git a/services/manager.go b/services/manager.go index df888d0eb05a695e8d8ce142209b87028972e27d..53ba806529741399f3a55f2f58d8c05609d046ed 100644 --- a/services/manager.go +++ b/services/manager.go @@ -63,6 +63,7 @@ type Cluster struct { nodes []*HeptoMeta state *HeptoState masterNode *HeptoMeta + masterUrl string certs *NodeCerts masterCerts *MasterCerts diff --git a/services/scheduler.go b/services/scheduler.go index 3f4aeb31ed2219cd60dcccc1b522499cddcdbe8f..1c4ff84a0c1f280df377c50ff655fdb7d3e3c1ae 100644 --- a/services/scheduler.go +++ b/services/scheduler.go @@ -3,6 +3,7 @@ package services import ( "context" + "go.acides.org/hepto/k8s" "k8s.io/kubernetes/pkg/scheduler" ) @@ -10,7 +11,7 @@ var kubeScheduler = &Unit{ Name: "kube-scheduler", Dependencies: []*Unit{kubeApiserver, pkiCA, pkiMaster, kubeLogger}, Run: func(u *Unit, c *Cluster, ctx context.Context) error { - clients, err := newLoopbackClients(c) + clients, err := k8s.NewTokenClients(c.masterUrl, c.pki.TLS, c.loopbackToken) if err != nil { return err }