From 8522331cf260d8544483ef8b3aaecbc9fd38a99e Mon Sep 17 00:00:00 2001
From: Polina Bungina <27892524+hughcapet@users.noreply.github.com>
Date: Wed, 15 Jan 2025 20:04:36 +0300
Subject: [PATCH] Extend MaintenanceWindows parameter usage (#2810)

Consider maintenance window when migrating master pods and replacing pods (rolling update)
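
When maintenance windows are defined, a switchover is no longer executed
immediately: it is handed over to Patroni with a "scheduled_at" timestamp
computed by GetSwitchoverSchedule, which picks the earliest upcoming window
start across all configured windows. For example, with a single window
"Sat:01:00-06:00" and a sync on Wednesday, the switchover is scheduled for
the coming Saturday at 01:00 UTC. Recreating pods during a rolling update
is likewise postponed until a sync that falls inside a maintenance window.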
---
 docs/administrator.md              | 12 ++++++++++++
 docs/reference/cluster_manifest.md |  6 +-
 e2e/tests/test_e2e.py              |  7 +--
 pkg/cluster/cluster.go             | 46 ++++++++++++++-
 pkg/cluster/cluster_test.go        | 88 ++++++++++++++++++++++++++++++
 pkg/cluster/majorversionupgrade.go |  2 +-
 pkg/cluster/resources.go           |  4 +-
 pkg/cluster/sync.go                | 11 ++++-
 pkg/cluster/util.go                |  2 +-
 pkg/cluster/util_test.go           |  4 ++--
 pkg/util/patroni/patroni.go        | 27 +++++++------
 11 files changed, 180 insertions(+), 29 deletions(-)

diff --git a/docs/administrator.md b/docs/administrator.md
index 55abebc8..9f8e8657 100644
--- a/docs/administrator.md
+++ b/docs/administrator.md
@@ -208,6 +208,18 @@ Note that, changes in `SPILO_CONFIGURATION` env variable under `bootstrap.dcs`
 path are ignored for the diff. They will be applied through Patroni's rest api
 interface, following a restart of all instances.
 
+A rolling update is postponed until the next maintenance window if any is
+defined under the `maintenanceWindows` cluster manifest parameter.
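+
+For example, the following restricts switchovers and rolling updates to
+Saturday mornings (UTC):
+
+```yaml
+spec:
+  maintenanceWindows:
+  - "Sat:01:00-06:00"
+```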
+
 The operator also supports lazy updates of the Spilo image. In this case the
 StatefulSet is only updated, but no rolling update follows. This feature saves
 you a switchover - and hence downtime - when you know pods are re-started later
diff --git a/docs/reference/cluster_manifest.md b/docs/reference/cluster_manifest.md
index 8d02ee7d..d45bc094 100644
--- a/docs/reference/cluster_manifest.md
+++ b/docs/reference/cluster_manifest.md
@@ -116,9 +116,9 @@ These parameters are grouped directly under the `spec` key in the manifest.
 
 * **maintenanceWindows**
   a list which defines specific time frames when certain maintenance operations
-  are allowed. So far, it is only implemented for automatic major version
-  upgrades. Accepted formats are "01:00-06:00" for daily maintenance windows or 
-  "Sat:00:00-04:00" for specific days, with all times in UTC.
+  such as automatic major version upgrades or rolling updates are allowed.
+  Accepted formats are "01:00-06:00" for daily maintenance windows or
+  "Sat:00:00-04:00" for specific days, with all times in UTC.
 
 * **users**
   a map of usernames to user flags for the users that should be created in the
diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py
index 04c6465c..4743bb4b 100644
--- a/e2e/tests/test_e2e.py
+++ b/e2e/tests/test_e2e.py
@@ -1251,7 +1251,7 @@ class EndToEndTestCase(unittest.TestCase):
             "acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_15)
         self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
 
-        k8s.wait_for_pod_failover(master_nodes, 'spilo-role=master,' + cluster_label)
+        # no pod replacement outside of the maintenance window
         k8s.wait_for_pod_start('spilo-role=master,' + cluster_label)
         k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
         self.eventuallyEqual(check_version, 14, "Version should not be upgraded")
@@ -1276,7 +1276,7 @@ class EndToEndTestCase(unittest.TestCase):
             "acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_16)
         self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
 
-        k8s.wait_for_pod_failover(master_nodes, 'spilo-role=replica,' + cluster_label)
+        k8s.wait_for_pod_failover(master_nodes, 'spilo-role=master,' + cluster_label)
         k8s.wait_for_pod_start('spilo-role=master,' + cluster_label)
         k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
         self.eventuallyEqual(check_version, 16, "Version should be upgraded from 14 to 16")
@@ -1303,7 +1303,7 @@ class EndToEndTestCase(unittest.TestCase):
             "acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_17)
         self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
 
-        k8s.wait_for_pod_failover(master_nodes, 'spilo-role=master,' + cluster_label)
+        k8s.wait_for_pod_failover(master_nodes, 'spilo-role=replica,' + cluster_label)
         k8s.wait_for_pod_start('spilo-role=master,' + cluster_label)
         k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
         self.eventuallyEqual(check_version, 16, "Version should not be upgraded because annotation for last upgrade's failure is set")
@@ -1313,7 +1313,6 @@ class EndToEndTestCase(unittest.TestCase):
             "acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_15)
         self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
 
-        k8s.wait_for_pod_failover(master_nodes, 'spilo-role=replica,' + cluster_label)
         k8s.wait_for_pod_start('spilo-role=master,' + cluster_label)
         k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
 
diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index 1a8d6f76..f835eaa0 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -1731,18 +1731,60 @@ func (c *Cluster) GetStatus() *ClusterStatus {
 	return status
 }
 
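+// GetSwitchoverSchedule returns the closest possible time for a scheduled switchover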
+func (c *Cluster) GetSwitchoverSchedule() string {
+	var possibleSwitchover, schedule time.Time
+
+	now := time.Now().UTC()
+	for _, window := range c.Spec.MaintenanceWindows {
+		// in the best case it is possible today
+		possibleSwitchover = time.Date(now.Year(), now.Month(), now.Day(), window.StartTime.Hour(), window.StartTime.Minute(), 0, 0, time.UTC)
+		if window.Everyday {
+			if now.After(possibleSwitchover) {
+				// we are already past the time for today, try tomorrow
+				possibleSwitchover = possibleSwitchover.AddDate(0, 0, 1)
+			}
+		} else {
+			if now.Weekday() != window.Weekday {
+				// get closest possible time for this window
+				possibleSwitchover = possibleSwitchover.AddDate(0, 0, int((7+window.Weekday-now.Weekday())%7))
+			} else if now.After(possibleSwitchover) {
+				// we are already past the time for today, try next week
+				possibleSwitchover = possibleSwitchover.AddDate(0, 0, 7)
+			}
+		}
+
+		if schedule.IsZero() || possibleSwitchover.Before(schedule) {
+			schedule = possibleSwitchover
+		}
+	}
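+	// Go reference-time layout; "+00" is a literal suffix, valid because the schedule is computed in UTC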
+	return schedule.Format("2006-01-02T15:04+00")
+}
+
 // Switchover does a switchover (via Patroni) to a candidate pod
 func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) error {
-
 	var err error
 	c.logger.Debugf("switching over from %q to %q", curMaster.Name, candidate)
+
+	if !isInMaintenanceWindow(c.Spec.MaintenanceWindows) {
+		c.logger.Infof("postponing switchover, not in maintenance window")
+		schedule := c.GetSwitchoverSchedule()
+
+		if err := c.patroni.Switchover(curMaster, candidate.Name, schedule); err != nil {
+			return fmt.Errorf("could not schedule switchover: %v", err)
+		}
+		c.logger.Infof("switchover is scheduled at %s", schedule)
+		return nil
+	}
+
 	c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switching over from %q to %q", curMaster.Name, candidate)
 	stopCh := make(chan struct{})
 	ch := c.registerPodSubscriber(candidate)
 	defer c.unregisterPodSubscriber(candidate)
 	defer close(stopCh)
 
-	if err = c.patroni.Switchover(curMaster, candidate.Name); err == nil {
+	if err = c.patroni.Switchover(curMaster, candidate.Name, ""); err == nil {
 		c.logger.Debugf("successfully switched over from %q to %q", curMaster.Name, candidate)
 		c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Successfully switched over from %q to %q", curMaster.Name, candidate)
 		_, err = c.waitForPodLabel(ch, stopCh, nil)
diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go
index 897ed6c0..9fb7f348 100644
--- a/pkg/cluster/cluster_test.go
+++ b/pkg/cluster/cluster_test.go
@@ -2057,3 +2057,91 @@ func TestCompareVolumeMounts(t *testing.T) {
 		})
 	}
 }
+
+func TestGetSwitchoverSchedule(t *testing.T) {
+	now := time.Now()
+
+	futureTimeStart := now.Add(1 * time.Hour)
+	futureWindowTimeStart := futureTimeStart.Format("15:04")
+	futureWindowTimeEnd := now.Add(2 * time.Hour).Format("15:04")
+	pastTimeStart := now.Add(-2 * time.Hour)
+	pastWindowTimeStart := pastTimeStart.Format("15:04")
+	pastWindowTimeEnd := now.Add(-1 * time.Hour).Format("15:04")
+
+	tests := []struct {
+		name     string
+		windows  []acidv1.MaintenanceWindow
+		expected string
+	}{
+		{
+			name: "everyday maintenance window is later today",
+			windows: []acidv1.MaintenanceWindow{
+				{
+					Everyday:  true,
+					StartTime: mustParseTime(futureWindowTimeStart),
+					EndTime:   mustParseTime(futureWindowTimeEnd),
+				},
+			},
+			expected: futureTimeStart.Format("2006-01-02T15:04+00"),
+		},
+		{
+			name: "everyday maintenance window is tomorrow",
+			windows: []acidv1.MaintenanceWindow{
+				{
+					Everyday:  true,
+					StartTime: mustParseTime(pastWindowTimeStart),
+					EndTime:   mustParseTime(pastWindowTimeEnd),
+				},
+			},
+			expected: pastTimeStart.AddDate(0, 0, 1).Format("2006-01-02T15:04+00"),
+		},
+		{
+			name: "weekday maintenance window is later today",
+			windows: []acidv1.MaintenanceWindow{
+				{
+					Weekday:   now.Weekday(),
+					StartTime: mustParseTime(futureWindowTimeStart),
+					EndTime:   mustParseTime(futureWindowTimeEnd),
+				},
+			},
+			expected: futureTimeStart.Format("2006-01-02T15:04+00"),
+		},
+		{
+			name: "weekday maintenance window has passed for today",
+			windows: []acidv1.MaintenanceWindow{
+				{
+					Weekday:   now.Weekday(),
+					StartTime: mustParseTime(pastWindowTimeStart),
+					EndTime:   mustParseTime(pastWindowTimeEnd),
+				},
+			},
+			expected: pastTimeStart.AddDate(0, 0, 7).Format("2006-01-02T15:04+00"),
+		},
+		{
+			name: "choose the earliest window",
+			windows: []acidv1.MaintenanceWindow{
+				{
+					Weekday:   now.AddDate(0, 0, 2).Weekday(),
+					StartTime: mustParseTime(futureWindowTimeStart),
+					EndTime:   mustParseTime(futureWindowTimeEnd),
+				},
+				{
+					Everyday:  true,
+					StartTime: mustParseTime(pastWindowTimeStart),
+					EndTime:   mustParseTime(pastWindowTimeEnd),
+				},
+			},
+			expected: pastTimeStart.AddDate(0, 0, 1).Format("2006-01-02T15:04+00"),
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			cluster.Spec.MaintenanceWindows = tt.windows
+			schedule := cluster.GetSwitchoverSchedule()
+			if schedule != tt.expected {
+				t.Errorf("Expected GetSwitchoverSchedule to return %s, returned: %s", tt.expected, schedule)
+			}
+		})
+	}
+}
diff --git a/pkg/cluster/majorversionupgrade.go b/pkg/cluster/majorversionupgrade.go
index a4ae5f81..e7f9f4f0 100644
--- a/pkg/cluster/majorversionupgrade.go
+++ b/pkg/cluster/majorversionupgrade.go
@@ -129,7 +129,7 @@ func (c *Cluster) majorVersionUpgrade() error {
 		return nil
 	}
 
-	if !isInMainternanceWindow(c.Spec.MaintenanceWindows) {
+	if !isInMaintenanceWindow(c.Spec.MaintenanceWindows) {
 		c.logger.Infof("skipping major version upgrade, not in maintenance window")
 		return nil
 	}
diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go
index 3f47328e..85711dbd 100644
--- a/pkg/cluster/resources.go
+++ b/pkg/cluster/resources.go
@@ -162,8 +162,8 @@ func (c *Cluster) preScaleDown(newStatefulSet *appsv1.StatefulSet) error {
 		return fmt.Errorf("pod %q does not belong to cluster", podName)
 	}
 
-	if err := c.patroni.Switchover(&masterPod[0], masterCandidatePod.Name); err != nil {
-		return fmt.Errorf("could not failover: %v", err)
+	if err := c.patroni.Switchover(&masterPod[0], masterCandidatePod.Name, ""); err != nil {
+		return fmt.Errorf("could not switch over: %v", err)
 	}
 
 	return nil
diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go
index d1a33900..32aae605 100644
--- a/pkg/cluster/sync.go
+++ b/pkg/cluster/sync.go
@@ -497,6 +497,7 @@ func (c *Cluster) syncStatefulSet() error {
 	)
 	podsToRecreate := make([]v1.Pod, 0)
 	isSafeToRecreatePods := true
+	postponeReasons := make([]string, 0)
 	switchoverCandidates := make([]spec.NamespacedName, 0)
 
 	pods, err := c.listPods()
@@ -646,12 +647,20 @@ func (c *Cluster) syncStatefulSet() error {
 	c.logger.Debug("syncing Patroni config")
 	if configPatched, restartPrimaryFirst, restartWait, err = c.syncPatroniConfig(pods, c.Spec.Patroni, requiredPgParameters); err != nil {
 		c.logger.Warningf("Patroni config updated? %v - errors during config sync: %v", configPatched, err)
+		postponeReasons = append(postponeReasons, "errors during Patroni config sync")
 		isSafeToRecreatePods = false
 	}
 
 	// restart Postgres where it is still pending
 	if err = c.restartInstances(pods, restartWait, restartPrimaryFirst); err != nil {
 		c.logger.Errorf("errors while restarting Postgres in pods via Patroni API: %v", err)
+		postponeReasons = append(postponeReasons, "errors while restarting Postgres via Patroni API")
+		isSafeToRecreatePods = false
+	}
+
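+	// recreating pods triggers a switchover of the master, so only consider it safe inside a maintenance window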
+	if !isInMaintenanceWindow(c.Spec.MaintenanceWindows) {
+		postponeReasons = append(postponeReasons, "not in maintenance window")
 		isSafeToRecreatePods = false
 	}
 
@@ -666,7 +674,7 @@ func (c *Cluster) syncStatefulSet() error {
 			}
 			c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
 		} else {
-			c.logger.Warningf("postpone pod recreation until next sync because of errors during config sync")
+			c.logger.Warningf("postpone pod recreation until next sync - reasons: %s", strings.Join(postponeReasons, ", "))
 		}
 	}
 
diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go
index c570fcc3..0e31ecc3 100644
--- a/pkg/cluster/util.go
+++ b/pkg/cluster/util.go
@@ -663,7 +663,7 @@ func parseResourceRequirements(resourcesRequirement v1.ResourceRequirements) (ac
 	return resources, nil
 }
 
-func isInMainternanceWindow(specMaintenanceWindows []acidv1.MaintenanceWindow) bool {
+func isInMaintenanceWindow(specMaintenanceWindows []acidv1.MaintenanceWindow) bool {
 	if len(specMaintenanceWindows) == 0 {
 		return true
 	}
diff --git a/pkg/cluster/util_test.go b/pkg/cluster/util_test.go
index 2cb755c6..e245389a 100644
--- a/pkg/cluster/util_test.go
+++ b/pkg/cluster/util_test.go
@@ -705,8 +705,8 @@ func TestIsInMaintenanceWindow(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			cluster.Spec.MaintenanceWindows = tt.windows
-			if isInMainternanceWindow(cluster.Spec.MaintenanceWindows) != tt.expected {
-				t.Errorf("Expected isInMainternanceWindow to return %t", tt.expected)
+			if isInMaintenanceWindow(cluster.Spec.MaintenanceWindows) != tt.expected {
+				t.Errorf("Expected isInMaintenanceWindow to return %t", tt.expected)
 			}
 		})
 	}
diff --git a/pkg/util/patroni/patroni.go b/pkg/util/patroni/patroni.go
index 4d580f1c..2129f1ac 100644
--- a/pkg/util/patroni/patroni.go
+++ b/pkg/util/patroni/patroni.go
@@ -20,19 +20,19 @@ import (
 )
 
 const (
-	failoverPath = "/failover"
-	configPath   = "/config"
-	clusterPath  = "/cluster"
-	statusPath   = "/patroni"
-	restartPath  = "/restart"
-	ApiPort      = 8008
-	timeout      = 30 * time.Second
+	switchoverPath = "/switchover"
+	configPath     = "/config"
+	clusterPath    = "/cluster"
+	statusPath     = "/patroni"
+	restartPath    = "/restart"
+	ApiPort        = 8008
+	timeout        = 30 * time.Second
 )
 
 // Interface describe patroni methods
 type Interface interface {
 	GetClusterMembers(master *v1.Pod) ([]ClusterMember, error)
-	Switchover(master *v1.Pod, candidate string) error
+	Switchover(master *v1.Pod, candidate string, scheduledAt string) error
 	SetPostgresParameters(server *v1.Pod, options map[string]string) error
 	SetStandbyClusterParameters(server *v1.Pod, options map[string]interface{}) error
 	GetMemberData(server *v1.Pod) (MemberData, error)
@@ -103,7 +103,7 @@ func (p *Patroni) httpPostOrPatch(method string, url string, body *bytes.Buffer)
 		}
 	}()
 
-	if resp.StatusCode != http.StatusOK {
+	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
 		bodyBytes, err := io.ReadAll(resp.Body)
 		if err != nil {
 			return fmt.Errorf("could not read response: %v", err)
@@ -128,7 +128,7 @@ func (p *Patroni) httpGet(url string) (string, error) {
 		return "", fmt.Errorf("could not read response: %v", err)
 	}
 
-	if response.StatusCode != http.StatusOK {
+	if response.StatusCode < http.StatusOK || response.StatusCode >= http.StatusMultipleChoices {
 		return string(bodyBytes), fmt.Errorf("patroni returned '%d'", response.StatusCode)
 	}
 
@@ -136,9 +136,10 @@
 }
 
 // Switchover by calling Patroni REST API
-func (p *Patroni) Switchover(master *v1.Pod, candidate string) error {
+func (p *Patroni) Switchover(master *v1.Pod, candidate string, scheduledAt string) error {
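+	// an empty scheduledAt requests an immediate switchover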
 	buf := &bytes.Buffer{}
-	err := json.NewEncoder(buf).Encode(map[string]string{"leader": master.Name, "member": candidate})
+	err := json.NewEncoder(buf).Encode(map[string]string{"leader": master.Name, "member": candidate, "scheduled_at": scheduledAt})
 	if err != nil {
 		return fmt.Errorf("could not encode json: %v", err)
 	}
@@ -146,7 +146,7 @@ func (p *Patroni) Switchover(master *v1.Pod, candidate string) error {
 	if err != nil {
 		return err
 	}
-	return p.httpPostOrPatch(http.MethodPost, apiURLString+failoverPath, buf)
+	return p.httpPostOrPatch(http.MethodPost, apiURLString+switchoverPath, buf)
 }
 
 //TODO: add an option call /patroni to check if it is necessary to restart the server
-- 
GitLab