diff --git a/services/apiserver.go b/services/apiserver.go index 9d5f85ef4b8c7e319a5b8080e41b008192143c8f..c19a55dfef42c229b7c4bc192013569b0e28e7c3 100644 --- a/services/apiserver.go +++ b/services/apiserver.go @@ -8,7 +8,6 @@ import ( "os" "time" - "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions" extensions "k8s.io/apiextensions-apiserver/pkg/apiserver" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -37,6 +36,8 @@ import ( "k8s.io/client-go/tools/clientcmd/api" "k8s.io/component-base/version" "k8s.io/kube-aggregator/pkg/apiserver" + aggregator "k8s.io/kube-aggregator/pkg/apiserver" + aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme" "k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/capabilities" @@ -84,6 +85,15 @@ func (f *RestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (gene }, nil } +// Make a service resolver for a given client config +func getResolver(clients *Clients) apiserver.ServiceResolver { + host, _ := url.Parse(clients.KubeConfig.Host) + return apiserver.NewLoopbackServiceResolver(apiserver.NewEndpointServiceResolver( + clients.Informer.Core().V1().Services().Lister(), + clients.Informer.Core().V1().Endpoints().Lister(), + ), host) +} + // Build a generic apiserver config, that is used and tweaked for various instanciated servers (native, extensions, etc.) // Be careful, config is returned as a pointer, so it must be explicitely shallow copied before tweaking func buildConfig(c *Cluster) (config *server.Config, clients *Clients, err error) { @@ -180,18 +190,12 @@ func buildConfig(c *Cluster) (config *server.Config, clients *Clients, err error // Finally authorize loopback clients server.AuthorizeClientBearerToken(clients.KubeConfig, &config.Authentication, &config.Authorization) - // Setup service resolver - localHost, _ := url.Parse(config.LoopbackClientConfig.Host) - serviceResolver := apiserver.NewLoopbackServiceResolver(apiserver.NewEndpointServiceResolver( - clients.Informer.Core().V1().Services().Lister(), - clients.Informer.Core().V1().Endpoints().Lister(), - ), localHost) - // Setup admission controllers admissionConfig := kubeadmission.Config{ ExternalInformers: clients.Informer, LoopbackClientConfig: config.LoopbackClientConfig, } + serviceResolver := getResolver(clients) initializers, _, err := admissionConfig.New(nil, nil, serviceResolver, nil) // TODO: handle post start hook if err != nil { err = fmt.Errorf("could not get admission plugins: %w", err) @@ -303,26 +307,15 @@ var kubeApiserver = &Unit{ return fmt.Errorf("could not seutp the extension server: %w", err) } - // Setup the apiserver itself - config.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource() - storageFactory := storage.NewDefaultStorageFactory( - *etcdConfig, - runtime.ContentTypeJSON, - legacyscheme.Codecs, - storage.NewDefaultResourceEncodingConfig(legacyscheme.Scheme), - config.MergedResourceConfig, - kubeapiserver.SpecialDefaultResourcePrefixes, - ) + // Setup the apiserver itself, same logics as the extensions server signer, err := serviceaccount.JWTTokenGenerator(audience, c.masterCerts.APITokens.Key) if err != nil { return fmt.Errorf("could not initilize service account signer: %w", err) } apiConfig := controlplane.Config{ - GenericConfig: config, + GenericConfig: &(*config), // copy by value, so we can modify fields ExtraConfig: controlplane.ExtraConfig{ - APIResourceConfigSource: storageFactory.APIResourceConfigSource, - StorageFactory: storageFactory, - EventTTL: time.Hour, + EventTTL: time.Hour, KubeletClientConfig: client.KubeletClientConfig{ Port: ports.KubeletPort, ReadOnlyPort: ports.KubeletReadOnlyPort, @@ -342,34 +335,57 @@ var kubeApiserver = &Unit{ VersionedInformers: clients.Informer, }, } - apiConfig.GenericConfig.RESTOptionsGetter = &RestOptionsFactory{StorageFactory: storageFactory} + apiConfig.ExtraConfig.ClusterAuthenticationInfo.ClientCA = config.SecureServing.ClientCA + apiConfig.GenericConfig.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource() + apiConfig.ExtraConfig.APIResourceConfigSource = apiConfig.GenericConfig.MergedResourceConfig + restOptionsGetter := &RestOptionsFactory{ + StorageFactory: storage.NewDefaultStorageFactory( + *etcdConfig, + runtime.ContentTypeJSON, + legacyscheme.Codecs, + storage.NewDefaultResourceEncodingConfig(legacyscheme.Scheme), + apiConfig.GenericConfig.MergedResourceConfig, + kubeapiserver.SpecialDefaultResourcePrefixes, + ), + } + apiConfig.GenericConfig.RESTOptionsGetter = restOptionsGetter + apiConfig.ExtraConfig.StorageFactory = restOptionsGetter.StorageFactory openapiFactory := openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions) apiConfig.GenericConfig.OpenAPIConfig = server.DefaultOpenAPIConfig(openapiFactory, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensions.Scheme)) apiConfig.GenericConfig.OpenAPIV3Config = server.DefaultOpenAPIV3Config(openapiFactory, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensions.Scheme)) - apiConfig.ExtraConfig.ClusterAuthenticationInfo.ClientCA = config.SecureServing.ClientCA apiServer, err := apiConfig.Complete().New(extensionServer.GenericAPIServer) if err != nil { return fmt.Errorf("could not initialize generic apiserver: %w", err) } - // Explicitely register api extensions group for discovery, otherwise it will never be exposed - // anywhere. Currently the aggregator (which hepto does not instantiate) has a hardcoded list of - // all apigroups for setting up discovery properly through delegates, this situation has been going - // on since kubernetes 1.7 and requires massive refactoring. - // TODO: this still does not explain how CRD apigroups are discovered, might require some more - // investigation - extensionsVersion := meta.GroupVersionForDiscovery{ - GroupVersion: "apiextensions.k8s.io/v1", - Version: "v1", + // Setup an API aggregator + aggregatorConfig := &aggregator.Config{ + GenericConfig: &server.RecommendedConfig{ + Config: *config, + SharedInformerFactory: clients.Informer, + }, + ExtraConfig: aggregator.ExtraConfig{ + ServiceResolver: getResolver(clients), + }, + } + aggregatorConfig.GenericConfig.MergedResourceConfig = aggregator.DefaultAPIResourceConfigSource() + aggregatorConfig.GenericConfig.RESTOptionsGetter = &RestOptionsFactory{ + StorageFactory: storage.NewDefaultStorageFactory( + *etcdConfig, + runtime.ContentTypeJSON, + aggregatorscheme.Codecs, + storage.NewDefaultResourceEncodingConfig(aggregatorscheme.Scheme), + aggregatorConfig.GenericConfig.MergedResourceConfig, + map[schema.GroupResource]string{}, + ), + } + aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(apiServer.GenericAPIServer) + if err != nil { + return fmt.Errorf("could not initialize aggregator: %w", err) } - apiServer.GenericAPIServer.DiscoveryGroupManager.AddGroup(meta.APIGroup{ - Name: apiextensions.GroupName, - Versions: []meta.GroupVersionForDiscovery{extensionsVersion}, - PreferredVersion: extensionsVersion, - }) // Finally start the apiserver - server := apiServer.GenericAPIServer.PrepareRun() + server := aggregatorServer.GenericAPIServer.PrepareRun() go clients.Start(ctx) return server.Run(ctx.Done()) }, diff --git a/services/k8s.go b/services/k8s.go index 862e5e6b607961f8463a02e3e9eff66fb433f5b7..3ee392463545ee04167211cafe8d176865dc5af8 100644 --- a/services/k8s.go +++ b/services/k8s.go @@ -111,7 +111,6 @@ func (c *Clients) Start(ctx context.Context) { c.DynInformer.Start(ctx.Done()) c.Informer.WaitForCacheSync(ctx.Done()) c.DynInformer.WaitForCacheSync(ctx.Done()) - } func (c *Clients) Stop() {