kubernetes-sigs / azuredisk-csi-driver / build 6188855463

14 Sep 2023 05:24PM UTC coverage: 69.242% (+0.3%) from 68.933%
Trigger: push, via github (web-flow)
Merge pull request #1781 from NedAnd1/event-persistence
[V2] feat: persist replica attachment failures

118 of 118 new or added lines in 3 files covered (100.0%)
7393 of 10677 relevant lines covered (69.24%)
7.81 hits per line

Source File: /pkg/controller/common.go (81.46% covered)
/*
Copyright 2021 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
        "container/list"
        "context"
        "errors"
        "fmt"
        "math"
        "reflect"
        "strings"
        "sync"
        "sync/atomic"
        "time"

        "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2022-03-01/compute"
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/status"
        v1 "k8s.io/api/core/v1"

        "k8s.io/apimachinery/pkg/api/meta"
        "k8s.io/apimachinery/pkg/labels"
        "k8s.io/apimachinery/pkg/runtime"
        "k8s.io/apimachinery/pkg/util/wait"
        "k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
        azdiskv1beta2 "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/azuredisk/v1beta2"

        consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants"
        "sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils"
        "sigs.k8s.io/azuredisk-csi-driver/pkg/provisioner"
        "sigs.k8s.io/azuredisk-csi-driver/pkg/util"

        "sigs.k8s.io/azuredisk-csi-driver/pkg/workflow"

        cache "k8s.io/client-go/tools/cache"

        "sigs.k8s.io/controller-runtime/pkg/client"
        "sigs.k8s.io/controller-runtime/pkg/controller"
        "sigs.k8s.io/controller-runtime/pkg/event"
        "sigs.k8s.io/controller-runtime/pkg/handler"
        "sigs.k8s.io/controller-runtime/pkg/manager"
        "sigs.k8s.io/controller-runtime/pkg/predicate"
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
        "sigs.k8s.io/controller-runtime/pkg/source"
)

const (
        DefaultTimeUntilGarbageCollection = time.Duration(5) * time.Minute

        maxRetry             = 10
        defaultRetryDuration = time.Duration(1) * time.Second
        defaultRetryFactor   = 5.0
        defaultRetrySteps    = 5

        cloudTimeout = time.Duration(5) * time.Minute

        defaultMaxPodAffinityWeight = 100
)

type noOpReconciler struct{}

func (n *noOpReconciler) Reconcile(_ context.Context, _ reconcile.Request) (reconcile.Result, error) {
        return reconcile.Result{}, nil
}

type operationRequester string

const (
        node     operationRequester = "node-controller"
        azvolume operationRequester = "azvolume-controller"
        pv       operationRequester = "pv-controller"
        replica  operationRequester = "replica-controller"
        pod      operationRequester = "pod-controller"
        detach   operationRequester = "detach-opeation"
        attach   operationRequester = "attach-opeation"
)

type attachmentCleanUpMode int

const (
        cleanUpAttachmentForUninstall attachmentCleanUpMode = iota
        cleanUpAttachment
)

type deleteMode int

const (
        deleteOnly deleteMode = iota
        deleteAndWait
)

type updateMode int

const (
        normalUpdate updateMode = iota
        forceUpdate
)

type updateWithLock bool

const (
        acquireLock updateWithLock = true
        skipLock    updateWithLock = false
)

type goSignal struct{}

type CloudProvisioner interface {
        GetSubscriptionID() string
        GetResourceGroup() string
        GetLocation() string
        GetFailureDomain(ctx context.Context, nodeID string) (string, error)
        GetInstanceType(ctx context.Context, nodeID string) (string, error)

        CreateVolume(
                ctx context.Context,
                volumeName string,
                capacityRange *azdiskv1beta2.CapacityRange,
                volumeCapabilities []azdiskv1beta2.VolumeCapability,
                parameters map[string]string,
                secrets map[string]string,
                volumeContentSource *azdiskv1beta2.ContentVolumeSource,
                accessibilityTopology *azdiskv1beta2.TopologyRequirement) (*azdiskv1beta2.AzVolumeStatusDetail, error)
        DeleteVolume(ctx context.Context, volumeID string, secrets map[string]string) error
        PublishVolume(ctx context.Context, volumeID string, nodeID string, volumeContext map[string]string) provisioner.CloudAttachResult
        UnpublishVolume(ctx context.Context, volumeID string, nodeID string) error
        ExpandVolume(ctx context.Context, volumeID string, capacityRange *azdiskv1beta2.CapacityRange, secrets map[string]string) (*azdiskv1beta2.AzVolumeStatusDetail, error)
        ListVolumes(ctx context.Context, maxEntries int32, startingToken string) (*azdiskv1beta2.ListVolumesResult, error)
        CreateSnapshot(ctx context.Context, sourceVolumeID string, snapshotName string, secrets map[string]string, parameters map[string]string) (*azdiskv1beta2.Snapshot, error)
        ListSnapshots(ctx context.Context, maxEntries int32, startingToken string, sourceVolumeID string, snapshotID string, secrets map[string]string) (*azdiskv1beta2.ListSnapshotsResult, error)
        DeleteSnapshot(ctx context.Context, snapshotID string, secrets map[string]string) error
        CheckDiskExists(ctx context.Context, diskURI string) (*compute.Disk, error)
}

type replicaOperation struct {
        ctx                        context.Context
        requester                  operationRequester
        operationFunc              func(context.Context) error
        isReplicaGarbageCollection bool
}

type operationQueue struct {
        *list.List
        gcExclusionList set
        isActive        bool
}

// Remove the element from the queue if the queue is not empty and the element
// is in the list.
//
// Because the queue may have been cleared while the element is in use, this
// function checks if the queue is empty before removing the element to prevent
// underflow of the queue length.
func (q *operationQueue) safeRemove(element *list.Element) {
        if q.Front() != nil {
                _ = q.Remove(element)
        }
}

func newOperationQueue() *operationQueue {
        return &operationQueue{
                gcExclusionList: set{},
                List:            list.New(),
                isActive:        true,
        }
}

type retryInfoEntry struct {
        backoff   *wait.Backoff
        retryLock *sync.Mutex
}

type retryInfo struct {
        retryMap *sync.Map
}

func newRetryInfo() *retryInfo {
        return &retryInfo{
                retryMap: &sync.Map{},
        }
}

func newRetryEntry() *retryInfoEntry {
        return &retryInfoEntry{
                retryLock: &sync.Mutex{},
                backoff:   &wait.Backoff{Duration: defaultRetryDuration, Factor: defaultRetryFactor, Steps: defaultRetrySteps},
        }
}

func (r *retryInfo) nextRequeue(objectName string) time.Duration {
        v, _ := r.retryMap.LoadOrStore(objectName, newRetryEntry())
        entry := v.(*retryInfoEntry)
        entry.retryLock.Lock()
        defer entry.retryLock.Unlock()
        return entry.backoff.Step()
}

func (r *retryInfo) deleteEntry(objectName string) {
        r.retryMap.Delete(objectName)
}
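
The following sketch is illustrative only and not part of common.go: it shows how the retryInfo helpers above might drive per-object exponential backoff inside this package. With the defaults declared earlier (1-second base duration, factor 5.0, 5 steps), successive nextRequeue calls for the same object return roughly 1s, 5s, 25s, 125s, and 625s; the function name and object name below are hypothetical.

func exampleRequeueWithBackoff() {
        retries := newRetryInfo()

        // First failure for "vol-1": back off for roughly the base duration (~1s).
        requeueAfter := retries.nextRequeue("vol-1")
        fmt.Printf("requeue vol-1 after %v\n", requeueAfter)

        // Second failure for the same object: the stored backoff entry has advanced (~5s).
        requeueAfter = retries.nextRequeue("vol-1")
        fmt.Printf("requeue vol-1 after %v\n", requeueAfter)

        // On success, drop the entry so a future failure starts from the base duration again.
        retries.deleteEntry("vol-1")
}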

type emptyType struct{}

type set map[interface{}]emptyType

func (s set) add(entry interface{}) {
        s[entry] = emptyType{}
}

func (s set) has(entry interface{}) bool {
        _, ok := s[entry]
        return ok
}

func (s set) remove(entry interface{}) {
        delete(s, entry)
}

func (s set) toStringSlice() []string {
        entries := make([]string, len(s))
        i := 0
        for entry := range s {
                entries[i] = entry.(string)
                i++
        }
        return entries
}

type lockableEntry struct {
        sync.RWMutex
        entry interface{}
}

func newLockableEntry(entry interface{}) *lockableEntry {
        return &lockableEntry{
                RWMutex: sync.RWMutex{},
                entry:   entry,
        }
}

// Return true for all errors unless the operation is replica garbage collection that had been aborted due to context cancellation
func shouldRequeueReplicaOperation(isReplicaGarbageCollection bool, err error) bool {
        return err != nil && !(isReplicaGarbageCollection && errors.Is(err, context.Canceled))
}

type filterPlugin interface {
        name() string
        setup(pods []v1.Pod, persistentVolumes []*v1.PersistentVolume, state *SharedState)
        filter(ctx context.Context, nodes []v1.Node) ([]v1.Node, error)
}

// interPodAffinityFilter selects nodes that either meets inter-pod affinity rules or has replica mounts of volumes of pods with matching labels
type interPodAffinityFilter struct {
        pods  []v1.Pod
        state *SharedState
}

func (p *interPodAffinityFilter) name() string {
        return "inter-pod affinity filter"
}

func (p *interPodAffinityFilter) setup(pods []v1.Pod, persistentVolumes []*v1.PersistentVolume, state *SharedState) {
        p.pods = pods
        p.state = state
}

func (p *interPodAffinityFilter) filter(ctx context.Context, nodes []v1.Node) ([]v1.Node, error) {
        ctx, w := workflow.New(ctx, workflow.WithDetails("filter-plugin", p.name()))
        defer w.Finish(nil)
        nodeMap := map[string]int{}
        qualifyingNodes := set{}

        for i, node := range nodes {
                nodeMap[node.Name] = i
                qualifyingNodes.add(node.Name)
        }

        isFirst := true
        for _, pod := range p.pods {
                if pod.Spec.Affinity == nil || pod.Spec.Affinity.PodAffinity == nil {
                        continue
                }
                for _, affinityTerm := range pod.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution {
                        podNodes, replicaNodes := p.state.getQualifiedNodesForPodAffinityTerm(ctx, nodes, pod.Namespace, affinityTerm)
                        if isFirst {
                                qualifyingNodes = set{}
                                for podNode := range podNodes {
                                        qualifyingNodes.add(podNode)
                                }
                                for replicaNode := range replicaNodes {
                                        qualifyingNodes.add(replicaNode)
                                }
                                isFirst = false
                        } else {
                                for qualifyingNode := range qualifyingNodes {
                                        if !podNodes.has(qualifyingNode) && !replicaNodes.has(qualifyingNode) {
                                                qualifyingNodes.remove(qualifyingNode)
                                        }
                                }
                        }
                }
        }

        var filteredNodes []v1.Node
        for qualifyingNode := range qualifyingNodes {
                if i, exists := nodeMap[qualifyingNode.(string)]; exists {
                        filteredNodes = append(filteredNodes, nodes[i])
                }
        }
        // Logging purpose
        evictedNodes := make([]string, len(nodes)-len(filteredNodes))
        i := 0
        for _, node := range nodes {
                if !qualifyingNodes.has(node.Name) {
                        evictedNodes[i] = node.Name
                        i++
                }
        }
        w.Logger().V(10).Infof("nodes (%+v) filtered out by %s", evictedNodes, p.name())

        return filteredNodes, nil
}

type interPodAntiAffinityFilter struct {
        pods  []v1.Pod
        state *SharedState
}

func (p *interPodAntiAffinityFilter) name() string {
        return "inter-pod anti-affinity filter"
}

func (p *interPodAntiAffinityFilter) setup(pods []v1.Pod, persistentVolumes []*v1.PersistentVolume, state *SharedState) {
        p.pods = pods
        p.state = state
}

func (p *interPodAntiAffinityFilter) filter(ctx context.Context, nodes []v1.Node) ([]v1.Node, error) {
        ctx, w := workflow.New(ctx, workflow.WithDetails("filter-plugin", p.name()))
        defer w.Finish(nil)
        nodeMap := map[string]int{}
        candidateNodes := set{}

        for i, node := range nodes {
                nodeMap[node.Name] = i
                candidateNodes.add(node.Name)
        }

        qualifyingNodes := set{}
        isFirst := true
        for _, pod := range p.pods {
                if pod.Spec.Affinity == nil || pod.Spec.Affinity.PodAntiAffinity == nil {
                        continue
                }
                for _, affinityTerm := range pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution {
                        podNodes, _ := p.state.getQualifiedNodesForPodAffinityTerm(ctx, nodes, pod.Namespace, affinityTerm)
                        if isFirst {
                                for podNode := range podNodes {
                                        qualifyingNodes.add(podNode)
                                }
                                isFirst = false
                        } else {
                                for qualifyingNode := range qualifyingNodes {
                                        if !podNodes.has(qualifyingNode) {
                                                qualifyingNodes.remove(qualifyingNode)
                                        }
                                }
                        }
                }
        }

        var filteredNodes []v1.Node
        for candidateNode := range candidateNodes {
                if !qualifyingNodes.has(candidateNode) {
                        if i, exists := nodeMap[candidateNode.(string)]; exists {
                                filteredNodes = append(filteredNodes, nodes[i])
                        }
                }
        }

        // Logging purpose
        evictedNodes := make([]string, len(nodes)-len(filteredNodes))
        i := 0
        for _, node := range nodes {
                if qualifyingNodes.has(node.Name) {
                        evictedNodes[i] = node.Name
                        i++
                }
        }
        w.Logger().V(10).Infof("nodes (%+v) filtered out by %s", evictedNodes, p.name())

        return filteredNodes, nil
}

type podTolerationFilter struct {
        pods []v1.Pod
}

func (p *podTolerationFilter) name() string {
        return "pod toleration filter"
}

func (p *podTolerationFilter) setup(pods []v1.Pod, persistentVolumes []*v1.PersistentVolume, state *SharedState) {
        p.pods = pods
}

func (p *podTolerationFilter) filter(ctx context.Context, nodes []v1.Node) ([]v1.Node, error) {
        _, w := workflow.New(ctx, workflow.WithDetails("filter-plugin", p.name()))
        defer w.Finish(nil)
        candidateNodes := set{}
        for i := range nodes {
                candidateNodes.add(i)
        }

        podTolerations := set{}

        for i, pod := range p.pods {
                podTolerationMap := map[string]*v1.Toleration{}
                for _, podToleration := range pod.Spec.Tolerations {
                        podToleration := &podToleration
                        if i == 0 {
                                podTolerations.add(podToleration)
                        } else {
                                podTolerationMap[podToleration.Key] = podToleration
                        }
                }
                if i > 0 {
                        for podToleration := range podTolerations {
                                if existingToleration, ok := podTolerationMap[podToleration.(v1.Toleration).Key]; ok {
                                        if !podToleration.(*v1.Toleration).MatchToleration(existingToleration) {
                                                podTolerations.remove(podToleration)
                                        }
                                }
                        }
                }
        }

        for candidateNode := range candidateNodes {
                tolerable := true
                node := nodes[candidateNode.(int)]
                for _, taint := range node.Spec.Taints {
                        taintTolerable := false
                        for podToleration := range podTolerations {
                                // if any one of node's taint cannot be tolerated by pod's tolerations, break
                                if podToleration.(*v1.Toleration).ToleratesTaint(&taint) {
                                        taintTolerable = true
                                }
                        }
                        if tolerable = tolerable && taintTolerable; !tolerable {
                                w.Logger().V(5).Infof("Removing node (%s) from replica candidates: node (%s)'s taint cannot be tolerated", node.Name, node.Name)
                                candidateNodes.remove(candidateNode)
                                break
                        }
                }
        }

        filteredNodes := make([]v1.Node, len(candidateNodes))
        i := 0
        for candidateNode := range candidateNodes {
                filteredNodes[i] = nodes[candidateNode.(int)]
                i++
        }
        return filteredNodes, nil
}

type podNodeAffinityFilter struct {
        pods []v1.Pod
}

func (p *podNodeAffinityFilter) name() string {
        return "pod node-affinity filter"
}

func (p *podNodeAffinityFilter) setup(pods []v1.Pod, persistentVolumes []*v1.PersistentVolume, state *SharedState) {
        p.pods = pods
}

func (p *podNodeAffinityFilter) filter(ctx context.Context, nodes []v1.Node) ([]v1.Node, error) {
        _, w := workflow.New(ctx, workflow.WithDetails("filter-plugin", p.name()))
        defer w.Finish(nil)
        var podNodeAffinities []nodeaffinity.RequiredNodeAffinity

        candidateNodes := set{}
        for i := range nodes {
                candidateNodes.add(i)
        }

        for _, pod := range p.pods {
                // acknowledge that there can be duplicate entries within the slice
                podNodeAffinity := nodeaffinity.GetRequiredNodeAffinity(&pod)
                podNodeAffinities = append(podNodeAffinities, podNodeAffinity)
        }

        for i, node := range nodes {
                for _, podNodeAffinity := range podNodeAffinities {
                        if match, err := podNodeAffinity.Match(&node); !match || err != nil {
                                w.Logger().V(5).Infof("Removing node (%s) from replica candidates: node does not match pod node affinity (%+v)", node.Name, podNodeAffinity)
                                candidateNodes.remove(i)
                        }
                }
        }

        filteredNodes := make([]v1.Node, len(candidateNodes))
        i := 0
        for candidateNode := range candidateNodes {
                filteredNodes[i] = nodes[candidateNode.(int)]
                i++
        }
        return filteredNodes, nil
}

type podNodeSelectorFilter struct {
        pods  []v1.Pod
        state *SharedState
}

func (p *podNodeSelectorFilter) name() string {
        return "pod node-selector filter"
}

func (p *podNodeSelectorFilter) setup(pods []v1.Pod, persistentVolumes []*v1.PersistentVolume, state *SharedState) {
        p.pods = pods
        p.state = state
}

func (p *podNodeSelectorFilter) filter(ctx context.Context, nodes []v1.Node) ([]v1.Node, error) {
        _, w := workflow.New(ctx, workflow.WithDetails("filter-plugin", p.name()))
        defer w.Finish(nil)
        candidateNodes := set{}
        for i := range nodes {
                candidateNodes.add(i)
        }

        podNodeSelector := labels.NewSelector()
        for _, pod := range p.pods {
                nodeSelector := labels.SelectorFromSet(labels.Set(pod.Spec.NodeSelector))
                requirements, selectable := nodeSelector.Requirements()
                if selectable {
                        podNodeSelector = podNodeSelector.Add(requirements...)
                }
        }

        filteredNodes := []v1.Node{}
        for candidateNode := range candidateNodes {
                node := nodes[candidateNode.(int)]
                nodeLabels := labels.Set(node.Labels)
                if podNodeSelector.Matches(nodeLabels) {
                        filteredNodes = append(filteredNodes, node)
                } else {
                        w.Logger().V(5).Infof("Removing node (%s) from replica candidate: node does not match pod node selector (%v)", node.Name, podNodeSelector)
                }
        }

        return filteredNodes, nil
}

type volumeNodeSelectorFilter struct {
        persistentVolumes []*v1.PersistentVolume
}

func (v *volumeNodeSelectorFilter) name() string {
        return "volume node-selector filter"
}

func (v *volumeNodeSelectorFilter) setup(pods []v1.Pod, persistentVolumes []*v1.PersistentVolume, state *SharedState) {
        v.persistentVolumes = persistentVolumes
}

func (v *volumeNodeSelectorFilter) filter(ctx context.Context, nodes []v1.Node) ([]v1.Node, error) {
        _, w := workflow.New(ctx, workflow.WithDetails("filter-plugin", v.name()))
        defer w.Finish(nil)
        candidateNodes := set{}
        for i := range nodes {
                candidateNodes.add(i)
        }

        var volumeNodeSelectors []*nodeaffinity.NodeSelector
        for _, pv := range v.persistentVolumes {
                if pv.Spec.NodeAffinity == nil || pv.Spec.NodeAffinity.Required == nil {
                        continue
                }
                nodeSelector, err := nodeaffinity.NewNodeSelector(pv.Spec.NodeAffinity.Required)
                if err != nil {
                        w.Logger().Errorf(err, "failed to get node selector from node affinity (%v)", pv.Spec.NodeAffinity.Required)
                        continue
                }
                // acknowledge that there can be duplicates in the slice
                volumeNodeSelectors = append(volumeNodeSelectors, nodeSelector)
        }

        for candidateNode := range candidateNodes {
                node := nodes[candidateNode.(int)]
                for _, volumeNodeSelector := range volumeNodeSelectors {
                        if !volumeNodeSelector.Match(&node) {
                                w.Logger().V(5).Infof("Removing node (%s) from replica candidates: volume node selector (%+v) cannot be matched with the node.", node.Name, volumeNodeSelector)
                                candidateNodes.remove(candidateNode)
                        }
                }
        }

        filteredNodes := make([]v1.Node, len(candidateNodes))
        i := 0
        for candidateNode := range candidateNodes {
                filteredNodes[i] = nodes[candidateNode.(int)]
                i++
        }
        return filteredNodes, nil
}

type nodeScorerPlugin interface {
        name() string
        setup(nodes []v1.Node, pods []v1.Pod, volumes []string, state *SharedState)
        priority() float64 // returns the score plugin's priority in a scale of 1 ~ 5 (5 being the highest priority)
        score(ctx context.Context, nodeScores map[string]int) (map[string]int, error)
}

type scoreByNodeCapacity struct {
        nodes   []v1.Node
        volumes []string
        state   *SharedState
}

func (s *scoreByNodeCapacity) name() string {
        return "score by node capacity"
}

func (s *scoreByNodeCapacity) priority() float64 {
        return 1
}

func (s *scoreByNodeCapacity) setup(nodes []v1.Node, pods []v1.Pod, volumes []string, state *SharedState) {
        s.nodes = nodes
        s.volumes = volumes
        s.state = state
}

func (s *scoreByNodeCapacity) score(ctx context.Context, nodeScores map[string]int) (map[string]int, error) {
        ctx, w := workflow.New(ctx, workflow.WithDetails("score-plugin", s.name()))
        defer w.Finish(nil)
        for _, node := range s.nodes {
                if _, ok := nodeScores[node.Name]; !ok {
                        continue
                }
                maxCapacity, err := azureutils.GetNodeMaxDiskCountWithLabels(node.Labels)
                if err != nil {
                        w.Logger().Errorf(err, "failed to get max capacity of node (%s)", node.Name)
                        delete(nodeScores, node.Name)
                        continue
                }
                remainingCapacity, err := azureutils.GetNodeRemainingDiskCountApprox(ctx, s.state.cachedClient, node.Name)
                if err != nil {
                        // if failed to get node's remaining capacity, remove the node from the candidate list and proceed
                        w.Logger().Errorf(err, "failed to get remaining capacity of node (%s)", node.Name)
                        delete(nodeScores, node.Name)
                        continue
                }

                nodeScores[node.Name] += int((float64(remainingCapacity) / float64(maxCapacity)) * math.Pow(10, s.priority()))

                if remainingCapacity-len(s.volumes) < 0 {
                        delete(nodeScores, node.Name)
                }

                w.Logger().V(10).Infof("node (%s) can accept %d more attachments", node.Name, remainingCapacity)
        }
        return nodeScores, nil
}

type scoreByReplicaCount struct {
        volumes []string
        state   *SharedState
}

func (s *scoreByReplicaCount) name() string {
        return "score by replica count"
}

func (s *scoreByReplicaCount) priority() float64 {
        return 3
}

func (s *scoreByReplicaCount) setup(nodes []v1.Node, pods []v1.Pod, volumes []string, state *SharedState) {
        s.volumes = volumes
        s.state = state
}

func (s *scoreByReplicaCount) score(ctx context.Context, nodeScores map[string]int) (map[string]int, error) {
        ctx, w := workflow.New(ctx, workflow.WithDetails("score-plugin", s.name()))
        defer w.Finish(nil)

        var requestedReplicaCount int

        nodeReplicaCounts := map[string]int{}

        for _, volume := range s.volumes {
                azVolume, err := azureutils.GetAzVolume(ctx, s.state.cachedClient, nil, volume, s.state.config.ObjectNamespace, true)
                if err != nil {
                        w.Logger().Errorf(err, "failed to get AzVolume (%s)", volume)
                        continue
                }
                requestedReplicaCount += azVolume.Spec.MaxMountReplicaCount
                azVolumeAttachments, err := azureutils.GetAzVolumeAttachmentsForVolume(ctx, s.state.cachedClient, volume, azureutils.AllRoles)
                if err != nil {
                        w.Logger().V(5).Errorf(err, "failed listing AzVolumeAttachments for azvolume %s", volume)
                        continue
                }

                for _, azVolumeAttachment := range azVolumeAttachments {
                        if _, exists := nodeScores[azVolumeAttachment.Spec.NodeName]; exists {
                                if azVolumeAttachment.Spec.RequestedRole == azdiskv1beta2.PrimaryRole {
                                        delete(nodeScores, azVolumeAttachment.Spec.NodeName)
                                } else {
                                        nodeReplicaCounts[azVolumeAttachment.Spec.NodeName]++
                                }
                        }
                }

                if requestedReplicaCount > 0 {
                        for nodeName, replicaCount := range nodeReplicaCounts {
                                if _, ok := nodeScores[nodeName]; !ok {
                                        continue
                                }
                                nodeScores[nodeName] += int((float64(replicaCount) / float64(requestedReplicaCount)) * math.Pow(10, s.priority()))
                        }
                }
        }
        return nodeScores, nil
}

type scoreByInterPodAffinity struct {
        nodes   []v1.Node
        pods    []v1.Pod
        volumes []string
        state   *SharedState
}

func (s *scoreByInterPodAffinity) name() string {
        return "score by inter pod affinity"
}

func (s *scoreByInterPodAffinity) priority() float64 {
        return 2
}

func (s *scoreByInterPodAffinity) setup(nodes []v1.Node, pods []v1.Pod, volumes []string, state *SharedState) {
        s.nodes = nodes
        s.pods = pods
        s.volumes = volumes
        s.state = state
}

func (s *scoreByInterPodAffinity) score(ctx context.Context, nodeScores map[string]int) (map[string]int, error) {
        ctx, w := workflow.New(ctx, workflow.WithDetails("score-plugin", s.name()))
        defer w.Finish(nil)

        nodeAffinityScores := map[string]int{}
        var maxAffinityScore int

        for _, pod := range s.pods {
                if pod.Spec.Affinity == nil || pod.Spec.Affinity.PodAffinity == nil {
                        continue
                }

                // pod affinity weight range from 1-100
                // and as a node can both be a part of 1) nodes that satisfy pod affinity rule and 2) nodes with qualifying replica attachments (for which we give half of the score), we need to 1.5X the max weight
                maxAffinityScore += 1.5 * defaultMaxPodAffinityWeight * len(pod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution)

                for _, weightedAffinityTerm := range pod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
                        podNodes, replicaNodes := s.state.getQualifiedNodesForPodAffinityTerm(ctx, s.nodes, pod.Namespace, weightedAffinityTerm.PodAffinityTerm)
                        for podNode := range podNodes {
                                w.Logger().Infof("podNode: %s", podNode.(string))
                                nodeAffinityScores[podNode.(string)] += int(weightedAffinityTerm.Weight)
                        }
                        for replicaNode := range replicaNodes {
                                nodeAffinityScores[replicaNode.(string)] += int(weightedAffinityTerm.Weight / 2)
                        }
                }
        }

        for node, affinityScore := range nodeAffinityScores {
                if _, ok := nodeScores[node]; !ok {
                        continue
                }
                nodeScores[node] += int((float64(affinityScore) / float64(maxAffinityScore)) * math.Pow(10, s.priority()))
        }

        return nodeScores, nil
}

type scoreByInterPodAntiAffinity struct {
        nodes   []v1.Node
        pods    []v1.Pod
        volumes []string
        state   *SharedState
}

func (s *scoreByInterPodAntiAffinity) name() string {
        return "score by inter pod anti affinity"
}

func (s *scoreByInterPodAntiAffinity) priority() float64 {
        return 2
}

func (s *scoreByInterPodAntiAffinity) setup(nodes []v1.Node, pods []v1.Pod, volumes []string, state *SharedState) {
        s.nodes = nodes
        s.pods = pods
        s.volumes = volumes
        s.state = state
}

func (s *scoreByInterPodAntiAffinity) score(ctx context.Context, nodeScores map[string]int) (map[string]int, error) {
        ctx, w := workflow.New(ctx, workflow.WithDetails("score-plugin", s.name()))
        defer w.Finish(nil)

        nodeAffinityScores := map[string]int{}
        var maxAffinityScore int

        for _, pod := range s.pods {
                if pod.Spec.Affinity == nil || pod.Spec.Affinity.PodAntiAffinity == nil {
                        continue
                }

                // pod affinity weight range from 1-100
                maxAffinityScore += defaultMaxPodAffinityWeight * len(pod.Spec.Affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution)

                for _, weightedAffinityTerm := range pod.Spec.Affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
                        podNodes, _ := s.state.getQualifiedNodesForPodAffinityTerm(ctx, s.nodes, pod.Namespace, weightedAffinityTerm.PodAffinityTerm)

                        for node := range nodeScores {
                                if !podNodes.has(node) {
                                        nodeAffinityScores[node] += int(weightedAffinityTerm.Weight)
                                }
                        }
                }
        }

        for node, affinityScore := range nodeAffinityScores {
                if _, ok := nodeScores[node]; !ok {
                        continue
                }
                nodeScores[node] += int((float64(affinityScore) / float64(maxAffinityScore)) * math.Pow(10, s.priority()))
        }

        return nodeScores, nil
}

type scoreByPodNodeAffinity struct {
        nodes   []v1.Node
        pods    []v1.Pod
        volumes []string
        state   *SharedState
}

func (s *scoreByPodNodeAffinity) name() string {
        return "score by pod node affinity"
}

func (s *scoreByPodNodeAffinity) priority() float64 {
        return 2
}

func (s *scoreByPodNodeAffinity) setup(nodes []v1.Node, pods []v1.Pod, volumes []string, state *SharedState) {
        s.nodes = nodes
        s.pods = pods
        s.volumes = volumes
        s.state = state
}

func (s *scoreByPodNodeAffinity) score(ctx context.Context, nodeScores map[string]int) (map[string]int, error) {
        _, w := workflow.New(ctx, workflow.WithDetails("filter-plugin", s.name()))
        defer w.Finish(nil)

        var preferredSchedulingTerms []v1.PreferredSchedulingTerm

        for _, pod := range s.pods {
                if pod.Spec.Affinity == nil || pod.Spec.Affinity.NodeAffinity == nil {
                        continue
                }
                preferredSchedulingTerms = append(preferredSchedulingTerms, pod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution...)
        }

        preferredAffinity, err := nodeaffinity.NewPreferredSchedulingTerms(preferredSchedulingTerms)
        if err != nil {
                return nodeScores, err
        }

        for _, node := range s.nodes {
                nodeScore := preferredAffinity.Score(&node)
                if _, ok := nodeScores[node.Name]; !ok {
                        continue
                }
                nodeScores[node.Name] += int(nodeScore)
        }
        return nodeScores, nil
}

func getSupportedZones(nodeSelectorTerms []v1.NodeSelectorTerm, topologyKey string) set {
        // Get the list of supported zones for pv
        supportedZones := set{}
        if len(nodeSelectorTerms) > 0 {
                for _, term := range nodeSelectorTerms {
                        if len(term.MatchExpressions) > 0 {
                                for _, matchExpr := range term.MatchExpressions {
                                        if matchExpr.Key == topologyKey {
                                                for _, value := range matchExpr.Values {
                                                        if value != "" && !supportedZones.has(value) {
                                                                supportedZones.add(value)
                                                        }
                                                }
                                        }
                                }
                        }
                }
        }
        return supportedZones
}

func markDetachRequest(attachment *azdiskv1beta2.AzVolumeAttachment, caller operationRequester) {
        attachment.Status.Annotations = azureutils.AddToMap(attachment.Status.Annotations, consts.VolumeDetachRequestAnnotation, string(caller))
}

func markCleanUp(attachment *azdiskv1beta2.AzVolumeAttachment, caller operationRequester) {
        attachment.Status.Annotations = azureutils.AddToMap(attachment.Status.Annotations, consts.CleanUpAnnotation, string(caller))
}

func shouldCleanUp(attachment azdiskv1beta2.AzVolumeAttachment, mode azureutils.AttachmentRoleMode) bool {
        return mode == azureutils.AllRoles || (attachment.Spec.RequestedRole == azdiskv1beta2.PrimaryRole && mode == azureutils.PrimaryOnly) || (attachment.Spec.RequestedRole == azdiskv1beta2.ReplicaRole && mode == azureutils.ReplicaOnly)
}

func isAttached(attachment *azdiskv1beta2.AzVolumeAttachment) bool {
        return attachment != nil && attachment.Status.Detail != nil && attachment.Status.Detail.PublishContext != nil
}

func isCreated(volume *azdiskv1beta2.AzVolume) bool {
        return volume != nil && volume.Status.Detail != nil
}

// objectDeletionRequested returns whether deletion of the specified object has been requested.
// If so, it will return true and a time.Duration after which to delete the object. If the
// duration is less than or equal to 0, the object should be deleted immediately.
func objectDeletionRequested(obj runtime.Object) (bool, time.Duration) {
        meta, _ := meta.Accessor(obj)
        if meta == nil {
                return false, time.Duration(0)
        }
        deletionTime := meta.GetDeletionTimestamp()

        if deletionTime.IsZero() {
                return false, time.Duration(0)
        }

        return true, time.Until(deletionTime.Time)
}

func isCleanupRequested(attachment *azdiskv1beta2.AzVolumeAttachment) bool {
        return attachment != nil && azureutils.MapContains(attachment.Status.Annotations, consts.CleanUpAnnotation)
}

func volumeAttachRequested(attachment *azdiskv1beta2.AzVolumeAttachment) bool {
        return attachment != nil && azureutils.MapContains(attachment.Annotations, consts.VolumeAttachRequestAnnotation)
}

func volumeDetachRequested(attachment *azdiskv1beta2.AzVolumeAttachment) bool {
        return attachment != nil && azureutils.MapContains(attachment.Status.Annotations, consts.VolumeDetachRequestAnnotation)
}

func volumeDeleteRequested(volume *azdiskv1beta2.AzVolume) bool {
        return volume != nil && azureutils.MapContains(volume.Status.Annotations, consts.VolumeDeleteRequestAnnotation)
}

func isDemotionRequested(attachment *azdiskv1beta2.AzVolumeAttachment) bool {
        return attachment != nil && attachment.Status.Detail != nil && attachment.Status.Detail.Role == azdiskv1beta2.PrimaryRole && attachment.Spec.RequestedRole == azdiskv1beta2.ReplicaRole
}

func isPromotionRequested(attachment *azdiskv1beta2.AzVolumeAttachment) bool {
        return attachment != nil && attachment.Status.Detail != nil && attachment.Status.Detail.Role == azdiskv1beta2.ReplicaRole && attachment.Spec.RequestedRole == azdiskv1beta2.PrimaryRole
}

func isPreProvisioned(volume *azdiskv1beta2.AzVolume) bool {
        return volume != nil && azureutils.MapContains(volume.Status.Annotations, consts.PreProvisionedVolumeAnnotation)
}

func getQualifiedName(namespace, name string) string {
        return fmt.Sprintf("%s/%s", namespace, name)
}

func parseQualifiedName(qualifiedName string) (namespace, name string, err error) {
        parsed := strings.Split(qualifiedName, "/")
        if len(parsed) != 2 {
                err = status.Errorf(codes.Internal, "pod's qualified name (%s) should be of <namespace>/<name>", qualifiedName)
                return
        }
        namespace = parsed[0]
        name = parsed[1]
        return
}

func formatUpdateStateError(objectType, fromState, toState string, expectedStates ...string) string {
        return fmt.Sprintf("%s's state '%s' cannot be updated to %s. %s can only be updated to %s", objectType, fromState, toState, fromState, strings.Join(expectedStates, ", "))
}

func getOperationRequeueError(desired string, obj client.Object) error {
        return status.Errorf(codes.Aborted, "requeueing %s operation because another operation is already pending on %v (%s)", desired, reflect.TypeOf(obj), obj.GetName())
}

// reconcileReturnOnSuccess returns a reconciler result on successful reconciliation.
func reconcileReturnOnSuccess(objectName string, retryInfo *retryInfo) (reconcile.Result, error) {
        retryInfo.deleteEntry(objectName)
        return reconcile.Result{}, nil
}

// reconcileReturnOnError returns a reconciler result on error that requeues the object for later processing if the error is retriable.
func reconcileReturnOnError(ctx context.Context, obj runtime.Object, operationType string, err error, retryInfo *retryInfo) (reconcile.Result, error) {
        var (
                requeue    bool = status.Code(err) != codes.FailedPrecondition
                retryAfter time.Duration
        )

        w := workflow.GetWorkflow(ctx, obj)

        if meta, metaErr := meta.Accessor(obj); metaErr == nil {
                objectName := meta.GetName()
                objectType := reflect.TypeOf(obj)
                if !requeue {
                        w.Logger().Errorf(err, "failed to %s %v (%s) with no retry", operationType, objectType, objectName)
                        retryInfo.deleteEntry(objectName)
                } else {
                        retryAfter = retryInfo.nextRequeue(objectName)
                        w.Logger().Errorf(err, "failed to %s %v (%s) with retry after %v", operationType, objectType, objectName, retryAfter)
                }
        }

        return reconcile.Result{
                Requeue:      requeue,
                RequeueAfter: retryAfter,
        }, nil
}

// reconcileAfter returns a reconciler result that requeues the current object for processing after the specified time.
func reconcileAfter(after time.Duration, objectName string, retryInfo *retryInfo) (reconcile.Result, error) {
        retryInfo.deleteEntry(objectName)
        return reconcile.Result{Requeue: true, RequeueAfter: after}, nil
}

func isOperationInProcess(obj interface{}) bool {
        switch target := obj.(type) {
        case *azdiskv1beta2.AzVolume:
                return target.Status.State == azdiskv1beta2.VolumeCreating || target.Status.State == azdiskv1beta2.VolumeDeleting || target.Status.State == azdiskv1beta2.VolumeUpdating
        case *azdiskv1beta2.AzVolumeAttachment:
                deleteRequested, _ := objectDeletionRequested(target)
                return target.Status.State == azdiskv1beta2.Attaching || (target.Status.State == azdiskv1beta2.Detaching && !deleteRequested)
        }
        return false
}

func max(a, b int) int {
        if a > b {
                return a
        }
        return b
}

func containsString(key string, items []string) bool {
        for _, item := range items {
                if item == key {
                        return true
                }
        }
        return false
}

type ReplicaRequest struct {
        VolumeName string
        Priority   int //The number of replicas that have yet to be created
}
type VolumeReplicaRequestsPriorityQueue struct {
        queue *cache.Heap
        size  int32
}

func (vq *VolumeReplicaRequestsPriorityQueue) Push(ctx context.Context, replicaRequest *ReplicaRequest) {
        w, _ := workflow.GetWorkflowFromContext(ctx)
        err := vq.queue.Add(replicaRequest)
        atomic.AddInt32(&vq.size, 1)
        if err != nil {
                w.Logger().Errorf(err, "failed to add replica request for volume %s", replicaRequest.VolumeName)
        }
}

func (vq *VolumeReplicaRequestsPriorityQueue) Pop() *ReplicaRequest {
        request, _ := vq.queue.Pop()
        atomic.AddInt32(&vq.size, -1)
        return request.(*ReplicaRequest)
}
func (vq *VolumeReplicaRequestsPriorityQueue) DrainQueue() []*ReplicaRequest {
        var listRequests []*ReplicaRequest
        for i := vq.size; i > 0; i-- {
                listRequests = append(listRequests, vq.Pop())
        }
        return listRequests
}

func verifyObjectDeleted(obj interface{}, objectDeleted bool) (bool, error) {
        if obj == nil || objectDeleted {
                return true, nil
        }
        return false, nil
}

func verifyObjectFailedOrDeleted(obj interface{}, objectDeleted bool) (bool, error) {
        if obj == nil || objectDeleted {
                return true, nil
        }

        switch target := obj.(type) {
        case azdiskv1beta2.AzVolumeAttachment:
                if target.Status.Error != nil {
                        return false, util.ErrorFromAzError(target.Status.Error)
                }
        case azdiskv1beta2.AzVolume:
                // otherwise, the volume detachment has either failed with error or pending
                if target.Status.Error != nil {
                        return false, util.ErrorFromAzError(target.Status.Error)
                }
        }

        return false, nil
}

func verifyObjectPromotedOrDemoted(obj interface{}, objectDeleted bool) (bool, error) {
        if obj == nil || objectDeleted {
                return false, fmt.Errorf("obj is nil or has been deleted")
        }

        azVolumeAttachmentInstance := obj.(*azdiskv1beta2.AzVolumeAttachment)
        if azVolumeAttachmentInstance.Labels != nil {
                if label, ok := azVolumeAttachmentInstance.Labels[consts.RoleChangeLabel]; ok {
                        isPromoteUpdated := label == consts.Promoted && azVolumeAttachmentInstance.Status.Detail.PreviousRole == azdiskv1beta2.ReplicaRole && azVolumeAttachmentInstance.Status.Detail.Role == azdiskv1beta2.PrimaryRole
                        isDemoteUpdated := label == consts.Demoted && azVolumeAttachmentInstance.Status.Detail.PreviousRole == azdiskv1beta2.PrimaryRole && azVolumeAttachmentInstance.Status.Detail.Role == azdiskv1beta2.ReplicaRole

                        if isPromoteUpdated || isDemoteUpdated {
                                return true, nil
                        }
                }
        }
        return false, nil
}

// WatchObject creates a noop controller to set up a watch and an informer for an object in controller-runtime manager
// Use this function if you want to set up a watch for an object without a configuring a separate informer factory.
func WatchObject(mgr manager.Manager, objKind source.Kind) error {
        objType := objKind.Type.GetName()
        c, err := controller.New(fmt.Sprintf("watch %s", objType), mgr, controller.Options{
                Reconciler: &noOpReconciler{},
        })
        if err != nil {
                return err
        }

        c.GetLogger().Info("Starting to watch %s", objType)

        // Watch for CRUD events on objects
        err = c.Watch(&objKind, &handler.EnqueueRequestForObject{}, predicate.Funcs{
                CreateFunc: func(_ event.CreateEvent) bool {
                        return false
                },
                UpdateFunc: func(_ event.UpdateEvent) bool {
                        return true
                },
                GenericFunc: func(_ event.GenericEvent) bool {
                        return false
                },
                DeleteFunc: func(_ event.DeleteEvent) bool {
                        return false
                },
        })
        return err
}
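
The following sketch is illustrative only and not part of common.go: it shows one way WatchObject could be wired up when building the controller manager. It assumes the vendored controller-runtime version in which source.Kind is a struct with a Type field; the wrapper function name is hypothetical and mgr is an already-constructed manager.Manager.

func exampleWatchAzVolumeAttachments(mgr manager.Manager) error {
        // Sets up a watch on AzVolumeAttachment objects through a no-op controller;
        // per the predicate.Funcs inside WatchObject, only Update events are enqueued.
        return WatchObject(mgr, source.Kind{Type: &azdiskv1beta2.AzVolumeAttachment{}})
}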

type circularLinkedList[T any] struct {
        *circularLinkedListNode[T]
}

func (list circularLinkedList[T]) isEmpty() bool {
        return list.circularLinkedListNode == nil
}

func (list *circularLinkedList[T]) clear() {
        list.circularLinkedListNode = nil
}

func (list *circularLinkedList[T]) add(newNode *circularLinkedListNode[T]) {
        if list.circularLinkedListNode == nil {
                list.circularLinkedListNode = newNode
        } else {
                list.prev.next = newNode
        }
        newNode.next = list.circularLinkedListNode
        list.prev = newNode
}

// Moves to the next node in the list and retrieves it.
func (list *circularLinkedList[T]) next() *circularLinkedListNode[T] {
        nextNode := list.circularLinkedListNode.next
        list.circularLinkedListNode = nextNode
        return nextNode
}

type circularLinkedListNode[T any] struct {
        curr T
        prev *circularLinkedListNode[T]
        next *circularLinkedListNode[T]
}

func (oldNode *circularLinkedListNode[T]) remove() { //nolint:golint // receiver name should be consistent with previous receiver name for invalid-type
        oldNode.prev.next = oldNode.next
        oldNode.next.prev = oldNode.prev
}

func (oldNode *circularLinkedListNode[T]) tryRemove() { //nolint:golint // receiver name should be consistent with previous receiver name for invalid-type
        if oldNode != nil {
                oldNode.remove()
        }
}
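
The following sketch is illustrative only and not part of common.go: it shows how the generic circular linked list above could be used to round-robin over a small set of values. The function name and the sample node names are hypothetical.

func exampleRoundRobin() {
        var ring circularLinkedList[string]
        ring.add(&circularLinkedListNode[string]{curr: "node-0"})
        ring.add(&circularLinkedListNode[string]{curr: "node-1"})

        if !ring.isEmpty() {
                // next() advances the head of the list and returns it, wrapping around,
                // so this loop prints node-1, node-0, node-1.
                for i := 0; i < 3; i++ {
                        fmt.Println(ring.next().curr)
                }
        }
}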