• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zalando / postgres-operator / 20213032632

14 Dec 2025 07:30PM UTC coverage: 42.144% (-0.08%) from 42.227%
20213032632

Pull #3015

github

web-flow
Merge 9ca87d9db into a27727f8d
Pull Request #3015: Fix #3014: deletion timestamp handling for clusters with finalizers

8 of 45 new or added lines in 2 files covered. (17.78%)

15 existing lines in 2 files now uncovered.

6537 of 15511 relevant lines covered (42.14%)

15.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

2.92
/pkg/controller/postgresql.go
1
package controller
2

3
import (
4
        "context"
5
        "encoding/json"
6
        "fmt"
7
        "reflect"
8
        "strings"
9
        "sync"
10
        "sync/atomic"
11
        "time"
12

13
        "github.com/sirupsen/logrus"
14

15
        v1 "k8s.io/api/core/v1"
16
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17
        "k8s.io/apimachinery/pkg/types"
18
        "k8s.io/client-go/tools/cache"
19

20
        acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
21
        "github.com/zalando/postgres-operator/pkg/cluster"
22
        "github.com/zalando/postgres-operator/pkg/spec"
23
        "github.com/zalando/postgres-operator/pkg/util"
24
        "github.com/zalando/postgres-operator/pkg/util/k8sutil"
25
        "github.com/zalando/postgres-operator/pkg/util/ringlog"
26
)
27

28
func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) {
×
29
        defer wg.Done()
×
30
        ticker := time.NewTicker(c.opConfig.ResyncPeriod)
×
31

×
32
        for {
×
33
                select {
×
34
                case <-ticker.C:
×
35
                        if err := c.clusterListAndSync(); err != nil {
×
36
                                c.logger.Errorf("could not list clusters: %v", err)
×
37
                        }
×
38
                case <-stopCh:
×
39
                        return
×
40
                }
41
        }
42
}
43

44
// clusterListFunc obtains a list of all PostgreSQL clusters
45
func (c *Controller) listClusters(options metav1.ListOptions) (*acidv1.PostgresqlList, error) {
×
46
        var pgList acidv1.PostgresqlList
×
47

×
48
        // TODO: use the SharedInformer cache instead of quering Kubernetes API directly.
×
49
        list, err := c.KubeClient.PostgresqlsGetter.Postgresqls(c.opConfig.WatchedNamespace).List(context.TODO(), options)
×
50
        if err != nil {
×
51
                c.logger.Errorf("could not list postgresql objects: %v", err)
×
52
        }
×
53
        if c.controllerID != "" {
×
54
                c.logger.Debugf("watch only clusters with controllerID %q", c.controllerID)
×
55
        }
×
56
        for _, pg := range list.Items {
×
57
                if pg.Error == "" && c.hasOwnership(&pg) {
×
58
                        pgList.Items = append(pgList.Items, pg)
×
59
                }
×
60
        }
61

62
        return &pgList, err
×
63
}
64

65
// clusterListAndSync lists all manifests and decides whether to run the sync or repair.
66
func (c *Controller) clusterListAndSync() error {
×
67
        var (
×
68
                err   error
×
69
                event EventType
×
70
        )
×
71

×
72
        currentTime := time.Now().Unix()
×
73
        timeFromPreviousSync := currentTime - atomic.LoadInt64(&c.lastClusterSyncTime)
×
74
        timeFromPreviousRepair := currentTime - atomic.LoadInt64(&c.lastClusterRepairTime)
×
75

×
76
        if timeFromPreviousSync >= int64(c.opConfig.ResyncPeriod.Seconds()) {
×
77
                event = EventSync
×
78
        } else if timeFromPreviousRepair >= int64(c.opConfig.RepairPeriod.Seconds()) {
×
79
                event = EventRepair
×
80
        }
×
81
        if event != "" {
×
82
                var list *acidv1.PostgresqlList
×
83
                if list, err = c.listClusters(metav1.ListOptions{ResourceVersion: "0"}); err != nil {
×
84
                        return err
×
85
                }
×
86
                c.queueEvents(list, event)
×
87
        } else {
×
88
                c.logger.Infof("not enough time passed since the last sync (%v seconds) or repair (%v seconds)",
×
89
                        timeFromPreviousSync, timeFromPreviousRepair)
×
90
        }
×
91
        return nil
×
92
}
93

94
// queueEvents queues a sync or repair event for every cluster with a valid manifest
95
func (c *Controller) queueEvents(list *acidv1.PostgresqlList, event EventType) {
×
96
        var activeClustersCnt, failedClustersCnt, clustersToRepair int
×
97
        for i, pg := range list.Items {
×
98
                // XXX: check the cluster status field instead
×
99
                if pg.Error != "" {
×
100
                        failedClustersCnt++
×
101
                        continue
×
102
                }
103
                activeClustersCnt++
×
104
                // check if that cluster needs repair
×
105
                if event == EventRepair {
×
106
                        if pg.Status.Success() {
×
107
                                continue
×
108
                        } else {
×
109
                                clustersToRepair++
×
110
                        }
×
111
                }
112
                c.queueClusterEvent(nil, &list.Items[i], event)
×
113
        }
114
        if len(list.Items) > 0 {
×
115
                if failedClustersCnt > 0 && activeClustersCnt == 0 {
×
116
                        c.logger.Infof("there are no clusters running. %d are in the failed state", failedClustersCnt)
×
117
                } else if failedClustersCnt == 0 && activeClustersCnt > 0 {
×
118
                        c.logger.Infof("there are %d clusters running", activeClustersCnt)
×
119
                } else {
×
120
                        c.logger.Infof("there are %d clusters running and %d are in the failed state", activeClustersCnt, failedClustersCnt)
×
121
                }
×
122
                if clustersToRepair > 0 {
×
123
                        c.logger.Infof("%d clusters are scheduled for a repair scan", clustersToRepair)
×
124
                }
×
125
        } else {
×
126
                c.logger.Infof("no clusters running")
×
127
        }
×
128
        if event == EventRepair || event == EventSync {
×
129
                atomic.StoreInt64(&c.lastClusterRepairTime, time.Now().Unix())
×
130
                if event == EventSync {
×
131
                        atomic.StoreInt64(&c.lastClusterSyncTime, time.Now().Unix())
×
132
                }
×
133
        }
134
}
135

136
func (c *Controller) acquireInitialListOfClusters() error {
×
137
        var (
×
138
                list        *acidv1.PostgresqlList
×
139
                err         error
×
140
                clusterName spec.NamespacedName
×
141
        )
×
142

×
143
        if list, err = c.listClusters(metav1.ListOptions{ResourceVersion: "0"}); err != nil {
×
144
                return err
×
145
        }
×
146
        c.logger.Debug("acquiring initial list of clusters")
×
147
        for _, pg := range list.Items {
×
148
                // XXX: check the cluster status field instead
×
149
                if pg.Error != "" {
×
150
                        continue
×
151
                }
152
                clusterName = util.NameFromMeta(pg.ObjectMeta)
×
153
                c.addCluster(c.logger, clusterName, &pg)
×
154
                c.logger.Debugf("added new cluster: %q", clusterName)
×
155
        }
156
        // initiate initial sync of all clusters.
157
        c.queueEvents(list, EventSync)
×
158
        return nil
×
159
}
160

161
func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedName, pgSpec *acidv1.Postgresql) (*cluster.Cluster, error) {
×
162
        if c.opConfig.EnableTeamIdClusternamePrefix {
×
163
                if _, err := acidv1.ExtractClusterName(clusterName.Name, pgSpec.Spec.TeamID); err != nil {
×
164
                        c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusInvalid)
×
165
                        return nil, err
×
166
                }
×
167
        }
168

169
        cl := cluster.New(c.makeClusterConfig(), c.KubeClient, *pgSpec, lg, c.eventRecorder)
×
170
        cl.Run(c.stopCh)
×
171
        teamName := strings.ToLower(cl.Spec.TeamID)
×
172

×
173
        defer c.clustersMu.Unlock()
×
174
        c.clustersMu.Lock()
×
175

×
176
        c.teamClusters[teamName] = append(c.teamClusters[teamName], clusterName)
×
177
        c.clusters[clusterName] = cl
×
178
        c.clusterLogs[clusterName] = ringlog.New(c.opConfig.RingLogLines)
×
179
        c.clusterHistory[clusterName] = ringlog.New(c.opConfig.ClusterHistoryEntries)
×
180

×
181
        return cl, nil
×
182
}
183

184
func (c *Controller) processEvent(event ClusterEvent) {
×
185
        var clusterName spec.NamespacedName
×
186
        var clHistory ringlog.RingLogger
×
187
        var err error
×
188

×
189
        lg := c.logger.WithField("worker", event.WorkerID)
×
190

×
191
        if event.EventType == EventAdd || event.EventType == EventSync || event.EventType == EventRepair {
×
192
                clusterName = util.NameFromMeta(event.NewSpec.ObjectMeta)
×
193
        } else {
×
194
                clusterName = util.NameFromMeta(event.OldSpec.ObjectMeta)
×
195
        }
×
196
        lg = lg.WithField("cluster-name", clusterName)
×
197

×
198
        c.clustersMu.RLock()
×
199
        cl, clusterFound := c.clusters[clusterName]
×
200
        if clusterFound {
×
201
                clHistory = c.clusterHistory[clusterName]
×
202
        }
×
203
        c.clustersMu.RUnlock()
×
204

×
205
        defer c.curWorkerCluster.Store(event.WorkerID, nil)
×
206

×
207
        if event.EventType == EventRepair {
×
208
                runRepair, lastOperationStatus := cl.NeedsRepair()
×
209
                if !runRepair {
×
210
                        lg.Debugf("observed cluster status %s, repair is not required", lastOperationStatus)
×
211
                        return
×
212
                }
×
213
                lg.Debugf("observed cluster status %s, running sync scan to repair the cluster", lastOperationStatus)
×
214
                event.EventType = EventSync
×
215
        }
216

217
        if event.EventType == EventAdd || event.EventType == EventUpdate || event.EventType == EventSync {
×
218
                // handle deprecated parameters by possibly assigning their values to the new ones.
×
219
                if event.OldSpec != nil {
×
220
                        c.mergeDeprecatedPostgreSQLSpecParameters(&event.OldSpec.Spec)
×
221
                }
×
222
                if event.NewSpec != nil {
×
223
                        c.warnOnDeprecatedPostgreSQLSpecParameters(&event.NewSpec.Spec)
×
224
                        c.mergeDeprecatedPostgreSQLSpecParameters(&event.NewSpec.Spec)
×
225
                }
×
226

227
                if err = c.submitRBACCredentials(event); err != nil {
×
228
                        c.logger.Warnf("pods and/or Patroni may misfunction due to the lack of permissions: %v", err)
×
229
                }
×
230

231
        }
232

233
        switch event.EventType {
×
234
        case EventAdd:
×
235
                if clusterFound {
×
236
                        lg.Infof("received add event for already existing Postgres cluster")
×
237
                        return
×
238
                }
×
239

240
                lg.Infof("creating a new Postgres cluster")
×
241

×
242
                cl, err = c.addCluster(lg, clusterName, event.NewSpec)
×
243
                if err != nil {
×
244
                        lg.Errorf("creation of cluster is blocked: %v", err)
×
245
                        return
×
246
                }
×
247

248
                c.curWorkerCluster.Store(event.WorkerID, cl)
×
249

×
250
                err = cl.Create()
×
251
                if err != nil {
×
252
                        cl.Status = acidv1.PostgresStatus{PostgresClusterStatus: acidv1.ClusterStatusInvalid}
×
253
                        cl.Error = fmt.Sprintf("could not create cluster: %v", err)
×
254
                        lg.Error(cl.Error)
×
255
                        c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Create", "%v", cl.Error)
×
256
                        return
×
257
                }
×
258

259
                lg.Infoln("cluster has been created")
×
260
        case EventUpdate:
×
261
                if !clusterFound {
×
262
                        lg.Warningln("cluster does not exist")
×
263
                        return
×
264
                }
×
265
                c.curWorkerCluster.Store(event.WorkerID, cl)
×
NEW
266

×
NEW
267
                // Check if this cluster has been marked for deletion
×
NEW
268
                if !event.NewSpec.ObjectMeta.DeletionTimestamp.IsZero() {
×
NEW
269
                        lg.Infof("cluster has a DeletionTimestamp of %s, starting deletion now.", event.NewSpec.ObjectMeta.DeletionTimestamp.Format(time.RFC3339))
×
NEW
270
                        if err = cl.Delete(); err != nil {
×
NEW
271
                                cl.Error = fmt.Sprintf("error deleting cluster and its resources: %v", err)
×
NEW
272
                                c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Delete", "%v", cl.Error)
×
NEW
273
                                lg.Error(cl.Error)
×
NEW
274
                                return
×
NEW
275
                        }
×
NEW
276
                        lg.Infoln("cluster has been deleted via update event")
×
NEW
277
                        return
×
278
                }
279

NEW
280
                lg.Infoln("update of the cluster started")
×
281
                err = cl.Update(event.OldSpec, event.NewSpec)
×
282
                if err != nil {
×
283
                        cl.Error = fmt.Sprintf("could not update cluster: %v", err)
×
284
                        lg.Error(cl.Error)
×
285

×
286
                        return
×
287
                }
×
288
                cl.Error = ""
×
289
                lg.Infoln("cluster has been updated")
×
290

×
291
                clHistory.Insert(&spec.Diff{
×
292
                        EventTime:   event.EventTime,
×
293
                        ProcessTime: time.Now(),
×
294
                        Diff:        util.Diff(event.OldSpec, event.NewSpec),
×
295
                })
×
296
        case EventDelete:
×
297
                if !clusterFound {
×
298
                        lg.Errorf("unknown cluster: %q", clusterName)
×
299
                        return
×
300
                }
×
301

302
                teamName := strings.ToLower(cl.Spec.TeamID)
×
303
                c.curWorkerCluster.Store(event.WorkerID, cl)
×
304

×
305
                // when using finalizers the deletion already happened
×
306
                if c.opConfig.EnableFinalizers == nil || !*c.opConfig.EnableFinalizers {
×
307
                        lg.Infoln("deletion of the cluster started")
×
308
                        if err := cl.Delete(); err != nil {
×
309
                                cl.Error = fmt.Sprintf("could not delete cluster: %v", err)
×
310
                                c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Delete", "%v", cl.Error)
×
311
                        }
×
312
                }
313

314
                func() {
×
315
                        defer c.clustersMu.Unlock()
×
316
                        c.clustersMu.Lock()
×
317

×
318
                        delete(c.clusters, clusterName)
×
319
                        delete(c.clusterLogs, clusterName)
×
320
                        delete(c.clusterHistory, clusterName)
×
321
                        for i, val := range c.teamClusters[teamName] {
×
322
                                if val == clusterName {
×
323
                                        copy(c.teamClusters[teamName][i:], c.teamClusters[teamName][i+1:])
×
324
                                        c.teamClusters[teamName][len(c.teamClusters[teamName])-1] = spec.NamespacedName{}
×
325
                                        c.teamClusters[teamName] = c.teamClusters[teamName][:len(c.teamClusters[teamName])-1]
×
326
                                        break
×
327
                                }
328
                        }
329
                }()
330

331
                lg.Infof("cluster has been deleted")
×
332
        case EventSync:
×
333
                lg.Infof("syncing of the cluster started")
×
334

×
335
                // no race condition because a cluster is always processed by single worker
×
336
                if !clusterFound {
×
337
                        cl, err = c.addCluster(lg, clusterName, event.NewSpec)
×
338
                        if err != nil {
×
339
                                lg.Errorf("syncing of cluster is blocked: %v", err)
×
340
                                return
×
341
                        }
×
342
                }
343

344
                c.curWorkerCluster.Store(event.WorkerID, cl)
×
345

×
346
                // has this cluster been marked as deleted already, then we shall start cleaning up
×
347
                if !cl.ObjectMeta.DeletionTimestamp.IsZero() {
×
348
                        lg.Infof("cluster has a DeletionTimestamp of %s, starting deletion now.", cl.ObjectMeta.DeletionTimestamp.Format(time.RFC3339))
×
349
                        if err = cl.Delete(); err != nil {
×
350
                                cl.Error = fmt.Sprintf("error deleting cluster and its resources: %v", err)
×
351
                                c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Delete", "%v", cl.Error)
×
352
                                lg.Error(cl.Error)
×
353
                                return
×
354
                        }
×
355
                } else {
×
356
                        if err = cl.Sync(event.NewSpec); err != nil {
×
357
                                cl.Error = fmt.Sprintf("could not sync cluster: %v", err)
×
358
                                c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Sync", "%v", cl.Error)
×
359
                                lg.Error(cl.Error)
×
360
                                return
×
361
                        }
×
362
                        lg.Infof("cluster has been synced")
×
363
                }
364
                cl.Error = ""
×
365
        }
366
}
367

368
func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{}, wg *sync.WaitGroup) {
×
369
        defer wg.Done()
×
370

×
371
        go func() {
×
372
                <-stopCh
×
373
                c.clusterEventQueues[idx].Close()
×
374
        }()
×
375

376
        for {
×
377
                obj, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(func(interface{}, bool) error { return nil }))
×
378
                if err != nil {
×
379
                        if err == cache.ErrFIFOClosed {
×
380
                                return
×
381
                        }
×
382
                        c.logger.Errorf("error when processing cluster events queue: %v", err)
×
383
                        continue
×
384
                }
385
                event, ok := obj.(ClusterEvent)
×
386
                if !ok {
×
387
                        c.logger.Errorf("could not cast to ClusterEvent")
×
388
                }
×
389

390
                c.processEvent(event)
×
391
        }
392
}
393

394
func (c *Controller) warnOnDeprecatedPostgreSQLSpecParameters(spec *acidv1.PostgresSpec) {
×
395
        deprecate := func(deprecated, replacement string) {
×
396
                c.logger.Warningf("parameter %q is deprecated. Consider setting %q instead", deprecated, replacement)
×
397
        }
×
398

399
        if spec.UseLoadBalancer != nil {
×
400
                deprecate("useLoadBalancer", "enableMasterLoadBalancer")
×
401
        }
×
402
        if spec.ReplicaLoadBalancer != nil {
×
403
                deprecate("replicaLoadBalancer", "enableReplicaLoadBalancer")
×
404
        }
×
405

406
        if (spec.UseLoadBalancer != nil || spec.ReplicaLoadBalancer != nil) &&
×
407
                (spec.EnableReplicaLoadBalancer != nil || spec.EnableMasterLoadBalancer != nil) {
×
408
                c.logger.Warnf("both old and new load balancer parameters are present in the manifest, ignoring old ones")
×
409
        }
×
410
}
411

412
// mergeDeprecatedPostgreSQLSpecParameters modifies the spec passed to the cluster by setting current parameter
413
// values from the obsolete ones. Note: while the spec that is modified is a copy made in queueClusterEvent, it is
414
// still a shallow copy, so be extra careful not to modify values pointer fields point to, but copy them instead.
415
func (c *Controller) mergeDeprecatedPostgreSQLSpecParameters(spec *acidv1.PostgresSpec) *acidv1.PostgresSpec {
2✔
416
        if (spec.UseLoadBalancer != nil || spec.ReplicaLoadBalancer != nil) &&
2✔
417
                (spec.EnableReplicaLoadBalancer == nil && spec.EnableMasterLoadBalancer == nil) {
3✔
418
                if spec.UseLoadBalancer != nil {
2✔
419
                        spec.EnableMasterLoadBalancer = new(bool)
1✔
420
                        *spec.EnableMasterLoadBalancer = *spec.UseLoadBalancer
1✔
421
                }
1✔
422
                if spec.ReplicaLoadBalancer != nil {
2✔
423
                        spec.EnableReplicaLoadBalancer = new(bool)
1✔
424
                        *spec.EnableReplicaLoadBalancer = *spec.ReplicaLoadBalancer
1✔
425
                }
1✔
426
        }
427
        spec.ReplicaLoadBalancer = nil
2✔
428
        spec.UseLoadBalancer = nil
2✔
429

2✔
430
        return spec
2✔
431
}
432

433
func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1.Postgresql, eventType EventType) {
×
434
        var (
×
435
                uid          types.UID
×
436
                clusterName  spec.NamespacedName
×
437
                clusterError string
×
438
        )
×
439

×
NEW
440
        if informerOldSpec != nil { // update, delete
×
441
                uid = informerOldSpec.GetUID()
×
442
                clusterName = util.NameFromMeta(informerOldSpec.ObjectMeta)
×
443

×
444
                // user is fixing previously incorrect spec
×
445
                if eventType == EventUpdate && informerNewSpec.Error == "" && informerOldSpec.Error != "" {
×
446
                        eventType = EventSync
×
447
                }
×
448

449
                // set current error to be one of the new spec if present
450
                if informerNewSpec != nil {
×
451
                        clusterError = informerNewSpec.Error
×
452
                } else {
×
453
                        clusterError = informerOldSpec.Error
×
454
                }
×
NEW
455
        } else { // add, sync
×
456
                uid = informerNewSpec.GetUID()
×
457
                clusterName = util.NameFromMeta(informerNewSpec.ObjectMeta)
×
458
                clusterError = informerNewSpec.Error
×
459
        }
×
460

461
        if eventType == EventDelete {
×
462
                // when owner references are used operator cannot block deletion
×
463
                if c.opConfig.EnableOwnerReferences == nil || !*c.opConfig.EnableOwnerReferences {
×
464
                        // only allow deletion if delete annotations are set and conditions are met
×
465
                        if err := c.meetsClusterDeleteAnnotations(informerOldSpec); err != nil {
×
466
                                c.logger.WithField("cluster-name", clusterName).Warnf(
×
467
                                        "ignoring %q event for cluster %q - manifest does not fulfill delete requirements: %s", eventType, clusterName, err)
×
468
                                c.logger.WithField("cluster-name", clusterName).Warnf(
×
469
                                        "please, recreate Postgresql resource %q and set annotations to delete properly", clusterName)
×
470
                                if currentManifest, marshalErr := json.Marshal(informerOldSpec); marshalErr != nil {
×
471
                                        c.logger.WithField("cluster-name", clusterName).Warnf("could not marshal current manifest:\n%+v", informerOldSpec)
×
472
                                } else {
×
473
                                        c.logger.WithField("cluster-name", clusterName).Warnf("%s\n", string(currentManifest))
×
474
                                }
×
475
                                return
×
476
                        }
477
                }
478
        }
479

480
        if clusterError != "" && eventType != EventDelete {
×
481
                c.logger.WithField("cluster-name", clusterName).Debugf("skipping %q event for the invalid cluster: %s", eventType, clusterError)
×
482

×
483
                switch eventType {
×
484
                case EventAdd:
×
485
                        c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusAddFailed)
×
486
                        c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Create", "%v", clusterError)
×
487
                case EventUpdate:
×
488
                        c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusUpdateFailed)
×
489
                        c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Update", "%v", clusterError)
×
490
                default:
×
491
                        c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusSyncFailed)
×
492
                        c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Sync", "%v", clusterError)
×
493
                }
494

495
                return
×
496
        }
497

498
        // Don't pass the spec directly from the informer, since subsequent modifications of it would be reflected
499
        // in the informer internal state, making it incoherent with the actual Kubernetes object (and, as a side
500
        // effect, the modified state will be returned together with subsequent events).
501

502
        workerID := c.clusterWorkerID(clusterName)
×
503
        clusterEvent := ClusterEvent{
×
504
                EventTime: time.Now(),
×
505
                EventType: eventType,
×
506
                UID:       uid,
×
507
                OldSpec:   informerOldSpec.Clone(),
×
508
                NewSpec:   informerNewSpec.Clone(),
×
509
                WorkerID:  workerID,
×
510
        }
×
511

×
512
        lg := c.logger.WithField("worker", workerID).WithField("cluster-name", clusterName)
×
513
        if err := c.clusterEventQueues[workerID].Add(clusterEvent); err != nil {
×
514
                lg.Errorf("error while queueing cluster event: %v", clusterEvent)
×
515
        }
×
516
        lg.Infof("%s event has been queued", eventType)
×
517

×
518
        if eventType != EventDelete {
×
519
                return
×
520
        }
×
521
        // A delete event discards all prior requests for that cluster.
522
        for _, evType := range []EventType{EventAdd, EventSync, EventUpdate, EventRepair} {
×
523
                obj, exists, err := c.clusterEventQueues[workerID].GetByKey(queueClusterKey(evType, uid))
×
524
                if err != nil {
×
525
                        lg.Warningf("could not get event from the queue: %v", err)
×
526
                        continue
×
527
                }
528

529
                if !exists {
×
530
                        continue
×
531
                }
532

533
                err = c.clusterEventQueues[workerID].Delete(obj)
×
534
                if err != nil {
×
535
                        lg.Warningf("could not delete event from the queue: %v", err)
×
536
                } else {
×
537
                        lg.Debugf("event %s has been discarded for the cluster", evType)
×
538
                }
×
539
        }
540
}
541

542
func (c *Controller) postgresqlAdd(obj interface{}) {
×
543
        pg := c.postgresqlCheck(obj)
×
544
        if pg != nil {
×
545
                // We will not get multiple Add events for the same cluster
×
546
                c.queueClusterEvent(nil, pg, EventAdd)
×
547
        }
×
548
}
549

550
func (c *Controller) postgresqlUpdate(prev, cur interface{}) {
×
551
        pgOld := c.postgresqlCheck(prev)
×
552
        pgNew := c.postgresqlCheck(cur)
×
553
        if pgOld != nil && pgNew != nil {
×
NEW
554
                clusterName := util.NameFromMeta(pgNew.ObjectMeta)
×
NEW
555

×
NEW
556
                // Check if DeletionTimestamp was set (resource marked for deletion)
×
NEW
557
                deletionTimestampChanged := pgOld.ObjectMeta.DeletionTimestamp.IsZero() && !pgNew.ObjectMeta.DeletionTimestamp.IsZero()
×
NEW
558
                if deletionTimestampChanged {
×
NEW
UNCOV
559
                        c.logger.WithField("cluster-name", clusterName).Infof(
×
NEW
560
                                "UPDATE event: DeletionTimestamp set to %s, queueing event",
×
NEW
UNCOV
561
                                pgNew.ObjectMeta.DeletionTimestamp.Format(time.RFC3339))
×
NEW
UNCOV
562
                        c.queueClusterEvent(pgOld, pgNew, EventUpdate)
×
NEW
UNCOV
563
                        return
×
NEW
564
                }
×
565

566
                // Avoid the infinite recursion for status updates
567
                if reflect.DeepEqual(pgOld.Spec, pgNew.Spec) {
×
568
                        if reflect.DeepEqual(pgNew.Annotations, pgOld.Annotations) {
×
569
                                return
×
570
                        }
×
571
                }
572
                c.queueClusterEvent(pgOld, pgNew, EventUpdate)
×
573
        }
574
}
575

576
func (c *Controller) postgresqlDelete(obj interface{}) {
×
577
        pg := c.postgresqlCheck(obj)
×
578
        if pg != nil {
×
579
                c.queueClusterEvent(pg, nil, EventDelete)
×
UNCOV
580
        }
×
581
}
582

583
func (c *Controller) postgresqlCheck(obj interface{}) *acidv1.Postgresql {
×
584
        pg, ok := obj.(*acidv1.Postgresql)
×
585
        if !ok {
×
586
                c.logger.Errorf("could not cast to postgresql spec")
×
587
                return nil
×
588
        }
×
589
        if !c.hasOwnership(pg) {
×
590
                return nil
×
591
        }
×
592
        return pg
×
593
}
594

595
/*
596
Ensures the pod service account and role bindings exists in a namespace
597
before a PG cluster is created there so that a user does not have to deploy
598
these credentials manually.  StatefulSets require the service account to
599
create pods; Patroni requires relevant RBAC bindings to access endpoints
600
or config maps.
601

602
The operator does not sync accounts/role bindings after creation.
603
*/
604
func (c *Controller) submitRBACCredentials(event ClusterEvent) error {
×
605
        namespace := event.NewSpec.GetNamespace()
×
606

×
607
        if err := c.createPodServiceAccount(namespace); err != nil {
×
608
                return fmt.Errorf("could not create pod service account %q : %v", c.opConfig.PodServiceAccountName, err)
×
609
        }
×
610

UNCOV
611
        if err := c.createRoleBindings(namespace); err != nil {
×
612
                return fmt.Errorf("could not create role binding %q : %v", c.PodServiceAccountRoleBinding.Name, err)
×
613
        }
×
614
        return nil
×
615
}
616

UNCOV
617
func (c *Controller) createPodServiceAccount(namespace string) error {
×
618
        podServiceAccountName := c.opConfig.PodServiceAccountName
×
619
        _, err := c.KubeClient.ServiceAccounts(namespace).Get(context.TODO(), podServiceAccountName, metav1.GetOptions{})
×
620
        if k8sutil.ResourceNotFound(err) {
×
621

×
622
                c.logger.Infof("creating pod service account %q in the %q namespace", podServiceAccountName, namespace)
×
623

×
624
                // get a separate copy of service account
×
625
                // to prevent a race condition when setting a namespace for many clusters
×
626
                sa := *c.PodServiceAccount
×
627
                if _, err = c.KubeClient.ServiceAccounts(namespace).Create(context.TODO(), &sa, metav1.CreateOptions{}); err != nil {
×
628
                        return fmt.Errorf("cannot deploy the pod service account %q defined in the configuration to the %q namespace: %v", podServiceAccountName, namespace, err)
×
629
                }
×
630

UNCOV
631
                c.logger.Infof("successfully deployed the pod service account %q to the %q namespace", podServiceAccountName, namespace)
×
632
        } else if k8sutil.ResourceAlreadyExists(err) {
×
633
                return nil
×
634
        }
×
635

636
        return err
×
637
}
638

639
func (c *Controller) createRoleBindings(namespace string) error {
×
640
        podServiceAccountName := c.opConfig.PodServiceAccountName
×
641
        podServiceAccountRoleBindingName := c.PodServiceAccountRoleBinding.Name
×
642

×
643
        _, err := c.KubeClient.RoleBindings(namespace).Get(context.TODO(), podServiceAccountRoleBindingName, metav1.GetOptions{})
×
644
        if k8sutil.ResourceNotFound(err) {
×
645

×
646
                c.logger.Infof("Creating the role binding %q in the %q namespace", podServiceAccountRoleBindingName, namespace)
×
647

×
648
                // get a separate copy of role binding
×
649
                // to prevent a race condition when setting a namespace for many clusters
×
650
                rb := *c.PodServiceAccountRoleBinding
×
651
                _, err = c.KubeClient.RoleBindings(namespace).Create(context.TODO(), &rb, metav1.CreateOptions{})
×
652
                if err != nil {
×
653
                        return fmt.Errorf("cannot bind the pod service account %q defined in the configuration to the cluster role in the %q namespace: %v", podServiceAccountName, namespace, err)
×
654
                }
×
655

656
                c.logger.Infof("successfully deployed the role binding for the pod service account %q to the %q namespace", podServiceAccountName, namespace)
×
657

658
        } else if k8sutil.ResourceAlreadyExists(err) {
×
659
                return nil
×
660
        }
×
661

662
        return err
×
663
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc