• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zalando / postgres-operator / 12276767147

11 Dec 2024 12:52PM UTC coverage: 44.803% (-0.02%) from 44.821%
12276767147

Pull #2773

github

web-flow
Merge 7fb40de11 into c206eb38a
Pull Request #2773: Add support for pg17 and remove pg12

0 of 4 new or added lines in 2 files covered. (0.0%)

424 existing lines in 5 files now uncovered.

6737 of 15037 relevant lines covered (44.8%)

26.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.09
/pkg/cluster/resources.go
1
package cluster
2

3
import (
4
        "context"
5
        "fmt"
6
        "strconv"
7
        "strings"
8

9
        appsv1 "k8s.io/api/apps/v1"
10
        batchv1 "k8s.io/api/batch/v1"
11
        v1 "k8s.io/api/core/v1"
12
        policyv1 "k8s.io/api/policy/v1"
13
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14
        "k8s.io/apimachinery/pkg/types"
15

16
        "github.com/zalando/postgres-operator/pkg/util"
17
        "github.com/zalando/postgres-operator/pkg/util/k8sutil"
18
        "github.com/zalando/postgres-operator/pkg/util/retryutil"
19
)
20

21
const (
22
        rollingUpdatePodAnnotationKey = "zalando-postgres-operator-rolling-update-required"
23
)
24

25
func (c *Cluster) listResources() error {
1✔
26
        if c.PodDisruptionBudget != nil {
2✔
27
                c.logger.Infof("found pod disruption budget: %q (uid: %q)", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta), c.PodDisruptionBudget.UID)
1✔
28
        }
1✔
29

30
        if c.Statefulset != nil {
2✔
31
                c.logger.Infof("found statefulset: %q (uid: %q)", util.NameFromMeta(c.Statefulset.ObjectMeta), c.Statefulset.UID)
1✔
32
        }
1✔
33

34
        for appId, stream := range c.Streams {
1✔
35
                c.logger.Infof("found stream: %q with application id %q (uid: %q)", util.NameFromMeta(stream.ObjectMeta), appId, stream.UID)
×
36
        }
×
37

38
        if c.LogicalBackupJob != nil {
2✔
39
                c.logger.Infof("found logical backup job: %q (uid: %q)", util.NameFromMeta(c.LogicalBackupJob.ObjectMeta), c.LogicalBackupJob.UID)
1✔
40
        }
1✔
41

42
        for uid, secret := range c.Secrets {
2✔
43
                c.logger.Infof("found secret: %q (uid: %q) namespace: %s", util.NameFromMeta(secret.ObjectMeta), uid, secret.ObjectMeta.Namespace)
1✔
44
        }
1✔
45

46
        for role, service := range c.Services {
3✔
47
                c.logger.Infof("found %s service: %q (uid: %q)", role, util.NameFromMeta(service.ObjectMeta), service.UID)
2✔
48
        }
2✔
49

50
        for role, endpoint := range c.Endpoints {
2✔
51
                c.logger.Infof("found %s endpoint: %q (uid: %q)", role, util.NameFromMeta(endpoint.ObjectMeta), endpoint.UID)
1✔
52
        }
1✔
53

54
        if c.patroniKubernetesUseConfigMaps() {
1✔
55
                for suffix, configmap := range c.PatroniConfigMaps {
×
56
                        c.logger.Infof("found %s Patroni config map: %q (uid: %q)", suffix, util.NameFromMeta(configmap.ObjectMeta), configmap.UID)
×
57
                }
×
58
        } else {
1✔
59
                for suffix, endpoint := range c.PatroniEndpoints {
1✔
60
                        c.logger.Infof("found %s Patroni endpoint: %q (uid: %q)", suffix, util.NameFromMeta(endpoint.ObjectMeta), endpoint.UID)
×
61
                }
×
62
        }
63

64
        pods, err := c.listPods()
1✔
65
        if err != nil {
1✔
66
                return fmt.Errorf("could not get the list of pods: %v", err)
×
67
        }
×
68

69
        for _, obj := range pods {
2✔
70
                c.logger.Infof("found pod: %q (uid: %q)", util.NameFromMeta(obj.ObjectMeta), obj.UID)
1✔
71
        }
1✔
72

73
        for uid, pvc := range c.VolumeClaims {
1✔
UNCOV
74
                c.logger.Infof("found persistent volume claim: %q (uid: %q)", util.NameFromMeta(pvc.ObjectMeta), uid)
×
75
        }
×
76

77
        for role, poolerObjs := range c.ConnectionPooler {
3✔
78
                if poolerObjs.Deployment != nil {
2✔
79
                        c.logger.Infof("found %s pooler deployment: %q (uid: %q) ", role, util.NameFromMeta(poolerObjs.Deployment.ObjectMeta), poolerObjs.Deployment.UID)
×
80
                }
×
81
                if poolerObjs.Service != nil {
2✔
UNCOV
82
                        c.logger.Infof("found %s pooler service: %q (uid: %q) ", role, util.NameFromMeta(poolerObjs.Service.ObjectMeta), poolerObjs.Service.UID)
×
UNCOV
83
                }
×
84
        }
85

86
        return nil
1✔
87
}
88

89
func (c *Cluster) createStatefulSet() (*appsv1.StatefulSet, error) {
27✔
90
        c.setProcessName("creating statefulset")
27✔
91
        // check if it's allowed that spec contains initContainers
27✔
92
        if c.Spec.InitContainers != nil && len(c.Spec.InitContainers) > 0 &&
27✔
93
                c.OpConfig.EnableInitContainers != nil && !(*c.OpConfig.EnableInitContainers) {
27✔
UNCOV
94
                return nil, fmt.Errorf("initContainers specified but disabled in configuration")
×
UNCOV
95
        }
×
96
        // check if it's allowed that spec contains sidecars
97
        if c.Spec.Sidecars != nil && len(c.Spec.Sidecars) > 0 &&
27✔
98
                c.OpConfig.EnableSidecars != nil && !(*c.OpConfig.EnableSidecars) {
27✔
99
                return nil, fmt.Errorf("sidecar containers specified but disabled in configuration")
×
100
        }
×
101

102
        statefulSetSpec, err := c.generateStatefulSet(&c.Spec)
27✔
103
        if err != nil {
27✔
104
                return nil, fmt.Errorf("could not generate statefulset: %v", err)
×
105
        }
×
106
        statefulSet, err := c.KubeClient.StatefulSets(statefulSetSpec.Namespace).Create(
27✔
107
                context.TODO(),
27✔
108
                statefulSetSpec,
27✔
109
                metav1.CreateOptions{})
27✔
110
        if err != nil {
44✔
111
                return nil, err
17✔
112
        }
17✔
113
        c.Statefulset = statefulSet
10✔
114
        c.logger.Debugf("created new statefulset %q, uid: %q", util.NameFromMeta(statefulSet.ObjectMeta), statefulSet.UID)
10✔
115

10✔
116
        return statefulSet, nil
10✔
117
}
118

UNCOV
119
func getPodIndex(podName string) (int32, error) {
×
UNCOV
120
        parts := strings.Split(podName, "-")
×
UNCOV
121
        if len(parts) == 0 {
×
UNCOV
122
                return 0, fmt.Errorf("pod has no index part")
×
UNCOV
123
        }
×
124

125
        postfix := parts[len(parts)-1]
×
126
        res, err := strconv.ParseInt(postfix, 10, 32)
×
127
        if err != nil {
×
128
                return 0, fmt.Errorf("could not parse pod index: %v", err)
×
UNCOV
129
        }
×
130

131
        return int32(res), nil
×
132
}
133

134
func (c *Cluster) preScaleDown(newStatefulSet *appsv1.StatefulSet) error {
×
UNCOV
135
        masterPod, err := c.getRolePods(Master)
×
136
        if err != nil {
×
UNCOV
137
                return fmt.Errorf("could not get master pod: %v", err)
×
UNCOV
138
        }
×
139
        if len(masterPod) == 0 {
×
140
                return fmt.Errorf("no master pod is running in the cluster")
×
141
        }
×
142

143
        podNum, err := getPodIndex(masterPod[0].Name)
×
144
        if err != nil {
×
145
                return fmt.Errorf("could not get pod number: %v", err)
×
146
        }
×
147

148
        //Check if scale down affects current master pod
149
        if *newStatefulSet.Spec.Replicas >= podNum+1 {
×
150
                return nil
×
151
        }
×
152

UNCOV
153
        podName := fmt.Sprintf("%s-0", c.Statefulset.Name)
×
154
        masterCandidatePod, err := c.KubeClient.Pods(c.clusterNamespace()).Get(context.TODO(), podName, metav1.GetOptions{})
×
155
        if err != nil {
×
156
                return fmt.Errorf("could not get master candidate pod: %v", err)
×
UNCOV
157
        }
×
158

159
        // some sanity check
160
        if !util.MapContains(masterCandidatePod.Labels, c.OpConfig.ClusterLabels) ||
×
161
                !util.MapContains(masterCandidatePod.Labels, map[string]string{c.OpConfig.ClusterNameLabel: c.Name}) {
×
162
                return fmt.Errorf("pod %q does not belong to cluster", podName)
×
UNCOV
163
        }
×
164

165
        if err := c.patroni.Switchover(&masterPod[0], masterCandidatePod.Name); err != nil {
×
166
                return fmt.Errorf("could not failover: %v", err)
×
167
        }
×
168

UNCOV
169
        return nil
×
170
}
171

172
func (c *Cluster) updateStatefulSet(newStatefulSet *appsv1.StatefulSet) error {
20✔
173
        c.setProcessName("updating statefulset")
20✔
174
        if c.Statefulset == nil {
20✔
UNCOV
175
                return fmt.Errorf("there is no statefulset in the cluster")
×
UNCOV
176
        }
×
177
        statefulSetName := util.NameFromMeta(c.Statefulset.ObjectMeta)
20✔
178

20✔
179
        //scale down
20✔
180
        if *c.Statefulset.Spec.Replicas > *newStatefulSet.Spec.Replicas {
20✔
181
                if err := c.preScaleDown(newStatefulSet); err != nil {
×
UNCOV
182
                        c.logger.Warningf("could not scale down: %v", err)
×
UNCOV
183
                }
×
184
        }
185
        c.logger.Debug("updating statefulset")
20✔
186

20✔
187
        patchData, err := specPatch(newStatefulSet.Spec)
20✔
188
        if err != nil {
20✔
UNCOV
189
                return fmt.Errorf("could not form patch for the statefulset %q: %v", statefulSetName, err)
×
UNCOV
190
        }
×
191

192
        statefulSet, err := c.KubeClient.StatefulSets(c.Statefulset.Namespace).Patch(
20✔
193
                context.TODO(),
20✔
194
                c.Statefulset.Name,
20✔
195
                types.MergePatchType,
20✔
196
                patchData,
20✔
197
                metav1.PatchOptions{},
20✔
198
                "")
20✔
199
        if err != nil {
20✔
UNCOV
200
                return fmt.Errorf("could not patch statefulset spec %q: %v", statefulSetName, err)
×
UNCOV
201
        }
×
202

203
        c.Statefulset = statefulSet
20✔
204

20✔
205
        return nil
20✔
206
}
207

208
// replaceStatefulSet deletes an old StatefulSet and creates the new using spec in the PostgreSQL CRD.
209
func (c *Cluster) replaceStatefulSet(newStatefulSet *appsv1.StatefulSet) error {
4✔
210
        c.setProcessName("replacing statefulset")
4✔
211
        if c.Statefulset == nil {
4✔
UNCOV
212
                return fmt.Errorf("there is no statefulset in the cluster")
×
UNCOV
213
        }
×
214

215
        statefulSetName := util.NameFromMeta(c.Statefulset.ObjectMeta)
4✔
216
        c.logger.Debug("replacing statefulset")
4✔
217

4✔
218
        // Delete the current statefulset without deleting the pods
4✔
219
        deletePropagationPolicy := metav1.DeletePropagationOrphan
4✔
220
        oldStatefulset := c.Statefulset
4✔
221

4✔
222
        options := metav1.DeleteOptions{PropagationPolicy: &deletePropagationPolicy}
4✔
223
        err := c.KubeClient.StatefulSets(oldStatefulset.Namespace).Delete(context.TODO(), oldStatefulset.Name, options)
4✔
224
        if err != nil {
4✔
UNCOV
225
                return fmt.Errorf("could not delete statefulset %q: %v", statefulSetName, err)
×
UNCOV
226
        }
×
227
        // make sure we clear the stored statefulset status if the subsequent create fails.
228
        c.Statefulset = nil
4✔
229
        // wait until the statefulset is truly deleted
4✔
230
        c.logger.Debug("waiting for the statefulset to be deleted")
4✔
231

4✔
232
        err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout,
4✔
233
                func() (bool, error) {
8✔
234
                        _, err2 := c.KubeClient.StatefulSets(oldStatefulset.Namespace).Get(context.TODO(), oldStatefulset.Name, metav1.GetOptions{})
4✔
235
                        if err2 == nil {
4✔
UNCOV
236
                                return false, nil
×
UNCOV
237
                        }
×
238
                        if k8sutil.ResourceNotFound(err2) {
8✔
239
                                return true, nil
4✔
240
                        }
4✔
241
                        return false, err2
×
242
                })
243
        if err != nil {
4✔
UNCOV
244
                return fmt.Errorf("could not delete statefulset: %v", err)
×
UNCOV
245
        }
×
246

247
        // create the new statefulset with the desired spec. It would take over the remaining pods.
248
        createdStatefulset, err := c.KubeClient.StatefulSets(newStatefulSet.Namespace).Create(context.TODO(), newStatefulSet, metav1.CreateOptions{})
4✔
249
        if err != nil {
4✔
250
                return fmt.Errorf("could not create statefulset %q: %v", statefulSetName, err)
×
UNCOV
251
        }
×
252
        // check that all the previous replicas were picked up.
253
        if newStatefulSet.Spec.Replicas == oldStatefulset.Spec.Replicas &&
4✔
254
                createdStatefulset.Status.Replicas != oldStatefulset.Status.Replicas {
4✔
255
                c.logger.Warningf("number of pods for the old and updated Statefulsets is not identical")
×
256
        }
×
257

258
        c.Statefulset = createdStatefulset
4✔
259
        return nil
4✔
260
}
261

UNCOV
262
func (c *Cluster) deleteStatefulSet() error {
×
UNCOV
263
        c.setProcessName("deleting statefulset")
×
UNCOV
264
        c.logger.Debug("deleting statefulset")
×
UNCOV
265
        if c.Statefulset == nil {
×
UNCOV
266
                c.logger.Debug("there is no statefulset in the cluster")
×
267
                return nil
×
268
        }
×
269

270
        err := c.KubeClient.StatefulSets(c.Statefulset.Namespace).Delete(context.TODO(), c.Statefulset.Name, c.deleteOptions)
×
271
        if k8sutil.ResourceNotFound(err) {
×
272
                c.logger.Debugf("statefulset %q has already been deleted", util.NameFromMeta(c.Statefulset.ObjectMeta))
×
273
        } else if err != nil {
×
UNCOV
274
                return err
×
275
        }
×
276

277
        c.logger.Infof("statefulset %q has been deleted", util.NameFromMeta(c.Statefulset.ObjectMeta))
×
278
        c.Statefulset = nil
×
279

×
280
        if err := c.deletePods(); err != nil {
×
UNCOV
281
                return fmt.Errorf("could not delete pods: %v", err)
×
282
        }
×
283

284
        if c.OpConfig.EnablePersistentVolumeClaimDeletion != nil && *c.OpConfig.EnablePersistentVolumeClaimDeletion {
×
285
                if err := c.deletePersistentVolumeClaims(); err != nil {
×
286
                        return fmt.Errorf("could not delete persistent volume claims: %v", err)
×
287
                }
×
UNCOV
288
        } else {
×
289
                c.logger.Info("not deleting persistent volume claims because disabled in configuration")
×
290
        }
×
291

292
        return nil
×
293
}
294

295
func (c *Cluster) createService(role PostgresRole) (*v1.Service, error) {
8✔
296
        c.setProcessName("creating %v service", role)
8✔
297

8✔
298
        serviceSpec := c.generateService(role, &c.Spec)
8✔
299
        service, err := c.KubeClient.Services(serviceSpec.Namespace).Create(context.TODO(), serviceSpec, metav1.CreateOptions{})
8✔
300
        if err != nil {
8✔
UNCOV
301
                return nil, err
×
UNCOV
302
        }
×
303

304
        c.Services[role] = service
8✔
305
        return service, nil
8✔
306
}
307

308
func (c *Cluster) updateService(role PostgresRole, oldService *v1.Service, newService *v1.Service) (*v1.Service, error) {
33✔
309
        var err error
33✔
310
        svc := oldService
33✔
311

33✔
312
        serviceName := util.NameFromMeta(oldService.ObjectMeta)
33✔
313
        match, reason := c.compareServices(oldService, newService)
33✔
314
        if !match {
37✔
315
                c.logServiceChanges(role, oldService, newService, false, reason)
4✔
316
                c.setProcessName("updating %v service", role)
4✔
317

4✔
318
                // now, patch the service spec, but when disabling LoadBalancers do update instead
4✔
319
                // patch does not work because of LoadBalancerSourceRanges field (even if set to nil)
4✔
320
                oldServiceType := oldService.Spec.Type
4✔
321
                newServiceType := newService.Spec.Type
4✔
322
                if newServiceType == "ClusterIP" && newServiceType != oldServiceType {
4✔
UNCOV
323
                        newService.ResourceVersion = oldService.ResourceVersion
×
UNCOV
324
                        newService.Spec.ClusterIP = oldService.Spec.ClusterIP
×
UNCOV
325
                }
×
326
                svc, err = c.KubeClient.Services(serviceName.Namespace).Update(context.TODO(), newService, metav1.UpdateOptions{})
4✔
327
                if err != nil {
4✔
328
                        return nil, fmt.Errorf("could not update service %q: %v", serviceName, err)
×
329
                }
×
330
        }
331

332
        if changed, _ := c.compareAnnotations(oldService.Annotations, newService.Annotations); changed {
49✔
333
                patchData, err := metaAnnotationsPatch(newService.Annotations)
16✔
334
                if err != nil {
16✔
UNCOV
335
                        return nil, fmt.Errorf("could not form patch for service %q annotations: %v", oldService.Name, err)
×
UNCOV
336
                }
×
337
                svc, err = c.KubeClient.Services(serviceName.Namespace).Patch(context.TODO(), newService.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{})
16✔
338
                if err != nil {
16✔
UNCOV
339
                        return nil, fmt.Errorf("could not patch annotations for service %q: %v", oldService.Name, err)
×
340
                }
×
341
        }
342

343
        return svc, nil
33✔
344
}
345

346
func (c *Cluster) deleteService(role PostgresRole) error {
1✔
347
        c.setProcessName("deleting service")
1✔
348
        c.logger.Debugf("deleting %s service", role)
1✔
349

1✔
350
        if c.Services[role] == nil {
1✔
UNCOV
351
                c.logger.Debugf("No service for %s role was found, nothing to delete", role)
×
UNCOV
352
                return nil
×
UNCOV
353
        }
×
354

355
        if err := c.KubeClient.Services(c.Services[role].Namespace).Delete(context.TODO(), c.Services[role].Name, c.deleteOptions); err != nil {
1✔
356
                if !k8sutil.ResourceNotFound(err) {
×
357
                        return fmt.Errorf("could not delete %s service: %v", role, err)
×
358
                }
×
UNCOV
359
                c.logger.Debugf("%s service has already been deleted", role)
×
360
        }
361

362
        c.logger.Infof("%s service %q has been deleted", role, util.NameFromMeta(c.Services[role].ObjectMeta))
1✔
363
        delete(c.Services, role)
1✔
364

1✔
365
        return nil
1✔
366
}
367

368
func (c *Cluster) createEndpoint(role PostgresRole) (*v1.Endpoints, error) {
3✔
369
        var (
3✔
370
                subsets []v1.EndpointSubset
3✔
371
        )
3✔
372
        c.setProcessName("creating endpoint")
3✔
373
        if !c.isNewCluster() {
5✔
374
                subsets = c.generateEndpointSubsets(role)
2✔
375
        } else {
3✔
376
                // Patroni will populate the master endpoint for the new cluster
1✔
377
                // The replica endpoint will be filled-in by the service selector.
1✔
378
                subsets = make([]v1.EndpointSubset, 0)
1✔
379
        }
1✔
380
        endpointsSpec := c.generateEndpoint(role, subsets)
3✔
381

3✔
382
        endpoints, err := c.KubeClient.Endpoints(endpointsSpec.Namespace).Create(context.TODO(), endpointsSpec, metav1.CreateOptions{})
3✔
383
        if err != nil {
3✔
UNCOV
384
                return nil, fmt.Errorf("could not create %s endpoint: %v", role, err)
×
UNCOV
385
        }
×
386

387
        c.Endpoints[role] = endpoints
3✔
388

3✔
389
        return endpoints, nil
3✔
390
}
391

392
func (c *Cluster) generateEndpointSubsets(role PostgresRole) []v1.EndpointSubset {
2✔
393
        result := make([]v1.EndpointSubset, 0)
2✔
394
        pods, err := c.getRolePods(role)
2✔
395
        if err != nil {
2✔
UNCOV
396
                if role == Master {
×
UNCOV
397
                        c.logger.Warningf("could not obtain the address for %s pod: %v", role, err)
×
UNCOV
398
                } else {
×
UNCOV
399
                        c.logger.Warningf("could not obtain the addresses for %s pods: %v", role, err)
×
UNCOV
400
                }
×
401
                return result
×
402
        }
403

404
        endPointAddresses := make([]v1.EndpointAddress, 0)
2✔
405
        for _, pod := range pods {
4✔
406
                endPointAddresses = append(endPointAddresses, v1.EndpointAddress{IP: pod.Status.PodIP})
2✔
407
        }
2✔
408
        if len(endPointAddresses) > 0 {
4✔
409
                result = append(result, v1.EndpointSubset{
2✔
410
                        Addresses: endPointAddresses,
2✔
411
                        Ports:     []v1.EndpointPort{{Name: "postgresql", Port: 5432, Protocol: "TCP"}},
2✔
412
                })
2✔
413
        } else if role == Master {
2✔
UNCOV
414
                c.logger.Warningf("master is not running, generated master endpoint does not contain any addresses")
×
UNCOV
415
        }
×
416

417
        return result
2✔
418
}
419

420
func (c *Cluster) createPodDisruptionBudget() (*policyv1.PodDisruptionBudget, error) {
2✔
421
        podDisruptionBudgetSpec := c.generatePodDisruptionBudget()
2✔
422
        podDisruptionBudget, err := c.KubeClient.
2✔
423
                PodDisruptionBudgets(podDisruptionBudgetSpec.Namespace).
2✔
424
                Create(context.TODO(), podDisruptionBudgetSpec, metav1.CreateOptions{})
2✔
425

2✔
426
        if err != nil {
2✔
UNCOV
427
                return nil, err
×
UNCOV
428
        }
×
429
        c.PodDisruptionBudget = podDisruptionBudget
2✔
430

2✔
431
        return podDisruptionBudget, nil
2✔
432
}
433

434
func (c *Cluster) updatePodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) error {
3✔
435
        if c.PodDisruptionBudget == nil {
3✔
UNCOV
436
                return fmt.Errorf("there is no pod disruption budget in the cluster")
×
UNCOV
437
        }
×
438

439
        if err := c.deletePodDisruptionBudget(); err != nil {
3✔
UNCOV
440
                return fmt.Errorf("could not delete pod disruption budget: %v", err)
×
441
        }
×
442

443
        newPdb, err := c.KubeClient.
3✔
444
                PodDisruptionBudgets(pdb.Namespace).
3✔
445
                Create(context.TODO(), pdb, metav1.CreateOptions{})
3✔
446
        if err != nil {
3✔
UNCOV
447
                return fmt.Errorf("could not create pod disruption budget: %v", err)
×
UNCOV
448
        }
×
449
        c.PodDisruptionBudget = newPdb
3✔
450

3✔
451
        return nil
3✔
452
}
453

454
func (c *Cluster) deletePodDisruptionBudget() error {
3✔
455
        c.logger.Debug("deleting pod disruption budget")
3✔
456
        if c.PodDisruptionBudget == nil {
3✔
UNCOV
457
                c.logger.Debug("there is no pod disruption budget in the cluster")
×
UNCOV
458
                return nil
×
UNCOV
459
        }
×
460

461
        pdbName := util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta)
3✔
462
        err := c.KubeClient.
3✔
463
                PodDisruptionBudgets(c.PodDisruptionBudget.Namespace).
3✔
464
                Delete(context.TODO(), c.PodDisruptionBudget.Name, c.deleteOptions)
3✔
465
        if k8sutil.ResourceNotFound(err) {
3✔
UNCOV
466
                c.logger.Debugf("PodDisruptionBudget %q has already been deleted", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta))
×
467
        } else if err != nil {
3✔
UNCOV
468
                return fmt.Errorf("could not delete PodDisruptionBudget: %v", err)
×
UNCOV
469
        }
×
470

471
        c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta))
3✔
472
        c.PodDisruptionBudget = nil
3✔
473

3✔
474
        err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout,
3✔
475
                func() (bool, error) {
6✔
476
                        _, err2 := c.KubeClient.PodDisruptionBudgets(pdbName.Namespace).Get(context.TODO(), pdbName.Name, metav1.GetOptions{})
3✔
477
                        if err2 == nil {
3✔
UNCOV
478
                                return false, nil
×
UNCOV
479
                        }
×
480
                        if k8sutil.ResourceNotFound(err2) {
6✔
481
                                return true, nil
3✔
482
                        }
3✔
483
                        return false, err2
×
484
                })
485
        if err != nil {
3✔
UNCOV
486
                return fmt.Errorf("could not delete pod disruption budget: %v", err)
×
UNCOV
487
        }
×
488

489
        return nil
3✔
490
}
491

492
func (c *Cluster) deleteEndpoint(role PostgresRole) error {
×
UNCOV
493
        c.setProcessName("deleting endpoint")
×
UNCOV
494
        c.logger.Debugf("deleting %s endpoint", role)
×
UNCOV
495
        if c.Endpoints[role] == nil {
×
UNCOV
496
                c.logger.Debugf("there is no %s endpoint in the cluster", role)
×
497
                return nil
×
498
        }
×
499

500
        if err := c.KubeClient.Endpoints(c.Endpoints[role].Namespace).Delete(context.TODO(), c.Endpoints[role].Name, c.deleteOptions); err != nil {
×
501
                if !k8sutil.ResourceNotFound(err) {
×
502
                        return fmt.Errorf("could not delete %s endpoint: %v", role, err)
×
503
                }
×
UNCOV
504
                c.logger.Debugf("%s endpoint has already been deleted", role)
×
505
        }
506

507
        c.logger.Infof("%s endpoint %q has been deleted", role, util.NameFromMeta(c.Endpoints[role].ObjectMeta))
×
508
        delete(c.Endpoints, role)
×
509

×
UNCOV
510
        return nil
×
511
}
512

513
func (c *Cluster) deletePatroniResources() error {
1✔
514
        c.setProcessName("deleting Patroni resources")
1✔
515
        errors := make([]string, 0)
1✔
516

1✔
517
        if err := c.deleteService(Patroni); err != nil {
1✔
UNCOV
518
                errors = append(errors, fmt.Sprintf("%v", err))
×
UNCOV
519
        }
×
520

521
        for _, suffix := range patroniObjectSuffixes {
5✔
522
                if c.patroniKubernetesUseConfigMaps() {
8✔
523
                        if err := c.deletePatroniConfigMap(suffix); err != nil {
4✔
524
                                errors = append(errors, fmt.Sprintf("%v", err))
×
UNCOV
525
                        }
×
UNCOV
526
                } else {
×
UNCOV
527
                        if err := c.deletePatroniEndpoint(suffix); err != nil {
×
UNCOV
528
                                errors = append(errors, fmt.Sprintf("%v", err))
×
529
                        }
×
530
                }
531
        }
532

533
        if len(errors) > 0 {
1✔
534
                return fmt.Errorf("%v", strings.Join(errors, `', '`))
×
UNCOV
535
        }
×
536

537
        return nil
1✔
538
}
539

540
func (c *Cluster) deletePatroniConfigMap(suffix string) error {
4✔
541
        c.setProcessName("deleting Patroni config map")
4✔
542
        c.logger.Debugf("deleting %s Patroni config map", suffix)
4✔
543
        cm := c.PatroniConfigMaps[suffix]
4✔
544
        if cm == nil {
4✔
UNCOV
545
                c.logger.Debugf("there is no %s Patroni config map in the cluster", suffix)
×
UNCOV
546
                return nil
×
UNCOV
547
        }
×
548

549
        if err := c.KubeClient.ConfigMaps(cm.Namespace).Delete(context.TODO(), cm.Name, c.deleteOptions); err != nil {
4✔
550
                if !k8sutil.ResourceNotFound(err) {
×
551
                        return fmt.Errorf("could not delete %s Patroni config map %q: %v", suffix, cm.Name, err)
×
552
                }
×
UNCOV
553
                c.logger.Debugf("%s Patroni config map has already been deleted", suffix)
×
554
        }
555

556
        c.logger.Infof("%s Patroni config map %q has been deleted", suffix, util.NameFromMeta(cm.ObjectMeta))
4✔
557
        delete(c.PatroniConfigMaps, suffix)
4✔
558

4✔
559
        return nil
4✔
560
}
561

UNCOV
562
func (c *Cluster) deletePatroniEndpoint(suffix string) error {
×
UNCOV
563
        c.setProcessName("deleting Patroni endpoint")
×
UNCOV
564
        c.logger.Debugf("deleting %s Patroni endpoint", suffix)
×
UNCOV
565
        ep := c.PatroniEndpoints[suffix]
×
UNCOV
566
        if ep == nil {
×
567
                c.logger.Debugf("there is no %s Patroni endpoint in the cluster", suffix)
×
568
                return nil
×
569
        }
×
570

571
        if err := c.KubeClient.Endpoints(ep.Namespace).Delete(context.TODO(), ep.Name, c.deleteOptions); err != nil {
×
572
                if !k8sutil.ResourceNotFound(err) {
×
573
                        return fmt.Errorf("could not delete %s Patroni endpoint %q: %v", suffix, ep.Name, err)
×
574
                }
×
UNCOV
575
                c.logger.Debugf("%s Patroni endpoint has already been deleted", suffix)
×
576
        }
577

578
        c.logger.Infof("%s Patroni endpoint %q has been deleted", suffix, util.NameFromMeta(ep.ObjectMeta))
×
579
        delete(c.PatroniEndpoints, suffix)
×
580

×
UNCOV
581
        return nil
×
582
}
583

584
func (c *Cluster) deleteSecrets() error {
×
585
        c.setProcessName("deleting secrets")
×
586
        errors := make([]string, 0)
×
UNCOV
587

×
UNCOV
588
        for uid := range c.Secrets {
×
589
                err := c.deleteSecret(uid)
×
590
                if err != nil {
×
591
                        errors = append(errors, fmt.Sprintf("%v", err))
×
592
                }
×
593
        }
594

595
        if len(errors) > 0 {
×
596
                return fmt.Errorf("could not delete all secrets: %v", strings.Join(errors, `', '`))
×
597
        }
×
598

UNCOV
599
        return nil
×
600
}
601

602
func (c *Cluster) deleteSecret(uid types.UID) error {
×
UNCOV
603
        c.setProcessName("deleting secret")
×
604
        secret := c.Secrets[uid]
×
UNCOV
605
        secretName := util.NameFromMeta(secret.ObjectMeta)
×
UNCOV
606
        c.logger.Debugf("deleting secret %q", secretName)
×
607
        err := c.KubeClient.Secrets(secret.Namespace).Delete(context.TODO(), secret.Name, c.deleteOptions)
×
608
        if k8sutil.ResourceNotFound(err) {
×
609
                c.logger.Debugf("secret %q has already been deleted", secretName)
×
610
        } else if err != nil {
×
611
                return fmt.Errorf("could not delete secret %q: %v", secretName, err)
×
612
        }
×
613
        c.logger.Infof("secret %q has been deleted", secretName)
×
614
        delete(c.Secrets, uid)
×
615

×
616
        return nil
×
617
}
618

619
func (c *Cluster) createRoles() (err error) {
×
620
        // TODO: figure out what to do with duplicate names (humans and robots) among pgUsers
×
621
        return c.syncRoles()
×
UNCOV
622
}
×
623

624
func (c *Cluster) createLogicalBackupJob() (err error) {
3✔
625

3✔
626
        c.setProcessName("creating a k8s cron job for logical backups")
3✔
627

3✔
628
        logicalBackupJobSpec, err := c.generateLogicalBackupJob()
3✔
629
        if err != nil {
3✔
UNCOV
630
                return fmt.Errorf("could not generate k8s cron job spec: %v", err)
×
UNCOV
631
        }
×
632

633
        cronJob, err := c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Create(context.TODO(), logicalBackupJobSpec, metav1.CreateOptions{})
3✔
634
        if err != nil {
3✔
635
                return fmt.Errorf("could not create k8s cron job: %v", err)
×
636
        }
×
637
        c.LogicalBackupJob = cronJob
3✔
638

3✔
639
        return nil
3✔
640
}
641

642
func (c *Cluster) patchLogicalBackupJob(newJob *batchv1.CronJob) error {
2✔
643
        c.setProcessName("patching logical backup job")
2✔
644

2✔
645
        patchData, err := specPatch(newJob.Spec)
2✔
646
        if err != nil {
2✔
UNCOV
647
                return fmt.Errorf("could not form patch for the logical backup job: %v", err)
×
UNCOV
648
        }
×
649

650
        // update the backup job spec
651
        cronJob, err := c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Patch(
2✔
652
                context.TODO(),
2✔
653
                c.getLogicalBackupJobName(),
2✔
654
                types.MergePatchType,
2✔
655
                patchData,
2✔
656
                metav1.PatchOptions{},
2✔
657
                "")
2✔
658
        if err != nil {
2✔
UNCOV
659
                return fmt.Errorf("could not patch logical backup job: %v", err)
×
UNCOV
660
        }
×
661
        c.LogicalBackupJob = cronJob
2✔
662

2✔
663
        return nil
2✔
664
}
665

UNCOV
666
func (c *Cluster) deleteLogicalBackupJob() error {
×
UNCOV
667
        if c.LogicalBackupJob == nil {
×
UNCOV
668
                return nil
×
UNCOV
669
        }
×
UNCOV
670
        c.logger.Info("removing the logical backup job")
×
671

×
672
        err := c.KubeClient.CronJobsGetter.CronJobs(c.LogicalBackupJob.Namespace).Delete(context.TODO(), c.getLogicalBackupJobName(), c.deleteOptions)
×
673
        if k8sutil.ResourceNotFound(err) {
×
674
                c.logger.Debugf("logical backup cron job %q has already been deleted", c.getLogicalBackupJobName())
×
675
        } else if err != nil {
×
676
                return err
×
677
        }
×
678
        c.LogicalBackupJob = nil
×
679

×
680
        return nil
×
681
}
682

683
// GetServiceMaster returns cluster's kubernetes master Service
684
func (c *Cluster) GetServiceMaster() *v1.Service {
×
685
        return c.Services[Master]
×
UNCOV
686
}
×
687

688
// GetServiceReplica returns cluster's kubernetes replica Service
689
func (c *Cluster) GetServiceReplica() *v1.Service {
×
690
        return c.Services[Replica]
×
691
}
×
692

693
// GetEndpointMaster returns cluster's kubernetes master Endpoint
694
func (c *Cluster) GetEndpointMaster() *v1.Endpoints {
×
695
        return c.Endpoints[Master]
×
696
}
×
697

698
// GetEndpointReplica returns cluster's kubernetes replica Endpoint
699
func (c *Cluster) GetEndpointReplica() *v1.Endpoints {
×
700
        return c.Endpoints[Replica]
×
701
}
×
702

703
// GetStatefulSet returns cluster's kubernetes StatefulSet
704
func (c *Cluster) GetStatefulSet() *appsv1.StatefulSet {
×
705
        return c.Statefulset
×
706
}
×
707

708
// GetPodDisruptionBudget returns cluster's kubernetes PodDisruptionBudget
709
func (c *Cluster) GetPodDisruptionBudget() *policyv1.PodDisruptionBudget {
×
710
        return c.PodDisruptionBudget
×
711
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc