From 746df0d33d00affc97660a4c2bd654a1bb19509e Mon Sep 17 00:00:00 2001
From: Felix Kunde <felix-kunde@gmx.de>
Date: Wed, 26 Feb 2025 17:31:37 +0100
Subject: [PATCH] do not remove publications of slot defined in manifest
 (#2868)

* do not remove publications of slot defined in manifest
* improve condition to sync streams
* init publication tables map when adding manifest slots
* need to update c.Stream when there is no update
---
 pkg/cluster/cluster.go   |  1 +
 pkg/cluster/streams.go   | 50 +++++++++++++++++++++++++---------------
 pkg/cluster/sync.go      |  5 +++-
 pkg/cluster/util_test.go |  2 +-
 4 files changed, 38 insertions(+), 20 deletions(-)

diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index a839397b..e9a691fa 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -1160,6 +1160,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 
 	// streams
 	if len(newSpec.Spec.Streams) > 0 || len(oldSpec.Spec.Streams) != len(newSpec.Spec.Streams) {
+		c.logger.Debug("syncing streams")
 		if err := c.syncStreams(); err != nil {
 			c.logger.Errorf("could not sync streams: %v", err)
 			updateFailed = true
diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go
index 564c213e..bf9be3fb 100644
--- a/pkg/cluster/streams.go
+++ b/pkg/cluster/streams.go
@@ -114,10 +114,10 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za
 	}
 
 	for slotName, slotAndPublication := range databaseSlotsList {
-		tables := slotAndPublication.Publication
-		tableNames := make([]string, len(tables))
+		newTables := slotAndPublication.Publication
+		tableNames := make([]string, len(newTables))
 		i := 0
-		for t := range tables {
+		for t := range newTables {
 			tableName, schemaName := getTableSchema(t)
 			tableNames[i] = fmt.Sprintf("%s.%s", schemaName, tableName)
 			i++
@@ -126,6 +126,12 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za
 		tableList := strings.Join(tableNames, ", ")
 
 		currentTables, exists := currentPublications[slotName]
+		// if newTables is empty it means that it's definition was removed from streams section
+		// but when slot is defined in manifest we should sync publications, too
+		// by reusing current tables we make sure it is not
+		if len(newTables) == 0 {
+			tableList = currentTables
+		}
 		if !exists {
 			createPublications[slotName] = tableList
 		} else if currentTables != tableList {
@@ -350,16 +356,8 @@ func (c *Cluster) syncStreams() error {
 		return nil
 	}
 
-	databaseSlots := make(map[string]map[string]zalandov1.Slot)
-	slotsToSync := make(map[string]map[string]string)
-	requiredPatroniConfig := c.Spec.Patroni
-
-	if len(requiredPatroniConfig.Slots) > 0 {
-		for slotName, slotConfig := range requiredPatroniConfig.Slots {
-			slotsToSync[slotName] = slotConfig
-		}
-	}
-
+	// create map with every database and empty slot defintion
+	// we need it to detect removal of streams from databases
 	if err := c.initDbConn(); err != nil {
 		return fmt.Errorf("could not init database connection")
 	}
@@ -372,13 +370,28 @@ func (c *Cluster) syncStreams() error {
 	if err != nil {
 		return fmt.Errorf("could not get list of databases: %v", err)
 	}
-	// get database name with empty list of slot, except template0 and template1
+	databaseSlots := make(map[string]map[string]zalandov1.Slot)
 	for dbName := range listDatabases {
 		if dbName != "template0" && dbName != "template1" {
 			databaseSlots[dbName] = map[string]zalandov1.Slot{}
 		}
 	}
 
+	// need to take explicitly defined slots into account whey syncing Patroni config
+	slotsToSync := make(map[string]map[string]string)
+	requiredPatroniConfig := c.Spec.Patroni
+	if len(requiredPatroniConfig.Slots) > 0 {
+		for slotName, slotConfig := range requiredPatroniConfig.Slots {
+			slotsToSync[slotName] = slotConfig
+			if _, exists := databaseSlots[slotConfig["database"]]; exists {
+				databaseSlots[slotConfig["database"]][slotName] = zalandov1.Slot{
+					Slot:        slotConfig,
+					Publication: make(map[string]acidv1.StreamTable),
+				}
+			}
+		}
+	}
+
 	// get list of required slots and publications, group by database
 	for _, stream := range c.Spec.Streams {
 		if _, exists := databaseSlots[stream.Database]; !exists {
@@ -391,13 +404,13 @@ func (c *Cluster) syncStreams() error {
 			"type":     "logical",
 		}
 		slotName := getSlotName(stream.Database, stream.ApplicationId)
-		if _, exists := databaseSlots[stream.Database][slotName]; !exists {
+		slotAndPublication, exists := databaseSlots[stream.Database][slotName]
+		if !exists {
 			databaseSlots[stream.Database][slotName] = zalandov1.Slot{
 				Slot:        slot,
 				Publication: stream.Tables,
 			}
 		} else {
-			slotAndPublication := databaseSlots[stream.Database][slotName]
 			streamTables := slotAndPublication.Publication
 			for tableName, table := range stream.Tables {
 				if _, exists := streamTables[tableName]; !exists {
@@ -492,16 +505,17 @@ func (c *Cluster) syncStream(appId string) error {
 			continue
 		}
 		streamExists = true
+		c.Streams[appId] = &stream
 		desiredStreams := c.generateFabricEventStream(appId)
 		if !reflect.DeepEqual(stream.ObjectMeta.OwnerReferences, desiredStreams.ObjectMeta.OwnerReferences) {
 			c.logger.Infof("owner references of event streams with applicationId %s do not match the current ones", appId)
 			stream.ObjectMeta.OwnerReferences = desiredStreams.ObjectMeta.OwnerReferences
 			c.setProcessName("updating event streams with applicationId %s", appId)
-			stream, err := c.KubeClient.FabricEventStreams(stream.Namespace).Update(context.TODO(), &stream, metav1.UpdateOptions{})
+			updatedStream, err := c.KubeClient.FabricEventStreams(stream.Namespace).Update(context.TODO(), &stream, metav1.UpdateOptions{})
 			if err != nil {
 				return fmt.Errorf("could not update event streams with applicationId %s: %v", appId, err)
 			}
-			c.Streams[appId] = stream
+			c.Streams[appId] = updatedStream
 		}
 		if match, reason := c.compareStreams(&stream, desiredStreams); !match {
 			c.logger.Infof("updating event streams with applicationId %s: %s", appId, reason)
diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go
index 06f98e42..797e7a5a 100644
--- a/pkg/cluster/sync.go
+++ b/pkg/cluster/sync.go
@@ -153,7 +153,10 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
 		return fmt.Errorf("could not sync connection pooler: %v", err)
 	}
 
-	if len(c.Spec.Streams) > 0 {
+	// sync if manifest stream count is different from stream CR count
+	// it can be that they are always different due to grouping of manifest streams
+	// but we would catch missed removals on update
+	if len(c.Spec.Streams) != len(c.Streams) {
 		c.logger.Debug("syncing streams")
 		if err = c.syncStreams(); err != nil {
 			err = fmt.Errorf("could not sync streams: %v", err)
diff --git a/pkg/cluster/util_test.go b/pkg/cluster/util_test.go
index b66b22f0..9cd7dc7e 100644
--- a/pkg/cluster/util_test.go
+++ b/pkg/cluster/util_test.go
@@ -650,7 +650,7 @@ func Test_trimCronjobName(t *testing.T) {
 	}
 }
 
-func TestisInMaintenanceWindow(t *testing.T) {
+func TestIsInMaintenanceWindow(t *testing.T) {
 	now := time.Now()
 	futureTimeStart := now.Add(1 * time.Hour)
 	futureTimeStartFormatted := futureTimeStart.Format("15:04")
-- 
GitLab