kubernetes-sigs / azuredisk-csi-driver / build 3598308181 (GitHub push)
02 Dec 2022 02:34AM UTC · coverage: 70.266% (+25.1%) from 45.203%
Merge pull request #1570 from hccheng72/uninstall-path

116 of 116 new or added lines in 6 files covered (100.0%)
7014 of 9982 relevant lines covered (70.27%)
6.86 hits per line

Source file: /pkg/controller/common.go (82.49% covered)

/*
Copyright 2021 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
        "container/list"
        "context"
        "errors"
        "fmt"
        "math"
        "reflect"
        "strings"
        "sync"
        "sync/atomic"
        "time"

        "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2022-03-01/compute"
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/status"
        v1 "k8s.io/api/core/v1"

        "k8s.io/apimachinery/pkg/api/meta"
        "k8s.io/apimachinery/pkg/labels"
        "k8s.io/apimachinery/pkg/runtime"
        "k8s.io/apimachinery/pkg/util/wait"
        "k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
        azdiskv1beta2 "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/azuredisk/v1beta2"

        consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants"
        "sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils"
        "sigs.k8s.io/azuredisk-csi-driver/pkg/provisioner"
        "sigs.k8s.io/azuredisk-csi-driver/pkg/util"

        "sigs.k8s.io/azuredisk-csi-driver/pkg/workflow"

        cache "k8s.io/client-go/tools/cache"

        "sigs.k8s.io/cloud-provider-azure/pkg/provider"
        "sigs.k8s.io/controller-runtime/pkg/client"
        "sigs.k8s.io/controller-runtime/pkg/controller"
        "sigs.k8s.io/controller-runtime/pkg/event"
        "sigs.k8s.io/controller-runtime/pkg/handler"
        "sigs.k8s.io/controller-runtime/pkg/manager"
        "sigs.k8s.io/controller-runtime/pkg/predicate"
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
        "sigs.k8s.io/controller-runtime/pkg/source"
)

const (
        DefaultTimeUntilGarbageCollection = time.Duration(5) * time.Minute

        maxRetry             = 10
        defaultRetryDuration = time.Duration(1) * time.Second
        defaultRetryFactor   = 5.0
        defaultRetrySteps    = 5

        cloudTimeout = time.Duration(5) * time.Minute

        defaultMaxPodAffinityWeight = 100
)

type noOpReconciler struct{}

func (n *noOpReconciler) Reconcile(_ context.Context, _ reconcile.Request) (reconcile.Result, error) {
        return reconcile.Result{}, nil
}

type operationRequester string

const (
        azdrivernode     operationRequester = "azdrivernode-controller"
        azvolume         operationRequester = "azvolume-controller"
        pv               operationRequester = "pv-controller"
        replica          operationRequester = "replica-controller"
        nodeavailability operationRequester = "nodeavailability-controller"
        pod              operationRequester = "pod-controller"
)

type attachmentCleanUpMode int

const (
        cleanUpAttachmentForUninstall attachmentCleanUpMode = iota
        cleanUpAttachment
)

type updateMode int

const (
        normalUpdate updateMode = iota
        forceUpdate
)

type updateWithLock bool

const (
        acquireLock updateWithLock = true
        skipLock    updateWithLock = false
)

type goSignal struct{}

// TODO Make CloudProvisioner independent of csi types.
type CloudProvisioner interface {
        CreateVolume(
                ctx context.Context,
                volumeName string,
                capacityRange *azdiskv1beta2.CapacityRange,
                volumeCapabilities []azdiskv1beta2.VolumeCapability,
                parameters map[string]string,
                secrets map[string]string,
                volumeContentSource *azdiskv1beta2.ContentVolumeSource,
                accessibilityTopology *azdiskv1beta2.TopologyRequirement) (*azdiskv1beta2.AzVolumeStatusDetail, error)
        DeleteVolume(ctx context.Context, volumeID string, secrets map[string]string) error
        PublishVolume(ctx context.Context, volumeID string, nodeID string, volumeContext map[string]string) provisioner.CloudAttachResult
        UnpublishVolume(ctx context.Context, volumeID string, nodeID string) error
        ExpandVolume(ctx context.Context, volumeID string, capacityRange *azdiskv1beta2.CapacityRange, secrets map[string]string) (*azdiskv1beta2.AzVolumeStatusDetail, error)
        ListVolumes(ctx context.Context, maxEntries int32, startingToken string) (*azdiskv1beta2.ListVolumesResult, error)
        CreateSnapshot(ctx context.Context, sourceVolumeID string, snapshotName string, secrets map[string]string, parameters map[string]string) (*azdiskv1beta2.Snapshot, error)
        ListSnapshots(ctx context.Context, maxEntries int32, startingToken string, sourceVolumeID string, snapshotID string, secrets map[string]string) (*azdiskv1beta2.ListSnapshotsResult, error)
        DeleteSnapshot(ctx context.Context, snapshotID string, secrets map[string]string) error
        CheckDiskExists(ctx context.Context, diskURI string) (*compute.Disk, error)
        GetCloud() *provider.Cloud
        GetMetricPrefix() string
}

type replicaOperation struct {
        ctx                        context.Context
        requester                  operationRequester
        operationFunc              func(context.Context) error
        isReplicaGarbageCollection bool
}

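// operationQueue wraps a doubly linked list of queued operations along with
// gcExclusionList, a set of entries exempt from garbage collection, and an
// isActive flag.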
type operationQueue struct {
        *list.List
        gcExclusionList set
        isActive        bool
}

func (q *operationQueue) remove(element *list.Element) {
        // operationQueue might have been cleared before the lock was acquired,
        // so always check that the list is non-empty before removing an element from the queue;
        // otherwise the underlying length of the queue could drop below 0, causing issues.
        if q.Front() != nil {
                _ = q.Remove(element)
        }
}

func newOperationQueue() *operationQueue {
        return &operationQueue{
                gcExclusionList: set{},
                List:            list.New(),
                isActive:        true,
        }
}

type retryInfoEntry struct {
        backoff   *wait.Backoff
        retryLock *sync.Mutex
}

type retryInfo struct {
        retryMap *sync.Map
}

func newRetryInfo() *retryInfo {
        return &retryInfo{
                retryMap: &sync.Map{},
        }
}

func newRetryEntry() *retryInfoEntry {
        return &retryInfoEntry{
                retryLock: &sync.Mutex{},
                backoff:   &wait.Backoff{Duration: defaultRetryDuration, Factor: defaultRetryFactor, Steps: defaultRetrySteps},
        }
}

func (r *retryInfo) nextRequeue(objectName string) time.Duration {
        v, _ := r.retryMap.LoadOrStore(objectName, newRetryEntry())
        entry := v.(*retryInfoEntry)
        entry.retryLock.Lock()
        defer entry.retryLock.Unlock()
        return entry.backoff.Step()
}

func (r *retryInfo) deleteEntry(objectName string) {
        r.retryMap.Delete(objectName)
}

type emptyType struct{}

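// set is a minimal set type over arbitrary keys, used throughout this package to
// track node names, node indices, and tolerations.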
type set map[interface{}]emptyType

func (s set) add(entry interface{}) {
        s[entry] = emptyType{}
}

func (s set) has(entry interface{}) bool {
        _, ok := s[entry]
        return ok
}

func (s set) remove(entry interface{}) {
        delete(s, entry)
}

func (s set) toStringSlice() []string {
        entries := make([]string, len(s))
        i := 0
        for entry := range s {
                entries[i] = entry.(string)
                i++
        }
        return entries
}

type lockableEntry struct {
        sync.RWMutex
        entry interface{}
}

func newLockableEntry(entry interface{}) *lockableEntry {
        return &lockableEntry{
                RWMutex: sync.RWMutex{},
                entry:   entry,
        }
}

func shouldRequeueReplicaOperation(isReplicaGarbageCollection bool, err error) bool {
        return !isReplicaGarbageCollection || !errors.Is(err, context.Canceled)
}

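// filterPlugin is implemented by plugins that prune the list of candidate nodes
// considered for replica attachment placement. setup is called with the relevant
// pods and persistent volumes before filter is invoked.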
type filterPlugin interface {
        name() string
        setup(pods []v1.Pod, persistentVolumes []*v1.PersistentVolume, state *SharedState)
        filter(ctx context.Context, nodes []v1.Node) ([]v1.Node, error)
}

// interPodAffinityFilter selects nodes that either meet inter-pod affinity rules or have replica mounts of volumes of pods with matching labels
type interPodAffinityFilter struct {
        pods  []v1.Pod
        state *SharedState
}

func (p *interPodAffinityFilter) name() string {
        return "inter-pod affinity filter"
}

func (p *interPodAffinityFilter) setup(pods []v1.Pod, persistentVolumes []*v1.PersistentVolume, state *SharedState) {
        p.pods = pods
        p.state = state
}

func (p *interPodAffinityFilter) filter(ctx context.Context, nodes []v1.Node) ([]v1.Node, error) {
        ctx, w := workflow.New(ctx, workflow.WithDetails("filter-plugin", p.name()))
        defer w.Finish(nil)
        nodeMap := map[string]int{}
        qualifyingNodes := set{}

        for i, node := range nodes {
                nodeMap[node.Name] = i
                qualifyingNodes.add(node.Name)
        }

        isFirst := true
        for _, pod := range p.pods {
                if pod.Spec.Affinity == nil || pod.Spec.Affinity.PodAffinity == nil {
                        continue
                }
                for _, affinityTerm := range pod.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution {
                        podNodes, replicaNodes := p.state.getQualifiedNodesForPodAffinityTerm(ctx, nodes, pod.Namespace, affinityTerm)
                        if isFirst {
                                qualifyingNodes = set{}
                                for podNode := range podNodes {
                                        qualifyingNodes.add(podNode)
                                }
                                for replicaNode := range replicaNodes {
                                        qualifyingNodes.add(replicaNode)
                                }
                                isFirst = false
                        } else {
                                for qualifyingNode := range qualifyingNodes {
                                        if !podNodes.has(qualifyingNode) && !replicaNodes.has(qualifyingNode) {
                                                qualifyingNodes.remove(qualifyingNode)
                                        }
                                }
                        }
                }
        }

        var filteredNodes []v1.Node
        for qualifyingNode := range qualifyingNodes {
                if i, exists := nodeMap[qualifyingNode.(string)]; exists {
                        filteredNodes = append(filteredNodes, nodes[i])
                }
        }
        // Logging purpose
        evictedNodes := make([]string, len(nodes)-len(filteredNodes))
        i := 0
        for _, node := range nodes {
                if !qualifyingNodes.has(node.Name) {
                        evictedNodes[i] = node.Name
                        i++
                }
        }
        w.Logger().V(10).Infof("nodes (%+v) filtered out by %s", evictedNodes, p.name())

        return filteredNodes, nil
}

type interPodAntiAffinityFilter struct {
        pods  []v1.Pod
        state *SharedState
}

func (p *interPodAntiAffinityFilter) name() string {
        return "inter-pod anti-affinity filter"
}

func (p *interPodAntiAffinityFilter) setup(pods []v1.Pod, persistentVolumes []*v1.PersistentVolume, state *SharedState) {
        p.pods = pods
        p.state = state
}

func (p *interPodAntiAffinityFilter) filter(ctx context.Context, nodes []v1.Node) ([]v1.Node, error) {
        ctx, w := workflow.New(ctx, workflow.WithDetails("filter-plugin", p.name()))
        defer w.Finish(nil)
        nodeMap := map[string]int{}
        candidateNodes := set{}

        for i, node := range nodes {
                nodeMap[node.Name] = i
                candidateNodes.add(node.Name)
        }

        qualifyingNodes := set{}
        isFirst := true
        for _, pod := range p.pods {
                if pod.Spec.Affinity == nil || pod.Spec.Affinity.PodAntiAffinity == nil {
                        continue
                }
                for _, affinityTerm := range pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution {
                        podNodes, _ := p.state.getQualifiedNodesForPodAffinityTerm(ctx, nodes, pod.Namespace, affinityTerm)
                        if isFirst {
                                for podNode := range podNodes {
                                        qualifyingNodes.add(podNode)
                                }
                                isFirst = false
                        } else {
                                for qualifyingNode := range qualifyingNodes {
                                        if !podNodes.has(qualifyingNode) {
                                                qualifyingNodes.remove(qualifyingNode)
                                        }
                                }
                        }
                }
        }

        var filteredNodes []v1.Node
        for candidateNode := range candidateNodes {
                if !qualifyingNodes.has(candidateNode) {
                        if i, exists := nodeMap[candidateNode.(string)]; exists {
                                filteredNodes = append(filteredNodes, nodes[i])
                        }
                }
        }

        // Logging purpose
        evictedNodes := make([]string, len(nodes)-len(filteredNodes))
        i := 0
        for _, node := range nodes {
                if qualifyingNodes.has(node.Name) {
                        evictedNodes[i] = node.Name
                        i++
                }
        }
        w.Logger().V(10).Infof("nodes (%+v) filtered out by %s", evictedNodes, p.name())

        return filteredNodes, nil
}

type podTolerationFilter struct {
        pods []v1.Pod
}

func (p *podTolerationFilter) name() string {
        return "pod toleration filter"
}

func (p *podTolerationFilter) setup(pods []v1.Pod, persistentVolumes []*v1.PersistentVolume, state *SharedState) {
        p.pods = pods
}

func (p *podTolerationFilter) filter(ctx context.Context, nodes []v1.Node) ([]v1.Node, error) {
        _, w := workflow.New(ctx, workflow.WithDetails("filter-plugin", p.name()))
        defer w.Finish(nil)
        candidateNodes := set{}
        for i := range nodes {
                candidateNodes.add(i)
        }

        podTolerations := set{}

        for i, pod := range p.pods {
                podTolerationMap := map[string]*v1.Toleration{}
                for _, podToleration := range pod.Spec.Tolerations {
                        podToleration := &podToleration
                        if i == 0 {
                                podTolerations.add(podToleration)
                        } else {
                                podTolerationMap[podToleration.Key] = podToleration
                        }
                }
                if i > 0 {
                        for podToleration := range podTolerations {
                                if existingToleration, ok := podTolerationMap[podToleration.(*v1.Toleration).Key]; ok {
                                        if !podToleration.(*v1.Toleration).MatchToleration(existingToleration) {
                                                podTolerations.remove(podToleration)
                                        }
                                }
                        }
                }
        }

        for candidateNode := range candidateNodes {
                tolerable := true
                node := nodes[candidateNode.(int)]
                for _, taint := range node.Spec.Taints {
                        taintTolerable := false
                        for podToleration := range podTolerations {
                                // if any one of the node's taints cannot be tolerated by the pod's tolerations, break
                                if podToleration.(*v1.Toleration).ToleratesTaint(&taint) {
                                        taintTolerable = true
                                }
                        }
                        if tolerable = tolerable && taintTolerable; !tolerable {
                                w.Logger().V(5).Infof("Removing node (%s) from replica candidates: node (%s)'s taint cannot be tolerated", node.Name, node.Name)
                                candidateNodes.remove(candidateNode)
                                break
                        }
                }
        }

        filteredNodes := make([]v1.Node, len(candidateNodes))
        i := 0
        for candidateNode := range candidateNodes {
                filteredNodes[i] = nodes[candidateNode.(int)]
                i++
        }
        return filteredNodes, nil
}

type podNodeAffinityFilter struct {
        pods []v1.Pod
}

func (p *podNodeAffinityFilter) name() string {
        return "pod node-affinity filter"
}

func (p *podNodeAffinityFilter) setup(pods []v1.Pod, persistentVolumes []*v1.PersistentVolume, state *SharedState) {
        p.pods = pods
}

func (p *podNodeAffinityFilter) filter(ctx context.Context, nodes []v1.Node) ([]v1.Node, error) {
        _, w := workflow.New(ctx, workflow.WithDetails("filter-plugin", p.name()))
        defer w.Finish(nil)
        var podNodeAffinities []nodeaffinity.RequiredNodeAffinity

        candidateNodes := set{}
        for i := range nodes {
                candidateNodes.add(i)
        }

        for _, pod := range p.pods {
                // acknowledge that there can be duplicate entries within the slice
                podNodeAffinity := nodeaffinity.GetRequiredNodeAffinity(&pod)
                podNodeAffinities = append(podNodeAffinities, podNodeAffinity)
        }

        for i, node := range nodes {
                for _, podNodeAffinity := range podNodeAffinities {
                        if match, err := podNodeAffinity.Match(&node); !match || err != nil {
                                w.Logger().V(5).Infof("Removing node (%s) from replica candidates: node does not match pod node affinity (%+v)", node.Name, podNodeAffinity)
                                candidateNodes.remove(i)
                        }
                }
        }

        filteredNodes := make([]v1.Node, len(candidateNodes))
        i := 0
        for candidateNode := range candidateNodes {
                filteredNodes[i] = nodes[candidateNode.(int)]
                i++
        }
        return filteredNodes, nil
}

type podNodeSelectorFilter struct {
        pods  []v1.Pod
        state *SharedState
}

func (p *podNodeSelectorFilter) name() string {
        return "pod node-selector filter"
}

func (p *podNodeSelectorFilter) setup(pods []v1.Pod, persistentVolumes []*v1.PersistentVolume, state *SharedState) {
        p.pods = pods
        p.state = state
}

func (p *podNodeSelectorFilter) filter(ctx context.Context, nodes []v1.Node) ([]v1.Node, error) {
        _, w := workflow.New(ctx, workflow.WithDetails("filter-plugin", p.name()))
        defer w.Finish(nil)
        candidateNodes := set{}
        for i := range nodes {
                candidateNodes.add(i)
        }

        podNodeSelector := labels.NewSelector()
        for _, pod := range p.pods {
                nodeSelector := labels.SelectorFromSet(labels.Set(pod.Spec.NodeSelector))
                requirements, selectable := nodeSelector.Requirements()
                if selectable {
                        podNodeSelector = podNodeSelector.Add(requirements...)
                }
        }

        filteredNodes := []v1.Node{}
        for candidateNode := range candidateNodes {
                node := nodes[candidateNode.(int)]
                nodeLabels := labels.Set(node.Labels)
                if podNodeSelector.Matches(nodeLabels) {
                        filteredNodes = append(filteredNodes, node)
                } else {
                        w.Logger().V(5).Infof("Removing node (%s) from replica candidates: node does not match pod node selector (%v)", node.Name, podNodeSelector)
                }
        }

        return filteredNodes, nil
}

type volumeNodeSelectorFilter struct {
        persistentVolumes []*v1.PersistentVolume
}

func (v *volumeNodeSelectorFilter) name() string {
        return "volume node-selector filter"
}

func (v *volumeNodeSelectorFilter) setup(pods []v1.Pod, persistentVolumes []*v1.PersistentVolume, state *SharedState) {
        v.persistentVolumes = persistentVolumes
}

func (v *volumeNodeSelectorFilter) filter(ctx context.Context, nodes []v1.Node) ([]v1.Node, error) {
        _, w := workflow.New(ctx, workflow.WithDetails("filter-plugin", v.name()))
        defer w.Finish(nil)
        candidateNodes := set{}
        for i := range nodes {
                candidateNodes.add(i)
        }

        var volumeNodeSelectors []*nodeaffinity.NodeSelector
        for _, pv := range v.persistentVolumes {
                if pv.Spec.NodeAffinity == nil || pv.Spec.NodeAffinity.Required == nil {
                        continue
                }
                nodeSelector, err := nodeaffinity.NewNodeSelector(pv.Spec.NodeAffinity.Required)
                if err != nil {
                        w.Logger().Errorf(err, "failed to get node selector from node affinity (%v)", pv.Spec.NodeAffinity.Required)
                        continue
                }
                // acknowledge that there can be duplicates in the slice
                volumeNodeSelectors = append(volumeNodeSelectors, nodeSelector)
        }

        for candidateNode := range candidateNodes {
                node := nodes[candidateNode.(int)]
                for _, volumeNodeSelector := range volumeNodeSelectors {
                        if !volumeNodeSelector.Match(&node) {
                                w.Logger().V(5).Infof("Removing node (%s) from replica candidates: volume node selector (%+v) cannot be matched with the node.", node.Name, volumeNodeSelector)
                                candidateNodes.remove(candidateNode)
                        }
                }
        }

        filteredNodes := make([]v1.Node, len(candidateNodes))
        i := 0
        for candidateNode := range candidateNodes {
                filteredNodes[i] = nodes[candidateNode.(int)]
                i++
        }
        return filteredNodes, nil
}

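// nodeScorerPlugin is implemented by plugins that rank the nodes remaining after
// filtering by adding to the running totals in nodeScores; most implementations
// scale their contribution by 10^priority() so that higher-priority plugins
// dominate the final ranking.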
type nodeScorerPlugin interface {
        name() string
        setup(nodes []v1.Node, pods []v1.Pod, volumes []string, state *SharedState)
        priority() float64 // returns the score plugin's priority on a scale of 1 ~ 5 (5 being the highest priority)
        score(ctx context.Context, nodeScores map[string]int) (map[string]int, error)
}

type scoreByNodeCapacity struct {
        nodes   []v1.Node
        volumes []string
        state   *SharedState
}

func (s *scoreByNodeCapacity) name() string {
        return "score by node capacity"
}

func (s *scoreByNodeCapacity) priority() float64 {
        return 1
}

func (s *scoreByNodeCapacity) setup(nodes []v1.Node, pods []v1.Pod, volumes []string, state *SharedState) {
        s.nodes = nodes
        s.volumes = volumes
        s.state = state
}

func (s *scoreByNodeCapacity) score(ctx context.Context, nodeScores map[string]int) (map[string]int, error) {
        ctx, w := workflow.New(ctx, workflow.WithDetails("score-plugin", s.name()))
        defer w.Finish(nil)
        for _, node := range s.nodes {
                if _, ok := nodeScores[node.Name]; !ok {
                        continue
                }
                maxCapacity, err := azureutils.GetNodeMaxDiskCountWithLabels(node.Labels)
                if err != nil {
                        w.Logger().Errorf(err, "failed to get max capacity of node (%s)", node.Name)
                        delete(nodeScores, node.Name)
                        continue
                }
                remainingCapacity, err := azureutils.GetNodeRemainingDiskCount(ctx, s.state.cachedClient, node.Name)
                if err != nil {
                        // if we failed to get the node's remaining capacity, remove the node from the candidate list and proceed
                        w.Logger().Errorf(err, "failed to get remaining capacity of node (%s)", node.Name)
                        delete(nodeScores, node.Name)
                        continue
                }

                nodeScores[node.Name] += int((float64(remainingCapacity) / float64(maxCapacity)) * math.Pow(10, s.priority()))

                if remainingCapacity-len(s.volumes) < 0 {
                        delete(nodeScores, node.Name)
                }

                w.Logger().V(10).Infof("node (%s) can accept %d more attachments", node.Name, remainingCapacity)
        }
        return nodeScores, nil
}

type scoreByReplicaCount struct {
        volumes []string
        state   *SharedState
}

func (s *scoreByReplicaCount) name() string {
        return "score by replica count"
}

func (s *scoreByReplicaCount) priority() float64 {
        return 3
}

func (s *scoreByReplicaCount) setup(nodes []v1.Node, pods []v1.Pod, volumes []string, state *SharedState) {
        s.volumes = volumes
        s.state = state
}

func (s *scoreByReplicaCount) score(ctx context.Context, nodeScores map[string]int) (map[string]int, error) {
        ctx, w := workflow.New(ctx, workflow.WithDetails("score-plugin", s.name()))
        defer w.Finish(nil)

        var requestedReplicaCount int

        nodeReplicaCounts := map[string]int{}

        for _, volume := range s.volumes {
                azVolume, err := azureutils.GetAzVolume(ctx, s.state.cachedClient, nil, volume, s.state.config.ObjectNamespace, true)
                if err != nil {
                        w.Logger().Errorf(err, "failed to get AzVolume (%s)", volume)
                        continue
                }
                requestedReplicaCount += azVolume.Spec.MaxMountReplicaCount
                azVolumeAttachments, err := azureutils.GetAzVolumeAttachmentsForVolume(ctx, s.state.cachedClient, volume, azureutils.AllRoles)
                if err != nil {
                        w.Logger().V(5).Errorf(err, "failed listing AzVolumeAttachments for azvolume %s", volume)
                        continue
                }

                for _, azVolumeAttachment := range azVolumeAttachments {
                        if _, exists := nodeScores[azVolumeAttachment.Spec.NodeName]; exists {
                                if azVolumeAttachment.Spec.RequestedRole == azdiskv1beta2.PrimaryRole {
                                        delete(nodeScores, azVolumeAttachment.Spec.NodeName)
                                } else {
                                        nodeReplicaCounts[azVolumeAttachment.Spec.NodeName]++
                                }
                        }
                }

                if requestedReplicaCount > 0 {
                        for nodeName, replicaCount := range nodeReplicaCounts {
                                if _, ok := nodeScores[nodeName]; !ok {
                                        continue
                                }
                                nodeScores[nodeName] += int((float64(replicaCount) / float64(requestedReplicaCount)) * math.Pow(10, s.priority()))
                        }
                }
        }
        return nodeScores, nil
}

type scoreByInterPodAffinity struct {
        nodes   []v1.Node
        pods    []v1.Pod
        volumes []string
        state   *SharedState
}

func (s *scoreByInterPodAffinity) name() string {
        return "score by inter pod affinity"
}

func (s *scoreByInterPodAffinity) priority() float64 {
        return 2
}

func (s *scoreByInterPodAffinity) setup(nodes []v1.Node, pods []v1.Pod, volumes []string, state *SharedState) {
        s.nodes = nodes
        s.pods = pods
        s.volumes = volumes
        s.state = state
}

func (s *scoreByInterPodAffinity) score(ctx context.Context, nodeScores map[string]int) (map[string]int, error) {
        ctx, w := workflow.New(ctx, workflow.WithDetails("score-plugin", s.name()))
        defer w.Finish(nil)

        nodeAffinityScores := map[string]int{}
        var maxAffinityScore int

        for _, pod := range s.pods {
                if pod.Spec.Affinity == nil || pod.Spec.Affinity.PodAffinity == nil {
                        continue
                }

                // pod affinity weights range from 1-100,
                // and since a node can be part of both 1) the nodes that satisfy the pod affinity rule and 2) the nodes with qualifying replica attachments (for which we give half of the score), we need to 1.5x the max weight
                maxAffinityScore += 1.5 * defaultMaxPodAffinityWeight * len(pod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution)

                for _, weightedAffinityTerm := range pod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
                        podNodes, replicaNodes := s.state.getQualifiedNodesForPodAffinityTerm(ctx, s.nodes, pod.Namespace, weightedAffinityTerm.PodAffinityTerm)
                        for podNode := range podNodes {
                                w.Logger().Infof("podNode: %s", podNode.(string))
                                nodeAffinityScores[podNode.(string)] += int(weightedAffinityTerm.Weight)
                        }
                        for replicaNode := range replicaNodes {
                                nodeAffinityScores[replicaNode.(string)] += int(weightedAffinityTerm.Weight / 2)
                        }
                }
        }

        for node, affinityScore := range nodeAffinityScores {
                if _, ok := nodeScores[node]; !ok {
                        continue
                }
                nodeScores[node] += int((float64(affinityScore) / float64(maxAffinityScore)) * math.Pow(10, s.priority()))
        }

        return nodeScores, nil
}

type scoreByInterPodAntiAffinity struct {
        nodes   []v1.Node
        pods    []v1.Pod
        volumes []string
        state   *SharedState
}

func (s *scoreByInterPodAntiAffinity) name() string {
        return "score by inter pod anti affinity"
}

func (s *scoreByInterPodAntiAffinity) priority() float64 {
        return 2
}

func (s *scoreByInterPodAntiAffinity) setup(nodes []v1.Node, pods []v1.Pod, volumes []string, state *SharedState) {
        s.nodes = nodes
        s.pods = pods
        s.volumes = volumes
        s.state = state
}

func (s *scoreByInterPodAntiAffinity) score(ctx context.Context, nodeScores map[string]int) (map[string]int, error) {
        ctx, w := workflow.New(ctx, workflow.WithDetails("score-plugin", s.name()))
        defer w.Finish(nil)

        nodeAffinityScores := map[string]int{}
        var maxAffinityScore int

        for _, pod := range s.pods {
                if pod.Spec.Affinity == nil || pod.Spec.Affinity.PodAntiAffinity == nil {
                        continue
                }

                // pod affinity weights range from 1-100
                maxAffinityScore += defaultMaxPodAffinityWeight * len(pod.Spec.Affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution)

                for _, weightedAffinityTerm := range pod.Spec.Affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
                        podNodes, _ := s.state.getQualifiedNodesForPodAffinityTerm(ctx, s.nodes, pod.Namespace, weightedAffinityTerm.PodAffinityTerm)

                        for node := range nodeScores {
                                if !podNodes.has(node) {
                                        nodeAffinityScores[node] += int(weightedAffinityTerm.Weight)
                                }
                        }
                }
        }

        for node, affinityScore := range nodeAffinityScores {
                if _, ok := nodeScores[node]; !ok {
                        continue
                }
                nodeScores[node] += int((float64(affinityScore) / float64(maxAffinityScore)) * math.Pow(10, s.priority()))
        }

        return nodeScores, nil
}

type scoreByPodNodeAffinity struct {
        nodes   []v1.Node
        pods    []v1.Pod
        volumes []string
        state   *SharedState
}

func (s *scoreByPodNodeAffinity) name() string {
        return "score by pod node affinity"
}

func (s *scoreByPodNodeAffinity) priority() float64 {
        return 2
}

func (s *scoreByPodNodeAffinity) setup(nodes []v1.Node, pods []v1.Pod, volumes []string, state *SharedState) {
        s.nodes = nodes
        s.pods = pods
        s.volumes = volumes
        s.state = state
}

func (s *scoreByPodNodeAffinity) score(ctx context.Context, nodeScores map[string]int) (map[string]int, error) {
        _, w := workflow.New(ctx, workflow.WithDetails("filter-plugin", s.name()))
        defer w.Finish(nil)

        var preferredSchedulingTerms []v1.PreferredSchedulingTerm

        for _, pod := range s.pods {
                if pod.Spec.Affinity == nil || pod.Spec.Affinity.NodeAffinity == nil {
                        continue
                }
                preferredSchedulingTerms = append(preferredSchedulingTerms, pod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution...)
        }

        preferredAffinity, err := nodeaffinity.NewPreferredSchedulingTerms(preferredSchedulingTerms)
        if err != nil {
                return nodeScores, err
        }

        for _, node := range s.nodes {
                nodeScore := preferredAffinity.Score(&node)
                if _, ok := nodeScores[node.Name]; !ok {
                        continue
                }
                nodeScores[node.Name] += int(nodeScore)
        }
        return nodeScores, nil
}

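// getSupportedZones returns the set of non-empty zone values listed under
// topologyKey in the given node selector terms.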
func getSupportedZones(nodeSelectorTerms []v1.NodeSelectorTerm, topologyKey string) set {
        // Get the list of supported zones for the pv
        supportedZones := set{}
        if len(nodeSelectorTerms) > 0 {
                for _, term := range nodeSelectorTerms {
                        if len(term.MatchExpressions) > 0 {
                                for _, matchExpr := range term.MatchExpressions {
                                        if matchExpr.Key == topologyKey {
                                                for _, value := range matchExpr.Values {
                                                        if value != "" && !supportedZones.has(value) {
                                                                supportedZones.add(value)
                                                        }
                                                }
                                        }
                                }
                        }
                }
        }
        return supportedZones
}

func shouldCleanUp(attachment azdiskv1beta2.AzVolumeAttachment, mode azureutils.AttachmentRoleMode) bool {
        return mode == azureutils.AllRoles || (attachment.Spec.RequestedRole == azdiskv1beta2.PrimaryRole && mode == azureutils.PrimaryOnly) || (attachment.Spec.RequestedRole == azdiskv1beta2.ReplicaRole && mode == azureutils.ReplicaOnly)
}

func isAttached(attachment *azdiskv1beta2.AzVolumeAttachment) bool {
        return attachment != nil && attachment.Status.Detail != nil && attachment.Status.Detail.PublishContext != nil
}

func isCreated(volume *azdiskv1beta2.AzVolume) bool {
        return volume != nil && volume.Status.Detail != nil
}

// objectDeletionRequested returns whether deletion of the specified object has been requested.
// If so, it will return true and a time.Duration after which to delete the object. If the
// duration is less than or equal to 0, the object should be deleted immediately.
func objectDeletionRequested(obj runtime.Object) (bool, time.Duration) {
        meta, _ := meta.Accessor(obj)
        if meta == nil {
                return false, time.Duration(0)
        }
        deletionTime := meta.GetDeletionTimestamp()

        if deletionTime.IsZero() {
                return false, time.Duration(0)
        }

        return true, time.Until(deletionTime.Time)
}

func isCleanupRequested(attachment *azdiskv1beta2.AzVolumeAttachment) bool {
        return attachment != nil && azureutils.MapContains(attachment.Status.Annotations, consts.CleanUpAnnotation)
}

func volumeAttachRequested(attachment *azdiskv1beta2.AzVolumeAttachment) bool {
        return attachment != nil && azureutils.MapContains(attachment.Annotations, consts.VolumeAttachRequestAnnotation)
}

func volumeDetachRequested(attachment *azdiskv1beta2.AzVolumeAttachment) bool {
        return attachment != nil && azureutils.MapContains(attachment.Status.Annotations, consts.VolumeDetachRequestAnnotation)
}

func volumeDeleteRequested(volume *azdiskv1beta2.AzVolume) bool {
        return volume != nil && azureutils.MapContains(volume.Status.Annotations, consts.VolumeDeleteRequestAnnotation)
}

func isDemotionRequested(attachment *azdiskv1beta2.AzVolumeAttachment) bool {
        return attachment != nil && attachment.Status.Detail != nil && attachment.Status.Detail.Role == azdiskv1beta2.PrimaryRole && attachment.Spec.RequestedRole == azdiskv1beta2.ReplicaRole
}

func isPreProvisioned(volume *azdiskv1beta2.AzVolume) bool {
        return volume != nil && azureutils.MapContains(volume.Status.Annotations, consts.PreProvisionedVolumeAnnotation)
}

func getQualifiedName(namespace, name string) string {
        return fmt.Sprintf("%s/%s", namespace, name)
}

func parseQualifiedName(qualifiedName string) (namespace, name string, err error) {
        parsed := strings.Split(qualifiedName, "/")
        if len(parsed) != 2 {
                err = status.Errorf(codes.Internal, "pod's qualified name (%s) should be of the form <namespace>/<name>", qualifiedName)
                return
        }
        namespace = parsed[0]
        name = parsed[1]
        return
}

func formatUpdateStateError(objectType, fromState, toState string, expectedStates ...string) string {
        return fmt.Sprintf("%s's state '%s' cannot be updated to %s. %s can only be updated to %s", objectType, fromState, toState, fromState, strings.Join(expectedStates, ", "))
}

func getOperationRequeueError(desired string, obj client.Object) error {
        return status.Errorf(codes.Aborted, "requeueing %s operation because another operation is already pending on %v (%s)", desired, reflect.TypeOf(obj), obj.GetName())
}

// reconcileReturnOnSuccess returns a reconciler result on successful reconciliation.
func reconcileReturnOnSuccess(objectName string, retryInfo *retryInfo) (reconcile.Result, error) {
        retryInfo.deleteEntry(objectName)
        return reconcile.Result{}, nil
}

// reconcileReturnOnError returns a reconciler result on error that requeues the object for later processing if the error is retriable.
func reconcileReturnOnError(ctx context.Context, obj runtime.Object, operationType string, err error, retryInfo *retryInfo) (reconcile.Result, error) {
        var (
                requeue    bool = status.Code(err) != codes.FailedPrecondition
                retryAfter time.Duration
        )

        w := workflow.GetWorkflow(ctx, obj)

        if meta, metaErr := meta.Accessor(obj); metaErr == nil {
                objectName := meta.GetName()
                objectType := reflect.TypeOf(obj)
                if !requeue {
                        w.Logger().Errorf(err, "failed to %s %v (%s) with no retry", operationType, objectType, objectName)
                        retryInfo.deleteEntry(objectName)
                } else {
                        retryAfter = retryInfo.nextRequeue(objectName)
                        w.Logger().Errorf(err, "failed to %s %v (%s) with retry after %v", operationType, objectType, objectName, retryAfter)
                }
        }

        return reconcile.Result{
                Requeue:      requeue,
                RequeueAfter: retryAfter,
        }, nil
}

// reconcileAfter returns a reconciler result that requeues the current object for processing after the specified time.
func reconcileAfter(after time.Duration, objectName string, retryInfo *retryInfo) (reconcile.Result, error) {
        retryInfo.deleteEntry(objectName)
        return reconcile.Result{Requeue: true, RequeueAfter: after}, nil
}

func isOperationInProcess(obj interface{}) bool {
        switch target := obj.(type) {
        case *azdiskv1beta2.AzVolume:
                return target.Status.State == azdiskv1beta2.VolumeCreating || target.Status.State == azdiskv1beta2.VolumeDeleting || target.Status.State == azdiskv1beta2.VolumeUpdating
        case *azdiskv1beta2.AzVolumeAttachment:
                deleteRequested, _ := objectDeletionRequested(target)
                return target.Status.State == azdiskv1beta2.Attaching || (target.Status.State == azdiskv1beta2.Detaching && !deleteRequested)
        }
        return false
}

func max(a, b int) int {
        if a > b {
                return a
        }
        return b
}

func containsString(key string, items []string) bool {
        for _, item := range items {
                if item == key {
                        return true
                }
        }
        return false
}

type ReplicaRequest struct {
        VolumeName string
        Priority   int // the number of replicas that have yet to be created
}

type VolumeReplicaRequestsPriorityQueue struct {
        queue *cache.Heap
        size  int32
}

func (vq *VolumeReplicaRequestsPriorityQueue) Push(ctx context.Context, replicaRequest *ReplicaRequest) {
        w, _ := workflow.GetWorkflowFromContext(ctx)
        err := vq.queue.Add(replicaRequest)
        atomic.AddInt32(&vq.size, 1)
        if err != nil {
                w.Logger().Errorf(err, "failed to add replica request for volume %s", replicaRequest.VolumeName)
        }
}

func (vq *VolumeReplicaRequestsPriorityQueue) Pop() *ReplicaRequest {
        request, _ := vq.queue.Pop()
        atomic.AddInt32(&vq.size, -1)
        return request.(*ReplicaRequest)
}

func (vq *VolumeReplicaRequestsPriorityQueue) DrainQueue() []*ReplicaRequest {
        var listRequests []*ReplicaRequest
        for i := vq.size; i > 0; i-- {
                listRequests = append(listRequests, vq.Pop())
        }
        return listRequests
}

func verifyObjectDeleted(obj interface{}, objectDeleted bool) (bool, error) {
        if obj == nil || objectDeleted {
                return true, nil
        }
        return false, nil
}

func verifyObjectFailedOrDeleted(obj interface{}, objectDeleted bool) (bool, error) {
        if obj == nil || objectDeleted {
                return true, nil
        }

        // otherwise, the volume detachment has either failed with an error or is still pending
        azVolumeAttachmentInstance := obj.(*azdiskv1beta2.AzVolumeAttachment)
        if azVolumeAttachmentInstance.Status.Error != nil {
                return false, util.ErrorFromAzError(azVolumeAttachmentInstance.Status.Error)
        }
        return false, nil
}

// WatchObject creates a no-op controller to set up a watch and an informer for an object in a controller-runtime manager.
// Use this function if you want to set up a watch for an object without configuring a separate informer factory.
func WatchObject(mgr manager.Manager, objKind source.Kind) error {
        objType := objKind.Type.GetName()
        c, err := controller.New(fmt.Sprintf("watch %s", objType), mgr, controller.Options{
                Reconciler: &noOpReconciler{},
        })
        if err != nil {
                return err
        }

        c.GetLogger().Info("Starting to watch %s", objType)

        // Watch for CRUD events on objects
        err = c.Watch(&objKind, &handler.EnqueueRequestForObject{}, predicate.Funcs{
                CreateFunc: func(_ event.CreateEvent) bool {
                        return false
                },
                UpdateFunc: func(_ event.UpdateEvent) bool {
                        return true
                },
                GenericFunc: func(_ event.GenericEvent) bool {
                        return false
                },
                DeleteFunc: func(_ event.DeleteEvent) bool {
                        return false
                },
        })
        return err
}
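
// Illustrative usage of WatchObject (not part of the original file): assuming a
// controller-runtime manager mgr and the AzVolumeAttachment CRD type, a watch
// could be registered roughly as follows; the variable names are placeholders.
//
//	if err := WatchObject(mgr, source.Kind{Type: &azdiskv1beta2.AzVolumeAttachment{}}); err != nil {
//		return err
//	}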