Skip to content
Snippets Groups Projects
Commit dad8e2f4 authored by Murat Kabilov's avatar Murat Kabilov
Browse files

make cluster event queue consumption non-blocking

parent d2828e5e
Branches
Tags
No related merge requests found
...@@ -142,13 +142,9 @@ func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedNam ...@@ -142,13 +142,9 @@ func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedNam
return cl return cl
} }
func (c *Controller) processEvent(obj interface{}) error { func (c *Controller) processEvent(event spec.ClusterEvent) {
var clusterName spec.NamespacedName var clusterName spec.NamespacedName
event, ok := obj.(spec.ClusterEvent)
if !ok {
return fmt.Errorf("could not cast to ClusterEvent")
}
lg := c.logger.WithField("worker", event.WorkerID) lg := c.logger.WithField("worker", event.WorkerID)
if event.EventType == spec.EventAdd || event.EventType == spec.EventSync { if event.EventType == spec.EventAdd || event.EventType == spec.EventSync {
...@@ -166,7 +162,7 @@ func (c *Controller) processEvent(obj interface{}) error { ...@@ -166,7 +162,7 @@ func (c *Controller) processEvent(obj interface{}) error {
case spec.EventAdd: case spec.EventAdd:
if clusterFound { if clusterFound {
lg.Debugf("cluster already exists") lg.Debugf("cluster already exists")
return nil return
} }
lg.Infof("creation of the cluster started") lg.Infof("creation of the cluster started")
...@@ -177,7 +173,7 @@ func (c *Controller) processEvent(obj interface{}) error { ...@@ -177,7 +173,7 @@ func (c *Controller) processEvent(obj interface{}) error {
cl.Error = fmt.Errorf("could not create cluster: %v", err) cl.Error = fmt.Errorf("could not create cluster: %v", err)
lg.Error(cl.Error) lg.Error(cl.Error)
return nil return
} }
lg.Infoln("cluster has been created") lg.Infoln("cluster has been created")
...@@ -186,13 +182,13 @@ func (c *Controller) processEvent(obj interface{}) error { ...@@ -186,13 +182,13 @@ func (c *Controller) processEvent(obj interface{}) error {
if !clusterFound { if !clusterFound {
lg.Warnln("cluster does not exist") lg.Warnln("cluster does not exist")
return nil return
} }
if err := cl.Update(event.NewSpec); err != nil { if err := cl.Update(event.NewSpec); err != nil {
cl.Error = fmt.Errorf("could not update cluster: %v", err) cl.Error = fmt.Errorf("could not update cluster: %v", err)
lg.Error(cl.Error) lg.Error(cl.Error)
return nil return
} }
cl.Error = nil cl.Error = nil
lg.Infoln("cluster has been updated") lg.Infoln("cluster has been updated")
...@@ -202,12 +198,12 @@ func (c *Controller) processEvent(obj interface{}) error { ...@@ -202,12 +198,12 @@ func (c *Controller) processEvent(obj interface{}) error {
lg.Infoln("Deletion of the cluster started") lg.Infoln("Deletion of the cluster started")
if !clusterFound { if !clusterFound {
lg.Errorf("unknown cluster: %q", clusterName) lg.Errorf("unknown cluster: %q", clusterName)
return nil return
} }
if err := cl.Delete(); err != nil { if err := cl.Delete(); err != nil {
lg.Errorf("could not delete cluster: %v", err) lg.Errorf("could not delete cluster: %v", err)
return nil return
} }
func() { func() {
...@@ -238,14 +234,12 @@ func (c *Controller) processEvent(obj interface{}) error { ...@@ -238,14 +234,12 @@ func (c *Controller) processEvent(obj interface{}) error {
if err := cl.Sync(); err != nil { if err := cl.Sync(); err != nil {
cl.Error = fmt.Errorf("could not sync cluster: %v", err) cl.Error = fmt.Errorf("could not sync cluster: %v", err)
lg.Error(cl.Error) lg.Error(cl.Error)
return nil return
} }
cl.Error = nil cl.Error = nil
lg.Infof("cluster has been synced") lg.Infof("cluster has been synced")
} }
return nil
} }
func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{}, wg *sync.WaitGroup) { func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{}, wg *sync.WaitGroup) {
...@@ -257,13 +251,20 @@ func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{}, ...@@ -257,13 +251,20 @@ func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{},
}() }()
for { for {
if _, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(c.processEvent)); err != nil { obj, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(func(interface{}) error { return nil }))
if err != nil {
if err == cache.FIFOClosedError { if err == cache.FIFOClosedError {
return return
} }
c.logger.Errorf("error when processing cluster events queue: %v", err) c.logger.Errorf("error when processing cluster events queue: %v", err)
continue
} }
event, ok := obj.(spec.ClusterEvent)
if !ok {
c.logger.Errorf("could not cast to ClusterEvent")
}
c.processEvent(event)
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment