From 90e3d082a41fefaf09d14286b880c54a28af6c21 Mon Sep 17 00:00:00 2001
From: Jacob Blain Christen <dweomer5@gmail.com>
Date: Fri, 13 Aug 2021 09:24:45 -0700
Subject: [PATCH] adjust job node affinity to use hostname value (#126)

Fix job node affinity for nodes with names that do not match the value
of their `kubernetes.io/hostname` label. With this change, node names
are still reported in the .status.applying slice but jobs are generated
with correct node-affinity via matching on the value of
`kubernetes.io/hostname`.

Fixes #119

Signed-off-by: Jacob Blain Christen <jacob@rancher.com>
---
 pkg/upgrade/handle_upgrade.go |  9 ++++-----
 pkg/upgrade/job/flags_test.go | 13 ++++++++++++-
 pkg/upgrade/job/job.go        | 20 +++++++++++++-------
 pkg/upgrade/plan/plan.go      | 12 +++++++-----
 4 files changed, 36 insertions(+), 18 deletions(-)

diff --git a/pkg/upgrade/handle_upgrade.go b/pkg/upgrade/handle_upgrade.go
index f297cffc..260b83b0 100644
--- a/pkg/upgrade/handle_upgrade.go
+++ b/pkg/upgrade/handle_upgrade.go
@@ -68,15 +68,14 @@ func (ctl *Controller) handlePlans(ctx context.Context) error {
 			if !upgradeapiv1.PlanLatestResolved.IsTrue(obj) {
 				return objects, status, nil
 			}
-			concurrentNodeNames, err := upgradeplan.SelectConcurrentNodeNames(obj, nodes.Cache())
+			concurrentNodes, err := upgradeplan.SelectConcurrentNodes(obj, nodes.Cache())
 			if err != nil {
 				return objects, status, err
 			}
-			logrus.Debugf("concurrentNodeNames = %q", concurrentNodeNames)
-			for _, nodeName := range concurrentNodeNames {
-				objects = append(objects, upgradejob.New(obj, nodeName, ctl.Name))
+			for _, node := range concurrentNodes {
+				objects = append(objects, upgradejob.New(obj, node, ctl.Name))
+				obj.Status.Applying = append(obj.Status.Applying, node.Name)
 			}
-			obj.Status.Applying = concurrentNodeNames
 			return objects, obj.Status, nil
 		},
 		&generic.GeneratingHandlerOptions{
diff --git a/pkg/upgrade/job/flags_test.go b/pkg/upgrade/job/flags_test.go
index 12b0674f..3ee7d725 100644
--- a/pkg/upgrade/job/flags_test.go
+++ b/pkg/upgrade/job/flags_test.go
@@ -5,6 +5,9 @@ import (
 	"testing"
 
 	upgradeapiv1 "github.com/rancher/system-upgrade-controller/pkg/apis/upgrade.cattle.io/v1"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
 )
 
 func TestNew(t *testing.T) {
@@ -20,7 +23,15 @@ func TestNew(t *testing.T) {
 				Spec: upgradeapiv1.PlanSpec{Drain: &upgradeapiv1.DrainSpec{SkipWaitForDeleteTimeout: val},
 					Upgrade: &upgradeapiv1.ContainerSpec{Image: "image"}},
 			})
-			job := New(plan, "node1", "ctr")
+			node := &corev1.Node{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "node1",
+					Labels: labels.Set{
+						corev1.LabelHostname: "node1",
+					},
+				},
+			}
+			job := New(plan, node, "ctr")
 			t.Logf("%#v", job.Spec.Template.Spec.InitContainers)
 			for _, container := range job.Spec.Template.Spec.InitContainers {
 				if container.Name == "drain" {
diff --git a/pkg/upgrade/job/job.go b/pkg/upgrade/job/job.go
index 81cdb579..ed060e10 100644
--- a/pkg/upgrade/job/job.go
+++ b/pkg/upgrade/job/job.go
@@ -90,19 +90,25 @@ var (
 	ConditionFailed   = condition.Cond(batchv1.JobFailed)
 )
 
-func New(plan *upgradeapiv1.Plan, nodeName, controllerName string) *batchv1.Job {
+func New(plan *upgradeapiv1.Plan, node *corev1.Node, controllerName string) *batchv1.Job {
 	hostPathDirectory := corev1.HostPathDirectory
 	labelPlanName := upgradeapi.LabelPlanName(plan.Name)
+	nodeHostname := node.Name
+	if node.Labels != nil {
+		if hostname, ok := node.Labels[corev1.LabelHostname]; ok {
+			nodeHostname = hostname
+		}
+	}
 	job := &batchv1.Job{
 		ObjectMeta: metav1.ObjectMeta{
-			Name:      name.SafeConcatName("apply", plan.Name, "on", nodeName, "with", plan.Status.LatestHash),
+			Name:      name.SafeConcatName("apply", plan.Name, "on", node.Name, "with", plan.Status.LatestHash),
 			Namespace: plan.Namespace,
 			Annotations: labels.Set{
 				upgradeapi.AnnotationTTLSecondsAfterFinished: strconv.FormatInt(int64(TTLSecondsAfterFinished), 10),
 			},
 			Labels: labels.Set{
 				upgradeapi.LabelController: controllerName,
-				upgradeapi.LabelNode:       nodeName,
+				upgradeapi.LabelNode:       node.Name,
 				upgradeapi.LabelPlan:       plan.Name,
 				upgradeapi.LabelVersion:    plan.Status.LatestVersion,
 				labelPlanName:              plan.Status.LatestHash,
@@ -115,7 +121,7 @@ func New(plan *upgradeapiv1.Plan, nodeName, controllerName string) *batchv1.Job
 				ObjectMeta: metav1.ObjectMeta{
 					Labels: labels.Set{
 						upgradeapi.LabelController: controllerName,
-						upgradeapi.LabelNode:       nodeName,
+						upgradeapi.LabelNode:       node.Name,
 						upgradeapi.LabelPlan:       plan.Name,
 						upgradeapi.LabelVersion:    plan.Status.LatestVersion,
 						labelPlanName:              plan.Status.LatestHash,
@@ -135,7 +141,7 @@ func New(plan *upgradeapiv1.Plan, nodeName, controllerName string) *batchv1.Job
 										Key:      corev1.LabelHostname,
 										Operator: corev1.NodeSelectorOpIn,
 										Values: []string{
-											nodeName,
+											nodeHostname,
 										},
 									}},
 								}},
@@ -213,7 +219,7 @@ func New(plan *upgradeapiv1.Plan, nodeName, controllerName string) *batchv1.Job
 	// then we cordon/drain
 	cordon, drain := plan.Spec.Cordon, plan.Spec.Drain
 	if drain != nil {
-		args := []string{"drain", nodeName, "--pod-selector", `!` + upgradeapi.LabelController}
+		args := []string{"drain", node.Name, "--pod-selector", `!` + upgradeapi.LabelController}
 		if drain.IgnoreDaemonSets == nil || *plan.Spec.Drain.IgnoreDaemonSets {
 			args = append(args, "--ignore-daemonsets")
 		}
@@ -252,7 +258,7 @@ func New(plan *upgradeapiv1.Plan, nodeName, controllerName string) *batchv1.Job
 		podTemplate.Spec.InitContainers = append(podTemplate.Spec.InitContainers,
 			upgradectr.New("cordon", upgradeapiv1.ContainerSpec{
 				Image: KubectlImage,
-				Args:  []string{"cordon", nodeName},
+				Args:  []string{"cordon", node.Name},
 			},
 				upgradectr.WithSecrets(plan.Spec.Secrets),
 				upgradectr.WithPlanEnvironment(plan.Name, plan.Status),
diff --git a/pkg/upgrade/plan/plan.go b/pkg/upgrade/plan/plan.go
index d16fb5d8..b93fdda7 100644
--- a/pkg/upgrade/plan/plan.go
+++ b/pkg/upgrade/plan/plan.go
@@ -123,10 +123,10 @@ func ResolveChannel(ctx context.Context, url, latestVersion, clusterID string) (
 	return "", fmt.Errorf("unexpected response: %s %s", response.Proto, response.Status)
 }
 
-func SelectConcurrentNodeNames(plan *upgradeapiv1.Plan, nodeCache corectlv1.NodeCache) ([]string, error) {
+func SelectConcurrentNodes(plan *upgradeapiv1.Plan, nodeCache corectlv1.NodeCache) ([]*corev1.Node, error) {
 	var (
 		applying = plan.Status.Applying
-		selected []string
+		selected []*corev1.Node
 	)
 	nodeSelector, err := metav1.LabelSelectorAsSelector(plan.Spec.NodeSelector)
 	if err != nil {
@@ -147,7 +147,7 @@ func SelectConcurrentNodeNames(plan *upgradeapiv1.Plan, nodeCache corectlv1.Node
 			return nil, err
 		}
 		for _, node := range applyingNodes {
-			selected = append(selected, node.Name)
+			selected = append(selected, node.DeepCopy())
 		}
 		requirementNotApplying, err := labels.NewRequirement(corev1.LabelHostname, selection.NotIn, applying)
 		if err != nil {
@@ -173,10 +173,12 @@ func SelectConcurrentNodeNames(plan *upgradeapiv1.Plan, nodeCache corectlv1.Node
 		})
 
 		for i := 0; i < len(candidateNodes) && int64(len(selected)) < plan.Spec.Concurrency; i++ {
-			selected = append(selected, candidateNodes[i].Name)
+			selected = append(selected, candidateNodes[i].DeepCopy())
 		}
 	}
-	sort.Strings(selected)
+	sort.Slice(selected, func(i, j int) bool {
+			return selected[i].Name < selected[j].Name
+	})
 	return selected, nil
 }
 
-- 
GitLab