From 356be8f0f1b640d5cf80fa8b574fcf2a0f886e04 Mon Sep 17 00:00:00 2001 From: Murat Kabilov <murat@kabilov.com> Date: Tue, 16 May 2017 16:46:37 +0200 Subject: [PATCH] skip clusters with invalid spec --- pkg/cluster/cluster.go | 5 ++- pkg/cluster/k8sres.go | 71 +++++++++++++++++++++++++++--------- pkg/cluster/resources.go | 5 ++- pkg/cluster/sync.go | 6 ++- pkg/controller/postgresql.go | 37 ++++++++++++++++--- pkg/spec/postgresql.go | 42 ++++++++++++++------- 6 files changed, 126 insertions(+), 40 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 895b9194..c6f5ed9e 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -369,7 +369,10 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { //TODO: update PVC } - newStatefulSet := c.genStatefulSet(newSpec.Spec) + newStatefulSet, err := c.genStatefulSet(newSpec.Spec) + if err != nil { + return fmt.Errorf("Can't generate StatefulSet: %s", err) + } sameSS, rollingUpdate, reason := c.compareStatefulSetWith(newStatefulSet) if !sameSS { diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index da25a6ac..9bdd2b2b 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -43,7 +43,9 @@ type spiloConfiguration struct { Bootstrap pgBootstrap `json:"bootstrap"` } -func (c *Cluster) resourceRequirements(resources spec.Resources) *v1.ResourceRequirements { +func (c *Cluster) resourceRequirements(resources spec.Resources) (*v1.ResourceRequirements, error) { + var err error + specRequests := resources.ResourceRequest specLimits := resources.ResourceLimits @@ -54,26 +56,47 @@ func (c *Cluster) resourceRequirements(resources spec.Resources) *v1.ResourceReq result := v1.ResourceRequirements{} - result.Requests = fillResourceList(specRequests, defaultRequests) - result.Limits = fillResourceList(specLimits, defaultLimits) + result.Requests, err = fillResourceList(specRequests, defaultRequests) + if err != nil { + return nil, fmt.Errorf("Can't fill resource requests: %s", err) + } + + result.Limits, err = fillResourceList(specLimits, defaultLimits) + if err != nil { + return nil, fmt.Errorf("Can't fill resource limits: %s", err) + } - return &result + return &result, nil } -func fillResourceList(spec spec.ResourceDescription, defaults spec.ResourceDescription) v1.ResourceList { +func fillResourceList(spec spec.ResourceDescription, defaults spec.ResourceDescription) (v1.ResourceList, error) { + var err error requests := v1.ResourceList{} if spec.Cpu != "" { - requests[v1.ResourceCPU] = resource.MustParse(spec.Cpu) + requests[v1.ResourceCPU], err = resource.ParseQuantity(spec.Cpu) + if err != nil { + return nil, fmt.Errorf("Can't parse CPU quantity: %s", err) + } } else { - requests[v1.ResourceCPU] = resource.MustParse(defaults.Cpu) + requests[v1.ResourceCPU], err = resource.ParseQuantity(defaults.Cpu) + if err != nil { + return nil, fmt.Errorf("Can't parse default CPU quantity: %s", err) + } } if spec.Memory != "" { - requests[v1.ResourceMemory] = resource.MustParse(spec.Memory) + requests[v1.ResourceMemory], err = resource.ParseQuantity(spec.Memory) + if err != nil { + return nil, fmt.Errorf("Can't parse memory quantity: %s", err) + } } else { - requests[v1.ResourceMemory] = resource.MustParse(defaults.Memory) + requests[v1.ResourceMemory], err = resource.ParseQuantity(defaults.Memory) + if err != nil { + return nil, fmt.Errorf("Can't parse default memory quantity: %s", err) + } } - return requests + + return requests, nil } func (c *Cluster) generateSpiloJSONConfiguration(pg *spec.PostgresqlParam, patroni *spec.Patroni) string { @@ -170,7 +193,6 @@ PATRONI_INITDB_PARAMS: } func (c *Cluster) genPodTemplate(resourceRequirements *v1.ResourceRequirements, pgParameters *spec.PostgresqlParam, patroniParameters *spec.Patroni) *v1.PodTemplateSpec { - spiloConfiguration := c.generateSpiloJSONConfiguration(pgParameters, patroniParameters) envVars := []v1.EnvVar{ @@ -290,10 +312,17 @@ func (c *Cluster) genPodTemplate(resourceRequirements *v1.ResourceRequirements, return &template } -func (c *Cluster) genStatefulSet(spec spec.PostgresSpec) *v1beta1.StatefulSet { - resourceRequirements := c.resourceRequirements(spec.Resources) +func (c *Cluster) genStatefulSet(spec spec.PostgresSpec) (*v1beta1.StatefulSet, error) { + resourceRequirements, err := c.resourceRequirements(spec.Resources) + if err != nil { + return nil, err + } + podTemplate := c.genPodTemplate(resourceRequirements, &spec.PostgresqlParam, &spec.Patroni) - volumeClaimTemplate := persistentVolumeClaimTemplate(spec.Volume.Size, spec.Volume.StorageClass) + volumeClaimTemplate, err := persistentVolumeClaimTemplate(spec.Volume.Size, spec.Volume.StorageClass) + if err != nil { + return nil, err + } statefulSet := &v1beta1.StatefulSet{ ObjectMeta: v1.ObjectMeta{ @@ -309,10 +338,10 @@ func (c *Cluster) genStatefulSet(spec spec.PostgresSpec) *v1beta1.StatefulSet { }, } - return statefulSet + return statefulSet, nil } -func persistentVolumeClaimTemplate(volumeSize, volumeStorageClass string) *v1.PersistentVolumeClaim { +func persistentVolumeClaimTemplate(volumeSize, volumeStorageClass string) (*v1.PersistentVolumeClaim, error) { metadata := v1.ObjectMeta{ Name: constants.DataVolumeName, } @@ -323,18 +352,24 @@ func persistentVolumeClaimTemplate(volumeSize, volumeStorageClass string) *v1.Pe metadata.Annotations = map[string]string{"volume.alpha.kubernetes.io/storage-class": "default"} } + quantity, err := resource.ParseQuantity(volumeSize) + if err != nil { + return nil, fmt.Errorf("Can't parse volume size: %s", err) + } + volumeClaim := &v1.PersistentVolumeClaim{ ObjectMeta: metadata, Spec: v1.PersistentVolumeClaimSpec{ AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, Resources: v1.ResourceRequirements{ Requests: v1.ResourceList{ - v1.ResourceStorage: resource.MustParse(volumeSize), + v1.ResourceStorage: quantity, }, }, }, } - return volumeClaim + + return volumeClaim, nil } func (c *Cluster) genUserSecrets() (secrets map[string]*v1.Secret) { diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 8815aeca..11928ed6 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -106,7 +106,10 @@ func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) { if c.Statefulset != nil { return nil, fmt.Errorf("StatefulSet already exists in the cluster") } - statefulSetSpec := c.genStatefulSet(c.Spec) + statefulSetSpec, err := c.genStatefulSet(c.Spec) + if err != nil { + return nil, fmt.Errorf("Can't generate StatefulSet: %s", err) + } statefulSet, err := c.KubeClient.StatefulSets(statefulSetSpec.Namespace).Create(statefulSetSpec) if k8sutil.ResourceAlreadyExists(err) { return nil, fmt.Errorf("StatefulSet '%s' already exists", util.NameFromMeta(statefulSetSpec.ObjectMeta)) diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 11e89d60..49a32c9d 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -137,7 +137,11 @@ func (c *Cluster) syncStatefulSet() error { match bool reason string ) - desiredSS := c.genStatefulSet(cSpec) + desiredSS, err := c.genStatefulSet(cSpec) + if err != nil { + return fmt.Errorf("Can't generate StatefulSet: %s", err) + } + match, rollUpdate, reason = c.compareStatefulSetWith(desiredSS) if match { return nil diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 4b37689e..30ef1f94 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -37,17 +37,30 @@ func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, e return nil, fmt.Errorf("Can't extract list of postgresql objects: %s", err) } + var activeClustersCnt, failedClustersCnt int for _, obj := range objList { pg, ok := obj.(*spec.Postgresql) if !ok { return nil, fmt.Errorf("Can't cast object to postgresql") } + + if pg.Error != nil { + failedClustersCnt++ + continue + } c.queueClusterEvent(nil, pg, spec.EventSync) c.logger.Debugf("Sync of the '%s' cluster has been queued", util.NameFromMeta(pg.Metadata)) + activeClustersCnt++ } if len(objList) > 0 { - c.logger.Infof("There are %d clusters currently running", len(objList)) + if failedClustersCnt > 0 && activeClustersCnt == 0 { + c.logger.Infof("There are no clusters running. %d are in the failed state", failedClustersCnt) + } else if failedClustersCnt == 0 && activeClustersCnt > 0 { + c.logger.Infof("There are %d clusters running", activeClustersCnt) + } else { + c.logger.Infof("There are %d clusters running and %d are in the failed state", activeClustersCnt, failedClustersCnt) + } } else { c.logger.Infof("No clusters running") } @@ -168,17 +181,31 @@ func (c *Controller) processClusterEventsQueue(idx int) { func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec.EventType) { var ( - uid types.UID - clusterName spec.NamespacedName + uid types.UID + clusterName spec.NamespacedName + clusterError error ) - if old != nil { + if old != nil { //update, delete uid = old.Metadata.GetUID() clusterName = util.NameFromMeta(old.Metadata) - } else { + if eventType == spec.EventUpdate && new.Error == nil && old != nil { + eventType = spec.EventAdd + clusterError = new.Error + } else { + clusterError = old.Error + } + } else { //add, sync uid = new.Metadata.GetUID() clusterName = util.NameFromMeta(new.Metadata) + clusterError = new.Error } + + if clusterError != nil && eventType != spec.EventDelete { + c.logger.Debugf("Skipping %s event for invalid cluster %s (reason: %s)", eventType, clusterName, clusterError) + return + } + workerId := c.clusterWorkerId(clusterName) clusterEvent := spec.ClusterEvent{ EventType: eventType, diff --git a/pkg/spec/postgresql.go b/pkg/spec/postgresql.go index a4957375..a8b58204 100644 --- a/pkg/spec/postgresql.go +++ b/pkg/spec/postgresql.go @@ -38,8 +38,8 @@ type ResourceDescription struct { } type Resources struct { - ResourceRequest ResourceDescription `json:"requests,omitempty""` - ResourceLimits ResourceDescription `json:"limits,omitempty""` + ResourceRequest ResourceDescription `json:"requests,omitempty"` + ResourceLimits ResourceDescription `json:"limits,omitempty"` } type Patroni struct { @@ -62,6 +62,7 @@ const ( ClusterStatusUpdateFailed PostgresStatus = "UpdateFailed" ClusterStatusAddFailed PostgresStatus = "CreateFailed" ClusterStatusRunning PostgresStatus = "Running" + ClusterStatusInvalid PostgresStatus = "Invalid" ) // PostgreSQL Third Party (resource) Object @@ -71,6 +72,7 @@ type Postgresql struct { Spec PostgresSpec `json:"spec"` Status PostgresStatus `json:"status"` + Error error `json:"-"` } type PostgresSpec struct { @@ -189,38 +191,50 @@ func (pl *PostgresqlList) GetListMeta() unversioned.List { return &pl.Metadata } -// The code below is used only to work around a known problem with third-party -// resources and ugorji. If/when these issues are resolved, the code below -// should no longer be required. -// -type PostgresqlListCopy PostgresqlList -type PostgresqlCopy Postgresql - func clusterName(clusterName string, teamName string) (string, error) { teamNameLen := len(teamName) if len(clusterName) < teamNameLen+2 { return "", fmt.Errorf("Name is too short") } if strings.ToLower(clusterName[:teamNameLen+1]) != strings.ToLower(teamName)+"-" { - return "", fmt.Errorf("Name must start with the team name and dash") + return "", fmt.Errorf("Name must match {TEAM}-{NAME} format") } return clusterName[teamNameLen+1:], nil } +// The code below is used only to work around a known problem with third-party +// resources and ugorji. If/when these issues are resolved, the code below +// should no longer be required. +// +type PostgresqlListCopy PostgresqlList +type PostgresqlCopy Postgresql + func (p *Postgresql) UnmarshalJSON(data []byte) error { tmp := PostgresqlCopy{} err := json.Unmarshal(data, &tmp) if err != nil { - return err + metaErr := json.Unmarshal(data, &tmp.Metadata) + if metaErr != nil { + return err + } + + tmp.Error = err + tmp.Status = ClusterStatusInvalid + + *p = Postgresql(tmp) + + return nil } tmp2 := Postgresql(tmp) clusterName, err := clusterName(tmp2.Metadata.Name, tmp2.Spec.TeamId) - if err != nil { - return err + if err == nil { + tmp2.Spec.ClusterName = clusterName + } else { + tmp2.Error = err + tmp2.Status = ClusterStatusInvalid } - tmp2.Spec.ClusterName = clusterName *p = tmp2 return nil -- GitLab