diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 64393e1e740c32d274830f8b32ed14074f54f177..2d79c5b7268d5c7c10e886a3629ba3c9214e2c92 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -17,6 +17,7 @@ import ( "k8s.io/client-go/pkg/apis/apps/v1beta1" "k8s.io/client-go/pkg/types" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util" @@ -54,18 +55,17 @@ type Cluster struct { kubeResources spec.Postgresql Config - logger *logrus.Entry - pgUsers map[string]spec.PgUser - systemUsers map[string]spec.PgUser - podEvents chan spec.PodEvent - podSubscribers map[spec.NamespacedName]chan spec.PodEvent - podSubscribersMu sync.RWMutex - pgDb *sql.DB - mu sync.Mutex - masterLess bool - podDispatcherRunning bool - userSyncStrategy spec.UserSyncer - deleteOptions *v1.DeleteOptions + logger *logrus.Entry + pgUsers map[string]spec.PgUser + systemUsers map[string]spec.PgUser + podSubscribers map[spec.NamespacedName]chan spec.PodEvent + podSubscribersMu sync.RWMutex + pgDb *sql.DB + mu sync.Mutex + masterLess bool + userSyncStrategy spec.UserSyncer + deleteOptions *v1.DeleteOptions + podEventsQueue *cache.FIFO } func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { @@ -73,19 +73,27 @@ func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret)} orphanDependents := true + podEventsQueue := cache.NewFIFO(func(obj interface{}) (string, error) { + e, ok := obj.(spec.PodEvent) + if !ok { + return "", fmt.Errorf("could not cast to PodEvent") + } + + return fmt.Sprintf("%s-%s", e.PodName, e.ResourceVersion), nil + }) + cluster := &Cluster{ - Config: cfg, - Postgresql: pgSpec, - logger: lg, - pgUsers: make(map[string]spec.PgUser), - systemUsers: make(map[string]spec.PgUser), - podEvents: make(chan spec.PodEvent), - podSubscribers: make(map[spec.NamespacedName]chan spec.PodEvent), - kubeResources: kubeResources, - masterLess: false, - podDispatcherRunning: false, - userSyncStrategy: users.DefaultUserSyncStrategy{}, - deleteOptions: &v1.DeleteOptions{OrphanDependents: &orphanDependents}, + Config: cfg, + Postgresql: pgSpec, + logger: lg, + pgUsers: make(map[string]spec.PgUser), + systemUsers: make(map[string]spec.PgUser), + podSubscribers: make(map[spec.NamespacedName]chan spec.PodEvent), + kubeResources: kubeResources, + masterLess: false, + userSyncStrategy: users.DefaultUserSyncStrategy{}, + deleteOptions: &v1.DeleteOptions{OrphanDependents: &orphanDependents}, + podEventsQueue: podEventsQueue, } return cluster @@ -143,16 +151,11 @@ func (c *Cluster) initUsers() error { return nil } -func (c *Cluster) Create(stopCh <-chan struct{}) error { +func (c *Cluster) Create() error { c.mu.Lock() defer c.mu.Unlock() var err error - if !c.podDispatcherRunning { - go c.podEventsDispatcher(stopCh) - c.podDispatcherRunning = true - } - defer func() { if err == nil { c.setStatus(spec.ClusterStatusRunning) //TODO: are you sure it's running? @@ -460,7 +463,38 @@ func (c *Cluster) Delete() error { } func (c *Cluster) ReceivePodEvent(event spec.PodEvent) { - c.podEvents <- event + c.podEventsQueue.Add(event) +} + +func (c *Cluster) podEventProcess(obj interface{}) error { + event, ok := obj.(spec.PodEvent) + if !ok { + return fmt.Errorf("could not cast to PodEvent") + } + + c.podSubscribersMu.RLock() + subscriber, ok := c.podSubscribers[event.PodName] + c.podSubscribersMu.RUnlock() + if ok { + subscriber <- event + } + + return nil +} + +func (c *Cluster) Run(stopCh <-chan struct{}) { + go c.processPodEventQueue(stopCh) +} + +func (c *Cluster) processPodEventQueue(stopCh <-chan struct{}) { + for { + select { + case <-stopCh: + return + default: + c.podEventsQueue.Pop(cache.PopProcessFunc(c.podEventProcess)) + } + } } func (c *Cluster) initSystemUsers() { diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index 7773c8b34c159ccb293de61d139a60d7335184e9..363074d7306f4bc664f8679747bd0046926f40fa 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -146,23 +146,6 @@ func (c *Cluster) recreatePod(pod v1.Pod) error { return nil } -func (c *Cluster) podEventsDispatcher(stopCh <-chan struct{}) { - c.logger.Infof("Watching '%s' cluster", c.ClusterName()) - for { - select { - case event := <-c.podEvents: - c.podSubscribersMu.RLock() - subscriber, ok := c.podSubscribers[event.PodName] - c.podSubscribersMu.RUnlock() - if ok { - go func() { subscriber <- event }() //TODO: is it a right way to do nonblocking send to the channel? - } - case <-stopCh: - return - } - } -} - func (c *Cluster) recreatePods() error { ls := c.labelsSet() namespace := c.Metadata.Namespace diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index c16c1f3152b9af1c761dd8a32caa6f2846ccdc48..3ac35a2744b15ca937fc7d2999cc65116c8dac05 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -7,7 +7,7 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" ) -func (c *Cluster) Sync(stopCh <-chan struct{}) error { +func (c *Cluster) Sync() error { c.mu.Lock() defer c.mu.Unlock() @@ -16,11 +16,6 @@ func (c *Cluster) Sync(stopCh <-chan struct{}) error { c.logger.Errorf("could not load resources: %v", err) } - if !c.podDispatcherRunning { - go c.podEventsDispatcher(stopCh) - c.podDispatcherRunning = true - } - c.logger.Debugf("Syncing secrets") if err := c.syncSecrets(); err != nil { if !k8sutil.ResourceAlreadyExists(err) { diff --git a/pkg/controller/pod.go b/pkg/controller/pod.go index 331d8a26741d64500c79de77d136e193bb07462d..41668c0336e9c49f238a422c0715cf38c82af3e6 100644 --- a/pkg/controller/pod.go +++ b/pkg/controller/pod.go @@ -62,10 +62,11 @@ func (c *Controller) podAdd(obj interface{}) { } podEvent := spec.PodEvent{ - ClusterName: c.PodClusterName(pod), - PodName: util.NameFromMeta(pod.ObjectMeta), - CurPod: pod, - EventType: spec.EventAdd, + ClusterName: c.PodClusterName(pod), + PodName: util.NameFromMeta(pod.ObjectMeta), + CurPod: pod, + EventType: spec.EventAdd, + ResourceVersion: pod.ResourceVersion, } c.podCh <- podEvent @@ -83,11 +84,12 @@ func (c *Controller) podUpdate(prev, cur interface{}) { } podEvent := spec.PodEvent{ - ClusterName: c.PodClusterName(curPod), - PodName: util.NameFromMeta(curPod.ObjectMeta), - PrevPod: prevPod, - CurPod: curPod, - EventType: spec.EventUpdate, + ClusterName: c.PodClusterName(curPod), + PodName: util.NameFromMeta(curPod.ObjectMeta), + PrevPod: prevPod, + CurPod: curPod, + EventType: spec.EventUpdate, + ResourceVersion: curPod.ResourceVersion, } c.podCh <- podEvent @@ -100,27 +102,28 @@ func (c *Controller) podDelete(obj interface{}) { } podEvent := spec.PodEvent{ - ClusterName: c.PodClusterName(pod), - PodName: util.NameFromMeta(pod.ObjectMeta), - CurPod: pod, - EventType: spec.EventDelete, + ClusterName: c.PodClusterName(pod), + PodName: util.NameFromMeta(pod.ObjectMeta), + CurPod: pod, + EventType: spec.EventDelete, + ResourceVersion: pod.ResourceVersion, } c.podCh <- podEvent } func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}) { - c.logger.Infof("Watching all pod events") + c.logger.Debugln("Watching all pod events") for { select { case event := <-c.podCh: c.clustersMu.RLock() - subscriber, ok := c.clusters[event.ClusterName] + cluster, ok := c.clusters[event.ClusterName] c.clustersMu.RUnlock() if ok { c.logger.Debugf("Sending %s event of pod '%s' to the '%s' cluster channel", event.EventType, event.PodName, event.ClusterName) - go subscriber.ReceivePodEvent(event) + cluster.ReceivePodEvent(event) } case <-stopCh: return diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index a5298e7fc38b7f92a51300ff6f7e1abb3894230d..5e1919ecb46a446827d774622978dec5b4d80cc8 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -91,7 +91,6 @@ func (c *Controller) processEvent(obj interface{}) error { c.clustersMu.RLock() cl, clusterFound := c.clusters[clusterName] - stopCh := c.stopChs[clusterName] c.clustersMu.RUnlock() switch event.EventType { @@ -105,13 +104,14 @@ func (c *Controller) processEvent(obj interface{}) error { stopCh := make(chan struct{}) cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger) + cl.Run(stopCh) c.clustersMu.Lock() c.clusters[clusterName] = cl c.stopChs[clusterName] = stopCh c.clustersMu.Unlock() - if err := cl.Create(stopCh); err != nil { + if err := cl.Create(); err != nil { cl.Error = fmt.Errorf("could not create cluster: %v", err) logger.Errorf("%v", cl.Error) @@ -158,8 +158,9 @@ func (c *Controller) processEvent(obj interface{}) error { // no race condition because a cluster is always processed by single worker if !clusterFound { + stopCh := make(chan struct{}) cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger) - stopCh = make(chan struct{}) + cl.Run(stopCh) c.clustersMu.Lock() c.clusters[clusterName] = cl @@ -167,7 +168,7 @@ func (c *Controller) processEvent(obj interface{}) error { c.clustersMu.Unlock() } - if err := cl.Sync(stopCh); err != nil { + if err := cl.Sync(); err != nil { cl.Error = fmt.Errorf("could not sync cluster '%s': %s", clusterName, err) logger.Errorf("%v", cl) return nil diff --git a/pkg/spec/types.go b/pkg/spec/types.go index 9c88249f49fdd0c065c62280be7751342746c3a0..864f519287ca79560a1529ddd264117909b01e17 100644 --- a/pkg/spec/types.go +++ b/pkg/spec/types.go @@ -34,11 +34,12 @@ const ( ) type PodEvent struct { - ClusterName NamespacedName - PodName NamespacedName - PrevPod *v1.Pod - CurPod *v1.Pod - EventType EventType + ResourceVersion string + ClusterName NamespacedName + PodName NamespacedName + PrevPod *v1.Pod + CurPod *v1.Pod + EventType EventType } type PgUser struct {