Skip to content
Snippets Groups Projects
services.go 4.94 KiB
package cluster

import (
	"context"
	"fmt"
	"net"

	"forge.tedomum.net/acides/hepto/hepto/pkg/pki"
	"forge.tedomum.net/acides/hepto/hepto/pkg/wrappers"
	"github.com/sirupsen/logrus"
	"go.etcd.io/etcd/server/v3/embed"
)

// ClusterServices holds the state needed to launch and watch the services
// run on this node.
//
// ctx is the context handed to every wrappers.* call that starts a service,
// started is set by Update once the services for the node's role have been
// launched, and done is the channel on which watch reports that a watched
// service has ended.
type ClusterServices struct {
	ctx     context.Context
	started bool
	done    chan struct{}
}

// NewClusterServices returns a ClusterServices that has not started anything
// yet. Its context is context.Background() and its done channel is unbuffered.
func NewClusterServices() *ClusterServices {
	services := new(ClusterServices)
	services.ctx = context.Background()
	services.done = make(chan struct{})
	return services
}

// watch blocks until ctx is done and then sends a single value on s.done.
// When s.done is the unbuffered channel created by NewClusterServices, the
// send itself blocks until some caller receives from Done().
func (s *ClusterServices) watch(ctx context.Context) {
	finished := ctx.Done()
	<-finished

	var signal struct{}
	s.done <- signal
}

// Done exposes the receive-only side of the channel on which watch reports
// that a watched service has ended.
func (s *ClusterServices) Done() <-chan struct{} {
	var ended <-chan struct{} = s.done
	return ended
}

// Update starts the services matching this node's role, at most once.
//
// It is a no-op once s.started is set. On a master it starts etcd and the
// master services immediately. On a node it returns without starting anything
// until the CA and node certificates checked below are present and a master
// appears in the member list; callers are expected to call Update again
// later in that case.
func (s *ClusterServices) Update(c *Cluster) {
	if s.started {
		return
	}
	switch c.node.Role {
	case Master:
		s.startEtcd()
		s.startK8sMaster(c.networking, c.pki, c.masterCerts)
		s.started = true
	case Node:
		// Bail if CA are not discovered or certificates are not ready
		if c.pki.TLS.Cert == nil || c.pki.Kubelet.Cert == nil {
			return
		}
		if c.certs.TLS.Cert == nil || c.certs.API.Cert == nil {
			return
		}
		logrus.Debug("certificates are...", c.pki.TLS.Cert, c.pki.API.Cert)
		logrus.Debug("looking for master node...")
		for _, node := range c.ml.Nodes() {
			if node.NodeMeta.Role != string(Master) {
				continue
			}
			s.startK8sNode(node.NodeMeta.VpnIP, c.pki, c.certs)
			s.started = true
			// Stop at the first master: continuing the loop would start the
			// node services a second time if another master is listed.
			return
		}
	}
}

// startEtcd launches etcd through wrappers.ETCd using the default embed
// configuration with its data directory set to /etcd, then watches the
// returned service in a goroutine. A start failure is fatal.
func (s *ClusterServices) startEtcd() {
	cfg := embed.NewConfig()
	cfg.Dir = "/etcd"

	etcd, err := wrappers.ETCd(s.ctx, cfg)
	if err != nil {
		logrus.Fatal(err)
	}

	go s.watch(etcd)
}

// startK8sMaster starts, in order, the API server, the controller manager and
// the scheduler through their wrappers, writing the kubeconfig file each of
// the latter two needs just before starting it. Any start failure is fatal.
// Once all three are running, each one is watched in its own goroutine.
//
// net supplies the bind address and service network, ca the CA certificate
// paths, and certs the master's certificate and key paths.
func (s *ClusterServices) startK8sMaster(net *ClusterNetworking, ca *pki.ClusterCA, certs *pki.MasterCerts) {
	nodeIP := net.NodeAddress.IP.String()

	// API server
	apiArgs := []string{
		"--bind-address", nodeIP,
		"--service-cluster-ip-range", net.ServiceNet.String(),
		"--tls-cert-file", certs.TLS.CertPath(),
		"--tls-private-key-file", certs.TLS.KeyPath(),
		"--client-ca-file", ca.API.CertPath(),
		"--kubelet-certificate-authority", ca.Kubelet.CertPath(),
		"--kubelet-client-certificate", certs.Kubelet.CertPath(),
		"--kubelet-client-key", certs.Kubelet.KeyPath(),
		"--etcd-servers", "http://localhost:2379",
		"--service-account-signing-key-file", certs.Tokens.KeyPath(),
		"--service-account-key-file", certs.Tokens.KeyPath(),
		"--service-account-issuer", "https://kubernetes.default.svc.cluster.local",
		"--api-audiences", "https://kubernetes.default.svc.cluster.local",
		"--authorization-mode", "Node,RBAC",
	}
	apiServer, err := wrappers.APIServer(s.ctx, apiArgs)
	if err != nil {
		logrus.Fatal(err)
	}

	// Both remaining components reach the API server on this node's address.
	apiURL := fmt.Sprintf("https://[%s]:6443", nodeIP)

	// Controller manager
	controllersKubeConfigPath := "/etc/k8s/controller-manager.yaml"
	WriteConfig(KubeConfig{
		URL:        apiURL,
		CACert:     ca.TLS.CertPath(),
		ClientCert: certs.ControllersAPI.CertPath(),
		ClientKey:  certs.ControllersAPI.KeyPath(),
	}, controllersKubeConfigPath)
	controllersArgs := []string{
		"--kubeconfig", controllersKubeConfigPath,
		"--tls-cert-file", certs.ControllersTLS.CertPath(),
		"--tls-private-key-file", certs.ControllersTLS.KeyPath(),
		"--service-account-private-key-file", certs.Tokens.KeyPath(),
		"--use-service-account-credentials",
	}
	controllers, err := wrappers.ControllerManager(s.ctx, controllersArgs)
	if err != nil {
		logrus.Fatal(err)
	}

	// Scheduler
	schedKubeConfigPath := "/etc/k8s/scheduler.yaml"
	WriteConfig(KubeConfig{
		URL:        apiURL,
		CACert:     ca.TLS.CertPath(),
		ClientCert: certs.SchedulerAPI.CertPath(),
		ClientKey:  certs.SchedulerAPI.KeyPath(),
	}, schedKubeConfigPath)
	sched, err := wrappers.Scheduler(s.ctx, []string{"--kubeconfig", schedKubeConfigPath})
	if err != nil {
		logrus.Fatal(err)
	}

	// Watch every started service; each watcher reports on s.done when its
	// service ends.
	for _, service := range []context.Context{apiServer, controllers, sched} {
		go s.watch(service)
	}
}

// startK8sNode writes the containerd and kubelet configuration files, starts
// containerd and then the kubelet through their wrappers, and watches both in
// separate goroutines. A failure to start either service is fatal.
//
// master is the address used to build the API server URL in the kubelet's
// kubeconfig, ca supplies the CA certificate paths, and certs the node's
// certificate and key paths.
func (s *ClusterServices) startK8sNode(master net.IP, ca *pki.ClusterCA, certs *pki.NodeCerts) {
	// Containerd
	containerdConfig := ContainerdConfig{
		RootDir: "/containerd",
		Socket:  "/containerd.sock",
	}
	containerdConfigPath := "/etc/containerd/config.toml"
	WriteConfig(containerdConfig, containerdConfigPath)
	containerd, err := wrappers.Containerd(s.ctx, []string{
		"--config", containerdConfigPath,
	})
	if err != nil {
		// Fatalf rather than Fatal: Fatal joins its operands like fmt.Sprint,
		// which puts no space between a string and the error that follows.
		logrus.Fatalf("could not start containerd: %v", err)
	}
	// Kubelet
	kubeletKubeConfig := KubeConfig{
		// NOTE(review): the brackets assume master is an IPv6 address — confirm.
		URL:        fmt.Sprintf("https://[%s]:6443", master.String()),
		CACert:     ca.TLS.CertPath(),
		ClientCert: certs.API.CertPath(),
		ClientKey:  certs.API.KeyPath(),
	}
	kubeletKubeConfigPath := "/etc/k8s/kubelet-kubeconfig.yaml"
	WriteConfig(kubeletKubeConfig, kubeletKubeConfigPath)
	kubeletConfig := KubeletConfig{
		CACert:  ca.Kubelet.CertPath(),
		TLSCert: certs.TLS.CertPath(),
		TLSKey:  certs.TLS.KeyPath(),
	}
	kubeletConfigPath := "/etc/k8s/kubelet.yaml"
	WriteConfig(kubeletConfig, kubeletConfigPath)
	kubelet, err := wrappers.Kubelet(s.ctx, []string{
		"--kubeconfig", kubeletKubeConfigPath,
		"--config", kubeletConfigPath,
		// The kubelet talks to containerd over the socket configured above.
		"--container-runtime-endpoint", "unix://" + containerdConfig.Socket,
	})
	if err != nil {
		logrus.Fatalf("could not start kubelet: %v", err)
	}
	go s.watch(containerd)
	go s.watch(kubelet)
}