Newer
Older
package services
import (
"context"
"fmt"
"go.acides.org/hepto/k8s"
extensions "k8s.io/apiextensions-apiserver/pkg/apiserver"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/admission/initializer"
"k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle"
"k8s.io/apiserver/pkg/admission/plugin/resourcequota"
"k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy"
"k8s.io/apiserver/pkg/admission/plugin/webhook/mutating"
"k8s.io/apiserver/pkg/admission/plugin/webhook/validating"
apifilters "k8s.io/apiserver/pkg/endpoints/filters"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
"k8s.io/apiserver/pkg/server/filters"
"k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/notfoundhandler"
"k8s.io/apiserver/pkg/util/openapi"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/clientcmd/api"
"k8s.io/component-base/version"
aggregator "k8s.io/kube-aggregator/pkg/apiserver"
aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/capabilities"
"k8s.io/kubernetes/pkg/cluster/ports"
controllersa "k8s.io/kubernetes/pkg/controller/serviceaccount"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/pkg/controlplane/controller/clusterauthenticationtrust"
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi"
kubeadmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
"k8s.io/kubernetes/pkg/kubeapiserver/authenticator"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
"k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/plugin/pkg/admission/nodetaint"
"k8s.io/kubernetes/plugin/pkg/admission/security/podsecurity"
saplugin "k8s.io/kubernetes/plugin/pkg/admission/serviceaccount"
"k8s.io/kubernetes/plugin/pkg/admission/storage/persistentvolume/resize"
const (
	// audience is the identifier used in this file both as the service account
	// token issuer and as the accepted API audience of the apiserver.
	audience = "https://kubernetes.default.svc.cluster.local"

	// apiserverPort is the TCP port the apiserver listener binds to.
	apiserverPort = 6443
)
var kubeApiserver = &Unit{
Name: "kube-apiserver",
Dependencies: []*Unit{etcd, pkiCA, pkiMaster, vpn, kubeLogger},
Run: func(u *Unit, c *Cluster, ctx context.Context) error {
config, clients, err := buildConfig(c)
if err != nil {
return err
}
// Allow privileged pods
capabilities.Setup(true, 0)
// Servers are created in reverse order for proper delegation
notFound := notfoundhandler.New(config.Serializer, apifilters.NoMuxAndDiscoveryIncompleteKey)
extensionsConfig, _ := buildExtensionsConfig(*config, clients) // Currently no error case
extensionServer, err := extensionsConfig.Complete().New(server.NewEmptyDelegateWithCustomHandler(notFound))
if err != nil {
return fmt.Errorf("could not seutp the extension server: %w", err)
}
if err != nil {
return fmt.Errorf("could not build apiserver config: %w", err)
}
apiServer, err := apiConfig.Complete().New(extensionServer.GenericAPIServer)
if err != nil {
return fmt.Errorf("could not initialize generic apiserver: %w", err)
}
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(apiServer.GenericAPIServer)
if err != nil {
return fmt.Errorf("could not initialize aggregator: %w", err)
}
for gv := range aggregatorConfig.GenericConfig.MergedResourceConfig.GroupVersionConfigs {
fmt.Println("gv: ", gv)
}
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
// Finally start the apiserver
server := aggregatorServer.GenericAPIServer.PrepareRun()
go clients.Start(ctx)
return server.Run(ctx.Done())
},
// Ready reports whether the apiserver answers requests: it builds token
// clients against the master URL using the loopback token and tries to list
// nodes. Any failure along the way means "not ready".
Ready: func(u *Unit, c *Cluster) bool {
	u.Logger.Info("checking if apiserver is ready")
	probe, err := k8s.NewTokenClients(c.masterUrl, c.pki.TLS, c.loopbackToken)
	if err != nil {
		return false
	}
	// Only the success of the call matters, the returned list is discarded
	if _, err := probe.Client.CoreV1().Nodes().List(context.Background(), meta.ListOptions{}); err != nil {
		return false
	}
	return true
},
}
// Write a loopback config (different for every start)
var loopback = &Unit{
Name: "kube-loopback",
Dependencies: []*Unit{kubeApiserver},
// Start writes a kubeconfig named "loopback" to /root/.kube/config. The config
// points at the master URL, trusts the cluster TLS CA certificate and
// authenticates with the loopback token.
// It returns an error if the directory cannot be created or the file cannot
// be written.
Start: func(u *Unit, c *Cluster, ctx context.Context) error {
	name := "loopback"
	clientConfig := api.Config{
		APIVersion:     "v1",
		Kind:           "Config",
		CurrentContext: name,
		Contexts: map[string]*api.Context{name: {
			Cluster:  name,
			AuthInfo: name,
		}},
		Clusters: map[string]*api.Cluster{name: {
			Server:               c.masterUrl,
			CertificateAuthority: c.pki.TLS.CertPath(),
		}},
		AuthInfos: map[string]*api.AuthInfo{name: {
			Token: c.loopbackToken,
		}},
	}
	if err := os.MkdirAll("/root/.kube", 0755); err != nil {
		return fmt.Errorf("could not create kubeconfig dir: %w", err)
	}
	// The write error must be captured here: previously the result was
	// dropped and the stale MkdirAll error was re-checked, so a failed write
	// was silently reported as success.
	if err := clientcmd.WriteToFile(clientConfig, "/root/.kube/config"); err != nil {
		return fmt.Errorf("could not write privileged kubeconfig: %w", err)
	}
	return nil
},
// Build a generic apiserver config, that is used and tweaked for the various instantiated servers (native, extensions, etc.)
// Be careful, config is returned as a pointer, so it must be explicitly shallow copied before tweaking
func buildConfig(c *Cluster) (config *server.Config, clients *k8s.Clients, err error) {
// Initialize a basic configuration object
ver := version.Get()
config = server.NewConfig(legacyscheme.Codecs)
config.Version = &ver
config.Serializer = legacyscheme.Codecs
config.LongRunningFunc = filters.BasicLongRunningRequestCheck(
sets.NewString("watch", "proxy"),
sets.NewString("attach", "exec", "proxy", "log", "portforward"),
)
// Disable unneeded post start hooks, these cannot be easily disabled except at runtime, which still takes up
// space in the binary sadly
config.DisabledPostStartHooks = sets.NewString(
// bootstrap-controller : TODO replace endpoint reconciler logic if required
"generic-apiserver-start-informers", // they are started manually
"start-apiextensions-informers", // started manually also
"storage-object-count-tracker-hook", // unused
"start-legacy-token-tracking-controller", // legacy and unused
"rbac/bootstrap-roles", // TODO replaced with our own simpler RBAC
)
// Setup listener
listener, err := net.Listen("tcp6", fmt.Sprintf("[%s]:%d", c.networking.NodeAddress.IP.String(), apiserverPort))
if err != nil {
err = fmt.Errorf("could not initialize listener: %w", err)
return
}
cert, err := dynamiccertificates.NewDynamicServingContentFromFiles("serving-cert", c.masterCerts.TLS.CertPath(), c.masterCerts.TLS.KeyPath())
if err != nil {
err = fmt.Errorf("could not get certificate: %w", err)
return
}
clientCA, err := dynamiccertificates.NewDynamicCAContentFromFile("client-ca", c.pki.API.CertPath())
if err != nil {
err = fmt.Errorf("could not get api CA file: %w", err)
return
}
config.SecureServing = &server.SecureServingInfo{
Listener: listener,
Cert: cert,
ClientCA: clientCA, // not setup upstream, might be an issue
}
// Setup loopback clients (no authorization at this point, handled later)
clients, err = k8s.NewTokenClients(c.masterUrl, c.pki.TLS, c.loopbackToken)
if err != nil {
err = fmt.Errorf("could not setup loopback clients: %w", err)
return
}
config.LoopbackClientConfig = clients.KubeConfig
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
// Setup authentication
authConfig := authenticator.Config{
ServiceAccountKeyFiles: []string{c.masterCerts.APITokens.KeyPath()},
ServiceAccountIssuers: []string{audience},
APIAudiences: []string{audience},
ClientCAContentProvider: clientCA,
ServiceAccountTokenGetter: controllersa.NewGetterFromClient(
clients.Client,
clients.Informer.Core().V1().Secrets().Lister(),
clients.Informer.Core().V1().ServiceAccounts().Lister(),
clients.Informer.Core().V1().Pods().Lister(),
),
SecretsWriter: clients.Client.CoreV1(),
}
auth, _, err := authConfig.New()
if err != nil {
err = fmt.Errorf("could not setup apiserver authentication: %w", err)
return
}
config.Authentication = server.AuthenticationInfo{
APIAudiences: []string{audience},
Authenticator: auth,
}
// Setup authorizations
authzConfig := authorizer.Config{
AuthorizationModes: []string{modes.ModeNode, modes.ModeRBAC},
VersionedInformerFactory: clients.Informer,
}
authz, resolver, err := authzConfig.New()
if err != nil {
err = fmt.Errorf("could not seutp apiserver authorizations: %w", err)
return
}
config.RuleResolver = resolver
config.Authorization = server.AuthorizationInfo{
Authorizer: authz,
}
// Finally authorize loopback clients
server.AuthorizeClientBearerToken(clients.KubeConfig, &config.Authentication, &config.Authorization)
// Setup admission controllers
admissionConfig := kubeadmission.Config{
ExternalInformers: clients.Informer,
LoopbackClientConfig: config.LoopbackClientConfig,
}
initializers, _, err := admissionConfig.New(nil, nil, clients.ServiceResolver(), nil) // TODO: handle post start hook
if err != nil {
err = fmt.Errorf("could not get admission plugins: %w", err)
return
}
// Some default plugins are not enabled here:
// limitranger, since we do not support LimitRange
// setdefault, since we do not support default storage class
// defaulttolerationseconds, as we do not use this feature
// storageobjectinuseprotection, as we do not use this feature
// podpriority, as we do not use this feature
// runtimeclass, as we do not use this feature
// defaultingressclass, as we do not use this feature
lifecycle.Register(plugins)
mutating.Register(plugins)
validatingadmissionpolicy.Register(plugins)
validating.Register(plugins)
resize.Register(plugins)
resourcequota.Register(plugins)
nodetaint.Register(plugins)
podsecurity.Register(plugins)
saplugin.Register(plugins)
pluginsNames := []string{
lifecycle.PluginName,
mutating.PluginName,
validatingadmissionpolicy.PluginName,
validating.PluginName,
resize.PluginName,
resourcequota.PluginName,
nodetaint.PluginName,
podsecurity.PluginName,
saplugin.PluginName,
}
pluginsConfig, _ := admission.ReadAdmissionConfiguration(pluginsNames, "", nil) // Never fails without config file
genericInitializer := initializer.New(clients.Client, clients.DynClient, clients.Informer, config.Authorization.Authorizer, feature.DefaultFeatureGate, config.DrainedNotify())
initializersChain := admission.PluginInitializers{genericInitializer}
initializersChain = append(initializersChain, initializers...)
admissionChain, err := plugins.NewFromPlugins(pluginsNames, pluginsConfig, initializersChain, admission.Decorators{})
config.AdmissionControl = admissionChain
return
}
// Customize the generic config then build an extensions server config
func buildExtensionsConfig(config server.Config, clients *k8s.Clients) (*extensions.Config, error) {
generic := config
generic.MergedResourceConfig = extensions.DefaultAPIResourceConfigSource()
generic.RESTOptionsGetter = k8s.PrepareStorage(extensions.Codecs, extensions.Scheme, generic.MergedResourceConfig)
// Build the extensions server (create then customize the configuration)
ExtraConfig: extensions.ExtraConfig{
// TODO This might be an issue, since vanilla sets a special unstructured encoding for CRD
CRDRESTOptionsGetter: generic.RESTOptionsGetter,
},
}, nil
// Customize the generic config then build an apiserver config
func buildApiConfig(c *Cluster, config server.Config, clients *k8s.Clients) (*controlplane.Config, error) {
generic := config
generic.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource()
restOptionsGetter := k8s.PrepareStorage(legacyscheme.Codecs, legacyscheme.Scheme, generic.MergedResourceConfig)
generic.RESTOptionsGetter = restOptionsGetter
openapiFactory := openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)
generic.OpenAPIConfig = server.DefaultOpenAPIConfig(openapiFactory, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensions.Scheme))
generic.OpenAPIV3Config = server.DefaultOpenAPIV3Config(openapiFactory, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensions.Scheme))
signer, err := serviceaccount.JWTTokenGenerator(audience, c.masterCerts.APITokens.Key)
if err != nil {
return nil, fmt.Errorf("could not initilize service account signer: %w", err)
}
ExtraConfig: controlplane.ExtraConfig{
EventTTL: time.Hour,
KubeletClientConfig: client.KubeletClientConfig{
Port: ports.KubeletPort,
ReadOnlyPort: ports.KubeletReadOnlyPort,
PreferredAddressTypes: []string{string(core.NodeInternalIP), string(core.NodeExternalIP)},
ServiceIPRange: *c.networking.ServiceNet,
APIServerServiceIP: c.networking.APIAddress,
APIServerServicePort: 443,
ServiceNodePortRange: utilnet.PortRange{Base: 30000, Size: 2768},
// This still triggers an error, stating that stale data was found and cleaned up,
// which is inevitable
EndpointReconcilerType: reconcilers.NoneEndpointReconcilerType,
MasterCount: 1,
ServiceAccountIssuer: signer,
ServiceAccountIssuerURL: audience,
ServiceAccountPublicKeys: []interface{}{&c.masterCerts.APITokens.Key.PublicKey},
VersionedInformers: clients.Informer,
APIResourceConfigSource: generic.MergedResourceConfig,
StorageFactory: restOptionsGetter.StorageFactory,
ClusterAuthenticationInfo: clusterauthenticationtrust.ClusterAuthenticationInfo{
ClientCA: config.SecureServing.ClientCA,
},
// Customize the generic config then build an aggregator config
func buildAggregatorConfig(config server.Config, clients *k8s.Clients) (*aggregator.Config, error) {
generic := config
generic.MergedResourceConfig = aggregator.DefaultAPIResourceConfigSource()
generic.RESTOptionsGetter = k8s.PrepareStorage(aggregatorscheme.Codecs, aggregatorscheme.Scheme, generic.MergedResourceConfig)
return &aggregator.Config{
SharedInformerFactory: clients.Informer,
},
ExtraConfig: aggregator.ExtraConfig{
ServiceResolver: clients.ServiceResolver(),
},