Skip to content
Snippets Groups Projects
Commit 5ff6d6a6 authored by Murat Kabilov's avatar Murat Kabilov Committed by GitHub
Browse files

Discard cluster events from the queue on cluster delete

parents c557027e 83760ebb
No related branches found
No related tags found
No related merge requests found
...@@ -93,7 +93,7 @@ func (s *Server) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { ...@@ -93,7 +93,7 @@ func (s *Server) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
s.logger.Fatalf("Could not start http server: %v", err) s.logger.Fatalf("Could not start http server: %v", err)
} }
}() }()
s.logger.Infof("Listening on %s", s.http.Addr) s.logger.Infof("listening on %s", s.http.Addr)
<-stopCh <-stopCh
......
...@@ -132,12 +132,12 @@ func (c *Cluster) setStatus(status spec.PostgresStatus) { ...@@ -132,12 +132,12 @@ func (c *Cluster) setStatus(status spec.PostgresStatus) {
DoRaw() DoRaw()
if k8sutil.ResourceNotFound(err) { if k8sutil.ResourceNotFound(err) {
c.logger.Warningf("could not set status for the non-existing cluster") c.logger.Warningf("could not set %q status for the non-existing cluster", status)
return return
} }
if err != nil { if err != nil {
c.logger.Warningf("could not set status for the cluster: %v", err) c.logger.Warningf("could not set %q status for the cluster: %v", status, err)
} }
} }
......
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
...@@ -169,7 +170,7 @@ func (c *Controller) initController() { ...@@ -169,7 +170,7 @@ func (c *Controller) initController() {
return "", fmt.Errorf("could not cast to ClusterEvent") return "", fmt.Errorf("could not cast to ClusterEvent")
} }
return fmt.Sprintf("%s-%s", e.EventType, e.UID), nil return queueClusterKey(e.EventType, e.UID), nil
}) })
} }
...@@ -206,3 +207,7 @@ func (c *Controller) runPostgresqlInformer(stopCh <-chan struct{}, wg *sync.Wait ...@@ -206,3 +207,7 @@ func (c *Controller) runPostgresqlInformer(stopCh <-chan struct{}, wg *sync.Wait
c.postgresqlInformer.Run(stopCh) c.postgresqlInformer.Run(stopCh)
} }
func queueClusterKey(eventType spec.EventType, uid types.UID) string {
return fmt.Sprintf("%s-%s", eventType, uid)
}
...@@ -193,26 +193,20 @@ func (c *Controller) processEvent(event spec.ClusterEvent) { ...@@ -193,26 +193,20 @@ func (c *Controller) processEvent(event spec.ClusterEvent) {
cl.Error = nil cl.Error = nil
lg.Infoln("cluster has been updated") lg.Infoln("cluster has been updated")
case spec.EventDelete: case spec.EventDelete:
teamName := strings.ToLower(cl.Spec.TeamID)
lg.Infoln("Deletion of the cluster started")
if !clusterFound { if !clusterFound {
lg.Errorf("unknown cluster: %q", clusterName) lg.Errorf("unknown cluster: %q", clusterName)
return return
} }
lg.Infoln("deletion of the cluster started")
if err := cl.Delete(); err != nil { teamName := strings.ToLower(cl.Spec.TeamID)
lg.Errorf("could not delete cluster: %v", err)
return
}
func() { func() {
defer c.clustersMu.Unlock() defer c.clustersMu.Unlock()
c.clustersMu.Lock() c.clustersMu.Lock()
delete(c.clusters, clusterName) delete(c.clusters, clusterName)
delete(c.clusterLogs, clusterName) delete(c.clusterLogs, clusterName)
for i, val := range c.teamClusters[teamName] { // on relativel for i, val := range c.teamClusters[teamName] {
if val == clusterName { if val == clusterName {
copy(c.teamClusters[teamName][i:], c.teamClusters[teamName][i+1:]) copy(c.teamClusters[teamName][i:], c.teamClusters[teamName][i+1:])
c.teamClusters[teamName][len(c.teamClusters[teamName])-1] = spec.NamespacedName{} c.teamClusters[teamName][len(c.teamClusters[teamName])-1] = spec.NamespacedName{}
...@@ -222,6 +216,11 @@ func (c *Controller) processEvent(event spec.ClusterEvent) { ...@@ -222,6 +216,11 @@ func (c *Controller) processEvent(event spec.ClusterEvent) {
} }
}() }()
if err := cl.Delete(); err != nil {
lg.Errorf("could not delete cluster: %v", err)
return
}
lg.Infof("cluster has been deleted") lg.Infof("cluster has been deleted")
case spec.EventSync: case spec.EventSync:
lg.Infof("syncing of the cluster started") lg.Infof("syncing of the cluster started")
...@@ -305,13 +304,35 @@ func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec ...@@ -305,13 +304,35 @@ func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec
NewSpec: new, NewSpec: new,
WorkerID: workerID, WorkerID: workerID,
} }
//TODO: if we delete cluster, discard all the previous events for the cluster
lg := c.logger.WithField("worker", workerID).WithField("cluster-name", clusterName) lg := c.logger.WithField("worker", workerID).WithField("cluster-name", clusterName)
if err := c.clusterEventQueues[workerID].Add(clusterEvent); err != nil { if err := c.clusterEventQueues[workerID].Add(clusterEvent); err != nil {
lg.Errorf("error when queueing cluster event: %v", clusterEvent) lg.Errorf("error while queueing cluster event: %v", clusterEvent)
} }
lg.Infof("%q event has been queued", eventType) lg.Infof("%q event has been queued", eventType)
if eventType != spec.EventDelete {
return
}
for _, evType := range []spec.EventType{spec.EventAdd, spec.EventSync, spec.EventUpdate} {
obj, exists, err := c.clusterEventQueues[workerID].GetByKey(queueClusterKey(evType, uid))
if err != nil {
lg.Warningf("could not get event from the queue: %v", err)
continue
}
if !exists {
continue
}
err = c.clusterEventQueues[workerID].Delete(obj)
if err != nil {
lg.Warningf("could not delete event from the queue: %v", err)
} else {
lg.Debugf("event %q has been discarded for the cluster", evType)
}
}
} }
func (c *Controller) postgresqlAdd(obj interface{}) { func (c *Controller) postgresqlAdd(obj interface{}) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment