diff --git a/pkg/apis/upgrade.cattle.io/v1/types.go b/pkg/apis/upgrade.cattle.io/v1/types.go
index f3c26c0fa2ad37b126bbe425e8b3350911ab9e2b..2b1061599e84e9eae791831242ab851a504e6651 100644
--- a/pkg/apis/upgrade.cattle.io/v1/types.go
+++ b/pkg/apis/upgrade.cattle.io/v1/types.go
@@ -49,12 +49,13 @@ type PlanSpec struct {
 
 	Exclusive bool `json:"exclusive,omitempty"`
 
-	Window           *TimeWindowSpec               `json:"window,omitempty"`
-	Prepare          *ContainerSpec                `json:"prepare,omitempty"`
-	Cordon           bool                          `json:"cordon,omitempty"`
-	Drain            *DrainSpec                    `json:"drain,omitempty"`
-	Upgrade          *ContainerSpec                `json:"upgrade,omitempty" wrangler:"required"`
-	ImagePullSecrets []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"`
+	Window            *TimeWindowSpec               `json:"window,omitempty"`
+	Prepare           *ContainerSpec                `json:"prepare,omitempty"`
+	Cordon            bool                          `json:"cordon,omitempty"`
+	Drain             *DrainSpec                    `json:"drain,omitempty"`
+	Upgrade           *ContainerSpec                `json:"upgrade,omitempty" wrangler:"required"`
+	ImagePullSecrets  []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"`
+	PostCompleteDelay *metav1.Duration              `json:"postCompleteDelay,omitempty"`
 }
 
 // PlanStatus represents the resulting state from processing Plan events.
diff --git a/pkg/apis/upgrade.cattle.io/v1/zz_generated_deepcopy.go b/pkg/apis/upgrade.cattle.io/v1/zz_generated_deepcopy.go
index 7cfe7e713751325e25f71e760d9e29e4b9ae7756..b379098afc374de78d3bd300e43d47d882bb408f 100644
--- a/pkg/apis/upgrade.cattle.io/v1/zz_generated_deepcopy.go
+++ b/pkg/apis/upgrade.cattle.io/v1/zz_generated_deepcopy.go
@@ -232,6 +232,11 @@ func (in *PlanSpec) DeepCopyInto(out *PlanSpec) {
 		*out = make([]corev1.LocalObjectReference, len(*in))
 		copy(*out, *in)
 	}
+	if in.PostCompleteDelay != nil {
+		in, out := &in.PostCompleteDelay, &out.PostCompleteDelay
+		*out = new(metav1.Duration)
+		**out = **in
+	}
 	return
 }
 
diff --git a/pkg/upgrade/handle_batch.go b/pkg/upgrade/handle_batch.go
index 843e1125ca5612e2b7ec88d396b2491603d5803c..97692e82235ad65fd876fe20a810817d353ca7a7 100644
--- a/pkg/upgrade/handle_batch.go
+++ b/pkg/upgrade/handle_batch.go
@@ -7,12 +7,12 @@ import (
 	"strconv"
 	"time"
 
-	"github.com/rancher/system-upgrade-controller/pkg/apis/condition"
 	upgradeapi "github.com/rancher/system-upgrade-controller/pkg/apis/upgrade.cattle.io"
 	upgradejob "github.com/rancher/system-upgrade-controller/pkg/upgrade/job"
 	batchctlv1 "github.com/rancher/wrangler/v3/pkg/generated/controllers/batch/v1"
 	"github.com/sirupsen/logrus"
 	batchv1 "k8s.io/api/batch/v1"
+	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
@@ -81,13 +81,39 @@ func (ctl *Controller) handleJobs(ctx context.Context) error {
 		}
 		// if the job has failed enqueue-or-delete it depending on the TTL window
 		if upgradejob.ConditionFailed.IsTrue(obj) {
-			return obj, enqueueOrDelete(jobs, obj, upgradejob.ConditionFailed)
+			failedTime := upgradejob.ConditionFailed.GetLastTransitionTime(obj)
+			if failedTime.IsZero() {
+				return obj, fmt.Errorf("condition %q missing field %q", upgradejob.ConditionFailed, "LastTransitionTime")
+			}
+			ctl.recorder.Eventf(plan, corev1.EventTypeWarning, "JobFailed", "Job failed on Node %s", node.Name)
+			return obj, enqueueOrDelete(jobs, obj, failedTime)
 		}
 		// if the job has completed tag the node then enqueue-or-delete depending on the TTL window
 		if upgradejob.ConditionComplete.IsTrue(obj) {
+			completeTime := upgradejob.ConditionComplete.GetLastTransitionTime(obj)
+			if completeTime.IsZero() {
+				return obj, fmt.Errorf("condition %q missing field %q", upgradejob.ConditionComplete, "LastTransitionTime")
+			}
 			planLabel := upgradeapi.LabelPlanName(planName)
 			if planHash, ok := obj.Labels[planLabel]; ok {
-				node.Labels[planLabel] = planHash
+				var delay time.Duration
+				if plan.Spec.PostCompleteDelay != nil {
+					delay = plan.Spec.PostCompleteDelay.Duration
+				}
+				// if the job has not been completed for the configured delay, re-enqueue
+				// it for processing once the delay has elapsed.
+				// the job's TTLSecondsAfterFinished is guaranteed to be set to a larger value
+				// than the plan's requested delay.
+				if interval := time.Now().Sub(completeTime); interval < delay {
+					logrus.Debugf("Enqueing sync of Job %s/%s in %v", obj.Namespace, obj.Name, delay-interval)
+					ctl.recorder.Eventf(plan, corev1.EventTypeNormal, "JobCompleteWaiting", "Job completed on Node %s, waiting %s PostCompleteDelay", node.Name, delay)
+					jobs.EnqueueAfter(obj.Namespace, obj.Name, delay-interval)
+				} else {
+					ctl.recorder.Eventf(plan, corev1.EventTypeNormal, "JobComplete", "Job completed on Node %s", node.Name)
+					node.Labels[planLabel] = planHash
+				}
+				// mark the node as schedulable even if the delay has not elapsed, so that
+				// workloads can resume scheduling.
 				if node.Spec.Unschedulable && (plan.Spec.Cordon || plan.Spec.Drain != nil) {
 					node.Spec.Unschedulable = false
 				}
@@ -95,7 +121,7 @@ func (ctl *Controller) handleJobs(ctx context.Context) error {
 					return obj, err
 				}
 			}
-			return obj, enqueueOrDelete(jobs, obj, upgradejob.ConditionComplete)
+			return obj, enqueueOrDelete(jobs, obj, completeTime)
 		}
 		// if the job is hasn't failed or completed but the job Node is not on the applying list, consider it running out-of-turn and delete it
 		if i := sort.SearchStrings(plan.Status.Applying, nodeName); i == len(plan.Status.Applying) ||
@@ -108,12 +134,7 @@ func (ctl *Controller) handleJobs(ctx context.Context) error {
 	return nil
 }
 
-func enqueueOrDelete(jobController batchctlv1.JobController, job *batchv1.Job, done condition.Cond) error {
-	lastTransitionTime := done.GetLastTransitionTime(job)
-	if lastTransitionTime.IsZero() {
-		return fmt.Errorf("condition %q missing field %q", done, "LastTransitionTime")
-	}
-
+func enqueueOrDelete(jobController batchctlv1.JobController, job *batchv1.Job, lastTransitionTime time.Time) error {
 	var ttlSecondsAfterFinished time.Duration
 
 	if job.Spec.TTLSecondsAfterFinished == nil {
diff --git a/pkg/upgrade/job/job.go b/pkg/upgrade/job/job.go
index df5aa95edc554c018ee0999231b385ab1bb1c72e..fa17f260eb32655f4d44e1fd2a0871a52d9f4ba1 100644
--- a/pkg/upgrade/job/job.go
+++ b/pkg/upgrade/job/job.go
@@ -5,6 +5,7 @@ import (
 	"slices"
 	"strconv"
 	"strings"
+	"time"
 
 	"github.com/rancher/system-upgrade-controller/pkg/apis/condition"
 	upgradeapi "github.com/rancher/system-upgrade-controller/pkg/apis/upgrade.cattle.io"
@@ -133,9 +134,21 @@ func New(plan *upgradeapiv1.Plan, node *corev1.Node, controllerName string) *bat
 	labelPlanName := upgradeapi.LabelPlanName(plan.Name)
 	nodeHostname := upgradenode.Hostname(node)
 	shortNodeName := strings.SplitN(node.Name, ".", 2)[0]
+	ttlSecondsAfterFinished := TTLSecondsAfterFinished
+
+	// Ensure that the job's TTLSecondsAfterFinished is at least 1 minute longer than
+	// the requested post-upgrade delay, so that the controller has time to see that
+	// it has been completed for the requested duration.
+	if delay := plan.Spec.PostCompleteDelay; delay != nil {
+		ttlPostCompleteDelay := delay.Duration + time.Minute
+		ttlAfterFinished := time.Duration(ttlSecondsAfterFinished) * time.Second
+		if ttlAfterFinished < ttlPostCompleteDelay {
+			ttlSecondsAfterFinished = int32(ttlPostCompleteDelay.Seconds())
+		}
+	}
 
 	jobAnnotations := labels.Set{
-		upgradeapi.AnnotationTTLSecondsAfterFinished: strconv.FormatInt(int64(TTLSecondsAfterFinished), 10),
+		upgradeapi.AnnotationTTLSecondsAfterFinished: strconv.FormatInt(int64(ttlSecondsAfterFinished), 10),
 	}
 	podAnnotations := labels.Set{}
 
@@ -171,7 +184,7 @@ func New(plan *upgradeapiv1.Plan, node *corev1.Node, controllerName string) *bat
 		Spec: batchv1.JobSpec{
 			PodReplacementPolicy:    &PodReplacementPolicy,
 			BackoffLimit:            &BackoffLimit,
-			TTLSecondsAfterFinished: &TTLSecondsAfterFinished,
+			TTLSecondsAfterFinished: &ttlSecondsAfterFinished,
 			Template: corev1.PodTemplateSpec{
 				ObjectMeta: metav1.ObjectMeta{
 					Annotations: podAnnotations,
diff --git a/pkg/upgrade/plan/plan.go b/pkg/upgrade/plan/plan.go
index 9dd8249c5d54337e97840f018d79525cdbdc9e55..92a2448f5fd37ed1c3edc849690a2f8878107ad5 100644
--- a/pkg/upgrade/plan/plan.go
+++ b/pkg/upgrade/plan/plan.go
@@ -35,6 +35,7 @@ var (
 	ErrDrainDeleteConflict           = fmt.Errorf("spec.drain cannot specify both deleteEmptydirData and deleteLocalData")
 	ErrDrainPodSelectorNotSelectable = fmt.Errorf("spec.drain.podSelector is not selectable")
 	ErrInvalidWindow                 = fmt.Errorf("spec.window is invalid")
+	ErrInvalidDelay                  = fmt.Errorf("spec.postCompleteDelay is negative")
 
 	PollingInterval = func(defaultValue time.Duration) time.Duration {
 		if str, ok := os.LookupEnv("SYSTEM_UPGRADE_PLAN_POLLING_INTERVAL"); ok {
@@ -257,5 +258,8 @@ func Validate(plan *upgradeapiv1.Plan) error {
 			return ErrInvalidWindow
 		}
 	}
+	if delay := plan.Spec.PostCompleteDelay; delay != nil && delay.Duration < 0 {
+		return ErrInvalidDelay
+	}
 	return nil
 }