Skip to content
Snippets Groups Projects
Select Git revision
  • 97fc3e61356c82f7d4006e705cc8b3d2bd91100b
  • main default protected
  • renovate/main-ghcr.io-renovatebot-base-image-11.x
  • chore/maintainers-rarkins
  • refactor/pin-new-value
  • fix/user-agent
  • feat/37517-base64-private-key
  • next
  • feat/gnupg
  • fix/36615b-branch-reuse-no-cache
  • chore/punycode
  • feat/36219--git-x509-signing
  • feat/structured-logger
  • hotfix/39.264.1
  • feat/skip-dangling
  • gh-readonly-queue/next/pr-36034-7a061c4ca1024a19e2c295d773d9642625d1c2be
  • hotfix/39.238.3
  • refactor/gitlab-auto-approve
  • feat/template-strings
  • gh-readonly-queue/next/pr-35654-137d934242c784e0c45d4b957362214f0eade1d7
  • fix/32307-global-extends-merging
  • 41.84.0
  • 41.83.2
  • 41.83.1
  • 41.83.0
  • 41.82.10
  • 41.82.9
  • 41.82.8
  • 41.82.7
  • 41.82.6
  • 41.82.5
  • 41.82.4
  • 41.82.3
  • 41.82.2
  • 41.82.1
  • 41.82.0
  • 41.81.6
  • 41.81.5
  • 41.81.4
  • 41.81.3
  • 41.81.2
41 results

yarn.js

Blame
  • controller.go 3.64 KiB
    package controller
    
    import (
    	"fmt"
    	"sync"
    
    	"github.com/Sirupsen/logrus"
    	"k8s.io/client-go/kubernetes"
    	"k8s.io/client-go/pkg/api/v1"
    	"k8s.io/client-go/rest"
    	"k8s.io/client-go/tools/cache"
    
    	"github.com/zalando-incubator/postgres-operator/pkg/cluster"
    	"github.com/zalando-incubator/postgres-operator/pkg/spec"
    	"github.com/zalando-incubator/postgres-operator/pkg/util/config"
    	"github.com/zalando-incubator/postgres-operator/pkg/util/teams"
    )
    
    type Config struct {
    	RestConfig          *rest.Config
    	KubeClient          *kubernetes.Clientset
    	RestClient          *rest.RESTClient
    	TeamsAPIClient      *teams.API
    	InfrastructureRoles map[string]spec.PgUser
    }
    
    type Controller struct {
    	Config
    	opConfig    *config.Config
    	logger      *logrus.Entry
    
    	clustersMu sync.RWMutex
    	clusters   map[spec.NamespacedName]*cluster.Cluster
    	stopChs    map[spec.NamespacedName]chan struct{}
    
    	postgresqlInformer cache.SharedIndexInformer
    	podInformer        cache.SharedIndexInformer
    	podCh              chan spec.PodEvent
    
    	clusterEventQueues []*cache.FIFO
    }
    
    func New(controllerConfig *Config, operatorConfig *config.Config) *Controller {
    	logger := logrus.New()
    
    	if operatorConfig.DebugLogging {
    		logger.Level = logrus.DebugLevel
    	}
    
    	controllerConfig.TeamsAPIClient = teams.NewTeamsAPI(operatorConfig.TeamsAPIUrl, logger, operatorConfig.EnableTeamsAPI)
    	return &Controller{
    		Config:   *controllerConfig,
    		opConfig: operatorConfig,
    		logger:   logger.WithField("pkg", "controller"),
    		clusters: make(map[spec.NamespacedName]*cluster.Cluster),
    		stopChs:  make(map[spec.NamespacedName]chan struct{}),
    		podCh:    make(chan spec.PodEvent),
    	}
    }
    
    func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
    	defer wg.Done()
    	wg.Add(1)
    
    	c.initController()
    
    	go c.runInformers(stopCh)
    
    	for i := range c.clusterEventQueues {
    		go c.processClusterEventsQueue(i)
    	}
    
    	c.logger.Info("Started working in background")
    }
    
    func (c *Controller) initController() {
    	if err := c.createTPR(); err != nil {
    		c.logger.Fatalf("Can't register ThirdPartyResource: %s", err)
    	}
    
    	c.TeamsAPIClient.RefreshTokenAction = c.getOAuthToken
    	if infraRoles, err := c.getInfrastructureRoles(); err != nil {
    		c.logger.Warningf("Can't get infrastructure roles: %s", err)
    	} else {
    		c.InfrastructureRoles = infraRoles
    	}
    
    	// Postgresqls
    	clusterLw := &cache.ListWatch{
    		ListFunc:  c.clusterListFunc,
    		WatchFunc: c.clusterWatchFunc,
    	}
    	c.postgresqlInformer = cache.NewSharedIndexInformer(
    		clusterLw,
    		&spec.Postgresql{},
    		c.opConfig.ResyncPeriod,
    		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
    
    	c.postgresqlInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    		AddFunc:    c.postgresqlAdd,
    		UpdateFunc: c.postgresqlUpdate,
    		DeleteFunc: c.postgresqlDelete,
    	})
    
    	// Pods
    	podLw := &cache.ListWatch{
    		ListFunc:  c.podListFunc,
    		WatchFunc: c.podWatchFunc,
    	}
    
    	c.podInformer = cache.NewSharedIndexInformer(
    		podLw,
    		&v1.Pod{},
    		c.opConfig.ResyncPeriodPod,
    		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
    
    	c.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    		AddFunc:    c.podAdd,
    		UpdateFunc: c.podUpdate,
    		DeleteFunc: c.podDelete,
    	})
    
    	c.clusterEventQueues = make([]*cache.FIFO, c.opConfig.Workers)
    	for i := range c.clusterEventQueues {
    		c.clusterEventQueues[i] = cache.NewFIFO(func(obj interface{}) (string, error) {
    			e, ok := obj.(spec.ClusterEvent)
    			if !ok {
    				return "", fmt.Errorf("Can't cast to ClusterEvent")
    			}
    
    			return fmt.Sprintf("%s-%s", e.EventType, e.UID), nil
    		})
    	}
    }
    
    func (c *Controller) runInformers(stopCh <-chan struct{}) {
    	go c.postgresqlInformer.Run(stopCh)
    	go c.podInformer.Run(stopCh)
    	go c.podEventsDispatcher(stopCh)
    
    	<-stopCh
    }