diff --git a/pkg/cluster/services.go b/pkg/cluster/services.go index 4b8665a2a54812aaa7343f0a36e43dabad87cc88..e94b10c54f8a2f5d9cda2a33ceaf1bddef77519c 100644 --- a/pkg/cluster/services.go +++ b/pkg/cluster/services.go @@ -1,7 +1,6 @@ package cluster import ( - "context" "fmt" "net" "os" @@ -18,9 +17,11 @@ const binPath = "/bin" const containerdPath = "/containerd" const imagePath = "/images" -func (c *Cluster) watchService(ctx context.Context) { - <-ctx.Done() - c.servicesDone <- struct{}{} +func (c *Cluster) watchService(name string, errCh <-chan error) { + c.settings.Logger.Info("service started", "name", name) + err := <- errCh + c.settings.Logger.Error(err, "service failed unexpectedly", "name", name) + c.servicesDone <- struct{}{} } func (c *Cluster) updateServices() { @@ -79,17 +80,12 @@ func (c *Cluster) startEtcd() { etcdConfig.Dir = etcdPath etcdConfig.AuthToken = fmt.Sprintf("jwt,priv-key=%s,sign-method=ES256", c.masterCerts.EtcdTokens.KeyPath()) etcdConfig.ZapLoggerBuilder = embed.NewZapLoggerBuilder(c.settings.ZapLogger) - service, err := wrappers.ETCd(c.serviceCtx, etcdConfig) - if err != nil { - c.settings.Logger.Error(err, "failed to start etcd") - os.Exit(1) - } - go c.watchService(service) + go c.watchService("etcd", wrappers.ETCd(c.serviceCtx, etcdConfig)) } func (c *Cluster) startK8sMaster() { wrappers.SetK8sLogger(c.settings.Logger) - api, err := wrappers.APIServer(c.serviceCtx, []string{ + go c.watchService("kube-apiserver", wrappers.APIServer(c.serviceCtx, []string{ "--bind-address", c.networking.NodeAddress.IP.String(), "--service-cluster-ip-range", c.networking.ServiceNet.String(), "--tls-cert-file", c.masterCerts.TLS.CertPath(), @@ -105,11 +101,7 @@ func (c *Cluster) startK8sMaster() { "--api-audiences", "https://kubernetes.default.svc.cluster.local", "--authorization-mode", "Node,RBAC", "--allow-privileged", "true", - }) - if err != nil { - c.settings.Logger.Error(err, "failed to start APIserver") - os.Exit(1) - } + })) // TODO wait for APIserver to be ready before starting other components cmConfig := KubeConfig{ URL: fmt.Sprintf("https://[%s]:6443", c.networking.NodeAddress.IP.String()), @@ -119,7 +111,7 @@ func (c *Cluster) startK8sMaster() { } cmConfigPath := path.Join(configPath, "controller-manager.yaml") c.WriteConfig(cmConfig, cmConfigPath) - cm, err := wrappers.ControllerManager(c.serviceCtx, []string{ + go c.watchService("controller-manager", wrappers.ControllerManager(c.serviceCtx, []string{ "--kubeconfig", cmConfigPath, "--tls-cert-file", c.masterCerts.ControllersTLS.CertPath(), "--tls-private-key-file", c.masterCerts.ControllersTLS.KeyPath(), @@ -129,11 +121,7 @@ func (c *Cluster) startK8sMaster() { // unnecessary cloud controllers "--cloud-provider", "external", "--use-service-account-credentials", - }) - if err != nil { - c.settings.Logger.Error(err, "failed to start controller manager") - os.Exit(1) - } + })) schedulerConfig := KubeConfig{ URL: fmt.Sprintf("https://[%s]:6443", c.networking.NodeAddress.IP.String()), CACert: c.pki.TLS.CertPath(), @@ -142,16 +130,9 @@ func (c *Cluster) startK8sMaster() { } schedulerConfigPath := path.Join(configPath, "scheduler.yaml") c.WriteConfig(schedulerConfig, schedulerConfigPath) - scheduler, err := wrappers.Scheduler(c.serviceCtx, []string{ + go c.watchService("kube-scheduler", wrappers.Scheduler(c.serviceCtx, []string{ "--kubeconfig", schedulerConfigPath, - }) - if err != nil { - c.settings.Logger.Error(err, "failed to start scheduler") - os.Exit(1) - } - go c.watchService(api) - go c.watchService(cm) - go c.watchService(scheduler) + })) // Write a kubelet config for debugging purposes rootConfig := KubeConfig{ URL: fmt.Sprintf("https://[%s]:6443", c.networking.NodeAddress.IP.String()), @@ -178,13 +159,9 @@ func (c *Cluster) startK8sNode(masterIP net.IP) { } containerdConfigPath := path.Join(configPath, "containerd.toml") c.WriteConfig(containerdConfig, containerdConfigPath) - containerd, err := wrappers.Containerd(c.serviceCtx, []string{ + go c.watchService("containerd", wrappers.Containerd(c.serviceCtx, []string{ "--config", containerdConfigPath, - }) - if err != nil { - c.settings.Logger.Error(err, "failed to start containerd") - os.Exit(1) - } + })) // Wait for containerd to settle down time.Sleep(5 * time.Second) // Kubelet @@ -203,15 +180,9 @@ func (c *Cluster) startK8sNode(masterIP net.IP) { } kubeletConfigPath := path.Join(configPath, "kubelet.yaml") c.WriteConfig(kubeletConfig, kubeletConfigPath) - kubelet, err := wrappers.Kubelet(c.serviceCtx, []string{ + go c.watchService("kubelet", wrappers.Kubelet(c.serviceCtx, []string{ "--kubeconfig", kubeletKubeConfigPath, "--config", kubeletConfigPath, "--container-runtime-endpoint", "unix://" + containerdConfig.Socket, - }) - if err != nil { - c.settings.Logger.Error(err, "failed to start kubelet") - os.Exit(1) - } - go c.watchService(containerd) - go c.watchService(kubelet) + })) } diff --git a/pkg/wrappers/containerd.go b/pkg/wrappers/containerd.go index d3860f649ddad27b5b07715af10874df9d311b2c..4d43381cad45cebf7f76c9fe630776e4a07a8d67 100644 --- a/pkg/wrappers/containerd.go +++ b/pkg/wrappers/containerd.go @@ -32,7 +32,8 @@ import ( _ "github.com/containerd/containerd/snapshots/overlay/plugin" ) -func Containerd(ctx context.Context, args []string) (context.Context, error) { +func Containerd(ctx context.Context, args []string) (errCh chan error) { + errCh = make(chan error) app := command.App() args = append([]string{"containerd"}, args...) // Disable ttrpc plugins when running the main server @@ -42,12 +43,8 @@ func Containerd(ctx context.Context, args []string) (context.Context, error) { for _, plug := range plugins { plug.Disable = strings.HasPrefix(plug.URI(), "io.containerd.ttrpc") } - - newCtx, cancel := context.WithCancel(ctx) go func() { - app.Run(args) - cancel() + errCh <- app.Run(args) }() - - return newCtx, nil + return } diff --git a/pkg/wrappers/etcd.go b/pkg/wrappers/etcd.go index 29488be61a8dd55d69c891122775ad78aece870b..e8ad75cb1872027d57ca8010f3ad88145ec59560 100644 --- a/pkg/wrappers/etcd.go +++ b/pkg/wrappers/etcd.go @@ -6,22 +6,21 @@ import ( "go.etcd.io/etcd/server/v3/embed" ) -func ETCd(ctx context.Context, config *embed.Config) (context.Context, error) { +func ETCd(ctx context.Context, config *embed.Config) (errCh chan error) { + errCh = make(chan error) server, err := embed.StartEtcd(config) if err != nil { - return nil, err + errCh <- err + return } - newCtx, cancel := context.WithCancel(ctx) - go func() { select { - case <-server.Err(): - cancel() + case err := <-server.Err(): + errCh <- err case <-ctx.Done(): server.Close() - cancel() + errCh <- nil } }() - - return newCtx, nil + return } diff --git a/pkg/wrappers/kubernetes.go b/pkg/wrappers/kubernetes.go index 94e356110a5c95ea79e60967bc85fd13f1ad8834..fc938336691e583d784c54cf42341c9353dd521d 100644 --- a/pkg/wrappers/kubernetes.go +++ b/pkg/wrappers/kubernetes.go @@ -26,54 +26,54 @@ func SetK8sLogger(logger logr.Logger) { klog.SetLogger(logger) } -func ControllerManager(ctx context.Context, args []string) (context.Context, error) { +func ControllerManager(ctx context.Context, args []string) (errCh chan error) { + errCh = make(chan error) s, err := cmopts.NewKubeControllerManagerOptions() if err != nil { - return nil, err + errCh <- err + return } nfs := s.Flags(cm.KnownControllers(), cm.ControllersDisabledByDefault.List()) flags := flagsFromNamedFlagSet("cm", &nfs) err = flags.Parse(args) if err != nil { - return nil, err + errCh <- err + return } restclient.SetDefaultWarningHandler(restclient.NoWarnings{}) c, err := s.Config(cm.KnownControllers(), cm.ControllersDisabledByDefault.List()) if err != nil { - return nil, err + errCh <- err + return } - - newCtx, cancel := context.WithCancel(ctx) go func() { - cm.Run(c.Complete(), ctx.Done()) - cancel() + errCh <- cm.Run(c.Complete(), ctx.Done()) }() - - return newCtx, nil + return } -func Scheduler(ctx context.Context, args []string) (context.Context, error) { +func Scheduler(ctx context.Context, args []string) (errCh chan error) { + errCh = make(chan error) config := scheduleropts.NewOptions() flags := flagsFromNamedFlagSet("scheduler", config.Flags) err := flags.Parse(args) if err != nil { - return nil, err + errCh <- err + return } cc, sched, err := scheduler.Setup(ctx, config) if err != nil { - return nil, err + errCh <- err + return } - - newCtx, cancel := context.WithCancel(ctx) go func() { - scheduler.Run(ctx, cc, sched) - cancel() + errCh <- scheduler.Run(ctx, cc, sched) }() - - return newCtx, nil + return } -func Kubelet(ctx context.Context, args []string) (context.Context, error) { +func Kubelet(ctx context.Context, args []string) (errCh chan error) { + errCh = make(chan error) // Parse then validate cli flags flags := kubeletopts.NewKubeletFlags() flagSet := pflag.NewFlagSet("kubelet", pflag.ContinueOnError) @@ -81,29 +81,35 @@ func Kubelet(ctx context.Context, args []string) (context.Context, error) { kubeletopts.AddGlobalFlags(flagSet) err := flagSet.Parse(args) if err != nil { - return nil, err + errCh <- err + return } err = kubeletopts.ValidateKubeletFlags(flags) if err != nil { - return nil, err + errCh <- err + return } // Load and validate config file loader, err := kubeletcf.NewFsLoader(&filesystem.DefaultFs{}, flags.KubeletConfigFile) if err != nil { - return nil, err + errCh <- err + return } config, err := loader.Load() if err != nil { - return nil, err + errCh <- err + return } err = kubeletcv.ValidateKubeletConfiguration(config, feature.DefaultFeatureGate) if err != nil { - return nil, err + errCh <- err + return } // Prepare feature gates err = feature.DefaultMutableFeatureGate.SetFromMap(config.FeatureGates) if err != nil { - return nil, err + errCh <- err + return } // Build the kubelet server server := &kubeletopts.KubeletServer{ @@ -112,46 +118,45 @@ func Kubelet(ctx context.Context, args []string) (context.Context, error) { } deps, err := kubelet.UnsecuredDependencies(server, feature.DefaultFeatureGate) if err != nil { - return nil, err + errCh <- err + return } - newCtx, cancel := context.WithCancel(ctx) go func() { - kubelet.Run(newCtx, server, deps, feature.DefaultFeatureGate) - cancel() + errCh <- kubelet.Run(ctx, server, deps, feature.DefaultFeatureGate) }() - - return newCtx, nil + return } -func APIServer(ctx context.Context, args []string) (context.Context, error) { +func APIServer(ctx context.Context, args []string) (errCh chan error) { + errCh = make(chan error) config := apiserveropts.NewServerRunOptions() nfs := config.Flags() flags := flagsFromNamedFlagSet("apiserver", &nfs) err := flags.Parse(args) if err != nil { - return nil, err + errCh <- err + return } completedOptions, err := apiserver.Complete(config) if err != nil { - return nil, err + errCh <- err + return } server, err := apiserver.CreateServerChain(completedOptions) if err != nil { - return nil, err + errCh <- err + return } prepared, err := server.PrepareRun() if err != nil { - return nil, err + errCh <- err + return } - - newCtx, cancel := context.WithCancel(ctx) go func() { - prepared.Run(ctx.Done()) - cancel() + errCh <- prepared.Run(ctx.Done()) }() - - return newCtx, nil + return } func flagsFromNamedFlagSet(name string, nfs *flag.NamedFlagSets) *pflag.FlagSet {