kubernetes-sigs / azuredisk-csi-driver / 5147963244

01 Jun 2023 06:33PM UTC coverage: 69.113% (+22.1%) from 47.059%

Pull Request #1826 (hccheng72, via github): [V2] fix: update predicate of replica controller to prevent over creating replica AzVolumeAttachment
Commit: fix: only watch for events that replica controller get interested

114 of 114 new or added lines in 2 files covered. (100.0%)

7196 of 10412 relevant lines covered (69.11%)

6.7 hits per line

Source File

/pkg/controller/common.go (82.36% coverage)
1
/*
2
Copyright 2021 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controller
18

19
import (
20
        "container/list"
21
        "context"
22
        "errors"
23
        "fmt"
24
        "math"
25
        "reflect"
26
        "strings"
27
        "sync"
28
        "sync/atomic"
29
        "time"
30

31
        "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2022-03-01/compute"
32
        "google.golang.org/grpc/codes"
33
        "google.golang.org/grpc/status"
34
        v1 "k8s.io/api/core/v1"
35

36
        "k8s.io/apimachinery/pkg/api/meta"
37
        "k8s.io/apimachinery/pkg/labels"
38
        "k8s.io/apimachinery/pkg/runtime"
39
        "k8s.io/apimachinery/pkg/util/wait"
40
        "k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
41
        azdiskv1beta2 "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/azuredisk/v1beta2"
42

43
        consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants"
44
        "sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils"
45
        "sigs.k8s.io/azuredisk-csi-driver/pkg/provisioner"
46
        "sigs.k8s.io/azuredisk-csi-driver/pkg/util"
47

48
        "sigs.k8s.io/azuredisk-csi-driver/pkg/workflow"
49

50
        cache "k8s.io/client-go/tools/cache"
51

52
        "sigs.k8s.io/cloud-provider-azure/pkg/provider"
53
        "sigs.k8s.io/controller-runtime/pkg/client"
54
        "sigs.k8s.io/controller-runtime/pkg/controller"
55
        "sigs.k8s.io/controller-runtime/pkg/event"
56
        "sigs.k8s.io/controller-runtime/pkg/handler"
57
        "sigs.k8s.io/controller-runtime/pkg/manager"
58
        "sigs.k8s.io/controller-runtime/pkg/predicate"
59
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
60
        "sigs.k8s.io/controller-runtime/pkg/source"
61
)
62

63
const (
64
        DefaultTimeUntilGarbageCollection = time.Duration(5) * time.Minute
65

66
        maxRetry             = 10
67
        defaultRetryDuration = time.Duration(1) * time.Second
68
        defaultRetryFactor   = 5.0
69
        defaultRetrySteps    = 5
70

71
        cloudTimeout = time.Duration(5) * time.Minute
72

73
        defaultMaxPodAffinityWeight = 100
74
)
75

76
type noOpReconciler struct{}
77

78
func (n *noOpReconciler) Reconcile(_ context.Context, _ reconcile.Request) (reconcile.Result, error) {
×
79
        return reconcile.Result{}, nil
×
80
}
×
81

82
type operationRequester string
83

84
const (
85
        node     operationRequester = "node-controller"
86
        azvolume operationRequester = "azvolume-controller"
87
        pv       operationRequester = "pv-controller"
88
        replica  operationRequester = "replica-controller"
89
        pod      operationRequester = "pod-controller"
90
        detach   operationRequester = "detach-operation"
91
        attach   operationRequester = "attach-operation"
92
)
93

94
type attachmentCleanUpMode int
95

96
const (
97
        cleanUpAttachmentForUninstall attachmentCleanUpMode = iota
98
        cleanUpAttachment
99
)
100

101
type deleteMode int
102

103
const (
104
        deleteOnly deleteMode = iota
105
        deleteAndWait
106
)
107

108
type updateMode int
109

110
const (
111
        normalUpdate updateMode = iota
112
        forceUpdate
113
)
114

115
type updateWithLock bool
116

117
const (
118
        acquireLock updateWithLock = true
119
        skipLock    updateWithLock = false
120
)
121

122
type goSignal struct{}
123

124
// TODO Make CloudProvisioner independent of csi types.
125
type CloudProvisioner interface {
126
        CreateVolume(
127
                ctx context.Context,
128
                volumeName string,
129
                capacityRange *azdiskv1beta2.CapacityRange,
130
                volumeCapabilities []azdiskv1beta2.VolumeCapability,
131
                parameters map[string]string,
132
                secrets map[string]string,
133
                volumeContentSource *azdiskv1beta2.ContentVolumeSource,
134
                accessibilityTopology *azdiskv1beta2.TopologyRequirement) (*azdiskv1beta2.AzVolumeStatusDetail, error)
135
        DeleteVolume(ctx context.Context, volumeID string, secrets map[string]string) error
136
        PublishVolume(ctx context.Context, volumeID string, nodeID string, volumeContext map[string]string) provisioner.CloudAttachResult
137
        UnpublishVolume(ctx context.Context, volumeID string, nodeID string) error
138
        ExpandVolume(ctx context.Context, volumeID string, capacityRange *azdiskv1beta2.CapacityRange, secrets map[string]string) (*azdiskv1beta2.AzVolumeStatusDetail, error)
139
        ListVolumes(ctx context.Context, maxEntries int32, startingToken string) (*azdiskv1beta2.ListVolumesResult, error)
140
        CreateSnapshot(ctx context.Context, sourceVolumeID string, snapshotName string, secrets map[string]string, parameters map[string]string) (*azdiskv1beta2.Snapshot, error)
141
        ListSnapshots(ctx context.Context, maxEntries int32, startingToken string, sourceVolumeID string, snapshotID string, secrets map[string]string) (*azdiskv1beta2.ListSnapshotsResult, error)
142
        DeleteSnapshot(ctx context.Context, snapshotID string, secrets map[string]string) error
143
        CheckDiskExists(ctx context.Context, diskURI string) (*compute.Disk, error)
144
        GetCloud() *provider.Cloud
145
        GetMetricPrefix() string
146
}
147

148
type replicaOperation struct {
149
        ctx                        context.Context
150
        requester                  operationRequester
151
        operationFunc              func(context.Context) error
152
        isReplicaGarbageCollection bool
153
}
154

155
type operationQueue struct {
156
        *list.List
157
        gcExclusionList set
158
        isActive        bool
159
}
160

161
func (q *operationQueue) remove(element *list.Element) {
20✔
162
        // operationQueue might have been cleared before the lock was acquired
20✔
163
        // so always check whether the list is empty before removing an object from the queue; otherwise the underlying length of the queue could drop below 0, causing issues
20✔
164
        if q.Front() != nil {
37✔
165
                _ = q.Remove(element)
17✔
166
        }
17✔
167
}
168

169
func newOperationQueue() *operationQueue {
63✔
170
        return &operationQueue{
63✔
171
                gcExclusionList: set{},
63✔
172
                List:            list.New(),
63✔
173
                isActive:        true,
63✔
174
        }
63✔
175
}
63✔
176

177
type retryInfoEntry struct {
178
        backoff   *wait.Backoff
179
        retryLock *sync.Mutex
180
}
181

182
type retryInfo struct {
183
        retryMap *sync.Map
184
}
185

186
func newRetryInfo() *retryInfo {
25✔
187
        return &retryInfo{
25✔
188
                retryMap: &sync.Map{},
25✔
189
        }
25✔
190
}
25✔
191

192
func newRetryEntry() *retryInfoEntry {
×
193
        return &retryInfoEntry{
×
194
                retryLock: &sync.Mutex{},
×
195
                backoff:   &wait.Backoff{Duration: defaultRetryDuration, Factor: defaultRetryFactor, Steps: defaultRetrySteps},
×
196
        }
×
197
}
×
198

199
func (r *retryInfo) nextRequeue(objectName string) time.Duration {
×
200
        v, _ := r.retryMap.LoadOrStore(objectName, newRetryEntry())
×
201
        entry := v.(*retryInfoEntry)
×
202
        entry.retryLock.Lock()
×
203
        defer entry.retryLock.Unlock()
×
204
        return entry.backoff.Step()
×
205
}
×
206

207
func (r *retryInfo) deleteEntry(objectName string) {
18✔
208
        r.retryMap.Delete(objectName)
18✔
209
}
18✔
210

211
type emptyType struct{}
212

213
type set map[interface{}]emptyType
214

215
func (s set) add(entry interface{}) {
452✔
216
        s[entry] = emptyType{}
452✔
217
}
452✔
218

219
func (s set) has(entry interface{}) bool {
260✔
220
        _, ok := s[entry]
260✔
221
        return ok
260✔
222
}
260✔
223

224
func (s set) remove(entry interface{}) {
11✔
225
        delete(s, entry)
11✔
226
}
11✔
227

228
func (s set) toStringSlice() []string {
4✔
229
        entries := make([]string, len(s))
4✔
230
        i := 0
4✔
231
        for entry := range s {
8✔
232
                entries[i] = entry.(string)
4✔
233
                i++
4✔
234
        }
4✔
235
        return entries
4✔
236
}
237

238
type lockableEntry struct {
239
        sync.RWMutex
240
        entry interface{}
241
}
242

243
func newLockableEntry(entry interface{}) *lockableEntry {
118✔
244
        return &lockableEntry{
118✔
245
                RWMutex: sync.RWMutex{},
118✔
246
                entry:   entry,
118✔
247
        }
118✔
248
}
118✔
249

250
// Return true for all errors unless the operation is a replica garbage collection that was aborted due to context cancellation
251
func shouldRequeueReplicaOperation(isReplicaGarbageCollection bool, err error) bool {
34✔
252
        return err != nil && !(isReplicaGarbageCollection && errors.Is(err, context.Canceled))
34✔
253
}
34✔
254

255
type filterPlugin interface {
256
        name() string
257
        setup(pods []v1.Pod, persistentVolumes []*v1.PersistentVolume, state *SharedState)
258
        filter(ctx context.Context, nodes []v1.Node) ([]v1.Node, error)
259
}
260

261
// interPodAffinityFilter selects nodes that either meet inter-pod affinity rules or have replica mounts of volumes of pods with matching labels
262
type interPodAffinityFilter struct {
263
        pods  []v1.Pod
264
        state *SharedState
265
}
266

267
func (p *interPodAffinityFilter) name() string {
78✔
268
        return "inter-pod affinity filter"
78✔
269
}
78✔
270

271
func (p *interPodAffinityFilter) setup(pods []v1.Pod, persistentVolumes []*v1.PersistentVolume, state *SharedState) {
26✔
272
        p.pods = pods
26✔
273
        p.state = state
26✔
274
}
26✔
275

276
func (p *interPodAffinityFilter) filter(ctx context.Context, nodes []v1.Node) ([]v1.Node, error) {
26✔
277
        ctx, w := workflow.New(ctx, workflow.WithDetails("filter-plugin", p.name()))
26✔
278
        defer w.Finish(nil)
26✔
279
        nodeMap := map[string]int{}
26✔
280
        qualifyingNodes := set{}
26✔
281

26✔
282
        for i, node := range nodes {
91✔
283
                nodeMap[node.Name] = i
65✔
284
                qualifyingNodes.add(node.Name)
65✔
285
        }
65✔
286

287
        isFirst := true
26✔
288
        for _, pod := range p.pods {
54✔
289
                if pod.Spec.Affinity == nil || pod.Spec.Affinity.PodAffinity == nil {
54✔
290
                        continue
26✔
291
                }
292
                for _, affinityTerm := range pod.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution {
4✔
293
                        podNodes, replicaNodes := p.state.getQualifiedNodesForPodAffinityTerm(ctx, nodes, pod.Namespace, affinityTerm)
2✔
294
                        if isFirst {
4✔
295
                                qualifyingNodes = set{}
2✔
296
                                for podNode := range podNodes {
4✔
297
                                        qualifyingNodes.add(podNode)
2✔
298
                                }
2✔
299
                                for replicaNode := range replicaNodes {
2✔
300
                                        qualifyingNodes.add(replicaNode)
×
301
                                }
×
302
                                isFirst = false
2✔
303
                        } else {
×
304
                                for qualifyingNode := range qualifyingNodes {
×
305
                                        if !podNodes.has(qualifyingNode) && !replicaNodes.has(qualifyingNode) {
×
306
                                                qualifyingNodes.remove(qualifyingNode)
×
307
                                        }
×
308
                                }
309
                        }
310
                }
311
        }
312

313
        var filteredNodes []v1.Node
26✔
314
        for qualifyingNode := range qualifyingNodes {
87✔
315
                if i, exists := nodeMap[qualifyingNode.(string)]; exists {
122✔
316
                        filteredNodes = append(filteredNodes, nodes[i])
61✔
317
                }
61✔
318
        }
319
        // Logging purpose
320
        evictedNodes := make([]string, len(nodes)-len(filteredNodes))
26✔
321
        i := 0
26✔
322
        for _, node := range nodes {
91✔
323
                if !qualifyingNodes.has(node.Name) {
69✔
324
                        evictedNodes[i] = node.Name
4✔
325
                        i++
4✔
326
                }
4✔
327
        }
328
        w.Logger().V(10).Infof("nodes (%+v) filtered out by %s", evictedNodes, p.name())
26✔
329

26✔
330
        return filteredNodes, nil
26✔
331
}
332

333
type interPodAntiAffinityFilter struct {
334
        pods  []v1.Pod
335
        state *SharedState
336
}
337

338
func (p *interPodAntiAffinityFilter) name() string {
78✔
339
        return "inter-pod anti-affinity filter"
78✔
340
}
78✔
341

342
func (p *interPodAntiAffinityFilter) setup(pods []v1.Pod, persistentVolumes []*v1.PersistentVolume, state *SharedState) {
26✔
343
        p.pods = pods
26✔
344
        p.state = state
26✔
345
}
26✔
346

347
func (p *interPodAntiAffinityFilter) filter(ctx context.Context, nodes []v1.Node) ([]v1.Node, error) {
26✔
348
        ctx, w := workflow.New(ctx, workflow.WithDetails("filter-plugin", p.name()))
26✔
349
        defer w.Finish(nil)
26✔
350
        nodeMap := map[string]int{}
26✔
351
        candidateNodes := set{}
26✔
352

26✔
353
        for i, node := range nodes {
87✔
354
                nodeMap[node.Name] = i
61✔
355
                candidateNodes.add(node.Name)
61✔
356
        }
61✔
357

358
        qualifyingNodes := set{}
26✔
359
        isFirst := true
26✔
360
        for _, pod := range p.pods {
54✔
361
                if pod.Spec.Affinity == nil || pod.Spec.Affinity.PodAntiAffinity == nil {
55✔
362
                        continue
27✔
363
                }
364
                for _, affinityTerm := range pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution {
2✔
365
                        podNodes, _ := p.state.getQualifiedNodesForPodAffinityTerm(ctx, nodes, pod.Namespace, affinityTerm)
1✔
366
                        if isFirst {
2✔
367
                                for podNode := range podNodes {
3✔
368
                                        qualifyingNodes.add(podNode)
2✔
369
                                }
2✔
370
                                isFirst = false
1✔
371
                        } else {
×
372
                                for qualifyingNode := range qualifyingNodes {
×
373
                                        if !podNodes.has(qualifyingNode) {
×
374
                                                qualifyingNodes.remove(qualifyingNode)
×
375
                                        }
×
376
                                }
377
                        }
378
                }
379
        }
380

381
        var filteredNodes []v1.Node
26✔
382
        for candidateNode := range candidateNodes {
87✔
383
                if !qualifyingNodes.has(candidateNode) {
120✔
384
                        if i, exists := nodeMap[candidateNode.(string)]; exists {
118✔
385
                                filteredNodes = append(filteredNodes, nodes[i])
59✔
386
                        }
59✔
387
                }
388
        }
389

390
        // Logging purpose
391
        evictedNodes := make([]string, len(nodes)-len(filteredNodes))
26✔
392
        i := 0
26✔
393
        for _, node := range nodes {
87✔
394
                if qualifyingNodes.has(node.Name) {
63✔
395
                        evictedNodes[i] = node.Name
2✔
396
                        i++
2✔
397
                }
2✔
398
        }
399
        w.Logger().V(10).Infof("nodes (%+v) filtered out by %s", evictedNodes, p.name())
26✔
400

26✔
401
        return filteredNodes, nil
26✔
402
}
403

404
type podTolerationFilter struct {
405
        pods []v1.Pod
406
}
407

408
func (p *podTolerationFilter) name() string {
52✔
409
        return "pod toleration filter"
52✔
410
}
52✔
411

412
func (p *podTolerationFilter) setup(pods []v1.Pod, persistentVolumes []*v1.PersistentVolume, state *SharedState) {
26✔
413
        p.pods = pods
26✔
414
}
26✔
415

416
func (p *podTolerationFilter) filter(ctx context.Context, nodes []v1.Node) ([]v1.Node, error) {
26✔
417
        _, w := workflow.New(ctx, workflow.WithDetails("filter-plugin", p.name()))
26✔
418
        defer w.Finish(nil)
26✔
419
        candidateNodes := set{}
26✔
420
        for i := range nodes {
85✔
421
                candidateNodes.add(i)
59✔
422
        }
59✔
423

424
        podTolerations := set{}
26✔
425

26✔
426
        for i, pod := range p.pods {
54✔
427
                podTolerationMap := map[string]*v1.Toleration{}
28✔
428
                for _, podToleration := range pod.Spec.Tolerations {
29✔
429
                        podToleration := &podToleration
1✔
430
                        if i == 0 {
2✔
431
                                podTolerations.add(podToleration)
1✔
432
                        } else {
1✔
433
                                podTolerationMap[podToleration.Key] = podToleration
×
434
                        }
×
435
                }
436
                if i > 0 {
30✔
437
                        for podToleration := range podTolerations {
2✔
438
                                if existingToleration, ok := podTolerationMap[podToleration.(v1.Toleration).Key]; ok {
×
439
                                        if !podToleration.(*v1.Toleration).MatchToleration(existingToleration) {
×
440
                                                podTolerations.remove(podToleration)
×
441
                                        }
×
442
                                }
443
                        }
444
                }
445
        }
446

447
        for candidateNode := range candidateNodes {
85✔
448
                tolerable := true
59✔
449
                node := nodes[candidateNode.(int)]
59✔
450
                for _, taint := range node.Spec.Taints {
61✔
451
                        taintTolerable := false
2✔
452
                        for podToleration := range podTolerations {
3✔
453
                                // if any one of the node's taints cannot be tolerated by the pod's tolerations, break
1✔
454
                                if podToleration.(*v1.Toleration).ToleratesTaint(&taint) {
2✔
455
                                        taintTolerable = true
1✔
456
                                }
1✔
457
                        }
458
                        if tolerable = tolerable && taintTolerable; !tolerable {
3✔
459
                                w.Logger().V(5).Infof("Removing node (%s) from replica candidates: node (%s)'s taint cannot be tolerated", node.Name, node.Name)
1✔
460
                                candidateNodes.remove(candidateNode)
1✔
461
                                break
1✔
462
                        }
463
                }
464
        }
465

466
        filteredNodes := make([]v1.Node, len(candidateNodes))
26✔
467
        i := 0
26✔
468
        for candidateNode := range candidateNodes {
84✔
469
                filteredNodes[i] = nodes[candidateNode.(int)]
58✔
470
                i++
58✔
471
        }
58✔
472
        return filteredNodes, nil
26✔
473
}
474

475
type podNodeAffinityFilter struct {
476
        pods []v1.Pod
477
}
478

479
func (p *podNodeAffinityFilter) name() string {
52✔
480
        return "pod node-affinity filter"
52✔
481
}
52✔
482

483
func (p *podNodeAffinityFilter) setup(pods []v1.Pod, persistentVolumes []*v1.PersistentVolume, state *SharedState) {
26✔
484
        p.pods = pods
26✔
485
}
26✔
486

487
func (p *podNodeAffinityFilter) filter(ctx context.Context, nodes []v1.Node) ([]v1.Node, error) {
26✔
488
        _, w := workflow.New(ctx, workflow.WithDetails("filter-plugin", p.name()))
26✔
489
        defer w.Finish(nil)
26✔
490
        var podNodeAffinities []nodeaffinity.RequiredNodeAffinity
26✔
491

26✔
492
        candidateNodes := set{}
26✔
493
        for i := range nodes {
84✔
494
                candidateNodes.add(i)
58✔
495
        }
58✔
496

497
        for _, pod := range p.pods {
54✔
498
                // acknowledge that there can be duplicate entries within the slice
28✔
499
                podNodeAffinity := nodeaffinity.GetRequiredNodeAffinity(&pod)
28✔
500
                podNodeAffinities = append(podNodeAffinities, podNodeAffinity)
28✔
501
        }
28✔
502

503
        for i, node := range nodes {
84✔
504
                for _, podNodeAffinity := range podNodeAffinities {
121✔
505
                        if match, err := podNodeAffinity.Match(&node); !match || err != nil {
68✔
506
                                w.Logger().V(5).Infof("Removing node (%s) from replica candidates: node does not match pod node affinity (%+v)", node.Name, podNodeAffinity)
5✔
507
                                candidateNodes.remove(i)
5✔
508
                        }
5✔
509
                }
510
        }
511

512
        filteredNodes := make([]v1.Node, len(candidateNodes))
26✔
513
        i := 0
26✔
514
        for candidateNode := range candidateNodes {
79✔
515
                filteredNodes[i] = nodes[candidateNode.(int)]
53✔
516
                i++
53✔
517
        }
53✔
518
        return filteredNodes, nil
26✔
519
}
520

521
type podNodeSelectorFilter struct {
522
        pods  []v1.Pod
523
        state *SharedState
524
}
525

526
func (p *podNodeSelectorFilter) name() string {
52✔
527
        return "pod node-selector filter"
52✔
528
}
52✔
529

530
func (p *podNodeSelectorFilter) setup(pods []v1.Pod, persistentVolumes []*v1.PersistentVolume, state *SharedState) {
26✔
531
        p.pods = pods
26✔
532
        p.state = state
26✔
533
}
26✔
534

535
func (p *podNodeSelectorFilter) filter(ctx context.Context, nodes []v1.Node) ([]v1.Node, error) {
26✔
536
        _, w := workflow.New(ctx, workflow.WithDetails("filter-plugin", p.name()))
26✔
537
        defer w.Finish(nil)
26✔
538
        candidateNodes := set{}
26✔
539
        for i := range nodes {
79✔
540
                candidateNodes.add(i)
53✔
541
        }
53✔
542

543
        podNodeSelector := labels.NewSelector()
26✔
544
        for _, pod := range p.pods {
54✔
545
                nodeSelector := labels.SelectorFromSet(labels.Set(pod.Spec.NodeSelector))
28✔
546
                requirements, selectable := nodeSelector.Requirements()
28✔
547
                if selectable {
56✔
548
                        podNodeSelector = podNodeSelector.Add(requirements...)
28✔
549
                }
28✔
550
        }
551

552
        filteredNodes := []v1.Node{}
26✔
553
        for candidateNode := range candidateNodes {
79✔
554
                node := nodes[candidateNode.(int)]
53✔
555
                nodeLabels := labels.Set(node.Labels)
53✔
556
                if podNodeSelector.Matches(nodeLabels) {
106✔
557
                        filteredNodes = append(filteredNodes, node)
53✔
558
                } else {
53✔
559
                        w.Logger().V(5).Infof("Removing node (%s) from replica candidate: node does not match pod node selector (%v)", node.Name, podNodeSelector)
×
560
                }
×
561
        }
562

563
        return filteredNodes, nil
26✔
564
}
565

566
type volumeNodeSelectorFilter struct {
567
        persistentVolumes []*v1.PersistentVolume
568
}
569

570
func (v *volumeNodeSelectorFilter) name() string {
52✔
571
        return "volume node-selector filter"
52✔
572
}
52✔
573

574
func (v *volumeNodeSelectorFilter) setup(pods []v1.Pod, persistentVolumes []*v1.PersistentVolume, state *SharedState) {
26✔
575
        v.persistentVolumes = persistentVolumes
26✔
576
}
26✔
577

578
func (v *volumeNodeSelectorFilter) filter(ctx context.Context, nodes []v1.Node) ([]v1.Node, error) {
26✔
579
        _, w := workflow.New(ctx, workflow.WithDetails("filter-plugin", v.name()))
26✔
580
        defer w.Finish(nil)
26✔
581
        candidateNodes := set{}
26✔
582
        for i := range nodes {
79✔
583
                candidateNodes.add(i)
53✔
584
        }
53✔
585

586
        var volumeNodeSelectors []*nodeaffinity.NodeSelector
26✔
587
        for _, pv := range v.persistentVolumes {
60✔
588
                if pv.Spec.NodeAffinity == nil || pv.Spec.NodeAffinity.Required == nil {
65✔
589
                        continue
31✔
590
                }
591
                nodeSelector, err := nodeaffinity.NewNodeSelector(pv.Spec.NodeAffinity.Required)
3✔
592
                if err != nil {
3✔
593
                        w.Logger().Errorf(err, "failed to get node selector from node affinity (%v)", pv.Spec.NodeAffinity.Required)
×
594
                        continue
×
595
                }
596
                // acknowledge that there can be duplicates in the slice
597
                volumeNodeSelectors = append(volumeNodeSelectors, nodeSelector)
3✔
598
        }
599

600
        for candidateNode := range candidateNodes {
79✔
601
                node := nodes[candidateNode.(int)]
53✔
602
                for _, volumeNodeSelector := range volumeNodeSelectors {
61✔
603
                        if !volumeNodeSelector.Match(&node) {
12✔
604
                                w.Logger().V(5).Infof("Removing node (%s) from replica candidates: volume node selector (%+v) cannot be matched with the node.", node.Name, volumeNodeSelector)
4✔
605
                                candidateNodes.remove(candidateNode)
4✔
606
                        }
4✔
607
                }
608
        }
609

610
        filteredNodes := make([]v1.Node, len(candidateNodes))
26✔
611
        i := 0
26✔
612
        for candidateNode := range candidateNodes {
75✔
613
                filteredNodes[i] = nodes[candidateNode.(int)]
49✔
614
                i++
49✔
615
        }
49✔
616
        return filteredNodes, nil
26✔
617
}
618

619
type nodeScorerPlugin interface {
620
        name() string
621
        setup(nodes []v1.Node, pods []v1.Pod, volumes []string, state *SharedState)
622
        priority() float64 // returns the score plugin's priority on a scale of 1 to 5 (5 being the highest priority)
623
        score(ctx context.Context, nodeScores map[string]int) (map[string]int, error)
624
}
625

626
type scoreByNodeCapacity struct {
627
        nodes   []v1.Node
628
        volumes []string
629
        state   *SharedState
630
}
631

632
func (s *scoreByNodeCapacity) name() string {
40✔
633
        return "score by node capacity"
40✔
634
}
40✔
635

636
func (s *scoreByNodeCapacity) priority() float64 {
46✔
637
        return 1
46✔
638
}
46✔
639

640
func (s *scoreByNodeCapacity) setup(nodes []v1.Node, pods []v1.Pod, volumes []string, state *SharedState) {
20✔
641
        s.nodes = nodes
20✔
642
        s.volumes = volumes
20✔
643
        s.state = state
20✔
644
}
20✔
645

646
func (s *scoreByNodeCapacity) score(ctx context.Context, nodeScores map[string]int) (map[string]int, error) {
20✔
647
        ctx, w := workflow.New(ctx, workflow.WithDetails("score-plugin", s.name()))
20✔
648
        defer w.Finish(nil)
20✔
649
        for _, node := range s.nodes {
66✔
650
                if _, ok := nodeScores[node.Name]; !ok {
46✔
651
                        continue
×
652
                }
653
                maxCapacity, err := azureutils.GetNodeMaxDiskCountWithLabels(node.Labels)
46✔
654
                if err != nil {
46✔
655
                        w.Logger().Errorf(err, "failed to get max capacity of node (%s)", node.Name)
×
656
                        delete(nodeScores, node.Name)
×
657
                        continue
×
658
                }
659
                remainingCapacity, err := azureutils.GetNodeRemainingDiskCountApprox(ctx, s.state.cachedClient, node.Name)
46✔
660
                if err != nil {
46✔
661
                        // if failed to get node's remaining capacity, remove the node from the candidate list and proceed
×
662
                        w.Logger().Errorf(err, "failed to get remaining capacity of node (%s)", node.Name)
×
663
                        delete(nodeScores, node.Name)
×
664
                        continue
×
665
                }
666

667
                nodeScores[node.Name] += int((float64(remainingCapacity) / float64(maxCapacity)) * math.Pow(10, s.priority()))
46✔
668

46✔
669
                if remainingCapacity-len(s.volumes) < 0 {
48✔
670
                        delete(nodeScores, node.Name)
2✔
671
                }
2✔
672

673
                w.Logger().V(10).Infof("node (%s) can accept %d more attachments", node.Name, remainingCapacity)
46✔
674
        }
675
        return nodeScores, nil
20✔
676
}
677

678
type scoreByReplicaCount struct {
679
        volumes []string
680
        state   *SharedState
681
}
682

683
func (s *scoreByReplicaCount) name() string {
40✔
684
        return "score by replica count"
40✔
685
}
40✔
686

687
func (s *scoreByReplicaCount) priority() float64 {
4✔
688
        return 3
4✔
689
}
4✔
690

691
func (s *scoreByReplicaCount) setup(nodes []v1.Node, pods []v1.Pod, volumes []string, state *SharedState) {
20✔
692
        s.volumes = volumes
20✔
693
        s.state = state
20✔
694
}
20✔
695

696
func (s *scoreByReplicaCount) score(ctx context.Context, nodeScores map[string]int) (map[string]int, error) {
20✔
697
        ctx, w := workflow.New(ctx, workflow.WithDetails("score-plugin", s.name()))
20✔
698
        defer w.Finish(nil)
20✔
699

20✔
700
        var requestedReplicaCount int
20✔
701

20✔
702
        nodeReplicaCounts := map[string]int{}
20✔
703

20✔
704
        for _, volume := range s.volumes {
48✔
705
                azVolume, err := azureutils.GetAzVolume(ctx, s.state.cachedClient, nil, volume, s.state.config.ObjectNamespace, true)
28✔
706
                if err != nil {
28✔
707
                        w.Logger().Errorf(err, "failed to get AzVolume (%s)", volume)
×
708
                        continue
×
709
                }
710
                requestedReplicaCount += azVolume.Spec.MaxMountReplicaCount
28✔
711
                azVolumeAttachments, err := azureutils.GetAzVolumeAttachmentsForVolume(ctx, s.state.cachedClient, volume, azureutils.AllRoles)
28✔
712
                if err != nil {
28✔
713
                        w.Logger().V(5).Errorf(err, "failed listing AzVolumeAttachments for azvolume %s", volume)
×
714
                        continue
×
715
                }
716

717
                for _, azVolumeAttachment := range azVolumeAttachments {
49✔
718
                        if _, exists := nodeScores[azVolumeAttachment.Spec.NodeName]; exists {
34✔
719
                                if azVolumeAttachment.Spec.RequestedRole == azdiskv1beta2.PrimaryRole {
23✔
720
                                        delete(nodeScores, azVolumeAttachment.Spec.NodeName)
10✔
721
                                } else {
13✔
722
                                        nodeReplicaCounts[azVolumeAttachment.Spec.NodeName]++
3✔
723
                                }
3✔
724
                        }
725
                }
726

727
                if requestedReplicaCount > 0 {
56✔
728
                        for nodeName, replicaCount := range nodeReplicaCounts {
32✔
729
                                if _, ok := nodeScores[nodeName]; !ok {
4✔
730
                                        continue
×
731
                                }
732
                                nodeScores[nodeName] += int((float64(replicaCount) / float64(requestedReplicaCount)) * math.Pow(10, s.priority()))
4✔
733
                        }
734
                }
735
        }
736
        return nodeScores, nil
20✔
737
}
738

739
type scoreByInterPodAffinity struct {
740
        nodes   []v1.Node
741
        pods    []v1.Pod
742
        volumes []string
743
        state   *SharedState
744
}
745

746
func (s *scoreByInterPodAffinity) name() string {
40✔
747
        return "score by inter pod affinity"
40✔
748
}
40✔
749

750
func (s *scoreByInterPodAffinity) priority() float64 {
2✔
751
        return 2
2✔
752
}
2✔
753

754
func (s *scoreByInterPodAffinity) setup(nodes []v1.Node, pods []v1.Pod, volumes []string, state *SharedState) {
20✔
755
        s.nodes = nodes
20✔
756
        s.pods = pods
20✔
757
        s.volumes = volumes
20✔
758
        s.state = state
20✔
759
}
20✔
760

761
func (s *scoreByInterPodAffinity) score(ctx context.Context, nodeScores map[string]int) (map[string]int, error) {
20✔
762
        ctx, w := workflow.New(ctx, workflow.WithDetails("score-plugin", s.name()))
20✔
763
        defer w.Finish(nil)
20✔
764

20✔
765
        nodeAffinityScores := map[string]int{}
20✔
766
        var maxAffinityScore int
20✔
767

20✔
768
        for _, pod := range s.pods {
43✔
769
                if pod.Spec.Affinity == nil || pod.Spec.Affinity.PodAffinity == nil {
45✔
770
                        continue
22✔
771
                }
772

773
                // pod affinity weights range from 1-100,
774
                // and as a node can be part of both 1) nodes that satisfy the pod affinity rule and 2) nodes with qualifying replica attachments (for which we give half of the score), we need to multiply the max weight by 1.5
775
                maxAffinityScore += 1.5 * defaultMaxPodAffinityWeight * len(pod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution)
1✔
776

1✔
777
                for _, weightedAffinityTerm := range pod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
2✔
778
                        podNodes, replicaNodes := s.state.getQualifiedNodesForPodAffinityTerm(ctx, s.nodes, pod.Namespace, weightedAffinityTerm.PodAffinityTerm)
1✔
779
                        for podNode := range podNodes {
3✔
780
                                w.Logger().Infof("podNode: %s", podNode.(string))
2✔
781
                                nodeAffinityScores[podNode.(string)] += int(weightedAffinityTerm.Weight)
2✔
782
                        }
2✔
783
                        for replicaNode := range replicaNodes {
1✔
784
                                nodeAffinityScores[replicaNode.(string)] += int(weightedAffinityTerm.Weight / 2)
×
785
                        }
×
786
                }
787
        }
788

789
        for node, affinityScore := range nodeAffinityScores {
22✔
790
                if _, ok := nodeScores[node]; !ok {
2✔
791
                        continue
×
792
                }
793
                nodeScores[node] += int((float64(affinityScore) / float64(maxAffinityScore)) * math.Pow(10, s.priority()))
2✔
794
        }
795

796
        return nodeScores, nil
20✔
797
}
798

799
type scoreByInterPodAntiAffinity struct {
800
        nodes   []v1.Node
801
        pods    []v1.Pod
802
        volumes []string
803
        state   *SharedState
804
}
805

806
func (s *scoreByInterPodAntiAffinity) name() string {
40✔
807
        return "score by inter pod anti affinity"
40✔
808
}
40✔
809

810
func (s *scoreByInterPodAntiAffinity) priority() float64 {
1✔
811
        return 2
1✔
812
}
1✔
813

814
func (s *scoreByInterPodAntiAffinity) setup(nodes []v1.Node, pods []v1.Pod, volumes []string, state *SharedState) {
20✔
815
        s.nodes = nodes
20✔
816
        s.pods = pods
20✔
817
        s.volumes = volumes
20✔
818
        s.state = state
20✔
819
}
20✔
820

821
func (s *scoreByInterPodAntiAffinity) score(ctx context.Context, nodeScores map[string]int) (map[string]int, error) {
20✔
822
        ctx, w := workflow.New(ctx, workflow.WithDetails("score-plugin", s.name()))
20✔
823
        defer w.Finish(nil)
20✔
824

20✔
825
        nodeAffinityScores := map[string]int{}
20✔
826
        var maxAffinityScore int
20✔
827

20✔
828
        for _, pod := range s.pods {
43✔
829
                if pod.Spec.Affinity == nil || pod.Spec.Affinity.PodAntiAffinity == nil {
45✔
830
                        continue
22✔
831
                }
832

833
                // pod affinity weights range from 1-100
834
                maxAffinityScore += defaultMaxPodAffinityWeight * len(pod.Spec.Affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution)
1✔
835

1✔
836
                for _, weightedAffinityTerm := range pod.Spec.Affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
2✔
837
                        podNodes, _ := s.state.getQualifiedNodesForPodAffinityTerm(ctx, s.nodes, pod.Namespace, weightedAffinityTerm.PodAffinityTerm)
1✔
838

1✔
839
                        for node := range nodeScores {
4✔
840
                                if !podNodes.has(node) {
4✔
841
                                        nodeAffinityScores[node] += int(weightedAffinityTerm.Weight)
1✔
842
                                }
1✔
843
                        }
844
                }
845
        }
846

847
        for node, affinityScore := range nodeAffinityScores {
21✔
848
                if _, ok := nodeScores[node]; !ok {
1✔
849
                        continue
×
850
                }
851
                nodeScores[node] += int((float64(affinityScore) / float64(maxAffinityScore)) * math.Pow(10, s.priority()))
1✔
852
        }
853

854
        return nodeScores, nil
20✔
855
}
856

857
type scoreByPodNodeAffinity struct {
858
        nodes   []v1.Node
859
        pods    []v1.Pod
860
        volumes []string
861
        state   *SharedState
862
}
863

864
func (s *scoreByPodNodeAffinity) name() string {
40✔
865
        return "score by pod node affinity"
40✔
866
}
40✔
867

868
func (s *scoreByPodNodeAffinity) priority() float64 {
×
869
        return 2
×
870
}
×
871

872
func (s *scoreByPodNodeAffinity) setup(nodes []v1.Node, pods []v1.Pod, volumes []string, state *SharedState) {
20✔
873
        s.nodes = nodes
20✔
874
        s.pods = pods
20✔
875
        s.volumes = volumes
20✔
876
        s.state = state
20✔
877
}
20✔
878

879
func (s *scoreByPodNodeAffinity) score(ctx context.Context, nodeScores map[string]int) (map[string]int, error) {
20✔
880
        _, w := workflow.New(ctx, workflow.WithDetails("filter-plugin", s.name()))
20✔
881
        defer w.Finish(nil)
20✔
882

20✔
883
        var preferredSchedulingTerms []v1.PreferredSchedulingTerm
20✔
884

20✔
885
        for _, pod := range s.pods {
43✔
886
                if pod.Spec.Affinity == nil || pod.Spec.Affinity.NodeAffinity == nil {
44✔
887
                        continue
21✔
888
                }
889
                preferredSchedulingTerms = append(preferredSchedulingTerms, pod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution...)
2✔
890
        }
891

892
        preferredAffinity, err := nodeaffinity.NewPreferredSchedulingTerms(preferredSchedulingTerms)
20✔
893
        if err != nil {
20✔
894
                return nodeScores, err
×
895
        }
×
896

897
        for _, node := range s.nodes {
66✔
898
                nodeScore := preferredAffinity.Score(&node)
46✔
899
                if _, ok := nodeScores[node.Name]; !ok {
58✔
900
                        continue
12✔
901
                }
902
                nodeScores[node.Name] += int(nodeScore)
34✔
903
        }
904
        return nodeScores, nil
20✔
905
}
906

907
func getSupportedZones(nodeSelectorTerms []v1.NodeSelectorTerm, topologyKey string) set {
1✔
908
        // Get the list of supported zones for pv
1✔
909
        supportedZones := set{}
1✔
910
        if len(nodeSelectorTerms) > 0 {
2✔
911
                for _, term := range nodeSelectorTerms {
2✔
912
                        if len(term.MatchExpressions) > 0 {
2✔
913
                                for _, matchExpr := range term.MatchExpressions {
2✔
914
                                        if matchExpr.Key == topologyKey {
1✔
915
                                                for _, value := range matchExpr.Values {
×
916
                                                        if value != "" && !supportedZones.has(value) {
×
917
                                                                supportedZones.add(value)
×
918
                                                        }
×
919
                                                }
920
                                        }
921
                                }
922
                        }
923
                }
924
        }
925
        return supportedZones
1✔
926
}
927

928
func markDetachRequest(attachment *azdiskv1beta2.AzVolumeAttachment, caller operationRequester) {
5✔
929
        attachment.Status.Annotations = azureutils.AddToMap(attachment.Status.Annotations, consts.VolumeDetachRequestAnnotation, string(caller))
5✔
930
}
5✔
931

932
func markCleanUp(attachment *azdiskv1beta2.AzVolumeAttachment, caller operationRequester) {
4✔
933
        attachment.Status.Annotations = azureutils.AddToMap(attachment.Status.Annotations, consts.CleanUpAnnotation, string(caller))
4✔
934
}
4✔
935

936
func shouldCleanUp(attachment azdiskv1beta2.AzVolumeAttachment, mode azureutils.AttachmentRoleMode) bool {
1✔
937
        return mode == azureutils.AllRoles || (attachment.Spec.RequestedRole == azdiskv1beta2.PrimaryRole && mode == azureutils.PrimaryOnly) || (attachment.Spec.RequestedRole == azdiskv1beta2.ReplicaRole && mode == azureutils.ReplicaOnly)
1✔
938
}
1✔
939

940
func isAttached(attachment *azdiskv1beta2.AzVolumeAttachment) bool {
2✔
941
        return attachment != nil && attachment.Status.Detail != nil && attachment.Status.Detail.PublishContext != nil
2✔
942
}
2✔
943

944
func isCreated(volume *azdiskv1beta2.AzVolume) bool {
20✔
945
        return volume != nil && volume.Status.Detail != nil
20✔
946
}
20✔
947

948
// objectDeletionRequested returns whether deletion of the specified object has been requested.
949
// If so, it will return true and a time.Duration after which to delete the object. If the
950
// duration is less than or equal to 0, the object should be deleted immediately.
951
func objectDeletionRequested(obj runtime.Object) (bool, time.Duration) {
34✔
952
        meta, _ := meta.Accessor(obj)
34✔
953
        if meta == nil {
34✔
954
                return false, time.Duration(0)
×
955
        }
×
956
        deletionTime := meta.GetDeletionTimestamp()
34✔
957

34✔
958
        if deletionTime.IsZero() {
63✔
959
                return false, time.Duration(0)
29✔
960
        }
29✔
961

962
        return true, time.Until(deletionTime.Time)
5✔
963
}
964

965
func isCleanupRequested(attachment *azdiskv1beta2.AzVolumeAttachment) bool {
1✔
966
        return attachment != nil && azureutils.MapContains(attachment.Status.Annotations, consts.CleanUpAnnotation)
1✔
967
}
1✔
968

969
func volumeAttachRequested(attachment *azdiskv1beta2.AzVolumeAttachment) bool {
2✔
970
        return attachment != nil && azureutils.MapContains(attachment.Annotations, consts.VolumeAttachRequestAnnotation)
2✔
971
}
2✔
972

973
func volumeDetachRequested(attachment *azdiskv1beta2.AzVolumeAttachment) bool {
17✔
974
        return attachment != nil && azureutils.MapContains(attachment.Status.Annotations, consts.VolumeDetachRequestAnnotation)
17✔
975
}
17✔
976

977
func volumeDeleteRequested(volume *azdiskv1beta2.AzVolume) bool {
2✔
978
        return volume != nil && azureutils.MapContains(volume.Status.Annotations, consts.VolumeDeleteRequestAnnotation)
2✔
979
}
2✔
980

981
func isDemotionRequested(attachment *azdiskv1beta2.AzVolumeAttachment) bool {
4✔
982
        return attachment != nil && attachment.Status.Detail != nil && attachment.Status.Detail.Role == azdiskv1beta2.PrimaryRole && attachment.Spec.RequestedRole == azdiskv1beta2.ReplicaRole
4✔
983
}
4✔
984

985
func isPromotionRequested(attachment *azdiskv1beta2.AzVolumeAttachment) bool {
2✔
986
        return attachment != nil && attachment.Status.Detail != nil && attachment.Status.Detail.Role == azdiskv1beta2.ReplicaRole && attachment.Spec.RequestedRole == azdiskv1beta2.PrimaryRole
2✔
987
}
2✔
988

989
func isPreProvisioned(volume *azdiskv1beta2.AzVolume) bool {
3✔
990
        return volume != nil && azureutils.MapContains(volume.Status.Annotations, consts.PreProvisionedVolumeAnnotation)
3✔
991
}
3✔
992

993
func getQualifiedName(namespace, name string) string {
202✔
994
        return fmt.Sprintf("%s/%s", namespace, name)
202✔
995
}
202✔
996

997
func parseQualifiedName(qualifiedName string) (namespace, name string, err error) {
14✔
998
        parsed := strings.Split(qualifiedName, "/")
14✔
999
        if len(parsed) != 2 {
14✔
1000
                err = status.Errorf(codes.Internal, "pod's qualified name (%s) should be of <namespace>/<name>", qualifiedName)
×
1001
                return
×
1002
        }
×
1003
        namespace = parsed[0]
14✔
1004
        name = parsed[1]
14✔
1005
        return
14✔
1006
}
1007

1008
func formatUpdateStateError(objectType, fromState, toState string, expectedStates ...string) string {
×
1009
        return fmt.Sprintf("%s's state '%s' cannot be updated to %s. %s can only be updated to %s", objectType, fromState, toState, fromState, strings.Join(expectedStates, ", "))
×
1010
}
×
1011

1012
func getOperationRequeueError(desired string, obj client.Object) error {
×
1013
        return status.Errorf(codes.Aborted, "requeueing %s operation because another operation is already pending on %v (%s)", desired, reflect.TypeOf(obj), obj.GetName())
×
1014
}
×
1015

1016
// reconcileReturnOnSuccess returns a reconciler result on successful reconciliation.
1017
func reconcileReturnOnSuccess(objectName string, retryInfo *retryInfo) (reconcile.Result, error) {
16✔
1018
        retryInfo.deleteEntry(objectName)
16✔
1019
        return reconcile.Result{}, nil
16✔
1020
}
16✔
1021

1022
// reconcileReturnOnError returns a reconciler result on error that requeues the object for later processing if the error is retriable.
1023
func reconcileReturnOnError(ctx context.Context, obj runtime.Object, operationType string, err error, retryInfo *retryInfo) (reconcile.Result, error) {
1✔
1024
        var (
1✔
1025
                requeue    bool = status.Code(err) != codes.FailedPrecondition
1✔
1026
                retryAfter time.Duration
1✔
1027
        )
1✔
1028

1✔
1029
        w := workflow.GetWorkflow(ctx, obj)
1✔
1030

1✔
1031
        if meta, metaErr := meta.Accessor(obj); metaErr == nil {
2✔
1032
                objectName := meta.GetName()
1✔
1033
                objectType := reflect.TypeOf(obj)
1✔
1034
                if !requeue {
2✔
1035
                        w.Logger().Errorf(err, "failed to %s %v (%s) with no retry", operationType, objectType, objectName)
1✔
1036
                        retryInfo.deleteEntry(objectName)
1✔
1037
                } else {
1✔
1038
                        retryAfter = retryInfo.nextRequeue(objectName)
×
1039
                        w.Logger().Errorf(err, "failed to %s %v (%s) with retry after %v", operationType, objectType, objectName, retryAfter)
×
1040
                }
×
1041
        }
1042

1043
        return reconcile.Result{
1✔
1044
                Requeue:      requeue,
1✔
1045
                RequeueAfter: retryAfter,
1✔
1046
        }, nil
1✔
1047
}
1048

1049
// reconcileAfter returns a reconciler result that requeues the current object for processing after the specified time.
1050
func reconcileAfter(after time.Duration, objectName string, retryInfo *retryInfo) (reconcile.Result, error) {
×
1051
        retryInfo.deleteEntry(objectName)
×
1052
        return reconcile.Result{Requeue: true, RequeueAfter: after}, nil
×
1053
}
×
1054

1055
func isOperationInProcess(obj interface{}) bool {
10✔
1056
        switch target := obj.(type) {
10✔
1057
        case *azdiskv1beta2.AzVolume:
4✔
1058
                return target.Status.State == azdiskv1beta2.VolumeCreating || target.Status.State == azdiskv1beta2.VolumeDeleting || target.Status.State == azdiskv1beta2.VolumeUpdating
4✔
1059
        case *azdiskv1beta2.AzVolumeAttachment:
6✔
1060
                deleteRequested, _ := objectDeletionRequested(target)
6✔
1061
                return target.Status.State == azdiskv1beta2.Attaching || (target.Status.State == azdiskv1beta2.Detaching && !deleteRequested)
6✔
1062
        }
1063
        return false
×
1064
}
1065

1066
func max(a, b int) int {
21✔
1067
        if a > b {
21✔
1068
                return a
×
1069
        }
×
1070
        return b
21✔
1071
}
1072

1073
func containsString(key string, items []string) bool {
7✔
1074
        for _, item := range items {
16✔
1075
                if item == key {
16✔
1076
                        return true
7✔
1077
                }
7✔
1078
        }
1079
        return false
×
1080
}
1081

1082
type ReplicaRequest struct {
1083
        VolumeName string
1084
        Priority   int //The number of replicas that have yet to be created
1085
}
1086
type VolumeReplicaRequestsPriorityQueue struct {
1087
        queue *cache.Heap
1088
        size  int32
1089
}
1090

1091
func (vq *VolumeReplicaRequestsPriorityQueue) Push(ctx context.Context, replicaRequest *ReplicaRequest) {
1✔
1092
        w, _ := workflow.GetWorkflowFromContext(ctx)
1✔
1093
        err := vq.queue.Add(replicaRequest)
1✔
1094
        atomic.AddInt32(&vq.size, 1)
1✔
1095
        if err != nil {
1✔
1096
                w.Logger().Errorf(err, "failed to add replica request for volume %s", replicaRequest.VolumeName)
×
1097
        }
×
1098
}
1099

1100
func (vq *VolumeReplicaRequestsPriorityQueue) Pop() *ReplicaRequest {
1✔
1101
        request, _ := vq.queue.Pop()
1✔
1102
        atomic.AddInt32(&vq.size, -1)
1✔
1103
        return request.(*ReplicaRequest)
1✔
1104
}
1✔
1105
func (vq *VolumeReplicaRequestsPriorityQueue) DrainQueue() []*ReplicaRequest {
2✔
1106
        var listRequests []*ReplicaRequest
2✔
1107
        for i := vq.size; i > 0; i-- {
3✔
1108
                listRequests = append(listRequests, vq.Pop())
1✔
1109
        }
1✔
1110
        return listRequests
2✔
1111
}
1112

1113
func verifyObjectDeleted(obj interface{}, objectDeleted bool) (bool, error) {
9✔
1114
        if obj == nil || objectDeleted {
11✔
1115
                return true, nil
2✔
1116
        }
2✔
1117
        return false, nil
7✔
1118
}
1119

1120
func verifyObjectFailedOrDeleted(obj interface{}, objectDeleted bool) (bool, error) {
4✔
1121
        if obj == nil || objectDeleted {
6✔
1122
                return true, nil
2✔
1123
        }
2✔
1124

1125
        switch target := obj.(type) {
2✔
1126
        case azdiskv1beta2.AzVolumeAttachment:
×
1127
                if target.Status.Error != nil {
×
1128
                        return false, util.ErrorFromAzError(target.Status.Error)
×
1129
                }
×
1130
        case azdiskv1beta2.AzVolume:
×
1131
                // otherwise, the volume detachment has either failed with an error or is still pending
×
1132
                if target.Status.Error != nil {
×
1133
                        return false, util.ErrorFromAzError(target.Status.Error)
×
1134
                }
×
1135
        }
1136

1137
        return false, nil
2✔
1138
}
1139

1140
func verifyObjectPromotedOrDemoted(obj interface{}, objectDeleted bool) (bool, error) {
6✔
1141
        if obj == nil || objectDeleted {
6✔
1142
                return false, fmt.Errorf("obj is nil or has been deleted")
×
1143
        }
×
1144

1145
        azVolumeAttachmentInstance := obj.(*azdiskv1beta2.AzVolumeAttachment)
6✔
1146
        if azVolumeAttachmentInstance.Labels != nil {
12✔
1147
                if label, ok := azVolumeAttachmentInstance.Labels[consts.RoleChangeLabel]; ok {
11✔
1148
                        isPromoteUpdated := label == consts.Promoted && azVolumeAttachmentInstance.Status.Detail.PreviousRole == azdiskv1beta2.ReplicaRole && azVolumeAttachmentInstance.Status.Detail.Role == azdiskv1beta2.PrimaryRole
5✔
1149
                        isDemoteUpdated := label == consts.Demoted && azVolumeAttachmentInstance.Status.Detail.PreviousRole == azdiskv1beta2.PrimaryRole && azVolumeAttachmentInstance.Status.Detail.Role == azdiskv1beta2.ReplicaRole
5✔
1150

5✔
1151
                        if isPromoteUpdated || isDemoteUpdated {
8✔
1152
                                return true, nil
3✔
1153
                        }
3✔
1154
                }
1155
        }
1156
        return false, nil
3✔
1157
}
1158

1159
// WatchObject creates a no-op controller to set up a watch and an informer for an object in the controller-runtime manager.
1160
// Use this function to set up a watch for an object without configuring a separate informer factory (a hypothetical usage sketch follows this listing).
1161
func WatchObject(mgr manager.Manager, objKind source.Kind) error {
×
1162
        objType := objKind.Type.GetName()
×
1163
        c, err := controller.New(fmt.Sprintf("watch %s", objType), mgr, controller.Options{
×
1164
                Reconciler: &noOpReconciler{},
×
1165
        })
×
1166
        if err != nil {
×
1167
                return err
×
1168
        }
×
1169

1170
        c.GetLogger().Info(fmt.Sprintf("Starting to watch %s", objType))
×
1171

×
1172
        // Watch for CRUD events on objects
×
1173
        err = c.Watch(&objKind, &handler.EnqueueRequestForObject{}, predicate.Funcs{
×
1174
                CreateFunc: func(_ event.CreateEvent) bool {
×
1175
                        return false
×
1176
                },
×
1177
                UpdateFunc: func(_ event.UpdateEvent) bool {
×
1178
                        return true
×
1179
                },
×
1180
                GenericFunc: func(_ event.GenericEvent) bool {
×
1181
                        return false
×
1182
                },
×
1183
                DeleteFunc: func(_ event.DeleteEvent) bool {
×
1184
                        return false
×
1185
                },
×
1186
        })
1187
        return err
×
1188
}
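
A minimal, hypothetical usage sketch for the WatchObject helper above (not taken from the repository): it assumes the AzVolumeAttachment CRD type is already registered in the manager's scheme, and simply wires WatchObject into a controller-runtime manager so that, per the predicate defined above, only Update events on that object are observed.

package main

import (
        azdiskv1beta2 "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/azuredisk/v1beta2"
        "sigs.k8s.io/azuredisk-csi-driver/pkg/controller"
        ctrl "sigs.k8s.io/controller-runtime"
        "sigs.k8s.io/controller-runtime/pkg/manager"
        "sigs.k8s.io/controller-runtime/pkg/source"
)

func main() {
        // Hypothetical wiring; scheme registration for the azdisk CRDs is omitted for brevity.
        mgr, err := manager.New(ctrl.GetConfigOrDie(), manager.Options{})
        if err != nil {
                panic(err)
        }

        // WatchObject starts a no-op controller whose only purpose is to keep an informer
        // (and therefore a watch) open for AzVolumeAttachment objects.
        if err := controller.WatchObject(mgr, source.Kind{Type: &azdiskv1beta2.AzVolumeAttachment{}}); err != nil {
                panic(err)
        }

        if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
                panic(err)
        }
}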