Skip to content
Snippets Groups Projects
Commit d7c1bff7 authored by kaiyou's avatar kaiyou
Browse files

Remove dependency on cmd/apiserver

parent 37f43e2e
No related branches found
No related tags found
No related merge requests found
Pipeline #25221 passed
......@@ -3,66 +3,328 @@ package services
import (
"context"
"fmt"
"net"
"net/url"
"os"
"time"
"go.acides.org/hepto/utils"
"github.com/google/uuid"
// extensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
extensions "k8s.io/apiextensions-apiserver/pkg/apiserver"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/admission/initializer"
"k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle"
"k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy"
"k8s.io/apiserver/pkg/admission/plugin/webhook/mutating"
"k8s.io/apiserver/pkg/admission/plugin/webhook/validating"
apifilters "k8s.io/apiserver/pkg/endpoints/filters"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
"k8s.io/apiserver/pkg/server/filters"
"k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/flowcontrol/request"
"k8s.io/apiserver/pkg/util/notfoundhandler"
"k8s.io/apiserver/pkg/util/openapi"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/kubernetes/cmd/kube-apiserver/app"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/clientcmd/api"
"k8s.io/component-base/version"
"k8s.io/kube-aggregator/pkg/apiserver"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/capabilities"
"k8s.io/kubernetes/pkg/cluster/ports"
controllersa "k8s.io/kubernetes/pkg/controller/serviceaccount"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi"
"k8s.io/kubernetes/pkg/kubeapiserver"
kubeadmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
"k8s.io/kubernetes/pkg/kubeapiserver/authenticator"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
"k8s.io/kubernetes/pkg/kubeapiserver/options"
"k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/serviceaccount"
)
const audience = "https://kubernetes.default.svc.cluster.local"
const apiserverPort = 6443
// Reimplement the rest options factory to stop depending on opiont parsers
type RestOptionsFactory struct {
StorageFactory storage.StorageFactory
StorageObjectCountTracker request.StorageObjectCountTracker
}
func (f *RestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
storageConfig, err := f.StorageFactory.NewConfig(resource)
if err != nil {
return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
}
return generic.RESTOptions{
StorageConfig: storageConfig,
Decorator: generic.UndecoratedStorage,
DeleteCollectionWorkers: 1,
EnableGarbageCollection: true,
ResourcePrefix: f.StorageFactory.ResourcePrefix(resource),
CountMetricPollPeriod: time.Minute,
StorageObjectCountTracker: f.StorageObjectCountTracker,
}, nil
}
// Build a generic apiserver config, that is used and tweaked for various instanciated servers (native, extensions, etc.)
// Be careful, config is returned as a pointer, so it must be explicitely shallow copied before tweaking
func buildConfig(c *Cluster) (config *server.Config, clients *Clients, err error) {
// Initialize a basic configuration object
ver := version.Get()
config = server.NewConfig(legacyscheme.Codecs)
config.Version = &ver
config.Serializer = legacyscheme.Codecs
config.LongRunningFunc = filters.BasicLongRunningRequestCheck(
sets.NewString("watch", "proxy"),
sets.NewString("attach", "exec", "proxy", "log", "portforward"),
)
// Setup listener
listener, err := net.Listen("tcp6", fmt.Sprintf("[%s]:%d", c.networking.NodeAddress.IP.String(), apiserverPort))
if err != nil {
err = fmt.Errorf("could not initialize listener: %w", err)
return
}
cert, err := dynamiccertificates.NewDynamicServingContentFromFiles("serving-cert", c.masterCerts.TLS.CertPath(), c.masterCerts.TLS.KeyPath())
if err != nil {
err = fmt.Errorf("could not get certificate: %w", err)
return
}
clientCA, err := dynamiccertificates.NewDynamicCAContentFromFile("client-ca", c.pki.API.CertPath())
if err != nil {
err = fmt.Errorf("could not get api CA file: %w", err)
return
}
config.SecureServing = &server.SecureServingInfo{
Listener: listener,
Cert: cert,
ClientCA: clientCA, // not setup upstream, might be an issue
}
// Setup loopback clients (no authorization at this point, handled later)
loopback, err := config.SecureServing.NewLoopbackClientConfig(uuid.NewString(), nil)
loopback.TLSClientConfig.CAFile = c.pki.TLS.CertPath()
if err != nil {
err = fmt.Errorf("could not setup loopback config: %w", err)
return
}
config.LoopbackClientConfig = loopback
clients, err = newClientsForKC(config.LoopbackClientConfig, "apiserver")
if err != nil {
err = fmt.Errorf("could not setup loopback clients: %w", err)
return
}
// Setup authentication
authConfig := authenticator.Config{
ServiceAccountKeyFiles: []string{c.masterCerts.APITokens.KeyPath()},
ServiceAccountIssuers: []string{audience},
APIAudiences: []string{audience},
ClientCAContentProvider: clientCA,
ServiceAccountTokenGetter: controllersa.NewGetterFromClient(
clients.Client,
clients.Informer.Core().V1().Secrets().Lister(),
clients.Informer.Core().V1().ServiceAccounts().Lister(),
clients.Informer.Core().V1().Pods().Lister(),
),
SecretsWriter: clients.Client.CoreV1(),
}
auth, _, err := authConfig.New()
if err != nil {
err = fmt.Errorf("could not setup apiserver authentication: %w", err)
return
}
config.Authentication = server.AuthenticationInfo{
APIAudiences: []string{audience},
Authenticator: auth,
}
// Setup authorizations
authzConfig := authorizer.Config{
AuthorizationModes: []string{modes.ModeNode, modes.ModeRBAC},
VersionedInformerFactory: clients.Informer,
}
authz, resolver, err := authzConfig.New()
if err != nil {
err = fmt.Errorf("could not seutp apiserver authorizations: %w", err)
return
}
config.RuleResolver = resolver
config.Authorization = server.AuthorizationInfo{
Authorizer: authz,
}
// Finally authorize loopback clients
server.AuthorizeClientBearerToken(loopback, &config.Authentication, &config.Authorization)
// Setup service resolver
localHost, _ := url.Parse(config.LoopbackClientConfig.Host)
serviceResolver := apiserver.NewLoopbackServiceResolver(apiserver.NewEndpointServiceResolver(
clients.Informer.Core().V1().Services().Lister(),
clients.Informer.Core().V1().Endpoints().Lister(),
), localHost)
// Setup admission controllers
admissionConfig := kubeadmission.Config{
ExternalInformers: clients.Informer,
LoopbackClientConfig: config.LoopbackClientConfig,
}
initializers, _, err := admissionConfig.New(nil, nil, serviceResolver, nil) // TODO: handle post start hook
if err != nil {
err = fmt.Errorf("could not get admission plugins: %w", err)
return
}
plugins := admission.NewPlugins()
pluginsNames := []string{lifecycle.PluginName, mutating.PluginName, validatingadmissionpolicy.PluginName, validating.PluginName}
server.RegisterAllAdmissionPlugins(plugins)
pluginsConfig, _ := admission.ReadAdmissionConfiguration(pluginsNames, "", nil) // Never fails without config file
genericInitializer := initializer.New(clients.Client, clients.DynClient, clients.Informer, config.Authorization.Authorizer, feature.DefaultFeatureGate, config.DrainedNotify())
initializersChain := admission.PluginInitializers{genericInitializer}
initializersChain = append(initializersChain, initializers...)
admissionChain, err := plugins.NewFromPlugins(pluginsNames, pluginsConfig, initializersChain, admission.Decorators{})
config.AdmissionControl = admissionChain
return
}
var kubeApiserver = &Unit{
Name: "kube-apiserver",
Dependencies: []*Unit{etcd, pkiCA, pkiMaster, vpn, kubeLogger},
Run: func(u *Unit, c *Cluster, ctx context.Context) error {
args := []string{
"--bind-address", c.networking.NodeAddress.IP.String(),
"--service-cluster-ip-range", c.networking.ServiceNet.String(),
"--tls-cert-file", c.masterCerts.TLS.CertPath(),
"--tls-private-key-file", c.masterCerts.TLS.KeyPath(),
"--client-ca-file", c.pki.API.CertPath(),
"--kubelet-certificate-authority", c.pki.TLS.CertPath(),
"--kubelet-client-certificate", c.masterCerts.Kubelet.CertPath(),
"--kubelet-client-key", c.masterCerts.Kubelet.KeyPath(),
"--etcd-servers", "http://[::1]:2379",
"--service-account-signing-key-file", c.masterCerts.APITokens.KeyPath(),
"--service-account-key-file", c.masterCerts.APITokens.KeyPath(),
"--service-account-issuer", "https://kubernetes.default.svc.cluster.local",
"--api-audiences", "https://kubernetes.default.svc.cluster.local",
"--authorization-mode", "Node,RBAC",
"--allow-privileged", "true",
}
config := options.NewServerRunOptions()
nfs := config.Flags()
flags := flagsFromNamedFlagSet("apiserver", &nfs)
err := flags.Parse(args)
config, clients, err := buildConfig(c)
if err != nil {
return err
}
completedOptions, err := app.Complete(config)
// Allow privileged pods
capabilities.Setup(true, 0)
// Configure etcd backend
etcdConfig := storagebackend.NewDefaultConfig("/registry", nil)
etcdConfig.Transport.ServerList = []string{"http://[::1]:2379"}
// Servers are created in reverse orders for proper delegation
notFound := notfoundhandler.New(config.Serializer, apifilters.NoMuxAndDiscoveryIncompleteKey)
// Tweak the configuration and build the extension server
extensionsConfig := extensions.Config{
GenericConfig: &server.RecommendedConfig{
Config: *config,
SharedInformerFactory: clients.Informer,
},
ExtraConfig: extensions.ExtraConfig{},
}
extensionsConfig.GenericConfig.MergedResourceConfig = extensions.DefaultAPIResourceConfigSource()
extensionsConfig.GenericConfig.RESTOptionsGetter = &RestOptionsFactory{
StorageFactory: storage.NewDefaultStorageFactory(
*etcdConfig,
//*storagebackend.NewDefaultConfig("/registry", extensions.Codecs.LegacyCodec(extensionsv1.SchemeGroupVersion)),
runtime.ContentTypeJSON,
legacyscheme.Codecs,
storage.NewDefaultResourceEncodingConfig(extensions.Scheme),
extensionsConfig.GenericConfig.MergedResourceConfig,
kubeapiserver.SpecialDefaultResourcePrefixes,
),
}
extensionsConfig.ExtraConfig.CRDRESTOptionsGetter = extensionsConfig.GenericConfig.RESTOptionsGetter
extensionServer, err := extensionsConfig.Complete().New(server.NewEmptyDelegateWithCustomHandler(notFound))
if err != nil {
return err
return fmt.Errorf("could not seutp the extension server: %w", err)
}
server, err := app.CreateServerChain(completedOptions)
// Setup the apiserver itself
storageFactory := storage.NewDefaultStorageFactory(
*etcdConfig,
runtime.ContentTypeJSON,
legacyscheme.Codecs,
storage.NewDefaultResourceEncodingConfig(legacyscheme.Scheme),
controlplane.DefaultAPIResourceConfigSource(),
kubeapiserver.SpecialDefaultResourcePrefixes,
)
signer, err := serviceaccount.JWTTokenGenerator("https://kubernetes.default.svc.cluster.local", c.masterCerts.APITokens.Key)
if err != nil {
return err
return fmt.Errorf("could not initilize service account signer: %w", err)
}
apiConfig := controlplane.Config{
GenericConfig: config,
ExtraConfig: controlplane.ExtraConfig{
APIResourceConfigSource: storageFactory.APIResourceConfigSource,
StorageFactory: storageFactory,
EventTTL: time.Hour,
KubeletClientConfig: client.KubeletClientConfig{
Port: ports.KubeletPort,
ReadOnlyPort: ports.KubeletReadOnlyPort,
PreferredAddressTypes: []string{string(core.NodeInternalIP), string(core.NodeExternalIP)},
},
ServiceIPRange: *c.networking.ServiceNet,
APIServerServiceIP: c.networking.APIAddress,
APIServerServicePort: 443,
ServiceNodePortRange: options.DefaultServiceNodePortRange,
EndpointReconcilerType: reconcilers.NoneEndpointReconcilerType,
MasterCount: 1,
ServiceAccountIssuer: signer,
ServiceAccountIssuerURL: "https://kubernetes.default.svc.cluster.local",
ServiceAccountPublicKeys: nil, // TODO
VersionedInformers: clients.Informer,
},
}
prepared, err := server.PrepareRun()
apiConfig.GenericConfig.RESTOptionsGetter = &RestOptionsFactory{StorageFactory: storageFactory}
getOpenapi := openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)
apiConfig.GenericConfig.OpenAPIConfig = server.DefaultOpenAPIConfig(getOpenapi, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensions.Scheme))
apiConfig.GenericConfig.OpenAPIConfig = server.DefaultOpenAPIConfig(getOpenapi, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensions.Scheme))
apiConfig.ExtraConfig.ClusterAuthenticationInfo.ClientCA = config.SecureServing.ClientCA
apiServer, err := apiConfig.Complete().New(extensionServer.GenericAPIServer)
if err != nil {
return err
return fmt.Errorf("could not initialize generic apiserver: %w", err)
}
// TODO: no aggregation layer is setup
server := apiServer.GenericAPIServer.PrepareRun()
// Write a loopback config (different for every start)
name := "loopback"
clientConfig := api.Config{
APIVersion: "v1",
Kind: "Config",
CurrentContext: name,
Contexts: map[string]*api.Context{name: {
Cluster: name,
AuthInfo: name,
}},
Clusters: map[string]*api.Cluster{name: {
Server: config.LoopbackClientConfig.Host,
CertificateAuthority: config.LoopbackClientConfig.TLSClientConfig.CAFile,
}},
AuthInfos: map[string]*api.AuthInfo{name: {
Token: config.LoopbackClientConfig.BearerToken,
}},
}
rootConfig := KubeConfig{
URL: fmt.Sprintf("https://[%s]:6443", c.networking.NodeAddress.IP.String()),
CACert: c.pki.TLS.CertPath(),
ClientKey: c.masterCerts.RootClient.KeyPath(),
ClientCert: c.masterCerts.RootClient.CertPath(),
err = os.MkdirAll("/root/.kube", 0755)
if err != nil {
return fmt.Errorf("could not create kubeconfig dir: %w", err)
}
err = utils.WriteConfig(rootConfig, "/root/.kube/config")
clientcmd.WriteToFile(clientConfig, "/root/.kube/config")
if err != nil {
return err
return fmt.Errorf("could not write privileged kubeconfig: %w", err)
}
return prepared.Run(ctx.Done())
return server.Run(ctx.Done())
},
Ready: func(u *Unit, c *Cluster) bool {
u.Logger.Info("checking if apiserver is ready")
......
......@@ -108,7 +108,7 @@ var pkiMaster = &Unit{
// TLS certificate
tlsCert, err := bundle.GetCertOrCSR("tls",
pekahi.NewServerTemplate(
[]string{"kube-apiserver", "kubernetes.default"},
[]string{"kube-apiserver", "kubernetes.default", "apiserver-loopback-client"},
[]net.IP{c.networking.NodeAddress.IP, c.networking.APIAddress},
),
)
......
......@@ -16,7 +16,6 @@ import (
"k8s.io/client-go/tools/events"
"k8s.io/component-base/cli/flag"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler"
)
var kubeLogger = &Unit{
......@@ -30,6 +29,7 @@ var kubeLogger = &Unit{
type Clients struct {
KubeConfig *rest.Config
Client *kubernetes.Clientset
DynClient *dynamic.DynamicClient
EventClient *kubernetes.Clientset
Broadcaster events.EventBroadcasterAdapter
Informer informers.SharedInformerFactory
......@@ -38,13 +38,17 @@ type Clients struct {
func newClients(c *Cluster, ua string, masterIP net.IP, cert *pekahi.Certificate) (*Clients, error) {
kc := &rest.Config{
Host: fmt.Sprintf("https://[%s]:6443", masterIP.String()),
Host: fmt.Sprintf("https://[%s]:%d", masterIP.String(), apiserverPort),
TLSClientConfig: rest.TLSClientConfig{
CAFile: c.pki.TLS.CertPath(),
CertFile: cert.CertPath(),
KeyFile: cert.KeyPath(),
},
}
return newClientsForKC(kc, ua)
}
func newClientsForKC(kc *rest.Config, ua string) (*Clients, error) {
client, err := kubernetes.NewForConfig(rest.AddUserAgent(kc, ua))
if err != nil {
return nil, err
......@@ -53,15 +57,17 @@ func newClients(c *Cluster, ua string, masterIP net.IP, cert *pekahi.Certificate
if err != nil {
return nil, err
}
dynClient, err := dynamic.NewForConfig(kc)
if err != nil {
return nil, err
}
broadcaster := events.NewEventBroadcasterAdapter(eventsClient)
informers := scheduler.NewInformerFactory(client, 0)
dynInformers := dynamicinformer.NewFilteredDynamicSharedInformerFactory(
dynamic.NewForConfigOrDie(kc), 0, core.NamespaceAll, nil,
)
informers := informers.NewSharedInformerFactory(client, 0)
dynInformers := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, 0, core.NamespaceAll, nil)
return &Clients{
KubeConfig: kc,
Client: client,
DynClient: dynClient,
EventClient: eventsClient,
Broadcaster: broadcaster,
Informer: informers,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment