
zalando / postgres-operator / build 13542869901

26 Feb 2025 11:29AM UTC coverage: 45.503% (+0.05%) from 45.452%

Pull Request #2868: do not remove publications of slot defined in manifest
Merge 6dd01fa95 into 2a4be1cb3 (committed via GitHub web-flow)

3 of 27 new or added lines in 3 files covered. (11.11%)

5 existing lines in 1 file now uncovered.

7023 of 15434 relevant lines covered (45.5%)

29.15 hits per line

Source File
/pkg/cluster/cluster.go (64.08% file coverage)
1
package cluster
2

3
// Postgres CustomResourceDefinition object i.e. Spilo
4

5
import (
6
        "database/sql"
7
        "encoding/json"
8
        "fmt"
9
        "reflect"
10
        "regexp"
11
        "strings"
12
        "sync"
13
        "time"
14

15
        "github.com/sirupsen/logrus"
16
        acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
17
        zalandov1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1"
18

19
        "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/scheme"
20
        "github.com/zalando/postgres-operator/pkg/spec"
21
        pgteams "github.com/zalando/postgres-operator/pkg/teams"
22
        "github.com/zalando/postgres-operator/pkg/util"
23
        "github.com/zalando/postgres-operator/pkg/util/config"
24
        "github.com/zalando/postgres-operator/pkg/util/constants"
25
        "github.com/zalando/postgres-operator/pkg/util/k8sutil"
26
        "github.com/zalando/postgres-operator/pkg/util/patroni"
27
        "github.com/zalando/postgres-operator/pkg/util/teams"
28
        "github.com/zalando/postgres-operator/pkg/util/users"
29
        "github.com/zalando/postgres-operator/pkg/util/volumes"
30
        appsv1 "k8s.io/api/apps/v1"
31
        batchv1 "k8s.io/api/batch/v1"
32
        v1 "k8s.io/api/core/v1"
33
        policyv1 "k8s.io/api/policy/v1"
34
        rbacv1 "k8s.io/api/rbac/v1"
35
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36
        "k8s.io/apimachinery/pkg/types"
37
        "k8s.io/client-go/rest"
38
        "k8s.io/client-go/tools/cache"
39
        "k8s.io/client-go/tools/record"
40
        "k8s.io/client-go/tools/reference"
41
)
42

43
var (
44
        alphaNumericRegexp    = regexp.MustCompile("^[a-zA-Z][a-zA-Z0-9]*$")
45
        databaseNameRegexp    = regexp.MustCompile("^[a-zA-Z_][a-zA-Z0-9_]*$")
46
        userRegexp            = regexp.MustCompile(`^[a-z0-9]([-_a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-_a-z0-9]*[a-z0-9])?)*$`)
47
        patroniObjectSuffixes = []string{"leader", "config", "sync", "failover"}
48
        finalizerName         = "postgres-operator.acid.zalan.do"
49
)
50
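
// Illustrative sketch, not part of cluster.go: how the validation regexps above
// behave for a few sample identifiers (fmt is already imported by this file).
func exampleNameValidation() {
        fmt.Println(alphaNumericRegexp.MatchString("fooBar42"))  // true: a letter followed by alphanumerics
        fmt.Println(alphaNumericRegexp.MatchString("42foo"))     // false: must start with a letter
        fmt.Println(databaseNameRegexp.MatchString("_my_db"))    // true: a leading underscore is allowed
        fmt.Println(userRegexp.MatchString("team.robot-user"))   // true: dot-separated lowercase segments
        fmt.Println(userRegexp.MatchString("Robot"))             // false: uppercase is rejected
}
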

51
// Config contains operator-wide clients and configuration used from a cluster. TODO: remove struct duplication.
52
type Config struct {
53
        OpConfig                     config.Config
54
        RestConfig                   *rest.Config
55
        PgTeamMap                    *pgteams.PostgresTeamMap
56
        InfrastructureRoles          map[string]spec.PgUser // inherited from the controller
57
        PodServiceAccount            *v1.ServiceAccount
58
        PodServiceAccountRoleBinding *rbacv1.RoleBinding
59
}
60

61
type kubeResources struct {
62
        Services                      map[PostgresRole]*v1.Service
63
        Endpoints                     map[PostgresRole]*v1.Endpoints
64
        PatroniEndpoints              map[string]*v1.Endpoints
65
        PatroniConfigMaps             map[string]*v1.ConfigMap
66
        Secrets                       map[types.UID]*v1.Secret
67
        Statefulset                   *appsv1.StatefulSet
68
        VolumeClaims                  map[types.UID]*v1.PersistentVolumeClaim
69
        PrimaryPodDisruptionBudget    *policyv1.PodDisruptionBudget
70
        CriticalOpPodDisruptionBudget *policyv1.PodDisruptionBudget
71
        LogicalBackupJob              *batchv1.CronJob
72
        Streams                       map[string]*zalandov1.FabricEventStream
73
        // Pods are treated separately
74
}
75

76
// Cluster describes a postgresql cluster
77
type Cluster struct {
78
        kubeResources
79
        acidv1.Postgresql
80
        Config
81
        logger           *logrus.Entry
82
        eventRecorder    record.EventRecorder
83
        patroni          patroni.Interface
84
        pgUsers          map[string]spec.PgUser
85
        pgUsersCache     map[string]spec.PgUser
86
        systemUsers      map[string]spec.PgUser
87
        podSubscribers   map[spec.NamespacedName]chan PodEvent
88
        podSubscribersMu sync.RWMutex
89
        pgDb             *sql.DB
90
        mu               sync.Mutex
91
        userSyncStrategy spec.UserSyncer
92
        deleteOptions    metav1.DeleteOptions
93
        podEventsQueue   *cache.FIFO
94
        replicationSlots map[string]interface{}
95

96
        teamsAPIClient      teams.Interface
97
        oauthTokenGetter    OAuthTokenGetter
98
        KubeClient          k8sutil.KubernetesClient //TODO: move clients to the better place?
99
        currentProcess      Process
100
        processMu           sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex
101
        specMu              sync.RWMutex // protects the spec for reporting, no need to hold the master mutex
102
        ConnectionPooler    map[PostgresRole]*ConnectionPoolerObjects
103
        EBSVolumes          map[string]volumes.VolumeProperties
104
        VolumeResizer       volumes.VolumeResizer
105
        currentMajorVersion int
106
}
107

108
type compareStatefulsetResult struct {
109
        match                 bool
110
        replace               bool
111
        rollingUpdate         bool
112
        reasons               []string
113
        deletedPodAnnotations []string
114
}
115

116
type compareLogicalBackupJobResult struct {
117
        match                 bool
118
        reasons               []string
119
        deletedPodAnnotations []string
120
}
121

122
// New creates a new cluster. This function should be called from a controller.
123
func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgresql, logger *logrus.Entry, eventRecorder record.EventRecorder) *Cluster {
85✔
124
        deletePropagationPolicy := metav1.DeletePropagationOrphan
85✔
125

85✔
126
        podEventsQueue := cache.NewFIFO(func(obj interface{}) (string, error) {
85✔
127
                e, ok := obj.(PodEvent)
×
128
                if !ok {
×
129
                        return "", fmt.Errorf("could not cast to PodEvent")
×
130
                }
×
131

132
                return fmt.Sprintf("%s-%s", e.PodName, e.ResourceVersion), nil
×
133
        })
134
        passwordEncryption, ok := pgSpec.Spec.PostgresqlParam.Parameters["password_encryption"]
85✔
135
        if !ok {
170✔
136
                passwordEncryption = "md5"
85✔
137
        }
85✔
138

139
        cluster := &Cluster{
85✔
140
                Config:         cfg,
85✔
141
                Postgresql:     pgSpec,
85✔
142
                pgUsers:        make(map[string]spec.PgUser),
85✔
143
                systemUsers:    make(map[string]spec.PgUser),
85✔
144
                podSubscribers: make(map[spec.NamespacedName]chan PodEvent),
85✔
145
                kubeResources: kubeResources{
85✔
146
                        Secrets:           make(map[types.UID]*v1.Secret),
85✔
147
                        Services:          make(map[PostgresRole]*v1.Service),
85✔
148
                        Endpoints:         make(map[PostgresRole]*v1.Endpoints),
85✔
149
                        PatroniEndpoints:  make(map[string]*v1.Endpoints),
85✔
150
                        PatroniConfigMaps: make(map[string]*v1.ConfigMap),
85✔
151
                        VolumeClaims:      make(map[types.UID]*v1.PersistentVolumeClaim),
85✔
152
                        Streams:           make(map[string]*zalandov1.FabricEventStream)},
85✔
153
                userSyncStrategy: users.DefaultUserSyncStrategy{
85✔
154
                        PasswordEncryption:   passwordEncryption,
85✔
155
                        RoleDeletionSuffix:   cfg.OpConfig.RoleDeletionSuffix,
85✔
156
                        AdditionalOwnerRoles: cfg.OpConfig.AdditionalOwnerRoles,
85✔
157
                },
85✔
158
                deleteOptions:       metav1.DeleteOptions{PropagationPolicy: &deletePropagationPolicy},
85✔
159
                podEventsQueue:      podEventsQueue,
85✔
160
                KubeClient:          kubeClient,
85✔
161
                currentMajorVersion: 0,
85✔
162
                replicationSlots:    make(map[string]interface{}),
85✔
163
        }
85✔
164
        cluster.logger = logger.WithField("pkg", "cluster").WithField("cluster-name", cluster.clusterName())
85✔
165
        cluster.teamsAPIClient = teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger)
85✔
166
        cluster.oauthTokenGetter = newSecretOauthTokenGetter(&kubeClient, cfg.OpConfig.OAuthTokenSecretName)
85✔
167
        cluster.patroni = patroni.New(cluster.logger, nil)
85✔
168
        cluster.eventRecorder = eventRecorder
85✔
169

85✔
170
        cluster.EBSVolumes = make(map[string]volumes.VolumeProperties)
85✔
171
        if cfg.OpConfig.StorageResizeMode != "pvc" || cfg.OpConfig.EnableEBSGp3Migration {
169✔
172
                cluster.VolumeResizer = &volumes.EBSVolumeResizer{AWSRegion: cfg.OpConfig.AWSRegion}
84✔
173
        }
84✔
174

175
        return cluster
85✔
176
}
177
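
// Illustrative sketch, not part of cluster.go: how a controller might construct
// a Cluster via New and provision its resources. cfg, kubeClient, pgSpec, logger
// and eventRecorder are assumed to be prepared by the caller.
func exampleNewCluster(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgresql,
        logger *logrus.Entry, eventRecorder record.EventRecorder) {
        cl := New(cfg, kubeClient, pgSpec, logger, eventRecorder)
        // Create provisions endpoints, services, secrets, PDBs and the statefulset;
        // the resulting status is written back to the Postgresql CR by Create itself.
        if err := cl.Create(); err != nil {
                logger.Errorf("cluster creation failed: %v", err)
        }
}
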

178
func (c *Cluster) clusterName() spec.NamespacedName {
131✔
179
        return util.NameFromMeta(c.ObjectMeta)
131✔
180
}
131✔
181

182
func (c *Cluster) clusterNamespace() string {
×
183
        return c.ObjectMeta.Namespace
×
184
}
×
185

186
func (c *Cluster) teamName() string {
44✔
187
        // TODO: check Teams API for the actual name (in case the user passes an integer Id).
44✔
188
        return c.Spec.TeamID
44✔
189
}
44✔
190

191
func (c *Cluster) setProcessName(procName string, args ...interface{}) {
238✔
192
        c.processMu.Lock()
238✔
193
        defer c.processMu.Unlock()
238✔
194
        c.currentProcess = Process{
238✔
195
                Name:      fmt.Sprintf(procName, args...),
238✔
196
                StartTime: time.Now(),
238✔
197
        }
238✔
198
}
238✔
199

200
// GetReference of Postgres CR object
201
// i.e. required to emit events to this resource
202
func (c *Cluster) GetReference() *v1.ObjectReference {
13✔
203
        ref, err := reference.GetReference(scheme.Scheme, &c.Postgresql)
13✔
204
        if err != nil {
13✔
205
                c.logger.Errorf("could not get reference for Postgresql CR %v/%v: %v", c.Postgresql.Namespace, c.Postgresql.Name, err)
×
206
        }
×
207
        return ref
13✔
208
}
209

210
func (c *Cluster) isNewCluster() bool {
5✔
211
        return c.Status.Creating()
5✔
212
}
5✔
213

214
// initUsers populates c.systemUsers and c.pgUsers maps.
215
func (c *Cluster) initUsers() error {
14✔
216
        c.setProcessName("initializing users")
14✔
217

14✔
218
        // if team member deprecation is enabled save current state of pgUsers
14✔
219
        // to check for deleted roles
14✔
220
        c.pgUsersCache = map[string]spec.PgUser{}
14✔
221
        if c.OpConfig.EnableTeamMemberDeprecation {
16✔
222
                for k, v := range c.pgUsers {
4✔
223
                        if v.Origin == spec.RoleOriginTeamsAPI {
3✔
224
                                c.pgUsersCache[k] = v
1✔
225
                        }
1✔
226
                }
227
        }
228

229
        // clear out the previous state of the cluster users (in case we are
230
        // running a sync).
231
        c.systemUsers = map[string]spec.PgUser{}
14✔
232
        c.pgUsers = map[string]spec.PgUser{}
14✔
233

14✔
234
        c.initSystemUsers()
14✔
235

14✔
236
        if err := c.initInfrastructureRoles(); err != nil {
14✔
237
                return fmt.Errorf("could not init infrastructure roles: %v", err)
×
238
        }
×
239

240
        if err := c.initPreparedDatabaseRoles(); err != nil {
14✔
241
                return fmt.Errorf("could not init default users: %v", err)
×
242
        }
×
243

244
        if err := c.initRobotUsers(); err != nil {
14✔
245
                return fmt.Errorf("could not init robot users: %v", err)
×
246
        }
×
247

248
        if err := c.initHumanUsers(); err != nil {
15✔
249
                // remember all cached users in c.pgUsers
1✔
250
                for cachedUserName, cachedUser := range c.pgUsersCache {
2✔
251
                        c.pgUsers[cachedUserName] = cachedUser
1✔
252
                }
1✔
253
                return fmt.Errorf("could not init human users: %v", err)
1✔
254
        }
255

256
        c.initAdditionalOwnerRoles()
13✔
257

13✔
258
        return nil
13✔
259
}
260

261
// Create creates the new kubernetes objects associated with the cluster.
262
func (c *Cluster) Create() (err error) {
1✔
263
        c.mu.Lock()
1✔
264
        defer c.mu.Unlock()
1✔
265

1✔
266
        var (
1✔
267
                pgCreateStatus *acidv1.Postgresql
1✔
268
                service        *v1.Service
1✔
269
                ep             *v1.Endpoints
1✔
270
                ss             *appsv1.StatefulSet
1✔
271
        )
1✔
272

1✔
273
        defer func() {
2✔
274
                var (
1✔
275
                        pgUpdatedStatus *acidv1.Postgresql
1✔
276
                        errStatus       error
1✔
277
                )
1✔
278
                if err == nil {
2✔
279
                        pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning) //TODO: are you sure it's running?
1✔
280
                } else {
1✔
281
                        c.logger.Warningf("cluster created failed: %v", err)
×
282
                        pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusAddFailed)
×
283
                }
×
284
                if errStatus != nil {
1✔
285
                        c.logger.Warningf("could not set cluster status: %v", errStatus)
×
286
                }
×
287
                if pgUpdatedStatus != nil {
2✔
288
                        c.setSpec(pgUpdatedStatus)
1✔
289
                }
1✔
290
        }()
291

292
        pgCreateStatus, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusCreating)
1✔
293
        if err != nil {
1✔
294
                return fmt.Errorf("could not set cluster status: %v", err)
×
295
        }
×
296
        c.setSpec(pgCreateStatus)
1✔
297

1✔
298
        if c.OpConfig.EnableFinalizers != nil && *c.OpConfig.EnableFinalizers {
2✔
299
                if err = c.addFinalizer(); err != nil {
1✔
300
                        return fmt.Errorf("could not add finalizer: %v", err)
×
301
                }
×
302
        }
303
        c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Create", "Started creation of new cluster resources")
1✔
304

1✔
305
        for _, role := range []PostgresRole{Master, Replica} {
3✔
306

2✔
307
                // if kubernetes_use_configmaps is set Patroni will create configmaps
2✔
308
                // otherwise it will use endpoints
2✔
309
                if !c.patroniKubernetesUseConfigMaps() {
4✔
310
                        if c.Endpoints[role] != nil {
2✔
311
                                return fmt.Errorf("%s endpoint already exists in the cluster", role)
×
312
                        }
×
313
                        if role == Master {
3✔
314
                                // replica endpoint will be created by the replica service. Master endpoint needs to be created by us,
1✔
315
                                // since the corresponding master service does not define any selectors.
1✔
316
                                ep, err = c.createEndpoint(role)
1✔
317
                                if err != nil {
1✔
318
                                        return fmt.Errorf("could not create %s endpoint: %v", role, err)
×
319
                                }
×
320
                                c.logger.Infof("endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta))
1✔
321
                                c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Endpoints", "Endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta))
1✔
322
                        }
323
                }
324

325
                if c.Services[role] != nil {
2✔
326
                        return fmt.Errorf("service already exists in the cluster")
×
327
                }
×
328
                service, err = c.createService(role)
2✔
329
                if err != nil {
2✔
330
                        return fmt.Errorf("could not create %s service: %v", role, err)
×
331
                }
×
332
                c.logger.Infof("%s service %q has been successfully created", role, util.NameFromMeta(service.ObjectMeta))
2✔
333
                c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Services", "The service %q for role %s has been successfully created", util.NameFromMeta(service.ObjectMeta), role)
2✔
334
        }
335

336
        if err = c.initUsers(); err != nil {
1✔
337
                return err
×
338
        }
×
339
        c.logger.Infof("users have been initialized")
1✔
340

1✔
341
        if err = c.syncSecrets(); err != nil {
1✔
342
                return fmt.Errorf("could not create secrets: %v", err)
×
343
        }
×
344
        c.logger.Infof("secrets have been successfully created")
1✔
345
        c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Secrets", "The secrets have been successfully created")
1✔
346

1✔
347
        if err = c.createPodDisruptionBudgets(); err != nil {
1✔
348
                return fmt.Errorf("could not create pod disruption budgets: %v", err)
×
349
        }
×
350
        c.logger.Info("pod disruption budgets have been successfully created")
1✔
351

1✔
352
        if c.Statefulset != nil {
1✔
353
                return fmt.Errorf("statefulset already exists in the cluster")
×
354
        }
×
355
        ss, err = c.createStatefulSet()
1✔
356
        if err != nil {
1✔
357
                return fmt.Errorf("could not create statefulset: %v", err)
×
358
        }
×
359
        c.logger.Infof("statefulset %q has been successfully created", util.NameFromMeta(ss.ObjectMeta))
1✔
360
        c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "StatefulSet", "Statefulset %q has been successfully created", util.NameFromMeta(ss.ObjectMeta))
1✔
361

1✔
362
        c.logger.Info("waiting for the cluster being ready")
1✔
363

1✔
364
        if err = c.waitStatefulsetPodsReady(); err != nil {
1✔
365
                c.logger.Errorf("failed to create cluster: %v", err)
×
366
                return err
×
367
        }
×
368
        c.logger.Infof("pods are ready")
1✔
369
        c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "StatefulSet", "Pods are ready")
1✔
370

1✔
371
        // sync volume may already transition volumes to gp3, if iops/throughput or type is specified
1✔
372
        if err = c.syncVolumes(); err != nil {
1✔
373
                return err
×
374
        }
×
375

376
        // sync resources created by Patroni
377
        if err = c.syncPatroniResources(); err != nil {
1✔
378
                c.logger.Warnf("Patroni resources not yet synced: %v", err)
×
379
        }
×
380

381
        // create database objects unless we are running without pods or disabled
382
        // that feature explicitly
383
        if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&c.Spec) <= 0 || c.Spec.StandbyCluster != nil) {
1✔
384
                c.logger.Infof("Create roles")
×
385
                if err = c.createRoles(); err != nil {
×
386
                        return fmt.Errorf("could not create users: %v", err)
×
387
                }
×
388
                c.logger.Infof("users have been successfully created")
×
389

×
390
                if err = c.syncDatabases(); err != nil {
×
391
                        return fmt.Errorf("could not sync databases: %v", err)
×
392
                }
×
393
                if err = c.syncPreparedDatabases(); err != nil {
×
394
                        return fmt.Errorf("could not sync prepared databases: %v", err)
×
395
                }
×
396
                c.logger.Infof("databases have been successfully created")
×
397
        }
398

399
        if c.Postgresql.Spec.EnableLogicalBackup {
2✔
400
                if err := c.createLogicalBackupJob(); err != nil {
1✔
401
                        return fmt.Errorf("could not create a k8s cron job for logical backups: %v", err)
×
402
                }
×
403
                c.logger.Info("a k8s cron job for logical backup has been successfully created")
1✔
404
        }
405

406
        // Create connection pooler deployment and services if necessary. Since we
407
        // need to perform some operations with the database itself (e.g. install
408
        // lookup function), do it as the last step, when everything is available.
409
        //
410
        // Do not consider connection pooler as a strict requirement, and if
411
        // something fails, report warning
412
        c.createConnectionPooler(c.installLookupFunction)
1✔
413

1✔
414
        // remember slots to detect deletion from manifest
1✔
415
        for slotName, desiredSlot := range c.Spec.Patroni.Slots {
1✔
416
                c.replicationSlots[slotName] = desiredSlot
×
417
        }
×
418

419
        if len(c.Spec.Streams) > 0 {
1✔
420
                // creating streams requires syncing the statefulset first
×
421
                err = c.syncStatefulSet()
×
422
                if err != nil {
×
423
                        return fmt.Errorf("could not sync statefulset: %v", err)
×
424
                }
×
425
                if err = c.syncStreams(); err != nil {
×
426
                        c.logger.Errorf("could not create streams: %v", err)
×
427
                }
×
428
        }
429

430
        if err := c.listResources(); err != nil {
1✔
431
                c.logger.Errorf("could not list resources: %v", err)
×
432
        }
×
433

434
        return nil
1✔
435
}
436

437
func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compareStatefulsetResult {
30✔
438
        deletedPodAnnotations := []string{}
30✔
439
        reasons := make([]string, 0)
30✔
440
        var match, needsRollUpdate, needsReplace bool
30✔
441

30✔
442
        match = true
30✔
443
        //TODO: improve me
30✔
444
        if *c.Statefulset.Spec.Replicas != *statefulSet.Spec.Replicas {
30✔
445
                match = false
×
446
                reasons = append(reasons, "new statefulset's number of replicas does not match the current one")
×
447
        }
×
448
        if !reflect.DeepEqual(c.Statefulset.OwnerReferences, statefulSet.OwnerReferences) {
30✔
449
                match = false
×
450
                needsReplace = true
×
451
                reasons = append(reasons, "new statefulset's ownerReferences do not match")
×
452
        }
×
453
        if changed, reason := c.compareAnnotations(c.Statefulset.Annotations, statefulSet.Annotations, nil); changed {
36✔
454
                match = false
6✔
455
                needsReplace = true
6✔
456
                reasons = append(reasons, "new statefulset's annotations do not match: "+reason)
6✔
457
        }
6✔
458
        if c.Statefulset.Spec.PodManagementPolicy != statefulSet.Spec.PodManagementPolicy {
30✔
459
                match = false
×
460
                needsReplace = true
×
461
                reasons = append(reasons, "new statefulset's pod management policy do not match")
×
462
        }
×
463

464
        if c.Statefulset.Spec.PersistentVolumeClaimRetentionPolicy == nil {
30✔
465
                c.Statefulset.Spec.PersistentVolumeClaimRetentionPolicy = &appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy{
×
466
                        WhenDeleted: appsv1.RetainPersistentVolumeClaimRetentionPolicyType,
×
467
                        WhenScaled:  appsv1.RetainPersistentVolumeClaimRetentionPolicyType,
×
468
                }
×
469
        }
×
470
        if !reflect.DeepEqual(c.Statefulset.Spec.PersistentVolumeClaimRetentionPolicy, statefulSet.Spec.PersistentVolumeClaimRetentionPolicy) {
30✔
471
                match = false
×
472
                needsReplace = true
×
473
                reasons = append(reasons, "new statefulset's persistent volume claim retention policy do not match")
×
474
        }
×
475

476
        needsRollUpdate, reasons = c.compareContainers("statefulset initContainers", c.Statefulset.Spec.Template.Spec.InitContainers, statefulSet.Spec.Template.Spec.InitContainers, needsRollUpdate, reasons)
30✔
477
        needsRollUpdate, reasons = c.compareContainers("statefulset containers", c.Statefulset.Spec.Template.Spec.Containers, statefulSet.Spec.Template.Spec.Containers, needsRollUpdate, reasons)
30✔
478

30✔
479
        if len(c.Statefulset.Spec.Template.Spec.Containers) == 0 {
30✔
480
                c.logger.Warningf("statefulset %q has no container", util.NameFromMeta(c.Statefulset.ObjectMeta))
×
481
                return &compareStatefulsetResult{}
×
482
        }
×
483
        // In the comparisons below, the needsReplace and needsRollUpdate flags are never reset, since checks fall through
484
        // and the combined effect of all the changes should be applied.
485
        // TODO: make sure this is in sync with generatePodTemplate, ideally by using the same list of fields to generate
486
        // the template and the diff
487
        if c.Statefulset.Spec.Template.Spec.ServiceAccountName != statefulSet.Spec.Template.Spec.ServiceAccountName {
30✔
488
                needsReplace = true
×
489
                needsRollUpdate = true
×
490
                reasons = append(reasons, "new statefulset's serviceAccountName service account name does not match the current one")
×
491
        }
×
492
        if *c.Statefulset.Spec.Template.Spec.TerminationGracePeriodSeconds != *statefulSet.Spec.Template.Spec.TerminationGracePeriodSeconds {
30✔
493
                needsReplace = true
×
494
                needsRollUpdate = true
×
495
                reasons = append(reasons, "new statefulset's terminationGracePeriodSeconds does not match the current one")
×
496
        }
×
497
        if !reflect.DeepEqual(c.Statefulset.Spec.Template.Spec.Affinity, statefulSet.Spec.Template.Spec.Affinity) {
30✔
498
                needsReplace = true
×
499
                needsRollUpdate = true
×
500
                reasons = append(reasons, "new statefulset's pod affinity does not match the current one")
×
501
        }
×
502
        if len(c.Statefulset.Spec.Template.Spec.Tolerations) != len(statefulSet.Spec.Template.Spec.Tolerations) {
30✔
503
                needsReplace = true
×
504
                needsRollUpdate = true
×
505
                reasons = append(reasons, "new statefulset's pod tolerations does not match the current one")
×
506
        }
×
507

508
        // Some generated fields like creationTimestamp make it not possible to use DeepCompare on Spec.Template.ObjectMeta
509
        if !reflect.DeepEqual(c.Statefulset.Spec.Template.Labels, statefulSet.Spec.Template.Labels) {
30✔
510
                needsReplace = true
×
511
                needsRollUpdate = true
×
512
                reasons = append(reasons, "new statefulset's metadata labels does not match the current one")
×
513
        }
×
514
        if (c.Statefulset.Spec.Selector != nil) && (statefulSet.Spec.Selector != nil) {
60✔
515
                if !reflect.DeepEqual(c.Statefulset.Spec.Selector.MatchLabels, statefulSet.Spec.Selector.MatchLabels) {
30✔
516
                        // forbid introducing new labels in the selector on the new statefulset, as it would cripple replacements
×
517
                        // due to the fact that the new statefulset won't be able to pick up old pods with non-matching labels.
×
518
                        if !util.MapContains(c.Statefulset.Spec.Selector.MatchLabels, statefulSet.Spec.Selector.MatchLabels) {
×
519
                                c.logger.Warningf("new statefulset introduces extra labels in the label selector, cannot continue")
×
520
                                return &compareStatefulsetResult{}
×
521
                        }
×
522
                        needsReplace = true
×
523
                        reasons = append(reasons, "new statefulset's selector does not match the current one")
×
524
                }
525
        }
526

527
        if changed, reason := c.compareAnnotations(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations, &deletedPodAnnotations); changed {
33✔
528
                match = false
3✔
529
                needsReplace = true
3✔
530
                reasons = append(reasons, "new statefulset's pod template metadata annotations does not match "+reason)
3✔
531
        }
3✔
532
        if !reflect.DeepEqual(c.Statefulset.Spec.Template.Spec.SecurityContext, statefulSet.Spec.Template.Spec.SecurityContext) {
30✔
533
                needsReplace = true
×
534
                needsRollUpdate = true
×
535
                reasons = append(reasons, "new statefulset's pod template security context in spec does not match the current one")
×
536
        }
×
537
        if len(c.Statefulset.Spec.VolumeClaimTemplates) != len(statefulSet.Spec.VolumeClaimTemplates) {
30✔
538
                needsReplace = true
×
539
                reasons = append(reasons, "new statefulset's volumeClaimTemplates contains different number of volumes to the old one")
×
540
        } else {
30✔
541
                for i := 0; i < len(c.Statefulset.Spec.VolumeClaimTemplates); i++ {
60✔
542
                        name := c.Statefulset.Spec.VolumeClaimTemplates[i].Name
30✔
543
                        // Some generated fields like creationTimestamp make it not possible to use DeepCompare on ObjectMeta
30✔
544
                        if name != statefulSet.Spec.VolumeClaimTemplates[i].Name {
30✔
545
                                needsReplace = true
×
546
                                reasons = append(reasons, fmt.Sprintf("new statefulset's name for volume %d does not match the current one", i))
×
547
                                continue
×
548
                        }
549
                        if changed, reason := c.compareAnnotations(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations, nil); changed {
32✔
550
                                needsReplace = true
2✔
551
                                reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q do not match the current ones: %s", name, reason))
2✔
552
                        }
2✔
553
                        if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Spec, statefulSet.Spec.VolumeClaimTemplates[i].Spec) {
30✔
554
                                name := c.Statefulset.Spec.VolumeClaimTemplates[i].Name
×
555
                                needsReplace = true
×
556
                                reasons = append(reasons, fmt.Sprintf("new statefulset's volumeClaimTemplates specification for volume %q does not match the current one", name))
×
557
                        }
×
558
                }
559
        }
560

561
        if len(c.Statefulset.Spec.Template.Spec.Volumes) != len(statefulSet.Spec.Template.Spec.Volumes) {
30✔
562
                needsReplace = true
×
563
                reasons = append(reasons, "new statefulset's volumes contains different number of volumes to the old one")
×
564
        }
×
565

566
        // we assume any change in priority happens by rolling out a new priority class
567
        // changing the priority value in an existing class is not supported
568
        if c.Statefulset.Spec.Template.Spec.PriorityClassName != statefulSet.Spec.Template.Spec.PriorityClassName {
30✔
569
                needsReplace = true
×
570
                needsRollUpdate = true
×
571
                reasons = append(reasons, "new statefulset's pod priority class in spec does not match the current one")
×
572
        }
×
573

574
        // lazy Spilo update: modify the image in the statefulset itself but let its pods run with the old image
575
        // until they are re-created for other reasons, for example node rotation
576
        effectivePodImage := getPostgresContainer(&c.Statefulset.Spec.Template.Spec).Image
30✔
577
        desiredImage := getPostgresContainer(&statefulSet.Spec.Template.Spec).Image
30✔
578
        if c.OpConfig.EnableLazySpiloUpgrade && !reflect.DeepEqual(effectivePodImage, desiredImage) {
30✔
579
                needsReplace = true
×
580
                reasons = append(reasons, "lazy Spilo update: new statefulset's pod image does not match the current one")
×
581
        }
×
582

583
        if needsRollUpdate || needsReplace {
57✔
584
                match = false
27✔
585
        }
27✔
586

587
        return &compareStatefulsetResult{match: match, reasons: reasons, rollingUpdate: needsRollUpdate, replace: needsReplace, deletedPodAnnotations: deletedPodAnnotations}
30✔
588
}
589
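
// Illustrative sketch, not part of cluster.go: how a caller might act on the
// comparison result; the real handling lives in the sync logic elsewhere in
// this package.
func (c *Cluster) exampleHandleStatefulSetDiff(desired *appsv1.StatefulSet) {
        cmp := c.compareStatefulSetWith(desired)
        if cmp.match {
                return // nothing to change
        }
        c.logger.Infof("statefulset differs: %v", cmp.reasons)
        if cmp.rollingUpdate {
                // pods have to be recreated to pick up the new pod template
        }
}
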

590
type containerCondition func(a, b v1.Container) bool
591

592
type containerCheck struct {
593
        condition containerCondition
594
        reason    string
595
}
596

597
func newCheck(msg string, cond containerCondition) containerCheck {
594✔
598
        return containerCheck{reason: msg, condition: cond}
594✔
599
}
594✔
600

601
// compareContainers: compare two lists of Containers
602
// and return:
603
// * whether or not a rolling update is needed
604
// * a list of reasons in a human readable format
605

606
func (c *Cluster) compareContainers(description string, setA, setB []v1.Container, needsRollUpdate bool, reasons []string) (bool, []string) {
73✔
607
        if len(setA) != len(setB) {
80✔
608
                return true, append(reasons, fmt.Sprintf("new %s's length does not match the current ones", description))
7✔
609
        }
7✔
610

611
        checks := []containerCheck{
66✔
612
                newCheck("new %s's %s (index %d) name does not match the current one",
66✔
613
                        func(a, b v1.Container) bool { return a.Name != b.Name }),
102✔
614
                newCheck("new %s's %s (index %d) readiness probe does not match the current one",
615
                        func(a, b v1.Container) bool { return !reflect.DeepEqual(a.ReadinessProbe, b.ReadinessProbe) }),
36✔
616
                newCheck("new %s's %s (index %d) ports do not match the current one",
617
                        func(a, b v1.Container) bool { return !comparePorts(a.Ports, b.Ports) }),
36✔
618
                newCheck("new %s's %s (index %d) resources do not match the current ones",
619
                        func(a, b v1.Container) bool { return !compareResources(&a.Resources, &b.Resources) }),
36✔
620
                newCheck("new %s's %s (index %d) environment does not match the current one",
621
                        func(a, b v1.Container) bool { return !compareEnv(a.Env, b.Env) }),
36✔
622
                newCheck("new %s's %s (index %d) environment sources do not match the current one",
623
                        func(a, b v1.Container) bool { return !reflect.DeepEqual(a.EnvFrom, b.EnvFrom) }),
36✔
624
                newCheck("new %s's %s (index %d) security context does not match the current one",
625
                        func(a, b v1.Container) bool { return !reflect.DeepEqual(a.SecurityContext, b.SecurityContext) }),
36✔
626
                newCheck("new %s's %s (index %d) volume mounts do not match the current one",
627
                        func(a, b v1.Container) bool { return !compareVolumeMounts(a.VolumeMounts, b.VolumeMounts) }),
36✔
628
        }
629

630
        if !c.OpConfig.EnableLazySpiloUpgrade {
132✔
631
                checks = append(checks, newCheck("new %s's %s (index %d) image does not match the current one",
66✔
632
                        func(a, b v1.Container) bool { return a.Image != b.Image }))
102✔
633
        }
634

635
        for index, containerA := range setA {
102✔
636
                containerB := setB[index]
36✔
637
                for _, check := range checks {
360✔
638
                        if check.condition(containerA, containerB) {
339✔
639
                                needsRollUpdate = true
15✔
640
                                reasons = append(reasons, fmt.Sprintf(check.reason, description, containerA.Name, index))
15✔
641
                        }
15✔
642
                }
643
        }
644

645
        return needsRollUpdate, reasons
66✔
646
}
647

648
func compareResources(a *v1.ResourceRequirements, b *v1.ResourceRequirements) bool {
36✔
649
        equal := true
36✔
650
        if a != nil {
72✔
651
                equal = compareResourcesAssumeFirstNotNil(a, b)
36✔
652
        }
36✔
653
        if equal && (b != nil) {
63✔
654
                equal = compareResourcesAssumeFirstNotNil(b, a)
27✔
655
        }
27✔
656

657
        return equal
36✔
658
}
659

660
func compareResourcesAssumeFirstNotNil(a *v1.ResourceRequirements, b *v1.ResourceRequirements) bool {
63✔
661
        if b == nil || (len(b.Requests) == 0) {
64✔
662
                return len(a.Requests) == 0
1✔
663
        }
1✔
664
        for k, v := range a.Requests {
180✔
665
                if (&v).Cmp(b.Requests[k]) != 0 {
125✔
666
                        return false
7✔
667
                }
7✔
668
        }
669
        for k, v := range a.Limits {
163✔
670
                if (&v).Cmp(b.Limits[k]) != 0 {
110✔
671
                        return false
2✔
672
                }
2✔
673
        }
674
        return true
53✔
675

676
}
677
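
// Illustrative sketch, not part of cluster.go: quantities are compared by value
// via Quantity.Cmp, so equivalent spellings such as "1" and "1000m" are treated
// as equal and do not flag a resource change. The resource package here is
// k8s.io/apimachinery/pkg/api/resource, which this file does not import itself.
func exampleCompareResources() {
        a := &v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1")}}
        b := &v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1000m")}}
        fmt.Println(compareResources(a, b)) // true: 1 CPU equals 1000m
}
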

678
func compareEnv(a, b []v1.EnvVar) bool {
41✔
679
        if len(a) != len(b) {
46✔
680
                return false
5✔
681
        }
5✔
682
        var equal bool
36✔
683
        for _, enva := range a {
541✔
684
                hasmatch := false
505✔
685
                for _, envb := range b {
8,182✔
686
                        if enva.Name == envb.Name {
8,181✔
687
                                hasmatch = true
504✔
688
                                if enva.Name == "SPILO_CONFIGURATION" {
526✔
689
                                        equal = compareSpiloConfiguration(enva.Value, envb.Value)
22✔
690
                                } else {
504✔
691
                                        if enva.Value == "" && envb.Value == "" {
743✔
692
                                                equal = reflect.DeepEqual(enva.ValueFrom, envb.ValueFrom)
261✔
693
                                        } else {
482✔
694
                                                equal = (enva.Value == envb.Value)
221✔
695
                                        }
221✔
696
                                }
697
                                if !equal {
505✔
698
                                        return false
1✔
699
                                }
1✔
700
                        }
701
                }
702
                if !hasmatch {
505✔
703
                        return false
1✔
704
                }
1✔
705
        }
706
        return true
34✔
707
}
708

709
func compareSpiloConfiguration(configa, configb string) bool {
26✔
710
        var (
26✔
711
                oa, ob spiloConfiguration
26✔
712
        )
26✔
713

26✔
714
        var err error
26✔
715
        err = json.Unmarshal([]byte(configa), &oa)
26✔
716
        if err != nil {
26✔
717
                return false
×
718
        }
×
719
        oa.Bootstrap.DCS = patroniDCS{}
26✔
720
        err = json.Unmarshal([]byte(configb), &ob)
26✔
721
        if err != nil {
27✔
722
                return false
1✔
723
        }
1✔
724
        ob.Bootstrap.DCS = patroniDCS{}
25✔
725
        return reflect.DeepEqual(oa, ob)
25✔
726
}
727

728
func areProtocolsEqual(a, b v1.Protocol) bool {
112✔
729
        return a == b ||
112✔
730
                (a == "" && b == v1.ProtocolTCP) ||
112✔
731
                (a == v1.ProtocolTCP && b == "")
112✔
732
}
112✔
733

734
func comparePorts(a, b []v1.ContainerPort) bool {
40✔
735
        if len(a) != len(b) {
40✔
736
                return false
×
737
        }
×
738

739
        areContainerPortsEqual := func(a, b v1.ContainerPort) bool {
152✔
740
                return a.Name == b.Name &&
112✔
741
                        a.HostPort == b.HostPort &&
112✔
742
                        areProtocolsEqual(a.Protocol, b.Protocol) &&
112✔
743
                        a.HostIP == b.HostIP
112✔
744
        }
112✔
745

746
        findByPortValue := func(portSpecs []v1.ContainerPort, port int32) (v1.ContainerPort, bool) {
153✔
747
                for _, portSpec := range portSpecs {
335✔
748
                        if portSpec.ContainerPort == port {
334✔
749
                                return portSpec, true
112✔
750
                        }
112✔
751
                }
752
                return v1.ContainerPort{}, false
1✔
753
        }
754

755
        for _, portA := range a {
153✔
756
                portB, found := findByPortValue(b, portA.ContainerPort)
113✔
757
                if !found {
114✔
758
                        return false
1✔
759
                }
1✔
760
                if !areContainerPortsEqual(portA, portB) {
112✔
761
                        return false
×
762
                }
×
763
        }
764

765
        return true
39✔
766
}
767
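
// Illustrative sketch, not part of cluster.go: comparePorts tolerates a missing
// protocol, since areProtocolsEqual treats an empty protocol and TCP as the same
// (Kubernetes defaults the field to TCP on the server side).
func exampleComparePorts() {
        generated := []v1.ContainerPort{{Name: "postgresql", ContainerPort: 5432}}
        observed := []v1.ContainerPort{{Name: "postgresql", ContainerPort: 5432, Protocol: v1.ProtocolTCP}}
        fmt.Println(comparePorts(generated, observed)) // true: "" and TCP compare as equal
}
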

768
func compareVolumeMounts(old, new []v1.VolumeMount) bool {
44✔
769
        if len(old) != len(new) {
46✔
770
                return false
2✔
771
        }
2✔
772
        for _, mount := range old {
71✔
773
                if !volumeMountExists(mount, new) {
31✔
774
                        return false
2✔
775
                }
2✔
776
        }
777
        return true
40✔
778
}
779

780
func volumeMountExists(mount v1.VolumeMount, mounts []v1.VolumeMount) bool {
29✔
781
        for _, m := range mounts {
60✔
782
                if reflect.DeepEqual(mount, m) {
58✔
783
                        return true
27✔
784
                }
27✔
785
        }
786
        return false
2✔
787
}
788

789
func (c *Cluster) compareAnnotations(old, new map[string]string, removedList *[]string) (bool, string) {
362✔
790
        reason := ""
362✔
791
        ignoredAnnotations := make(map[string]bool)
362✔
792
        for _, ignore := range c.OpConfig.IgnoredAnnotations {
362✔
793
                ignoredAnnotations[ignore] = true
×
794
        }
×
795

796
        for key := range old {
643✔
797
                if _, ok := ignoredAnnotations[key]; ok {
281✔
798
                        continue
×
799
                }
800
                if _, ok := new[key]; !ok {
341✔
801
                        reason += fmt.Sprintf(" Removed %q.", key)
60✔
802
                        if removedList != nil {
68✔
803
                                *removedList = append(*removedList, key)
8✔
804
                        }
8✔
805
                }
806
        }
807

808
        for key := range new {
617✔
809
                if _, ok := ignoredAnnotations[key]; ok {
255✔
810
                        continue
×
811
                }
812
                v, ok := old[key]
255✔
813
                if !ok {
289✔
814
                        reason += fmt.Sprintf(" Added %q with value %q.", key, new[key])
34✔
815
                } else if v != new[key] {
312✔
816
                        reason += fmt.Sprintf(" %q changed from %q to %q.", key, v, new[key])
57✔
817
                }
57✔
818
        }
819

820
        return reason != "", reason
362✔
821

822
}
823
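
// Illustrative sketch, not part of cluster.go: compareAnnotations reports added,
// changed and removed keys in the reason string and, when a slice is passed,
// collects the removed keys for the deletedPodAnnotations bookkeeping above.
func (c *Cluster) exampleCompareAnnotations() {
        removed := []string{}
        current := map[string]string{"owner": "team-a", "zone": "eu-1"}
        desired := map[string]string{"owner": "team-b"}
        changed, reason := c.compareAnnotations(current, desired, &removed)
        fmt.Println(changed, reason) // true, reason mentions the changed "owner" and removed "zone"
        fmt.Println(removed)         // [zone]
}
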

824
func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) {
44✔
825
        if old.Spec.Type != new.Spec.Type {
49✔
826
                return false, fmt.Sprintf("new service's type %q does not match the current one %q",
5✔
827
                        new.Spec.Type, old.Spec.Type)
5✔
828
        }
5✔
829

830
        oldSourceRanges := old.Spec.LoadBalancerSourceRanges
39✔
831
        newSourceRanges := new.Spec.LoadBalancerSourceRanges
39✔
832

39✔
833
        /* work around Kubernetes 1.6 serializing [] as nil. See https://github.com/kubernetes/kubernetes/issues/43203 */
39✔
834
        if (len(oldSourceRanges) != 0) || (len(newSourceRanges) != 0) {
43✔
835
                if !util.IsEqualIgnoreOrder(oldSourceRanges, newSourceRanges) {
6✔
836
                        return false, "new service's LoadBalancerSourceRange does not match the current one"
2✔
837
                }
2✔
838
        }
839

840
        if !reflect.DeepEqual(old.ObjectMeta.OwnerReferences, new.ObjectMeta.OwnerReferences) {
38✔
841
                return false, "new service's owner references do not match the current ones"
1✔
842
        }
1✔
843

844
        return true, ""
36✔
845
}
846

847
func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) *compareLogicalBackupJobResult {
13✔
848
        deletedPodAnnotations := []string{}
13✔
849
        reasons := make([]string, 0)
13✔
850
        match := true
13✔
851

13✔
852
        if cur.Spec.Schedule != new.Spec.Schedule {
14✔
853
                match = false
1✔
854
                reasons = append(reasons, fmt.Sprintf("new job's schedule %q does not match the current one %q", new.Spec.Schedule, cur.Spec.Schedule))
1✔
855
        }
1✔
856

857
        newImage := new.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Image
13✔
858
        curImage := cur.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Image
13✔
859
        if newImage != curImage {
14✔
860
                match = false
1✔
861
                reasons = append(reasons, fmt.Sprintf("new job's image %q does not match the current one %q", newImage, curImage))
1✔
862
        }
1✔
863

864
        newPodAnnotation := new.Spec.JobTemplate.Spec.Template.Annotations
13✔
865
        curPodAnnotation := cur.Spec.JobTemplate.Spec.Template.Annotations
13✔
866
        if changed, reason := c.compareAnnotations(curPodAnnotation, newPodAnnotation, &deletedPodAnnotations); changed {
16✔
867
                match = false
3✔
868
                reasons = append(reasons, fmt.Sprint("new job's pod template metadata annotations do not match "+reason))
3✔
869
        }
3✔
870

871
        newPgVersion := getPgVersion(new)
13✔
872
        curPgVersion := getPgVersion(cur)
13✔
873
        if newPgVersion != curPgVersion {
13✔
874
                match = false
×
875
                reasons = append(reasons, fmt.Sprintf("new job's env PG_VERSION %q does not match the current one %q", newPgVersion, curPgVersion))
×
876
        }
×
877

878
        needsReplace := false
13✔
879
        contReasons := make([]string, 0)
13✔
880
        needsReplace, contReasons = c.compareContainers("cronjob container", cur.Spec.JobTemplate.Spec.Template.Spec.Containers, new.Spec.JobTemplate.Spec.Template.Spec.Containers, needsReplace, contReasons)
13✔
881
        if needsReplace {
15✔
882
                match = false
2✔
883
                reasons = append(reasons, fmt.Sprintf("logical backup container specs do not match: %v", strings.Join(contReasons, `', '`)))
2✔
884
        }
2✔
885

886
        return &compareLogicalBackupJobResult{match: match, reasons: reasons, deletedPodAnnotations: deletedPodAnnotations}
13✔
887
}
888

889
func (c *Cluster) comparePodDisruptionBudget(cur, new *policyv1.PodDisruptionBudget) (bool, string) {
14✔
890
        //TODO: improve comparison
14✔
891
        if !reflect.DeepEqual(new.Spec, cur.Spec) {
14✔
892
                return false, "new PDB's spec does not match the current one"
×
893
        }
×
894
        if !reflect.DeepEqual(new.ObjectMeta.OwnerReferences, cur.ObjectMeta.OwnerReferences) {
14✔
895
                return false, "new PDB's owner references do not match the current ones"
×
896
        }
×
897
        if changed, reason := c.compareAnnotations(cur.Annotations, new.Annotations, nil); changed {
20✔
898
                return false, "new PDB's annotations do not match the current ones:" + reason
6✔
899
        }
6✔
900
        return true, ""
8✔
901
}
902

903
func getPgVersion(cronJob *batchv1.CronJob) string {
26✔
904
        envs := cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Env
26✔
905
        for _, env := range envs {
130✔
906
                if env.Name == "PG_VERSION" {
130✔
907
                        return env.Value
26✔
908
                }
26✔
909
        }
910
        return ""
×
911
}
912

913
// addFinalizer patches the postgresql CR to add finalizer
914
func (c *Cluster) addFinalizer() error {
1✔
915
        if c.hasFinalizer() {
1✔
916
                return nil
×
917
        }
×
918

919
        c.logger.Infof("adding finalizer %s", finalizerName)
1✔
920
        finalizers := append(c.ObjectMeta.Finalizers, finalizerName)
1✔
921
        newSpec, err := c.KubeClient.SetFinalizer(c.clusterName(), c.DeepCopy(), finalizers)
1✔
922
        if err != nil {
1✔
923
                return fmt.Errorf("error adding finalizer: %v", err)
×
924
        }
×
925

926
        // update the spec, maintaining the new resourceVersion
927
        c.setSpec(newSpec)
1✔
928

1✔
929
        return nil
1✔
930
}
931

932
// removeFinalizer patches postgresql CR to remove finalizer
933
func (c *Cluster) removeFinalizer() error {
7✔
934
        if !c.hasFinalizer() {
14✔
935
                return nil
7✔
936
        }
7✔
937

938
        c.logger.Infof("removing finalizer %s", finalizerName)
×
939
        finalizers := util.RemoveString(c.ObjectMeta.Finalizers, finalizerName)
×
940
        newSpec, err := c.KubeClient.SetFinalizer(c.clusterName(), c.DeepCopy(), finalizers)
×
941
        if err != nil {
×
942
                return fmt.Errorf("error removing finalizer: %v", err)
×
943
        }
×
944

945
        // update the spec, maintaining the new resourceVersion.
946
        c.setSpec(newSpec)
×
947

×
948
        return nil
×
949
}
950

951
// hasFinalizer checks if finalizer is currently set or not
952
func (c *Cluster) hasFinalizer() bool {
9✔
953
        for _, finalizer := range c.ObjectMeta.Finalizers {
10✔
954
                if finalizer == finalizerName {
2✔
955
                        return true
1✔
956
                }
1✔
957
        }
958
        return false
8✔
959
}
960

961
// Update changes Kubernetes objects according to the new specification. Unlike the sync case, the missing object
962
// (i.e. service) is treated as an error
963
// logical backup cron jobs are an exception: a user-initiated Update can enable a logical backup job
964
// for a cluster that had no such job before. In this case a missing job is not an error.
965
func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
1✔
966
        updateFailed := false
1✔
967
        userInitFailed := false
1✔
968

1✔
969
        c.mu.Lock()
1✔
970
        defer c.mu.Unlock()
1✔
971

1✔
972
        c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusUpdating)
1✔
973

1✔
974
        if !isInMaintenanceWindow(newSpec.Spec.MaintenanceWindows) {
1✔
975
                // do not apply any major version related changes yet
×
976
                newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion
×
977
        }
×
978
        c.setSpec(newSpec)
1✔
979

1✔
980
        defer func() {
2✔
981
                var (
1✔
982
                        pgUpdatedStatus *acidv1.Postgresql
1✔
983
                        err             error
1✔
984
                )
1✔
985
                if updateFailed {
1✔
986
                        pgUpdatedStatus, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusUpdateFailed)
×
987
                } else {
1✔
988
                        pgUpdatedStatus, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning)
1✔
989
                }
1✔
990
                if err != nil {
2✔
991
                        c.logger.Warningf("could not set cluster status: %v", err)
1✔
992
                }
1✔
993
                if pgUpdatedStatus != nil {
1✔
994
                        c.setSpec(pgUpdatedStatus)
×
995
                }
×
996
        }()
997

998
        logNiceDiff(c.logger, oldSpec, newSpec)
1✔
999

1✔
1000
        if IsBiggerPostgresVersion(oldSpec.Spec.PostgresqlParam.PgVersion, c.GetDesiredMajorVersion()) {
1✔
1001
                c.logger.Infof("postgresql version increased (%s -> %s), depending on config manual upgrade needed",
×
1002
                        oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion)
×
1003
        } else {
1✔
1004
                c.logger.Infof("postgresql major version unchanged or smaller, no changes needed")
1✔
1005
                // sticking with old version, this will also advance GetDesiredVersion next time.
1✔
1006
                newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion
1✔
1007
        }
1✔
1008

1009
        // Service
1010
        if err := c.syncServices(); err != nil {
1✔
1011
                c.logger.Errorf("could not sync services: %v", err)
×
1012
                updateFailed = true
×
1013
        }
×
1014

1015
        // Patroni service and endpoints / config maps
1016
        if err := c.syncPatroniResources(); err != nil {
1✔
1017
                c.logger.Errorf("could not sync services: %v", err)
×
1018
                updateFailed = true
×
1019
        }
×
1020

1021
        // Users
1022
        func() {
2✔
1023
                // check if users need to be synced during update
1✔
1024
                sameUsers := reflect.DeepEqual(oldSpec.Spec.Users, newSpec.Spec.Users) &&
1✔
1025
                        reflect.DeepEqual(oldSpec.Spec.PreparedDatabases, newSpec.Spec.PreparedDatabases)
1✔
1026
                sameRotatedUsers := reflect.DeepEqual(oldSpec.Spec.UsersWithSecretRotation, newSpec.Spec.UsersWithSecretRotation) &&
1✔
1027
                        reflect.DeepEqual(oldSpec.Spec.UsersWithInPlaceSecretRotation, newSpec.Spec.UsersWithInPlaceSecretRotation)
1✔
1028

1✔
1029
                // the connection pooler needs one system user, which is initialized in initUsers
1✔
1030
                // only when disabled in oldSpec and enabled in newSpec
1✔
1031
                needPoolerUser := c.needConnectionPoolerUser(&oldSpec.Spec, &newSpec.Spec)
1✔
1032

1✔
1033
                // streams need a new replication user, which is initialized in initUsers
1✔
1034
                // only when streams were not specified in oldSpec but in newSpec
1✔
1035
                needStreamUser := len(oldSpec.Spec.Streams) == 0 && len(newSpec.Spec.Streams) > 0
1✔
1036

1✔
1037
                initUsers := !sameUsers || !sameRotatedUsers || needPoolerUser || needStreamUser
1✔
1038

1✔
1039
                // if inherited annotations differ secrets have to be synced on update
1✔
1040
                newAnnotations := c.annotationsSet(nil)
1✔
1041
                oldAnnotations := make(map[string]string)
1✔
1042
                for _, secret := range c.Secrets {
2✔
1043
                        oldAnnotations = secret.ObjectMeta.Annotations
1✔
1044
                        break
1✔
1045
                }
1046
                annotationsChanged, _ := c.compareAnnotations(oldAnnotations, newAnnotations, nil)
1✔
1047

1✔
1048
                if initUsers || annotationsChanged {
2✔
1049
                        c.logger.Debug("initialize users")
1✔
1050
                        if err := c.initUsers(); err != nil {
1✔
1051
                                c.logger.Errorf("could not init users - skipping sync of secrets and databases: %v", err)
×
1052
                                userInitFailed = true
×
1053
                                updateFailed = true
×
1054
                                return
×
1055
                        }
×
1056

1057
                        c.logger.Debug("syncing secrets")
1✔
1058
                        //TODO: mind the secrets of the deleted/new users
1✔
1059
                        if err := c.syncSecrets(); err != nil {
1✔
1060
                                c.logger.Errorf("could not sync secrets: %v", err)
×
1061
                                updateFailed = true
×
1062
                        }
×
1063
                }
1064
        }()
1065

1066
        // Volume
1067
        if c.OpConfig.StorageResizeMode != "off" {
2✔
1068
                c.syncVolumes()
1✔
1069
        } else {
1✔
1070
                c.logger.Infof("Storage resize is disabled (storage_resize_mode is off). Skipping volume size sync.")
×
1071
        }
×
1072

1073
        // Statefulset
1074
        func() {
2✔
1075
                if err := c.syncStatefulSet(); err != nil {
1✔
1076
                        c.logger.Errorf("could not sync statefulsets: %v", err)
×
1077
                        updateFailed = true
×
1078
                }
×
1079
        }()
1080

1081
        // add or remove standby_cluster section from Patroni config depending on changes in standby section
1082
        if !reflect.DeepEqual(oldSpec.Spec.StandbyCluster, newSpec.Spec.StandbyCluster) {
1✔
1083
                if err := c.syncStandbyClusterConfiguration(); err != nil {
×
1084
                        return fmt.Errorf("could not set StandbyCluster configuration options: %v", err)
×
1085
                }
×
1086
        }
1087

1088
        // pod disruption budgets
1089
        if err := c.syncPodDisruptionBudgets(true); err != nil {
1✔
1090
                c.logger.Errorf("could not sync pod disruption budgets: %v", err)
×
1091
                updateFailed = true
×
1092
        }
×
1093

1094
        // logical backup job
1095
        func() {
2✔
1096

1✔
1097
                // create if it did not exist
1✔
1098
                if !oldSpec.Spec.EnableLogicalBackup && newSpec.Spec.EnableLogicalBackup {
1✔
1099
                        c.logger.Debug("creating backup cron job")
×
1100
                        if err := c.createLogicalBackupJob(); err != nil {
×
1101
                                c.logger.Errorf("could not create a k8s cron job for logical backups: %v", err)
×
1102
                                updateFailed = true
×
1103
                                return
×
1104
                        }
×
1105
                }
1106

1107
                // delete if no longer needed
1108
                if oldSpec.Spec.EnableLogicalBackup && !newSpec.Spec.EnableLogicalBackup {
1✔
1109
                        c.logger.Debug("deleting backup cron job")
×
1110
                        if err := c.deleteLogicalBackupJob(); err != nil {
×
1111
                                c.logger.Errorf("could not delete a k8s cron job for logical backups: %v", err)
×
1112
                                updateFailed = true
×
1113
                                return
×
1114
                        }
×
1115

1116
                }
1117

1118
                if oldSpec.Spec.EnableLogicalBackup && newSpec.Spec.EnableLogicalBackup {
2✔
1119
                        if err := c.syncLogicalBackupJob(); err != nil {
1✔
1120
                                c.logger.Errorf("could not sync logical backup jobs: %v", err)
×
1121
                                updateFailed = true
×
1122
                        }
×
1123
                }
1124

1125
        }()
1126

1127
        // Roles and Databases
1128
        if !userInitFailed && !(c.databaseAccessDisabled() || c.getNumberOfInstances(&c.Spec) <= 0 || c.Spec.StandbyCluster != nil) {
1✔
1129
                c.logger.Debug("syncing roles")
×
1130
                if err := c.syncRoles(); err != nil {
×
1131
                        c.logger.Errorf("could not sync roles: %v", err)
×
1132
                        updateFailed = true
×
1133
                }
×
1134
                if !reflect.DeepEqual(oldSpec.Spec.Databases, newSpec.Spec.Databases) ||
×
1135
                        !reflect.DeepEqual(oldSpec.Spec.PreparedDatabases, newSpec.Spec.PreparedDatabases) {
×
1136
                        c.logger.Infof("syncing databases")
×
1137
                        if err := c.syncDatabases(); err != nil {
×
1138
                                c.logger.Errorf("could not sync databases: %v", err)
×
1139
                                updateFailed = true
×
1140
                        }
×
1141
                }
1142
                if !reflect.DeepEqual(oldSpec.Spec.PreparedDatabases, newSpec.Spec.PreparedDatabases) {
×
1143
                        c.logger.Infof("syncing prepared databases")
×
1144
                        if err := c.syncPreparedDatabases(); err != nil {
×
1145
                                c.logger.Errorf("could not sync prepared databases: %v", err)
×
1146
                                updateFailed = true
×
1147
                        }
×
1148
                }
1149
        }
1150

1151
        // Sync connection pooler. Before actually doing sync reset lookup
1152
        // installation flag, since manifest updates could add another db which we
1153
        // need to process. In the future we may want to do this more careful and
1154
        // check which databases we need to process, but even repeating the whole
1155
        // installation process should be good enough.
1156
        if _, err := c.syncConnectionPooler(oldSpec, newSpec, c.installLookupFunction); err != nil {
1✔
1157
                c.logger.Errorf("could not sync connection pooler: %v", err)
×
1158
                updateFailed = true
×
1159
        }
×
1160

1161
        // streams
1162
        if len(newSpec.Spec.Streams) > 0 || len(oldSpec.Spec.Streams) != len(newSpec.Spec.Streams) {
1✔
NEW
1163
                c.logger.Debug("syncing streams")
×
1164
                if err := c.syncStreams(); err != nil {
×
1165
                        c.logger.Errorf("could not sync streams: %v", err)
×
1166
                        updateFailed = true
×
1167
                }
×
1168
        }
1169

1170
        if !updateFailed {
2✔
1171
                // Major version upgrade must only fire after success of earlier operations and should stay last
1✔
1172
                if err := c.majorVersionUpgrade(); err != nil {
1✔
1173
                        c.logger.Errorf("major version upgrade failed: %v", err)
×
1174
                        updateFailed = true
×
1175
                }
×
1176
        }
1177

1178
        return nil
1✔
1179
}
1180

1181
func syncResources(a, b *v1.ResourceRequirements) bool {
        for _, res := range []v1.ResourceName{
                v1.ResourceCPU,
                v1.ResourceMemory,
        } {
                if !a.Limits[res].Equal(b.Limits[res]) ||
                        !a.Requests[res].Equal(b.Requests[res]) {
                        return true
                }
        }

        return false
}

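// Illustrative sketch (not taken from the operator itself): syncResources reports
// whether the CPU or memory requests/limits of two resource specs differ and thus
// need to be synced. Assuming the resource package from
// k8s.io/apimachinery/pkg/api/resource is available:
//
//      a := v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("100m")}}
//      b := v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("250m")}}
//      _ = syncResources(&a, &b) // true, because the CPU requests differ
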
// Delete deletes the cluster and cleans up all objects associated with it (including statefulsets).
// The deletion order here is somewhat significant, because Patroni, when running with the Kubernetes
// DCS, reuses the master's endpoint to store the leader related metadata. If we remove the endpoint
// before the pods, it will be re-created by the current master pod and will remain, obstructing the
// creation of the new cluster with the same name. Therefore, the endpoints should be deleted last.
func (c *Cluster) Delete() error {
        var anyErrors = false
        c.mu.Lock()
        defer c.mu.Unlock()
        c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of cluster resources")

        if err := c.deleteStreams(); err != nil {
                anyErrors = true
                c.logger.Warningf("could not delete event streams: %v", err)
                c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete event streams: %v", err)
        }

        // delete the backup job before the stateful set of the cluster to prevent connections to non-existing pods
        // deleting the cron job also removes pods and batch jobs it created
        if err := c.deleteLogicalBackupJob(); err != nil {
                anyErrors = true
                c.logger.Warningf("could not remove the logical backup k8s cron job; %v", err)
                c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not remove the logical backup k8s cron job; %v", err)
        }

        if err := c.deleteStatefulSet(); err != nil {
                anyErrors = true
                c.logger.Warningf("could not delete statefulset: %v", err)
                c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete statefulset: %v", err)
        }

        if c.OpConfig.EnableSecretsDeletion != nil && *c.OpConfig.EnableSecretsDeletion {
                if err := c.deleteSecrets(); err != nil {
                        anyErrors = true
                        c.logger.Warningf("could not delete secrets: %v", err)
                        c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete secrets: %v", err)
                }
        } else {
                c.logger.Info("not deleting secrets because disabled in configuration")
        }

        if err := c.deletePodDisruptionBudgets(); err != nil {
                anyErrors = true
                c.logger.Warningf("could not delete pod disruption budgets: %v", err)
                c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete pod disruption budgets: %v", err)
        }

        for _, role := range []PostgresRole{Master, Replica} {
                if !c.patroniKubernetesUseConfigMaps() {
                        if err := c.deleteEndpoint(role); err != nil {
                                anyErrors = true
                                c.logger.Warningf("could not delete %s endpoint: %v", role, err)
                                c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete %s endpoint: %v", role, err)
                        }
                }

                if err := c.deleteService(role); err != nil {
                        anyErrors = true
                        c.logger.Warningf("could not delete %s service: %v", role, err)
                        c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete %s service: %v", role, err)
                }
        }

        if err := c.deletePatroniResources(); err != nil {
                anyErrors = true
                c.logger.Warningf("could not delete all Patroni resources: %v", err)
                c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete all Patroni resources: %v", err)
        }

        // Delete connection pooler objects in any case, even if the pooler is not mentioned
        // in the manifest, just to not keep orphaned components around in case something went wrong
        for _, role := range [2]PostgresRole{Master, Replica} {
                if err := c.deleteConnectionPooler(role); err != nil {
                        anyErrors = true
                        c.logger.Warningf("could not remove connection pooler: %v", err)
                        c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not remove connection pooler: %v", err)
                }
        }

        // If we are done deleting our various resources, we remove the finalizer to let K8s finally delete the Postgres CR
        if anyErrors {
                c.eventRecorder.Event(c.GetReference(), v1.EventTypeWarning, "Delete", "some resources could not be deleted yet")
                return fmt.Errorf("some error(s) occurred when deleting resources, NOT removing finalizer yet")
        }
        if err := c.removeFinalizer(); err != nil {
                return fmt.Errorf("done cleaning up, but error when removing finalizer: %v", err)
        }

        return nil
}

// NeedsRepair returns true if the cluster should be included in the repair scan (based on its in-memory status).
func (c *Cluster) NeedsRepair() (bool, acidv1.PostgresStatus) {
        c.specMu.RLock()
        defer c.specMu.RUnlock()
        return !c.Status.Success(), c.Status
}

// ReceivePodEvent is called back by the controller in order to add the cluster's pod event to the queue.
func (c *Cluster) ReceivePodEvent(event PodEvent) {
        if err := c.podEventsQueue.Add(event); err != nil {
                c.logger.Errorf("error when receiving pod events: %v", err)
        }
}

func (c *Cluster) processPodEvent(obj interface{}, isInInitialList bool) error {
        event, ok := obj.(PodEvent)
        if !ok {
                return fmt.Errorf("could not cast to PodEvent")
        }

        // can only take the lock when (un)registerPodSubscriber is finished
        c.podSubscribersMu.RLock()
        subscriber, ok := c.podSubscribers[spec.NamespacedName(event.PodName)]
        if ok {
                select {
                case subscriber <- event:
                default:
                        // ending up here when there is no receiver on the channel (i.e. waitForPodLabel finished)
                        // avoids blocking the channel: https://gobyexample.com/non-blocking-channel-operations
                }
        }
        // hold the lock for the time of processing the event to avoid a race condition
        // with unregisterPodSubscriber closing the channel (see #1876)
        c.podSubscribersMu.RUnlock()

        return nil
}

// Run starts the pod event dispatching for the given cluster.
func (c *Cluster) Run(stopCh <-chan struct{}) {
        go c.processPodEventQueue(stopCh)
}

func (c *Cluster) processPodEventQueue(stopCh <-chan struct{}) {
        for {
                select {
                case <-stopCh:
                        return
                default:
                        if _, err := c.podEventsQueue.Pop(cache.PopProcessFunc(c.processPodEvent)); err != nil {
                                c.logger.Errorf("error when processing pod event queue: %v", err)
                        }
                }
        }
}

func (c *Cluster) initSystemUsers() {
        // We don't actually use these definitions to create users, delegating this
        // task to Patroni. They are only used to create secrets, therefore setting
        // flags like SUPERUSER or REPLICATION is not necessary here
        c.systemUsers[constants.SuperuserKeyName] = spec.PgUser{
                Origin:    spec.RoleOriginSystem,
                Name:      c.OpConfig.SuperUsername,
                Namespace: c.Namespace,
                Password:  util.RandomPassword(constants.PasswordLength),
        }
        c.systemUsers[constants.ReplicationUserKeyName] = spec.PgUser{
                Origin:    spec.RoleOriginSystem,
                Name:      c.OpConfig.ReplicationUsername,
                Namespace: c.Namespace,
                Flags:     []string{constants.RoleFlagLogin},
                Password:  util.RandomPassword(constants.PasswordLength),
        }

        // The connection pooler user is an exception:
        // if requested, it is going to be created by the operator
        if needConnectionPooler(&c.Spec) {
                username := c.poolerUser(&c.Spec)

                // the connection pooler application should be able to login with this role
                connectionPoolerUser := spec.PgUser{
                        Origin:    spec.RoleOriginConnectionPooler,
                        Name:      username,
                        Namespace: c.Namespace,
                        Flags:     []string{constants.RoleFlagLogin},
                        Password:  util.RandomPassword(constants.PasswordLength),
                }

                if _, exists := c.systemUsers[constants.ConnectionPoolerUserKeyName]; !exists {
                        c.systemUsers[constants.ConnectionPoolerUserKeyName] = connectionPoolerUser
                }
        }

        // replication users for event streams are another exception:
        // the operator will create one replication user for all streams
        if len(c.Spec.Streams) > 0 {
                username := fmt.Sprintf("%s%s", constants.EventStreamSourceSlotPrefix, constants.UserRoleNameSuffix)
                streamUser := spec.PgUser{
                        Origin:    spec.RoleOriginStream,
                        Name:      username,
                        Namespace: c.Namespace,
                        Flags:     []string{constants.RoleFlagLogin, constants.RoleFlagReplication},
                        Password:  util.RandomPassword(constants.PasswordLength),
                }

                if _, exists := c.systemUsers[constants.EventStreamUserKeyName]; !exists {
                        c.systemUsers[constants.EventStreamUserKeyName] = streamUser
                }
        }
}

func (c *Cluster) initPreparedDatabaseRoles() error {

        if c.Spec.PreparedDatabases != nil && len(c.Spec.PreparedDatabases) == 0 { // TODO: add option to disable creating such a default DB
                c.Spec.PreparedDatabases = map[string]acidv1.PreparedDatabase{strings.Replace(c.Name, "-", "_", -1): {}}
        }

        // create maps with default roles/users as keys and their membership as values
        defaultRoles := map[string]string{
                constants.OwnerRoleNameSuffix:  "",
                constants.ReaderRoleNameSuffix: "",
                constants.WriterRoleNameSuffix: constants.ReaderRoleNameSuffix,
        }
        defaultUsers := map[string]string{
                fmt.Sprintf("%s%s", constants.OwnerRoleNameSuffix, constants.UserRoleNameSuffix):  constants.OwnerRoleNameSuffix,
                fmt.Sprintf("%s%s", constants.ReaderRoleNameSuffix, constants.UserRoleNameSuffix): constants.ReaderRoleNameSuffix,
                fmt.Sprintf("%s%s", constants.WriterRoleNameSuffix, constants.UserRoleNameSuffix): constants.WriterRoleNameSuffix,
        }

        for preparedDbName, preparedDB := range c.Spec.PreparedDatabases {
                // get list of prepared schemas to set in search_path
                preparedSchemas := preparedDB.PreparedSchemas
                if len(preparedDB.PreparedSchemas) == 0 {
                        preparedSchemas = map[string]acidv1.PreparedSchema{"data": {DefaultRoles: util.True()}}
                }

                searchPathArr := []string{constants.DefaultSearchPath}
                for preparedSchemaName := range preparedSchemas {
                        searchPathArr = append(searchPathArr, fmt.Sprintf("%q", preparedSchemaName))
                }
                searchPath := strings.Join(searchPathArr, ", ")

                // default roles per database
                if err := c.initDefaultRoles(defaultRoles, "admin", preparedDbName, searchPath, preparedDB.SecretNamespace); err != nil {
                        return fmt.Errorf("could not initialize default roles for database %s: %v", preparedDbName, err)
                }
                if preparedDB.DefaultUsers {
                        if err := c.initDefaultRoles(defaultUsers, "admin", preparedDbName, searchPath, preparedDB.SecretNamespace); err != nil {
                                return fmt.Errorf("could not initialize default users for database %s: %v", preparedDbName, err)
                        }
                }

                // default roles per database schema
                for preparedSchemaName, preparedSchema := range preparedSchemas {
                        if preparedSchema.DefaultRoles == nil || *preparedSchema.DefaultRoles {
                                if err := c.initDefaultRoles(defaultRoles,
                                        preparedDbName+constants.OwnerRoleNameSuffix,
                                        preparedDbName+"_"+preparedSchemaName,
                                        fmt.Sprintf("%s, %q", constants.DefaultSearchPath, preparedSchemaName),
                                        preparedDB.SecretNamespace); err != nil {
                                        return fmt.Errorf("could not initialize default roles for database schema %s: %v", preparedSchemaName, err)
                                }
                                if preparedSchema.DefaultUsers {
                                        if err := c.initDefaultRoles(defaultUsers,
                                                preparedDbName+constants.OwnerRoleNameSuffix,
                                                preparedDbName+"_"+preparedSchemaName,
                                                fmt.Sprintf("%s, %q", constants.DefaultSearchPath, preparedSchemaName),
                                                preparedDB.SecretNamespace); err != nil {
                                                return fmt.Errorf("could not initialize default users for database schema %s: %v", preparedSchemaName, err)
                                        }
                                }
                        }
                }
        }
        return nil
}

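// Illustrative sketch (role names assume the usual "_owner"/"_reader"/"_writer"
// and "_user" suffix constants): a hypothetical manifest entry
//
//      preparedDatabases:
//        foo:
//          defaultUsers: true
//
// would make initPreparedDatabaseRoles register the NOLOGIN roles foo_owner,
// foo_reader and foo_writer plus the LOGIN roles foo_owner_user, foo_reader_user
// and foo_writer_user, and, for the implicit "data" schema, the schema-level
// roles foo_data_owner, foo_data_reader and foo_data_writer.
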
func (c *Cluster) initDefaultRoles(defaultRoles map[string]string, admin, prefix, searchPath, secretNamespace string) error {

        for defaultRole, inherits := range defaultRoles {
                namespace := c.Namespace
                // if namespaced secrets are allowed
                if secretNamespace != "" {
                        if c.Config.OpConfig.EnableCrossNamespaceSecret {
                                namespace = secretNamespace
                        } else {
                                c.logger.Warn("secretNamespace ignored because enable_cross_namespace_secret is set to false. Creating secrets in the cluster namespace.")
                        }
                }
                roleName := fmt.Sprintf("%s%s", prefix, defaultRole)

                flags := []string{constants.RoleFlagNoLogin}
                if defaultRole[len(defaultRole)-5:] == constants.UserRoleNameSuffix {
                        flags = []string{constants.RoleFlagLogin}
                }

                memberOf := make([]string, 0)
                if inherits != "" {
                        memberOf = append(memberOf, prefix+inherits)
                }

                adminRole := ""
                isOwner := false
                if strings.Contains(defaultRole, constants.OwnerRoleNameSuffix) {
                        adminRole = admin
                        isOwner = true
                } else {
                        adminRole = fmt.Sprintf("%s%s", prefix, constants.OwnerRoleNameSuffix)
                }

                newRole := spec.PgUser{
                        Origin:     spec.RoleOriginBootstrap,
                        Name:       roleName,
                        Namespace:  namespace,
                        Password:   util.RandomPassword(constants.PasswordLength),
                        Flags:      flags,
                        MemberOf:   memberOf,
                        Parameters: map[string]string{"search_path": searchPath},
                        AdminRole:  adminRole,
                        IsDbOwner:  isOwner,
                }
                if currentRole, present := c.pgUsers[roleName]; present {
                        c.pgUsers[roleName] = c.resolveNameConflict(&currentRole, &newRole)
                } else {
                        c.pgUsers[roleName] = newRole
                }
        }
        return nil
}

func (c *Cluster) initRobotUsers() error {
        for username, userFlags := range c.Spec.Users {
                if !isValidUsername(username) {
                        return fmt.Errorf("invalid username: %q", username)
                }

                if c.shouldAvoidProtectedOrSystemRole(username, "manifest robot role") {
                        continue
                }
                namespace := c.Namespace

                // check if the role is specified as a database owner
                isOwner := false
                for _, owner := range c.Spec.Databases {
                        if username == owner {
                                isOwner = true
                        }
                }

                // if namespaced secrets are allowed
                if c.Config.OpConfig.EnableCrossNamespaceSecret {
                        if strings.Contains(username, ".") {
                                splits := strings.Split(username, ".")
                                namespace = splits[0]
                                c.logger.Warningf("enable_cross_namespace_secret is set. The database role name contains the namespace, i.e. %s is the created user", username)
                        }
                }

                flags, err := normalizeUserFlags(userFlags)
                if err != nil {
                        return fmt.Errorf("invalid flags for user %q: %v", username, err)
                }
                adminRole := ""
                if c.OpConfig.EnableAdminRoleForUsers {
                        adminRole = c.OpConfig.TeamAdminRole
                }
                newRole := spec.PgUser{
                        Origin:    spec.RoleOriginManifest,
                        Name:      username,
                        Namespace: namespace,
                        Password:  util.RandomPassword(constants.PasswordLength),
                        Flags:     flags,
                        AdminRole: adminRole,
                        IsDbOwner: isOwner,
                }
                if currentRole, present := c.pgUsers[username]; present {
                        c.pgUsers[username] = c.resolveNameConflict(&currentRole, &newRole)
                } else {
                        c.pgUsers[username] = newRole
                }
        }
        return nil
}

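// Illustrative sketch (the names are hypothetical): with enable_cross_namespace_secret
// switched on, a manifest user named "appns.app_user" is treated as belonging to
// namespace "appns", i.e. initRobotUsers keeps "appns.app_user" as the role name
// but places the generated credentials secret in the "appns" namespace instead of
// the cluster namespace.
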
func (c *Cluster) initAdditionalOwnerRoles() {
        if len(c.OpConfig.AdditionalOwnerRoles) == 0 {
                return
        }

        // fetch database owners and assign additional owner roles
        for username, pgUser := range c.pgUsers {
                if pgUser.IsDbOwner {
                        pgUser.MemberOf = append(pgUser.MemberOf, c.OpConfig.AdditionalOwnerRoles...)
                        c.pgUsers[username] = pgUser
                }
        }
}

func (c *Cluster) initTeamMembers(teamID string, isPostgresSuperuserTeam bool) error {
        teamMembers, err := c.getTeamMembers(teamID)

        if err != nil {
                return fmt.Errorf("could not get list of team members for team %q: %v", teamID, err)
        }

        for _, username := range teamMembers {
                flags := []string{constants.RoleFlagLogin}
                memberOf := []string{c.OpConfig.PamRoleName}

                if c.shouldAvoidProtectedOrSystemRole(username, "API role") {
                        continue
                }
                if (c.OpConfig.EnableTeamSuperuser && teamID == c.Spec.TeamID) || isPostgresSuperuserTeam {
                        flags = append(flags, constants.RoleFlagSuperuser)
                } else {
                        if c.OpConfig.TeamAdminRole != "" {
                                memberOf = append(memberOf, c.OpConfig.TeamAdminRole)
                        }
                }

                newRole := spec.PgUser{
                        Origin:     spec.RoleOriginTeamsAPI,
                        Name:       username,
                        Flags:      flags,
                        MemberOf:   memberOf,
                        Parameters: c.OpConfig.TeamAPIRoleConfiguration,
                }

                if currentRole, present := c.pgUsers[username]; present {
                        c.pgUsers[username] = c.resolveNameConflict(&currentRole, &newRole)
                } else {
                        c.pgUsers[username] = newRole
                }
        }

        return nil
}

func (c *Cluster) initHumanUsers() error {

        var clusterIsOwnedBySuperuserTeam bool
        superuserTeams := []string{}

        if c.OpConfig.EnablePostgresTeamCRD && c.OpConfig.EnablePostgresTeamCRDSuperusers && c.Config.PgTeamMap != nil {
                superuserTeams = c.Config.PgTeamMap.GetAdditionalSuperuserTeams(c.Spec.TeamID, true)
        }

        for _, postgresSuperuserTeam := range c.OpConfig.PostgresSuperuserTeams {
                if !(util.SliceContains(superuserTeams, postgresSuperuserTeam)) {
                        superuserTeams = append(superuserTeams, postgresSuperuserTeam)
                }
        }

        for _, superuserTeam := range superuserTeams {
                err := c.initTeamMembers(superuserTeam, true)
                if err != nil {
                        return fmt.Errorf("cannot initialize members for team %q of Postgres superusers: %v", superuserTeam, err)
                }
                if superuserTeam == c.Spec.TeamID {
                        clusterIsOwnedBySuperuserTeam = true
                }
        }

        if c.OpConfig.EnablePostgresTeamCRD && c.Config.PgTeamMap != nil {
                additionalTeams := c.Config.PgTeamMap.GetAdditionalTeams(c.Spec.TeamID, true)
                for _, additionalTeam := range additionalTeams {
                        if !(util.SliceContains(superuserTeams, additionalTeam)) {
                                err := c.initTeamMembers(additionalTeam, false)
                                if err != nil {
                                        return fmt.Errorf("cannot initialize members for additional team %q for cluster owned by %q: %v", additionalTeam, c.Spec.TeamID, err)
                                }
                        }
                }
        }

        if clusterIsOwnedBySuperuserTeam {
                c.logger.Infof("Team %q owning the cluster is also a team of superusers. Created superuser roles for its members instead of admin roles.", c.Spec.TeamID)
                return nil
        }

        err := c.initTeamMembers(c.Spec.TeamID, false)
        if err != nil {
                return fmt.Errorf("cannot initialize members for team %q that owns the Postgres cluster: %v", c.Spec.TeamID, err)
        }

        return nil
}

func (c *Cluster) initInfrastructureRoles() error {
        // add infrastructure roles from the operator's definition
        for username, newRole := range c.InfrastructureRoles {
                if !isValidUsername(username) {
                        return fmt.Errorf("invalid username: '%v'", username)
                }
                if c.shouldAvoidProtectedOrSystemRole(username, "infrastructure role") {
                        continue
                }
                flags, err := normalizeUserFlags(newRole.Flags)
                if err != nil {
                        return fmt.Errorf("invalid flags for user '%v': %v", username, err)
                }
                newRole.Flags = flags
                newRole.Namespace = c.Namespace

                if currentRole, present := c.pgUsers[username]; present {
                        c.pgUsers[username] = c.resolveNameConflict(&currentRole, &newRole)
                } else {
                        c.pgUsers[username] = newRole
                }
        }
        return nil
}

// resolveNameConflict resolves a naming conflict between an existing and a new role
// by keeping the one whose origin ranks higher (the new role wins on a tie).
func (c *Cluster) resolveNameConflict(currentRole, newRole *spec.PgUser) spec.PgUser {
        var result spec.PgUser
        if newRole.Origin >= currentRole.Origin {
                result = *newRole
        } else {
                result = *currentRole
        }
        c.logger.Debugf("resolved a conflict of role %q between %s and %s to %s",
                newRole.Name, newRole.Origin, currentRole.Origin, result.Origin)
        return result
}

func (c *Cluster) shouldAvoidProtectedOrSystemRole(username, purpose string) bool {
        if c.isProtectedUsername(username) {
                c.logger.Warnf("cannot initialize a new %s with the name of the protected user %q", purpose, username)
                return true
        }
        if c.isSystemUsername(username) {
                c.logger.Warnf("cannot initialize a new %s with the name of the system user %q", purpose, username)
                return true
        }
        return false
}

// GetCurrentProcess provides the name of the last process of the cluster
func (c *Cluster) GetCurrentProcess() Process {
        c.processMu.RLock()
        defer c.processMu.RUnlock()

        return c.currentProcess
}

// GetStatus provides the status of the cluster
func (c *Cluster) GetStatus() *ClusterStatus {
        status := &ClusterStatus{
                Cluster:                       c.Name,
                Namespace:                     c.Namespace,
                Team:                          c.Spec.TeamID,
                Status:                        c.Status,
                Spec:                          c.Spec,
                MasterService:                 c.GetServiceMaster(),
                ReplicaService:                c.GetServiceReplica(),
                StatefulSet:                   c.GetStatefulSet(),
                PrimaryPodDisruptionBudget:    c.GetPrimaryPodDisruptionBudget(),
                CriticalOpPodDisruptionBudget: c.GetCriticalOpPodDisruptionBudget(),
                CurrentProcess:                c.GetCurrentProcess(),

                Error: fmt.Errorf("error: %s", c.Error),
        }

        if !c.patroniKubernetesUseConfigMaps() {
                status.MasterEndpoint = c.GetEndpointMaster()
                status.ReplicaEndpoint = c.GetEndpointReplica()
        }

        return status
}

// GetSwitchoverSchedule returns the closest possible switchover time within the configured maintenance windows
func (c *Cluster) GetSwitchoverSchedule() string {
        var possibleSwitchover, schedule time.Time

        now := time.Now().UTC()
        for _, window := range c.Spec.MaintenanceWindows {
                // in the best case it is possible today
                possibleSwitchover = time.Date(now.Year(), now.Month(), now.Day(), window.StartTime.Hour(), window.StartTime.Minute(), 0, 0, time.UTC)
                if window.Everyday {
                        if now.After(possibleSwitchover) {
                                // we are already past the time for today, try tomorrow
                                possibleSwitchover = possibleSwitchover.AddDate(0, 0, 1)
                        }
                } else {
                        if now.Weekday() != window.Weekday {
                                // get the closest possible time for this window
                                possibleSwitchover = possibleSwitchover.AddDate(0, 0, int((7+window.Weekday-now.Weekday())%7))
                        } else if now.After(possibleSwitchover) {
                                // we are already past the time for today, try next week
                                possibleSwitchover = possibleSwitchover.AddDate(0, 0, 7)
                        }
                }

                if (schedule == time.Time{}) || possibleSwitchover.Before(schedule) {
                        schedule = possibleSwitchover
                }
        }
        return schedule.Format("2006-01-02T15:04+00")
}

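// Illustrative sketch (the dates are hypothetical): assuming "now" is Tuesday,
// 2025-02-25 15:00 UTC and a single maintenance window with Everyday=true and a
// start time of 10:00, today's slot has already passed, so GetSwitchoverSchedule
// picks the next day and returns "2025-02-26T10:00+00" in the layout
// "2006-01-02T15:04+00" used above.
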
// Switchover does a switchover (via Patroni) to a candidate pod
func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName, scheduled bool) error {
        var err error

        stopCh := make(chan struct{})
        ch := c.registerPodSubscriber(candidate)
        defer c.unregisterPodSubscriber(candidate)
        defer close(stopCh)

        var scheduled_at string
        if scheduled {
                scheduled_at = c.GetSwitchoverSchedule()
        } else {
                c.logger.Debugf("switching over from %q to %q", curMaster.Name, candidate)
                c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switching over from %q to %q", curMaster.Name, candidate)
                scheduled_at = ""
        }

        if err = c.patroni.Switchover(curMaster, candidate.Name, scheduled_at); err == nil {
                if scheduled {
                        c.logger.Infof("switchover from %q to %q is scheduled at %s", curMaster.Name, candidate, scheduled_at)
                        return nil
                }
                c.logger.Debugf("successfully switched over from %q to %q", curMaster.Name, candidate)
                c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Successfully switched over from %q to %q", curMaster.Name, candidate)
                _, err = c.waitForPodLabel(ch, stopCh, nil)
                if err != nil {
                        err = fmt.Errorf("could not get master pod label: %v", err)
                }
        } else {
                if scheduled {
                        return fmt.Errorf("could not schedule switchover: %v", err)
                }
                err = fmt.Errorf("could not switch over from %q to %q: %v", curMaster.Name, candidate, err)
                c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switchover from %q to %q FAILED: %v", curMaster.Name, candidate, err)
        }

        return err
}

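// Illustrative note: for a scheduled switchover the timestamp returned by
// GetSwitchoverSchedule (e.g. a hypothetical "2025-02-26T10:00+00") is passed to
// Patroni as scheduled_at, while an immediate switchover passes an empty string
// so Patroni performs the switchover right away.
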
// Lock locks the cluster
func (c *Cluster) Lock() {
        c.mu.Lock()
}

// Unlock unlocks the cluster
func (c *Cluster) Unlock() {
        c.mu.Unlock()
}