From d7c1bff7da8782362aa5abeb1e2f10cae93c75e0 Mon Sep 17 00:00:00 2001
From: kaiyou <dev@kaiyou.fr>
Date: Wed, 19 Apr 2023 22:34:28 +0200
Subject: [PATCH] Remove dependency on cmd/apiserver

---
 services/apiserver.go | 338 +++++++++++++++++++++++++++++++++++++-----
 services/certs.go     |   2 +-
 services/k8s.go       |  20 ++-
 3 files changed, 314 insertions(+), 46 deletions(-)

diff --git a/services/apiserver.go b/services/apiserver.go
index 9076148..bd5bfce 100644
--- a/services/apiserver.go
+++ b/services/apiserver.go
@@ -3,66 +3,328 @@ package services
 import (
 	"context"
 	"fmt"
+	"net"
+	"net/url"
+	"os"
+	"time"
 
-	"go.acides.org/hepto/utils"
+	"github.com/google/uuid"
+	// extensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
+	extensions "k8s.io/apiextensions-apiserver/pkg/apiserver"
 	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apiserver/pkg/admission"
+	"k8s.io/apiserver/pkg/admission/initializer"
+	"k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle"
+	"k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy"
+	"k8s.io/apiserver/pkg/admission/plugin/webhook/mutating"
+	"k8s.io/apiserver/pkg/admission/plugin/webhook/validating"
+	apifilters "k8s.io/apiserver/pkg/endpoints/filters"
+	openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
+	"k8s.io/apiserver/pkg/registry/generic"
+	"k8s.io/apiserver/pkg/server"
+	"k8s.io/apiserver/pkg/server/dynamiccertificates"
+	"k8s.io/apiserver/pkg/server/filters"
+	"k8s.io/apiserver/pkg/server/storage"
+	"k8s.io/apiserver/pkg/storage/storagebackend"
+	"k8s.io/apiserver/pkg/util/feature"
+	"k8s.io/apiserver/pkg/util/flowcontrol/request"
+	"k8s.io/apiserver/pkg/util/notfoundhandler"
+	"k8s.io/apiserver/pkg/util/openapi"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
-	"k8s.io/kubernetes/cmd/kube-apiserver/app"
-	"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
+	"k8s.io/client-go/tools/clientcmd"
+	"k8s.io/client-go/tools/clientcmd/api"
+	"k8s.io/component-base/version"
+	"k8s.io/kube-aggregator/pkg/apiserver"
+	"k8s.io/kubernetes/pkg/api/legacyscheme"
+	"k8s.io/kubernetes/pkg/apis/core"
+	"k8s.io/kubernetes/pkg/capabilities"
+	"k8s.io/kubernetes/pkg/cluster/ports"
+	controllersa "k8s.io/kubernetes/pkg/controller/serviceaccount"
+	"k8s.io/kubernetes/pkg/controlplane"
+	"k8s.io/kubernetes/pkg/controlplane/reconcilers"
+	generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi"
+	"k8s.io/kubernetes/pkg/kubeapiserver"
+	kubeadmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
+	"k8s.io/kubernetes/pkg/kubeapiserver/authenticator"
+	"k8s.io/kubernetes/pkg/kubeapiserver/authorizer"
+	"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
+	"k8s.io/kubernetes/pkg/kubeapiserver/options"
+	"k8s.io/kubernetes/pkg/kubelet/client"
+	"k8s.io/kubernetes/pkg/serviceaccount"
 )
 
+const audience = "https://kubernetes.default.svc.cluster.local"
+const apiserverPort = 6443
+
+// Reimplement the rest options factory to stop depending on option parsers
+type RestOptionsFactory struct {
+	StorageFactory            storage.StorageFactory
+	StorageObjectCountTracker request.StorageObjectCountTracker
+}
+
+func (f *RestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
+	storageConfig, err := f.StorageFactory.NewConfig(resource)
+	if err != nil {
+		return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
+	}
+	return generic.RESTOptions{
+		StorageConfig:             storageConfig,
+		Decorator:                 generic.UndecoratedStorage,
+		DeleteCollectionWorkers:   1,
+		EnableGarbageCollection:   true,
+		ResourcePrefix:            f.StorageFactory.ResourcePrefix(resource),
+		CountMetricPollPeriod:     time.Minute,
+		StorageObjectCountTracker: f.StorageObjectCountTracker,
+	}, nil
+}
+
+// Build a generic apiserver config, which is used and tweaked for the various instantiated servers (native, extensions, etc.)
+// Be careful: config is returned as a pointer, so it must be explicitly shallow copied before tweaking
+func buildConfig(c *Cluster) (config *server.Config, clients *Clients, err error) {
+	// Initialize a basic configuration object
+	ver := version.Get()
+	config = server.NewConfig(legacyscheme.Codecs)
+	config.Version = &ver
+	config.Serializer = legacyscheme.Codecs
+	config.LongRunningFunc = filters.BasicLongRunningRequestCheck(
+		sets.NewString("watch", "proxy"),
+		sets.NewString("attach", "exec", "proxy", "log", "portforward"),
+	)
+
+	// Setup listener
+	listener, err := net.Listen("tcp6", fmt.Sprintf("[%s]:%d", c.networking.NodeAddress.IP.String(), apiserverPort))
+	if err != nil {
+		err = fmt.Errorf("could not initialize listener: %w", err)
+		return
+	}
+	cert, err := dynamiccertificates.NewDynamicServingContentFromFiles("serving-cert", c.masterCerts.TLS.CertPath(), c.masterCerts.TLS.KeyPath())
+	if err != nil {
+		err = fmt.Errorf("could not get certificate: %w", err)
+		return
+	}
+	clientCA, err := dynamiccertificates.NewDynamicCAContentFromFile("client-ca", c.pki.API.CertPath())
+	if err != nil {
+		err = fmt.Errorf("could not get API CA file: %w", err)
+		return
+	}
+	config.SecureServing = &server.SecureServingInfo{
+		Listener: listener,
+		Cert:     cert,
+		ClientCA: clientCA, // not set up upstream, might be an issue
+	}
+
+	// Setup loopback clients (no authorization at this point, handled later)
+	loopback, err := config.SecureServing.NewLoopbackClientConfig(uuid.NewString(), nil)
+	if err != nil {
+		err = fmt.Errorf("could not setup loopback config: %w", err)
+		return
+	}
+	loopback.TLSClientConfig.CAFile = c.pki.TLS.CertPath()
+	config.LoopbackClientConfig = loopback
+	clients, err = newClientsForKC(config.LoopbackClientConfig, "apiserver")
+	if err != nil {
+		err = fmt.Errorf("could not setup loopback clients: %w", err)
+		return
+	}
+
+	// Setup authentication
+	authConfig := authenticator.Config{
+		ServiceAccountKeyFiles:  []string{c.masterCerts.APITokens.KeyPath()},
+		ServiceAccountIssuers:   []string{audience},
+		APIAudiences:            []string{audience},
+		ClientCAContentProvider: clientCA,
+		ServiceAccountTokenGetter: controllersa.NewGetterFromClient(
+			clients.Client,
+			clients.Informer.Core().V1().Secrets().Lister(),
+			clients.Informer.Core().V1().ServiceAccounts().Lister(),
+			clients.Informer.Core().V1().Pods().Lister(),
+		),
+		SecretsWriter: clients.Client.CoreV1(),
+	}
+	auth, _, err := authConfig.New()
+	if err != nil {
+		err = fmt.Errorf("could not setup apiserver authentication: %w", err)
+		return
+	}
+	config.Authentication = server.AuthenticationInfo{
+		APIAudiences:  []string{audience},
+		Authenticator: auth,
+	}
+
+	// Setup authorizations
+	authzConfig := authorizer.Config{
+		AuthorizationModes:       []string{modes.ModeNode, modes.ModeRBAC},
+		VersionedInformerFactory: clients.Informer,
+	}
+	authz, resolver, err := authzConfig.New()
+	if err != nil {
+		err = fmt.Errorf("could not setup apiserver authorizations: %w", err)
+		return
+	}
+	config.RuleResolver = resolver
+	config.Authorization = server.AuthorizationInfo{
+		Authorizer: authz,
+	}
+
+	// Finally authorize loopback clients
+	server.AuthorizeClientBearerToken(loopback, &config.Authentication, &config.Authorization)
+
+	// Setup service resolver
+	localHost, _ := url.Parse(config.LoopbackClientConfig.Host)
+	serviceResolver := apiserver.NewLoopbackServiceResolver(apiserver.NewEndpointServiceResolver(
+		clients.Informer.Core().V1().Services().Lister(),
+		clients.Informer.Core().V1().Endpoints().Lister(),
+	), localHost)
+
+	// Setup admission controllers
+	admissionConfig := kubeadmission.Config{
+		ExternalInformers:    clients.Informer,
+		LoopbackClientConfig: config.LoopbackClientConfig,
+	}
+	initializers, _, err := admissionConfig.New(nil, nil, serviceResolver, nil) // TODO: handle post start hook
+	if err != nil {
+		err = fmt.Errorf("could not get admission plugins: %w", err)
+		return
+	}
+	plugins := admission.NewPlugins()
+	pluginsNames := []string{lifecycle.PluginName, mutating.PluginName, validatingadmissionpolicy.PluginName, validating.PluginName}
+	server.RegisterAllAdmissionPlugins(plugins)
+	pluginsConfig, _ := admission.ReadAdmissionConfiguration(pluginsNames, "", nil) // Never fails without a config file
+	genericInitializer := initializer.New(clients.Client, clients.DynClient, clients.Informer, config.Authorization.Authorizer, feature.DefaultFeatureGate, config.DrainedNotify())
+	initializersChain := admission.PluginInitializers{genericInitializer}
+	initializersChain = append(initializersChain, initializers...)
+	admissionChain, err := plugins.NewFromPlugins(pluginsNames, pluginsConfig, initializersChain, admission.Decorators{})
+	config.AdmissionControl = admissionChain
+
+	return
+}
+
 var kubeApiserver = &Unit{
 	Name:         "kube-apiserver",
 	Dependencies: []*Unit{etcd, pkiCA, pkiMaster, vpn, kubeLogger},
 	Run: func(u *Unit, c *Cluster, ctx context.Context) error {
-		args := []string{
-			"--bind-address", c.networking.NodeAddress.IP.String(),
-			"--service-cluster-ip-range", c.networking.ServiceNet.String(),
-			"--tls-cert-file", c.masterCerts.TLS.CertPath(),
-			"--tls-private-key-file", c.masterCerts.TLS.KeyPath(),
-			"--client-ca-file", c.pki.API.CertPath(),
-			"--kubelet-certificate-authority", c.pki.TLS.CertPath(),
-			"--kubelet-client-certificate", c.masterCerts.Kubelet.CertPath(),
-			"--kubelet-client-key", c.masterCerts.Kubelet.KeyPath(),
-			"--etcd-servers", "http://[::1]:2379",
-			"--service-account-signing-key-file", c.masterCerts.APITokens.KeyPath(),
-			"--service-account-key-file", c.masterCerts.APITokens.KeyPath(),
-			"--service-account-issuer", "https://kubernetes.default.svc.cluster.local",
-			"--api-audiences", "https://kubernetes.default.svc.cluster.local",
-			"--authorization-mode", "Node,RBAC",
-			"--allow-privileged", "true",
-		}
-		config := options.NewServerRunOptions()
-		nfs := config.Flags()
-		flags := flagsFromNamedFlagSet("apiserver", &nfs)
-		err := flags.Parse(args)
+		config, clients, err := buildConfig(c)
 		if err != nil {
 			return err
 		}
-		completedOptions, err := app.Complete(config)
+		// Allow privileged pods
+		capabilities.Setup(true, 0)
+
+		// Configure etcd backend
+		etcdConfig := storagebackend.NewDefaultConfig("/registry", nil)
+		etcdConfig.Transport.ServerList = []string{"http://[::1]:2379"}
+
+		// Servers are created in reverse order for proper delegation
+		notFound := notfoundhandler.New(config.Serializer, apifilters.NoMuxAndDiscoveryIncompleteKey)
+
+		// Tweak the configuration and build the extension server
+		extensionsConfig := extensions.Config{
+			GenericConfig: &server.RecommendedConfig{
+				Config:                *config,
+				SharedInformerFactory: clients.Informer,
+			},
+			ExtraConfig: extensions.ExtraConfig{},
+		}
+		extensionsConfig.GenericConfig.MergedResourceConfig = extensions.DefaultAPIResourceConfigSource()
+		extensionsConfig.GenericConfig.RESTOptionsGetter = &RestOptionsFactory{
+			StorageFactory: storage.NewDefaultStorageFactory(
+				*etcdConfig,
+				//*storagebackend.NewDefaultConfig("/registry", extensions.Codecs.LegacyCodec(extensionsv1.SchemeGroupVersion)),
+				runtime.ContentTypeJSON,
+				legacyscheme.Codecs,
+				storage.NewDefaultResourceEncodingConfig(extensions.Scheme),
+				extensionsConfig.GenericConfig.MergedResourceConfig,
+				kubeapiserver.SpecialDefaultResourcePrefixes,
+			),
+		}
+		extensionsConfig.ExtraConfig.CRDRESTOptionsGetter = extensionsConfig.GenericConfig.RESTOptionsGetter
+		extensionServer, err := extensionsConfig.Complete().New(server.NewEmptyDelegateWithCustomHandler(notFound))
 		if err != nil {
-			return err
+			return fmt.Errorf("could not setup the extension server: %w", err)
 		}
-		server, err := app.CreateServerChain(completedOptions)
+
+		// Setup the apiserver itself
+		storageFactory := storage.NewDefaultStorageFactory(
+			*etcdConfig,
+			runtime.ContentTypeJSON,
+			legacyscheme.Codecs,
+			storage.NewDefaultResourceEncodingConfig(legacyscheme.Scheme),
+			controlplane.DefaultAPIResourceConfigSource(),
+			kubeapiserver.SpecialDefaultResourcePrefixes,
+		)
+		signer, err := serviceaccount.JWTTokenGenerator("https://kubernetes.default.svc.cluster.local", c.masterCerts.APITokens.Key)
 		if err != nil {
-			return err
+			return fmt.Errorf("could not initialize service account signer: %w", err)
+		}
+		apiConfig := controlplane.Config{
+			GenericConfig: config,
+			ExtraConfig: controlplane.ExtraConfig{
+				APIResourceConfigSource: storageFactory.APIResourceConfigSource,
+				StorageFactory:          storageFactory,
+				EventTTL:                time.Hour,
+				KubeletClientConfig: client.KubeletClientConfig{
+					Port:                  ports.KubeletPort,
+					ReadOnlyPort:          ports.KubeletReadOnlyPort,
+					PreferredAddressTypes: []string{string(core.NodeInternalIP), string(core.NodeExternalIP)},
+				},
+				ServiceIPRange:           *c.networking.ServiceNet,
+				APIServerServiceIP:       c.networking.APIAddress,
+				APIServerServicePort:     443,
+				ServiceNodePortRange:     options.DefaultServiceNodePortRange,
+				EndpointReconcilerType:   reconcilers.NoneEndpointReconcilerType,
+				MasterCount:              1,
+				ServiceAccountIssuer:     signer,
+				ServiceAccountIssuerURL:  "https://kubernetes.default.svc.cluster.local",
+				ServiceAccountPublicKeys: nil, // TODO
+				VersionedInformers:       clients.Informer,
+			},
+		}
-		prepared, err := server.PrepareRun()
+		apiConfig.GenericConfig.RESTOptionsGetter = &RestOptionsFactory{StorageFactory: storageFactory}
+		getOpenapi := openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)
+		apiConfig.GenericConfig.OpenAPIConfig = server.DefaultOpenAPIConfig(getOpenapi, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensions.Scheme))
+		apiConfig.ExtraConfig.ClusterAuthenticationInfo.ClientCA = config.SecureServing.ClientCA
+		apiServer, err := apiConfig.Complete().New(extensionServer.GenericAPIServer)
 		if err != nil {
-			return err
+			return fmt.Errorf("could not initialize generic apiserver: %w", err)
+		}
+
+		// TODO: no aggregation layer is set up
+
+		server := apiServer.GenericAPIServer.PrepareRun()
+
+		// Write a loopback config (different for every start)
+		name := "loopback"
+		clientConfig := api.Config{
+			APIVersion:     "v1",
+			Kind:           "Config",
+			CurrentContext: name,
+			Contexts: map[string]*api.Context{name: {
+				Cluster:  name,
+				AuthInfo: name,
+			}},
+			Clusters: map[string]*api.Cluster{name: {
+				Server:               config.LoopbackClientConfig.Host,
+				CertificateAuthority: config.LoopbackClientConfig.TLSClientConfig.CAFile,
+			}},
+			AuthInfos: map[string]*api.AuthInfo{name: {
+				Token: config.LoopbackClientConfig.BearerToken,
+			}},
 		}
-		rootConfig := KubeConfig{
-			URL:        fmt.Sprintf("https://[%s]:6443", c.networking.NodeAddress.IP.String()),
-			CACert:     c.pki.TLS.CertPath(),
-			ClientKey:  c.masterCerts.RootClient.KeyPath(),
-			ClientCert: c.masterCerts.RootClient.CertPath(),
+		err = os.MkdirAll("/root/.kube", 0755)
+		if err != nil {
+			return fmt.Errorf("could not create kubeconfig dir: %w", err)
 		}
-		err = utils.WriteConfig(rootConfig, "/root/.kube/config")
+		err = clientcmd.WriteToFile(clientConfig, "/root/.kube/config")
 		if err != nil {
-			return err
+			return fmt.Errorf("could not write privileged kubeconfig: %w", err)
 		}
-		return prepared.Run(ctx.Done())
+
+		return server.Run(ctx.Done())
 	},
 	Ready: func(u *Unit, c *Cluster) bool {
 		u.Logger.Info("checking if apiserver is ready")
diff --git a/services/certs.go b/services/certs.go
index 8f7a654..e094dff 100644
--- a/services/certs.go
+++ b/services/certs.go
@@ -108,7 +108,7 @@ var pkiMaster = &Unit{
 		// TLS certificate
 		tlsCert, err := bundle.GetCertOrCSR("tls",
 			pekahi.NewServerTemplate(
-				[]string{"kube-apiserver", "kubernetes.default"},
+				[]string{"kube-apiserver", "kubernetes.default", "apiserver-loopback-client"},
 				[]net.IP{c.networking.NodeAddress.IP, c.networking.APIAddress},
 			),
 		)
diff --git a/services/k8s.go b/services/k8s.go
index 545d025..3764b51 100644
--- a/services/k8s.go
+++ b/services/k8s.go
@@ -16,7 +16,6 @@ import (
 	"k8s.io/client-go/tools/events"
 	"k8s.io/component-base/cli/flag"
 	"k8s.io/klog/v2"
-	"k8s.io/kubernetes/pkg/scheduler"
 )
 
 var kubeLogger = &Unit{
@@ -30,6 +29,7 @@ var kubeLogger = &Unit{
 type Clients struct {
 	KubeConfig  *rest.Config
 	Client      *kubernetes.Clientset
+	DynClient   *dynamic.DynamicClient
 	EventClient *kubernetes.Clientset
 	Broadcaster events.EventBroadcasterAdapter
 	Informer    informers.SharedInformerFactory
@@ -38,13 +38,17 @@ type Clients struct {
 
 func newClients(c *Cluster, ua string, masterIP net.IP, cert *pekahi.Certificate) (*Clients, error) {
 	kc := &rest.Config{
-		Host: fmt.Sprintf("https://[%s]:6443", masterIP.String()),
+		Host: fmt.Sprintf("https://[%s]:%d", masterIP.String(), apiserverPort),
 		TLSClientConfig: rest.TLSClientConfig{
 			CAFile:   c.pki.TLS.CertPath(),
 			CertFile: cert.CertPath(),
 			KeyFile:  cert.KeyPath(),
 		},
 	}
+	return newClientsForKC(kc, ua)
+}
+
+func newClientsForKC(kc *rest.Config, ua string) (*Clients, error) {
 	client, err := kubernetes.NewForConfig(rest.AddUserAgent(kc, ua))
 	if err != nil {
 		return nil, err
@@ -53,15 +57,17 @@ func newClients(c *Cluster, ua string, masterIP net.IP, cert *pekahi.Certificate
 	if err != nil {
 		return nil, err
 	}
+	dynClient, err := dynamic.NewForConfig(kc)
+	if err != nil {
+		return nil, err
+	}
 	broadcaster := events.NewEventBroadcasterAdapter(eventsClient)
-
-	informers := scheduler.NewInformerFactory(client, 0)
-	dynInformers := dynamicinformer.NewFilteredDynamicSharedInformerFactory(
-		dynamic.NewForConfigOrDie(kc), 0, core.NamespaceAll, nil,
-	)
+	informers := informers.NewSharedInformerFactory(client, 0)
+	dynInformers := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, 0, core.NamespaceAll, nil)
 	return &Clients{
 		KubeConfig:  kc,
 		Client:      client,
+		DynClient:   dynClient,
 		EventClient: eventsClient,
 		Broadcaster: broadcaster,
 		Informer:    informers,
-- 
GitLab