diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index a839397b2eacd48b33053854f5d25433fef0caf1..e9a691faad6abf88dab04e051a78509185ffe77d 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -1160,6 +1160,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { // streams if len(newSpec.Spec.Streams) > 0 || len(oldSpec.Spec.Streams) != len(newSpec.Spec.Streams) { + c.logger.Debug("syncing streams") if err := c.syncStreams(); err != nil { c.logger.Errorf("could not sync streams: %v", err) updateFailed = true diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index 564c213e3f0b8d88816ecae2be5f0ed40267a800..bf9be3fb4e0ba048bfe7e4165c3f38143578279f 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -114,10 +114,10 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za } for slotName, slotAndPublication := range databaseSlotsList { - tables := slotAndPublication.Publication - tableNames := make([]string, len(tables)) + newTables := slotAndPublication.Publication + tableNames := make([]string, len(newTables)) i := 0 - for t := range tables { + for t := range newTables { tableName, schemaName := getTableSchema(t) tableNames[i] = fmt.Sprintf("%s.%s", schemaName, tableName) i++ @@ -126,6 +126,12 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za tableList := strings.Join(tableNames, ", ") currentTables, exists := currentPublications[slotName] + // if newTables is empty it means that it's definition was removed from streams section + // but when slot is defined in manifest we should sync publications, too + // by reusing current tables we make sure it is not + if len(newTables) == 0 { + tableList = currentTables + } if !exists { createPublications[slotName] = tableList } else if currentTables != tableList { @@ -350,16 +356,8 @@ func (c *Cluster) syncStreams() error { return nil } - databaseSlots := make(map[string]map[string]zalandov1.Slot) - slotsToSync := make(map[string]map[string]string) - requiredPatroniConfig := c.Spec.Patroni - - if len(requiredPatroniConfig.Slots) > 0 { - for slotName, slotConfig := range requiredPatroniConfig.Slots { - slotsToSync[slotName] = slotConfig - } - } - + // create map with every database and empty slot defintion + // we need it to detect removal of streams from databases if err := c.initDbConn(); err != nil { return fmt.Errorf("could not init database connection") } @@ -372,13 +370,28 @@ func (c *Cluster) syncStreams() error { if err != nil { return fmt.Errorf("could not get list of databases: %v", err) } - // get database name with empty list of slot, except template0 and template1 + databaseSlots := make(map[string]map[string]zalandov1.Slot) for dbName := range listDatabases { if dbName != "template0" && dbName != "template1" { databaseSlots[dbName] = map[string]zalandov1.Slot{} } } + // need to take explicitly defined slots into account whey syncing Patroni config + slotsToSync := make(map[string]map[string]string) + requiredPatroniConfig := c.Spec.Patroni + if len(requiredPatroniConfig.Slots) > 0 { + for slotName, slotConfig := range requiredPatroniConfig.Slots { + slotsToSync[slotName] = slotConfig + if _, exists := databaseSlots[slotConfig["database"]]; exists { + databaseSlots[slotConfig["database"]][slotName] = zalandov1.Slot{ + Slot: slotConfig, + Publication: make(map[string]acidv1.StreamTable), + } + } + } + } + // get list of required slots and publications, group by database for _, stream := range c.Spec.Streams { if _, exists := databaseSlots[stream.Database]; !exists { @@ -391,13 +404,13 @@ func (c *Cluster) syncStreams() error { "type": "logical", } slotName := getSlotName(stream.Database, stream.ApplicationId) - if _, exists := databaseSlots[stream.Database][slotName]; !exists { + slotAndPublication, exists := databaseSlots[stream.Database][slotName] + if !exists { databaseSlots[stream.Database][slotName] = zalandov1.Slot{ Slot: slot, Publication: stream.Tables, } } else { - slotAndPublication := databaseSlots[stream.Database][slotName] streamTables := slotAndPublication.Publication for tableName, table := range stream.Tables { if _, exists := streamTables[tableName]; !exists { @@ -492,16 +505,17 @@ func (c *Cluster) syncStream(appId string) error { continue } streamExists = true + c.Streams[appId] = &stream desiredStreams := c.generateFabricEventStream(appId) if !reflect.DeepEqual(stream.ObjectMeta.OwnerReferences, desiredStreams.ObjectMeta.OwnerReferences) { c.logger.Infof("owner references of event streams with applicationId %s do not match the current ones", appId) stream.ObjectMeta.OwnerReferences = desiredStreams.ObjectMeta.OwnerReferences c.setProcessName("updating event streams with applicationId %s", appId) - stream, err := c.KubeClient.FabricEventStreams(stream.Namespace).Update(context.TODO(), &stream, metav1.UpdateOptions{}) + updatedStream, err := c.KubeClient.FabricEventStreams(stream.Namespace).Update(context.TODO(), &stream, metav1.UpdateOptions{}) if err != nil { return fmt.Errorf("could not update event streams with applicationId %s: %v", appId, err) } - c.Streams[appId] = stream + c.Streams[appId] = updatedStream } if match, reason := c.compareStreams(&stream, desiredStreams); !match { c.logger.Infof("updating event streams with applicationId %s: %s", appId, reason) diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 06f98e42f0fac8f5f9018d53405cd9aace0244f4..797e7a5aa7d4091712a40777f8cddb2625a3a8a0 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -153,7 +153,10 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { return fmt.Errorf("could not sync connection pooler: %v", err) } - if len(c.Spec.Streams) > 0 { + // sync if manifest stream count is different from stream CR count + // it can be that they are always different due to grouping of manifest streams + // but we would catch missed removals on update + if len(c.Spec.Streams) != len(c.Streams) { c.logger.Debug("syncing streams") if err = c.syncStreams(); err != nil { err = fmt.Errorf("could not sync streams: %v", err) diff --git a/pkg/cluster/util_test.go b/pkg/cluster/util_test.go index b66b22f04344ba32daa8214b908a47dc259a0fdf..9cd7dc7e9b66de185f40db20ddb3e7e20414f802 100644 --- a/pkg/cluster/util_test.go +++ b/pkg/cluster/util_test.go @@ -650,7 +650,7 @@ func Test_trimCronjobName(t *testing.T) { } } -func TestisInMaintenanceWindow(t *testing.T) { +func TestIsInMaintenanceWindow(t *testing.T) { now := time.Now() futureTimeStart := now.Add(1 * time.Hour) futureTimeStartFormatted := futureTimeStart.Format("15:04")