Skip to content
Snippets Groups Projects
Commit c557027e authored by Murat Kabilov's avatar Murat Kabilov Committed by GitHub
Browse files

Fix cluster event queue consumption

parents 5a7a3fec dad8e2f4
No related tags found
No related merge requests found
......@@ -41,7 +41,7 @@ type Config struct {
}
type kubeResources struct {
Service map[postgresRole]*v1.Service
Services map[postgresRole]*v1.Service
Endpoint *v1.Endpoints
Secrets map[types.UID]*v1.Secret
Statefulset *v1beta1.StatefulSet
......@@ -96,7 +96,7 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql
pgUsers: make(map[string]spec.PgUser),
systemUsers: make(map[string]spec.PgUser),
podSubscribers: make(map[spec.NamespacedName]chan spec.PodEvent),
kubeResources: kubeResources{Secrets: make(map[types.UID]*v1.Secret), Service: make(map[postgresRole]*v1.Service)},
kubeResources: kubeResources{Secrets: make(map[types.UID]*v1.Secret), Services: make(map[postgresRole]*v1.Service)},
masterLess: false,
userSyncStrategy: users.DefaultUserSyncStrategy{},
deleteOptions: &metav1.DeleteOptions{OrphanDependents: &orphanDependents},
......@@ -246,11 +246,11 @@ func (c *Cluster) Create() error {
func (c *Cluster) sameServiceWith(role postgresRole, service *v1.Service) (match bool, reason string) {
//TODO: improve comparison
if c.Service[role].Spec.Type != service.Spec.Type {
if c.Services[role].Spec.Type != service.Spec.Type {
return false, fmt.Sprintf("new %s service's type %q doesn't match the current one %q",
role, service.Spec.Type, c.Service[role].Spec.Type)
role, service.Spec.Type, c.Services[role].Spec.Type)
}
oldSourceRanges := c.Service[role].Spec.LoadBalancerSourceRanges
oldSourceRanges := c.Services[role].Spec.LoadBalancerSourceRanges
newSourceRanges := service.Spec.LoadBalancerSourceRanges
/* work around Kubernetes 1.6 serializing [] as nil. See https://github.com/kubernetes/kubernetes/issues/43203 */
if (len(oldSourceRanges) == 0) && (len(newSourceRanges) == 0) {
......@@ -260,7 +260,7 @@ func (c *Cluster) sameServiceWith(role postgresRole, service *v1.Service) (match
return false, fmt.Sprintf("new %s service's LoadBalancerSourceRange doesn't match the current one", role)
}
oldDNSAnnotation := c.Service[role].Annotations[constants.ZalandoDNSNameAnnotation]
oldDNSAnnotation := c.Services[role].Annotations[constants.ZalandoDNSNameAnnotation]
newDNSAnnotation := service.Annotations[constants.ZalandoDNSNameAnnotation]
if oldDNSAnnotation != newDNSAnnotation {
return false, fmt.Sprintf("new %s service's %q annotation doesn't match the current one", role, constants.ZalandoDNSNameAnnotation)
......@@ -445,12 +445,12 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error {
}
newService := c.generateService(role, &newSpec.Spec)
if match, reason := c.sameServiceWith(role, newService); !match {
c.logServiceChanges(role, c.Service[role], newService, true, reason)
c.logServiceChanges(role, c.Services[role], newService, true, reason)
if err := c.updateService(role, newService); err != nil {
c.setStatus(spec.ClusterStatusUpdateFailed)
return fmt.Errorf("could not update %s service: %v", role, err)
}
c.logger.Infof("%s service %q has been updated", role, util.NameFromMeta(c.Service[role].ObjectMeta))
c.logger.Infof("%s service %q has been updated", role, util.NameFromMeta(c.Services[role].ObjectMeta))
}
}
......
......@@ -31,9 +31,9 @@ func (c *Cluster) loadResources() error {
for i, svc := range services.Items {
switch postgresRole(svc.Labels[c.OpConfig.PodRoleLabel]) {
case replica:
c.Service[replica] = &services.Items[i]
c.Services[replica] = &services.Items[i]
default:
c.Service[master] = &services.Items[i]
c.Services[master] = &services.Items[i]
}
}
......@@ -91,7 +91,7 @@ func (c *Cluster) listResources() error {
c.logger.Infof("found endpoint: %q (uid: %q)", util.NameFromMeta(c.Endpoint.ObjectMeta), c.Endpoint.UID)
}
for role, service := range c.Service {
for role, service := range c.Services {
c.logger.Infof("found %s service: %q (uid: %q)", role, util.NameFromMeta(service.ObjectMeta), service.UID)
}
......@@ -231,7 +231,7 @@ func (c *Cluster) deleteStatefulSet() error {
}
func (c *Cluster) createService(role postgresRole) (*v1.Service, error) {
if c.Service[role] != nil {
if c.Services[role] != nil {
return nil, fmt.Errorf("service already exists in the cluster")
}
serviceSpec := c.generateService(role, &c.Spec)
......@@ -241,18 +241,18 @@ func (c *Cluster) createService(role postgresRole) (*v1.Service, error) {
return nil, err
}
c.Service[role] = service
c.Services[role] = service
return service, nil
}
func (c *Cluster) updateService(role postgresRole, newService *v1.Service) error {
if c.Service[role] == nil {
if c.Services[role] == nil {
return fmt.Errorf("there is no service in the cluster")
}
serviceName := util.NameFromMeta(c.Service[role].ObjectMeta)
serviceName := util.NameFromMeta(c.Services[role].ObjectMeta)
endpointName := util.NameFromMeta(c.Endpoint.ObjectMeta)
// TODO: check if it possible to change the service type with a patch in future versions of Kubernetes
if newService.Spec.Type != c.Service[role].Spec.Type {
if newService.Spec.Type != c.Services[role].Spec.Type {
// service type has changed, need to replace the service completely.
// we cannot use just pach the current service, since it may contain attributes incompatible with the new type.
var (
......@@ -263,12 +263,12 @@ func (c *Cluster) updateService(role postgresRole, newService *v1.Service) error
if role == master {
// for the master service we need to re-create the endpoint as well. Get the up-to-date version of
// the addresses stored in it before the service is deleted (deletion of the service removes the endpooint)
currentEndpoint, err = c.KubeClient.Endpoints(c.Service[role].Namespace).Get(c.Service[role].Name, metav1.GetOptions{})
currentEndpoint, err = c.KubeClient.Endpoints(c.Services[role].Namespace).Get(c.Services[role].Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("could not get current cluster endpoints: %v", err)
}
}
err = c.KubeClient.Services(c.Service[role].Namespace).Delete(c.Service[role].Name, c.deleteOptions)
err = c.KubeClient.Services(c.Services[role].Namespace).Delete(c.Services[role].Name, c.deleteOptions)
if err != nil {
return fmt.Errorf("could not delete service %q: %v", serviceName, err)
}
......@@ -277,11 +277,11 @@ func (c *Cluster) updateService(role postgresRole, newService *v1.Service) error
if err != nil {
return fmt.Errorf("could not create service %q: %v", serviceName, err)
}
c.Service[role] = svc
c.Services[role] = svc
if role == master {
// create the new endpoint using the addresses obtained from the previous one
endpointSpec := c.generateMasterEndpoints(currentEndpoint.Subsets)
ep, err := c.KubeClient.Endpoints(c.Service[role].Namespace).Create(endpointSpec)
ep, err := c.KubeClient.Endpoints(c.Services[role].Namespace).Create(endpointSpec)
if err != nil {
return fmt.Errorf("could not create endpoint %q: %v", endpointName, err)
}
......@@ -293,8 +293,8 @@ func (c *Cluster) updateService(role postgresRole, newService *v1.Service) error
if len(newService.ObjectMeta.Annotations) > 0 {
annotationsPatchData := metadataAnnotationsPatch(newService.ObjectMeta.Annotations)
_, err := c.KubeClient.Services(c.Service[role].Namespace).Patch(
c.Service[role].Name,
_, err := c.KubeClient.Services(c.Services[role].Namespace).Patch(
c.Services[role].Name,
types.StrategicMergePatchType,
[]byte(annotationsPatchData), "")
......@@ -308,30 +308,30 @@ func (c *Cluster) updateService(role postgresRole, newService *v1.Service) error
return fmt.Errorf("could not form patch for the service %q: %v", serviceName, err)
}
svc, err := c.KubeClient.Services(c.Service[role].Namespace).Patch(
c.Service[role].Name,
svc, err := c.KubeClient.Services(c.Services[role].Namespace).Patch(
c.Services[role].Name,
types.MergePatchType,
patchData, "")
if err != nil {
return fmt.Errorf("could not patch service %q: %v", serviceName, err)
}
c.Service[role] = svc
c.Services[role] = svc
return nil
}
func (c *Cluster) deleteService(role postgresRole) error {
c.logger.Debugf("deleting service %s", role)
if c.Service[role] == nil {
if c.Services[role] == nil {
return fmt.Errorf("there is no %s service in the cluster", role)
}
service := c.Service[role]
service := c.Services[role]
err := c.KubeClient.Services(service.Namespace).Delete(service.Name, c.deleteOptions)
if err != nil {
return err
}
c.logger.Infof("%s service %q has been deleted", role, util.NameFromMeta(service.ObjectMeta))
c.Service[role] = nil
c.Services[role] = nil
return nil
}
......@@ -372,9 +372,9 @@ func (c *Cluster) applySecrets() error {
secret, err := c.KubeClient.Secrets(secretSpec.Namespace).Create(secretSpec)
if k8sutil.ResourceAlreadyExists(err) {
var userMap map[string]spec.PgUser
curSecret, err := c.KubeClient.Secrets(secretSpec.Namespace).Get(secretSpec.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("could not get current secret: %v", err)
curSecret, err2 := c.KubeClient.Secrets(secretSpec.Namespace).Get(secretSpec.Name, metav1.GetOptions{})
if err2 != nil {
return fmt.Errorf("could not get current secret: %v", err2)
}
c.logger.Debugf("secret %q already exists, fetching it's password", util.NameFromMeta(curSecret.ObjectMeta))
if secretUsername == c.systemUsers[constants.SuperuserKeyName].Name {
......@@ -422,12 +422,12 @@ func (c *Cluster) createRoles() (err error) {
// GetServiceMaster returns cluster's kubernetes master Service
func (c *Cluster) GetServiceMaster() *v1.Service {
return c.Service[master]
return c.Services[master]
}
// GetServiceReplica returns cluster's kubernetes replica Service
func (c *Cluster) GetServiceReplica() *v1.Service {
return c.Service[replica]
return c.Services[replica]
}
// GetEndpoint returns cluster's kubernetes Endpoint
......
......@@ -43,7 +43,7 @@ func (c *Cluster) Sync() error {
c.logger.Debugf("syncing services")
for _, role := range []postgresRole{master, replica} {
if role == replica && !c.Spec.ReplicaLoadBalancer {
if c.Service[role] != nil {
if c.Services[role] != nil {
// delete the left over replica service
if err := c.deleteService(role); err != nil {
return fmt.Errorf("could not delete obsolete %s service: %v", role, err)
......@@ -82,7 +82,7 @@ func (c *Cluster) Sync() error {
func (c *Cluster) syncService(role postgresRole) error {
cSpec := c.Spec
if c.Service[role] == nil {
if c.Services[role] == nil {
c.logger.Infof("could not find the cluster's %s service", role)
svc, err := c.createService(role)
if err != nil {
......@@ -98,7 +98,7 @@ func (c *Cluster) syncService(role postgresRole) error {
if match {
return nil
}
c.logServiceChanges(role, c.Service[role], desiredSvc, false, reason)
c.logServiceChanges(role, c.Services[role], desiredSvc, false, reason)
if err := c.updateService(role, desiredSvc); err != nil {
return fmt.Errorf("could not update %s service to match desired state: %v", role, err)
......
......@@ -235,13 +235,13 @@ func (c *Cluster) waitPodLabelsReady() error {
err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout,
func() (bool, error) {
masterPods, err := c.KubeClient.Pods(namespace).List(masterListOption)
if err != nil {
return false, err
masterPods, err2 := c.KubeClient.Pods(namespace).List(masterListOption)
if err2 != nil {
return false, err2
}
replicaPods, err := c.KubeClient.Pods(namespace).List(replicaListOption)
if err != nil {
return false, err
replicaPods, err2 := c.KubeClient.Pods(namespace).List(replicaListOption)
if err2 != nil {
return false, err2
}
if len(masterPods.Items) > 1 {
return false, fmt.Errorf("too many masters")
......
......@@ -142,13 +142,9 @@ func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedNam
return cl
}
func (c *Controller) processEvent(obj interface{}) error {
func (c *Controller) processEvent(event spec.ClusterEvent) {
var clusterName spec.NamespacedName
event, ok := obj.(spec.ClusterEvent)
if !ok {
return fmt.Errorf("could not cast to ClusterEvent")
}
lg := c.logger.WithField("worker", event.WorkerID)
if event.EventType == spec.EventAdd || event.EventType == spec.EventSync {
......@@ -166,7 +162,7 @@ func (c *Controller) processEvent(obj interface{}) error {
case spec.EventAdd:
if clusterFound {
lg.Debugf("cluster already exists")
return nil
return
}
lg.Infof("creation of the cluster started")
......@@ -177,7 +173,7 @@ func (c *Controller) processEvent(obj interface{}) error {
cl.Error = fmt.Errorf("could not create cluster: %v", err)
lg.Error(cl.Error)
return nil
return
}
lg.Infoln("cluster has been created")
......@@ -186,13 +182,13 @@ func (c *Controller) processEvent(obj interface{}) error {
if !clusterFound {
lg.Warnln("cluster does not exist")
return nil
return
}
if err := cl.Update(event.NewSpec); err != nil {
cl.Error = fmt.Errorf("could not update cluster: %v", err)
lg.Error(cl.Error)
return nil
return
}
cl.Error = nil
lg.Infoln("cluster has been updated")
......@@ -202,12 +198,12 @@ func (c *Controller) processEvent(obj interface{}) error {
lg.Infoln("Deletion of the cluster started")
if !clusterFound {
lg.Errorf("unknown cluster: %q", clusterName)
return nil
return
}
if err := cl.Delete(); err != nil {
lg.Errorf("could not delete cluster: %v", err)
return nil
return
}
func() {
......@@ -238,14 +234,12 @@ func (c *Controller) processEvent(obj interface{}) error {
if err := cl.Sync(); err != nil {
cl.Error = fmt.Errorf("could not sync cluster: %v", err)
lg.Error(cl.Error)
return nil
return
}
cl.Error = nil
lg.Infof("cluster has been synced")
}
return nil
}
func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{}, wg *sync.WaitGroup) {
......@@ -257,13 +251,20 @@ func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{},
}()
for {
if _, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(c.processEvent)); err != nil {
obj, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(func(interface{}) error { return nil }))
if err != nil {
if err == cache.FIFOClosedError {
return
}
c.logger.Errorf("error when processing cluster events queue: %v", err)
continue
}
event, ok := obj.(spec.ClusterEvent)
if !ok {
c.logger.Errorf("could not cast to ClusterEvent")
}
c.processEvent(event)
}
}
......
......@@ -4,14 +4,13 @@ import (
"crypto/md5"
"encoding/hex"
"math/rand"
"regexp"
"strings"
"time"
"github.com/motomux/pretty"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"regexp"
"github.com/zalando-incubator/postgres-operator/pkg/spec"
)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment