diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 84a3919499c501ba7e62d4efa434ee6ad0f03eb4..02a06cac28e077730e0285bd77ff591b1bce1dc5 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -41,7 +41,7 @@ type Config struct { } type kubeResources struct { - Service map[postgresRole]*v1.Service + Services map[postgresRole]*v1.Service Endpoint *v1.Endpoints Secrets map[types.UID]*v1.Secret Statefulset *v1beta1.StatefulSet @@ -96,7 +96,7 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql pgUsers: make(map[string]spec.PgUser), systemUsers: make(map[string]spec.PgUser), podSubscribers: make(map[spec.NamespacedName]chan spec.PodEvent), - kubeResources: kubeResources{Secrets: make(map[types.UID]*v1.Secret), Service: make(map[postgresRole]*v1.Service)}, + kubeResources: kubeResources{Secrets: make(map[types.UID]*v1.Secret), Services: make(map[postgresRole]*v1.Service)}, masterLess: false, userSyncStrategy: users.DefaultUserSyncStrategy{}, deleteOptions: &metav1.DeleteOptions{OrphanDependents: &orphanDependents}, @@ -246,11 +246,11 @@ func (c *Cluster) Create() error { func (c *Cluster) sameServiceWith(role postgresRole, service *v1.Service) (match bool, reason string) { //TODO: improve comparison - if c.Service[role].Spec.Type != service.Spec.Type { + if c.Services[role].Spec.Type != service.Spec.Type { return false, fmt.Sprintf("new %s service's type %q doesn't match the current one %q", - role, service.Spec.Type, c.Service[role].Spec.Type) + role, service.Spec.Type, c.Services[role].Spec.Type) } - oldSourceRanges := c.Service[role].Spec.LoadBalancerSourceRanges + oldSourceRanges := c.Services[role].Spec.LoadBalancerSourceRanges newSourceRanges := service.Spec.LoadBalancerSourceRanges /* work around Kubernetes 1.6 serializing [] as nil. See https://github.com/kubernetes/kubernetes/issues/43203 */ if (len(oldSourceRanges) == 0) && (len(newSourceRanges) == 0) { @@ -260,7 +260,7 @@ func (c *Cluster) sameServiceWith(role postgresRole, service *v1.Service) (match return false, fmt.Sprintf("new %s service's LoadBalancerSourceRange doesn't match the current one", role) } - oldDNSAnnotation := c.Service[role].Annotations[constants.ZalandoDNSNameAnnotation] + oldDNSAnnotation := c.Services[role].Annotations[constants.ZalandoDNSNameAnnotation] newDNSAnnotation := service.Annotations[constants.ZalandoDNSNameAnnotation] if oldDNSAnnotation != newDNSAnnotation { return false, fmt.Sprintf("new %s service's %q annotation doesn't match the current one", role, constants.ZalandoDNSNameAnnotation) @@ -445,12 +445,12 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { } newService := c.generateService(role, &newSpec.Spec) if match, reason := c.sameServiceWith(role, newService); !match { - c.logServiceChanges(role, c.Service[role], newService, true, reason) + c.logServiceChanges(role, c.Services[role], newService, true, reason) if err := c.updateService(role, newService); err != nil { c.setStatus(spec.ClusterStatusUpdateFailed) return fmt.Errorf("could not update %s service: %v", role, err) } - c.logger.Infof("%s service %q has been updated", role, util.NameFromMeta(c.Service[role].ObjectMeta)) + c.logger.Infof("%s service %q has been updated", role, util.NameFromMeta(c.Services[role].ObjectMeta)) } } diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 8939a7bda2d489b9289c54f8aa51236d388c0807..c018f6a027accc8462f2cb9493cc03220b5a7536 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -31,9 +31,9 @@ func (c *Cluster) loadResources() error { for i, svc := range services.Items { switch postgresRole(svc.Labels[c.OpConfig.PodRoleLabel]) { case replica: - c.Service[replica] = &services.Items[i] + c.Services[replica] = &services.Items[i] default: - c.Service[master] = &services.Items[i] + c.Services[master] = &services.Items[i] } } @@ -91,7 +91,7 @@ func (c *Cluster) listResources() error { c.logger.Infof("found endpoint: %q (uid: %q)", util.NameFromMeta(c.Endpoint.ObjectMeta), c.Endpoint.UID) } - for role, service := range c.Service { + for role, service := range c.Services { c.logger.Infof("found %s service: %q (uid: %q)", role, util.NameFromMeta(service.ObjectMeta), service.UID) } @@ -231,7 +231,7 @@ func (c *Cluster) deleteStatefulSet() error { } func (c *Cluster) createService(role postgresRole) (*v1.Service, error) { - if c.Service[role] != nil { + if c.Services[role] != nil { return nil, fmt.Errorf("service already exists in the cluster") } serviceSpec := c.generateService(role, &c.Spec) @@ -241,18 +241,18 @@ func (c *Cluster) createService(role postgresRole) (*v1.Service, error) { return nil, err } - c.Service[role] = service + c.Services[role] = service return service, nil } func (c *Cluster) updateService(role postgresRole, newService *v1.Service) error { - if c.Service[role] == nil { + if c.Services[role] == nil { return fmt.Errorf("there is no service in the cluster") } - serviceName := util.NameFromMeta(c.Service[role].ObjectMeta) + serviceName := util.NameFromMeta(c.Services[role].ObjectMeta) endpointName := util.NameFromMeta(c.Endpoint.ObjectMeta) // TODO: check if it possible to change the service type with a patch in future versions of Kubernetes - if newService.Spec.Type != c.Service[role].Spec.Type { + if newService.Spec.Type != c.Services[role].Spec.Type { // service type has changed, need to replace the service completely. // we cannot use just pach the current service, since it may contain attributes incompatible with the new type. var ( @@ -263,12 +263,12 @@ func (c *Cluster) updateService(role postgresRole, newService *v1.Service) error if role == master { // for the master service we need to re-create the endpoint as well. Get the up-to-date version of // the addresses stored in it before the service is deleted (deletion of the service removes the endpooint) - currentEndpoint, err = c.KubeClient.Endpoints(c.Service[role].Namespace).Get(c.Service[role].Name, metav1.GetOptions{}) + currentEndpoint, err = c.KubeClient.Endpoints(c.Services[role].Namespace).Get(c.Services[role].Name, metav1.GetOptions{}) if err != nil { return fmt.Errorf("could not get current cluster endpoints: %v", err) } } - err = c.KubeClient.Services(c.Service[role].Namespace).Delete(c.Service[role].Name, c.deleteOptions) + err = c.KubeClient.Services(c.Services[role].Namespace).Delete(c.Services[role].Name, c.deleteOptions) if err != nil { return fmt.Errorf("could not delete service %q: %v", serviceName, err) } @@ -277,11 +277,11 @@ func (c *Cluster) updateService(role postgresRole, newService *v1.Service) error if err != nil { return fmt.Errorf("could not create service %q: %v", serviceName, err) } - c.Service[role] = svc + c.Services[role] = svc if role == master { // create the new endpoint using the addresses obtained from the previous one endpointSpec := c.generateMasterEndpoints(currentEndpoint.Subsets) - ep, err := c.KubeClient.Endpoints(c.Service[role].Namespace).Create(endpointSpec) + ep, err := c.KubeClient.Endpoints(c.Services[role].Namespace).Create(endpointSpec) if err != nil { return fmt.Errorf("could not create endpoint %q: %v", endpointName, err) } @@ -293,8 +293,8 @@ func (c *Cluster) updateService(role postgresRole, newService *v1.Service) error if len(newService.ObjectMeta.Annotations) > 0 { annotationsPatchData := metadataAnnotationsPatch(newService.ObjectMeta.Annotations) - _, err := c.KubeClient.Services(c.Service[role].Namespace).Patch( - c.Service[role].Name, + _, err := c.KubeClient.Services(c.Services[role].Namespace).Patch( + c.Services[role].Name, types.StrategicMergePatchType, []byte(annotationsPatchData), "") @@ -308,30 +308,30 @@ func (c *Cluster) updateService(role postgresRole, newService *v1.Service) error return fmt.Errorf("could not form patch for the service %q: %v", serviceName, err) } - svc, err := c.KubeClient.Services(c.Service[role].Namespace).Patch( - c.Service[role].Name, + svc, err := c.KubeClient.Services(c.Services[role].Namespace).Patch( + c.Services[role].Name, types.MergePatchType, patchData, "") if err != nil { return fmt.Errorf("could not patch service %q: %v", serviceName, err) } - c.Service[role] = svc + c.Services[role] = svc return nil } func (c *Cluster) deleteService(role postgresRole) error { c.logger.Debugf("deleting service %s", role) - if c.Service[role] == nil { + if c.Services[role] == nil { return fmt.Errorf("there is no %s service in the cluster", role) } - service := c.Service[role] + service := c.Services[role] err := c.KubeClient.Services(service.Namespace).Delete(service.Name, c.deleteOptions) if err != nil { return err } c.logger.Infof("%s service %q has been deleted", role, util.NameFromMeta(service.ObjectMeta)) - c.Service[role] = nil + c.Services[role] = nil return nil } @@ -372,9 +372,9 @@ func (c *Cluster) applySecrets() error { secret, err := c.KubeClient.Secrets(secretSpec.Namespace).Create(secretSpec) if k8sutil.ResourceAlreadyExists(err) { var userMap map[string]spec.PgUser - curSecret, err := c.KubeClient.Secrets(secretSpec.Namespace).Get(secretSpec.Name, metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("could not get current secret: %v", err) + curSecret, err2 := c.KubeClient.Secrets(secretSpec.Namespace).Get(secretSpec.Name, metav1.GetOptions{}) + if err2 != nil { + return fmt.Errorf("could not get current secret: %v", err2) } c.logger.Debugf("secret %q already exists, fetching it's password", util.NameFromMeta(curSecret.ObjectMeta)) if secretUsername == c.systemUsers[constants.SuperuserKeyName].Name { @@ -422,12 +422,12 @@ func (c *Cluster) createRoles() (err error) { // GetServiceMaster returns cluster's kubernetes master Service func (c *Cluster) GetServiceMaster() *v1.Service { - return c.Service[master] + return c.Services[master] } // GetServiceReplica returns cluster's kubernetes replica Service func (c *Cluster) GetServiceReplica() *v1.Service { - return c.Service[replica] + return c.Services[replica] } // GetEndpoint returns cluster's kubernetes Endpoint diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index b77fec21002178ee2917c8899e8a97eb483a089c..f770c9a906202fc665361a8a7d2dc571ff555ef7 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -43,7 +43,7 @@ func (c *Cluster) Sync() error { c.logger.Debugf("syncing services") for _, role := range []postgresRole{master, replica} { if role == replica && !c.Spec.ReplicaLoadBalancer { - if c.Service[role] != nil { + if c.Services[role] != nil { // delete the left over replica service if err := c.deleteService(role); err != nil { return fmt.Errorf("could not delete obsolete %s service: %v", role, err) @@ -82,7 +82,7 @@ func (c *Cluster) Sync() error { func (c *Cluster) syncService(role postgresRole) error { cSpec := c.Spec - if c.Service[role] == nil { + if c.Services[role] == nil { c.logger.Infof("could not find the cluster's %s service", role) svc, err := c.createService(role) if err != nil { @@ -98,7 +98,7 @@ func (c *Cluster) syncService(role postgresRole) error { if match { return nil } - c.logServiceChanges(role, c.Service[role], desiredSvc, false, reason) + c.logServiceChanges(role, c.Services[role], desiredSvc, false, reason) if err := c.updateService(role, desiredSvc); err != nil { return fmt.Errorf("could not update %s service to match desired state: %v", role, err) diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 284ebcebe9966de0c71367c8dd4edef575839bd9..b490a583d2d0c977919112a8e4e0690f083183d5 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -235,13 +235,13 @@ func (c *Cluster) waitPodLabelsReady() error { err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout, func() (bool, error) { - masterPods, err := c.KubeClient.Pods(namespace).List(masterListOption) - if err != nil { - return false, err + masterPods, err2 := c.KubeClient.Pods(namespace).List(masterListOption) + if err2 != nil { + return false, err2 } - replicaPods, err := c.KubeClient.Pods(namespace).List(replicaListOption) - if err != nil { - return false, err + replicaPods, err2 := c.KubeClient.Pods(namespace).List(replicaListOption) + if err2 != nil { + return false, err2 } if len(masterPods.Items) > 1 { return false, fmt.Errorf("too many masters") diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 64c2a32d41a94c5af15ecc5dfe054f900e8a8687..8dc142e1431c0415355d80bac3b5bf838b400bca 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -142,13 +142,9 @@ func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedNam return cl } -func (c *Controller) processEvent(obj interface{}) error { +func (c *Controller) processEvent(event spec.ClusterEvent) { var clusterName spec.NamespacedName - event, ok := obj.(spec.ClusterEvent) - if !ok { - return fmt.Errorf("could not cast to ClusterEvent") - } lg := c.logger.WithField("worker", event.WorkerID) if event.EventType == spec.EventAdd || event.EventType == spec.EventSync { @@ -166,7 +162,7 @@ func (c *Controller) processEvent(obj interface{}) error { case spec.EventAdd: if clusterFound { lg.Debugf("cluster already exists") - return nil + return } lg.Infof("creation of the cluster started") @@ -177,7 +173,7 @@ func (c *Controller) processEvent(obj interface{}) error { cl.Error = fmt.Errorf("could not create cluster: %v", err) lg.Error(cl.Error) - return nil + return } lg.Infoln("cluster has been created") @@ -186,13 +182,13 @@ func (c *Controller) processEvent(obj interface{}) error { if !clusterFound { lg.Warnln("cluster does not exist") - return nil + return } if err := cl.Update(event.NewSpec); err != nil { cl.Error = fmt.Errorf("could not update cluster: %v", err) lg.Error(cl.Error) - return nil + return } cl.Error = nil lg.Infoln("cluster has been updated") @@ -202,12 +198,12 @@ func (c *Controller) processEvent(obj interface{}) error { lg.Infoln("Deletion of the cluster started") if !clusterFound { lg.Errorf("unknown cluster: %q", clusterName) - return nil + return } if err := cl.Delete(); err != nil { lg.Errorf("could not delete cluster: %v", err) - return nil + return } func() { @@ -238,14 +234,12 @@ func (c *Controller) processEvent(obj interface{}) error { if err := cl.Sync(); err != nil { cl.Error = fmt.Errorf("could not sync cluster: %v", err) lg.Error(cl.Error) - return nil + return } cl.Error = nil lg.Infof("cluster has been synced") } - - return nil } func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{}, wg *sync.WaitGroup) { @@ -257,13 +251,20 @@ func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{}, }() for { - if _, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(c.processEvent)); err != nil { + obj, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(func(interface{}) error { return nil })) + if err != nil { if err == cache.FIFOClosedError { return } - c.logger.Errorf("error when processing cluster events queue: %v", err) + continue + } + event, ok := obj.(spec.ClusterEvent) + if !ok { + c.logger.Errorf("could not cast to ClusterEvent") } + + c.processEvent(event) } } diff --git a/pkg/util/util.go b/pkg/util/util.go index 73273a2fb8f59fc7297fb71a4a80b8e998e69009..bee6607a89a82799b1d128547b22e3708387b165 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -4,14 +4,13 @@ import ( "crypto/md5" "encoding/hex" "math/rand" + "regexp" "strings" "time" "github.com/motomux/pretty" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "regexp" - "github.com/zalando-incubator/postgres-operator/pkg/spec" )