Newer
Older
package services
import (
"context"
"fmt"
"go.acides.org/hepto/k8s"
extensions "k8s.io/apiextensions-apiserver/pkg/apiserver"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/admission/initializer"
"k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle"
"k8s.io/apiserver/pkg/admission/plugin/resourcequota"
"k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy"
"k8s.io/apiserver/pkg/admission/plugin/webhook/mutating"
"k8s.io/apiserver/pkg/admission/plugin/webhook/validating"
"k8s.io/apiserver/pkg/authentication/authenticatorfactory"
"k8s.io/apiserver/pkg/authentication/request/headerrequest"
"k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
apifilters "k8s.io/apiserver/pkg/endpoints/filters"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
"k8s.io/apiserver/pkg/server/filters"
"k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/notfoundhandler"
"k8s.io/apiserver/pkg/util/openapi"
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
aggregator "k8s.io/kube-aggregator/pkg/apiserver"
aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1"
"k8s.io/kube-aggregator/pkg/controllers/autoregister"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/capabilities"
"k8s.io/kubernetes/pkg/cluster/ports"
controllersa "k8s.io/kubernetes/pkg/controller/serviceaccount"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/pkg/controlplane/controller/clusterauthenticationtrust"
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi"
kubeadmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
"k8s.io/kubernetes/pkg/kubeapiserver/authenticator"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
"k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/plugin/pkg/admission/nodetaint"
"k8s.io/kubernetes/plugin/pkg/admission/security/podsecurity"
saplugin "k8s.io/kubernetes/plugin/pkg/admission/serviceaccount"
"k8s.io/kubernetes/plugin/pkg/admission/storage/persistentvolume/resize"
// Package-level settings shared by the API server configuration helpers.
const (
	// audience is used as the service-account token issuer, the accepted API
	// audience, and the issuer URL advertised by the API server.
	audience = "https://kubernetes.default.svc.cluster.local"
	// apiserverPort is the TCP port the API server listener binds to on the node address.
	apiserverPort = 6443
)
// The Kubernetes API server has a complex architecture, which we partially reproduce here.
//
// The API itself is layered:
// - the core API is served by the API server itself,
// - the core API wraps around the extensions server for serving CRDs,
// - the core API is wrapped by the aggregated API server for serving aggregated APIs.
//
// HTTP calls go through the aggregated layer, then the core layer, then the extensions layer,
// each time passing to the next layer the request and response objects if the current layer does
// not handle the request.
//
// Two controllers are started as part of the API server for now: CRD controller and registration
// controller, responsible for registering CRDs and aggregated APIs respectively.
//
// Regarding storage, the API server abstracts access to etcd through the storage package, which
// itself implements some complex interface. The main component we implement is the RestOptionsFactory,
// which is used to get the storage options for a given REST request. The options are then used to create
// the storage backend. The RestOptionsFactory is complex, mostly because the factory pattern is repeated
// multiple times in the codebase, using a StorageFactory for instantiating storage config, which itself
// uses factories for instantiating resource encoding config, etc.
//
// We have our own implementation of RestOptionsFactory in the `k8s` package; see the documentation there
// for more details.
Dependencies: []*Unit{etcd, pkiMaster, vpn, memberlist, kubeLogger},
config, clients, hooks, err := buildConfig(c)
// Allow privileged pods, this is required for some system deployments, and is done
// in a set of static variables inside the capabilities package
capabilities.Setup(true, 0)
// Servers are created in reverse orders for proper delegation
// 1. The extensions server wraps around a 404 handler
notFound := notfoundhandler.New(config.Serializer, apifilters.NoMuxAndDiscoveryIncompleteKey)
extensionsConfig, _ := buildExtensionsConfig(*config, clients) // Currently no error case
extensionServer, err := extensionsConfig.Complete().New(server.NewEmptyDelegateWithCustomHandler(notFound))
if err != nil {
return fmt.Errorf("could not seutp the extension server: %w", err)
}
// 2. The API server itself wraps around the extensions server
if err != nil {
return fmt.Errorf("could not build apiserver config: %w", err)
}
for name, hook := range hooks {
err = apiConfig.GenericConfig.AddPostStartHook(name, hook)
if err != nil {
return fmt.Errorf("could not add a post-start-hook: %w", err)
}
}
apiServer, err := apiConfig.Complete().New(extensionServer.GenericAPIServer)
if err != nil {
return fmt.Errorf("could not initialize generic apiserver: %w", err)
}
// 3. The aggregator server wraps around the API server
aggregatorConfig, _ := buildAggregatorConfig(c, *config, clients)
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(apiServer.GenericAPIServer)
if err != nil {
return fmt.Errorf("could not initialize aggregator: %w", err)
}
// Start registration services, which auto register all resources and all
// crd to the aggregator server
// TODO at some point we should probably move these to the controllers component
apiRegistrationClient, err := apiregistrationclient.NewForConfig(clients.KubeConfig)
if err != nil {
return fmt.Errorf("could not initialize aggregator client: %w", err)
}
registrationController := autoregister.NewAutoRegisterController(aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(), apiRegistrationClient)
crdController := crdregistration.NewCRDRegistrationController(extensionServer.Informers.Apiextensions().V1().CustomResourceDefinitions(), registrationController)
// Start by discovering resource paths from apiserver and registering them manually
// TODO We completely ignore priorities, wchih might become an issue at some point
pathRegex := regexp.MustCompile(`^/apis/([^/]+)/([^/]+)$`)
for _, resourcePath := range apiServer.GenericAPIServer.ListedPaths() {
if match := pathRegex.FindStringSubmatch(resourcePath); match != nil {
registrationController.AddAPIServiceToSyncOnStart(&v1.APIService{
ObjectMeta: meta.ObjectMeta{Name: match[2] + "." + match[1]},
Spec: v1.APIServiceSpec{
Group: match[1],
Version: match[2],
GroupPriorityMinimum: 1,
VersionPriority: 1,
},
})
// Wait for CRD to be ready then start both controllers in a post-start-hook
err = aggregatorServer.GenericAPIServer.AddPostStartHook("autoregistration", func(context server.PostStartHookContext) error {
go crdController.Run(1, ctx.Done())
crdController.WaitForInitialSync()
go registrationController.Run(1, ctx.Done())
return nil
})
// Finally start the apiserver
server := aggregatorServer.GenericAPIServer.PrepareRun()
go clients.Start(ctx)
return server.Run(ctx.Done())
},
Ready: func(u *Unit, c *Cluster) bool {
u.Logger.Info("checking if apiserver is ready")
clients, err := k8s.NewTokenClients(c.masterUrl, c.pki.TLS, c.loopbackToken)
if err != nil {
return false
}
status := 0
result := clients.Client.Discovery().RESTClient().Get().AbsPath("/healthz").Do(context.TODO()).StatusCode(&status)
if result.Error() != nil || status != 200 {
// buildConfig builds the generic apiserver config that is shared by, then copied and tweaked for,
// the various instantiated servers (native, extensions, aggregator).
// Be careful: config is returned as a pointer, so it must be explicitly shallow-copied before
// tweaking; the build*Config helpers in this file take a server.Config by value for that reason.
//
// Returned values:
//   - config: the shared generic configuration (serving, authentication, authorization, OpenAPI,
//     admission and aggregated discovery settings),
//   - clients: loopback clients authenticated with the cluster loopback token,
//   - hooks: post-start hooks keyed by hook name, to be registered on a server (currently only
//     "initialize-admission"),
//   - err: the error of the failing setup step, wrapped with context for most steps.
func buildConfig(c *Cluster) (config *server.Config, clients *k8s.Clients, hooks map[string]server.PostStartHookFunc, err error) {
// Initialize return values
config = server.NewConfig(legacyscheme.Codecs)
hooks = map[string]server.PostStartHookFunc{}
// Initialize a basic configuration object
ver := version.Get()
config.Version = &ver
config.Serializer = legacyscheme.Codecs
// The first set lists long-running verbs, the second long-running subresources
// (argument order of filters.BasicLongRunningRequestCheck)
config.LongRunningFunc = filters.BasicLongRunningRequestCheck(
sets.NewString("watch", "proxy"),
sets.NewString("attach", "exec", "proxy", "log", "portforward"),
)
// Setup the network listener and certificates for server and client authentication,
// this is only used by the actual API server.
// The listener is IPv6-only ("tcp6") and binds the node address on apiserverPort.
// NOTE(review): the listener is not closed when a later step of this function fails and
// returns an error; consider closing it on those error paths.
listener, err := net.Listen("tcp6", fmt.Sprintf("[%s]:%d", c.networking.NodeAddress.Addr().String(), apiserverPort))
if err != nil {
err = fmt.Errorf("could not initialize listener: %w", err)
return
}
// Serving certificate and key, loaded through a dynamic content provider
cert, err := dynamiccertificates.NewDynamicServingContentFromFiles("serving-cert", c.masterCerts.TLS.CertPath(), c.masterCerts.TLS.KeyPath())
if err != nil {
err = fmt.Errorf("could not get certificate: %w", err)
return
}
// CA used to verify client certificates presented to the API
clientCA, err := dynamiccertificates.NewDynamicCAContentFromFile("client-ca", c.pki.API.CertPath())
if err != nil {
err = fmt.Errorf("could not get api CA file: %w", err)
return
}
// CA used to verify the front proxy (request-header authentication)
proxyCA, err := dynamiccertificates.NewDynamicCAContentFromFile("proxy-ca", c.pki.Proxy.CertPath())
if err != nil {
err = fmt.Errorf("could not get proxy CA file: %w", err)
return
}
config.SecureServing = &server.SecureServingInfo{
Listener: listener,
Cert: cert,
// This is performed in vanilla when applying authentication configuration,
// especially in AuthenticationInfo.ApplyClientCert. TLS accepts client
// certificates signed by either the API CA or the proxy CA.
ClientCA: dynamiccertificates.NewUnionCAContentProvider(clientCA, proxyCA),
// NOTE(review): the closing brace of this SecureServingInfo literal appears to be
// missing in this copy of the file — TODO restore it.
config.PublicAddress = c.networking.NodeAddress.Addr().AsSlice()
// Setup loopback clients (no authorization at this point, handled later)
clients, err = k8s.NewTokenClients(c.masterUrl, c.pki.TLS, c.loopbackToken)
if err != nil {
err = fmt.Errorf("could not setup loopback clients: %w", err)
return
}
config.LoopbackClientConfig = clients.KubeConfig
// Setup authentication
authConfig := authenticator.Config{
// Service account tokens are verified against the API token key and must carry
// `audience` both as issuer and as audience
ServiceAccountKeyFiles: []string{c.masterCerts.APITokens.KeyPath()},
ServiceAccountIssuers: []string{audience},
APIAudiences: []string{audience},
// Only the API CA is trusted for x509 client-certificate authentication; the
// proxy CA is used by the request-header authenticator below instead
ClientCAContentProvider: clientCA,
ServiceAccountTokenGetter: controllersa.NewGetterFromClient(
clients.Client,
clients.Informer.Core().V1().Secrets().Lister(),
clients.Informer.Core().V1().ServiceAccounts().Lister(),
clients.Informer.Core().V1().Pods().Lister(),
),
SecretsWriter: clients.Client.CoreV1(),
// OpenID configuration, which might just be empty to disable OIDC
OIDCIssuerURL: c.settings.OIDCIssuer,
OIDCClientID: c.settings.OIDCClientID,
OIDCUsernameClaim: "sub",
OIDCUsernamePrefix: "oidc:",
// This is currently not strictly required, since we do not use proxified
// requests to apiservers themselves, though we might at some point. Private
// key is only delivered to apiserver itself, so little harm is done.
// NOTE(review): kube-apiserver deployments conventionally use the extra-headers
// prefix "X-Remote-Extra-" (with a trailing dash); confirm the prefix below is intended.
RequestHeaderConfig: &authenticatorfactory.RequestHeaderConfig{
UsernameHeaders: headerrequest.StaticStringSlice([]string{"X-Remote-User"}),
GroupHeaders: headerrequest.StaticStringSlice([]string{"X-Remote-Group"}),
ExtraHeaderPrefixes: headerrequest.StaticStringSlice([]string{"X-Remote-Extra"}),
CAContentProvider: proxyCA,
},
}
auth, _, err := authConfig.New()
if err != nil {
err = fmt.Errorf("could not setup apiserver authentication: %w", err)
return
}
config.Authentication = server.AuthenticationInfo{
APIAudiences: []string{audience},
Authenticator: auth,
RequestHeaderConfig: authConfig.RequestHeaderConfig,
}
// Setup authorizations: the Node authorizer, then RBAC
authzConfig := authorizer.Config{
AuthorizationModes: []string{modes.ModeNode, modes.ModeRBAC},
VersionedInformerFactory: clients.Informer,
}
authz, ruleResolver, err := authzConfig.New()
if err != nil {
err = fmt.Errorf("could not seutp apiserver authorizations: %w", err)
return
}
config.RuleResolver = ruleResolver
config.Authorization = server.AuthorizationInfo{
Authorizer: authz,
}
// Prepare all OpenAPI endpoints; v2 and v3 share the same generated definitions,
// filtered through GetOpenAPIDefinitionsWithoutDisabledFeatures
// TODO at some point we should remove OpenAPI v2 support
openapiFactory := openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)
config.OpenAPIConfig = server.DefaultOpenAPIConfig(openapiFactory, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensions.Scheme))
config.OpenAPIV3Config = server.DefaultOpenAPIV3Config(openapiFactory, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensions.Scheme))
// NOTE(review): the following lines are bare numbers, apparently residue of a lossy copy;
// the original code at this position is not visible — TODO restore it.
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
// Setup admission controllers
// Vanilla code for this is pretty complex, since it uses dynamic plugins, which might load configurations from files instead
// of structs, so we just copy the relevant parts here
//
// Start by registering admission plugins
// Full list of plugins is available at: https://github.com/kubernetes/kubernetes/blob/v1.27.4/pkg/kubeapiserver/options/plugins.go
plugins := admission.NewPlugins()
lifecycle.Register(plugins)
mutating.Register(plugins)
validatingadmissionpolicy.Register(plugins)
validating.Register(plugins)
resize.Register(plugins)
resourcequota.Register(plugins)
nodetaint.Register(plugins)
podsecurity.Register(plugins)
saplugin.Register(plugins)
// Prepare the plugins configuration
// The plugins config is an instance that just returns nil for every plugin;
// we ignore errors since it never fails to load config, as it does not parse any config at all
pluginsConfig, _ := admission.ReadAdmissionConfiguration([]string{}, "", nil)
// Since we do not load configuration from files, the only plugin config we pass is from the plugin initializer chain.
// It is a chain of functions that alter the plugin object to initialize it. We build the initializer chain from two sources:
// the admission initializer package, which mostly sets client settings, and helpers from kubeapiserver, which set up the
// loopback configuration and informers
// The generic initializer is built from the loopback clients, shared informers, authorizer,
// default feature gate and the server's drained-notify channel
genericInitializer := initializer.New(clients.Client, clients.DynClient, clients.Informer, authz, feature.DefaultFeatureGate, config.DrainedNotify())
initializersChain := admission.PluginInitializers{genericInitializer}
admissionConfig := kubeadmission.Config{
ExternalInformers: clients.Informer,
LoopbackClientConfig: config.LoopbackClientConfig,
}
// Schema resolution reuses the OpenAPI v2 definitions configured above
schemaResolver := resolver.NewDefinitionsSchemaResolver(scheme.Scheme, config.OpenAPIConfig.GetDefinitions)
heperInitializers, admissionHook, err := admissionConfig.New(nil, nil, clients.ServiceResolver(), nil, schemaResolver)
if err != nil {
err = fmt.Errorf("could not prepare the admission config: %w", err)
return
}
// The admission post-start hook is handed back to the caller for registration
hooks["initialize-admission"] = admissionHook
initializersChain = append(initializersChain, heperInitializers...)
// Actually build the admission chain, enabling every plugin registered above
// The plugins config is an instance that just returns nil for every plugin; the decorators instance does nothing
// since it iterates over an empty list of decorators
// NOTE(review): err is not checked here. Because err is the named result, a failure is still
// returned by the naked return below, but unwrapped and only after the remaining config mutations.
admissionChain, err := plugins.NewFromPlugins(plugins.Registered(), pluginsConfig, initializersChain, admission.Decorators{})
config.AdmissionControl = admissionChain
// Setup the resource discovery manager, which is especially useful for the aggregation layer
config.AggregatedDiscoveryGroupManager = aggregated.NewResourceManager("apis")
// Finally authorize loopback clients, so that loopback requests are authorized as system requests
server.AuthorizeClientBearerToken(clients.KubeConfig, &config.Authentication, &config.Authorization)
return
}
// buildExtensionsConfig derives the configuration of the extensions (CRD) server from the
// shared generic config.
//
// config is received by value, so the field assignments below only touch this function's
// shallow copy and never the caller's config. The returned error is currently always nil.
func buildExtensionsConfig(config server.Config, clients *k8s.Clients) (*extensions.Config, error) {
	// Serve the extension resources as API endpoints, and back the CRD objects themselves
	// with storage built for the apiextensions codecs and scheme.
	resources := extensions.DefaultAPIResourceConfigSource()
	config.MergedResourceConfig = resources
	config.RESTOptionsGetter = k8s.PrepareStorage(extensions.Codecs, extensions.Scheme, resources)

	recommended := &server.RecommendedConfig{
		Config:                config,
		SharedInformerFactory: clients.Informer,
	}

	// Storage for custom resources would be configured through ExtraConfig; no codecs,
	// schemes or merged resources are specified for it, since that storage layer is
	// transparent and schema validation is done by the extensions server instead.
	// NOTE(review): no ExtraConfig field is set here (CRDRESTOptionsGetter stays nil);
	// confirm the extensions server does not need it to serve custom resources.
	return &extensions.Config{
		GenericConfig: recommended,
		ExtraConfig:   extensions.ExtraConfig{},
	}, nil
}
// Customize the generic config then build an apiserver config
func buildApiConfig(c *Cluster, config server.Config, clients *k8s.Clients) (*controlplane.Config, error) {
generic := config // Copy the config before we modify it
// Serve the default API server resources as API endpoints
generic.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource()
// Currently unused framework for enabling additional resources, we could've removed this, but reverse engineering
// the logic was hard enough so we just keep it until we need it
generic.MergedResourceConfig.EnableMatchingVersions(func(gv schema.GroupVersion) bool {
return false
})
// The storage configuration for accessing the default API server resources
restOptionsGetter := k8s.PrepareStorage(legacyscheme.Codecs, legacyscheme.Scheme, generic.MergedResourceConfig)
generic.RESTOptionsGetter = restOptionsGetter
// Setup the token signer
signer, err := serviceaccount.JWTTokenGenerator(audience, c.masterCerts.APITokens.Key)
if err != nil {
return nil, fmt.Errorf("could not initilize service account signer: %w", err)
}
ExtraConfig: controlplane.ExtraConfig{
EventTTL: time.Hour,
KubeletClientConfig: client.KubeletClientConfig{
Port: ports.KubeletPort,
ReadOnlyPort: ports.KubeletReadOnlyPort,
PreferredAddressTypes: []string{string(core.NodeInternalIP), string(core.NodeExternalIP)},
TLSClientConfig: client.KubeletTLSConfig{
CertFile: c.masterCerts.Kubelet.CertPath(),
KeyFile: c.masterCerts.Kubelet.KeyPath(),
CAFile: c.pki.TLS.CertPath(),
},
ServiceIPRange: net.IPNet{
IP: c.networking.ServiceNet.Addr().AsSlice(),
Mask: net.CIDRMask(c.networking.ServiceNet.Bits(), c.networking.ServiceNet.Addr().BitLen()),
},
APIServerServiceIP: c.networking.APIAddress.AsSlice(),
APIServerServicePort: 443,
ServiceNodePortRange: utilnet.PortRange{Base: 30000, Size: 2768},
EndpointReconcilerType: reconcilers.LeaseEndpointReconcilerType,
MasterCount: 1,
ServiceAccountIssuer: signer,
ServiceAccountIssuerURL: audience,
ServiceAccountPublicKeys: []interface{}{&c.masterCerts.APITokens.Key.PublicKey},
VersionedInformers: clients.Informer,
APIResourceConfigSource: generic.MergedResourceConfig,
StorageFactory: restOptionsGetter.StorageFactory,
ClusterAuthenticationInfo: clusterauthenticationtrust.ClusterAuthenticationInfo{
// This is duplicated information from the authentication layer, so that
// the start-cluster-authentication-info-controller controller properly
// populates the extension-apiserver-authentication ConfigMap with
// authentication info
ClientCA: config.SecureServing.ClientCA,
RequestHeaderCA: config.Authentication.RequestHeaderConfig.CAContentProvider,
RequestHeaderUsernameHeaders: config.Authentication.RequestHeaderConfig.UsernameHeaders,
RequestHeaderGroupHeaders: config.Authentication.RequestHeaderConfig.GroupHeaders,
RequestHeaderExtraHeaderPrefixes: config.Authentication.RequestHeaderConfig.ExtraHeaderPrefixes,
// Customize the generic config then build an aggregator config
func buildAggregatorConfig(c *Cluster, config server.Config, clients *k8s.Clients) (*aggregator.Config, error) {
generic := config // Copy the config before we modify it
generic.MergedResourceConfig = aggregator.DefaultAPIResourceConfigSource()
generic.RESTOptionsGetter = k8s.PrepareStorage(aggregatorscheme.Codecs, aggregatorscheme.Scheme, generic.MergedResourceConfig)
return &aggregator.Config{
SharedInformerFactory: clients.Informer,
},
ExtraConfig: aggregator.ExtraConfig{
// This service resolver is used for exposing aggregated APIs served by
// workloads inside the cluster
// This is for the aggregation layer to authenticate proxified
// requests to webhooks and other aggregated services using a dedicated
// certificate and certificate authority
ProxyClientCertFile: c.masterCerts.Proxy.CertPath(),
ProxyClientKeyFile: c.masterCerts.Proxy.KeyPath(),