From b07e1e4c70b993a5724a3c584a161e8a8e1f8a1e Mon Sep 17 00:00:00 2001 From: Omran <aaomran@google.com> Date: Wed, 7 May 2025 21:22:32 +0000 Subject: [PATCH] Handle node readiness for DRA after a scale-up --- cluster-autoscaler/core/static_autoscaler.go | 24 +- .../core/static_autoscaler_dra_test.go | 35 +- .../custom_resources_processor.go | 8 +- .../default_custom_processor.go | 70 +++ .../default_custom_processor_test.go | 198 +++++++++ .../customresources/dra_processor.go | 139 ++++++ .../customresources/dra_processor_test.go | 399 ++++++++++++++++++ .../customresources/gpu_processor.go | 3 +- .../customresources/gpu_processor_test.go | 4 +- cluster-autoscaler/processors/processors.go | 2 +- cluster-autoscaler/processors/test/common.go | 2 +- 11 files changed, 836 insertions(+), 48 deletions(-) create mode 100644 cluster-autoscaler/processors/customresources/default_custom_processor.go create mode 100644 cluster-autoscaler/processors/customresources/default_custom_processor_test.go create mode 100644 cluster-autoscaler/processors/customresources/dra_processor.go create mode 100644 cluster-autoscaler/processors/customresources/dra_processor_test.go diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index f0109cf366..365bc8526f 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -276,8 +276,17 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr stateUpdateStart := time.Now() + var draSnapshot *drasnapshot.Snapshot + if a.AutoscalingContext.DynamicResourceAllocationEnabled && a.AutoscalingContext.DraProvider != nil { + var err error + draSnapshot, err = a.AutoscalingContext.DraProvider.Snapshot() + if err != nil { + return caerrors.ToAutoscalerError(caerrors.ApiCallError, err) + } + } + // Get nodes and pods currently living on cluster - allNodes, readyNodes, typedErr := a.obtainNodeLists() + allNodes, readyNodes, typedErr := a.obtainNodeLists(draSnapshot) if typedErr != nil { klog.Errorf("Failed to get node list: %v", typedErr) return typedErr @@ -302,6 +311,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr klog.Errorf("Failed to get daemonset list: %v", err) return caerrors.ToAutoscalerError(caerrors.ApiCallError, err) } + // Snapshot scale-down actuation status before cache refresh. scaleDownActuationStatus := a.scaleDownActuator.CheckStatus() // Call CloudProvider.Refresh before any other calls to cloud provider. 
@@ -335,14 +345,6 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr } nonExpendableScheduledPods := core_utils.FilterOutExpendablePods(originalScheduledPods, a.ExpendablePodsPriorityCutoff) - var draSnapshot *drasnapshot.Snapshot - if a.AutoscalingContext.DynamicResourceAllocationEnabled && a.AutoscalingContext.DraProvider != nil { - draSnapshot, err = a.AutoscalingContext.DraProvider.Snapshot() - if err != nil { - return caerrors.ToAutoscalerError(caerrors.ApiCallError, err) - } - } - if err := a.ClusterSnapshot.SetClusterState(allNodes, nonExpendableScheduledPods, draSnapshot); err != nil { return caerrors.ToAutoscalerError(caerrors.InternalError, err).AddPrefix("failed to initialize ClusterSnapshot: ") } @@ -980,7 +982,7 @@ func (a *StaticAutoscaler) ExitCleanUp() { a.clusterStateRegistry.Stop() } -func (a *StaticAutoscaler) obtainNodeLists() ([]*apiv1.Node, []*apiv1.Node, caerrors.AutoscalerError) { +func (a *StaticAutoscaler) obtainNodeLists(draSnapshot *drasnapshot.Snapshot) ([]*apiv1.Node, []*apiv1.Node, caerrors.AutoscalerError) { allNodes, err := a.AllNodeLister().List() if err != nil { klog.Errorf("Failed to list all nodes: %v", err) @@ -998,7 +1000,7 @@ func (a *StaticAutoscaler) obtainNodeLists() ([]*apiv1.Node, []*apiv1.Node, caer // Treat those nodes as unready until GPU actually becomes available and let // our normal handling for booting up nodes deal with this. // TODO: Remove this call when we handle dynamically provisioned resources. - allNodes, readyNodes = a.processors.CustomResourcesProcessor.FilterOutNodesWithUnreadyResources(a.AutoscalingContext, allNodes, readyNodes) + allNodes, readyNodes = a.processors.CustomResourcesProcessor.FilterOutNodesWithUnreadyResources(a.AutoscalingContext, allNodes, readyNodes, draSnapshot) allNodes, readyNodes = taints.FilterOutNodesWithStartupTaints(a.taintConfig, allNodes, readyNodes) return allNodes, readyNodes, nil } diff --git a/cluster-autoscaler/core/static_autoscaler_dra_test.go b/cluster-autoscaler/core/static_autoscaler_dra_test.go index 21e77d0c14..8545b3c144 100644 --- a/cluster-autoscaler/core/static_autoscaler_dra_test.go +++ b/cluster-autoscaler/core/static_autoscaler_dra_test.go @@ -181,8 +181,8 @@ func TestStaticAutoscalerDynamicResources(t *testing.T) { req1Nic := testDeviceRequest{name: "req1Nic", count: 1, selectors: singleAttrSelector(exampleDriver, nicAttribute, nicTypeA)} req1Global := testDeviceRequest{name: "req1Global", count: 1, selectors: singleAttrSelector(exampleDriver, globalDevAttribute, globalDevTypeA)} - sharedGpuBClaim := testResourceClaim("sharedGpuBClaim", nil, "", []testDeviceRequest{req1GpuB}, nil, nil) - sharedAllocatedGlobalClaim := testResourceClaim("sharedGlobalClaim", nil, "", []testDeviceRequest{req1Global}, []testAllocation{{request: req1Global.name, driver: exampleDriver, pool: "global-pool", device: globalDevice + "-0"}}, nil) + sharedGpuBClaim := testResourceClaim("sharedGpuBClaim", nil, "", []testDeviceRequest{req1GpuB}, nil) + sharedAllocatedGlobalClaim := testResourceClaim("sharedGlobalClaim", nil, "", []testDeviceRequest{req1Global}, []testAllocation{{request: req1Global.name, driver: exampleDriver, pool: "global-pool", device: globalDevice + "-0"}}) testCases := map[string]struct { nodeGroups map[*testNodeGroupDef]int @@ -250,10 +250,8 @@ func TestStaticAutoscalerDynamicResources(t *testing.T) { expectedScaleUps: map[string]int{node1Gpu1Nic1slice.name: 3}, }, "scale-up: scale from 0 nodes in a node group": { - nodeGroups: 
map[*testNodeGroupDef]int{node1Gpu1Nic1slice: 0}, - pods: append( - unscheduledPods(baseSmallPod, "unschedulable", 3, []testDeviceRequest{req1GpuA, req1Nic}), - ), + nodeGroups: map[*testNodeGroupDef]int{node1Gpu1Nic1slice: 0}, + pods: unscheduledPods(baseSmallPod, "unschedulable", 3, []testDeviceRequest{req1GpuA, req1Nic}), expectedScaleUps: map[string]int{node1Gpu1Nic1slice.name: 3}, }, "scale-up: scale from 0 nodes in a node group, with pods on the template nodes consuming DRA resources": { @@ -264,9 +262,7 @@ func TestStaticAutoscalerDynamicResources(t *testing.T) { scheduledPod(baseSmallPod, "template-1", node3GpuA1slice.name+"-template", map[*testDeviceRequest][]string{&req1GpuA: {gpuDevice + "-1"}}), }, }, - pods: append( - unscheduledPods(baseSmallPod, "unschedulable", 3, []testDeviceRequest{req1GpuA}), - ), + pods: unscheduledPods(baseSmallPod, "unschedulable", 3, []testDeviceRequest{req1GpuA}), expectedScaleUps: map[string]int{node3GpuA1slice.name: 3}, }, "scale-up: scale from 0 nodes in a node group, with pods on the template nodes consuming DRA resources, including shared claims": { @@ -278,16 +274,12 @@ func TestStaticAutoscalerDynamicResources(t *testing.T) { scheduledPod(baseSmallPod, "template-1", node3GpuA1slice.name+"-template", map[*testDeviceRequest][]string{&req1GpuA: {gpuDevice + "-1"}}, sharedAllocatedGlobalClaim), }, }, - pods: append( - unscheduledPods(baseSmallPod, "unschedulable", 3, []testDeviceRequest{req1GpuA}, sharedAllocatedGlobalClaim), - ), + pods: unscheduledPods(baseSmallPod, "unschedulable", 3, []testDeviceRequest{req1GpuA}, sharedAllocatedGlobalClaim), expectedScaleUps: map[string]int{node3GpuA1slice.name: 3}, }, "no scale-up: pods requesting multiple different devices, but they're on different nodes": { nodeGroups: map[*testNodeGroupDef]int{node1GpuA1slice: 1, node1Nic1slice: 1}, - pods: append( - unscheduledPods(baseSmallPod, "unschedulable", 3, []testDeviceRequest{req1GpuA, req1Nic}), - ), + pods: unscheduledPods(baseSmallPod, "unschedulable", 3, []testDeviceRequest{req1GpuA, req1Nic}), }, "scale-up: pods requesting a shared, unallocated claim": { extraResourceClaims: []*resourceapi.ResourceClaim{sharedGpuBClaim}, @@ -597,13 +589,13 @@ func resourceClaimsForPod(pod *apiv1.Pod, nodeName string, claimCount int, reque } } - claims = append(claims, testResourceClaim(name, pod, nodeName, claimRequests, claimAllocations, nil)) + claims = append(claims, testResourceClaim(name, pod, nodeName, claimRequests, claimAllocations)) } return claims } -func testResourceClaim(claimName string, owningPod *apiv1.Pod, nodeName string, requests []testDeviceRequest, allocations []testAllocation, reservedFor []*apiv1.Pod) *resourceapi.ResourceClaim { +func testResourceClaim(claimName string, owningPod *apiv1.Pod, nodeName string, requests []testDeviceRequest, allocations []testAllocation) *resourceapi.ResourceClaim { var deviceRequests []resourceapi.DeviceRequest for _, request := range requests { var selectors []resourceapi.DeviceSelector @@ -673,15 +665,6 @@ func testResourceClaim(claimName string, owningPod *apiv1.Pod, nodeName string, UID: owningPod.UID, }, } - } else { - for _, pod := range podReservations { - podReservations = append(podReservations, resourceapi.ResourceClaimConsumerReference{ - APIGroup: "", - Resource: "pods", - Name: pod.Name, - UID: pod.UID, - }) - } } claim.Status = resourceapi.ResourceClaimStatus{ Allocation: &resourceapi.AllocationResult{ diff --git a/cluster-autoscaler/processors/customresources/custom_resources_processor.go 
b/cluster-autoscaler/processors/customresources/custom_resources_processor.go index 052be2dd84..d68444a3a5 100644 --- a/cluster-autoscaler/processors/customresources/custom_resources_processor.go +++ b/cluster-autoscaler/processors/customresources/custom_resources_processor.go @@ -20,6 +20,7 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/context" + drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" ) @@ -35,14 +36,9 @@ type CustomResourceTarget struct { type CustomResourcesProcessor interface { // FilterOutNodesWithUnreadyResources removes nodes that should have a custom resource, but don't have // it in allocatable from ready nodes list and updates their status to unready on all nodes list. - FilterOutNodesWithUnreadyResources(context *context.AutoscalingContext, allNodes, readyNodes []*apiv1.Node) ([]*apiv1.Node, []*apiv1.Node) + FilterOutNodesWithUnreadyResources(context *context.AutoscalingContext, allNodes, readyNodes []*apiv1.Node, draSnapshot *drasnapshot.Snapshot) ([]*apiv1.Node, []*apiv1.Node) // GetNodeResourceTargets returns mapping of resource names to their targets. GetNodeResourceTargets(context *context.AutoscalingContext, node *apiv1.Node, nodeGroup cloudprovider.NodeGroup) ([]CustomResourceTarget, errors.AutoscalerError) // CleanUp cleans up processor's internal structures. CleanUp() } - -// NewDefaultCustomResourcesProcessor returns a default instance of CustomResourcesProcessor. -func NewDefaultCustomResourcesProcessor() CustomResourcesProcessor { - return &GpuCustomResourcesProcessor{} -} diff --git a/cluster-autoscaler/processors/customresources/default_custom_processor.go b/cluster-autoscaler/processors/customresources/default_custom_processor.go new file mode 100644 index 0000000000..3a8c846420 --- /dev/null +++ b/cluster-autoscaler/processors/customresources/default_custom_processor.go @@ -0,0 +1,70 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package customresources + +import ( + apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/context" + drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot" + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" +) + +// DefaultCustomResourcesProcessor handles multiple custom resource processors and +// executes them in order. +type DefaultCustomResourcesProcessor struct { + customResourcesProcessors []CustomResourcesProcessor +} + +// NewDefaultCustomResourcesProcessor returns an instance of DefaultCustomResourcesProcessor. 
+func NewDefaultCustomResourcesProcessor(draEnabled bool) CustomResourcesProcessor { + customProcessors := []CustomResourcesProcessor{&GpuCustomResourcesProcessor{}} + if draEnabled { + customProcessors = append(customProcessors, &DraCustomResourcesProcessor{}) + } + return &DefaultCustomResourcesProcessor{customProcessors} +} + +// FilterOutNodesWithUnreadyResources calls the corresponding method for internal custom resources processors in order. +func (p *DefaultCustomResourcesProcessor) FilterOutNodesWithUnreadyResources(context *context.AutoscalingContext, allNodes, readyNodes []*apiv1.Node, draSnapshot *drasnapshot.Snapshot) ([]*apiv1.Node, []*apiv1.Node) { + newAllNodes := allNodes + newReadyNodes := readyNodes + for _, processor := range p.customResourcesProcessors { + newAllNodes, newReadyNodes = processor.FilterOutNodesWithUnreadyResources(context, newAllNodes, newReadyNodes, draSnapshot) + } + return newAllNodes, newReadyNodes +} + +// GetNodeResourceTargets calls the corresponding method for internal custom resources processors in order. +func (p *DefaultCustomResourcesProcessor) GetNodeResourceTargets(context *context.AutoscalingContext, node *apiv1.Node, nodeGroup cloudprovider.NodeGroup) ([]CustomResourceTarget, errors.AutoscalerError) { + customResourcesTargets := []CustomResourceTarget{} + for _, processor := range p.customResourcesProcessors { + targets, err := processor.GetNodeResourceTargets(context, node, nodeGroup) + if err != nil { + return nil, err + } + customResourcesTargets = append(customResourcesTargets, targets...) + } + return customResourcesTargets, nil +} + +// CleanUp cleans up all internal custom resources processors. +func (p *DefaultCustomResourcesProcessor) CleanUp() { + for _, processor := range p.customResourcesProcessors { + processor.CleanUp() + } +} diff --git a/cluster-autoscaler/processors/customresources/default_custom_processor_test.go b/cluster-autoscaler/processors/customresources/default_custom_processor_test.go new file mode 100644 index 0000000000..5946f00923 --- /dev/null +++ b/cluster-autoscaler/processors/customresources/default_custom_processor_test.go @@ -0,0 +1,198 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package customresources
+
+import (
+	"strings"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
+	drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
+
+	apiv1 "k8s.io/api/core/v1"
+	"k8s.io/autoscaler/cluster-autoscaler/context"
+	utils "k8s.io/autoscaler/cluster-autoscaler/utils/test"
+)
+
+func TestDefaultProcessorFilterOut(t *testing.T) {
+	processor := DefaultCustomResourcesProcessor{[]CustomResourcesProcessor{
+		&mockCustomResourcesProcessor{nodeMark: "p1"},
+		&mockCustomResourcesProcessor{nodeMark: "p2"},
+		&mockCustomResourcesProcessor{nodeMark: "p3"},
+	}}
+
+	testCases := map[string]struct {
+		allNodes              []*apiv1.Node
+		nodesInitialReadiness map[string]bool
+		expectedReadyNodes    map[string]bool
+	}{
+		"filtering one node by one processor": {
+			allNodes: []*apiv1.Node{
+				utils.BuildTestNode("p1_node_1", 500, 100),
+				utils.BuildTestNode("node_2", 500, 100),
+			},
+			nodesInitialReadiness: map[string]bool{
+				"p1_node_1": true,
+				"node_2":    true,
+			},
+			expectedReadyNodes: map[string]bool{
+				"node_2": true,
+			},
+		},
+		"filtering multiple nodes by one processor": {
+			allNodes: []*apiv1.Node{
+				utils.BuildTestNode("p1_node_1", 500, 100),
+				utils.BuildTestNode("p1_node_2", 500, 100),
+				utils.BuildTestNode("node_3", 500, 100),
+			},
+			nodesInitialReadiness: map[string]bool{
+				"p1_node_1": true,
+				"p1_node_2": true,
+				"node_3":    false,
+			},
+			expectedReadyNodes: map[string]bool{},
+		},
+		"filtering one node by multiple processors": {
+			allNodes: []*apiv1.Node{
+				utils.BuildTestNode("p1_p3_node_1", 500, 100),
+				utils.BuildTestNode("p1_node_2", 500, 100),
+				utils.BuildTestNode("node_3", 500, 100),
+			},
+			nodesInitialReadiness: map[string]bool{
+				"p1_p3_node_1": true,
+				"p1_node_2":    false,
+				"node_3":       false,
+			},
+			expectedReadyNodes: map[string]bool{},
+		},
+		"filtering multiple nodes by multiple processors": {
+			allNodes: []*apiv1.Node{
+				utils.BuildTestNode("p1_node_1", 500, 100),
+				utils.BuildTestNode("p1_node_2", 500, 100),
+				utils.BuildTestNode("node_3", 500, 100),
+				utils.BuildTestNode("node_4", 500, 100),
+				utils.BuildTestNode("p2_node_5", 500, 100),
+				utils.BuildTestNode("p3_node_6", 500, 100),
+			},
+			nodesInitialReadiness: map[string]bool{
+				"p1_node_1": false,
+				"p1_node_2": true,
+				"node_3":    false,
+				"node_4":    true,
+				"p2_node_5": true,
+				"p3_node_6": true,
+			},
+			expectedReadyNodes: map[string]bool{
+				"node_4": true,
+			},
+		},
+	}
+	for tcName, tc := range testCases {
+		t.Run(tcName, func(t *testing.T) {
+			readyNodes := []*apiv1.Node{}
+			for _, node := range tc.allNodes {
+				if tc.nodesInitialReadiness[node.Name] {
+					readyNodes = append(readyNodes, node)
+				}
+			}
+			resultedAllNodes, resultedReadyNodes := processor.FilterOutNodesWithUnreadyResources(nil, tc.allNodes, readyNodes, nil)
+			assert.ElementsMatch(t, tc.allNodes, resultedAllNodes)
+			assert.True(t, len(resultedReadyNodes) == len(tc.expectedReadyNodes))
+			for _, node := range resultedReadyNodes {
+				assert.True(t, tc.expectedReadyNodes[node.Name])
+			}
+		})
+	}
+}
+
+func TestDefaultProcessorGetNodeResourceTargets(t *testing.T) {
+	processor := DefaultCustomResourcesProcessor{[]CustomResourcesProcessor{
+		&mockCustomResourcesProcessor{nodeMark: "p1", customResourceTargetsToAdd: []string{"p1_R1", "p1_R2"}, customResourceTargetsQuantity: 1},
+		&mockCustomResourcesProcessor{nodeMark: "p2", customResourceTargetsToAdd: []string{"p2_R1"}, customResourceTargetsQuantity: 2},
+
&mockCustomResourcesProcessor{nodeMark: "p3", customResourceTargetsToAdd: []string{"p3_R1"}, customResourceTargetsQuantity: 3}, + }} + + testCases := map[string]struct { + node *apiv1.Node + expectedResources []CustomResourceTarget + }{ + "single processor": { + node: utils.BuildTestNode("p1", 500, 100), + expectedResources: []CustomResourceTarget{ + {ResourceType: "p1_R1", ResourceCount: 1}, + {ResourceType: "p1_R2", ResourceCount: 1}, + }, + }, + "many processors": { + node: utils.BuildTestNode("p1_p3", 500, 100), + expectedResources: []CustomResourceTarget{ + {ResourceType: "p1_R1", ResourceCount: 1}, + {ResourceType: "p1_R2", ResourceCount: 1}, + {ResourceType: "p3_R1", ResourceCount: 3}, + }, + }, + "all processors": { + node: utils.BuildTestNode("p1_p2_p3", 500, 100), + expectedResources: []CustomResourceTarget{ + {ResourceType: "p1_R1", ResourceCount: 1}, + {ResourceType: "p1_R2", ResourceCount: 1}, + {ResourceType: "p2_R1", ResourceCount: 2}, + {ResourceType: "p3_R1", ResourceCount: 3}, + }, + }, + } + for tcName, tc := range testCases { + t.Run(tcName, func(t *testing.T) { + customResourceTarget, _ := processor.GetNodeResourceTargets(nil, tc.node, nil) + assert.ElementsMatch(t, customResourceTarget, tc.expectedResources) + }) + } +} + +type mockCustomResourcesProcessor struct { + nodeMark string + customResourceTargetsToAdd []string + customResourceTargetsQuantity int64 +} + +func (m *mockCustomResourcesProcessor) FilterOutNodesWithUnreadyResources(_ *context.AutoscalingContext, allNodes, readyNodes []*apiv1.Node, _ *drasnapshot.Snapshot) ([]*apiv1.Node, []*apiv1.Node) { + filteredReadyNodes := []*apiv1.Node{} + for _, node := range readyNodes { + if !strings.Contains(node.Name, m.nodeMark) { + filteredReadyNodes = append(filteredReadyNodes, node) + } + } + return allNodes, filteredReadyNodes +} + +func (m *mockCustomResourcesProcessor) GetNodeResourceTargets(_ *context.AutoscalingContext, node *apiv1.Node, _ cloudprovider.NodeGroup) ([]CustomResourceTarget, errors.AutoscalerError) { + result := []CustomResourceTarget{} + if strings.Contains(node.Name, m.nodeMark) { + for _, rt := range m.customResourceTargetsToAdd { + result = append(result, CustomResourceTarget{ResourceType: rt, ResourceCount: m.customResourceTargetsQuantity}) + } + } + return result, nil +} + +func (m *mockCustomResourcesProcessor) CleanUp() { +} diff --git a/cluster-autoscaler/processors/customresources/dra_processor.go b/cluster-autoscaler/processors/customresources/dra_processor.go new file mode 100644 index 0000000000..06d6e2759b --- /dev/null +++ b/cluster-autoscaler/processors/customresources/dra_processor.go @@ -0,0 +1,139 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package customresources
+
+import (
+	apiv1 "k8s.io/api/core/v1"
+	"k8s.io/api/resource/v1beta1"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
+	"k8s.io/autoscaler/cluster-autoscaler/context"
+	"k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
+	"k8s.io/klog/v2"
+)
+
+// DraCustomResourcesProcessor handles DRA custom resources. It assumes that
+// DRA resources may not become allocatable immediately after node creation.
+type DraCustomResourcesProcessor struct {
+}
+
+// FilterOutNodesWithUnreadyResources removes nodes that should have DRA resources but don't have them
+// in allocatable from the ready nodes list, and updates their status to unready in the all nodes list.
+func (p *DraCustomResourcesProcessor) FilterOutNodesWithUnreadyResources(context *context.AutoscalingContext, allNodes, readyNodes []*apiv1.Node, draSnapshot *snapshot.Snapshot) ([]*apiv1.Node, []*apiv1.Node) {
+	newAllNodes := make([]*apiv1.Node, 0)
+	newReadyNodes := make([]*apiv1.Node, 0)
+	nodesWithUnreadyDraResources := make(map[string]*apiv1.Node)
+	if draSnapshot == nil {
+		klog.Warningf("Cannot filter out nodes with unready DRA resources. The DRA snapshot is nil. Processing will be skipped.")
+		return allNodes, readyNodes
+	}
+
+	for _, node := range readyNodes {
+		ng, err := context.CloudProvider.NodeGroupForNode(node)
+		if err != nil {
+			newReadyNodes = append(newReadyNodes, node)
+			klog.Warningf("Failed to get node group for node %s, skipping DRA readiness check and keeping node in ready list. Error: %v", node.Name, err)
+			continue
+		}
+		if ng == nil {
+			newReadyNodes = append(newReadyNodes, node)
+			continue
+		}
+
+		nodeInfo, err := ng.TemplateNodeInfo()
+		if err != nil {
+			newReadyNodes = append(newReadyNodes, node)
+			klog.Warningf("Failed to get template node info for node group %s with error: %v", ng.Id(), err)
+			continue
+		}
+
+		nodeResourcesSlices, _ := draSnapshot.NodeResourceSlices(node.Name)
+		if isEqualResourceSlices(nodeResourcesSlices, nodeInfo.LocalResourceSlices) {
+			newReadyNodes = append(newReadyNodes, node)
+		} else {
+			nodesWithUnreadyDraResources[node.Name] = kubernetes.GetUnreadyNodeCopy(node, kubernetes.ResourceUnready)
+		}
+	}
+
+	// Override any node with unready DRA resources with its "unready" copy
+	for _, node := range allNodes {
+		if newNode, found := nodesWithUnreadyDraResources[node.Name]; found {
+			newAllNodes = append(newAllNodes, newNode)
+		} else {
+			newAllNodes = append(newAllNodes, node)
+		}
+	}
+	return newAllNodes, newReadyNodes
+}
+
+type resourceSliceSpecs struct {
+	driver string
+	pool   string
+}
+
+func isEqualResourceSlices(nodeResourcesSlices []*v1beta1.ResourceSlice, templateResourcesSlices []*v1beta1.ResourceSlice) bool {
+	tempSlicesByPools := getDevicesBySpecs(templateResourcesSlices)
+	nodeSlicesByPools := getDevicesBySpecs(nodeResourcesSlices)
+
+	for templSpecs, tempDevicesSet := range tempSlicesByPools {
+		matched := false
+		for nodeSpecs, nodeDevicesSet := range nodeSlicesByPools {
+			if templSpecs.driver == nodeSpecs.driver && nodeDevicesSet.Equal(tempDevicesSet) {
+				delete(nodeSlicesByPools, nodeSpecs)
+				matched = true
+				break
+			}
+		}
+		if !matched {
+			return false
+		}
+	}
+
+	return true
+}
+
+func getDevicesBySpecs(resourcesSlices []*v1beta1.ResourceSlice) map[resourceSliceSpecs]sets.Set[string] {
+	slicesGroupedByPoolAndDriver :=
make(map[resourceSliceSpecs]sets.Set[string]) + for _, rs := range resourcesSlices { + rsSpecs := resourceSliceSpecs{ + pool: rs.Spec.Pool.Name, + driver: rs.Spec.Driver, + } + slicesGroupedByPoolAndDriver[rsSpecs] = getResourceSliceDevicesSet(rs) + } + return slicesGroupedByPoolAndDriver +} + +func getResourceSliceDevicesSet(resourcesSlice *v1beta1.ResourceSlice) sets.Set[string] { + devices := sets.New[string]() + for _, device := range resourcesSlice.Spec.Devices { + devices.Insert(device.Name) + } + return devices +} + +// GetNodeResourceTargets returns the resource targets for DRA resource slices, not implemented. +func (p *DraCustomResourcesProcessor) GetNodeResourceTargets(_ *context.AutoscalingContext, _ *apiv1.Node, _ cloudprovider.NodeGroup) ([]CustomResourceTarget, errors.AutoscalerError) { + // TODO(DRA): Figure out resource limits for DRA here. + return []CustomResourceTarget{}, nil +} + +// CleanUp cleans up processor's internal structures. +func (p *DraCustomResourcesProcessor) CleanUp() { +} diff --git a/cluster-autoscaler/processors/customresources/dra_processor_test.go b/cluster-autoscaler/processors/customresources/dra_processor_test.go new file mode 100644 index 0000000000..64b86c227f --- /dev/null +++ b/cluster-autoscaler/processors/customresources/dra_processor_test.go @@ -0,0 +1,399 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package customresources
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	resourceapi "k8s.io/api/resource/v1beta1"
+	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
+	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
+	drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
+	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
+
+	"github.com/stretchr/testify/assert"
+	apiv1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
+	"k8s.io/autoscaler/cluster-autoscaler/context"
+	utils "k8s.io/autoscaler/cluster-autoscaler/utils/test"
+)
+
+func TestFilterOutNodesWithUnreadyDRAResources(t *testing.T) {
+	testCases := map[string]struct {
+		nodeGroupsAllNodes        map[string][]*apiv1.Node
+		nodeGroupsTemplatesSlices map[string][]*resourceapi.ResourceSlice
+		nodesSlices               map[string][]*resourceapi.ResourceSlice
+		expectedNodesReadiness    map[string]bool
+	}{
+		"1 DRA node group all totally ready": {
+			nodeGroupsAllNodes: map[string][]*apiv1.Node{
+				"ng1": {
+					buildTestNode("node_1_Dra_Ready", true),
+					buildTestNode("node_2_Dra_Ready", true),
+				},
+			},
+			nodeGroupsTemplatesSlices: map[string][]*resourceapi.ResourceSlice{
+				"ng1": createNodeResourceSlices("ng1_template", []int{1, 1}),
+			},
+			nodesSlices: map[string][]*resourceapi.ResourceSlice{
+				"node_1_Dra_Ready": createNodeResourceSlices("node_1_Dra_Ready", []int{1, 1}),
+				"node_2_Dra_Ready": createNodeResourceSlices("node_2_Dra_Ready", []int{1, 1}),
+			},
+			expectedNodesReadiness: map[string]bool{
+				"node_1_Dra_Ready": true,
+				"node_2_Dra_Ready": true,
+			},
+		},
+		"1 DRA node group, one initially unready": {
+			nodeGroupsAllNodes: map[string][]*apiv1.Node{
+				"ng1": {
+					buildTestNode("node_1_Dra_Ready", true),
+					buildTestNode("node_2_Dra_Ready", false),
+				},
+			},
+			nodeGroupsTemplatesSlices: map[string][]*resourceapi.ResourceSlice{
+				"ng1": createNodeResourceSlices("ng1_template", []int{1, 1}),
+			},
+			nodesSlices: map[string][]*resourceapi.ResourceSlice{
+				"node_1_Dra_Ready": createNodeResourceSlices("node_1_Dra_Ready", []int{1, 1}),
+				"node_2_Dra_Ready": createNodeResourceSlices("node_2_Dra_Ready", []int{1, 1}),
+			},
+			expectedNodesReadiness: map[string]bool{
+				"node_1_Dra_Ready": true,
+				"node_2_Dra_Ready": false,
+			},
+		},
+		"1 DRA node group, one initially ready with unready resources": {
+			nodeGroupsAllNodes: map[string][]*apiv1.Node{
+				"ng1": {
+					buildTestNode("node_1_Dra_Ready", true),
+					buildTestNode("node_2_Dra_Ready", true),
+				},
+			},
+			nodeGroupsTemplatesSlices: map[string][]*resourceapi.ResourceSlice{
+				"ng1": createNodeResourceSlices("ng1_template", []int{1, 1}),
+			},
+			nodesSlices: map[string][]*resourceapi.ResourceSlice{
+				"node_1_Dra_Ready": createNodeResourceSlices("node_1_Dra_Ready", []int{1, 1}),
+				"node_2_Dra_Ready": createNodeResourceSlices("node_2_Dra_Ready", []int{1, 0}),
+			},
+			expectedNodesReadiness: map[string]bool{
+				"node_1_Dra_Ready": true,
+				"node_2_Dra_Ready": false,
+			},
+		},
+		"1 DRA node group, one initially ready with more resources than expected": {
+			nodeGroupsAllNodes: map[string][]*apiv1.Node{
+				"ng1": {
+					buildTestNode("node_1_Dra_Ready", true),
+					buildTestNode("node_2_Dra_Ready", true),
+				},
+			},
+			nodeGroupsTemplatesSlices: map[string][]*resourceapi.ResourceSlice{
+				"ng1": createNodeResourceSlices("ng1_template", []int{1, 1}),
+			},
+			nodesSlices: map[string][]*resourceapi.ResourceSlice{
+				"node_1_Dra_Ready": createNodeResourceSlices("node_1_Dra_Ready", []int{1, 1}),
+				"node_2_Dra_Ready": createNodeResourceSlices("node_2_Dra_Ready", []int{1, 3}),
+			},
+			expectedNodesReadiness: map[string]bool{
+				"node_1_Dra_Ready": true,
+				"node_2_Dra_Ready": false,
+			},
+		},
+		"1 DRA node group, one initially ready with no slices": {
+			nodeGroupsAllNodes: map[string][]*apiv1.Node{
+				"ng1": {
+					buildTestNode("node_1_Dra_Ready", true),
+					buildTestNode("node_2_Dra_Ready", true),
+				},
+			},
+			nodeGroupsTemplatesSlices: map[string][]*resourceapi.ResourceSlice{
+				"ng1": createNodeResourceSlices("ng1_template", []int{1, 1}),
+			},
+			nodesSlices: map[string][]*resourceapi.ResourceSlice{
+				"node_1_Dra_Ready": {},
+				"node_2_Dra_Ready": createNodeResourceSlices("node_2_Dra_Ready", []int{1, 3}),
+			},
+			expectedNodesReadiness: map[string]bool{
+				"node_1_Dra_Ready": false,
+				"node_2_Dra_Ready": false,
+			},
+		},
+		"1 DRA node group, single driver multiple pools, only one published": {
+			nodeGroupsAllNodes: map[string][]*apiv1.Node{
+				"ng1": {
+					buildTestNode("node_1_Dra_Ready", true),
+				},
+			},
+			nodeGroupsTemplatesSlices: map[string][]*resourceapi.ResourceSlice{
+				"ng1": buildNodeResourceSlices("ng1_template", "driver", []int{2, 2, 2}),
+			},
+			nodesSlices: map[string][]*resourceapi.ResourceSlice{
+				"node_1_Dra_Ready": buildNodeResourceSlices("node_1_Dra_Ready", "driver", []int{2}),
+			},
+			expectedNodesReadiness: map[string]bool{
+				"node_1_Dra_Ready": false,
+			},
+		},
+		"1 DRA node group, single driver multiple pools, more pools published including template pools": {
+			nodeGroupsAllNodes: map[string][]*apiv1.Node{
+				"ng1": {
+					buildTestNode("node_2_Dra_Ready", true),
+				},
+			},
+			nodeGroupsTemplatesSlices: map[string][]*resourceapi.ResourceSlice{
+				"ng1": buildNodeResourceSlices("ng1_template", "driver", []int{2, 2, 2}),
+			},
+			nodesSlices: map[string][]*resourceapi.ResourceSlice{
+				"node_2_Dra_Ready": buildNodeResourceSlices("node_2_Dra_Ready", "driver", []int{2, 2, 2, 2}),
+			},
+			expectedNodesReadiness: map[string]bool{
+				"node_2_Dra_Ready": true,
+			},
+		},
+		"1 DRA node group, single driver multiple pools, more pools published not including template pools": {
+			nodeGroupsAllNodes: map[string][]*apiv1.Node{
+				"ng1": {
+					buildTestNode("node_1_Dra_Ready", true),
+				},
+			},
+			nodeGroupsTemplatesSlices: map[string][]*resourceapi.ResourceSlice{
+				"ng1": buildNodeResourceSlices("ng1_template", "driver", []int{2, 2, 2}),
+			},
+			nodesSlices: map[string][]*resourceapi.ResourceSlice{
+				"node_1_Dra_Ready": buildNodeResourceSlices("node_1_Dra_Ready", "driver", []int{2, 2, 1, 2}),
+			},
+			expectedNodesReadiness: map[string]bool{
+				"node_1_Dra_Ready": false,
+			},
+		},
+		"2 node groups, one DRA with 1 resource-unready node": {
+			nodeGroupsAllNodes: map[string][]*apiv1.Node{
+				"ng1": {
+					buildTestNode("node_1_Dra_Ready", true),
+					buildTestNode("node_2_Dra_Ready", true),
+					buildTestNode("node_3_Dra_Unready", true),
+				},
+				"ng2": {
+					buildTestNode("node_4_NonDra_Ready", true),
+					buildTestNode("node_5_NonDra_Unready", false),
+				},
+			},
+			nodeGroupsTemplatesSlices: map[string][]*resourceapi.ResourceSlice{
+				"ng1": createNodeResourceSlices("ng1_template", []int{2, 2}),
+			},
+			nodesSlices: map[string][]*resourceapi.ResourceSlice{
+				"node_1_Dra_Ready":   createNodeResourceSlices("node_1_Dra_Ready", []int{2, 2}),
+				"node_2_Dra_Ready":   createNodeResourceSlices("node_2_Dra_Ready", []int{2, 2}),
+				"node_3_Dra_Unready": createNodeResourceSlices("node_3_Dra_Unready", []int{2, 1}),
+			},
+			expectedNodesReadiness: map[string]bool{
+				"node_1_Dra_Ready":      true,
+				"node_2_Dra_Ready":      true,
+				"node_3_Dra_Unready":    false,
+				"node_4_NonDra_Ready":   true,
+				"node_5_NonDra_Unready": false,
+			},
+		},
+		"2 DRA node groups, each with 1 resource-unready node": {
+			nodeGroupsAllNodes: map[string][]*apiv1.Node{
+				"ng1": {
+					buildTestNode("node_1_Dra_Ready", true),
+					buildTestNode("node_2_Dra_Ready", true),
+					buildTestNode("node_3_Dra_Unready", true),
+				},
+				"ng2": {
+					buildTestNode("node_4_Dra_Ready", true),
+					buildTestNode("node_5_Dra_Unready", true),
+				},
+			},
+			nodeGroupsTemplatesSlices: map[string][]*resourceapi.ResourceSlice{
+				"ng1": createNodeResourceSlices("ng1_template", []int{2, 2}),
+				"ng2": createNodeResourceSlices("ng2_template", []int{3, 3}),
+			},
+			nodesSlices: map[string][]*resourceapi.ResourceSlice{
+				"node_1_Dra_Ready":   createNodeResourceSlices("node_1_Dra_Ready", []int{2, 2}),
+				"node_2_Dra_Ready":   createNodeResourceSlices("node_2_Dra_Ready", []int{2, 2}),
+				"node_3_Dra_Unready": createNodeResourceSlices("node_3_Dra_Unready", []int{2, 1}),
+				"node_4_Dra_Ready":   createNodeResourceSlices("node_4_Dra_Ready", []int{3, 3}),
+				"node_5_Dra_Unready": createNodeResourceSlices("node_5_Dra_Unready", []int{2, 1}),
+			},
+			expectedNodesReadiness: map[string]bool{
+				"node_1_Dra_Ready":   true,
+				"node_2_Dra_Ready":   true,
+				"node_3_Dra_Unready": false,
+				"node_4_Dra_Ready":   true,
+				"node_5_Dra_Unready": false,
+			},
+		},
+		"2 DRA node groups, single driver multiple pools, more pools published including template pools": {
+			nodeGroupsAllNodes: map[string][]*apiv1.Node{
+				"ng1": {
+					buildTestNode("node_1_Dra_Ready", true),
+					buildTestNode("node_2_Dra_Ready", true),
+				},
+				"ng2": {
+					buildTestNode("node_3_Dra_Ready", true),
+				},
+			},
+			nodeGroupsTemplatesSlices: map[string][]*resourceapi.ResourceSlice{
+				"ng1": buildNodeResourceSlices("ng1_template", "driver", []int{2, 2, 2}),
+				"ng2": buildNodeResourceSlices("ng2_template", "driver", []int{1, 1}),
+			},
+			nodesSlices: map[string][]*resourceapi.ResourceSlice{
+				"node_1_Dra_Ready": buildNodeResourceSlices("node_1_Dra_Ready", "driver", []int{2, 2, 2, 2}),
+				"node_2_Dra_Ready": buildNodeResourceSlices("node_2_Dra_Ready", "driver", []int{2, 2, 2}),
+				"node_3_Dra_Ready": buildNodeResourceSlices("node_3_Dra_Ready", "driver", []int{1, 1, 1}),
+			},
+			expectedNodesReadiness: map[string]bool{
+				"node_1_Dra_Ready": true,
+				"node_2_Dra_Ready": true,
+				"node_3_Dra_Ready": true,
+			},
+		},
+		"All together": {
+			nodeGroupsAllNodes: map[string][]*apiv1.Node{
+				"ng1": {
+					buildTestNode("node_1", true),
+					buildTestNode("node_2", true),
+					buildTestNode("node_3", true),
+				},
+				"ng2": {
+					buildTestNode("node_4", false),
+					buildTestNode("node_5", true),
+				},
+				"ng3": {
+					buildTestNode("node_6", false),
+					buildTestNode("node_7", true),
+				},
+			},
+			nodeGroupsTemplatesSlices: map[string][]*resourceapi.ResourceSlice{
+				"ng1": createNodeResourceSlices("ng1_template", []int{2, 2}),
+				"ng2": createNodeResourceSlices("ng2_template", []int{3, 3}),
+			},
+			nodesSlices: map[string][]*resourceapi.ResourceSlice{
+				"node_1": createNodeResourceSlices("node_1", []int{2, 2, 2}),
+				"node_2": createNodeResourceSlices("node_2", []int{1}),
+				"node_3": createNodeResourceSlices("node_3", []int{1, 2}),
+				"node_4": createNodeResourceSlices("node_4", []int{3, 3}),
+				"node_5": {},
+			},
+			expectedNodesReadiness: map[string]bool{
+				"node_1": true,
+				"node_2": false,
+				"node_3": false,
+				"node_4": false,
+				"node_5": false,
+				"node_6": false,
+				"node_7": true,
+			},
+		},
+	}
+
+	for tcName, tc := range testCases {
+		t.Run(tcName, func(t *testing.T) {
+			provider :=
testprovider.NewTestCloudProviderBuilder().Build() + machineTemplates := map[string]*framework.NodeInfo{} + initialAllNodes := []*apiv1.Node{} + initialReadyNodes := []*apiv1.Node{} + for ng, nodes := range tc.nodeGroupsAllNodes { + machineName := fmt.Sprintf("%s_machine_template", ng) + if rs, found := tc.nodeGroupsTemplatesSlices[ng]; found { + machineTemplates[machineName] = framework.NewNodeInfo(buildTestNode(fmt.Sprintf("%s_template", ng), true), rs) + } else { + machineTemplates[machineName] = framework.NewTestNodeInfo(buildTestNode(fmt.Sprintf("%s_template", ng), true)) + } + provider.AddAutoprovisionedNodeGroup(ng, 0, 20, len(nodes), machineName) + for _, node := range nodes { + initialAllNodes = append(initialAllNodes, node) + if getNodeReadiness(node) { + initialReadyNodes = append(initialReadyNodes, node) + } + provider.AddNode(ng, node) + } + } + provider.SetMachineTemplates(machineTemplates) + draSnapshot := drasnapshot.NewSnapshot(nil, tc.nodesSlices, nil, nil) + clusterSnapshotStore := store.NewBasicSnapshotStore() + clusterSnapshotStore.SetClusterState([]*apiv1.Node{}, []*apiv1.Pod{}, draSnapshot) + clusterSnapshot, _, _ := testsnapshot.NewCustomTestSnapshotAndHandle(clusterSnapshotStore) + + ctx := &context.AutoscalingContext{CloudProvider: provider, ClusterSnapshot: clusterSnapshot} + processor := DraCustomResourcesProcessor{} + newAllNodes, newReadyNodes := processor.FilterOutNodesWithUnreadyResources(ctx, initialAllNodes, initialReadyNodes, draSnapshot) + + readyNodes := make(map[string]bool) + for _, node := range newReadyNodes { + readyNodes[node.Name] = true + } + + assert.True(t, len(newAllNodes) == len(initialAllNodes), "Total number of nodes should not change") + for _, node := range newAllNodes { + gotReadiness := getNodeReadiness(node) + assert.Equal(t, tc.expectedNodesReadiness[node.Name], gotReadiness) + assert.Equal(t, gotReadiness, readyNodes[node.Name]) + } + + }) + } + +} + +func createNodeResourceSlices(nodeName string, numberOfDevicesInSlices []int) []*resourceapi.ResourceSlice { + return buildNodeResourceSlices(nodeName, "", numberOfDevicesInSlices) +} + +func buildNodeResourceSlices(nodeName, driverName string, numberOfDevicesInSlices []int) []*resourceapi.ResourceSlice { + numberOfSlices := len(numberOfDevicesInSlices) + resourceSlices := []*resourceapi.ResourceSlice{} + for sliceIndex := range numberOfSlices { + devices := []resourceapi.Device{} + for deviceIndex := range numberOfDevicesInSlices[sliceIndex] { + devices = append(devices, resourceapi.Device{Name: fmt.Sprintf("%d_%d", sliceIndex, deviceIndex)}) + } + if driverName == "" { + driverName = fmt.Sprintf("driver_%d", sliceIndex) + } + spec := resourceapi.ResourceSliceSpec{ + NodeName: nodeName, + Driver: driverName, + Pool: resourceapi.ResourcePool{Name: fmt.Sprintf("%s_pool_%d", nodeName, sliceIndex)}, + Devices: devices, + } + resourceSlices = append(resourceSlices, &resourceapi.ResourceSlice{ObjectMeta: metav1.ObjectMeta{Name: nodeName}, Spec: spec}) + } + return resourceSlices +} + +func buildTestNode(nodeName string, ready bool) *apiv1.Node { + node := utils.BuildTestNode(nodeName, 500, 100) + utils.SetNodeReadyState(node, ready, time.Now().Add(-5*time.Minute)) + return node +} + +func getNodeReadiness(node *apiv1.Node) bool { + for i := range node.Status.Conditions { + if node.Status.Conditions[i].Type == apiv1.NodeReady { + return node.Status.Conditions[i].Status == apiv1.ConditionTrue + } + } + return false +} diff --git a/cluster-autoscaler/processors/customresources/gpu_processor.go 
b/cluster-autoscaler/processors/customresources/gpu_processor.go index 1ca572f2db..cb449cd4fd 100644 --- a/cluster-autoscaler/processors/customresources/gpu_processor.go +++ b/cluster-autoscaler/processors/customresources/gpu_processor.go @@ -20,6 +20,7 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/context" + drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/autoscaler/cluster-autoscaler/utils/gpu" "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" @@ -36,7 +37,7 @@ type GpuCustomResourcesProcessor struct { // it in allocatable from ready nodes list and updates their status to unready on all nodes list. // This is a hack/workaround for nodes with GPU coming up without installed drivers, resulting // in GPU missing from their allocatable and capacity. -func (p *GpuCustomResourcesProcessor) FilterOutNodesWithUnreadyResources(context *context.AutoscalingContext, allNodes, readyNodes []*apiv1.Node) ([]*apiv1.Node, []*apiv1.Node) { +func (p *GpuCustomResourcesProcessor) FilterOutNodesWithUnreadyResources(context *context.AutoscalingContext, allNodes, readyNodes []*apiv1.Node, _ *drasnapshot.Snapshot) ([]*apiv1.Node, []*apiv1.Node) { newAllNodes := make([]*apiv1.Node, 0) newReadyNodes := make([]*apiv1.Node, 0) nodesWithUnreadyGpu := make(map[string]*apiv1.Node) diff --git a/cluster-autoscaler/processors/customresources/gpu_processor_test.go b/cluster-autoscaler/processors/customresources/gpu_processor_test.go index b76a38034a..0ae6853f45 100644 --- a/cluster-autoscaler/processors/customresources/gpu_processor_test.go +++ b/cluster-autoscaler/processors/customresources/gpu_processor_test.go @@ -170,10 +170,10 @@ func TestFilterOutNodesWithUnreadyResources(t *testing.T) { nodeNoGpuUnready, } - processor := NewDefaultCustomResourcesProcessor() + processor := GpuCustomResourcesProcessor{} provider := testprovider.NewTestCloudProviderBuilder().Build() ctx := &context.AutoscalingContext{CloudProvider: provider} - newAllNodes, newReadyNodes := processor.FilterOutNodesWithUnreadyResources(ctx, initialAllNodes, initialReadyNodes) + newAllNodes, newReadyNodes := processor.FilterOutNodesWithUnreadyResources(ctx, initialAllNodes, initialReadyNodes, nil) foundInReady := make(map[string]bool) for _, node := range newReadyNodes { diff --git a/cluster-autoscaler/processors/processors.go b/cluster-autoscaler/processors/processors.go index b391fed789..3285590d32 100644 --- a/cluster-autoscaler/processors/processors.go +++ b/cluster-autoscaler/processors/processors.go @@ -97,7 +97,7 @@ func DefaultProcessors(options config.AutoscalingOptions) *AutoscalingProcessors NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(), AsyncNodeGroupStateChecker: asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), - CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(), + CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(options.DynamicResourceAllocationEnabled), ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(), TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false), ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(), diff --git a/cluster-autoscaler/processors/test/common.go 
b/cluster-autoscaler/processors/test/common.go index 065b06d92f..5afa5b2c8d 100644 --- a/cluster-autoscaler/processors/test/common.go +++ b/cluster-autoscaler/processors/test/common.go @@ -52,7 +52,7 @@ func NewTestProcessors(context *context.AutoscalingContext) *processors.Autoscal NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(), TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false), NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(context.NodeGroupDefaults), - CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(), + CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(true), ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(), ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(), ScaleStateNotifier: nodegroupchange.NewNodeGroupChangeObserversList(), -- GitLab
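
Reviewer note (not part of the patch): below is a minimal Go sketch of how the pieces introduced above fit together, mirroring the RunOnce flow in static_autoscaler.go. The package name "example" and the helper "filterReadyNodes" are illustrative only; the constructor, the DRA snapshot call, and the four-argument FilterOutNodesWithUnreadyResources signature are the ones added by this patch.

// Sketch only: take the DRA snapshot first, then let the processor chain
// (GPU processor, then DRA processor) move nodes whose published
// ResourceSlices do not yet match their node group template out of the
// ready list, so booting nodes are not treated as schedulable capacity.
package example

import (
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
	drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
)

// filterReadyNodes is a hypothetical helper showing the intended call order.
func filterReadyNodes(ctx *context.AutoscalingContext, allNodes, readyNodes []*apiv1.Node) ([]*apiv1.Node, []*apiv1.Node, error) {
	// Snapshot DRA objects once per loop iteration, before node readiness is evaluated.
	var draSnapshot *drasnapshot.Snapshot
	if ctx.DynamicResourceAllocationEnabled && ctx.DraProvider != nil {
		var err error
		draSnapshot, err = ctx.DraProvider.Snapshot()
		if err != nil {
			return nil, nil, err
		}
	}
	// The composite processor appends the DRA processor only when DRA is enabled.
	processor := customresources.NewDefaultCustomResourcesProcessor(ctx.DynamicResourceAllocationEnabled)
	all, ready := processor.FilterOutNodesWithUnreadyResources(ctx, allNodes, readyNodes, draSnapshot)
	return all, ready, nil
}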