Skip to content
Snippets Groups Projects
Commit 29b0a28f authored by kaiyou's avatar kaiyou
Browse files

Start refactoring the k8s code using a utility package

parent f5ba9b99
No related branches found
No related tags found
No related merge requests found
package k8s
import (
"context"
"go.acides.org/pekahi"
core "k8s.io/api/core/v1"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/metadata"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/events"
)
// Clients bundles every kubernetes client flavor built from a single
// rest config, together with informer factories and event broadcasting.
type Clients struct {
	KubeConfig     *rest.Config                                 // config all clients were built from
	Client         *kubernetes.Clientset                        // typed clientset for regular API calls
	DynClient      *dynamic.DynamicClient                       // dynamic client for unstructured resources
	EventClient    *kubernetes.Clientset                        // dedicated clientset for event traffic
	MetadataClient metadata.Interface                           // metadata-only client
	CachedClient   discovery.CachedDiscoveryInterface           // memory-cached discovery client
	RESTMapper     *restmapper.DeferredDiscoveryRESTMapper      // REST mapper backed by cached discovery
	Broadcaster    events.EventBroadcasterAdapter               // event broadcaster, started by Start
	Informer       informers.SharedInformerFactory              // shared informer factory for typed resources
	DynInformer    dynamicinformer.DynamicSharedInformerFactory // shared informer factory for dynamic resources
}
// Build clients that authenticate using a client certificate (typically for kubelets)
func NewCertsClient(host string, ca *pekahi.Certificate, cert *pekahi.Certificate) (*Clients, error) {
	cfg := newKC(host, ca)
	cfg.TLSClientConfig.CertFile = cert.CertPath()
	cfg.TLSClientConfig.KeyFile = cert.KeyPath()
	return newClientsForKC(cfg)
}
// Build clients that authenticate using a bearer token (typically from the master context itself)
func NewTokenClients(host string, ca *pekahi.Certificate, token string) (*Clients, error) {
	cfg := newKC(host, ca)
	cfg.BearerToken = token
	return newClientsForKC(cfg)
}
// Build a minimal k8s client config for connecting to the master,
// trusting the given CA certificate
func newKC(host string, ca *pekahi.Certificate) *rest.Config {
	cfg := &rest.Config{Host: host}
	cfg.TLSClientConfig = rest.TLSClientConfig{CAFile: ca.CertPath()}
	return cfg
}
// newClientsForKC instantiates every client flavor from a single rest config:
// typed, dynamic and metadata clients, plus cached discovery, REST mapping,
// event broadcasting and informer factories.
func newClientsForKC(kc *rest.Config) (*Clients, error) {
	baseClient, err := kubernetes.NewForConfig(kc)
	if err != nil {
		return nil, err
	}
	// A separate clientset is used for events, so that event traffic does
	// not compete with regular API calls for client-side rate limits
	eventsClient, err := kubernetes.NewForConfig(kc)
	if err != nil {
		return nil, err
	}
	dynClient, err := dynamic.NewForConfig(kc)
	if err != nil {
		return nil, err
	}
	metadataClient, err := metadata.NewForConfig(kc)
	if err != nil {
		return nil, err
	}
	cachedClient := memory.NewMemCacheClient(baseClient)
	restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedClient)
	broadcaster := events.NewEventBroadcasterAdapter(eventsClient)
	// Renamed so the local does not shadow the informers package
	informerFactory := informers.NewSharedInformerFactory(baseClient, 0)
	dynInformerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, 0, core.NamespaceAll, nil)
	return &Clients{
		KubeConfig:     kc,
		Client:         baseClient,
		DynClient:      dynClient,
		EventClient:    eventsClient,
		MetadataClient: metadataClient,
		// Fix: CachedClient was previously never populated, leaving the
		// struct field nil even though the cached client was created
		CachedClient: cachedClient,
		RESTMapper:   restMapper,
		Broadcaster:  broadcaster,
		Informer:     informerFactory,
		DynInformer:  dynInformerFactory,
	}, nil
}
// Start begins event recording and starts all informer factories, then
// blocks until the informer caches are synced or ctx is cancelled.
func (c *Clients) Start(ctx context.Context) {
	c.Broadcaster.StartRecordingToSink(ctx.Done())
	c.Informer.Start(ctx.Done())
	c.DynInformer.Start(ctx.Done())
	// Wait for the initial cache sync so consumers see a consistent view
	c.Informer.WaitForCacheSync(ctx.Done())
	c.DynInformer.WaitForCacheSync(ctx.Done())
}
// Stop shuts down the event broadcaster; informers are stopped by
// cancelling the context that was passed to Start.
func (c *Clients) Stop() {
	c.Broadcaster.Shutdown()
}
package k8s
import (
"fmt"
"time"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/util/flowcontrol/request"
)
// RestOptionsFactory reimplements the rest options factory to stop depending
// on option parsers; it instantiates the storage factory at the right
// location and prefix for a given resource.
type RestOptionsFactory struct {
	StorageFactory            storage.StorageFactory            // resolves storage config and prefix per resource
	StorageObjectCountTracker request.StorageObjectCountTracker // object count tracker from the flowcontrol machinery
}
// GetRESTOptions builds the REST options for a given resource, resolving its
// storage configuration and resource prefix through the storage factory.
func (f *RestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
	storageConfig, err := f.StorageFactory.NewConfig(resource)
	if err != nil {
		// Wrap with %w (instead of flattening with err.Error()) so callers
		// can still unwrap the cause with errors.Is / errors.As
		return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v: %w", resource, err)
	}
	return generic.RESTOptions{
		StorageConfig:             storageConfig,
		Decorator:                 generic.UndecoratedStorage,
		DeleteCollectionWorkers:   1,
		EnableGarbageCollection:   true,
		ResourcePrefix:            f.StorageFactory.ResourcePrefix(resource),
		CountMetricPollPeriod:     time.Minute,
		StorageObjectCountTracker: f.StorageObjectCountTracker,
	}, nil
}
// PrepareStorage builds a RestOptionsFactory backed by a default storage
// factory over a local etcd backend, for the given codecs, scheme and
// resource configuration.
func PrepareStorage(codecs runtime.StorageSerializer, scheme *runtime.Scheme, resources storage.APIResourceConfigSource) *RestOptionsFactory {
	// Etcd backend supports being created with a specific codec, however later created storage
	// factories override that codec at runtime thanks to using DefaultStorageFactory instead
	// of vanilla SimpleStorageFactory, so we create the backend with no pre-configured codec
	etcdConfig := storagebackend.NewDefaultConfig("/registry", nil)
	etcdConfig.Transport.ServerList = []string{"http://[::1]:2379"}
	// The rest options getter abstracts all storage for the server, by specifying the scheme and codecs
	// on top of the storage backend. Vanilla code sets a special multi-group versioner to avoid issues
	// with large objects (see cmd/kube-apiserver/app/apiextensions.go), which we ignore here
	return &RestOptionsFactory{
		// TODO: be careful about the default factory using the resource name as prefix, which might
		// cause collisions in the future
		StorageFactory: storage.NewDefaultStorageFactory(
			*etcdConfig, // copied as value, so the factory may alter some fields
			runtime.ContentTypeJSON, // media type, somewhat inefficient but good enough
			codecs,
			storage.NewDefaultResourceEncodingConfig(scheme),
			resources,
			// We always use an empty list of prefix overrides: no prefix override, contrary to
			// vanilla code which has many overrides for backward compatibility against a given storage target
			// This means hepto is not compatible with vanilla when using the same etcd backend, and we might
			// want to be careful when upgrading to avoid unexpected prefix changes
			make(map[schema.GroupResource]string),
		),
	}
}
......@@ -8,10 +8,9 @@ import (
"os"
"time"
"go.acides.org/hepto/k8s"
extensions "k8s.io/apiextensions-apiserver/pkg/apiserver"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/admission/initializer"
......@@ -22,14 +21,10 @@ import (
"k8s.io/apiserver/pkg/admission/plugin/webhook/validating"
apifilters "k8s.io/apiserver/pkg/endpoints/filters"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
"k8s.io/apiserver/pkg/server/filters"
"k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/flowcontrol/request"
"k8s.io/apiserver/pkg/util/notfoundhandler"
"k8s.io/apiserver/pkg/util/openapi"
"k8s.io/client-go/tools/clientcmd"
......@@ -46,7 +41,6 @@ import (
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi"
"k8s.io/kubernetes/pkg/kubeapiserver"
kubeadmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
"k8s.io/kubernetes/pkg/kubeapiserver/authenticator"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer"
......@@ -63,30 +57,8 @@ import (
const audience = "https://kubernetes.default.svc.cluster.local"
const apiserverPort = 6443
// Reimplement the rest options factory to stop depending on option parsers
type RestOptionsFactory struct {
StorageFactory storage.StorageFactory
StorageObjectCountTracker request.StorageObjectCountTracker
}
func (f *RestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
storageConfig, err := f.StorageFactory.NewConfig(resource)
if err != nil {
return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
}
return generic.RESTOptions{
StorageConfig: storageConfig,
Decorator: generic.UndecoratedStorage,
DeleteCollectionWorkers: 1,
EnableGarbageCollection: true,
ResourcePrefix: f.StorageFactory.ResourcePrefix(resource),
CountMetricPollPeriod: time.Minute,
StorageObjectCountTracker: f.StorageObjectCountTracker,
}, nil
}
// Make a service resolver for a given client config
func getResolver(clients *Clients) apiserver.ServiceResolver {
func getResolver(clients *k8s.Clients) apiserver.ServiceResolver {
host, _ := url.Parse(clients.KubeConfig.Host)
return apiserver.NewLoopbackServiceResolver(apiserver.NewEndpointServiceResolver(
clients.Informer.Core().V1().Services().Lister(),
......@@ -96,7 +68,7 @@ func getResolver(clients *Clients) apiserver.ServiceResolver {
// Build a generic apiserver config, that is used and tweaked for various instantiated servers (native, extensions, etc.)
// Be careful, config is returned as a pointer, so it must be explicitly shallow copied before tweaking
func buildConfig(c *Cluster) (config *server.Config, clients *Clients, err error) {
func buildConfig(c *Cluster) (config *server.Config, clients *k8s.Clients, err error) {
// Initialize a basic configuration object
ver := version.Get()
config = server.NewConfig(legacyscheme.Codecs)
......@@ -141,7 +113,7 @@ func buildConfig(c *Cluster) (config *server.Config, clients *Clients, err error
}
// Setup loopback clients (no authorization at this point, handled later)
clients, err = newLoopbackClients(c)
clients, err = k8s.NewTokenClients(c.masterUrl, c.pki.TLS, c.loopbackToken)
if err != nil {
err = fmt.Errorf("could not setup loopback clients: %w", err)
return
......@@ -252,54 +224,19 @@ var kubeApiserver = &Unit{
// Allow privileged pods
capabilities.Setup(true, 0)
// Etcd backend supports being created with a specific codec, however later created storage
// factories override that codec at runtime thanks to using DefaultStorageFactory instead
// of vanilla SimpleStorageFactory, so we create the backend with no pre-configured codec
// and use it for all factories
// TODO: be more wary about resource prefixes in etcd and provide a sane, generic working implementation,
// not taking backward compatibility with vanilla into account
etcdConfig := storagebackend.NewDefaultConfig("/registry", nil)
etcdConfig.Transport.ServerList = []string{"http://[::1]:2379"}
// Servers are created in reverse orders for proper delegation
// First notFound is the default handler, that basically returns a 404,
// then the extensions serves CRD for unknown kinds, finally
// the default apiserver is first in the chain and serves all default kinds
// Note that no API aggregation is setup, which means that k8s API aggregation
// is not supported by hepto.
notFound := notfoundhandler.New(config.Serializer, apifilters.NoMuxAndDiscoveryIncompleteKey)
// Tweak the configuration and build the extension server
// The generic config is an instance of RecommendedConfig, which is composed of Config, by
// first copying it we can later change some fields per instance
// Build the extensions server (create then customize the configuration)
extensionsConfig := extensions.Config{
GenericConfig: &server.RecommendedConfig{
Config: *config, // This is the common config being copied
Config: *config, // Copy by value to later change fields
SharedInformerFactory: clients.Informer,
},
ExtraConfig: extensions.ExtraConfig{},
}
// Then we customize some specific fields
// The resource config is used as-is with no api enablement specification, meaning ga and beta
// are enabled
extensionsConfig.GenericConfig.MergedResourceConfig = extensions.DefaultAPIResourceConfigSource()
// The rest options getter abstracts all storage for the server, by specifying the scheme and codecs
// on top of the storage backend. Vanilla code sets a special multi-group versionner to avoid issues
// with large objects (cee cmd/kube-apiserver/app/apiextensions.go), which we ignore here
extensionsConfig.GenericConfig.RESTOptionsGetter = &RestOptionsFactory{
// TODO: using DefaultStorageFactory might be an issue compared to SimpleStorageFactory from vanilla,
// since it does provide a different resource prefix than SimpleStorageFactory (resource name
// instead of resource group and name)
StorageFactory: storage.NewDefaultStorageFactory(
*etcdConfig, // copied as value, so the factory may alter some fields
runtime.ContentTypeJSON, // media type, somewhat inefficient but good enough
extensions.Codecs, // serializer
storage.NewDefaultResourceEncodingConfig(extensions.Scheme),
extensionsConfig.GenericConfig.MergedResourceConfig, // api resources
map[schema.GroupResource]string{}, // no override for extensions
),
}
extensionsConfig.GenericConfig.RESTOptionsGetter = k8s.PrepareStorage(extensions.Codecs, extensions.Scheme, extensionsConfig.GenericConfig.MergedResourceConfig)
// TODO This might be an issue, since vanilla sets a special unstructured encoding for CRD
extensionsConfig.ExtraConfig.CRDRESTOptionsGetter = extensionsConfig.GenericConfig.RESTOptionsGetter
extensionServer, err := extensionsConfig.Complete().New(server.NewEmptyDelegateWithCustomHandler(notFound))
......@@ -338,16 +275,8 @@ var kubeApiserver = &Unit{
apiConfig.ExtraConfig.ClusterAuthenticationInfo.ClientCA = config.SecureServing.ClientCA
apiConfig.GenericConfig.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource()
apiConfig.ExtraConfig.APIResourceConfigSource = apiConfig.GenericConfig.MergedResourceConfig
restOptionsGetter := &RestOptionsFactory{
StorageFactory: storage.NewDefaultStorageFactory(
*etcdConfig,
runtime.ContentTypeJSON,
legacyscheme.Codecs,
storage.NewDefaultResourceEncodingConfig(legacyscheme.Scheme),
apiConfig.GenericConfig.MergedResourceConfig,
kubeapiserver.SpecialDefaultResourcePrefixes,
),
}
// TODO add prefix overrides
restOptionsGetter := k8s.PrepareStorage(legacyscheme.Codecs, legacyscheme.Scheme, apiConfig.GenericConfig.MergedResourceConfig)
apiConfig.GenericConfig.RESTOptionsGetter = restOptionsGetter
apiConfig.ExtraConfig.StorageFactory = restOptionsGetter.StorageFactory
openapiFactory := openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)
......@@ -369,16 +298,7 @@ var kubeApiserver = &Unit{
},
}
aggregatorConfig.GenericConfig.MergedResourceConfig = aggregator.DefaultAPIResourceConfigSource()
aggregatorConfig.GenericConfig.RESTOptionsGetter = &RestOptionsFactory{
StorageFactory: storage.NewDefaultStorageFactory(
*etcdConfig,
runtime.ContentTypeJSON,
aggregatorscheme.Codecs,
storage.NewDefaultResourceEncodingConfig(aggregatorscheme.Scheme),
aggregatorConfig.GenericConfig.MergedResourceConfig,
map[schema.GroupResource]string{},
),
}
aggregatorConfig.GenericConfig.RESTOptionsGetter = k8s.PrepareStorage(aggregatorscheme.Codecs, aggregatorscheme.Scheme, aggregatorConfig.GenericConfig.MergedResourceConfig)
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(apiServer.GenericAPIServer)
if err != nil {
return fmt.Errorf("could not initialize aggregator: %w", err)
......@@ -391,7 +311,7 @@ var kubeApiserver = &Unit{
},
Ready: func(u *Unit, c *Cluster) bool {
u.Logger.Info("checking if apiserver is ready")
clients, err := newLoopbackClients(c)
clients, err := k8s.NewTokenClients(c.masterUrl, c.pki.TLS, c.loopbackToken)
if err != nil {
return false
}
......@@ -403,15 +323,11 @@ var kubeApiserver = &Unit{
},
}
// Write a loopback config (different for every start)
var loopback = &Unit{
Name: "kube-loopback",
Dependencies: []*Unit{kubeApiserver},
Start: func(u *Unit, c *Cluster, ctx context.Context) error {
clients, err := newLoopbackClients(c)
if err != nil {
return fmt.Errorf("could not get loopback config: %w", err)
}
// Write a loopback config (different for every start)
name := "loopback"
clientConfig := api.Config{
APIVersion: "v1",
......@@ -422,14 +338,14 @@ var loopback = &Unit{
AuthInfo: name,
}},
Clusters: map[string]*api.Cluster{name: {
Server: clients.KubeConfig.Host,
CertificateAuthority: clients.KubeConfig.TLSClientConfig.CAFile,
Server: c.masterUrl,
CertificateAuthority: c.pki.TLS.CertPath(),
}},
AuthInfos: map[string]*api.AuthInfo{name: {
Token: clients.KubeConfig.BearerToken,
Token: c.loopbackToken,
}},
}
err = os.MkdirAll("/root/.kube", 0755)
err := os.MkdirAll("/root/.kube", 0755)
if err != nil {
return fmt.Errorf("could not create kubeconfig dir: %w", err)
}
......
......@@ -7,6 +7,7 @@ import (
"net"
"time"
"go.acides.org/hepto/k8s"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/quota/v1/generic"
......@@ -40,7 +41,7 @@ var kubeControllerManager = &Unit{
// Used as a replacement for InformersStarted in vanilla code
allReady := make(chan struct{})
clients, err := newLoopbackClients(c)
clients, err := k8s.NewTokenClients(c.masterUrl, c.pki.TLS, c.loopbackToken)
if err != nil {
return err
}
......
......@@ -2,6 +2,7 @@ package services
import (
"context"
"fmt"
"go.acides.org/sml"
)
......@@ -36,6 +37,7 @@ var memberlist = &Unit{
if node.Role == "master" || node.Role == "full" {
u.Manager.Logger.Info("found remote master", "name", node.Name)
c.masterNode = node
c.masterUrl = fmt.Sprintf("https://[%s]:%d", node.VpnIP.String(), apiserverPort)
u.Manager.Trigger()
}
}
......
......@@ -4,18 +4,6 @@ import (
"context"
"fmt"
"go.acides.org/pekahi"
core "k8s.io/api/core/v1"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/metadata"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"
)
......@@ -31,88 +19,3 @@ var kubeLogger = &Unit{
return nil
},
}
type Clients struct {
KubeConfig *rest.Config
Client *kubernetes.Clientset
DynClient *dynamic.DynamicClient
EventClient *kubernetes.Clientset
MetadataClient metadata.Interface
CachedClient discovery.CachedDiscoveryInterface
RESTMapper *restmapper.DeferredDiscoveryRESTMapper
Broadcaster events.EventBroadcasterAdapter
Informer informers.SharedInformerFactory
DynInformer dynamicinformer.DynamicSharedInformerFactory
}
// Make a k8s client config for connecting to the master
func newKC(c *Cluster) *rest.Config {
return &rest.Config{
Host: fmt.Sprintf("https://[%s]:%d", c.masterNode.VpnIP.String(), apiserverPort),
TLSClientConfig: rest.TLSClientConfig{
CAFile: c.pki.TLS.CertPath(),
},
}
}
// Make clients authenticating with a ceritficate (typically for kubelets)
func newCertClients(c *Cluster, cert *pekahi.Certificate) (*Clients, error) {
kc := newKC(c)
kc.TLSClientConfig.CertFile = cert.CertPath()
kc.TLSClientConfig.KeyFile = cert.KeyPath()
return newClientsForKC(kc)
}
// Make clients from the master context itself
func newLoopbackClients(c *Cluster) (*Clients, error) {
kc := newKC(c)
kc.BearerToken = c.loopbackToken
return newClientsForKC(kc)
}
func newClientsForKC(kc *rest.Config) (*Clients, error) {
baseClient, err := kubernetes.NewForConfig(kc)
if err != nil {
return nil, err
}
eventsClient, err := kubernetes.NewForConfig(kc)
if err != nil {
return nil, err
}
dynClient, err := dynamic.NewForConfig(kc)
if err != nil {
return nil, err
}
metadataClient, err := metadata.NewForConfig(kc)
if err != nil {
return nil, err
}
cachedClient := memory.NewMemCacheClient(baseClient)
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedClient)
broadcaster := events.NewEventBroadcasterAdapter(eventsClient)
informers := informers.NewSharedInformerFactory(baseClient, 0)
dynInformers := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, 0, core.NamespaceAll, nil)
return &Clients{
KubeConfig: kc,
Client: baseClient,
DynClient: dynClient,
EventClient: eventsClient,
MetadataClient: metadataClient,
RESTMapper: restMapper,
Broadcaster: broadcaster,
Informer: informers,
DynInformer: dynInformers,
}, nil
}
func (c *Clients) Start(ctx context.Context) {
c.Broadcaster.StartRecordingToSink(ctx.Done())
c.Informer.Start(ctx.Done())
c.DynInformer.Start(ctx.Done())
c.Informer.WaitForCacheSync(ctx.Done())
c.DynInformer.WaitForCacheSync(ctx.Done())
}
func (c *Clients) Stop() {
c.Broadcaster.Shutdown()
}
......@@ -7,6 +7,7 @@ import (
"path"
"time"
"go.acides.org/hepto/k8s"
"go.opentelemetry.io/otel/trace"
core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
......@@ -44,7 +45,7 @@ var kubeKubelet = &Unit{
// (very difficult to check otherwise)
time.Sleep(10 * time.Second)
kubeletRoot := path.Join(c.settings.DataDir, "kubelet")
clients, err := newCertClients(c, c.certs.API)
clients, err := k8s.NewCertsClient(c.masterUrl, c.certs.TLS, c.certs.API)
if err != nil {
return fmt.Errorf("could not create clients: %w", err)
}
......
......@@ -63,6 +63,7 @@ type Cluster struct {
nodes []*HeptoMeta
state *HeptoState
masterNode *HeptoMeta
masterUrl string
certs *NodeCerts
masterCerts *MasterCerts
......
......@@ -3,6 +3,7 @@ package services
import (
"context"
"go.acides.org/hepto/k8s"
"k8s.io/kubernetes/pkg/scheduler"
)
......@@ -10,7 +11,7 @@ var kubeScheduler = &Unit{
Name: "kube-scheduler",
Dependencies: []*Unit{kubeApiserver, pkiCA, pkiMaster, kubeLogger},
Run: func(u *Unit, c *Cluster, ctx context.Context) error {
clients, err := newLoopbackClients(c)
clients, err := k8s.NewTokenClients(c.masterUrl, c.pki.TLS, c.loopbackToken)
if err != nil {
return err
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment