Skip to content
Snippets Groups Projects
Commit 1a4cff65 authored by Murat Kabilov's avatar Murat Kabilov
Browse files

Merge branch 'master' into feature/diagnostic-rest-api

# Conflicts:
#	pkg/controller/controller.go
parents ed0db719 2fe22ff6
Branches
No related tags found
No related merge requests found
......@@ -175,10 +175,9 @@ func (c *Controller) initController() {
func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
c.initController()
wg.Add(5)
wg.Add(4)
go c.runPodInformer(stopCh, wg)
go c.runPostgresqlInformer(stopCh, wg)
go c.podEventsDispatcher(stopCh, wg)
go c.clusterResync(stopCh, wg)
go c.apiserver.Run(stopCh, wg)
......
package controller
import (
"sync"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
......@@ -42,6 +40,16 @@ func (c *Controller) podWatchFunc(options metav1.ListOptions) (watch.Interface,
return c.KubeClient.Pods(c.opConfig.Namespace).Watch(opts)
}
func (c *Controller) dispatchPodEvent(clusterName spec.NamespacedName, event spec.PodEvent) {
c.clustersMu.RLock()
cluster, ok := c.clusters[clusterName]
c.clustersMu.RUnlock()
if ok {
c.logger.Debugf("Sending %q event of pod %q to the %q cluster channel", event.EventType, event.PodName, clusterName)
cluster.ReceivePodEvent(event)
}
}
func (c *Controller) podAdd(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if !ok {
......@@ -49,14 +57,13 @@ func (c *Controller) podAdd(obj interface{}) {
}
podEvent := spec.PodEvent{
ClusterName: c.podClusterName(pod),
PodName: util.NameFromMeta(pod.ObjectMeta),
CurPod: pod,
EventType: spec.EventAdd,
ResourceVersion: pod.ResourceVersion,
}
c.podCh <- podEvent
c.dispatchPodEvent(c.podClusterName(pod), podEvent)
}
func (c *Controller) podUpdate(prev, cur interface{}) {
......@@ -71,7 +78,6 @@ func (c *Controller) podUpdate(prev, cur interface{}) {
}
podEvent := spec.PodEvent{
ClusterName: c.podClusterName(curPod),
PodName: util.NameFromMeta(curPod.ObjectMeta),
PrevPod: prevPod,
CurPod: curPod,
......@@ -79,7 +85,7 @@ func (c *Controller) podUpdate(prev, cur interface{}) {
ResourceVersion: curPod.ResourceVersion,
}
c.podCh <- podEvent
c.dispatchPodEvent(c.podClusterName(curPod), podEvent)
}
func (c *Controller) podDelete(obj interface{}) {
......@@ -89,33 +95,11 @@ func (c *Controller) podDelete(obj interface{}) {
}
podEvent := spec.PodEvent{
ClusterName: c.podClusterName(pod),
PodName: util.NameFromMeta(pod.ObjectMeta),
CurPod: pod,
EventType: spec.EventDelete,
ResourceVersion: pod.ResourceVersion,
}
c.podCh <- podEvent
}
func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
c.logger.Debugln("Watching all pod events")
for {
select {
case event := <-c.podCh:
c.clustersMu.RLock()
cluster, ok := c.clusters[event.ClusterName]
c.clustersMu.RUnlock()
if ok {
c.logger.Debugf("Sending %q event of pod %q to the %q cluster channel", event.EventType, event.PodName, event.ClusterName)
cluster.ReceivePodEvent(event)
}
case <-stopCh:
return
}
}
c.dispatchPodEvent(c.podClusterName(pod), podEvent)
}
......@@ -43,7 +43,6 @@ const (
// PodEvent describes the event for a single Pod
type PodEvent struct {
ResourceVersion string
ClusterName NamespacedName
PodName NamespacedName
PrevPod *v1.Pod
CurPod *v1.Pod
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment