From 525145c651bb5f5a05e53458bedb97a04b219666 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20K=C5=82obuszewski?= <danielmk@google.com>
Date: Mon, 14 Mar 2022 15:14:56 +0100
Subject: [PATCH] Limit caching pods per owner reference

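A single controller that owns many pods with different labels or specs
can defeat the caching done while grouping unschedulable pods: lookups
degrade into a linear scan over every pod shape seen for that owner
reference.

Cap the per-controller state in two places:
- groupPodsBySchedulingProperties keeps at most 10 cached equivalence
  groups per controller UID; pods that match no cached group still get
  their own group id, but no new representant is stored for them.
- PodSchedulableMap.Set stops caching entries once an owner reference
  already has 10 of them, keeping Get() cheap.

A new overflowing_controllers_count gauge reports how many controllers
had too many different pods to be cached.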
---
 cluster-autoscaler/core/equivalence_groups.go | 34 +++++++++++------
 .../core/equivalence_groups_test.go           | 27 ++++++++++++++
 .../core/filter_out_schedulable.go            |  3 +-
 .../core/utils/pod_schedulable.go             | 37 ++++++++++++++++---
 .../core/utils/pod_schedulable_test.go        | 36 +++++++++++++++++-
 cluster-autoscaler/metrics/metrics.go         | 15 ++++++++
 6 files changed, 133 insertions(+), 19 deletions(-)

diff --git a/cluster-autoscaler/core/equivalence_groups.go b/cluster-autoscaler/core/equivalence_groups.go
index dcff93ab00..485cebd05c 100644
--- a/cluster-autoscaler/core/equivalence_groups.go
+++ b/cluster-autoscaler/core/equivalence_groups.go
@@ -51,6 +51,8 @@ type equivalenceGroup struct {
 	representant *apiv1.Pod
 }
 
+const maxEquivalenceGroupsByController = 10
+
 // groupPodsBySchedulingProperties groups pods based on scheduling properties. Group ID is meaningless.
 func groupPodsBySchedulingProperties(pods []*apiv1.Pod) map[equivalenceGroupId][]*apiv1.Pod {
 	podEquivalenceGroups := map[equivalenceGroupId][]*apiv1.Pod{}
@@ -65,25 +67,33 @@ func groupPodsBySchedulingProperties(pods []*apiv1.Pod) map[equivalenceGroupId][
 			continue
 		}
 
-		matchingFound := false
-		for _, g := range equivalenceGroupsByController[controllerRef.UID] {
-			if reflect.DeepEqual(pod.Labels, g.representant.Labels) && utils.PodSpecSemanticallyEqual(pod.Spec, g.representant.Spec) {
-				matchingFound = true
-				podEquivalenceGroups[g.id] = append(podEquivalenceGroups[g.id], pod)
-				break
-			}
+		egs := equivalenceGroupsByController[controllerRef.UID]
+		if gid := match(egs, pod); gid != nil {
+			podEquivalenceGroups[*gid] = append(podEquivalenceGroups[*gid], pod)
+			continue
 		}
-
-		if !matchingFound {
+		if len(egs) < maxEquivalenceGroupsByController {
+			// Only cache a new equivalence group while under the per-controller limit.
 			newGroup := equivalenceGroup{
 				id:           nextGroupId,
 				representant: pod,
 			}
-			equivalenceGroupsByController[controllerRef.UID] = append(equivalenceGroupsByController[controllerRef.UID], newGroup)
-			podEquivalenceGroups[newGroup.id] = append(podEquivalenceGroups[newGroup.id], pod)
-			nextGroupId++
+			equivalenceGroupsByController[controllerRef.UID] = append(egs, newGroup)
 		}
+		podEquivalenceGroups[nextGroupId] = append(podEquivalenceGroups[nextGroupId], pod)
+		nextGroupId++
 	}
 
 	return podEquivalenceGroups
 }
+
+// match tries to find an equivalence group for a given pod and returns the
+// group id or nil if the group can't be found.
+func match(egs []equivalenceGroup, pod *apiv1.Pod) *equivalenceGroupId {
+	for _, g := range egs {
+		if reflect.DeepEqual(pod.Labels, g.representant.Labels) && utils.PodSpecSemanticallyEqual(pod.Spec, g.representant.Spec) {
+			return &g.id
+		}
+	}
+	return nil
+}
diff --git a/cluster-autoscaler/core/equivalence_groups_test.go b/cluster-autoscaler/core/equivalence_groups_test.go
index a27e60eb99..c91ab03f9d 100644
--- a/cluster-autoscaler/core/equivalence_groups_test.go
+++ b/cluster-autoscaler/core/equivalence_groups_test.go
@@ -138,3 +138,30 @@ func TestGroupSchedulablePodsForNode(t *testing.T) {
 		assert.True(t, w.found, fmt.Errorf("Expected pod group: %+v", w))
 	}
 }
+
+func TestEquivalenceGroupSizeLimiting(t *testing.T) {
+	rc := apiv1.ReplicationController{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "rc",
+			Namespace: "default",
+			SelfLink:  "api/v1/namespaces/default/replicationcontrollers/rc",
+			UID:       "12345678-1234-1234-1234-123456789012",
+		},
+	}
+	pods := make([]*apiv1.Pod, 0, maxEquivalenceGroupsByController+1)
+	for i := 0; i < maxEquivalenceGroupsByController+1; i += 1 {
+		p := BuildTestPod(fmt.Sprintf("p%d", i), 3000, 200000)
+		p.OwnerReferences = GenerateOwnerReferences(rc.Name, "ReplicationController", "extensions/v1beta1", rc.UID)
+		label := fmt.Sprintf("l%d", i)
+		if i > maxEquivalenceGroupsByController {
+			label = fmt.Sprintf("l%d", maxEquivalenceGroupsByController)
+		}
+		p.Labels = map[string]string{"uniqueLabel": label}
+		pods = append(pods, p)
+	}
+	podGroups := groupPodsBySchedulingProperties(pods)
+	assert.Equal(t, len(pods), len(podGroups))
+	for i := range podGroups {
+		assert.Equal(t, 1, len(podGroups[i]))
+	}
+}
diff --git a/cluster-autoscaler/core/filter_out_schedulable.go b/cluster-autoscaler/core/filter_out_schedulable.go
index 58f4d62a48..24953369f5 100644
--- a/cluster-autoscaler/core/filter_out_schedulable.go
+++ b/cluster-autoscaler/core/filter_out_schedulable.go
@@ -100,7 +100,7 @@ func (p *filterOutSchedulablePodListProcessor) filterOutSchedulableByPacking(
 	unschedulableCandidates []*apiv1.Pod,
 	clusterSnapshot simulator.ClusterSnapshot,
 	predicateChecker simulator.PredicateChecker) ([]*apiv1.Pod, error) {
-	unschedulablePodsCache := make(utils.PodSchedulableMap)
+	unschedulablePodsCache := utils.NewPodSchedulableMap()
 
 	// Sort unschedulable pods by importance
 	sort.Slice(unschedulableCandidates, func(i, j int) bool {
@@ -172,6 +172,7 @@ func (p *filterOutSchedulablePodListProcessor) filterOutSchedulableByPacking(
 			unschedulablePodsCache.Set(pod, nil)
 		}
 	}
+	metrics.UpdateOverflowingControllers(unschedulablePodsCache.OverflowingControllerCount())
 	klog.V(4).Infof("%v pods were kept as unschedulable based on caching", unschedulePodsCacheHitCounter)
 	klog.V(4).Infof("%v pods marked as unschedulable can be scheduled.", len(unschedulableCandidates)-len(unschedulablePods))
 	return unschedulablePods, nil
diff --git a/cluster-autoscaler/core/utils/pod_schedulable.go b/cluster-autoscaler/core/utils/pod_schedulable.go
index 90d0ce3fc0..c3945ce53a 100644
--- a/cluster-autoscaler/core/utils/pod_schedulable.go
+++ b/cluster-autoscaler/core/utils/pod_schedulable.go
@@ -45,8 +45,21 @@ type PodSchedulableInfo struct {
 	schedulingError *simulator.PredicateError
 }
 
+const maxPodsPerOwnerRef = 10
+
 // PodSchedulableMap stores mapping from controller ref to PodSchedulableInfo
-type PodSchedulableMap map[string][]PodSchedulableInfo
+type PodSchedulableMap struct {
+	items                  map[string][]PodSchedulableInfo
+	overflowingControllers map[string]bool
+}
+
+// NewPodSchedulableMap creates a new PodSchedulableMap
+func NewPodSchedulableMap() PodSchedulableMap {
+	return PodSchedulableMap{
+		items:                  make(map[string][]PodSchedulableInfo),
+		overflowingControllers: make(map[string]bool),
+	}
+}
 
 // Match tests if given pod matches PodSchedulableInfo
 func (psi *PodSchedulableInfo) Match(pod *apiv1.Pod) bool {
@@ -54,13 +67,13 @@ func (psi *PodSchedulableInfo) Match(pod *apiv1.Pod) bool {
 }
 
 // Get returns scheduling info for given pod if matching one exists in PodSchedulableMap
-func (podMap PodSchedulableMap) Get(pod *apiv1.Pod) (*simulator.PredicateError, bool) {
+func (p PodSchedulableMap) Get(pod *apiv1.Pod) (*simulator.PredicateError, bool) {
 	ref := drain.ControllerRef(pod)
 	if ref == nil {
 		return nil, false
 	}
 	uid := string(ref.UID)
-	if infos, found := podMap[uid]; found {
+	if infos, found := p.items[uid]; found {
 		for _, info := range infos {
 			if info.Match(pod) {
 				return info.schedulingError, true
@@ -71,15 +84,29 @@ func (podMap PodSchedulableMap) Get(pod *apiv1.Pod) (*simulator.PredicateError,
 }
 
 // Set sets scheduling info for given pod in PodSchedulableMap
-func (podMap PodSchedulableMap) Set(pod *apiv1.Pod, err *simulator.PredicateError) {
+func (p PodSchedulableMap) Set(pod *apiv1.Pod, err *simulator.PredicateError) {
 	ref := drain.ControllerRef(pod)
 	if ref == nil {
 		return
 	}
 	uid := string(ref.UID)
-	podMap[uid] = append(podMap[uid], PodSchedulableInfo{
+	pm := p.items[uid]
+	if len(pm) >= maxPodsPerOwnerRef {
+		// Too many different pods per owner reference. Don't cache this
+		// entry, to avoid an unbounded O(N) search in Get(); it would
+		// defeat the benefit of caching anyway.
+		p.overflowingControllers[uid] = true
+		return
+	}
+	p.items[uid] = append(pm, PodSchedulableInfo{
 		spec:            pod.Spec,
 		labels:          pod.Labels,
 		schedulingError: err,
 	})
 }
+
+// OverflowingControllerCount returns the number of controllers that had too
+// many different pods to be effectively cached.
+func (p PodSchedulableMap) OverflowingControllerCount() int {
+	return len(p.overflowingControllers)
+}
diff --git a/cluster-autoscaler/core/utils/pod_schedulable_test.go b/cluster-autoscaler/core/utils/pod_schedulable_test.go
index 5757864aaf..9943f3bfef 100644
--- a/cluster-autoscaler/core/utils/pod_schedulable_test.go
+++ b/cluster-autoscaler/core/utils/pod_schedulable_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package utils
 
 import (
+	"fmt"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator"
 	"testing"
 
@@ -47,7 +48,7 @@ func TestPodSchedulableMap(t *testing.T) {
 		},
 	}
 
-	pMap := make(PodSchedulableMap)
+	pMap := NewPodSchedulableMap()
 
 	podInRc1_1 := BuildTestPod("podInRc1_1", 500, 1000)
 	podInRc1_1.OwnerReferences = GenerateOwnerReferences(rc1.Name, "ReplicationController", "extensions/v1beta1", rc1.UID)
@@ -120,3 +121,36 @@ func TestPodSchedulableMap(t *testing.T) {
 	assert.True(t, found)
 	assert.Nil(t, err)
 }
+
+func TestPodSchedulableMapSizeLimiting(t *testing.T) {
+	rc := apiv1.ReplicationController{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "rc",
+			Namespace: "default",
+			SelfLink:  "api/v1/namespaces/default/replicationcontrollers/rc",
+			UID:       "12345678-1234-1234-1234-123456789012",
+		},
+	}
+	pMap := NewPodSchedulableMap()
+	pods := make([]*apiv1.Pod, 0, maxPodsPerOwnerRef+1)
+	for i := 0; i < maxPodsPerOwnerRef+1; i += 1 {
+		p := BuildTestPod(fmt.Sprintf("p%d", i), 3000, 200000)
+		p.OwnerReferences = GenerateOwnerReferences(rc.Name, "ReplicationController", "extensions/v1beta1", rc.UID)
+		p.Labels = map[string]string{"uniqueLabel": fmt.Sprintf("l%d", i)}
+		pods = append(pods, p)
+		_, found := pMap.Get(p)
+		assert.False(t, found)
+	}
+	for _, p := range pods {
+		pMap.Set(p, nil)
+	}
+	for i, p := range pods {
+		_, found := pMap.Get(p)
+		if i != len(pods)-1 {
+			assert.True(t, found)
+		} else {
+			assert.False(t, found)
+		}
+	}
+	assert.Equal(t, 1, pMap.OverflowingControllerCount())
+}
diff --git a/cluster-autoscaler/metrics/metrics.go b/cluster-autoscaler/metrics/metrics.go
index 7c8b643ac7..23ea180ca4 100644
--- a/cluster-autoscaler/metrics/metrics.go
+++ b/cluster-autoscaler/metrics/metrics.go
@@ -304,6 +304,14 @@ var (
 		},
 	)
 
+	overflowingControllersCount = k8smetrics.NewGauge(
+		&k8smetrics.GaugeOpts{
+			Namespace: caNamespace,
+			Name:      "overflowing_controllers_count",
+			Help:      "Number of controllers that own a large set of heterogeneous pods, preventing CA from treating these pods as equivalent.",
+		},
+	)
+
 	/**** Metrics related to NodeAutoprovisioning ****/
 	napEnabled = k8smetrics.NewGauge(
 		&k8smetrics.GaugeOpts{
@@ -355,6 +363,7 @@ func RegisterAll(emitPerNodeGroupMetrics bool) {
 	legacyregistry.MustRegister(unremovableNodesCount)
 	legacyregistry.MustRegister(scaleDownInCooldown)
 	legacyregistry.MustRegister(oldUnregisteredNodesRemovedCount)
+	legacyregistry.MustRegister(overflowingControllersCount)
 	legacyregistry.MustRegister(napEnabled)
 	legacyregistry.MustRegister(nodeGroupCreationCount)
 	legacyregistry.MustRegister(nodeGroupDeletionCount)
@@ -532,3 +541,9 @@ func UpdateScaleDownInCooldown(inCooldown bool) {
 func RegisterOldUnregisteredNodesRemoved(nodesCount int) {
 	oldUnregisteredNodesRemovedCount.Add(float64(nodesCount))
 }
+
+// UpdateOverflowingControllers sets the number of controllers that could not
+// have their pods cached.
+func UpdateOverflowingControllers(count int) {
+	overflowingControllersCount.Set(float64(count))
+}
-- 
GitLab