diff --git a/.github/workflows/publish_ghcr_image.yaml b/.github/workflows/publish_ghcr_image.yaml
index 7efa623352daf525e0ee3a1416c12fdeb9545576..6cb529e18d7435b2a9f5eb06994778f97e91276d 100644
--- a/.github/workflows/publish_ghcr_image.yaml
+++ b/.github/workflows/publish_ghcr_image.yaml
@@ -81,7 +81,7 @@ jobs:
       - name: Build and push multiarch logical-backup image to ghcr
         uses: docker/build-push-action@v3
         with:
-          context: docker/logical-backup
+          context: logical-backup
           push: true
           build-args: BASE_IMAGE=ubuntu:22.04
           tags: "${{ steps.image_lb.outputs.BACKUP_IMAGE }}"
diff --git a/charts/postgres-operator/crds/operatorconfigurations.yaml b/charts/postgres-operator/crds/operatorconfigurations.yaml
index 41acf834e6222c17c3010167143b46a1bc6bc9e5..ef77fa22084f23c539c8f7120f6bb6ccc2b5ee92 100644
--- a/charts/postgres-operator/crds/operatorconfigurations.yaml
+++ b/charts/postgres-operator/crds/operatorconfigurations.yaml
@@ -528,6 +528,8 @@ spec:
                     type: string
                   logical_backup_s3_bucket:
                     type: string
+                  logical_backup_s3_bucket_prefix:
+                    type: string
                   logical_backup_s3_endpoint:
                     type: string
                   logical_backup_s3_region:
diff --git a/charts/postgres-operator/values.yaml b/charts/postgres-operator/values.yaml
index 2a5c572e6cac148e578ecd4232afa273fb107690..c8a6c16195225649f05a0b0cebdb22bae0a8c877 100644
--- a/charts/postgres-operator/values.yaml
+++ b/charts/postgres-operator/values.yaml
@@ -372,6 +372,8 @@ configLogicalBackup:
   logical_backup_s3_access_key_id: ""
   # S3 bucket to store backup results
   logical_backup_s3_bucket: "my-bucket-url"
+  # S3 bucket prefix to use
+  # S3 bucket prefix under which logical backups are stored in the configured bucket
   # S3 region of bucket
   logical_backup_s3_region: ""
   # S3 endpoint url when not using AWS
diff --git a/delivery.yaml b/delivery.yaml
index 6b0bfbadfd7394a64373785e2698d8feaebb26ba..39bb0defe753c943f363afb2ab668fbdda1e0672 100644
--- a/delivery.yaml
+++ b/delivery.yaml
@@ -90,7 +90,7 @@ pipeline:
       commands:
         - desc: Build image
           cmd: |
-            cd docker/logical-backup
+            cd logical-backup
             export TAG=$(git describe --tags --always --dirty)
             IMAGE="registry-write.opensource.zalan.do/acid/logical-backup"
             docker build --rm -t "$IMAGE:$TAG$CDP_TAG" .
diff --git a/docs/administrator.md b/docs/administrator.md
index 1e5caa6a400183df611c4deb3b5522a0d0f48428..b75253a4d1e0c0c965c4e620cc3e9895151f9063 100644
--- a/docs/administrator.md
+++ b/docs/administrator.md
@@ -1283,7 +1283,7 @@ but only snapshots of your data. In its current state, see logical backups as a
 way to quickly create SQL dumps that you can easily restore in an empty test
 cluster.
 
-2. The [example image](https://github.com/zalando/postgres-operator/blob/master/docker/logical-backup/Dockerfile) implements the backup
+2. The [example image](https://github.com/zalando/postgres-operator/blob/master/logical-backup/Dockerfile) implements the backup
 via `pg_dumpall` and upload of compressed and encrypted results to an S3 bucket.
 `pg_dumpall` requires a `superuser` access to a DB and runs on the replica when
 possible.
diff --git a/docs/reference/operator_parameters.md b/docs/reference/operator_parameters.md
index 976c3b9ab03772c2cf49e204135a5900866ae4b3..ebd7650c2d45ecd507e321035228dc5662a8f703 100644
--- a/docs/reference/operator_parameters.md
+++ b/docs/reference/operator_parameters.md
@@ -813,9 +813,9 @@ grouped under the `logical_backup` key.
   default values from `postgres_pod_resources` will be used.
 
 * **logical_backup_docker_image**
-  An image for pods of the logical backup job. The [example image](https://github.com/zalando/postgres-operator/blob/master/docker/logical-backup/Dockerfile)
+  An image for pods of the logical backup job. The [example image](https://github.com/zalando/postgres-operator/blob/master/logical-backup/Dockerfile)
   runs `pg_dumpall` on a replica if possible and uploads compressed results to
-  an S3 bucket under the key `/spilo/pg_cluster_name/cluster_k8s_uuid/logical_backups`.
+  an S3 bucket under the key `/<configured-s3-bucket-prefix>/<pg_cluster_name>/<cluster_k8s_uuid>/logical_backups`.
   The default image is the same image built with the Zalando-internal CI
   pipeline. Default: "registry.opensource.zalan.do/acid/logical-backup:v1.11.0"
 
@@ -845,6 +845,9 @@ grouped under the `logical_backup` key.
   S3 bucket to store backup results. The bucket has to be present and
   accessible by Postgres pods. Default: empty.
 
+* **logical_backup_s3_bucket_prefix**
+  S3 bucket prefix to use in the configured bucket. Default: "spilo"
+
 * **logical_backup_s3_endpoint**
   When using non-AWS S3 storage, endpoint can be set as a ENV variable. The default is empty.
 
diff --git a/docker/logical-backup/Dockerfile b/logical-backup/Dockerfile
similarity index 100%
rename from docker/logical-backup/Dockerfile
rename to logical-backup/Dockerfile
diff --git a/docker/logical-backup/dump.sh b/logical-backup/dump.sh
similarity index 90%
rename from docker/logical-backup/dump.sh
rename to logical-backup/dump.sh
index 3d2f609111f6a9c91b7bba9a680d1575d59e5549..25641c3b544060824e923fcaa5025f04f598780a 100755
--- a/docker/logical-backup/dump.sh
+++ b/logical-backup/dump.sh
@@ -45,7 +45,7 @@ function compress {
 }
 
 function az_upload {
-    PATH_TO_BACKUP=$LOGICAL_BACKUP_S3_BUCKET"/spilo/"$SCOPE$LOGICAL_BACKUP_S3_BUCKET_SCOPE_SUFFIX"/logical_backups/"$(date +%s).sql.gz
+    PATH_TO_BACKUP=$LOGICAL_BACKUP_S3_BUCKET"/"$LOGICAL_BACKUP_S3_BUCKET_PREFIX"/"$SCOPE$LOGICAL_BACKUP_S3_BUCKET_SCOPE_SUFFIX"/logical_backups/"$(date +%s).sql.gz
 
     az storage blob upload --file "$1" --account-name "$LOGICAL_BACKUP_AZURE_STORAGE_ACCOUNT_NAME" --account-key "$LOGICAL_BACKUP_AZURE_STORAGE_ACCOUNT_KEY" -c "$LOGICAL_BACKUP_AZURE_STORAGE_CONTAINER" -n "$PATH_TO_BACKUP"
 }
@@ -72,7 +72,7 @@ function aws_delete_outdated {
     cutoff_date=$(date -d "$LOGICAL_BACKUP_S3_RETENTION_TIME ago" +%F)
 
     # mimic bucket setup from Spilo
-    prefix="spilo/"$SCOPE$LOGICAL_BACKUP_S3_BUCKET_SCOPE_SUFFIX"/logical_backups/"
+    prefix=$LOGICAL_BACKUP_S3_BUCKET_PREFIX"/"$SCOPE$LOGICAL_BACKUP_S3_BUCKET_SCOPE_SUFFIX"/logical_backups/"
 
     args=(
       "--no-paginate"
@@ -107,7 +107,7 @@ function aws_upload {
     # mimic bucket setup from Spilo
     # to keep logical backups at the same path as WAL
     # NB: $LOGICAL_BACKUP_S3_BUCKET_SCOPE_SUFFIX already contains the leading "/" when set by the Postgres Operator
-    PATH_TO_BACKUP=s3://$LOGICAL_BACKUP_S3_BUCKET"/spilo/"$SCOPE$LOGICAL_BACKUP_S3_BUCKET_SCOPE_SUFFIX"/logical_backups/"$(date +%s).sql.gz
+    PATH_TO_BACKUP=s3://$LOGICAL_BACKUP_S3_BUCKET"/"$LOGICAL_BACKUP_S3_BUCKET_PREFIX"/"$SCOPE$LOGICAL_BACKUP_S3_BUCKET_SCOPE_SUFFIX"/logical_backups/"$(date +%s).sql.gz
 
     args=()
 
@@ -120,7 +120,7 @@ function aws_upload {
 }
 
 function gcs_upload {
-    PATH_TO_BACKUP=gs://$LOGICAL_BACKUP_S3_BUCKET"/spilo/"$SCOPE$LOGICAL_BACKUP_S3_BUCKET_SCOPE_SUFFIX"/logical_backups/"$(date +%s).sql.gz
+    PATH_TO_BACKUP=gs://$LOGICAL_BACKUP_S3_BUCKET"/"$LOGICAL_BACKUP_S3_BUCKET_PREFIX"/"$SCOPE$LOGICAL_BACKUP_S3_BUCKET_SCOPE_SUFFIX"/logical_backups/"$(date +%s).sql.gz
 
     gsutil -o Credentials:gs_service_key_file=$LOGICAL_BACKUP_GOOGLE_APPLICATION_CREDENTIALS cp - "$PATH_TO_BACKUP"
 }
diff --git a/manifests/configmap.yaml b/manifests/configmap.yaml
index ac06dcf7602d081faa1ceb7183331ffc8fb568a3..5abc353e69bf029b0946118c62790466aa0db511 100644
--- a/manifests/configmap.yaml
+++ b/manifests/configmap.yaml
@@ -90,6 +90,7 @@ data:
   logical_backup_provider: "s3"
   # logical_backup_s3_access_key_id: ""
   logical_backup_s3_bucket: "my-bucket-url"
+  # logical_backup_s3_bucket_prefix: "spilo"
   # logical_backup_s3_region: ""
   # logical_backup_s3_endpoint: ""
   # logical_backup_s3_secret_access_key: ""
diff --git a/manifests/operatorconfiguration.crd.yaml b/manifests/operatorconfiguration.crd.yaml
index 3bcb5c33015ee3534eab9f96cadbf5d2cb93c733..2b4aa533c24fe317049d0d99f3ffba54692f1f8e 100644
--- a/manifests/operatorconfiguration.crd.yaml
+++ b/manifests/operatorconfiguration.crd.yaml
@@ -526,6 +526,8 @@ spec:
                     type: string
                   logical_backup_s3_bucket:
                     type: string
+                  logical_backup_s3_bucket_prefix:
+                    type: string
                   logical_backup_s3_endpoint:
                     type: string
                   logical_backup_s3_region:
diff --git a/manifests/postgresql-operator-default-configuration.yaml b/manifests/postgresql-operator-default-configuration.yaml
index 0fad31d1cd00872b2491df61f493e40643c8ba07..5208425af7a6e5412f61ed6afe7bc858d4bb54e2 100644
--- a/manifests/postgresql-operator-default-configuration.yaml
+++ b/manifests/postgresql-operator-default-configuration.yaml
@@ -172,6 +172,7 @@ configuration:
     logical_backup_provider: "s3"
     # logical_backup_s3_access_key_id: ""
     logical_backup_s3_bucket: "my-bucket-url"
+    # logical_backup_s3_bucket_prefix: "spilo"
     # logical_backup_s3_endpoint: ""
     # logical_backup_s3_region: ""
     # logical_backup_s3_secret_access_key: ""
diff --git a/pkg/apis/acid.zalan.do/v1/crds.go b/pkg/apis/acid.zalan.do/v1/crds.go
index 852993ea06fbf072cf7ed1e3a432b2517229ff8b..734e276a60ab84d376623423505ee8f2d802d9d8 100644
--- a/pkg/apis/acid.zalan.do/v1/crds.go
+++ b/pkg/apis/acid.zalan.do/v1/crds.go
@@ -1762,6 +1762,9 @@ var OperatorConfigCRDResourceValidation = apiextv1.CustomResourceValidation{
 							"logical_backup_s3_bucket": {
 								Type: "string",
 							},
+							"logical_backup_s3_bucket_prefix": {
+								Type: "string",
+							},
 							"logical_backup_s3_endpoint": {
 								Type: "string",
 							},
diff --git a/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go b/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go
index 42debc7f63ca1fdbe20ad7cd541283314a72c5ee..95f22f3349df79c87620ed475864799aa5b7e5b5 100644
--- a/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go
+++ b/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go
@@ -228,6 +228,7 @@ type OperatorLogicalBackupConfiguration struct {
 	AzureStorageContainer        string `json:"logical_backup_azure_storage_container,omitempty"`
 	AzureStorageAccountKey       string `json:"logical_backup_azure_storage_account_key,omitempty"`
 	S3Bucket                     string `json:"logical_backup_s3_bucket,omitempty"`
+	S3BucketPrefix               string `json:"logical_backup_s3_bucket_prefix,omitempty"`
 	S3Region                     string `json:"logical_backup_s3_region,omitempty"`
 	S3Endpoint                   string `json:"logical_backup_s3_endpoint,omitempty"`
 	S3AccessKeyID                string `json:"logical_backup_s3_access_key_id,omitempty"`
diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index 1b1239f34bfdcb2d7816c3ed7d3a00aef11a84d7..0475c718c792d9733db12708bad7065e18fe51b2 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -28,6 +28,7 @@ import (
 	"github.com/zalando/postgres-operator/pkg/util/users"
 	"github.com/zalando/postgres-operator/pkg/util/volumes"
 	appsv1 "k8s.io/api/apps/v1"
+	batchv1 "k8s.io/api/batch/v1"
 	v1 "k8s.io/api/core/v1"
 	policyv1 "k8s.io/api/policy/v1"
 	rbacv1 "k8s.io/api/rbac/v1"
@@ -438,8 +439,8 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
 		reasons = append(reasons, "new statefulset's persistent volume claim retention policy do not match")
 	}
 
-	needsRollUpdate, reasons = c.compareContainers("initContainers", c.Statefulset.Spec.Template.Spec.InitContainers, statefulSet.Spec.Template.Spec.InitContainers, needsRollUpdate, reasons)
-	needsRollUpdate, reasons = c.compareContainers("containers", c.Statefulset.Spec.Template.Spec.Containers, statefulSet.Spec.Template.Spec.Containers, needsRollUpdate, reasons)
+	needsRollUpdate, reasons = c.compareContainers("statefulset initContainers", c.Statefulset.Spec.Template.Spec.InitContainers, statefulSet.Spec.Template.Spec.InitContainers, needsRollUpdate, reasons)
+	needsRollUpdate, reasons = c.compareContainers("statefulset containers", c.Statefulset.Spec.Template.Spec.Containers, statefulSet.Spec.Template.Spec.Containers, needsRollUpdate, reasons)
 
 	if len(c.Statefulset.Spec.Template.Spec.Containers) == 0 {
 		c.logger.Warningf("statefulset %q has no container", util.NameFromMeta(c.Statefulset.ObjectMeta))
@@ -571,30 +572,30 @@ func newCheck(msg string, cond containerCondition) containerCheck {
 
 func (c *Cluster) compareContainers(description string, setA, setB []v1.Container, needsRollUpdate bool, reasons []string) (bool, []string) {
 	if len(setA) != len(setB) {
-		return true, append(reasons, fmt.Sprintf("new statefulset %s's length does not match the current ones", description))
+		return true, append(reasons, fmt.Sprintf("new %s's length does not match the current ones", description))
 	}
 
 	checks := []containerCheck{
-		newCheck("new statefulset %s's %s (index %d) name does not match the current one",
+		newCheck("new %s's %s (index %d) name does not match the current one",
 			func(a, b v1.Container) bool { return a.Name != b.Name }),
-		newCheck("new statefulset %s's %s (index %d) readiness probe does not match the current one",
+		newCheck("new %s's %s (index %d) readiness probe does not match the current one",
 			func(a, b v1.Container) bool { return !reflect.DeepEqual(a.ReadinessProbe, b.ReadinessProbe) }),
-		newCheck("new statefulset %s's %s (index %d) ports do not match the current one",
+		newCheck("new %s's %s (index %d) ports do not match the current one",
 			func(a, b v1.Container) bool { return !comparePorts(a.Ports, b.Ports) }),
-		newCheck("new statefulset %s's %s (index %d) resources do not match the current ones",
+		newCheck("new %s's %s (index %d) resources do not match the current ones",
 			func(a, b v1.Container) bool { return !compareResources(&a.Resources, &b.Resources) }),
-		newCheck("new statefulset %s's %s (index %d) environment does not match the current one",
+		newCheck("new %s's %s (index %d) environment does not match the current one",
 			func(a, b v1.Container) bool { return !compareEnv(a.Env, b.Env) }),
-		newCheck("new statefulset %s's %s (index %d) environment sources do not match the current one",
+		newCheck("new %s's %s (index %d) environment sources do not match the current one",
 			func(a, b v1.Container) bool { return !reflect.DeepEqual(a.EnvFrom, b.EnvFrom) }),
-		newCheck("new statefulset %s's %s (index %d) security context does not match the current one",
+		newCheck("new %s's %s (index %d) security context does not match the current one",
 			func(a, b v1.Container) bool { return !reflect.DeepEqual(a.SecurityContext, b.SecurityContext) }),
-		newCheck("new statefulset %s's %s (index %d) volume mounts do not match the current one",
+		newCheck("new %s's %s (index %d) volume mounts do not match the current one",
 			func(a, b v1.Container) bool { return !reflect.DeepEqual(a.VolumeMounts, b.VolumeMounts) }),
 	}
 
 	if !c.OpConfig.EnableLazySpiloUpgrade {
-		checks = append(checks, newCheck("new statefulset %s's %s (index %d) image does not match the current one",
+		checks = append(checks, newCheck("new %s's %s (index %d) image does not match the current one",
 			func(a, b v1.Container) bool { return a.Image != b.Image }))
 	}
 
@@ -786,6 +787,47 @@ func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) {
 	return true, ""
 }
 
+func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool, reason string) {
+
+	if cur.Spec.Schedule != new.Spec.Schedule {
+		return false, fmt.Sprintf("new job's schedule %q does not match the current one %q",
+			new.Spec.Schedule, cur.Spec.Schedule)
+	}
+
+	newImage := new.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Image
+	curImage := cur.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Image
+	if newImage != curImage {
+		return false, fmt.Sprintf("new job's image %q does not match the current one %q",
+			newImage, curImage)
+	}
+
+	newPgVersion := getPgVersion(new)
+	curPgVersion := getPgVersion(cur)
+	if newPgVersion != curPgVersion {
+		return false, fmt.Sprintf("new job's env PG_VERSION %q does not match the current one %q",
+			newPgVersion, curPgVersion)
+	}
+
+	needsReplace := false
+	reasons := make([]string, 0)
+	needsReplace, reasons = c.compareContainers("cronjob container", cur.Spec.JobTemplate.Spec.Template.Spec.Containers, new.Spec.JobTemplate.Spec.Template.Spec.Containers, needsReplace, reasons)
+	if needsReplace {
+		return false, fmt.Sprintf("logical backup container specs do not match: %v", strings.Join(reasons, `', '`))
+	}
+
+	return true, ""
+}
+
+func getPgVersion(cronJob *batchv1.CronJob) string {
+	envs := cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Env
+	for _, env := range envs {
+		if env.Name == "PG_VERSION" {
+			return env.Value
+		}
+	}
+	return ""
+}
+
 // addFinalizer patches the postgresql CR to add finalizer
 func (c *Cluster) addFinalizer() error {
 	if c.hasFinalizer() {
diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go
index 0a2fe2fd4364b229267ea8d8ef673e60fc332a74..9c65877461f23ee430d99bd5876af87abec58eab 100644
--- a/pkg/cluster/cluster_test.go
+++ b/pkg/cluster/cluster_test.go
@@ -19,6 +19,7 @@ import (
 	"github.com/zalando/postgres-operator/pkg/util/constants"
 	"github.com/zalando/postgres-operator/pkg/util/k8sutil"
 	"github.com/zalando/postgres-operator/pkg/util/teams"
+	batchv1 "k8s.io/api/batch/v1"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes/fake"
@@ -1649,16 +1650,92 @@ func TestCompareServices(t *testing.T) {
 			if match && !tt.match {
 				t.Logf("match=%v current=%v, old=%v reason=%s", match, tt.current.Annotations, tt.new.Annotations, reason)
 				t.Errorf("%s - expected services to do not match: %q and %q", t.Name(), tt.current, tt.new)
-				return
 			}
 			if !match && tt.match {
 				t.Errorf("%s - expected services to be the same: %q and %q", t.Name(), tt.current, tt.new)
-				return
 			}
 			if !match && !tt.match {
 				if !strings.HasPrefix(reason, tt.reason) {
 					t.Errorf("%s - expected reason prefix %s, found %s", t.Name(), tt.reason, reason)
-					return
+				}
+			}
+		})
+	}
+}
+
+func newCronJob(image, schedule string, vars []v1.EnvVar) *batchv1.CronJob {
+	cron := &batchv1.CronJob{
+		Spec: batchv1.CronJobSpec{
+			Schedule: schedule,
+			JobTemplate: batchv1.JobTemplateSpec{
+				Spec: batchv1.JobSpec{
+					Template: v1.PodTemplateSpec{
+						Spec: v1.PodSpec{
+							Containers: []v1.Container{
+								v1.Container{
+									Name:  "logical-backup",
+									Image: image,
+									Env:   vars,
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+	return cron
+}
+
+func TestCompareLogicalBackupJob(t *testing.T) {
+
+	img1 := "registry.opensource.zalan.do/acid/logical-backup:v1.0"
+	img2 := "registry.opensource.zalan.do/acid/logical-backup:v2.0"
+
+	tests := []struct {
+		about   string
+		current *batchv1.CronJob
+		new     *batchv1.CronJob
+		match   bool
+		reason  string
+	}{
+		{
+			about:   "two equal cronjobs",
+			current: newCronJob(img1, "0 0 * * *", []v1.EnvVar{}),
+			new:     newCronJob(img1, "0 0 * * *", []v1.EnvVar{}),
+			match:   true,
+		},
+		{
+			about:   "two cronjobs with different image",
+			current: newCronJob(img1, "0 0 * * *", []v1.EnvVar{}),
+			new:     newCronJob(img2, "0 0 * * *", []v1.EnvVar{}),
+			match:   false,
+			reason:  fmt.Sprintf("new job's image %q does not match the current one %q", img2, img1),
+		},
+		{
+			about:   "two cronjobs with different schedule",
+			current: newCronJob(img1, "0 0 * * *", []v1.EnvVar{}),
+			new:     newCronJob(img1, "0 * * * *", []v1.EnvVar{}),
+			match:   false,
+			reason:  fmt.Sprintf("new job's schedule %q does not match the current one %q", "0 * * * *", "0 0 * * *"),
+		},
+		{
+			about:   "two cronjobs with different environment variables",
+			current: newCronJob(img1, "0 0 * * *", []v1.EnvVar{{Name: "LOGICAL_BACKUP_S3_BUCKET_PREFIX", Value: "spilo"}}),
+			new:     newCronJob(img1, "0 0 * * *", []v1.EnvVar{{Name: "LOGICAL_BACKUP_S3_BUCKET_PREFIX", Value: "logical-backup"}}),
+			match:   false,
+			reason:  "logical backup container specs do not match: new cronjob container's logical-backup (index 0) environment does not match the current one",
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.about, func(t *testing.T) {
+			match, reason := cl.compareLogicalBackupJob(tt.current, tt.new)
+			if match != tt.match {
+				t.Errorf("%s - unexpected match result %t when comparing cronjobs %q and %q", t.Name(), match, tt.current, tt.new)
+			} else {
+				if !strings.HasPrefix(reason, tt.reason) {
+					t.Errorf("%s - expected reason prefix %s, found %s", t.Name(), tt.reason, reason)
 				}
 			}
 		})
diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go
index 2cd1a96e3793f878c18422fd303e1da4615fce0d..64d85ab374825907a8714868cb6895ded1f79f5d 100644
--- a/pkg/cluster/k8sres.go
+++ b/pkg/cluster/k8sres.go
@@ -2403,6 +2403,10 @@ func (c *Cluster) generateLogicalBackupPodEnvVars() []v1.EnvVar {
 			Name:  "LOGICAL_BACKUP_S3_RETENTION_TIME",
 			Value: c.OpConfig.LogicalBackup.LogicalBackupS3RetentionTime,
 		},
+		{
+			Name:  "LOGICAL_BACKUP_S3_BUCKET_PREFIX",
+			Value: c.OpConfig.LogicalBackup.LogicalBackupS3BucketPrefix,
+		},
 		{
 			Name:  "LOGICAL_BACKUP_S3_BUCKET_SCOPE_SUFFIX",
 			Value: getBucketScopeSuffix(string(c.Postgresql.GetUID())),
diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go
index 3a48e1082b2ddacf69e0c9c1d1861ef53dfea3c9..4c89c98fe02dce4c1a440c2b0854c2645d758f60 100644
--- a/pkg/cluster/sync.go
+++ b/pkg/cluster/sync.go
@@ -1367,7 +1367,7 @@ func (c *Cluster) syncLogicalBackupJob() error {
 		if err != nil {
 			return fmt.Errorf("could not generate the desired logical backup job state: %v", err)
 		}
-		if match, reason := k8sutil.SameLogicalBackupJob(job, desiredJob); !match {
+		if match, reason := c.compareLogicalBackupJob(job, desiredJob); !match {
 			c.logger.Infof("logical job %s is not in the desired state and needs to be updated",
 				c.getLogicalBackupJobName(),
 			)
diff --git a/pkg/controller/operator_config.go b/pkg/controller/operator_config.go
index b2f61a0cbe38e0574703130784e427ea7ea670a4..3b575e49858cc1554de3df922479424785b6aaa9 100644
--- a/pkg/controller/operator_config.go
+++ b/pkg/controller/operator_config.go
@@ -184,6 +184,7 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur
 	result.LogicalBackupAzureStorageAccountKey = fromCRD.LogicalBackup.AzureStorageAccountKey
 	result.LogicalBackupAzureStorageContainer = fromCRD.LogicalBackup.AzureStorageContainer
 	result.LogicalBackupS3Bucket = fromCRD.LogicalBackup.S3Bucket
+	result.LogicalBackupS3BucketPrefix = util.Coalesce(fromCRD.LogicalBackup.S3BucketPrefix, "spilo")
 	result.LogicalBackupS3Region = fromCRD.LogicalBackup.S3Region
 	result.LogicalBackupS3Endpoint = fromCRD.LogicalBackup.S3Endpoint
 	result.LogicalBackupS3AccessKeyID = fromCRD.LogicalBackup.S3AccessKeyID
diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go
index 294c36898f8ee84218123ea300950b03db0c4600..ef19b45a31eb7f61b6a8b66e10c7b4a35b623058 100644
--- a/pkg/util/config/config.go
+++ b/pkg/util/config/config.go
@@ -132,6 +132,7 @@ type LogicalBackup struct {
 	LogicalBackupAzureStorageContainer        string `name:"logical_backup_azure_storage_container" default:""`
 	LogicalBackupAzureStorageAccountKey       string `name:"logical_backup_azure_storage_account_key" default:""`
 	LogicalBackupS3Bucket                     string `name:"logical_backup_s3_bucket" default:""`
+	LogicalBackupS3BucketPrefix               string `name:"logical_backup_s3_bucket_prefix" default:"spilo"`
 	LogicalBackupS3Region                     string `name:"logical_backup_s3_region" default:""`
 	LogicalBackupS3Endpoint                   string `name:"logical_backup_s3_endpoint" default:""`
 	LogicalBackupS3AccessKeyID                string `name:"logical_backup_s3_access_key_id" default:""`
diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go
index 44608856ef5595676e8eab7107c12ab2d184a0a5..66c30dede4ac26e3031df58fc8e2c896e4e8e0ac 100644
--- a/pkg/util/k8sutil/k8sutil.go
+++ b/pkg/util/k8sutil/k8sutil.go
@@ -8,7 +8,6 @@ import (
 	b64 "encoding/base64"
 	"encoding/json"
 
-	batchv1 "k8s.io/api/batch/v1"
 	clientbatchv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
 
 	apiacidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
@@ -254,45 +253,6 @@ func SamePDB(cur, new *apipolicyv1.PodDisruptionBudget) (match bool, reason stri
 	return
 }
 
-func getJobImage(cronJob *batchv1.CronJob) string {
-	return cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Image
-}
-
-func getPgVersion(cronJob *batchv1.CronJob) string {
-	envs := cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Env
-	for _, env := range envs {
-		if env.Name == "PG_VERSION" {
-			return env.Value
-		}
-	}
-	return ""
-}
-
-// SameLogicalBackupJob compares Specs of logical backup cron jobs
-func SameLogicalBackupJob(cur, new *batchv1.CronJob) (match bool, reason string) {
-
-	if cur.Spec.Schedule != new.Spec.Schedule {
-		return false, fmt.Sprintf("new job's schedule %q does not match the current one %q",
-			new.Spec.Schedule, cur.Spec.Schedule)
-	}
-
-	newImage := getJobImage(new)
-	curImage := getJobImage(cur)
-	if newImage != curImage {
-		return false, fmt.Sprintf("new job's image %q does not match the current one %q",
-			newImage, curImage)
-	}
-
-	newPgVersion := getPgVersion(new)
-	curPgVersion := getPgVersion(cur)
-	if newPgVersion != curPgVersion {
-		return false, fmt.Sprintf("new job's env PG_VERSION %q does not match the current one %q",
-			newPgVersion, curPgVersion)
-	}
-
-	return true, ""
-}
-
 func (c *mockSecret) Get(ctx context.Context, name string, options metav1.GetOptions) (*v1.Secret, error) {
 	oldFormatSecret := &v1.Secret{}
 	oldFormatSecret.Name = "testcluster"