Newer
Older
package services
import (
"context"
"fmt"
"go.acides.org/hepto/k8s"
extensions "k8s.io/apiextensions-apiserver/pkg/apiserver"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/admission/initializer"
"k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle"
"k8s.io/apiserver/pkg/admission/plugin/resourcequota"
"k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy"
"k8s.io/apiserver/pkg/admission/plugin/webhook/mutating"
"k8s.io/apiserver/pkg/admission/plugin/webhook/validating"
"k8s.io/apiserver/pkg/authentication/authenticatorfactory"
"k8s.io/apiserver/pkg/authentication/request/headerrequest"
"k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
apifilters "k8s.io/apiserver/pkg/endpoints/filters"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
"k8s.io/apiserver/pkg/server/filters"
"k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/notfoundhandler"
"k8s.io/apiserver/pkg/util/openapi"
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
aggregator "k8s.io/kube-aggregator/pkg/apiserver"
aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1"
"k8s.io/kube-aggregator/pkg/controllers/autoregister"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/capabilities"
"k8s.io/kubernetes/pkg/cluster/ports"
controllersa "k8s.io/kubernetes/pkg/controller/serviceaccount"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/pkg/controlplane/controller/clusterauthenticationtrust"
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi"
kubeadmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
"k8s.io/kubernetes/pkg/kubeapiserver/authenticator"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
"k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/plugin/pkg/admission/nodetaint"
"k8s.io/kubernetes/plugin/pkg/admission/security/podsecurity"
saplugin "k8s.io/kubernetes/plugin/pkg/admission/serviceaccount"
"k8s.io/kubernetes/plugin/pkg/admission/storage/persistentvolume/resize"
const (
	// audience is the identifier used both as the service account token
	// issuer and as the API audience accepted by the apiserver.
	audience = "https://kubernetes.default.svc.cluster.local"

	// apiserverPort is the TCP port the apiserver listener binds to.
	apiserverPort = 6443
)
// Units declared as dependencies of this unit
Dependencies: []*Unit{etcd, pkiMaster, vpn, memberlist, kubeLogger},
// Run builds the shared configuration, instantiates the extension, native and
// aggregator servers (each one delegating to the previous one), sets up
// automatic API service registration, then serves until ctx is done
Run: func(u *Unit, c *Cluster, ctx context.Context) error {
config, clients, err := buildConfig(c)
if err != nil {
return err
}
// Allow privileged pods
capabilities.Setup(true, 0)
// Servers are created in reverse order for proper delegation
// notFound is the custom handler given to the empty delegate that ends the chain
notFound := notfoundhandler.New(config.Serializer, apifilters.NoMuxAndDiscoveryIncompleteKey)
extensionsConfig, _ := buildExtensionsConfig(*config, clients) // Currently no error case
extensionServer, err := extensionsConfig.Complete().New(server.NewEmptyDelegateWithCustomHandler(notFound))
if err != nil {
return fmt.Errorf("could not seutp the extension server: %w", err)
}
if err != nil {
return fmt.Errorf("could not build apiserver config: %w", err)
}
// The native apiserver delegates to the extension server
apiServer, err := apiConfig.Complete().New(extensionServer.GenericAPIServer)
if err != nil {
return fmt.Errorf("could not initialize generic apiserver: %w", err)
}
// The aggregator sits in front and delegates to the native apiserver
aggregatorConfig, _ := buildAggregatorConfig(c, *config, clients)
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(apiServer.GenericAPIServer)
if err != nil {
return fmt.Errorf("could not initialize aggregator: %w", err)
}
// Start registration services, which auto register all resources and all
// crd to the aggregator server
apiRegistrationClient, err := apiregistrationclient.NewForConfig(clients.KubeConfig)
if err != nil {
return fmt.Errorf("could not initialize aggregator client: %w", err)
}
registrationController := autoregister.NewAutoRegisterController(aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(), apiRegistrationClient)
crdController := crdregistration.NewCRDRegistrationController(extensionServer.Informers.Apiextensions().V1().CustomResourceDefinitions(), registrationController)
// Start by discovering resource paths from the apiserver and registering them manually
// TODO We completely ignore priorities, which might become an issue at some point
// Only paths shaped like /apis/<group>/<version> are considered
pathRegex := regexp.MustCompile(`^/apis/([^/]+)/([^/]+)$`)
for _, resourcePath := range apiServer.GenericAPIServer.ListedPaths() {
if match := pathRegex.FindStringSubmatch(resourcePath); match != nil {
// match[1] is the API group and match[2] the version; the API service
// is named <version>.<group>
registrationController.AddAPIServiceToSyncOnStart(&v1.APIService{
ObjectMeta: meta.ObjectMeta{Name: match[2] + "." + match[1]},
Spec: v1.APIServiceSpec{
Group: match[1],
Version: match[2],
GroupPriorityMinimum: 1,
VersionPriority: 1,
},
})
// Wait for CRD to be ready then start both controllers in a post-start-hook
// Note: the hook parameter is named context and shadows the context package
// inside the hook; the controllers stop on the outer ctx
err = aggregatorServer.GenericAPIServer.AddPostStartHook("autoregistration", func(context server.PostStartHookContext) error {
go crdController.Run(1, ctx.Done())
// The registration controller is only started once the CRD controller
// has completed its initial sync
crdController.WaitForInitialSync()
go registrationController.Run(1, ctx.Done())
return nil
})
// Finally start the apiserver
// Note: this local shadows the imported server package until the end of Run
server := aggregatorServer.GenericAPIServer.PrepareRun()
go clients.Start(ctx)
return server.Run(ctx.Done())
},
// Ready probes the /healthz endpoint using a freshly built token client;
// failing to build the client means the unit is not ready
Ready: func(u *Unit, c *Cluster) bool {
u.Logger.Info("checking if apiserver is ready")
clients, err := k8s.NewTokenClients(c.masterUrl, c.pki.TLS, c.loopbackToken)
if err != nil {
return false
}
// status receives the HTTP status code of the health check response
status := 0
result := clients.Client.Discovery().RESTClient().Get().AbsPath("/healthz").Do(context.TODO()).StatusCode(&status)
if result.Error() != nil || status != 200 {
// Build a generic apiserver config, that is used and tweaked for various instanciated servers (native, extensions, etc.)
// Be careful, config is returned as a pointer, so it must be explicitely shallow copied before tweaking
func buildConfig(c *Cluster) (config *server.Config, clients *k8s.Clients, err error) {
// Initialize a basic configuration object
ver := version.Get()
config = server.NewConfig(legacyscheme.Codecs)
config.Version = &ver
config.Serializer = legacyscheme.Codecs
config.LongRunningFunc = filters.BasicLongRunningRequestCheck(
sets.NewString("watch", "proxy"),
sets.NewString("attach", "exec", "proxy", "log", "portforward"),
)
// Setup listener
listener, err := net.Listen("tcp6", fmt.Sprintf("[%s]:%d", c.networking.NodeAddress.Addr().String(), apiserverPort))
if err != nil {
err = fmt.Errorf("could not initialize listener: %w", err)
return
}
cert, err := dynamiccertificates.NewDynamicServingContentFromFiles("serving-cert", c.masterCerts.TLS.CertPath(), c.masterCerts.TLS.KeyPath())
if err != nil {
err = fmt.Errorf("could not get certificate: %w", err)
return
}
clientCA, err := dynamiccertificates.NewDynamicCAContentFromFile("client-ca", c.pki.API.CertPath())
if err != nil {
err = fmt.Errorf("could not get api CA file: %w", err)
return
}
proxyCA, err := dynamiccertificates.NewDynamicCAContentFromFile("proxy-ca", c.pki.Proxy.CertPath())
if err != nil {
err = fmt.Errorf("could not get proxy CA file: %w", err)
return
}
config.SecureServing = &server.SecureServingInfo{
Listener: listener,
Cert: cert,
ClientCA: clientCA, // not setup upstream, might be an issue
}
config.PublicAddress = c.networking.NodeAddress.Addr().AsSlice()
// Setup loopback clients (no authorization at this point, handled later)
clients, err = k8s.NewTokenClients(c.masterUrl, c.pki.TLS, c.loopbackToken)
if err != nil {
err = fmt.Errorf("could not setup loopback clients: %w", err)
return
}
config.LoopbackClientConfig = clients.KubeConfig
// Setup authentication
authConfig := authenticator.Config{
ServiceAccountKeyFiles: []string{c.masterCerts.APITokens.KeyPath()},
ServiceAccountIssuers: []string{audience},
APIAudiences: []string{audience},
ClientCAContentProvider: clientCA,
ServiceAccountTokenGetter: controllersa.NewGetterFromClient(
clients.Client,
clients.Informer.Core().V1().Secrets().Lister(),
clients.Informer.Core().V1().ServiceAccounts().Lister(),
clients.Informer.Core().V1().Pods().Lister(),
),
SecretsWriter: clients.Client.CoreV1(),
// This is currently not strictly required, since we do not use proxified
// requests to apiservers themselves, though we might at some point. Private
// key is only delivered to apiserver itself, so little harm is done
RequestHeaderConfig: &authenticatorfactory.RequestHeaderConfig{
UsernameHeaders: headerrequest.StaticStringSlice([]string{"X-Remote-User"}),
GroupHeaders: headerrequest.StaticStringSlice([]string{"X-Remote-Group"}),
ExtraHeaderPrefixes: headerrequest.StaticStringSlice([]string{"X-Remote-Extra"}),
CAContentProvider: proxyCA,
},
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
}
auth, _, err := authConfig.New()
if err != nil {
err = fmt.Errorf("could not setup apiserver authentication: %w", err)
return
}
config.Authentication = server.AuthenticationInfo{
APIAudiences: []string{audience},
Authenticator: auth,
}
// Setup authorizations
authzConfig := authorizer.Config{
AuthorizationModes: []string{modes.ModeNode, modes.ModeRBAC},
VersionedInformerFactory: clients.Informer,
}
authz, resolver, err := authzConfig.New()
if err != nil {
err = fmt.Errorf("could not seutp apiserver authorizations: %w", err)
return
}
config.RuleResolver = resolver
config.Authorization = server.AuthorizationInfo{
Authorizer: authz,
}
// Setup the resource discovery manager
config.AggregatedDiscoveryGroupManager = aggregated.NewResourceManager("apis")
server.AuthorizeClientBearerToken(clients.KubeConfig, &config.Authentication, &config.Authorization)
return
}
// buildExtensionsConfig derives the extensions server configuration from the
// shared generic configuration. The generic configuration is received by
// value, so the adjustments made here stay local to this function's copy.
// The returned error is currently always nil.
func buildExtensionsConfig(config server.Config, clients *k8s.Clients) (*extensions.Config, error) {
	// Tweak our own copy: extension specific resources and storage
	config.MergedResourceConfig = extensions.DefaultAPIResourceConfigSource()
	config.RESTOptionsGetter = k8s.PrepareStorage(extensions.Codecs, extensions.Scheme, config.MergedResourceConfig)

	// Wrap the tweaked copy, along with the shared informers, into the
	// extensions server configuration
	recommended := &server.RecommendedConfig{
		Config:                config,
		SharedInformerFactory: clients.Informer,
	}
	extensionsConfig := &extensions.Config{
		GenericConfig: recommended,
		ExtraConfig:   extensions.ExtraConfig{},
	}
	return extensionsConfig, nil
}
// Customize the generic config then build an apiserver config: resource
// configuration and storage, OpenAPI definitions, the admission chain, the
// service account token signer and finally the control plane specific settings
func buildApiConfig(c *Cluster, config server.Config, clients *k8s.Clients) (*controlplane.Config, error) {
generic := config
generic.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource()
// Currently unused framework for enabling additional resources
generic.MergedResourceConfig.EnableMatchingVersions(func(gv schema.GroupVersion) bool {
return false
})
// The storage getter is kept in a variable since its storage factory is
// also handed to the control plane configuration below
restOptionsGetter := k8s.PrepareStorage(legacyscheme.Codecs, legacyscheme.Scheme, generic.MergedResourceConfig)
generic.RESTOptionsGetter = restOptionsGetter
openapiFactory := openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)
generic.OpenAPIConfig = server.DefaultOpenAPIConfig(openapiFactory, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensions.Scheme))
generic.OpenAPIV3Config = server.DefaultOpenAPIV3Config(openapiFactory, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensions.Scheme))
// Setup admission controllers
admissionConfig := kubeadmission.Config{
ExternalInformers: clients.Informer,
LoopbackClientConfig: config.LoopbackClientConfig,
}
schemaResolver := resolver.NewDefinitionsSchemaResolver(scheme.Scheme, generic.OpenAPIConfig.GetDefinitions)
initializers, _, err := admissionConfig.New(nil, nil, clients.ServiceResolver(), nil, schemaResolver) // TODO: handle post start hook
return nil, fmt.Errorf("could not get admission plugins: %w", err)
// Some default plugins are not enabled here:
// limitranger, since we do not support LimitRange
// setdefault, since we do not support default storage class
// defaulttolerationseconds, as we do not use this feature
// storageobjectinuseprotection, as we do not use this feature
// podpriority, as we do not use this feature
// runtimeclass, as we do not use this feature
// defaultingressclass, as we do not use this feature
lifecycle.Register(plugins)
mutating.Register(plugins)
validatingadmissionpolicy.Register(plugins)
validating.Register(plugins)
resize.Register(plugins)
resourcequota.Register(plugins)
nodetaint.Register(plugins)
podsecurity.Register(plugins)
saplugin.Register(plugins)
// Names of the plugins making up the admission chain, listed in the same
// order as the registrations above
pluginsNames := []string{
lifecycle.PluginName,
mutating.PluginName,
validatingadmissionpolicy.PluginName,
validating.PluginName,
resize.PluginName,
resourcequota.PluginName,
nodetaint.PluginName,
podsecurity.PluginName,
saplugin.PluginName,
}
pluginsConfig, _ := admission.ReadAdmissionConfiguration(pluginsNames, "", nil) // Never fails without config file
// The generic initializer comes first, followed by the ones built from
// the admission config
genericInitializer := initializer.New(clients.Client, clients.DynClient, clients.Informer, generic.Authorization.Authorizer, feature.DefaultFeatureGate, generic.DrainedNotify())
initializersChain := admission.PluginInitializers{genericInitializer}
initializersChain = append(initializersChain, initializers...)
admissionChain, err := plugins.NewFromPlugins(pluginsNames, pluginsConfig, initializersChain, admission.Decorators{})
generic.AdmissionControl = admissionChain
// Setup the token signer
signer, err := serviceaccount.JWTTokenGenerator(audience, c.masterCerts.APITokens.Key)
if err != nil {
return nil, fmt.Errorf("could not initilize service account signer: %w", err)
}
ExtraConfig: controlplane.ExtraConfig{
EventTTL: time.Hour,
KubeletClientConfig: client.KubeletClientConfig{
Port: ports.KubeletPort,
ReadOnlyPort: ports.KubeletReadOnlyPort,
PreferredAddressTypes: []string{string(core.NodeInternalIP), string(core.NodeExternalIP)},
TLSClientConfig: client.KubeletTLSConfig{
CertFile: c.masterCerts.Kubelet.CertPath(),
KeyFile: c.masterCerts.Kubelet.KeyPath(),
CAFile: c.pki.TLS.CertPath(),
},
ServiceIPRange: net.IPNet{
IP: c.networking.ServiceNet.Addr().AsSlice(),
Mask: net.CIDRMask(c.networking.ServiceNet.Bits(), c.networking.ServiceNet.Addr().BitLen()),
},
APIServerServiceIP: c.networking.APIAddress.AsSlice(),
APIServerServicePort: 443,
ServiceNodePortRange: utilnet.PortRange{Base: 30000, Size: 2768},
EndpointReconcilerType: reconcilers.LeaseEndpointReconcilerType,
MasterCount: 1,
ServiceAccountIssuer: signer,
ServiceAccountIssuerURL: audience,
ServiceAccountPublicKeys: []interface{}{&c.masterCerts.APITokens.Key.PublicKey},
VersionedInformers: clients.Informer,
APIResourceConfigSource: generic.MergedResourceConfig,
StorageFactory: restOptionsGetter.StorageFactory,
ClusterAuthenticationInfo: clusterauthenticationtrust.ClusterAuthenticationInfo{
// This is duplicated information from the authentication layer, so that
// the start-cluster-authentication-info-controller controller properly
// populates the extension-apiserver-authentication ConfigMap with
// authentication info
ClientCA: config.SecureServing.ClientCA,
RequestHeaderCA: config.Authentication.RequestHeaderConfig.CAContentProvider,
// Each field mirrors its request header counterpart: the username
// headers must come from UsernameHeaders, not from the extra prefixes
RequestHeaderUsernameHeaders: config.Authentication.RequestHeaderConfig.UsernameHeaders,
RequestHeaderGroupHeaders: config.Authentication.RequestHeaderConfig.GroupHeaders,
RequestHeaderExtraHeaderPrefixes: config.Authentication.RequestHeaderConfig.ExtraHeaderPrefixes,
// Customize the generic config then build an aggregator config
func buildAggregatorConfig(c *Cluster, config server.Config, clients *k8s.Clients) (*aggregator.Config, error) {
generic := config
generic.MergedResourceConfig = aggregator.DefaultAPIResourceConfigSource()
generic.RESTOptionsGetter = k8s.PrepareStorage(aggregatorscheme.Codecs, aggregatorscheme.Scheme, generic.MergedResourceConfig)
return &aggregator.Config{
SharedInformerFactory: clients.Informer,
},
ExtraConfig: aggregator.ExtraConfig{
ServiceResolver: clients.ServiceResolver(),
// This is for the aggregation layer to authenticate proxified
// requests to webhooks and other aggregated services using a dedicated
// certificate and certificate authority
ProxyClientCertFile: c.masterCerts.Proxy.CertPath(),
ProxyClientKeyFile: c.masterCerts.Proxy.KeyPath(),