package services

import (
	"context"
	"fmt"
	"net"
	"os"
	"time"

	"go.acides.org/hepto/k8s"
	extensions "k8s.io/apiextensions-apiserver/pkg/apiserver"
	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
	utilnet "k8s.io/apimachinery/pkg/util/net"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apiserver/pkg/admission"
	"k8s.io/apiserver/pkg/admission/initializer"
	"k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle"
	"k8s.io/apiserver/pkg/admission/plugin/resourcequota"
	"k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy"
	"k8s.io/apiserver/pkg/admission/plugin/webhook/mutating"
	"k8s.io/apiserver/pkg/admission/plugin/webhook/validating"
	apifilters "k8s.io/apiserver/pkg/endpoints/filters"
	openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
	"k8s.io/apiserver/pkg/server"
	"k8s.io/apiserver/pkg/server/dynamiccertificates"
	"k8s.io/apiserver/pkg/server/filters"
	"k8s.io/apiserver/pkg/util/feature"
	"k8s.io/apiserver/pkg/util/notfoundhandler"
	"k8s.io/apiserver/pkg/util/openapi"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/clientcmd/api"
	"k8s.io/component-base/version"
	aggregator "k8s.io/kube-aggregator/pkg/apiserver"
	aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
	"k8s.io/kubernetes/pkg/api/legacyscheme"
	"k8s.io/kubernetes/pkg/apis/core"
	"k8s.io/kubernetes/pkg/capabilities"
	"k8s.io/kubernetes/pkg/cluster/ports"
	controllersa "k8s.io/kubernetes/pkg/controller/serviceaccount"
	"k8s.io/kubernetes/pkg/controlplane"
	"k8s.io/kubernetes/pkg/controlplane/controller/clusterauthenticationtrust"
	"k8s.io/kubernetes/pkg/controlplane/reconcilers"
	generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi"
	kubeadmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
	"k8s.io/kubernetes/pkg/kubeapiserver/authenticator"
	"k8s.io/kubernetes/pkg/kubeapiserver/authorizer"
	"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
	"k8s.io/kubernetes/pkg/kubelet/client"
	"k8s.io/kubernetes/pkg/serviceaccount"
	"k8s.io/kubernetes/plugin/pkg/admission/nodetaint"
	"k8s.io/kubernetes/plugin/pkg/admission/security/podsecurity"
	saplugin "k8s.io/kubernetes/plugin/pkg/admission/serviceaccount"
	"k8s.io/kubernetes/plugin/pkg/admission/storage/persistentvolume/resize"
)

const (
	// audience is used as the service account issuer, as the accepted API
	// audience and as the service account issuer URL.
	audience = "https://kubernetes.default.svc.cluster.local"
	// apiserverPort is the TCP port the secure listener binds to.
	apiserverPort = 6443
)

// kubeApiserver is the unit that runs the apiserver. It stacks three servers,
// each delegating to the previous one: extensions, then the core apiserver,
// then the aggregator, which is the one actually served.
var kubeApiserver = &Unit{
	Name:         "kube-apiserver",
	Dependencies: []*Unit{etcd, pkiCA, pkiMaster, vpn, kubeLogger},
	Run: func(u *Unit, c *Cluster, ctx context.Context) error {
		config, clients, err := buildConfig(c)
		if err != nil {
			return err
		}

		// Allow privileged pods
		capabilities.Setup(true, 0)

		// Servers are created in reverse order for proper delegation
		notFound := notfoundhandler.New(config.Serializer, apifilters.NoMuxAndDiscoveryIncompleteKey)
		// The builder currently never fails, but check anyway so that a future
		// error case is not silently dropped
		extensionsConfig, err := buildExtensionsConfig(*config, clients)
		if err != nil {
			return fmt.Errorf("could not build extension server config: %w", err)
		}
		extensionServer, err := extensionsConfig.Complete().New(server.NewEmptyDelegateWithCustomHandler(notFound))
		if err != nil {
			return fmt.Errorf("could not setup the extension server: %w", err)
		}

		apiConfig, err := buildApiConfig(c, *config, clients)
		if err != nil {
			return fmt.Errorf("could not build apiserver config: %w", err)
		}
		apiServer, err := apiConfig.Complete().New(extensionServer.GenericAPIServer)
		if err != nil {
			return fmt.Errorf("could not initialize generic apiserver: %w", err)
		}

		// Same as for the extensions config: no error case today, still checked
		aggregatorConfig, err := buildAggregatorConfig(*config, clients)
		if err != nil {
			return fmt.Errorf("could not build aggregator config: %w", err)
		}
		aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(apiServer.GenericAPIServer)
		if err != nil {
			return fmt.Errorf("could not initialize aggregator: %w", err)
		}

		// NOTE(review): this looks like leftover debug output on stdout; consider
		// routing it through the unit logger or removing it
		for gv := range aggregatorConfig.GenericConfig.MergedResourceConfig.GroupVersionConfigs {
			fmt.Println("gv: ", gv)
		}

		// Finally start the apiserver. The local is deliberately not named
		// "server", which would shadow the imported package.
		prepared := aggregatorServer.GenericAPIServer.PrepareRun()
		go clients.Start(ctx)
		return prepared.Run(ctx.Done())
	},
	Ready: func(u *Unit, c *Cluster) bool {
		u.Logger.Info("checking if apiserver is ready")
		clients, err := k8s.NewTokenClients(c.masterUrl, c.pki.TLS, c.loopbackToken)
		if err != nil {
			return false
		}
		// The apiserver is reported ready once listing nodes succeeds
		_, err = clients.Client.CoreV1().Nodes().List(context.Background(), meta.ListOptions{})
		return err == nil
	},
}

// Write a loopback config (different for every start)
var loopback = &Unit{
	Name:         "kube-loopback",
	Dependencies: []*Unit{kubeApiserver},
	Start: func(u *Unit, c *Cluster, ctx context.Context) error {
		const (
			name           = "loopback"
			kubeconfigDir  = "/root/.kube"
			kubeconfigPath = "/root/.kube/config"
		)
		clientConfig := api.Config{
			APIVersion:     "v1",
			Kind:           "Config",
			CurrentContext: name,
			Contexts: map[string]*api.Context{name: {
				Cluster:  name,
				AuthInfo: name,
			}},
			Clusters: map[string]*api.Cluster{name: {
				Server:               c.masterUrl,
				CertificateAuthority: c.pki.TLS.CertPath(),
			}},
			AuthInfos: map[string]*api.AuthInfo{name: {
				Token: c.loopbackToken,
			}},
		}
		if err := os.MkdirAll(kubeconfigDir, 0755); err != nil {
			return fmt.Errorf("could not create kubeconfig dir: %w", err)
		}
		// The write error must be captured here: it was previously discarded
		// and the stale MkdirAll error was tested instead
		if err := clientcmd.WriteToFile(clientConfig, kubeconfigPath); err != nil {
			return fmt.Errorf("could not write privileged kubeconfig: %w", err)
		}
		return nil
	},
}

// Build a generic apiserver config, that is used and tweaked for various instantiated servers (native, extensions, etc.)
// Be careful, config is returned as a pointer, so it must be explicitly shallow copied before tweaking
//
// On failure, both config and clients are nil and the listener opened here is closed.
func buildConfig(c *Cluster) (config *server.Config, clients *k8s.Clients, err error) {
	// Initialize a basic configuration object
	ver := version.Get()
	config = server.NewConfig(legacyscheme.Codecs)
	config.Version = &ver
	config.Serializer = legacyscheme.Codecs
	config.LongRunningFunc = filters.BasicLongRunningRequestCheck(
		sets.NewString("watch", "proxy"),
		sets.NewString("attach", "exec", "proxy", "log", "portforward"),
	)
	// Disable unneeded post start hooks, these cannot be easily disabled except at runtime, which still takes up
	// space in the binary sadly
	config.DisabledPostStartHooks = sets.NewString(
		// bootstrap-controller : TODO replace endpoint reconciler logic if required
		"generic-apiserver-start-informers",      // they are started manually
		"start-apiextensions-informers",          // started manually also
		"storage-object-count-tracker-hook",      // unused
		"start-legacy-token-tracking-controller", // legacy and unused
		"rbac/bootstrap-roles",                   // TODO replaced with our own simpler RBAC
	)

	// Setup listener
	listener, err := net.Listen("tcp6", fmt.Sprintf("[%s]:%d", c.networking.NodeAddress.IP.String(), apiserverPort))
	if err != nil {
		return nil, nil, fmt.Errorf("could not initialize listener: %w", err)
	}
	// Do not leak the bound socket when a later step fails; err is the named
	// result, so it reflects the value actually being returned
	defer func() {
		if err != nil {
			// Best effort: the original failure is the error worth reporting
			_ = listener.Close()
		}
	}()
	cert, err := dynamiccertificates.NewDynamicServingContentFromFiles("serving-cert", c.masterCerts.TLS.CertPath(), c.masterCerts.TLS.KeyPath())
	if err != nil {
		return nil, nil, fmt.Errorf("could not get certificate: %w", err)
	}
	clientCA, err := dynamiccertificates.NewDynamicCAContentFromFile("client-ca", c.pki.API.CertPath())
	if err != nil {
		return nil, nil, fmt.Errorf("could not get api CA file: %w", err)
	}
	config.SecureServing = &server.SecureServingInfo{
		Listener: listener,
		Cert:     cert,
		ClientCA: clientCA, // not setup upstream, might be an issue
	}

	// Setup loopback clients (no authorization at this point, handled later)
	clients, err = k8s.NewTokenClients(c.masterUrl, c.pki.TLS, c.loopbackToken)
	if err != nil {
		return nil, nil, fmt.Errorf("could not setup loopback clients: %w", err)
	}
	config.LoopbackClientConfig = clients.KubeConfig

	// Setup authentication
	authConfig := authenticator.Config{
		ServiceAccountKeyFiles:  []string{c.masterCerts.APITokens.KeyPath()},
		ServiceAccountIssuers:   []string{audience},
		APIAudiences:            []string{audience},
		ClientCAContentProvider: clientCA,
		ServiceAccountTokenGetter: controllersa.NewGetterFromClient(
			clients.Client,
			clients.Informer.Core().V1().Secrets().Lister(),
			clients.Informer.Core().V1().ServiceAccounts().Lister(),
			clients.Informer.Core().V1().Pods().Lister(),
		),
		SecretsWriter: clients.Client.CoreV1(),
	}
	auth, _, err := authConfig.New()
	if err != nil {
		return nil, nil, fmt.Errorf("could not setup apiserver authentication: %w", err)
	}
	config.Authentication = server.AuthenticationInfo{
		APIAudiences:  []string{audience},
		Authenticator: auth,
	}

	// Setup authorizations
	authzConfig := authorizer.Config{
		AuthorizationModes:       []string{modes.ModeNode, modes.ModeRBAC},
		VersionedInformerFactory: clients.Informer,
	}
	authz, resolver, err := authzConfig.New()
	if err != nil {
		return nil, nil, fmt.Errorf("could not setup apiserver authorizations: %w", err)
	}
	config.RuleResolver = resolver
	config.Authorization = server.AuthorizationInfo{
		Authorizer: authz,
	}

	// Finally authorize loopback clients
	server.AuthorizeClientBearerToken(clients.KubeConfig, &config.Authentication, &config.Authorization)

	// Setup admission controllers
	admissionConfig := kubeadmission.Config{
		ExternalInformers:    clients.Informer,
		LoopbackClientConfig: config.LoopbackClientConfig,
	}
	initializers, _, err := admissionConfig.New(nil, nil, clients.ServiceResolver(), nil) // TODO: handle post start hook
	if err != nil {
		return nil, nil, fmt.Errorf("could not get admission plugins: %w", err)
	}
	// Some default plugins are not enabled here:
	// limitranger, since we do not support LimitRange
	// setdefault, since we do not support default storage class
	// defaulttolerationseconds, as we do not use this feature
	// storageobjectinuseprotection, as we do not use this feature
	// podpriority, as we do not use this feature
	// runtimeclass, as we do not use this feature
	// defaultingressclass, as we do not use this feature
	plugins := admission.NewPlugins()
	lifecycle.Register(plugins)
	mutating.Register(plugins)
	validatingadmissionpolicy.Register(plugins)
	validating.Register(plugins)
	resize.Register(plugins)
	resourcequota.Register(plugins)
	nodetaint.Register(plugins)
	podsecurity.Register(plugins)
	saplugin.Register(plugins)
	// The names are listed in the same order as the registrations above
	pluginsNames := []string{
		lifecycle.PluginName,
		mutating.PluginName,
		validatingadmissionpolicy.PluginName,
		validating.PluginName,
		resize.PluginName,
		resourcequota.PluginName,
		nodetaint.PluginName,
		podsecurity.PluginName,
		saplugin.PluginName,
	}
	pluginsConfig, _ := admission.ReadAdmissionConfiguration(pluginsNames, "", nil) // Never fails without config file
	genericInitializer := initializer.New(clients.Client, clients.DynClient, clients.Informer, config.Authorization.Authorizer, feature.DefaultFeatureGate, config.DrainedNotify())
	initializersChain := admission.PluginInitializers{genericInitializer}
	initializersChain = append(initializersChain, initializers...)
	admissionChain, err := plugins.NewFromPlugins(pluginsNames, pluginsConfig, initializersChain, admission.Decorators{})
	if err != nil {
		return nil, nil, fmt.Errorf("could not build admission chain: %w", err)
	}
	config.AdmissionControl = admissionChain
	return config, clients, nil
}

// Customize the generic config then build an extensions server config
//
// config is received by value, so the caller's copy is left untouched. The
// returned error is currently always nil.
func buildExtensionsConfig(config server.Config, clients *k8s.Clients) (*extensions.Config, error) {
	generic := config
	generic.MergedResourceConfig = extensions.DefaultAPIResourceConfigSource()
	generic.RESTOptionsGetter = k8s.PrepareStorage(extensions.Codecs, extensions.Scheme, generic.MergedResourceConfig)

	// Build the extensions server (create then customize the configuration)
	return &extensions.Config{
		GenericConfig: &server.RecommendedConfig{
			Config:                generic,
			SharedInformerFactory: clients.Informer,
		},
		ExtraConfig: extensions.ExtraConfig{
			// TODO This might be an issue, since vanilla sets a special unstructured encoding for CRD
			CRDRESTOptionsGetter: generic.RESTOptionsGetter,
		},
	}, nil
}

// Customize the generic config then build an apiserver config
//
// config is received by value, so the caller's copy is left untouched. An
// error is returned when the service account token signer cannot be created.
func buildApiConfig(c *Cluster, config server.Config, clients *k8s.Clients) (*controlplane.Config, error) {
	generic := config
	generic.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource()
	restOptionsGetter := k8s.PrepareStorage(legacyscheme.Codecs, legacyscheme.Scheme, generic.MergedResourceConfig)
	generic.RESTOptionsGetter = restOptionsGetter

	openapiFactory := openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)
	generic.OpenAPIConfig = server.DefaultOpenAPIConfig(openapiFactory, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensions.Scheme))
	generic.OpenAPIV3Config = server.DefaultOpenAPIV3Config(openapiFactory, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensions.Scheme))

	signer, err := serviceaccount.JWTTokenGenerator(audience, c.masterCerts.APITokens.Key)
	if err != nil {
		return nil, fmt.Errorf("could not initialize service account signer: %w", err)
	}

	return &controlplane.Config{
		GenericConfig: &generic,
		ExtraConfig: controlplane.ExtraConfig{
			EventTTL: time.Hour,
			KubeletClientConfig: client.KubeletClientConfig{
				Port:                  ports.KubeletPort,
				ReadOnlyPort:          ports.KubeletReadOnlyPort,
				PreferredAddressTypes: []string{string(core.NodeInternalIP), string(core.NodeExternalIP)},
			},
			ServiceIPRange:       *c.networking.ServiceNet,
			APIServerServiceIP:   c.networking.APIAddress,
			APIServerServicePort: 443,
			ServiceNodePortRange: utilnet.PortRange{Base: 30000, Size: 2768},
			// This still triggers an error, stating that stale data was found and cleaned up,
			// which is inevitable
			EndpointReconcilerType:   reconcilers.NoneEndpointReconcilerType,
			MasterCount:              1,
			ServiceAccountIssuer:     signer,
			ServiceAccountIssuerURL:  audience,
			ServiceAccountPublicKeys: []interface{}{&c.masterCerts.APITokens.Key.PublicKey},
			VersionedInformers:       clients.Informer,
			APIResourceConfigSource:  generic.MergedResourceConfig,
			StorageFactory:           restOptionsGetter.StorageFactory,
			ClusterAuthenticationInfo: clusterauthenticationtrust.ClusterAuthenticationInfo{
				ClientCA: config.SecureServing.ClientCA,
			},
		},
	}, nil
}

// Customize the generic config then build an aggregator config
//
// config is received by value, so the caller's copy is left untouched. The
// returned error is currently always nil.
func buildAggregatorConfig(config server.Config, clients *k8s.Clients) (*aggregator.Config, error) {
	generic := config
	generic.MergedResourceConfig = aggregator.DefaultAPIResourceConfigSource()
	generic.RESTOptionsGetter = k8s.PrepareStorage(aggregatorscheme.Codecs, aggregatorscheme.Scheme, generic.MergedResourceConfig)

	return &aggregator.Config{
		GenericConfig: &server.RecommendedConfig{
			Config:                generic,
			SharedInformerFactory: clients.Informer,
		},
		ExtraConfig: aggregator.ExtraConfig{
			ServiceResolver: clients.ServiceResolver(),
		},
	}, nil
}