package services

import (
	"context"
	"fmt"
	"net"
	"net/url"
	"os"
	"time"

	"github.com/google/uuid"
	extensions "k8s.io/apiextensions-apiserver/pkg/apiserver"
	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apiserver/pkg/admission"
	"k8s.io/apiserver/pkg/admission/initializer"
	"k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle"
	"k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy"
	"k8s.io/apiserver/pkg/admission/plugin/webhook/mutating"
	"k8s.io/apiserver/pkg/admission/plugin/webhook/validating"
	apifilters "k8s.io/apiserver/pkg/endpoints/filters"
	openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
	"k8s.io/apiserver/pkg/registry/generic"
	"k8s.io/apiserver/pkg/server"
	"k8s.io/apiserver/pkg/server/dynamiccertificates"
	"k8s.io/apiserver/pkg/server/filters"
	"k8s.io/apiserver/pkg/server/storage"
	"k8s.io/apiserver/pkg/storage/storagebackend"
	"k8s.io/apiserver/pkg/util/feature"
	"k8s.io/apiserver/pkg/util/flowcontrol/request"
	"k8s.io/apiserver/pkg/util/notfoundhandler"
	"k8s.io/apiserver/pkg/util/openapi"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/clientcmd/api"
	"k8s.io/component-base/version"
	"k8s.io/kube-aggregator/pkg/apiserver"
	"k8s.io/kubernetes/pkg/api/legacyscheme"
	"k8s.io/kubernetes/pkg/apis/core"
	"k8s.io/kubernetes/pkg/capabilities"
	"k8s.io/kubernetes/pkg/cluster/ports"
	controllersa "k8s.io/kubernetes/pkg/controller/serviceaccount"
	"k8s.io/kubernetes/pkg/controlplane"
	"k8s.io/kubernetes/pkg/controlplane/reconcilers"
	generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi"
	"k8s.io/kubernetes/pkg/kubeapiserver"
	kubeadmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
	"k8s.io/kubernetes/pkg/kubeapiserver/authenticator"
	"k8s.io/kubernetes/pkg/kubeapiserver/authorizer"
	"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
	"k8s.io/kubernetes/pkg/kubeapiserver/options"
	"k8s.io/kubernetes/pkg/kubelet/client"
	"k8s.io/kubernetes/pkg/serviceaccount"
)

const (
	// audience is used in this file both as the service account token issuer
	// and as the API audience accepted by the apiserver.
	audience = "https://kubernetes.default.svc.cluster.local"

	// apiserverPort is the TCP port the apiserver listens on, and the port the
	// readiness check connects to.
	apiserverPort = 6443
)

// RestOptionsFactory reimplements the REST options factory to stop depending
// on option parsers.
type RestOptionsFactory struct {
	// StorageFactory provides the storage configuration and the resource
	// prefix for each resource.
	StorageFactory storage.StorageFactory
	// StorageObjectCountTracker is passed through unchanged to the returned
	// REST options; it may be left nil.
	StorageObjectCountTracker request.StorageObjectCountTracker
}

// GetRESTOptions returns the generic REST options for the given resource,
// based on the storage configuration provided by the storage factory.
//
// An error is returned when the storage factory has no configuration for the
// resource; it wraps the underlying error so callers can inspect it with
// errors.Is / errors.As.
func (f *RestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
	storageConfig, err := f.StorageFactory.NewConfig(resource)
	if err != nil {
		// %w keeps the message identical to the previous %v / err.Error()
		// form while preserving the error chain.
		return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %w", resource, err)
	}
	return generic.RESTOptions{
		StorageConfig:             storageConfig,
		Decorator:                 generic.UndecoratedStorage,
		DeleteCollectionWorkers:   1,
		EnableGarbageCollection:   true,
		ResourcePrefix:            f.StorageFactory.ResourcePrefix(resource),
		CountMetricPollPeriod:     time.Minute,
		StorageObjectCountTracker: f.StorageObjectCountTracker,
	}, nil
}

// buildConfig builds a generic apiserver config, that is used and tweaked for the various instantiated servers (native, extensions, etc.)
// Be careful, config is returned as a pointer, so it must be explicitely shallow copied before tweaking func buildConfig(c *Cluster) (config *server.Config, clients *Clients, err error) { // Initialize a basic configuration object ver := version.Get() config = server.NewConfig(legacyscheme.Codecs) config.Version = &ver config.Serializer = legacyscheme.Codecs config.LongRunningFunc = filters.BasicLongRunningRequestCheck( sets.NewString("watch", "proxy"), sets.NewString("attach", "exec", "proxy", "log", "portforward"), ) // Setup listener listener, err := net.Listen("tcp6", fmt.Sprintf("[%s]:%d", c.networking.NodeAddress.IP.String(), apiserverPort)) if err != nil { err = fmt.Errorf("could not initialize listener: %w", err) return } cert, err := dynamiccertificates.NewDynamicServingContentFromFiles("serving-cert", c.masterCerts.TLS.CertPath(), c.masterCerts.TLS.KeyPath()) if err != nil { err = fmt.Errorf("could not get certificate: %w", err) return } clientCA, err := dynamiccertificates.NewDynamicCAContentFromFile("client-ca", c.pki.API.CertPath()) if err != nil { err = fmt.Errorf("could not get api CA file: %w", err) return } config.SecureServing = &server.SecureServingInfo{ Listener: listener, Cert: cert, ClientCA: clientCA, // not setup upstream, might be an issue } // Setup loopback clients (no authorization at this point, handled later) loopback, err := config.SecureServing.NewLoopbackClientConfig(uuid.NewString(), nil) loopback.TLSClientConfig.CAFile = c.pki.TLS.CertPath() if err != nil { err = fmt.Errorf("could not setup loopback config: %w", err) return } config.LoopbackClientConfig = loopback clients, err = newClientsForKC(config.LoopbackClientConfig, "apiserver") if err != nil { err = fmt.Errorf("could not setup loopback clients: %w", err) return } // Setup authentication authConfig := authenticator.Config{ ServiceAccountKeyFiles: []string{c.masterCerts.APITokens.KeyPath()}, ServiceAccountIssuers: []string{audience}, APIAudiences: []string{audience}, 
ClientCAContentProvider: clientCA, ServiceAccountTokenGetter: controllersa.NewGetterFromClient( clients.Client, clients.Informer.Core().V1().Secrets().Lister(), clients.Informer.Core().V1().ServiceAccounts().Lister(), clients.Informer.Core().V1().Pods().Lister(), ), SecretsWriter: clients.Client.CoreV1(), } auth, _, err := authConfig.New() if err != nil { err = fmt.Errorf("could not setup apiserver authentication: %w", err) return } config.Authentication = server.AuthenticationInfo{ APIAudiences: []string{audience}, Authenticator: auth, } // Setup authorizations authzConfig := authorizer.Config{ AuthorizationModes: []string{modes.ModeNode, modes.ModeRBAC}, VersionedInformerFactory: clients.Informer, } authz, resolver, err := authzConfig.New() if err != nil { err = fmt.Errorf("could not seutp apiserver authorizations: %w", err) return } config.RuleResolver = resolver config.Authorization = server.AuthorizationInfo{ Authorizer: authz, } // Finally authorize loopback clients server.AuthorizeClientBearerToken(loopback, &config.Authentication, &config.Authorization) // Setup service resolver localHost, _ := url.Parse(config.LoopbackClientConfig.Host) serviceResolver := apiserver.NewLoopbackServiceResolver(apiserver.NewEndpointServiceResolver( clients.Informer.Core().V1().Services().Lister(), clients.Informer.Core().V1().Endpoints().Lister(), ), localHost) // Setup admission controllers admissionConfig := kubeadmission.Config{ ExternalInformers: clients.Informer, LoopbackClientConfig: config.LoopbackClientConfig, } initializers, _, err := admissionConfig.New(nil, nil, serviceResolver, nil) // TODO: handle post start hook if err != nil { err = fmt.Errorf("could not get admission plugins: %w", err) return } plugins := admission.NewPlugins() pluginsNames := []string{lifecycle.PluginName, mutating.PluginName, validatingadmissionpolicy.PluginName, validating.PluginName} server.RegisterAllAdmissionPlugins(plugins) pluginsConfig, _ := 
admission.ReadAdmissionConfiguration(pluginsNames, "", nil) // Never fails without config file genericInitializer := initializer.New(clients.Client, clients.DynClient, clients.Informer, config.Authorization.Authorizer, feature.DefaultFeatureGate, config.DrainedNotify()) initializersChain := admission.PluginInitializers{genericInitializer} initializersChain = append(initializersChain, initializers...) admissionChain, err := plugins.NewFromPlugins(pluginsNames, pluginsConfig, initializersChain, admission.Decorators{}) config.AdmissionControl = admissionChain return } var kubeApiserver = &Unit{ Name: "kube-apiserver", Dependencies: []*Unit{etcd, pkiCA, pkiMaster, vpn, kubeLogger}, Run: func(u *Unit, c *Cluster, ctx context.Context) error { config, clients, err := buildConfig(c) if err != nil { return err } // Allow privileged pods capabilities.Setup(true, 0) // Configure the etcd backend, this is used by both the default and CRD apiservers etcdConfig := storagebackend.NewDefaultConfig("/registry", nil) etcdConfig.Transport.ServerList = []string{"http://[::1]:2379"} // Servers are created in reverse orders for proper delegation // First notFound is the default handler, that basically returns a 404, // then the extensions serves CRD for unknown kinds, finally // the default apiserver is first in the chain and serves all default kinds // Note that no API aggregation is setup, which means that k8s API aggregation // is not supported by hepto. 
notFound := notfoundhandler.New(config.Serializer, apifilters.NoMuxAndDiscoveryIncompleteKey) // Tweak the configuration and build the extension server extensionsConfig := extensions.Config{ GenericConfig: &server.RecommendedConfig{ Config: *config, SharedInformerFactory: clients.Informer, }, ExtraConfig: extensions.ExtraConfig{}, } extensionsConfig.GenericConfig.MergedResourceConfig = extensions.DefaultAPIResourceConfigSource() extensionsConfig.GenericConfig.RESTOptionsGetter = &RestOptionsFactory{ StorageFactory: storage.NewDefaultStorageFactory( *etcdConfig, runtime.ContentTypeJSON, legacyscheme.Codecs, storage.NewDefaultResourceEncodingConfig(extensions.Scheme), extensionsConfig.GenericConfig.MergedResourceConfig, kubeapiserver.SpecialDefaultResourcePrefixes, ), } extensionsConfig.ExtraConfig.CRDRESTOptionsGetter = extensionsConfig.GenericConfig.RESTOptionsGetter extensionServer, err := extensionsConfig.Complete().New(server.NewEmptyDelegateWithCustomHandler(notFound)) if err != nil { return fmt.Errorf("could not seutp the extension server: %w", err) } // Setup the apiserver itself storageFactory := storage.NewDefaultStorageFactory( *etcdConfig, runtime.ContentTypeJSON, legacyscheme.Codecs, storage.NewDefaultResourceEncodingConfig(legacyscheme.Scheme), controlplane.DefaultAPIResourceConfigSource(), kubeapiserver.SpecialDefaultResourcePrefixes, ) signer, err := serviceaccount.JWTTokenGenerator(audience, c.masterCerts.APITokens.Key) if err != nil { return fmt.Errorf("could not initilize service account signer: %w", err) } apiConfig := controlplane.Config{ GenericConfig: config, ExtraConfig: controlplane.ExtraConfig{ APIResourceConfigSource: storageFactory.APIResourceConfigSource, StorageFactory: storageFactory, EventTTL: time.Hour, KubeletClientConfig: client.KubeletClientConfig{ Port: ports.KubeletPort, ReadOnlyPort: ports.KubeletReadOnlyPort, PreferredAddressTypes: []string{string(core.NodeInternalIP), string(core.NodeExternalIP)}, }, ServiceIPRange: 
*c.networking.ServiceNet, APIServerServiceIP: c.networking.APIAddress, APIServerServicePort: 443, ServiceNodePortRange: options.DefaultServiceNodePortRange, // This still triggers an error, stating that stale data was found and cleaned up, // which is inevitable EndpointReconcilerType: reconcilers.NoneEndpointReconcilerType, MasterCount: 1, ServiceAccountIssuer: signer, ServiceAccountIssuerURL: audience, ServiceAccountPublicKeys: []interface{}{&c.masterCerts.APITokens.Key.PublicKey}, VersionedInformers: clients.Informer, }, } apiConfig.GenericConfig.RESTOptionsGetter = &RestOptionsFactory{StorageFactory: storageFactory} openapiFactory := openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions) apiConfig.GenericConfig.OpenAPIConfig = server.DefaultOpenAPIConfig(openapiFactory, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensions.Scheme)) apiConfig.GenericConfig.OpenAPIConfig = server.DefaultOpenAPIConfig(openapiFactory, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensions.Scheme)) apiConfig.ExtraConfig.ClusterAuthenticationInfo.ClientCA = config.SecureServing.ClientCA apiServer, err := apiConfig.Complete().New(extensionServer.GenericAPIServer) if err != nil { return fmt.Errorf("could not initialize generic apiserver: %w", err) } // Write a loopback config (different for every start) name := "loopback" clientConfig := api.Config{ APIVersion: "v1", Kind: "Config", CurrentContext: name, Contexts: map[string]*api.Context{name: { Cluster: name, AuthInfo: name, }}, Clusters: map[string]*api.Cluster{name: { Server: config.LoopbackClientConfig.Host, CertificateAuthority: config.LoopbackClientConfig.TLSClientConfig.CAFile, }}, AuthInfos: map[string]*api.AuthInfo{name: { Token: config.LoopbackClientConfig.BearerToken, }}, } err = os.MkdirAll("/root/.kube", 0755) if err != nil { return fmt.Errorf("could not create kubeconfig dir: %w", err) } clientcmd.WriteToFile(clientConfig, "/root/.kube/config") if err != nil { 
return fmt.Errorf("could not write privileged kubeconfig: %w", err) } // Finally start the apiserver server := apiServer.GenericAPIServer.PrepareRun() return server.Run(ctx.Done()) }, Ready: func(u *Unit, c *Cluster) bool { u.Logger.Info("checking if apiserver is ready") // Use the scheduler certificate for readiness test, which is more relevant than // using the internal privileged API token kc := &rest.Config{ Host: fmt.Sprintf("https://[%s]:%d", c.networking.NodeAddress.IP.String(), apiserverPort), TLSClientConfig: rest.TLSClientConfig{ CAFile: c.pki.TLS.CertPath(), CertFile: c.masterCerts.SchedulerAPI.CertPath(), KeyFile: c.masterCerts.SchedulerAPI.KeyPath(), }, } client, err := kubernetes.NewForConfig(rest.AddUserAgent(kc, "scheduler")) if err != nil { return false } _, err = client.CoreV1().Nodes().List(context.Background(), meta.ListOptions{}) if err != nil { return false } return true }, }