Skip to content
Snippets Groups Projects
Select Git revision
  • 56964fd99c7c8b66a1acd5644b564ece0e28cbe9
  • master default protected
  • spilo-wale-removal
  • dependabot/go_modules/golang.org/x/oauth2-0.27.0
  • dependabot/go_modules/golang.org/x/net-0.38.0
  • dependabot/pip/ui/requests-2.32.4
  • bug-upgrade
  • gh-pages
  • patroni-4-integration
  • remove-zappr
  • ignore-auto-upgrade
  • arm-pooler
  • update-go-and-deps
  • pluralsh-liveness-probe
  • silenium-dev-master
  • bump-v1.9.1
  • enable-query-logging
  • bump-v1.7.1
  • resize-mixed-mode
  • instance-annotation
  • bump-v1.8.2
  • v1.14.0
  • v1.13.0
  • v1.12.2
  • v1.12.1
  • v1.12.0
  • v1.11.0
  • v1.10.1
  • v1.10.0
  • v1.9.0
  • v1.8.2
  • v1.8.1
  • v1.8.0
  • v1.7.1
  • v1.7.0
  • v1.6.3
  • v1.6.2
  • v1.6.1
  • v1.6.0
  • v1.5.0
  • v1.4.0
41 results

controller.go

Blame
  • controller.go 3.64 KiB
    package controller
    
    import (
    	"fmt"
    	"sync"
    
    	"github.com/Sirupsen/logrus"
    	"k8s.io/client-go/kubernetes"
    	"k8s.io/client-go/pkg/api/v1"
    	"k8s.io/client-go/rest"
    	"k8s.io/client-go/tools/cache"
    
    	"github.com/zalando-incubator/postgres-operator/pkg/cluster"
    	"github.com/zalando-incubator/postgres-operator/pkg/spec"
    	"github.com/zalando-incubator/postgres-operator/pkg/util/config"
    	"github.com/zalando-incubator/postgres-operator/pkg/util/teams"
    )
    
    // Config bundles the Kubernetes client handles, the Teams API client and the
    // infrastructure roles used by a Controller. It is embedded by value in
    // Controller, so its fields are reachable directly on the controller.
    type Config struct {
    	RestConfig          *rest.Config
    	KubeClient          *kubernetes.Clientset
    	RestClient          *rest.RESTClient
    	// TeamsAPIClient is overwritten by New with a client built from the
    	// operator configuration; any value set by the caller is replaced.
    	TeamsAPIClient      *teams.API
    	// InfrastructureRoles is assigned by Controller.initController when the
    	// roles can be fetched; on failure the existing value is left untouched.
    	InfrastructureRoles map[string]spec.PgUser
    }
    
    // Controller holds the operator's runtime state: the client configuration,
    // the operator settings, the known clusters, the informers and the cluster
    // event queues. Instances are created with New and started with Run.
    type Controller struct {
    	Config
    	opConfig    *config.Config
    	logger      *logrus.Entry
    
    	// NOTE(review): clustersMu presumably guards clusters and stopChs; the
    	// locking code is not part of this chunk — confirm against the callers.
    	clustersMu sync.RWMutex
    	clusters   map[spec.NamespacedName]*cluster.Cluster
    	stopChs    map[spec.NamespacedName]chan struct{}
    
    	// Both informers are created in initController and started by runInformers.
    	postgresqlInformer cache.SharedIndexInformer
    	podInformer        cache.SharedIndexInformer
    	// podCh is created unbuffered by New.
    	podCh              chan spec.PodEvent
    
    	// clusterEventQueues has opConfig.Workers entries; Run starts one
    	// processClusterEventsQueue goroutine per entry.
    	clusterEventQueues []*cache.FIFO
    }
    
    // New builds a Controller from the given client configuration and operator
    // settings.
    //
    // A fresh logrus logger is created (at debug level when
    // operatorConfig.DebugLogging is set) and used to construct the Teams API
    // client, which is written into controllerConfig.TeamsAPIClient. The
    // controller then keeps its own copy of *controllerConfig. The returned
    // controller has empty cluster and stop-channel maps and an unbuffered pod
    // event channel; informers and event queues are not set up here.
    func New(controllerConfig *Config, operatorConfig *config.Config) *Controller {
    	rootLogger := logrus.New()
    	if operatorConfig.DebugLogging {
    		rootLogger.Level = logrus.DebugLevel
    	}
    
    	// Store the Teams API client on the caller's config first, so that the
    	// copy taken below already carries it.
    	teamsClient := teams.NewTeamsAPI(operatorConfig.TeamsAPIUrl, rootLogger, operatorConfig.EnableTeamsAPI)
    	controllerConfig.TeamsAPIClient = teamsClient
    
    	ctrl := new(Controller)
    	ctrl.Config = *controllerConfig
    	ctrl.opConfig = operatorConfig
    	ctrl.logger = rootLogger.WithField("pkg", "controller")
    	ctrl.clusters = make(map[spec.NamespacedName]*cluster.Cluster)
    	ctrl.stopChs = make(map[spec.NamespacedName]chan struct{})
    	ctrl.podCh = make(chan spec.PodEvent)
    
    	return ctrl
    }
    
    // Run initialises the controller and starts its background goroutines: the
    // informers (via runInformers) and one event-queue worker per entry in
    // clusterEventQueues. It returns as soon as everything has been started.
    //
    // stopCh signals the informers to stop. wg is incremented once on entry and
    // released when runInformers returns, i.e. after stopCh has been signalled,
    // so a caller's wg.Wait() blocks until then. Previously the Add/Done pair
    // cancelled out the moment Run returned and wg tracked nothing.
    //
    // Because the Add happens inside Run, callers must make sure Run has been
    // entered (for example by calling it synchronously) before calling wg.Wait.
    // The queue workers are not tracked by wg: they are started with only their
    // queue index and receive no stop signal from this function.
    func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
    	// Register before any work is done so the counter is positive for the
    	// whole lifetime of the informer goroutine.
    	wg.Add(1)
    
    	c.initController()
    
    	go func() {
    		defer wg.Done()
    		c.runInformers(stopCh)
    	}()
    
    	for i := range c.clusterEventQueues {
    		go c.processClusterEventsQueue(i)
    	}
    
    	c.logger.Info("Started working in background")
    }
    
    // initController performs the one-time setup that Run relies on, in this
    // order:
    //
    //  1. registers the ThirdPartyResource (a failure is logged with Fatalf);
    //  2. installs getOAuthToken as the Teams API client's RefreshTokenAction;
    //  3. loads the infrastructure roles (a failure is only logged as a warning
    //     and the current roles are kept);
    //  4. creates the postgresql and pod shared informers, each indexed by
    //     namespace, and attaches the add/update/delete handlers;
    //  5. allocates opConfig.Workers FIFO queues for cluster events, keyed by
    //     "<EventType>-<UID>".
    func (c *Controller) initController() {
    	tprErr := c.createTPR()
    	if tprErr != nil {
    		c.logger.Fatalf("Can't register ThirdPartyResource: %s", tprErr)
    	}
    
    	c.TeamsAPIClient.RefreshTokenAction = c.getOAuthToken
    
    	roles, rolesErr := c.getInfrastructureRoles()
    	if rolesErr == nil {
    		c.InfrastructureRoles = roles
    	} else {
    		c.logger.Warningf("Can't get infrastructure roles: %s", rolesErr)
    	}
    
    	// Informer and handlers for Postgresql objects.
    	c.postgresqlInformer = cache.NewSharedIndexInformer(
    		&cache.ListWatch{
    			ListFunc:  c.clusterListFunc,
    			WatchFunc: c.clusterWatchFunc,
    		},
    		&spec.Postgresql{},
    		c.opConfig.ResyncPeriod,
    		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
    	)
    	postgresqlHandlers := cache.ResourceEventHandlerFuncs{
    		AddFunc:    c.postgresqlAdd,
    		UpdateFunc: c.postgresqlUpdate,
    		DeleteFunc: c.postgresqlDelete,
    	}
    	c.postgresqlInformer.AddEventHandler(postgresqlHandlers)
    
    	// Informer and handlers for pods.
    	c.podInformer = cache.NewSharedIndexInformer(
    		&cache.ListWatch{
    			ListFunc:  c.podListFunc,
    			WatchFunc: c.podWatchFunc,
    		},
    		&v1.Pod{},
    		c.opConfig.ResyncPeriodPod,
    		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
    	)
    	podHandlers := cache.ResourceEventHandlerFuncs{
    		AddFunc:    c.podAdd,
    		UpdateFunc: c.podUpdate,
    		DeleteFunc: c.podDelete,
    	}
    	c.podInformer.AddEventHandler(podHandlers)
    
    	// eventKey derives the FIFO key of a queued item; anything that is not a
    	// spec.ClusterEvent is rejected with an error.
    	eventKey := func(obj interface{}) (string, error) {
    		if event, ok := obj.(spec.ClusterEvent); ok {
    			return fmt.Sprintf("%s-%s", event.EventType, event.UID), nil
    		}
    
    		return "", fmt.Errorf("Can't cast to ClusterEvent")
    	}
    
    	queues := make([]*cache.FIFO, c.opConfig.Workers)
    	for i := 0; i < len(queues); i++ {
    		queues[i] = cache.NewFIFO(eventKey)
    	}
    	c.clusterEventQueues = queues
    }
    
    // runInformers launches the postgresql informer, the pod informer and the
    // pod events dispatcher, each in its own goroutine and each given stopCh,
    // then blocks until stopCh is closed or delivers a value.
    func (c *Controller) runInformers(stopCh <-chan struct{}) {
    	starters := []func(<-chan struct{}){
    		c.postgresqlInformer.Run,
    		c.podInformer.Run,
    		c.podEventsDispatcher,
    	}
    	for _, start := range starters {
    		// start is evaluated when the go statement executes, so every
    		// goroutine runs the entry it was launched for.
    		go start(stopCh)
    	}
    
    	<-stopCh
    }