Skip to content
Snippets Groups Projects
Commit 6b882e57 authored by kaiyou's avatar kaiyou
Browse files

Keep refactoring

parent 29b0a28f
No related branches found
No related tags found
No related merge requests found
Pipeline #27117 passed
...@@ -2,9 +2,11 @@ package k8s ...@@ -2,9 +2,11 @@ package k8s
import ( import (
"context" "context"
"net/url"
"go.acides.org/pekahi" "go.acides.org/pekahi"
core "k8s.io/api/core/v1" core "k8s.io/api/core/v1"
"k8s.io/apiserver/pkg/util/webhook"
"k8s.io/client-go/discovery" "k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/memory" "k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic"
...@@ -15,6 +17,7 @@ import ( ...@@ -15,6 +17,7 @@ import (
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"k8s.io/client-go/restmapper" "k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/events" "k8s.io/client-go/tools/events"
"k8s.io/kube-aggregator/pkg/apiserver"
) )
type Clients struct { type Clients struct {
...@@ -101,3 +104,14 @@ func (c *Clients) Start(ctx context.Context) { ...@@ -101,3 +104,14 @@ func (c *Clients) Start(ctx context.Context) {
func (c *Clients) Stop() { func (c *Clients) Stop() {
c.Broadcaster.Shutdown() c.Broadcaster.Shutdown()
} }
// Make a service resolver for a given client config
// Resolver first returns the apiserver endpoint for kubernetes.default, then
// tries to resolve endpoints
func (c *Clients) ServiceResolver() webhook.ServiceResolver {
host, _ := url.Parse(c.KubeConfig.Host)
return apiserver.NewLoopbackServiceResolver(apiserver.NewEndpointServiceResolver(
c.Informer.Core().V1().Services().Lister(),
c.Informer.Core().V1().Endpoints().Lister(),
), host)
}
...@@ -4,7 +4,6 @@ import ( ...@@ -4,7 +4,6 @@ import (
"context" "context"
"fmt" "fmt"
"net" "net"
"net/url"
"os" "os"
"time" "time"
...@@ -30,7 +29,6 @@ import ( ...@@ -30,7 +29,6 @@ import (
"k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/clientcmd/api" "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/component-base/version" "k8s.io/component-base/version"
"k8s.io/kube-aggregator/pkg/apiserver"
aggregator "k8s.io/kube-aggregator/pkg/apiserver" aggregator "k8s.io/kube-aggregator/pkg/apiserver"
aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme" aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
"k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/api/legacyscheme"
...@@ -57,13 +55,87 @@ import ( ...@@ -57,13 +55,87 @@ import (
const audience = "https://kubernetes.default.svc.cluster.local" const audience = "https://kubernetes.default.svc.cluster.local"
const apiserverPort = 6443 const apiserverPort = 6443
// Make a service resolver for a given client config var kubeApiserver = &Unit{
func getResolver(clients *k8s.Clients) apiserver.ServiceResolver { Name: "kube-apiserver",
host, _ := url.Parse(clients.KubeConfig.Host) Dependencies: []*Unit{etcd, pkiCA, pkiMaster, vpn, kubeLogger},
return apiserver.NewLoopbackServiceResolver(apiserver.NewEndpointServiceResolver( Run: func(u *Unit, c *Cluster, ctx context.Context) error {
clients.Informer.Core().V1().Services().Lister(), config, clients, err := buildConfig(c)
clients.Informer.Core().V1().Endpoints().Lister(), if err != nil {
), host) return err
}
// Allow privileged pods
capabilities.Setup(true, 0)
// Servers are created in reverse orders for proper delegation
notFound := notfoundhandler.New(config.Serializer, apifilters.NoMuxAndDiscoveryIncompleteKey)
extensionsConfig, _ := buildExtensionsConfig(config, clients) // Currently no error case
extensionServer, err := extensionsConfig.Complete().New(server.NewEmptyDelegateWithCustomHandler(notFound))
if err != nil {
return fmt.Errorf("could not seutp the extension server: %w", err)
}
apiConfig, err := buildApiConfig(c, config, clients)
if err != nil {
return fmt.Errorf("could not build apiserver config: %w", err)
}
apiServer, err := apiConfig.Complete().New(extensionServer.GenericAPIServer)
if err != nil {
return fmt.Errorf("could not initialize generic apiserver: %w", err)
}
aggregatorConfig, _ := buildAggregatorConfig(config, clients)
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(apiServer.GenericAPIServer)
if err != nil {
return fmt.Errorf("could not initialize aggregator: %w", err)
}
// Finally start the apiserver
server := aggregatorServer.GenericAPIServer.PrepareRun()
go clients.Start(ctx)
return server.Run(ctx.Done())
},
Ready: func(u *Unit, c *Cluster) bool {
u.Logger.Info("checking if apiserver is ready")
clients, err := k8s.NewTokenClients(c.masterUrl, c.pki.TLS, c.loopbackToken)
if err != nil {
return false
}
_, err = clients.Client.CoreV1().Nodes().List(context.Background(), meta.ListOptions{})
if err != nil {
return false
}
return true
},
}
// Write a loopback config (different for every start)
var loopback = &Unit{
Name: "kube-loopback",
Dependencies: []*Unit{kubeApiserver},
Start: func(u *Unit, c *Cluster, ctx context.Context) error {
name := "loopback"
clientConfig := api.Config{
APIVersion: "v1",
Kind: "Config",
CurrentContext: name,
Contexts: map[string]*api.Context{name: {
Cluster: name,
AuthInfo: name,
}},
Clusters: map[string]*api.Cluster{name: {
Server: c.masterUrl,
CertificateAuthority: c.pki.TLS.CertPath(),
}},
AuthInfos: map[string]*api.AuthInfo{name: {
Token: c.loopbackToken,
}},
}
err := os.MkdirAll("/root/.kube", 0755)
if err != nil {
return fmt.Errorf("could not create kubeconfig dir: %w", err)
}
clientcmd.WriteToFile(clientConfig, "/root/.kube/config")
if err != nil {
return fmt.Errorf("could not write privileged kubeconfig: %w", err)
}
return nil
},
} }
// Build a generic apiserver config, that is used and tweaked for various instanciated servers (native, extensions, etc.) // Build a generic apiserver config, that is used and tweaked for various instanciated servers (native, extensions, etc.)
...@@ -167,8 +239,7 @@ func buildConfig(c *Cluster) (config *server.Config, clients *k8s.Clients, err e ...@@ -167,8 +239,7 @@ func buildConfig(c *Cluster) (config *server.Config, clients *k8s.Clients, err e
ExternalInformers: clients.Informer, ExternalInformers: clients.Informer,
LoopbackClientConfig: config.LoopbackClientConfig, LoopbackClientConfig: config.LoopbackClientConfig,
} }
serviceResolver := getResolver(clients) initializers, _, err := admissionConfig.New(nil, nil, clients.ServiceResolver(), nil) // TODO: handle post start hook
initializers, _, err := admissionConfig.New(nil, nil, serviceResolver, nil) // TODO: handle post start hook
if err != nil { if err != nil {
err = fmt.Errorf("could not get admission plugins: %w", err) err = fmt.Errorf("could not get admission plugins: %w", err)
return return
...@@ -213,146 +284,73 @@ func buildConfig(c *Cluster) (config *server.Config, clients *k8s.Clients, err e ...@@ -213,146 +284,73 @@ func buildConfig(c *Cluster) (config *server.Config, clients *k8s.Clients, err e
return return
} }
var kubeApiserver = &Unit{ func buildExtensionsConfig(config *server.Config, clients *k8s.Clients) (*extensions.Config, error) {
Name: "kube-apiserver", // Build the extensions server (create then customize the configuration)
Dependencies: []*Unit{etcd, pkiCA, pkiMaster, vpn, kubeLogger}, extensionsConfig := extensions.Config{
Run: func(u *Unit, c *Cluster, ctx context.Context) error { GenericConfig: &server.RecommendedConfig{
config, clients, err := buildConfig(c) Config: *config, // Copy by value to later change fields
if err != nil { SharedInformerFactory: clients.Informer,
return err },
} ExtraConfig: extensions.ExtraConfig{},
// Allow privileged pods }
capabilities.Setup(true, 0) extensionsConfig.GenericConfig.MergedResourceConfig = extensions.DefaultAPIResourceConfigSource()
extensionsConfig.GenericConfig.RESTOptionsGetter = k8s.PrepareStorage(extensions.Codecs, extensions.Scheme, extensionsConfig.GenericConfig.MergedResourceConfig)
// Servers are created in reverse orders for proper delegation // TODO This might be an issue, since vanilla sets a special unstructured encoding for CRD
notFound := notfoundhandler.New(config.Serializer, apifilters.NoMuxAndDiscoveryIncompleteKey) extensionsConfig.ExtraConfig.CRDRESTOptionsGetter = extensionsConfig.GenericConfig.RESTOptionsGetter
return &extensionsConfig, nil
// Build the extensions server (create then customize the configuration) }
extensionsConfig := extensions.Config{
GenericConfig: &server.RecommendedConfig{
Config: *config, // Copy by value to later change fields
SharedInformerFactory: clients.Informer,
},
ExtraConfig: extensions.ExtraConfig{},
}
extensionsConfig.GenericConfig.MergedResourceConfig = extensions.DefaultAPIResourceConfigSource()
extensionsConfig.GenericConfig.RESTOptionsGetter = k8s.PrepareStorage(extensions.Codecs, extensions.Scheme, extensionsConfig.GenericConfig.MergedResourceConfig)
// TODO This might be an issue, since vanilla sets a special unstructured encoding for CRD
extensionsConfig.ExtraConfig.CRDRESTOptionsGetter = extensionsConfig.GenericConfig.RESTOptionsGetter
extensionServer, err := extensionsConfig.Complete().New(server.NewEmptyDelegateWithCustomHandler(notFound))
if err != nil {
return fmt.Errorf("could not seutp the extension server: %w", err)
}
// Setup the apiserver itself, same logics as the extensions server
signer, err := serviceaccount.JWTTokenGenerator(audience, c.masterCerts.APITokens.Key)
if err != nil {
return fmt.Errorf("could not initilize service account signer: %w", err)
}
apiConfig := controlplane.Config{
GenericConfig: &(*config), // copy by value, so we can modify fields
ExtraConfig: controlplane.ExtraConfig{
EventTTL: time.Hour,
KubeletClientConfig: client.KubeletClientConfig{
Port: ports.KubeletPort,
ReadOnlyPort: ports.KubeletReadOnlyPort,
PreferredAddressTypes: []string{string(core.NodeInternalIP), string(core.NodeExternalIP)},
},
ServiceIPRange: *c.networking.ServiceNet,
APIServerServiceIP: c.networking.APIAddress,
APIServerServicePort: 443,
ServiceNodePortRange: options.DefaultServiceNodePortRange,
// This still triggers an error, stating that stale data was found and cleaned up,
// which is inevitable
EndpointReconcilerType: reconcilers.NoneEndpointReconcilerType,
MasterCount: 1,
ServiceAccountIssuer: signer,
ServiceAccountIssuerURL: audience,
ServiceAccountPublicKeys: []interface{}{&c.masterCerts.APITokens.Key.PublicKey},
VersionedInformers: clients.Informer,
},
}
apiConfig.ExtraConfig.ClusterAuthenticationInfo.ClientCA = config.SecureServing.ClientCA
apiConfig.GenericConfig.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource()
apiConfig.ExtraConfig.APIResourceConfigSource = apiConfig.GenericConfig.MergedResourceConfig
// TODO add prefix overrides
restOptionsGetter := k8s.PrepareStorage(legacyscheme.Codecs, legacyscheme.Scheme, apiConfig.GenericConfig.MergedResourceConfig)
apiConfig.GenericConfig.RESTOptionsGetter = restOptionsGetter
apiConfig.ExtraConfig.StorageFactory = restOptionsGetter.StorageFactory
openapiFactory := openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)
apiConfig.GenericConfig.OpenAPIConfig = server.DefaultOpenAPIConfig(openapiFactory, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensions.Scheme))
apiConfig.GenericConfig.OpenAPIV3Config = server.DefaultOpenAPIV3Config(openapiFactory, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensions.Scheme))
apiServer, err := apiConfig.Complete().New(extensionServer.GenericAPIServer)
if err != nil {
return fmt.Errorf("could not initialize generic apiserver: %w", err)
}
// Setup an API aggregator func buildApiConfig(c *Cluster, config *server.Config, clients *k8s.Clients) (*controlplane.Config, error) {
aggregatorConfig := &aggregator.Config{ signer, err := serviceaccount.JWTTokenGenerator(audience, c.masterCerts.APITokens.Key)
GenericConfig: &server.RecommendedConfig{ if err != nil {
Config: *config, return nil, fmt.Errorf("could not initilize service account signer: %w", err)
SharedInformerFactory: clients.Informer, }
}, apiConfig := &controlplane.Config{
ExtraConfig: aggregator.ExtraConfig{ GenericConfig: &(*config), // copy by value, so we can modify fields
ServiceResolver: getResolver(clients), ExtraConfig: controlplane.ExtraConfig{
EventTTL: time.Hour,
KubeletClientConfig: client.KubeletClientConfig{
Port: ports.KubeletPort,
ReadOnlyPort: ports.KubeletReadOnlyPort,
PreferredAddressTypes: []string{string(core.NodeInternalIP), string(core.NodeExternalIP)},
}, },
} ServiceIPRange: *c.networking.ServiceNet,
aggregatorConfig.GenericConfig.MergedResourceConfig = aggregator.DefaultAPIResourceConfigSource() APIServerServiceIP: c.networking.APIAddress,
aggregatorConfig.GenericConfig.RESTOptionsGetter = k8s.PrepareStorage(aggregatorscheme.Codecs, aggregatorscheme.Scheme, aggregatorConfig.GenericConfig.MergedResourceConfig) APIServerServicePort: 443,
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(apiServer.GenericAPIServer) ServiceNodePortRange: options.DefaultServiceNodePortRange,
if err != nil { // This still triggers an error, stating that stale data was found and cleaned up,
return fmt.Errorf("could not initialize aggregator: %w", err) // which is inevitable
} EndpointReconcilerType: reconcilers.NoneEndpointReconcilerType,
MasterCount: 1,
// Finally start the apiserver ServiceAccountIssuer: signer,
server := aggregatorServer.GenericAPIServer.PrepareRun() ServiceAccountIssuerURL: audience,
go clients.Start(ctx) ServiceAccountPublicKeys: []interface{}{&c.masterCerts.APITokens.Key.PublicKey},
return server.Run(ctx.Done()) VersionedInformers: clients.Informer,
}, },
Ready: func(u *Unit, c *Cluster) bool { }
u.Logger.Info("checking if apiserver is ready") apiConfig.ExtraConfig.ClusterAuthenticationInfo.ClientCA = config.SecureServing.ClientCA
clients, err := k8s.NewTokenClients(c.masterUrl, c.pki.TLS, c.loopbackToken) apiConfig.GenericConfig.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource()
if err != nil { apiConfig.ExtraConfig.APIResourceConfigSource = apiConfig.GenericConfig.MergedResourceConfig
return false restOptionsGetter := k8s.PrepareStorage(legacyscheme.Codecs, legacyscheme.Scheme, apiConfig.GenericConfig.MergedResourceConfig)
} apiConfig.GenericConfig.RESTOptionsGetter = restOptionsGetter
_, err = clients.Client.CoreV1().Nodes().List(context.Background(), meta.ListOptions{}) apiConfig.ExtraConfig.StorageFactory = restOptionsGetter.StorageFactory
if err != nil { openapiFactory := openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)
return false apiConfig.GenericConfig.OpenAPIConfig = server.DefaultOpenAPIConfig(openapiFactory, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensions.Scheme))
} apiConfig.GenericConfig.OpenAPIV3Config = server.DefaultOpenAPIV3Config(openapiFactory, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensions.Scheme))
return true return apiConfig, nil
},
} }
// Write a loopback config (different for every start) func buildAggregatorConfig(config *server.Config, clients *k8s.Clients) (*aggregator.Config, error) {
var loopback = &Unit{ aggregatorConfig := &aggregator.Config{
Name: "kube-loopback", GenericConfig: &server.RecommendedConfig{
Dependencies: []*Unit{kubeApiserver}, Config: *config,
Start: func(u *Unit, c *Cluster, ctx context.Context) error { SharedInformerFactory: clients.Informer,
name := "loopback" },
clientConfig := api.Config{ ExtraConfig: aggregator.ExtraConfig{
APIVersion: "v1", ServiceResolver: clients.ServiceResolver(),
Kind: "Config", },
CurrentContext: name, }
Contexts: map[string]*api.Context{name: { aggregatorConfig.GenericConfig.MergedResourceConfig = aggregator.DefaultAPIResourceConfigSource()
Cluster: name, aggregatorConfig.GenericConfig.RESTOptionsGetter = k8s.PrepareStorage(aggregatorscheme.Codecs, aggregatorscheme.Scheme, aggregatorConfig.GenericConfig.MergedResourceConfig)
AuthInfo: name, return aggregatorConfig, nil
}},
Clusters: map[string]*api.Cluster{name: {
Server: c.masterUrl,
CertificateAuthority: c.pki.TLS.CertPath(),
}},
AuthInfos: map[string]*api.AuthInfo{name: {
Token: c.loopbackToken,
}},
}
err := os.MkdirAll("/root/.kube", 0755)
if err != nil {
return fmt.Errorf("could not create kubeconfig dir: %w", err)
}
clientcmd.WriteToFile(clientConfig, "/root/.kube/config")
if err != nil {
return fmt.Errorf("could not write privileged kubeconfig: %w", err)
}
return nil
},
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment