diff --git a/e2e/framework/controller/deployment.go b/e2e/framework/controller/deployment.go
index c5c09672a9907ef15a61a6ae64d83410d31b248e..ae4d6a8804529b4ec70a425305d8056d36f7528d 100644
--- a/e2e/framework/controller/deployment.go
+++ b/e2e/framework/controller/deployment.go
@@ -53,6 +53,13 @@ func NewDeployment(name string, opt ...DeploymentOption) *appsv1.Deployment {
 					FieldPath: "metadata.namespace",
 				},
 			},
+		}, {
+			Name: "SYSTEM_UPGRADE_CONTROLLER_NODE_NAME",
+			ValueFrom: &corev1.EnvVarSource{
+				FieldRef: &corev1.ObjectFieldSelector{
+					FieldPath: "spec.nodeName",
+				},
+			},
 		}}
 		container.VolumeMounts = []corev1.VolumeMount{{
 			Name:      `tmp`,
diff --git a/main.go b/main.go
index fb985d5728bd09d0aee20ec6f03fd6bd05aae477..19eab1555aa2640d43e9c21f5cc5c99719c4870f 100644
--- a/main.go
+++ b/main.go
@@ -21,15 +21,15 @@ import (
 )
 
 var (
-	debug                               bool
-	kubeConfig, masterURL               string
+	debug, leaderElect                  bool
+	kubeConfig, masterURL, nodeName     string
 	namespace, name, serviceAccountName string
 	threads                             int
 )
 
 func main() {
 	app := cli.NewApp()
-	app.Name = "system-upgrade-controller"
+	app.Name = version.Program
 	app.Usage = "in ur system controllin ur upgradez"
 	app.Version = fmt.Sprintf("%s (%s)", version.Version, version.GitCommit)
 	app.Flags = []cli.Flag{
@@ -38,6 +38,11 @@ func main() {
 			EnvVar:      "SYSTEM_UPGRADE_CONTROLLER_DEBUG",
 			Destination: &debug,
 		},
+		cli.BoolFlag{
+			Name:        "leader-elect",
+			EnvVar:      "SYSTEM_UPGRADE_CONTROLLER_LEADER_ELECT",
+			Destination: &leaderElect,
+		},
 		cli.StringFlag{
 			Name:   "kubeconfig",
 			EnvVar: "SYSTEM_UPGRADE_CONTROLLER_KUBE_CONFIG",
@@ -55,6 +60,12 @@ func main() {
 			Required:    true,
 			Destination: &name,
 		},
+		cli.StringFlag{
+			Name:        "node-name",
+			EnvVar:      "SYSTEM_UPGRADE_CONTROLLER_NODE_NAME",
+			Required:    false,
+			Destination: &nodeName,
+		},
 		cli.StringFlag{
 			Name:        "namespace",
 			EnvVar:      "SYSTEM_UPGRADE_CONTROLLER_NAMESPACE",
@@ -93,7 +104,7 @@ func Run(_ *cli.Context) {
 	if err != nil {
 		logrus.Fatal(err)
 	}
-	ctl, err := upgrade.NewController(cfg, namespace, name, 2*time.Hour)
+	ctl, err := upgrade.NewController(cfg, namespace, name, nodeName, leaderElect, 2*time.Hour)
 	if err != nil {
 		logrus.Fatal(err)
 	}
diff --git a/manifests/clusterrole.yaml b/manifests/clusterrole.yaml
index 78ffd6b11cc4121a69eb1b030966feff13fb5533..1accfc76075ed98d48e12a1dd69b3c5806443230 100644
--- a/manifests/clusterrole.yaml
+++ b/manifests/clusterrole.yaml
@@ -37,6 +37,30 @@ rules:
   - nodes
   verbs:
   - update
+- apiGroups:
+  - ""
+  resources:
+  - events
+  verbs:
+  - get
+  - create
+  - patch
+  - update
+- apiGroups:
+  - "coordination.k8s.io"
+  resources:
+  - leases
+  verbs:
+  - create
+- apiGroups:
+  - "coordination.k8s.io"
+  resources:
+  - leases
+  resourceNames:
+  - "system-upgrade-controller"
+  verbs:
+  - get
+  - update
 - apiGroups:
   - upgrade.cattle.io
   resources:
diff --git a/manifests/system-upgrade-controller.yaml b/manifests/system-upgrade-controller.yaml
index 955f549712fa02ba5250eb4b69d690d9f44ffadf..8749102e18f5165155c1b906bd86bda260dc2368 100644
--- a/manifests/system-upgrade-controller.yaml
+++ b/manifests/system-upgrade-controller.yaml
@@ -19,6 +19,7 @@ metadata:
 data:
   SYSTEM_UPGRADE_CONTROLLER_DEBUG: "false"
   SYSTEM_UPGRADE_CONTROLLER_THREADS: "2"
+  SYSTEM_UPGRADE_CONTROLLER_LEADER_ELECT: "true"
   SYSTEM_UPGRADE_JOB_ACTIVE_DEADLINE_SECONDS: "900"
   SYSTEM_UPGRADE_JOB_BACKOFF_LIMIT: "99"
   SYSTEM_UPGRADE_JOB_IMAGE_PULL_POLICY: "Always"
@@ -33,12 +34,16 @@ metadata:
   name: system-upgrade-controller
   namespace: system-upgrade
 spec:
+  strategy:
+    type: Recreate
   selector:
     matchLabels:
       upgrade.cattle.io/controller: system-upgrade-controller
   template:
     metadata:
       labels:
+        app.kubernetes.io/component: controller
+        app.kubernetes.io/name: system-upgrade-controller
         upgrade.cattle.io/controller: system-upgrade-controller # necessary to avoid drain
     spec:
       affinity:
@@ -48,6 +53,19 @@ spec:
               - matchExpressions:
                   - key: "node-role.kubernetes.io/control-plane"
                     operator: "Exists"
+                  - key: "kubernetes.io/os"
+                    operator: "In"
+                    values:
+                      - "linux"
+        podAntiAffinity:
+          requiredDuringSchedulingIgnoredDuringExecution:
+            - topologyKey: "kubernetes.io/hostname"
+              labelSelector:
+                matchExpressions:
+                  - key: "app.kubernetes.io/name"
+                    operator: "In"
+                    values:
+                      - "system-upgrade-controller"
       serviceAccountName: system-upgrade
       tolerations:
         - key: "CriticalAddonsOnly"
@@ -90,6 +108,10 @@ spec:
               valueFrom:
                 fieldRef:
                   fieldPath: metadata.namespace
+            - name: SYSTEM_UPGRADE_CONTROLLER_NODE_NAME
+              valueFrom:
+                fieldRef:
+                  fieldPath: spec.nodeName
           volumeMounts:
             - name: etc-ssl
               mountPath: /etc/ssl
diff --git a/pkg/upgrade/controller.go b/pkg/upgrade/controller.go
index 4afce479aaa9db61581c353053af48bd5a21f107..77987e5b7d9f6822dbab9113b2079d96006953ce 100644
--- a/pkg/upgrade/controller.go
+++ b/pkg/upgrade/controller.go
@@ -4,21 +4,31 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"os"
 	"time"
 
 	upgradectl "github.com/rancher/system-upgrade-controller/pkg/generated/controllers/upgrade.cattle.io"
 	upgradeplan "github.com/rancher/system-upgrade-controller/pkg/upgrade/plan"
+	"github.com/rancher/system-upgrade-controller/pkg/version"
 	"github.com/rancher/wrangler/v3/pkg/apply"
 	"github.com/rancher/wrangler/v3/pkg/crd"
 	batchctl "github.com/rancher/wrangler/v3/pkg/generated/controllers/batch"
 	corectl "github.com/rancher/wrangler/v3/pkg/generated/controllers/core"
+	"github.com/rancher/wrangler/v3/pkg/leader"
+	"github.com/rancher/wrangler/v3/pkg/schemes"
 	"github.com/rancher/wrangler/v3/pkg/start"
+	"github.com/sirupsen/logrus"
+	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/kubernetes"
+	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
 	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/record"
 )
 
 var (
+	ErrPlanNotReady                = errors.New("plan is not valid and resolved")
 	ErrControllerNameRequired      = errors.New("controller name is required")
 	ErrControllerNamespaceRequired = errors.New("controller namespace is required")
 )
@@ -26,20 +36,23 @@ var (
 type Controller struct {
 	Namespace string
 	Name      string
+	NodeName  string
 
 	cfg *rest.Config
 	kcs *kubernetes.Clientset
 
-	clusterID string
+	clusterID   string
+	leaderElect bool
 
 	coreFactory    *corectl.Factory
 	batchFactory   *batchctl.Factory
 	upgradeFactory *upgradectl.Factory
 
-	apply apply.Apply
+	apply    apply.Apply
+	recorder record.EventRecorder
 }
 
-func NewController(cfg *rest.Config, namespace, name string, resync time.Duration) (ctl *Controller, err error) {
+func NewController(cfg *rest.Config, namespace, name, nodeName string, leaderElect bool, resync time.Duration) (ctl *Controller, err error) {
 	if namespace == "" {
 		return nil, ErrControllerNamespaceRequired
 	}
@@ -47,6 +60,13 @@ func NewController(cfg *rest.Config, namespace, name string, resync time.Duratio
 		return nil, ErrControllerNameRequired
 	}
 
+	if nodeName == "" {
+		nodeName, err = os.Hostname()
+		if err != nil {
+			return nil, err
+		}
+	}
+
 	if cfg == nil {
 		cfg, err = rest.InClusterConfig()
 		if err != nil {
@@ -55,9 +75,11 @@ func NewController(cfg *rest.Config, namespace, name string, resync time.Duratio
 	}
 
 	ctl = &Controller{
-		Namespace: namespace,
-		Name:      name,
-		cfg:       cfg,
+		Namespace:   namespace,
+		Name:        name,
+		NodeName:    nodeName,
+		cfg:         cfg,
+		leaderElect: leaderElect,
 	}
 
 	ctl.kcs, err = kubernetes.NewForConfig(cfg)
@@ -90,10 +112,24 @@ func NewController(cfg *rest.Config, namespace, name string, resync time.Duratio
 		return nil, err
 	}
 
+	eventBroadcaster := record.NewBroadcaster()
+	eventBroadcaster.StartStructuredLogging(0)
+	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: ctl.kcs.CoreV1().Events(metav1.NamespaceAll)})
+	ctl.recorder = eventBroadcaster.NewRecorder(schemes.All, corev1.EventSource{Component: ctl.Name, Host: ctl.NodeName})
+
 	return ctl, nil
 }
 
 func (ctl *Controller) Start(ctx context.Context, threads int) error {
+	// This is consistent with the events that the kubelet generates and attaches to the node
+	// https://github.com/kubernetes/kubernetes/blob/612130dd2f4188db839ea5c2dea07a96b0ad8d1c/pkg/kubelet/kubelet.go#L479-L485
+	nodeRef := &corev1.ObjectReference{
+		Kind:      "Node",
+		Name:      ctl.NodeName,
+		UID:       types.UID(ctl.NodeName),
+		Namespace: "",
+	}
+
 	// cluster id hack: see https://groups.google.com/forum/#!msg/kubernetes-sig-architecture/mVGobfD4TpY/nkdbkX1iBwAJ
 	systemNS, err := ctl.kcs.CoreV1().Namespaces().Get(ctx, metav1.NamespaceSystem, metav1.GetOptions{})
 	if err != nil {
@@ -119,7 +155,23 @@ func (ctl *Controller) Start(ctx context.Context, threads int) error {
 		return err
 	}
 
-	return start.All(ctx, threads, ctl.coreFactory, ctl.batchFactory, ctl.upgradeFactory)
+	appName := fmt.Sprintf("%s %s (%s)", version.Program, version.Version, version.GitCommit)
+	run := func(ctx context.Context) {
+		if err := start.All(ctx, threads, ctl.coreFactory, ctl.batchFactory, ctl.upgradeFactory); err != nil {
+			ctl.recorder.Eventf(nodeRef, corev1.EventTypeWarning, "StartFailed", "%s failed to start controllers for %s/%s: %v", appName, ctl.Namespace, ctl.Name, err)
+			logrus.Panicf("Failed to start controllers: %v", err)
+		}
+		ctl.recorder.Eventf(nodeRef, corev1.EventTypeNormal, "Started", "%s running as %s/%s", appName, ctl.Namespace, ctl.Name)
+	}
+
+	if ctl.leaderElect {
+		ctl.recorder.Eventf(nodeRef, corev1.EventTypeNormal, "Starting", "%s starting leader election for %s/%s", appName, ctl.Namespace, ctl.Name)
+		leader.RunOrDie(ctx, ctl.Namespace, ctl.Name, ctl.kcs, run)
+	} else {
+		run(ctx)
+	}
+
+	return nil
 }
 
 func (ctl *Controller) registerCRD(ctx context.Context) error {
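A minimal sketch (not part of the applied patch) of how the --leader-elect gate above behaves, assuming wrangler's leader.RunOrDie blocks until the coordination.k8s.io Lease named after the controller is acquired and then invokes its callback; startControllers is a hypothetical helper:

package main

import (
	"context"

	"github.com/rancher/wrangler/v3/pkg/leader"
	"k8s.io/client-go/kubernetes"
)

// startControllers gates controller startup behind leader election. With
// leaderElect=false the callback runs immediately (single-replica behavior);
// with leaderElect=true only the replica holding the Lease runs it, which is
// why manifests/clusterrole.yaml now grants create/get/update on leases.
func startControllers(ctx context.Context, client kubernetes.Interface, namespace, name string, leaderElect bool, run func(context.Context)) {
	if !leaderElect {
		run(ctx)
		return
	}
	// Blocks while campaigning for the Lease; run(ctx) is called once this
	// process becomes the leader.
	leader.RunOrDie(ctx, namespace, name, client, run)
}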
diff --git a/pkg/upgrade/handle_batch.go b/pkg/upgrade/handle_batch.go
index 28a237a2faada389a09f0585b05cfdaf1bcf8594..843e1125ca5612e2b7ec88d396b2491603d5803c 100644
--- a/pkg/upgrade/handle_batch.go
+++ b/pkg/upgrade/handle_batch.go
@@ -11,6 +11,7 @@ import (
 	upgradeapi "github.com/rancher/system-upgrade-controller/pkg/apis/upgrade.cattle.io"
 	upgradejob "github.com/rancher/system-upgrade-controller/pkg/upgrade/job"
 	batchctlv1 "github.com/rancher/wrangler/v3/pkg/generated/controllers/batch/v1"
+	"github.com/sirupsen/logrus"
 	batchv1 "k8s.io/api/batch/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -61,6 +62,7 @@ func (ctl *Controller) handleJobs(ctx context.Context) error {
 			return obj, deleteJob(jobs, obj, metav1.DeletePropagationBackground)
 		}
 		// trigger the plan when we're done, might free up a concurrency slot
+		logrus.Debugf("Enqueuing sync of Plan %s/%s from Job %s/%s", obj.Namespace, planName, obj.Namespace, obj.Name)
 		defer plans.Enqueue(obj.Namespace, planName)
 		// identify the node that this job is targeting
 		nodeName, ok := obj.Labels[upgradeapi.LabelNode]
@@ -127,6 +129,7 @@ func enqueueOrDelete(jobController batchctlv1.JobController, job *batchv1.Job, d
 		ttlSecondsAfterFinished = time.Second * time.Duration(*job.Spec.TTLSecondsAfterFinished)
 	}
 	if interval := time.Now().Sub(lastTransitionTime); interval < ttlSecondsAfterFinished {
+		logrus.Debugf("Enqueuing sync of Job %s/%s in %v", job.Namespace, job.Name, ttlSecondsAfterFinished-interval)
 		jobController.EnqueueAfter(job.Namespace, job.Name, ttlSecondsAfterFinished-interval)
 		return nil
 	}
diff --git a/pkg/upgrade/handle_core.go b/pkg/upgrade/handle_core.go
index 1d357f7c26dc6817371fa947dd6af4aad318529c..4ed6f6e6f91c6a6abe5b7c361da36a9f3864c5b3 100644
--- a/pkg/upgrade/handle_core.go
+++ b/pkg/upgrade/handle_core.go
@@ -3,6 +3,7 @@ package upgrade
 import (
 	"context"
 
+	"github.com/sirupsen/logrus"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
@@ -24,6 +25,7 @@ func (ctl *Controller) handleNodes(ctx context.Context) error {
 			if selector, err := metav1.LabelSelectorAsSelector(plan.Spec.NodeSelector); err != nil {
 				return obj, err
 			} else if selector.Matches(labels.Set(obj.Labels)) {
+				logrus.Debugf("Enqueuing sync of Plan %s/%s from Node %s", plan.Namespace, plan.Name, obj.Name)
 				plans.Enqueue(plan.Namespace, plan.Name)
 			}
 		}
@@ -49,6 +51,7 @@ func (ctl *Controller) handleSecrets(ctx context.Context) error {
 			for _, secret := range plan.Spec.Secrets {
 				if obj.Name == secret.Name {
 					if !secret.IgnoreUpdates {
+						logrus.Debugf("Enqueuing sync of Plan %s/%s from Secret %s/%s", plan.Namespace, plan.Name, obj.Namespace, obj.Name)
 						plans.Enqueue(plan.Namespace, plan.Name)
 						continue
 					}
diff --git a/pkg/upgrade/handle_upgrade.go b/pkg/upgrade/handle_upgrade.go
index bb8f8722bc042da64a9777d63e5281ade3f6c330..841db0eee920d5a40b3f94713e95a33650686a70 100644
--- a/pkg/upgrade/handle_upgrade.go
+++ b/pkg/upgrade/handle_upgrade.go
@@ -2,6 +2,8 @@ package upgrade
 
 import (
 	"context"
+	"slices"
+	"strings"
 	"time"
 
 	upgradeapiv1 "github.com/rancher/system-upgrade-controller/pkg/apis/upgrade.cattle.io/v1"
@@ -11,6 +13,7 @@ import (
 	upgradeplan "github.com/rancher/system-upgrade-controller/pkg/upgrade/plan"
 	"github.com/rancher/wrangler/v3/pkg/generic"
 	"github.com/sirupsen/logrus"
+	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 )
 
@@ -20,6 +23,7 @@ func (ctl *Controller) handlePlans(ctx context.Context) error {
 	plans := ctl.upgradeFactory.Upgrade().V1().Plan()
 	secrets := ctl.coreFactory.Core().V1().Secret()
 	secretsCache := secrets.Cache()
+	recorder := ctl.recorder
 
 	// process plan events, mutating status accordingly
 	upgradectlv1.RegisterPlanStatusHandler(ctx, plans, "", ctl.Name,
@@ -29,27 +33,52 @@ func (ctl *Controller) handlePlans(ctx context.Context) error {
 			}
 			logrus.Debugf("PLAN STATUS HANDLER: plan=%s/%s@%s, status=%+v", obj.Namespace, obj.Name, obj.ResourceVersion, status)
 
+			// ensure that the complete status is present
+			complete := upgradeapiv1.PlanComplete
+			complete.CreateUnknownIfNotExists(obj)
+
+			// validate plan, and generate events for transitions
 			validated := upgradeapiv1.PlanSpecValidated
 			validated.CreateUnknownIfNotExists(obj)
 			if err := upgradeplan.Validate(obj); err != nil {
+				if !validated.IsFalse(obj) {
+					recorder.Eventf(obj, corev1.EventTypeWarning, "ValidateFailed", "Failed to validate plan: %v", err)
+				}
 				validated.SetError(obj, "Error", err)
 				return upgradeplan.DigestStatus(obj, secretsCache)
 			}
-			validated.False(obj)
+			if !validated.IsTrue(obj) {
+				recorder.Event(obj, corev1.EventTypeNormal, "Validated", "Plan is valid")
+			}
 			validated.SetError(obj, "PlanIsValid", nil)
 
+			// resolve version from spec or channel, and generate events for transitions
 			resolved := upgradeapiv1.PlanLatestResolved
 			resolved.CreateUnknownIfNotExists(obj)
+			// raise an error if neither version nor channel is set; this is handled separately from other validation.
 			if obj.Spec.Version == "" && obj.Spec.Channel == "" {
+				if !resolved.IsFalse(obj) {
+					recorder.Event(obj, corev1.EventTypeWarning, "ResolveFailed", upgradeapiv1.ErrPlanUnresolvable.Error())
+				}
 				resolved.SetError(obj, "Error", upgradeapiv1.ErrPlanUnresolvable)
 				return upgradeplan.DigestStatus(obj, secretsCache)
 			}
+			// use static version from spec if set
 			if obj.Spec.Version != "" {
-				resolved.False(obj)
+				latest := upgradeplan.MungeVersion(obj.Spec.Version)
+				if !resolved.IsTrue(obj) || obj.Status.LatestVersion != latest {
+					// Version has changed, set complete to false and emit event
+					recorder.Eventf(obj, corev1.EventTypeNormal, "Resolved", "Resolved latest version from Spec.Version: %s", latest)
+					complete.False(obj)
+					complete.Message(obj, "")
+					complete.Reason(obj, "Resolved")
+				}
+				obj.Status.LatestVersion = latest
 				resolved.SetError(obj, "Version", nil)
-				obj.Status.LatestVersion = upgradeplan.MungeVersion(obj.Spec.Version)
 				return upgradeplan.DigestStatus(obj, secretsCache)
 			}
+			// re-enqueue a sync at the next channel polling interval, if the LastUpdated time
+			// on the resolved status indicates that the interval has not been reached.
 			if resolved.IsTrue(obj) {
 				if lastUpdated, err := time.Parse(time.RFC3339, resolved.GetLastUpdated(obj)); err == nil {
 					if interval := time.Now().Sub(lastUpdated); interval < upgradeplan.PollingInterval {
@@ -58,13 +87,24 @@ func (ctl *Controller) handlePlans(ctx context.Context) error {
 					}
 				}
 			}
+			// no static version; poll the channel to get the latest version
 			latest, err := upgradeplan.ResolveChannel(ctx, obj.Spec.Channel, obj.Status.LatestVersion, ctl.clusterID)
 			if err != nil {
+				if !resolved.IsFalse(obj) {
+					recorder.Eventf(obj, corev1.EventTypeWarning, "ResolveFailed", "Failed to resolve latest version from Spec.Channel: %v", err)
+				}
 				return status, err
 			}
-			resolved.False(obj)
+			latest = upgradeplan.MungeVersion(latest)
+			if !resolved.IsTrue(obj) || obj.Status.LatestVersion != latest {
+				// Version has changed, set complete to false and emit event
+				recorder.Eventf(obj, corev1.EventTypeNormal, "Resolved", "Resolved latest version from Spec.Channel: %s", latest)
+				complete.False(obj)
+				complete.Message(obj, "")
+				complete.Reason(obj, "Resolved")
+			}
+			obj.Status.LatestVersion = latest
 			resolved.SetError(obj, "Channel", nil)
-			obj.Status.LatestVersion = upgradeplan.MungeVersion(latest)
 			return upgradeplan.DigestStatus(obj, secretsCache)
 		},
 	)
@@ -75,30 +115,61 @@ func (ctl *Controller) handlePlans(ctx context.Context) error {
 			if obj == nil {
 				return objects, status, nil
 			}
+
 			logrus.Debugf("PLAN GENERATING HANDLER: plan=%s/%s@%s, status=%+v", obj.Namespace, obj.Name, obj.ResourceVersion, status)
-			if !upgradeapiv1.PlanSpecValidated.IsTrue(obj) {
-				return objects, status, nil
-			}
-			if !upgradeapiv1.PlanLatestResolved.IsTrue(obj) {
+			// return early without selecting nodes if the plan is not validated and resolved
+			complete := upgradeapiv1.PlanComplete
+			if !upgradeapiv1.PlanSpecValidated.IsTrue(obj) || !upgradeapiv1.PlanLatestResolved.IsTrue(obj) {
+				complete.SetError(obj, "NotReady", ErrPlanNotReady)
 				return objects, status, nil
 			}
+
+			// select nodes to apply the plan on based on nodeSelector, plan hash, and concurrency
 			concurrentNodes, err := upgradeplan.SelectConcurrentNodes(obj, nodes.Cache())
 			if err != nil {
+				recorder.Eventf(obj, corev1.EventTypeWarning, "SelectNodesFailed", "Failed to select Nodes: %v", err)
+				complete.SetError(obj, "SelectNodesFailed", err)
 				return objects, status, err
 			}
+
+			// Create an upgrade job for each node, and add the node name to Status.Applying
+			// Note that jobs are initially created paused; on a second pass, once the node
+			// has been added to Status.Applying, the job's parallelism is patched to 1 to
+			// unpause it. Ref: https://github.com/rancher/system-upgrade-controller/issues/134
 			concurrentNodeNames := make([]string, len(concurrentNodes))
 			for i := range concurrentNodes {
 				node := concurrentNodes[i]
 				objects = append(objects, upgradejob.New(obj, node, ctl.Name))
 				concurrentNodeNames[i] = upgradenode.Hostname(node)
 			}
-			obj.Status.Applying = concurrentNodeNames[:]
-			upgradeapiv1.PlanComplete.SetStatusBool(obj, len(concurrentNodeNames) == 0)
+
+			if len(concurrentNodeNames) > 0 {
+				// If the node list has changed, update Applying status with new node list and emit an event
+				if !slices.Equal(obj.Status.Applying, concurrentNodeNames) {
+					recorder.Eventf(obj, corev1.EventTypeNormal, "SyncJob", "Jobs synced for version %s on Nodes %s. Hash: %s",
+						obj.Status.LatestVersion, strings.Join(concurrentNodeNames, ","), obj.Status.LatestHash)
+				}
+				obj.Status.Applying = concurrentNodeNames[:]
+				complete.False(obj)
+				complete.Message(obj, "")
+				complete.Reason(obj, "SyncJob")
+			} else {
+				// set PlanComplete to true when no nodes have been selected,
+				// and emit an event if the plan just completed
+				if !complete.IsTrue(obj) {
+					recorder.Eventf(obj, corev1.EventTypeNormal, "Complete", "Jobs complete for version %s. Hash: %s",
+						obj.Status.LatestVersion, obj.Status.LatestHash)
+				}
+				obj.Status.Applying = nil
+				complete.SetError(obj, "Complete", nil)
+			}
+
 			return objects, obj.Status, nil
 		},
 		&generic.GeneratingHandlerOptions{
-			AllowClusterScoped: true,
-			NoOwnerReference:   true,
+			AllowClusterScoped:            true,
+			NoOwnerReference:              true,
+			UniqueApplyForResourceVersion: true,
 		},
 	)
 
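A minimal sketch (not part of the applied patch) of the transition-gated event pattern used by the status handler above, assuming the Plan conditions (PlanSpecValidated, PlanLatestResolved, PlanComplete) are wrangler condition.Cond values; recordOnTransition is a hypothetical helper:

package upgrade

import (
	"github.com/rancher/wrangler/v3/pkg/condition"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/tools/record"
)

// recordOnTransition emits an Event only when the condition is not already
// true, so periodic resyncs that re-apply an unchanged status stay quiet and
// the Event stream reflects actual state transitions.
func recordOnTransition(recorder record.EventRecorder, obj runtime.Object, cond condition.Cond, reason, message string) {
	if cond.IsTrue(obj) {
		return // already in the desired state; nothing new to record
	}
	recorder.Event(obj, corev1.EventTypeNormal, reason, message)
}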
diff --git a/pkg/upgrade/job/job.go b/pkg/upgrade/job/job.go
index 3a0bf1ae8a31cc7ca31d629d830a1ae91188070d..df5aa95edc554c018ee0999231b385ab1bb1c72e 100644
--- a/pkg/upgrade/job/job.go
+++ b/pkg/upgrade/job/job.go
@@ -2,7 +2,7 @@ package job
 
 import (
 	"os"
-	"sort"
+	"slices"
 	"strconv"
 	"strings"
 
@@ -18,6 +18,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/selection"
+	"k8s.io/utils/pointer"
 )
 
 const (
@@ -240,13 +241,14 @@ func New(plan *upgradeapiv1.Plan, node *corev1.Node, controllerName string) *bat
 					ImagePullSecrets: plan.Spec.ImagePullSecrets,
 				},
 			},
-			Completions: new(int32),
-			Parallelism: new(int32),
+			Completions: pointer.Int32(1), // Run only once
+			Parallelism: pointer.Int32(0), // Create Job paused
 		},
 	}
 
-	*job.Spec.Completions = 1
-	if i := sort.SearchStrings(plan.Status.Applying, nodeHostname); i < len(plan.Status.Applying) && plan.Status.Applying[i] == nodeHostname {
+	// After the Job has been created and registered as in-progress in the Plan Status,
+	// update parallelism to 1 to unpause it.  Ref: https://github.com/rancher/system-upgrade-controller/issues/134
+	if slices.Contains(plan.Status.Applying, nodeHostname) {
 		*job.Spec.Parallelism = 1
 	}
 
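A minimal sketch (not part of the applied patch) isolating the pause/unpause decision above: jobs are created with Parallelism=0 and only raised to 1 once the plan's Status.Applying lists the target node, so a job never runs before the controller has acknowledged it (ref: issue #134); parallelismFor is a hypothetical helper:

package job

import (
	"slices"

	"k8s.io/utils/pointer"
)

// parallelismFor returns 0 (paused) on the first pass and 1 (runnable) once
// the node has been recorded in the plan's Status.Applying.
func parallelismFor(applying []string, nodeHostname string) *int32 {
	if slices.Contains(applying, nodeHostname) {
		return pointer.Int32(1) // acknowledged in Status.Applying: unpause
	}
	return pointer.Int32(0) // not yet acknowledged: keep the Job paused
}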
diff --git a/pkg/version/version.go b/pkg/version/version.go
index a645beef9eb7321888f6769a8871206b9cb48e8a..3c52397cbfae7eb05a132c918602c5e726d93f08 100644
--- a/pkg/version/version.go
+++ b/pkg/version/version.go
@@ -1,6 +1,7 @@
 package version
 
 var (
+	Program   = "system-upgrade-controller"
 	Version   = "dev"
 	GitCommit = "HEAD"
 )