• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zalando / postgres-operator / 17069871161

19 Aug 2025 12:40PM UTC coverage: 42.059% (-3.4%) from 45.498%
17069871161

push

github

web-flow
upgrade Go from 1.23.4 to 1.25.0 (#2945)

* upgrade go to 1.25
* add minor version to be Go 1.25.0
* revert the Go version on README to keep the history of the release

5 of 9 new or added lines in 7 files covered. (55.56%)

531 existing lines in 13 files now uncovered.

6493 of 15438 relevant lines covered (42.06%)

15.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

3.02
/pkg/controller/postgresql.go
1
package controller
2

3
import (
4
        "context"
5
        "encoding/json"
6
        "fmt"
7
        "reflect"
8
        "strings"
9
        "sync"
10
        "sync/atomic"
11
        "time"
12

13
        "github.com/sirupsen/logrus"
14

15
        v1 "k8s.io/api/core/v1"
16
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17
        "k8s.io/apimachinery/pkg/types"
18
        "k8s.io/client-go/tools/cache"
19

20
        acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
21
        "github.com/zalando/postgres-operator/pkg/cluster"
22
        "github.com/zalando/postgres-operator/pkg/spec"
23
        "github.com/zalando/postgres-operator/pkg/util"
24
        "github.com/zalando/postgres-operator/pkg/util/k8sutil"
25
        "github.com/zalando/postgres-operator/pkg/util/ringlog"
26
)
27

28
func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) {
×
29
        defer wg.Done()
×
30
        ticker := time.NewTicker(c.opConfig.ResyncPeriod)
×
31

×
32
        for {
×
33
                select {
×
34
                case <-ticker.C:
×
35
                        if err := c.clusterListAndSync(); err != nil {
×
36
                                c.logger.Errorf("could not list clusters: %v", err)
×
37
                        }
×
38
                case <-stopCh:
×
39
                        return
×
40
                }
41
        }
42
}
43

44
// clusterListFunc obtains a list of all PostgreSQL clusters
45
func (c *Controller) listClusters(options metav1.ListOptions) (*acidv1.PostgresqlList, error) {
×
46
        var pgList acidv1.PostgresqlList
×
47

×
48
        // TODO: use the SharedInformer cache instead of quering Kubernetes API directly.
×
49
        list, err := c.KubeClient.PostgresqlsGetter.Postgresqls(c.opConfig.WatchedNamespace).List(context.TODO(), options)
×
50
        if err != nil {
×
51
                c.logger.Errorf("could not list postgresql objects: %v", err)
×
52
        }
×
53
        if c.controllerID != "" {
×
54
                c.logger.Debugf("watch only clusters with controllerID %q", c.controllerID)
×
55
        }
×
56
        for _, pg := range list.Items {
×
57
                if pg.Error == "" && c.hasOwnership(&pg) {
×
58
                        pgList.Items = append(pgList.Items, pg)
×
59
                }
×
60
        }
61

62
        return &pgList, err
×
63
}
64

65
// clusterListAndSync lists all manifests and decides whether to run the sync or repair.
66
func (c *Controller) clusterListAndSync() error {
×
67
        var (
×
68
                err   error
×
69
                event EventType
×
70
        )
×
71

×
72
        currentTime := time.Now().Unix()
×
73
        timeFromPreviousSync := currentTime - atomic.LoadInt64(&c.lastClusterSyncTime)
×
74
        timeFromPreviousRepair := currentTime - atomic.LoadInt64(&c.lastClusterRepairTime)
×
75

×
76
        if timeFromPreviousSync >= int64(c.opConfig.ResyncPeriod.Seconds()) {
×
77
                event = EventSync
×
78
        } else if timeFromPreviousRepair >= int64(c.opConfig.RepairPeriod.Seconds()) {
×
79
                event = EventRepair
×
80
        }
×
81
        if event != "" {
×
82
                var list *acidv1.PostgresqlList
×
83
                if list, err = c.listClusters(metav1.ListOptions{ResourceVersion: "0"}); err != nil {
×
84
                        return err
×
85
                }
×
86
                c.queueEvents(list, event)
×
87
        } else {
×
88
                c.logger.Infof("not enough time passed since the last sync (%v seconds) or repair (%v seconds)",
×
89
                        timeFromPreviousSync, timeFromPreviousRepair)
×
90
        }
×
91
        return nil
×
92
}
93

94
// queueEvents queues a sync or repair event for every cluster with a valid manifest
95
func (c *Controller) queueEvents(list *acidv1.PostgresqlList, event EventType) {
×
96
        var activeClustersCnt, failedClustersCnt, clustersToRepair int
×
97
        for i, pg := range list.Items {
×
98
                // XXX: check the cluster status field instead
×
99
                if pg.Error != "" {
×
100
                        failedClustersCnt++
×
101
                        continue
×
102
                }
103
                activeClustersCnt++
×
104
                // check if that cluster needs repair
×
105
                if event == EventRepair {
×
106
                        if pg.Status.Success() {
×
107
                                continue
×
108
                        } else {
×
109
                                clustersToRepair++
×
110
                        }
×
111
                }
112
                c.queueClusterEvent(nil, &list.Items[i], event)
×
113
        }
114
        if len(list.Items) > 0 {
×
115
                if failedClustersCnt > 0 && activeClustersCnt == 0 {
×
116
                        c.logger.Infof("there are no clusters running. %d are in the failed state", failedClustersCnt)
×
117
                } else if failedClustersCnt == 0 && activeClustersCnt > 0 {
×
118
                        c.logger.Infof("there are %d clusters running", activeClustersCnt)
×
119
                } else {
×
120
                        c.logger.Infof("there are %d clusters running and %d are in the failed state", activeClustersCnt, failedClustersCnt)
×
121
                }
×
122
                if clustersToRepair > 0 {
×
123
                        c.logger.Infof("%d clusters are scheduled for a repair scan", clustersToRepair)
×
124
                }
×
125
        } else {
×
126
                c.logger.Infof("no clusters running")
×
127
        }
×
128
        if event == EventRepair || event == EventSync {
×
129
                atomic.StoreInt64(&c.lastClusterRepairTime, time.Now().Unix())
×
130
                if event == EventSync {
×
131
                        atomic.StoreInt64(&c.lastClusterSyncTime, time.Now().Unix())
×
132
                }
×
133
        }
134
}
135

136
func (c *Controller) acquireInitialListOfClusters() error {
×
137
        var (
×
138
                list        *acidv1.PostgresqlList
×
139
                err         error
×
140
                clusterName spec.NamespacedName
×
141
        )
×
142

×
143
        if list, err = c.listClusters(metav1.ListOptions{ResourceVersion: "0"}); err != nil {
×
144
                return err
×
145
        }
×
146
        c.logger.Debug("acquiring initial list of clusters")
×
147
        for _, pg := range list.Items {
×
148
                // XXX: check the cluster status field instead
×
149
                if pg.Error != "" {
×
150
                        continue
×
151
                }
152
                clusterName = util.NameFromMeta(pg.ObjectMeta)
×
153
                c.addCluster(c.logger, clusterName, &pg)
×
154
                c.logger.Debugf("added new cluster: %q", clusterName)
×
155
        }
156
        // initiate initial sync of all clusters.
157
        c.queueEvents(list, EventSync)
×
158
        return nil
×
159
}
160

161
func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedName, pgSpec *acidv1.Postgresql) (*cluster.Cluster, error) {
×
162
        if c.opConfig.EnableTeamIdClusternamePrefix {
×
163
                if _, err := acidv1.ExtractClusterName(clusterName.Name, pgSpec.Spec.TeamID); err != nil {
×
164
                        c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusInvalid)
×
165
                        return nil, err
×
166
                }
×
167
        }
168

169
        cl := cluster.New(c.makeClusterConfig(), c.KubeClient, *pgSpec, lg, c.eventRecorder)
×
170
        cl.Run(c.stopCh)
×
171
        teamName := strings.ToLower(cl.Spec.TeamID)
×
172

×
173
        defer c.clustersMu.Unlock()
×
174
        c.clustersMu.Lock()
×
175

×
176
        c.teamClusters[teamName] = append(c.teamClusters[teamName], clusterName)
×
177
        c.clusters[clusterName] = cl
×
178
        c.clusterLogs[clusterName] = ringlog.New(c.opConfig.RingLogLines)
×
179
        c.clusterHistory[clusterName] = ringlog.New(c.opConfig.ClusterHistoryEntries)
×
180

×
181
        return cl, nil
×
182
}
183

184
// processEvent executes a single queued ClusterEvent on the worker goroutine
// identified by event.WorkerID. It resolves the target cluster from the
// controller registry, rewrites Repair events into Sync events when the
// cluster actually needs repair, normalizes deprecated spec parameters, and
// then dispatches on the event type (add / update / delete / sync).
func (c *Controller) processEvent(event ClusterEvent) {
	var clusterName spec.NamespacedName
	var clHistory ringlog.RingLogger
	var err error

	lg := c.logger.WithField("worker", event.WorkerID)

	// add/sync/repair events carry the target in NewSpec; update/delete in OldSpec
	if event.EventType == EventAdd || event.EventType == EventSync || event.EventType == EventRepair {
		clusterName = util.NameFromMeta(event.NewSpec.ObjectMeta)
	} else {
		clusterName = util.NameFromMeta(event.OldSpec.ObjectMeta)
	}
	lg = lg.WithField("cluster-name", clusterName)

	// look up the registered cluster object (and its change history) under the read lock
	c.clustersMu.RLock()
	cl, clusterFound := c.clusters[clusterName]
	if clusterFound {
		clHistory = c.clusterHistory[clusterName]
	}
	c.clustersMu.RUnlock()

	// clear the per-worker "currently processed cluster" marker on exit
	defer c.curWorkerCluster.Store(event.WorkerID, nil)

	// a repair event degenerates into a sync when the last operation failed;
	// otherwise there is nothing to do.
	// NOTE(review): cl is dereferenced here without checking clusterFound —
	// presumably repair events are only queued for registered clusters; verify.
	if event.EventType == EventRepair {
		runRepair, lastOperationStatus := cl.NeedsRepair()
		if !runRepair {
			lg.Debugf("observed cluster status %s, repair is not required", lastOperationStatus)
			return
		}
		lg.Debugf("observed cluster status %s, running sync scan to repair the cluster", lastOperationStatus)
		event.EventType = EventSync
	}

	if event.EventType == EventAdd || event.EventType == EventUpdate || event.EventType == EventSync {
		// handle deprecated parameters by possibly assigning their values to the new ones.
		if event.OldSpec != nil {
			c.mergeDeprecatedPostgreSQLSpecParameters(&event.OldSpec.Spec)
		}
		if event.NewSpec != nil {
			c.warnOnDeprecatedPostgreSQLSpecParameters(&event.NewSpec.Spec)
			c.mergeDeprecatedPostgreSQLSpecParameters(&event.NewSpec.Spec)
		}

		// make sure the pod service account and RBAC bindings exist in the
		// target namespace before the cluster is created or synced there
		if err = c.submitRBACCredentials(event); err != nil {
			c.logger.Warnf("pods and/or Patroni may misfunction due to the lack of permissions: %v", err)
		}

	}

	switch event.EventType {
	case EventAdd:
		// duplicate add events are ignored
		if clusterFound {
			lg.Infof("received add event for already existing Postgres cluster")
			return
		}

		lg.Infof("creating a new Postgres cluster")

		cl, err = c.addCluster(lg, clusterName, event.NewSpec)
		if err != nil {
			lg.Errorf("creation of cluster is blocked: %v", err)
			return
		}

		c.curWorkerCluster.Store(event.WorkerID, cl)

		err = cl.Create()
		if err != nil {
			// record the failure on the cluster object and emit a k8s event
			cl.Status = acidv1.PostgresStatus{PostgresClusterStatus: acidv1.ClusterStatusInvalid}
			cl.Error = fmt.Sprintf("could not create cluster: %v", err)
			lg.Error(cl.Error)
			c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Create", "%v", cl.Error)
			return
		}

		lg.Infoln("cluster has been created")
	case EventUpdate:
		lg.Infoln("update of the cluster started")

		if !clusterFound {
			lg.Warningln("cluster does not exist")
			return
		}
		c.curWorkerCluster.Store(event.WorkerID, cl)
		err = cl.Update(event.OldSpec, event.NewSpec)
		if err != nil {
			cl.Error = fmt.Sprintf("could not update cluster: %v", err)
			lg.Error(cl.Error)

			return
		}
		cl.Error = ""
		lg.Infoln("cluster has been updated")

		// record the spec diff in the per-cluster history ring buffer
		clHistory.Insert(&spec.Diff{
			EventTime:   event.EventTime,
			ProcessTime: time.Now(),
			Diff:        util.Diff(event.OldSpec, event.NewSpec),
		})
	case EventDelete:
		if !clusterFound {
			lg.Errorf("unknown cluster: %q", clusterName)
			return
		}

		teamName := strings.ToLower(cl.Spec.TeamID)
		c.curWorkerCluster.Store(event.WorkerID, cl)

		// when using finalizers the deletion already happened
		if c.opConfig.EnableFinalizers == nil || !*c.opConfig.EnableFinalizers {
			lg.Infoln("deletion of the cluster started")
			if err := cl.Delete(); err != nil {
				cl.Error = fmt.Sprintf("could not delete cluster: %v", err)
				c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Delete", "%v", cl.Error)
			}
		}

		// unregister the cluster from all bookkeeping maps under the write lock
		func() {
			defer c.clustersMu.Unlock()
			c.clustersMu.Lock()

			delete(c.clusters, clusterName)
			delete(c.clusterLogs, clusterName)
			delete(c.clusterHistory, clusterName)
			// remove the cluster from its team's slice, preserving order
			for i, val := range c.teamClusters[teamName] {
				if val == clusterName {
					copy(c.teamClusters[teamName][i:], c.teamClusters[teamName][i+1:])
					c.teamClusters[teamName][len(c.teamClusters[teamName])-1] = spec.NamespacedName{}
					c.teamClusters[teamName] = c.teamClusters[teamName][:len(c.teamClusters[teamName])-1]
					break
				}
			}
		}()

		lg.Infof("cluster has been deleted")
	case EventSync:
		lg.Infof("syncing of the cluster started")

		// no race condition because a cluster is always processed by single worker
		if !clusterFound {
			cl, err = c.addCluster(lg, clusterName, event.NewSpec)
			if err != nil {
				lg.Errorf("syncing of cluster is blocked: %v", err)
				return
			}
		}

		c.curWorkerCluster.Store(event.WorkerID, cl)

		// has this cluster been marked as deleted already, then we shall start cleaning up
		if !cl.ObjectMeta.DeletionTimestamp.IsZero() {
			lg.Infof("cluster has a DeletionTimestamp of %s, starting deletion now.", cl.ObjectMeta.DeletionTimestamp.Format(time.RFC3339))
			if err = cl.Delete(); err != nil {
				cl.Error = fmt.Sprintf("error deleting cluster and its resources: %v", err)
				c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Delete", "%v", cl.Error)
				lg.Error(cl.Error)
				return
			}
		} else {
			if err = cl.Sync(event.NewSpec); err != nil {
				cl.Error = fmt.Sprintf("could not sync cluster: %v", err)
				c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Sync", "%v", cl.Error)
				lg.Error(cl.Error)
				return
			}
			lg.Infof("cluster has been synced")
		}
		cl.Error = ""
	}
}
354

355
func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{}, wg *sync.WaitGroup) {
×
356
        defer wg.Done()
×
357

×
358
        go func() {
×
359
                <-stopCh
×
360
                c.clusterEventQueues[idx].Close()
×
361
        }()
×
362

363
        for {
×
364
                obj, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(func(interface{}, bool) error { return nil }))
×
365
                if err != nil {
×
366
                        if err == cache.ErrFIFOClosed {
×
367
                                return
×
368
                        }
×
369
                        c.logger.Errorf("error when processing cluster events queue: %v", err)
×
370
                        continue
×
371
                }
372
                event, ok := obj.(ClusterEvent)
×
373
                if !ok {
×
374
                        c.logger.Errorf("could not cast to ClusterEvent")
×
375
                }
×
376

377
                c.processEvent(event)
×
378
        }
379
}
380

381
func (c *Controller) warnOnDeprecatedPostgreSQLSpecParameters(spec *acidv1.PostgresSpec) {
×
382

×
383
        deprecate := func(deprecated, replacement string) {
×
384
                c.logger.Warningf("parameter %q is deprecated. Consider setting %q instead", deprecated, replacement)
×
385
        }
×
386

387
        if spec.UseLoadBalancer != nil {
×
388
                deprecate("useLoadBalancer", "enableMasterLoadBalancer")
×
389
        }
×
390
        if spec.ReplicaLoadBalancer != nil {
×
391
                deprecate("replicaLoadBalancer", "enableReplicaLoadBalancer")
×
392
        }
×
393

394
        if (spec.UseLoadBalancer != nil || spec.ReplicaLoadBalancer != nil) &&
×
395
                (spec.EnableReplicaLoadBalancer != nil || spec.EnableMasterLoadBalancer != nil) {
×
396
                c.logger.Warnf("both old and new load balancer parameters are present in the manifest, ignoring old ones")
×
397
        }
×
398
}
399

400
// mergeDeprecatedPostgreSQLSpecParameters modifies the spec passed to the cluster by setting current parameter
401
// values from the obsolete ones. Note: while the spec that is modified is a copy made in queueClusterEvent, it is
402
// still a shallow copy, so be extra careful not to modify values pointer fields point to, but copy them instead.
403
func (c *Controller) mergeDeprecatedPostgreSQLSpecParameters(spec *acidv1.PostgresSpec) *acidv1.PostgresSpec {
2✔
404
        if (spec.UseLoadBalancer != nil || spec.ReplicaLoadBalancer != nil) &&
2✔
405
                (spec.EnableReplicaLoadBalancer == nil && spec.EnableMasterLoadBalancer == nil) {
3✔
406
                if spec.UseLoadBalancer != nil {
2✔
407
                        spec.EnableMasterLoadBalancer = new(bool)
1✔
408
                        *spec.EnableMasterLoadBalancer = *spec.UseLoadBalancer
1✔
409
                }
1✔
410
                if spec.ReplicaLoadBalancer != nil {
2✔
411
                        spec.EnableReplicaLoadBalancer = new(bool)
1✔
412
                        *spec.EnableReplicaLoadBalancer = *spec.ReplicaLoadBalancer
1✔
413
                }
1✔
414
        }
415
        spec.ReplicaLoadBalancer = nil
2✔
416
        spec.UseLoadBalancer = nil
2✔
417

2✔
418
        return spec
2✔
419
}
420

421
// queueClusterEvent validates an informer notification and enqueues a deep
// copy of it as a ClusterEvent on the per-cluster worker queue. Invalid
// manifests are rejected with a CRD status update and a warning event; a
// delete event additionally purges all still-pending events for the same
// cluster from the queue.
func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1.Postgresql, eventType EventType) {
	var (
		uid          types.UID
		clusterName  spec.NamespacedName
		clusterError string
	)

	if informerOldSpec != nil { //update, delete
		uid = informerOldSpec.GetUID()
		clusterName = util.NameFromMeta(informerOldSpec.ObjectMeta)

		// user is fixing previously incorrect spec
		if eventType == EventUpdate && informerNewSpec.Error == "" && informerOldSpec.Error != "" {
			eventType = EventSync
		}

		// set current error to be one of the new spec if present
		if informerNewSpec != nil {
			clusterError = informerNewSpec.Error
		} else {
			clusterError = informerOldSpec.Error
		}
	} else { //add, sync
		uid = informerNewSpec.GetUID()
		clusterName = util.NameFromMeta(informerNewSpec.ObjectMeta)
		clusterError = informerNewSpec.Error
	}

	if eventType == EventDelete {
		// when owner references are used operator cannot block deletion
		if c.opConfig.EnableOwnerReferences == nil || !*c.opConfig.EnableOwnerReferences {
			// only allow deletion if delete annotations are set and conditions are met
			if err := c.meetsClusterDeleteAnnotations(informerOldSpec); err != nil {
				c.logger.WithField("cluster-name", clusterName).Warnf(
					"ignoring %q event for cluster %q - manifest does not fulfill delete requirements: %s", eventType, clusterName, err)
				c.logger.WithField("cluster-name", clusterName).Warnf(
					"please, recreate Postgresql resource %q and set annotations to delete properly", clusterName)
				// dump the rejected manifest so the user can recreate it
				if currentManifest, marshalErr := json.Marshal(informerOldSpec); marshalErr != nil {
					c.logger.WithField("cluster-name", clusterName).Warnf("could not marshal current manifest:\n%+v", informerOldSpec)
				} else {
					c.logger.WithField("cluster-name", clusterName).Warnf("%s\n", string(currentManifest))
				}
				return
			}
		}
	}

	// a manifest with a parsing error is never queued (except for deletion);
	// instead the CRD status is set to the matching *Failed state
	if clusterError != "" && eventType != EventDelete {
		c.logger.WithField("cluster-name", clusterName).Debugf("skipping %q event for the invalid cluster: %s", eventType, clusterError)

		switch eventType {
		case EventAdd:
			c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusAddFailed)
			c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Create", "%v", clusterError)
		case EventUpdate:
			c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusUpdateFailed)
			c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Update", "%v", clusterError)
		default:
			c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusSyncFailed)
			c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Sync", "%v", clusterError)
		}

		return
	}

	// Don't pass the spec directly from the informer, since subsequent modifications of it would be reflected
	// in the informer internal state, making it incoherent with the actual Kubernetes object (and, as a side
	// effect, the modified state will be returned together with subsequent events).

	workerID := c.clusterWorkerID(clusterName)
	clusterEvent := ClusterEvent{
		EventTime: time.Now(),
		EventType: eventType,
		UID:       uid,
		OldSpec:   informerOldSpec.Clone(),
		NewSpec:   informerNewSpec.Clone(),
		WorkerID:  workerID,
	}

	lg := c.logger.WithField("worker", workerID).WithField("cluster-name", clusterName)
	// NOTE(review): this logs the event rather than the Add error itself,
	// and "queued" is logged even when Add failed — confirm this is intended
	if err := c.clusterEventQueues[workerID].Add(clusterEvent); err != nil {
		lg.Errorf("error while queueing cluster event: %v", clusterEvent)
	}
	lg.Infof("%s event has been queued", eventType)

	if eventType != EventDelete {
		return
	}
	// A delete event discards all prior requests for that cluster.
	for _, evType := range []EventType{EventAdd, EventSync, EventUpdate, EventRepair} {
		obj, exists, err := c.clusterEventQueues[workerID].GetByKey(queueClusterKey(evType, uid))
		if err != nil {
			lg.Warningf("could not get event from the queue: %v", err)
			continue
		}

		if !exists {
			continue
		}

		err = c.clusterEventQueues[workerID].Delete(obj)
		if err != nil {
			lg.Warningf("could not delete event from the queue: %v", err)
		} else {
			lg.Debugf("event %s has been discarded for the cluster", evType)
		}
	}
}
529

530
func (c *Controller) postgresqlAdd(obj interface{}) {
×
531
        pg := c.postgresqlCheck(obj)
×
532
        if pg != nil {
×
533
                // We will not get multiple Add events for the same cluster
×
534
                c.queueClusterEvent(nil, pg, EventAdd)
×
535
        }
×
536
}
537

538
func (c *Controller) postgresqlUpdate(prev, cur interface{}) {
×
539
        pgOld := c.postgresqlCheck(prev)
×
540
        pgNew := c.postgresqlCheck(cur)
×
541
        if pgOld != nil && pgNew != nil {
×
542
                // Avoid the inifinite recursion for status updates
×
543
                if reflect.DeepEqual(pgOld.Spec, pgNew.Spec) {
×
544
                        if reflect.DeepEqual(pgNew.Annotations, pgOld.Annotations) {
×
545
                                return
×
546
                        }
×
547
                }
548
                c.queueClusterEvent(pgOld, pgNew, EventUpdate)
×
549
        }
550
}
551

552
func (c *Controller) postgresqlDelete(obj interface{}) {
×
553
        pg := c.postgresqlCheck(obj)
×
554
        if pg != nil {
×
555
                c.queueClusterEvent(pg, nil, EventDelete)
×
556
        }
×
557
}
558

559
func (c *Controller) postgresqlCheck(obj interface{}) *acidv1.Postgresql {
×
560
        pg, ok := obj.(*acidv1.Postgresql)
×
561
        if !ok {
×
562
                c.logger.Errorf("could not cast to postgresql spec")
×
563
                return nil
×
564
        }
×
565
        if !c.hasOwnership(pg) {
×
566
                return nil
×
567
        }
×
568
        return pg
×
569
}
570

571
/*
572
Ensures the pod service account and role bindings exists in a namespace
573
before a PG cluster is created there so that a user does not have to deploy
574
these credentials manually.  StatefulSets require the service account to
575
create pods; Patroni requires relevant RBAC bindings to access endpoints
576
or config maps.
577

578
The operator does not sync accounts/role bindings after creation.
579
*/
580
func (c *Controller) submitRBACCredentials(event ClusterEvent) error {
×
581

×
582
        namespace := event.NewSpec.GetNamespace()
×
583

×
584
        if err := c.createPodServiceAccount(namespace); err != nil {
×
585
                return fmt.Errorf("could not create pod service account %q : %v", c.opConfig.PodServiceAccountName, err)
×
586
        }
×
587

588
        if err := c.createRoleBindings(namespace); err != nil {
×
589
                return fmt.Errorf("could not create role binding %q : %v", c.PodServiceAccountRoleBinding.Name, err)
×
590
        }
×
591
        return nil
×
592
}
593

594
func (c *Controller) createPodServiceAccount(namespace string) error {
×
595

×
596
        podServiceAccountName := c.opConfig.PodServiceAccountName
×
597
        _, err := c.KubeClient.ServiceAccounts(namespace).Get(context.TODO(), podServiceAccountName, metav1.GetOptions{})
×
598
        if k8sutil.ResourceNotFound(err) {
×
599

×
NEW
600
                c.logger.Infof("creating pod service account %q in the %q namespace", podServiceAccountName, namespace)
×
601

×
602
                // get a separate copy of service account
×
603
                // to prevent a race condition when setting a namespace for many clusters
×
604
                sa := *c.PodServiceAccount
×
605
                if _, err = c.KubeClient.ServiceAccounts(namespace).Create(context.TODO(), &sa, metav1.CreateOptions{}); err != nil {
×
606
                        return fmt.Errorf("cannot deploy the pod service account %q defined in the configuration to the %q namespace: %v", podServiceAccountName, namespace, err)
×
607
                }
×
608

609
                c.logger.Infof("successfully deployed the pod service account %q to the %q namespace", podServiceAccountName, namespace)
×
610
        } else if k8sutil.ResourceAlreadyExists(err) {
×
611
                return nil
×
612
        }
×
613

614
        return err
×
615
}
616

617
func (c *Controller) createRoleBindings(namespace string) error {
×
618

×
619
        podServiceAccountName := c.opConfig.PodServiceAccountName
×
620
        podServiceAccountRoleBindingName := c.PodServiceAccountRoleBinding.Name
×
621

×
622
        _, err := c.KubeClient.RoleBindings(namespace).Get(context.TODO(), podServiceAccountRoleBindingName, metav1.GetOptions{})
×
623
        if k8sutil.ResourceNotFound(err) {
×
624

×
625
                c.logger.Infof("Creating the role binding %q in the %q namespace", podServiceAccountRoleBindingName, namespace)
×
626

×
627
                // get a separate copy of role binding
×
628
                // to prevent a race condition when setting a namespace for many clusters
×
629
                rb := *c.PodServiceAccountRoleBinding
×
630
                _, err = c.KubeClient.RoleBindings(namespace).Create(context.TODO(), &rb, metav1.CreateOptions{})
×
631
                if err != nil {
×
632
                        return fmt.Errorf("cannot bind the pod service account %q defined in the configuration to the cluster role in the %q namespace: %v", podServiceAccountName, namespace, err)
×
633
                }
×
634

635
                c.logger.Infof("successfully deployed the role binding for the pod service account %q to the %q namespace", podServiceAccountName, namespace)
×
636

637
        } else if k8sutil.ResourceAlreadyExists(err) {
×
638
                return nil
×
639
        }
×
640

641
        return err
×
642
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc