package cluster

import (
	"context"
	"fmt"
	"net"

	"forge.tedomum.net/acides/hepto/hepto/pkg/pki"
	"forge.tedomum.net/acides/hepto/hepto/pkg/wrappers"
	"github.com/sirupsen/logrus"
	"go.etcd.io/etcd/server/v3/embed"
)

// ClusterServices starts the services matching the local node role (etcd and
// the Kubernetes control plane on a master, containerd and kubelet on a node)
// and signals on its done channel when one of them terminates.
type ClusterServices struct {
	// ctx is handed to every service wrapper when it is started.
	ctx context.Context
	// started is set once the services have been launched, so that later
	// calls to Update become no-ops.
	started bool
	// done receives one value from each watcher whose service context ends.
	done chan struct{}
}

// NewClusterServices returns a ClusterServices bound to a background context
// with no service started yet.
func NewClusterServices() *ClusterServices {
	return &ClusterServices{
		ctx:  context.Background(),
		done: make(chan struct{}),
	}
}

// watch blocks until ctx is done, then sends a single value on the done
// channel.
//
// NOTE(review): done is unbuffered, so this goroutine stays blocked on the
// send until something receives from Done(). When several services are
// watched and the caller receives only once, the remaining watchers never
// return — confirm this is acceptable for the callers of Done().
func (s *ClusterServices) watch(ctx context.Context) {
	<-ctx.Done()
	s.done <- struct{}{}
}

// Done returns the channel on which service termination is signalled.
func (s *ClusterServices) Done() <-chan struct{} {
	return s.done
}

// Update starts the services for the local node once the cluster state allows
// it, and does nothing once they have been started.
//
// A master starts etcd and the control plane immediately. A node waits until
// the CA and node certificates are available and a master is present in the
// member list, then starts containerd and kubelet against that master.
func (s *ClusterServices) Update(c *Cluster) {
	if s.started {
		return
	}

	switch c.node.Role {
	case Master:
		s.startEtcd()
		s.startK8sMaster(c.networking, c.pki, c.masterCerts)
		s.started = true

	case Node:
		// Bail if CA are not discovered or certificates are not ready
		if c.pki.TLS.Cert == nil || c.pki.Kubelet.Cert == nil {
			return
		}
		if c.certs.TLS.Cert == nil || c.certs.API.Cert == nil {
			return
		}
		// NOTE(review): this logs pki.API.Cert while the readiness check
		// above tests pki.Kubelet.Cert — confirm which one is intended.
		logrus.Debug("certificates are...", c.pki.TLS.Cert, c.pki.API.Cert)
		logrus.Debug("looking for master node...")
		for _, node := range c.ml.Nodes() {
			if node.NodeMeta.Role != string(Master) {
				continue
			}
			s.startK8sNode(node.NodeMeta.VpnIP, c.pki, c.certs)
			s.started = true
			// Stop at the first master: carrying on would start containerd
			// and kubelet once more for every additional master listed.
			return
		}
	}
}

// startEtcd starts an embedded etcd storing its data in /etcd and watches it
// for termination. A start failure is fatal.
func (s *ClusterServices) startEtcd() {
	etcdConfig := embed.NewConfig()
	etcdConfig.Dir = "/etcd"

	service, err := wrappers.ETCd(s.ctx, etcdConfig)
	if err != nil {
		logrus.Fatal(err)
	}
	go s.watch(service)
}

// startK8sMaster starts the API server, the controller manager and the
// scheduler, in that order, then watches each of them for termination.
//
// net provides the bind address and the service network, ca the cluster
// authorities and certs the master certificates. Any start failure is fatal.
func (s *ClusterServices) startK8sMaster(net *ClusterNetworking, ca *pki.ClusterCA, certs *pki.MasterCerts) {
	api, err := wrappers.APIServer(s.ctx, []string{
		"--bind-address", net.NodeAddress.IP.String(),
		"--service-cluster-ip-range", net.ServiceNet.String(),
		"--tls-cert-file", certs.TLS.CertPath(),
		"--tls-private-key-file", certs.TLS.KeyPath(),
		"--client-ca-file", ca.API.CertPath(),
		"--kubelet-certificate-authority", ca.Kubelet.CertPath(),
		"--kubelet-client-certificate", certs.Kubelet.CertPath(),
		"--kubelet-client-key", certs.Kubelet.KeyPath(),
		"--etcd-servers", "http://localhost:2379",
		"--service-account-signing-key-file", certs.Tokens.KeyPath(),
		"--service-account-key-file", certs.Tokens.KeyPath(),
		"--service-account-issuer", "https://kubernetes.default.svc.cluster.local",
		"--api-audiences", "https://kubernetes.default.svc.cluster.local",
		"--authorization-mode", "Node,RBAC",
	})
	if err != nil {
		logrus.Fatal(err)
	}

	// Both the controller manager and the scheduler reach the API server on
	// the local node address, so the URL is built once and shared.
	apiURL := fmt.Sprintf("https://[%s]:6443", net.NodeAddress.IP.String())

	cmConfigPath := "/etc/k8s/controller-manager.yaml"
	WriteConfig(KubeConfig{
		URL:        apiURL,
		CACert:     ca.TLS.CertPath(),
		ClientCert: certs.ControllersAPI.CertPath(),
		ClientKey:  certs.ControllersAPI.KeyPath(),
	}, cmConfigPath)
	cm, err := wrappers.ControllerManager(s.ctx, []string{
		"--kubeconfig", cmConfigPath,
		"--tls-cert-file", certs.ControllersTLS.CertPath(),
		"--tls-private-key-file", certs.ControllersTLS.KeyPath(),
		"--service-account-private-key-file", certs.Tokens.KeyPath(),
		"--use-service-account-credentials",
	})
	if err != nil {
		logrus.Fatal(err)
	}

	schedulerConfigPath := "/etc/k8s/scheduler.yaml"
	WriteConfig(KubeConfig{
		URL:        apiURL,
		CACert:     ca.TLS.CertPath(),
		ClientCert: certs.SchedulerAPI.CertPath(),
		ClientKey:  certs.SchedulerAPI.KeyPath(),
	}, schedulerConfigPath)
	scheduler, err := wrappers.Scheduler(s.ctx, []string{
		"--kubeconfig", schedulerConfigPath,
	})
	if err != nil {
		logrus.Fatal(err)
	}

	go s.watch(api)
	go s.watch(cm)
	go s.watch(scheduler)
}

// startK8sNode starts containerd and then a kubelet pointed at the API server
// of the given master, and watches both for termination.
//
// master is the address used to build the API server URL, ca provides the
// cluster authorities and certs the node certificates. Any start failure is
// fatal.
func (s *ClusterServices) startK8sNode(master net.IP, ca *pki.ClusterCA, certs *pki.NodeCerts) {
	// Containerd
	containerdConfig := ContainerdConfig{
		RootDir: "/containerd",
		Socket:  "/containerd.sock",
	}
	containerdConfigPath := "/etc/containerd/config.toml"
	WriteConfig(containerdConfig, containerdConfigPath)
	containerd, err := wrappers.Containerd(s.ctx, []string{
		"--config", containerdConfigPath,
	})
	if err != nil {
		logrus.Fatal("could not start containerd:", err)
	}

	// Kubelet: one kubeconfig for talking to the API server, and one kubelet
	// configuration carrying the CA and serving certificate paths.
	kubeletKubeConfigPath := "/etc/k8s/kubelet-kubeconfig.yaml"
	WriteConfig(KubeConfig{
		URL:        fmt.Sprintf("https://[%s]:6443", master.String()),
		CACert:     ca.TLS.CertPath(),
		ClientCert: certs.API.CertPath(),
		ClientKey:  certs.API.KeyPath(),
	}, kubeletKubeConfigPath)

	kubeletConfigPath := "/etc/k8s/kubelet.yaml"
	WriteConfig(KubeletConfig{
		CACert:  ca.Kubelet.CertPath(),
		TLSCert: certs.TLS.CertPath(),
		TLSKey:  certs.TLS.KeyPath(),
	}, kubeletConfigPath)

	kubelet, err := wrappers.Kubelet(s.ctx, []string{
		"--kubeconfig", kubeletKubeConfigPath,
		"--config", kubeletConfigPath,
		"--container-runtime-endpoint", "unix://" + containerdConfig.Socket,
	})
	if err != nil {
		logrus.Fatal(err)
	}

	go s.watch(containerd)
	go s.watch(kubelet)
}