From 301462c415697138a21e363d38bcb3bd08d39f20 Mon Sep 17 00:00:00 2001
From: Felix Kunde <felix-kunde@gmx.de>
Date: Mon, 16 Dec 2024 18:13:52 +0100
Subject: [PATCH] remove streams delete and extend unit tests (#2737)

---
 pkg/cluster/streams.go      |  10 +--
 pkg/cluster/streams_test.go | 158 ++++++++++++++++++++++++------------
 2 files changed, 105 insertions(+), 63 deletions(-)

diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go
index 3d9cbae1..6e940820 100644
--- a/pkg/cluster/streams.go
+++ b/pkg/cluster/streams.go
@@ -453,15 +453,6 @@ func (c *Cluster) syncStream(appId string) error {
 		if stream.Spec.ApplicationId != appId {
 			continue
 		}
-		if streamExists {
-			c.logger.Warningf("more than one event stream with applicationId %s found, delete it", appId)
-			if err = c.KubeClient.FabricEventStreams(stream.ObjectMeta.Namespace).Delete(context.TODO(), stream.ObjectMeta.Name, metav1.DeleteOptions{}); err != nil {
-				c.logger.Errorf("could not delete event stream %q with applicationId %s: %v", stream.ObjectMeta.Name, appId, err)
-			} else {
-				c.logger.Infof("redundant event stream %q with applicationId %s has been successfully deleted", stream.ObjectMeta.Name, appId)
-			}
-			continue
-		}
 		streamExists = true
 		desiredStreams := c.generateFabricEventStream(appId)
 		if !reflect.DeepEqual(stream.ObjectMeta.OwnerReferences, desiredStreams.ObjectMeta.OwnerReferences) {
@@ -484,6 +475,7 @@ func (c *Cluster) syncStream(appId string) error {
 			c.Streams[appId] = updatedStream
 			c.logger.Infof("event streams %q with applicationId %s have been successfully updated", updatedStream.Name, appId)
 		}
+		break
 	}
 
 	if !streamExists {
diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go
index 92d28663..77710aa1 100644
--- a/pkg/cluster/streams_test.go
+++ b/pkg/cluster/streams_test.go
@@ -90,7 +90,7 @@ var (
 			Namespace: namespace,
 			Labels: map[string]string{
 				"application":  "spilo",
-				"cluster-name": fmt.Sprintf("%s-2", clusterName),
+				"cluster-name": clusterName,
 				"team":         "acid",
 			},
 			OwnerReferences: []metav1.OwnerReference{
@@ -494,14 +494,13 @@ func TestSyncStreams(t *testing.T) {
 			OpConfig: config.Config{
 				PodManagementPolicy: "ordered_ready",
 				Resources: config.Resources{
-					ClusterLabels:         map[string]string{"application": "spilo"},
-					ClusterNameLabel:      "cluster-name",
-					DefaultCPURequest:     "300m",
-					DefaultCPULimit:       "300m",
-					DefaultMemoryRequest:  "300Mi",
-					DefaultMemoryLimit:    "300Mi",
-					EnableOwnerReferences: util.True(),
-					PodRoleLabel:          "spilo-role",
+					ClusterLabels:        map[string]string{"application": "spilo"},
+					ClusterNameLabel:     "cluster-name",
+					DefaultCPURequest:    "300m",
+					DefaultCPULimit:      "300m",
+					DefaultMemoryRequest: "300Mi",
+					DefaultMemoryLimit:   "300Mi",
+					PodRoleLabel:         "spilo-role",
 				},
 			},
 		}, client, pg, logger, eventRecorder)
@@ -514,33 +513,17 @@ func TestSyncStreams(t *testing.T) {
 	err = cluster.syncStream(appId)
 	assert.NoError(t, err)
 
-	// create a second stream with same spec but with different name
-	createdStream, err := cluster.KubeClient.FabricEventStreams(namespace).Create(
-		context.TODO(), fes, metav1.CreateOptions{})
+	// sync the stream again
+	err = cluster.syncStream(appId)
 	assert.NoError(t, err)
-	assert.Equal(t, createdStream.Spec.ApplicationId, appId)
 
-	// check that two streams exist
+	// check that only one stream remains after sync
 	listOptions := metav1.ListOptions{
 		LabelSelector: cluster.labelsSet(true).String(),
 	}
 	streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
 	assert.NoError(t, err)
-	assert.Equalf(t, 2, len(streams.Items), "unexpected number of streams found: got %d, but expected only 2", len(streams.Items))
-
-	// sync the stream which should remove the redundant stream
-	err = cluster.syncStream(appId)
-	assert.NoError(t, err)
-
-	// check that only one stream remains after sync
-	streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
-	assert.NoError(t, err)
 	assert.Equalf(t, 1, len(streams.Items), "unexpected number of streams found: got %d, but expected only 1", len(streams.Items))
-
-	// check owner references
-	if !reflect.DeepEqual(streams.Items[0].OwnerReferences, cluster.ownerReferences()) {
-		t.Errorf("unexpected owner references, expected %#v, got %#v", cluster.ownerReferences(), streams.Items[0].OwnerReferences)
-	}
 }
 
 func TestSameStreams(t *testing.T) {
@@ -663,13 +646,14 @@ func TestUpdateStreams(t *testing.T) {
 			OpConfig: config.Config{
 				PodManagementPolicy: "ordered_ready",
 				Resources: config.Resources{
-					ClusterLabels:        map[string]string{"application": "spilo"},
-					ClusterNameLabel:     "cluster-name",
-					DefaultCPURequest:    "300m",
-					DefaultCPULimit:      "300m",
-					DefaultMemoryRequest: "300Mi",
-					DefaultMemoryLimit:   "300Mi",
-					PodRoleLabel:         "spilo-role",
+					ClusterLabels:         map[string]string{"application": "spilo"},
+					ClusterNameLabel:      "cluster-name",
+					DefaultCPURequest:     "300m",
+					DefaultCPULimit:       "300m",
+					DefaultMemoryRequest:  "300Mi",
+					DefaultMemoryLimit:    "300Mi",
+					EnableOwnerReferences: util.True(),
+					PodRoleLabel:          "spilo-role",
 				},
 			},
 		}, client, pg, logger, eventRecorder)
@@ -678,10 +662,31 @@ func TestUpdateStreams(t *testing.T) {
 		context.TODO(), &pg, metav1.CreateOptions{})
 	assert.NoError(t, err)
 
-	// create the stream
+	// create stream with different owner reference
+	fes.ObjectMeta.Name = fmt.Sprintf("%s-12345", pg.Name)
+	fes.ObjectMeta.Labels["cluster-name"] = pg.Name
+	createdStream, err := cluster.KubeClient.FabricEventStreams(namespace).Create(
+		context.TODO(), fes, metav1.CreateOptions{})
+	assert.NoError(t, err)
+	assert.Equal(t, createdStream.Spec.ApplicationId, appId)
+
+	// sync the stream which should update the owner reference
 	err = cluster.syncStream(appId)
 	assert.NoError(t, err)
 
+	// check that only one stream exists after sync
+	listOptions := metav1.ListOptions{
+		LabelSelector: cluster.labelsSet(true).String(),
+	}
+	streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
+	assert.NoError(t, err)
+	assert.Equalf(t, 1, len(streams.Items), "unexpected number of streams found: got %d, but expected only 1", len(streams.Items))
+
+	// compare owner references
+	if !reflect.DeepEqual(streams.Items[0].OwnerReferences, cluster.ownerReferences()) {
+		t.Errorf("unexpected owner references, expected %#v, got %#v", cluster.ownerReferences(), streams.Items[0].OwnerReferences)
+	}
+
 	// change specs of streams and patch CRD
 	for i, stream := range pg.Spec.Streams {
 		if stream.ApplicationId == appId {
@@ -694,10 +699,7 @@ func TestUpdateStreams(t *testing.T) {
 	}
 
 	// compare stream returned from API with expected stream
-	listOptions := metav1.ListOptions{
-		LabelSelector: cluster.labelsSet(true).String(),
-	}
-	streams := patchPostgresqlStreams(t, cluster, &pg.Spec, listOptions)
+	streams = patchPostgresqlStreams(t, cluster, &pg.Spec, listOptions)
 	result := cluster.generateFabricEventStream(appId)
 	if match, _ := cluster.compareStreams(&streams.Items[0], result); !match {
 		t.Errorf("Malformed FabricEventStream after updating manifest, expected %#v, got %#v", streams.Items[0], result)
@@ -716,9 +718,51 @@ func TestUpdateStreams(t *testing.T) {
 	if match, _ := cluster.compareStreams(&streams.Items[0], result); !match {
 		t.Errorf("Malformed FabricEventStream after disabling event recovery, expected %#v, got %#v", streams.Items[0], result)
 	}
+}
 
-	mockClient := k8sutil.NewMockKubernetesClient()
-	cluster.KubeClient.CustomResourceDefinitionsGetter = mockClient.CustomResourceDefinitionsGetter
+func patchPostgresqlStreams(t *testing.T, cluster *Cluster, pgSpec *acidv1.PostgresSpec, listOptions metav1.ListOptions) (streams *zalandov1.FabricEventStreamList) {
+	patchData, err := specPatch(pgSpec)
+	assert.NoError(t, err)
+
+	pgPatched, err := cluster.KubeClient.Postgresqls(namespace).Patch(
+		context.TODO(), cluster.Name, types.MergePatchType, patchData, metav1.PatchOptions{}, "spec")
+	assert.NoError(t, err)
+
+	cluster.Postgresql.Spec = pgPatched.Spec
+	err = cluster.syncStream(appId)
+	assert.NoError(t, err)
+
+	streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
+	assert.NoError(t, err)
+
+	return streams
+}
+
+func TestDeleteStreams(t *testing.T) {
+	pg.Name = fmt.Sprintf("%s-4", pg.Name)
+	var cluster = New(
+		Config{
+			OpConfig: config.Config{
+				PodManagementPolicy: "ordered_ready",
+				Resources: config.Resources{
+					ClusterLabels:        map[string]string{"application": "spilo"},
+					ClusterNameLabel:     "cluster-name",
+					DefaultCPURequest:    "300m",
+					DefaultCPULimit:      "300m",
+					DefaultMemoryRequest: "300Mi",
+					DefaultMemoryLimit:   "300Mi",
+					PodRoleLabel:         "spilo-role",
+				},
+			},
+		}, client, pg, logger, eventRecorder)
+
+	_, err := cluster.KubeClient.Postgresqls(namespace).Create(
+		context.TODO(), &pg, metav1.CreateOptions{})
+	assert.NoError(t, err)
+
+	// create the stream
+	err = cluster.syncStream(appId)
+	assert.NoError(t, err)
 
 	// remove streams from manifest
 	pg.Spec.Streams = nil
@@ -729,26 +773,32 @@ func TestUpdateStreams(t *testing.T) {
 	appIds := getDistinctApplicationIds(pgUpdated.Spec.Streams)
 	cluster.cleanupRemovedStreams(appIds)
 
-	streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
-	if len(streams.Items) > 0 || err != nil {
-		t.Errorf("stream resource has not been removed or unexpected error %v", err)
+	// check that streams have been deleted
+	listOptions := metav1.ListOptions{
+		LabelSelector: cluster.labelsSet(true).String(),
 	}
-}
-
-func patchPostgresqlStreams(t *testing.T, cluster *Cluster, pgSpec *acidv1.PostgresSpec, listOptions metav1.ListOptions) (streams *zalandov1.FabricEventStreamList) {
-	patchData, err := specPatch(pgSpec)
+	streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
 	assert.NoError(t, err)
+	assert.Equalf(t, 0, len(streams.Items), "unexpected number of streams found: got %d, but expected none", len(streams.Items))
 
-	pgPatched, err := cluster.KubeClient.Postgresqls(namespace).Patch(
-		context.TODO(), cluster.Name, types.MergePatchType, patchData, metav1.PatchOptions{}, "spec")
+	// create stream to test deleteStreams code
+	fes.ObjectMeta.Name = fmt.Sprintf("%s-12345", pg.Name)
+	fes.ObjectMeta.Labels["cluster-name"] = pg.Name
+	_, err = cluster.KubeClient.FabricEventStreams(namespace).Create(
+		context.TODO(), fes, metav1.CreateOptions{})
 	assert.NoError(t, err)
 
-	cluster.Postgresql.Spec = pgPatched.Spec
+	// sync it once to cluster struct
 	err = cluster.syncStream(appId)
 	assert.NoError(t, err)
 
+	// we need a mock client because deleteStreams checks for CRD existance
+	mockClient := k8sutil.NewMockKubernetesClient()
+	cluster.KubeClient.CustomResourceDefinitionsGetter = mockClient.CustomResourceDefinitionsGetter
+	cluster.deleteStreams()
+
+	// check that streams have been deleted
 	streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
 	assert.NoError(t, err)
-
-	return streams
+	assert.Equalf(t, 0, len(streams.Items), "unexpected number of streams found: got %d, but expected none", len(streams.Items))
 }
-- 
GitLab