Skip to content
Snippets Groups Projects
Unverified Commit 90e3d082 authored by Jacob Blain Christen's avatar Jacob Blain Christen Committed by GitHub
Browse files

adjust job node affinity to use hostname value (#126)


Fix job node affinity for nodes with names that do not match the value
of their `kubernetes.io/hostname` label. With this change, node names
are still reported in the .status.applying slice but jobs are generated
with correct node-affinity via matching on the value of
`kubernetes.io/hostname`.

Fixes #119

Signed-off-by: default avatarJacob Blain Christen <jacob@rancher.com>
parent b4e5a67d
Branches
Tags
No related merge requests found
......@@ -68,15 +68,14 @@ func (ctl *Controller) handlePlans(ctx context.Context) error {
if !upgradeapiv1.PlanLatestResolved.IsTrue(obj) {
return objects, status, nil
}
concurrentNodeNames, err := upgradeplan.SelectConcurrentNodeNames(obj, nodes.Cache())
concurrentNodes, err := upgradeplan.SelectConcurrentNodes(obj, nodes.Cache())
if err != nil {
return objects, status, err
}
logrus.Debugf("concurrentNodeNames = %q", concurrentNodeNames)
for _, nodeName := range concurrentNodeNames {
objects = append(objects, upgradejob.New(obj, nodeName, ctl.Name))
for _, node := range concurrentNodes {
objects = append(objects, upgradejob.New(obj, node, ctl.Name))
obj.Status.Applying = append(obj.Status.Applying, node.Name)
}
obj.Status.Applying = concurrentNodeNames
return objects, obj.Status, nil
},
&generic.GeneratingHandlerOptions{
......
......@@ -5,6 +5,9 @@ import (
"testing"
upgradeapiv1 "github.com/rancher/system-upgrade-controller/pkg/apis/upgrade.cattle.io/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
)
func TestNew(t *testing.T) {
......@@ -20,7 +23,15 @@ func TestNew(t *testing.T) {
Spec: upgradeapiv1.PlanSpec{Drain: &upgradeapiv1.DrainSpec{SkipWaitForDeleteTimeout: val},
Upgrade: &upgradeapiv1.ContainerSpec{Image: "image"}},
})
job := New(plan, "node1", "ctr")
node := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
Labels: labels.Set{
corev1.LabelHostname: "node1",
},
},
}
job := New(plan, node, "ctr")
t.Logf("%#v", job.Spec.Template.Spec.InitContainers)
for _, container := range job.Spec.Template.Spec.InitContainers {
if container.Name == "drain" {
......
......@@ -90,19 +90,25 @@ var (
ConditionFailed = condition.Cond(batchv1.JobFailed)
)
func New(plan *upgradeapiv1.Plan, nodeName, controllerName string) *batchv1.Job {
func New(plan *upgradeapiv1.Plan, node *corev1.Node, controllerName string) *batchv1.Job {
hostPathDirectory := corev1.HostPathDirectory
labelPlanName := upgradeapi.LabelPlanName(plan.Name)
nodeHostname := node.Name
if node.Labels != nil {
if hostname, ok := node.Labels[corev1.LabelHostname]; ok {
nodeHostname = hostname
}
}
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: name.SafeConcatName("apply", plan.Name, "on", nodeName, "with", plan.Status.LatestHash),
Name: name.SafeConcatName("apply", plan.Name, "on", node.Name, "with", plan.Status.LatestHash),
Namespace: plan.Namespace,
Annotations: labels.Set{
upgradeapi.AnnotationTTLSecondsAfterFinished: strconv.FormatInt(int64(TTLSecondsAfterFinished), 10),
},
Labels: labels.Set{
upgradeapi.LabelController: controllerName,
upgradeapi.LabelNode: nodeName,
upgradeapi.LabelNode: node.Name,
upgradeapi.LabelPlan: plan.Name,
upgradeapi.LabelVersion: plan.Status.LatestVersion,
labelPlanName: plan.Status.LatestHash,
......@@ -115,7 +121,7 @@ func New(plan *upgradeapiv1.Plan, nodeName, controllerName string) *batchv1.Job
ObjectMeta: metav1.ObjectMeta{
Labels: labels.Set{
upgradeapi.LabelController: controllerName,
upgradeapi.LabelNode: nodeName,
upgradeapi.LabelNode: node.Name,
upgradeapi.LabelPlan: plan.Name,
upgradeapi.LabelVersion: plan.Status.LatestVersion,
labelPlanName: plan.Status.LatestHash,
......@@ -135,7 +141,7 @@ func New(plan *upgradeapiv1.Plan, nodeName, controllerName string) *batchv1.Job
Key: corev1.LabelHostname,
Operator: corev1.NodeSelectorOpIn,
Values: []string{
nodeName,
nodeHostname,
},
}},
}},
......@@ -213,7 +219,7 @@ func New(plan *upgradeapiv1.Plan, nodeName, controllerName string) *batchv1.Job
// then we cordon/drain
cordon, drain := plan.Spec.Cordon, plan.Spec.Drain
if drain != nil {
args := []string{"drain", nodeName, "--pod-selector", `!` + upgradeapi.LabelController}
args := []string{"drain", node.Name, "--pod-selector", `!` + upgradeapi.LabelController}
if drain.IgnoreDaemonSets == nil || *plan.Spec.Drain.IgnoreDaemonSets {
args = append(args, "--ignore-daemonsets")
}
......@@ -252,7 +258,7 @@ func New(plan *upgradeapiv1.Plan, nodeName, controllerName string) *batchv1.Job
podTemplate.Spec.InitContainers = append(podTemplate.Spec.InitContainers,
upgradectr.New("cordon", upgradeapiv1.ContainerSpec{
Image: KubectlImage,
Args: []string{"cordon", nodeName},
Args: []string{"cordon", node.Name},
},
upgradectr.WithSecrets(plan.Spec.Secrets),
upgradectr.WithPlanEnvironment(plan.Name, plan.Status),
......
......@@ -123,10 +123,10 @@ func ResolveChannel(ctx context.Context, url, latestVersion, clusterID string) (
return "", fmt.Errorf("unexpected response: %s %s", response.Proto, response.Status)
}
func SelectConcurrentNodeNames(plan *upgradeapiv1.Plan, nodeCache corectlv1.NodeCache) ([]string, error) {
func SelectConcurrentNodes(plan *upgradeapiv1.Plan, nodeCache corectlv1.NodeCache) ([]*corev1.Node, error) {
var (
applying = plan.Status.Applying
selected []string
selected []*corev1.Node
)
nodeSelector, err := metav1.LabelSelectorAsSelector(plan.Spec.NodeSelector)
if err != nil {
......@@ -147,7 +147,7 @@ func SelectConcurrentNodeNames(plan *upgradeapiv1.Plan, nodeCache corectlv1.Node
return nil, err
}
for _, node := range applyingNodes {
selected = append(selected, node.Name)
selected = append(selected, node.DeepCopy())
}
requirementNotApplying, err := labels.NewRequirement(corev1.LabelHostname, selection.NotIn, applying)
if err != nil {
......@@ -173,10 +173,12 @@ func SelectConcurrentNodeNames(plan *upgradeapiv1.Plan, nodeCache corectlv1.Node
})
for i := 0; i < len(candidateNodes) && int64(len(selected)) < plan.Spec.Concurrency; i++ {
selected = append(selected, candidateNodes[i].Name)
selected = append(selected, candidateNodes[i].DeepCopy())
}
}
sort.Strings(selected)
sort.Slice(selected, func(i, j int) bool {
return selected[i].Name < selected[i].Name
})
return selected, nil
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment