diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
index b262ddd84d736b8edffb51bfd3097c31da5b1fd9..730874f2f17bc8ade84d8a5a050fcd052e900918 100644
--- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
+++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
@@ -31,14 +31,16 @@ import (
 )
 
 const (
-	scaleToZeroSupported          = true
-	placeholderInstanceNamePrefix = "i-placeholder"
+	scaleToZeroSupported           = true
+	placeholderInstanceNamePrefix  = "i-placeholder"
+	placeholderUnfulfillableStatus = "placeholder-cannot-be-fulfilled"
 )
 
 type asgCache struct {
 	registeredAsgs       map[AwsRef]*asg
 	asgToInstances       map[AwsRef][]AwsInstanceRef
 	instanceToAsg        map[AwsInstanceRef]*asg
+	instanceStatus       map[AwsInstanceRef]*string // HealthStatus per instance, rebuilt by regenerate
 	asgInstanceTypeCache *instanceTypeExpirationStore
 	mutex                sync.Mutex
 	awsService           *awsWrapper
@@ -62,9 +64,10 @@ type mixedInstancesPolicy struct {
 type asg struct {
 	AwsRef
 
-	minSize int
-	maxSize int
-	curSize int
+	minSize        int
+	maxSize        int
+	curSize        int
+	lastUpdateTime *time.Time // set by setAsgSizeNoLock when it changes the size
 
 	AvailabilityZones       []string
 	LaunchConfigurationName string
@@ -79,6 +82,7 @@ func newASGCache(awsService *awsWrapper, explicitSpecs []string, autoDiscoverySp
 		awsService:            awsService,
 		asgToInstances:        make(map[AwsRef][]AwsInstanceRef),
 		instanceToAsg:         make(map[AwsInstanceRef]*asg),
+		instanceStatus:        make(map[AwsInstanceRef]*string),
 		asgInstanceTypeCache:  newAsgInstanceTypeCache(awsService),
 		interrupt:             make(chan struct{}),
 		asgAutoDiscoverySpecs: autoDiscoverySpecs,
@@ -217,6 +221,19 @@ func (m *asgCache) InstancesByAsg(ref AwsRef) ([]AwsInstanceRef, error) {
 	return nil, fmt.Errorf("error while looking for instances of ASG: %s", ref)
 }
 
+// InstanceStatus returns the health status string cached for ref by the last
+// regenerate, or an error when ref is not present in the cache.
+func (m *asgCache) InstanceStatus(ref AwsInstanceRef) (*string, error) {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	if status, found := m.instanceStatus[ref]; found {
+		return status, nil
+	}
+
+	return nil, fmt.Errorf("could not find instance %v", ref)
+}
+
 func (m *asgCache) SetAsgSize(asg *asg, size int) error {
 	m.mutex.Lock()
 	defer m.mutex.Unlock()
@@ -239,6 +256,7 @@ func (m *asgCache) setAsgSizeNoLock(asg *asg, size int) error {
 	}
 
 	// Proactively set the ASG size so autoscaler makes better decisions
+	asg.lastUpdateTime = &start
 	asg.curSize = size
 
 	return nil
@@ -358,6 +376,7 @@ func (m *asgCache) regenerate() error {
 
 	newInstanceToAsgCache := make(map[AwsInstanceRef]*asg)
 	newAsgToInstancesCache := make(map[AwsRef][]AwsInstanceRef)
+	newInstanceStatusMap := make(map[AwsInstanceRef]*string)
 
 	// Build list of known ASG names
 	refreshNames, err := m.buildAsgNames()
@@ -394,6 +413,7 @@ func (m *asgCache) regenerate() error {
 			ref := m.buildInstanceRefFromAWS(instance)
 			newInstanceToAsgCache[ref] = asg
 			newAsgToInstancesCache[asg.AwsRef][i] = ref
+			newInstanceStatusMap[ref] = instance.HealthStatus
 		}
 	}
 
@@ -422,6 +442,7 @@ func (m *asgCache) regenerate() error {
 	m.asgToInstances = newAsgToInstancesCache
 	m.instanceToAsg = newInstanceToAsgCache
 	m.autoscalingOptions = newAutoscalingOptions
+	m.instanceStatus = newInstanceStatusMap
 	return nil
 }
 
@@ -435,17 +456,67 @@ func (m *asgCache) createPlaceholdersForDesiredNonStartedInstances(groups []*aut
 
 		klog.V(4).Infof("Instance group %s has only %d instances created while requested count is %d. "+
 			"Creating placeholder instances.", *g.AutoScalingGroupName, realInstances, desired)
+
+		// The empty status is kept unless the availability check says the ASG
+		// cannot provision; then every placeholder carries the unfulfillable marker.
+		healthStatus := ""
+		isAvailable, err := m.isNodeGroupAvailable(g)
+		if err != nil {
+			klog.V(4).Infof("Could not check instance availability, creating placeholder node anyways: %v", err)
+		} else if !isAvailable {
+			klog.Warningf("Instance group %s cannot provision any more nodes!", *g.AutoScalingGroupName)
+			healthStatus = placeholderUnfulfillableStatus
+		}
+
 		for i := realInstances; i < desired; i++ {
 			id := fmt.Sprintf("%s-%s-%d", placeholderInstanceNamePrefix, *g.AutoScalingGroupName, i)
 			g.Instances = append(g.Instances, &autoscaling.Instance{
 				InstanceId:       &id,
 				AvailabilityZone: g.AvailabilityZones[0],
+				HealthStatus:     &healthStatus,
 			})
 		}
 	}
 	return groups
 }
 
+// isNodeGroupAvailable reports whether the given ASG still looks able to
+// provision instances. It fetches a single scaling activity and returns false
+// only if that activity has status "Failed" and started after the last size
+// change recorded on the registered ASG. On an AWS error it returns true
+// together with the error, so the caller can log it and carry on. It reads
+// m.registeredAsgs without locking; presumably the caller holds m.mutex (TODO confirm).
+func (m *asgCache) isNodeGroupAvailable(group *autoscaling.Group) (bool, error) {
+	input := &autoscaling.DescribeScalingActivitiesInput{
+		AutoScalingGroupName: group.AutoScalingGroupName,
+		MaxRecords:           aws.Int64(1), // We only care about the most recent event
+	}
+
+	start := time.Now()
+	response, err := m.awsService.DescribeScalingActivities(input)
+	observeAWSRequest("DescribeScalingActivities", err, start)
+	if err != nil {
+		return true, err // If we can't describe the scaling activities we assume the node group is available
+	}
+
+	if len(response.Activities) > 0 {
+		activity := response.Activities[0]
+		asgRef := AwsRef{Name: *group.AutoScalingGroupName}
+		if a, ok := m.registeredAsgs[asgRef]; ok {
+			lut := a.lastUpdateTime
+			// StartTime and StatusCode are pointers in the SDK response; a nil
+			// field must count as "no failure seen" instead of panicking here.
+			if lut != nil && activity.StartTime != nil && activity.StartTime.After(*lut) &&
+				aws.StringValue(activity.StatusCode) == "Failed" {
+				return false, nil
+			}
+		} else {
+			klog.V(4).Infof("asg %v is not registered yet, skipping DescribeScalingActivities check", asgRef.Name)
+		}
+	}
+	return true, nil
+}
+
 func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) {
 	spec := dynamic.NodeGroupSpec{
 		Name:               aws.StringValue(g.AutoScalingGroupName),
diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go
b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go
index 6e951b6ca88b3af90ec687a83244f13451e2e43e..4bf11960209d39071a47cfea27f4a93fa4590a14 100644
--- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go
+++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go
@@ -320,9 +320,23 @@ func (ng *AwsNodeGroup) Nodes() ([]cloudprovider.Instance, error) {
 	instances := make([]cloudprovider.Instance, len(asgNodes))
 
 	for i, asgNode := range asgNodes {
+		var status *cloudprovider.InstanceStatus // stays nil unless the cached status equals the unfulfillable-placeholder marker
+		instanceStatusString, err := ng.awsManager.GetInstanceStatus(asgNode)
+		if err != nil {
+			klog.V(4).Infof("Could not get instance status, continuing anyways: %v", err) // best effort: the lookup error is only logged and Status stays nil
+		} else if instanceStatusString != nil && *instanceStatusString == placeholderUnfulfillableStatus {
+			status = &cloudprovider.InstanceStatus{
+				State: cloudprovider.InstanceCreating, // reported as still creating, with the error details attached below
+				ErrorInfo: &cloudprovider.InstanceErrorInfo{
+					ErrorClass:   cloudprovider.OutOfResourcesErrorClass,
+					ErrorCode:    placeholderUnfulfillableStatus,
+					ErrorMessage: "AWS cannot provision any more instances for this node group",
+				},
+			}
+		}
 		instances[i] = cloudprovider.Instance{
 			Id:     asgNode.ProviderID,
-			Status: nil,
+			Status: status,
 		}
 	}
 	return instances, nil
diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go
index 7eeb82ae575867b0698d88cd8c8950e93fdf6606..31635966f7605dcb61f21b6a94d69bda794d77ca 100644
--- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go
+++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go
@@ -463,6 +463,13 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) {
 		expectedInstancesCount = 1
 	}).Return(nil)
 
+	a.On("DescribeScalingActivities", // stub: a lookup for "test-asg" with MaxRecords 1 gets an output with no activities
+		&autoscaling.DescribeScalingActivitiesInput{
+			AutoScalingGroupName: aws.String("test-asg"),
+			MaxRecords:           aws.Int64(1),
+		},
+	).Return(&autoscaling.DescribeScalingActivitiesOutput{})
+
 	provider.Refresh()
 
 	initialSize, err := asgs[0].TargetSize()
diff --git a/cluster-autoscaler/cloudprovider/aws/aws_manager.go b/cluster-autoscaler/cloudprovider/aws/aws_manager.go
index e21d249c0c8d458bec7f9da3bcfd22600843e9bf..0f246db3bb6f5b329acbe6463cf3040bce275d37 100644
--- a/cluster-autoscaler/cloudprovider/aws/aws_manager.go
+++ b/cluster-autoscaler/cloudprovider/aws/aws_manager.go
@@ -301,6 +301,10 @@ func (m *AwsManager) GetAsgNodes(ref AwsRef) ([]AwsInstanceRef, error) {
 	return m.asgCache.InstancesByAsg(ref)
 }
 
+func (m *AwsManager) GetInstanceStatus(ref AwsInstanceRef) (*string, error) { // GetInstanceStatus returns whatever asgCache.InstanceStatus reports for ref
+	return m.asgCache.InstanceStatus(ref)
+}
+
 func (m *AwsManager) getAsgTemplate(asg *asg) (*asgTemplate, error) {
 	if len(asg.AvailabilityZones) < 1 {
 		return nil, fmt.Errorf("unable to get first AvailabilityZone for ASG %q", asg.Name)
diff --git a/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go b/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go
index 2392ed98f2647ec99d0335dafcb6910954c98db0..9d4a9ce7856761e40931cef9a2949591032b0ac2 100644
--- a/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go
+++ b/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go
@@ -391,6 +391,13 @@ func TestFetchExplicitAsgs(t *testing.T) {
 			}}, false)
 	}).Return(nil)
 
+	a.On("DescribeScalingActivities", // stub: a lookup for "coolasg" with MaxRecords 1 gets an output with no activities
+		&autoscaling.DescribeScalingActivitiesInput{
+			AutoScalingGroupName: aws.String("coolasg"),
+			MaxRecords:           aws.Int64(1),
+		},
+	).Return(&autoscaling.DescribeScalingActivitiesOutput{})
+
 	do := cloudprovider.NodeGroupDiscoveryOptions{
 		// Register the same node group twice with different max nodes.
 		// The intention is to test that the asgs.Register method will update
@@ -549,6 +556,13 @@ func TestFetchAutoAsgs(t *testing.T) {
 			}}}, false)
 	}).Return(nil).Twice()
 
+	a.On("DescribeScalingActivities", // stub: a lookup for "coolasg" with MaxRecords 1 gets an output with no activities
+		&autoscaling.DescribeScalingActivitiesInput{
+			AutoScalingGroupName: aws.String("coolasg"),
+			MaxRecords:           aws.Int64(1),
+		},
+	).Return(&autoscaling.DescribeScalingActivitiesOutput{})
+
 	do := cloudprovider.NodeGroupDiscoveryOptions{
 		NodeGroupAutoDiscoverySpecs: []string{fmt.Sprintf("asg:tag=%s", strings.Join(tags, ","))},
 	}
diff --git a/cluster-autoscaler/cloudprovider/aws/aws_wrapper.go b/cluster-autoscaler/cloudprovider/aws/aws_wrapper.go
index 86453300b757ca56ea5d275a4e4c219d93c6044e..1c7ef4eaa1cea4f3edacaf5b04781d0834151f63 100644
--- a/cluster-autoscaler/cloudprovider/aws/aws_wrapper.go
+++ b/cluster-autoscaler/cloudprovider/aws/aws_wrapper.go
@@ -33,6 +33,7 @@ import (
 type autoScalingI interface {
 	DescribeAutoScalingGroupsPages(input *autoscaling.DescribeAutoScalingGroupsInput, fn func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) error
 	DescribeLaunchConfigurations(*autoscaling.DescribeLaunchConfigurationsInput) (*autoscaling.DescribeLaunchConfigurationsOutput, error)
+	DescribeScalingActivities(*autoscaling.DescribeScalingActivitiesInput) (*autoscaling.DescribeScalingActivitiesOutput, error) // called by asgCache.isNodeGroupAvailable
 	DescribeTagsPages(input *autoscaling.DescribeTagsInput, fn func(*autoscaling.DescribeTagsOutput, bool) bool) error
 	SetDesiredCapacity(input *autoscaling.SetDesiredCapacityInput) (*autoscaling.SetDesiredCapacityOutput, error)
 	TerminateInstanceInAutoScalingGroup(input *autoscaling.TerminateInstanceInAutoScalingGroupInput) (*autoscaling.TerminateInstanceInAutoScalingGroupOutput, error)
diff --git a/cluster-autoscaler/cloudprovider/aws/aws_wrapper_test.go b/cluster-autoscaler/cloudprovider/aws/aws_wrapper_test.go
index 4fc2778db03e7cd1c68f01404fefde114d342fe7..f83a7bf5b7d67d9da81af2e0887a50d6238439a2 100644
--- a/cluster-autoscaler/cloudprovider/aws/aws_wrapper_test.go
+++ b/cluster-autoscaler/cloudprovider/aws/aws_wrapper_test.go
@@ -45,6 +45,11 @@ func (a *autoScalingMock) DescribeLaunchConfigurations(i *autoscaling.DescribeLa
 	return args.Get(0).(*autoscaling.DescribeLaunchConfigurationsOutput), nil
 }
 
+func (a *autoScalingMock) DescribeScalingActivities(i *autoscaling.DescribeScalingActivitiesInput) (*autoscaling.DescribeScalingActivitiesOutput, error) { // mock: records the call and hands back the first configured return value
+	args := a.Called(i)
+	return args.Get(0).(*autoscaling.DescribeScalingActivitiesOutput), nil // the error result is always nil; only return value 0 is consulted
+}
+
 func (a *autoScalingMock) DescribeTagsPages(i *autoscaling.DescribeTagsInput, fn func(*autoscaling.DescribeTagsOutput, bool) bool) error {
 	args := a.Called(i, fn)
 	return args.Error(0)