zalando / postgres-operator / 20856627721

09 Jan 2026 03:23PM UTC coverage: 43.594%. First build 20856627721

Merge 3c1688b8c into 55cc167fc
Pull Request #3019: Check cluster UID

0 of 6 new or added lines in 1 file covered. (0.0%)

6543 of 15009 relevant lines covered (43.59%)

16.07 hits per line
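
For reference, the headline percentages follow directly from the line counts above: 6543 / 15009 ≈ 0.43594, i.e. 43.594% overall coverage, and 0 / 6 = 0.0% for the lines added by this pull request.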

Source File
/pkg/controller/postgresql.go
package controller

import (
        "context"
        "encoding/json"
        "fmt"
        "reflect"
        "strings"
        "sync"
        "sync/atomic"
        "time"

        "github.com/sirupsen/logrus"

        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/types"
        "k8s.io/client-go/tools/cache"

        acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
        "github.com/zalando/postgres-operator/pkg/cluster"
        "github.com/zalando/postgres-operator/pkg/spec"
        "github.com/zalando/postgres-operator/pkg/util"
        "github.com/zalando/postgres-operator/pkg/util/k8sutil"
        "github.com/zalando/postgres-operator/pkg/util/ringlog"
)

func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) {
        defer wg.Done()
        ticker := time.NewTicker(c.opConfig.ResyncPeriod)

        for {
                select {
                case <-ticker.C:
                        if err := c.clusterListAndSync(); err != nil {
                                c.logger.Errorf("could not list clusters: %v", err)
                        }
                case <-stopCh:
                        return
                }
        }
}

// clusterListFunc obtains a list of all PostgreSQL clusters
func (c *Controller) listClusters(options metav1.ListOptions) (*acidv1.PostgresqlList, error) {
        var pgList acidv1.PostgresqlList

        // TODO: use the SharedInformer cache instead of quering Kubernetes API directly.
        list, err := c.KubeClient.PostgresqlsGetter.Postgresqls(c.opConfig.WatchedNamespace).List(context.TODO(), options)
        if err != nil {
                c.logger.Errorf("could not list postgresql objects: %v", err)
        }
        if c.controllerID != "" {
                c.logger.Debugf("watch only clusters with controllerID %q", c.controllerID)
        }
        for _, pg := range list.Items {
                if pg.Error == "" && c.hasOwnership(&pg) {
                        pgList.Items = append(pgList.Items, pg)
                }
        }

        return &pgList, err
}

// clusterListAndSync lists all manifests and decides whether to run the sync or repair.
func (c *Controller) clusterListAndSync() error {
        var (
                err   error
                event EventType
        )

        currentTime := time.Now().Unix()
        timeFromPreviousSync := currentTime - atomic.LoadInt64(&c.lastClusterSyncTime)
        timeFromPreviousRepair := currentTime - atomic.LoadInt64(&c.lastClusterRepairTime)

        if timeFromPreviousSync >= int64(c.opConfig.ResyncPeriod.Seconds()) {
                event = EventSync
        } else if timeFromPreviousRepair >= int64(c.opConfig.RepairPeriod.Seconds()) {
                event = EventRepair
        }
        if event != "" {
                var list *acidv1.PostgresqlList
                if list, err = c.listClusters(metav1.ListOptions{ResourceVersion: "0"}); err != nil {
                        return err
                }
                c.queueEvents(list, event)
        } else {
                c.logger.Infof("not enough time passed since the last sync (%v seconds) or repair (%v seconds)",
                        timeFromPreviousSync, timeFromPreviousRepair)
        }
        return nil
}

// queueEvents queues a sync or repair event for every cluster with a valid manifest
func (c *Controller) queueEvents(list *acidv1.PostgresqlList, event EventType) {
        var activeClustersCnt, failedClustersCnt, clustersToRepair int
        for i, pg := range list.Items {
                // XXX: check the cluster status field instead
                if pg.Error != "" {
                        failedClustersCnt++
                        continue
                }
                activeClustersCnt++
                // check if that cluster needs repair
                if event == EventRepair {
                        if pg.Status.Success() {
                                continue
                        } else {
                                clustersToRepair++
                        }
                }
                c.queueClusterEvent(nil, &list.Items[i], event)
        }
        if len(list.Items) > 0 {
                if failedClustersCnt > 0 && activeClustersCnt == 0 {
                        c.logger.Infof("there are no clusters running. %d are in the failed state", failedClustersCnt)
                } else if failedClustersCnt == 0 && activeClustersCnt > 0 {
                        c.logger.Infof("there are %d clusters running", activeClustersCnt)
                } else {
                        c.logger.Infof("there are %d clusters running and %d are in the failed state", activeClustersCnt, failedClustersCnt)
                }
                if clustersToRepair > 0 {
                        c.logger.Infof("%d clusters are scheduled for a repair scan", clustersToRepair)
                }
        } else {
                c.logger.Infof("no clusters running")
        }
        if event == EventRepair || event == EventSync {
                atomic.StoreInt64(&c.lastClusterRepairTime, time.Now().Unix())
                if event == EventSync {
                        atomic.StoreInt64(&c.lastClusterSyncTime, time.Now().Unix())
                }
        }
}

func (c *Controller) acquireInitialListOfClusters() error {
        var (
                list        *acidv1.PostgresqlList
                err         error
                clusterName spec.NamespacedName
        )

        if list, err = c.listClusters(metav1.ListOptions{ResourceVersion: "0"}); err != nil {
                return err
        }
        c.logger.Debug("acquiring initial list of clusters")
        for _, pg := range list.Items {
                // XXX: check the cluster status field instead
                if pg.Error != "" {
                        continue
                }
                clusterName = util.NameFromMeta(pg.ObjectMeta)
                c.addCluster(c.logger, clusterName, &pg)
                c.logger.Debugf("added new cluster: %q", clusterName)
        }
        // initiate initial sync of all clusters.
        c.queueEvents(list, EventSync)
        return nil
}

func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedName, pgSpec *acidv1.Postgresql) (*cluster.Cluster, error) {
        if c.opConfig.EnableTeamIdClusternamePrefix {
                if _, err := acidv1.ExtractClusterName(clusterName.Name, pgSpec.Spec.TeamID); err != nil {
                        c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusInvalid)
                        return nil, err
                }
        }

        cl := cluster.New(c.makeClusterConfig(), c.KubeClient, *pgSpec, lg, c.eventRecorder)
        cl.Run(c.stopCh)
        teamName := strings.ToLower(cl.Spec.TeamID)

        defer c.clustersMu.Unlock()
        c.clustersMu.Lock()

        c.teamClusters[teamName] = append(c.teamClusters[teamName], clusterName)
        c.clusters[clusterName] = cl
        c.clusterLogs[clusterName] = ringlog.New(c.opConfig.RingLogLines)
        c.clusterHistory[clusterName] = ringlog.New(c.opConfig.ClusterHistoryEntries)

        return cl, nil
}

func (c *Controller) processEvent(event ClusterEvent) {
        var clusterName spec.NamespacedName
        var clHistory ringlog.RingLogger
        var clusterUID types.UID
        var err error

        lg := c.logger.WithField("worker", event.WorkerID)

        if event.EventType == EventAdd || event.EventType == EventSync || event.EventType == EventRepair {
                clusterName = util.NameFromMeta(event.NewSpec.ObjectMeta)
                clusterUID = event.NewSpec.UID
        } else {
                clusterName = util.NameFromMeta(event.OldSpec.ObjectMeta)
                clusterUID = event.OldSpec.UID
        }
        lg = lg.WithField("cluster-name", clusterName)

        c.clustersMu.RLock()
        cl, clusterFound := c.clusters[clusterName]
        if clusterFound && cl.UID != clusterUID {
                clusterFound = false
        }
        if clusterFound {
                clHistory = c.clusterHistory[clusterName]
        }
        c.clustersMu.RUnlock()

        defer c.curWorkerCluster.Store(event.WorkerID, nil)

        if event.EventType == EventRepair {
                runRepair, lastOperationStatus := cl.NeedsRepair()
                if !runRepair {
                        lg.Debugf("observed cluster status %s, repair is not required", lastOperationStatus)
                        return
                }
                lg.Debugf("observed cluster status %s, running sync scan to repair the cluster", lastOperationStatus)
                event.EventType = EventSync
        }

        if event.EventType == EventAdd || event.EventType == EventUpdate || event.EventType == EventSync {
                // handle deprecated parameters by possibly assigning their values to the new ones.
                if event.OldSpec != nil {
                        c.mergeDeprecatedPostgreSQLSpecParameters(&event.OldSpec.Spec)
                }
                if event.NewSpec != nil {
                        c.warnOnDeprecatedPostgreSQLSpecParameters(&event.NewSpec.Spec)
                        c.mergeDeprecatedPostgreSQLSpecParameters(&event.NewSpec.Spec)
                }

                if err = c.submitRBACCredentials(event); err != nil {
                        c.logger.Warnf("pods and/or Patroni may misfunction due to the lack of permissions: %v", err)
                }

        }

        switch event.EventType {
        case EventAdd:
                if clusterFound {
                        lg.Infof("received add event for already existing Postgres cluster")
                        return
                }

                lg.Infof("creating a new Postgres cluster")

                cl, err = c.addCluster(lg, clusterName, event.NewSpec)
                if err != nil {
                        lg.Errorf("creation of cluster is blocked: %v", err)
                        return
                }

                c.curWorkerCluster.Store(event.WorkerID, cl)

                err = cl.Create()
                if err != nil {
                        cl.Status = acidv1.PostgresStatus{PostgresClusterStatus: acidv1.ClusterStatusInvalid}
                        cl.Error = fmt.Sprintf("could not create cluster: %v", err)
                        lg.Error(cl.Error)
                        c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Create", "%v", cl.Error)
                        return
                }

                lg.Infoln("cluster has been created")
        case EventUpdate:
                lg.Infoln("update of the cluster started")

                if !clusterFound {
                        lg.Warningln("cluster does not exist")
                        return
                }
                c.curWorkerCluster.Store(event.WorkerID, cl)
                err = cl.Update(event.OldSpec, event.NewSpec)
                if err != nil {
                        cl.Error = fmt.Sprintf("could not update cluster: %v", err)
                        lg.Error(cl.Error)

                        return
                }
                cl.Error = ""
                lg.Infoln("cluster has been updated")

                clHistory.Insert(&spec.Diff{
                        EventTime:   event.EventTime,
                        ProcessTime: time.Now(),
                        Diff:        util.Diff(event.OldSpec, event.NewSpec),
                })
        case EventDelete:
                if !clusterFound {
                        lg.Errorf("unknown cluster: %q", clusterName)
                        return
                }

                teamName := strings.ToLower(cl.Spec.TeamID)
                c.curWorkerCluster.Store(event.WorkerID, cl)

                // when using finalizers the deletion already happened
                if c.opConfig.EnableFinalizers == nil || !*c.opConfig.EnableFinalizers {
                        lg.Infoln("deletion of the cluster started")
                        if err := cl.Delete(); err != nil {
                                cl.Error = fmt.Sprintf("could not delete cluster: %v", err)
                                c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Delete", "%v", cl.Error)
                        }
                }

                func() {
                        defer c.clustersMu.Unlock()
                        c.clustersMu.Lock()

                        delete(c.clusters, clusterName)
                        delete(c.clusterLogs, clusterName)
                        delete(c.clusterHistory, clusterName)
                        for i, val := range c.teamClusters[teamName] {
                                if val == clusterName {
                                        copy(c.teamClusters[teamName][i:], c.teamClusters[teamName][i+1:])
                                        c.teamClusters[teamName][len(c.teamClusters[teamName])-1] = spec.NamespacedName{}
                                        c.teamClusters[teamName] = c.teamClusters[teamName][:len(c.teamClusters[teamName])-1]
                                        break
                                }
                        }
                }()

                lg.Infof("cluster has been deleted")
        case EventSync:
                lg.Infof("syncing of the cluster started")

                // no race condition because a cluster is always processed by single worker
                if !clusterFound {
                        cl, err = c.addCluster(lg, clusterName, event.NewSpec)
                        if err != nil {
                                lg.Errorf("syncing of cluster is blocked: %v", err)
                                return
                        }
                }

                c.curWorkerCluster.Store(event.WorkerID, cl)

                // has this cluster been marked as deleted already, then we shall start cleaning up
                if !cl.ObjectMeta.DeletionTimestamp.IsZero() {
                        lg.Infof("cluster has a DeletionTimestamp of %s, starting deletion now.", cl.ObjectMeta.DeletionTimestamp.Format(time.RFC3339))
                        if err = cl.Delete(); err != nil {
                                cl.Error = fmt.Sprintf("error deleting cluster and its resources: %v", err)
                                c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Delete", "%v", cl.Error)
                                lg.Error(cl.Error)
                                return
                        }
                } else {
                        if err = cl.Sync(event.NewSpec); err != nil {
                                cl.Error = fmt.Sprintf("could not sync cluster: %v", err)
                                c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Sync", "%v", cl.Error)
                                lg.Error(cl.Error)
                                return
                        }
                        lg.Infof("cluster has been synced")
                }
                cl.Error = ""
        }
}

func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{}, wg *sync.WaitGroup) {
        defer wg.Done()

        go func() {
                <-stopCh
                c.clusterEventQueues[idx].Close()
        }()

        for {
                obj, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(func(interface{}, bool) error { return nil }))
                if err != nil {
                        if err == cache.ErrFIFOClosed {
                                return
                        }
                        c.logger.Errorf("error when processing cluster events queue: %v", err)
                        continue
                }
                event, ok := obj.(ClusterEvent)
                if !ok {
                        c.logger.Errorf("could not cast to ClusterEvent")
                }

                c.processEvent(event)
        }
}

func (c *Controller) warnOnDeprecatedPostgreSQLSpecParameters(spec *acidv1.PostgresSpec) {

        deprecate := func(deprecated, replacement string) {
                c.logger.Warningf("parameter %q is deprecated. Consider setting %q instead", deprecated, replacement)
        }

        if spec.UseLoadBalancer != nil {
                deprecate("useLoadBalancer", "enableMasterLoadBalancer")
        }
        if spec.ReplicaLoadBalancer != nil {
                deprecate("replicaLoadBalancer", "enableReplicaLoadBalancer")
        }

        if (spec.UseLoadBalancer != nil || spec.ReplicaLoadBalancer != nil) &&
                (spec.EnableReplicaLoadBalancer != nil || spec.EnableMasterLoadBalancer != nil) {
                c.logger.Warnf("both old and new load balancer parameters are present in the manifest, ignoring old ones")
        }
}

// mergeDeprecatedPostgreSQLSpecParameters modifies the spec passed to the cluster by setting current parameter
// values from the obsolete ones. Note: while the spec that is modified is a copy made in queueClusterEvent, it is
// still a shallow copy, so be extra careful not to modify values pointer fields point to, but copy them instead.
func (c *Controller) mergeDeprecatedPostgreSQLSpecParameters(spec *acidv1.PostgresSpec) *acidv1.PostgresSpec {
        if (spec.UseLoadBalancer != nil || spec.ReplicaLoadBalancer != nil) &&
                (spec.EnableReplicaLoadBalancer == nil && spec.EnableMasterLoadBalancer == nil) {
                if spec.UseLoadBalancer != nil {
                        spec.EnableMasterLoadBalancer = new(bool)
                        *spec.EnableMasterLoadBalancer = *spec.UseLoadBalancer
                }
                if spec.ReplicaLoadBalancer != nil {
                        spec.EnableReplicaLoadBalancer = new(bool)
                        *spec.EnableReplicaLoadBalancer = *spec.ReplicaLoadBalancer
                }
        }
        spec.ReplicaLoadBalancer = nil
        spec.UseLoadBalancer = nil

        return spec
}

func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1.Postgresql, eventType EventType) {
        var (
                uid          types.UID
                clusterName  spec.NamespacedName
                clusterError string
        )

        if informerOldSpec != nil { //update, delete
                uid = informerOldSpec.GetUID()
                clusterName = util.NameFromMeta(informerOldSpec.ObjectMeta)

                // user is fixing previously incorrect spec
                if eventType == EventUpdate && informerNewSpec.Error == "" && informerOldSpec.Error != "" {
                        eventType = EventSync
                }

                // set current error to be one of the new spec if present
                if informerNewSpec != nil {
                        clusterError = informerNewSpec.Error
                } else {
                        clusterError = informerOldSpec.Error
                }
        } else { //add, sync
                uid = informerNewSpec.GetUID()
                clusterName = util.NameFromMeta(informerNewSpec.ObjectMeta)
                clusterError = informerNewSpec.Error
        }

        if eventType == EventDelete {
                // when owner references are used operator cannot block deletion
                if c.opConfig.EnableOwnerReferences == nil || !*c.opConfig.EnableOwnerReferences {
                        // only allow deletion if delete annotations are set and conditions are met
                        if err := c.meetsClusterDeleteAnnotations(informerOldSpec); err != nil {
                                c.logger.WithField("cluster-name", clusterName).Warnf(
                                        "ignoring %q event for cluster %q - manifest does not fulfill delete requirements: %s", eventType, clusterName, err)
                                c.logger.WithField("cluster-name", clusterName).Warnf(
                                        "please, recreate Postgresql resource %q and set annotations to delete properly", clusterName)
                                if currentManifest, marshalErr := json.Marshal(informerOldSpec); marshalErr != nil {
                                        c.logger.WithField("cluster-name", clusterName).Warnf("could not marshal current manifest:\n%+v", informerOldSpec)
                                } else {
                                        c.logger.WithField("cluster-name", clusterName).Warnf("%s\n", string(currentManifest))
                                }
                                return
                        }
                }
        }

        if clusterError != "" && eventType != EventDelete {
                c.logger.WithField("cluster-name", clusterName).Debugf("skipping %q event for the invalid cluster: %s", eventType, clusterError)

                switch eventType {
                case EventAdd:
                        c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusAddFailed)
                        c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Create", "%v", clusterError)
                case EventUpdate:
                        c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusUpdateFailed)
                        c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Update", "%v", clusterError)
                default:
                        c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusSyncFailed)
                        c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Sync", "%v", clusterError)
                }

                return
        }

        // Don't pass the spec directly from the informer, since subsequent modifications of it would be reflected
        // in the informer internal state, making it incoherent with the actual Kubernetes object (and, as a side
        // effect, the modified state will be returned together with subsequent events).

        workerID := c.clusterWorkerID(clusterName)
        clusterEvent := ClusterEvent{
                EventTime: time.Now(),
                EventType: eventType,
                UID:       uid,
                OldSpec:   informerOldSpec.Clone(),
                NewSpec:   informerNewSpec.Clone(),
                WorkerID:  workerID,
        }

        lg := c.logger.WithField("worker", workerID).WithField("cluster-name", clusterName)
        if err := c.clusterEventQueues[workerID].Add(clusterEvent); err != nil {
                lg.Errorf("error while queueing cluster event: %v", clusterEvent)
        }
        lg.Infof("%s event has been queued", eventType)

        if eventType != EventDelete {
                return
        }
        // A delete event discards all prior requests for that cluster.
        for _, evType := range []EventType{EventAdd, EventSync, EventUpdate, EventRepair} {
                obj, exists, err := c.clusterEventQueues[workerID].GetByKey(queueClusterKey(evType, uid))
                if err != nil {
                        lg.Warningf("could not get event from the queue: %v", err)
                        continue
                }

                if !exists {
                        continue
                }

                err = c.clusterEventQueues[workerID].Delete(obj)
                if err != nil {
                        lg.Warningf("could not delete event from the queue: %v", err)
                } else {
                        lg.Debugf("event %s has been discarded for the cluster", evType)
                }
        }
}

func (c *Controller) postgresqlAdd(obj interface{}) {
        pg := c.postgresqlCheck(obj)
        if pg != nil {
                // We will not get multiple Add events for the same cluster
                c.queueClusterEvent(nil, pg, EventAdd)
        }
}

func (c *Controller) postgresqlUpdate(prev, cur interface{}) {
        pgOld := c.postgresqlCheck(prev)
        pgNew := c.postgresqlCheck(cur)
        if pgOld != nil && pgNew != nil {
                // Avoid the inifinite recursion for status updates
                if reflect.DeepEqual(pgOld.Spec, pgNew.Spec) {
                        if reflect.DeepEqual(pgNew.Annotations, pgOld.Annotations) {
                                return
                        }
                }
                c.queueClusterEvent(pgOld, pgNew, EventUpdate)
        }
}

func (c *Controller) postgresqlDelete(obj interface{}) {
        pg := c.postgresqlCheck(obj)
        if pg != nil {
                c.queueClusterEvent(pg, nil, EventDelete)
        }
}

func (c *Controller) postgresqlCheck(obj interface{}) *acidv1.Postgresql {
        pg, ok := obj.(*acidv1.Postgresql)
        if !ok {
                c.logger.Errorf("could not cast to postgresql spec")
                return nil
        }
        if !c.hasOwnership(pg) {
                return nil
        }
        return pg
}

/*
Ensures the pod service account and role bindings exists in a namespace
before a PG cluster is created there so that a user does not have to deploy
these credentials manually.  StatefulSets require the service account to
create pods; Patroni requires relevant RBAC bindings to access endpoints
or config maps.

The operator does not sync accounts/role bindings after creation.
*/
func (c *Controller) submitRBACCredentials(event ClusterEvent) error {

        namespace := event.NewSpec.GetNamespace()

        if err := c.createPodServiceAccount(namespace); err != nil {
                return fmt.Errorf("could not create pod service account %q : %v", c.opConfig.PodServiceAccountName, err)
        }

        if err := c.createRoleBindings(namespace); err != nil {
                return fmt.Errorf("could not create role binding %q : %v", c.PodServiceAccountRoleBinding.Name, err)
        }
        return nil
}

func (c *Controller) createPodServiceAccount(namespace string) error {

        podServiceAccountName := c.opConfig.PodServiceAccountName
        _, err := c.KubeClient.ServiceAccounts(namespace).Get(context.TODO(), podServiceAccountName, metav1.GetOptions{})
        if k8sutil.ResourceNotFound(err) {

                c.logger.Infof("creating pod service account %q in the %q namespace", podServiceAccountName, namespace)

                // get a separate copy of service account
                // to prevent a race condition when setting a namespace for many clusters
                sa := *c.PodServiceAccount
                if _, err = c.KubeClient.ServiceAccounts(namespace).Create(context.TODO(), &sa, metav1.CreateOptions{}); err != nil {
                        return fmt.Errorf("cannot deploy the pod service account %q defined in the configuration to the %q namespace: %v", podServiceAccountName, namespace, err)
                }

                c.logger.Infof("successfully deployed the pod service account %q to the %q namespace", podServiceAccountName, namespace)
        } else if k8sutil.ResourceAlreadyExists(err) {
                return nil
        }

        return err
}

func (c *Controller) createRoleBindings(namespace string) error {

        podServiceAccountName := c.opConfig.PodServiceAccountName
        podServiceAccountRoleBindingName := c.PodServiceAccountRoleBinding.Name

        _, err := c.KubeClient.RoleBindings(namespace).Get(context.TODO(), podServiceAccountRoleBindingName, metav1.GetOptions{})
        if k8sutil.ResourceNotFound(err) {

                c.logger.Infof("Creating the role binding %q in the %q namespace", podServiceAccountRoleBindingName, namespace)

                // get a separate copy of role binding
                // to prevent a race condition when setting a namespace for many clusters
                rb := *c.PodServiceAccountRoleBinding
                _, err = c.KubeClient.RoleBindings(namespace).Create(context.TODO(), &rb, metav1.CreateOptions{})
                if err != nil {
                        return fmt.Errorf("cannot bind the pod service account %q defined in the configuration to the cluster role in the %q namespace: %v", podServiceAccountName, namespace, err)
                }

                c.logger.Infof("successfully deployed the role binding for the pod service account %q to the %q namespace", podServiceAccountName, namespace)

        } else if k8sutil.ResourceAlreadyExists(err) {
                return nil
        }

        return err
}
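
The six new lines reported as uncovered above are the clusterUID declaration, the two assignments from event.NewSpec.UID and event.OldSpec.UID, and the UID-mismatch check in processEvent. Below is a minimal standalone sketch of that pattern; all names in it are illustrative and not part of the operator's API. The idea is to treat a cached entry as stale, and therefore not found, when its stored UID no longer matches the UID carried by the incoming event, which indicates the Kubernetes object was deleted and recreated under the same name.

package main

import "fmt"

// UID mirrors the Kubernetes object UID (a string) for this sketch.
type UID string

// cachedCluster stands in for the operator's in-memory cluster state;
// the type and field names here are hypothetical.
type cachedCluster struct {
        Name string
        UID  UID
}

// lookupCluster returns the cached entry only when its UID matches the UID
// carried by the event. A mismatch means the object was deleted and recreated
// under the same name, so the stale cache entry is reported as not found.
func lookupCluster(cache map[string]*cachedCluster, name string, eventUID UID) (*cachedCluster, bool) {
        cl, found := cache[name]
        if found && cl.UID != eventUID {
                found = false
        }
        return cl, found
}

func main() {
        cache := map[string]*cachedCluster{
                "acid-minimal-cluster": {Name: "acid-minimal-cluster", UID: "uid-1"},
        }

        _, ok := lookupCluster(cache, "acid-minimal-cluster", "uid-1")
        fmt.Println("same UID, found:", ok) // true: cache entry is current

        _, ok = lookupCluster(cache, "acid-minimal-cluster", "uid-2")
        fmt.Println("new UID, found:", ok) // false: entry is stale, cluster must be re-added
}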