
zalando / postgres-operator, build 20964278963
13 Jan 2026 04:25PM UTC, coverage: 43.507%, down 0.01% from 43.517%

Pull Request #3005: Use UpdateStatus instead of patch
Merge eb4a6668d into 32d6d0a7a

29 of 59 new or added lines in 4 files covered (49.15%)
3 existing lines in 3 files now uncovered
6553 of 15062 relevant lines covered (43.51%)
16.46 hits per line
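As a quick check of the arithmetic behind these figures: 6553 / 15062 ≈ 0.43507, matching the reported overall coverage of 43.507%, and 29 / 59 ≈ 0.4915, matching the 49.15% coverage of the lines added or changed by the pull request.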

Source File: /pkg/controller/postgresql.go
package controller

import (
        "context"
        "encoding/json"
        "fmt"
        "reflect"
        "strings"
        "sync"
        "sync/atomic"
        "time"

        "github.com/sirupsen/logrus"

        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/types"
        "k8s.io/client-go/tools/cache"

        acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
        "github.com/zalando/postgres-operator/pkg/cluster"
        "github.com/zalando/postgres-operator/pkg/spec"
        "github.com/zalando/postgres-operator/pkg/util"
        "github.com/zalando/postgres-operator/pkg/util/k8sutil"
        "github.com/zalando/postgres-operator/pkg/util/ringlog"
)

func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) {
        defer wg.Done()
        ticker := time.NewTicker(c.opConfig.ResyncPeriod)

        for {
                select {
                case <-ticker.C:
                        if err := c.clusterListAndSync(); err != nil {
                                c.logger.Errorf("could not list clusters: %v", err)
                        }
                case <-stopCh:
                        return
                }
        }
}

// listClusters obtains a list of all PostgreSQL clusters
func (c *Controller) listClusters(options metav1.ListOptions) (*acidv1.PostgresqlList, error) {
        var pgList acidv1.PostgresqlList

        // TODO: use the SharedInformer cache instead of querying the Kubernetes API directly.
        list, err := c.KubeClient.PostgresqlsGetter.Postgresqls(c.opConfig.WatchedNamespace).List(context.TODO(), options)
        if err != nil {
                c.logger.Errorf("could not list postgresql objects: %v", err)
        }
        if c.controllerID != "" {
                c.logger.Debugf("watch only clusters with controllerID %q", c.controllerID)
        }
        for _, pg := range list.Items {
                if pg.Error == "" && c.hasOwnership(&pg) {
                        pgList.Items = append(pgList.Items, pg)
                }
        }

        return &pgList, err
}

// clusterListAndSync lists all manifests and decides whether to run the sync or repair.
func (c *Controller) clusterListAndSync() error {
        var (
                err   error
                event EventType
        )

        currentTime := time.Now().Unix()
        timeFromPreviousSync := currentTime - atomic.LoadInt64(&c.lastClusterSyncTime)
        timeFromPreviousRepair := currentTime - atomic.LoadInt64(&c.lastClusterRepairTime)

        if timeFromPreviousSync >= int64(c.opConfig.ResyncPeriod.Seconds()) {
                event = EventSync
        } else if timeFromPreviousRepair >= int64(c.opConfig.RepairPeriod.Seconds()) {
                event = EventRepair
        }
        if event != "" {
                var list *acidv1.PostgresqlList
                if list, err = c.listClusters(metav1.ListOptions{ResourceVersion: "0"}); err != nil {
                        return err
                }
                c.queueEvents(list, event)
        } else {
                c.logger.Infof("not enough time passed since the last sync (%v seconds) or repair (%v seconds)",
                        timeFromPreviousSync, timeFromPreviousRepair)
        }
        return nil
}

// queueEvents queues a sync or repair event for every cluster with a valid manifest
func (c *Controller) queueEvents(list *acidv1.PostgresqlList, event EventType) {
        var activeClustersCnt, failedClustersCnt, clustersToRepair int
        for i, pg := range list.Items {
                // XXX: check the cluster status field instead
                if pg.Error != "" {
                        failedClustersCnt++
                        continue
                }
                activeClustersCnt++
                // check if that cluster needs repair
                if event == EventRepair {
                        if pg.Status.Success() {
                                continue
                        } else {
                                clustersToRepair++
                        }
                }
                c.queueClusterEvent(nil, &list.Items[i], event)
        }
        if len(list.Items) > 0 {
                if failedClustersCnt > 0 && activeClustersCnt == 0 {
                        c.logger.Infof("there are no clusters running. %d are in the failed state", failedClustersCnt)
                } else if failedClustersCnt == 0 && activeClustersCnt > 0 {
                        c.logger.Infof("there are %d clusters running", activeClustersCnt)
                } else {
                        c.logger.Infof("there are %d clusters running and %d are in the failed state", activeClustersCnt, failedClustersCnt)
                }
                if clustersToRepair > 0 {
                        c.logger.Infof("%d clusters are scheduled for a repair scan", clustersToRepair)
                }
        } else {
                c.logger.Infof("no clusters running")
        }
        if event == EventRepair || event == EventSync {
                atomic.StoreInt64(&c.lastClusterRepairTime, time.Now().Unix())
                if event == EventSync {
                        atomic.StoreInt64(&c.lastClusterSyncTime, time.Now().Unix())
                }
        }
}

func (c *Controller) acquireInitialListOfClusters() error {
        var (
                list        *acidv1.PostgresqlList
                err         error
                clusterName spec.NamespacedName
        )

        if list, err = c.listClusters(metav1.ListOptions{ResourceVersion: "0"}); err != nil {
                return err
        }
        c.logger.Debug("acquiring initial list of clusters")
        for _, pg := range list.Items {
                // XXX: check the cluster status field instead
                if pg.Error != "" {
                        continue
                }
                clusterName = util.NameFromMeta(pg.ObjectMeta)
                c.addCluster(c.logger, clusterName, &pg)
                c.logger.Debugf("added new cluster: %q", clusterName)
        }
        // initiate initial sync of all clusters.
        c.queueEvents(list, EventSync)
        return nil
}

func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedName, pgSpec *acidv1.Postgresql) (*cluster.Cluster, error) {
        if c.opConfig.EnableTeamIdClusternamePrefix {
                if _, err := acidv1.ExtractClusterName(clusterName.Name, pgSpec.Spec.TeamID); err != nil {
                        pgSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusInvalid
                        c.KubeClient.SetPostgresCRDStatus(clusterName, pgSpec)
                        return nil, err
                }
        }

        cl := cluster.New(c.makeClusterConfig(), c.KubeClient, *pgSpec, lg, c.eventRecorder)
        cl.Run(c.stopCh)
        teamName := strings.ToLower(cl.Spec.TeamID)

        defer c.clustersMu.Unlock()
        c.clustersMu.Lock()

        c.teamClusters[teamName] = append(c.teamClusters[teamName], clusterName)
        c.clusters[clusterName] = cl
        c.clusterLogs[clusterName] = ringlog.New(c.opConfig.RingLogLines)
        c.clusterHistory[clusterName] = ringlog.New(c.opConfig.ClusterHistoryEntries)

        return cl, nil
}

func (c *Controller) processEvent(event ClusterEvent) {
        var clusterName spec.NamespacedName
        var clHistory ringlog.RingLogger
        var err error

        lg := c.logger.WithField("worker", event.WorkerID)

        if event.EventType == EventAdd || event.EventType == EventSync || event.EventType == EventRepair {
                clusterName = util.NameFromMeta(event.NewSpec.ObjectMeta)
        } else {
                clusterName = util.NameFromMeta(event.OldSpec.ObjectMeta)
        }
        lg = lg.WithField("cluster-name", clusterName)

        c.clustersMu.RLock()
        cl, clusterFound := c.clusters[clusterName]
        if clusterFound {
                clHistory = c.clusterHistory[clusterName]
        }
        c.clustersMu.RUnlock()

        defer c.curWorkerCluster.Store(event.WorkerID, nil)

        if event.EventType == EventRepair {
                runRepair, lastOperationStatus := cl.NeedsRepair()
                if !runRepair {
                        lg.Debugf("observed cluster status %s, repair is not required", lastOperationStatus)
                        return
                }
                lg.Debugf("observed cluster status %s, running sync scan to repair the cluster", lastOperationStatus)
                event.EventType = EventSync
        }

        if event.EventType == EventAdd || event.EventType == EventUpdate || event.EventType == EventSync {
                // handle deprecated parameters by possibly assigning their values to the new ones.
                if event.OldSpec != nil {
                        c.mergeDeprecatedPostgreSQLSpecParameters(&event.OldSpec.Spec)
                }
                if event.NewSpec != nil {
                        c.warnOnDeprecatedPostgreSQLSpecParameters(&event.NewSpec.Spec)
                        c.mergeDeprecatedPostgreSQLSpecParameters(&event.NewSpec.Spec)
                }

                if err = c.submitRBACCredentials(event); err != nil {
                        c.logger.Warnf("pods and/or Patroni may misfunction due to the lack of permissions: %v", err)
                }

        }

        switch event.EventType {
        case EventAdd:
                if clusterFound {
                        lg.Infof("received add event for already existing Postgres cluster")
                        return
                }

                lg.Infof("creating a new Postgres cluster")

                cl, err = c.addCluster(lg, clusterName, event.NewSpec)
                if err != nil {
                        lg.Errorf("creation of cluster is blocked: %v", err)
                        return
                }

                c.curWorkerCluster.Store(event.WorkerID, cl)

                err = cl.Create()
                if err != nil {
                        cl.Status = acidv1.PostgresStatus{PostgresClusterStatus: acidv1.ClusterStatusInvalid}
                        cl.Error = fmt.Sprintf("could not create cluster: %v", err)
                        lg.Error(cl.Error)
                        c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Create", "%v", cl.Error)
                        return
                }

                lg.Infoln("cluster has been created")
        case EventUpdate:
                lg.Infoln("update of the cluster started")

                if !clusterFound {
                        lg.Warningln("cluster does not exist")
                        return
                }
                c.curWorkerCluster.Store(event.WorkerID, cl)
                err = cl.Update(event.OldSpec, event.NewSpec)
                if err != nil {
                        cl.Error = fmt.Sprintf("could not update cluster: %v", err)
                        lg.Error(cl.Error)

                        return
                }
                cl.Error = ""
                lg.Infoln("cluster has been updated")

                clHistory.Insert(&spec.Diff{
                        EventTime:   event.EventTime,
                        ProcessTime: time.Now(),
                        Diff:        util.Diff(event.OldSpec, event.NewSpec),
                })
        case EventDelete:
                if !clusterFound {
                        lg.Errorf("unknown cluster: %q", clusterName)
                        return
                }

                teamName := strings.ToLower(cl.Spec.TeamID)
                c.curWorkerCluster.Store(event.WorkerID, cl)

                // when using finalizers the deletion already happened
                if c.opConfig.EnableFinalizers == nil || !*c.opConfig.EnableFinalizers {
                        lg.Infoln("deletion of the cluster started")
                        if err := cl.Delete(); err != nil {
                                cl.Error = fmt.Sprintf("could not delete cluster: %v", err)
                                c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Delete", "%v", cl.Error)
                        }
                }

                func() {
                        defer c.clustersMu.Unlock()
                        c.clustersMu.Lock()

                        delete(c.clusters, clusterName)
                        delete(c.clusterLogs, clusterName)
                        delete(c.clusterHistory, clusterName)
                        for i, val := range c.teamClusters[teamName] {
                                if val == clusterName {
                                        copy(c.teamClusters[teamName][i:], c.teamClusters[teamName][i+1:])
                                        c.teamClusters[teamName][len(c.teamClusters[teamName])-1] = spec.NamespacedName{}
                                        c.teamClusters[teamName] = c.teamClusters[teamName][:len(c.teamClusters[teamName])-1]
                                        break
                                }
                        }
                }()

                lg.Infof("cluster has been deleted")
        case EventSync:
                lg.Infof("syncing of the cluster started")

                // no race condition because a cluster is always processed by single worker
                if !clusterFound {
                        cl, err = c.addCluster(lg, clusterName, event.NewSpec)
                        if err != nil {
                                lg.Errorf("syncing of cluster is blocked: %v", err)
                                return
                        }
                }

                c.curWorkerCluster.Store(event.WorkerID, cl)

                // has this cluster been marked as deleted already, then we shall start cleaning up
                if !cl.ObjectMeta.DeletionTimestamp.IsZero() {
                        lg.Infof("cluster has a DeletionTimestamp of %s, starting deletion now.", cl.ObjectMeta.DeletionTimestamp.Format(time.RFC3339))
                        if err = cl.Delete(); err != nil {
                                cl.Error = fmt.Sprintf("error deleting cluster and its resources: %v", err)
                                c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Delete", "%v", cl.Error)
                                lg.Error(cl.Error)
                                return
                        }
                } else {
                        if err = cl.Sync(event.NewSpec); err != nil {
                                cl.Error = fmt.Sprintf("could not sync cluster: %v", err)
                                c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Sync", "%v", cl.Error)
                                lg.Error(cl.Error)
                                return
                        }
                        lg.Infof("cluster has been synced")
                }
                cl.Error = ""
        }
}

func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{}, wg *sync.WaitGroup) {
        defer wg.Done()

        go func() {
                <-stopCh
                c.clusterEventQueues[idx].Close()
        }()

        for {
                obj, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(func(interface{}, bool) error { return nil }))
                if err != nil {
                        if err == cache.ErrFIFOClosed {
                                return
                        }
                        c.logger.Errorf("error when processing cluster events queue: %v", err)
                        continue
                }
                event, ok := obj.(ClusterEvent)
                if !ok {
                        c.logger.Errorf("could not cast to ClusterEvent")
                }

                c.processEvent(event)
        }
}

func (c *Controller) warnOnDeprecatedPostgreSQLSpecParameters(spec *acidv1.PostgresSpec) {

        deprecate := func(deprecated, replacement string) {
                c.logger.Warningf("parameter %q is deprecated. Consider setting %q instead", deprecated, replacement)
        }

        if spec.UseLoadBalancer != nil {
                deprecate("useLoadBalancer", "enableMasterLoadBalancer")
        }
        if spec.ReplicaLoadBalancer != nil {
                deprecate("replicaLoadBalancer", "enableReplicaLoadBalancer")
        }

        if (spec.UseLoadBalancer != nil || spec.ReplicaLoadBalancer != nil) &&
                (spec.EnableReplicaLoadBalancer != nil || spec.EnableMasterLoadBalancer != nil) {
                c.logger.Warnf("both old and new load balancer parameters are present in the manifest, ignoring old ones")
        }
}

// mergeDeprecatedPostgreSQLSpecParameters modifies the spec passed to the cluster by setting current parameter
// values from the obsolete ones. Note: while the spec that is modified is a copy made in queueClusterEvent, it is
// still a shallow copy, so be extra careful not to modify values pointer fields point to, but copy them instead.
func (c *Controller) mergeDeprecatedPostgreSQLSpecParameters(spec *acidv1.PostgresSpec) *acidv1.PostgresSpec {
        if (spec.UseLoadBalancer != nil || spec.ReplicaLoadBalancer != nil) &&
                (spec.EnableReplicaLoadBalancer == nil && spec.EnableMasterLoadBalancer == nil) {
                if spec.UseLoadBalancer != nil {
                        spec.EnableMasterLoadBalancer = new(bool)
                        *spec.EnableMasterLoadBalancer = *spec.UseLoadBalancer
                }
                if spec.ReplicaLoadBalancer != nil {
                        spec.EnableReplicaLoadBalancer = new(bool)
                        *spec.EnableReplicaLoadBalancer = *spec.ReplicaLoadBalancer
                }
        }
        spec.ReplicaLoadBalancer = nil
        spec.UseLoadBalancer = nil

        return spec
}

func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1.Postgresql, eventType EventType) {
        var (
                uid          types.UID
                clusterName  spec.NamespacedName
                clusterError string
        )

        if informerOldSpec != nil { //update, delete
                uid = informerOldSpec.GetUID()
                clusterName = util.NameFromMeta(informerOldSpec.ObjectMeta)

                // user is fixing previously incorrect spec
                if eventType == EventUpdate && informerNewSpec.Error == "" && informerOldSpec.Error != "" {
                        eventType = EventSync
                }

                // set current error to be one of the new spec if present
                if informerNewSpec != nil {
                        clusterError = informerNewSpec.Error
                } else {
                        clusterError = informerOldSpec.Error
                }
        } else { //add, sync
                uid = informerNewSpec.GetUID()
                clusterName = util.NameFromMeta(informerNewSpec.ObjectMeta)
                clusterError = informerNewSpec.Error
        }

        if eventType == EventDelete {
                // when owner references are used operator cannot block deletion
                if c.opConfig.EnableOwnerReferences == nil || !*c.opConfig.EnableOwnerReferences {
                        // only allow deletion if delete annotations are set and conditions are met
                        if err := c.meetsClusterDeleteAnnotations(informerOldSpec); err != nil {
                                c.logger.WithField("cluster-name", clusterName).Warnf(
                                        "ignoring %q event for cluster %q - manifest does not fulfill delete requirements: %s", eventType, clusterName, err)
                                c.logger.WithField("cluster-name", clusterName).Warnf(
                                        "please, recreate Postgresql resource %q and set annotations to delete properly", clusterName)
                                if currentManifest, marshalErr := json.Marshal(informerOldSpec); marshalErr != nil {
                                        c.logger.WithField("cluster-name", clusterName).Warnf("could not marshal current manifest:\n%+v", informerOldSpec)
                                } else {
                                        c.logger.WithField("cluster-name", clusterName).Warnf("%s\n", string(currentManifest))
                                }
                                return
                        }
                }
        }

        if clusterError != "" && eventType != EventDelete {
                c.logger.WithField("cluster-name", clusterName).Debugf("skipping %q event for the invalid cluster: %s", eventType, clusterError)

                switch eventType {
                case EventAdd:
                        informerNewSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusAddFailed
                        _, err := c.KubeClient.SetPostgresCRDStatus(clusterName, informerNewSpec)
                        if err != nil {
                                c.logger.WithField("cluster-name", clusterName).Errorf("could not set PostgresCRD status: %v", err)
                        }
                        c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Create", "%v", clusterError)
                case EventUpdate:
                        informerNewSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusUpdateFailed
                        _, err := c.KubeClient.SetPostgresCRDStatus(clusterName, informerNewSpec)
                        if err != nil {
                                c.logger.WithField("cluster-name", clusterName).Errorf("could not set PostgresCRD status: %v", err)
                        }
                        c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Update", "%v", clusterError)
                default:
                        informerNewSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusSyncFailed
                        _, err := c.KubeClient.SetPostgresCRDStatus(clusterName, informerNewSpec)
                        if err != nil {
                                c.logger.WithField("cluster-name", clusterName).Errorf("could not set PostgresCRD status: %v", err)
                        }
                        c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Sync", "%v", clusterError)
                }

                return
        }

        // Don't pass the spec directly from the informer, since subsequent modifications of it would be reflected
        // in the informer internal state, making it incoherent with the actual Kubernetes object (and, as a side
        // effect, the modified state will be returned together with subsequent events).

        workerID := c.clusterWorkerID(clusterName)
        clusterEvent := ClusterEvent{
                EventTime: time.Now(),
                EventType: eventType,
                UID:       uid,
                OldSpec:   informerOldSpec.Clone(),
                NewSpec:   informerNewSpec.Clone(),
                WorkerID:  workerID,
        }

        lg := c.logger.WithField("worker", workerID).WithField("cluster-name", clusterName)
        if err := c.clusterEventQueues[workerID].Add(clusterEvent); err != nil {
                lg.Errorf("error while queueing cluster event: %v", clusterEvent)
        }
        lg.Infof("%s event has been queued", eventType)

        if eventType != EventDelete {
                return
        }
        // A delete event discards all prior requests for that cluster.
        for _, evType := range []EventType{EventAdd, EventSync, EventUpdate, EventRepair} {
                obj, exists, err := c.clusterEventQueues[workerID].GetByKey(queueClusterKey(evType, uid))
                if err != nil {
                        lg.Warningf("could not get event from the queue: %v", err)
                        continue
                }

                if !exists {
                        continue
                }

                err = c.clusterEventQueues[workerID].Delete(obj)
                if err != nil {
                        lg.Warningf("could not delete event from the queue: %v", err)
                } else {
                        lg.Debugf("event %s has been discarded for the cluster", evType)
                }
        }
}

func (c *Controller) postgresqlAdd(obj interface{}) {
        pg := c.postgresqlCheck(obj)
        if pg != nil {
                // We will not get multiple Add events for the same cluster
                c.queueClusterEvent(nil, pg, EventAdd)
        }
}

func (c *Controller) postgresqlUpdate(prev, cur interface{}) {
        pgOld := c.postgresqlCheck(prev)
        pgNew := c.postgresqlCheck(cur)
        if pgOld != nil && pgNew != nil {
                // Avoid the infinite recursion for status updates
                if reflect.DeepEqual(pgOld.Spec, pgNew.Spec) {
                        if reflect.DeepEqual(pgNew.Annotations, pgOld.Annotations) {
                                return
                        }
                }
                c.queueClusterEvent(pgOld, pgNew, EventUpdate)
        }
}

func (c *Controller) postgresqlDelete(obj interface{}) {
        pg := c.postgresqlCheck(obj)
        if pg != nil {
                c.queueClusterEvent(pg, nil, EventDelete)
        }
}

func (c *Controller) postgresqlCheck(obj interface{}) *acidv1.Postgresql {
        pg, ok := obj.(*acidv1.Postgresql)
        if !ok {
                c.logger.Errorf("could not cast to postgresql spec")
                return nil
        }
        if !c.hasOwnership(pg) {
                return nil
        }
        return pg
}

/*
Ensures the pod service account and role bindings exist in a namespace
before a PG cluster is created there so that a user does not have to deploy
these credentials manually. StatefulSets require the service account to
create pods; Patroni requires relevant RBAC bindings to access endpoints
or config maps.

The operator does not sync accounts/role bindings after creation.
*/
func (c *Controller) submitRBACCredentials(event ClusterEvent) error {

        namespace := event.NewSpec.GetNamespace()

        if err := c.createPodServiceAccount(namespace); err != nil {
                return fmt.Errorf("could not create pod service account %q : %v", c.opConfig.PodServiceAccountName, err)
        }

        if err := c.createRoleBindings(namespace); err != nil {
                return fmt.Errorf("could not create role binding %q : %v", c.PodServiceAccountRoleBinding.Name, err)
        }
        return nil
}

func (c *Controller) createPodServiceAccount(namespace string) error {

        podServiceAccountName := c.opConfig.PodServiceAccountName
        _, err := c.KubeClient.ServiceAccounts(namespace).Get(context.TODO(), podServiceAccountName, metav1.GetOptions{})
        if k8sutil.ResourceNotFound(err) {

                c.logger.Infof("creating pod service account %q in the %q namespace", podServiceAccountName, namespace)

                // get a separate copy of service account
                // to prevent a race condition when setting a namespace for many clusters
                sa := *c.PodServiceAccount
                if _, err = c.KubeClient.ServiceAccounts(namespace).Create(context.TODO(), &sa, metav1.CreateOptions{}); err != nil {
                        return fmt.Errorf("cannot deploy the pod service account %q defined in the configuration to the %q namespace: %v", podServiceAccountName, namespace, err)
                }

                c.logger.Infof("successfully deployed the pod service account %q to the %q namespace", podServiceAccountName, namespace)
        } else if k8sutil.ResourceAlreadyExists(err) {
                return nil
        }

        return err
}

func (c *Controller) createRoleBindings(namespace string) error {

        podServiceAccountName := c.opConfig.PodServiceAccountName
        podServiceAccountRoleBindingName := c.PodServiceAccountRoleBinding.Name

        _, err := c.KubeClient.RoleBindings(namespace).Get(context.TODO(), podServiceAccountRoleBindingName, metav1.GetOptions{})
        if k8sutil.ResourceNotFound(err) {

                c.logger.Infof("Creating the role binding %q in the %q namespace", podServiceAccountRoleBindingName, namespace)

                // get a separate copy of role binding
                // to prevent a race condition when setting a namespace for many clusters
                rb := *c.PodServiceAccountRoleBinding
                _, err = c.KubeClient.RoleBindings(namespace).Create(context.TODO(), &rb, metav1.CreateOptions{})
                if err != nil {
                        return fmt.Errorf("cannot bind the pod service account %q defined in the configuration to the cluster role in the %q namespace: %v", podServiceAccountName, namespace, err)
                }

                c.logger.Infof("successfully deployed the role binding for the pod service account %q to the %q namespace", podServiceAccountName, namespace)

        } else if k8sutil.ResourceAlreadyExists(err) {
                return nil
        }

        return err
}
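
For context on the pull request this report covers ("Use UpdateStatus instead of patch"), the sketch below shows the general client-go pattern of writing a CRD's /status subresource with UpdateStatus rather than patching it. It is only an illustration: the statusUpdater interface, the function name, and the error handling are assumptions for the sketch, not the operator's actual SetPostgresCRDStatus helper or its generated client.

package statussketch

import (
        "context"

        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

        acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
)

// statusUpdater mirrors the single method of a generated Postgresql client
// that this sketch needs; the real generated interface has more methods and
// may differ in detail (assumption).
type statusUpdater interface {
        UpdateStatus(ctx context.Context, pg *acidv1.Postgresql, opts metav1.UpdateOptions) (*acidv1.Postgresql, error)
}

// setStatusSketch copies the object, sets the desired status, and writes only
// the /status subresource, so concurrent changes to the spec are left alone.
// A conflict error from the API server means the object changed in between;
// a caller would typically re-read the object and retry.
func setStatusSketch(c statusUpdater, pg *acidv1.Postgresql, status acidv1.PostgresStatus) (*acidv1.Postgresql, error) {
        updated := pg.DeepCopy() // never mutate the informer's cached copy
        updated.Status = status
        return c.UpdateStatus(context.TODO(), updated, metav1.UpdateOptions{})
}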