Newer
Older
package services
import (
"context"
"fmt"
"go.acides.org/hepto/k8s"
extensions "k8s.io/apiextensions-apiserver/pkg/apiserver"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/admission/initializer"
"k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle"
"k8s.io/apiserver/pkg/admission/plugin/resourcequota"
"k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy"
"k8s.io/apiserver/pkg/admission/plugin/webhook/mutating"
"k8s.io/apiserver/pkg/admission/plugin/webhook/validating"
"k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
apifilters "k8s.io/apiserver/pkg/endpoints/filters"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
"k8s.io/apiserver/pkg/server/filters"
"k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/notfoundhandler"
"k8s.io/apiserver/pkg/util/openapi"
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
aggregator "k8s.io/kube-aggregator/pkg/apiserver"
aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1"
"k8s.io/kube-aggregator/pkg/controllers/autoregister"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/capabilities"
"k8s.io/kubernetes/pkg/cluster/ports"
controllersa "k8s.io/kubernetes/pkg/controller/serviceaccount"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/pkg/controlplane/controller/clusterauthenticationtrust"
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi"
kubeadmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
"k8s.io/kubernetes/pkg/kubeapiserver/authenticator"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
"k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/plugin/pkg/admission/nodetaint"
"k8s.io/kubernetes/plugin/pkg/admission/security/podsecurity"
saplugin "k8s.io/kubernetes/plugin/pkg/admission/serviceaccount"
"k8s.io/kubernetes/plugin/pkg/admission/storage/persistentvolume/resize"
// Constants shared by the apiserver unit and its configuration builders.
const (
	// audience is used in this file as the service account issuer, as the
	// accepted API audience and as the issuer of the service account token
	// signer.
	audience = "https://kubernetes.default.svc.cluster.local"

	// apiserverPort is the TCP port the apiserver listener binds to.
	apiserverPort = 6443
)
Dependencies: []*Unit{etcd, pkiMaster, vpn, kubeLogger},
Run: func(u *Unit, c *Cluster, ctx context.Context) error {
config, clients, err := buildConfig(c)
if err != nil {
return err
}
// Allow privileged pods
capabilities.Setup(true, 0)
// Servers are created in reverse order for proper delegation
notFound := notfoundhandler.New(config.Serializer, apifilters.NoMuxAndDiscoveryIncompleteKey)
extensionsConfig, _ := buildExtensionsConfig(*config, clients) // Currently no error case
extensionServer, err := extensionsConfig.Complete().New(server.NewEmptyDelegateWithCustomHandler(notFound))
if err != nil {
return fmt.Errorf("could not seutp the extension server: %w", err)
}
if err != nil {
return fmt.Errorf("could not build apiserver config: %w", err)
}
apiServer, err := apiConfig.Complete().New(extensionServer.GenericAPIServer)
if err != nil {
return fmt.Errorf("could not initialize generic apiserver: %w", err)
}
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(apiServer.GenericAPIServer)
if err != nil {
return fmt.Errorf("could not initialize aggregator: %w", err)
}
// Start registration services, which auto register all resources and all
// crd to the aggregator server
apiRegistrationClient, err := apiregistrationclient.NewForConfig(clients.KubeConfig)
if err != nil {
return fmt.Errorf("could not initialize aggregator client: %w", err)
}
registrationController := autoregister.NewAutoRegisterController(aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(), apiRegistrationClient)
crdController := crdregistration.NewCRDRegistrationController(extensionServer.Informers.Apiextensions().V1().CustomResourceDefinitions(), registrationController)
// Start by discovering resource paths from the apiserver and registering them manually
// TODO We completely ignore priorities, which might become an issue at some point
pathRegex := regexp.MustCompile(`^/apis/([^/]+)/([^/]+)$`)
for _, resourcePath := range apiServer.GenericAPIServer.ListedPaths() {
if match := pathRegex.FindStringSubmatch(resourcePath); match != nil {
registrationController.AddAPIServiceToSyncOnStart(&v1.APIService{
ObjectMeta: meta.ObjectMeta{Name: match[2] + "." + match[1]},
Spec: v1.APIServiceSpec{
Group: match[1],
Version: match[2],
GroupPriorityMinimum: 1,
VersionPriority: 1,
},
})
// Wait for CRD to be ready then start both controllers in a post-start-hook
err = aggregatorServer.GenericAPIServer.AddPostStartHook("autoregistration", func(context server.PostStartHookContext) error {
go crdController.Run(1, ctx.Done())
crdController.WaitForInitialSync()
go registrationController.Run(1, ctx.Done())
return nil
})
// Finally start the apiserver
server := aggregatorServer.GenericAPIServer.PrepareRun()
go clients.Start(ctx)
return server.Run(ctx.Done())
},
Ready: func(u *Unit, c *Cluster) bool {
u.Logger.Info("checking if apiserver is ready")
clients, err := k8s.NewTokenClients(c.masterUrl, c.pki.TLS, c.loopbackToken)
if err != nil {
return false
}
status := 0
result := clients.Client.Discovery().RESTClient().Get().AbsPath("/healthz").Do(context.TODO()).StatusCode(&status)
if result.Error() != nil || status != 200 {
// buildConfig builds the generic apiserver configuration that is used, and
// then tweaked, for the various instantiated servers (native, extensions,
// etc.).
//
// Be careful: config is returned as a pointer, so it must be explicitly
// shallow copied before being tweaked for a specific server.
//
// It returns the generic configuration together with the loopback clients
// created for it. On failure config and clients are nil, err wraps the error
// of the failing step, and the listener opened for the secure port is closed
// again so that the port is not left bound.
func buildConfig(c *Cluster) (config *server.Config, clients *k8s.Clients, err error) {
	// Initialize a basic configuration object
	ver := version.Get()
	config = server.NewConfig(legacyscheme.Codecs)
	config.Version = &ver
	config.Serializer = legacyscheme.Codecs
	config.LongRunningFunc = filters.BasicLongRunningRequestCheck(
		sets.NewString("watch", "proxy"),
		sets.NewString("attach", "exec", "proxy", "log", "portforward"),
	)

	// Setup listener
	// TODO listen on management interface only, this is temporary so that the api is reachable on
	// localhost
	listener, err := net.Listen("tcp6", fmt.Sprintf("[%s]:%d", "::", apiserverPort))
	if err != nil {
		return nil, nil, fmt.Errorf("could not initialize listener: %w", err)
	}
	// Any later step may still fail: release the listener in that case,
	// otherwise the port stays bound and a later attempt cannot listen on it.
	defer func() {
		if err != nil {
			// The close error is deliberately dropped, the original failure
			// is the one worth reporting.
			_ = listener.Close()
		}
	}()

	cert, err := dynamiccertificates.NewDynamicServingContentFromFiles("serving-cert", c.masterCerts.TLS.CertPath(), c.masterCerts.TLS.KeyPath())
	if err != nil {
		return nil, nil, fmt.Errorf("could not get certificate: %w", err)
	}
	clientCA, err := dynamiccertificates.NewDynamicCAContentFromFile("client-ca", c.pki.API.CertPath())
	if err != nil {
		return nil, nil, fmt.Errorf("could not get api CA file: %w", err)
	}
	config.SecureServing = &server.SecureServingInfo{
		Listener: listener,
		Cert:     cert,
		ClientCA: clientCA, // not setup upstream, might be an issue
	}
	config.PublicAddress = c.networking.NodeAddress.IP

	// Setup loopback clients (no authorization at this point, handled later)
	clients, err = k8s.NewTokenClients(c.masterUrl, c.pki.TLS, c.loopbackToken)
	if err != nil {
		return nil, nil, fmt.Errorf("could not setup loopback clients: %w", err)
	}
	config.LoopbackClientConfig = clients.KubeConfig

	// Setup authentication
	authConfig := authenticator.Config{
		ServiceAccountKeyFiles:  []string{c.masterCerts.APITokens.KeyPath()},
		ServiceAccountIssuers:   []string{audience},
		APIAudiences:            []string{audience},
		ClientCAContentProvider: clientCA,
		ServiceAccountTokenGetter: controllersa.NewGetterFromClient(
			clients.Client,
			clients.Informer.Core().V1().Secrets().Lister(),
			clients.Informer.Core().V1().ServiceAccounts().Lister(),
			clients.Informer.Core().V1().Pods().Lister(),
		),
		SecretsWriter: clients.Client.CoreV1(),
	}
	// The second value returned by New is not used here
	auth, _, err := authConfig.New()
	if err != nil {
		return nil, nil, fmt.Errorf("could not setup apiserver authentication: %w", err)
	}
	config.Authentication = server.AuthenticationInfo{
		APIAudiences:  []string{audience},
		Authenticator: auth,
	}

	// Setup authorizations
	authzConfig := authorizer.Config{
		AuthorizationModes:       []string{modes.ModeNode, modes.ModeRBAC},
		VersionedInformerFactory: clients.Informer,
	}
	authz, resolver, err := authzConfig.New()
	if err != nil {
		return nil, nil, fmt.Errorf("could not setup apiserver authorizations: %w", err)
	}
	config.RuleResolver = resolver
	config.Authorization = server.AuthorizationInfo{
		Authorizer: authz,
	}

	// Setup the resource discovery manager
	config.AggregatedDiscoveryGroupManager = aggregated.NewResourceManager("apis")

	// Done last, once both authentication and authorization are in place,
	// since it is handed pointers to both of them
	server.AuthorizeClientBearerToken(clients.KubeConfig, &config.Authentication, &config.Authorization)
	return config, clients, nil
}
// buildExtensionsConfig customizes the generic config, then builds an
// extensions server config from it.
//
// config is received by value, so the fields reassigned below only change this
// function's shallow copy. clients provides the shared informer factory.
// The returned error is always nil in the code below.
func buildExtensionsConfig(config server.Config, clients *k8s.Clients) (*extensions.Config, error) {
generic := config
// Switch the copy to the resource config and storage of the extensions scheme
generic.MergedResourceConfig = extensions.DefaultAPIResourceConfigSource()
generic.RESTOptionsGetter = k8s.PrepareStorage(extensions.Codecs, extensions.Scheme, generic.MergedResourceConfig)
// Wrap the customized generic config into the extensions server configuration
return &extensions.Config{
GenericConfig: &server.RecommendedConfig{
Config: generic,
SharedInformerFactory: clients.Informer,
},
// NOTE(review): ExtraConfig is empty as visible here — confirm that no
// field (e.g. CRD storage options) is expected to be set.
ExtraConfig: extensions.ExtraConfig{
},
}, nil
}
// Customize the generic config then build an apiserver config
func buildApiConfig(c *Cluster, config server.Config, clients *k8s.Clients) (*controlplane.Config, error) {
generic := config
generic.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource()
// Currently unused framework for enabling additional resources
generic.MergedResourceConfig.EnableMatchingVersions(func(gv schema.GroupVersion) bool {
return false
})
restOptionsGetter := k8s.PrepareStorage(legacyscheme.Codecs, legacyscheme.Scheme, generic.MergedResourceConfig)
generic.RESTOptionsGetter = restOptionsGetter
openapiFactory := openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)
generic.OpenAPIConfig = server.DefaultOpenAPIConfig(openapiFactory, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensions.Scheme))
generic.OpenAPIV3Config = server.DefaultOpenAPIV3Config(openapiFactory, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensions.Scheme))
// Setup admission controllers
admissionConfig := kubeadmission.Config{
ExternalInformers: clients.Informer,
LoopbackClientConfig: config.LoopbackClientConfig,
}
schemaResolver := resolver.NewDefinitionsSchemaResolver(scheme.Scheme, generic.OpenAPIConfig.GetDefinitions)
initializers, _, err := admissionConfig.New(nil, nil, clients.ServiceResolver(), nil, schemaResolver) // TODO: handle post start hook
return nil, fmt.Errorf("could not get admission plugins: %w", err)
// Some default plugins are not enabled here:
// limitranger, since we do not support LimitRange
// setdefault, since we do not support default storage class
// defaulttolerationseconds, as we do not use this feature
// storageobjectinuseprotection, as we do not use this feature
// podpriority, as we do not use this feature
// runtimeclass, as we do not use this feature
// defaultingressclass, as we do not use this feature
lifecycle.Register(plugins)
mutating.Register(plugins)
validatingadmissionpolicy.Register(plugins)
validating.Register(plugins)
resize.Register(plugins)
resourcequota.Register(plugins)
nodetaint.Register(plugins)
podsecurity.Register(plugins)
saplugin.Register(plugins)
pluginsNames := []string{
lifecycle.PluginName,
mutating.PluginName,
validatingadmissionpolicy.PluginName,
validating.PluginName,
resize.PluginName,
resourcequota.PluginName,
nodetaint.PluginName,
podsecurity.PluginName,
saplugin.PluginName,
}
pluginsConfig, _ := admission.ReadAdmissionConfiguration(pluginsNames, "", nil) // Never fails without config file
genericInitializer := initializer.New(clients.Client, clients.DynClient, clients.Informer, generic.Authorization.Authorizer, feature.DefaultFeatureGate, generic.DrainedNotify())
initializersChain := admission.PluginInitializers{genericInitializer}
initializersChain = append(initializersChain, initializers...)
admissionChain, err := plugins.NewFromPlugins(pluginsNames, pluginsConfig, initializersChain, admission.Decorators{})
generic.AdmissionControl = admissionChain
// Setup the token signer
signer, err := serviceaccount.JWTTokenGenerator(audience, c.masterCerts.APITokens.Key)
if err != nil {
return nil, fmt.Errorf("could not initilize service account signer: %w", err)
}
ExtraConfig: controlplane.ExtraConfig{
EventTTL: time.Hour,
KubeletClientConfig: client.KubeletClientConfig{
Port: ports.KubeletPort,
ReadOnlyPort: ports.KubeletReadOnlyPort,
PreferredAddressTypes: []string{string(core.NodeInternalIP), string(core.NodeExternalIP)},
ServiceIPRange: *c.networking.ServiceNet,
APIServerServiceIP: c.networking.APIAddress,
APIServerServicePort: 443,
ServiceNodePortRange: utilnet.PortRange{Base: 30000, Size: 2768},
EndpointReconcilerType: reconcilers.LeaseEndpointReconcilerType,
MasterCount: 1,
ServiceAccountIssuer: signer,
ServiceAccountIssuerURL: audience,
ServiceAccountPublicKeys: []interface{}{&c.masterCerts.APITokens.Key.PublicKey},
VersionedInformers: clients.Informer,
APIResourceConfigSource: generic.MergedResourceConfig,
StorageFactory: restOptionsGetter.StorageFactory,
ClusterAuthenticationInfo: clusterauthenticationtrust.ClusterAuthenticationInfo{
ClientCA: config.SecureServing.ClientCA,
},
// Customize the generic config then build an aggregator config
func buildAggregatorConfig(config server.Config, clients *k8s.Clients) (*aggregator.Config, error) {
generic := config
generic.MergedResourceConfig = aggregator.DefaultAPIResourceConfigSource()
generic.RESTOptionsGetter = k8s.PrepareStorage(aggregatorscheme.Codecs, aggregatorscheme.Scheme, generic.MergedResourceConfig)
return &aggregator.Config{
SharedInformerFactory: clients.Informer,
},
ExtraConfig: aggregator.ExtraConfig{
ServiceResolver: clients.ServiceResolver(),
},