From 8cc679653782a210d874009838fafde1d2385676 Mon Sep 17 00:00:00 2001 From: Felix Kunde <felix-kunde@gmx.de> Date: Wed, 18 Dec 2024 11:22:08 +0100 Subject: [PATCH] fix comparing stream annotations and improve unit test (#2820) --- e2e/tests/test_e2e.py | 4 ++- pkg/cluster/streams.go | 56 +++++++++++++++++++++++++------------ pkg/cluster/streams_test.go | 22 +++++++-------- 3 files changed, 52 insertions(+), 30 deletions(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index f89e2fb8..f5a05a15 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -2204,6 +2204,8 @@ class EndToEndTestCase(unittest.TestCase): { "applicationId": "test-app", "batchSize": 100, + "cpu": "100m", + "memory": "200Mi", "database": "foo", "enableRecovery": True, "tables": { @@ -2225,7 +2227,7 @@ class EndToEndTestCase(unittest.TestCase): "eventType": "test-event", "idColumn": "id", "payloadColumn": "payload", - "recoveryEventType": "test-event-dlq" + "ignoreRecovery": True } } } diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index 616a6828..9e2c7482 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -179,29 +179,19 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEventStream { eventStreams := make([]zalandov1.EventStream, 0) resourceAnnotations := map[string]string{} + var err, err2 error for _, stream := range c.Spec.Streams { if stream.ApplicationId != appId { continue } - if stream.CPU != nil { - cpu, exists := resourceAnnotations[constants.EventStreamCpuAnnotationKey] - if exists { - isSmaller, _ := util.IsSmallerQuantity(cpu, *stream.CPU) - if isSmaller { - resourceAnnotations[constants.EventStreamCpuAnnotationKey] = *stream.CPU - } - } - } - if stream.Memory != nil { - memory, exists := resourceAnnotations[constants.EventStreamMemoryAnnotationKey] - if exists { - isSmaller, _ := util.IsSmallerQuantity(memory, *stream.Memory) - if isSmaller { - resourceAnnotations[constants.EventStreamMemoryAnnotationKey] = *stream.Memory - } - } + + err = setResourceAnnotation(&resourceAnnotations, stream.CPU, constants.EventStreamCpuAnnotationKey) + err2 = setResourceAnnotation(&resourceAnnotations, stream.Memory, constants.EventStreamMemoryAnnotationKey) + if err != nil || err2 != nil { + c.logger.Warningf("could not set resource annotation for event stream: %v", err) } + for tableName, table := range stream.Tables { streamSource := c.getEventStreamSource(stream, tableName, table.IdColumn) streamFlow := getEventStreamFlow(table.PayloadColumn) @@ -236,6 +226,27 @@ func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEvent } } +func setResourceAnnotation(annotations *map[string]string, resource *string, key string) error { + var ( + isSmaller bool + err error + ) + if resource != nil { + currentValue, exists := (*annotations)[key] + if exists { + isSmaller, err = util.IsSmallerQuantity(currentValue, *resource) + if err != nil { + return fmt.Errorf("could not compare resource in %q annotation: %v", key, err) + } + } + if isSmaller || !exists { + (*annotations)[key] = *resource + } + } + + return nil +} + func (c *Cluster) getEventStreamSource(stream acidv1.Stream, tableName string, idColumn *string) zalandov1.EventStreamSource { table, schema := getTableSchema(tableName) streamFilter := stream.Filter[tableName] @@ -521,10 +532,19 @@ func (c *Cluster) syncStream(appId string) error { func (c *Cluster) compareStreams(curEventStreams, newEventStreams *zalandov1.FabricEventStream) (match bool, reason string) { reasons := make([]string, 0) + desiredAnnotations := make(map[string]string) match = true // stream operator can add extra annotations so incl. current annotations in desired annotations - desiredAnnotations := c.annotationsSet(curEventStreams.Annotations) + for curKey, curValue := range curEventStreams.Annotations { + if _, exists := desiredAnnotations[curKey]; !exists { + desiredAnnotations[curKey] = curValue + } + } + // add/or override annotations if cpu and memory values were changed + for newKey, newValue := range newEventStreams.Annotations { + desiredAnnotations[newKey] = newValue + } if changed, reason := c.compareAnnotations(curEventStreams.ObjectMeta.Annotations, desiredAnnotations); changed { match = false reasons = append(reasons, fmt.Sprintf("new streams annotations do not match: %s", reason)) diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index dac3615c..dd76a41f 100644 --- a/pkg/cluster/streams_test.go +++ b/pkg/cluster/streams_test.go @@ -640,49 +640,49 @@ func TestSameStreams(t *testing.T) { streamsA: newFabricEventStream([]zalandov1.EventStream{stream1}, nil), streamsB: newFabricEventStream([]zalandov1.EventStream{stream1, stream2}, nil), match: false, - reason: "number of defined streams is different", + reason: "new streams EventStreams array does not match : number of defined streams is different", }, { subTest: "different number of streams", streamsA: newFabricEventStream([]zalandov1.EventStream{stream1}, nil), streamsB: newFabricEventStream([]zalandov1.EventStream{stream1, stream2}, nil), match: false, - reason: "number of defined streams is different", + reason: "new streams EventStreams array does not match : number of defined streams is different", }, { subTest: "event stream specs differ", streamsA: newFabricEventStream([]zalandov1.EventStream{stream1, stream2}, nil), streamsB: fes, match: false, - reason: "number of defined streams is different", + reason: "new streams annotations do not match: Added \"fes.zalando.org/FES_CPU\" with value \"250m\". Added \"fes.zalando.org/FES_MEMORY\" with value \"500Mi\"., new streams labels do not match the current ones, new streams EventStreams array does not match : number of defined streams is different", }, { subTest: "event stream recovery specs differ", streamsA: newFabricEventStream([]zalandov1.EventStream{stream2}, nil), streamsB: newFabricEventStream([]zalandov1.EventStream{stream3}, nil), match: false, - reason: "event stream specs differ", + reason: "new streams EventStreams array does not match : event stream specs differ", }, { - subTest: "event stream annotations differ", + subTest: "event stream with new annotations", streamsA: newFabricEventStream([]zalandov1.EventStream{stream2}, nil), - streamsB: newFabricEventStream([]zalandov1.EventStream{stream3}, annotationsA), + streamsB: newFabricEventStream([]zalandov1.EventStream{stream2}, annotationsA), match: false, - reason: "event stream specs differ", + reason: "new streams annotations do not match: Added \"fes.zalando.org/FES_MEMORY\" with value \"500Mi\".", }, { subTest: "event stream annotations differ", - streamsA: newFabricEventStream([]zalandov1.EventStream{stream2}, annotationsA), + streamsA: newFabricEventStream([]zalandov1.EventStream{stream3}, annotationsA), streamsB: newFabricEventStream([]zalandov1.EventStream{stream3}, annotationsB), match: false, - reason: "event stream specs differ", + reason: "new streams annotations do not match: \"fes.zalando.org/FES_MEMORY\" changed from \"500Mi\" to \"1Gi\".", }, } for _, tt := range tests { streamsMatch, matchReason := cluster.compareStreams(tt.streamsA, tt.streamsB) - if streamsMatch != tt.match { - t.Errorf("%s %s: unexpected match result when comparing streams: got %s, epxected %s", + if streamsMatch != tt.match || matchReason != tt.reason { + t.Errorf("%s %s: unexpected match result when comparing streams: got %s, expected %s", testName, tt.subTest, matchReason, tt.reason) } } -- GitLab