Newer
Older
package services
import (
"context"
"fmt"
extensions "k8s.io/apiextensions-apiserver/pkg/apiserver"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/admission/initializer"
"k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle"
"k8s.io/apiserver/pkg/admission/plugin/resourcequota"
"k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy"
"k8s.io/apiserver/pkg/admission/plugin/webhook/mutating"
"k8s.io/apiserver/pkg/admission/plugin/webhook/validating"
apifilters "k8s.io/apiserver/pkg/endpoints/filters"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
"k8s.io/apiserver/pkg/server/filters"
"k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/flowcontrol/request"
"k8s.io/apiserver/pkg/util/notfoundhandler"
"k8s.io/apiserver/pkg/util/openapi"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/clientcmd/api"
"k8s.io/component-base/version"
"k8s.io/kube-aggregator/pkg/apiserver"
aggregator "k8s.io/kube-aggregator/pkg/apiserver"
aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/capabilities"
"k8s.io/kubernetes/pkg/cluster/ports"
controllersa "k8s.io/kubernetes/pkg/controller/serviceaccount"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi"
"k8s.io/kubernetes/pkg/kubeapiserver"
kubeadmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
"k8s.io/kubernetes/pkg/kubeapiserver/authenticator"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
"k8s.io/kubernetes/pkg/kubeapiserver/options"
"k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/plugin/pkg/admission/nodetaint"
"k8s.io/kubernetes/plugin/pkg/admission/security/podsecurity"
saplugin "k8s.io/kubernetes/plugin/pkg/admission/serviceaccount"
"k8s.io/kubernetes/plugin/pkg/admission/storage/persistentvolume/resize"
const audience = "https://kubernetes.default.svc.cluster.local"
const apiserverPort = 6443
// Reimplement the rest options factory to stop depending on option parsers
type RestOptionsFactory struct {
StorageFactory storage.StorageFactory
StorageObjectCountTracker request.StorageObjectCountTracker
}
func (f *RestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
storageConfig, err := f.StorageFactory.NewConfig(resource)
if err != nil {
return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
}
return generic.RESTOptions{
StorageConfig: storageConfig,
Decorator: generic.UndecoratedStorage,
DeleteCollectionWorkers: 1,
EnableGarbageCollection: true,
ResourcePrefix: f.StorageFactory.ResourcePrefix(resource),
CountMetricPollPeriod: time.Minute,
StorageObjectCountTracker: f.StorageObjectCountTracker,
}, nil
}
// Make a service resolver for a given client config
func getResolver(clients *Clients) apiserver.ServiceResolver {
host, _ := url.Parse(clients.KubeConfig.Host)
return apiserver.NewLoopbackServiceResolver(apiserver.NewEndpointServiceResolver(
clients.Informer.Core().V1().Services().Lister(),
clients.Informer.Core().V1().Endpoints().Lister(),
), host)
}
// Build a generic apiserver config, that is used and tweaked for various instanciated servers (native, extensions, etc.)
// Be careful, config is returned as a pointer, so it must be explicitely shallow copied before tweaking
func buildConfig(c *Cluster) (config *server.Config, clients *Clients, err error) {
// Initialize a basic configuration object
ver := version.Get()
config = server.NewConfig(legacyscheme.Codecs)
config.Version = &ver
config.Serializer = legacyscheme.Codecs
config.LongRunningFunc = filters.BasicLongRunningRequestCheck(
sets.NewString("watch", "proxy"),
sets.NewString("attach", "exec", "proxy", "log", "portforward"),
)
// Disable unneeded post start hooks, these cannot be easily disabled except at runtime, which still takes up
// space in the binary sadly
config.DisabledPostStartHooks = sets.NewString(
// bootstrap-controller : TODO replace endpoint reconciler logic if required
"generic-apiserver-start-informers", // they are started manually
"start-apiextensions-informers", // started manually also
"storage-object-count-tracker-hook", // unused
"start-legacy-token-tracking-controller", // legacy and unused
"rbac/bootstrap-roles", // TODO replaced with our own simpler RBAC
)
// Setup listener
listener, err := net.Listen("tcp6", fmt.Sprintf("[%s]:%d", c.networking.NodeAddress.IP.String(), apiserverPort))
if err != nil {
err = fmt.Errorf("could not initialize listener: %w", err)
return
}
cert, err := dynamiccertificates.NewDynamicServingContentFromFiles("serving-cert", c.masterCerts.TLS.CertPath(), c.masterCerts.TLS.KeyPath())
if err != nil {
err = fmt.Errorf("could not get certificate: %w", err)
return
}
clientCA, err := dynamiccertificates.NewDynamicCAContentFromFile("client-ca", c.pki.API.CertPath())
if err != nil {
err = fmt.Errorf("could not get api CA file: %w", err)
return
}
config.SecureServing = &server.SecureServingInfo{
Listener: listener,
Cert: cert,
ClientCA: clientCA, // not setup upstream, might be an issue
}
// Setup loopback clients (no authorization at this point, handled later)
if err != nil {
err = fmt.Errorf("could not setup loopback clients: %w", err)
return
}
config.LoopbackClientConfig = clients.KubeConfig
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
// Setup authentication
authConfig := authenticator.Config{
ServiceAccountKeyFiles: []string{c.masterCerts.APITokens.KeyPath()},
ServiceAccountIssuers: []string{audience},
APIAudiences: []string{audience},
ClientCAContentProvider: clientCA,
ServiceAccountTokenGetter: controllersa.NewGetterFromClient(
clients.Client,
clients.Informer.Core().V1().Secrets().Lister(),
clients.Informer.Core().V1().ServiceAccounts().Lister(),
clients.Informer.Core().V1().Pods().Lister(),
),
SecretsWriter: clients.Client.CoreV1(),
}
auth, _, err := authConfig.New()
if err != nil {
err = fmt.Errorf("could not setup apiserver authentication: %w", err)
return
}
config.Authentication = server.AuthenticationInfo{
APIAudiences: []string{audience},
Authenticator: auth,
}
// Setup authorizations
authzConfig := authorizer.Config{
AuthorizationModes: []string{modes.ModeNode, modes.ModeRBAC},
VersionedInformerFactory: clients.Informer,
}
authz, resolver, err := authzConfig.New()
if err != nil {
err = fmt.Errorf("could not seutp apiserver authorizations: %w", err)
return
}
config.RuleResolver = resolver
config.Authorization = server.AuthorizationInfo{
Authorizer: authz,
}
// Finally authorize loopback clients
server.AuthorizeClientBearerToken(clients.KubeConfig, &config.Authentication, &config.Authorization)
// Setup admission controllers
admissionConfig := kubeadmission.Config{
ExternalInformers: clients.Informer,
LoopbackClientConfig: config.LoopbackClientConfig,
}
initializers, _, err := admissionConfig.New(nil, nil, serviceResolver, nil) // TODO: handle post start hook
if err != nil {
err = fmt.Errorf("could not get admission plugins: %w", err)
return
}
// Some default plugins are not enabled here:
// limitranger, since we do not support LimitRange
// setdefault, since we do not support default storage class
// defaulttolrationseconds, as we do not use this feature
// storageobjectinuseprotection, as we do not use this feature
// podpriority, as we do not use this feature
// runtimeclass, as we do not use this feature
// defaultingressclass, as we do not use this feature
lifecycle.Register(plugins)
mutating.Register(plugins)
validatingadmissionpolicy.Register(plugins)
validating.Register(plugins)
resize.Register(plugins)
resourcequota.Register(plugins)
nodetaint.Register(plugins)
podsecurity.Register(plugins)
saplugin.Register(plugins)
pluginsNames := []string{
lifecycle.PluginName,
mutating.PluginName,
validatingadmissionpolicy.PluginName,
validating.PluginName,
resize.PluginName,
resourcequota.PluginName,
nodetaint.PluginName,
podsecurity.PluginName,
saplugin.PluginName,
}
pluginsConfig, _ := admission.ReadAdmissionConfiguration(pluginsNames, "", nil) // Never fails without config file
genericInitializer := initializer.New(clients.Client, clients.DynClient, clients.Informer, config.Authorization.Authorizer, feature.DefaultFeatureGate, config.DrainedNotify())
initializersChain := admission.PluginInitializers{genericInitializer}
initializersChain = append(initializersChain, initializers...)
admissionChain, err := plugins.NewFromPlugins(pluginsNames, pluginsConfig, initializersChain, admission.Decorators{})
config.AdmissionControl = admissionChain
return
}
var kubeApiserver = &Unit{
Name: "kube-apiserver",
Dependencies: []*Unit{etcd, pkiCA, pkiMaster, vpn, kubeLogger},
Run: func(u *Unit, c *Cluster, ctx context.Context) error {
if err != nil {
return err
}
// Allow privileged pods
capabilities.Setup(true, 0)
// Etcd backend supports being created with a specific codec, however later created storage
// factories override that codec at runtime thanks to using DefaultStorageFactory instead
// of vanilla SimpleStorageFactory, so we create the backend with no pre-configured codec
// and use it for all factories
// TODO: be more ware about resource prefixes in etcd and provide a sane, generic working implentation,
// not taking backward compatibility with vanilla into account
etcdConfig := storagebackend.NewDefaultConfig("/registry", nil)
etcdConfig.Transport.ServerList = []string{"http://[::1]:2379"}
// Servers are created in reverse orders for proper delegation
// First notFound is the default handler, that basically returns a 404,
// then the extensions serves CRD for unknown kinds, finally
// the default apiserver is first in the chain and serves all default kinds
// Note that no API aggregation is setup, which means that k8s API aggregation
// is not supported by hepto.
notFound := notfoundhandler.New(config.Serializer, apifilters.NoMuxAndDiscoveryIncompleteKey)
// Tweak the configuration and build the extension server
// The generic config is an instance of RecommendedConfig, which is composed of Config, by
// first copying it we can later change some fields per instance
extensionsConfig := extensions.Config{
GenericConfig: &server.RecommendedConfig{
Config: *config, // This is the common config being copied
SharedInformerFactory: clients.Informer,
},
ExtraConfig: extensions.ExtraConfig{},
}
// Then we customize some specific fields
// The resource config is used as-is with no api enablement specification, meaning ga and beta
// are enabled
extensionsConfig.GenericConfig.MergedResourceConfig = extensions.DefaultAPIResourceConfigSource()
// The rest options getter abstracts all storage for the server, by specifying the scheme and codecs
// on top of the storage backend. Vanilla code sets a special multi-group versionner to avoid issues
// with large objects (cee cmd/kube-apiserver/app/apiextensions.go), which we ignore here
extensionsConfig.GenericConfig.RESTOptionsGetter = &RestOptionsFactory{
// TODO: using DefaultStorageFactory might be an issue compared to SimpleStorageFactory from vanilla,
// since it does provide a different resource prefix than SimpleStorageFactory (resource name
// instead of resource group and name)
StorageFactory: storage.NewDefaultStorageFactory(
*etcdConfig, // copied as value, so the factory may alter some fields
runtime.ContentTypeJSON, // media type, somewhat inefficient but good enough
extensions.Codecs, // serializer
storage.NewDefaultResourceEncodingConfig(extensions.Scheme),
extensionsConfig.GenericConfig.MergedResourceConfig, // api resources
map[schema.GroupResource]string{}, // no override for extensions
// TODO This might be an issue, since vanilla sets a special unstructured encoding for CRD
extensionsConfig.ExtraConfig.CRDRESTOptionsGetter = extensionsConfig.GenericConfig.RESTOptionsGetter
extensionServer, err := extensionsConfig.Complete().New(server.NewEmptyDelegateWithCustomHandler(notFound))
if err != nil {
return fmt.Errorf("could not seutp the extension server: %w", err)
// Setup the apiserver itself, same logics as the extensions server
signer, err := serviceaccount.JWTTokenGenerator(audience, c.masterCerts.APITokens.Key)
if err != nil {
return fmt.Errorf("could not initilize service account signer: %w", err)
}
apiConfig := controlplane.Config{
GenericConfig: &(*config), // copy by value, so we can modify fields
KubeletClientConfig: client.KubeletClientConfig{
Port: ports.KubeletPort,
ReadOnlyPort: ports.KubeletReadOnlyPort,
PreferredAddressTypes: []string{string(core.NodeInternalIP), string(core.NodeExternalIP)},
},
ServiceIPRange: *c.networking.ServiceNet,
APIServerServiceIP: c.networking.APIAddress,
APIServerServicePort: 443,
ServiceNodePortRange: options.DefaultServiceNodePortRange,
// This still triggers an error, stating that stale data was found and cleaned up,
// which is inevitable
EndpointReconcilerType: reconcilers.NoneEndpointReconcilerType,
MasterCount: 1,
ServiceAccountIssuer: signer,
ServiceAccountIssuerURL: audience,
ServiceAccountPublicKeys: []interface{}{&c.masterCerts.APITokens.Key.PublicKey},
VersionedInformers: clients.Informer,
},
apiConfig.ExtraConfig.ClusterAuthenticationInfo.ClientCA = config.SecureServing.ClientCA
apiConfig.GenericConfig.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource()
apiConfig.ExtraConfig.APIResourceConfigSource = apiConfig.GenericConfig.MergedResourceConfig
restOptionsGetter := &RestOptionsFactory{
StorageFactory: storage.NewDefaultStorageFactory(
*etcdConfig,
runtime.ContentTypeJSON,
legacyscheme.Codecs,
storage.NewDefaultResourceEncodingConfig(legacyscheme.Scheme),
apiConfig.GenericConfig.MergedResourceConfig,
kubeapiserver.SpecialDefaultResourcePrefixes,
),
}
apiConfig.GenericConfig.RESTOptionsGetter = restOptionsGetter
apiConfig.ExtraConfig.StorageFactory = restOptionsGetter.StorageFactory
openapiFactory := openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)
apiConfig.GenericConfig.OpenAPIConfig = server.DefaultOpenAPIConfig(openapiFactory, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensions.Scheme))
apiConfig.GenericConfig.OpenAPIV3Config = server.DefaultOpenAPIV3Config(openapiFactory, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensions.Scheme))
apiServer, err := apiConfig.Complete().New(extensionServer.GenericAPIServer)
if err != nil {
return fmt.Errorf("could not initialize generic apiserver: %w", err)
}
// Setup an API aggregator
aggregatorConfig := &aggregator.Config{
GenericConfig: &server.RecommendedConfig{
Config: *config,
SharedInformerFactory: clients.Informer,
},
ExtraConfig: aggregator.ExtraConfig{
ServiceResolver: getResolver(clients),
},
}
aggregatorConfig.GenericConfig.MergedResourceConfig = aggregator.DefaultAPIResourceConfigSource()
aggregatorConfig.GenericConfig.RESTOptionsGetter = &RestOptionsFactory{
StorageFactory: storage.NewDefaultStorageFactory(
*etcdConfig,
runtime.ContentTypeJSON,
aggregatorscheme.Codecs,
storage.NewDefaultResourceEncodingConfig(aggregatorscheme.Scheme),
aggregatorConfig.GenericConfig.MergedResourceConfig,
map[schema.GroupResource]string{},
),
}
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(apiServer.GenericAPIServer)
if err != nil {
return fmt.Errorf("could not initialize aggregator: %w", err)
server := aggregatorServer.GenericAPIServer.PrepareRun()
go clients.Start(ctx)
return server.Run(ctx.Done())
},
Ready: func(u *Unit, c *Cluster) bool {
u.Logger.Info("checking if apiserver is ready")
clients, err := newLoopbackClients(c)
if err != nil {
return false
}
_, err = clients.Client.CoreV1().Nodes().List(context.Background(), meta.ListOptions{})
if err != nil {
return false
}
return true
},
}
var loopback = &Unit{
Dependencies: []*Unit{kubeApiserver},
Start: func(u *Unit, c *Cluster, ctx context.Context) error {
clients, err := newLoopbackClients(c)
if err != nil {
return fmt.Errorf("could not get loopback config: %w", err)
}
// Write a loopback config (different for every start)
name := "loopback"
clientConfig := api.Config{
APIVersion: "v1",
Kind: "Config",
CurrentContext: name,
Contexts: map[string]*api.Context{name: {
Cluster: name,
AuthInfo: name,
}},
Clusters: map[string]*api.Cluster{name: {
Server: clients.KubeConfig.Host,
CertificateAuthority: clients.KubeConfig.TLSClientConfig.CAFile,
}},
AuthInfos: map[string]*api.AuthInfo{name: {
err = os.MkdirAll("/root/.kube", 0755)
if err != nil {
return fmt.Errorf("could not create kubeconfig dir: %w", err)
clientcmd.WriteToFile(clientConfig, "/root/.kube/config")
if err != nil {
return fmt.Errorf("could not write privileged kubeconfig: %w", err)