// Package services wires together the node services, including the embedded
// Kubernetes apiserver configured in this file.
package services
import (
	"context"
	"fmt"
	"net"
	"net/url"
	"os"

	"go.acides.org/hepto/k8s"
	extensions "k8s.io/apiextensions-apiserver/pkg/apiserver"
	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apiserver/pkg/admission"
	"k8s.io/apiserver/pkg/admission/initializer"
	"k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle"
	"k8s.io/apiserver/pkg/admission/plugin/resourcequota"
	"k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy"
	"k8s.io/apiserver/pkg/admission/plugin/webhook/mutating"
	"k8s.io/apiserver/pkg/admission/plugin/webhook/validating"
	apifilters "k8s.io/apiserver/pkg/endpoints/filters"
	openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
	"k8s.io/apiserver/pkg/server"
	"k8s.io/apiserver/pkg/server/dynamiccertificates"
	"k8s.io/apiserver/pkg/server/filters"
	"k8s.io/apiserver/pkg/util/feature"
	"k8s.io/apiserver/pkg/util/notfoundhandler"
	"k8s.io/apiserver/pkg/util/openapi"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/clientcmd/api"
	"k8s.io/component-base/version"
	"k8s.io/kube-aggregator/pkg/apiserver"
	aggregator "k8s.io/kube-aggregator/pkg/apiserver"
	aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
	"k8s.io/kubernetes/pkg/api/legacyscheme"
	"k8s.io/kubernetes/pkg/apis/core"
	"k8s.io/kubernetes/pkg/capabilities"
	"k8s.io/kubernetes/pkg/cluster/ports"
	controllersa "k8s.io/kubernetes/pkg/controller/serviceaccount"
	"k8s.io/kubernetes/pkg/controlplane"
	"k8s.io/kubernetes/pkg/controlplane/reconcilers"
	generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi"
	kubeadmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
	"k8s.io/kubernetes/pkg/kubeapiserver/authenticator"
	"k8s.io/kubernetes/pkg/kubeapiserver/authorizer"
	"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
	"k8s.io/kubernetes/pkg/kubeapiserver/options"
	"k8s.io/kubernetes/pkg/kubelet/client"
	"k8s.io/kubernetes/pkg/serviceaccount"
	"k8s.io/kubernetes/plugin/pkg/admission/nodetaint"
	"k8s.io/kubernetes/plugin/pkg/admission/security/podsecurity"
	saplugin "k8s.io/kubernetes/plugin/pkg/admission/serviceaccount"
	"k8s.io/kubernetes/plugin/pkg/admission/storage/persistentvolume/resize"
)
const (
	// audience is both the issuer and the sole accepted audience for
	// service-account tokens minted by this apiserver.
	audience = "https://kubernetes.default.svc.cluster.local"

	// apiserverPort is the TCP port the secure apiserver listens on.
	apiserverPort = 6443
)
// Make a service resolver for a given client config.
// The resolver maps cluster services to endpoints using the shared informers,
// except for the loopback (default/kubernetes) service which resolves to the
// host taken from our own kubeconfig.
func getResolver(clients *k8s.Clients) apiserver.ServiceResolver {
	// The parse error is deliberately ignored: the host comes from our own
	// kubeconfig. NOTE(review): confirm this value can never be malformed,
	// otherwise a nil/zero URL would be passed to the loopback resolver.
	host, _ := url.Parse(clients.KubeConfig.Host)
	return apiserver.NewLoopbackServiceResolver(apiserver.NewEndpointServiceResolver(
		clients.Informer.Core().V1().Services().Lister(),
		clients.Informer.Core().V1().Endpoints().Lister(),
	), host)
}
// Build a generic apiserver config, that is used and tweaked for various instantiated servers (native, extensions, etc.)
// Be careful, config is returned as a pointer, so it must be explicitly shallow copied before tweaking
//
// On success it also returns the loopback clients created along the way; on
// failure err is set and the other named results must not be used.
func buildConfig(c *Cluster) (config *server.Config, clients *k8s.Clients, err error) {
	// Initialize a basic configuration object
	ver := version.Get()
	config = server.NewConfig(legacyscheme.Codecs)
	config.Version = &ver
	config.Serializer = legacyscheme.Codecs
	// Requests using these verbs (or these proxied subresources) bypass the
	// normal request timeout machinery.
	config.LongRunningFunc = filters.BasicLongRunningRequestCheck(
		sets.NewString("watch", "proxy"),
		sets.NewString("attach", "exec", "proxy", "log", "portforward"),
	)
	// Disable unneeded post start hooks, these cannot be easily disabled except at runtime, which still takes up
	// space in the binary sadly
	config.DisabledPostStartHooks = sets.NewString(
		// bootstrap-controller : TODO replace endpoint reconciler logic if required
		"generic-apiserver-start-informers",      // they are started manually
		"start-apiextensions-informers",          // started manually also
		"storage-object-count-tracker-hook",      // unused
		"start-legacy-token-tracking-controller", // legacy and unused
		"rbac/bootstrap-roles",                   // TODO replaced with our own simpler RBAC
	)
	// Setup listener
	// NOTE(review): "tcp6" makes the listener IPv6-only — confirm NodeAddress
	// is always an IPv6 address.
	listener, err := net.Listen("tcp6", fmt.Sprintf("[%s]:%d", c.networking.NodeAddress.IP.String(), apiserverPort))
	if err != nil {
		err = fmt.Errorf("could not initialize listener: %w", err)
		return
	}
	// Serving certificate and client CA are loaded from files managed by the
	// PKI units, with dynamic reloading support.
	cert, err := dynamiccertificates.NewDynamicServingContentFromFiles("serving-cert", c.masterCerts.TLS.CertPath(), c.masterCerts.TLS.KeyPath())
	if err != nil {
		err = fmt.Errorf("could not get certificate: %w", err)
		return
	}
	clientCA, err := dynamiccertificates.NewDynamicCAContentFromFile("client-ca", c.pki.API.CertPath())
	if err != nil {
		err = fmt.Errorf("could not get api CA file: %w", err)
		return
	}
	config.SecureServing = &server.SecureServingInfo{
		Listener: listener,
		Cert:     cert,
		ClientCA: clientCA, // not setup upstream, might be an issue
	}
	// Setup loopback clients (no authorization at this point, handled later)
	clients, err = k8s.NewTokenClients(c.masterUrl, c.pki.TLS, c.loopbackToken)
	if err != nil {
		err = fmt.Errorf("could not setup loopback clients: %w", err)
		return
	}
	config.LoopbackClientConfig = clients.KubeConfig
	// NOTE(review): a span of the original code is missing here — only bare
	// line numbers (122-162) survived whatever produced this copy of the file.
	// Judging by later references, the lost section declared (at least) the
	// admission `plugins` registry and the `serviceResolver` used further down;
	// it must be recovered from version control before this file can compile.
	// Setup authentication
	// Service-account tokens are issued for and verified against `audience`,
	// using the master APITokens key; client certs are checked against the
	// API client CA loaded above.
	authConfig := authenticator.Config{
		ServiceAccountKeyFiles:  []string{c.masterCerts.APITokens.KeyPath()},
		ServiceAccountIssuers:   []string{audience},
		APIAudiences:            []string{audience},
		ClientCAContentProvider: clientCA,
		ServiceAccountTokenGetter: controllersa.NewGetterFromClient(
			clients.Client,
			clients.Informer.Core().V1().Secrets().Lister(),
			clients.Informer.Core().V1().ServiceAccounts().Lister(),
			clients.Informer.Core().V1().Pods().Lister(),
		),
		SecretsWriter: clients.Client.CoreV1(),
	}
	// NOTE(review): the second return value is discarded — confirm it is not
	// needed (e.g. OpenAPI security definitions in some apiserver versions).
	auth, _, err := authConfig.New()
	if err != nil {
		err = fmt.Errorf("could not setup apiserver authentication: %w", err)
		return
	}
	config.Authentication = server.AuthenticationInfo{
		APIAudiences:  []string{audience},
		Authenticator: auth,
	}
	// Setup authorizations
	authzConfig := authorizer.Config{
		AuthorizationModes:       []string{modes.ModeNode, modes.ModeRBAC},
		VersionedInformerFactory: clients.Informer,
	}
	authz, resolver, err := authzConfig.New()
	if err != nil {
		// NOTE(review): "seutp" typo in the message below (string left as-is
		// in this documentation pass).
		err = fmt.Errorf("could not seutp apiserver authorizations: %w", err)
		return
	}
	config.RuleResolver = resolver
	config.Authorization = server.AuthorizationInfo{
		Authorizer: authz,
	}
	// Finally authorize loopback clients
	server.AuthorizeClientBearerToken(clients.KubeConfig, &config.Authentication, &config.Authorization)
	// Setup admission controllers
	admissionConfig := kubeadmission.Config{
		ExternalInformers:    clients.Informer,
		LoopbackClientConfig: config.LoopbackClientConfig,
	}
	// NOTE(review): `serviceResolver` is not defined in the visible code —
	// presumably created in the section lost above (see note there).
	initializers, _, err := admissionConfig.New(nil, nil, serviceResolver, nil) // TODO: handle post start hook
	if err != nil {
		err = fmt.Errorf("could not get admission plugins: %w", err)
		return
	}
	// Some default plugins are not enabled here:
	// limitranger, since we do not support LimitRange
	// setdefault, since we do not support default storage class
	// defaulttolrationseconds, as we do not use this feature
	// storageobjectinuseprotection, as we do not use this feature
	// podpriority, as we do not use this feature
	// runtimeclass, as we do not use this feature
	// defaultingressclass, as we do not use this feature
	// NOTE(review): `plugins` (an admission.Plugins registry) is also undefined
	// here — likely declared in the lost section as well.
	lifecycle.Register(plugins)
	mutating.Register(plugins)
	validatingadmissionpolicy.Register(plugins)
	validating.Register(plugins)
	resize.Register(plugins)
	resourcequota.Register(plugins)
	nodetaint.Register(plugins)
	podsecurity.Register(plugins)
	saplugin.Register(plugins)
	// The list of enabled plugins, passed to NewFromPlugins below.
	pluginsNames := []string{
		lifecycle.PluginName,
		mutating.PluginName,
		validatingadmissionpolicy.PluginName,
		validating.PluginName,
		resize.PluginName,
		resourcequota.PluginName,
		nodetaint.PluginName,
		podsecurity.PluginName,
		saplugin.PluginName,
	}
	pluginsConfig, _ := admission.ReadAdmissionConfiguration(pluginsNames, "", nil) // Never fails without config file
	genericInitializer := initializer.New(clients.Client, clients.DynClient, clients.Informer, config.Authorization.Authorizer, feature.DefaultFeatureGate, config.DrainedNotify())
	initializersChain := admission.PluginInitializers{genericInitializer}
	initializersChain = append(initializersChain, initializers...)
	// err is propagated through the named return below; NOTE(review):
	// config.AdmissionControl is assigned even when err is non-nil.
	admissionChain, err := plugins.NewFromPlugins(pluginsNames, pluginsConfig, initializersChain, admission.Decorators{})
	config.AdmissionControl = admissionChain
	return
}
// kubeApiserver runs the embedded Kubernetes apiserver: an extensions (CRD)
// server, the core control plane, and the kube-aggregator, chained together
// by delegation and all derived from the shared generic config.
var kubeApiserver = &Unit{
	Name:         "kube-apiserver",
	Dependencies: []*Unit{etcd, pkiCA, pkiMaster, vpn, kubeLogger},
	Run: func(u *Unit, c *Cluster, ctx context.Context) error {
		// NOTE(review): `config`, `clients` and `err` are used below but never
		// declared in this function — a call such as
		// `config, clients, err := buildConfig(c)` appears to have been lost.
		if err != nil {
			return err
		}
		// Allow privileged pods
		capabilities.Setup(true, 0)
		// Servers are created in reverse orders for proper delegation
		notFound := notfoundhandler.New(config.Serializer, apifilters.NoMuxAndDiscoveryIncompleteKey)
		// Build the extensions server (create then customize the configuration)
		extensionsConfig := extensions.Config{
			GenericConfig: &server.RecommendedConfig{
				Config:                *config, // Copy by value to later change fields
				SharedInformerFactory: clients.Informer,
			},
			ExtraConfig: extensions.ExtraConfig{},
		}
		extensionsConfig.GenericConfig.MergedResourceConfig = extensions.DefaultAPIResourceConfigSource()
		extensionsConfig.GenericConfig.RESTOptionsGetter = k8s.PrepareStorage(extensions.Codecs, extensions.Scheme, extensionsConfig.GenericConfig.MergedResourceConfig)
		// TODO This might be an issue, since vanilla sets a special unstructured encoding for CRD
		extensionsConfig.ExtraConfig.CRDRESTOptionsGetter = extensionsConfig.GenericConfig.RESTOptionsGetter
		extensionServer, err := extensionsConfig.Complete().New(server.NewEmptyDelegateWithCustomHandler(notFound))
		if err != nil {
			return fmt.Errorf("could not seutp the extension server: %w", err)
		// NOTE(review): the closing brace of the error check above is missing,
		// and its message has a "seutp" typo.
		// Setup the apiserver itself, same logics as the extensions server
		signer, err := serviceaccount.JWTTokenGenerator(audience, c.masterCerts.APITokens.Key)
		if err != nil {
			return fmt.Errorf("could not initilize service account signer: %w", err)
		}
		apiConfig := controlplane.Config{
			// NOTE(review): `&(*config)` does NOT copy — it yields the same
			// pointer; a real copy would be `cp := *config; ... &cp`. Also the
			// fields from ServiceIPRange down look like controlplane
			// ExtraConfig fields, so an `ExtraConfig:` wrapper (and the
			// matching braces, cf. the stray `},` below) seems to be lost.
			GenericConfig: &(*config), // copy by value, so we can modify fields
			KubeletClientConfig: client.KubeletClientConfig{
				Port:                  ports.KubeletPort,
				ReadOnlyPort:          ports.KubeletReadOnlyPort,
				PreferredAddressTypes: []string{string(core.NodeInternalIP), string(core.NodeExternalIP)},
			},
			ServiceIPRange:       *c.networking.ServiceNet,
			APIServerServiceIP:   c.networking.APIAddress,
			APIServerServicePort: 443,
			ServiceNodePortRange: options.DefaultServiceNodePortRange,
			// This still triggers an error, stating that stale data was found and cleaned up,
			// which is inevitable
			EndpointReconcilerType:   reconcilers.NoneEndpointReconcilerType,
			MasterCount:              1,
			ServiceAccountIssuer:     signer,
			ServiceAccountIssuerURL:  audience,
			ServiceAccountPublicKeys: []interface{}{&c.masterCerts.APITokens.Key.PublicKey},
			VersionedInformers:       clients.Informer,
		},
		apiConfig.ExtraConfig.ClusterAuthenticationInfo.ClientCA = config.SecureServing.ClientCA
		apiConfig.GenericConfig.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource()
		apiConfig.ExtraConfig.APIResourceConfigSource = apiConfig.GenericConfig.MergedResourceConfig
		// TODO add prefix overrides
		restOptionsGetter := k8s.PrepareStorage(legacyscheme.Codecs, legacyscheme.Scheme, apiConfig.GenericConfig.MergedResourceConfig)
		apiConfig.GenericConfig.RESTOptionsGetter = restOptionsGetter
		apiConfig.ExtraConfig.StorageFactory = restOptionsGetter.StorageFactory
		// OpenAPI v2/v3 schemas are named from both the core and extensions schemes.
		openapiFactory := openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)
		apiConfig.GenericConfig.OpenAPIConfig = server.DefaultOpenAPIConfig(openapiFactory, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensions.Scheme))
		apiConfig.GenericConfig.OpenAPIV3Config = server.DefaultOpenAPIV3Config(openapiFactory, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensions.Scheme))
		// The control plane delegates unhandled routes to the extensions server.
		apiServer, err := apiConfig.Complete().New(extensionServer.GenericAPIServer)
		if err != nil {
			return fmt.Errorf("could not initialize generic apiserver: %w", err)
		}
		// Setup an API aggregator
		aggregatorConfig := &aggregator.Config{
			GenericConfig: &server.RecommendedConfig{
				Config:                *config,
				SharedInformerFactory: clients.Informer,
			},
			ExtraConfig: aggregator.ExtraConfig{
				ServiceResolver: getResolver(clients),
			},
		}
		aggregatorConfig.GenericConfig.MergedResourceConfig = aggregator.DefaultAPIResourceConfigSource()
		aggregatorConfig.GenericConfig.RESTOptionsGetter = k8s.PrepareStorage(aggregatorscheme.Codecs, aggregatorscheme.Scheme, aggregatorConfig.GenericConfig.MergedResourceConfig)
		aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(apiServer.GenericAPIServer)
		if err != nil {
			return fmt.Errorf("could not initialize aggregator: %w", err)
		// NOTE(review): the closing brace of the error check above is missing.
		// NOTE(review): this local `server` shadows the imported server package.
		server := aggregatorServer.GenericAPIServer.PrepareRun()
		// Run the clients' background machinery (informers etc.) alongside the server.
		go clients.Start(ctx)
		return server.Run(ctx.Done())
	},
	// Ready probes the apiserver by building fresh loopback clients and
	// listing nodes; any failure is reported as "not ready".
	Ready: func(u *Unit, c *Cluster) bool {
		u.Logger.Info("checking if apiserver is ready")
		clients, err := k8s.NewTokenClients(c.masterUrl, c.pki.TLS, c.loopbackToken)
		if err != nil {
			return false
		}
		_, err = clients.Client.CoreV1().Nodes().List(context.Background(), meta.ListOptions{})
		if err != nil {
			return false
		}
		return true
	},
}
// Write a loopback config (different for every start)
Dependencies: []*Unit{kubeApiserver},
Start: func(u *Unit, c *Cluster, ctx context.Context) error {
name := "loopback"
clientConfig := api.Config{
APIVersion: "v1",
Kind: "Config",
CurrentContext: name,
Contexts: map[string]*api.Context{name: {
Cluster: name,
AuthInfo: name,
}},
Clusters: map[string]*api.Cluster{name: {
Server: c.masterUrl,
CertificateAuthority: c.pki.TLS.CertPath(),
}},
AuthInfos: map[string]*api.AuthInfo{name: {
err := os.MkdirAll("/root/.kube", 0755)
if err != nil {
return fmt.Errorf("could not create kubeconfig dir: %w", err)
clientcmd.WriteToFile(clientConfig, "/root/.kube/config")
if err != nil {
return fmt.Errorf("could not write privileged kubeconfig: %w", err)