From 333dfdd640f70e3fb6c08293dcf4039432a7dba1 Mon Sep 17 00:00:00 2001 From: Murat Kabilov <murat@kabilov.com> Date: Thu, 13 Jul 2017 15:26:22 +0200 Subject: [PATCH] introduce Cluster interface --- pkg/cluster/cluster.go | 25 +++++++++++++++++-------- pkg/cluster/sync.go | 2 ++ pkg/controller/controller.go | 5 ++--- pkg/controller/postgresql.go | 15 ++++----------- pkg/spec/types.go | 24 +++++++++++++++++++++++- 5 files changed, 48 insertions(+), 23 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index eaa56ec4..1dbb15df 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -178,7 +178,8 @@ func (c *Cluster) Create() error { //TODO: service will create endpoint implicitly ep, err := c.createEndpoint() if err != nil { - return fmt.Errorf("could not create endpoint: %v", err) + c.Error = fmt.Errorf("could not create endpoint: %v", err) + return c.Error } c.logger.Infof("endpoint '%s' has been successfully created", util.NameFromMeta(ep.ObjectMeta)) @@ -188,41 +189,48 @@ func (c *Cluster) Create() error { } service, err := c.createService(role) if err != nil { - return fmt.Errorf("could not create %s service: %v", role, err) + c.Error = fmt.Errorf("could not create %s service: %v", role, err) + return c.Error } c.logger.Infof("%s service '%s' has been successfully created", role, util.NameFromMeta(service.ObjectMeta)) } if err = c.initUsers(); err != nil { - return err + c.Error = err + return c.Error } c.logger.Infof("User secrets have been initialized") if err = c.applySecrets(); err != nil { - return fmt.Errorf("could not create secrets: %v", err) + c.Error = fmt.Errorf("could not create secrets: %v", err) + return c.Error } c.logger.Infof("secrets have been successfully created") ss, err := c.createStatefulSet() if err != nil { - return fmt.Errorf("could not create statefulset: %v", err) + c.Error = fmt.Errorf("could not create statefulset: %v", err) + return c.Error } c.logger.Infof("statefulset '%s' has been successfully created", util.NameFromMeta(ss.ObjectMeta)) c.logger.Info("Waiting for cluster being ready") if err = c.waitStatefulsetPodsReady(); err != nil { + c.Error = err c.logger.Errorf("Failed to create cluster: %s", err) - return err + return c.Error } c.logger.Infof("pods are ready") if !(c.masterLess || c.databaseAccessDisabled()) { if err := c.initDbConn(); err != nil { - return fmt.Errorf("could not init db connection: %v", err) + c.Error = fmt.Errorf("could not init db connection: %v", err) + return c.Error } if err = c.createUsers(); err != nil { - return fmt.Errorf("could not create users: %v", err) + c.Error = fmt.Errorf("could not create users: %v", err) + return c.Error } c.logger.Infof("Users have been successfully created") } else { @@ -495,6 +503,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { } c.setStatus(spec.ClusterStatusRunning) + c.Error = nil return nil } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 7dd2b9ee..706ede96 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -73,6 +73,8 @@ func (c *Cluster) Sync() error { return fmt.Errorf("could not sync persistent volumes: %v", err) } + c.Error = nil + return nil } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index b0e0962b..6c37856c 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -10,7 +10,6 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" - "github.com/zalando-incubator/postgres-operator/pkg/cluster" "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util/config" "github.com/zalando-incubator/postgres-operator/pkg/util/constants" @@ -31,7 +30,7 @@ type Controller struct { logger *logrus.Entry clustersMu sync.RWMutex - clusters map[spec.NamespacedName]*cluster.Cluster + clusters map[spec.NamespacedName]spec.Cluster stopChs map[spec.NamespacedName]chan struct{} postgresqlInformer cache.SharedIndexInformer @@ -56,7 +55,7 @@ func New(controllerConfig *Config, operatorConfig *config.Config) *Controller { Config: *controllerConfig, opConfig: operatorConfig, logger: logger.WithField("pkg", "controller"), - clusters: make(map[spec.NamespacedName]*cluster.Cluster), + clusters: make(map[spec.NamespacedName]spec.Cluster), stopChs: make(map[spec.NamespacedName]chan struct{}), podCh: make(chan spec.PodEvent), } diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 14997c37..06a334e7 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -134,9 +134,7 @@ func (c *Controller) processEvent(obj interface{}) error { c.clustersMu.Unlock() if err := cl.Create(); err != nil { - cl.Error = fmt.Errorf("could not create cluster: %v", err) - logger.Errorf("%v", cl.Error) - + logger.Errorf("could not create cluster '%s': %v", clusterName, err) return nil } @@ -149,17 +147,14 @@ func (c *Controller) processEvent(obj interface{}) error { return nil } if err := cl.Update(event.NewSpec); err != nil { - cl.Error = fmt.Errorf("could not update cluster: %s", err) - logger.Errorf("%v", cl.Error) - + logger.Errorf("could not update cluster '%s': %v", clusterName, err) return nil } - cl.Error = nil logger.Infof("Cluster '%s' has been updated", clusterName) case spec.EventDelete: logger.Infof("Deletion of the '%s' cluster started", clusterName) if !clusterFound { - logger.Errorf("Unknown cluster: %s", clusterName) + logger.Errorf("Cluster '%s' is not found", clusterName) return nil } @@ -191,11 +186,9 @@ func (c *Controller) processEvent(obj interface{}) error { } if err := cl.Sync(); err != nil { - cl.Error = fmt.Errorf("could not sync cluster '%s': %v", clusterName, err) - logger.Errorf("%v", cl.Error) + logger.Errorf("%v", "could not sync cluster '%s': %v", clusterName, err) return nil } - cl.Error = nil logger.Infof("Cluster '%s' has been synced", clusterName) } diff --git a/pkg/spec/types.go b/pkg/spec/types.go index 822395ce..c95a3e71 100644 --- a/pkg/spec/types.go +++ b/pkg/spec/types.go @@ -1,9 +1,9 @@ package spec import ( + "database/sql" "fmt" "strings" - "database/sql" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/types" @@ -73,6 +73,28 @@ type UserSyncer interface { ExecuteSyncRequests(req []PgSyncUserRequest, db *sql.DB) error } +type ClusterEventHandler interface { + Create() error + Update(*Postgresql) error + Delete() error + Sync() error +} + +type ClusterCommandExecutor interface { + ExecCommand(*NamespacedName, ...string) (string, error) +} + +type ClusterController interface { + Run(<-chan struct{}) + ReceivePodEvent(PodEvent) +} + +type Cluster interface { + ClusterEventHandler + ClusterCommandExecutor + ClusterController +} + func (n NamespacedName) String() string { return types.NamespacedName(n).String() } -- GitLab