diff --git a/cluster-autoscaler/estimator/binpacking_estimator.go b/cluster-autoscaler/estimator/binpacking_estimator.go
index a8bb78cc1fe25765e01a33ecf9389f3acfba332b..10ed1dfa3d635f3e3ccfae03d77d10bfdcba8f3f 100644
--- a/cluster-autoscaler/estimator/binpacking_estimator.go
+++ b/cluster-autoscaler/estimator/binpacking_estimator.go
@@ -20,6 +20,8 @@ import (
 	"fmt"
 	"strconv"
 
+	"slices"
+
 	apiv1 "k8s.io/api/core/v1"
 	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
 	"k8s.io/autoscaler/cluster-autoscaler/metrics"
@@ -27,6 +29,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
 	"k8s.io/klog/v2"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
 )
 
 // BinpackingNodeEstimator estimates the number of needed nodes to handle the given amount of pods.
@@ -171,7 +174,8 @@ func (e *BinpackingNodeEstimator) tryToScheduleOnNewNodes(
 
 		if estimationState.lastNodeName != "" {
 			// Try to schedule the pod on only newly created node.
-			if err := e.clusterSnapshot.SchedulePod(pod, estimationState.lastNodeName); err == nil {
+			err := e.clusterSnapshot.SchedulePod(pod, estimationState.lastNodeName)
+			if err == nil {
 				// The pod was scheduled on the newly created node.
 				found = true
 				estimationState.trackScheduledPod(pod, estimationState.lastNodeName)
@@ -180,6 +184,24 @@ func (e *BinpackingNodeEstimator) tryToScheduleOnNewNodes(
 				return false, err
 			}
 			// The pod can't be scheduled on the newly created node because of scheduling predicates.
+
+			// Check whether scheduling failed because of topology spread constraints.
+			if isPodUsingHostNameTopologyKey(pod) && hasTopologyConstraintError(err) {
+				// The pod can't go on the newest node because of a hostname-keyed topology spread constraint.
+				// Instead of immediately adding another node for it, try the nodes already in the snapshot.
+				nodeName, err := e.clusterSnapshot.SchedulePodOnAnyNodeMatching(pod, func(nodeInfo *framework.NodeInfo) bool {
+					return nodeInfo.Node().Name != estimationState.lastNodeName // only skip the last node that failed scheduling
+				})
+				if err != nil && err.Type() == clustersnapshot.SchedulingInternalError {
+					// Unexpected error.
+					return false, err
+				}
+				if nodeName != "" {
+					// The pod was scheduled on a different node, so we can continue binpacking.
+					found = true
+					estimationState.trackScheduledPod(pod, nodeName)
+				}
+			}
 		}
 
 		if !found {
@@ -240,6 +262,33 @@ func (e *BinpackingNodeEstimator) addNewNodeToSnapshot(
 	return nil
 }
 
+// hasTopologyConstraintError reports whether a scheduling error was caused by
+// pod topology spread constraints, based on the failing predicate reasons.
+func hasTopologyConstraintError(err clustersnapshot.SchedulingError) bool {
+	if err == nil {
+		return false
+	}
+
+	// Check whether any failing predicate reason matches the topology spread constraint error.
+	return slices.Contains(err.FailingPredicateReasons(), podtopologyspread.ErrReasonConstraintsNotMatch)
+}
+
+// isPodUsingHostNameTopologyKey returns true if the pod has any topology spread
+// constraint that uses the kubernetes.io/hostname topology key.
+func isPodUsingHostNameTopologyKey(pod *apiv1.Pod) bool {
+	if pod == nil || pod.Spec.TopologySpreadConstraints == nil {
+		return false
+	}
+
+	for _, constraint := range pod.Spec.TopologySpreadConstraints {
+		if constraint.TopologyKey == apiv1.LabelHostname {
+			return true
+		}
+	}
+
+	return false
+}
+
 func observeBinpackingHeterogeneity(podsEquivalenceGroups []PodEquivalenceGroup, nodeTemplate *framework.NodeInfo) {
 	node := nodeTemplate.Node()
 	var instanceType, cpuCount string
diff --git a/cluster-autoscaler/estimator/binpacking_estimator_test.go b/cluster-autoscaler/estimator/binpacking_estimator_test.go
index ac205f16ba46e6874806c352ac5bb48463856088..00a32dbf5d0ce4f4aeb4b582b0dbc0d1fd38ba9b 100644
--- a/cluster-autoscaler/estimator/binpacking_estimator_test.go
+++ b/cluster-autoscaler/estimator/binpacking_estimator_test.go
@@ -178,13 +178,13 @@ func TestBinpackingEstimate(t *testing.T) {
 			podsEquivalenceGroup: []PodEquivalenceGroup{makePodEquivalenceGroup(
 				BuildTestPod(
 					"estimatee",
-					20,
-					100,
+					200,
+					200,
 					WithNamespace("universe"),
 					WithLabels(map[string]string{
 						"app": "estimatee",
 					}),
-					WithMaxSkew(2, "kubernetes.io/hostname")), 8)},
+					WithMaxSkew(2, "kubernetes.io/hostname", 1)), 8)},
 			expectNodeCount: 4,
 			expectPodCount:  8,
 		},
@@ -201,10 +201,27 @@ func TestBinpackingEstimate(t *testing.T) {
 				WithLabels(map[string]string{
 					"app": "estimatee",
 				}),
-				WithMaxSkew(2, "topology.kubernetes.io/zone")), 8)},
+				WithMaxSkew(2, "topology.kubernetes.io/zone", 1)), 8)},
 			expectNodeCount: 1,
 			expectPodCount:  2,
 		},
+		{
+			name:       "hostname topology spreading with maxSkew=1 with a large scaleup handles scheduling pods retroactively",
+			millicores: 1000,
+			memory:     5000,
+			podsEquivalenceGroup: []PodEquivalenceGroup{makePodEquivalenceGroup(
+				BuildTestPod(
+					"estimatee",
+					20,
+					100,
+					WithNamespace("universe"),
+					WithLabels(map[string]string{
+						"app": "estimatee",
+					}),
+					WithMaxSkew(1, "kubernetes.io/hostname", 3)), 12)},
+			expectNodeCount: 3,
+			expectPodCount:  12,
+		},
 	}
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
diff --git a/cluster-autoscaler/utils/test/test_utils.go b/cluster-autoscaler/utils/test/test_utils.go
index 026a4e41266e74fe6877747897a6fca1121b9861..984924715317465bce2d98cd7fcf587269db2ec7 100644
--- a/cluster-autoscaler/utils/test/test_utils.go
+++ b/cluster-autoscaler/utils/test/test_utils.go
@@ -160,8 +160,8 @@ func WithHostPort(hostport int32) func(*apiv1.Pod) {
 	}
 }
 
-// WithMaxSkew sets a namespace to the pod.
-func WithMaxSkew(maxSkew int32, topologySpreadingKey string) func(*apiv1.Pod) {
+// WithMaxSkew adds a topology spread constraint with the given max skew, topology key and min domains to the pod.
+func WithMaxSkew(maxSkew int32, topologySpreadingKey string, minDomains int32) func(*apiv1.Pod) {
 	return func(pod *apiv1.Pod) {
 		if maxSkew > 0 {
 			pod.Spec.TopologySpreadConstraints = []apiv1.TopologySpreadConstraint{
@@ -174,6 +174,7 @@ func WithMaxSkew(maxSkew int32, topologySpreadingKey string) func(*apiv1.Pod) {
 						"app": "estimatee",
 					},
 				},
+				MinDomains: &minDomains,
 			},
 		}
 	}
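
Not part of the patch above: a minimal unit-test sketch for the new isPodUsingHostNameTopologyKey helper. It assumes a hypothetically named file in the estimator package (the helper is unexported, so the test must live there) and reuses the dot-imported test utilities that binpacking_estimator_test.go already relies on.

package estimator

import (
	"testing"

	apiv1 "k8s.io/api/core/v1"
	. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
)

func TestIsPodUsingHostNameTopologyKey(t *testing.T) {
	// Pods with no topology spread constraints must not trigger the retroactive-scheduling path.
	if isPodUsingHostNameTopologyKey(nil) || isPodUsingHostNameTopologyKey(BuildTestPod("plain", 10, 10)) {
		t.Error("expected pods without topology spread constraints to return false")
	}
	// A zone-keyed constraint is not hostname spreading and should be ignored.
	if isPodUsingHostNameTopologyKey(BuildTestPod("zonal", 10, 10, WithMaxSkew(1, "topology.kubernetes.io/zone", 1))) {
		t.Error("expected a zone-keyed constraint to return false")
	}
	// Only a kubernetes.io/hostname-keyed constraint should be detected.
	if !isPodUsingHostNameTopologyKey(BuildTestPod("hostname", 10, 10, WithMaxSkew(1, apiv1.LabelHostname, 1))) {
		t.Error("expected a hostname-keyed constraint to return true")
	}
}

The two non-trivial cases mirror the branch the estimator change cares about: only the hostname-keyed constraint should route a pod through the SchedulePodOnAnyNodeMatching fallback.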