• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mikkeloscar / pdb-controller / 13857161949

14 Mar 2025 01:03PM UTC coverage: 80.46% (+1.6%) from 78.824%
13857161949

Pull #66

github

itspooya
use watch instead of polling

Signed-off-by: itspooya <pooyadowlat@gmail.com>
Pull Request #66: use watch instead of polling

112 of 132 new or added lines in 1 file covered. (84.85%)

1 existing line in 1 file now uncovered.

420 of 522 relevant lines covered (80.46%)

20.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.06
/controller.go
1
package main
2

3
import (
4
        "context"
5
        "crypto/sha1"
6
        "encoding/hex"
7
        "time"
8

9
        log "github.com/sirupsen/logrus"
10
        appsv1 "k8s.io/api/apps/v1"
11
        v1 "k8s.io/api/core/v1"
12
        pv1 "k8s.io/api/policy/v1"
13
        "k8s.io/apimachinery/pkg/api/equality"
14
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15
        "k8s.io/apimachinery/pkg/runtime"
16
        "k8s.io/apimachinery/pkg/util/intstr"
17
        "k8s.io/apimachinery/pkg/watch"
18
        "k8s.io/client-go/kubernetes"
19
        "k8s.io/client-go/tools/cache"
20
        "k8s.io/client-go/util/retry"
21
        "k8s.io/client-go/util/workqueue"
22
)
23

24
const (
25
        heritageLabel               = "heritage"
26
        pdbController               = "pdb-controller"
27
        nonReadyTTLAnnotationName   = "pdb-controller.zalando.org/non-ready-ttl"
28
        nonReadySinceAnnotationName = "pdb-controller.zalando.org/non-ready-since"
29
        parentResourceHashLabel     = "parent-resource-hash"
30
)
31

32
var (
33
        ownerLabels = map[string]string{heritageLabel: pdbController}
34
)
35

36
// PDBController creates PodDisruptionBudgets for deployments and StatefulSets
37
// if missing.
38
type PDBController struct {
39
        kubernetes.Interface
40
        interval            time.Duration // kept for backward compatibility
41
        pdbNameSuffix       string
42
        nonReadyTTL         time.Duration
43
        parentResourceHash  bool
44
        maxUnavailable      intstr.IntOrString
45
        queue               workqueue.TypedRateLimitingInterface[string]
46
        deploymentInformer  cache.SharedIndexInformer
47
        statefulSetInformer cache.SharedIndexInformer
48
        pdbInformer         cache.SharedIndexInformer
49
}
50

51
// NewPDBController initializes a new PDBController.
52
func NewPDBController(interval time.Duration, client kubernetes.Interface, pdbNameSuffix string, nonReadyTTL time.Duration, parentResourceHash bool, maxUnavailable intstr.IntOrString) *PDBController {
30✔
53
        log.Info("Initializing PDB controller with TypedRateLimitingInterface - v20250304")
30✔
54
        controller := &PDBController{
30✔
55
                Interface:          client,
30✔
56
                interval:           interval,
30✔
57
                pdbNameSuffix:      pdbNameSuffix,
30✔
58
                nonReadyTTL:        nonReadyTTL,
30✔
59
                parentResourceHash: parentResourceHash,
30✔
60
                maxUnavailable:     maxUnavailable,
30✔
61
                queue:              workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()),
30✔
62
        }
30✔
63

30✔
64
        // Setup Deployment informer
30✔
65
        controller.deploymentInformer = cache.NewSharedIndexInformer(
30✔
66
                &cache.ListWatch{
30✔
67
                        ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
57✔
68
                                return client.AppsV1().Deployments(v1.NamespaceAll).List(context.Background(), options)
27✔
69
                        },
27✔
70
                        WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
27✔
71
                                return client.AppsV1().Deployments(v1.NamespaceAll).Watch(context.Background(), options)
27✔
72
                        },
27✔
73
                },
74
                &appsv1.Deployment{},
75
                0, // resync disabled
76
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
77
        )
78

79
        // Setup StatefulSet informer
80
        controller.statefulSetInformer = cache.NewSharedIndexInformer(
30✔
81
                &cache.ListWatch{
30✔
82
                        ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
57✔
83
                                return client.AppsV1().StatefulSets(v1.NamespaceAll).List(context.Background(), options)
27✔
84
                        },
27✔
85
                        WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
27✔
86
                                return client.AppsV1().StatefulSets(v1.NamespaceAll).Watch(context.Background(), options)
27✔
87
                        },
27✔
88
                },
89
                &appsv1.StatefulSet{},
90
                0, // resync disabled
91
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
92
        )
93

94
        // Setup PodDisruptionBudget informer
95
        controller.pdbInformer = cache.NewSharedIndexInformer(
30✔
96
                &cache.ListWatch{
30✔
97
                        ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
57✔
98
                                return client.PolicyV1().PodDisruptionBudgets(v1.NamespaceAll).List(context.Background(), options)
27✔
99
                        },
27✔
100
                        WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
27✔
101
                                return client.PolicyV1().PodDisruptionBudgets(v1.NamespaceAll).Watch(context.Background(), options)
27✔
102
                        },
27✔
103
                },
104
                &pv1.PodDisruptionBudget{},
105
                0, // resync disabled
106
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
107
        )
108

109
        // Setup event handlers
110
        controller.deploymentInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
30✔
111
                AddFunc: controller.enqueueResource,
30✔
112
                UpdateFunc: func(old, new interface{}) {
30✔
NEW
113
                        controller.enqueueResource(new)
×
NEW
114
                },
×
115
                DeleteFunc: controller.enqueueResource,
116
        })
117

118
        controller.statefulSetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
30✔
119
                AddFunc: controller.enqueueResource,
30✔
120
                UpdateFunc: func(old, new interface{}) {
30✔
NEW
121
                        controller.enqueueResource(new)
×
NEW
122
                },
×
123
                DeleteFunc: controller.enqueueResource,
124
        })
125

126
        controller.pdbInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
30✔
127
                DeleteFunc: controller.enqueuePDB,
30✔
128
        })
30✔
129

30✔
130
        return controller
30✔
131
}
132

133
func (n *PDBController) enqueueResource(obj interface{}) {
51✔
134
        key, err := cache.MetaNamespaceKeyFunc(obj)
51✔
135
        if err != nil {
51✔
NEW
136
                log.Errorf("Couldn't get key for object %+v: %v", obj, err)
×
NEW
137
                return
×
NEW
138
        }
×
139
        n.queue.Add(key)
51✔
140
}
141

142
func (n *PDBController) enqueuePDB(obj interface{}) {
19✔
143
        pdb, ok := obj.(*pv1.PodDisruptionBudget)
19✔
144
        if !ok {
20✔
145
                log.Errorf("Expected PodDisruptionBudget but got %+v", obj)
1✔
146
                return
1✔
147
        }
1✔
148

149
        // Only enqueue for our managed PDBs
150
        if !containLabels(pdb.Labels, ownerLabels) {
19✔
151
                return
1✔
152
        }
1✔
153

154
        key, err := cache.MetaNamespaceKeyFunc(obj)
17✔
155
        if err != nil {
17✔
NEW
156
                log.Errorf("Couldn't get key for object %+v: %v", obj, err)
×
NEW
157
                return
×
NEW
158
        }
×
159
        n.queue.Add(key)
17✔
160
}
161

162
// Run runs the controller with the specified number of workers.
163
func (n *PDBController) Run(ctx context.Context) {
1✔
164
        defer n.queue.ShutDown()
1✔
165

1✔
166
        log.Info("Starting PDB controller")
1✔
167

1✔
168
        // Start the informers
1✔
169
        go n.deploymentInformer.Run(ctx.Done())
1✔
170
        go n.statefulSetInformer.Run(ctx.Done())
1✔
171
        go n.pdbInformer.Run(ctx.Done())
1✔
172

1✔
173
        // Wait for the caches to be synced
1✔
174
        log.Info("Waiting for informer caches to sync")
1✔
175
        if !cache.WaitForCacheSync(ctx.Done(),
1✔
176
                n.deploymentInformer.HasSynced,
1✔
177
                n.statefulSetInformer.HasSynced,
1✔
178
                n.pdbInformer.HasSynced) {
1✔
NEW
179
                log.Fatal("Failed to wait for caches to sync")
×
NEW
180
                return
×
UNCOV
181
        }
×
182
        log.Info("Informer caches synced")
1✔
183

1✔
184
        // Run the reconcile loop
1✔
185
        go n.worker(ctx)
1✔
186

1✔
187
        <-ctx.Done()
1✔
188
        log.Info("Shutting down PDB controller")
1✔
189
}
190

191
func (n *PDBController) worker(ctx context.Context) {
1✔
192
        for n.processNextItem(ctx) {
1✔
NEW
193
        }
×
194
}
195

196
func (n *PDBController) processNextItem(ctx context.Context) bool {
4✔
197
        key, quit := n.queue.Get()
4✔
198
        if quit {
6✔
199
                return false
2✔
200
        }
2✔
201
        defer n.queue.Done(key)
2✔
202

2✔
203
        err := n.reconcile(ctx, key)
2✔
204
        if err != nil {
2✔
NEW
205
                log.Errorf("Error processing item %s: %v", key, err)
×
NEW
206
                n.queue.AddRateLimited(key)
×
NEW
207
                return true
×
NEW
208
        }
×
209

210
        n.queue.Forget(key)
2✔
211
        return true
2✔
212
}
213

214
func (n *PDBController) reconcile(ctx context.Context, key string) error {
2✔
215
        log.Debugf("Processing key: %s", key)
2✔
216

2✔
217
        // Process all resources and PDBs
2✔
218
        return n.runOnce(ctx)
2✔
219
}
2✔
220

221
// runOnce runs the main reconcilation loop of the controller.
222
func (n *PDBController) runOnce(ctx context.Context) error {
28✔
223
        // Get all PDBs from the informer
28✔
224
        var allPDBs []pv1.PodDisruptionBudget
28✔
225
        for _, obj := range n.pdbInformer.GetStore().List() {
76✔
226
                pdb, ok := obj.(*pv1.PodDisruptionBudget)
48✔
227
                if !ok {
48✔
NEW
228
                        continue
×
229
                }
230
                allPDBs = append(allPDBs, *pdb)
48✔
231
        }
232

233
        managedPDBs, unmanagedPDBs := filterPDBs(allPDBs)
28✔
234

28✔
235
        // Get all resources from the informers
28✔
236
        resources := make([]kubeResource, 0)
28✔
237

28✔
238
        // Process Deployments
28✔
239
        for _, obj := range n.deploymentInformer.GetStore().List() {
53✔
240
                d, ok := obj.(*appsv1.Deployment)
25✔
241
                if !ok {
25✔
NEW
242
                        continue
×
243
                }
244
                // manually set Kind and APIVersion because of a bug in
245
                // client-go
246
                // https://github.com/kubernetes/client-go/issues/308
247
                d.Kind = "Deployment"
25✔
248
                d.APIVersion = "apps/v1"
25✔
249
                resources = append(resources, deployment{*d})
25✔
250
        }
251

252
        // Process StatefulSets
253
        for _, obj := range n.statefulSetInformer.GetStore().List() {
52✔
254
                s, ok := obj.(*appsv1.StatefulSet)
24✔
255
                if !ok {
24✔
NEW
256
                        continue
×
257
                }
258
                // manually set Kind and APIVersion because of a bug in
259
                // client-go
260
                // https://github.com/kubernetes/client-go/issues/308
261
                s.Kind = "StatefulSet"
24✔
262
                s.APIVersion = "apps/v1"
24✔
263
                resources = append(resources, statefulSet{*s})
24✔
264
        }
265

266
        desiredPDBs := n.generateDesiredPDBs(resources, managedPDBs, unmanagedPDBs)
28✔
267
        n.reconcilePDBs(ctx, desiredPDBs, managedPDBs)
28✔
268
        return nil
28✔
269
}
270

271
func (n *PDBController) generateDesiredPDBs(resources []kubeResource, managedPDBs, unmanagedPDBs map[string]pv1.PodDisruptionBudget) map[string]pv1.PodDisruptionBudget {
28✔
272
        desiredPDBs := make(map[string]pv1.PodDisruptionBudget, len(managedPDBs))
28✔
273

28✔
274
        nonReadyTTL := time.Time{}
28✔
275
        if n.nonReadyTTL > 0 {
52✔
276
                nonReadyTTL = time.Now().UTC().Add(-n.nonReadyTTL)
24✔
277
        }
24✔
278

279
        for _, resource := range resources {
77✔
280
                matchedPDBs := getMatchedPDBs(resource.TemplateLabels(), unmanagedPDBs)
49✔
281

49✔
282
                // don't create managed PDB if there is already unmanaged and
49✔
283
                // matched PDBs
49✔
284
                if len(matchedPDBs) > 0 {
53✔
285
                        continue
4✔
286
                }
287

288
                // don't create PDB if the resource has 1 or less replicas
289
                if resource.Replicas() <= 1 {
49✔
290
                        continue
4✔
291
                }
292

293
                // ensure PDB if the resource has more than one replica and all
294
                // of them are ready
295
                if resource.StatusReadyReplicas() >= resource.Replicas() {
62✔
296
                        pdb := n.generatePDB(resource, time.Time{})
21✔
297
                        desiredPDBs[pdb.Namespace+"/"+pdb.Name] = pdb
21✔
298
                        continue
21✔
299
                }
300

301
                ownedPDBs := getOwnedPDBs(managedPDBs, resource)
20✔
302
                validPDBs := make([]pv1.PodDisruptionBudget, 0, len(ownedPDBs))
20✔
303
                // only consider valid PDBs. If they're invalid they'll be
20✔
304
                // recreated on the next iteration
20✔
305
                for _, pdb := range ownedPDBs {
40✔
306
                        if pdbSpecValid(pdb) {
40✔
307
                                validPDBs = append(validPDBs, pdb)
20✔
308
                        }
20✔
309
                }
310

311
                if len(validPDBs) > 0 {
40✔
312
                        // it's unlikely that we will have more than a single
20✔
313
                        // valid owned PDB. If we do simply pick the first one
20✔
314
                        // and check if it's still valid. Any other PDBs will
20✔
315
                        // automatically get dropped.
20✔
316
                        pdb := validPDBs[0]
20✔
317
                        if pdb.Annotations == nil {
20✔
318
                                pdb.Annotations = make(map[string]string)
×
319
                        }
×
320

321
                        ttl, err := overrideNonReadyTTL(resource.Annotations(), nonReadyTTL)
20✔
322
                        if err != nil {
20✔
323
                                log.Errorf("Failed to override PDB Delete TTL: %s", err)
×
324
                        }
×
325

326
                        var nonReadySince time.Time
20✔
327
                        if nonReadySinceStr, ok := pdb.Annotations[nonReadySinceAnnotationName]; ok {
36✔
328
                                nonReadySince, err = time.Parse(time.RFC3339, nonReadySinceStr)
16✔
329
                                if err != nil {
16✔
330
                                        log.Errorf("Failed to parse non-ready-since annotation '%s': %v", nonReadySinceStr, err)
×
331
                                }
×
332
                        }
333

334
                        if !nonReadySince.IsZero() {
36✔
335
                                if !ttl.IsZero() && nonReadySince.Before(ttl) {
24✔
336
                                        continue
8✔
337
                                }
338
                        } else {
4✔
339
                                nonReadySince = time.Now().UTC()
4✔
340
                        }
4✔
341

342
                        generatedPDB := n.generatePDB(resource, nonReadySince)
12✔
343
                        desiredPDBs[generatedPDB.Namespace+"/"+generatedPDB.Name] = generatedPDB
12✔
344
                }
345
        }
346

347
        return desiredPDBs
28✔
348
}
349

350
// mergeActualAndDesiredPDB takes the current definition of a PDB as it is in cluster and a PDB
351
// with our desired configurations and does a merge between them. We also return a boolean to tell the
352
// caller if any change actually had to be made to achieve our desired state or not
353
func mergeActualAndDesiredPDB(managedPDB, desiredPDB pv1.PodDisruptionBudget) (pv1.PodDisruptionBudget, bool) {
28✔
354

28✔
355
        needsUpdate := false
28✔
356

28✔
357
        // check if PDBs are equal an only update if not
28✔
358
        if !equality.Semantic.DeepEqual(managedPDB.Spec, desiredPDB.Spec) ||
28✔
359
                !equality.Semantic.DeepEqual(managedPDB.Labels, desiredPDB.Labels) ||
28✔
360
                !equality.Semantic.DeepEqual(managedPDB.Annotations, desiredPDB.Annotations) {
44✔
361
                managedPDB.Annotations = desiredPDB.Annotations
16✔
362
                managedPDB.Labels = desiredPDB.Labels
16✔
363
                managedPDB.Spec = desiredPDB.Spec
16✔
364

16✔
365
                needsUpdate = true
16✔
366
        }
16✔
367

368
        return managedPDB, needsUpdate
28✔
369
}
370

371
func (n *PDBController) reconcilePDBs(ctx context.Context, desiredPDBs, managedPDBs map[string]pv1.PodDisruptionBudget) {
28✔
372
        for key, managedPDB := range managedPDBs {
72✔
373
                desiredPDB, ok := desiredPDBs[key]
44✔
374
                if !ok {
60✔
375
                        err := n.PolicyV1().PodDisruptionBudgets(managedPDB.Namespace).Delete(ctx, managedPDB.Name, metav1.DeleteOptions{})
16✔
376
                        if err != nil {
16✔
377
                                log.Errorf("Failed to delete PDB: %v", err)
×
378
                                continue
×
379
                        }
380

381
                        log.WithFields(log.Fields{
16✔
382
                                "action":    "removed",
16✔
383
                                "pdb":       managedPDB.Name,
16✔
384
                                "namespace": managedPDB.Namespace,
16✔
385
                                "selector":  managedPDB.Spec.Selector.String(),
16✔
386
                        }).Info("")
16✔
387

16✔
388
                        // If we delete a PDB then we don't want to attempt to update it later since this will
16✔
389
                        // result in a `StorageError` since we can't find the PDB to make an update to it.
16✔
390
                        continue
16✔
391
                }
392

393
                // check if PDBs are equal an only update if not
394
                updatedPDB, needsUpdate := mergeActualAndDesiredPDB(managedPDB, desiredPDB)
28✔
395
                if needsUpdate {
44✔
396
                        err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
32✔
397
                                // Technically the updatedPDB and managedPDB namespace should never be different
16✔
398
                                // but just to be **certain** we're updating the correct namespace we'll just use
16✔
399
                                // the one that was given to us and not potentially modified
16✔
400
                                _, err := n.PolicyV1().PodDisruptionBudgets(managedPDB.Namespace).Update(ctx, &updatedPDB, metav1.UpdateOptions{})
16✔
401

16✔
402
                                // If the update failed that likely means that our definition of what was on the cluster
16✔
403
                                // has become out of date. To resolve this we'll need to get a more up to date copy of
16✔
404
                                // the object we're attempting to modify
16✔
405
                                if err != nil {
16✔
406
                                        currentPDB, err := n.PolicyV1().PodDisruptionBudgets(managedPDB.Namespace).Get(ctx, managedPDB.Name, metav1.GetOptions{})
×
407

×
408
                                        // This err is locally scoped to this if block and will not cause our `RetryOnConflict`
×
409
                                        // to pass if it is nil. If we're in this block then we will get another Retry
×
410
                                        if err != nil {
×
411
                                                return err
×
412
                                        }
×
413

414
                                        updatedPDB, _ = mergeActualAndDesiredPDB(
×
415
                                                *currentPDB,
×
416
                                                desiredPDB,
×
417
                                        )
×
418
                                }
419

420
                                // If this err != nil then the current block will be re-run by `RetryOnConflict`
421
                                // on an exponential backoff schedule to see if we can fix the problem by trying again
422
                                return err
16✔
423
                        })
424
                        if err != nil {
16✔
425
                                log.Errorf("Failed to update PDB: %v", err)
×
426
                                continue
×
427
                        }
428

429
                        log.WithFields(log.Fields{
16✔
430
                                "action":    "updated",
16✔
431
                                "pdb":       desiredPDB.Name,
16✔
432
                                "namespace": desiredPDB.Namespace,
16✔
433
                                "selector":  desiredPDB.Spec.Selector.String(),
16✔
434
                        }).Info("")
16✔
435
                }
436
        }
437

438
        for key, desiredPDB := range desiredPDBs {
61✔
439
                if _, ok := managedPDBs[key]; !ok {
38✔
440
                        _, err := n.PolicyV1().PodDisruptionBudgets(desiredPDB.Namespace).Create(ctx, &desiredPDB, metav1.CreateOptions{})
5✔
441
                        if err != nil {
5✔
442
                                log.Errorf("Failed to create PDB: %v", err)
×
443
                                continue
×
444
                        }
445

446
                        log.WithFields(log.Fields{
5✔
447
                                "action":    "added",
5✔
448
                                "pdb":       desiredPDB.Name,
5✔
449
                                "namespace": desiredPDB.Namespace,
5✔
450
                                "selector":  desiredPDB.Spec.Selector.String(),
5✔
451
                        }).Info("")
5✔
452
                }
453
        }
454
}
455

456
func overrideNonReadyTTL(annotations map[string]string, nonReadyTTL time.Time) (time.Time, error) {
20✔
457
        if ttlVal, ok := annotations[nonReadyTTLAnnotationName]; ok {
28✔
458
                duration, err := time.ParseDuration(ttlVal)
8✔
459
                if err != nil {
8✔
460
                        return time.Time{}, err
×
461
                }
×
462
                return time.Now().UTC().Add(-duration), nil
8✔
463
        }
464
        return nonReadyTTL, nil
12✔
465
}
466

467
// pdbSpecValid returns true if the PDB spec is up-to-date
468
func pdbSpecValid(pdb pv1.PodDisruptionBudget) bool {
20✔
469
        return pdb.Spec.MinAvailable == nil
20✔
470
}
20✔
471

472
// getMatchedPDBs gets matching PodDisruptionBudgets.
473
func getMatchedPDBs(labels map[string]string, pdbs map[string]pv1.PodDisruptionBudget) []pv1.PodDisruptionBudget {
49✔
474
        matchedPDBs := make([]pv1.PodDisruptionBudget, 0)
49✔
475
        for _, pdb := range pdbs {
57✔
476
                if labelsIntersect(labels, pdb.Spec.Selector.MatchLabels) {
12✔
477
                        matchedPDBs = append(matchedPDBs, pdb)
4✔
478
                }
4✔
479
        }
480
        return matchedPDBs
49✔
481
}
482

483
func getOwnedPDBs(pdbs map[string]pv1.PodDisruptionBudget, owner kubeResource) []pv1.PodDisruptionBudget {
20✔
484
        ownedPDBs := make([]pv1.PodDisruptionBudget, 0, len(pdbs))
20✔
485
        for _, pdb := range pdbs {
60✔
486
                if isOwnedReference(owner, pdb.ObjectMeta) {
60✔
487
                        ownedPDBs = append(ownedPDBs, pdb)
20✔
488
                }
20✔
489
        }
490
        return ownedPDBs
20✔
491
}
492

493
// isOwnedReference returns true if the dependent object is owned by the owner
494
// object.
495
func isOwnedReference(owner kubeResource, dependent metav1.ObjectMeta) bool {
40✔
496
        for _, ref := range dependent.OwnerReferences {
80✔
497
                if ref.APIVersion == owner.APIVersion() &&
40✔
498
                        ref.Kind == owner.Kind() &&
40✔
499
                        ref.UID == owner.UID() &&
40✔
500
                        ref.Name == owner.Name() {
60✔
501
                        return true
20✔
502
                }
20✔
503
        }
504
        return false
20✔
505
}
506

507
// containLabels reports whether expectedLabels are in labels.
508
func containLabels(labels, expectedLabels map[string]string) bool {
68✔
509
        for key, val := range expectedLabels {
136✔
510
                if v, ok := labels[key]; !ok || v != val {
74✔
511
                        return false
6✔
512
                }
6✔
513
        }
514
        return true
62✔
515
}
516

517
// labelsIntersect checks whether two maps a and b intersects. Intersection is
518
// defined as at least one identical key value pair must exist in both maps and
519
// there must be no keys which match where the values doesn't match.
520
func labelsIntersect(a, b map[string]string) bool {
8✔
521
        intersect := false
8✔
522
        for key, val := range a {
16✔
523
                v, ok := b[key]
8✔
524
                if ok {
16✔
525
                        if v == val {
12✔
526
                                intersect = true
4✔
527
                        } else { // if the key exists but the values doesn't match, don't consider it an intersection
8✔
528
                                return false
4✔
529
                        }
4✔
530
                }
531
        }
532

533
        return intersect
4✔
534
}
535

536
func filterPDBs(pdbs []pv1.PodDisruptionBudget) (map[string]pv1.PodDisruptionBudget, map[string]pv1.PodDisruptionBudget) {
28✔
537
        managed := make(map[string]pv1.PodDisruptionBudget, len(pdbs))
28✔
538
        unmanaged := make(map[string]pv1.PodDisruptionBudget, len(pdbs))
28✔
539
        for _, pdb := range pdbs {
76✔
540
                if containLabels(pdb.Labels, ownerLabels) {
92✔
541
                        managed[pdb.Namespace+"/"+pdb.Name] = pdb
44✔
542
                        continue
44✔
543
                }
544
                unmanaged[pdb.Namespace+"/"+pdb.Name] = pdb
4✔
545
        }
546
        return managed, unmanaged
28✔
547
}
548

549
func (n *PDBController) generatePDB(owner kubeResource, ttl time.Time) pv1.PodDisruptionBudget {
33✔
550
        var suffix string
33✔
551
        if n.pdbNameSuffix != "" {
66✔
552
                suffix = "-" + n.pdbNameSuffix
33✔
553
        }
33✔
554

555
        pdb := pv1.PodDisruptionBudget{
33✔
556
                ObjectMeta: metav1.ObjectMeta{
33✔
557
                        Name:      owner.Name() + suffix,
33✔
558
                        Namespace: owner.Namespace(),
33✔
559
                        OwnerReferences: []metav1.OwnerReference{
33✔
560
                                {
33✔
561
                                        APIVersion: owner.APIVersion(),
33✔
562
                                        Kind:       owner.Kind(),
33✔
563
                                        Name:       owner.Name(),
33✔
564
                                        UID:        owner.UID(),
33✔
565
                                },
33✔
566
                        },
33✔
567
                        Labels:      owner.Labels(),
33✔
568
                        Annotations: make(map[string]string),
33✔
569
                },
33✔
570
                Spec: pv1.PodDisruptionBudgetSpec{
33✔
571
                        MaxUnavailable: &n.maxUnavailable,
33✔
572
                        Selector:       owner.Selector(),
33✔
573
                },
33✔
574
        }
33✔
575

33✔
576
        if n.parentResourceHash {
49✔
577
                // if we fail to generate the hash simply fall back to using
16✔
578
                // the existing selector
16✔
579
                hash, err := resourceHash(owner.Kind(), owner.Name())
16✔
580
                if err == nil {
32✔
581
                        pdb.Spec.Selector = &metav1.LabelSelector{
16✔
582
                                MatchLabels: map[string]string{
16✔
583
                                        parentResourceHashLabel: hash,
16✔
584
                                },
16✔
585
                        }
16✔
586
                }
16✔
587
        }
588

589
        if pdb.Labels == nil {
66✔
590
                pdb.Labels = make(map[string]string)
33✔
591
        }
33✔
592
        pdb.Labels[heritageLabel] = pdbController
33✔
593

33✔
594
        if !ttl.IsZero() {
45✔
595
                pdb.Annotations[nonReadySinceAnnotationName] = ttl.Format(time.RFC3339)
12✔
596
        }
12✔
597
        return pdb
33✔
598
}
599

600
func resourceHash(kind, name string) (string, error) {
18✔
601
        h := sha1.New()
18✔
602
        _, err := h.Write([]byte(kind + "-" + name))
18✔
603
        if err != nil {
18✔
604
                return "", err
×
605
        }
×
606
        return hex.EncodeToString(h.Sum(nil)), nil
18✔
607
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc