From 0b25e9c211b934ee2a7e5dbd12a8e35a7275c4e6 Mon Sep 17 00:00:00 2001
From: Brad Davidson <brad.davidson@rancher.com>
Date: Fri, 1 Nov 2024 20:11:32 +0000
Subject: [PATCH] Add support for Spec.PostCompleteDelay to set delay after job
 completion

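Plans may now set an optional spec.postCompleteDelay duration. When set,
the controller waits that long after a node's upgrade job completes
before applying the plan label to the node and moving on, while still
marking the node schedulable as soon as the job completes. The job's
TTLSecondsAfterFinished is raised to at least the delay plus one minute
so that the job still exists when the controller re-checks it, and plan
validation rejects negative delays.

A Plan using the new field might look like the sketch below; only
postCompleteDelay is added by this patch, the surrounding fields and
image are illustrative:

    apiVersion: upgrade.cattle.io/v1
    kind: Plan
    metadata:
      name: example-plan
      namespace: system-upgrade
    spec:
      concurrency: 1
      serviceAccountName: system-upgrade
      nodeSelector:
        matchExpressions:
          - {key: example.com/upgrade, operator: Exists}
      # wait two minutes after each node's job completes before
      # labeling the node and moving on to the next one
      postCompleteDelay: 2m
      upgrade:
        image: example/upgrade-image:latest
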
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
---
 pkg/apis/upgrade.cattle.io/v1/types.go        | 13 +++---
 .../v1/zz_generated_deepcopy.go               |  5 +++
 pkg/upgrade/handle_batch.go                   | 41 ++++++++++++++-----
 pkg/upgrade/job/job.go                        | 17 +++++++-
 pkg/upgrade/plan/plan.go                      |  4 ++
 5 files changed, 62 insertions(+), 18 deletions(-)

diff --git a/pkg/apis/upgrade.cattle.io/v1/types.go b/pkg/apis/upgrade.cattle.io/v1/types.go
index f3c26c0f..2b106159 100644
--- a/pkg/apis/upgrade.cattle.io/v1/types.go
+++ b/pkg/apis/upgrade.cattle.io/v1/types.go
@@ -49,12 +49,13 @@ type PlanSpec struct {
 
 	Exclusive bool `json:"exclusive,omitempty"`
 
-	Window           *TimeWindowSpec               `json:"window,omitempty"`
-	Prepare          *ContainerSpec                `json:"prepare,omitempty"`
-	Cordon           bool                          `json:"cordon,omitempty"`
-	Drain            *DrainSpec                    `json:"drain,omitempty"`
-	Upgrade          *ContainerSpec                `json:"upgrade,omitempty" wrangler:"required"`
-	ImagePullSecrets []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"`
+	Window            *TimeWindowSpec               `json:"window,omitempty"`
+	Prepare           *ContainerSpec                `json:"prepare,omitempty"`
+	Cordon            bool                          `json:"cordon,omitempty"`
+	Drain             *DrainSpec                    `json:"drain,omitempty"`
+	Upgrade           *ContainerSpec                `json:"upgrade,omitempty" wrangler:"required"`
+	ImagePullSecrets  []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"`
+	PostCompleteDelay *metav1.Duration              `json:"postCompleteDelay,omitempty"`
 }
 
 // PlanStatus represents the resulting state from processing Plan events.
diff --git a/pkg/apis/upgrade.cattle.io/v1/zz_generated_deepcopy.go b/pkg/apis/upgrade.cattle.io/v1/zz_generated_deepcopy.go
index 7cfe7e71..b379098a 100644
--- a/pkg/apis/upgrade.cattle.io/v1/zz_generated_deepcopy.go
+++ b/pkg/apis/upgrade.cattle.io/v1/zz_generated_deepcopy.go
@@ -232,6 +232,11 @@ func (in *PlanSpec) DeepCopyInto(out *PlanSpec) {
 		*out = make([]corev1.LocalObjectReference, len(*in))
 		copy(*out, *in)
 	}
+	if in.PostCompleteDelay != nil {
+		in, out := &in.PostCompleteDelay, &out.PostCompleteDelay
+		*out = new(metav1.Duration)
+		**out = **in
+	}
 	return
 }
 
diff --git a/pkg/upgrade/handle_batch.go b/pkg/upgrade/handle_batch.go
index 843e1125..97692e82 100644
--- a/pkg/upgrade/handle_batch.go
+++ b/pkg/upgrade/handle_batch.go
@@ -7,12 +7,12 @@ import (
 	"strconv"
 	"time"
 
-	"github.com/rancher/system-upgrade-controller/pkg/apis/condition"
 	upgradeapi "github.com/rancher/system-upgrade-controller/pkg/apis/upgrade.cattle.io"
 	upgradejob "github.com/rancher/system-upgrade-controller/pkg/upgrade/job"
 	batchctlv1 "github.com/rancher/wrangler/v3/pkg/generated/controllers/batch/v1"
 	"github.com/sirupsen/logrus"
 	batchv1 "k8s.io/api/batch/v1"
+	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
@@ -81,13 +81,39 @@ func (ctl *Controller) handleJobs(ctx context.Context) error {
 		}
 		// if the job has failed enqueue-or-delete it depending on the TTL window
 		if upgradejob.ConditionFailed.IsTrue(obj) {
-			return obj, enqueueOrDelete(jobs, obj, upgradejob.ConditionFailed)
+			failedTime := upgradejob.ConditionFailed.GetLastTransitionTime(obj)
+			if failedTime.IsZero() {
+				return obj, fmt.Errorf("condition %q missing field %q", upgradejob.ConditionFailed, "LastTransitionTime")
+			}
+			ctl.recorder.Eventf(plan, corev1.EventTypeWarning, "JobFailed", "Job failed on Node %s", node.Name)
+			return obj, enqueueOrDelete(jobs, obj, failedTime)
 		}
 		// if the job has completed tag the node then enqueue-or-delete depending on the TTL window
 		if upgradejob.ConditionComplete.IsTrue(obj) {
+			completeTime := upgradejob.ConditionComplete.GetLastTransitionTime(obj)
+			if completeTime.IsZero() {
+				return obj, fmt.Errorf("condition %q missing field %q", upgradejob.ConditionComplete, "LastTransitionTime")
+			}
 			planLabel := upgradeapi.LabelPlanName(planName)
 			if planHash, ok := obj.Labels[planLabel]; ok {
-				node.Labels[planLabel] = planHash
+				var delay time.Duration
+				if plan.Spec.PostCompleteDelay != nil {
+					delay = plan.Spec.PostCompleteDelay.Duration
+				}
+				// if the job has not been completed for the configured delay, re-enqueue
+				// it for processing once the delay has elapsed.
+				// the job's TTLSecondsAfterFinished is guaranteed to be set to a larger value
+				// than the plan's requested delay.
+				if interval := time.Since(completeTime); interval < delay {
+					logrus.Debugf("Enqueuing sync of Job %s/%s in %v", obj.Namespace, obj.Name, delay-interval)
+					ctl.recorder.Eventf(plan, corev1.EventTypeNormal, "JobCompleteWaiting", "Job completed on Node %s, waiting %s PostCompleteDelay", node.Name, delay)
+					jobs.EnqueueAfter(obj.Namespace, obj.Name, delay-interval)
+				} else {
+					ctl.recorder.Eventf(plan, corev1.EventTypeNormal, "JobComplete", "Job completed on Node %s", node.Name)
+					node.Labels[planLabel] = planHash
+				}
+				// mark the node as schedulable even if the delay has not elapsed, so that
+				// workloads can resume scheduling.
 				if node.Spec.Unschedulable && (plan.Spec.Cordon || plan.Spec.Drain != nil) {
 					node.Spec.Unschedulable = false
 				}
@@ -95,7 +121,7 @@ func (ctl *Controller) handleJobs(ctx context.Context) error {
 					return obj, err
 				}
 			}
-			return obj, enqueueOrDelete(jobs, obj, upgradejob.ConditionComplete)
+			return obj, enqueueOrDelete(jobs, obj, completeTime)
 		}
 		// if the job hasn't failed or completed but the job Node is not on the applying list, consider it running out-of-turn and delete it
 		if i := sort.SearchStrings(plan.Status.Applying, nodeName); i == len(plan.Status.Applying) ||
@@ -108,12 +134,7 @@ func (ctl *Controller) handleJobs(ctx context.Context) error {
 	return nil
 }
 
-func enqueueOrDelete(jobController batchctlv1.JobController, job *batchv1.Job, done condition.Cond) error {
-	lastTransitionTime := done.GetLastTransitionTime(job)
-	if lastTransitionTime.IsZero() {
-		return fmt.Errorf("condition %q missing field %q", done, "LastTransitionTime")
-	}
-
+func enqueueOrDelete(jobController batchctlv1.JobController, job *batchv1.Job, lastTransitionTime time.Time) error {
 	var ttlSecondsAfterFinished time.Duration
 
 	if job.Spec.TTLSecondsAfterFinished == nil {
diff --git a/pkg/upgrade/job/job.go b/pkg/upgrade/job/job.go
index df5aa95e..fa17f260 100644
--- a/pkg/upgrade/job/job.go
+++ b/pkg/upgrade/job/job.go
@@ -5,6 +5,7 @@ import (
 	"slices"
 	"strconv"
 	"strings"
+	"time"
 
 	"github.com/rancher/system-upgrade-controller/pkg/apis/condition"
 	upgradeapi "github.com/rancher/system-upgrade-controller/pkg/apis/upgrade.cattle.io"
@@ -133,9 +134,21 @@ func New(plan *upgradeapiv1.Plan, node *corev1.Node, controllerName string) *bat
 	labelPlanName := upgradeapi.LabelPlanName(plan.Name)
 	nodeHostname := upgradenode.Hostname(node)
 	shortNodeName := strings.SplitN(node.Name, ".", 2)[0]
+	ttlSecondsAfterFinished := TTLSecondsAfterFinished
+
+	// Ensure that the job's TTLSecondsAfterFinished is at least 1 minute longer than
+	// the requested post-complete delay, so that the job still exists when the
+	// controller re-checks it after the delay has elapsed.
+	if delay := plan.Spec.PostCompleteDelay; delay != nil {
+		ttlPostCompleteDelay := delay.Duration + time.Minute
+		ttlAfterFinished := time.Duration(ttlSecondsAfterFinished) * time.Second
+		if ttlAfterFinished < ttlPostCompleteDelay {
+			ttlSecondsAfterFinished = int32(ttlPostCompleteDelay.Seconds())
+		}
+	}
 
 	jobAnnotations := labels.Set{
-		upgradeapi.AnnotationTTLSecondsAfterFinished: strconv.FormatInt(int64(TTLSecondsAfterFinished), 10),
+		upgradeapi.AnnotationTTLSecondsAfterFinished: strconv.FormatInt(int64(ttlSecondsAfterFinished), 10),
 	}
 	podAnnotations := labels.Set{}
 
@@ -171,7 +184,7 @@ func New(plan *upgradeapiv1.Plan, node *corev1.Node, controllerName string) *bat
 		Spec: batchv1.JobSpec{
 			PodReplacementPolicy:    &PodReplacementPolicy,
 			BackoffLimit:            &BackoffLimit,
-			TTLSecondsAfterFinished: &TTLSecondsAfterFinished,
+			TTLSecondsAfterFinished: &ttlSecondsAfterFinished,
 			Template: corev1.PodTemplateSpec{
 				ObjectMeta: metav1.ObjectMeta{
 					Annotations: podAnnotations,
diff --git a/pkg/upgrade/plan/plan.go b/pkg/upgrade/plan/plan.go
index 9dd8249c..92a2448f 100644
--- a/pkg/upgrade/plan/plan.go
+++ b/pkg/upgrade/plan/plan.go
@@ -35,6 +35,7 @@ var (
 	ErrDrainDeleteConflict           = fmt.Errorf("spec.drain cannot specify both deleteEmptydirData and deleteLocalData")
 	ErrDrainPodSelectorNotSelectable = fmt.Errorf("spec.drain.podSelector is not selectable")
 	ErrInvalidWindow                 = fmt.Errorf("spec.window is invalid")
+	ErrInvalidDelay                  = fmt.Errorf("spec.postCompleteDelay is negative")
 
 	PollingInterval = func(defaultValue time.Duration) time.Duration {
 		if str, ok := os.LookupEnv("SYSTEM_UPGRADE_PLAN_POLLING_INTERVAL"); ok {
@@ -257,5 +258,8 @@ func Validate(plan *upgradeapiv1.Plan) error {
 			return ErrInvalidWindow
 		}
 	}
+	if delay := plan.Spec.PostCompleteDelay; delay != nil && delay.Duration < 0 {
+		return ErrInvalidDelay
+	}
 	return nil
 }
-- 
GitLab