diff --git a/charts/postgres-operator/crds/postgresqls.yaml b/charts/postgres-operator/crds/postgresqls.yaml
index ebaf2d1f8b0b860d75603a43e3449214fbb95619..a83f7cc95281316c8aca726babd1efdafdcf177c 100644
--- a/charts/postgres-operator/crds/postgresqls.yaml
+++ b/charts/postgres-operator/crds/postgresqls.yaml
@@ -514,6 +514,9 @@ spec:
                       type: string
                     batchSize:
                       type: integer
+                    cpu:
+                      type: string
+                      pattern: '^(\d+m|\d+(\.\d{1,3})?)$'
                     database:
                       type: string
                     enableRecovery:
@@ -522,6 +525,9 @@ spec:
                       type: object
                       additionalProperties:
                         type: string
+                    memory:
+                      type: string
+                      pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
                     tables:
                       type: object
                       additionalProperties:
@@ -533,6 +539,8 @@ spec:
                             type: string
                           idColumn:
                             type: string
+                          ignoreRecovery:
+                            type: boolean
                           payloadColumn:
                             type: string
                           recoveryEventType:
diff --git a/docs/reference/cluster_manifest.md b/docs/reference/cluster_manifest.md
index bf731be2e7f0be71e4e6ca3138e94ef46574be21..610982c734fd5af5c3c73c63a185cd0767846480 100644
--- a/docs/reference/cluster_manifest.md
+++ b/docs/reference/cluster_manifest.md
@@ -652,11 +652,11 @@ can have the following properties:
 
 * **applicationId**
   The application name to which the database and CDC belongs to. For each
-  set of streams with a distinct `applicationId` a separate stream CR as well
-  as a separate logical replication slot will be created. This means there can
-  be different streams in the same database and streams with the same
-  `applicationId` are bundled in one stream CR. The stream CR will be called
-  like the Postgres cluster plus "-<applicationId>" suffix. Required.
+  set of streams with a distinct `applicationId` a separate stream resource as
+  well as a separate logical replication slot will be created. This means there
+  can be different streams in the same database and streams with the same
+  `applicationId` are bundled in one stream resource. The stream resource will
+  be called like the Postgres cluster plus "-<applicationId>" suffix. Required.
 
 * **database**
   Name of the database from where events will be published via Postgres'
@@ -667,7 +667,8 @@ can have the following properties:
 
 * **tables**
   Defines a map of table names and their properties (`eventType`, `idColumn`
-  and `payloadColumn`). The CDC operator is following the [outbox pattern](https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/).
+  and `payloadColumn`). Required.
+  The CDC operator is following the [outbox pattern](https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/).
   The application is responsible for putting events into a (JSON/B or VARCHAR)
   payload column of the outbox table in the structure of the specified target
   event type. The operator will create a [PUBLICATION](https://www.postgresql.org/docs/16/logical-replication-publication.html)
@@ -676,12 +677,27 @@ can have the following properties:
   committed to the outbox table. The `idColumn` will be used in telemetry for
   the CDC operator. The names for `idColumn` and `payloadColumn` can be
   configured. Defaults are `id` and `payload`. The target `eventType` has to
-  be defined. Required.
+  be defined. One can also specify a `recoveryEventType` that will be used
+  for a dead letter queue. By enabling `ignoreRecovery`, you can choose to
+  ignore failing events.
 
 * **filter**
   Streamed events can be filtered by a jsonpath expression for each table.
   Optional.
 
+* **enableRecovery**
+  Flag to enable a dead letter queue recovery for all stream tables.
+  Alternatively, recovery can also be enabled for single outbox tables by only
+  specifying a `recoveryEventType` and no `enableRecovery` flag. When set to
+  false or missing, events will be retried until consumption succeeds. You can
+  use a `filter` expression to get rid of poison pills. Optional.
+
 * **batchSize**
   Defines the size of batches in which events are consumed. Optional.
   Defaults to 1.
+
+* **cpu**
+  CPU requests to be set as an annotation on the stream resource. Optional.
+
+* **memory**
+  Memory requests to be set as an annotation on the stream resource. Optional.
diff --git a/manifests/postgresql.crd.yaml b/manifests/postgresql.crd.yaml
index 9207c83d48851504618dd89182145eefecf741a3..9f7e3eff831610929b086f13fa277c18851981d7 100644
--- a/manifests/postgresql.crd.yaml
+++ b/manifests/postgresql.crd.yaml
@@ -512,6 +512,9 @@ spec:
                       type: string
                     batchSize:
                       type: integer
+                    cpu:
+                      type: string
+                      pattern: '^(\d+m|\d+(\.\d{1,3})?)$'
                     database:
                       type: string
                     enableRecovery:
@@ -520,6 +523,9 @@ spec:
                       type: object
                       additionalProperties:
                         type: string
+                    memory:
+                      type: string
+                      pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
                     tables:
                       type: object
                       additionalProperties:
@@ -531,6 +537,8 @@ spec:
                             type: string
                           idColumn:
                             type: string
+                          ignoreRecovery:
+                            type: boolean
                           payloadColumn:
                             type: string
                           recoveryEventType:
diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go
index 3d731743f28ca54f5049427ee44f8e632fed4263..1a8a311f55a8cfab431fdaf5fbaf6afb043ef8aa 100644
--- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go
+++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go
@@ -258,6 +258,8 @@ type Stream struct {
 	Tables         map[string]StreamTable `json:"tables"`
 	Filter         map[string]*string     `json:"filter,omitempty"`
 	BatchSize      *uint32                `json:"batchSize,omitempty"`
+	CPU            *string                `json:"cpu,omitempty"`
+	Memory         *string                `json:"memory,omitempty"`
 	EnableRecovery *bool                  `json:"enableRecovery,omitempty"`
 }
 
@@ -265,6 +267,7 @@ type Stream struct {
 type StreamTable struct {
 	EventType         string  `json:"eventType"`
 	RecoveryEventType string  `json:"recoveryEventType,omitempty"`
+	IgnoreRecovery    *bool   `json:"ignoreRecovery,omitempty"`
 	IdColumn          *string `json:"idColumn,omitempty"`
 	PayloadColumn     *string `json:"payloadColumn,omitempty"`
 }
diff --git a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go
index 557f8889c45e7ea52b1b0b6014cc42e53f6f6bd9..7c0b3ee2334a708c08b116f5cda12d365eb05e72 100644
--- a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go
+++ b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go
@@ -1336,6 +1336,16 @@ func (in *Stream) DeepCopyInto(out *Stream) {
 		*out = new(uint32)
 		**out = **in
 	}
+	if in.CPU != nil {
+		in, out := &in.CPU, &out.CPU
+		*out = new(string)
+		**out = **in
+	}
+	if in.Memory != nil {
+		in, out := &in.Memory, &out.Memory
+		*out = new(string)
+		**out = **in
+	}
 	if in.EnableRecovery != nil {
 		in, out := &in.EnableRecovery, &out.EnableRecovery
 		*out = new(bool)
@@ -1357,6 +1367,11 @@ func (in *Stream) DeepCopy() *Stream {
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *StreamTable) DeepCopyInto(out *StreamTable) {
 	*out = *in
+	if in.IgnoreRecovery != nil {
+		in, out := &in.IgnoreRecovery, &out.IgnoreRecovery
+		*out = new(bool)
+		**out = **in
+	}
 	if in.IdColumn != nil {
 		in, out := &in.IdColumn, &out.IdColumn
 		*out = new(string)
diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go
index 6e940820d14eaa82bed86fbdfbd0337f88fe4764..14fc3aaf06e99dff9a2db4c5dac6634431637c47 100644
--- a/pkg/cluster/streams.go
+++ b/pkg/cluster/streams.go
@@ -178,16 +178,35 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za
 
 func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEventStream {
 	eventStreams := make([]zalandov1.EventStream, 0)
+	resourceAnnotations := map[string]string{}
 
 	for _, stream := range c.Spec.Streams {
 		if stream.ApplicationId != appId {
 			continue
 		}
+		if stream.CPU != nil {
+			cpu, exists := resourceAnnotations[constants.EventStreamCpuAnnotationKey]
+			if exists {
+				isSmaller, _ := util.IsSmallerQuantity(cpu, *stream.CPU)
+				if isSmaller {
+					resourceAnnotations[constants.EventStreamCpuAnnotationKey] = *stream.CPU
+				}
+			}
+		}
+		if stream.Memory != nil {
+			memory, exists := resourceAnnotations[constants.EventStreamMemoryAnnotationKey]
+			if exists {
+				isSmaller, _ := util.IsSmallerQuantity(memory, *stream.Memory)
+				if isSmaller {
+					resourceAnnotations[constants.EventStreamMemoryAnnotationKey] = *stream.Memory
+				}
+			}
+		}
 		for tableName, table := range stream.Tables {
 			streamSource := c.getEventStreamSource(stream, tableName, table.IdColumn)
 			streamFlow := getEventStreamFlow(table.PayloadColumn)
 			streamSink := getEventStreamSink(stream, table.EventType)
-			streamRecovery := getEventStreamRecovery(stream, table.RecoveryEventType, table.EventType)
+			streamRecovery := getEventStreamRecovery(stream, table.RecoveryEventType, table.EventType, table.IgnoreRecovery)
 
 			eventStreams = append(eventStreams, zalandov1.EventStream{
 				EventStreamFlow:     streamFlow,
@@ -207,7 +226,7 @@ func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEvent
 			Name:            fmt.Sprintf("%s-%s", c.Name, strings.ToLower(util.RandomPassword(5))),
 			Namespace:       c.Namespace,
 			Labels:          c.labelsSet(true),
-			Annotations:     c.AnnotationsToPropagate(c.annotationsSet(nil)),
+			Annotations:     c.AnnotationsToPropagate(c.annotationsSet(resourceAnnotations)),
 			OwnerReferences: c.ownerReferences(),
 		},
 		Spec: zalandov1.FabricEventStreamSpec{
@@ -247,7 +266,7 @@ func getEventStreamSink(stream acidv1.Stream, eventType string) zalandov1.EventS
 	}
 }
 
-func getEventStreamRecovery(stream acidv1.Stream, recoveryEventType, eventType string) zalandov1.EventStreamRecovery {
+func getEventStreamRecovery(stream acidv1.Stream, recoveryEventType, eventType string, ignoreRecovery *bool) zalandov1.EventStreamRecovery {
 	if (stream.EnableRecovery != nil && !*stream.EnableRecovery) ||
 		(stream.EnableRecovery == nil && recoveryEventType == "") {
 		return zalandov1.EventStreamRecovery{
@@ -255,6 +274,12 @@ func getEventStreamRecovery(stream acidv1.Stream, recoveryEventType, eventType s
 		}
 	}
 
+	if ignoreRecovery != nil && *ignoreRecovery {
+		return zalandov1.EventStreamRecovery{
+			Type: constants.EventStreamRecoveryIgnoreType,
+		}
+	}
+
 	if stream.EnableRecovery != nil && *stream.EnableRecovery && recoveryEventType == "" {
 		recoveryEventType = fmt.Sprintf("%s-%s", eventType, constants.EventStreamRecoverySuffix)
 	}
diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go
index 77710aa1946d2509b25cc8d77442ed808b6288e1..86fd235c77e1eb276df84694d8ccc998446b2511 100644
--- a/pkg/cluster/streams_test.go
+++ b/pkg/cluster/streams_test.go
@@ -65,12 +65,18 @@ var (
 							EventType:         "stream-type-b",
 							RecoveryEventType: "stream-type-b-dlq",
 						},
+						"data.foofoobar": {
+							EventType:      "stream-type-c",
+							IgnoreRecovery: util.True(),
+						},
 					},
 					EnableRecovery: util.True(),
 					Filter: map[string]*string{
 						"data.bar": k8sutil.StringToPointer("[?(@.source.txId > 500 && @.source.lsn > 123456)]"),
 					},
 					BatchSize: k8sutil.UInt32ToPointer(uint32(100)),
+					CPU:       k8sutil.StringToPointer("250m"),
+					Memory:    k8sutil.StringToPointer("500Mi"),
 				},
 			},
 			TeamID: "acid",
@@ -88,6 +94,10 @@ var (
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      fmt.Sprintf("%s-12345", clusterName),
 			Namespace: namespace,
+			Annotations: map[string]string{
+				constants.EventStreamCpuAnnotationKey:    "250m",
+				constants.EventStreamMemoryAnnotationKey: "500Mi",
+			},
 			Labels: map[string]string{
 				"application":  "spilo",
 				"cluster-name": clusterName,
@@ -180,6 +190,37 @@ var (
 						Type: constants.EventStreamSourcePGType,
 					},
 				},
+				{
+					EventStreamFlow: zalandov1.EventStreamFlow{
+						Type: constants.EventStreamFlowPgGenericType,
+					},
+					EventStreamRecovery: zalandov1.EventStreamRecovery{
+						Type: constants.EventStreamRecoveryIgnoreType,
+					},
+					EventStreamSink: zalandov1.EventStreamSink{
+						EventType:    "stream-type-c",
+						MaxBatchSize: k8sutil.UInt32ToPointer(uint32(100)),
+						Type:         constants.EventStreamSinkNakadiType,
+					},
+					EventStreamSource: zalandov1.EventStreamSource{
+						Connection: zalandov1.Connection{
+							DBAuth: zalandov1.DBAuth{
+								Name:        fmt.Sprintf("fes-user.%s.credentials.postgresql.acid.zalan.do", clusterName),
+								PasswordKey: "password",
+								Type:        constants.EventStreamSourceAuthType,
+								UserKey:     "username",
+							},
+							Url:        fmt.Sprintf("jdbc:postgresql://%s.%s/foo?user=%s&ssl=true&sslmode=require", clusterName, namespace, fesUser),
+							SlotName:   slotName,
+							PluginType: constants.EventStreamSourcePluginType,
+						},
+						Schema: "data",
+						EventStreamTable: zalandov1.EventStreamTable{
+							Name: "foofoobar",
+						},
+						Type: constants.EventStreamSourcePGType,
+					},
+				},
 			},
 		},
 	}
@@ -528,8 +569,8 @@ func TestSyncStreams(t *testing.T) {
 
 func TestSameStreams(t *testing.T) {
 	testName := "TestSameStreams"
-	annotationsA := map[string]string{"owned-by": "acid"}
-	annotationsB := map[string]string{"owned-by": "foo"}
+	annotationsA := map[string]string{constants.EventStreamMemoryAnnotationKey: "500Mi"}
+	annotationsB := map[string]string{constants.EventStreamMemoryAnnotationKey: "1Gi"}
 
 	stream1 := zalandov1.EventStream{
 		EventStreamFlow:     zalandov1.EventStreamFlow{},
@@ -621,6 +662,13 @@ func TestSameStreams(t *testing.T) {
 			match:    false,
 			reason:   "event stream specs differ",
 		},
+		{
+			subTest:  "event stream annotations differ",
+			streamsA: newFabricEventStream([]zalandov1.EventStream{stream2}, nil),
+			streamsB: newFabricEventStream([]zalandov1.EventStream{stream3}, annotationsA),
+			match:    false,
+			reason:   "event stream specs differ",
+		},
 		{
 			subTest:  "event stream annotations differ",
 			streamsA: newFabricEventStream([]zalandov1.EventStream{stream2}, annotationsA),
diff --git a/pkg/util/constants/streams.go b/pkg/util/constants/streams.go
index 8916701f3418c0ff186a4a00f7913d552c206db9..cb4bb6a3fa315446dbc447b9b204582108a34a9a 100644
--- a/pkg/util/constants/streams.go
+++ b/pkg/util/constants/streams.go
@@ -2,16 +2,19 @@ package constants
 
 // PostgreSQL specific constants
 const (
-	EventStreamCRDApiVersion     = "zalando.org/v1"
-	EventStreamCRDKind           = "FabricEventStream"
-	EventStreamCRDName           = "fabriceventstreams.zalando.org"
-	EventStreamSourcePGType      = "PostgresLogicalReplication"
-	EventStreamSourceSlotPrefix  = "fes"
-	EventStreamSourcePluginType  = "pgoutput"
-	EventStreamSourceAuthType    = "DatabaseAuthenticationSecret"
-	EventStreamFlowPgGenericType = "PostgresWalToGenericNakadiEvent"
-	EventStreamSinkNakadiType    = "Nakadi"
-	EventStreamRecoveryNoneType  = "None"
-	EventStreamRecoveryDLQType   = "DeadLetter"
-	EventStreamRecoverySuffix    = "dead-letter-queue"
+	EventStreamCRDApiVersion       = "zalando.org/v1"
+	EventStreamCRDKind             = "FabricEventStream"
+	EventStreamCRDName             = "fabriceventstreams.zalando.org"
+	EventStreamSourcePGType        = "PostgresLogicalReplication"
+	EventStreamSourceSlotPrefix    = "fes"
+	EventStreamSourcePluginType    = "pgoutput"
+	EventStreamSourceAuthType      = "DatabaseAuthenticationSecret"
+	EventStreamFlowPgGenericType   = "PostgresWalToGenericNakadiEvent"
+	EventStreamSinkNakadiType      = "Nakadi"
+	EventStreamRecoveryDLQType     = "DeadLetter"
+	EventStreamRecoveryIgnoreType  = "Ignore"
+	EventStreamRecoveryNoneType    = "None"
+	EventStreamRecoverySuffix      = "dead-letter-queue"
+	EventStreamCpuAnnotationKey    = "fes.zalando.org/FES_CPU"
+	EventStreamMemoryAnnotationKey = "fes.zalando.org/FES_MEMORY"
 )