Skip to content
Snippets Groups Projects
apiserver.go 13.9 KiB
Newer Older
	"net"
	"net/url"
	"os"
	"time"
	"github.com/google/uuid"
	extensions "k8s.io/apiextensions-apiserver/pkg/apiserver"
	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apiserver/pkg/admission"
	"k8s.io/apiserver/pkg/admission/initializer"
	"k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle"
	"k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy"
	"k8s.io/apiserver/pkg/admission/plugin/webhook/mutating"
	"k8s.io/apiserver/pkg/admission/plugin/webhook/validating"
	apifilters "k8s.io/apiserver/pkg/endpoints/filters"
	openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
	"k8s.io/apiserver/pkg/registry/generic"
	"k8s.io/apiserver/pkg/server"
	"k8s.io/apiserver/pkg/server/dynamiccertificates"
	"k8s.io/apiserver/pkg/server/filters"
	"k8s.io/apiserver/pkg/server/storage"
	"k8s.io/apiserver/pkg/storage/storagebackend"
	"k8s.io/apiserver/pkg/util/feature"
	"k8s.io/apiserver/pkg/util/flowcontrol/request"
	"k8s.io/apiserver/pkg/util/notfoundhandler"
	"k8s.io/apiserver/pkg/util/openapi"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/clientcmd/api"
	"k8s.io/component-base/version"
	"k8s.io/kube-aggregator/pkg/apiserver"
	"k8s.io/kubernetes/pkg/api/legacyscheme"
	"k8s.io/kubernetes/pkg/apis/core"
	"k8s.io/kubernetes/pkg/capabilities"
	"k8s.io/kubernetes/pkg/cluster/ports"
	controllersa "k8s.io/kubernetes/pkg/controller/serviceaccount"
	"k8s.io/kubernetes/pkg/controlplane"
	"k8s.io/kubernetes/pkg/controlplane/reconcilers"
	generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi"
	"k8s.io/kubernetes/pkg/kubeapiserver"
	kubeadmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
	"k8s.io/kubernetes/pkg/kubeapiserver/authenticator"
	"k8s.io/kubernetes/pkg/kubeapiserver/authorizer"
	"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
	"k8s.io/kubernetes/pkg/kubeapiserver/options"
	"k8s.io/kubernetes/pkg/kubelet/client"
	"k8s.io/kubernetes/pkg/serviceaccount"
const (
	// audience is the value used in this file both as the service account
	// token issuer and as the accepted API audience.
	audience = "https://kubernetes.default.svc.cluster.local"

	// apiserverPort is the TCP port the apiserver listens on; the readiness
	// check connects to the same port.
	apiserverPort = 6443
)

// RestOptionsFactory reimplements the REST options factory so that it no
// longer depends on option parsers.
type RestOptionsFactory struct {
	// StorageFactory provides the storage configuration and the resource
	// prefix for each resource requested through GetRESTOptions.
	StorageFactory            storage.StorageFactory
	// StorageObjectCountTracker is passed through unchanged to the REST
	// options returned by GetRESTOptions; it may be left unset.
	StorageObjectCountTracker request.StorageObjectCountTracker
}

// GetRESTOptions returns the generic REST options used to serve the given
// resource: the storage configuration and resource prefix come from the
// storage factory, everything else is a fixed default (undecorated storage,
// a single delete-collection worker, garbage collection enabled and a
// one-minute count metric poll period).
//
// It returns an error wrapping the storage factory failure when no storage
// configuration can be built for the resource.
func (f *RestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
	storageConfig, err := f.StorageFactory.NewConfig(resource)
	if err != nil {
		// %w keeps the message text identical to formatting err.Error() with
		// %v, while letting callers inspect the cause with errors.Is/As.
		return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %w", resource, err)
	}

	return generic.RESTOptions{
		StorageConfig:             storageConfig,
		Decorator:                 generic.UndecoratedStorage,
		DeleteCollectionWorkers:   1,
		EnableGarbageCollection:   true,
		ResourcePrefix:            f.StorageFactory.ResourcePrefix(resource),
		CountMetricPollPeriod:     time.Minute,
		StorageObjectCountTracker: f.StorageObjectCountTracker,
	}, nil
}

// buildConfig builds the generic apiserver configuration that is used and
// tweaked for the various instantiated servers (native, extensions, etc.),
// along with the loopback clients that talk to the server being configured.
//
// Be careful: config is returned as a pointer, so it must be explicitly
// shallow copied before tweaking.
//
// On failure, the listener opened by this function is closed again and nil
// config and clients are returned along with the wrapped error.
func buildConfig(c *Cluster) (config *server.Config, clients *Clients, err error) {
	// Initialize a basic configuration object
	ver := version.Get()
	config = server.NewConfig(legacyscheme.Codecs)
	config.Version = &ver
	config.Serializer = legacyscheme.Codecs
	config.LongRunningFunc = filters.BasicLongRunningRequestCheck(
		sets.NewString("watch", "proxy"),
		sets.NewString("attach", "exec", "proxy", "log", "portforward"),
	)

	// Setup listener
	listener, err := net.Listen("tcp6", fmt.Sprintf("[%s]:%d", c.networking.NodeAddress.IP.String(), apiserverPort))
	if err != nil {
		return nil, nil, fmt.Errorf("could not initialize listener: %w", err)
	}
	// Release the port if any later step fails, so that a subsequent attempt
	// is able to bind it again.
	defer func() {
		if err != nil {
			_ = listener.Close()
		}
	}()

	cert, err := dynamiccertificates.NewDynamicServingContentFromFiles("serving-cert", c.masterCerts.TLS.CertPath(), c.masterCerts.TLS.KeyPath())
	if err != nil {
		return nil, nil, fmt.Errorf("could not get certificate: %w", err)
	}
	clientCA, err := dynamiccertificates.NewDynamicCAContentFromFile("client-ca", c.pki.API.CertPath())
	if err != nil {
		return nil, nil, fmt.Errorf("could not get api CA file: %w", err)
	}
	config.SecureServing = &server.SecureServingInfo{
		Listener: listener,
		Cert:     cert,
		ClientCA: clientCA, // not setup upstream, might be an issue
	}

	// Setup loopback clients (no authorization at this point, handled later)
	loopback, err := config.SecureServing.NewLoopbackClientConfig(uuid.NewString(), nil)
	if err != nil {
		return nil, nil, fmt.Errorf("could not setup loopback config: %w", err)
	}
	// Only touch the loopback config once it is known to be valid
	loopback.TLSClientConfig.CAFile = c.pki.TLS.CertPath()
	config.LoopbackClientConfig = loopback
	clients, err = newClientsForKC(config.LoopbackClientConfig)
	if err != nil {
		return nil, nil, fmt.Errorf("could not setup loopback clients: %w", err)
	}

	// Setup authentication
	authConfig := authenticator.Config{
		ServiceAccountKeyFiles:  []string{c.masterCerts.APITokens.KeyPath()},
		ServiceAccountIssuers:   []string{audience},
		APIAudiences:            []string{audience},
		ClientCAContentProvider: clientCA,
		ServiceAccountTokenGetter: controllersa.NewGetterFromClient(
			clients.Client,
			clients.Informer.Core().V1().Secrets().Lister(),
			clients.Informer.Core().V1().ServiceAccounts().Lister(),
			clients.Informer.Core().V1().Pods().Lister(),
		),
		SecretsWriter: clients.Client.CoreV1(),
	}
	auth, _, err := authConfig.New()
	if err != nil {
		return nil, nil, fmt.Errorf("could not setup apiserver authentication: %w", err)
	}
	config.Authentication = server.AuthenticationInfo{
		APIAudiences:  []string{audience},
		Authenticator: auth,
	}

	// Setup authorizations
	authzConfig := authorizer.Config{
		AuthorizationModes:       []string{modes.ModeNode, modes.ModeRBAC},
		VersionedInformerFactory: clients.Informer,
	}
	authz, resolver, err := authzConfig.New()
	if err != nil {
		return nil, nil, fmt.Errorf("could not setup apiserver authorizations: %w", err)
	}
	config.RuleResolver = resolver
	config.Authorization = server.AuthorizationInfo{
		Authorizer: authz,
	}

	// Finally authorize loopback clients
	server.AuthorizeClientBearerToken(loopback, &config.Authentication, &config.Authorization)

	// Setup service resolver
	localHost, err := url.Parse(config.LoopbackClientConfig.Host)
	if err != nil {
		return nil, nil, fmt.Errorf("could not parse loopback host: %w", err)
	}
	serviceResolver := apiserver.NewLoopbackServiceResolver(apiserver.NewEndpointServiceResolver(
		clients.Informer.Core().V1().Services().Lister(),
		clients.Informer.Core().V1().Endpoints().Lister(),
	), localHost)

	// Setup admission controllers
	admissionConfig := kubeadmission.Config{
		ExternalInformers:    clients.Informer,
		LoopbackClientConfig: config.LoopbackClientConfig,
	}
	initializers, _, err := admissionConfig.New(nil, nil, serviceResolver, nil) // TODO: handle post start hook
	if err != nil {
		return nil, nil, fmt.Errorf("could not get admission plugins: %w", err)
	}
	plugins := admission.NewPlugins()
	pluginsNames := []string{lifecycle.PluginName, mutating.PluginName, validatingadmissionpolicy.PluginName, validating.PluginName}
	server.RegisterAllAdmissionPlugins(plugins)
	// Not expected to fail without a config file, but checked anyway so a
	// nil configuration is never handed to the plugin chain.
	pluginsConfig, err := admission.ReadAdmissionConfiguration(pluginsNames, "", nil)
	if err != nil {
		return nil, nil, fmt.Errorf("could not read admission configuration: %w", err)
	}
	genericInitializer := initializer.New(clients.Client, clients.DynClient, clients.Informer, config.Authorization.Authorizer, feature.DefaultFeatureGate, config.DrainedNotify())
	initializersChain := admission.PluginInitializers{genericInitializer}
	initializersChain = append(initializersChain, initializers...)
	admissionChain, err := plugins.NewFromPlugins(pluginsNames, pluginsConfig, initializersChain, admission.Decorators{})
	if err != nil {
		return nil, nil, fmt.Errorf("could not build admission chain: %w", err)
	}
	config.AdmissionControl = admissionChain

	return config, clients, nil
}

var kubeApiserver = &Unit{
	Name:         "kube-apiserver",
	Dependencies: []*Unit{etcd, pkiCA, pkiMaster, vpn, kubeLogger},
	Run: func(u *Unit, c *Cluster, ctx context.Context) error {
		config, clients, err := buildConfig(c)
		// Allow privileged pods
		capabilities.Setup(true, 0)

		// Configure the etcd backend, this is used by both the default and CRD apiservers
		etcdConfig := storagebackend.NewDefaultConfig("/registry", nil)
		etcdConfig.Transport.ServerList = []string{"http://[::1]:2379"}

		// Servers are created in reverse orders for proper delegation
		// First notFound is the default handler, that basically returns a 404,
		// then the extensions serves CRD for unknown kinds, finally
		// the default apiserver is first in the chain and serves all default kinds
		// Note that no API aggregation is setup, which means that k8s API aggregation
		// is not supported by hepto.

		notFound := notfoundhandler.New(config.Serializer, apifilters.NoMuxAndDiscoveryIncompleteKey)

		// Tweak the configuration and build the extension server
		extensionsConfig := extensions.Config{
			GenericConfig: &server.RecommendedConfig{
				Config:                *config,
				SharedInformerFactory: clients.Informer,
			},
			ExtraConfig: extensions.ExtraConfig{},
		}
		extensionsConfig.GenericConfig.MergedResourceConfig = extensions.DefaultAPIResourceConfigSource()
		extensionsConfig.GenericConfig.RESTOptionsGetter = &RestOptionsFactory{
			StorageFactory: storage.NewDefaultStorageFactory(
				*etcdConfig,
				runtime.ContentTypeJSON,
				legacyscheme.Codecs,
				storage.NewDefaultResourceEncodingConfig(extensions.Scheme),
				extensionsConfig.GenericConfig.MergedResourceConfig,
				kubeapiserver.SpecialDefaultResourcePrefixes,
			),
		}
		extensionsConfig.ExtraConfig.CRDRESTOptionsGetter = extensionsConfig.GenericConfig.RESTOptionsGetter
		extensionServer, err := extensionsConfig.Complete().New(server.NewEmptyDelegateWithCustomHandler(notFound))
			return fmt.Errorf("could not seutp the extension server: %w", err)

		// Setup the apiserver itself
		storageFactory := storage.NewDefaultStorageFactory(
			*etcdConfig,
			runtime.ContentTypeJSON,
			legacyscheme.Codecs,
			storage.NewDefaultResourceEncodingConfig(legacyscheme.Scheme),
			controlplane.DefaultAPIResourceConfigSource(),
			kubeapiserver.SpecialDefaultResourcePrefixes,
		)
		signer, err := serviceaccount.JWTTokenGenerator(audience, c.masterCerts.APITokens.Key)
			return fmt.Errorf("could not initilize service account signer: %w", err)
		}
		apiConfig := controlplane.Config{
			GenericConfig: config,
			ExtraConfig: controlplane.ExtraConfig{
				APIResourceConfigSource: storageFactory.APIResourceConfigSource,
				StorageFactory:          storageFactory,
				EventTTL:                time.Hour,
				KubeletClientConfig: client.KubeletClientConfig{
					Port:                  ports.KubeletPort,
					ReadOnlyPort:          ports.KubeletReadOnlyPort,
					PreferredAddressTypes: []string{string(core.NodeInternalIP), string(core.NodeExternalIP)},
				},
				ServiceIPRange:       *c.networking.ServiceNet,
				APIServerServiceIP:   c.networking.APIAddress,
				APIServerServicePort: 443,
				ServiceNodePortRange: options.DefaultServiceNodePortRange,
				// This still triggers an error, stating that stale data was found and cleaned up,
				// which is inevitable
				EndpointReconcilerType:   reconcilers.NoneEndpointReconcilerType,
				MasterCount:              1,
				ServiceAccountIssuer:     signer,
				ServiceAccountIssuerURL:  audience,
				ServiceAccountPublicKeys: []interface{}{&c.masterCerts.APITokens.Key.PublicKey},
				VersionedInformers:       clients.Informer,
			},
		apiConfig.GenericConfig.RESTOptionsGetter = &RestOptionsFactory{StorageFactory: storageFactory}
		openapiFactory := openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)
		apiConfig.GenericConfig.OpenAPIConfig = server.DefaultOpenAPIConfig(openapiFactory, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensions.Scheme))
		apiConfig.GenericConfig.OpenAPIConfig = server.DefaultOpenAPIConfig(openapiFactory, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensions.Scheme))
		apiConfig.ExtraConfig.ClusterAuthenticationInfo.ClientCA = config.SecureServing.ClientCA
		apiServer, err := apiConfig.Complete().New(extensionServer.GenericAPIServer)
			return fmt.Errorf("could not initialize generic apiserver: %w", err)
		}

		// Write a loopback config (different for every start)
		name := "loopback"
		clientConfig := api.Config{
			APIVersion:     "v1",
			Kind:           "Config",
			CurrentContext: name,
			Contexts: map[string]*api.Context{name: {
				Cluster:  name,
				AuthInfo: name,
			}},
			Clusters: map[string]*api.Cluster{name: {
				Server:               config.LoopbackClientConfig.Host,
				CertificateAuthority: config.LoopbackClientConfig.TLSClientConfig.CAFile,
			}},
			AuthInfos: map[string]*api.AuthInfo{name: {
				Token: config.LoopbackClientConfig.BearerToken,
			}},
		err = os.MkdirAll("/root/.kube", 0755)
		if err != nil {
			return fmt.Errorf("could not create kubeconfig dir: %w", err)
		clientcmd.WriteToFile(clientConfig, "/root/.kube/config")
			return fmt.Errorf("could not write privileged kubeconfig: %w", err)
		// Finally start the apiserver
		server := apiServer.GenericAPIServer.PrepareRun()
		return server.Run(ctx.Done())
	},
	Ready: func(u *Unit, c *Cluster) bool {
		u.Logger.Info("checking if apiserver is ready")
		// Use the scheduler certificate for readiness test, which is more relevant than
		// using the internal privileged API token
			Host: fmt.Sprintf("https://[%s]:%d", c.networking.NodeAddress.IP.String(), apiserverPort),
			TLSClientConfig: rest.TLSClientConfig{
				CAFile:   c.pki.TLS.CertPath(),
				CertFile: c.masterCerts.SchedulerAPI.CertPath(),
				KeyFile:  c.masterCerts.SchedulerAPI.KeyPath(),
			},
		}
		client, err := kubernetes.NewForConfig(rest.AddUserAgent(kc, "scheduler"))
		if err != nil {
			return false
		}
		_, err = client.CoreV1().Nodes().List(context.Background(), meta.ListOptions{})
		if err != nil {
			return false
		}
		return true
	},
}