kubernetes-sigs / azuredisk-csi-driver / 5128766597

31 May 2023 06:10PM UTC coverage: 69.113% (+24.6%) from 44.538%

Pull Request #1826 (hccheng72): [V2] fix: update predicate of replica controller to prevent over creating replica AzVolumeAttachment
Commit: "fix: only watch for events that replica controller get interested"

114 of 114 new or added lines in 2 files covered. (100.0%)
7196 of 10412 relevant lines covered (69.11%)
6.71 hits per line

Source File

/pkg/controller/common.go: 82.36% covered
1
/*
2
Copyright 2021 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controller
18

19
import (
20
        "container/list"
21
        "context"
22
        "errors"
23
        "fmt"
24
        "math"
25
        "reflect"
26
        "strings"
27
        "sync"
28
        "sync/atomic"
29
        "time"
30

31
        "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2022-03-01/compute"
32
        "google.golang.org/grpc/codes"
33
        "google.golang.org/grpc/status"
34
        v1 "k8s.io/api/core/v1"
35

36
        "k8s.io/apimachinery/pkg/api/meta"
37
        "k8s.io/apimachinery/pkg/labels"
38
        "k8s.io/apimachinery/pkg/runtime"
39
        "k8s.io/apimachinery/pkg/util/wait"
40
        "k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
41
        azdiskv1beta2 "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/azuredisk/v1beta2"
42

43
        consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants"
44
        "sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils"
45
        "sigs.k8s.io/azuredisk-csi-driver/pkg/provisioner"
46
        "sigs.k8s.io/azuredisk-csi-driver/pkg/util"
47

48
        "sigs.k8s.io/azuredisk-csi-driver/pkg/workflow"
49

50
        cache "k8s.io/client-go/tools/cache"
51

52
        "sigs.k8s.io/cloud-provider-azure/pkg/provider"
53
        "sigs.k8s.io/controller-runtime/pkg/client"
54
        "sigs.k8s.io/controller-runtime/pkg/controller"
55
        "sigs.k8s.io/controller-runtime/pkg/event"
56
        "sigs.k8s.io/controller-runtime/pkg/handler"
57
        "sigs.k8s.io/controller-runtime/pkg/manager"
58
        "sigs.k8s.io/controller-runtime/pkg/predicate"
59
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
60
        "sigs.k8s.io/controller-runtime/pkg/source"
61
)
62

63
const (
64
        DefaultTimeUntilGarbageCollection = time.Duration(5) * time.Minute
65

66
        maxRetry             = 10
67
        defaultRetryDuration = time.Duration(1) * time.Second
68
        defaultRetryFactor   = 5.0
69
        defaultRetrySteps    = 5
70

71
        cloudTimeout = time.Duration(5) * time.Minute
72

73
        defaultMaxPodAffinityWeight = 100
74
)
75

76
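// noOpReconciler is a Reconciler whose Reconcile is a no-op; it backs the watch-only controllers created by WatchObject below.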
type noOpReconciler struct{}
77

78
func (n *noOpReconciler) Reconcile(_ context.Context, _ reconcile.Request) (reconcile.Result, error) {
×
79
        return reconcile.Result{}, nil
×
80
}
×
81

82
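// operationRequester identifies the controller or operation that initiated a request; its value is stored in object annotations (see markDetachRequest and markCleanUp).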
type operationRequester string
83

84
const (
85
        azdrivernode     operationRequester = "azdrivernode-controller"
86
        azvolume         operationRequester = "azvolume-controller"
87
        pv               operationRequester = "pv-controller"
88
        replica          operationRequester = "replica-controller"
89
        nodeavailability operationRequester = "nodeavailability-controller"
90
        pod              operationRequester = "pod-controller"
91
        detach           operationRequester = "detach-operation"
92
        attach           operationRequester = "attach-operation"
93
)
94

95
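// attachmentCleanUpMode distinguishes attachment clean-up performed for a driver uninstall from a regular attachment clean-up.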
type attachmentCleanUpMode int
96

97
const (
98
        cleanUpAttachmentForUninstall attachmentCleanUpMode = iota
99
        cleanUpAttachment
100
)
101

102
type deleteMode int
103

104
const (
105
        deleteOnly deleteMode = iota
106
        deleteAndWait
107
)
108

109
type updateMode int
110

111
const (
112
        normalUpdate updateMode = iota
113
        forceUpdate
114
)
115

116
type updateWithLock bool
117

118
const (
119
        acquireLock updateWithLock = true
120
        skipLock    updateWithLock = false
121
)
122

123
type goSignal struct{}
124

125
// TODO Make CloudProvisioner independent of csi types.
126
type CloudProvisioner interface {
127
        CreateVolume(
128
                ctx context.Context,
129
                volumeName string,
130
                capacityRange *azdiskv1beta2.CapacityRange,
131
                volumeCapabilities []azdiskv1beta2.VolumeCapability,
132
                parameters map[string]string,
133
                secrets map[string]string,
134
                volumeContentSource *azdiskv1beta2.ContentVolumeSource,
135
                accessibilityTopology *azdiskv1beta2.TopologyRequirement) (*azdiskv1beta2.AzVolumeStatusDetail, error)
136
        DeleteVolume(ctx context.Context, volumeID string, secrets map[string]string) error
137
        PublishVolume(ctx context.Context, volumeID string, nodeID string, volumeContext map[string]string) provisioner.CloudAttachResult
138
        UnpublishVolume(ctx context.Context, volumeID string, nodeID string) error
139
        ExpandVolume(ctx context.Context, volumeID string, capacityRange *azdiskv1beta2.CapacityRange, secrets map[string]string) (*azdiskv1beta2.AzVolumeStatusDetail, error)
140
        ListVolumes(ctx context.Context, maxEntries int32, startingToken string) (*azdiskv1beta2.ListVolumesResult, error)
141
        CreateSnapshot(ctx context.Context, sourceVolumeID string, snapshotName string, secrets map[string]string, parameters map[string]string) (*azdiskv1beta2.Snapshot, error)
142
        ListSnapshots(ctx context.Context, maxEntries int32, startingToken string, sourceVolumeID string, snapshotID string, secrets map[string]string) (*azdiskv1beta2.ListSnapshotsResult, error)
143
        DeleteSnapshot(ctx context.Context, snapshotID string, secrets map[string]string) error
144
        CheckDiskExists(ctx context.Context, diskURI string) (*compute.Disk, error)
145
        GetCloud() *provider.Cloud
146
        GetMetricPrefix() string
147
}
148

149
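// replicaOperation is a queued unit of work on replica attachments, tagged with its requester and whether it is replica garbage collection.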
type replicaOperation struct {
150
        ctx                        context.Context
151
        requester                  operationRequester
152
        operationFunc              func(context.Context) error
153
        isReplicaGarbageCollection bool
154
}
155

156
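// operationQueue is a list of pending replicaOperations, with an exclusion list for garbage collection and a flag marking whether the queue is still active.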
type operationQueue struct {
157
        *list.List
158
        gcExclusionList set
159
        isActive        bool
160
}
161

162
func (q *operationQueue) remove(element *list.Element) {
20✔
163
        // operationQueue might have been cleared before the lock was acquired
20✔
164
        // so always check that the list is non-empty before removing an element; otherwise the queue's underlying length would drop below 0 and cause issues
20✔
165
        if q.Front() != nil {
37✔
166
                _ = q.Remove(element)
17✔
167
        }
17✔
168
}
169

170
func newOperationQueue() *operationQueue {
63✔
171
        return &operationQueue{
63✔
172
                gcExclusionList: set{},
63✔
173
                List:            list.New(),
63✔
174
                isActive:        true,
63✔
175
        }
63✔
176
}
63✔
177

178
type retryInfoEntry struct {
179
        backoff   *wait.Backoff
180
        retryLock *sync.Mutex
181
}
182

183
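// retryInfo tracks a per-object exponential backoff so that failed reconciliations are requeued with increasing delays.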
type retryInfo struct {
184
        retryMap *sync.Map
185
}
186

187
func newRetryInfo() *retryInfo {
25✔
188
        return &retryInfo{
25✔
189
                retryMap: &sync.Map{},
25✔
190
        }
25✔
191
}
25✔
192

193
func newRetryEntry() *retryInfoEntry {
×
194
        return &retryInfoEntry{
×
195
                retryLock: &sync.Mutex{},
×
196
                backoff:   &wait.Backoff{Duration: defaultRetryDuration, Factor: defaultRetryFactor, Steps: defaultRetrySteps},
×
197
        }
×
198
}
×
199

200
func (r *retryInfo) nextRequeue(objectName string) time.Duration {
×
201
        v, _ := r.retryMap.LoadOrStore(objectName, newRetryEntry())
×
202
        entry := v.(*retryInfoEntry)
×
203
        entry.retryLock.Lock()
×
204
        defer entry.retryLock.Unlock()
×
205
        return entry.backoff.Step()
×
206
}
×
207

208
func (r *retryInfo) deleteEntry(objectName string) {
18✔
209
        r.retryMap.Delete(objectName)
18✔
210
}
18✔
211

212
type emptyType struct{}
213

214
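// set is a simple hash set over interface{} values.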
type set map[interface{}]emptyType
215

216
func (s set) add(entry interface{}) {
452✔
217
        s[entry] = emptyType{}
452✔
218
}
452✔
219

220
func (s set) has(entry interface{}) bool {
261✔
221
        _, ok := s[entry]
261✔
222
        return ok
261✔
223
}
261✔
224

225
func (s set) remove(entry interface{}) {
11✔
226
        delete(s, entry)
11✔
227
}
11✔
228

229
func (s set) toStringSlice() []string {
4✔
230
        entries := make([]string, len(s))
4✔
231
        i := 0
4✔
232
        for entry := range s {
8✔
233
                entries[i] = entry.(string)
4✔
234
                i++
4✔
235
        }
4✔
236
        return entries
4✔
237
}
238

239
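// lockableEntry pairs an arbitrary value with a read-write mutex so the value can be safely shared.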
type lockableEntry struct {
240
        sync.RWMutex
241
        entry interface{}
242
}
243

244
func newLockableEntry(entry interface{}) *lockableEntry {
118✔
245
        return &lockableEntry{
118✔
246
                RWMutex: sync.RWMutex{},
118✔
247
                entry:   entry,
118✔
248
        }
118✔
249
}
118✔
250

251
// Return true for all errors unless the operation is replica garbage collection that had been aborted due to context cancellation
252
func shouldRequeueReplicaOperation(isReplicaGarbageCollection bool, err error) bool {
34✔
253
        return err != nil && !(isReplicaGarbageCollection && errors.Is(err, context.Canceled))
34✔
254
}
34✔
255

256
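// filterPlugin narrows the list of candidate nodes for replica attachments; setup supplies the pods, persistent volumes, and shared state that filter then uses.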
type filterPlugin interface {
257
        name() string
258
        setup(pods []v1.Pod, persistentVolumes []*v1.PersistentVolume, state *SharedState)
259
        filter(ctx context.Context, nodes []v1.Node) ([]v1.Node, error)
260
}
261

262
// interPodAffinityFilter selects nodes that either meet inter-pod affinity rules or have replica mounts of volumes of pods with matching labels
263
type interPodAffinityFilter struct {
264
        pods  []v1.Pod
265
        state *SharedState
266
}
267

268
func (p *interPodAffinityFilter) name() string {
78✔
269
        return "inter-pod affinity filter"
78✔
270
}
78✔
271

272
func (p *interPodAffinityFilter) setup(pods []v1.Pod, persistentVolumes []*v1.PersistentVolume, state *SharedState) {
26✔
273
        p.pods = pods
26✔
274
        p.state = state
26✔
275
}
26✔
276

277
func (p *interPodAffinityFilter) filter(ctx context.Context, nodes []v1.Node) ([]v1.Node, error) {
26✔
278
        ctx, w := workflow.New(ctx, workflow.WithDetails("filter-plugin", p.name()))
26✔
279
        defer w.Finish(nil)
26✔
280
        nodeMap := map[string]int{}
26✔
281
        qualifyingNodes := set{}
26✔
282

26✔
283
        for i, node := range nodes {
91✔
284
                nodeMap[node.Name] = i
65✔
285
                qualifyingNodes.add(node.Name)
65✔
286
        }
65✔
287

288
        isFirst := true
26✔
289
        for _, pod := range p.pods {
54✔
290
                if pod.Spec.Affinity == nil || pod.Spec.Affinity.PodAffinity == nil {
54✔
291
                        continue
26✔
292
                }
293
                for _, affinityTerm := range pod.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution {
4✔
294
                        podNodes, replicaNodes := p.state.getQualifiedNodesForPodAffinityTerm(ctx, nodes, pod.Namespace, affinityTerm)
2✔
295
                        if isFirst {
4✔
296
                                qualifyingNodes = set{}
2✔
297
                                for podNode := range podNodes {
4✔
298
                                        qualifyingNodes.add(podNode)
2✔
299
                                }
2✔
300
                                for replicaNode := range replicaNodes {
2✔
301
                                        qualifyingNodes.add(replicaNode)
×
302
                                }
×
303
                                isFirst = false
2✔
304
                        } else {
×
305
                                for qualifyingNode := range qualifyingNodes {
×
306
                                        if !podNodes.has(qualifyingNode) && !replicaNodes.has(qualifyingNode) {
×
307
                                                qualifyingNodes.remove(qualifyingNode)
×
308
                                        }
×
309
                                }
310
                        }
311
                }
312
        }
313

314
        var filteredNodes []v1.Node
26✔
315
        for qualifyingNode := range qualifyingNodes {
87✔
316
                if i, exists := nodeMap[qualifyingNode.(string)]; exists {
122✔
317
                        filteredNodes = append(filteredNodes, nodes[i])
61✔
318
                }
61✔
319
        }
320
        // for logging purposes only
321
        evictedNodes := make([]string, len(nodes)-len(filteredNodes))
26✔
322
        i := 0
26✔
323
        for _, node := range nodes {
91✔
324
                if !qualifyingNodes.has(node.Name) {
69✔
325
                        evictedNodes[i] = node.Name
4✔
326
                        i++
4✔
327
                }
4✔
328
        }
329
        w.Logger().V(10).Infof("nodes (%+v) filtered out by %s", evictedNodes, p.name())
26✔
330

26✔
331
        return filteredNodes, nil
26✔
332
}
333

334
type interPodAntiAffinityFilter struct {
335
        pods  []v1.Pod
336
        state *SharedState
337
}
338

339
func (p *interPodAntiAffinityFilter) name() string {
78✔
340
        return "inter-pod anti-affinity filter"
78✔
341
}
78✔
342

343
func (p *interPodAntiAffinityFilter) setup(pods []v1.Pod, persistentVolumes []*v1.PersistentVolume, state *SharedState) {
26✔
344
        p.pods = pods
26✔
345
        p.state = state
26✔
346
}
26✔
347

348
func (p *interPodAntiAffinityFilter) filter(ctx context.Context, nodes []v1.Node) ([]v1.Node, error) {
26✔
349
        ctx, w := workflow.New(ctx, workflow.WithDetails("filter-plugin", p.name()))
26✔
350
        defer w.Finish(nil)
26✔
351
        nodeMap := map[string]int{}
26✔
352
        candidateNodes := set{}
26✔
353

26✔
354
        for i, node := range nodes {
87✔
355
                nodeMap[node.Name] = i
61✔
356
                candidateNodes.add(node.Name)
61✔
357
        }
61✔
358

359
        qualifyingNodes := set{}
26✔
360
        isFirst := true
26✔
361
        for _, pod := range p.pods {
54✔
362
                if pod.Spec.Affinity == nil || pod.Spec.Affinity.PodAntiAffinity == nil {
55✔
363
                        continue
27✔
364
                }
365
                for _, affinityTerm := range pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution {
2✔
366
                        podNodes, _ := p.state.getQualifiedNodesForPodAffinityTerm(ctx, nodes, pod.Namespace, affinityTerm)
1✔
367
                        if isFirst {
2✔
368
                                for podNode := range podNodes {
3✔
369
                                        qualifyingNodes.add(podNode)
2✔
370
                                }
2✔
371
                                isFirst = false
1✔
372
                        } else {
×
373
                                for qualifyingNode := range qualifyingNodes {
×
374
                                        if !podNodes.has(qualifyingNode) {
×
375
                                                qualifyingNodes.remove(qualifyingNode)
×
376
                                        }
×
377
                                }
378
                        }
379
                }
380
        }
381

382
        var filteredNodes []v1.Node
26✔
383
        for candidateNode := range candidateNodes {
87✔
384
                if !qualifyingNodes.has(candidateNode) {
120✔
385
                        if i, exists := nodeMap[candidateNode.(string)]; exists {
118✔
386
                                filteredNodes = append(filteredNodes, nodes[i])
59✔
387
                        }
59✔
388
                }
389
        }
390

391
        // for logging purposes only
392
        evictedNodes := make([]string, len(nodes)-len(filteredNodes))
26✔
393
        i := 0
26✔
394
        for _, node := range nodes {
87✔
395
                if qualifyingNodes.has(node.Name) {
63✔
396
                        evictedNodes[i] = node.Name
2✔
397
                        i++
2✔
398
                }
2✔
399
        }
400
        w.Logger().V(10).Infof("nodes (%+v) filtered out by %s", evictedNodes, p.name())
26✔
401

26✔
402
        return filteredNodes, nil
26✔
403
}
404

405
type podTolerationFilter struct {
406
        pods []v1.Pod
407
}
408

409
func (p *podTolerationFilter) name() string {
52✔
410
        return "pod toleration filter"
52✔
411
}
52✔
412

413
func (p *podTolerationFilter) setup(pods []v1.Pod, persistentVolumes []*v1.PersistentVolume, state *SharedState) {
26✔
414
        p.pods = pods
26✔
415
}
26✔
416

417
func (p *podTolerationFilter) filter(ctx context.Context, nodes []v1.Node) ([]v1.Node, error) {
26✔
418
        _, w := workflow.New(ctx, workflow.WithDetails("filter-plugin", p.name()))
26✔
419
        defer w.Finish(nil)
26✔
420
        candidateNodes := set{}
26✔
421
        for i := range nodes {
85✔
422
                candidateNodes.add(i)
59✔
423
        }
59✔
424

425
        podTolerations := set{}
26✔
426

26✔
427
        for i, pod := range p.pods {
54✔
428
                podTolerationMap := map[string]*v1.Toleration{}
28✔
429
                for _, podToleration := range pod.Spec.Tolerations {
29✔
430
                        podToleration := &podToleration
1✔
431
                        if i == 0 {
2✔
432
                                podTolerations.add(podToleration)
1✔
433
                        } else {
1✔
434
                                podTolerationMap[podToleration.Key] = podToleration
×
435
                        }
×
436
                }
437
                if i > 0 {
30✔
438
                        for podToleration := range podTolerations {
2✔
439
                                if existingToleration, ok := podTolerationMap[podToleration.(v1.Toleration).Key]; ok {
×
440
                                        if !podToleration.(*v1.Toleration).MatchToleration(existingToleration) {
×
441
                                                podTolerations.remove(podToleration)
×
442
                                        }
×
443
                                }
444
                        }
445
                }
446
        }
447

448
        for candidateNode := range candidateNodes {
85✔
449
                tolerable := true
59✔
450
                node := nodes[candidateNode.(int)]
59✔
451
                for _, taint := range node.Spec.Taints {
61✔
452
                        taintTolerable := false
2✔
453
                        for podToleration := range podTolerations {
3✔
454
                                // if any one of the node's taints cannot be tolerated by the pod's tolerations, break
1✔
455
                                if podToleration.(*v1.Toleration).ToleratesTaint(&taint) {
2✔
456
                                        taintTolerable = true
1✔
457
                                }
1✔
458
                        }
459
                        if tolerable = tolerable && taintTolerable; !tolerable {
3✔
460
                                w.Logger().V(5).Infof("Removing node (%s) from replica candidates: node (%s)'s taint cannot be tolerated", node.Name, node.Name)
1✔
461
                                candidateNodes.remove(candidateNode)
1✔
462
                                break
1✔
463
                        }
464
                }
465
        }
466

467
        filteredNodes := make([]v1.Node, len(candidateNodes))
26✔
468
        i := 0
26✔
469
        for candidateNode := range candidateNodes {
84✔
470
                filteredNodes[i] = nodes[candidateNode.(int)]
58✔
471
                i++
58✔
472
        }
58✔
473
        return filteredNodes, nil
26✔
474
}
475

476
type podNodeAffinityFilter struct {
477
        pods []v1.Pod
478
}
479

480
func (p *podNodeAffinityFilter) name() string {
52✔
481
        return "pod node-affinity filter"
52✔
482
}
52✔
483

484
func (p *podNodeAffinityFilter) setup(pods []v1.Pod, persistentVolumes []*v1.PersistentVolume, state *SharedState) {
26✔
485
        p.pods = pods
26✔
486
}
26✔
487

488
func (p *podNodeAffinityFilter) filter(ctx context.Context, nodes []v1.Node) ([]v1.Node, error) {
26✔
489
        _, w := workflow.New(ctx, workflow.WithDetails("filter-plugin", p.name()))
26✔
490
        defer w.Finish(nil)
26✔
491
        var podNodeAffinities []nodeaffinity.RequiredNodeAffinity
26✔
492

26✔
493
        candidateNodes := set{}
26✔
494
        for i := range nodes {
84✔
495
                candidateNodes.add(i)
58✔
496
        }
58✔
497

498
        for _, pod := range p.pods {
54✔
499
                // note that there can be duplicate entries within the slice
28✔
500
                podNodeAffinity := nodeaffinity.GetRequiredNodeAffinity(&pod)
28✔
501
                podNodeAffinities = append(podNodeAffinities, podNodeAffinity)
28✔
502
        }
28✔
503

504
        for i, node := range nodes {
84✔
505
                for _, podNodeAffinity := range podNodeAffinities {
121✔
506
                        if match, err := podNodeAffinity.Match(&node); !match || err != nil {
68✔
507
                                w.Logger().V(5).Infof("Removing node (%s) from replica candidates: node does not match pod node affinity (%+v)", node.Name, podNodeAffinity)
5✔
508
                                candidateNodes.remove(i)
5✔
509
                        }
5✔
510
                }
511
        }
512

513
        filteredNodes := make([]v1.Node, len(candidateNodes))
26✔
514
        i := 0
26✔
515
        for candidateNode := range candidateNodes {
79✔
516
                filteredNodes[i] = nodes[candidateNode.(int)]
53✔
517
                i++
53✔
518
        }
53✔
519
        return filteredNodes, nil
26✔
520
}
521

522
type podNodeSelectorFilter struct {
523
        pods  []v1.Pod
524
        state *SharedState
525
}
526

527
func (p *podNodeSelectorFilter) name() string {
52✔
528
        return "pod node-selector filter"
52✔
529
}
52✔
530

531
func (p *podNodeSelectorFilter) setup(pods []v1.Pod, persistentVolumes []*v1.PersistentVolume, state *SharedState) {
26✔
532
        p.pods = pods
26✔
533
        p.state = state
26✔
534
}
26✔
535

536
func (p *podNodeSelectorFilter) filter(ctx context.Context, nodes []v1.Node) ([]v1.Node, error) {
26✔
537
        _, w := workflow.New(ctx, workflow.WithDetails("filter-plugin", p.name()))
26✔
538
        defer w.Finish(nil)
26✔
539
        candidateNodes := set{}
26✔
540
        for i := range nodes {
79✔
541
                candidateNodes.add(i)
53✔
542
        }
53✔
543

544
        podNodeSelector := labels.NewSelector()
26✔
545
        for _, pod := range p.pods {
54✔
546
                nodeSelector := labels.SelectorFromSet(labels.Set(pod.Spec.NodeSelector))
28✔
547
                requirements, selectable := nodeSelector.Requirements()
28✔
548
                if selectable {
56✔
549
                        podNodeSelector = podNodeSelector.Add(requirements...)
28✔
550
                }
28✔
551
        }
552

553
        filteredNodes := []v1.Node{}
26✔
554
        for candidateNode := range candidateNodes {
79✔
555
                node := nodes[candidateNode.(int)]
53✔
556
                nodeLabels := labels.Set(node.Labels)
53✔
557
                if podNodeSelector.Matches(nodeLabels) {
106✔
558
                        filteredNodes = append(filteredNodes, node)
53✔
559
                } else {
53✔
560
                        w.Logger().V(5).Infof("Removing node (%s) from replica candidate: node does not match pod node selector (%v)", node.Name, podNodeSelector)
×
561
                }
×
562
        }
563

564
        return filteredNodes, nil
26✔
565
}
566

567
type volumeNodeSelectorFilter struct {
568
        persistentVolumes []*v1.PersistentVolume
569
}
570

571
func (v *volumeNodeSelectorFilter) name() string {
52✔
572
        return "volume node-selector filter"
52✔
573
}
52✔
574

575
func (v *volumeNodeSelectorFilter) setup(pods []v1.Pod, persistentVolumes []*v1.PersistentVolume, state *SharedState) {
26✔
576
        v.persistentVolumes = persistentVolumes
26✔
577
}
26✔
578

579
func (v *volumeNodeSelectorFilter) filter(ctx context.Context, nodes []v1.Node) ([]v1.Node, error) {
26✔
580
        _, w := workflow.New(ctx, workflow.WithDetails("filter-plugin", v.name()))
26✔
581
        defer w.Finish(nil)
26✔
582
        candidateNodes := set{}
26✔
583
        for i := range nodes {
79✔
584
                candidateNodes.add(i)
53✔
585
        }
53✔
586

587
        var volumeNodeSelectors []*nodeaffinity.NodeSelector
26✔
588
        for _, pv := range v.persistentVolumes {
60✔
589
                if pv.Spec.NodeAffinity == nil || pv.Spec.NodeAffinity.Required == nil {
65✔
590
                        continue
31✔
591
                }
592
                nodeSelector, err := nodeaffinity.NewNodeSelector(pv.Spec.NodeAffinity.Required)
3✔
593
                if err != nil {
3✔
594
                        w.Logger().Errorf(err, "failed to get node selector from node affinity (%v)", pv.Spec.NodeAffinity.Required)
×
595
                        continue
×
596
                }
597
                // note that there can be duplicates in the slice
598
                volumeNodeSelectors = append(volumeNodeSelectors, nodeSelector)
3✔
599
        }
600

601
        for candidateNode := range candidateNodes {
79✔
602
                node := nodes[candidateNode.(int)]
53✔
603
                for _, volumeNodeSelector := range volumeNodeSelectors {
61✔
604
                        if !volumeNodeSelector.Match(&node) {
12✔
605
                                w.Logger().V(5).Infof("Removing node (%s) from replica candidates: volume node selector (%+v) cannot be matched with the node.", node.Name, volumeNodeSelector)
4✔
606
                                candidateNodes.remove(candidateNode)
4✔
607
                        }
4✔
608
                }
609
        }
610

611
        filteredNodes := make([]v1.Node, len(candidateNodes))
26✔
612
        i := 0
26✔
613
        for candidateNode := range candidateNodes {
75✔
614
                filteredNodes[i] = nodes[candidateNode.(int)]
49✔
615
                i++
49✔
616
        }
49✔
617
        return filteredNodes, nil
26✔
618
}
619

620
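// nodeScorerPlugin adds to the score of each candidate node; contributions are scaled by 10^priority, so higher-priority plugins dominate the ranking.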
type nodeScorerPlugin interface {
621
        name() string
622
        setup(nodes []v1.Node, pods []v1.Pod, volumes []string, state *SharedState)
623
        priority() float64 // returns the score plugin's priority in a scale of 1 ~ 5 (5 being the highest priority)
624
        score(ctx context.Context, nodeScores map[string]int) (map[string]int, error)
625
}
626

627
type scoreByNodeCapacity struct {
628
        nodes   []v1.Node
629
        volumes []string
630
        state   *SharedState
631
}
632

633
func (s *scoreByNodeCapacity) name() string {
40✔
634
        return "score by node capacity"
40✔
635
}
40✔
636

637
func (s *scoreByNodeCapacity) priority() float64 {
46✔
638
        return 1
46✔
639
}
46✔
640

641
func (s *scoreByNodeCapacity) setup(nodes []v1.Node, pods []v1.Pod, volumes []string, state *SharedState) {
20✔
642
        s.nodes = nodes
20✔
643
        s.volumes = volumes
20✔
644
        s.state = state
20✔
645
}
20✔
646

647
func (s *scoreByNodeCapacity) score(ctx context.Context, nodeScores map[string]int) (map[string]int, error) {
20✔
648
        ctx, w := workflow.New(ctx, workflow.WithDetails("score-plugin", s.name()))
20✔
649
        defer w.Finish(nil)
20✔
650
        for _, node := range s.nodes {
66✔
651
                if _, ok := nodeScores[node.Name]; !ok {
46✔
652
                        continue
×
653
                }
654
                maxCapacity, err := azureutils.GetNodeMaxDiskCountWithLabels(node.Labels)
46✔
655
                if err != nil {
46✔
656
                        w.Logger().Errorf(err, "failed to get max capacity of node (%s)", node.Name)
×
657
                        delete(nodeScores, node.Name)
×
658
                        continue
×
659
                }
660
                remainingCapacity, err := azureutils.GetNodeRemainingDiskCountApprox(ctx, s.state.cachedClient, node.Name)
46✔
661
                if err != nil {
46✔
662
                        // if failed to get node's remaining capacity, remove the node from the candidate list and proceed
×
663
                        w.Logger().Errorf(err, "failed to get remaining capacity of node (%s)", node.Name)
×
664
                        delete(nodeScores, node.Name)
×
665
                        continue
×
666
                }
667

668
                nodeScores[node.Name] += int((float64(remainingCapacity) / float64(maxCapacity)) * math.Pow(10, s.priority()))
46✔
669

46✔
670
                if remainingCapacity-len(s.volumes) < 0 {
48✔
671
                        delete(nodeScores, node.Name)
2✔
672
                }
2✔
673

674
                w.Logger().V(10).Infof("node (%s) can accept %d more attachments", node.Name, remainingCapacity)
46✔
675
        }
676
        return nodeScores, nil
20✔
677
}
678

679
type scoreByReplicaCount struct {
680
        volumes []string
681
        state   *SharedState
682
}
683

684
func (s *scoreByReplicaCount) name() string {
40✔
685
        return "score by replica count"
40✔
686
}
40✔
687

688
func (s *scoreByReplicaCount) priority() float64 {
5✔
689
        return 3
5✔
690
}
5✔
691

692
func (s *scoreByReplicaCount) setup(nodes []v1.Node, pods []v1.Pod, volumes []string, state *SharedState) {
20✔
693
        s.volumes = volumes
20✔
694
        s.state = state
20✔
695
}
20✔
696

697
func (s *scoreByReplicaCount) score(ctx context.Context, nodeScores map[string]int) (map[string]int, error) {
20✔
698
        ctx, w := workflow.New(ctx, workflow.WithDetails("score-plugin", s.name()))
20✔
699
        defer w.Finish(nil)
20✔
700

20✔
701
        var requestedReplicaCount int
20✔
702

20✔
703
        nodeReplicaCounts := map[string]int{}
20✔
704

20✔
705
        for _, volume := range s.volumes {
48✔
706
                azVolume, err := azureutils.GetAzVolume(ctx, s.state.cachedClient, nil, volume, s.state.config.ObjectNamespace, true)
28✔
707
                if err != nil {
28✔
708
                        w.Logger().Errorf(err, "failed to get AzVolume (%s)", volume)
×
709
                        continue
×
710
                }
711
                requestedReplicaCount += azVolume.Spec.MaxMountReplicaCount
28✔
712
                azVolumeAttachments, err := azureutils.GetAzVolumeAttachmentsForVolume(ctx, s.state.cachedClient, volume, azureutils.AllRoles)
28✔
713
                if err != nil {
28✔
714
                        w.Logger().V(5).Errorf(err, "failed listing AzVolumeAttachments for azvolume %s", volume)
×
715
                        continue
×
716
                }
717

718
                for _, azVolumeAttachment := range azVolumeAttachments {
49✔
719
                        if _, exists := nodeScores[azVolumeAttachment.Spec.NodeName]; exists {
34✔
720
                                if azVolumeAttachment.Spec.RequestedRole == azdiskv1beta2.PrimaryRole {
23✔
721
                                        delete(nodeScores, azVolumeAttachment.Spec.NodeName)
10✔
722
                                } else {
13✔
723
                                        nodeReplicaCounts[azVolumeAttachment.Spec.NodeName]++
3✔
724
                                }
3✔
725
                        }
726
                }
727

728
                if requestedReplicaCount > 0 {
56✔
729
                        for nodeName, replicaCount := range nodeReplicaCounts {
33✔
730
                                if _, ok := nodeScores[nodeName]; !ok {
5✔
731
                                        continue
×
732
                                }
733
                                nodeScores[nodeName] += int((float64(replicaCount) / float64(requestedReplicaCount)) * math.Pow(10, s.priority()))
5✔
734
                        }
735
                }
736
        }
737
        return nodeScores, nil
20✔
738
}
739

740
type scoreByInterPodAffinity struct {
741
        nodes   []v1.Node
742
        pods    []v1.Pod
743
        volumes []string
744
        state   *SharedState
745
}
746

747
func (s *scoreByInterPodAffinity) name() string {
40✔
748
        return "score by inter pod affinity"
40✔
749
}
40✔
750

751
func (s *scoreByInterPodAffinity) priority() float64 {
2✔
752
        return 2
2✔
753
}
2✔
754

755
func (s *scoreByInterPodAffinity) setup(nodes []v1.Node, pods []v1.Pod, volumes []string, state *SharedState) {
20✔
756
        s.nodes = nodes
20✔
757
        s.pods = pods
20✔
758
        s.volumes = volumes
20✔
759
        s.state = state
20✔
760
}
20✔
761

762
func (s *scoreByInterPodAffinity) score(ctx context.Context, nodeScores map[string]int) (map[string]int, error) {
20✔
763
        ctx, w := workflow.New(ctx, workflow.WithDetails("score-plugin", s.name()))
20✔
764
        defer w.Finish(nil)
20✔
765

20✔
766
        nodeAffinityScores := map[string]int{}
20✔
767
        var maxAffinityScore int
20✔
768

20✔
769
        for _, pod := range s.pods {
43✔
770
                if pod.Spec.Affinity == nil || pod.Spec.Affinity.PodAffinity == nil {
45✔
771
                        continue
22✔
772
                }
773

774
                // pod affinity weights range from 1-100
775
                // since a node can both 1) satisfy the pod affinity rule and 2) hold qualifying replica attachments (which earn half of the score), we scale the max weight by 1.5
776
                maxAffinityScore += 1.5 * defaultMaxPodAffinityWeight * len(pod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution)
1✔
777

1✔
778
                for _, weightedAffinityTerm := range pod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
2✔
779
                        podNodes, replicaNodes := s.state.getQualifiedNodesForPodAffinityTerm(ctx, s.nodes, pod.Namespace, weightedAffinityTerm.PodAffinityTerm)
1✔
780
                        for podNode := range podNodes {
3✔
781
                                w.Logger().Infof("podNode: %s", podNode.(string))
2✔
782
                                nodeAffinityScores[podNode.(string)] += int(weightedAffinityTerm.Weight)
2✔
783
                        }
2✔
784
                        for replicaNode := range replicaNodes {
1✔
785
                                nodeAffinityScores[replicaNode.(string)] += int(weightedAffinityTerm.Weight / 2)
×
786
                        }
×
787
                }
788
        }
789

790
        for node, affinityScore := range nodeAffinityScores {
22✔
791
                if _, ok := nodeScores[node]; !ok {
2✔
792
                        continue
×
793
                }
794
                nodeScores[node] += int((float64(affinityScore) / float64(maxAffinityScore)) * math.Pow(10, s.priority()))
2✔
795
        }
796

797
        return nodeScores, nil
20✔
798
}
799

800
type scoreByInterPodAntiAffinity struct {
801
        nodes   []v1.Node
802
        pods    []v1.Pod
803
        volumes []string
804
        state   *SharedState
805
}
806

807
func (s *scoreByInterPodAntiAffinity) name() string {
40✔
808
        return "score by inter pod anti affinity"
40✔
809
}
40✔
810

811
func (s *scoreByInterPodAntiAffinity) priority() float64 {
1✔
812
        return 2
1✔
813
}
1✔
814

815
func (s *scoreByInterPodAntiAffinity) setup(nodes []v1.Node, pods []v1.Pod, volumes []string, state *SharedState) {
20✔
816
        s.nodes = nodes
20✔
817
        s.pods = pods
20✔
818
        s.volumes = volumes
20✔
819
        s.state = state
20✔
820
}
20✔
821

822
func (s *scoreByInterPodAntiAffinity) score(ctx context.Context, nodeScores map[string]int) (map[string]int, error) {
20✔
823
        ctx, w := workflow.New(ctx, workflow.WithDetails("score-plugin", s.name()))
20✔
824
        defer w.Finish(nil)
20✔
825

20✔
826
        nodeAffinityScores := map[string]int{}
20✔
827
        var maxAffinityScore int
20✔
828

20✔
829
        for _, pod := range s.pods {
43✔
830
                if pod.Spec.Affinity == nil || pod.Spec.Affinity.PodAntiAffinity == nil {
45✔
831
                        continue
22✔
832
                }
833

834
                // pod anti-affinity weights range from 1-100
835
                maxAffinityScore += defaultMaxPodAffinityWeight * len(pod.Spec.Affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution)
1✔
836

1✔
837
                for _, weightedAffinityTerm := range pod.Spec.Affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
2✔
838
                        podNodes, _ := s.state.getQualifiedNodesForPodAffinityTerm(ctx, s.nodes, pod.Namespace, weightedAffinityTerm.PodAffinityTerm)
1✔
839

1✔
840
                        for node := range nodeScores {
4✔
841
                                if !podNodes.has(node) {
4✔
842
                                        nodeAffinityScores[node] += int(weightedAffinityTerm.Weight)
1✔
843
                                }
1✔
844
                        }
845
                }
846
        }
847

848
        for node, affinityScore := range nodeAffinityScores {
21✔
849
                if _, ok := nodeScores[node]; !ok {
1✔
850
                        continue
×
851
                }
852
                nodeScores[node] += int((float64(affinityScore) / float64(maxAffinityScore)) * math.Pow(10, s.priority()))
1✔
853
        }
854

855
        return nodeScores, nil
20✔
856
}
857

858
type scoreByPodNodeAffinity struct {
859
        nodes   []v1.Node
860
        pods    []v1.Pod
861
        volumes []string
862
        state   *SharedState
863
}
864

865
func (s *scoreByPodNodeAffinity) name() string {
40✔
866
        return "score by pod node affinity"
40✔
867
}
40✔
868

869
func (s *scoreByPodNodeAffinity) priority() float64 {
×
870
        return 2
×
871
}
×
872

873
func (s *scoreByPodNodeAffinity) setup(nodes []v1.Node, pods []v1.Pod, volumes []string, state *SharedState) {
20✔
874
        s.nodes = nodes
20✔
875
        s.pods = pods
20✔
876
        s.volumes = volumes
20✔
877
        s.state = state
20✔
878
}
20✔
879

880
func (s *scoreByPodNodeAffinity) score(ctx context.Context, nodeScores map[string]int) (map[string]int, error) {
20✔
881
        _, w := workflow.New(ctx, workflow.WithDetails("filter-plugin", s.name()))
20✔
882
        defer w.Finish(nil)
20✔
883

20✔
884
        var preferredSchedulingTerms []v1.PreferredSchedulingTerm
20✔
885

20✔
886
        for _, pod := range s.pods {
43✔
887
                if pod.Spec.Affinity == nil || pod.Spec.Affinity.NodeAffinity == nil {
44✔
888
                        continue
21✔
889
                }
890
                preferredSchedulingTerms = append(preferredSchedulingTerms, pod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution...)
2✔
891
        }
892

893
        preferredAffinity, err := nodeaffinity.NewPreferredSchedulingTerms(preferredSchedulingTerms)
20✔
894
        if err != nil {
20✔
895
                return nodeScores, err
×
896
        }
×
897

898
        for _, node := range s.nodes {
66✔
899
                nodeScore := preferredAffinity.Score(&node)
46✔
900
                if _, ok := nodeScores[node.Name]; !ok {
58✔
901
                        continue
12✔
902
                }
903
                nodeScores[node.Name] += int(nodeScore)
34✔
904
        }
905
        return nodeScores, nil
20✔
906
}
907

908
func getSupportedZones(nodeSelectorTerms []v1.NodeSelectorTerm, topologyKey string) set {
1✔
909
        // Get the list of supported zones for pv
1✔
910
        supportedZones := set{}
1✔
911
        if len(nodeSelectorTerms) > 0 {
2✔
912
                for _, term := range nodeSelectorTerms {
2✔
913
                        if len(term.MatchExpressions) > 0 {
2✔
914
                                for _, matchExpr := range term.MatchExpressions {
2✔
915
                                        if matchExpr.Key == topologyKey {
1✔
916
                                                for _, value := range matchExpr.Values {
×
917
                                                        if value != "" && !supportedZones.has(value) {
×
918
                                                                supportedZones.add(value)
×
919
                                                        }
×
920
                                                }
921
                                        }
922
                                }
923
                        }
924
                }
925
        }
926
        return supportedZones
1✔
927
}
928

929
func markDetachRequest(attachment *azdiskv1beta2.AzVolumeAttachment, caller operationRequester) {
5✔
930
        attachment.Status.Annotations = azureutils.AddToMap(attachment.Status.Annotations, consts.VolumeDetachRequestAnnotation, string(caller))
5✔
931
}
5✔
932

933
func markCleanUp(attachment *azdiskv1beta2.AzVolumeAttachment, caller operationRequester) {
4✔
934
        attachment.Status.Annotations = azureutils.AddToMap(attachment.Status.Annotations, consts.CleanUpAnnotation, string(caller))
4✔
935
}
4✔
936

937
func shouldCleanUp(attachment azdiskv1beta2.AzVolumeAttachment, mode azureutils.AttachmentRoleMode) bool {
1✔
938
        return mode == azureutils.AllRoles || (attachment.Spec.RequestedRole == azdiskv1beta2.PrimaryRole && mode == azureutils.PrimaryOnly) || (attachment.Spec.RequestedRole == azdiskv1beta2.ReplicaRole && mode == azureutils.ReplicaOnly)
1✔
939
}
1✔
940

941
func isAttached(attachment *azdiskv1beta2.AzVolumeAttachment) bool {
2✔
942
        return attachment != nil && attachment.Status.Detail != nil && attachment.Status.Detail.PublishContext != nil
2✔
943
}
2✔
944

945
func isCreated(volume *azdiskv1beta2.AzVolume) bool {
20✔
946
        return volume != nil && volume.Status.Detail != nil
20✔
947
}
20✔
948

949
// objectDeletionRequested returns whether deletion of the specified object has been requested.
950
// If so, it will return true and a time.Duration after which to delete the object. If the
951
// duration is less than or equal to 0, the object should be deleted immediately.
952
func objectDeletionRequested(obj runtime.Object) (bool, time.Duration) {
34✔
953
        meta, _ := meta.Accessor(obj)
34✔
954
        if meta == nil {
34✔
955
                return false, time.Duration(0)
×
956
        }
×
957
        deletionTime := meta.GetDeletionTimestamp()
34✔
958

34✔
959
        if deletionTime.IsZero() {
63✔
960
                return false, time.Duration(0)
29✔
961
        }
29✔
962

963
        return true, time.Until(deletionTime.Time)
5✔
964
}
965

966
func isCleanupRequested(attachment *azdiskv1beta2.AzVolumeAttachment) bool {
1✔
967
        return attachment != nil && azureutils.MapContains(attachment.Status.Annotations, consts.CleanUpAnnotation)
1✔
968
}
1✔
969

970
func volumeAttachRequested(attachment *azdiskv1beta2.AzVolumeAttachment) bool {
2✔
971
        return attachment != nil && azureutils.MapContains(attachment.Annotations, consts.VolumeAttachRequestAnnotation)
2✔
972
}
2✔
973

974
func volumeDetachRequested(attachment *azdiskv1beta2.AzVolumeAttachment) bool {
17✔
975
        return attachment != nil && azureutils.MapContains(attachment.Status.Annotations, consts.VolumeDetachRequestAnnotation)
17✔
976
}
17✔
977

978
func volumeDeleteRequested(volume *azdiskv1beta2.AzVolume) bool {
2✔
979
        return volume != nil && azureutils.MapContains(volume.Status.Annotations, consts.VolumeDeleteRequestAnnotation)
2✔
980
}
2✔
981

982
func isDemotionRequested(attachment *azdiskv1beta2.AzVolumeAttachment) bool {
4✔
983
        return attachment != nil && attachment.Status.Detail != nil && attachment.Status.Detail.Role == azdiskv1beta2.PrimaryRole && attachment.Spec.RequestedRole == azdiskv1beta2.ReplicaRole
4✔
984
}
4✔
985

986
func isPromotionRequested(attachment *azdiskv1beta2.AzVolumeAttachment) bool {
2✔
987
        return attachment != nil && attachment.Status.Detail != nil && attachment.Status.Detail.Role == azdiskv1beta2.ReplicaRole && attachment.Spec.RequestedRole == azdiskv1beta2.PrimaryRole
2✔
988
}
2✔
989

990
func isPreProvisioned(volume *azdiskv1beta2.AzVolume) bool {
3✔
991
        return volume != nil && azureutils.MapContains(volume.Status.Annotations, consts.PreProvisionedVolumeAnnotation)
3✔
992
}
3✔
993

994
func getQualifiedName(namespace, name string) string {
202✔
995
        return fmt.Sprintf("%s/%s", namespace, name)
202✔
996
}
202✔
997

998
func parseQualifiedName(qualifiedName string) (namespace, name string, err error) {
14✔
999
        parsed := strings.Split(qualifiedName, "/")
14✔
1000
        if len(parsed) != 2 {
14✔
1001
                err = status.Errorf(codes.Internal, "pod's qualified name (%s) should be of <namespace>/<name>", qualifiedName)
×
1002
                return
×
1003
        }
×
1004
        namespace = parsed[0]
14✔
1005
        name = parsed[1]
14✔
1006
        return
14✔
1007
}
1008

1009
func formatUpdateStateError(objectType, fromState, toState string, expectedStates ...string) string {
×
1010
        return fmt.Sprintf("%s's state '%s' cannot be updated to %s. %s can only be updated to %s", objectType, fromState, toState, fromState, strings.Join(expectedStates, ", "))
×
1011
}
×
1012

1013
func getOperationRequeueError(desired string, obj client.Object) error {
×
1014
        return status.Errorf(codes.Aborted, "requeueing %s operation because another operation is already pending on %v (%s)", desired, reflect.TypeOf(obj), obj.GetName())
×
1015
}
×
1016

1017
// reconcileReturnOnSuccess returns a reconciler result on successful reconciliation.
1018
func reconcileReturnOnSuccess(objectName string, retryInfo *retryInfo) (reconcile.Result, error) {
16✔
1019
        retryInfo.deleteEntry(objectName)
16✔
1020
        return reconcile.Result{}, nil
16✔
1021
}
16✔
1022

1023
// reconcileReturnOnError returns a reconciler result on error that requeues the object for later processing if the error is retriable.
1024
func reconcileReturnOnError(ctx context.Context, obj runtime.Object, operationType string, err error, retryInfo *retryInfo) (reconcile.Result, error) {
1✔
1025
        var (
1✔
1026
                requeue    bool = status.Code(err) != codes.FailedPrecondition
1✔
1027
                retryAfter time.Duration
1✔
1028
        )
1✔
1029

1✔
1030
        w := workflow.GetWorkflow(ctx, obj)
1✔
1031

1✔
1032
        if meta, metaErr := meta.Accessor(obj); metaErr == nil {
2✔
1033
                objectName := meta.GetName()
1✔
1034
                objectType := reflect.TypeOf(obj)
1✔
1035
                if !requeue {
2✔
1036
                        w.Logger().Errorf(err, "failed to %s %v (%s) with no retry", operationType, objectType, objectName)
1✔
1037
                        retryInfo.deleteEntry(objectName)
1✔
1038
                } else {
1✔
1039
                        retryAfter = retryInfo.nextRequeue(objectName)
×
1040
                        w.Logger().Errorf(err, "failed to %s %v (%s) with retry after %v", operationType, objectType, objectName, retryAfter)
×
1041
                }
×
1042
        }
1043

1044
        return reconcile.Result{
1✔
1045
                Requeue:      requeue,
1✔
1046
                RequeueAfter: retryAfter,
1✔
1047
        }, nil
1✔
1048
}
1049

1050
// reconcileAfter returns a reconciler result that requeues the current object for processing after the specified time.
1051
func reconcileAfter(after time.Duration, objectName string, retryInfo *retryInfo) (reconcile.Result, error) {
×
1052
        retryInfo.deleteEntry(objectName)
×
1053
        return reconcile.Result{Requeue: true, RequeueAfter: after}, nil
×
1054
}
×
1055

1056
func isOperationInProcess(obj interface{}) bool {
10✔
1057
        switch target := obj.(type) {
10✔
1058
        case *azdiskv1beta2.AzVolume:
4✔
1059
                return target.Status.State == azdiskv1beta2.VolumeCreating || target.Status.State == azdiskv1beta2.VolumeDeleting || target.Status.State == azdiskv1beta2.VolumeUpdating
4✔
1060
        case *azdiskv1beta2.AzVolumeAttachment:
6✔
1061
                deleteRequested, _ := objectDeletionRequested(target)
6✔
1062
                return target.Status.State == azdiskv1beta2.Attaching || (target.Status.State == azdiskv1beta2.Detaching && !deleteRequested)
6✔
1063
        }
1064
        return false
×
1065
}
1066

1067
func max(a, b int) int {
21✔
1068
        if a > b {
21✔
1069
                return a
×
1070
        }
×
1071
        return b
21✔
1072
}
1073

1074
func containsString(key string, items []string) bool {
7✔
1075
        for _, item := range items {
16✔
1076
                if item == key {
16✔
1077
                        return true
7✔
1078
                }
7✔
1079
        }
1080
        return false
×
1081
}
1082

1083
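// ReplicaRequest records a volume that still needs replica attachments to be created.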
type ReplicaRequest struct {
1084
        VolumeName string
1085
        Priority   int // the number of replicas that have yet to be created
1086
}
1087
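// VolumeReplicaRequestsPriorityQueue wraps a cache.Heap of ReplicaRequests and tracks the number of queued requests with an atomic counter.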
type VolumeReplicaRequestsPriorityQueue struct {
1088
        queue *cache.Heap
1089
        size  int32
1090
}
1091

1092
func (vq *VolumeReplicaRequestsPriorityQueue) Push(ctx context.Context, replicaRequest *ReplicaRequest) {
1✔
1093
        w, _ := workflow.GetWorkflowFromContext(ctx)
1✔
1094
        err := vq.queue.Add(replicaRequest)
1✔
1095
        atomic.AddInt32(&vq.size, 1)
1✔
1096
        if err != nil {
1✔
1097
                w.Logger().Errorf(err, "failed to add replica request for volume %s", replicaRequest.VolumeName)
×
1098
        }
×
1099
}
1100

1101
func (vq *VolumeReplicaRequestsPriorityQueue) Pop() *ReplicaRequest {
1✔
1102
        request, _ := vq.queue.Pop()
1✔
1103
        atomic.AddInt32(&vq.size, -1)
1✔
1104
        return request.(*ReplicaRequest)
1✔
1105
}
1✔
1106
func (vq *VolumeReplicaRequestsPriorityQueue) DrainQueue() []*ReplicaRequest {
2✔
1107
        var listRequests []*ReplicaRequest
2✔
1108
        for i := vq.size; i > 0; i-- {
3✔
1109
                listRequests = append(listRequests, vq.Pop())
1✔
1110
        }
1✔
1111
        return listRequests
2✔
1112
}
1113

1114
func verifyObjectDeleted(obj interface{}, objectDeleted bool) (bool, error) {
9✔
1115
        if obj == nil || objectDeleted {
11✔
1116
                return true, nil
2✔
1117
        }
2✔
1118
        return false, nil
7✔
1119
}
1120

1121
func verifyObjectFailedOrDeleted(obj interface{}, objectDeleted bool) (bool, error) {
4✔
1122
        if obj == nil || objectDeleted {
6✔
1123
                return true, nil
2✔
1124
        }
2✔
1125

1126
        switch target := obj.(type) {
2✔
1127
        case azdiskv1beta2.AzVolumeAttachment:
×
1128
                if target.Status.Error != nil {
×
1129
                        return false, util.ErrorFromAzError(target.Status.Error)
×
1130
                }
×
1131
        case azdiskv1beta2.AzVolume:
×
1132
                // otherwise, the volume detachment has either failed with an error or is still pending
×
1133
                if target.Status.Error != nil {
×
1134
                        return false, util.ErrorFromAzError(target.Status.Error)
×
1135
                }
×
1136
        }
1137

1138
        return false, nil
2✔
1139
}
1140

1141
func verifyObjectPromotedOrDemoted(obj interface{}, objectDeleted bool) (bool, error) {
6✔
1142
        if obj == nil || objectDeleted {
6✔
1143
                return false, fmt.Errorf("obj is nil or has been deleted")
×
1144
        }
×
1145

1146
        azVolumeAttachmentInstance := obj.(*azdiskv1beta2.AzVolumeAttachment)
6✔
1147
        if azVolumeAttachmentInstance.Labels != nil {
12✔
1148
                if label, ok := azVolumeAttachmentInstance.Labels[consts.RoleChangeLabel]; ok {
11✔
1149
                        isPromoteUpdated := label == consts.Promoted && azVolumeAttachmentInstance.Status.Detail.PreviousRole == azdiskv1beta2.ReplicaRole && azVolumeAttachmentInstance.Status.Detail.Role == azdiskv1beta2.PrimaryRole
5✔
1150
                        isDemoteUpdated := label == consts.Demoted && azVolumeAttachmentInstance.Status.Detail.PreviousRole == azdiskv1beta2.PrimaryRole && azVolumeAttachmentInstance.Status.Detail.Role == azdiskv1beta2.ReplicaRole
5✔
1151

5✔
1152
                        if isPromoteUpdated || isDemoteUpdated {
8✔
1153
                                return true, nil
3✔
1154
                        }
3✔
1155
                }
1156
        }
1157
        return false, nil
3✔
1158
}
1159

1160
// WatchObject creates a no-op controller to set up a watch and an informer for an object in the controller-runtime manager.
1161
// Use this function if you want to set up a watch for an object without configuring a separate informer factory.
1162
func WatchObject(mgr manager.Manager, objKind source.Kind) error {
×
1163
        objType := objKind.Type.GetName()
×
1164
        c, err := controller.New(fmt.Sprintf("watch %s", objType), mgr, controller.Options{
×
1165
                Reconciler: &noOpReconciler{},
×
1166
        })
×
1167
        if err != nil {
×
1168
                return err
×
1169
        }
×
1170

1171
        c.GetLogger().Info(fmt.Sprintf("Starting to watch %s", objType))
×
1172

×
1173
        // Watch for CRUD events on objects
×
1174
        err = c.Watch(&objKind, &handler.EnqueueRequestForObject{}, predicate.Funcs{
×
1175
                CreateFunc: func(_ event.CreateEvent) bool {
×
1176
                        return false
×
1177
                },
×
1178
                UpdateFunc: func(_ event.UpdateEvent) bool {
×
1179
                        return true
×
1180
                },
×
1181
                GenericFunc: func(_ event.GenericEvent) bool {
×
1182
                        return false
×
1183
                },
×
1184
                DeleteFunc: func(_ event.DeleteEvent) bool {
×
1185
                        return false
×
1186
                },
×
1187
        })
1188
        return err
×
1189
}