Newer
Older
package services
import (
"context"
"fmt"
"go.acides.org/hepto/k8s"
extensions "k8s.io/apiextensions-apiserver/pkg/apiserver"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/admission/initializer"
"k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle"
"k8s.io/apiserver/pkg/admission/plugin/resourcequota"
"k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy"
"k8s.io/apiserver/pkg/admission/plugin/webhook/mutating"
"k8s.io/apiserver/pkg/admission/plugin/webhook/validating"
apifilters "k8s.io/apiserver/pkg/endpoints/filters"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
"k8s.io/apiserver/pkg/server/filters"
"k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/notfoundhandler"
"k8s.io/apiserver/pkg/util/openapi"
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
aggregator "k8s.io/kube-aggregator/pkg/apiserver"
aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1"
"k8s.io/kube-aggregator/pkg/controllers/autoregister"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/capabilities"
"k8s.io/kubernetes/pkg/cluster/ports"
controllersa "k8s.io/kubernetes/pkg/controller/serviceaccount"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/pkg/controlplane/controller/clusterauthenticationtrust"
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi"
kubeadmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
"k8s.io/kubernetes/pkg/kubeapiserver/authenticator"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
"k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/plugin/pkg/admission/nodetaint"
"k8s.io/kubernetes/plugin/pkg/admission/security/podsecurity"
saplugin "k8s.io/kubernetes/plugin/pkg/admission/serviceaccount"
"k8s.io/kubernetes/plugin/pkg/admission/storage/persistentvolume/resize"
const (
	// audience is the URL used both as the service account token issuer and as the API audience.
	audience = "https://kubernetes.default.svc.cluster.local"
	// apiserverPort is the TCP port the apiserver listener binds to.
	apiserverPort = 6443
)

// pathRegex matches listed paths of the exact form /apis/<group>/<version>;
// the first capture is the group and the second one is the version.
var pathRegex = regexp.MustCompile(`^/apis/([^/]+)/([^/]+)$`)
var kubeApiserver = &Unit{
Name: "kube-apiserver",
Dependencies: []*Unit{etcd, pkiCA, pkiMaster, vpn, kubeLogger},
Run: func(u *Unit, c *Cluster, ctx context.Context) error {
config, clients, err := buildConfig(c)
if err != nil {
return err
}
// Allow privileged pods
capabilities.Setup(true, 0)
// Servers are created in reverse orders for proper delegation
notFound := notfoundhandler.New(config.Serializer, apifilters.NoMuxAndDiscoveryIncompleteKey)
extensionsConfig, _ := buildExtensionsConfig(*config, clients) // Currently no error case
extensionServer, err := extensionsConfig.Complete().New(server.NewEmptyDelegateWithCustomHandler(notFound))
if err != nil {
return fmt.Errorf("could not seutp the extension server: %w", err)
}
if err != nil {
return fmt.Errorf("could not build apiserver config: %w", err)
}
apiServer, err := apiConfig.Complete().New(extensionServer.GenericAPIServer)
if err != nil {
return fmt.Errorf("could not initialize generic apiserver: %w", err)
}
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(apiServer.GenericAPIServer)
if err != nil {
return fmt.Errorf("could not initialize aggregator: %w", err)
}
// Register api services for all delegate servers
apiRegistrationClient, err := apiregistrationclient.NewForConfig(clients.KubeConfig)
if err != nil {
return fmt.Errorf("could not initialize aggregator client: %w", err)
}
registrationController := autoregister.NewAutoRegisterController(aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(), apiRegistrationClient)
for _, resourcePath := range apiServer.GenericAPIServer.ListedPaths() {
match := pathRegex.FindStringSubmatch(resourcePath)
fmt.Println(resourcePath, match)
if match == nil {
continue
registrationController.AddAPIServiceToSyncOnStart(&v1.APIService{
ObjectMeta: meta.ObjectMeta{Name: match[2] + "." + match[1]},
Spec: v1.APIServiceSpec{
Group: match[1],
Version: match[2],
GroupPriorityMinimum: 1, // default value, TODO might be an issue
VersionPriority: 1,
},
})
go registrationController.Run(1, ctx.Done())
// Finally start the apiserver
server := aggregatorServer.GenericAPIServer.PrepareRun()
go clients.Start(ctx)
return server.Run(ctx.Done())
},
Ready: func(u *Unit, c *Cluster) bool {
u.Logger.Info("checking if apiserver is ready")
clients, err := k8s.NewTokenClients(c.masterUrl, c.pki.TLS, c.loopbackToken)
if err != nil {
return false
}
_, err = clients.Client.CoreV1().Nodes().List(context.Background(), meta.ListOptions{})
if err != nil {
return false
}
return true
},
}
// buildConfig builds a generic apiserver config, that is used and tweaked for the various
// instantiated servers (native, extensions, etc.).
// Be careful, config is returned as a pointer, so it must be explicitly shallow copied before tweaking.
//
// It returns the generic config, the loopback clients, and an error wrapping the step that
// failed. When a step fails after the listener has been opened, the listener is closed before
// returning so that the port does not stay bound.
func buildConfig(c *Cluster) (config *server.Config, clients *k8s.Clients, err error) {
	// Initialize a basic configuration object
	ver := version.Get()
	config = server.NewConfig(legacyscheme.Codecs)
	config.Version = &ver
	config.Serializer = legacyscheme.Codecs
	config.LongRunningFunc = filters.BasicLongRunningRequestCheck(
		sets.NewString("watch", "proxy"),
		sets.NewString("attach", "exec", "proxy", "log", "portforward"),
	)
	// Disable unneeded post start hooks, these cannot be easily disabled except at runtime, which still takes up
	// space in the binary sadly
	config.DisabledPostStartHooks = sets.NewString(
		"storage-object-count-tracker-hook",      // unused
		"start-legacy-token-tracking-controller", // legacy and unused
		"rbac/bootstrap-roles",                   // TODO replaced with our own simpler RBAC
	)

	// TODO listen on management interface only, this is temporary so that the api is reachable on
	// localhost
	listener, err := net.Listen("tcp6", fmt.Sprintf("[%s]:%d", "::", apiserverPort))
	if err != nil {
		err = fmt.Errorf("could not initialize listener: %w", err)
		return
	}
	// err is the named result, so this observes the error of whichever later step fails
	defer func() {
		if err != nil {
			// Best effort: the setup error is the one worth reporting
			_ = listener.Close()
		}
	}()
	cert, err := dynamiccertificates.NewDynamicServingContentFromFiles("serving-cert", c.masterCerts.TLS.CertPath(), c.masterCerts.TLS.KeyPath())
	if err != nil {
		err = fmt.Errorf("could not get certificate: %w", err)
		return
	}
	clientCA, err := dynamiccertificates.NewDynamicCAContentFromFile("client-ca", c.pki.API.CertPath())
	if err != nil {
		err = fmt.Errorf("could not get api CA file: %w", err)
		return
	}
	config.SecureServing = &server.SecureServingInfo{
		Listener: listener,
		Cert:     cert,
		ClientCA: clientCA, // not setup upstream, might be an issue
	}
	config.PublicAddress = c.networking.NodeAddress.IP

	// Setup loopback clients (no authorization at this point, handled later)
	clients, err = k8s.NewTokenClients(c.masterUrl, c.pki.TLS, c.loopbackToken)
	if err != nil {
		err = fmt.Errorf("could not setup loopback clients: %w", err)
		return
	}
	config.LoopbackClientConfig = clients.KubeConfig

	// Setup authentication
	authConfig := authenticator.Config{
		ServiceAccountKeyFiles:  []string{c.masterCerts.APITokens.KeyPath()},
		ServiceAccountIssuers:   []string{audience},
		APIAudiences:            []string{audience},
		ClientCAContentProvider: clientCA,
		ServiceAccountTokenGetter: controllersa.NewGetterFromClient(
			clients.Client,
			clients.Informer.Core().V1().Secrets().Lister(),
			clients.Informer.Core().V1().ServiceAccounts().Lister(),
			clients.Informer.Core().V1().Pods().Lister(),
		),
		SecretsWriter: clients.Client.CoreV1(),
	}
	auth, _, err := authConfig.New()
	if err != nil {
		err = fmt.Errorf("could not setup apiserver authentication: %w", err)
		return
	}
	config.Authentication = server.AuthenticationInfo{
		APIAudiences:  []string{audience},
		Authenticator: auth,
	}

	// Setup authorizations
	authzConfig := authorizer.Config{
		AuthorizationModes:       []string{modes.ModeNode, modes.ModeRBAC},
		VersionedInformerFactory: clients.Informer,
	}
	authz, resolver, err := authzConfig.New()
	if err != nil {
		err = fmt.Errorf("could not setup apiserver authorizations: %w", err)
		return
	}
	config.RuleResolver = resolver
	config.Authorization = server.AuthorizationInfo{
		Authorizer: authz,
	}

	// Finally authorize loopback clients
	server.AuthorizeClientBearerToken(clients.KubeConfig, &config.Authentication, &config.Authorization)
	return
}
// buildExtensionsConfig specializes the generic config for the extensions server.
// config is received by value, so the tweaks below stay local to this function.
// The returned error is always nil.
func buildExtensionsConfig(config server.Config, clients *k8s.Clients) (*extensions.Config, error) {
	resources := extensions.DefaultAPIResourceConfigSource()
	config.MergedResourceConfig = resources
	config.RESTOptionsGetter = k8s.PrepareStorage(extensions.Codecs, extensions.Scheme, resources)

	// Assemble both halves of the extensions server configuration
	recommended := &server.RecommendedConfig{
		Config:                config,
		SharedInformerFactory: clients.Informer,
	}
	extra := extensions.ExtraConfig{
		// TODO This might be an issue, since vanilla sets a special unstructured encoding for CRD
		CRDRESTOptionsGetter: config.RESTOptionsGetter,
	}
	return &extensions.Config{GenericConfig: recommended, ExtraConfig: extra}, nil
}
// Customize the generic config then build an apiserver config
//
// buildApiConfig works on a copy of config (received by value). It sets the default
// controlplane resource config, the storage, the OpenAPI v2/v3 configs, the admission
// chain and the service account token signer, then fills the controlplane extra config.
// An error is returned when the admission plugins or the token signer cannot be built.
// NOTE(review): several statements of this function appear to be missing from this copy
// of the file (see the inline notes) — confirm against the original before editing.
func buildApiConfig(c *Cluster, config server.Config, clients *k8s.Clients) (*controlplane.Config, error) {
generic := config
generic.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource()
// Currently unused framework for enabling additional resources
generic.MergedResourceConfig.EnableMatchingVersions(func(gv schema.GroupVersion) bool {
return false
})
// The concrete storage value is kept in a local since its StorageFactory field is read further down
restOptionsGetter := k8s.PrepareStorage(legacyscheme.Codecs, legacyscheme.Scheme, generic.MergedResourceConfig)
generic.RESTOptionsGetter = restOptionsGetter
// The same definitions factory and namer arguments feed both the OpenAPI v2 and v3 configs
openapiFactory := openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)
generic.OpenAPIConfig = server.DefaultOpenAPIConfig(openapiFactory, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensions.Scheme))
generic.OpenAPIV3Config = server.DefaultOpenAPIV3Config(openapiFactory, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensions.Scheme))
// Setup admission controllers
admissionConfig := kubeadmission.Config{
ExternalInformers: clients.Informer,
LoopbackClientConfig: config.LoopbackClientConfig,
}
schemaResolver := resolver.NewDefinitionsSchemaResolver(scheme.Scheme, generic.OpenAPIConfig.GetDefinitions)
initializers, _, err := admissionConfig.New(nil, nil, clients.ServiceResolver(), nil, schemaResolver) // TODO: handle post start hook
// NOTE(review): as written this return is unconditional, which makes everything below
// unreachable; an "if err != nil" guard appears to be missing around it — confirm.
return nil, fmt.Errorf("could not get admission plugins: %w", err)
// Some default plugins are not enabled here:
// limitranger, since we do not support LimitRange
// setdefault, since we do not support default storage class
// defaulttolerationseconds, as we do not use this feature
// storageobjectinuseprotection, as we do not use this feature
// podpriority, as we do not use this feature
// runtimeclass, as we do not use this feature
// defaultingressclass, as we do not use this feature
// NOTE(review): "plugins" is not declared anywhere in the visible part of this function;
// its declaration is presumably among the missing lines.
lifecycle.Register(plugins)
mutating.Register(plugins)
validatingadmissionpolicy.Register(plugins)
validating.Register(plugins)
resize.Register(plugins)
resourcequota.Register(plugins)
nodetaint.Register(plugins)
podsecurity.Register(plugins)
saplugin.Register(plugins)
// Names of the plugins registered above, listed in the same order as the Register calls
pluginsNames := []string{
lifecycle.PluginName,
mutating.PluginName,
validatingadmissionpolicy.PluginName,
validating.PluginName,
resize.PluginName,
resourcequota.PluginName,
nodetaint.PluginName,
podsecurity.PluginName,
saplugin.PluginName,
}
pluginsConfig, _ := admission.ReadAdmissionConfiguration(pluginsNames, "", nil) // Never fails without config file
// The generic initializer goes first in the chain, followed by the ones returned by admissionConfig.New
genericInitializer := initializer.New(clients.Client, clients.DynClient, clients.Informer, generic.Authorization.Authorizer, feature.DefaultFeatureGate, generic.DrainedNotify())
initializersChain := admission.PluginInitializers{genericInitializer}
initializersChain = append(initializersChain, initializers...)
// NOTE(review): the error returned by NewFromPlugins is not checked before err is
// overwritten by the signer setup below.
admissionChain, err := plugins.NewFromPlugins(pluginsNames, pluginsConfig, initializersChain, admission.Decorators{})
generic.AdmissionControl = admissionChain
// Setup the token signer
signer, err := serviceaccount.JWTTokenGenerator(audience, c.masterCerts.APITokens.Key)
if err != nil {
return nil, fmt.Errorf("could not initilize service account signer: %w", err)
}
// NOTE(review): the lines below read as fields of a composite literal whose opening
// (presumably the final return statement) is not visible in this copy — confirm.
ExtraConfig: controlplane.ExtraConfig{
EventTTL: time.Hour,
KubeletClientConfig: client.KubeletClientConfig{
Port: ports.KubeletPort,
ReadOnlyPort: ports.KubeletReadOnlyPort,
PreferredAddressTypes: []string{string(core.NodeInternalIP), string(core.NodeExternalIP)},
// NOTE(review): the KubeletClientConfig literal is not visibly closed before the next field.
ServiceIPRange: *c.networking.ServiceNet,
APIServerServiceIP: c.networking.APIAddress,
APIServerServicePort: 443,
// Node port range starting at 30000 and spanning 2768 ports
ServiceNodePortRange: utilnet.PortRange{Base: 30000, Size: 2768},
EndpointReconcilerType: reconcilers.LeaseEndpointReconcilerType,
MasterCount: 1,
// Tokens are issued by the signer built above; the public half of the same key is listed for verification
ServiceAccountIssuer: signer,
ServiceAccountIssuerURL: audience,
ServiceAccountPublicKeys: []interface{}{&c.masterCerts.APITokens.Key.PublicKey},
VersionedInformers: clients.Informer,
APIResourceConfigSource: generic.MergedResourceConfig,
StorageFactory: restOptionsGetter.StorageFactory,
ClusterAuthenticationInfo: clusterauthenticationtrust.ClusterAuthenticationInfo{
ClientCA: config.SecureServing.ClientCA,
},
// NOTE(review): the end of the literal and of the function is not visible in this copy.
// Customize the generic config then build an aggregator config
func buildAggregatorConfig(config server.Config, clients *k8s.Clients) (*aggregator.Config, error) {
generic := config
generic.MergedResourceConfig = aggregator.DefaultAPIResourceConfigSource()
generic.RESTOptionsGetter = k8s.PrepareStorage(aggregatorscheme.Codecs, aggregatorscheme.Scheme, generic.MergedResourceConfig)
return &aggregator.Config{
SharedInformerFactory: clients.Informer,
},
ExtraConfig: aggregator.ExtraConfig{
ServiceResolver: clients.ServiceResolver(),
},