• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zalando / postgres-operator / 22137962296

18 Feb 2026 11:30AM UTC coverage: 43.626% (+0.06%) from 43.568%
22137962296

Pull #3044

github

web-flow
Merge f99ee3038 into cffa0ee63
Pull Request #3044: do not reset secrets of standby clusters

42 of 51 new or added lines in 4 files covered. (82.35%)

24 existing lines in 1 file now uncovered.

6598 of 15124 relevant lines covered (43.63%)

16.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

16.96
/pkg/controller/controller.go
1
package controller
2

3
import (
4
        "bytes"
5
        "context"
6
        "encoding/json"
7
        "fmt"
8
        "os"
9
        "strings"
10
        "sync"
11
        "time"
12

13
        "github.com/sirupsen/logrus"
14
        acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
15
        "github.com/zalando/postgres-operator/pkg/apiserver"
16
        "github.com/zalando/postgres-operator/pkg/cluster"
17
        acidv1informer "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/acid.zalan.do/v1"
18
        "github.com/zalando/postgres-operator/pkg/spec"
19
        "github.com/zalando/postgres-operator/pkg/teams"
20
        "github.com/zalando/postgres-operator/pkg/util"
21
        "github.com/zalando/postgres-operator/pkg/util/config"
22
        "github.com/zalando/postgres-operator/pkg/util/constants"
23
        "github.com/zalando/postgres-operator/pkg/util/k8sutil"
24
        "github.com/zalando/postgres-operator/pkg/util/ringlog"
25
        v1 "k8s.io/api/core/v1"
26
        rbacv1 "k8s.io/api/rbac/v1"
27
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28
        "k8s.io/apimachinery/pkg/types"
29
        "k8s.io/client-go/kubernetes/scheme"
30
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
31
        "k8s.io/client-go/tools/cache"
32
        "k8s.io/client-go/tools/record"
33
        "k8s.io/client-go/tools/reference"
34
)
35

36
// Controller represents operator controller
37
type Controller struct {
38
        config    spec.ControllerConfig
39
        opConfig  *config.Config
40
        pgTeamMap teams.PostgresTeamMap
41

42
        logger     *logrus.Entry
43
        KubeClient k8sutil.KubernetesClient
44
        apiserver  *apiserver.Server
45

46
        eventRecorder    record.EventRecorder
47
        eventBroadcaster record.EventBroadcaster
48

49
        stopCh chan struct{}
50

51
        controllerID     string
52
        curWorkerID      uint32 //initialized with 0
53
        curWorkerCluster sync.Map
54
        clusterWorkers   map[spec.NamespacedName]uint32
55
        clustersMu       sync.RWMutex
56
        clusters         map[spec.NamespacedName]*cluster.Cluster
57
        clusterLogs      map[spec.NamespacedName]ringlog.RingLogger
58
        clusterHistory   map[spec.NamespacedName]ringlog.RingLogger // history of the cluster changes
59
        teamClusters     map[string][]spec.NamespacedName
60

61
        postgresqlInformer   cache.SharedIndexInformer
62
        postgresTeamInformer cache.SharedIndexInformer
63
        podInformer          cache.SharedIndexInformer
64
        nodesInformer        cache.SharedIndexInformer
65
        podCh                chan cluster.PodEvent
66

67
        clusterEventQueues    []*cache.FIFO // [workerID]Queue
68
        lastClusterSyncTime   int64
69
        lastClusterRepairTime int64
70

71
        workerLogs map[uint32]ringlog.RingLogger
72

73
        PodServiceAccount            *v1.ServiceAccount
74
        PodServiceAccountRoleBinding *rbacv1.RoleBinding
75
}
76

77
// NewController creates a new controller
78
func NewController(controllerConfig *spec.ControllerConfig, controllerId string) *Controller {
3✔
79
        logger := logrus.New()
3✔
80
        if controllerConfig.EnableJsonLogging {
3✔
81
                logger.SetFormatter(&logrus.JSONFormatter{})
×
82
        } else {
3✔
83
                if os.Getenv("LOG_NOQUOTE") != "" {
3✔
84
                        logger.SetFormatter(&logrus.TextFormatter{PadLevelText: true, DisableQuote: true})
×
85
                }
×
86
        }
87

88
        var myComponentName = "postgres-operator"
3✔
89
        if controllerId != "" {
6✔
90
                myComponentName += "/" + controllerId
3✔
91
        }
3✔
92

93
        eventBroadcaster := record.NewBroadcaster()
3✔
94

3✔
95
        // disabling the sending of events also to the logoutput
3✔
96
        // the operator currently duplicates a lot of log entries with this setup
3✔
97
        // eventBroadcaster.StartLogging(logger.Infof)
3✔
98
        scheme := scheme.Scheme
3✔
99
        acidv1.AddToScheme(scheme)
3✔
100
        recorder := eventBroadcaster.NewRecorder(scheme, v1.EventSource{Component: myComponentName})
3✔
101

3✔
102
        c := &Controller{
3✔
103
                config:           *controllerConfig,
3✔
104
                opConfig:         &config.Config{},
3✔
105
                logger:           logger.WithField("pkg", "controller"),
3✔
106
                eventRecorder:    recorder,
3✔
107
                eventBroadcaster: eventBroadcaster,
3✔
108
                controllerID:     controllerId,
3✔
109
                curWorkerCluster: sync.Map{},
3✔
110
                clusterWorkers:   make(map[spec.NamespacedName]uint32),
3✔
111
                clusters:         make(map[spec.NamespacedName]*cluster.Cluster),
3✔
112
                clusterLogs:      make(map[spec.NamespacedName]ringlog.RingLogger),
3✔
113
                clusterHistory:   make(map[spec.NamespacedName]ringlog.RingLogger),
3✔
114
                teamClusters:     make(map[string][]spec.NamespacedName),
3✔
115
                stopCh:           make(chan struct{}),
3✔
116
                podCh:            make(chan cluster.PodEvent),
3✔
117
        }
3✔
118
        logger.Hooks.Add(c)
3✔
119

3✔
120
        return c
3✔
121
}
122

123
func (c *Controller) initClients() {
×
124
        var err error
×
125

×
126
        c.KubeClient, err = k8sutil.NewFromConfig(c.config.RestConfig)
×
127
        if err != nil {
×
128
                c.logger.Fatalf("could not create kubernetes clients: %v", err)
×
129
        }
×
130
        c.eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: c.KubeClient.EventsGetter.Events("")})
×
131
        if err != nil {
×
132
                c.logger.Fatalf("could not setup kubernetes event sink: %v", err)
×
133
        }
×
134

135
}
136

137
func (c *Controller) initOperatorConfig() {
×
138
        configMapData := make(map[string]string)
×
139

×
140
        if c.config.ConfigMapName != (spec.NamespacedName{}) {
×
141
                configMap, err := c.KubeClient.ConfigMaps(c.config.ConfigMapName.Namespace).
×
142
                        Get(context.TODO(), c.config.ConfigMapName.Name, metav1.GetOptions{})
×
143
                if err != nil {
×
144
                        panic(err)
×
145
                }
146

147
                configMapData = configMap.Data
×
148
        } else {
×
149
                c.logger.Infoln("no ConfigMap specified. Loading default values")
×
150
        }
×
151

152
        c.opConfig = config.NewFromMap(configMapData)
×
153
        c.warnOnDeprecatedOperatorParameters()
×
154

×
155
        if c.opConfig.SetMemoryRequestToLimit {
×
156

×
157
                isSmaller, err := util.IsSmallerQuantity(c.opConfig.DefaultMemoryRequest, c.opConfig.DefaultMemoryLimit)
×
158
                if err != nil {
×
159
                        panic(err)
×
160
                }
161
                if isSmaller {
×
162
                        c.logger.Warningf("The default memory request of %v for Postgres containers is increased to match the default memory limit of %v.", c.opConfig.DefaultMemoryRequest, c.opConfig.DefaultMemoryLimit)
×
163
                        c.opConfig.DefaultMemoryRequest = c.opConfig.DefaultMemoryLimit
×
164
                }
×
165

166
                isSmaller, err = util.IsSmallerQuantity(c.opConfig.ScalyrMemoryRequest, c.opConfig.ScalyrMemoryLimit)
×
167
                if err != nil {
×
168
                        panic(err)
×
169
                }
170
                if isSmaller {
×
171
                        c.logger.Warningf("The memory request of %v for the Scalyr sidecar container is increased to match the memory limit of %v.", c.opConfig.ScalyrMemoryRequest, c.opConfig.ScalyrMemoryLimit)
×
172
                        c.opConfig.ScalyrMemoryRequest = c.opConfig.ScalyrMemoryLimit
×
173
                }
×
174

175
                // generateStatefulSet adjusts values for individual Postgres clusters
176
        }
177

178
}
179

180
func (c *Controller) modifyConfigFromEnvironment() {
×
181
        c.opConfig.WatchedNamespace = c.getEffectiveNamespace(os.Getenv("WATCHED_NAMESPACE"), c.opConfig.WatchedNamespace)
×
182

×
183
        if c.config.NoDatabaseAccess {
×
184
                c.opConfig.EnableDBAccess = false
×
185
        }
×
186
        if c.config.NoTeamsAPI {
×
187
                c.opConfig.EnableTeamsAPI = false
×
188
        }
×
189
        scalyrAPIKey := os.Getenv("SCALYR_API_KEY")
×
190
        if scalyrAPIKey != "" {
×
191
                c.opConfig.ScalyrAPIKey = scalyrAPIKey
×
192
        }
×
193
}
194

195
// warningOnDeprecatedParameters emits warnings upon finding deprecated parmaters
196
func (c *Controller) warnOnDeprecatedOperatorParameters() {
×
197
        if c.opConfig.EnableLoadBalancer != nil {
×
198
                c.logger.Warningf("Operator configuration parameter 'enable_load_balancer' is deprecated and takes no effect. " +
×
199
                        "Consider using the 'enable_master_load_balancer' or 'enable_replica_load_balancer' instead.")
×
200
        }
×
201

202
        if len(c.opConfig.SidecarImages) > 0 {
×
203
                c.logger.Warningf("Operator configuration parameter 'sidecar_docker_images' is deprecated. " +
×
204
                        "Consider using 'sidecars' instead.")
×
205
        }
×
206
}
207

208
// compactValue returns v with all insignificant JSON whitespace removed.
//
// It is meant for the JSON templates embedded in this file, so invalid input
// is treated as a programming error and panics. The panic message carries the
// underlying parse error so the broken template can be located.
func compactValue(v string) string {
	var compact bytes.Buffer
	if err := json.Compact(&compact, []byte(v)); err != nil {
		panic(fmt.Sprintf("Hard coded json strings broken: %v", err))
	}
	return compact.String()
}
215

216
func (c *Controller) initPodServiceAccount() {
×
217

×
218
        if c.opConfig.PodServiceAccountDefinition == "" {
×
219
                stringValue := `
×
220
                {
×
221
                        "apiVersion": "v1",
×
222
                        "kind": "ServiceAccount",
×
223
                        "metadata": {
×
224
                                "name": "postgres-pod"
×
225
                        }
×
226
                }`
×
227

×
228
                c.opConfig.PodServiceAccountDefinition = compactValue(stringValue)
×
229

×
230
        }
×
231

232
        // re-uses k8s internal parsing. See k8s client-go issue #193 for explanation
233
        decode := scheme.Codecs.UniversalDeserializer().Decode
×
234
        obj, groupVersionKind, err := decode([]byte(c.opConfig.PodServiceAccountDefinition), nil, nil)
×
235

×
236
        switch {
×
237
        case err != nil:
×
238
                panic(fmt.Errorf("Unable to parse pod service account definition from the operator configuration: %v", err))
×
239
        case groupVersionKind.Kind != "ServiceAccount":
×
240
                panic(fmt.Errorf("pod service account definition in the operator configuration defines another type of resource: %v", groupVersionKind.Kind))
×
241
        default:
×
242
                c.PodServiceAccount = obj.(*v1.ServiceAccount)
×
243
                if c.PodServiceAccount.Name != c.opConfig.PodServiceAccountName {
×
244
                        c.logger.Warnf("in the operator configuration, the pod service account name %v does not match the name %v given in the account definition; using the former for consistency", c.opConfig.PodServiceAccountName, c.PodServiceAccount.Name)
×
245
                        c.PodServiceAccount.Name = c.opConfig.PodServiceAccountName
×
246
                }
×
247
                c.PodServiceAccount.Namespace = ""
×
248
        }
249

250
        // actual service accounts are deployed at the time of Postgres/Spilo cluster creation
251
}
252

253
func (c *Controller) initRoleBinding() {
×
254

×
255
        // service account on its own lacks any rights starting with k8s v1.8
×
256
        // operator binds it to the cluster role with sufficient privileges
×
257
        // we assume the role is created by the k8s administrator
×
258
        if c.opConfig.PodServiceAccountRoleBindingDefinition == "" {
×
259
                stringValue := fmt.Sprintf(`
×
260
                {
×
261
                        "apiVersion": "rbac.authorization.k8s.io/v1",
×
262
                        "kind": "RoleBinding",
×
263
                        "metadata": {
×
264
                                   "name": "%s"
×
265
                        },
×
266
                        "roleRef": {
×
267
                                "apiGroup": "rbac.authorization.k8s.io",
×
268
                                "kind": "ClusterRole",
×
269
                                "name": "%s"
×
270
                        },
×
271
                        "subjects": [
×
272
                                {
×
273
                                        "kind": "ServiceAccount",
×
274
                                        "name": "%s"
×
275
                                }
×
276
                        ]
×
277
                }`, c.PodServiceAccount.Name, c.PodServiceAccount.Name, c.PodServiceAccount.Name)
×
278
                c.opConfig.PodServiceAccountRoleBindingDefinition = compactValue(stringValue)
×
279
        }
×
280
        c.logger.Info("Parse role bindings")
×
281
        // re-uses k8s internal parsing. See k8s client-go issue #193 for explanation
×
282
        decode := scheme.Codecs.UniversalDeserializer().Decode
×
283
        obj, groupVersionKind, err := decode([]byte(c.opConfig.PodServiceAccountRoleBindingDefinition), nil, nil)
×
284

×
285
        switch {
×
286
        case err != nil:
×
287
                panic(fmt.Errorf("unable to parse the role binding definition from the operator configuration: %v", err))
×
288
        case groupVersionKind.Kind != "RoleBinding":
×
289
                panic(fmt.Errorf("role binding definition in the operator configuration defines another type of resource: %v", groupVersionKind.Kind))
×
290
        default:
×
291
                c.PodServiceAccountRoleBinding = obj.(*rbacv1.RoleBinding)
×
292
                c.PodServiceAccountRoleBinding.Namespace = ""
×
293
                c.logger.Info("successfully parsed")
×
294

295
        }
296

297
        // actual roles bindings ar*logrus.Entrye deployed at the time of Postgres/Spilo cluster creation
298
}
299

300
func logMultiLineConfig(log *logrus.Entry, config string) {
×
301
        lines := strings.Split(config, "\n")
×
302
        for _, l := range lines {
×
303
                log.Infof("%s", l)
×
304
        }
×
305
}
306

307
func (c *Controller) initController() {
×
308
        c.initClients()
×
309
        c.controllerID = os.Getenv("CONTROLLER_ID")
×
310

×
311
        if configObjectName := os.Getenv("POSTGRES_OPERATOR_CONFIGURATION_OBJECT"); configObjectName != "" {
×
312
                if c.opConfig.EnableCRDRegistration != nil && *c.opConfig.EnableCRDRegistration {
×
313
                        if err := c.createConfigurationCRD(); err != nil {
×
314
                                c.logger.Fatalf("could not register Operator Configuration CustomResourceDefinition: %v", err)
×
315
                        }
×
316
                }
317
                if cfg, err := c.readOperatorConfigurationFromCRD(spec.GetOperatorNamespace(), configObjectName); err != nil {
×
318
                        c.logger.Fatalf("unable to read operator configuration: %v", err)
×
319
                } else {
×
320
                        c.opConfig = c.importConfigurationFromCRD(&cfg.Configuration)
×
321
                }
×
322
        } else {
×
323
                c.initOperatorConfig()
×
324
        }
×
325
        c.initPodServiceAccount()
×
326
        c.initRoleBinding()
×
327

×
328
        c.modifyConfigFromEnvironment()
×
329

×
330
        if c.opConfig.EnableCRDRegistration != nil && *c.opConfig.EnableCRDRegistration {
×
331
                if err := c.createPostgresCRD(); err != nil {
×
332
                        c.logger.Fatalf("could not register Postgres CustomResourceDefinition: %v", err)
×
333
                }
×
334
        }
335

336
        c.initSharedInformers()
×
337

×
338
        c.pgTeamMap = teams.PostgresTeamMap{}
×
339
        if c.opConfig.EnablePostgresTeamCRD {
×
340
                c.loadPostgresTeams()
×
341
        }
×
342

343
        if c.opConfig.DebugLogging {
×
344
                c.logger.Logger.Level = logrus.DebugLevel
×
345
        }
×
346

347
        logMultiLineConfig(c.logger, c.opConfig.MustMarshal())
×
348

×
349
        roleDefs := c.getInfrastructureRoleDefinitions()
×
350
        infraRoles, err := c.getInfrastructureRoles(roleDefs)
×
351
        if err != nil {
×
352
                c.logger.Warningf("could not get all infrastructure roles: %v", err)
×
353
        }
×
354
        if len(infraRoles) > 0 {
×
UNCOV
355
                c.config.InfrastructureRoles = infraRoles
×
356
        }
×
357

358
        c.clusterEventQueues = make([]*cache.FIFO, c.opConfig.Workers)
×
359
        c.workerLogs = make(map[uint32]ringlog.RingLogger, c.opConfig.Workers)
×
360
        for i := range c.clusterEventQueues {
×
361
                c.clusterEventQueues[i] = cache.NewFIFO(func(obj interface{}) (string, error) {
×
362
                        e, ok := obj.(ClusterEvent)
×
363
                        if !ok {
×
UNCOV
364
                                return "", fmt.Errorf("could not cast to ClusterEvent")
×
365
                        }
×
366

UNCOV
367
                        return queueClusterKey(e.EventType, e.UID), nil
×
368
                })
369
        }
370

UNCOV
371
        c.apiserver = apiserver.New(c, c.opConfig.APIPort, c.logger.Logger)
×
372
}
373

374
func (c *Controller) initSharedInformers() {
×
375

×
376
        // Postgresqls
×
377
        c.postgresqlInformer = acidv1informer.NewPostgresqlInformer(
×
378
                c.KubeClient.AcidV1ClientSet,
×
379
                c.opConfig.WatchedNamespace,
×
380
                constants.QueueResyncPeriodTPR,
×
381
                cache.Indexers{})
×
382

×
383
        c.postgresqlInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
×
384
                AddFunc:    c.postgresqlAdd,
×
385
                UpdateFunc: c.postgresqlUpdate,
×
386
                DeleteFunc: c.postgresqlDelete,
×
387
        })
×
388

×
389
        // PostgresTeams
×
390
        if c.opConfig.EnablePostgresTeamCRD {
×
391
                c.postgresTeamInformer = acidv1informer.NewPostgresTeamInformer(
×
392
                        c.KubeClient.AcidV1ClientSet,
×
393
                        c.opConfig.WatchedNamespace,
×
394
                        constants.QueueResyncPeriodTPR*6, // 30 min
×
395
                        cache.Indexers{})
×
396

×
397
                c.postgresTeamInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
×
398
                        AddFunc:    c.postgresTeamAdd,
×
399
                        UpdateFunc: c.postgresTeamUpdate,
×
UNCOV
400
                })
×
UNCOV
401
        }
×
402

403
        // Pods
404
        podLw := &cache.ListWatch{
×
405
                ListFunc:  c.podListFunc,
×
406
                WatchFunc: c.podWatchFunc,
×
407
        }
×
408

×
409
        c.podInformer = cache.NewSharedIndexInformer(
×
410
                podLw,
×
411
                &v1.Pod{},
×
412
                constants.QueueResyncPeriodPod,
×
413
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
×
414

×
415
        c.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
×
416
                AddFunc:    c.podAdd,
×
417
                UpdateFunc: c.podUpdate,
×
418
                DeleteFunc: c.podDelete,
×
419
        })
×
420

×
421
        // Kubernetes Nodes
×
422
        nodeLw := &cache.ListWatch{
×
423
                ListFunc:  c.nodeListFunc,
×
424
                WatchFunc: c.nodeWatchFunc,
×
425
        }
×
426

×
427
        c.nodesInformer = cache.NewSharedIndexInformer(
×
428
                nodeLw,
×
429
                &v1.Node{},
×
430
                constants.QueueResyncPeriodNode,
×
431
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
×
432

×
433
        c.nodesInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
×
434
                AddFunc:    c.nodeAdd,
×
435
                UpdateFunc: c.nodeUpdate,
×
UNCOV
436
                DeleteFunc: c.nodeDelete,
×
UNCOV
437
        })
×
438
}
439

440
// Run starts background controller processes
441
func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
×
442
        c.initController()
×
443

×
444
        // start workers reading from the events queue to prevent the initial sync from blocking on it.
×
445
        for i := range c.clusterEventQueues {
×
446
                wg.Add(1)
×
447
                c.workerLogs[uint32(i)] = ringlog.New(c.opConfig.RingLogLines)
×
UNCOV
448
                go c.processClusterEventsQueue(i, stopCh, wg)
×
UNCOV
449
        }
×
450

451
        // populate clusters before starting nodeInformer that relies on it and run the initial sync
UNCOV
452
        if err := c.acquireInitialListOfClusters(); err != nil {
×
UNCOV
453
                panic("could not acquire initial list of clusters")
×
454
        }
455

456
        wg.Add(5 + util.Bool2Int(c.opConfig.EnablePostgresTeamCRD))
×
457
        go c.runPodInformer(stopCh, wg)
×
458
        go c.runPostgresqlInformer(stopCh, wg)
×
459
        go c.clusterResync(stopCh, wg)
×
460
        go c.apiserver.Run(stopCh, wg)
×
461
        go c.kubeNodesInformer(stopCh, wg)
×
462

×
463
        if c.opConfig.EnablePostgresTeamCRD {
×
UNCOV
464
                go c.runPostgresTeamInformer(stopCh, wg)
×
465
        }
×
466

UNCOV
467
        c.logger.Info("started working in background")
×
468
}
469

470
func (c *Controller) runPodInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) {
×
471
        defer wg.Done()
×
472

×
UNCOV
473
        c.podInformer.Run(stopCh)
×
474
}
×
475

476
func (c *Controller) runPostgresqlInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) {
×
477
        defer wg.Done()
×
478

×
UNCOV
479
        c.postgresqlInformer.Run(stopCh)
×
480
}
×
481

482
func (c *Controller) runPostgresTeamInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) {
×
483
        defer wg.Done()
×
484

×
UNCOV
485
        c.postgresTeamInformer.Run(stopCh)
×
486
}
×
487

488
func queueClusterKey(eventType EventType, uid types.UID) string {
×
UNCOV
489
        return fmt.Sprintf("%s-%s", eventType, uid)
×
490
}
×
491

492
func (c *Controller) kubeNodesInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) {
×
493
        defer wg.Done()
×
494

×
UNCOV
495
        c.nodesInformer.Run(stopCh)
×
496
}
×
497

498
func (c *Controller) getEffectiveNamespace(namespaceFromEnvironment, namespaceFromConfigMap string) string {
×
499

×
500
        namespace := util.Coalesce(namespaceFromEnvironment, util.Coalesce(namespaceFromConfigMap, spec.GetOperatorNamespace()))
×
501

×
502
        if namespace == "*" {
×
503

×
504
                namespace = v1.NamespaceAll
×
505
                c.logger.Infof("Listening to all namespaces")
×
506

×
507
        } else {
×
508

×
509
                if _, err := c.KubeClient.Namespaces().Get(context.TODO(), namespace, metav1.GetOptions{}); err != nil {
×
510
                        c.logger.Fatalf("Could not find the watched namespace %q", namespace)
×
511
                } else {
×
UNCOV
512
                        c.logger.Infof("Listenting to the specific namespace %q", namespace)
×
UNCOV
513
                }
×
514

515
        }
516

UNCOV
517
        return namespace
×
518
}
519

520
// GetReference of Postgres CR object
521
// i.e. required to emit events to this resource
522
func (c *Controller) GetReference(postgresql *acidv1.Postgresql) *v1.ObjectReference {
×
523
        ref, err := reference.GetReference(scheme.Scheme, postgresql)
×
524
        if err != nil {
×
525
                c.logger.Errorf("could not get reference for Postgresql CR %v/%v: %v", postgresql.Namespace, postgresql.Name, err)
×
UNCOV
526
        }
×
UNCOV
527
        return ref
×
528
}
529

530
func (c *Controller) meetsClusterDeleteAnnotations(postgresql *acidv1.Postgresql) error {
5✔
531

5✔
532
        deleteAnnotationDateKey := c.opConfig.DeleteAnnotationDateKey
5✔
533
        currentTime := time.Now()
5✔
534
        currentDate := currentTime.Format("2006-01-02") // go's reference date
5✔
535

5✔
536
        if deleteAnnotationDateKey != "" {
10✔
537
                if deleteDate, ok := postgresql.Annotations[deleteAnnotationDateKey]; ok {
9✔
538
                        if deleteDate != currentDate {
5✔
539
                                return fmt.Errorf("annotation %s not matching the current date: got %s, expected %s", deleteAnnotationDateKey, deleteDate, currentDate)
1✔
540
                        }
1✔
541
                } else {
1✔
542
                        return fmt.Errorf("annotation %s not set in manifest to allow cluster deletion", deleteAnnotationDateKey)
1✔
543
                }
1✔
544
        }
545

546
        deleteAnnotationNameKey := c.opConfig.DeleteAnnotationNameKey
3✔
547

3✔
548
        if deleteAnnotationNameKey != "" {
6✔
549
                if clusterName, ok := postgresql.Annotations[deleteAnnotationNameKey]; ok {
5✔
550
                        if clusterName != postgresql.Name {
3✔
551
                                return fmt.Errorf("annotation %s not matching the cluster name: got %s, expected %s", deleteAnnotationNameKey, clusterName, postgresql.Name)
1✔
552
                        }
1✔
553
                } else {
1✔
554
                        return fmt.Errorf("annotation %s not set in manifest to allow cluster deletion", deleteAnnotationNameKey)
1✔
555
                }
1✔
556
        }
557

558
        return nil
1✔
559
}
560

561
// hasOwnership returns true if the controller is the "owner" of the postgresql.
562
// Whether it's owner is determined by the value of 'acid.zalan.do/controller'
563
// annotation. If the value matches the controllerID then it owns it, or if the
564
// controllerID is "" and there's no annotation set.
565
func (c *Controller) hasOwnership(postgresql *acidv1.Postgresql) bool {
3✔
566
        if postgresql.Annotations != nil {
5✔
567
                if owner, ok := postgresql.Annotations[constants.PostgresqlControllerAnnotationKey]; ok {
4✔
568
                        return owner == c.controllerID
2✔
569
                }
2✔
570
        }
571
        return c.controllerID == ""
1✔
572
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc