Skip to content
Snippets Groups Projects
Commit 3e561e6a authored by kaiyou's avatar kaiyou
Browse files

Refactor the controller manager

parent 60ac2146
No related branches found
No related tags found
No related merge requests found
...@@ -9,8 +9,11 @@ import ( ...@@ -9,8 +9,11 @@ import (
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/quota/v1/generic" "k8s.io/apiserver/pkg/quota/v1/generic"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/component-base/metrics/prometheus/controllers" "k8s.io/component-base/metrics/prometheus/controllers"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/daemon"
"k8s.io/kubernetes/pkg/controller/deployment"
"k8s.io/kubernetes/pkg/controller/endpoint" "k8s.io/kubernetes/pkg/controller/endpoint"
"k8s.io/kubernetes/pkg/controller/garbagecollector" "k8s.io/kubernetes/pkg/controller/garbagecollector"
"k8s.io/kubernetes/pkg/controller/namespace" "k8s.io/kubernetes/pkg/controller/namespace"
...@@ -18,9 +21,11 @@ import ( ...@@ -18,9 +21,11 @@ import (
"k8s.io/kubernetes/pkg/controller/nodeipam/ipam" "k8s.io/kubernetes/pkg/controller/nodeipam/ipam"
"k8s.io/kubernetes/pkg/controller/nodelifecycle" "k8s.io/kubernetes/pkg/controller/nodelifecycle"
"k8s.io/kubernetes/pkg/controller/podgc" "k8s.io/kubernetes/pkg/controller/podgc"
"k8s.io/kubernetes/pkg/controller/replicaset"
"k8s.io/kubernetes/pkg/controller/replication" "k8s.io/kubernetes/pkg/controller/replication"
"k8s.io/kubernetes/pkg/controller/resourcequota" "k8s.io/kubernetes/pkg/controller/resourcequota"
"k8s.io/kubernetes/pkg/controller/serviceaccount" "k8s.io/kubernetes/pkg/controller/serviceaccount"
"k8s.io/kubernetes/pkg/controller/statefulset"
"k8s.io/kubernetes/pkg/controller/ttl" "k8s.io/kubernetes/pkg/controller/ttl"
"k8s.io/kubernetes/pkg/controller/ttlafterfinished" "k8s.io/kubernetes/pkg/controller/ttlafterfinished"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach" "k8s.io/kubernetes/pkg/controller/volume/attachdetach"
...@@ -47,6 +52,10 @@ var kubeControllerManager = &Unit{ ...@@ -47,6 +52,10 @@ var kubeControllerManager = &Unit{
} }
metrics := controllers.NewControllerManagerMetrics("controller-manager") metrics := controllers.NewControllerManagerMetrics("controller-manager")
//////////////////
/// Infrastructure
//////////////////
// Node ipam // Node ipam
nodeIpamController, err := nodeipam.NewNodeIpamController( nodeIpamController, err := nodeipam.NewNodeIpamController(
clients.Informer.Core().V1().Nodes(), clients.Informer.Core().V1().Nodes(),
...@@ -87,27 +96,31 @@ var kubeControllerManager = &Unit{ ...@@ -87,27 +96,31 @@ var kubeControllerManager = &Unit{
} }
go lifecycleController.Run(ctx) go lifecycleController.Run(ctx)
// TODO: persistent volume binder (complex) // TODO: improve performance if required (see vanilla code)
// Namespace controller
go namespace.NewNamespaceController(
clients.Client, clients.MetadataClient,
clients.Client.Discovery().ServerPreferredNamespacedResources,
clients.Informer.Core().V1().Namespaces(),
5*time.Minute, // namespace sync period, default
v1.FinalizerKubernetes,
).Run(1, ctx.Done())
// Ephemeral volume controller // Serice accounts controller
ephemeralController, err := ephemeral.NewController( saController, err := serviceaccount.NewServiceAccountsController(
clients.Informer.Core().V1().ServiceAccounts(),
clients.Informer.Core().V1().Namespaces(),
clients.Client, clients.Client,
clients.Informer.Core().V1().Pods(), serviceaccount.DefaultServiceAccountsControllerOptions(),
clients.Informer.Core().V1().PersistentVolumeClaims(),
) )
if err != nil { if err != nil {
return fmt.Errorf("could not initialize ephemeral volume controller: %w", err) return fmt.Errorf("could not initialize sa controller: %w", err)
} }
go ephemeralController.Run(ctx, 1) go saController.Run(ctx, 1)
// Endpoint controller //////////////////
go endpoint.NewEndpointController( /// Workloads
clients.Informer.Core().V1().Pods(), //////////////////
clients.Informer.Core().V1().Services(),
clients.Informer.Core().V1().Endpoints(),
clients.Client,
0, // batch duration
).Run(ctx, 1)
// Replication controller // Replication controller
go replication.NewReplicationManager( go replication.NewReplicationManager(
...@@ -125,6 +138,49 @@ var kubeControllerManager = &Unit{ ...@@ -125,6 +138,49 @@ var kubeControllerManager = &Unit{
12500, // terminated pod gc threshold, default 12500, // terminated pod gc threshold, default
).Run(ctx) ).Run(ctx)
// Daemonset controller
daemonsetController, err := daemon.NewDaemonSetsController(
clients.Informer.Apps().V1().DaemonSets(),
clients.Informer.Apps().V1().ControllerRevisions(),
clients.Informer.Core().V1().Pods(),
clients.Informer.Core().V1().Nodes(),
clients.Client,
flowcontrol.NewBackOff(1*time.Second, 15*time.Minute),
)
if err != nil {
return fmt.Errorf("could not initialize daemonset controller: %w", err)
}
go daemonsetController.Run(ctx, 1)
// Statefulset controller
go statefulset.NewStatefulSetController(
clients.Informer.Core().V1().Pods(),
clients.Informer.Apps().V1().StatefulSets(),
clients.Informer.Core().V1().PersistentVolumeClaims(),
clients.Informer.Apps().V1().ControllerRevisions(),
clients.Client,
).Run(ctx, 1)
// Replicaset controller
go replicaset.NewReplicaSetController(
clients.Informer.Apps().V1().ReplicaSets(),
clients.Informer.Core().V1().Pods(),
clients.Client,
replicaset.BurstReplicas,
).Run(ctx, 1)
// Deployment
deploymentController, err := deployment.NewDeploymentController(
clients.Informer.Apps().V1().Deployments(),
clients.Informer.Apps().V1().ReplicaSets(),
clients.Informer.Core().V1().Pods(),
clients.Client,
)
if err != nil {
return fmt.Errorf("could not initialize deployment controller: %w", err)
}
go deploymentController.Run(ctx, 1)
// Resource quotas // Resource quotas
quotaConfiguration := install.NewQuotaConfigurationForControllers( quotaConfiguration := install.NewQuotaConfigurationForControllers(
generic.ListerFuncForResourceFunc(clients.Informer.ForResource), generic.ListerFuncForResourceFunc(clients.Informer.ForResource),
...@@ -147,28 +203,6 @@ var kubeControllerManager = &Unit{ ...@@ -147,28 +203,6 @@ var kubeControllerManager = &Unit{
go resourceController.Run(ctx, 1) go resourceController.Run(ctx, 1)
go resourceController.Sync(clients.Client.ServerPreferredNamespacedResources, 30*time.Second, ctx.Done()) go resourceController.Sync(clients.Client.ServerPreferredNamespacedResources, 30*time.Second, ctx.Done())
// TODO: improve performance if required (see vanilla code)
// Namespace controller
go namespace.NewNamespaceController(
clients.Client, clients.MetadataClient,
clients.Client.Discovery().ServerPreferredNamespacedResources,
clients.Informer.Core().V1().Namespaces(),
5*time.Minute, // namespace sync period, default
v1.FinalizerKubernetes,
).Run(1, ctx.Done())
// Serice accounts controller
saController, err := serviceaccount.NewServiceAccountsController(
clients.Informer.Core().V1().ServiceAccounts(),
clients.Informer.Core().V1().Namespaces(),
clients.Client,
serviceaccount.DefaultServiceAccountsControllerOptions(),
)
if err != nil {
return fmt.Errorf("could not initialize sa controller: %w", err)
}
go saController.Run(ctx, 1)
// TTL controller // TTL controller
go ttl.NewTTLController( go ttl.NewTTLController(
clients.Informer.Core().V1().Nodes(), clients.Informer.Core().V1().Nodes(),
...@@ -181,6 +215,39 @@ var kubeControllerManager = &Unit{ ...@@ -181,6 +215,39 @@ var kubeControllerManager = &Unit{
clients.Client, clients.Client,
).Run(ctx, 1) ).Run(ctx, 1)
//////////////////
/// Services
//////////////////
// Service controller is meant to manage cloud-based services, which are not supported
// Route controller is meant to manage cloud routes, which are not supported
// Endpoint controller
go endpoint.NewEndpointController(
clients.Informer.Core().V1().Pods(),
clients.Informer.Core().V1().Services(),
clients.Informer.Core().V1().Endpoints(),
clients.Client,
0, // batch duration
).Run(ctx, 1)
//////////////////
/// Storage
//////////////////
// Volume expand controller is meant for volume types that are not supported (nfs, etc.)
// Ephemeral volume controller
ephemeralController, err := ephemeral.NewController(
clients.Client,
clients.Informer.Core().V1().Pods(),
clients.Informer.Core().V1().PersistentVolumeClaims(),
)
if err != nil {
return fmt.Errorf("could not initialize ephemeral volume controller: %w", err)
}
go ephemeralController.Run(ctx, 1)
// PVC protection controller // PVC protection controller
pvcProtection, err := pvcprotection.NewPVCProtectionController( pvcProtection, err := pvcprotection.NewPVCProtectionController(
clients.Informer.Core().V1().PersistentVolumeClaims(), clients.Informer.Core().V1().PersistentVolumeClaims(),
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment