diff --git a/services/cm.go b/services/cm.go index 326c422e2ceef81ba4004c80efefe13d43725acf..d4d37b6105dd0912ff7e610f8e75d3b004721bd8 100644 --- a/services/cm.go +++ b/services/cm.go @@ -9,8 +9,11 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apiserver/pkg/quota/v1/generic" + "k8s.io/client-go/util/flowcontrol" "k8s.io/component-base/metrics/prometheus/controllers" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/daemon" + "k8s.io/kubernetes/pkg/controller/deployment" "k8s.io/kubernetes/pkg/controller/endpoint" "k8s.io/kubernetes/pkg/controller/garbagecollector" "k8s.io/kubernetes/pkg/controller/namespace" @@ -18,9 +21,11 @@ import ( "k8s.io/kubernetes/pkg/controller/nodeipam/ipam" "k8s.io/kubernetes/pkg/controller/nodelifecycle" "k8s.io/kubernetes/pkg/controller/podgc" + "k8s.io/kubernetes/pkg/controller/replicaset" "k8s.io/kubernetes/pkg/controller/replication" "k8s.io/kubernetes/pkg/controller/resourcequota" "k8s.io/kubernetes/pkg/controller/serviceaccount" + "k8s.io/kubernetes/pkg/controller/statefulset" "k8s.io/kubernetes/pkg/controller/ttl" "k8s.io/kubernetes/pkg/controller/ttlafterfinished" "k8s.io/kubernetes/pkg/controller/volume/attachdetach" @@ -47,6 +52,10 @@ var kubeControllerManager = &Unit{ } metrics := controllers.NewControllerManagerMetrics("controller-manager") + ////////////////// + /// Infrastructure + ////////////////// + // Node ipam nodeIpamController, err := nodeipam.NewNodeIpamController( clients.Informer.Core().V1().Nodes(), @@ -87,27 +96,31 @@ var kubeControllerManager = &Unit{ } go lifecycleController.Run(ctx) - // TODO: persistent volume binder (complex) + // TODO: improve performance if required (see vanilla code) + // Namespace controller + go namespace.NewNamespaceController( + clients.Client, clients.MetadataClient, + clients.Client.Discovery().ServerPreferredNamespacedResources, + clients.Informer.Core().V1().Namespaces(), + 5*time.Minute, // namespace sync period, default + v1.FinalizerKubernetes, + ).Run(1, ctx.Done()) - // Ephemeral volume controller - ephemeralController, err := ephemeral.NewController( + // Serice accounts controller + saController, err := serviceaccount.NewServiceAccountsController( + clients.Informer.Core().V1().ServiceAccounts(), + clients.Informer.Core().V1().Namespaces(), clients.Client, - clients.Informer.Core().V1().Pods(), - clients.Informer.Core().V1().PersistentVolumeClaims(), + serviceaccount.DefaultServiceAccountsControllerOptions(), ) if err != nil { - return fmt.Errorf("could not initialize ephemeral volume controller: %w", err) + return fmt.Errorf("could not initialize sa controller: %w", err) } - go ephemeralController.Run(ctx, 1) + go saController.Run(ctx, 1) - // Endpoint controller - go endpoint.NewEndpointController( - clients.Informer.Core().V1().Pods(), - clients.Informer.Core().V1().Services(), - clients.Informer.Core().V1().Endpoints(), - clients.Client, - 0, // batch duration - ).Run(ctx, 1) + ////////////////// + /// Workloads + ////////////////// // Replication controller go replication.NewReplicationManager( @@ -125,6 +138,49 @@ var kubeControllerManager = &Unit{ 12500, // terminated pod gc threshold, default ).Run(ctx) + // Daemonset controller + daemonsetController, err := daemon.NewDaemonSetsController( + clients.Informer.Apps().V1().DaemonSets(), + clients.Informer.Apps().V1().ControllerRevisions(), + clients.Informer.Core().V1().Pods(), + clients.Informer.Core().V1().Nodes(), + clients.Client, + flowcontrol.NewBackOff(1*time.Second, 15*time.Minute), + ) + if err != nil { + return fmt.Errorf("could not initialize daemonset controller: %w", err) + } + go daemonsetController.Run(ctx, 1) + + // Statefulset controller + go statefulset.NewStatefulSetController( + clients.Informer.Core().V1().Pods(), + clients.Informer.Apps().V1().StatefulSets(), + clients.Informer.Core().V1().PersistentVolumeClaims(), + clients.Informer.Apps().V1().ControllerRevisions(), + clients.Client, + ).Run(ctx, 1) + + // Replicaset controller + go replicaset.NewReplicaSetController( + clients.Informer.Apps().V1().ReplicaSets(), + clients.Informer.Core().V1().Pods(), + clients.Client, + replicaset.BurstReplicas, + ).Run(ctx, 1) + + // Deployment + deploymentController, err := deployment.NewDeploymentController( + clients.Informer.Apps().V1().Deployments(), + clients.Informer.Apps().V1().ReplicaSets(), + clients.Informer.Core().V1().Pods(), + clients.Client, + ) + if err != nil { + return fmt.Errorf("could not initialize deployment controller: %w", err) + } + go deploymentController.Run(ctx, 1) + // Resource quotas quotaConfiguration := install.NewQuotaConfigurationForControllers( generic.ListerFuncForResourceFunc(clients.Informer.ForResource), @@ -147,28 +203,6 @@ var kubeControllerManager = &Unit{ go resourceController.Run(ctx, 1) go resourceController.Sync(clients.Client.ServerPreferredNamespacedResources, 30*time.Second, ctx.Done()) - // TODO: improve performance if required (see vanilla code) - // Namespace controller - go namespace.NewNamespaceController( - clients.Client, clients.MetadataClient, - clients.Client.Discovery().ServerPreferredNamespacedResources, - clients.Informer.Core().V1().Namespaces(), - 5*time.Minute, // namespace sync period, default - v1.FinalizerKubernetes, - ).Run(1, ctx.Done()) - - // Serice accounts controller - saController, err := serviceaccount.NewServiceAccountsController( - clients.Informer.Core().V1().ServiceAccounts(), - clients.Informer.Core().V1().Namespaces(), - clients.Client, - serviceaccount.DefaultServiceAccountsControllerOptions(), - ) - if err != nil { - return fmt.Errorf("could not initialize sa controller: %w", err) - } - go saController.Run(ctx, 1) - // TTL controller go ttl.NewTTLController( clients.Informer.Core().V1().Nodes(), @@ -181,6 +215,39 @@ var kubeControllerManager = &Unit{ clients.Client, ).Run(ctx, 1) + ////////////////// + /// Services + ////////////////// + + // Service controller is meant to manage cloud-based services, which are not supported + // Route controller is meant to manage cloud routes, which are not supported + + // Endpoint controller + go endpoint.NewEndpointController( + clients.Informer.Core().V1().Pods(), + clients.Informer.Core().V1().Services(), + clients.Informer.Core().V1().Endpoints(), + clients.Client, + 0, // batch duration + ).Run(ctx, 1) + + ////////////////// + /// Storage + ////////////////// + + // Volume expand controller is meant for volume types that are not supported (nfs, etc.) + + // Ephemeral volume controller + ephemeralController, err := ephemeral.NewController( + clients.Client, + clients.Informer.Core().V1().Pods(), + clients.Informer.Core().V1().PersistentVolumeClaims(), + ) + if err != nil { + return fmt.Errorf("could not initialize ephemeral volume controller: %w", err) + } + go ephemeralController.Run(ctx, 1) + // PVC protection controller pvcProtection, err := pvcprotection.NewPVCProtectionController( clients.Informer.Core().V1().PersistentVolumeClaims(),