diff --git a/cmd/main.go b/cmd/main.go index c38eb2e91eab8cdde4e91775448b800afec05db0..3093cedb85b52928ba5426e35e73ff463d25ea4f 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -9,64 +9,41 @@ import ( "syscall" "github.com/zalando-incubator/postgres-operator/pkg/controller" - "github.com/zalando-incubator/postgres-operator/pkg/spec" - "github.com/zalando-incubator/postgres-operator/pkg/util/config" "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" ) var ( - KubeConfigFile string - podNamespace string - configMapName spec.NamespacedName - OutOfCluster bool - noTeamsAPI bool - noDatabaseAccess bool - version string + KubeConfigFile string + OutOfCluster bool + version string + + config controller.Config ) func init() { flag.StringVar(&KubeConfigFile, "kubeconfig", "", "Path to kubeconfig file with authorization and master location information.") flag.BoolVar(&OutOfCluster, "outofcluster", false, "Whether the operator runs in- our outside of the Kubernetes cluster.") - flag.BoolVar(&noDatabaseAccess, "nodatabaseaccess", false, "Disable all access to the database from the operator side.") - flag.BoolVar(&noTeamsAPI, "noteamsapi", false, "Disable all access to the teams API") + flag.BoolVar(&config.NoDatabaseAccess, "nodatabaseaccess", false, "Disable all access to the database from the operator side.") + flag.BoolVar(&config.NoTeamsAPI, "noteamsapi", false, "Disable all access to the teams API") flag.Parse() - podNamespace = os.Getenv("MY_POD_NAMESPACE") - if podNamespace == "" { - podNamespace = "default" + config.Namespace = os.Getenv("MY_POD_NAMESPACE") + if config.Namespace == "" { + config.Namespace = "default" } configMap := os.Getenv("CONFIG_MAP_NAME") if configMap != "" { - configMapName.Decode(configMap) - } -} - -func ControllerConfig() *controller.Config { - restConfig, err := k8sutil.RestConfig(KubeConfigFile, OutOfCluster) - if err != nil { - log.Fatalf("Can't get REST config: %s", err) - } - - client, err := k8sutil.ClientSet(restConfig) - if err != nil { - log.Fatalf("Can't create client: %s", err) - } - - restClient, err := k8sutil.KubernetesRestClient(restConfig) - if err != nil { - log.Fatalf("Can't create rest client: %s", err) - } - - return &controller.Config{ - RestConfig: restConfig, - KubeClient: k8sutil.NewFromKubernetesInterface(client), - RestClient: restClient, + err := config.ConfigMapName.Decode(configMap) + if err != nil { + log.Fatalf("incorrect config map name") + } } } func main() { - configMapData := make(map[string]string) + var err error + log.SetOutput(os.Stdout) log.Printf("Spilo operator %s\n", version) @@ -76,32 +53,13 @@ func main() { wg := &sync.WaitGroup{} // Goroutines can add themselves to this to be waited on - controllerConfig := ControllerConfig() - - if configMapName != (spec.NamespacedName{}) { - configMap, err := controllerConfig.KubeClient.ConfigMaps(configMapName.Namespace).Get(configMapName.Name) - if err != nil { - panic(err) - } - - configMapData = configMap.Data - } else { - log.Printf("No ConfigMap specified. Loading default values") - } - if configMapData["namespace"] == "" { // Namespace in ConfigMap has priority over env var - configMapData["namespace"] = podNamespace - } - if noDatabaseAccess { - configMapData["enable_database_access"] = "false" - } - if noTeamsAPI { - configMapData["enable_teams_api"] = "false" + config.RestConfig, err = k8sutil.RestConfig(KubeConfigFile, OutOfCluster) + if err != nil { + log.Fatalf("couldn't get REST config: %v", err) } - cfg := config.NewFromMap(configMapData) - log.Printf("Config: %s", cfg.MustMarshal()) + c := controller.NewController(&config) - c := controller.NewController(controllerConfig, cfg) c.Run(stop, wg) sig := <-sigs diff --git a/glide.lock b/glide.lock index 24db2b0d8a54a42ce155be26861426c2458c0ad4..f9a3aa4b1dda30241bc1226f0b181201dbb89fec 100644 --- a/glide.lock +++ b/glide.lock @@ -1,13 +1,8 @@ -hash: 427db08c70ab32596f9230f0111e24996f73b1b66ddd7365dd0b1b38c0ae367f -updated: 2017-05-19T17:11:37.120200516+02:00 +hash: 140e0c8a606d18ca405e9c50359cc673e4aa0cc88bcae5d7f83791e7002bd6a1 +updated: 2017-07-24T19:24:17.604824235+02:00 imports: -- name: cloud.google.com/go - version: 3b1ae45394a234c385be014e9a488f2bb6eef821 - subpackages: - - compute/metadata - - internal - name: github.com/aws/aws-sdk-go - version: e766cfe96ef7320817087fa4cd92c09abdb87310 + version: afd601335e2a72d43caa3af6bd2abe512fcc3bfd subpackages: - aws - aws/awserr @@ -25,40 +20,17 @@ imports: - aws/request - aws/session - aws/signer/v4 + - internal/shareddefaults - private/protocol - private/protocol/ec2query - - private/protocol/json/jsonutil - - private/protocol/jsonrpc - private/protocol/query - private/protocol/query/queryutil - private/protocol/rest - - private/protocol/restxml - private/protocol/xml/xmlutil - - private/waiter - - service/autoscaling - service/ec2 - - service/ecr - - service/elb - - service/route53 - service/sts -- name: github.com/blang/semver - version: 31b736133b98f26d5e078ec9eb591666edfd091f -- name: github.com/coreos/go-oidc - version: 5644a2f50e2d2d5ba0b474bc5bc55fea1925936d - subpackages: - - http - - jose - - key - - oauth2 - - oidc -- name: github.com/coreos/pkg - version: fa29b1d70f0beaddd4c7021607cc3c3be8ce94b8 - subpackages: - - health - - httputil - - timeutil - name: github.com/davecgh/go-spew - version: 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d + version: 782f4967f2dc4564575ca782fe2d04090b5faca8 subpackages: - spew - name: github.com/docker/distribution @@ -71,49 +43,52 @@ imports: subpackages: - spdy - name: github.com/emicklei/go-restful - version: 89ef8af493ab468a45a42bb0d89a06fccdd2fb22 + version: ff4f55a206334ef123e4f79bbf348980da81ca46 subpackages: - log - - swagger +- name: github.com/emicklei/go-restful-swagger12 + version: dcef7f55730566d41eae5db10e7d6981829720f6 - name: github.com/ghodss/yaml version: 73d445a93680fa1a78ae23a5839bad48f32ba1ee - name: github.com/go-ini/ini - version: 2e44421e256d82ebbf3d4d4fcabe8930b905eff3 + version: 3d73f4b845efdf9989fffd4b4e562727744a34ba +- name: github.com/go-openapi/analysis + version: b44dc874b601d9e4e2f6e19140e794ba24bead3b - name: github.com/go-openapi/jsonpointer version: 46af16f9f7b149af66e5d1bd010e3574dc06de98 - name: github.com/go-openapi/jsonreference version: 13c6e3589ad90f49bd3e3bbe2c2cb3d7a4142272 +- name: github.com/go-openapi/loads + version: 18441dfa706d924a39a030ee2c3b1d8d81917b38 - name: github.com/go-openapi/spec version: 6aced65f8501fe1217321abf0749d354824ba2ff - name: github.com/go-openapi/swag version: 1d0bd113de87027671077d3c71eb3ac5d7dbba72 - name: github.com/gogo/protobuf - version: 100ba4e885062801d56799d78530b73b178a78f3 + version: c0656edd0d9eab7c66d1eb0c568f9039345796f7 subpackages: - proto - sortkeys - name: github.com/golang/glog version: 44145f04b68cf362d9c4df2182967c2275eaefed -- name: github.com/golang/protobuf - version: 8616e8ee5e20a1704615e6c8d7afcdac06087a67 - subpackages: - - proto - name: github.com/google/gofuzz - version: bbcb9da2d746f8bdbd6a936686a0a6067ada0ec5 + version: 44d81051d367757e1c7c6a5a86423ece9afcf63c +- name: github.com/hashicorp/golang-lru + version: a0d98a5f288019575c6d1f4bb1573fef2d1fcdc4 + subpackages: + - simplelru - name: github.com/howeyc/gopass - version: 3ca23474a7c7203e0a0a070fd33508f6efdb9b3d + version: bf9dde6d0d2c004a008c27aaee91170c786f6db8 - name: github.com/imdario/mergo version: 6633656539c1639d9d78127b7d47c622b5d7b6dc - name: github.com/jmespath/go-jmespath - version: 3433f3ea46d9f8019119e7dd41274e112a2359a9 -- name: github.com/jonboulle/clockwork - version: 72f9bd7c4e0c2a40055ab3d0f09654f730cce982 + version: bd40a432e4c76585ef6b72d3fd96fb9b6dc7b68d - name: github.com/juju/ratelimit - version: 77ed1c8a01217656d2080ad51981f6e99adaa177 + version: 5b9ff866471762aa2ab2dced63c9fb6f53921342 - name: github.com/kr/text version: 7cafcd837844e784b526369c9bce262804aebc60 - name: github.com/lib/pq - version: 2704adc878c21e1329f46f6e56a1c387d788ff94 + version: dd1fe2071026ce53f36a39112e645b4d4f5793a4 subpackages: - oid - name: github.com/mailru/easyjson @@ -124,44 +99,34 @@ imports: - jwriter - name: github.com/motomux/pretty version: b2aad2c9a95d14eb978f29baa6e3a5c3c20eef30 -- name: github.com/pborman/uuid - version: ca53cad383cad2479bbba7f7a1a05797ec1386e4 - name: github.com/PuerkitoBio/purell version: 8a290539e2e8629dbc4e6bad948158f790ec31f4 - name: github.com/PuerkitoBio/urlesc version: 5bd2802263f21d8788851d5305584c82a5c75d7e - name: github.com/Sirupsen/logrus - version: ba1b36c82c5e05c4f912a88eab0dcd91a171688f - subpackages: - - client + version: a3f95b5c423586578a4e099b11a46c2479628cac - name: github.com/spf13/pflag - version: 5ccb023bc27df288a957c5e994cd44fd19619465 + version: 9ff6c6923cfffbcd502984b8e0c80539a94968b7 - name: github.com/ugorji/go - version: f1f1a805ed361a0e078bb537e4ea78cd37dcf065 + version: ded73eae5db7e7a0ef6f55aace87a2873c5d2b74 subpackages: - codec - name: golang.org/x/crypto - version: 1f22c0103821b9390939b6776727195525381532 + version: d172538b2cfce0c13cee31e647d0367aa8cd2486 subpackages: - ssh/terminal - name: golang.org/x/net - version: e90d6d0afc4c315a0d87a568ae68577cc15149a0 + version: f2499483f923065a842d38eb4c7f1927e6fc6e6d subpackages: - - context - - context/ctxhttp + - html + - html/atom - http2 - http2/hpack - idna - lex/httplex -- name: golang.org/x/oauth2 - version: 3c3a985cb79f52a3190fbc056984415ca6763d01 - subpackages: - - google - - internal - - jws - - jwt + - websocket - name: golang.org/x/sys - version: 8f0908ab3b2457e2e15403d3697c9ef5cb4b57a9 + version: c4489faa6e5ab84c0ef40d6ee878f7a030281f0f subpackages: - unix - name: golang.org/x/text @@ -177,25 +142,14 @@ imports: - unicode/bidi - unicode/norm - width -- name: google.golang.org/appengine - version: 4f7eeb5305a4ba1966344836ba4af9996b7b4e05 - subpackages: - - internal - - internal/app_identity - - internal/base - - internal/datastore - - internal/log - - internal/modules - - internal/remote_api - - internal/urlfetch - - urlfetch - name: gopkg.in/inf.v0 version: 3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4 - name: gopkg.in/yaml.v2 version: 53feefa2559fb8dfa8d81baad31be332c97d6c77 - name: k8s.io/apimachinery - version: 84c15da65eb86243c295d566203d7689cc6ac04b + version: abe34e4f5b4413c282a83011892cbeea5b32223b subpackages: + - pkg/api/equality - pkg/api/errors - pkg/api/meta - pkg/api/resource @@ -204,8 +158,10 @@ imports: - pkg/apimachinery/registered - pkg/apis/meta/v1 - pkg/apis/meta/v1/unstructured + - pkg/apis/meta/v1alpha1 - pkg/conversion - pkg/conversion/queryparams + - pkg/conversion/unstructured - pkg/fields - pkg/labels - pkg/openapi @@ -215,9 +171,13 @@ imports: - pkg/runtime/serializer/json - pkg/runtime/serializer/protobuf - pkg/runtime/serializer/recognizer + - pkg/runtime/serializer/streaming - pkg/runtime/serializer/versioning - pkg/selection - pkg/types + - pkg/util/cache + - pkg/util/clock + - pkg/util/diff - pkg/util/errors - pkg/util/framer - pkg/util/httpstream @@ -225,6 +185,7 @@ imports: - pkg/util/intstr - pkg/util/json - pkg/util/net + - pkg/util/rand - pkg/util/remotecommand - pkg/util/runtime - pkg/util/sets @@ -232,116 +193,76 @@ imports: - pkg/util/validation/field - pkg/util/wait - pkg/util/yaml + - pkg/version - pkg/watch - third_party/forked/golang/netutil - third_party/forked/golang/reflect - name: k8s.io/client-go - version: e121606b0d09b2e1c467183ee46217fa85a6b672 + version: df46f7f13b3da19b90b8b4f0d18b8adc6fbf28dc subpackages: - discovery - kubernetes + - kubernetes/scheme + - kubernetes/typed/admissionregistration/v1alpha1 - kubernetes/typed/apps/v1beta1 + - kubernetes/typed/authentication/v1 - kubernetes/typed/authentication/v1beta1 + - kubernetes/typed/authorization/v1 - kubernetes/typed/authorization/v1beta1 - kubernetes/typed/autoscaling/v1 + - kubernetes/typed/autoscaling/v2alpha1 - kubernetes/typed/batch/v1 - kubernetes/typed/batch/v2alpha1 - - kubernetes/typed/certificates/v1alpha1 + - kubernetes/typed/certificates/v1beta1 - kubernetes/typed/core/v1 - kubernetes/typed/extensions/v1beta1 + - kubernetes/typed/networking/v1 - kubernetes/typed/policy/v1beta1 - kubernetes/typed/rbac/v1alpha1 + - kubernetes/typed/rbac/v1beta1 + - kubernetes/typed/settings/v1alpha1 + - kubernetes/typed/storage/v1 - kubernetes/typed/storage/v1beta1 - pkg/api - - pkg/api/errors - - pkg/api/install - - pkg/api/meta - - pkg/api/meta/metatypes - - pkg/api/resource - - pkg/api/unversioned - pkg/api/v1 - - pkg/api/validation/path - - pkg/apimachinery - - pkg/apimachinery/announced - - pkg/apimachinery/registered + - pkg/api/v1/ref + - pkg/apis/admissionregistration + - pkg/apis/admissionregistration/v1alpha1 - pkg/apis/apps - - pkg/apis/apps/install - pkg/apis/apps/v1beta1 - pkg/apis/authentication - - pkg/apis/authentication/install + - pkg/apis/authentication/v1 - pkg/apis/authentication/v1beta1 - pkg/apis/authorization - - pkg/apis/authorization/install + - pkg/apis/authorization/v1 - pkg/apis/authorization/v1beta1 - pkg/apis/autoscaling - - pkg/apis/autoscaling/install - pkg/apis/autoscaling/v1 + - pkg/apis/autoscaling/v2alpha1 - pkg/apis/batch - - pkg/apis/batch/install - pkg/apis/batch/v1 - pkg/apis/batch/v2alpha1 - pkg/apis/certificates - - pkg/apis/certificates/install - - pkg/apis/certificates/v1alpha1 + - pkg/apis/certificates/v1beta1 - pkg/apis/extensions - - pkg/apis/extensions/install - pkg/apis/extensions/v1beta1 + - pkg/apis/networking + - pkg/apis/networking/v1 - pkg/apis/policy - - pkg/apis/policy/install - pkg/apis/policy/v1beta1 - pkg/apis/rbac - - pkg/apis/rbac/install - pkg/apis/rbac/v1alpha1 + - pkg/apis/rbac/v1beta1 + - pkg/apis/settings + - pkg/apis/settings/v1alpha1 - pkg/apis/storage - - pkg/apis/storage/install + - pkg/apis/storage/v1 - pkg/apis/storage/v1beta1 - - pkg/auth/user - - pkg/conversion - - pkg/conversion/queryparams - - pkg/fields - - pkg/genericapiserver/openapi/common - - pkg/labels - - pkg/runtime - - pkg/runtime/serializer - - pkg/runtime/serializer/json - - pkg/runtime/serializer/protobuf - - pkg/runtime/serializer/recognizer - - pkg/runtime/serializer/streaming - - pkg/runtime/serializer/versioning - - pkg/selection - - pkg/third_party/forked/golang/reflect - - pkg/third_party/forked/golang/template - - pkg/types - pkg/util - - pkg/util/cert - - pkg/util/clock - - pkg/util/diff - - pkg/util/errors - - pkg/util/flowcontrol - - pkg/util/framer - - pkg/util/homedir - - pkg/util/integer - - pkg/util/intstr - - pkg/util/json - - pkg/util/jsonpath - - pkg/util/labels - - pkg/util/net - pkg/util/parsers - - pkg/util/rand - - pkg/util/runtime - - pkg/util/sets - - pkg/util/uuid - - pkg/util/validation - - pkg/util/validation/field - - pkg/util/wait - - pkg/util/yaml - pkg/version - - pkg/watch - - pkg/watch/versioned - - plugin/pkg/client/auth - - plugin/pkg/client/auth/gcp - - plugin/pkg/client/auth/oidc - rest + - rest/watch - tools/auth - tools/cache - tools/clientcmd @@ -349,11 +270,11 @@ imports: - tools/clientcmd/api/latest - tools/clientcmd/api/v1 - tools/metrics + - tools/remotecommand - transport -- name: k8s.io/kubernetes - version: ee39d359dd0896c4c0eccf23f033f158ad3d3bd7 - subpackages: - - pkg/api - - pkg/client/unversioned/remotecommand - - pkg/util/exec + - util/cert + - util/exec + - util/flowcontrol + - util/homedir + - util/integer testImports: [] diff --git a/glide.yaml b/glide.yaml index 12ad3c548524dae544c211d1688c598635ac1b6c..fead695873476449623588a4acb962eb43b9a32f 100644 --- a/glide.yaml +++ b/glide.yaml @@ -1,26 +1,39 @@ package: github.com/zalando-incubator/postgres-operator import: -- package: github.com/gogo/protobuf - version: ^0.3.0 - package: github.com/Sirupsen/logrus - version: ^0.11.5 + version: ^1.0.1 +- package: github.com/aws/aws-sdk-go + version: ^1.8.24 subpackages: - - client + - aws + - aws/session + - service/ec2 - package: github.com/lib/pq - package: github.com/motomux/pretty -- package: golang.org/x/net - subpackages: - - context - package: k8s.io/apimachinery - version: 84c15da65eb86243c295d566203d7689cc6ac04b subpackages: - - pkg/util/json + - pkg/api/errors + - pkg/api/meta + - pkg/api/resource + - pkg/apis/meta/v1 + - pkg/fields + - pkg/labels + - pkg/runtime + - pkg/runtime/schema + - pkg/runtime/serializer + - pkg/types + - pkg/util/intstr - pkg/util/remotecommand + - pkg/watch - package: k8s.io/client-go - version: ^2.0.0 -- package: k8s.io/kubernetes - version: ee39d359dd0896c4c0eccf23f033f158ad3d3bd7 + version: ^4.0.0-beta.0 subpackages: - - pkg/client/unversioned/remotecommand -- package: github.com/aws/aws-sdk-go - version: ^1.8.24 + - kubernetes + - pkg/api + - pkg/api/v1 + - pkg/apis/apps/v1beta1 + - pkg/apis/extensions/v1beta1 + - rest + - tools/cache + - tools/clientcmd + - tools/remotecommand diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 8683803b9f871e38b0256b97d29e4b279b072e3a..1d7ffef9547e706f14b94fef6b8f94ff92ffcfd4 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -11,10 +11,10 @@ import ( "sync" "github.com/Sirupsen/logrus" - "k8s.io/client-go/pkg/api" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/apis/apps/v1beta1" - "k8s.io/client-go/pkg/types" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" @@ -35,11 +35,8 @@ var ( // Config contains operator-wide clients and configuration used from a cluster. TODO: remove struct duplication. type Config struct { - KubeClient k8sutil.KubernetesClient - RestClient *rest.RESTClient - RestConfig *rest.Config - TeamsAPIClient *teams.API OpConfig config.Config + RestConfig *rest.Config InfrastructureRoles map[string]spec.PgUser // inherited from the controller } @@ -65,8 +62,11 @@ type Cluster struct { mu sync.Mutex masterLess bool userSyncStrategy spec.UserSyncer - deleteOptions *v1.DeleteOptions + deleteOptions *metav1.DeleteOptions podEventsQueue *cache.FIFO + + teamsAPIClient *teams.API + KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place? } type compareStatefulsetResult struct { @@ -77,8 +77,8 @@ type compareStatefulsetResult struct { } // New creates a new cluster. This function should be called from a controller. -func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { - lg := logger.WithField("pkg", "cluster").WithField("cluster-name", pgSpec.Metadata.Name) +func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { + lg := logger.WithField("pkg", "cluster").WithField("cluster-name", pgSpec.Name) kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret), Service: make(map[PostgresRole]*v1.Service)} orphanDependents := true @@ -101,15 +101,17 @@ func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { kubeResources: kubeResources, masterLess: false, userSyncStrategy: users.DefaultUserSyncStrategy{}, - deleteOptions: &v1.DeleteOptions{OrphanDependents: &orphanDependents}, + deleteOptions: &metav1.DeleteOptions{OrphanDependents: &orphanDependents}, podEventsQueue: podEventsQueue, + KubeClient: kubeClient, + teamsAPIClient: teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger.Logger), } return cluster } func (c *Cluster) clusterName() spec.NamespacedName { - return util.NameFromMeta(c.Metadata) + return util.NameFromMeta(c.ObjectMeta) } func (c *Cluster) teamName() string { @@ -125,8 +127,8 @@ func (c *Cluster) setStatus(status spec.PostgresStatus) { } request := []byte(fmt.Sprintf(`{"status": %s}`, string(b))) //TODO: Look into/wait for k8s go client methods - _, err = c.RestClient.Patch(api.MergePatchType). - RequestURI(c.Metadata.GetSelfLink()). + _, err = c.KubeClient.RESTClient.Patch(types.MergePatchType). + RequestURI(c.GetSelfLink()). Body(request). DoRaw() @@ -136,7 +138,7 @@ func (c *Cluster) setStatus(status spec.PostgresStatus) { } if err != nil { - c.logger.Warningf("could not set status for cluster '%s': %s", c.clusterName(), err) + c.logger.Warningf("could not set status for cluster %q: %v", c.clusterName(), err) } } @@ -179,7 +181,7 @@ func (c *Cluster) Create() error { if err != nil { return fmt.Errorf("could not create endpoint: %v", err) } - c.logger.Infof("endpoint '%s' has been successfully created", util.NameFromMeta(ep.ObjectMeta)) + c.logger.Infof("endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta)) for _, role := range []PostgresRole{Master, Replica} { if role == Replica && !c.Spec.ReplicaLoadBalancer { @@ -189,7 +191,7 @@ func (c *Cluster) Create() error { if err != nil { return fmt.Errorf("could not create %s service: %v", role, err) } - c.logger.Infof("%s service '%s' has been successfully created", role, util.NameFromMeta(service.ObjectMeta)) + c.logger.Infof("%s service %q has been successfully created", role, util.NameFromMeta(service.ObjectMeta)) } if err = c.initUsers(); err != nil { @@ -206,12 +208,12 @@ func (c *Cluster) Create() error { if err != nil { return fmt.Errorf("could not create statefulset: %v", err) } - c.logger.Infof("statefulset '%s' has been successfully created", util.NameFromMeta(ss.ObjectMeta)) + c.logger.Infof("statefulset %q has been successfully created", util.NameFromMeta(ss.ObjectMeta)) c.logger.Info("Waiting for cluster being ready") if err = c.waitStatefulsetPodsReady(); err != nil { - c.logger.Errorf("Failed to create cluster: %s", err) + c.logger.Errorf("Failed to create cluster: %v", err) return err } c.logger.Infof("pods are ready") @@ -232,7 +234,7 @@ func (c *Cluster) Create() error { err = c.listResources() if err != nil { - c.logger.Errorf("could not list resources: %s", err) + c.logger.Errorf("could not list resources: %v", err) } return nil @@ -242,7 +244,7 @@ func (c *Cluster) sameServiceWith(role PostgresRole, service *v1.Service) (match //TODO: improve comparison match = true if c.Service[role].Spec.Type != service.Spec.Type { - return false, fmt.Sprintf("new %s service's type %s doesn't match the current one %s", + return false, fmt.Sprintf("new %s service's type %q doesn't match the current one %q", role, service.Spec.Type, c.Service[role].Spec.Type) } oldSourceRanges := c.Service[role].Spec.LoadBalancerSourceRanges @@ -258,7 +260,7 @@ func (c *Cluster) sameServiceWith(role PostgresRole, service *v1.Service) (match oldDNSAnnotation := c.Service[role].Annotations[constants.ZalandoDNSNameAnnotation] newDNSAnnotation := service.Annotations[constants.ZalandoDNSNameAnnotation] if oldDNSAnnotation != newDNSAnnotation { - return false, fmt.Sprintf("new %s service's '%s' annotation doesn't match the current one", role, constants.ZalandoDNSNameAnnotation) + return false, fmt.Sprintf("new %s service's %q annotation doesn't match the current one", role, constants.ZalandoDNSNameAnnotation) } return true, "" @@ -289,7 +291,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *comp } if len(c.Statefulset.Spec.Template.Spec.Containers) == 0 { - c.logger.Warnf("statefulset '%s' has no container", util.NameFromMeta(c.Statefulset.ObjectMeta)) + c.logger.Warnf("statefulset %q has no container", util.NameFromMeta(c.Statefulset.ObjectMeta)) return &compareStatefulsetResult{} } // In the comparisons below, the needsReplace and needsRollUpdate flags are never reset, since checks fall through @@ -332,12 +334,12 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *comp } if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations) { needsReplace = true - reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %s doesn't match the current one", name)) + reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q doesn't match the current one", name)) } if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Spec, statefulSet.Spec.VolumeClaimTemplates[i].Spec) { name := c.Statefulset.Spec.VolumeClaimTemplates[i].Name needsReplace = true - reasons = append(reasons, fmt.Sprintf("new statefulset's volumeClaimTemplates specification for volume %s doesn't match the current one", name)) + reasons = append(reasons, fmt.Sprintf("new statefulset's volumeClaimTemplates specification for volume %q doesn't match the current one", name)) } } @@ -404,8 +406,8 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { defer c.mu.Unlock() c.setStatus(spec.ClusterStatusUpdating) - c.logger.Debugf("Cluster update from version %s to %s", - c.Metadata.ResourceVersion, newSpec.Metadata.ResourceVersion) + c.logger.Debugf("Cluster update from version %q to %q", + c.ResourceVersion, newSpec.ResourceVersion) /* Make sure we update when this function exists */ defer func() { @@ -430,7 +432,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { if err != nil { return fmt.Errorf("could not create new %s service: %v", role, err) } - c.logger.Infof("%s service '%s' has been created", role, util.NameFromMeta(service.ObjectMeta)) + c.logger.Infof("%s service %q has been created", role, util.NameFromMeta(service.ObjectMeta)) } } // only proceed further if both old and new load balancer were present @@ -445,7 +447,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { c.setStatus(spec.ClusterStatusUpdateFailed) return fmt.Errorf("could not update %s service: %v", role, err) } - c.logger.Infof("%s service '%s' has been updated", role, util.NameFromMeta(c.Service[role].ObjectMeta)) + c.logger.Infof("%s service %q has been updated", role, util.NameFromMeta(c.Service[role].ObjectMeta)) } } @@ -470,11 +472,11 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { } } //TODO: if there is a change in numberOfInstances, make sure Pods have been created/deleted - c.logger.Infof("statefulset '%s' has been updated", util.NameFromMeta(c.Statefulset.ObjectMeta)) + c.logger.Infof("statefulset %q has been updated", util.NameFromMeta(c.Statefulset.ObjectMeta)) } if c.Spec.PgVersion != newSpec.Spec.PgVersion { // PG versions comparison - c.logger.Warnf("Postgresql version change(%s -> %s) is not allowed", + c.logger.Warnf("Postgresql version change(%q -> %q) is not allowed", c.Spec.PgVersion, newSpec.Spec.PgVersion) //TODO: rewrite pg version in tpr spec } diff --git a/pkg/cluster/exec.go b/pkg/cluster/exec.go index fbd913c21facebf6f62387db0ea75465c34f6010..e00715a5df22977965c51228fb685cd26b8ed927 100644 --- a/pkg/cluster/exec.go +++ b/pkg/cluster/exec.go @@ -4,9 +4,11 @@ import ( "bytes" "fmt" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand" - "k8s.io/client-go/pkg/api" - "k8s.io/kubernetes/pkg/client/unversioned/remotecommand" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/pkg/api/v1" + "k8s.io/client-go/tools/remotecommand" "github.com/zalando-incubator/postgres-operator/pkg/spec" ) @@ -17,7 +19,7 @@ func (c *Cluster) ExecCommand(podName *spec.NamespacedName, command ...string) ( execErr bytes.Buffer ) - pod, err := c.KubeClient.Pods(podName.Namespace).Get(podName.Name) + pod, err := c.KubeClient.Pods(podName.Namespace).Get(podName.Name, metav1.GetOptions{}) if err != nil { return "", fmt.Errorf("could not get pod info: %v", err) } @@ -26,17 +28,17 @@ func (c *Cluster) ExecCommand(podName *spec.NamespacedName, command ...string) ( return "", fmt.Errorf("could not determine which container to use") } - req := c.RestClient.Post(). + req := c.KubeClient.RESTClient.Post(). Resource("pods"). Name(podName.Name). Namespace(podName.Namespace). SubResource("exec") - req.VersionedParams(&api.PodExecOptions{ + req.VersionedParams(&v1.PodExecOptions{ Container: pod.Spec.Containers[0].Name, Command: command, Stdout: true, Stderr: true, - }, api.ParameterCodec) + }, scheme.ParameterCodec) exec, err := remotecommand.NewExecutor(c.RestConfig, "POST", req.URL()) if err != nil { diff --git a/pkg/cluster/filesystems.go b/pkg/cluster/filesystems.go index e68e5614c073a0a827a01ecf272f15be7211d166..65e7048c7e2dc5d09fdabc2a62411d07f5f78b85 100644 --- a/pkg/cluster/filesystems.go +++ b/pkg/cluster/filesystems.go @@ -39,5 +39,5 @@ func (c *Cluster) resizePostgresFilesystem(podName *spec.NamespacedName, resizer return err } - return fmt.Errorf("could not resize filesystem: no compatible resizers for the filesystem of type %s", fsType) + return fmt.Errorf("could not resize filesystem: no compatible resizers for the filesystem of type %q", fsType) } diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 0cba328373019ebde2c74b1d002d43e7334f3d9f..3ddefbb90504d2e15092c838b84b9d2dde6f0bc8 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -1,14 +1,15 @@ package cluster import ( + "encoding/json" "fmt" "sort" - "encoding/json" - "k8s.io/client-go/pkg/api/resource" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/apis/apps/v1beta1" - "k8s.io/client-go/pkg/util/intstr" "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util/constants" @@ -198,7 +199,7 @@ PATRONI_INITDB_PARAMS: } result, err := json.Marshal(config) if err != nil { - c.logger.Errorf("Cannot convert spilo configuration into JSON: %s", err) + c.logger.Errorf("Cannot convert spilo configuration into JSON: %v", err) return "" } return string(result) @@ -210,7 +211,7 @@ func (c *Cluster) generatePodTemplate(resourceRequirements *v1.ResourceRequireme envVars := []v1.EnvVar{ { Name: "SCOPE", - Value: c.Metadata.Name, + Value: c.Name, }, { Name: "PGROOT", @@ -273,7 +274,7 @@ func (c *Cluster) generatePodTemplate(resourceRequirements *v1.ResourceRequireme } privilegedMode := bool(true) container := v1.Container{ - Name: c.Metadata.Name, + Name: c.Name, Image: c.OpConfig.DockerImage, ImagePullPolicy: v1.PullAlways, Resources: *resourceRequirements, @@ -311,9 +312,9 @@ func (c *Cluster) generatePodTemplate(resourceRequirements *v1.ResourceRequireme } template := v1.PodTemplateSpec{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Labels: c.labelsSet(), - Namespace: c.Metadata.Name, + Namespace: c.Name, }, Spec: podSpec, } @@ -337,14 +338,14 @@ func (c *Cluster) generateStatefulSet(spec spec.PostgresSpec) (*v1beta1.Stateful } statefulSet := &v1beta1.StatefulSet{ - ObjectMeta: v1.ObjectMeta{ - Name: c.Metadata.Name, - Namespace: c.Metadata.Namespace, + ObjectMeta: metav1.ObjectMeta{ + Name: c.Name, + Namespace: c.Namespace, Labels: c.labelsSet(), }, Spec: v1beta1.StatefulSetSpec{ Replicas: &spec.NumberOfInstances, - ServiceName: c.Metadata.Name, + ServiceName: c.Name, Template: *podTemplate, VolumeClaimTemplates: []v1.PersistentVolumeClaim{*volumeClaimTemplate}, }, @@ -354,7 +355,7 @@ func (c *Cluster) generateStatefulSet(spec spec.PostgresSpec) (*v1beta1.Stateful } func generatePersistentVolumeClaimTemplate(volumeSize, volumeStorageClass string) (*v1.PersistentVolumeClaim, error) { - metadata := v1.ObjectMeta{ + metadata := metav1.ObjectMeta{ Name: constants.DataVolumeName, } if volumeStorageClass != "" { @@ -386,7 +387,7 @@ func generatePersistentVolumeClaimTemplate(volumeSize, volumeStorageClass string func (c *Cluster) generateUserSecrets() (secrets map[string]*v1.Secret) { secrets = make(map[string]*v1.Secret, len(c.pgUsers)) - namespace := c.Metadata.Namespace + namespace := c.Namespace for username, pgUser := range c.pgUsers { //Skip users with no password i.e. human users (they'll be authenticated using pam) secret := c.generateSingleUserSecret(namespace, pgUser) @@ -412,7 +413,7 @@ func (c *Cluster) generateSingleUserSecret(namespace string, pgUser spec.PgUser) } username := pgUser.Name secret := v1.Secret{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: c.credentialSecretName(username), Namespace: namespace, Labels: c.labelsSet(), @@ -429,7 +430,7 @@ func (c *Cluster) generateSingleUserSecret(namespace string, pgUser spec.PgUser) func (c *Cluster) generateService(role PostgresRole, newSpec *spec.PostgresSpec) *v1.Service { dnsNameFunction := c.masterDnsName - name := c.Metadata.Name + name := c.Name if role == Replica { dnsNameFunction = c.replicaDnsName name = name + "-repl" @@ -450,7 +451,7 @@ func (c *Cluster) generateService(role PostgresRole, newSpec *spec.PostgresSpec) if (newSpec.UseLoadBalancer != nil && *newSpec.UseLoadBalancer) || (newSpec.UseLoadBalancer == nil && c.OpConfig.EnableLoadBalancer) { - // safe default value: lock load balancer to only local address unless overriden explicitely. + // safe default value: lock load balancer to only local address unless overridden explicitly. sourceRanges := []string{localHost} allowedSourceRanges := newSpec.AllowedSourceRanges if len(allowedSourceRanges) >= 0 { @@ -468,9 +469,9 @@ func (c *Cluster) generateService(role PostgresRole, newSpec *spec.PostgresSpec) } service := &v1.Service{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: name, - Namespace: c.Metadata.Namespace, + Namespace: c.Namespace, Labels: c.roleLabelsSet(role), Annotations: annotations, }, @@ -482,9 +483,9 @@ func (c *Cluster) generateService(role PostgresRole, newSpec *spec.PostgresSpec) func (c *Cluster) generateMasterEndpoints(subsets []v1.EndpointSubset) *v1.Endpoints { endpoints := &v1.Endpoints{ - ObjectMeta: v1.ObjectMeta{ - Name: c.Metadata.Name, - Namespace: c.Metadata.Namespace, + ObjectMeta: metav1.ObjectMeta{ + Name: c.Name, + Namespace: c.Namespace, Labels: c.roleLabelsSet(Master), }, } diff --git a/pkg/cluster/pg.go b/pkg/cluster/pg.go index fd677006c5abbfa0539a80299b2b3e1682c26cc7..d06da2b81d0035debe4598521f5255211b7270bc 100644 --- a/pkg/cluster/pg.go +++ b/pkg/cluster/pg.go @@ -22,7 +22,7 @@ var getUserSQL = `SELECT a.rolname, COALESCE(a.rolpassword, ''), a.rolsuper, a.r ORDER BY 1;` func (c *Cluster) pgConnectionString() string { - hostname := fmt.Sprintf("%s.%s.svc.cluster.local", c.Metadata.Name, c.Metadata.Namespace) + hostname := fmt.Sprintf("%s.%s.svc.cluster.local", c.Name, c.Namespace) username := c.systemUsers[constants.SuperuserKeyName].Name password := c.systemUsers[constants.SuperuserKeyName].Password diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index 52ecdfffea266064834974b75ed227887bcd9f1c..2b7ed438e71626f8526ded8578e7843c7bf662db 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -3,6 +3,7 @@ package cluster import ( "fmt" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/pkg/api/v1" "github.com/zalando-incubator/postgres-operator/pkg/spec" @@ -11,8 +12,8 @@ import ( ) func (c *Cluster) listPods() ([]v1.Pod, error) { - ns := c.Metadata.Namespace - listOptions := v1.ListOptions{ + ns := c.Namespace + listOptions := metav1.ListOptions{ LabelSelector: c.labelsSet().String(), } @@ -34,11 +35,11 @@ func (c *Cluster) deletePods() error { for _, obj := range pods { podName := util.NameFromMeta(obj.ObjectMeta) - c.logger.Debugf("Deleting pod '%s'", podName) + c.logger.Debugf("Deleting pod %q", podName) if err := c.deletePod(podName); err != nil { - c.logger.Errorf("could not delete pod '%s': %s", podName, err) + c.logger.Errorf("could not delete pod %q: %v", podName, err) } else { - c.logger.Infof("pod '%s' has been deleted", podName) + c.logger.Infof("pod %q has been deleted", podName) } } if len(pods) > 0 { @@ -106,16 +107,16 @@ func (c *Cluster) recreatePod(pod v1.Pod) error { if err := c.waitForPodLabel(ch); err != nil { return err } - c.logger.Infof("pod '%s' is ready", podName) + c.logger.Infof("pod %q is ready", podName) return nil } func (c *Cluster) recreatePods() error { ls := c.labelsSet() - namespace := c.Metadata.Namespace + namespace := c.Namespace - listOptions := v1.ListOptions{ + listOptions := metav1.ListOptions{ LabelSelector: ls.String(), } @@ -135,7 +136,7 @@ func (c *Cluster) recreatePods() error { } if err := c.recreatePod(pod); err != nil { - return fmt.Errorf("could not recreate replica pod '%s': %v", util.NameFromMeta(pod.ObjectMeta), err) + return fmt.Errorf("could not recreate replica pod %q: %v", util.NameFromMeta(pod.ObjectMeta), err) } } if masterPod.Name == "" { @@ -143,10 +144,10 @@ func (c *Cluster) recreatePods() error { } else { //TODO: do manual failover //TODO: specify master, leave new master empty - c.logger.Infof("Recreating master pod '%s'", util.NameFromMeta(masterPod.ObjectMeta)) + c.logger.Infof("Recreating master pod %q", util.NameFromMeta(masterPod.ObjectMeta)) if err := c.recreatePod(masterPod); err != nil { - return fmt.Errorf("could not recreate master pod '%s': %v", util.NameFromMeta(masterPod.ObjectMeta), err) + return fmt.Errorf("could not recreate master pod %q: %v", util.NameFromMeta(masterPod.ObjectMeta), err) } } diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index b71f8355fbc62458e7040d9c188174dae9ca7a0b..426890314ebc069e138bb102f6fbda1dcccc8eff 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -3,7 +3,8 @@ package cluster import ( "fmt" - "k8s.io/client-go/pkg/api" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/apis/apps/v1beta1" @@ -15,8 +16,8 @@ import ( ) func (c *Cluster) loadResources() error { - ns := c.Metadata.Namespace - listOptions := v1.ListOptions{ + ns := c.Namespace + listOptions := metav1.ListOptions{ LabelSelector: c.labelsSet().String(), } @@ -60,7 +61,7 @@ func (c *Cluster) loadResources() error { continue } c.Secrets[secret.UID] = &secrets.Items[i] - c.logger.Debugf("secret loaded, uid: %s", secret.UID) + c.logger.Debugf("secret loaded, uid: %q", secret.UID) } statefulSets, err := c.KubeClient.StatefulSets(ns).List(listOptions) @@ -79,19 +80,19 @@ func (c *Cluster) loadResources() error { func (c *Cluster) listResources() error { if c.Statefulset != nil { - c.logger.Infof("Found statefulset: %s (uid: %s)", util.NameFromMeta(c.Statefulset.ObjectMeta), c.Statefulset.UID) + c.logger.Infof("Found statefulset: %q (uid: %q)", util.NameFromMeta(c.Statefulset.ObjectMeta), c.Statefulset.UID) } for _, obj := range c.Secrets { - c.logger.Infof("Found secret: %s (uid: %s)", util.NameFromMeta(obj.ObjectMeta), obj.UID) + c.logger.Infof("Found secret: %q (uid: %q)", util.NameFromMeta(obj.ObjectMeta), obj.UID) } if c.Endpoint != nil { - c.logger.Infof("Found endpoint: %s (uid: %s)", util.NameFromMeta(c.Endpoint.ObjectMeta), c.Endpoint.UID) + c.logger.Infof("Found endpoint: %q (uid: %q)", util.NameFromMeta(c.Endpoint.ObjectMeta), c.Endpoint.UID) } for role, service := range c.Service { - c.logger.Infof("Found %s service: %s (uid: %s)", role, util.NameFromMeta(service.ObjectMeta), service.UID) + c.logger.Infof("Found %s service: %q (uid: %q)", role, util.NameFromMeta(service.ObjectMeta), service.UID) } pods, err := c.listPods() @@ -100,7 +101,7 @@ func (c *Cluster) listResources() error { } for _, obj := range pods { - c.logger.Infof("Found pod: %s (uid: %s)", util.NameFromMeta(obj.ObjectMeta), obj.UID) + c.logger.Infof("Found pod: %q (uid: %q)", util.NameFromMeta(obj.ObjectMeta), obj.UID) } pvcs, err := c.listPersistentVolumeClaims() @@ -109,7 +110,7 @@ func (c *Cluster) listResources() error { } for _, obj := range pvcs { - c.logger.Infof("Found PVC: %s (uid: %s)", util.NameFromMeta(obj.ObjectMeta), obj.UID) + c.logger.Infof("Found PVC: %q (uid: %q)", util.NameFromMeta(obj.ObjectMeta), obj.UID) } return nil @@ -128,7 +129,7 @@ func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) { return nil, err } c.Statefulset = statefulSet - c.logger.Debugf("Created new statefulset '%s', uid: %s", util.NameFromMeta(statefulSet.ObjectMeta), statefulSet.UID) + c.logger.Debugf("Created new statefulset %q, uid: %q", util.NameFromMeta(statefulSet.ObjectMeta), statefulSet.UID) return statefulSet, nil } @@ -143,15 +144,15 @@ func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error { patchData, err := specPatch(newStatefulSet.Spec) if err != nil { - return fmt.Errorf("could not form patch for the statefulset '%s': %v", statefulSetName, err) + return fmt.Errorf("could not form patch for the statefulset %q: %v", statefulSetName, err) } statefulSet, err := c.KubeClient.StatefulSets(c.Statefulset.Namespace).Patch( c.Statefulset.Name, - api.MergePatchType, + types.MergePatchType, patchData, "") if err != nil { - return fmt.Errorf("could not patch statefulset '%s': %v", statefulSetName, err) + return fmt.Errorf("could not patch statefulset %q: %v", statefulSetName, err) } c.Statefulset = statefulSet @@ -171,9 +172,9 @@ func (c *Cluster) replaceStatefulSet(newStatefulSet *v1beta1.StatefulSet) error orphanDepencies := true oldStatefulset := c.Statefulset - options := v1.DeleteOptions{OrphanDependents: &orphanDepencies} + options := metav1.DeleteOptions{OrphanDependents: &orphanDepencies} if err := c.KubeClient.StatefulSets(oldStatefulset.Namespace).Delete(oldStatefulset.Name, &options); err != nil { - return fmt.Errorf("could not delete statefulset '%s': %v", statefulSetName, err) + return fmt.Errorf("could not delete statefulset %q: %v", statefulSetName, err) } // make sure we clear the stored statefulset status if the subsequent create fails. c.Statefulset = nil @@ -182,7 +183,7 @@ func (c *Cluster) replaceStatefulSet(newStatefulSet *v1beta1.StatefulSet) error err := retryutil.Retry(constants.StatefulsetDeletionInterval, constants.StatefulsetDeletionTimeout, func() (bool, error) { - _, err := c.KubeClient.StatefulSets(oldStatefulset.Namespace).Get(oldStatefulset.Name) + _, err := c.KubeClient.StatefulSets(oldStatefulset.Namespace).Get(oldStatefulset.Name, metav1.GetOptions{}) return err != nil, nil }) @@ -193,7 +194,7 @@ func (c *Cluster) replaceStatefulSet(newStatefulSet *v1beta1.StatefulSet) error // create the new statefulset with the desired spec. It would take over the remaining pods. createdStatefulset, err := c.KubeClient.StatefulSets(newStatefulSet.Namespace).Create(newStatefulSet) if err != nil { - return fmt.Errorf("could not create statefulset '%s': %v", statefulSetName, err) + return fmt.Errorf("could not create statefulset %q: %v", statefulSetName, err) } // check that all the previous replicas were picked up. if newStatefulSet.Spec.Replicas == oldStatefulset.Spec.Replicas && @@ -215,7 +216,7 @@ func (c *Cluster) deleteStatefulSet() error { if err != nil { return err } - c.logger.Infof("statefulset '%s' has been deleted", util.NameFromMeta(c.Statefulset.ObjectMeta)) + c.logger.Infof("statefulset %q has been deleted", util.NameFromMeta(c.Statefulset.ObjectMeta)) c.Statefulset = nil if err := c.deletePods(); err != nil { @@ -262,19 +263,19 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error if role == Master { // for the master service we need to re-create the endpoint as well. Get the up-to-date version of // the addresses stored in it before the service is deleted (deletion of the service removes the endpooint) - currentEndpoint, err = c.KubeClient.Endpoints(c.Service[role].Namespace).Get(c.Service[role].Name) + currentEndpoint, err = c.KubeClient.Endpoints(c.Service[role].Namespace).Get(c.Service[role].Name, metav1.GetOptions{}) if err != nil { return fmt.Errorf("could not get current cluster endpoints: %v", err) } } err = c.KubeClient.Services(c.Service[role].Namespace).Delete(c.Service[role].Name, c.deleteOptions) if err != nil { - return fmt.Errorf("could not delete service '%s': '%v'", serviceName, err) + return fmt.Errorf("could not delete service %q: %v", serviceName, err) } c.Endpoint = nil svc, err := c.KubeClient.Services(newService.Namespace).Create(newService) if err != nil { - return fmt.Errorf("could not create service '%s': '%v'", serviceName, err) + return fmt.Errorf("could not create service %q: %v", serviceName, err) } c.Service[role] = svc if role == Master { @@ -282,7 +283,7 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error endpointSpec := c.generateMasterEndpoints(currentEndpoint.Subsets) ep, err := c.KubeClient.Endpoints(c.Service[role].Namespace).Create(endpointSpec) if err != nil { - return fmt.Errorf("could not create endpoint '%s': '%v'", endpointName, err) + return fmt.Errorf("could not create endpoint %q: %v", endpointName, err) } c.Endpoint = ep } @@ -294,25 +295,25 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error _, err := c.KubeClient.Services(c.Service[role].Namespace).Patch( c.Service[role].Name, - api.StrategicMergePatchType, + types.StrategicMergePatchType, []byte(annotationsPatchData), "") if err != nil { - return fmt.Errorf("could not replace annotations for the service '%s': %v", serviceName, err) + return fmt.Errorf("could not replace annotations for the service %q: %v", serviceName, err) } } patchData, err := specPatch(newService.Spec) if err != nil { - return fmt.Errorf("could not form patch for the service '%s': %v", serviceName, err) + return fmt.Errorf("could not form patch for the service %q: %v", serviceName, err) } svc, err := c.KubeClient.Services(c.Service[role].Namespace).Patch( c.Service[role].Name, - api.MergePatchType, + types.MergePatchType, patchData, "") if err != nil { - return fmt.Errorf("could not patch service '%s': %v", serviceName, err) + return fmt.Errorf("could not patch service %q: %v", serviceName, err) } c.Service[role] = svc @@ -329,7 +330,7 @@ func (c *Cluster) deleteService(role PostgresRole) error { if err != nil { return err } - c.logger.Infof("%s service '%s' has been deleted", role, util.NameFromMeta(service.ObjectMeta)) + c.logger.Infof("%s service %q has been deleted", role, util.NameFromMeta(service.ObjectMeta)) c.Service[role] = nil return nil } @@ -358,7 +359,7 @@ func (c *Cluster) deleteEndpoint() error { if err != nil { return err } - c.logger.Infof("endpoint '%s' has been deleted", util.NameFromMeta(c.Endpoint.ObjectMeta)) + c.logger.Infof("endpoint %q has been deleted", util.NameFromMeta(c.Endpoint.ObjectMeta)) c.Endpoint = nil return nil @@ -371,11 +372,11 @@ func (c *Cluster) applySecrets() error { secret, err := c.KubeClient.Secrets(secretSpec.Namespace).Create(secretSpec) if k8sutil.ResourceAlreadyExists(err) { var userMap map[string]spec.PgUser - curSecret, err := c.KubeClient.Secrets(secretSpec.Namespace).Get(secretSpec.Name) + curSecret, err := c.KubeClient.Secrets(secretSpec.Namespace).Get(secretSpec.Name, metav1.GetOptions{}) if err != nil { return fmt.Errorf("could not get current secret: %v", err) } - c.logger.Debugf("secret '%s' already exists, fetching it's password", util.NameFromMeta(curSecret.ObjectMeta)) + c.logger.Debugf("secret %q already exists, fetching it's password", util.NameFromMeta(curSecret.ObjectMeta)) if secretUsername == c.systemUsers[constants.SuperuserKeyName].Name { secretUsername = constants.SuperuserKeyName userMap = c.systemUsers @@ -392,10 +393,10 @@ func (c *Cluster) applySecrets() error { continue } else { if err != nil { - return fmt.Errorf("could not create secret for user '%s': %v", secretUsername, err) + return fmt.Errorf("could not create secret for user %q: %v", secretUsername, err) } c.Secrets[secret.UID] = secret - c.logger.Debugf("Created new secret '%s', uid: %s", util.NameFromMeta(secret.ObjectMeta), secret.UID) + c.logger.Debugf("Created new secret %q, uid: %q", util.NameFromMeta(secret.ObjectMeta), secret.UID) } } @@ -403,12 +404,12 @@ func (c *Cluster) applySecrets() error { } func (c *Cluster) deleteSecret(secret *v1.Secret) error { - c.logger.Debugf("Deleting secret '%s'", util.NameFromMeta(secret.ObjectMeta)) + c.logger.Debugf("Deleting secret %q", util.NameFromMeta(secret.ObjectMeta)) err := c.KubeClient.Secrets(secret.Namespace).Delete(secret.Name, c.deleteOptions) if err != nil { return err } - c.logger.Infof("secret '%s' has been deleted", util.NameFromMeta(secret.ObjectMeta)) + c.logger.Infof("secret %q has been deleted", util.NameFromMeta(secret.ObjectMeta)) delete(c.Secrets, secret.UID) return err diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 625eb34f52de269d5d5ec771e0a2245b5d61df60..2b796b511aa2237c7cc5b34fd5322c2a0f1c3e2b 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -95,7 +95,7 @@ func (c *Cluster) syncService(role PostgresRole) error { if err != nil { return fmt.Errorf("could not create missing %s service: %v", role, err) } - c.logger.Infof("Created missing %s service '%s'", role, util.NameFromMeta(svc.ObjectMeta)) + c.logger.Infof("Created missing %s service %q", role, util.NameFromMeta(svc.ObjectMeta)) return nil } @@ -110,7 +110,7 @@ func (c *Cluster) syncService(role PostgresRole) error { if err := c.updateService(role, desiredSvc); err != nil { return fmt.Errorf("could not update %s service to match desired state: %v", role, err) } - c.logger.Infof("%s service '%s' is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta)) + c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta)) return nil } @@ -122,7 +122,7 @@ func (c *Cluster) syncEndpoint() error { if err != nil { return fmt.Errorf("could not create missing endpoint: %v", err) } - c.logger.Infof("Created missing endpoint '%s'", util.NameFromMeta(ep.ObjectMeta)) + c.logger.Infof("Created missing endpoint %q", util.NameFromMeta(ep.ObjectMeta)) return nil } @@ -151,7 +151,7 @@ func (c *Cluster) syncStatefulSet() error { if err != nil { return fmt.Errorf("cluster is not ready: %v", err) } - c.logger.Infof("Created missing statefulset '%s'", util.NameFromMeta(ss.ObjectMeta)) + c.logger.Infof("Created missing statefulset %q", util.NameFromMeta(ss.ObjectMeta)) if !rollUpdate { return nil } diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 31d46cc42f41a5a9ac31aa91c5348addb6fe63da..58f92fac1e910148ce969c9719611c065db55637 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -6,9 +6,10 @@ import ( "strings" "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/apis/apps/v1beta1" - "k8s.io/client-go/pkg/labels" "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util" @@ -76,11 +77,11 @@ func metadataAnnotationsPatch(annotations map[string]string) string { func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate bool, reasons []string) { if isUpdate { - c.logger.Infof("statefulset '%s' has been changed", + c.logger.Infof("statefulset %q has been changed", util.NameFromMeta(old.ObjectMeta), ) } else { - c.logger.Infof("statefulset '%s' is not in the desired state and needs to be updated", + c.logger.Infof("statefulset %q is not in the desired state and needs to be updated", util.NameFromMeta(old.ObjectMeta), ) } @@ -88,18 +89,18 @@ func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate if len(reasons) > 0 { for _, reason := range reasons { - c.logger.Infof("Reason: %s", reason) + c.logger.Infof("Reason: %q", reason) } } } func (c *Cluster) logServiceChanges(role PostgresRole, old, new *v1.Service, isUpdate bool, reason string) { if isUpdate { - c.logger.Infof("%s service '%s' has been changed", + c.logger.Infof("%s service %q has been changed", role, util.NameFromMeta(old.ObjectMeta), ) } else { - c.logger.Infof("%s service '%s is not in the desired state and needs to be updated", + c.logger.Infof("%s service %q is not in the desired state and needs to be updated", role, util.NameFromMeta(old.ObjectMeta), ) } @@ -123,10 +124,10 @@ func (c *Cluster) getOAuthToken() (string, error) { // Temporary getting postgresql-operator secret from the NamespaceDefault credentialsSecret, err := c.KubeClient. Secrets(c.OpConfig.OAuthTokenSecretName.Namespace). - Get(c.OpConfig.OAuthTokenSecretName.Name) + Get(c.OpConfig.OAuthTokenSecretName.Name, metav1.GetOptions{}) if err != nil { - c.logger.Debugf("Oauth token secret name: %s", c.OpConfig.OAuthTokenSecretName) + c.logger.Debugf("Oauth token secret name: %q", c.OpConfig.OAuthTokenSecretName) return "", fmt.Errorf("could not get credentials secret: %v", err) } data := credentialsSecret.Data @@ -152,7 +153,7 @@ func (c *Cluster) getTeamMembers() ([]string, error) { return []string{}, fmt.Errorf("could not get oauth token: %v", err) } - teamInfo, err := c.TeamsAPIClient.TeamInfo(c.Spec.TeamID, token) + teamInfo, err := c.teamsAPIClient.TeamInfo(c.Spec.TeamID, token) if err != nil { return nil, fmt.Errorf("could not get team info: %v", err) } @@ -193,10 +194,10 @@ func (c *Cluster) waitForPodDeletion(podEvents chan spec.PodEvent) error { func (c *Cluster) waitStatefulsetReady() error { return retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout, func() (bool, error) { - listOptions := v1.ListOptions{ + listOptions := metav1.ListOptions{ LabelSelector: c.labelsSet().String(), } - ss, err := c.KubeClient.StatefulSets(c.Metadata.Namespace).List(listOptions) + ss, err := c.KubeClient.StatefulSets(c.Namespace).List(listOptions) if err != nil { return false, err } @@ -211,17 +212,17 @@ func (c *Cluster) waitStatefulsetReady() error { func (c *Cluster) waitPodLabelsReady() error { ls := c.labelsSet() - namespace := c.Metadata.Namespace + namespace := c.Namespace - listOptions := v1.ListOptions{ + listOptions := metav1.ListOptions{ LabelSelector: ls.String(), } - masterListOption := v1.ListOptions{ + masterListOption := metav1.ListOptions{ LabelSelector: labels.Merge(ls, labels.Set{ c.OpConfig.PodRoleLabel: constants.PodRoleMaster, }).String(), } - replicaListOption := v1.ListOptions{ + replicaListOption := metav1.ListOptions{ LabelSelector: labels.Merge(ls, labels.Set{ c.OpConfig.PodRoleLabel: constants.PodRoleReplica, }).String(), @@ -277,7 +278,7 @@ func (c *Cluster) labelsSet() labels.Set { for k, v := range c.OpConfig.ClusterLabels { lbls[k] = v } - lbls[c.OpConfig.ClusterNameLabel] = c.Metadata.Name + lbls[c.OpConfig.ClusterNameLabel] = c.Name return labels.Set(lbls) } @@ -307,7 +308,7 @@ func (c *Cluster) credentialSecretName(username string) string { // and must start and end with an alphanumeric character return fmt.Sprintf(constants.UserSecretTemplate, strings.Replace(username, "_", "-", -1), - c.Metadata.Name) + c.Name) } func (c *Cluster) podSpiloRole(pod *v1.Pod) string { diff --git a/pkg/cluster/volumes.go b/pkg/cluster/volumes.go index 9200044721def43a0317f57eabaffaa152e87a73..d2ed26f900717e12166c2e0cd9bb248ea12a8a61 100644 --- a/pkg/cluster/volumes.go +++ b/pkg/cluster/volumes.go @@ -5,7 +5,8 @@ import ( "strconv" "strings" - "k8s.io/client-go/pkg/api/resource" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/pkg/api/v1" "github.com/zalando-incubator/postgres-operator/pkg/spec" @@ -16,8 +17,8 @@ import ( ) func (c *Cluster) listPersistentVolumeClaims() ([]v1.PersistentVolumeClaim, error) { - ns := c.Metadata.Namespace - listOptions := v1.ListOptions{ + ns := c.Namespace + listOptions := metav1.ListOptions{ LabelSelector: c.labelsSet().String(), } @@ -35,7 +36,7 @@ func (c *Cluster) deletePersistenVolumeClaims() error { return err } for _, pvc := range pvcs { - c.logger.Debugf("Deleting PVC '%s'", util.NameFromMeta(pvc.ObjectMeta)) + c.logger.Debugf("Deleting PVC %q", util.NameFromMeta(pvc.ObjectMeta)) if err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Delete(pvc.Name, c.deleteOptions); err != nil { c.logger.Warningf("could not delete PersistentVolumeClaim: %v", err) } @@ -62,14 +63,14 @@ func (c *Cluster) listPersistentVolumes() ([]*v1.PersistentVolume, error) { if lastDash > 0 && lastDash < len(pvc.Name)-1 { pvcNumber, err := strconv.Atoi(pvc.Name[lastDash+1:]) if err != nil { - return nil, fmt.Errorf("could not convert last part of the persistent volume claim name %s to a number", pvc.Name) + return nil, fmt.Errorf("could not convert last part of the persistent volume claim name %q to a number", pvc.Name) } if int32(pvcNumber) > lastPodIndex { - c.logger.Debugf("Skipping persistent volume %s corresponding to a non-running pods", pvc.Name) + c.logger.Debugf("Skipping persistent volume %q corresponding to a non-running pods", pvc.Name) continue } } - pv, err := c.KubeClient.PersistentVolumes().Get(pvc.Spec.VolumeName) + pv, err := c.KubeClient.PersistentVolumes().Get(pvc.Spec.VolumeName, metav1.GetOptions{}) if err != nil { return nil, fmt.Errorf("could not get PersistentVolume: %v", err) } @@ -118,22 +119,22 @@ func (c *Cluster) resizeVolumes(newVolume spec.Volume, resizers []volumes.Volume if err != nil { return err } - c.logger.Debugf("updating persistent volume %s to %d", pv.Name, newSize) + c.logger.Debugf("updating persistent volume %q to %d", pv.Name, newSize) if err := resizer.ResizeVolume(awsVolumeId, newSize); err != nil { - return fmt.Errorf("could not resize EBS volume %s: %v", awsVolumeId, err) + return fmt.Errorf("could not resize EBS volume %q: %v", awsVolumeId, err) } - c.logger.Debugf("resizing the filesystem on the volume %s", pv.Name) + c.logger.Debugf("resizing the filesystem on the volume %q", pv.Name) podName := getPodNameFromPersistentVolume(pv) if err := c.resizePostgresFilesystem(podName, []filesystems.FilesystemResizer{&filesystems.Ext234Resize{}}); err != nil { - return fmt.Errorf("could not resize the filesystem on pod '%s': %v", podName, err) + return fmt.Errorf("could not resize the filesystem on pod %q: %v", podName, err) } - c.logger.Debugf("filesystem resize successful on volume %s", pv.Name) + c.logger.Debugf("filesystem resize successful on volume %q", pv.Name) pv.Spec.Capacity[v1.ResourceStorage] = newQuantity - c.logger.Debugf("updating persistent volume definition for volume %s", pv.Name) + c.logger.Debugf("updating persistent volume definition for volume %q", pv.Name) if _, err := c.KubeClient.PersistentVolumes().Update(pv); err != nil { - return fmt.Errorf("could not update persistent volume: %s", err) + return fmt.Errorf("could not update persistent volume: %q", err) } - c.logger.Debugf("successfully updated persistent volume %s", pv.Name) + c.logger.Debugf("successfully updated persistent volume %q", pv.Name) } } if len(pvs) > 0 && totalCompatible == 0 { diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index dc62504d3396378491a312f0fcaa607db209e3eb..398ee086e8dba49381bc6ce96316a45adb870976 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/Sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" @@ -14,22 +15,25 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/util/config" "github.com/zalando-incubator/postgres-operator/pkg/util/constants" "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" - "github.com/zalando-incubator/postgres-operator/pkg/util/teams" ) type Config struct { RestConfig *rest.Config - KubeClient k8sutil.KubernetesClient - RestClient *rest.RESTClient - TeamsAPIClient *teams.API InfrastructureRoles map[string]spec.PgUser + + NoDatabaseAccess bool + NoTeamsAPI bool + ConfigMapName spec.NamespacedName + Namespace string } type Controller struct { - Config - + config Config opConfig *config.Config - logger *logrus.Entry + + logger *logrus.Entry + KubeClient k8sutil.KubernetesClient + RestClient rest.Interface // kubernetes API group REST client clustersMu sync.RWMutex clusters map[spec.NamespacedName]*cluster.Cluster @@ -39,23 +43,16 @@ type Controller struct { podInformer cache.SharedIndexInformer podCh chan spec.PodEvent - clusterEventQueues []*cache.FIFO - + clusterEventQueues []*cache.FIFO lastClusterSyncTime int64 } -func NewController(controllerConfig *Config, operatorConfig *config.Config) *Controller { +func NewController(controllerConfig *Config) *Controller { logger := logrus.New() - if operatorConfig.DebugLogging { - logger.Level = logrus.DebugLevel - } - - controllerConfig.TeamsAPIClient = teams.NewTeamsAPI(operatorConfig.TeamsAPIUrl, logger) - return &Controller{ - Config: *controllerConfig, - opConfig: operatorConfig, + config: *controllerConfig, + opConfig: &config.Config{}, logger: logger.WithField("pkg", "controller"), clusters: make(map[spec.NamespacedName]*cluster.Cluster), stopChs: make(map[spec.NamespacedName]chan struct{}), @@ -63,22 +60,57 @@ func NewController(controllerConfig *Config, operatorConfig *config.Config) *Con } } -func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { - defer wg.Done() - wg.Add(1) +func (c *Controller) initClients() { + client, err := k8sutil.ClientSet(c.config.RestConfig) + if err != nil { + c.logger.Fatalf("couldn't create client: %v", err) + } + c.KubeClient = k8sutil.NewFromKubernetesInterface(client) - c.initController() + c.RestClient, err = k8sutil.KubernetesRestClient(*c.config.RestConfig) + if err != nil { + c.logger.Fatalf("couldn't create rest client: %v", err) + } +} - go c.runInformers(stopCh) +func (c *Controller) initOperatorConfig() { + configMapData := make(map[string]string) - for i := range c.clusterEventQueues { - go c.processClusterEventsQueue(i) + if c.config.ConfigMapName != (spec.NamespacedName{}) { + configMap, err := c.KubeClient.ConfigMaps(c.config.ConfigMapName.Namespace). + Get(c.config.ConfigMapName.Name, metav1.GetOptions{}) + if err != nil { + panic(err) + } + + configMapData = configMap.Data + } else { + c.logger.Infoln("No ConfigMap specified. Loading default values") } - c.logger.Info("Started working in background") + if configMapData["namespace"] == "" { // Namespace in ConfigMap has priority over env var + configMapData["namespace"] = c.config.Namespace + } + if c.config.NoDatabaseAccess { + configMapData["enable_database_access"] = "false" + } + if c.config.NoTeamsAPI { + configMapData["enable_teams_api"] = "false" + } + + c.opConfig = config.NewFromMap(configMapData) } func (c *Controller) initController() { + c.initClients() + c.initOperatorConfig() + + c.logger.Infof("Config: %s", c.opConfig.MustMarshal()) + + if c.opConfig.DebugLogging { + c.logger.Level = logrus.DebugLevel + } + if err := c.createTPR(); err != nil { c.logger.Fatalf("could not register ThirdPartyResource: %v", err) } @@ -86,27 +118,24 @@ func (c *Controller) initController() { if infraRoles, err := c.getInfrastructureRoles(&c.opConfig.InfrastructureRolesSecretName); err != nil { c.logger.Warningf("could not get infrastructure roles: %v", err) } else { - c.InfrastructureRoles = infraRoles + c.config.InfrastructureRoles = infraRoles } // Postgresqls - clusterLw := &cache.ListWatch{ - ListFunc: c.clusterListFunc, - WatchFunc: c.clusterWatchFunc, - } c.postgresqlInformer = cache.NewSharedIndexInformer( - clusterLw, + &cache.ListWatch{ + ListFunc: c.clusterListFunc, + WatchFunc: c.clusterWatchFunc, + }, &spec.Postgresql{}, constants.QueueResyncPeriodTPR, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + cache.Indexers{}) - if err := c.postgresqlInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + c.postgresqlInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.postgresqlAdd, UpdateFunc: c.postgresqlUpdate, DeleteFunc: c.postgresqlDelete, - }); err != nil { - c.logger.Fatalf("could not add event handlers: %v", err) - } + }) // Pods podLw := &cache.ListWatch{ @@ -120,13 +149,11 @@ func (c *Controller) initController() { constants.QueueResyncPeriodPod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) - if err := c.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + c.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.podAdd, UpdateFunc: c.podUpdate, DeleteFunc: c.podDelete, - }); err != nil { - c.logger.Fatalf("could not add event handlers: %v", err) - } + }) c.clusterEventQueues = make([]*cache.FIFO, c.opConfig.Workers) for i := range c.clusterEventQueues { @@ -141,6 +168,21 @@ func (c *Controller) initController() { } } +func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { + defer wg.Done() + wg.Add(1) + + c.initController() + + go c.runInformers(stopCh) + + for i := range c.clusterEventQueues { + go c.processClusterEventsQueue(i) + } + + c.logger.Info("Started working in background") +} + func (c *Controller) runInformers(stopCh <-chan struct{}) { go c.postgresqlInformer.Run(stopCh) go c.podInformer.Run(stopCh) diff --git a/pkg/controller/pod.go b/pkg/controller/pod.go index c3fb2a5e8a052318f74f1c01fe7dadb15c005054..6575455dbb882fafcca9d58a3afb8c202f016d44 100644 --- a/pkg/controller/pod.go +++ b/pkg/controller/pod.go @@ -1,27 +1,20 @@ package controller import ( - "k8s.io/client-go/pkg/api" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/pkg/api/v1" - "k8s.io/client-go/pkg/runtime" - "k8s.io/client-go/pkg/watch" "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util" ) -func (c *Controller) podListFunc(options api.ListOptions) (runtime.Object, error) { +func (c *Controller) podListFunc(options metav1.ListOptions) (runtime.Object, error) { var labelSelector string var fieldSelector string - if options.LabelSelector != nil { - labelSelector = options.LabelSelector.String() - } - - if options.FieldSelector != nil { - fieldSelector = options.FieldSelector.String() - } - opts := v1.ListOptions{ + opts := metav1.ListOptions{ LabelSelector: labelSelector, FieldSelector: fieldSelector, Watch: options.Watch, @@ -32,19 +25,11 @@ func (c *Controller) podListFunc(options api.ListOptions) (runtime.Object, error return c.KubeClient.Pods(c.opConfig.Namespace).List(opts) } -func (c *Controller) podWatchFunc(options api.ListOptions) (watch.Interface, error) { +func (c *Controller) podWatchFunc(options metav1.ListOptions) (watch.Interface, error) { var labelSelector string var fieldSelector string - if options.LabelSelector != nil { - labelSelector = options.LabelSelector.String() - } - - if options.FieldSelector != nil { - fieldSelector = options.FieldSelector.String() - } - - opts := v1.ListOptions{ + opts := metav1.ListOptions{ LabelSelector: labelSelector, FieldSelector: fieldSelector, Watch: options.Watch, @@ -122,7 +107,7 @@ func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}) { c.clustersMu.RUnlock() if ok { - c.logger.Debugf("Sending %s event of pod '%s' to the '%s' cluster channel", event.EventType, event.PodName, event.ClusterName) + c.logger.Debugf("Sending %q event of pod %q to the %q cluster channel", event.EventType, event.PodName, event.ClusterName) cluster.ReceivePodEvent(event) } case <-stopCh: diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 14997c37f8579e925fa9e9875328568c3d01c3b3..5b3d97080a4196b2c6156b28be189464c32e3da6 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -1,17 +1,16 @@ package controller import ( + "encoding/json" "fmt" "reflect" "sync/atomic" "time" - "k8s.io/client-go/pkg/api" - "k8s.io/client-go/pkg/api/meta" - "k8s.io/client-go/pkg/fields" - "k8s.io/client-go/pkg/runtime" - "k8s.io/client-go/pkg/types" - "k8s.io/client-go/pkg/watch" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" "github.com/zalando-incubator/postgres-operator/pkg/cluster" @@ -26,52 +25,43 @@ func (c *Controller) clusterResync(stopCh <-chan struct{}) { for { select { case <-ticker.C: - c.clusterListFunc(api.ListOptions{ResourceVersion: "0"}) + c.clusterListFunc(metav1.ListOptions{ResourceVersion: "0"}) case <-stopCh: return } } } -func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, error) { - c.logger.Info("Getting list of currently running clusters") - - req := c.RestClient.Get(). - RequestURI(fmt.Sprintf(constants.ListClustersURITemplate, c.opConfig.Namespace)). - VersionedParams(&options, api.ParameterCodec). - FieldsSelectorParam(fields.Everything()) - - object, err := req.Do().Get() +func (c *Controller) clusterListFunc(options metav1.ListOptions) (runtime.Object, error) { + var list spec.PostgresqlList + var activeClustersCnt, failedClustersCnt int - if err != nil { - return nil, fmt.Errorf("could not get list of postgresql objects: %v", err) - } + req := c.RestClient. + Get(). + Namespace(c.opConfig.Namespace). + Resource(constants.ResourceName). + VersionedParams(&options, metav1.ParameterCodec) - objList, err := meta.ExtractList(object) + b, err := req.DoRaw() if err != nil { - return nil, fmt.Errorf("could not extract list of postgresql objects: %v", err) + return nil, err } + err = json.Unmarshal(b, &list) if time.Now().Unix()-atomic.LoadInt64(&c.lastClusterSyncTime) <= int64(c.opConfig.ResyncPeriod.Seconds()) { c.logger.Debugln("skipping resync of clusters") - return object, err + return &list, err } - var activeClustersCnt, failedClustersCnt int - for _, obj := range objList { - pg, ok := obj.(*spec.Postgresql) - if !ok { - return nil, fmt.Errorf("could not cast object to postgresql") - } - + for _, pg := range list.Items { if pg.Error != nil { failedClustersCnt++ continue } - c.queueClusterEvent(nil, pg, spec.EventSync) + c.queueClusterEvent(nil, &pg, spec.EventSync) activeClustersCnt++ } - if len(objList) > 0 { + if len(list.Items) > 0 { if failedClustersCnt > 0 && activeClustersCnt == 0 { c.logger.Infof("There are no clusters running. %d are in the failed state", failedClustersCnt) } else if failedClustersCnt == 0 && activeClustersCnt > 0 { @@ -85,15 +75,48 @@ func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, e atomic.StoreInt64(&c.lastClusterSyncTime, time.Now().Unix()) - return object, err + return &list, err } -func (c *Controller) clusterWatchFunc(options api.ListOptions) (watch.Interface, error) { - req := c.RestClient.Get(). - RequestURI(fmt.Sprintf(constants.WatchClustersURITemplate, c.opConfig.Namespace)). - VersionedParams(&options, api.ParameterCodec). - FieldsSelectorParam(fields.Everything()) - return req.Watch() +type tprDecoder struct { + dec *json.Decoder + close func() error +} + +func (d *tprDecoder) Close() { + d.close() +} + +func (d *tprDecoder) Decode() (action watch.EventType, object runtime.Object, err error) { + var e struct { + Type watch.EventType + Object spec.Postgresql + } + if err := d.dec.Decode(&e); err != nil { + return watch.Error, nil, err + } + + return e.Type, &e.Object, nil +} + +func (c *Controller) clusterWatchFunc(options metav1.ListOptions) (watch.Interface, error) { + options.Watch = true + r, err := c.RestClient. + Get(). + Namespace(c.opConfig.Namespace). + Resource(constants.ResourceName). + VersionedParams(&options, metav1.ParameterCodec). + FieldsSelectorParam(nil). + Stream() + + if err != nil { + return nil, err + } + + return watch.NewStreamWatcher(&tprDecoder{ + dec: json.NewDecoder(r), + close: r.Close, + }), nil } func (c *Controller) processEvent(obj interface{}) error { @@ -106,9 +129,9 @@ func (c *Controller) processEvent(obj interface{}) error { logger := c.logger.WithField("worker", event.WorkerID) if event.EventType == spec.EventAdd || event.EventType == spec.EventSync { - clusterName = util.NameFromMeta(event.NewSpec.Metadata) + clusterName = util.NameFromMeta(event.NewSpec.ObjectMeta) } else { - clusterName = util.NameFromMeta(event.OldSpec.Metadata) + clusterName = util.NameFromMeta(event.OldSpec.ObjectMeta) } c.clustersMu.RLock() @@ -118,14 +141,14 @@ func (c *Controller) processEvent(obj interface{}) error { switch event.EventType { case spec.EventAdd: if clusterFound { - logger.Debugf("Cluster '%s' already exists", clusterName) + logger.Debugf("Cluster %q already exists", clusterName) return nil } - logger.Infof("Creation of the '%s' cluster started", clusterName) + logger.Infof("Creation of the %q cluster started", clusterName) stopCh := make(chan struct{}) - cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger) + cl = cluster.New(c.makeClusterConfig(), c.KubeClient, *event.NewSpec, logger) cl.Run(stopCh) c.clustersMu.Lock() @@ -140,31 +163,31 @@ func (c *Controller) processEvent(obj interface{}) error { return nil } - logger.Infof("Cluster '%s' has been created", clusterName) + logger.Infof("Cluster %q has been created", clusterName) case spec.EventUpdate: - logger.Infof("Update of the '%s' cluster started", clusterName) + logger.Infof("Update of the %q cluster started", clusterName) if !clusterFound { - logger.Warnf("Cluster '%s' does not exist", clusterName) + logger.Warnf("Cluster %q does not exist", clusterName) return nil } if err := cl.Update(event.NewSpec); err != nil { - cl.Error = fmt.Errorf("could not update cluster: %s", err) + cl.Error = fmt.Errorf("could not update cluster: %v", err) logger.Errorf("%v", cl.Error) return nil } cl.Error = nil - logger.Infof("Cluster '%s' has been updated", clusterName) + logger.Infof("Cluster %q has been updated", clusterName) case spec.EventDelete: - logger.Infof("Deletion of the '%s' cluster started", clusterName) + logger.Infof("Deletion of the %q cluster started", clusterName) if !clusterFound { - logger.Errorf("Unknown cluster: %s", clusterName) + logger.Errorf("Unknown cluster: %q", clusterName) return nil } if err := cl.Delete(); err != nil { - logger.Errorf("could not delete cluster '%s': %s", clusterName, err) + logger.Errorf("could not delete cluster %q: %v", clusterName, err) return nil } close(c.stopChs[clusterName]) @@ -174,14 +197,14 @@ func (c *Controller) processEvent(obj interface{}) error { delete(c.stopChs, clusterName) c.clustersMu.Unlock() - logger.Infof("Cluster '%s' has been deleted", clusterName) + logger.Infof("Cluster %q has been deleted", clusterName) case spec.EventSync: - logger.Infof("Syncing of the '%s' cluster started", clusterName) + logger.Infof("Syncing of the %q cluster started", clusterName) // no race condition because a cluster is always processed by single worker if !clusterFound { stopCh := make(chan struct{}) - cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger) + cl = cluster.New(c.makeClusterConfig(), c.KubeClient, *event.NewSpec, logger) cl.Run(stopCh) c.clustersMu.Lock() @@ -191,13 +214,13 @@ func (c *Controller) processEvent(obj interface{}) error { } if err := cl.Sync(); err != nil { - cl.Error = fmt.Errorf("could not sync cluster '%s': %v", clusterName, err) + cl.Error = fmt.Errorf("could not sync cluster %q: %v", clusterName, err) logger.Errorf("%v", cl.Error) return nil } cl.Error = nil - logger.Infof("Cluster '%s' has been synced", clusterName) + logger.Infof("Cluster %q has been synced", clusterName) } return nil @@ -219,8 +242,8 @@ func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec ) if old != nil { //update, delete - uid = old.Metadata.GetUID() - clusterName = util.NameFromMeta(old.Metadata) + uid = old.GetUID() + clusterName = util.NameFromMeta(old.ObjectMeta) if eventType == spec.EventUpdate && new.Error == nil && old.Error != nil { eventType = spec.EventSync clusterError = new.Error @@ -228,13 +251,13 @@ func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec clusterError = old.Error } } else { //add, sync - uid = new.Metadata.GetUID() - clusterName = util.NameFromMeta(new.Metadata) + uid = new.GetUID() + clusterName = util.NameFromMeta(new.ObjectMeta) clusterError = new.Error } if clusterError != nil && eventType != spec.EventDelete { - c.logger.Debugf("Skipping %s event for invalid cluster %s (reason: %v)", eventType, clusterName, clusterError) + c.logger.Debugf("Skipping %q event for invalid cluster %q (reason: %v)", eventType, clusterName, clusterError) return } @@ -251,7 +274,7 @@ func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec if err := c.clusterEventQueues[workerID].Add(clusterEvent); err != nil { c.logger.WithField("worker", workerID).Errorf("error when queueing cluster event: %v", clusterEvent) } - c.logger.WithField("worker", workerID).Infof("%s of the '%s' cluster has been queued", eventType, clusterName) + c.logger.WithField("worker", workerID).Infof("%q of the %q cluster has been queued", eventType, clusterName) } func (c *Controller) postgresqlAdd(obj interface{}) { @@ -274,7 +297,7 @@ func (c *Controller) postgresqlUpdate(prev, cur interface{}) { if !ok { c.logger.Errorf("could not cast to postgresql spec") } - if pgOld.Metadata.ResourceVersion == pgNew.Metadata.ResourceVersion { + if pgOld.ResourceVersion == pgNew.ResourceVersion { return } if reflect.DeepEqual(pgOld.Spec, pgNew.Spec) { diff --git a/pkg/controller/util.go b/pkg/controller/util.go index d7ea55e3a3916e98867590a416d6df838d0356ef..f02f27955172ded410e87dac4458c56cd138deac 100644 --- a/pkg/controller/util.go +++ b/pkg/controller/util.go @@ -4,6 +4,7 @@ import ( "fmt" "hash/crc32" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/pkg/api/v1" extv1beta "k8s.io/client-go/pkg/apis/extensions/v1beta1" @@ -16,15 +17,12 @@ import ( func (c *Controller) makeClusterConfig() cluster.Config { infrastructureRoles := make(map[string]spec.PgUser) - for k, v := range c.InfrastructureRoles { + for k, v := range c.config.InfrastructureRoles { infrastructureRoles[k] = v } return cluster.Config{ - KubeClient: c.KubeClient, - RestClient: c.RestClient, - RestConfig: c.RestConfig, - TeamsAPIClient: c.TeamsAPIClient, + RestConfig: c.config.RestConfig, OpConfig: config.Copy(c.opConfig), InfrastructureRoles: infrastructureRoles, } @@ -32,7 +30,7 @@ func (c *Controller) makeClusterConfig() cluster.Config { func thirdPartyResource(TPRName string) *extv1beta.ThirdPartyResource { return &extv1beta.ThirdPartyResource{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ //ThirdPartyResources are cluster-wide Name: TPRName, }, @@ -48,17 +46,16 @@ func (c *Controller) clusterWorkerID(clusterName spec.NamespacedName) uint32 { } func (c *Controller) createTPR() error { - TPRName := fmt.Sprintf("%s.%s", constants.TPRName, constants.TPRVendor) - tpr := thirdPartyResource(TPRName) + tpr := thirdPartyResource(constants.TPRName) _, err := c.KubeClient.ThirdPartyResources().Create(tpr) if err != nil { if !k8sutil.ResourceAlreadyExists(err) { return err } - c.logger.Infof("ThirdPartyResource '%s' is already registered", TPRName) + c.logger.Infof("ThirdPartyResource %q is already registered", constants.TPRName) } else { - c.logger.Infof("ThirdPartyResource '%s' has been registered", TPRName) + c.logger.Infof("ThirdPartyResource %q' has been registered", constants.TPRName) } return k8sutil.WaitTPRReady(c.RestClient, c.opConfig.TPR.ReadyWaitInterval, c.opConfig.TPR.ReadyWaitTimeout, c.opConfig.Namespace) @@ -72,9 +69,9 @@ func (c *Controller) getInfrastructureRoles(rolesSecret *spec.NamespacedName) (r infraRolesSecret, err := c.KubeClient. Secrets(rolesSecret.Namespace). - Get(rolesSecret.Name) + Get(rolesSecret.Name, metav1.GetOptions{}) if err != nil { - c.logger.Debugf("Infrastructure roles secret name: %s", *rolesSecret) + c.logger.Debugf("Infrastructure roles secret name: %q", *rolesSecret) return nil, fmt.Errorf("could not get infrastructure roles secret: %v", err) } @@ -102,7 +99,7 @@ Users: case "inrole": t.MemberOf = append(t.MemberOf, s) default: - c.logger.Warnf("Unknown key %s", p) + c.logger.Warnf("Unknown key %q", p) } } } diff --git a/pkg/controller/util_test.go b/pkg/controller/util_test.go index bd7d7d049419f67397cdde2343524619c1005e7b..88c51258eb297aaeb7a67c43f3377c131c0fbf13 100644 --- a/pkg/controller/util_test.go +++ b/pkg/controller/util_test.go @@ -5,11 +5,11 @@ import ( "reflect" "testing" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/pkg/api/v1" "github.com/zalando-incubator/postgres-operator/pkg/spec" - "github.com/zalando-incubator/postgres-operator/pkg/util/config" "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" ) @@ -21,7 +21,7 @@ type mockSecret struct { v1core.SecretInterface } -func (c *mockSecret) Get(name string) (*v1.Secret, error) { +func (c *mockSecret) Get(name string, options metav1.GetOptions) (*v1.Secret, error) { if name != testInfrastructureRolesSecretName { return nil, fmt.Errorf("NotFound") } @@ -48,7 +48,7 @@ func newMockKubernetesClient() k8sutil.KubernetesClient { } func newMockController() *Controller { - controller := NewController(&Config{}, &config.Config{}) + controller := NewController(&Config{}) controller.opConfig.ClusterNameLabel = "cluster-name" controller.opConfig.InfrastructureRolesSecretName = spec.NamespacedName{v1.NamespaceDefault, testInfrastructureRolesSecretName} @@ -70,7 +70,7 @@ func TestPodClusterName(t *testing.T) { }, { &v1.Pod{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Namespace: v1.NamespaceDefault, Labels: map[string]string{ mockController.opConfig.ClusterNameLabel: "testcluster", diff --git a/pkg/spec/postgresql.go b/pkg/spec/postgresql.go index 3649eff5098b70331184810e8ff6a350fb25dcad..8598c66cd9c015fe796b237a0b68284731ef5931 100644 --- a/pkg/spec/postgresql.go +++ b/pkg/spec/postgresql.go @@ -6,9 +6,7 @@ import ( "strings" "time" - "k8s.io/client-go/pkg/api/meta" - "k8s.io/client-go/pkg/api/unversioned" - "k8s.io/client-go/pkg/api/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // MaintenanceWindow describes the time window when the operator is allowed to do maintenance on a cluster. @@ -71,8 +69,8 @@ const ( // Postgresql defines PostgreSQL Third Party (resource) Object. type Postgresql struct { - unversioned.TypeMeta `json:",inline"` - Metadata v1.ObjectMeta `json:"metadata"` + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata"` Spec PostgresSpec `json:"spec"` Status PostgresStatus `json:"status,omitempty"` @@ -88,7 +86,7 @@ type PostgresSpec struct { TeamID string `json:"teamId"` AllowedSourceRanges []string `json:"allowedSourceRanges"` - // EnableLoadBalancer is a pointer, since it is importat to know if that parameters is omited from the manifest + // EnableLoadBalancer is a pointer, since it is importat to know if that parameters is omitted from the manifest UseLoadBalancer *bool `json:"useLoadBalancer,omitempty"` ReplicaLoadBalancer bool `json:"replicaLoadBalancer,omitempty"` NumberOfInstances int32 `json:"numberOfInstances"` @@ -99,8 +97,8 @@ type PostgresSpec struct { // PostgresqlList defines a list of PostgreSQL clusters. type PostgresqlList struct { - unversioned.TypeMeta `json:",inline"` - Metadata unversioned.ListMeta `json:"metadata"` + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata"` Items []Postgresql `json:"items"` } @@ -191,21 +189,6 @@ func (m *MaintenanceWindow) UnmarshalJSON(data []byte) error { return nil } -// GetObject implements Object interface for PostgreSQL TPR spec object. -func (p *Postgresql) GetObjectKind() unversioned.ObjectKind { - return &p.TypeMeta -} - -// GetObjectMeta implements ObjectMetaAccessor interface for PostgreSQL TPR spec object. -func (p *Postgresql) GetObjectMeta() meta.Object { - return &p.Metadata -} - -// GetListMeta implements ListMetaAccessor interface for PostgreSQL TPR List spec object. -func (pl *PostgresqlList) GetListMeta() unversioned.List { - return &pl.Metadata -} - func extractClusterName(clusterName string, teamName string) (string, error) { teamNameLen := len(teamName) if len(clusterName) < teamNameLen+2 { @@ -223,10 +206,6 @@ func extractClusterName(clusterName string, teamName string) (string, error) { return clusterName[teamNameLen+1:], nil } -// The code below is used only to work around a known problem with third-party -// resources and ugorji. If/when these issues are resolved, the code below -// should no longer be required. -// type postgresqlListCopy PostgresqlList type postgresqlCopy Postgresql @@ -236,7 +215,7 @@ func (p *Postgresql) UnmarshalJSON(data []byte) error { err := json.Unmarshal(data, &tmp) if err != nil { - metaErr := json.Unmarshal(data, &tmp.Metadata) + metaErr := json.Unmarshal(data, &tmp.ObjectMeta) if metaErr != nil { return err } @@ -250,7 +229,7 @@ func (p *Postgresql) UnmarshalJSON(data []byte) error { } tmp2 := Postgresql(tmp) - clusterName, err := extractClusterName(tmp2.Metadata.Name, tmp2.Spec.TeamID) + clusterName, err := extractClusterName(tmp2.ObjectMeta.Name, tmp2.Spec.TeamID) if err == nil { tmp2.Spec.ClusterName = clusterName } else { diff --git a/pkg/spec/postgresql_test.go b/pkg/spec/postgresql_test.go index d067b8d55b322c4e61ff12d5454f9adfaf8ccea4..7c67bf0a949f6fc997f872f4e3cc2c62ba11fe60 100644 --- a/pkg/spec/postgresql_test.go +++ b/pkg/spec/postgresql_test.go @@ -8,8 +8,7 @@ import ( "testing" "time" - "k8s.io/client-go/pkg/api/unversioned" - "k8s.io/client-go/pkg/api/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) var parseTimeTests = []struct { @@ -104,11 +103,11 @@ var unmarshalCluster = []struct { "kind": "Postgresql","apiVersion": "acid.zalan.do/v1", "metadata": {"name": "acid-testcluster1"}, "spec": {"teamId": 100}}`), Postgresql{ - TypeMeta: unversioned.TypeMeta{ + TypeMeta: metav1.TypeMeta{ Kind: "Postgresql", APIVersion: "acid.zalan.do/v1", }, - Metadata: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "acid-testcluster1", }, Status: ClusterStatusInvalid, @@ -184,11 +183,11 @@ var unmarshalCluster = []struct { } }`), Postgresql{ - TypeMeta: unversioned.TypeMeta{ + TypeMeta: metav1.TypeMeta{ Kind: "Postgresql", APIVersion: "acid.zalan.do/v1", }, - Metadata: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "acid-testcluster1", }, Spec: PostgresSpec{ @@ -250,11 +249,11 @@ var unmarshalCluster = []struct { { []byte(`{"kind": "Postgresql","apiVersion": "acid.zalan.do/v1","metadata": {"name": "teapot-testcluster1"}, "spec": {"teamId": "acid"}}`), Postgresql{ - TypeMeta: unversioned.TypeMeta{ + TypeMeta: metav1.TypeMeta{ Kind: "Postgresql", APIVersion: "acid.zalan.do/v1", }, - Metadata: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "teapot-testcluster1", }, Spec: PostgresSpec{TeamID: "acid"}, @@ -278,16 +277,16 @@ var postgresqlList = []struct { }{ {[]byte(`{"apiVersion":"v1","items":[{"apiVersion":"acid.zalan.do/v1","kind":"Postgresql","metadata":{"labels":{"team":"acid"},"name":"acid-testcluster42","namespace":"default","resourceVersion":"30446957","selfLink":"/apis/acid.zalan.do/v1/namespaces/default/postgresqls/acid-testcluster42","uid":"857cd208-33dc-11e7-b20a-0699041e4b03"},"spec":{"allowedSourceRanges":["185.85.220.0/22"],"numberOfInstances":1,"postgresql":{"version":"9.6"},"teamId":"acid","volume":{"size":"10Gi"}},"status":"Running"}],"kind":"List","metadata":{},"resourceVersion":"","selfLink":""}`), PostgresqlList{ - TypeMeta: unversioned.TypeMeta{ + TypeMeta: metav1.TypeMeta{ Kind: "List", APIVersion: "v1", }, Items: []Postgresql{{ - TypeMeta: unversioned.TypeMeta{ + TypeMeta: metav1.TypeMeta{ Kind: "Postgresql", APIVersion: "acid.zalan.do/v1", }, - Metadata: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "acid-testcluster42", Namespace: "default", Labels: map[string]string{"team": "acid"}, @@ -363,7 +362,7 @@ func TestClusterName(t *testing.T) { continue } if name != tt.clusterName { - t.Errorf("Expected cluserName: %s, got: %s", tt.clusterName, name) + t.Errorf("Expected cluserName: %q, got: %q", tt.clusterName, name) } } } @@ -400,7 +399,7 @@ func TestMarshalMaintenanceWindow(t *testing.T) { } if !bytes.Equal(s, tt.in) { - t.Errorf("Expected Marshal: %s, got: %s", string(tt.in), string(s)) + t.Errorf("Expected Marshal: %q, got: %q", string(tt.in), string(s)) } } } @@ -435,7 +434,7 @@ func TestMarshal(t *testing.T) { continue } if !bytes.Equal(m, tt.marshal) { - t.Errorf("Marshal Postgresql expected: %s, got: %s", string(tt.marshal), string(m)) + t.Errorf("Marshal Postgresql expected: %q, got: %q", string(tt.marshal), string(m)) } } } @@ -446,8 +445,8 @@ func TestPostgresMeta(t *testing.T) { t.Errorf("GetObjectKindMeta expected: %v, got: %v", tt.out.TypeMeta, a) } - if a := tt.out.GetObjectMeta(); reflect.DeepEqual(a, tt.out.Metadata) { - t.Errorf("GetObjectMeta expected: %v, got: %v", tt.out.Metadata, a) + if a := tt.out.GetObjectMeta(); reflect.DeepEqual(a, tt.out.ObjectMeta) { + t.Errorf("GetObjectMeta expected: %v, got: %v", tt.out.ObjectMeta, a) } } } @@ -476,8 +475,8 @@ func TestPostgresListMeta(t *testing.T) { t.Errorf("GetObjectKindMeta expected: %v, got: %v", tt.out.TypeMeta, a) } - if a := tt.out.GetListMeta(); reflect.DeepEqual(a, tt.out.Metadata) { - t.Errorf("GetObjectMeta expected: %v, got: %v", tt.out.Metadata, a) + if a := tt.out.GetListMeta(); reflect.DeepEqual(a, tt.out.ListMeta) { + t.Errorf("GetObjectMeta expected: %v, got: %v", tt.out.ListMeta, a) } return diff --git a/pkg/spec/types.go b/pkg/spec/types.go index 822395ce923243dba9576be5fde37c2894d932be..1a43cde057cd44d8074135afac29ea6fff3d98d2 100644 --- a/pkg/spec/types.go +++ b/pkg/spec/types.go @@ -1,12 +1,12 @@ package spec import ( + "database/sql" "fmt" "strings" - "database/sql" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/pkg/api/v1" - "k8s.io/client-go/pkg/types" ) // EvenType contains type of the events for the TPRs and Pods received from Kubernetes diff --git a/pkg/spec/types_test.go b/pkg/spec/types_test.go index b690586e9096d0872a328ca43930ceb47d354f85..d0368268a86b4935889b0aeb197b4e9338cb74ea 100644 --- a/pkg/spec/types_test.go +++ b/pkg/spec/types_test.go @@ -49,7 +49,7 @@ func TestNamespacedNameError(t *testing.T) { var actual NamespacedName err := actual.Decode(tt) if err == nil { - t.Errorf("Error expected for '%s', got: %#v", tt, actual) + t.Errorf("Error expected for %q, got: %#v", tt, actual) } } } diff --git a/pkg/util/constants/kubernetes.go b/pkg/util/constants/kubernetes.go index 3a56aa35ab4437da323d4f61ed632a3c0c89f4de..79c60cad2c1b2a0d7e02a69b86518c12f47b3f25 100644 --- a/pkg/util/constants/kubernetes.go +++ b/pkg/util/constants/kubernetes.go @@ -4,10 +4,7 @@ import "time" // General kubernetes-related constants const ( - ListClustersURITemplate = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/namespaces/%s/" + ResourceName // Namespace - WatchClustersURITemplate = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/watch/namespaces/%s/" + ResourceName // Namespace - K8sVersion = "v1" - K8sAPIPath = "/api" + K8sAPIPath = "/apis" StatefulsetDeletionInterval = 1 * time.Second StatefulsetDeletionTimeout = 30 * time.Second diff --git a/pkg/util/constants/roles.go b/pkg/util/constants/roles.go index 85fb42b1b16c70cffe3f71365978702ce6ec98fa..9f584c37043a646dbd98d1c7a5206516477db272 100644 --- a/pkg/util/constants/roles.go +++ b/pkg/util/constants/roles.go @@ -2,7 +2,7 @@ package constants const ( PasswordLength = 64 - UserSecretTemplate = "%s.%s.credentials." + TPRName + "." + TPRVendor // Username, ClusterName + UserSecretTemplate = "%s.%s.credentials." + TPRKind + "." + TPRGroup // Username, ClusterName SuperuserKeyName = "superuser" ReplicationUserKeyName = "replication" RoleFlagSuperuser = "SUPERUSER" diff --git a/pkg/util/constants/thirdpartyresource.go b/pkg/util/constants/thirdpartyresource.go index 7207b4583752291f453062ababd6b32d6d355e98..a0a00d25928bedc941f5a0899d6f0982f0b230fb 100644 --- a/pkg/util/constants/thirdpartyresource.go +++ b/pkg/util/constants/thirdpartyresource.go @@ -2,9 +2,10 @@ package constants // Different properties of the PostgreSQL Third Party Resources const ( - TPRName = "postgresql" - TPRVendor = "acid.zalan.do" + TPRKind = "postgresql" + TPRGroup = "acid.zalan.do" TPRDescription = "Managed PostgreSQL clusters" TPRApiVersion = "v1" - ResourceName = TPRName + "s" + TPRName = TPRKind + "." + TPRGroup + ResourceName = TPRKind + "s" ) diff --git a/pkg/util/filesystems/ext234.go b/pkg/util/filesystems/ext234.go index ceea73984d4c94b15c42b660063409cd47bece23..fc2943d46dcb68a4064f62d8250eb790a2655670 100644 --- a/pkg/util/filesystems/ext234.go +++ b/pkg/util/filesystems/ext234.go @@ -37,5 +37,5 @@ func (c *Ext234Resize) ResizeFilesystem(deviceName string, commandExecutor func( (strings.Contains(out, "on-line resizing required") && ext2fsSuccessRegexp.MatchString(out)) { return nil } - return fmt.Errorf("unrecognized output: %s, assuming error", out) + return fmt.Errorf("unrecognized output: %q, assuming error", out) } diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index 981083cb9c8c6fe43006b8d7d69ed1096a0cdb08..09ad38444811c7e365bfb025a34d5af578198f4c 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -1,22 +1,19 @@ package k8sutil import ( - "fmt" "time" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/client-go/kubernetes" v1beta1 "k8s.io/client-go/kubernetes/typed/apps/v1beta1" v1core "k8s.io/client-go/kubernetes/typed/core/v1" extensions "k8s.io/client-go/kubernetes/typed/extensions/v1beta1" "k8s.io/client-go/pkg/api" - apierrors "k8s.io/client-go/pkg/api/errors" - "k8s.io/client-go/pkg/api/unversioned" - "k8s.io/client-go/pkg/runtime" - "k8s.io/client-go/pkg/runtime/serializer" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util/constants" "github.com/zalando-incubator/postgres-operator/pkg/util/retryutil" ) @@ -31,6 +28,7 @@ type KubernetesClient struct { v1core.ConfigMapsGetter v1beta1.StatefulSetsGetter extensions.ThirdPartyResourcesGetter + RESTClient rest.Interface } func NewFromKubernetesInterface(src kubernetes.Interface) (c KubernetesClient) { @@ -44,6 +42,7 @@ func NewFromKubernetesInterface(src kubernetes.Interface) (c KubernetesClient) { c.PersistentVolumesGetter = src.CoreV1() c.StatefulSetsGetter = src.AppsV1beta1() c.ThirdPartyResourcesGetter = src.ExtensionsV1beta1() + c.RESTClient = src.CoreV1().RESTClient() return } @@ -51,6 +50,7 @@ func RestConfig(kubeConfig string, outOfCluster bool) (*rest.Config, error) { if outOfCluster { return clientcmd.BuildConfigFromFlags("", kubeConfig) } + return rest.InClusterConfig() } @@ -66,35 +66,24 @@ func ResourceNotFound(err error) bool { return apierrors.IsNotFound(err) } -func KubernetesRestClient(c *rest.Config) (*rest.RESTClient, error) { - c.GroupVersion = &unversioned.GroupVersion{Version: constants.K8sVersion} - c.APIPath = constants.K8sAPIPath - c.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs} - - schemeBuilder := runtime.NewSchemeBuilder( - func(scheme *runtime.Scheme) error { - scheme.AddKnownTypes( - unversioned.GroupVersion{ - Group: constants.TPRVendor, - Version: constants.TPRApiVersion, - }, - &spec.Postgresql{}, - &spec.PostgresqlList{}, - &api.ListOptions{}, - &api.DeleteOptions{}, - ) - return nil - }) - if err := schemeBuilder.AddToScheme(api.Scheme); err != nil { - return nil, fmt.Errorf("could not apply functions to register PostgreSQL TPR type: %v", err) +func KubernetesRestClient(cfg rest.Config) (rest.Interface, error) { + cfg.GroupVersion = &schema.GroupVersion{ + Group: constants.TPRGroup, + Version: constants.TPRApiVersion, } + cfg.APIPath = constants.K8sAPIPath + cfg.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs} - return rest.RESTClientFor(c) + return rest.RESTClientFor(&cfg) } func WaitTPRReady(restclient rest.Interface, interval, timeout time.Duration, ns string) error { return retryutil.Retry(interval, timeout, func() (bool, error) { - _, err := restclient.Get().RequestURI(fmt.Sprintf(constants.ListClustersURITemplate, ns)).DoRaw() + _, err := restclient. + Get(). + Namespace(ns). + Resource(constants.ResourceName). + DoRaw() if err != nil { if ResourceNotFound(err) { // not set up yet. wait more. return false, nil diff --git a/pkg/util/users/users.go b/pkg/util/users/users.go index 8ff32d305822410d52f1ab26267141ee2c685e16..1b5f5966fea42db67714b1c8776542bba6e11ac5 100644 --- a/pkg/util/users/users.go +++ b/pkg/util/users/users.go @@ -66,11 +66,11 @@ func (s DefaultUserSyncStrategy) ExecuteSyncRequests(reqs []spec.PgSyncUserReque switch r.Kind { case spec.PGSyncUserAdd: if err := s.createPgUser(r.User, db); err != nil { - return fmt.Errorf("could not create user '%s': %v", r.User.Name, err) + return fmt.Errorf("could not create user %q: %v", r.User.Name, err) } case spec.PGsyncUserAlter: if err := s.alterPgUser(r.User, db); err != nil { - return fmt.Errorf("could not alter user '%s': %v", r.User.Name, err) + return fmt.Errorf("could not alter user %q: %v", r.User.Name, err) } default: return fmt.Errorf("unrecognized operation: %v", r.Kind) @@ -100,7 +100,7 @@ func (s DefaultUserSyncStrategy) createPgUser(user spec.PgUser, db *sql.DB) (err _, err = db.Query(query) // TODO: Try several times if err != nil { - err = fmt.Errorf("dB error: %s, query: %v", err, query) + err = fmt.Errorf("dB error: %v, query: %q", err, query) return } @@ -122,7 +122,7 @@ func (s DefaultUserSyncStrategy) alterPgUser(user spec.PgUser, db *sql.DB) (err _, err = db.Query(query) // TODO: Try several times if err != nil { - err = fmt.Errorf("dB error: %s query %v", err, query) + err = fmt.Errorf("dB error: %v query %q", err, query) return } diff --git a/pkg/util/util.go b/pkg/util/util.go index abe4a8237292dcb412851ef25dd1b3824431bb82..c66622679ca973010188dd42d7a0c87d1b6c6b3c 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -8,7 +8,7 @@ import ( "time" "github.com/motomux/pretty" - "k8s.io/client-go/pkg/api/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/zalando-incubator/postgres-operator/pkg/spec" ) @@ -34,7 +34,7 @@ func RandomPassword(n int) string { } // NameFromMeta converts a metadata object to the NamespacedName name representation. -func NameFromMeta(meta v1.ObjectMeta) spec.NamespacedName { +func NameFromMeta(meta metav1.ObjectMeta) spec.NamespacedName { return spec.NamespacedName{ Namespace: meta.Namespace, Name: meta.Name, diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index 067f64927cfe2cd1820c08876bbb091ba14d4bbc..cfd37c033f71b76eff64544cd0682534d527720a 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -4,7 +4,7 @@ import ( "reflect" "testing" - "k8s.io/client-go/pkg/api/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/zalando-incubator/postgres-operator/pkg/spec" ) @@ -53,7 +53,7 @@ func TestRandomPassword(t *testing.T) { } func TestNameFromMeta(t *testing.T) { - meta := v1.ObjectMeta{ + meta := metav1.ObjectMeta{ Name: "testcluster", Namespace: "default", } @@ -73,7 +73,7 @@ func TestPGUserPassword(t *testing.T) { for _, tt := range pgUsers { pwd := PGUserPassword(tt.in) if pwd != tt.out { - t.Errorf("PgUserPassword expected: %s, got: %s", tt.out, pwd) + t.Errorf("PgUserPassword expected: %q, got: %q", tt.out, pwd) } } } @@ -81,7 +81,7 @@ func TestPGUserPassword(t *testing.T) { func TestPrettyDiff(t *testing.T) { for _, tt := range prettyDiffTest { if actual := PrettyDiff(tt.inA, tt.inB); actual != tt.out { - t.Errorf("PrettyDiff expected: %s, got: %s", tt.out, actual) + t.Errorf("PrettyDiff expected: %q, got: %q", tt.out, actual) } } } diff --git a/pkg/util/volumes/ebs.go b/pkg/util/volumes/ebs.go index 8d6ec12b7ff5eb4271eb88b0fa6121319adeafe0..c213a112600fb7f950bd72defe31d24755dac176 100644 --- a/pkg/util/volumes/ebs.go +++ b/pkg/util/volumes/ebs.go @@ -42,11 +42,11 @@ func (c *EBSVolumeResizer) VolumeBelongsToProvider(pv *v1.PersistentVolume) bool func (c *EBSVolumeResizer) GetProviderVolumeID(pv *v1.PersistentVolume) (string, error) { volumeID := pv.Spec.AWSElasticBlockStore.VolumeID if volumeID == "" { - return "", fmt.Errorf("volume id is empty for volume %s", pv.Name) + return "", fmt.Errorf("volume id is empty for volume %q", pv.Name) } idx := strings.LastIndex(volumeID, constants.EBSVolumeIDStart) + 1 if idx == 0 { - return "", fmt.Errorf("malfored EBS volume id %s", volumeID) + return "", fmt.Errorf("malfored EBS volume id %q", volumeID) } return volumeID[idx:], nil } @@ -60,7 +60,7 @@ func (c *EBSVolumeResizer) ResizeVolume(volumeId string, newSize int64) error { } vol := volumeOutput.Volumes[0] if *vol.VolumeId != volumeId { - return fmt.Errorf("describe volume %s returned information about a non-matching volume %s", volumeId, *vol.VolumeId) + return fmt.Errorf("describe volume %q returned information about a non-matching volume %q", volumeId, *vol.VolumeId) } if *vol.Size == newSize { // nothing to do @@ -74,7 +74,7 @@ func (c *EBSVolumeResizer) ResizeVolume(volumeId string, newSize int64) error { state := *output.VolumeModification.ModificationState if state == constants.EBSVolumeStateFailed { - return fmt.Errorf("could not modify persistent volume %s: modification state failed", volumeId) + return fmt.Errorf("could not modify persistent volume %q: modification state failed", volumeId) } if state == "" { return fmt.Errorf("received empty modification status") @@ -91,10 +91,10 @@ func (c *EBSVolumeResizer) ResizeVolume(volumeId string, newSize int64) error { return false, fmt.Errorf("could not describe volume modification: %v", err) } if len(out.VolumesModifications) != 1 { - return false, fmt.Errorf("describe volume modification didn't return one record for volume \"%s\"", volumeId) + return false, fmt.Errorf("describe volume modification didn't return one record for volume %q", volumeId) } if *out.VolumesModifications[0].VolumeId != volumeId { - return false, fmt.Errorf("non-matching volume id when describing modifications: \"%s\" is different from \"%s\"", + return false, fmt.Errorf("non-matching volume id when describing modifications: %q is different from %q", *out.VolumesModifications[0].VolumeId, volumeId) } return *out.VolumesModifications[0].ModificationState != constants.EBSVolumeStateModifying, nil