// NOTE(review): removed stray "Newer"/"Older" web-navigation artifacts that
// preceded the package clause in this extracted copy of the file.
package services
import (
	"context"
	"fmt"
	"net"
	"net/url"
	"os"
	"time"

	"github.com/google/uuid"
	extensions "k8s.io/apiextensions-apiserver/pkg/apiserver"
	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apiserver/pkg/admission"
	"k8s.io/apiserver/pkg/admission/initializer"
	"k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle"
	"k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy"
	"k8s.io/apiserver/pkg/admission/plugin/webhook/mutating"
	"k8s.io/apiserver/pkg/admission/plugin/webhook/validating"
	apifilters "k8s.io/apiserver/pkg/endpoints/filters"
	openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
	"k8s.io/apiserver/pkg/registry/generic"
	"k8s.io/apiserver/pkg/server"
	"k8s.io/apiserver/pkg/server/dynamiccertificates"
	"k8s.io/apiserver/pkg/server/filters"
	"k8s.io/apiserver/pkg/server/storage"
	"k8s.io/apiserver/pkg/storage/storagebackend"
	"k8s.io/apiserver/pkg/util/feature"
	"k8s.io/apiserver/pkg/util/flowcontrol/request"
	"k8s.io/apiserver/pkg/util/notfoundhandler"
	"k8s.io/apiserver/pkg/util/openapi"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/clientcmd/api"
	"k8s.io/component-base/version"
	"k8s.io/kube-aggregator/pkg/apiserver"
	"k8s.io/kubernetes/pkg/api/legacyscheme"
	"k8s.io/kubernetes/pkg/apis/core"
	"k8s.io/kubernetes/pkg/capabilities"
	"k8s.io/kubernetes/pkg/cluster/ports"
	controllersa "k8s.io/kubernetes/pkg/controller/serviceaccount"
	"k8s.io/kubernetes/pkg/controlplane"
	"k8s.io/kubernetes/pkg/controlplane/reconcilers"
	generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi"
	"k8s.io/kubernetes/pkg/kubeapiserver"
	kubeadmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
	"k8s.io/kubernetes/pkg/kubeapiserver/authenticator"
	"k8s.io/kubernetes/pkg/kubeapiserver/authorizer"
	"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
	"k8s.io/kubernetes/pkg/kubeapiserver/options"
	"k8s.io/kubernetes/pkg/kubelet/client"
	"k8s.io/kubernetes/pkg/serviceaccount"
)

// NOTE(review): original source lines 59-126 were replaced by bare line
// numbers in this extracted copy; the closing parenthesis of the import block
// was lost with them, and further imports may have been elided. The stdlib
// imports net, net/url, os and time were restored because the code below
// demonstrably uses them; recover any remaining imports from upstream.
// audience is both the service account token audience and issuer URL used by
// this in-process API server.
const audience = "https://kubernetes.default.svc.cluster.local"

// apiserverPort is the TCP port the API server listens on (the conventional
// secure port 6443).
const apiserverPort = 6443

// RestOptionsFactory reimplements the REST options factory so that it does
// not depend on option parsers.
type RestOptionsFactory struct {
	// StorageFactory resolves per-resource etcd storage configuration.
	StorageFactory storage.StorageFactory
	// StorageObjectCountTracker tracks stored object counts, used by API
	// priority and fairness to estimate request width.
	StorageObjectCountTracker request.StorageObjectCountTracker
}
// GetRESTOptions implements generic.RESTOptionsGetter: it resolves the etcd
// storage configuration for the given resource and returns undecorated REST
// options with garbage collection enabled and object-count polling every
// minute.
func (f *RestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
	storageConfig, err := f.StorageFactory.NewConfig(resource)
	if err != nil {
		// Wrap with %w (instead of flattening via err.Error()) so callers can
		// inspect the cause with errors.Is/errors.As.
		return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v: %w", resource, err)
	}
	return generic.RESTOptions{
		StorageConfig:             storageConfig,
		Decorator:                 generic.UndecoratedStorage,
		DeleteCollectionWorkers:   1,
		EnableGarbageCollection:   true,
		ResourcePrefix:            f.StorageFactory.ResourcePrefix(resource),
		CountMetricPollPeriod:     time.Minute,
		StorageObjectCountTracker: f.StorageObjectCountTracker,
	}, nil
}
// buildConfig builds a generic apiserver config that is used and tweaked for
// the various instantiated servers (native, extensions, etc.), along with the
// loopback clients talking to the server itself.
// Careful: config is returned as a pointer, so it must be explicitly shallow
// copied before tweaking.
func buildConfig(c *Cluster) (config *server.Config, clients *Clients, err error) {
	// Initialize a basic configuration object
	ver := version.Get()
	config = server.NewConfig(legacyscheme.Codecs)
	config.Version = &ver
	config.Serializer = legacyscheme.Codecs
	// Requests exempt from the default timeout: watches/proxies by verb, and
	// long-running pod subresources by name.
	config.LongRunningFunc = filters.BasicLongRunningRequestCheck(
		sets.NewString("watch", "proxy"),
		sets.NewString("attach", "exec", "proxy", "log", "portforward"),
	)
	// Setup listener — tcp6 only, so the node address is presumably IPv6
	// (NOTE(review): confirm this assumption against the networking setup)
	listener, err := net.Listen("tcp6", fmt.Sprintf("[%s]:%d", c.networking.NodeAddress.IP.String(), apiserverPort))
	if err != nil {
		err = fmt.Errorf("could not initialize listener: %w", err)
		return
	}
	// Serving certificate, reloaded from disk when the files change
	cert, err := dynamiccertificates.NewDynamicServingContentFromFiles("serving-cert", c.masterCerts.TLS.CertPath(), c.masterCerts.TLS.KeyPath())
	if err != nil {
		err = fmt.Errorf("could not get certificate: %w", err)
		return
	}
	// CA bundle used to authenticate client certificates
	clientCA, err := dynamiccertificates.NewDynamicCAContentFromFile("client-ca", c.pki.API.CertPath())
	if err != nil {
		err = fmt.Errorf("could not get api CA file: %w", err)
		return
	}
	config.SecureServing = &server.SecureServingInfo{
		Listener: listener,
		Cert:     cert,
		ClientCA: clientCA, // not setup upstream, might be an issue
	}
	// Setup loopback clients (no authorization at this point, handled later)
	loopback, err := config.SecureServing.NewLoopbackClientConfig(uuid.NewString(), nil)
	// NOTE(review): loopback is dereferenced on the next line before err is
	// checked; if NewLoopbackClientConfig ever returns (nil, err) this panics.
	// The err check should come first.
	loopback.TLSClientConfig.CAFile = c.pki.TLS.CertPath()
	if err != nil {
		err = fmt.Errorf("could not setup loopback config: %w", err)
		return
	}
	config.LoopbackClientConfig = loopback
	clients, err = newClientsForKC(config.LoopbackClientConfig)
	/* NOTE(review): original source lines 128-204 are missing from this copy —
	   they were replaced by the bare line numbers below during extraction.
	   Recover the elided section from the upstream source before relying on
	   this file.
	128
	129
	130
	131
	132
	133
	134
	135
	136
	137
	138
	139
	140
	141
	142
	143
	144
	145
	146
	147
	148
	149
	150
	151
	152
	153
	154
	155
	156
	157
	158
	159
	160
	161
	162
	163
	164
	165
	166
	167
	168
	169
	170
	171
	172
	173
	174
	175
	176
	177
	178
	179
	180
	181
	182
	183
	184
	185
	186
	187
	188
	189
	190
	191
	192
	193
	194
	195
	196
	197
	198
	199
	200
	201
	202
	203
	204
	*/
	if err != nil {
		err = fmt.Errorf("could not setup loopback clients: %w", err)
		return
	}
	// Setup authentication
	authConfig := authenticator.Config{
		ServiceAccountKeyFiles:  []string{c.masterCerts.APITokens.KeyPath()},
		ServiceAccountIssuers:   []string{audience},
		APIAudiences:            []string{audience},
		ClientCAContentProvider: clientCA,
		// Token getter backed by the shared informers so service account
		// tokens are validated against live cluster state
		ServiceAccountTokenGetter: controllersa.NewGetterFromClient(
			clients.Client,
			clients.Informer.Core().V1().Secrets().Lister(),
			clients.Informer.Core().V1().ServiceAccounts().Lister(),
			clients.Informer.Core().V1().Pods().Lister(),
		),
		SecretsWriter: clients.Client.CoreV1(),
	}
	auth, _, err := authConfig.New()
	if err != nil {
		err = fmt.Errorf("could not setup apiserver authentication: %w", err)
		return
	}
	config.Authentication = server.AuthenticationInfo{
		APIAudiences:  []string{audience},
		Authenticator: auth,
	}
	// Setup authorizations: Node and RBAC modes only
	authzConfig := authorizer.Config{
		AuthorizationModes:       []string{modes.ModeNode, modes.ModeRBAC},
		VersionedInformerFactory: clients.Informer,
	}
	authz, resolver, err := authzConfig.New()
	if err != nil {
		// NOTE(review): "seutp" typo in this runtime message (left unchanged
		// here; fix alongside a code change)
		err = fmt.Errorf("could not seutp apiserver authorizations: %w", err)
		return
	}
	config.RuleResolver = resolver
	config.Authorization = server.AuthorizationInfo{
		Authorizer: authz,
	}
	// Finally authorize loopback clients
	server.AuthorizeClientBearerToken(loopback, &config.Authentication, &config.Authorization)
	// Setup service resolver: resolve cluster services through endpoints,
	// short-circuiting to the loopback address for the default/kubernetes service
	localHost, _ := url.Parse(config.LoopbackClientConfig.Host)
	serviceResolver := apiserver.NewLoopbackServiceResolver(apiserver.NewEndpointServiceResolver(
		clients.Informer.Core().V1().Services().Lister(),
		clients.Informer.Core().V1().Endpoints().Lister(),
	), localHost)
	// Setup admission controllers
	admissionConfig := kubeadmission.Config{
		ExternalInformers:    clients.Informer,
		LoopbackClientConfig: config.LoopbackClientConfig,
	}
	initializers, _, err := admissionConfig.New(nil, nil, serviceResolver, nil) // TODO: handle post start hook
	if err != nil {
		err = fmt.Errorf("could not get admission plugins: %w", err)
		return
	}
	plugins := admission.NewPlugins()
	pluginsNames := []string{lifecycle.PluginName, mutating.PluginName, validatingadmissionpolicy.PluginName, validating.PluginName}
	server.RegisterAllAdmissionPlugins(plugins)
	pluginsConfig, _ := admission.ReadAdmissionConfiguration(pluginsNames, "", nil) // Never fails without config file
	genericInitializer := initializer.New(clients.Client, clients.DynClient, clients.Informer, config.Authorization.Authorizer, feature.DefaultFeatureGate, config.DrainedNotify())
	initializersChain := admission.PluginInitializers{genericInitializer}
	initializersChain = append(initializersChain, initializers...)
	admissionChain, err := plugins.NewFromPlugins(pluginsNames, pluginsConfig, initializersChain, admission.Decorators{})
	// NOTE(review): a failure from NewFromPlugins is only surfaced through the
	// named err return below; AdmissionControl is assigned even on failure.
	// Consider an explicit wrapped-error check here.
	config.AdmissionControl = admissionChain
	return
}
var kubeApiserver = &Unit{
Name: "kube-apiserver",
Dependencies: []*Unit{etcd, pkiCA, pkiMaster, vpn, kubeLogger},
Run: func(u *Unit, c *Cluster, ctx context.Context) error {
if err != nil {
return err
}
// Allow privileged pods
capabilities.Setup(true, 0)
// Configure the etcd backend, this is used by both the default and CRD apiservers
etcdConfig := storagebackend.NewDefaultConfig("/registry", nil)
etcdConfig.Transport.ServerList = []string{"http://[::1]:2379"}
// Servers are created in reverse orders for proper delegation
// First notFound is the default handler, that basically returns a 404,
// then the extensions serves CRD for unknown kinds, finally
// the default apiserver is first in the chain and serves all default kinds
// Note that no API aggregation is setup, which means that k8s API aggregation
// is not supported by hepto.
notFound := notfoundhandler.New(config.Serializer, apifilters.NoMuxAndDiscoveryIncompleteKey)
// Tweak the configuration and build the extension server
extensionsConfig := extensions.Config{
GenericConfig: &server.RecommendedConfig{
Config: *config,
SharedInformerFactory: clients.Informer,
},
ExtraConfig: extensions.ExtraConfig{},
}
extensionsConfig.GenericConfig.MergedResourceConfig = extensions.DefaultAPIResourceConfigSource()
extensionsConfig.GenericConfig.RESTOptionsGetter = &RestOptionsFactory{
StorageFactory: storage.NewDefaultStorageFactory(
*etcdConfig,
runtime.ContentTypeJSON,
legacyscheme.Codecs,
storage.NewDefaultResourceEncodingConfig(extensions.Scheme),
extensionsConfig.GenericConfig.MergedResourceConfig,
kubeapiserver.SpecialDefaultResourcePrefixes,
),
}
extensionsConfig.ExtraConfig.CRDRESTOptionsGetter = extensionsConfig.GenericConfig.RESTOptionsGetter
extensionServer, err := extensionsConfig.Complete().New(server.NewEmptyDelegateWithCustomHandler(notFound))
if err != nil {
return fmt.Errorf("could not seutp the extension server: %w", err)
// Setup the apiserver itself
storageFactory := storage.NewDefaultStorageFactory(
*etcdConfig,
runtime.ContentTypeJSON,
legacyscheme.Codecs,
storage.NewDefaultResourceEncodingConfig(legacyscheme.Scheme),
controlplane.DefaultAPIResourceConfigSource(),
kubeapiserver.SpecialDefaultResourcePrefixes,
)
signer, err := serviceaccount.JWTTokenGenerator(audience, c.masterCerts.APITokens.Key)
if err != nil {
return fmt.Errorf("could not initilize service account signer: %w", err)
}
apiConfig := controlplane.Config{
GenericConfig: config,
ExtraConfig: controlplane.ExtraConfig{
APIResourceConfigSource: storageFactory.APIResourceConfigSource,
StorageFactory: storageFactory,
EventTTL: time.Hour,
KubeletClientConfig: client.KubeletClientConfig{
Port: ports.KubeletPort,
ReadOnlyPort: ports.KubeletReadOnlyPort,
PreferredAddressTypes: []string{string(core.NodeInternalIP), string(core.NodeExternalIP)},
},
ServiceIPRange: *c.networking.ServiceNet,
APIServerServiceIP: c.networking.APIAddress,
APIServerServicePort: 443,
ServiceNodePortRange: options.DefaultServiceNodePortRange,
// This still triggers an error, stating that stale data was found and cleaned up,
// which is inevitable
EndpointReconcilerType: reconcilers.NoneEndpointReconcilerType,
MasterCount: 1,
ServiceAccountIssuer: signer,
ServiceAccountIssuerURL: audience,
ServiceAccountPublicKeys: []interface{}{&c.masterCerts.APITokens.Key.PublicKey},
VersionedInformers: clients.Informer,
},
apiConfig.GenericConfig.RESTOptionsGetter = &RestOptionsFactory{StorageFactory: storageFactory}
openapiFactory := openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)
apiConfig.GenericConfig.OpenAPIConfig = server.DefaultOpenAPIConfig(openapiFactory, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensions.Scheme))
apiConfig.GenericConfig.OpenAPIConfig = server.DefaultOpenAPIConfig(openapiFactory, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensions.Scheme))
apiConfig.ExtraConfig.ClusterAuthenticationInfo.ClientCA = config.SecureServing.ClientCA
apiServer, err := apiConfig.Complete().New(extensionServer.GenericAPIServer)
if err != nil {
return fmt.Errorf("could not initialize generic apiserver: %w", err)
}
// Write a loopback config (different for every start)
name := "loopback"
clientConfig := api.Config{
APIVersion: "v1",
Kind: "Config",
CurrentContext: name,
Contexts: map[string]*api.Context{name: {
Cluster: name,
AuthInfo: name,
}},
Clusters: map[string]*api.Cluster{name: {
Server: config.LoopbackClientConfig.Host,
CertificateAuthority: config.LoopbackClientConfig.TLSClientConfig.CAFile,
}},
AuthInfos: map[string]*api.AuthInfo{name: {
Token: config.LoopbackClientConfig.BearerToken,
}},
err = os.MkdirAll("/root/.kube", 0755)
if err != nil {
return fmt.Errorf("could not create kubeconfig dir: %w", err)
clientcmd.WriteToFile(clientConfig, "/root/.kube/config")
if err != nil {
return fmt.Errorf("could not write privileged kubeconfig: %w", err)
// Finally start the apiserver
server := apiServer.GenericAPIServer.PrepareRun()
},
Ready: func(u *Unit, c *Cluster) bool {
u.Logger.Info("checking if apiserver is ready")
// Use the scheduler certificate for readiness test, which is more relevant than
// using the internal privileged API token
kc := &rest.Config{
Host: fmt.Sprintf("https://[%s]:%d", c.networking.NodeAddress.IP.String(), apiserverPort),
TLSClientConfig: rest.TLSClientConfig{
CAFile: c.pki.TLS.CertPath(),
CertFile: c.masterCerts.SchedulerAPI.CertPath(),
KeyFile: c.masterCerts.SchedulerAPI.KeyPath(),
},
}
client, err := kubernetes.NewForConfig(rest.AddUserAgent(kc, "scheduler"))
if err != nil {
return false
}
_, err = client.CoreV1().Nodes().List(context.Background(), meta.ListOptions{})
if err != nil {
return false
}
return true
},
}