diff --git a/cmd/hepto/config.go b/cmd/hepto/config.go index 5c08097ce157f35cb9ae717057e9f3f3bc13302a..6d54de808e102a6de72a8f53bfb3548c92e4ffb7 100644 --- a/cmd/hepto/config.go +++ b/cmd/hepto/config.go @@ -7,7 +7,7 @@ import ( "github.com/go-logr/logr" "github.com/go-logr/zapr" - "go.acides.org/hepto/pkg/cluster" + "go.acides.org/hepto/pkg/services" "go.acides.org/selfcontain" ) @@ -17,9 +17,9 @@ type Config struct { Pprof bool Logger logr.Logger LogLevel int - Cluster cluster.ClusterSettings + Cluster services.ClusterSettings Container selfcontain.Config - Node cluster.NodeSettings + Node services.NodeSettings } var config Config @@ -41,8 +41,12 @@ func (c *Config) Complete() error { return err } // Setup data persistence mounts - for _, mount := range cluster.RequiredMounts(&c.Cluster, &c.Node) { - c.Container.Mounts[mount] = path.Join(c.Container.Data, mount) + c.Container.Mounts[services.CertsPath] = path.Join(c.Container.Data, "/certs") + switch c.Node.Role { + case "master": + c.Container.Mounts[services.EtcdPath] = path.Join(c.Container.Data, "/etcd") + case "node": + c.Container.Mounts[services.ContainerdPath] = path.Join(c.Container.Data, "/containerd") } // Set the container IP mask if required, defaults to /64 if len(c.Container.IP.Mask) == 0 { diff --git a/cmd/hepto/service.go b/cmd/hepto/service.go index 42c3d02b29f3a04ed8896582d423167181962a3e..91dc18fcbee9dc3566ba5272264d9dab5695cc0a 100644 --- a/cmd/hepto/service.go +++ b/cmd/hepto/service.go @@ -4,14 +4,16 @@ import ( "fmt" "net" "os" + "path" "time" "github.com/spf13/cobra" "github.com/spf13/pflag" "github.com/spf13/viper" - "go.acides.org/hepto/pkg/cluster" + "go.acides.org/hepto/pkg/services" "go.acides.org/selfcontain" "k8s.io/component-base/version/verflag" + "k8s.io/component-helpers/node/util/sysctl" ) var Hepto = &cobra.Command{ @@ -31,8 +33,27 @@ var Hepto = &cobra.Command{ var Start = &cobra.Command{ Use: "start", Short: "Start the hepto service", + PreRunE: func(cmd *cobra.Command, args 
[]string) error { + // Set the proper sysctl for the cluster + // Copied from https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/cm/container_manager_linux.go + desiredState := map[string]int{ + sysctl.VMOvercommitMemory: sysctl.VMOvercommitMemoryAlways, + sysctl.VMPanicOnOOM: sysctl.VMPanicOnOOMInvokeOOMKiller, + sysctl.KernelPanic: sysctl.KernelPanicRebootTimeout, + sysctl.KernelPanicOnOops: sysctl.KernelPanicOnOopsAlways, + sysctl.RootMaxKeys: sysctl.RootMaxKeysSetting, + sysctl.RootMaxBytes: sysctl.RootMaxBytesSetting, + } + s := sysctl.New() + for key, value := range desiredState { + err := s.SetSysctl(key, value) + if err != nil { + return err + } + } + return nil + }, RunE: func(cmd *cobra.Command, args []string) error { - cluster.Sysctl() newArgs := append([]string{Run.Use}, os.Args[2:]...) return selfcontain.RunWithArgs(&config.Container, newArgs) }, @@ -43,6 +64,28 @@ var Run = &cobra.Command{ Short: "Actually run hepto inside the container", // Determine current IP, which should run only once inside the container PreRun: func(cmd *cobra.Command, args []string) { + // Install ourselves as a hooking binary + self, err := os.Executable() + if err != nil { + config.Logger.Error(err, "could not get executable path") + } + err = os.Setenv("PATH", "/bin") + if err != nil { + config.Logger.Error(err, "could not set binaries path") + return + } + err = os.MkdirAll("/bin", 0o755) + if err != nil { + config.Logger.Error(err, "could not prepare bin dir") + return + } + for _, name := range []string{"kubectl", "ctr", "mount", "umount", "runc"} { + err = os.Symlink(self, path.Join("/bin", name)) + if err != nil { + config.Logger.Error(err, "could not install binary", "name", name) + return + } + } // This is very useful for debugging, especially in network isolated // environments if config.BypassIPCheck { @@ -76,9 +119,9 @@ var Run = &cobra.Command{ break } }, - Run: func(cmd *cobra.Command, args []string) { - c := cluster.New(&config.Cluster, 
&config.Node) - c.Run() + RunE: func(cmd *cobra.Command, args []string) error { + c := services.NewManager(&config.Cluster, &config.Node, config.Logger) + return c.Run() }, } @@ -126,5 +169,5 @@ func init() { Hepto.PersistentFlags().IntVar(&config.Node.Port, "discovery-port", 7123, "TCP port used for discovering the cluster") Hepto.PersistentFlags().StringVar(&config.Node.Name, "name", "", "Hepto node name") Hepto.PersistentFlags().StringSliceVar(&config.Node.Anchors, "anchors", []string{}, "List of cluster anchors") - Hepto.PersistentFlags().Var(&config.Node.Role, "role", "Node role inside the cluster") + Hepto.PersistentFlags().StringVar(&config.Node.Role, "role", "node", "Node role inside the cluster") } diff --git a/go.mod b/go.mod index 02a16d43e596407b8cf7d15846cd5b182203aa03..d49f9158c88da7578b0e19a0c757764fa39d8c80 100644 --- a/go.mod +++ b/go.mod @@ -72,7 +72,7 @@ require ( github.com/vishvananda/netlink v1.2.1-beta.2 go.acides.org/pekahi v0.1.1 go.acides.org/selfcontain v0.2.0 - go.acides.org/sml v0.1.1 + go.acides.org/sml v0.2.1 go.etcd.io/etcd/server/v3 v3.5.7 go.uber.org/zap v1.24.0 golang.org/x/sys v0.4.0 diff --git a/go.sum b/go.sum index c45e7346436d199ee5d0dee07a948d5043ebb169..851fecd601175906dfc47370d08ed28d7c6ff0c2 100644 --- a/go.sum +++ b/go.sum @@ -1330,6 +1330,8 @@ go.acides.org/selfcontain v0.2.0 h1:7b9rfBIGOqpPjqIaXo2h8OZUeCu8+XrFh9zzToi2JKM= go.acides.org/selfcontain v0.2.0/go.mod h1:cyKYsVw1scp6MTVIhquG+2OJrsyaDCwkXlsBvMO+cws= go.acides.org/sml v0.1.1 h1:v424XQ1RhgZHfKG+rV0VZ8YL32HC1UlqbIqv4E4q5JU= go.acides.org/sml v0.1.1/go.mod h1:lBAbmfk5FFmK5yt7pFoqwwGNtEfWL/aXAdCj0ry7Kj0= +go.acides.org/sml v0.2.1 h1:WxlUUZqCzv+He91iyzURMwrjuB9FR+8rwe9nm+WR0y0= +go.acides.org/sml v0.2.1/go.mod h1:lBAbmfk5FFmK5yt7pFoqwwGNtEfWL/aXAdCj0ry7Kj0= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU= go.etcd.io/bbolt v1.3.6/go.mod 
h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4= diff --git a/pkg/cluster/certs.go b/pkg/cluster/certs.go deleted file mode 100644 index c946c284df8ad5fd50b1323ef646267d673cd040..0000000000000000000000000000000000000000 --- a/pkg/cluster/certs.go +++ /dev/null @@ -1,60 +0,0 @@ -package cluster - -import ( - "os" - "path" - - "go.acides.org/hepto/pkg/pki" -) - -const certsPath = "/certs" - -func (c *Cluster) initCerts() { - // Prepare the cluster PKI - if c.node.Role == Master { - ca, err := pki.NewClusterCA(path.Join(certsPath, "pki")) - if err != nil { - c.settings.Logger.Error(err, "could not initialize pki") - os.Exit(1) - } - masterCerts, err := pki.NewMasterCerts( - path.Join(certsPath, "master"), - "kubernetes.default", - c.networking.NodeAddress.IP, - c.networking.APIAddress, - ) - if err != nil { - c.settings.Logger.Error(err, "could not initialize master certs") - os.Exit(1) - } - c.pki = ca - c.masterCerts = masterCerts - c.pki.SignMasterCerts(c.masterCerts) - } else { - ca, err := pki.EmptyClusterCA(path.Join(certsPath, "pki")) - if err != nil { - c.settings.Logger.Error(err, "could not initialize pki") - os.Exit(1) - } - c.pki = ca - } - c.ml.State.PKI = c.pki - // Initialize node certificates - certs, err := pki.NewNodeCerts(path.Join(certsPath, "node"), c.node.Name) - if err != nil { - c.settings.Logger.Error(err, "could not initialize node certs") - os.Exit(1) - } - c.certs = certs - c.ml.State.Certificates = make(map[string]*pki.NodeCerts) - c.ml.State.Certificates[c.node.Name] = certs -} - -func (c *Cluster) handlePKI() { - if c.node.Role != Master { - return - } - for name, certs := range c.ml.State.Certificates { - c.pki.SignNodeCerts(name, certs) - } -} diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go deleted file mode 100644 index ded4946bba6e09e4e768b5dcb8cec2193bd1e231..0000000000000000000000000000000000000000 --- a/pkg/cluster/cluster.go +++ /dev/null @@ -1,111 +0,0 @@ -// Hepto cluster implements the main clustering logics, based 
on -// sml (Simple Memberlist) for cluster bootstraping and node discovery -package cluster - -import ( - "context" - "fmt" - "os" - - "go.acides.org/hepto/pkg/pki" - "go.acides.org/hepto/pkg/wg" - "go.acides.org/sml" - "k8s.io/component-helpers/node/util/sysctl" -) - -type Cluster struct { - settings *ClusterSettings - ml *sml.Memberlist[HeptoMeta, HeptoState, *HeptoMeta, *HeptoState] - vpn *wg.Wireguard - networking *ClusterNetworking - node *NodeSettings - certs *pki.NodeCerts - masterCerts *pki.MasterCerts - pki *pki.ClusterCA - servicesStarted bool - servicesDone chan struct{} - serviceCtx context.Context -} - -func RequiredMounts(settings *ClusterSettings, node *NodeSettings) []string { - // Persist certificate storage - mounts := []string{certsPath} - // Persist etcd data if this is a master - if node.Role == Master { - mounts = append(mounts, etcdPath) - } - // Persist containerd data if this is a node - if node.Role == Node { - mounts = append(mounts, containerdPath) - } - return mounts -} - -// Run required sysctl for kubelet, before we self-contain ourselves and lose -// this ability -func Sysctl() error { - // Copied from https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/cm/container_manager_linux.go - desiredState := map[string]int{ - sysctl.VMOvercommitMemory: sysctl.VMOvercommitMemoryAlways, - sysctl.VMPanicOnOOM: sysctl.VMPanicOnOOMInvokeOOMKiller, - sysctl.KernelPanic: sysctl.KernelPanicRebootTimeout, - sysctl.KernelPanicOnOops: sysctl.KernelPanicOnOopsAlways, - sysctl.RootMaxKeys: sysctl.RootMaxKeysSetting, - sysctl.RootMaxBytes: sysctl.RootMaxBytesSetting, - } - s := sysctl.New() - for key, value := range desiredState { - err := s.SetSysctl(key, value) - if err != nil { - fmt.Printf("could not set sysctl: %v", err) - os.Exit(1) - } - } - return nil -} - -func New(settings *ClusterSettings, node *NodeSettings) *Cluster { - return &Cluster{ - settings: settings, - node: node, - networking: NewClusterNetworking(settings.Name, 
node.Name), - ml: sml.New[HeptoMeta, HeptoState](node.Name, node.IP, node.Port, node.Anchors, settings.Key, settings.Logger), - servicesDone: make(chan struct{}), - servicesStarted: false, - serviceCtx: context.Background(), - } -} - -func (c *Cluster) Run() { - c.initVPN() - c.initCerts() - c.ml.Meta.Name = c.node.Name - c.ml.Meta.PublicIP = c.node.IP - c.ml.Meta.Role = string(c.node.Role) - c.ml.Meta.VpnIP = c.networking.NodeAddress.IP - c.ml.Meta.VpnKey = c.vpn.PubKey.String() - // Start waiting for events - events := c.ml.Events() - err := c.ml.Start() - if err != nil { - c.settings.Logger.Error(err, "could not start memberlist") - os.Exit(1) - } - instr := c.ml.Instr() - instrUpdates := instr.Updates() - go c.ml.Run() - for { - select { - case <-events: - c.handlePKI() - c.updateVPN() - c.updateServices() - case <-instrUpdates: - c.networking.MTU = instr.MinMTU() - c.updateVPN() - case <-c.servicesDone: - c.settings.Logger.Info("a service stopped unexpectedly") - os.Exit(1) - } - } -} diff --git a/pkg/cluster/config.go b/pkg/cluster/config.go deleted file mode 100644 index 3601036653d7701bb230d4d0b53ec6c2a6157441..0000000000000000000000000000000000000000 --- a/pkg/cluster/config.go +++ /dev/null @@ -1,85 +0,0 @@ -package cluster - -import ( - "errors" - "net" - "net/netip" - - "github.com/go-logr/logr" - "go.uber.org/zap" -) - -type ClusterSettings struct { - // Logger interface - Logger logr.Logger - // Concrete zap logger for etcd - ZapLogger *zap.Logger - // Cluster name, should be locally unique - Name string - // Cluster key, must be shared across nodes - Key []byte -} - -type NodeSettings struct { - // Node name, must be unique inside a cluster - Name string - // Node role - Role NodeRole - // Port for initial memberlist negotiations - Port int - // Public IPv6 address for the node - IP net.IP - // Anchors for this node to join - Anchors []string -} - -type NodeRole string - -const ( - Master NodeRole = "master" - Node = "node" -) - -func (r *NodeRole) 
String() string { - return string(*r) -} - -func (r *NodeRole) Type() string { - return "NodeRole" -} - -func (r *NodeRole) Set(v string) error { - cast := (NodeRole)(v) - if cast == Master || cast == Node { - *r = (NodeRole)(cast) - return nil - } - return errors.New("wrong node type") -} - -type ClusterNetworking struct { - NodeNet *net.IPNet - NodeAddress *net.IPNet - ServiceNet *net.IPNet - PodNet *net.IPNet - APIAddress net.IP - MTU int -} - -// Create a new cluster networking based on sane settings -// All networks use ULA private network derived deterministically -// from cluster name -func NewClusterNetworking(clusterName string, nodeName string) *ClusterNetworking { - // Cluster nodes are hosted on a /64 at :1, e.g. fd00:0:1:0::/64 - nodeNet := ULA(clusterName, 64, 1) - // Current node address is derived from node name inside the cluster node net - nodeAddress := DeriveAddress(nodeNet, nodeName) - // Pods are hosted on a /56 at :3, e.g. fd00:0:2:0::/56 - podNet := ULA(clusterName, 56, 2) - // Services are hosted on a /112 at :3, e.g. 
fd00:0:0:0:0:0:0::/112 - serviceNet := ULA(clusterName, 112, 3) - // API address is the first service address - apiAddress, _ := netip.AddrFromSlice(serviceNet.IP) - apiIP := net.IP(apiAddress.Next().AsSlice()) - return &ClusterNetworking{nodeNet, nodeAddress, serviceNet, podNet, apiIP, 1500} -} diff --git a/pkg/cluster/kubeconfig.go b/pkg/cluster/kubeconfig.go deleted file mode 100644 index 997f4905c6c2291b3e7b7a20d23f7103bf322263..0000000000000000000000000000000000000000 --- a/pkg/cluster/kubeconfig.go +++ /dev/null @@ -1,123 +0,0 @@ -package cluster - -import ( - "os" - "path" - "text/template" -) - -type WriteableConfig interface { - Template() string -} - -func (c *Cluster) WriteConfig(config WriteableConfig, dest string) { - t, _ := template.New("template").Parse(config.Template()) - err := os.MkdirAll(path.Dir(dest), 0o755) - if err != nil { - c.settings.Logger.Error(err, "could not create config dir") - os.Exit(1) - } - file, err := os.Create(dest) - if err != nil { - c.settings.Logger.Error(err, "could not create config file") - os.Exit(1) - } - defer file.Close() - err = t.Execute(file, config) - if err != nil { - c.settings.Logger.Error(err, "could not generate config") - os.Exit(1) - } -} - -const kubeconfigTemplate = ` -apiVersion: v1 -clusters: -- cluster: - server: {{.URL}} - certificate-authority: {{.CACert}} - name: local -contexts: -- context: - cluster: local - namespace: default - user: user - name: Default -current-context: Default -kind: Config -preferences: {} -users: -- name: user - user: - client-certificate: {{.ClientCert}} - client-key: {{.ClientKey}} -` - -type KubeConfig struct { - URL string - CACert string - ClientCert string - ClientKey string -} - -func (KubeConfig) Template() string { - return kubeconfigTemplate -} - -const kubeletConfigTemplate = ` -kind: KubeletConfiguration -apiVersion: kubelet.config.k8s.io/v1beta1 -authentication: - anonymous: - enabled: false - webhook: - enabled: true - x509: - clientCAFile: "{{.CACert}}" 
-authorization: - mode: Webhook -clusterDomain: "cluster.local" -imageMinimumGCAge: "120h" -resolvConf: "/etc/resolv.conf" -cgroupDriver: cgroupfs -runtimeRequestTimeout: "15m" -tlsCertFile: "{{.TLSCert}}" -tlsPrivateKeyFile: "{{.TLSKey}}" -syncFrequency: 1m -fileCheckFrequency: 1m -httpCheckFrequency: 1m -nodeStatusUpdateFrequency: 20s -nodeStatusReportFrequency: 5m -` - -type KubeletConfig struct { - CACert string - TLSCert string - TLSKey string -} - -func (KubeletConfig) Template() string { - return kubeletConfigTemplate -} - -const containerdTemplate = ` -plugin_dir = "" -root = "{{.RootDir}}" -state = "/run/containerd" -version = 2 - -[cgroup] - path = "" - -[grpc] - address = "{{.Socket}}" -` - -type ContainerdConfig struct { - RootDir string - Socket string -} - -func (ContainerdConfig) Template() string { - return containerdTemplate -} diff --git a/pkg/cluster/services.go b/pkg/cluster/services.go deleted file mode 100644 index b70410b0d09c1bbf9f762093be4154036142b1fc..0000000000000000000000000000000000000000 --- a/pkg/cluster/services.go +++ /dev/null @@ -1,187 +0,0 @@ -package cluster - -import ( - "fmt" - "net" - "os" - "path" - "time" - - "go.acides.org/hepto/pkg/wrappers" - "go.etcd.io/etcd/server/v3/embed" -) - -const configPath = "/config" -const etcdPath = "/etcd" -const binPath = "/bin" -const containerdPath = "/containerd" - -func (c *Cluster) watchService(name string, errCh <-chan error) { - c.settings.Logger.Info("service started", "name", name) - err := <-errCh - c.settings.Logger.Error(err, "service failed unexpectedly", "name", name) - c.servicesDone <- struct{}{} -} - -func (c *Cluster) updateServices() { - if c.servicesStarted { - return - } - if c.node.Role == Master { - c.installBin() - c.startEtcd() - c.startK8sMaster() - c.servicesStarted = true - } else if c.node.Role == Node { - // Bail if CA are not discovered or certificates are not ready - if c.pki.TLS.Cert == nil || c.pki.Kubelet.Cert == nil { - return - } - if c.certs.TLS.Cert 
== nil || c.certs.API.Cert == nil { - return - } - c.settings.Logger.Info("looking for master node") - for _, node := range c.ml.Nodes() { - if node.NodeMeta.Role == string(Master) { - c.installBin() - c.startK8sNode(node.NodeMeta.VpnIP) - c.servicesStarted = true - } - } - } -} - -func (c *Cluster) installBin() { - self, err := os.Executable() - if err != nil { - c.settings.Logger.Error(err, "could not get executable path") - } - err = os.Setenv("PATH", binPath) - if err != nil { - c.settings.Logger.Error(err, "could not set binaries path") - os.Exit(1) - } - err = os.MkdirAll(binPath, 0o755) - if err != nil { - c.settings.Logger.Error(err, "could not prepare bin dir") - os.Exit(1) - } - for _, name := range []string{"kubectl", "ctr", "mount", "umount", "containerd-shim", "containerd-shim-runc-v2", "runc"} { - err = os.Symlink(self, path.Join(binPath, name)) - if err != nil { - c.settings.Logger.Error(err, "could not install binary", "name", name) - } - } -} - -func (c *Cluster) startEtcd() { - etcdConfig := embed.NewConfig() - etcdConfig.Dir = etcdPath - etcdConfig.AuthToken = fmt.Sprintf("jwt,priv-key=%s,sign-method=ES256", c.masterCerts.EtcdTokens.KeyPath()) - etcdConfig.ZapLoggerBuilder = embed.NewZapLoggerBuilder(c.settings.ZapLogger) - go c.watchService("etcd", wrappers.ETCd(c.serviceCtx, etcdConfig)) -} - -func (c *Cluster) startK8sMaster() { - wrappers.SetK8sLogger(c.settings.Logger) - go c.watchService("kube-apiserver", wrappers.APIServer(c.serviceCtx, []string{ - "--bind-address", c.networking.NodeAddress.IP.String(), - "--service-cluster-ip-range", c.networking.ServiceNet.String(), - "--tls-cert-file", c.masterCerts.TLS.CertPath(), - "--tls-private-key-file", c.masterCerts.TLS.KeyPath(), - "--client-ca-file", c.pki.API.CertPath(), - "--kubelet-certificate-authority", c.pki.TLS.CertPath(), - "--kubelet-client-certificate", c.masterCerts.Kubelet.CertPath(), - "--kubelet-client-key", c.masterCerts.Kubelet.KeyPath(), - "--etcd-servers", 
"http://localhost:2379", - "--service-account-signing-key-file", c.masterCerts.APITokens.KeyPath(), - "--service-account-key-file", c.masterCerts.APITokens.KeyPath(), - "--service-account-issuer", "https://kubernetes.default.svc.cluster.local", - "--api-audiences", "https://kubernetes.default.svc.cluster.local", - "--authorization-mode", "Node,RBAC", - "--allow-privileged", "true", - })) - // TODO wait for APIserver to be ready before starting other components - cmConfig := KubeConfig{ - URL: fmt.Sprintf("https://[%s]:6443", c.networking.NodeAddress.IP.String()), - CACert: c.pki.TLS.CertPath(), - ClientCert: c.masterCerts.ControllersAPI.CertPath(), - ClientKey: c.masterCerts.ControllersAPI.KeyPath(), - } - cmConfigPath := path.Join(configPath, "controller-manager.yaml") - c.WriteConfig(cmConfig, cmConfigPath) - go c.watchService("controller-manager", wrappers.ControllerManager(c.serviceCtx, []string{ - "--kubeconfig", cmConfigPath, - "--tls-cert-file", c.masterCerts.ControllersTLS.CertPath(), - "--tls-private-key-file", c.masterCerts.ControllersTLS.KeyPath(), - "--service-account-private-key-file", c.masterCerts.APITokens.KeyPath(), - "--root-ca-file", c.pki.TLS.CertPath(), - // This is better than no declared cloud provider, since it does disable - // unnecessary cloud controllers - "--cloud-provider", "external", - "--use-service-account-credentials", - })) - schedulerConfig := KubeConfig{ - URL: fmt.Sprintf("https://[%s]:6443", c.networking.NodeAddress.IP.String()), - CACert: c.pki.TLS.CertPath(), - ClientCert: c.masterCerts.SchedulerAPI.CertPath(), - ClientKey: c.masterCerts.SchedulerAPI.KeyPath(), - } - schedulerConfigPath := path.Join(configPath, "scheduler.yaml") - c.WriteConfig(schedulerConfig, schedulerConfigPath) - go c.watchService("kube-scheduler", wrappers.Scheduler(c.serviceCtx, []string{ - "--kubeconfig", schedulerConfigPath, - })) - // Write a kubelet config for debugging purposes - rootConfig := KubeConfig{ - URL: fmt.Sprintf("https://[%s]:6443", 
c.networking.NodeAddress.IP.String()), - CACert: c.pki.TLS.CertPath(), - ClientKey: c.masterCerts.RootClient.KeyPath(), - ClientCert: c.masterCerts.RootClient.CertPath(), - } - c.WriteConfig(rootConfig, "/root/.kube/config") -} - -func (c *Cluster) startK8sNode(masterIP net.IP) { - // Create tmp, required for containerd - err := os.Mkdir("/tmp", 0o777) - if err != nil { - c.settings.Logger.Error(err, "failed to create /tmp") - os.Exit(1) - } - // Setup logging - wrappers.SetK8sLogger(c.settings.Logger) - // Containerd - containerdConfig := ContainerdConfig{ - RootDir: containerdPath, - Socket: "/run/containerd/containerd.sock", - } - containerdConfigPath := path.Join(configPath, "containerd.toml") - c.WriteConfig(containerdConfig, containerdConfigPath) - go c.watchService("containerd", wrappers.Containerd(c.serviceCtx, []string{ - "--config", containerdConfigPath, - })) - // Wait for containerd to settle down - time.Sleep(5 * time.Second) - // Kubelet - kubeletKubeConfig := KubeConfig{ - URL: fmt.Sprintf("https://[%s]:6443", masterIP.String()), - CACert: c.pki.TLS.CertPath(), - ClientCert: c.certs.API.CertPath(), - ClientKey: c.certs.API.KeyPath(), - } - kubeletKubeConfigPath := path.Join(configPath, "kubelet-kubeconfig.yaml") - c.WriteConfig(kubeletKubeConfig, kubeletKubeConfigPath) - kubeletConfig := KubeletConfig{ - CACert: c.pki.Kubelet.CertPath(), - TLSCert: c.certs.TLS.CertPath(), - TLSKey: c.certs.TLS.KeyPath(), - } - kubeletConfigPath := path.Join(configPath, "kubelet.yaml") - c.WriteConfig(kubeletConfig, kubeletConfigPath) - go c.watchService("kubelet", wrappers.Kubelet(c.serviceCtx, []string{ - "--kubeconfig", kubeletKubeConfigPath, - "--config", kubeletConfigPath, - "--container-runtime-endpoint", "unix://" + containerdConfig.Socket, - })) -} diff --git a/pkg/cluster/vpn.go b/pkg/cluster/vpn.go deleted file mode 100644 index 6619e0b9404ac57902873dae04ca7057e69d8b73..0000000000000000000000000000000000000000 --- a/pkg/cluster/vpn.go +++ /dev/null @@ -1,40 
+0,0 @@ -package cluster - -import ( - "os" - "strconv" - - "go.acides.org/hepto/pkg/wg" -) - -func (c *Cluster) initVPN() { - // Prepare wireguard - vpn, err := wg.New("wg0", 7124, c.networking.NodeAddress, c.settings.Logger) - if err != nil { - c.settings.Logger.Error(err, "could not initialize wireguard") - os.Exit(1) - } - c.vpn = vpn -} - -func (c *Cluster) updateVPN() { - peers := []wg.Peer{} - for _, node := range c.ml.Nodes() { - if node.Name == c.node.Name { - continue - } - peer := node.NodeMeta - // Check whether the node ip is consistent - peerAddr := DeriveAddress(c.networking.NodeNet, node.Name).IP - if !peer.VpnIP.Equal(peerAddr) { - c.settings.Logger.Info("inconsistent node IP", "name", node.Name, "expected", peerAddr, "actual", peer.VpnIP) - continue - } - peers = append(peers, peer) - } - c.settings.Logger.Info("updating VPN mesh", "peers", strconv.Itoa(len(peers)), "mtu", strconv.Itoa(c.networking.MTU)) - err := c.vpn.Update(peers, c.networking.MTU) - if err != nil { - c.settings.Logger.Error(err, "could not update VPN") - } -} diff --git a/pkg/daeman/manager.go b/pkg/daeman/manager.go new file mode 100644 index 0000000000000000000000000000000000000000..b9ebf12df4bf2badfcc2d579aefc12e21ea849c6 --- /dev/null +++ b/pkg/daeman/manager.go @@ -0,0 +1,68 @@ +package daeman + +import "github.com/go-logr/logr" + +// Service manager +type Manager[S any] struct { + // All registered units + units []*Unit[S] + + // General error channel + errChan chan error + + // Logger + Logger logr.Logger + + // Manager state + State S +} + +func NewManager[S any](state S, units []*Unit[S], logger logr.Logger) *Manager[S] { + m := &Manager[S]{ + State: state, + units: []*Unit[S]{}, + errChan: make(chan error), + Logger: logger, + } + // Use register to populate units, which resolves dependencies + // and avoid duplicates + for _, unit := range units { + m.Register(unit) + } + return m +} + +func (m *Manager[S]) Register(u *Unit[S]) { + m.Logger.Info("registering unit", 
"name", u.Name) + // Avoid duplicates + for _, unit := range m.units { + if u == unit { + return + } + } + // Register the unit and its dependencies + m.units = append(m.units, u) + if u.Dependencies != nil { + for _, dep := range u.Dependencies { + m.Register(dep) + } + } + u.Manager = m +} + +func (m *Manager[S]) Trigger() { + for _, u := range m.units { + u.Trigger() + } +} + +func (m *Manager[S]) Fail(err error) { + m.Logger.Error(err, "unexpected error encountered") + m.errChan <- err +} + +func (m *Manager[S]) Run() error { + m.Trigger() + err := <-m.errChan + return err +} diff --git a/pkg/daeman/unit.go b/pkg/daeman/unit.go new file mode 100644 index 0000000000000000000000000000000000000000..d22a102af0696ed590e3c1614d485a052e51c930 --- /dev/null +++ b/pkg/daeman/unit.go @@ -0,0 +1,83 @@ +package daeman + +import "context" + +// Manageable work unit, that might be notified of general +// state change, started and stopped +type Unit[S any] struct { + // Unit name for logging mostly + Name string + + // Unit dependencies + Dependencies []*Unit[S] + + // Manager for this Unit + Manager *Manager[S] + + // Is a long-running work unit in progress + Running bool + + // Callback for determining whether this unit is ready + Ready func(*Unit[S], S) bool + + // Callback invoked when a state change requires action + Wake func(*Unit[S], S) error + + // Callback invoked to start a long-running work + Run func(*Unit[S], S, context.Context) error + + // When started, context cancel function + cancelFunc func() +} + +// Start the work unit +func (u *Unit[S]) Start() { + if u.Running || u.Run == nil || !u.DependenciesReady() { + return + } + u.Manager.Logger.Info("starting unit", "name", u.Name) + newCtx, cancel := context.WithCancel(context.Background()) + u.cancelFunc = cancel + go func() { + err := u.Run(u, u.Manager.State, newCtx) + if err != nil { + u.Manager.Fail(err) + } + u.Manager.Trigger() + }() + u.Running = true + u.Manager.Trigger() +} + +// Stop the work unit 
+func (u *Unit[S]) Stop() { + if !u.Running { + return + } + u.Manager.Logger.Info("stopping unit", "name", u.Name) + u.cancelFunc() + u.Running = false + u.Manager.Trigger() +} + +// Wake the work unit +func (u *Unit[S]) Trigger() error { + u.Start() + if u.Wake == nil { + return nil + } + u.Manager.Logger.Info("triggering unit", "name", u.Name) + return u.Wake(u, u.Manager.State) +} + +// Check whether dependencies are ready +func (u *Unit[S]) DependenciesReady() bool { + for _, dep := range u.Dependencies { + if dep.Ready != nil && !dep.Ready(dep, u.Manager.State) { + return false + } else if dep.Ready == nil && !dep.Running { + return false + } + } + return true +} diff --git a/pkg/services/certs.go b/pkg/services/certs.go new file mode 100644 index 0000000000000000000000000000000000000000..9a3cd4914778f90b90178941f64bef2e860d36d4 --- /dev/null +++ b/pkg/services/certs.go @@ -0,0 +1,72 @@ +package services + +import ( + "context" + "path" + + "go.acides.org/hepto/pkg/pki" +) + +const CertsPath = "/certs" + +var certs = &Unit{ + Name: "certificates", + Wake: func(u *Unit, c *Cluster) error { + if c.node.Role == "master" { + for name, certs := range c.state.Certificates { + u.Manager.Logger.Info("signing node cert", "node", name) + c.pki.SignNodeCerts(name, certs) + } + } + return nil + }, + Ready: func(u *Unit, c *Cluster) bool { + return (c.pki != nil && + c.certs != nil && + c.pki.TLS.Cert != nil && + c.pki.Kubelet.Cert != nil && + c.pki.API.Cert != nil && + c.certs.TLS.Cert != nil && + c.certs.API.Cert != nil) + }, + Run: func(u *Unit, c *Cluster, ctx context.Context) error { + // Prepare the cluster PKI + if c.node.Role == "master" { + ca, err := pki.NewClusterCA(path.Join(CertsPath, "pki")) + if err != nil { + return err + } + masterCerts, err := pki.NewMasterCerts( + path.Join(CertsPath, "master"), + "kubernetes.default", + c.networking.NodeAddress.IP, + c.networking.APIAddress, + ) + if err != nil { + return err + } + c.pki = ca + c.masterCerts = 
masterCerts + c.pki.SignMasterCerts(c.masterCerts) + u.Manager.Logger.Info("master pki initialized") + } else { + ca, err := pki.EmptyClusterCA(path.Join(CertsPath, "pki")) + if err != nil { + return err + } + c.pki = ca + u.Manager.Logger.Info("placeholder pki initialized") + } + c.state.PKI = c.pki + // Initialize node certificates + certs, err := pki.NewNodeCerts(path.Join(CertsPath, "node"), c.node.Name) + if err != nil { + return err + } + c.certs = certs + c.state.Certificates = make(map[string]*pki.NodeCerts) + c.state.Certificates[c.node.Name] = certs + u.Manager.Logger.Info("node certificates initialized") + return nil + }, +} diff --git a/pkg/wrappers/containerd.go b/pkg/services/containerd.go similarity index 54% rename from pkg/wrappers/containerd.go rename to pkg/services/containerd.go index 73341827062a9eebcf2b66a5e6946d3318c2c279..e4e2aaaee26703e94137f86c38272383de00eacb 100644 --- a/pkg/wrappers/containerd.go +++ b/pkg/services/containerd.go @@ -1,7 +1,8 @@ -package wrappers +package services import ( "context" + "path" "strings" "github.com/containerd/containerd/cmd/containerd/command" @@ -31,21 +32,54 @@ import ( _ "github.com/containerd/containerd/services/tasks" _ "github.com/containerd/containerd/services/version" _ "github.com/containerd/containerd/snapshots/overlay/plugin" + "go.acides.org/hepto/pkg/utils" ) -func Containerd(ctx context.Context, args []string) (errCh chan error) { - errCh = make(chan error) - app := command.App() - args = append([]string{"containerd"}, args...) 
- // Disable ttrpc plugins when running the main server - // These are registered by there init() because we build a single binary with - // both containerd and its shim - plugins := plugin.Graph(func(*plugin.Registration) bool { return false }) - for _, plug := range plugins { - plug.Disable = strings.HasPrefix(plug.URI(), "io.containerd.ttrpc") - } - go func() { - errCh <- app.Run(args) - }() - return +var ContainerdPath = "/containerd" + +var containerd = &Unit{ + Name: "containerd", + Run: func(u *Unit, c *Cluster, ctx context.Context) error { + containerdConfig := ContainerdConfig{ + RootDir: ContainerdPath, + Socket: "/run/containerd/containerd.sock", + } + containerdConfigPath := path.Join(configPath, "containerd.toml") + err := utils.WriteConfig(containerdConfig, containerdConfigPath) + if err != nil { + return err + } + app := command.App() + args := []string{"containerd", "--config", containerdConfigPath} // the file written above, not the data root + // Disable ttrpc plugins when running the main server + // These are registered by their init() because we build a single binary with + // both containerd and its shim + plugins := plugin.Graph(func(*plugin.Registration) bool { return false }) + for _, plug := range plugins { + plug.Disable = strings.HasPrefix(plug.URI(), "io.containerd.ttrpc") + } + return app.Run(args) + }, +} + +const containerdTemplate = ` +plugin_dir = "" +root = "{{.RootDir}}" +state = "/run/containerd" +version = 2 + +[cgroup] + path = "" + +[grpc] + address = "{{.Socket}}" +` + +type ContainerdConfig struct { + RootDir string + Socket string +} + +func (ContainerdConfig) Template() string { + return containerdTemplate } diff --git a/pkg/services/discovery.go b/pkg/services/discovery.go new file mode 100644 index 0000000000000000000000000000000000000000..3eda4aca05f5cc7d43368feefc97cb674f5ffd31 --- /dev/null +++ b/pkg/services/discovery.go @@ -0,0 +1,43 @@ +package services + +import ( + "context" + + "go.acides.org/sml" +) + +var memberlist = &Unit{ + Name: "memberlist",
Dependencies: []*Unit{certs}, + Run: func(u *Unit, c *Cluster, ctx context.Context) error { + node := u.Manager.State.node + ml := sml.New[HeptoMeta, HeptoState](node.Name, node.IP, node.Port, node.Anchors, c.settings.Key, c.settings.Logger) + ml.Meta = c.localNode + events := ml.Events() + err := ml.Start() + if err != nil { + return err + } + go ml.Run() + for { + <-events + c.nodes = ml.Nodes() + u.Manager.Trigger() + } + }, +} + +var remote_master = &Unit{ + Name: "master-discovery", + Wake: func(u *Unit, c *Cluster) error { + for _, node := range c.nodes { + if node.Role == "master" { + c.masterNode = node + } + } + return nil + }, + Ready: func(u *Unit, c *Cluster) bool { + return c.masterNode != nil + }, +} diff --git a/pkg/services/etcd.go b/pkg/services/etcd.go new file mode 100644 index 0000000000000000000000000000000000000000..a8a0636d7fc1fb080fef72839646d21e195306cb --- /dev/null +++ b/pkg/services/etcd.go @@ -0,0 +1,32 @@ +package services + +import ( + "context" + "fmt" + + "go.etcd.io/etcd/server/v3/embed" +) + +var EtcdPath = "/etcd" + +var etcd = &Unit{ + Name: "etcd", + Dependencies: []*Unit{certs}, + Run: func(u *Unit, c *Cluster, ctx context.Context) error { + config := embed.NewConfig() + config.Dir = EtcdPath + config.AuthToken = fmt.Sprintf("jwt,priv-key=%s,sign-method=ES256", c.masterCerts.EtcdTokens.KeyPath()) + config.ZapLoggerBuilder = embed.NewZapLoggerBuilder(c.settings.ZapLogger) + server, err := embed.StartEtcd(config) + if err != nil { + return err + } + select { + case err := <-server.Err(): + return err + case <-ctx.Done(): + server.Close() + return nil + } + }, +} diff --git a/pkg/services/k8s.go b/pkg/services/k8s.go new file mode 100644 index 0000000000000000000000000000000000000000..25b440e5314cb853f74a2fc2c5e2d499c1064d43 --- /dev/null +++ b/pkg/services/k8s.go @@ -0,0 +1,307 @@ +package services + +import ( + "context" + "fmt" + "path" + + "github.com/spf13/pflag" + "go.acides.org/hepto/pkg/utils" + 
"k8s.io/apiserver/pkg/util/feature" + restclient "k8s.io/client-go/rest" + "k8s.io/component-base/cli/flag" + api "k8s.io/kubernetes/cmd/kube-apiserver/app" + apiopts "k8s.io/kubernetes/cmd/kube-apiserver/app/options" + cm "k8s.io/kubernetes/cmd/kube-controller-manager/app" + cmopts "k8s.io/kubernetes/cmd/kube-controller-manager/app/options" + scheduler "k8s.io/kubernetes/cmd/kube-scheduler/app" + scheduleropts "k8s.io/kubernetes/cmd/kube-scheduler/app/options" + kubelet "k8s.io/kubernetes/cmd/kubelet/app" + kubeletopts "k8s.io/kubernetes/cmd/kubelet/app/options" + kubeletcv "k8s.io/kubernetes/pkg/kubelet/apis/config/validation" + kubeletcf "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/configfiles" + "k8s.io/kubernetes/pkg/util/filesystem" +) + +var configPath = "/etc" + +var apiserver = &Unit{ + Name: "kube-apiserver", + Dependencies: []*Unit{etcd, certs, vpn}, + Run: func(u *Unit, c *Cluster, ctx context.Context) error { + args := []string{ + "--bind-address", c.networking.NodeAddress.IP.String(), + "--service-cluster-ip-range", c.networking.ServiceNet.String(), + "--tls-cert-file", c.masterCerts.TLS.CertPath(), + "--tls-private-key-file", c.masterCerts.TLS.KeyPath(), + "--client-ca-file", c.pki.API.CertPath(), + "--kubelet-certificate-authority", c.pki.TLS.CertPath(), + "--kubelet-client-certificate", c.masterCerts.Kubelet.CertPath(), + "--kubelet-client-key", c.masterCerts.Kubelet.KeyPath(), + "--etcd-servers", "http://localhost:2379", + "--service-account-signing-key-file", c.masterCerts.APITokens.KeyPath(), + "--service-account-key-file", c.masterCerts.APITokens.KeyPath(), + "--service-account-issuer", "https://kubernetes.default.svc.cluster.local", + "--api-audiences", "https://kubernetes.default.svc.cluster.local", + "--authorization-mode", "Node,RBAC", + "--allow-privileged", "true", + } + config := apiopts.NewServerRunOptions() + nfs := config.Flags() + flags := flagsFromNamedFlagSet("apiserver", &nfs) + err := flags.Parse(args) + if err != nil { + return 
err + } + completedOptions, err := api.Complete(config) + if err != nil { + return err + } + server, err := api.CreateServerChain(completedOptions) + if err != nil { + return err + } + prepared, err := server.PrepareRun() + if err != nil { + return err + } + rootConfig := KubeConfig{ + URL: fmt.Sprintf("https://[%s]:6443", c.networking.NodeAddress.IP.String()), + CACert: c.pki.TLS.CertPath(), + ClientKey: c.masterCerts.RootClient.KeyPath(), + ClientCert: c.masterCerts.RootClient.CertPath(), + } + err = utils.WriteConfig(rootConfig, "/root/.kube/config") + if err != nil { + return err + } + return prepared.Run(ctx.Done()) + }, +} + +var controller_manager = &Unit{ + Name: "kube-controller-manager", + Dependencies: []*Unit{apiserver, certs}, + Run: func(u *Unit, c *Cluster, ctx context.Context) error { + cmConfig := KubeConfig{ + URL: fmt.Sprintf("https://[%s]:6443", c.networking.NodeAddress.IP.String()), + CACert: c.pki.TLS.CertPath(), + ClientCert: c.masterCerts.ControllersAPI.CertPath(), + ClientKey: c.masterCerts.ControllersAPI.KeyPath(), + } + cmConfigPath := path.Join(configPath, "controller-manager.yaml") + err := utils.WriteConfig(cmConfig, cmConfigPath) + if err != nil { + return err + } + args := []string{ + "--kubeconfig", cmConfigPath, + "--tls-cert-file", c.masterCerts.ControllersTLS.CertPath(), + "--tls-private-key-file", c.masterCerts.ControllersTLS.KeyPath(), + "--service-account-private-key-file", c.masterCerts.APITokens.KeyPath(), + "--root-ca-file", c.pki.TLS.CertPath(), + // This is better than no declared cloud provider, since it does disable + // unnecessary cloud controllers + "--cloud-provider", "external", + "--use-service-account-credentials", + } + s, err := cmopts.NewKubeControllerManagerOptions() + if err != nil { + return err + } + nfs := s.Flags(cm.KnownControllers(), cm.ControllersDisabledByDefault.List()) + flags := flagsFromNamedFlagSet("cm", &nfs) + err = flags.Parse(args) + if err != nil { + return err + } + 
restclient.SetDefaultWarningHandler(restclient.NoWarnings{}) + config, err := s.Config(cm.KnownControllers(), cm.ControllersDisabledByDefault.List()) + if err != nil { + return err + } + return cm.Run(config.Complete(), ctx.Done()) + }, +} + +var kube_scheduler = &Unit{ + Name: "kube-scheduler", + Dependencies: []*Unit{apiserver, certs}, + Run: func(u *Unit, c *Cluster, ctx context.Context) error { + schedulerConfig := KubeConfig{ + URL: fmt.Sprintf("https://[%s]:6443", c.networking.NodeAddress.IP.String()), + CACert: c.pki.TLS.CertPath(), + ClientCert: c.masterCerts.SchedulerAPI.CertPath(), + ClientKey: c.masterCerts.SchedulerAPI.KeyPath(), + } + schedulerConfigPath := path.Join(configPath, "scheduler.yaml") + err := utils.WriteConfig(schedulerConfig, schedulerConfigPath) + if err != nil { + return err + } + + config := scheduleropts.NewOptions() + flags := flagsFromNamedFlagSet("scheduler", config.Flags) + err = flags.Parse([]string{"--kubeconfig", schedulerConfigPath}) + if err != nil { + return err + } + cc, sched, err := scheduler.Setup(ctx, config) + if err != nil { + return err + } + return scheduler.Run(ctx, cc, sched) + }, +} + +var kube_kubelet = &Unit{ + Name: "kubelet", + Dependencies: []*Unit{remote_master, containerd, certs}, + Run: func(u *Unit, c *Cluster, ctx context.Context) error { + kubeletKubeConfig := KubeConfig{ + URL: fmt.Sprintf("https://[%s]:6443", c.masterNode.String()), + CACert: c.pki.TLS.CertPath(), + ClientCert: c.certs.API.CertPath(), + ClientKey: c.certs.API.KeyPath(), + } + kubeletKubeConfigPath := path.Join(configPath, "kubelet-kubeconfig.yaml") + err := utils.WriteConfig(kubeletKubeConfig, kubeletKubeConfigPath) + if err != nil { + return err + } + kubeletConfig := KubeletConfig{ + CACert: c.pki.Kubelet.CertPath(), + TLSCert: c.certs.TLS.CertPath(), + TLSKey: c.certs.TLS.KeyPath(), + } + kubeletConfigPath := path.Join(configPath, "kubelet.yaml") + err = utils.WriteConfig(kubeletConfig, kubeletConfigPath) + if err != nil { + 
return err + } + args := []string{ + "--kubeconfig", kubeletKubeConfigPath, + "--config", kubeletConfigPath, + "--container-runtime-endpoint", "unix:///run/containerd/containerd.sock", + } + // Parse then validate cli flags + flags := kubeletopts.NewKubeletFlags() + flagSet := pflag.NewFlagSet("kubelet", pflag.ContinueOnError) + flags.AddFlags(flagSet) + kubeletopts.AddGlobalFlags(flagSet) + err = flagSet.Parse(args) + if err != nil { + return err + } + err = kubeletopts.ValidateKubeletFlags(flags) + if err != nil { + return err + } + // Load and validate config file + loader, err := kubeletcf.NewFsLoader(&filesystem.DefaultFs{}, flags.KubeletConfigFile) + if err != nil { + return err + } + config, err := loader.Load() + if err != nil { + return err + } + err = kubeletcv.ValidateKubeletConfiguration(config, feature.DefaultFeatureGate) + if err != nil { + return err + } + // Prepare feature gates + err = feature.DefaultMutableFeatureGate.SetFromMap(config.FeatureGates) + if err != nil { + return err + } + // Build the kubelet server + server := &kubeletopts.KubeletServer{ + KubeletFlags: *flags, + KubeletConfiguration: *config, + } + deps, err := kubelet.UnsecuredDependencies(server, feature.DefaultFeatureGate) + if err != nil { + return err + } + return kubelet.Run(ctx, server, deps, feature.DefaultFeatureGate) + }, +} + +func flagsFromNamedFlagSet(name string, nfs *flag.NamedFlagSets) *pflag.FlagSet { + flags := pflag.NewFlagSet(name, pflag.ContinueOnError) + for _, f := range nfs.FlagSets { + flags.AddFlagSet(f) + } + return flags +} + +const kubeconfigTemplate = ` +apiVersion: v1 +clusters: +- cluster: + server: {{.URL}} + certificate-authority: {{.CACert}} + name: local +contexts: +- context: + cluster: local + namespace: default + user: user + name: Default +current-context: Default +kind: Config +preferences: {} +users: +- name: user + user: + client-certificate: {{.ClientCert}} + client-key: {{.ClientKey}} +` + +type KubeConfig struct { + URL string + CACert 
// KubeletConfig holds the values substituted into kubeletConfigTemplate when
// the kubelet configuration file is rendered.
type KubeletConfig struct {
	// CACert is rendered as authentication.x509.clientCAFile.
	CACert string
	// TLSCert is rendered as tlsCertFile.
	TLSCert string
	// TLSKey is rendered as tlsPrivateKeyFile.
	TLSKey string
}

// Template returns the template source used to render a KubeletConfig.
func (KubeletConfig) Template() string {
	return kubeletConfigTemplate
}
+ Anchors []string +} + +type ClusterNetworking struct { + NodeNet *net.IPNet + NodeAddress *net.IPNet + ServiceNet *net.IPNet + PodNet *net.IPNet + APIAddress net.IP + MTU int +} + +type Cluster struct { + settings *ClusterSettings + node *NodeSettings + networking *ClusterNetworking + localNode *HeptoMeta + nodes []*HeptoMeta + state *HeptoState + masterNode *HeptoMeta + + certs *pki.NodeCerts + masterCerts *pki.MasterCerts + pki *pki.ClusterCA + + vpn *wg.Wireguard +} + +type Unit = daeman.Unit[*Cluster] + +var master_unit = &Unit{ + Name: "master", + Dependencies: []*Unit{memberlist, apiserver, controller_manager, kube_scheduler}, +} + +var node_unit = &Unit{ + Name: "node", + Dependencies: []*Unit{memberlist, kube_kubelet}, +} + +func NewManager(settings *ClusterSettings, node *NodeSettings, logger logr.Logger) *daeman.Manager[*Cluster] { + networking := NewClusterNetworking(settings.Name, node.Name) + cluster := &Cluster{ + settings: settings, + node: node, + networking: networking, + localNode: &HeptoMeta{ + Name: node.Name, + PublicIP: node.IP, + Role: node.Role, + VpnIP: networking.NodeAddress.IP, + }, + nodes: []*HeptoMeta{}, + state: &HeptoState{}, + } + units := []*Unit{} + switch node.Role { + case "master": + units = append(units, master_unit) + case "node": + units = append(units, node_unit) + } + return daeman.NewManager(cluster, units, logger) +} + +// Create a new cluster networking based on sane settings +// All networks use ULA private network derived deterministically +// from cluster name +func NewClusterNetworking(clusterName string, nodeName string) *ClusterNetworking { + // Cluster nodes are hosted on a /64 at :1, e.g. fd00:0:1:0::/64 + nodeNet := utils.ULA(clusterName, 64, 1) + // Current node address is derived from node name inside the cluster node net + nodeAddress := utils.DeriveAddress(nodeNet, nodeName) + // Pods are hosted on a /56 at :3, e.g. 
// WriteableConfig is implemented by configuration values that can be
// rendered to a file by WriteConfig.
type WriteableConfig interface {
	// Template returns the text/template source for the file; WriteConfig
	// executes it with the implementing value itself as the template data.
	Template() string
}

// WriteConfig renders config through its own template and writes the result
// to dest.
//
// The parent directory of dest is created with mode 0755 if it is missing,
// and an existing file at dest is truncated. The template is parsed before
// the filesystem is touched, so an invalid template is reported as an error
// without creating anything.
//
// It returns a wrapped error if the template cannot be parsed, the directory
// or file cannot be created, the template fails to execute, or the file
// cannot be closed after a successful write.
func WriteConfig(config WriteableConfig, dest string) (err error) {
	// A failed Parse returns a nil template; executing it would panic, so the
	// error must be checked rather than discarded.
	t, err := template.New("template").Parse(config.Template())
	if err != nil {
		return fmt.Errorf("could not parse config template: %w", err)
	}
	if err := os.MkdirAll(path.Dir(dest), 0o755); err != nil {
		return fmt.Errorf("could not create config dir: %w", err)
	}
	file, err := os.Create(dest)
	if err != nil {
		return fmt.Errorf("could not create config file: %w", err)
	}
	// Report a Close failure (a write error can surface there), but never let
	// it mask an earlier, more specific error.
	defer func() {
		if closeErr := file.Close(); closeErr != nil && err == nil {
			err = fmt.Errorf("could not close config file: %w", closeErr)
		}
	}()
	if err = t.Execute(file, config); err != nil {
		return fmt.Errorf("could not generate config: %w", err)
	}
	return nil
}
apiserveropts "k8s.io/kubernetes/cmd/kube-apiserver/app/options" - cm "k8s.io/kubernetes/cmd/kube-controller-manager/app" - cmopts "k8s.io/kubernetes/cmd/kube-controller-manager/app/options" - scheduler "k8s.io/kubernetes/cmd/kube-scheduler/app" - scheduleropts "k8s.io/kubernetes/cmd/kube-scheduler/app/options" - kubelet "k8s.io/kubernetes/cmd/kubelet/app" - kubeletopts "k8s.io/kubernetes/cmd/kubelet/app/options" - kubeletcv "k8s.io/kubernetes/pkg/kubelet/apis/config/validation" - kubeletcf "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/configfiles" - "k8s.io/kubernetes/pkg/util/filesystem" -) - -func SetK8sLogger(logger logr.Logger) { - klog.SetLogger(logger) -} - -func ControllerManager(ctx context.Context, args []string) (errCh chan error) { - errCh = make(chan error) - s, err := cmopts.NewKubeControllerManagerOptions() - if err != nil { - errCh <- err - return - } - nfs := s.Flags(cm.KnownControllers(), cm.ControllersDisabledByDefault.List()) - flags := flagsFromNamedFlagSet("cm", &nfs) - err = flags.Parse(args) - if err != nil { - errCh <- err - return - } - restclient.SetDefaultWarningHandler(restclient.NoWarnings{}) - c, err := s.Config(cm.KnownControllers(), cm.ControllersDisabledByDefault.List()) - if err != nil { - errCh <- err - return - } - go func() { - errCh <- cm.Run(c.Complete(), ctx.Done()) - }() - return -} - -func Scheduler(ctx context.Context, args []string) (errCh chan error) { - errCh = make(chan error) - config := scheduleropts.NewOptions() - flags := flagsFromNamedFlagSet("scheduler", config.Flags) - err := flags.Parse(args) - if err != nil { - errCh <- err - return - } - cc, sched, err := scheduler.Setup(ctx, config) - if err != nil { - errCh <- err - return - } - go func() { - errCh <- scheduler.Run(ctx, cc, sched) - }() - return -} - -func Kubelet(ctx context.Context, args []string) (errCh chan error) { - errCh = make(chan error) - // Parse then validate cli flags - flags := kubeletopts.NewKubeletFlags() - flagSet := 
pflag.NewFlagSet("kubelet", pflag.ContinueOnError) - flags.AddFlags(flagSet) - kubeletopts.AddGlobalFlags(flagSet) - err := flagSet.Parse(args) - if err != nil { - errCh <- err - return - } - err = kubeletopts.ValidateKubeletFlags(flags) - if err != nil { - errCh <- err - return - } - // Load and validate config file - loader, err := kubeletcf.NewFsLoader(&filesystem.DefaultFs{}, flags.KubeletConfigFile) - if err != nil { - errCh <- err - return - } - config, err := loader.Load() - if err != nil { - errCh <- err - return - } - err = kubeletcv.ValidateKubeletConfiguration(config, feature.DefaultFeatureGate) - if err != nil { - errCh <- err - return - } - // Prepare feature gates - err = feature.DefaultMutableFeatureGate.SetFromMap(config.FeatureGates) - if err != nil { - errCh <- err - return - } - // Build the kubelet server - server := &kubeletopts.KubeletServer{ - KubeletFlags: *flags, - KubeletConfiguration: *config, - } - deps, err := kubelet.UnsecuredDependencies(server, feature.DefaultFeatureGate) - if err != nil { - errCh <- err - return - } - - go func() { - errCh <- kubelet.Run(ctx, server, deps, feature.DefaultFeatureGate) - }() - return -} - -func APIServer(ctx context.Context, args []string) (errCh chan error) { - errCh = make(chan error) - config := apiserveropts.NewServerRunOptions() - nfs := config.Flags() - flags := flagsFromNamedFlagSet("apiserver", &nfs) - err := flags.Parse(args) - if err != nil { - errCh <- err - return - } - completedOptions, err := apiserver.Complete(config) - if err != nil { - errCh <- err - return - } - server, err := apiserver.CreateServerChain(completedOptions) - if err != nil { - errCh <- err - return - } - prepared, err := server.PrepareRun() - if err != nil { - errCh <- err - return - } - go func() { - errCh <- prepared.Run(ctx.Done()) - }() - return -} - -func flagsFromNamedFlagSet(name string, nfs *flag.NamedFlagSets) *pflag.FlagSet { - flags := pflag.NewFlagSet(name, pflag.ContinueOnError) - for _, f := range 
nfs.FlagSets { - flags.AddFlagSet(f) - } - return flags -}