diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index eaa56ec46f448c1ec621f24af408e110087dadf4..1dbb15dfb2823e384c1f131a4751ab758287b778 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -178,7 +178,8 @@ func (c *Cluster) Create() error { //TODO: service will create endpoint implicitly ep, err := c.createEndpoint() if err != nil { - return fmt.Errorf("could not create endpoint: %v", err) + c.Error = fmt.Errorf("could not create endpoint: %v", err) + return c.Error } c.logger.Infof("endpoint '%s' has been successfully created", util.NameFromMeta(ep.ObjectMeta)) @@ -188,41 +189,48 @@ func (c *Cluster) Create() error { } service, err := c.createService(role) if err != nil { - return fmt.Errorf("could not create %s service: %v", role, err) + c.Error = fmt.Errorf("could not create %s service: %v", role, err) + return c.Error } c.logger.Infof("%s service '%s' has been successfully created", role, util.NameFromMeta(service.ObjectMeta)) } if err = c.initUsers(); err != nil { - return err + c.Error = err + return c.Error } c.logger.Infof("User secrets have been initialized") if err = c.applySecrets(); err != nil { - return fmt.Errorf("could not create secrets: %v", err) + c.Error = fmt.Errorf("could not create secrets: %v", err) + return c.Error } c.logger.Infof("secrets have been successfully created") ss, err := c.createStatefulSet() if err != nil { - return fmt.Errorf("could not create statefulset: %v", err) + c.Error = fmt.Errorf("could not create statefulset: %v", err) + return c.Error } c.logger.Infof("statefulset '%s' has been successfully created", util.NameFromMeta(ss.ObjectMeta)) c.logger.Info("Waiting for cluster being ready") if err = c.waitStatefulsetPodsReady(); err != nil { + c.Error = err c.logger.Errorf("Failed to create cluster: %s", err) - return err + return c.Error } c.logger.Infof("pods are ready") if !(c.masterLess || c.databaseAccessDisabled()) { if err := c.initDbConn(); err != nil { - return fmt.Errorf("could not init db connection: %v", err) + c.Error = fmt.Errorf("could not init db connection: %v", err) + return c.Error } if err = c.createUsers(); err != nil { - return fmt.Errorf("could not create users: %v", err) + c.Error = fmt.Errorf("could not create users: %v", err) + return c.Error } c.logger.Infof("Users have been successfully created") } else { @@ -495,6 +503,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { } c.setStatus(spec.ClusterStatusRunning) + c.Error = nil return nil } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 7dd2b9ee4d13f67b33c25c056dc81a9e663c3de6..706ede96501c33dada0a4a1645808ea0eacd9e06 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -73,6 +73,8 @@ func (c *Cluster) Sync() error { return fmt.Errorf("could not sync persistent volumes: %v", err) } + c.Error = nil + return nil } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index b0e0962bdc34c277c8f96da9841ef58d840eab7e..6c37856ca19e19720b09ede12ab9933dae01f815 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -10,7 +10,6 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" - "github.com/zalando-incubator/postgres-operator/pkg/cluster" "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util/config" "github.com/zalando-incubator/postgres-operator/pkg/util/constants" @@ -31,7 +30,7 @@ type Controller struct { logger *logrus.Entry clustersMu sync.RWMutex - clusters map[spec.NamespacedName]*cluster.Cluster + clusters map[spec.NamespacedName]spec.Cluster stopChs map[spec.NamespacedName]chan struct{} postgresqlInformer cache.SharedIndexInformer @@ -56,7 +55,7 @@ func New(controllerConfig *Config, operatorConfig *config.Config) *Controller { Config: *controllerConfig, opConfig: operatorConfig, logger: logger.WithField("pkg", "controller"), - clusters: make(map[spec.NamespacedName]*cluster.Cluster), + clusters: make(map[spec.NamespacedName]spec.Cluster), stopChs: make(map[spec.NamespacedName]chan struct{}), podCh: make(chan spec.PodEvent), } diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 14997c37f8579e925fa9e9875328568c3d01c3b3..06a334e7458e700262b3cc50b6415e07ec41734d 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -134,9 +134,7 @@ func (c *Controller) processEvent(obj interface{}) error { c.clustersMu.Unlock() if err := cl.Create(); err != nil { - cl.Error = fmt.Errorf("could not create cluster: %v", err) - logger.Errorf("%v", cl.Error) - + logger.Errorf("could not create cluster '%s': %v", clusterName, err) return nil } @@ -149,17 +147,14 @@ func (c *Controller) processEvent(obj interface{}) error { return nil } if err := cl.Update(event.NewSpec); err != nil { - cl.Error = fmt.Errorf("could not update cluster: %s", err) - logger.Errorf("%v", cl.Error) - + logger.Errorf("could not update cluster '%s': %v", clusterName, err) return nil } - cl.Error = nil logger.Infof("Cluster '%s' has been updated", clusterName) case spec.EventDelete: logger.Infof("Deletion of the '%s' cluster started", clusterName) if !clusterFound { - logger.Errorf("Unknown cluster: %s", clusterName) + logger.Errorf("Cluster '%s' is not found", clusterName) return nil } @@ -191,11 +186,9 @@ func (c *Controller) processEvent(obj interface{}) error { } if err := cl.Sync(); err != nil { - cl.Error = fmt.Errorf("could not sync cluster '%s': %v", clusterName, err) - logger.Errorf("%v", cl.Error) + logger.Errorf("%v", "could not sync cluster '%s': %v", clusterName, err) return nil } - cl.Error = nil logger.Infof("Cluster '%s' has been synced", clusterName) } diff --git a/pkg/spec/types.go b/pkg/spec/types.go index 822395ce923243dba9576be5fde37c2894d932be..c95a3e718719b21f0580f4ce8cc2b3291a1db2ed 100644 --- a/pkg/spec/types.go +++ b/pkg/spec/types.go @@ -1,9 +1,9 @@ package spec import ( + "database/sql" "fmt" "strings" - "database/sql" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/types" @@ -73,6 +73,28 @@ type UserSyncer interface { ExecuteSyncRequests(req []PgSyncUserRequest, db *sql.DB) error } +type ClusterEventHandler interface { + Create() error + Update(*Postgresql) error + Delete() error + Sync() error +} + +type ClusterCommandExecutor interface { + ExecCommand(*NamespacedName, ...string) (string, error) +} + +type ClusterController interface { + Run(<-chan struct{}) + ReceivePodEvent(PodEvent) +} + +type Cluster interface { + ClusterEventHandler + ClusterCommandExecutor + ClusterController +} + func (n NamespacedName) String() string { return types.NamespacedName(n).String() }