• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zalando / postgres-operator / 26742307267

01 Jun 2026 07:53AM UTC coverage: 43.647% (-0.2%) from 43.809%
26742307267

Pull #3016

github

web-flow
Merge ef314b903 into f988e4cf0
Pull Request #3016: feature: improve deletion speed and multi cluster handling by using background sync

11 of 100 new or added lines in 5 files covered. (11.0%)

3 existing lines in 2 files now uncovered.

6705 of 15362 relevant lines covered (43.65%)

17.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

2.72
/pkg/controller/postgresql.go
1
package controller
2

3
import (
4
        "context"
5
        "encoding/json"
6
        "fmt"
7
        "reflect"
8
        "strings"
9
        "sync"
10
        "sync/atomic"
11
        "time"
12

13
        "github.com/sirupsen/logrus"
14

15
        v1 "k8s.io/api/core/v1"
16
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17
        "k8s.io/apimachinery/pkg/types"
18
        "k8s.io/client-go/tools/cache"
19

20
        acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
21
        "github.com/zalando/postgres-operator/pkg/cluster"
22
        "github.com/zalando/postgres-operator/pkg/spec"
23
        "github.com/zalando/postgres-operator/pkg/util"
24
        "github.com/zalando/postgres-operator/pkg/util/k8sutil"
25
        "github.com/zalando/postgres-operator/pkg/util/ringlog"
26
)
27

28
func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) {
×
29
        defer wg.Done()
×
30
        ticker := time.NewTicker(c.opConfig.ResyncPeriod)
×
31

×
32
        for {
×
33
                select {
×
34
                case <-ticker.C:
×
35
                        if err := c.clusterListAndSync(); err != nil {
×
36
                                c.logger.Errorf("could not list clusters: %v", err)
×
37
                        }
×
38
                case <-stopCh:
×
39
                        return
×
40
                }
41
        }
42
}
43

44
// clusterListFunc obtains a list of all PostgreSQL clusters
45
func (c *Controller) listClusters(options metav1.ListOptions) (*acidv1.PostgresqlList, error) {
×
46
        var pgList acidv1.PostgresqlList
×
47

×
48
        // TODO: use the SharedInformer cache instead of quering Kubernetes API directly.
×
49
        list, err := c.KubeClient.PostgresqlsGetter.Postgresqls(c.opConfig.WatchedNamespace).List(context.TODO(), options)
×
50
        if err != nil {
×
51
                c.logger.Errorf("could not list postgresql objects: %v", err)
×
52
        }
×
53
        if c.controllerID != "" {
×
54
                c.logger.Debugf("watch only clusters with controllerID %q", c.controllerID)
×
55
        }
×
56
        for _, pg := range list.Items {
×
57
                if pg.Error == "" && c.hasOwnership(&pg) {
×
58
                        pgList.Items = append(pgList.Items, pg)
×
59
                }
×
60
        }
61

62
        return &pgList, err
×
63
}
64

65
// clusterListAndSync lists all manifests and decides whether to run the sync or repair.
66
func (c *Controller) clusterListAndSync() error {
×
67
        var (
×
68
                err   error
×
69
                event EventType
×
70
        )
×
71

×
72
        currentTime := time.Now().Unix()
×
73
        timeFromPreviousSync := currentTime - atomic.LoadInt64(&c.lastClusterSyncTime)
×
74
        timeFromPreviousRepair := currentTime - atomic.LoadInt64(&c.lastClusterRepairTime)
×
75

×
76
        if timeFromPreviousSync >= int64(c.opConfig.ResyncPeriod.Seconds()) {
×
77
                event = EventSync
×
78
        } else if timeFromPreviousRepair >= int64(c.opConfig.RepairPeriod.Seconds()) {
×
79
                event = EventRepair
×
80
        }
×
81
        if event != "" {
×
82
                var list *acidv1.PostgresqlList
×
83
                if list, err = c.listClusters(metav1.ListOptions{ResourceVersion: "0"}); err != nil {
×
84
                        return err
×
85
                }
×
86
                c.queueEvents(list, event)
×
87
        } else {
×
88
                c.logger.Infof("not enough time passed since the last sync (%v seconds) or repair (%v seconds)",
×
89
                        timeFromPreviousSync, timeFromPreviousRepair)
×
90
        }
×
91
        return nil
×
92
}
93

94
// queueEvents queues a sync or repair event for every cluster with a valid manifest
95
func (c *Controller) queueEvents(list *acidv1.PostgresqlList, event EventType) {
×
96
        var activeClustersCnt, failedClustersCnt, clustersToRepair int
×
97
        for i, pg := range list.Items {
×
98
                // XXX: check the cluster status field instead
×
99
                if pg.Error != "" {
×
100
                        failedClustersCnt++
×
101
                        continue
×
102
                }
103
                activeClustersCnt++
×
104
                // check if that cluster needs repair
×
105
                if event == EventRepair {
×
106
                        if pg.Status.Success() {
×
107
                                continue
×
108
                        } else {
×
109
                                clustersToRepair++
×
110
                        }
×
111
                }
112
                c.queueClusterEvent(nil, &list.Items[i], event)
×
113
        }
114
        if len(list.Items) > 0 {
×
115
                if failedClustersCnt > 0 && activeClustersCnt == 0 {
×
116
                        c.logger.Infof("there are no clusters running. %d are in the failed state", failedClustersCnt)
×
117
                } else if failedClustersCnt == 0 && activeClustersCnt > 0 {
×
118
                        c.logger.Infof("there are %d clusters running", activeClustersCnt)
×
119
                } else {
×
120
                        c.logger.Infof("there are %d clusters running and %d are in the failed state", activeClustersCnt, failedClustersCnt)
×
121
                }
×
122
                if clustersToRepair > 0 {
×
123
                        c.logger.Infof("%d clusters are scheduled for a repair scan", clustersToRepair)
×
124
                }
×
125
        } else {
×
126
                c.logger.Infof("no clusters running")
×
127
        }
×
128
        if event == EventRepair || event == EventSync {
×
129
                atomic.StoreInt64(&c.lastClusterRepairTime, time.Now().Unix())
×
130
                if event == EventSync {
×
131
                        atomic.StoreInt64(&c.lastClusterSyncTime, time.Now().Unix())
×
132
                }
×
133
        }
134
}
135

136
func (c *Controller) acquireInitialListOfClusters() error {
×
137
        var (
×
138
                list        *acidv1.PostgresqlList
×
139
                err         error
×
140
                clusterName spec.NamespacedName
×
141
        )
×
142

×
143
        if list, err = c.listClusters(metav1.ListOptions{ResourceVersion: "0"}); err != nil {
×
144
                return err
×
145
        }
×
146
        c.logger.Debug("acquiring initial list of clusters")
×
147
        for _, pg := range list.Items {
×
148
                // XXX: check the cluster status field instead
×
149
                if pg.Error != "" {
×
150
                        continue
×
151
                }
152
                clusterName = util.NameFromMeta(pg.ObjectMeta)
×
153
                c.addCluster(c.logger, clusterName, &pg)
×
154
                c.logger.Debugf("added new cluster: %q", clusterName)
×
155
        }
156
        // initiate initial sync of all clusters.
157
        c.queueEvents(list, EventSync)
×
158
        return nil
×
159
}
160

161
func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedName, pgSpec *acidv1.Postgresql) (*cluster.Cluster, error) {
×
162
        if c.opConfig.EnableTeamIdClusternamePrefix {
×
163
                if _, err := acidv1.ExtractClusterName(clusterName.Name, pgSpec.Spec.TeamID); err != nil {
×
164
                        pgSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusInvalid
×
165
                        c.KubeClient.SetPostgresCRDStatus(clusterName, pgSpec)
×
166
                        return nil, err
×
167
                }
×
168
        }
169

NEW
170
        cl := cluster.New(context.Background(), c.makeClusterConfig(), c.KubeClient, *pgSpec, lg, c.eventRecorder)
×
171
        cl.Run(c.stopCh)
×
172
        teamName := strings.ToLower(cl.Spec.TeamID)
×
173

×
174
        defer c.clustersMu.Unlock()
×
175
        c.clustersMu.Lock()
×
176

×
177
        c.teamClusters[teamName] = append(c.teamClusters[teamName], clusterName)
×
178
        c.clusters[clusterName] = cl
×
179
        c.clusterLogs[clusterName] = ringlog.New(c.opConfig.RingLogLines)
×
180
        c.clusterHistory[clusterName] = ringlog.New(c.opConfig.ClusterHistoryEntries)
×
181

×
182
        return cl, nil
×
183
}
184

185
func (c *Controller) processEvent(event ClusterEvent) {
×
186
        var clusterName spec.NamespacedName
×
187
        var clHistory ringlog.RingLogger
×
188
        var err error
×
189

×
190
        lg := c.logger.WithField("worker", event.WorkerID)
×
191

×
192
        if event.EventType == EventAdd || event.EventType == EventSync || event.EventType == EventRepair {
×
193
                clusterName = util.NameFromMeta(event.NewSpec.ObjectMeta)
×
194
        } else {
×
195
                clusterName = util.NameFromMeta(event.OldSpec.ObjectMeta)
×
196
        }
×
197
        lg = lg.WithField("cluster-name", clusterName)
×
198

×
199
        c.clustersMu.RLock()
×
200
        cl, clusterFound := c.clusters[clusterName]
×
201
        if clusterFound {
×
202
                clHistory = c.clusterHistory[clusterName]
×
203
        }
×
204
        c.clustersMu.RUnlock()
×
205

×
206
        defer c.curWorkerCluster.Store(event.WorkerID, nil)
×
207

×
208
        if event.EventType == EventRepair {
×
209
                runRepair, lastOperationStatus := cl.NeedsRepair()
×
210
                if !runRepair {
×
211
                        lg.Debugf("observed cluster status %s, repair is not required", lastOperationStatus)
×
212
                        return
×
213
                }
×
214
                lg.Debugf("observed cluster status %s, running sync scan to repair the cluster", lastOperationStatus)
×
215
                event.EventType = EventSync
×
216
        }
217

218
        if event.EventType == EventAdd || event.EventType == EventUpdate || event.EventType == EventSync {
×
219
                // handle deprecated parameters by possibly assigning their values to the new ones.
×
220
                if event.OldSpec != nil {
×
221
                        c.mergeDeprecatedPostgreSQLSpecParameters(&event.OldSpec.Spec)
×
222
                }
×
223
                if event.NewSpec != nil {
×
224
                        c.warnOnDeprecatedPostgreSQLSpecParameters(&event.NewSpec.Spec)
×
225
                        c.mergeDeprecatedPostgreSQLSpecParameters(&event.NewSpec.Spec)
×
226
                }
×
227

228
                if err = c.submitRBACCredentials(event); err != nil {
×
229
                        c.logger.Warnf("pods and/or Patroni may misfunction due to the lack of permissions: %v", err)
×
230
                }
×
231

232
        }
233

234
        switch event.EventType {
×
235
        case EventAdd:
×
236
                if clusterFound {
×
237
                        lg.Infof("received add event for already existing Postgres cluster")
×
238
                        return
×
239
                }
×
240

241
                lg.Infof("creating a new Postgres cluster")
×
242

×
243
                cl, err = c.addCluster(lg, clusterName, event.NewSpec)
×
244
                if err != nil {
×
245
                        lg.Errorf("creation of cluster is blocked: %v", err)
×
246
                        return
×
247
                }
×
248

249
                c.curWorkerCluster.Store(event.WorkerID, cl)
×
250

×
251
                err = cl.Create()
×
252
                if err != nil {
×
253
                        cl.Status = acidv1.PostgresStatus{PostgresClusterStatus: acidv1.ClusterStatusInvalid}
×
254
                        cl.Error = fmt.Sprintf("could not create cluster: %v", err)
×
255
                        lg.Error(cl.Error)
×
256
                        c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Create", "%v", cl.Error)
×
257
                        return
×
258
                }
×
259

260
                lg.Infoln("cluster has been created")
×
261
        case EventUpdate:
×
262
                if !clusterFound {
×
263
                        lg.Warningln("cluster does not exist")
×
264
                        return
×
265
                }
×
266
                c.curWorkerCluster.Store(event.WorkerID, cl)
×
267

×
268
                // Check if this cluster has been marked for deletion
×
269
                if !event.NewSpec.ObjectMeta.DeletionTimestamp.IsZero() {
×
270
                        lg.Infof("cluster has a DeletionTimestamp of %s, starting deletion now.", event.NewSpec.ObjectMeta.DeletionTimestamp.Format(time.RFC3339))
×
NEW
271
                        cl.Cancel() // Cancel any ongoing operations
×
272
                        if err = cl.Delete(); err != nil {
×
273
                                cl.Error = fmt.Sprintf("error deleting cluster and its resources: %v", err)
×
274
                                c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Delete", "%v", cl.Error)
×
275
                                lg.Error(cl.Error)
×
276
                                return
×
277
                        }
×
278
                        lg.Infoln("cluster has been deleted via update event")
×
279
                        return
×
280
                }
281

282
                err = cl.Update(event.OldSpec, event.NewSpec)
×
283
                if err != nil {
×
284
                        cl.Error = fmt.Sprintf("could not update cluster: %v", err)
×
285
                        lg.Error(cl.Error)
×
286

×
287
                        return
×
288
                }
×
289
                cl.Error = ""
×
290
                lg.Infoln("cluster has been updated")
×
291

×
292
                clHistory.Insert(&spec.Diff{
×
293
                        EventTime:   event.EventTime,
×
294
                        ProcessTime: time.Now(),
×
295
                        Diff:        util.Diff(event.OldSpec, event.NewSpec),
×
296
                })
×
297
        case EventDelete:
×
298
                if !clusterFound {
×
299
                        lg.Errorf("unknown cluster: %q", clusterName)
×
300
                        return
×
301
                }
×
302

303
                teamName := strings.ToLower(cl.Spec.TeamID)
×
304
                c.curWorkerCluster.Store(event.WorkerID, cl)
×
305

×
306
                // when using finalizers the deletion already happened
×
307
                if c.opConfig.EnableFinalizers == nil || !*c.opConfig.EnableFinalizers {
×
308
                        lg.Infoln("deletion of the cluster started")
×
NEW
309
                        cl.Cancel() // Cancel any ongoing operations
×
310
                        if err := cl.Delete(); err != nil {
×
311
                                cl.Error = fmt.Sprintf("could not delete cluster: %v", err)
×
312
                                c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Delete", "%v", cl.Error)
×
313
                        }
×
314
                }
315

316
                func() {
×
317
                        defer c.clustersMu.Unlock()
×
318
                        c.clustersMu.Lock()
×
319

×
320
                        delete(c.clusters, clusterName)
×
321
                        delete(c.clusterLogs, clusterName)
×
322
                        delete(c.clusterHistory, clusterName)
×
323
                        for i, val := range c.teamClusters[teamName] {
×
324
                                if val == clusterName {
×
325
                                        copy(c.teamClusters[teamName][i:], c.teamClusters[teamName][i+1:])
×
326
                                        c.teamClusters[teamName][len(c.teamClusters[teamName])-1] = spec.NamespacedName{}
×
327
                                        c.teamClusters[teamName] = c.teamClusters[teamName][:len(c.teamClusters[teamName])-1]
×
328
                                        break
×
329
                                }
330
                        }
331
                }()
332

333
                lg.Infof("cluster has been deleted")
×
334
        case EventSync:
×
335
                // no race condition because a cluster is always processed by single worker
×
336
                if !clusterFound {
×
337
                        cl, err = c.addCluster(lg, clusterName, event.NewSpec)
×
338
                        if err != nil {
×
339
                                lg.Errorf("syncing of cluster is blocked: %v", err)
×
340
                                return
×
341
                        }
×
342
                }
343

344
                c.curWorkerCluster.Store(event.WorkerID, cl)
×
345

×
346
                // has this cluster been marked as deleted already, then we shall start cleaning up
×
347
                if !cl.ObjectMeta.DeletionTimestamp.IsZero() {
×
348
                        lg.Infof("cluster has a DeletionTimestamp of %s, starting deletion now.", cl.ObjectMeta.DeletionTimestamp.Format(time.RFC3339))
×
NEW
349
                        cl.Cancel() // Cancel any ongoing operations
×
350
                        if err = cl.Delete(); err != nil {
×
351
                                cl.Error = fmt.Sprintf("error deleting cluster and its resources: %v", err)
×
352
                                c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Delete", "%v", cl.Error)
×
353
                                lg.Error(cl.Error)
×
354
                                return
×
355
                        }
×
NEW
356
                        return
×
357
                }
358

359
                // Try to start sync - returns false if sync already running or cluster deleted
NEW
360
                if !cl.StartSync() {
×
NEW
361
                        lg.Infof("sync already in progress, will resync when current sync completes")
×
NEW
362
                        return
×
NEW
363
                }
×
364

365
                // Run sync in background goroutine so we can process other events (like delete)
NEW
366
                lg.Infof("syncing of the cluster started (background)")
×
NEW
367
                go func() {
×
NEW
368
                        defer cl.EndSync()
×
NEW
369

×
NEW
370
                        if err := cl.Sync(event.NewSpec); err != nil {
×
371
                                cl.Error = fmt.Sprintf("could not sync cluster: %v", err)
×
372
                                c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Sync", "%v", cl.Error)
×
373
                                lg.Error(cl.Error)
×
374
                                return
×
375
                        }
×
NEW
376
                        cl.Error = ""
×
UNCOV
377
                        lg.Infof("cluster has been synced")
×
NEW
378

×
NEW
379
                        // Check if resync was requested while we were syncing
×
NEW
380
                        if cl.NeedsResync() {
×
NEW
381
                                lg.Infof("resync requested, queueing new sync event")
×
NEW
382
                                c.queueClusterEvent(nil, event.NewSpec, EventSync)
×
NEW
383
                        }
×
384
                }()
385
        }
386
}
387

388
func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{}, wg *sync.WaitGroup) {
×
389
        defer wg.Done()
×
390

×
391
        go func() {
×
392
                <-stopCh
×
393
                c.clusterEventQueues[idx].Close()
×
394
        }()
×
395

396
        for {
×
397
                obj, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(func(interface{}, bool) error { return nil }))
×
398
                if err != nil {
×
399
                        if err == cache.ErrFIFOClosed {
×
400
                                return
×
401
                        }
×
402
                        c.logger.Errorf("error when processing cluster events queue: %v", err)
×
403
                        continue
×
404
                }
405
                event, ok := obj.(ClusterEvent)
×
406
                if !ok {
×
407
                        c.logger.Errorf("could not cast to ClusterEvent")
×
408
                }
×
409

410
                c.processEvent(event)
×
411
        }
412
}
413

414
func (c *Controller) warnOnDeprecatedPostgreSQLSpecParameters(spec *acidv1.PostgresSpec) {
×
415
        deprecate := func(deprecated, replacement string) {
×
416
                c.logger.Warningf("parameter %q is deprecated. Consider setting %q instead", deprecated, replacement)
×
417
        }
×
418

419
        if spec.UseLoadBalancer != nil {
×
420
                deprecate("useLoadBalancer", "enableMasterLoadBalancer")
×
421
        }
×
422
        if spec.ReplicaLoadBalancer != nil {
×
423
                deprecate("replicaLoadBalancer", "enableReplicaLoadBalancer")
×
424
        }
×
425

426
        if (spec.UseLoadBalancer != nil || spec.ReplicaLoadBalancer != nil) &&
×
427
                (spec.EnableReplicaLoadBalancer != nil || spec.EnableMasterLoadBalancer != nil) {
×
428
                c.logger.Warnf("both old and new load balancer parameters are present in the manifest, ignoring old ones")
×
429
        }
×
430
}
431

432
// mergeDeprecatedPostgreSQLSpecParameters modifies the spec passed to the cluster by setting current parameter
433
// values from the obsolete ones. Note: while the spec that is modified is a copy made in queueClusterEvent, it is
434
// still a shallow copy, so be extra careful not to modify values pointer fields point to, but copy them instead.
435
func (c *Controller) mergeDeprecatedPostgreSQLSpecParameters(spec *acidv1.PostgresSpec) *acidv1.PostgresSpec {
2✔
436
        if (spec.UseLoadBalancer != nil || spec.ReplicaLoadBalancer != nil) &&
2✔
437
                (spec.EnableReplicaLoadBalancer == nil && spec.EnableMasterLoadBalancer == nil) {
3✔
438
                if spec.UseLoadBalancer != nil {
2✔
439
                        spec.EnableMasterLoadBalancer = new(bool)
1✔
440
                        *spec.EnableMasterLoadBalancer = *spec.UseLoadBalancer
1✔
441
                }
1✔
442
                if spec.ReplicaLoadBalancer != nil {
2✔
443
                        spec.EnableReplicaLoadBalancer = new(bool)
1✔
444
                        *spec.EnableReplicaLoadBalancer = *spec.ReplicaLoadBalancer
1✔
445
                }
1✔
446
        }
447
        spec.ReplicaLoadBalancer = nil
2✔
448
        spec.UseLoadBalancer = nil
2✔
449

2✔
450
        return spec
2✔
451
}
452

453
func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1.Postgresql, eventType EventType) {
×
454
        var (
×
455
                uid          types.UID
×
456
                clusterName  spec.NamespacedName
×
457
                clusterError string
×
458
        )
×
459

×
460
        if informerOldSpec != nil { // update, delete
×
461
                uid = informerOldSpec.GetUID()
×
462
                clusterName = util.NameFromMeta(informerOldSpec.ObjectMeta)
×
463

×
464
                // user is fixing previously incorrect spec
×
465
                if eventType == EventUpdate && informerNewSpec.Error == "" && informerOldSpec.Error != "" {
×
466
                        eventType = EventSync
×
467
                }
×
468

469
                // set current error to be one of the new spec if present
470
                if informerNewSpec != nil {
×
471
                        clusterError = informerNewSpec.Error
×
472
                } else {
×
473
                        clusterError = informerOldSpec.Error
×
474
                }
×
475
        } else { // add, sync
×
476
                uid = informerNewSpec.GetUID()
×
477
                clusterName = util.NameFromMeta(informerNewSpec.ObjectMeta)
×
478
                clusterError = informerNewSpec.Error
×
479
        }
×
480

481
        if eventType == EventDelete {
×
482
                // when owner references are used operator cannot block deletion
×
483
                if c.opConfig.EnableOwnerReferences == nil || !*c.opConfig.EnableOwnerReferences {
×
484
                        // only allow deletion if delete annotations are set and conditions are met
×
485
                        if err := c.meetsClusterDeleteAnnotations(informerOldSpec); err != nil {
×
486
                                c.logger.WithField("cluster-name", clusterName).Warnf(
×
487
                                        "ignoring %q event for cluster %q - manifest does not fulfill delete requirements: %s", eventType, clusterName, err)
×
488
                                c.logger.WithField("cluster-name", clusterName).Warnf(
×
489
                                        "please, recreate Postgresql resource %q and set annotations to delete properly", clusterName)
×
490
                                if currentManifest, marshalErr := json.Marshal(informerOldSpec); marshalErr != nil {
×
491
                                        c.logger.WithField("cluster-name", clusterName).Warnf("could not marshal current manifest:\n%+v", informerOldSpec)
×
492
                                } else {
×
493
                                        c.logger.WithField("cluster-name", clusterName).Warnf("%s\n", string(currentManifest))
×
494
                                }
×
495
                                return
×
496
                        }
497
                }
498
        }
499

500
        // If the cluster is marked for deletion, cancel any ongoing operations immediately
501
        // This unblocks stuck Sync operations so the delete can proceed
NEW
502
        if informerNewSpec != nil && !informerNewSpec.ObjectMeta.DeletionTimestamp.IsZero() {
×
NEW
503
                c.clustersMu.RLock()
×
NEW
504
                if cl, found := c.clusters[clusterName]; found {
×
NEW
505
                        c.logger.WithField("cluster-name", clusterName).Infof(
×
NEW
506
                                "cluster marked for deletion (DeletionTimestamp: %s), cancelling ongoing operations",
×
NEW
507
                                informerNewSpec.ObjectMeta.DeletionTimestamp.Format(time.RFC3339))
×
NEW
508
                        cl.Cancel()
×
NEW
509
                }
×
NEW
510
                c.clustersMu.RUnlock()
×
511
        }
512

513
        if clusterError != "" && eventType != EventDelete {
×
514
                c.logger.WithField("cluster-name", clusterName).Debugf("skipping %q event for the invalid cluster: %s", eventType, clusterError)
×
515

×
516
                switch eventType {
×
517
                case EventAdd:
×
518
                        informerNewSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusAddFailed
×
519
                        _, err := c.KubeClient.SetPostgresCRDStatus(clusterName, informerNewSpec)
×
520
                        if err != nil {
×
521
                                c.logger.WithField("cluster-name", clusterName).Errorf("could not set PostgresCRD status: %v", err)
×
522
                        }
×
523
                        c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Create", "%v", clusterError)
×
524
                case EventUpdate:
×
525
                        informerNewSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusUpdateFailed
×
526
                        _, err := c.KubeClient.SetPostgresCRDStatus(clusterName, informerNewSpec)
×
527
                        if err != nil {
×
528
                                c.logger.WithField("cluster-name", clusterName).Errorf("could not set PostgresCRD status: %v", err)
×
529
                        }
×
530
                        c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Update", "%v", clusterError)
×
531
                default:
×
532
                        informerNewSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusSyncFailed
×
533
                        _, err := c.KubeClient.SetPostgresCRDStatus(clusterName, informerNewSpec)
×
534
                        if err != nil {
×
535
                                c.logger.WithField("cluster-name", clusterName).Errorf("could not set PostgresCRD status: %v", err)
×
536
                        }
×
537
                        c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Sync", "%v", clusterError)
×
538
                }
539

540
                return
×
541
        }
542

543
        // Don't pass the spec directly from the informer, since subsequent modifications of it would be reflected
544
        // in the informer internal state, making it incoherent with the actual Kubernetes object (and, as a side
545
        // effect, the modified state will be returned together with subsequent events).
546

547
        workerID := c.clusterWorkerID(clusterName)
×
548
        clusterEvent := ClusterEvent{
×
549
                EventTime: time.Now(),
×
550
                EventType: eventType,
×
551
                UID:       uid,
×
552
                OldSpec:   informerOldSpec.Clone(),
×
553
                NewSpec:   informerNewSpec.Clone(),
×
554
                WorkerID:  workerID,
×
555
        }
×
556

×
557
        lg := c.logger.WithField("worker", workerID).WithField("cluster-name", clusterName)
×
558
        if err := c.clusterEventQueues[workerID].Add(clusterEvent); err != nil {
×
559
                lg.Errorf("error while queueing cluster event: %v", clusterEvent)
×
560
        }
×
561
        lg.Infof("%s event has been queued", eventType)
×
562

×
563
        if eventType != EventDelete {
×
564
                return
×
565
        }
×
566
        // A delete event discards all prior requests for that cluster.
567
        for _, evType := range []EventType{EventAdd, EventSync, EventUpdate, EventRepair} {
×
568
                obj, exists, err := c.clusterEventQueues[workerID].GetByKey(queueClusterKey(evType, uid))
×
569
                if err != nil {
×
570
                        lg.Warningf("could not get event from the queue: %v", err)
×
571
                        continue
×
572
                }
573

574
                if !exists {
×
575
                        continue
×
576
                }
577

578
                err = c.clusterEventQueues[workerID].Delete(obj)
×
579
                if err != nil {
×
580
                        lg.Warningf("could not delete event from the queue: %v", err)
×
581
                } else {
×
582
                        lg.Debugf("event %s has been discarded for the cluster", evType)
×
583
                }
×
584
        }
585
}
586

587
func (c *Controller) postgresqlAdd(obj interface{}) {
×
588
        pg := c.postgresqlCheck(obj)
×
589
        if pg != nil {
×
590
                // We will not get multiple Add events for the same cluster
×
591
                c.queueClusterEvent(nil, pg, EventAdd)
×
592
        }
×
593
}
594

595
func (c *Controller) postgresqlUpdate(prev, cur interface{}) {
×
596
        pgOld := c.postgresqlCheck(prev)
×
597
        pgNew := c.postgresqlCheck(cur)
×
598
        if pgOld != nil && pgNew != nil {
×
599
                clusterName := util.NameFromMeta(pgNew.ObjectMeta)
×
600

×
601
                // Check if DeletionTimestamp was set (resource marked for deletion)
×
602
                deletionTimestampChanged := pgOld.ObjectMeta.DeletionTimestamp.IsZero() && !pgNew.ObjectMeta.DeletionTimestamp.IsZero()
×
603
                if deletionTimestampChanged {
×
604
                        c.logger.WithField("cluster-name", clusterName).Infof(
×
605
                                "UPDATE event: DeletionTimestamp set to %s, queueing event",
×
606
                                pgNew.ObjectMeta.DeletionTimestamp.Format(time.RFC3339))
×
607
                        c.queueClusterEvent(pgOld, pgNew, EventUpdate)
×
608
                        return
×
609
                }
×
610

611
                // Avoid the infinite recursion for status updates
612
                if reflect.DeepEqual(pgOld.Spec, pgNew.Spec) {
×
613
                        if reflect.DeepEqual(pgNew.Annotations, pgOld.Annotations) {
×
NEW
614
                                c.logger.WithField("cluster-name", clusterName).Debugf(
×
NEW
615
                                        "UPDATE event: no spec/annotation changes, skipping")
×
616
                                return
×
617
                        }
×
618
                }
619

NEW
620
                c.logger.WithField("cluster-name", clusterName).Infof("UPDATE event: spec or annotations changed, queueing event")
×
UNCOV
621
                c.queueClusterEvent(pgOld, pgNew, EventUpdate)
×
622
        }
623
}
624

625
func (c *Controller) postgresqlDelete(obj interface{}) {
×
626
        pg := c.postgresqlCheck(obj)
×
627
        if pg != nil {
×
628
                c.queueClusterEvent(pg, nil, EventDelete)
×
629
        }
×
630
}
631

632
func (c *Controller) postgresqlCheck(obj interface{}) *acidv1.Postgresql {
×
633
        pg, ok := obj.(*acidv1.Postgresql)
×
634
        if !ok {
×
635
                c.logger.Errorf("could not cast to postgresql spec")
×
636
                return nil
×
637
        }
×
638
        if !c.hasOwnership(pg) {
×
639
                return nil
×
640
        }
×
641
        return pg
×
642
}
643

644
/*
645
Ensures the pod service account and role bindings exists in a namespace
646
before a PG cluster is created there so that a user does not have to deploy
647
these credentials manually.  StatefulSets require the service account to
648
create pods; Patroni requires relevant RBAC bindings to access endpoints
649
or config maps.
650

651
The operator does not sync accounts/role bindings after creation.
652
*/
653
func (c *Controller) submitRBACCredentials(event ClusterEvent) error {
×
654
        namespace := event.NewSpec.GetNamespace()
×
655

×
656
        if err := c.createPodServiceAccount(namespace); err != nil {
×
657
                return fmt.Errorf("could not create pod service account %q : %v", c.opConfig.PodServiceAccountName, err)
×
658
        }
×
659

660
        if err := c.createRoleBindings(namespace); err != nil {
×
661
                return fmt.Errorf("could not create role binding %q : %v", c.PodServiceAccountRoleBinding.Name, err)
×
662
        }
×
663
        return nil
×
664
}
665

666
func (c *Controller) createPodServiceAccount(namespace string) error {
×
667
        podServiceAccountName := c.opConfig.PodServiceAccountName
×
668
        _, err := c.KubeClient.ServiceAccounts(namespace).Get(context.TODO(), podServiceAccountName, metav1.GetOptions{})
×
669
        if k8sutil.ResourceNotFound(err) {
×
670

×
671
                c.logger.Infof("creating pod service account %q in the %q namespace", podServiceAccountName, namespace)
×
672

×
673
                // get a separate copy of service account
×
674
                // to prevent a race condition when setting a namespace for many clusters
×
675
                sa := *c.PodServiceAccount
×
676
                if _, err = c.KubeClient.ServiceAccounts(namespace).Create(context.TODO(), &sa, metav1.CreateOptions{}); err != nil {
×
677
                        return fmt.Errorf("cannot deploy the pod service account %q defined in the configuration to the %q namespace: %v", podServiceAccountName, namespace, err)
×
678
                }
×
679

680
                c.logger.Infof("successfully deployed the pod service account %q to the %q namespace", podServiceAccountName, namespace)
×
681
        } else if k8sutil.ResourceAlreadyExists(err) {
×
682
                return nil
×
683
        }
×
684

685
        return err
×
686
}
687

688
func (c *Controller) createRoleBindings(namespace string) error {
×
689
        podServiceAccountName := c.opConfig.PodServiceAccountName
×
690
        podServiceAccountRoleBindingName := c.PodServiceAccountRoleBinding.Name
×
691

×
692
        _, err := c.KubeClient.RoleBindings(namespace).Get(context.TODO(), podServiceAccountRoleBindingName, metav1.GetOptions{})
×
693
        if k8sutil.ResourceNotFound(err) {
×
694

×
695
                c.logger.Infof("Creating the role binding %q in the %q namespace", podServiceAccountRoleBindingName, namespace)
×
696

×
697
                // get a separate copy of role binding
×
698
                // to prevent a race condition when setting a namespace for many clusters
×
699
                rb := *c.PodServiceAccountRoleBinding
×
700
                _, err = c.KubeClient.RoleBindings(namespace).Create(context.TODO(), &rb, metav1.CreateOptions{})
×
701
                if err != nil {
×
702
                        return fmt.Errorf("cannot bind the pod service account %q defined in the configuration to the cluster role in the %q namespace: %v", podServiceAccountName, namespace, err)
×
703
                }
×
704

705
                c.logger.Infof("successfully deployed the role binding for the pod service account %q to the %q namespace", podServiceAccountName, namespace)
×
706

707
        } else if k8sutil.ResourceAlreadyExists(err) {
×
708
                return nil
×
709
        }
×
710

711
        return err
×
712
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc