Skip to content
Snippets Groups Projects
Commit 37f43e2e authored by kaiyou's avatar kaiyou
Browse files

Remove dependency on cmd/kubelet and split k8s code into multiple files

parent dfc0ae20
No related branches found
No related tags found
No related merge requests found
...@@ -107,10 +107,12 @@ var Run = &cobra.Command{ ...@@ -107,10 +107,12 @@ var Run = &cobra.Command{
return fmt.Errorf("could not install binary %s: %w", cmd.Use, err) return fmt.Errorf("could not install binary %s: %w", cmd.Use, err)
} }
} }
// /tmp is not created by selfcontain, but / is a tmpfs, so just mkdir // Temporary directories are not created by selfcontain, but / is a tmpfs, so just mkdir
err = os.Mkdir("/tmp", 0o777) for _, dir := range []string{"/tmp", "/run"} {
if err != nil { err = os.Mkdir(dir, 0o777)
return err if err != nil {
return err
}
} }
// This is very useful for debugging, especially in network isolated // This is very useful for debugging, especially in network isolated
// environments // environments
......
package services
import (
"context"
"fmt"
"go.acides.org/hepto/utils"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/kubernetes/cmd/kube-apiserver/app"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
)
// kubeApiserver runs an embedded kube-apiserver, bound to the node address
// and backed by the local etcd instance, authenticated against the cluster PKI.
var kubeApiserver = &Unit{
	Name:         "kube-apiserver",
	Dependencies: []*Unit{etcd, pkiCA, pkiMaster, vpn, kubeLogger},
	Run: func(u *Unit, c *Cluster, ctx context.Context) error {
		// Configure the apiserver through its own flag set, so that upstream
		// defaults and validation apply unchanged.
		args := []string{
			"--bind-address", c.networking.NodeAddress.IP.String(),
			"--service-cluster-ip-range", c.networking.ServiceNet.String(),
			"--tls-cert-file", c.masterCerts.TLS.CertPath(),
			"--tls-private-key-file", c.masterCerts.TLS.KeyPath(),
			"--client-ca-file", c.pki.API.CertPath(),
			"--kubelet-certificate-authority", c.pki.TLS.CertPath(),
			"--kubelet-client-certificate", c.masterCerts.Kubelet.CertPath(),
			"--kubelet-client-key", c.masterCerts.Kubelet.KeyPath(),
			"--etcd-servers", "http://[::1]:2379",
			"--service-account-signing-key-file", c.masterCerts.APITokens.KeyPath(),
			"--service-account-key-file", c.masterCerts.APITokens.KeyPath(),
			"--service-account-issuer", "https://kubernetes.default.svc.cluster.local",
			"--api-audiences", "https://kubernetes.default.svc.cluster.local",
			"--authorization-mode", "Node,RBAC",
			"--allow-privileged", "true",
		}
		opts := options.NewServerRunOptions()
		nfs := opts.Flags()
		if err := flagsFromNamedFlagSet("apiserver", &nfs).Parse(args); err != nil {
			return err
		}
		// Build the server chain the same way cmd/kube-apiserver does:
		// complete options, create the chain, then prepare it for running.
		completed, err := app.Complete(opts)
		if err != nil {
			return err
		}
		chain, err := app.CreateServerChain(completed)
		if err != nil {
			return err
		}
		prepared, err := chain.PrepareRun()
		if err != nil {
			return err
		}
		// Write a root kubeconfig so kubectl works directly on the node.
		rootConfig := KubeConfig{
			URL:        fmt.Sprintf("https://[%s]:6443", c.networking.NodeAddress.IP.String()),
			CACert:     c.pki.TLS.CertPath(),
			ClientKey:  c.masterCerts.RootClient.KeyPath(),
			ClientCert: c.masterCerts.RootClient.CertPath(),
		}
		if err := utils.WriteConfig(rootConfig, "/root/.kube/config"); err != nil {
			return err
		}
		return prepared.Run(ctx.Done())
	},
	Ready: func(u *Unit, c *Cluster) bool {
		u.Logger.Info("checking if apiserver is ready")
		// Probe readiness with a simple authenticated list call.
		kc := &rest.Config{
			Host: fmt.Sprintf("https://[%s]:6443", c.networking.NodeAddress.IP.String()),
			TLSClientConfig: rest.TLSClientConfig{
				CAFile:   c.pki.TLS.CertPath(),
				CertFile: c.masterCerts.SchedulerAPI.CertPath(),
				KeyFile:  c.masterCerts.SchedulerAPI.KeyPath(),
			},
		}
		client, err := kubernetes.NewForConfig(rest.AddUserAgent(kc, "scheduler"))
		if err != nil {
			return false
		}
		_, err = client.CoreV1().Nodes().List(context.Background(), meta.ListOptions{})
		return err == nil
	},
}
package services
import (
"context"
"fmt"
"go.acides.org/hepto/utils"
"k8s.io/client-go/rest"
"k8s.io/kubernetes/cmd/kube-controller-manager/app"
"k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
)
// kubeControllerManager runs the embedded controller manager against the
// local apiserver, authenticating with the dedicated controllers certificates.
var kubeControllerManager = &Unit{
	Name:         "kube-controller-manager",
	Dependencies: []*Unit{kubeApiserver, pkiCA, pkiMaster, kubeLogger},
	Run: func(u *Unit, c *Cluster, ctx context.Context) error {
		// The controller manager only takes a kubeconfig file, so write one
		// from the master certificates before parsing flags.
		kubeconfig := KubeConfig{
			URL:        fmt.Sprintf("https://[%s]:6443", c.networking.NodeAddress.IP.String()),
			CACert:     c.pki.TLS.CertPath(),
			ClientCert: c.masterCerts.ControllersAPI.CertPath(),
			ClientKey:  c.masterCerts.ControllersAPI.KeyPath(),
		}
		kubeconfigPath := "/controller-manager.yaml"
		if err := utils.WriteConfig(kubeconfig, kubeconfigPath); err != nil {
			return err
		}
		args := []string{
			"--kubeconfig", kubeconfigPath,
			"--tls-cert-file", c.masterCerts.ControllersTLS.CertPath(),
			"--tls-private-key-file", c.masterCerts.ControllersTLS.KeyPath(),
			"--service-account-private-key-file", c.masterCerts.APITokens.KeyPath(),
			"--root-ca-file", c.pki.TLS.CertPath(),
			// This is better than no declared cloud provider, since it does disable
			// unnecessary cloud controllers
			"--cloud-provider", "external",
			"--use-service-account-credentials",
		}
		opts, err := options.NewKubeControllerManagerOptions()
		if err != nil {
			return err
		}
		nfs := opts.Flags(app.KnownControllers(), app.ControllersDisabledByDefault.List())
		if err := flagsFromNamedFlagSet("cm", &nfs).Parse(args); err != nil {
			return err
		}
		rest.SetDefaultWarningHandler(rest.NoWarnings{})
		config, err := opts.Config(app.KnownControllers(), app.ControllersDisabledByDefault.List())
		if err != nil {
			return err
		}
		return app.Run(config.Complete(), ctx.Done())
	},
}
...@@ -3,32 +3,20 @@ package services ...@@ -3,32 +3,20 @@ package services
import ( import (
"context" "context"
"fmt" "fmt"
"path" "net"
"time"
"github.com/spf13/pflag" "github.com/spf13/pflag"
"go.acides.org/hepto/utils" "go.acides.org/pekahi"
core "k8s.io/api/core/v1" core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/events" "k8s.io/client-go/tools/events"
"k8s.io/component-base/cli/flag" "k8s.io/component-base/cli/flag"
"k8s.io/klog/v2" "k8s.io/klog/v2"
apiserver "k8s.io/kubernetes/cmd/kube-apiserver/app"
apiserveropts "k8s.io/kubernetes/cmd/kube-apiserver/app/options"
cm "k8s.io/kubernetes/cmd/kube-controller-manager/app"
cmopts "k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
kubelet "k8s.io/kubernetes/cmd/kubelet/app"
kubeletopts "k8s.io/kubernetes/cmd/kubelet/app/options"
kubeletcv "k8s.io/kubernetes/pkg/kubelet/apis/config/validation"
kubeletcf "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/configfiles"
"k8s.io/kubernetes/pkg/scheduler" "k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/util/filesystem"
) )
var kubeLogger = &Unit{ var kubeLogger = &Unit{
...@@ -39,249 +27,59 @@ var kubeLogger = &Unit{ ...@@ -39,249 +27,59 @@ var kubeLogger = &Unit{
}, },
} }
var kubeApiserver = &Unit{ type Clients struct {
Name: "kube-apiserver", KubeConfig *rest.Config
Dependencies: []*Unit{etcd, pkiCA, pkiMaster, vpn, kubeLogger}, Client *kubernetes.Clientset
Run: func(u *Unit, c *Cluster, ctx context.Context) error { EventClient *kubernetes.Clientset
args := []string{ Broadcaster events.EventBroadcasterAdapter
"--bind-address", c.networking.NodeAddress.IP.String(), Informer informers.SharedInformerFactory
"--service-cluster-ip-range", c.networking.ServiceNet.String(), DynInformer dynamicinformer.DynamicSharedInformerFactory
"--tls-cert-file", c.masterCerts.TLS.CertPath(),
"--tls-private-key-file", c.masterCerts.TLS.KeyPath(),
"--client-ca-file", c.pki.API.CertPath(),
"--kubelet-certificate-authority", c.pki.TLS.CertPath(),
"--kubelet-client-certificate", c.masterCerts.Kubelet.CertPath(),
"--kubelet-client-key", c.masterCerts.Kubelet.KeyPath(),
"--etcd-servers", "http://[::1]:2379",
"--service-account-signing-key-file", c.masterCerts.APITokens.KeyPath(),
"--service-account-key-file", c.masterCerts.APITokens.KeyPath(),
"--service-account-issuer", "https://kubernetes.default.svc.cluster.local",
"--api-audiences", "https://kubernetes.default.svc.cluster.local",
"--authorization-mode", "Node,RBAC",
"--allow-privileged", "true",
}
config := apiserveropts.NewServerRunOptions()
nfs := config.Flags()
flags := flagsFromNamedFlagSet("apiserver", &nfs)
err := flags.Parse(args)
if err != nil {
return err
}
completedOptions, err := apiserver.Complete(config)
if err != nil {
return err
}
server, err := apiserver.CreateServerChain(completedOptions)
if err != nil {
return err
}
prepared, err := server.PrepareRun()
if err != nil {
return err
}
rootConfig := KubeConfig{
URL: fmt.Sprintf("https://[%s]:6443", c.networking.NodeAddress.IP.String()),
CACert: c.pki.TLS.CertPath(),
ClientKey: c.masterCerts.RootClient.KeyPath(),
ClientCert: c.masterCerts.RootClient.CertPath(),
}
err = utils.WriteConfig(rootConfig, "/root/.kube/config")
if err != nil {
return err
}
return prepared.Run(ctx.Done())
},
Ready: func(u *Unit, c *Cluster) bool {
u.Logger.Info("checking if apiserver is ready")
kc := &rest.Config{
Host: fmt.Sprintf("https://[%s]:6443", c.networking.NodeAddress.IP.String()),
TLSClientConfig: rest.TLSClientConfig{
CAFile: c.pki.TLS.CertPath(),
CertFile: c.masterCerts.SchedulerAPI.CertPath(),
KeyFile: c.masterCerts.SchedulerAPI.KeyPath(),
},
}
client, err := kubernetes.NewForConfig(rest.AddUserAgent(kc, "scheduler"))
if err != nil {
return false
}
_, err = client.CoreV1().Nodes().List(context.Background(), meta.ListOptions{})
if err != nil {
return false
}
return true
},
} }
var kubeControllerManager = &Unit{ func newClients(c *Cluster, ua string, masterIP net.IP, cert *pekahi.Certificate) (*Clients, error) {
Name: "kube-controller-manager", kc := &rest.Config{
Dependencies: []*Unit{kubeApiserver, pkiCA, pkiMaster, kubeLogger}, Host: fmt.Sprintf("https://[%s]:6443", masterIP.String()),
Run: func(u *Unit, c *Cluster, ctx context.Context) error { TLSClientConfig: rest.TLSClientConfig{
cmConfig := KubeConfig{ CAFile: c.pki.TLS.CertPath(),
URL: fmt.Sprintf("https://[%s]:6443", c.networking.NodeAddress.IP.String()), CertFile: cert.CertPath(),
CACert: c.pki.TLS.CertPath(), KeyFile: cert.KeyPath(),
ClientCert: c.masterCerts.ControllersAPI.CertPath(), },
ClientKey: c.masterCerts.ControllersAPI.KeyPath(), }
} client, err := kubernetes.NewForConfig(rest.AddUserAgent(kc, ua))
cmConfigPath := "/controller-manager.yaml" if err != nil {
err := utils.WriteConfig(cmConfig, cmConfigPath) return nil, err
if err != nil { }
return err eventsClient, err := kubernetes.NewForConfig(kc)
} if err != nil {
args := []string{ return nil, err
"--kubeconfig", cmConfigPath, }
"--tls-cert-file", c.masterCerts.ControllersTLS.CertPath(), broadcaster := events.NewEventBroadcasterAdapter(eventsClient)
"--tls-private-key-file", c.masterCerts.ControllersTLS.KeyPath(),
"--service-account-private-key-file", c.masterCerts.APITokens.KeyPath(), informers := scheduler.NewInformerFactory(client, 0)
"--root-ca-file", c.pki.TLS.CertPath(), dynInformers := dynamicinformer.NewFilteredDynamicSharedInformerFactory(
// This is better than no declared cloud provider, since it does disable dynamic.NewForConfigOrDie(kc), 0, core.NamespaceAll, nil,
// unnecessary cloud controllers )
"--cloud-provider", "external", return &Clients{
"--use-service-account-credentials", KubeConfig: kc,
} Client: client,
s, err := cmopts.NewKubeControllerManagerOptions() EventClient: eventsClient,
if err != nil { Broadcaster: broadcaster,
return err Informer: informers,
} DynInformer: dynInformers,
nfs := s.Flags(cm.KnownControllers(), cm.ControllersDisabledByDefault.List()) }, nil
flags := flagsFromNamedFlagSet("cm", &nfs)
err = flags.Parse(args)
if err != nil {
return err
}
restclient.SetDefaultWarningHandler(restclient.NoWarnings{})
config, err := s.Config(cm.KnownControllers(), cm.ControllersDisabledByDefault.List())
if err != nil {
return err
}
return cm.Run(config.Complete(), ctx.Done())
},
} }
var kubeScheduler = &Unit{ func (c *Clients) Start(ctx context.Context) {
Name: "kube-scheduler", c.Broadcaster.StartRecordingToSink(ctx.Done())
Dependencies: []*Unit{kubeApiserver, pkiCA, pkiMaster, kubeLogger}, c.Informer.Start(ctx.Done())
Run: func(u *Unit, c *Cluster, ctx context.Context) error { c.DynInformer.Start(ctx.Done())
// Setup scheduler clients c.Informer.WaitForCacheSync(ctx.Done())
kc := &rest.Config{ c.DynInformer.WaitForCacheSync(ctx.Done())
Host: fmt.Sprintf("https://[%s]:6443", c.networking.NodeAddress.IP.String()),
TLSClientConfig: rest.TLSClientConfig{
CAFile: c.pki.TLS.CertPath(),
CertFile: c.masterCerts.SchedulerAPI.CertPath(),
KeyFile: c.masterCerts.SchedulerAPI.KeyPath(),
},
}
client, err := kubernetes.NewForConfig(rest.AddUserAgent(kc, "scheduler"))
if err != nil {
return err
}
eventsClient, err := kubernetes.NewForConfig(kc)
if err != nil {
return err
}
u.Logger.Info("preparing scheduler informers")
// Setup scheduler dependencies
broadcaster := events.NewEventBroadcasterAdapter(eventsClient)
informers := scheduler.NewInformerFactory(client, 0)
dynInformers := dynamicinformer.NewFilteredDynamicSharedInformerFactory(
dynamic.NewForConfigOrDie(kc), 0, core.NamespaceAll, nil,
)
// Start the scheduler itself
u.Logger.Info("informers are ready, preparing the scheduler")
sched, err := scheduler.New(client, informers, dynInformers, broadcaster.NewRecorder, ctx.Done(),
scheduler.WithKubeConfig(kc),
)
if err != nil {
return err
}
u.Logger.Info("starting the scheduler")
broadcaster.StartRecordingToSink(ctx.Done())
defer broadcaster.Shutdown()
informers.Start(ctx.Done())
dynInformers.Start(ctx.Done())
informers.WaitForCacheSync(ctx.Done())
dynInformers.WaitForCacheSync(ctx.Done())
sched.Run(ctx)
return nil
},
} }
var kubeKubelet = &Unit{ func (c *Clients) Stop() {
Name: "kubelet", c.Broadcaster.Shutdown()
Dependencies: []*Unit{masterDiscovery, containerdGRPC, containerdTTRPC, pkiCA, pkiNode, kubeLogger},
Run: func(u *Unit, c *Cluster, ctx context.Context) error {
time.Sleep(10 * time.Second)
dataPath := path.Join(c.settings.DataDir, "kubelet")
kubeletKubeConfig := KubeConfig{
URL: fmt.Sprintf("https://[%s]:6443", c.masterNode.VpnIP.String()),
CACert: c.pki.TLS.CertPath(),
ClientCert: c.certs.API.CertPath(),
ClientKey: c.certs.API.KeyPath(),
}
kubeletKubeConfigPath := "/kubelet-kubeconfig.yaml"
err := utils.WriteConfig(kubeletKubeConfig, kubeletKubeConfigPath)
if err != nil {
return err
}
kubeletConfig := KubeletConfig{
Data: dataPath,
CACert: c.pki.Kubelet.CertPath(),
TLSCert: c.certs.TLS.CertPath(),
TLSKey: c.certs.TLS.KeyPath(),
}
kubeletConfigPath := "/kubelet.yaml"
err = utils.WriteConfig(kubeletConfig, kubeletConfigPath)
if err != nil {
return err
}
args := []string{
"--root-dir", dataPath,
"--kubeconfig", kubeletKubeConfigPath,
"--config", kubeletConfigPath,
"--container-runtime-endpoint", "unix://" + ContainerdSocket,
}
// Parse then validate cli flags
flags := kubeletopts.NewKubeletFlags()
flagSet := pflag.NewFlagSet("kubelet", pflag.ContinueOnError)
flags.AddFlags(flagSet)
kubeletopts.AddGlobalFlags(flagSet)
err = flagSet.Parse(args)
if err != nil {
return err
}
err = kubeletopts.ValidateKubeletFlags(flags)
if err != nil {
return err
}
// Load and validate config file
loader, err := kubeletcf.NewFsLoader(&filesystem.DefaultFs{}, flags.KubeletConfigFile)
if err != nil {
return err
}
config, err := loader.Load()
if err != nil {
return err
}
err = kubeletcv.ValidateKubeletConfiguration(config, feature.DefaultFeatureGate)
if err != nil {
return err
}
// Prepare feature gates
err = feature.DefaultMutableFeatureGate.SetFromMap(config.FeatureGates)
if err != nil {
return err
}
// Build the kubelet server
server := &kubeletopts.KubeletServer{
KubeletFlags: *flags,
KubeletConfiguration: *config,
}
deps, err := kubelet.UnsecuredDependencies(server, feature.DefaultFeatureGate)
if err != nil {
return err
}
return kubelet.Run(ctx, server, deps, feature.DefaultFeatureGate)
},
} }
func flagsFromNamedFlagSet(name string, nfs *flag.NamedFlagSets) *pflag.FlagSet { func flagsFromNamedFlagSet(name string, nfs *flag.NamedFlagSets) *pflag.FlagSet {
...@@ -325,45 +123,3 @@ type KubeConfig struct { ...@@ -325,45 +123,3 @@ type KubeConfig struct {
func (KubeConfig) Template() string { func (KubeConfig) Template() string {
return kubeconfigTemplate return kubeconfigTemplate
} }
// kubeletConfigTemplate is the KubeletConfiguration manifest rendered for the
// embedded kubelet; {{.Field}} placeholders are filled from a KubeletConfig.
const kubeletConfigTemplate = `
kind: KubeletConfiguration
apiVersion: kubelet.config.k8s.io/v1beta1
authentication:
  anonymous:
    enabled: false
  webhook:
    enabled: true
  x509:
    clientCAFile: "{{.CACert}}"
authorization:
  mode: Webhook
clusterDomain: "cluster.local"
imageMinimumGCAge: "120h"
resolvConf: "/etc/resolv.conf"
cgroupDriver: cgroupfs
makeIPTablesUtilChains: false
protectKernelDefaults: true
enableProfilingHandler: false
enableDebugFlagsHandler: false
runtimeRequestTimeout: "15m"
tlsCertFile: "{{.TLSCert}}"
tlsPrivateKeyFile: "{{.TLSKey}}"
syncFrequency: 1m
fileCheckFrequency: 1m
httpCheckFrequency: 1m
nodeStatusUpdateFrequency: 20s
nodeStatusReportFrequency: 5m
volumePluginDir: "{{.Data}}/volume-plugins"
`

// KubeletConfig holds the values interpolated into kubeletConfigTemplate.
type KubeletConfig struct {
	Data   string // kubelet data directory (also hosts volume-plugins)
	CACert string // CA file used to authenticate API clients (x509 auth)
	TLSCert string // serving certificate for the kubelet API
	TLSKey  string // private key matching TLSCert
}

// Template returns the raw template text used to render this configuration.
func (KubeletConfig) Template() string {
	return kubeletConfigTemplate
}
package services
import (
"context"
"fmt"
"net"
"path"
"time"
"go.opentelemetry.io/otel/trace"
core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubelet/config/v1beta1"
"k8s.io/kubernetes/pkg/kubelet"
apiconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/apis/config/scheme"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/server"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/configmap"
"k8s.io/kubernetes/pkg/volume/csi"
"k8s.io/kubernetes/pkg/volume/emptydir"
"k8s.io/kubernetes/pkg/volume/hostpath"
"k8s.io/kubernetes/pkg/volume/local"
"k8s.io/kubernetes/pkg/volume/projected"
"k8s.io/kubernetes/pkg/volume/secret"
"k8s.io/kubernetes/pkg/volume/util/hostutil"
"k8s.io/kubernetes/pkg/volume/util/subpath"
"k8s.io/mount-utils"
)
// kubeKubelet runs an embedded kubelet, wired directly against in-process
// clients and the local containerd socket rather than spawned through
// cmd/kubelet flag parsing.
var kubeKubelet = &Unit{
	Name:         "kubelet",
	Dependencies: []*Unit{masterDiscovery, containerdGRPC, containerdTTRPC, pkiCA, pkiNode, kubeLogger},
	Run: func(u *Unit, c *Cluster, ctx context.Context) error {
		// Sleep before starting, to make sure that containerd is actually ready
		// (very difficult to check otherwise)
		time.Sleep(10 * time.Second)
		kubeletRoot := path.Join(c.settings.DataDir, "kubelet")
		clients, err := newClients(c, "kubelet", c.masterNode.VpnIP, c.certs.API)
		if err != nil {
			return fmt.Errorf("could not create clients: %w", err)
		}
		// Initialize a default kubelet configuration based on the configuration
		// scheme. This is quite complex for configuration setup, yet necessary
		// due to the complex defaults that we do not want to copy and paste.
		scheme, _, err := scheme.NewSchemeAndCodecs()
		if err != nil {
			return fmt.Errorf("could not initialize scheme: %w", err)
		}
		versioned := &v1beta1.KubeletConfiguration{}
		scheme.Default(versioned)
		cfg := &apiconfig.KubeletConfiguration{}
		if err := scheme.Convert(versioned, cfg, nil); err != nil {
			return fmt.Errorf("could not generate default config: %w", err)
		}
		// Prepare kubelet dependencies; these are stored in a Dependencies
		// instance, which is both used for direct access and as an argument
		// for kubelet initialization.
		// The default mounter calls mount and umount, which are hooked in /bin.
		// TODO maybe we could implement an alternative Linux-only mounter based on direct syscalls later
		mounter := mount.New("")
		// Cadvisor is the heaviest bit: it takes time to start and launches
		// multiple routines. We could not get rid of it however, since kubelet
		// initialization has a hard dependency on it.
		cadv, err := cadvisor.New(
			cadvisor.NewImageFsInfoProvider("unix://"+ContainerdSocket),
			kubeletRoot,
			[]string{"/"}, // Cgroup root
			true,          // Legacy stats enabled to avoid failing container manager upon startup
			false,
		)
		if err != nil {
			return fmt.Errorf("could not initialize cadvisor: %w", err)
		}
		// Container manager is an abstraction around both the API and direct
		// container management by the kubelet (not everything is done through
		// containerd).
		manager, err := cm.NewContainerManager(
			mounter,
			cadv,
			cm.NodeConfig{
				CgroupRoot:                        "/",
				CgroupDriver:                      "cgroupfs",
				KubeletRootDir:                    kubeletRoot,
				ProtectKernelDefaults:             true,
				CPUManagerPolicy:                  "none",
				CPUManagerReconcilePeriod:         10 * time.Second,
				ExperimentalMemoryManagerPolicy:   v1beta1.NoneMemoryManagerPolicy,
				EnforceCPULimits:                  true,
				CPUCFSQuotaPeriod:                 100 * time.Millisecond,
				ExperimentalTopologyManagerPolicy: v1beta1.NoneTopologyManagerPolicy,
				ExperimentalTopologyManagerScope:  v1beta1.ContainerTopologyManagerScope,
			},
			false, // FailSwapOn
			clients.Broadcaster.DeprecatedNewLegacyRecorder("kubelet"),
			clients.Client,
		)
		if err != nil {
			return fmt.Errorf("could not initialize container manager: %w", err)
		}
		// Only required volume plugins are loaded to save space and memory.
		// Projected is required for modern volume projection, including secret
		// mounting; others are obvious requirements.
		volumePlugins := []volume.VolumePlugin{}
		volumePlugins = append(volumePlugins, emptydir.ProbeVolumePlugins()...)
		volumePlugins = append(volumePlugins, hostpath.ProbeVolumePlugins(volume.VolumeConfig{})...)
		volumePlugins = append(volumePlugins, secret.ProbeVolumePlugins()...)
		volumePlugins = append(volumePlugins, configmap.ProbeVolumePlugins()...)
		volumePlugins = append(volumePlugins, projected.ProbeVolumePlugins()...)
		volumePlugins = append(volumePlugins, local.ProbeVolumePlugins()...)
		volumePlugins = append(volumePlugins, csi.ProbeVolumePlugins()...)
		deps := &kubelet.Dependencies{
			Auth:                     nil, // TODO handle authentication for remoting
			CAdvisorInterface:        cadv,
			Cloud:                    nil, // No cloud manager
			ContainerManager:         manager,
			KubeClient:               clients.Client,
			HeartbeatClient:          clients.Client,
			EventClient:              clients.EventClient.CoreV1(),
			Recorder:                 clients.Broadcaster.DeprecatedNewLegacyRecorder("kubelet"),
			TracerProvider:           trace.NewNoopTracerProvider(),
			HostUtil:                 hostutil.NewHostUtil(),
			Mounter:                  mounter,
			Subpather:                subpath.New(mounter),
			OOMAdjuster:              oom.NewOOMAdjuster(),
			OSInterface:              container.RealOS{},
			VolumePlugins:            volumePlugins,
			DynamicPluginProber:      NoopPluginProber{},                // Our own implementation that does not load dynamic plugins
			PodStartupLatencyTracker: util.NewPodStartupLatencyTracker(), // Used for pod management by kubelet
			TLSOptions: &server.TLSOptions{
				CertFile: c.certs.TLS.CertPath(),
				KeyFile:  c.certs.TLS.KeyPath(),
			},
		}
		// Finally, initialize then run kubelet itself.
		// Kubelet configuration must be passed as a long list of arguments,
		// which are thus documented for clarity's sake.
		err = kubelet.PreInitRuntimeService(cfg, deps, "unix://"+ContainerdSocket, "")
		if err != nil {
			return fmt.Errorf("could not pre-init containerd: %w", err)
		}
		k, err := kubelet.NewMainKubelet(
			cfg,
			deps,
			&config.ContainerRuntimeOptions{
				ContainerRuntime: kubetypes.RemoteContainerRuntime,
				PodSandboxImage:  "registry.k8s.io/pause:3.9",
			},
			c.node.Name,                           // Hostname
			false,                                 // Hostname overridden
			types.NodeName(c.node.Name),           // Node name
			[]net.IP{c.networking.NodeAddress.IP}, // IP addresses
			c.node.Name,                           // Provider ID (unused)
			"",                                    // Cloud provider
			path.Join(kubeletRoot, "pki"),         // PKI path
			kubeletRoot,                           // Root directory
			"",                                    // Image creds config file
			"",                                    // Image creds bin path
			true,                                  // Register node
			[]core.Taint{},                        // Taints
			[]string{},                            // Unsafe sysctls
			"",                                    // Mounter path
			false,                                 // Kernel memcg notifications
			false,                                 // Allocatable ignore eviction threshold
			meta.Duration{Duration: 0},            // Max gc duration
			1,                                     // Max per pod container count
			-1,                                    // Max container count
			meta.NamespaceDefault,                 // Master service namespace
			true,                                  // Register schedulable
			false,                                 // Keep terminated pod volumes
			map[string]string{},                   // Node labels
			-1,                                    // Node status max images
			false,                                 // Seccomp default
		)
		if err != nil {
			return fmt.Errorf("could not instantiate kubelet: %w", err)
		}
		// Announce the kubelet, then start background garbage collection of
		// dead containers and unused images.
		k.BirthCry()
		k.StartGarbageCollection()
		// Run the sync loop in the background; the kubelet API server blocks
		// in the foreground until shutdown.
		go k.Run(deps.PodConfig.Updates())
		k.ListenAndServe(cfg, deps.TLSOptions, deps.Auth, deps.TracerProvider)
		return nil
	},
}
// NoopPluginProber satisfies the kubelet's dynamic volume plugin prober
// interface without ever discovering plugins, since hepto loads all of its
// volume plugins statically.
type NoopPluginProber struct{}

// Init is a no-op: there is nothing to initialize.
func (NoopPluginProber) Init() error {
	return nil
}

// Probe always reports that no new plugin appeared.
func (NoopPluginProber) Probe() ([]volume.ProbeEvent, error) {
	return []volume.ProbeEvent{}, nil
}
package services
import (
"context"
"k8s.io/kubernetes/pkg/scheduler"
)
// kubeScheduler runs the embedded scheduler against the local apiserver,
// reusing the shared client/informer bundle from newClients.
var kubeScheduler = &Unit{
	Name:         "kube-scheduler",
	Dependencies: []*Unit{kubeApiserver, pkiCA, pkiMaster, kubeLogger},
	Run: func(u *Unit, c *Cluster, ctx context.Context) error {
		cl, err := newClients(c, "scheduler", c.networking.NodeAddress.IP, c.masterCerts.SchedulerAPI)
		if err != nil {
			return err
		}
		// Instantiate the scheduler before starting informers, so that its
		// event handlers are registered when caches begin to sync.
		sched, err := scheduler.New(
			cl.Client, cl.Informer, cl.DynInformer, cl.Broadcaster.NewRecorder,
			ctx.Done(), scheduler.WithKubeConfig(cl.KubeConfig),
		)
		if err != nil {
			return err
		}
		u.Logger.Info("starting the scheduler")
		cl.Start(ctx)
		defer cl.Stop()
		sched.Run(ctx)
		return nil
	},
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment