kubernetes-sigs / azuredisk-csi-driver / 6188855463

14 Sep 2023 05:24PM UTC coverage: 69.242% (+0.3%) from 68.933%

Build 6188855463 · push · github · web-flow
Merge pull request #1781 from NedAnd1/event-persistence

[V2] feat: persist replica attachment failures

118 of 118 new or added lines in 3 files covered. (100.0%)

7393 of 10677 relevant lines covered (69.24%)

7.81 hits per line

Source File

/pkg/controller/shared_state.go (73.16% file coverage)
1
/*
2
Copyright 2021 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controller
18

19
import (
20
        "container/list"
21
        "context"
22
        "errors"
23
        "fmt"
24
        "math"
25
        "reflect"
26
        "sort"
27
        "strings"
28
        "sync"
29
        "sync/atomic"
30
        "time"
31

32
        "google.golang.org/grpc/codes"
33
        "google.golang.org/grpc/status"
34
        v1 "k8s.io/api/core/v1"
35
        crdClientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
36
        apiErrors "k8s.io/apimachinery/pkg/api/errors"
37
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
38
        "k8s.io/apimachinery/pkg/labels"
39
        "k8s.io/apimachinery/pkg/runtime"
40
        "k8s.io/apimachinery/pkg/selection"
41
        "k8s.io/apimachinery/pkg/types"
42
        "k8s.io/apimachinery/pkg/util/wait"
43
        utilfeature "k8s.io/apiserver/pkg/util/feature"
44
        "k8s.io/client-go/kubernetes"
45
        cache "k8s.io/client-go/tools/cache"
46
        "k8s.io/client-go/tools/record"
47
        "k8s.io/client-go/util/retry"
48
        csitranslator "k8s.io/csi-translation-lib/plugins"
49
        "k8s.io/klog/v2"
50
        "k8s.io/kubernetes/pkg/features"
51
        azdiskv1beta2 "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/azuredisk/v1beta2"
52
        azdisk "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/client/clientset/versioned"
53
        consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants"
54
        "sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils"
55
        "sigs.k8s.io/azuredisk-csi-driver/pkg/watcher"
56
        "sigs.k8s.io/azuredisk-csi-driver/pkg/workflow"
57
        "sigs.k8s.io/controller-runtime/pkg/client"
58
)
59

60
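// DriverLifecycle exposes the driver-level dependencies the controllers need: the AzDisk clientset, the shared condition watcher, and whether the driver is being uninstalled.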
type DriverLifecycle interface {
61
        GetDiskClientSet() azdisk.Interface
62
        GetConditionWatcher() *watcher.ConditionWatcher
63
        IsDriverUninstall() bool
64
}
65

66
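// SharedState holds the state shared across the azuredisk controllers: pod/claim/volume lookup maps, per-volume operation queues, the replica request priority queue, clients, and the channels used to persist and clear replica attachment failure events.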
type SharedState struct {
67
        recoveryComplete              uint32
68
        config                        *azdiskv1beta2.AzDiskDriverConfiguration
69
        topologyKey                   string
70
        podToClaimsMap                sync.Map
71
        podToInlineMap                sync.Map
72
        claimToPodsMap                sync.Map
73
        volumeToClaimMap              sync.Map
74
        claimToVolumeMap              sync.Map
75
        azVolumeAttachmentToVaMap     sync.Map
76
        pvToVolumeMap                 sync.Map
77
        podLocks                      sync.Map
78
        visitedVolumes                sync.Map
79
        volumeOperationQueues         sync.Map
80
        cleanUpMap                    sync.Map
81
        priorityReplicaRequestsQueue  *VolumeReplicaRequestsPriorityQueue
82
        processingReplicaRequestQueue int32
83
        eventRecorder                 record.EventRecorder
84
        cachedClient                  client.Client
85
        azClient                      azdisk.Interface
86
        kubeClient                    kubernetes.Interface
87
        crdClient                     crdClientset.Interface
88
        conditionWatcher              *watcher.ConditionWatcher
89
        azureDiskCSITranslator        csitranslator.InTreePlugin
90
        availableAttachmentsMap       sync.Map
91
        driverLifecycle               DriverLifecycle
92
        eventsToPersistQueue          chan ReplicaAttachmentFailureInfo
93
        eventsToUnpersistQueue        chan string
94
}
95

96
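// ReplicaAttachmentFailureInfo captures a replica attachment failure (volume name, failure message, affected pods, and timestamp) queued for event persistence.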
type ReplicaAttachmentFailureInfo struct {
97
        volumeName string
98
        message    string
99
        pods       []runtime.Object
100
        timestamp  time.Time
101
}
102

103
const (
104
        // persistent events and their refreshes should overlap by this amount of time
105
        eventOverlapDuration = 30 * time.Second
106
        // allow persistent events to overlap by up to this additional amount of time for batched reporting etc.
107
        eventOverlapVariance = 30 * time.Millisecond
108
)
109

110
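// NewSharedState constructs a SharedState wired to the given configuration, clients, and driver lifecycle, and initializes the replica request and event persistence queues.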
func NewSharedState(config *azdiskv1beta2.AzDiskDriverConfiguration, topologyKey string, eventRecorder record.EventRecorder, cachedClient client.Client, crdClient crdClientset.Interface, kubeClient kubernetes.Interface, driverLifecycle DriverLifecycle) *SharedState {
88✔
111
        newSharedState := &SharedState{
88✔
112
                config:                 config,
88✔
113
                topologyKey:            topologyKey,
88✔
114
                eventRecorder:          eventRecorder,
88✔
115
                cachedClient:           cachedClient,
88✔
116
                crdClient:              crdClient,
88✔
117
                azClient:               driverLifecycle.GetDiskClientSet(),
88✔
118
                kubeClient:             kubeClient,
88✔
119
                conditionWatcher:       driverLifecycle.GetConditionWatcher(),
88✔
120
                azureDiskCSITranslator: csitranslator.NewAzureDiskCSITranslator(),
88✔
121
                driverLifecycle:        driverLifecycle,
88✔
122
        }
88✔
123
        newSharedState.createReplicaRequestsQueue()
88✔
124
        newSharedState.createEventQueues()
88✔
125

88✔
126
        return newSharedState
88✔
127
}
88✔
128

129
func (c *SharedState) isRecoveryComplete() bool {
42✔
130
        return atomic.LoadUint32(&c.recoveryComplete) == 1
42✔
131
}
42✔
132

133
func (c *SharedState) MarkRecoveryComplete() {
89✔
134
        atomic.StoreUint32(&c.recoveryComplete, 1)
89✔
135
}
89✔
136

137
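// DeleteAPIVersion removes deleteVersion from the stored versions in the status of the driver's CRDs (AzDriverNode, AzVolume, AzVolumeAttachment), retrying each update on conflict.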
func (c *SharedState) DeleteAPIVersion(ctx context.Context, deleteVersion string) error {
×
138
        w, _ := workflow.GetWorkflowFromContext(ctx)
×
139
        crdNames := []string{consts.AzDriverNodeCRDName, consts.AzVolumeCRDName, consts.AzVolumeAttachmentCRDName}
×
140
        for _, crdName := range crdNames {
×
141
                err := retry.RetryOnConflict(retry.DefaultBackoff,
×
142
                        func() error {
×
143
                                crd, err := c.crdClient.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, crdName, metav1.GetOptions{})
×
144
                                if err != nil {
×
145
                                        if apiErrors.IsNotFound(err) {
×
146
                                                return err
×
147
                                        }
×
148
                                        return nil
×
149
                                }
150

151
                                updated := crd.DeepCopy()
×
152
                                var storedVersions []string
×
153
                                // remove version from status stored versions
×
154
                                for _, version := range updated.Status.StoredVersions {
×
155
                                        if version == deleteVersion {
×
156
                                                continue
×
157
                                        }
158
                                        storedVersions = append(storedVersions, version)
×
159
                                }
160
                                updated.Status.StoredVersions = storedVersions
×
161
                                _, err = c.crdClient.ApiextensionsV1().CustomResourceDefinitions().UpdateStatus(ctx, updated, metav1.UpdateOptions{})
×
162
                                if err != nil {
×
163
                                        // log the error and continue
×
164
                                        return err
×
165
                                }
×
166
                                return nil
×
167
                        })
168

169
                if err != nil {
×
170
                        w.Logger().Errorf(err, "failed to delete %s api version from CRD (%s)", deleteVersion, crdName)
×
171
                }
×
172

173
                // Uncomment when all deployments have rolled over to v1beta1.
174
                // updated = crd.DeepCopy()
175
                // // remove version from spec versions
176
                // var specVersions []crdv1.CustomResourceDefinitionVersion
177
                // for _, version := range updated.Spec.Versions {
178
                //         if version.Name == deleteVersion {
179
                //                 continue
180
                //         }
181
                //         specVersions = append(specVersions, version)
182
                // }
183
                // updated.Spec.Versions = specVersions
184

185
                // // update the crd
186
                // crd, err = c.crdClient.ApiextensionsV1().CustomResourceDefinitions().Update(ctx, updated, metav1.UpdateOptions{})
187
                // if err != nil {
188
                //         // log the error and continue
189
                //         w.Logger().Errorf(err, "failed to remove %s spec version from CRD (%s)", deleteVersion, crd.Name)
190
                //         continue
191
                // }
192
        }
193
        return nil
×
194
}
195

196
func (c *SharedState) createOperationQueue(volumeName string) {
63✔
197
        _, _ = c.volumeOperationQueues.LoadOrStore(volumeName, newLockableEntry(newOperationQueue()))
63✔
198
}
63✔
199

200
func (c *SharedState) addToOperationQueue(ctx context.Context, volumeName string, requester operationRequester, operationFunc func(context.Context) error, isReplicaGarbageCollection bool) {
17✔
201
        // It is expected for caller to provide parent workflow via context.
17✔
202
        // The child workflow will be created below and be fed to the queued operation for necessary workflow information.
17✔
203
        ctx, w := workflow.New(ctx, workflow.WithDetails(consts.VolumeNameLabel, volumeName))
17✔
204

17✔
205
        v, ok := c.volumeOperationQueues.Load(volumeName)
17✔
206
        if !ok {
17✔
207
                return
×
208
        }
×
209
        lockable := v.(*lockableEntry)
17✔
210
        lockable.Lock()
17✔
211
        isFirst := lockable.entry.(*operationQueue).Len() <= 0
17✔
212
        _ = lockable.entry.(*operationQueue).PushBack(&replicaOperation{
17✔
213
                ctx:       ctx,
17✔
214
                requester: requester,
17✔
215
                operationFunc: func(ctx context.Context) (err error) {
34✔
216
                        defer func() {
34✔
217
                                if !shouldRequeueReplicaOperation(isReplicaGarbageCollection, err) {
34✔
218
                                        w.Finish(err)
17✔
219
                                }
17✔
220
                        }()
221
                        err = operationFunc(ctx)
17✔
222
                        return
17✔
223
                },
224
                isReplicaGarbageCollection: isReplicaGarbageCollection,
225
        })
226
        lockable.Unlock()
17✔
227

17✔
228
        // If this is the first operation, start the goroutine
17✔
229
        if isFirst {
33✔
230
                go func() {
32✔
231
                        lockable.Lock()
16✔
232
                        defer lockable.Unlock()
16✔
233
                        operationQueue := lockable.entry.(*operationQueue)
16✔
234

16✔
235
                        for {
49✔
236
                                // Get the first operation, exiting the loop if the queue is empty.
33✔
237
                                front := operationQueue.Front()
33✔
238
                                if front == nil {
49✔
239
                                        break
16✔
240
                                }
241

242
                                operation := front.Value.(*replicaOperation)
17✔
243

17✔
244
                                // Only run the operation if the requester is not in the GC exclusion list
17✔
245
                                if !operationQueue.gcExclusionList.has(operation.requester) {
34✔
246

17✔
247
                                        // Release the lock while executing the operation to avoid deadlocks.
17✔
248
                                        lockable.Unlock()
17✔
249
                                        err := operation.operationFunc(operation.ctx)
17✔
250
                                        lockable.Lock()
17✔
251

17✔
252
                                        if shouldRequeueReplicaOperation(operation.isReplicaGarbageCollection, err) {
17✔
253
                                                // If operation failed, push it to the end of the queue if the queue is
×
254
                                                // still active.
×
255
                                                if operationQueue.isActive {
×
256
                                                        operationQueue.PushBack(operation)
×
257
                                                }
×
258
                                        }
259
                                }
260

261
                                // Remove the operation from the queue.
262
                                operationQueue.safeRemove(front)
17✔
263
                        }
264
                }()
265
        }
266
}
267

268
func (c *SharedState) deleteOperationQueue(volumeName string) {
1✔
269
        v, ok := c.volumeOperationQueues.LoadAndDelete(volumeName)
1✔
270
        // if operation queue has already been deleted, return
1✔
271
        if !ok {
1✔
272
                return
×
273
        }
×
274
        // clear the queue in case there is still an entry in it
275
        lockable := v.(*lockableEntry)
1✔
276
        lockable.Lock()
1✔
277
        defer lockable.Unlock()
1✔
278
        lockable.entry.(*operationQueue).Init()
1✔
279
}
280

281
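// closeOperationQueue marks the volume's operation queue inactive and clears it, returning the queue's unlock function for the caller to invoke (or nil if no queue exists).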
func (c *SharedState) closeOperationQueue(volumeName string) func() {
2✔
282
        v, ok := c.volumeOperationQueues.Load(volumeName)
2✔
283
        if !ok {
4✔
284
                return nil
2✔
285
        }
2✔
286
        lockable := v.(*lockableEntry)
×
287

×
288
        lockable.Lock()
×
289
        lockable.entry.(*operationQueue).isActive = false
×
290
        lockable.entry.(*operationQueue).Init()
×
291
        return lockable.Unlock
×
292
}
293

294
func (c *SharedState) addToGcExclusionList(volumeName string, target operationRequester) {
3✔
295
        v, ok := c.volumeOperationQueues.Load(volumeName)
3✔
296
        if !ok {
3✔
297
                return
×
298
        }
×
299
        lockable := v.(*lockableEntry)
3✔
300
        lockable.Lock()
3✔
301
        defer lockable.Unlock()
3✔
302
        lockable.entry.(*operationQueue).gcExclusionList.add(target)
3✔
303
}
304

305
func (c *SharedState) removeFromExclusionList(volumeName string, target operationRequester) {
7✔
306
        v, ok := c.volumeOperationQueues.Load(volumeName)
7✔
307
        if !ok {
7✔
308
                return
×
309
        }
×
310
        lockable := v.(*lockableEntry)
7✔
311
        lockable.Lock()
7✔
312
        defer lockable.Unlock()
7✔
313
        delete(lockable.entry.(*operationQueue).gcExclusionList, target)
7✔
314
}
315

316
func (c *SharedState) dequeueGarbageCollection(volumeName string) {
5✔
317
        v, ok := c.volumeOperationQueues.Load(volumeName)
5✔
318
        if !ok {
5✔
319
                return
×
320
        }
×
321
        lockable := v.(*lockableEntry)
5✔
322
        lockable.Lock()
5✔
323
        defer lockable.Unlock()
5✔
324
        queue := lockable.entry.(*operationQueue)
5✔
325
        // look for garbage collection operation in the queue and remove from queue
5✔
326
        var next *list.Element
5✔
327
        for cur := queue.Front(); cur != nil; cur = next {
8✔
328
                next = cur.Next()
3✔
329
                if cur.Value.(*replicaOperation).isReplicaGarbageCollection {
6✔
330
                        queue.safeRemove(cur)
3✔
331
                }
3✔
332
        }
333
}
334

335
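// getVolumesFromPod returns the AzVolume names backing the persistent volume claims referenced by the given pod (namespace/name), skipping claims that are not Azure CSI volumes.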
func (c *SharedState) getVolumesFromPod(ctx context.Context, podName string) ([]string, error) {
27✔
336
        w, _ := workflow.GetWorkflowFromContext(ctx)
27✔
337

27✔
338
        var claims []string
27✔
339
        w.Logger().V(5).Infof("Getting requested volumes for pod (%s).", podName)
27✔
340
        value, ok := c.podToClaimsMap.Load(podName)
27✔
341
        if !ok {
28✔
342
                return nil, status.Errorf(codes.NotFound, "unable to find an entry for pod (%s) in podToClaims map", podName)
1✔
343
        }
1✔
344
        claims, ok = value.([]string)
26✔
345
        if !ok {
27✔
346
                return nil, status.Errorf(codes.Internal, "wrong output type: expected []string")
1✔
347
        }
1✔
348

349
        volumes := []string{}
25✔
350
        for _, claim := range claims {
61✔
351
                value, ok := c.claimToVolumeMap.Load(claim)
36✔
352
                if !ok {
41✔
353
                        // the pvc entry is not an azure resource
5✔
354
                        w.Logger().V(5).Infof("Requested volume %s for pod %s is not an azure resource", value, podName)
5✔
355
                        continue
5✔
356
                }
357
                volume, ok := value.(string)
31✔
358
                if !ok {
32✔
359
                        return nil, status.Errorf(codes.Internal, "wrong output type: expected string")
1✔
360
                }
1✔
361
                volumes = append(volumes, volume)
30✔
362
                w.Logger().V(5).Infof("Requested volumes for pod %s are now the following: Volumes: %v, Len: %d", podName, volumes, len(volumes))
30✔
363
        }
364
        return volumes, nil
24✔
365
}
366

367
func (c *SharedState) getPodsFromVolume(ctx context.Context, client client.Client, volumeName string) ([]v1.Pod, error) {
16✔
368
        w, _ := workflow.GetWorkflowFromContext(ctx)
16✔
369
        pods, err := c.getPodNamesFromVolume(volumeName)
16✔
370
        if err != nil {
18✔
371
                return nil, err
2✔
372
        }
2✔
373
        podObjs := []v1.Pod{}
14✔
374
        for _, pod := range pods {
29✔
375
                namespace, name, err := parseQualifiedName(pod)
15✔
376
                if err != nil {
15✔
377
                        w.Logger().Errorf(err, "cannot get podObj for pod (%s)", pod)
×
378
                        continue
×
379
                }
380
                var podObj v1.Pod
15✔
381
                if err := client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, &podObj); err != nil {
15✔
382
                        return nil, err
×
383
                }
×
384
                podObjs = append(podObjs, podObj)
15✔
385
        }
386
        return podObjs, nil
14✔
387
}
388

389
func (c *SharedState) getPodNamesFromVolume(volumeName string) ([]string, error) {
16✔
390
        v, ok := c.volumeToClaimMap.Load(volumeName)
16✔
391
        if !ok {
18✔
392
                return nil, status.Errorf(codes.NotFound, "no bound persistent volume claim was found for AzVolume (%s)", volumeName)
2✔
393
        }
2✔
394
        claimName, ok := v.(string)
14✔
395
        if !ok {
14✔
396
                return nil, status.Errorf(codes.Internal, "volumeToClaimMap should should hold string")
×
397
        }
×
398

399
        value, ok := c.claimToPodsMap.Load(claimName)
14✔
400
        if !ok {
14✔
401
                return nil, status.Errorf(codes.NotFound, "no pods found for PVC (%s)", claimName)
×
402
        }
×
403
        lockable, ok := value.(*lockableEntry)
14✔
404
        if !ok {
14✔
405
                return nil, status.Errorf(codes.Internal, "claimToPodsMap should hold lockable entry")
×
406
        }
×
407

408
        lockable.RLock()
14✔
409
        defer lockable.RUnlock()
14✔
410

14✔
411
        podMap, ok := lockable.entry.(set)
14✔
412
        if !ok {
14✔
413
                return nil, status.Errorf(codes.Internal, "claimToPodsMap entry should hold a set")
×
414
        }
×
415

416
        pods := make([]string, len(podMap))
14✔
417
        i := 0
14✔
418
        for v := range podMap {
29✔
419
                pod := v.(string)
15✔
420
                pods[i] = pod
15✔
421
                i++
15✔
422
        }
15✔
423

424
        return pods, nil
14✔
425
}
426

427
func (c *SharedState) getVolumesForPodObjs(ctx context.Context, pods []v1.Pod) ([]string, error) {
18✔
428
        volumes := []string{}
18✔
429
        for _, pod := range pods {
36✔
430
                podVolumes, err := c.getVolumesFromPod(ctx, getQualifiedName(pod.Namespace, pod.Name))
18✔
431
                if err != nil {
18✔
432
                        return nil, err
×
433
                }
×
434
                volumes = append(volumes, podVolumes...)
18✔
435
        }
436
        return volumes, nil
18✔
437
}
438

439
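// addPod records a pod's persistent volume claims (and, when CSI migration is enabled, its in-tree inline Azure disk volumes) in the shared maps, optionally acquiring the per-pod lock.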
func (c *SharedState) addPod(ctx context.Context, pod *v1.Pod, updateOption updateWithLock) error {
7✔
440
        var err error
7✔
441
        w, _ := workflow.GetWorkflowFromContext(ctx)
7✔
442
        podKey := getQualifiedName(pod.Namespace, pod.Name)
7✔
443
        v, _ := c.podLocks.LoadOrStore(podKey, &sync.Mutex{})
7✔
444

7✔
445
        w.Logger().V(5).Infof("Adding pod %s to shared map with keyName %s.", pod.Name, podKey)
7✔
446
        podLock := v.(*sync.Mutex)
7✔
447
        if updateOption == acquireLock {
13✔
448
                podLock.Lock()
6✔
449
                defer podLock.Unlock()
6✔
450
        }
6✔
451
        w.Logger().V(5).Infof("Pod spec of pod %s is: %+v. With volumes: %+v", pod.Name, pod.Spec, pod.Spec.Volumes)
7✔
452

7✔
453
        // If the claims already exist for the podKey, add them to a set
7✔
454
        value, _ := c.podToClaimsMap.LoadOrStore(podKey, []string{})
7✔
455
        claims := value.([]string)
7✔
456
        claimSet := set{}
7✔
457
        for _, claim := range claims {
17✔
458
                claimSet.add(claim)
10✔
459
        }
10✔
460

461
        for _, volume := range pod.Spec.Volumes {
18✔
462
                // TODO: investigate if we need special support for CSI ephemeral volume or generic ephemeral volume
11✔
463
                // if csiMigration is enabled and there is an inline volume, create AzVolume CRI for the inline volume.
11✔
464
                if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) &&
11✔
465
                        utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureDisk) &&
11✔
466
                        volume.AzureDisk != nil {
12✔
467
                        // inline volume: create AzVolume resource
1✔
468
                        var pv *v1.PersistentVolume
1✔
469
                        if pv, err = c.azureDiskCSITranslator.TranslateInTreeInlineVolumeToCSI(&volume, pod.Namespace); err != nil {
1✔
470
                                w.Logger().V(5).Errorf(err, "failed to translate inline volume to csi")
×
471
                                continue
×
472
                        } else if pv == nil {
1✔
473
                                w.Logger().V(5).Errorf(status.Errorf(codes.Internal, "unexpected failure in translating inline volume to csi"), "nil pv returned")
×
474
                                continue
×
475
                        }
476
                        w.Logger().V(5).Infof("Creating AzVolume instance for inline volume %s.", volume.AzureDisk.DiskName)
1✔
477
                        if err := c.createAzVolumeFromPv(ctx, *pv, map[string]string{consts.InlineVolumeAnnotation: volume.AzureDisk.DataDiskURI}); err != nil {
1✔
478
                                return err
×
479
                        }
×
480
                        v, exists := c.podToInlineMap.Load(podKey)
1✔
481
                        var inlines []string
1✔
482
                        if exists {
1✔
483
                                inlines = v.([]string)
×
484
                        }
×
485
                        inlines = append(inlines, volume.AzureDisk.DiskName)
1✔
486
                        c.podToInlineMap.Store(podKey, inlines)
1✔
487
                }
488
                if volume.PersistentVolumeClaim == nil {
12✔
489
                        w.Logger().V(5).Infof("Pod %s: Skipping Volume %s. No persistent volume exists.", pod.Name, volume)
1✔
490
                        continue
1✔
491
                }
492
                namespacedClaimName := getQualifiedName(pod.Namespace, volume.PersistentVolumeClaim.ClaimName)
10✔
493
                if _, ok := c.claimToVolumeMap.Load(namespacedClaimName); !ok {
11✔
494
                        // Log message if the Pod status is Running
1✔
495
                        if pod.Status.Phase == v1.PodRunning {
1✔
496
                                w.Logger().V(5).Infof("Skipping Pod %s. Volume %s not csi. Driver: %+v", pod.Name, volume.Name, volume.CSI)
×
497
                        }
×
498
                        continue
1✔
499
                }
500
                w.Logger().V(5).Infof("Pod %s. Volume %v is csi.", pod.Name, volume)
9✔
501
                claimSet.add(namespacedClaimName)
9✔
502
                v, _ := c.claimToPodsMap.LoadOrStore(namespacedClaimName, newLockableEntry(set{}))
9✔
503

9✔
504
                lockable := v.(*lockableEntry)
9✔
505
                lockable.Lock()
9✔
506
                pods := lockable.entry.(set)
9✔
507
                if !pods.has(podKey) {
9✔
508
                        pods.add(podKey)
×
509
                }
×
510
                // No need to restore the amended set to claimToPodsMap because set is a reference type
511
                lockable.Unlock()
9✔
512

9✔
513
                w.Logger().V(5).Infof("Storing pod %s and claim %s to claimToPodsMap map.", pod.Name, namespacedClaimName)
9✔
514
        }
515
        w.Logger().V(5).Infof("Storing pod %s and claim %s to podToClaimsMap map.", pod.Name, claims)
7✔
516

7✔
517
        allClaims := []string{}
7✔
518
        for key := range claimSet {
17✔
519
                allClaims = append(allClaims, key.(string))
10✔
520
        }
10✔
521
        c.podToClaimsMap.Store(podKey, allClaims)
7✔
522
        return nil
7✔
523
}
524

525
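// deletePod removes the pod's entries from the shared maps, cleaning up the AzVolumeAttachment and AzVolume CRIs created for its inline volumes and detaching the pod from the claim-to-pods mapping.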
func (c *SharedState) deletePod(ctx context.Context, podKey string) error {
1✔
526
        w, _ := workflow.GetWorkflowFromContext(ctx)
1✔
527
        value, exists := c.podLocks.LoadAndDelete(podKey)
1✔
528
        if !exists {
1✔
529
                return nil
×
530
        }
×
531
        podLock := value.(*sync.Mutex)
1✔
532
        podLock.Lock()
1✔
533
        defer podLock.Unlock()
1✔
534

1✔
535
        value, exists = c.podToInlineMap.LoadAndDelete(podKey)
1✔
536
        if exists {
1✔
537
                inlines := value.([]string)
×
538

×
539
                for _, inline := range inlines {
×
540
                        _, err := c.cleanUpAzVolumeAttachmentByVolume(ctx, inline, pod, azureutils.AllRoles, cleanUpAttachment, deleteAndWait)
×
541
                        if err != nil && !apiErrors.IsNotFound(err) {
×
542
                                w.Logger().Errorf(err, "failed to list AzVolumeAttachments (%s) for inline (%s): %v", inline, inline, err)
×
543
                                return err
×
544
                        }
×
545
                        if err := c.azClient.DiskV1beta2().AzVolumes(c.config.ObjectNamespace).Delete(ctx, inline, metav1.DeleteOptions{}); err != nil && !apiErrors.IsNotFound(err) {
×
546
                                w.Logger().Errorf(err, "failed to delete AzVolume (%s) for inline (%s): %v", inline, inline, err)
×
547
                                return err
×
548
                        }
×
549
                }
550
        }
551

552
        value, exists = c.podToClaimsMap.LoadAndDelete(podKey)
1✔
553
        if !exists {
1✔
554
                return nil
×
555
        }
×
556
        claims := value.([]string)
1✔
557

1✔
558
        for _, claim := range claims {
2✔
559
                value, ok := c.claimToPodsMap.Load(claim)
1✔
560
                if !ok {
1✔
561
                        w.Logger().Errorf(nil, "No pods found for PVC (%s)", claim)
×
562
                }
×
563

564
                // Scope the duration that we hold the lockable lock using a function.
565
                func() {
2✔
566
                        lockable, ok := value.(*lockableEntry)
1✔
567
                        if !ok {
1✔
568
                                w.Logger().Error(nil, "claimToPodsMap should hold lockable entry")
×
569
                                return
×
570
                        }
×
571

572
                        lockable.Lock()
1✔
573
                        defer lockable.Unlock()
1✔
574

1✔
575
                        podSet, ok := lockable.entry.(set)
1✔
576
                        if !ok {
1✔
577
                                w.Logger().Error(nil, "claimToPodsMap entry should hold a set")
×
578
                        }
×
579

580
                        podSet.remove(podKey)
1✔
581
                        if len(podSet) == 0 {
2✔
582
                                c.claimToPodsMap.Delete(claim)
1✔
583
                        }
1✔
584
                }()
585
        }
586
        return nil
1✔
587
}
588

589
func (c *SharedState) addVolumeAndClaim(azVolumeName, pvName, pvClaimName string) {
1✔
590
        c.pvToVolumeMap.Store(pvName, azVolumeName)
1✔
591
        c.volumeToClaimMap.Store(azVolumeName, pvClaimName)
1✔
592
        c.claimToVolumeMap.Store(pvClaimName, azVolumeName)
1✔
593
}
1✔
594

595
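// deletePV deletes the AzVolume CRI backing the given persistent volume if it is pre-provisioned, and asynchronously removes the PV from pvToVolumeMap once the CRI is gone.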
func (c *SharedState) deletePV(pvName string) error {
2✔
596
        var err error
2✔
597

2✔
598
        ctx := context.Background()
2✔
599
        ctx, w := workflow.New(ctx, workflow.WithDetails())
2✔
600
        defer func() { w.Finish(err) }()
4✔
601
        defer func() {
4✔
602
                if apiErrors.IsNotFound(err) {
2✔
603
                        err = nil
×
604
                }
×
605
        }()
606

607
        var azVolume azdiskv1beta2.AzVolume
2✔
608
        if val, ok := c.pvToVolumeMap.Load(pvName); ok {
3✔
609
                volumeName := val.(string)
1✔
610
                err = c.cachedClient.Get(ctx, types.NamespacedName{Namespace: c.config.ObjectNamespace, Name: volumeName}, &azVolume)
1✔
611
                if err != nil {
1✔
612
                        if !apiErrors.IsNotFound(err) {
×
613
                                return err
×
614
                        }
×
615
                        return nil
×
616
                }
617
        } else {
1✔
618
                // if no volume name can be found for PV, try fetching azVolume using labels
1✔
619
                var azVolumeList azdiskv1beta2.AzVolumeList
1✔
620
                req, err := azureutils.CreateLabelRequirements(consts.PvNameLabel, selection.Equals, pvName)
1✔
621
                if err != nil {
1✔
622
                        return err
×
623
                }
×
624
                err = c.cachedClient.List(ctx, &azVolumeList, &client.ListOptions{LabelSelector: labels.NewSelector().Add(*req)})
1✔
625
                if err != nil && !apiErrors.IsNotFound(err) {
1✔
626
                        return err
×
627
                } else if apiErrors.IsNotFound(err) || len(azVolumeList.Items) == 0 {
2✔
628
                        return nil
1✔
629
                }
1✔
630
                azVolume = azVolumeList.Items[0]
×
631
        }
632

633
        // deletion timestamp is set and AzVolume reconciler will handle the delete request.
634
        // The volume itself will not be deleted.
635
        w.AddDetailToLogger(workflow.GetObjectDetails(&azVolume)...)
1✔
636

1✔
637
        if !isPreProvisioned(&azVolume) {
1✔
638
                return nil
×
639
        }
×
640

641
        err = c.cachedClient.Delete(ctx, &azVolume)
1✔
642
        if err != nil {
1✔
643
                if apiErrors.IsNotFound(err) {
×
644
                        return nil
×
645
                }
×
646
                return err
×
647
        }
648

649
        waitCh := make(chan goSignal)
1✔
650
        go func() {
2✔
651
                goCtx, w := workflow.New(ctx)
1✔
652
                defer func() { w.Finish(err) }()
2✔
653
                waitCh <- goSignal{}
1✔
654

1✔
655
                waiter := c.conditionWatcher.NewConditionWaiter(ctx, watcher.AzVolumeType, azVolume.Name, verifyObjectFailedOrDeleted)
1✔
656

1✔
657
                for {
2✔
658
                        // if AzVolume was successfully deleted
1✔
659
                        obj, err := waiter.Wait(goCtx)
1✔
660
                        if err == nil {
2✔
661
                                // remove the entry from pv to volume map, once AzVolumeCRI is deleted
1✔
662
                                c.pvToVolumeMap.Delete(pvName)
1✔
663
                                return
1✔
664
                        }
1✔
665

666
                        azVolume := obj.(*azdiskv1beta2.AzVolume)
×
667

×
668
                        if azVolume.Status.State == azdiskv1beta2.VolumeDeletionFailed || azVolume.Status.Error != nil {
×
669
                                updateFunc := func(obj client.Object) error {
×
670
                                        azVolume := obj.(*azdiskv1beta2.AzVolume)
×
671
                                        azVolume.Status.Error = nil
×
672
                                        azVolume.Status.State = azdiskv1beta2.VolumeCreated
×
673
                                        return nil
×
674
                                }
×
675
                                _, _ = azureutils.UpdateCRIWithRetry(goCtx, nil, c.cachedClient, c.azClient, azVolume, updateFunc, consts.ForcedUpdateMaxNetRetry, azureutils.UpdateCRIStatus)
×
676
                        }
677
                }
678
        }()
679
        <-waitCh
1✔
680

1✔
681
        return nil
1✔
682
}
683

684
func (c *SharedState) deleteVolumeAndClaim(azVolumeName string) {
2✔
685
        v, ok := c.volumeToClaimMap.LoadAndDelete(azVolumeName)
2✔
686
        if ok {
4✔
687
                pvClaimName := v.(string)
2✔
688
                c.claimToVolumeMap.Delete(pvClaimName)
2✔
689
        }
2✔
690
}
691

692
func (c *SharedState) markVolumeVisited(azVolumeName string) {
9✔
693
        c.visitedVolumes.Store(azVolumeName, struct{}{})
9✔
694
}
9✔
695

696
func (c *SharedState) unmarkVolumeVisited(azVolumeName string) {
9✔
697
        c.visitedVolumes.Delete(azVolumeName)
9✔
698
}
9✔
699

700
func (c *SharedState) isVolumeVisited(azVolumeName string) bool {
10✔
701
        _, visited := c.visitedVolumes.Load(azVolumeName)
10✔
702
        return visited
10✔
703
}
10✔
704

705
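// getRankedNodesForReplicaAttachments lists the cluster nodes and returns their names ranked by suitability for hosting replica AzVolumeAttachments for the given volumes and pods.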
func (c *SharedState) getRankedNodesForReplicaAttachments(ctx context.Context, volumes []string, podObjs []v1.Pod) ([]string, error) {
15✔
706
        var err error
15✔
707
        ctx, w := workflow.New(ctx)
15✔
708
        defer func() { w.Finish(err) }()
30✔
709

710
        w.Logger().V(5).Info("Getting ranked list of nodes for creating AzVolumeAttachments")
15✔
711

15✔
712
        nodeList := &v1.NodeList{}
15✔
713
        if err := c.cachedClient.List(ctx, nodeList); err != nil {
15✔
714
                return nil, err
×
715
        }
×
716

717
        var selectedNodeObjs []v1.Node
15✔
718
        selectedNodeObjs, err = c.selectNodesPerTopology(ctx, nodeList.Items, podObjs, volumes)
15✔
719
        if err != nil {
15✔
720
                w.Logger().Errorf(err, "failed to select nodes for volumes (%+v)", volumes)
×
721
                return nil, err
×
722
        }
×
723

724
        selectedNodes := make([]string, len(selectedNodeObjs))
15✔
725
        for i, selectedNodeObj := range selectedNodeObjs {
37✔
726
                selectedNodes[i] = selectedNodeObj.Name
22✔
727
        }
22✔
728

729
        w.Logger().V(5).Infof("Selected nodes (%+v) for replica AzVolumeAttachments for volumes (%+v)", selectedNodes, volumes)
15✔
730
        return selectedNodes, nil
15✔
731
}
732

733
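// filterNodes resolves the persistent volumes behind the given AzVolumes and runs the filter plugins (inter-pod affinity/anti-affinity, tolerations, node affinity, node selector, volume node selector) to drop nodes that cannot host the replicas.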
func (c *SharedState) filterNodes(ctx context.Context, nodes []v1.Node, pods []v1.Pod, volumes []string) ([]v1.Node, error) {
29✔
734
        var err error
29✔
735
        ctx, w := workflow.New(ctx)
29✔
736
        defer func() { w.Finish(err) }()
58✔
737

738
        pvs := make([]*v1.PersistentVolume, len(volumes))
29✔
739
        for i, volume := range volumes {
66✔
740
                var azVolume *azdiskv1beta2.AzVolume
37✔
741
                azVolume, err = azureutils.GetAzVolume(ctx, c.cachedClient, c.azClient, volume, c.config.ObjectNamespace, true)
37✔
742
                if err != nil {
39✔
743
                        w.Logger().V(5).Errorf(err, "AzVolume for volume %s is not found.", volume)
2✔
744
                        return nil, err
2✔
745
                }
2✔
746

747
                var pv v1.PersistentVolume
35✔
748
                if err = c.cachedClient.Get(ctx, types.NamespacedName{Name: azVolume.Spec.PersistentVolume}, &pv); err != nil {
35✔
749
                        return nil, err
×
750
                }
×
751
                pvs[i] = &pv
35✔
752
        }
753

754
        var filterPlugins = []filterPlugin{
27✔
755
                &interPodAffinityFilter{},
27✔
756
                &interPodAntiAffinityFilter{},
27✔
757
                &podTolerationFilter{},
27✔
758
                &podNodeAffinityFilter{},
27✔
759
                &podNodeSelectorFilter{},
27✔
760
                &volumeNodeSelectorFilter{},
27✔
761
        }
27✔
762

27✔
763
        filteredNodes := nodes
27✔
764
        for _, filterPlugin := range filterPlugins {
189✔
765
                filterPlugin.setup(pods, pvs, c)
162✔
766
                if updatedFilteredNodes, err := filterPlugin.filter(ctx, filteredNodes); err != nil {
162✔
767
                        w.Logger().Errorf(err, "failed to filter node with filter plugin (%s). Ignoring filtered results.", filterPlugin.name())
×
768
                } else {
162✔
769
                        filteredNodes = updatedFilteredNodes
162✔
770
                        nodeStrs := make([]string, len(filteredNodes))
162✔
771
                        for i, filteredNode := range filteredNodes {
507✔
772
                                nodeStrs[i] = filteredNode.Name
345✔
773
                        }
345✔
774
                        w.Logger().V(10).Infof("Filtered node list from filter plugin (%s): %+v", filterPlugin.name(), nodeStrs)
162✔
775
                }
776
        }
777

778
        return filteredNodes, nil
27✔
779
}
780

781
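// prioritizeNodes scores the candidate nodes with the scorer plugins (node capacity, replica count, inter-pod affinity/anti-affinity, pod node affinity) and returns them sorted by descending score, dropping nodes the scorers filtered out.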
func (c *SharedState) prioritizeNodes(ctx context.Context, pods []v1.Pod, volumes []string, nodes []v1.Node) []v1.Node {
21✔
782
        ctx, w := workflow.New(ctx)
21✔
783
        defer w.Finish(nil)
21✔
784

21✔
785
        nodeScores := map[string]int{}
21✔
786
        for _, node := range nodes {
69✔
787
                nodeScores[node.Name] = 0
48✔
788
        }
48✔
789

790
        var nodeScorerPlugins = []nodeScorerPlugin{
21✔
791
                &scoreByNodeCapacity{},
21✔
792
                &scoreByReplicaCount{},
21✔
793
                &scoreByInterPodAffinity{},
21✔
794
                &scoreByInterPodAntiAffinity{},
21✔
795
                &scoreByPodNodeAffinity{},
21✔
796
        }
21✔
797

21✔
798
        for _, nodeScorerPlugin := range nodeScorerPlugins {
126✔
799
                nodeScorerPlugin.setup(nodes, pods, volumes, c)
105✔
800
                if updatedNodeScores, err := nodeScorerPlugin.score(ctx, nodeScores); err != nil {
105✔
801
                        w.Logger().Errorf(err, "failed to score nodes by node scorer (%s)", nodeScorerPlugin.name())
×
802
                } else {
105✔
803
                        // update node scores if scorer plugin returned success
105✔
804
                        nodeScores = updatedNodeScores
105✔
805
                }
105✔
806
                var nodeScoreResult string
105✔
807
                for nodeName, score := range nodeScores {
295✔
808
                        nodeScoreResult += fmt.Sprintf("<%s: %d> ", nodeName, score)
190✔
809
                }
190✔
810
                w.Logger().V(10).Infof("node score after node score plugin (%s): %s", nodeScorerPlugin.name(), nodeScoreResult)
105✔
811
        }
812

813
        // normalize score
814
        numFiltered := 0
21✔
815
        for _, node := range nodes {
69✔
816
                if _, exists := nodeScores[node.Name]; !exists {
60✔
817
                        nodeScores[node.Name] = -1
12✔
818
                        numFiltered++
12✔
819
                }
12✔
820
        }
821

822
        sort.Slice(nodes[:], func(i, j int) bool {
53✔
823
                return nodeScores[nodes[i].Name] > nodeScores[nodes[j].Name]
32✔
824
        })
32✔
825

826
        return nodes[:len(nodes)-numFiltered]
21✔
827
}
828

829
func (c *SharedState) filterAndSortNodes(ctx context.Context, nodes []v1.Node, pods []v1.Pod, volumes []string) ([]v1.Node, error) {
17✔
830
        var err error
17✔
831
        ctx, w := workflow.New(ctx)
17✔
832
        defer func() { w.Finish(err) }()
34✔
833

834
        var filteredNodes []v1.Node
17✔
835
        filteredNodes, err = c.filterNodes(ctx, nodes, pods, volumes)
17✔
836
        if err != nil {
18✔
837
                w.Logger().Errorf(err, "failed to filter nodes for volumes (%+v): %v", volumes, err)
1✔
838
                return nil, err
1✔
839
        }
1✔
840
        sortedNodes := c.prioritizeNodes(ctx, pods, volumes, filteredNodes)
16✔
841
        return sortedNodes, nil
16✔
842
}
843

844
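// selectNodesPerTopology picks candidate nodes for replica attachments, spreading them across the zones compatible with every volume's node affinity when zonal information is available, and otherwise falling back to filtering and sorting all nodes.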
func (c *SharedState) selectNodesPerTopology(ctx context.Context, nodes []v1.Node, pods []v1.Pod, volumes []string) ([]v1.Node, error) {
15✔
845
        var err error
15✔
846
        ctx, w := workflow.New(ctx)
15✔
847
        defer func() { w.Finish(err) }()
30✔
848

849
        selectedNodes := []v1.Node{}
15✔
850
        numReplicas := 0
15✔
851

15✔
852
        // disperse node topology if possible
15✔
853
        compatibleZonesSet := set{}
15✔
854
        var primaryNode string
15✔
855
        for i, volume := range volumes {
37✔
856
                var azVolume *azdiskv1beta2.AzVolume
22✔
857
                azVolume, err = azureutils.GetAzVolume(ctx, c.cachedClient, c.azClient, volume, c.config.ObjectNamespace, true)
22✔
858
                if err != nil {
22✔
859
                        err = status.Errorf(codes.Aborted, "failed to get AzVolume CRI (%s)", volume)
×
860
                        return nil, err
×
861
                }
×
862

863
                numReplicas = max(numReplicas, azVolume.Spec.MaxMountReplicaCount)
22✔
864
                w.Logger().V(5).Infof("Number of requested replicas for Azvolume (%s) is: %d. Max replica count is: %d.",
22✔
865
                        volume, numReplicas, azVolume.Spec.MaxMountReplicaCount)
22✔
866

22✔
867
                var pv v1.PersistentVolume
22✔
868
                if err = c.cachedClient.Get(ctx, types.NamespacedName{Name: azVolume.Spec.PersistentVolume}, &pv); err != nil {
22✔
869
                        return nil, err
×
870
                }
×
871

872
                if pv.Spec.NodeAffinity == nil || pv.Spec.NodeAffinity.Required == nil {
43✔
873
                        continue
21✔
874
                }
875

876
                // Find the intersection of the zones for all the volumes
877
                topologyKey := c.topologyKey
1✔
878
                if i == 0 {
2✔
879
                        compatibleZonesSet = getSupportedZones(pv.Spec.NodeAffinity.Required.NodeSelectorTerms, topologyKey)
1✔
880
                } else {
1✔
881
                        listOfZones := getSupportedZones(pv.Spec.NodeAffinity.Required.NodeSelectorTerms, topologyKey)
×
882
                        for key := range compatibleZonesSet {
×
883
                                if !listOfZones.has(key) {
×
884
                                        compatibleZonesSet.remove(key)
×
885
                                }
×
886
                        }
887
                }
888

889
                // find primary node if not already found
890
                if primaryNode == "" {
2✔
891
                        if primaryAttachment, err := azureutils.GetAzVolumeAttachmentsForVolume(ctx, c.cachedClient, volume, azureutils.PrimaryOnly); err != nil || len(primaryAttachment) == 0 {
2✔
892
                                continue
1✔
893
                        } else {
×
894
                                primaryNode = primaryAttachment[0].Spec.NodeName
×
895
                        }
×
896
                }
897
        }
898

899
        var compatibleZones []string
15✔
900
        if len(compatibleZonesSet) > 0 {
15✔
901
                for key := range compatibleZonesSet {
×
902
                        compatibleZones = append(compatibleZones, key.(string))
×
903
                }
×
904
        }
905

906
        if len(compatibleZones) == 0 {
30✔
907
                selectedNodes, err = c.filterAndSortNodes(ctx, nodes, pods, volumes)
15✔
908
                if err != nil {
15✔
909
                        err = status.Errorf(codes.Aborted, "failed to select nodes for volumes (%+v): %v", volumes, err)
×
910
                        return nil, err
×
911
                }
×
912
        } else {
×
913
                w.Logger().V(5).Infof("The list of zones to select nodes from is: %s", strings.Join(compatibleZones, ","))
×
914

×
915
                var primaryNodeZone string
×
916
                if primaryNode != "" {
×
917
                        nodeObj := &v1.Node{}
×
918
                        err = c.cachedClient.Get(ctx, types.NamespacedName{Name: primaryNode}, nodeObj)
×
919
                        if err != nil {
×
920
                                w.Logger().Errorf(err, "failed to retrieve the primary node")
×
921
                        }
×
922

923
                        var ok bool
×
924
                        if primaryNodeZone, ok = nodeObj.Labels[consts.WellKnownTopologyKey]; !ok {
×
925
                                w.Logger().V(5).Infof("failed to find zone annotations for primary node")
×
926
                        }
×
927
                }
928

929
                nodeSelector := labels.NewSelector()
×
930
                zoneRequirement, _ := labels.NewRequirement(consts.WellKnownTopologyKey, selection.In, compatibleZones)
×
931
                nodeSelector = nodeSelector.Add(*zoneRequirement)
×
932

×
933
                compatibleNodes := &v1.NodeList{}
×
934
                if err = c.cachedClient.List(ctx, compatibleNodes, &client.ListOptions{LabelSelector: nodeSelector}); err != nil {
×
935
                        err = status.Errorf(codes.Aborted, "failed to retrieve node list: %v", err)
×
936
                        return nodes, err
×
937
                }
×
938

939
                // Create a zone to node map
940
                zoneToNodeMap := map[string][]v1.Node{}
×
941
                for _, node := range compatibleNodes.Items {
×
942
                        zoneName := node.Labels[consts.WellKnownTopologyKey]
×
943
                        zoneToNodeMap[zoneName] = append(zoneToNodeMap[zoneName], node)
×
944
                }
×
945

946
                // Get prioritized nodes per zone
947
                nodesPerZone := [][]v1.Node{}
×
948
                primaryZoneNodes := []v1.Node{}
×
949
                totalCount := 0
×
950
                for zone, nodeList := range zoneToNodeMap {
×
951
                        var sortedNodes []v1.Node
×
952
                        sortedNodes, err = c.filterAndSortNodes(ctx, nodeList, pods, volumes)
×
953
                        if err != nil {
×
954
                                err = status.Errorf(codes.Aborted, "failed to select nodes for volumes (%+v): %v", volumes, err)
×
955
                                return nil, err
×
956
                        }
×
957

958
                        totalCount += len(sortedNodes)
×
959
                        if zone == primaryNodeZone {
×
960
                                primaryZoneNodes = sortedNodes
×
961
                                continue
×
962
                        }
963
                        nodesPerZone = append(nodesPerZone, sortedNodes)
×
964
                }
965
                // Append the nodes from the primary node's zone last
966
                if len(primaryZoneNodes) > 0 {
×
967
                        nodesPerZone = append(nodesPerZone, primaryZoneNodes)
×
968
                }
×
969
                // Select the nodes from each of the zones one by one and append to the list
970
                i, j, countSoFar := 0, 0, 0
×
971
                for len(selectedNodes) < numReplicas && countSoFar < totalCount {
×
972
                        if len(nodesPerZone[i]) > j {
×
973
                                selectedNodes = append(selectedNodes, nodesPerZone[i][j])
×
974
                                countSoFar++
×
975
                        }
×
976
                        if i < len(nodesPerZone)-1 {
×
977
                                i++
×
978
                        } else {
×
979
                                i = 0
×
980
                                j++
×
981
                        }
×
982
                }
983
        }
984

985
        return selectedNodes, nil
15✔
986
}
987

988
func (c *SharedState) getNodesWithReplica(ctx context.Context, volumeName string) ([]string, error) {
14✔
989
        w, _ := workflow.GetWorkflowFromContext(ctx)
14✔
990
        w.Logger().V(5).Infof("Getting nodes with replica AzVolumeAttachments for volume %s.", volumeName)
14✔
991
        azVolumeAttachments, err := azureutils.GetAzVolumeAttachmentsForVolume(ctx, c.cachedClient, volumeName, azureutils.ReplicaOnly)
14✔
992
        if err != nil {
14✔
993
                w.Logger().V(5).Errorf(err, "failed to get AzVolumeAttachments for volume %s.", volumeName)
×
994
                return nil, err
×
995
        }
×
996

997
        nodes := []string{}
14✔
998
        for _, azVolumeAttachment := range azVolumeAttachments {
15✔
999
                if deleteRequested, _ := objectDeletionRequested(&azVolumeAttachment); !deleteRequested {
2✔
1000
                        nodes = append(nodes, azVolumeAttachment.Spec.NodeName)
1✔
1001
                }
1✔
1002
        }
1003
        w.Logger().V(5).Infof("Nodes with replica AzVolumeAttachments for volume %s are: %v, Len: %d", volumeName, nodes, len(nodes))
14✔
1004
        return nodes, nil
14✔
1005
}
1006

1007
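// createReplicaAzVolumeAttachment creates a replica-role AzVolumeAttachment CRI for the given volume on the specified node, labeled and annotated so the attachment is requested by the controller.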
func (c *SharedState) createReplicaAzVolumeAttachment(ctx context.Context, volumeID, node string, volumeContext map[string]string) error {
12✔
1008
        var err error
12✔
1009
        ctx, w := workflow.New(ctx, workflow.WithDetails(consts.NodeNameLabel, node))
12✔
1010
        defer func() { w.Finish(err) }()
24✔
1011

1012
        var diskName string
12✔
1013
        diskName, err = azureutils.GetDiskName(volumeID)
12✔
1014
        if err != nil {
12✔
1015
                err = status.Errorf(codes.Internal, "failed to extract volume name from volumeID (%s)", volumeID)
×
1016
                return err
×
1017
        }
×
1018
        w.AddDetailToLogger(consts.VolumeNameLabel, diskName)
12✔
1019

12✔
1020
        w.Logger().V(5).Info("Creating replica AzVolumeAttachments")
12✔
1021
        if volumeContext == nil {
24✔
1022
                volumeContext = make(map[string]string)
12✔
1023
        }
12✔
1024
        // creating azvolumeattachment
1025
        volumeName := strings.ToLower(diskName)
12✔
1026
        replicaName := azureutils.GetAzVolumeAttachmentName(volumeName, node)
12✔
1027
        azVolumeAttachment := azdiskv1beta2.AzVolumeAttachment{
12✔
1028
                ObjectMeta: metav1.ObjectMeta{
12✔
1029
                        Name:      replicaName,
12✔
1030
                        Namespace: c.config.ObjectNamespace,
12✔
1031
                        Labels: map[string]string{
12✔
1032
                                consts.NodeNameLabel:   node,
12✔
1033
                                consts.VolumeNameLabel: volumeName,
12✔
1034
                                consts.RoleLabel:       string(azdiskv1beta2.ReplicaRole),
12✔
1035
                        },
12✔
1036
                        Annotations: map[string]string{consts.VolumeAttachRequestAnnotation: "controller"},
12✔
1037
                        Finalizers:  []string{consts.AzVolumeAttachmentFinalizer},
12✔
1038
                },
12✔
1039
                Spec: azdiskv1beta2.AzVolumeAttachmentSpec{
12✔
1040
                        NodeName:      node,
12✔
1041
                        VolumeID:      volumeID,
12✔
1042
                        VolumeName:    volumeName,
12✔
1043
                        RequestedRole: azdiskv1beta2.ReplicaRole,
12✔
1044
                        VolumeContext: volumeContext,
12✔
1045
                },
12✔
1046
        }
12✔
1047
        w.AnnotateObject(&azVolumeAttachment)
12✔
1048
        azureutils.AnnotateAPIVersion(&azVolumeAttachment)
12✔
1049

12✔
1050
        _, err = c.azClient.DiskV1beta2().AzVolumeAttachments(c.config.ObjectNamespace).Create(ctx, &azVolumeAttachment, metav1.CreateOptions{})
12✔
1051
        if err != nil {
12✔
1052
                err = status.Errorf(codes.Internal, "failed to create replica AzVolumeAttachment %s.", replicaName)
×
1053
                return err
×
1054
        }
×
1055
        return nil
12✔
1056
}
1057

1058
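// cleanUpAzVolumeAttachmentByVolume cleans up the AzVolumeAttachments of the given AzVolume that match the requested role and, when deleteAndWait is specified, waits until each attachment is deleted or marked failed.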
func (c *SharedState) cleanUpAzVolumeAttachmentByVolume(ctx context.Context, azVolumeName string, caller operationRequester, role azureutils.AttachmentRoleMode, cleanupMode attachmentCleanUpMode, attachmentDeleteMode deleteMode) ([]azdiskv1beta2.AzVolumeAttachment, error) {
        var err error
        ctx, w := workflow.New(ctx, workflow.WithDetails(consts.VolumeNameLabel, azVolumeName))
        defer func() { w.Finish(err) }()

        w.Logger().Infof("AzVolumeAttachment clean up requested by %s for AzVolume (%s)", caller, azVolumeName)

        var attachments []azdiskv1beta2.AzVolumeAttachment
        attachments, err = azureutils.GetAzVolumeAttachmentsForVolume(ctx, c.cachedClient, azVolumeName, role)
        if err != nil {
                if apiErrors.IsNotFound(err) {
                        err = nil
                        return nil, nil
                }
                err = status.Errorf(codes.Aborted, "failed to get AzVolumeAttachments: %v", err)
                return nil, err
        }

        if err = c.cleanUpAzVolumeAttachments(ctx, attachments, cleanupMode, caller); err != nil {
                return attachments, err
        }
        c.unmarkVolumeVisited(azVolumeName)

        if attachmentDeleteMode == deleteAndWait {
                attachmentsCount := len(attachments)
                errorMessageCh := make(chan string, attachmentsCount)

                // start waiting for replica AzVolumeAttachment CRIs to be deleted
                for _, attachment := range attachments {
                        // wait async and report error to go channel
                        go func(ctx context.Context, attachment azdiskv1beta2.AzVolumeAttachment) {
                                waiter := c.conditionWatcher.NewConditionWaiter(ctx, watcher.AzVolumeAttachmentType, attachment.Name, verifyObjectFailedOrDeleted)
                                defer waiter.Close()

                                _, derr := waiter.Wait(ctx)
                                if derr != nil {
                                        errorMessageCh <- fmt.Sprintf("%s: %v", attachment.Name, derr)
                                } else {
                                        errorMessageCh <- ""
                                }
                        }(ctx, attachment)
                }

                // if any errors were reported by the conditionWatcher waits, join them into a single error message
                var errMsgs []string
                for i := 0; i < attachmentsCount; i++ {
                        v, ok := <-errorMessageCh
                        if ok && v != "" {
                                errMsgs = append(errMsgs, v)
                        }
                }
                close(errorMessageCh)
                if len(errMsgs) > 0 {
                        err = status.Errorf(codes.Internal, strings.Join(errMsgs, ", "))
                }
        }

        return attachments, err
}

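// Editor's note (illustrative sketch, not part of the driver source): the deleteAndWait branch
// above fans out one goroutine per attachment and collects results through a channel buffered to
// the attachment count, so no sender ever blocks. A minimal standalone version of that pattern,
// assuming a hypothetical waitForDeletion() helper and a slice of item names:
//
//        results := make(chan string, len(items))
//        for _, item := range items {
//                go func(item string) {
//                        if err := waitForDeletion(item); err != nil {
//                                results <- fmt.Sprintf("%s: %v", item, err)
//                                return
//                        }
//                        results <- ""
//                }(item)
//        }
//        var errMsgs []string
//        for i := 0; i < len(items); i++ {
//                if msg := <-results; msg != "" {
//                        errMsgs = append(errMsgs, msg)
//                }
//        }
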
func (c *SharedState) cleanUpAzVolumeAttachmentByNode(ctx context.Context, azDriverNodeName string, caller operationRequester, role azureutils.AttachmentRoleMode, cleanupMode attachmentCleanUpMode, attachmentDeleteMode deleteMode) ([]azdiskv1beta2.AzVolumeAttachment, error) {
        var err error
        ctx, w := workflow.New(ctx, workflow.WithDetails(consts.NodeNameLabel, azDriverNodeName))
        defer func() { w.Finish(err) }()
        w.Logger().Infof("AzVolumeAttachment clean up requested by %s for AzDriverNode (%s)", caller, azDriverNodeName)

        var nodeRequirement *labels.Requirement
        nodeRequirement, err = azureutils.CreateLabelRequirements(consts.NodeNameLabel, selection.Equals, azDriverNodeName)
        if err != nil {
                return nil, err
        }
        labelSelector := labels.NewSelector().Add(*nodeRequirement)

        var attachments *azdiskv1beta2.AzVolumeAttachmentList
        attachments, err = c.azClient.DiskV1beta2().AzVolumeAttachments(c.config.ObjectNamespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector.String()})
        if err != nil {
                if apiErrors.IsNotFound(err) {
                        err = nil
                        return nil, nil
                }
                err = status.Errorf(codes.Aborted, "failed to get AzVolumeAttachments: %v", err)
                return nil, err
        }

        cleanUpMap := map[string][]azdiskv1beta2.AzVolumeAttachment{}
        for _, attachment := range attachments.Items {
                if shouldCleanUp(attachment, role) {
                        cleanUpMap[attachment.Spec.VolumeName] = append(cleanUpMap[attachment.Spec.VolumeName], attachment)
                }
        }

        for volumeName, cleanUps := range cleanUpMap {
                volumeName := volumeName
                c.addToOperationQueue(ctx,
                        volumeName,
                        caller,
                        func(ctx context.Context) error {
                                return c.cleanUpAzVolumeAttachments(ctx, cleanUps, cleanupMode, caller)
                        },
                        false)
        }
        return attachments.Items, nil
}

func (c *SharedState) cleanUpAzVolumeAttachments(ctx context.Context, attachments []azdiskv1beta2.AzVolumeAttachment, cleanUpMode attachmentCleanUpMode, caller operationRequester) error {
        var err error

        for _, attachment := range attachments {
                patched := attachment.DeepCopy()

                if attachment.Spec.RequestedRole == azdiskv1beta2.PrimaryRole {
                        if cleanUpMode == cleanUpAttachment && !volumeDetachRequested(patched) {
                                markDetachRequest(patched, caller)
                        } else if deleteRequested, _ := objectDeletionRequested(&attachment); !deleteRequested {
                                // if primary azvolumeattachments are being cleaned up for driver uninstall, issue a DELETE call and continue
                                // note that this DELETE request will remove the AzVolumeAttachment CRI without detaching the volume from the node
                                if err = c.cachedClient.Delete(ctx, patched); err != nil {
                                        return err
                                }
                        }
                } else {
                        // replica mount should always be detached in cleanup regardless of the cleanup mode
                        if !volumeDetachRequested(patched) {
                                markDetachRequest(patched, caller)
                        }

                        // append the cleanup annotation to prevent replica recreation, except when the clean up was triggered by the node controller due to node failure.
                        if caller != node && !metav1.HasAnnotation(patched.ObjectMeta, consts.CleanUpAnnotation) {
                                markCleanUp(patched, caller)
                        }
                }

                if !reflect.DeepEqual(attachment.Status, patched.Status) {
                        if err = c.cachedClient.Status().Patch(ctx, patched, client.MergeFrom(&attachment)); err != nil && apiErrors.IsNotFound(err) {
                                err = status.Errorf(codes.Internal, "failed to patch AzVolumeAttachment (%s)", attachment.Name)
                                return err
                        }
                }
        }
        return nil
}

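// Editor's note (illustrative sketch, not part of the driver source): the loop above uses the
// controller-runtime merge-patch idiom: deep-copy the object, mutate only the copy, and patch the
// status subresource against the original so that only the changed fields are sent to the API
// server. A hedged outline of that idiom, with obj standing in for any concrete API type:
//
//        patched := obj.DeepCopy()
//        // ... mutate patched.Status (annotations, state, etc.) here ...
//        if err := k8sClient.Status().Patch(ctx, patched, client.MergeFrom(obj)); err != nil {
//                return err
//        }
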
func (c *SharedState) createReplicaRequestsQueue() {
        c.priorityReplicaRequestsQueue = &VolumeReplicaRequestsPriorityQueue{}
        c.priorityReplicaRequestsQueue.queue = cache.NewHeap(
                func(obj interface{}) (string, error) {
                        return obj.(*ReplicaRequest).VolumeName, nil
                },
                func(left, right interface{}) bool {
                        return left.(*ReplicaRequest).Priority > right.(*ReplicaRequest).Priority
                })
}

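// Editor's note (illustrative, not part of the driver source): the second function passed to
// cache.NewHeap is the "less" comparator; returning Priority > Priority turns the heap into a
// max-heap, so the request needing the most replicas is popped first, while the key function
// deduplicates requests per volume name. For example, under this ordering:
//
//        a := &ReplicaRequest{VolumeName: "vol-a", Priority: 3}
//        b := &ReplicaRequest{VolumeName: "vol-b", Priority: 1}
//        // less(a, b) == true, so "vol-a" would be served before "vol-b" when the queue is drained.
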
// Removes replica requests from the priority queue and adds them to the operation queue.
func (c *SharedState) tryCreateFailedReplicas(ctx context.Context, requester operationRequester) {
        if atomic.SwapInt32(&c.processingReplicaRequestQueue, 1) == 0 {
                ctx, w := workflow.New(ctx)
                defer w.Finish(nil)
                requests := c.priorityReplicaRequestsQueue.DrainQueue()
                for i := 0; i < len(requests); i++ {
                        replicaRequest := requests[i]
                        c.addToOperationQueue(ctx,
                                replicaRequest.VolumeName,
                                requester,
                                func(ctx context.Context) error {
                                        return c.manageReplicas(ctx, replicaRequest.VolumeName)
                                },
                                false,
                        )
                }
                atomic.StoreInt32(&c.processingReplicaRequestQueue, 0)
        }
}

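// Editor's note (illustrative sketch, not part of the driver source): the SwapInt32 call above is
// a lock-free "single flight" guard: only the caller that flips the flag from 0 to 1 drains the
// queue, and the flag is reset when the work is done. A minimal standalone version:
//
//        var busy int32
//        if atomic.SwapInt32(&busy, 1) == 0 {
//                defer atomic.StoreInt32(&busy, 0)
//                // ... do the work; concurrent callers that see 1 simply skip ...
//        }
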
func (c *SharedState) garbageCollectReplicas(ctx context.Context, volumeName string, requester operationRequester) {
        c.addToOperationQueue(
                ctx,
                volumeName,
                replica,
                func(ctx context.Context) error {
                        if _, err := c.cleanUpAzVolumeAttachmentByVolume(ctx, volumeName, requester, azureutils.ReplicaOnly, cleanUpAttachment, deleteOnly); err != nil {
                                return err
                        }
                        c.addToGcExclusionList(volumeName, requester)
                        c.removeGarbageCollection(volumeName)
                        c.unmarkVolumeVisited(volumeName)
                        return nil
                },
                true,
        )
}

func (c *SharedState) removeGarbageCollection(volumeName string) {
        v, ok := c.cleanUpMap.LoadAndDelete(volumeName)
        if ok {
                cancelFunc := v.(context.CancelFunc)
                cancelFunc()
        }
        // if any garbage collection is enqueued in the operation queue, remove it
        c.dequeueGarbageCollection(volumeName)
}

func (c *SharedState) manageReplicas(ctx context.Context, volumeName string) error {
        var err error
        ctx, w := workflow.New(ctx)
        defer func() { w.Finish(err) }()

        var azVolume *azdiskv1beta2.AzVolume
        azVolume, err = azureutils.GetAzVolume(ctx, c.cachedClient, c.azClient, volumeName, c.config.ObjectNamespace, true)

        // in case the volume attachment succeeds or terminally errors out
        c.unpersistAttachmentFailure(volumeName) // attempt un-persisting a previous failure (if the event refresher is active)

        if apiErrors.IsNotFound(err) {
                w.Logger().V(5).Info("Volume no longer exists. Aborting manage replica operation")
                return nil
        } else if err != nil {
                w.Logger().Error(err, "failed to get AzVolume")
                return err
        }

        // replica management should not be executed or retried if the AzVolume is scheduled for deletion or not yet created.
        deleteRequested, _ := objectDeletionRequested(azVolume)
        if !isCreated(azVolume) || deleteRequested {
                w.Logger().Errorf(errors.New("no valid azVolume"), "azVolume (%s) is scheduled for deletion or has no underlying volume object", azVolume.Name)
                return nil
        }

        currentReplicaCount, err := c.countValidReplicasForVolume(ctx, volumeName)
        if err != nil {
                return err
        }

        desiredReplicaCount := azVolume.Spec.MaxMountReplicaCount
        w.Logger().Infof("Control number of replicas for volume (%s): desired=%d, current:%d", azVolume.Spec.VolumeName, desiredReplicaCount, currentReplicaCount)

        if desiredReplicaCount > currentReplicaCount {
                w.Logger().Infof("Need %d more replicas for volume (%s)", desiredReplicaCount-currentReplicaCount, azVolume.Spec.VolumeName)
                if azVolume.Status.Detail == nil || azVolume.Status.State == azdiskv1beta2.VolumeDeleting || azVolume.Status.State == azdiskv1beta2.VolumeDeleted {
                        // underlying volume does not exist, so volume attachment cannot be made
                        return nil
                }
                if err = c.createReplicas(ctx, desiredReplicaCount-currentReplicaCount, azVolume.Name, azVolume.Status.Detail.VolumeID, azVolume.Spec.Parameters); err != nil {
                        w.Logger().Errorf(err, "failed to create %d replicas for volume (%s): %v", desiredReplicaCount-currentReplicaCount, azVolume.Spec.VolumeName, err)
                        return err
                }
        }
        return nil
}

// Count the number of replica attachments that aren't scheduled for deletion for a given volume.
func (c *SharedState) countValidReplicasForVolume(ctx context.Context, volumeName string) (int, error) {
        w, _ := workflow.GetWorkflowFromContext(ctx)
        validReplicaCount := 0

        azVolumeAttachments, err := azureutils.GetAzVolumeAttachmentsForVolume(ctx, c.cachedClient, volumeName, azureutils.ReplicaOnly)
        if err != nil {
                w.Logger().Errorf(err, "failed to list replica AzVolumeAttachments")
                return validReplicaCount, err
        }

        for _, azVolumeAttachment := range azVolumeAttachments {
                if deleteRequested, _ := objectDeletionRequested(&azVolumeAttachment); !deleteRequested {
                        validReplicaCount++
                }
        }
        return validReplicaCount, nil
}

func (c *SharedState) createReplicas(ctx context.Context, remainingReplicas int, volumeName, volumeID string, volumeContext map[string]string) error {
        var err error
        ctx, w := workflow.New(ctx)
        defer func() { w.Finish(err) }()

        // if volume is scheduled for clean up, skip replica creation
        if _, cleanUpScheduled := c.cleanUpMap.Load(volumeName); cleanUpScheduled {
                return nil
        }

        // get pods linked to the volume
        var pods []v1.Pod
        pods, err = c.getPodsFromVolume(ctx, c.cachedClient, volumeName)
        if err != nil {
                return err
        }

        // acquire per-pod lock to be released upon creation of replica AzVolumeAttachment CRIs
        for _, pod := range pods {
                podKey := getQualifiedName(pod.Namespace, pod.Name)
                v, _ := c.podLocks.LoadOrStore(podKey, &sync.Mutex{})
                podLock := v.(*sync.Mutex)
                podLock.Lock()
                defer podLock.Unlock()
        }

        var nodes []string
        nodes, err = c.getNodesForReplica(ctx, volumeName, pods)
        if err != nil {
                w.Logger().Errorf(err, "failed to get a list of nodes for replica attachment")
                return err
        }

        requiredReplicas := remainingReplicas
        for _, node := range nodes {
                if err = c.createReplicaAzVolumeAttachment(ctx, volumeID, node, volumeContext); err != nil {
                        w.Logger().Errorf(err, "failed to create replica AzVolumeAttachment for volume %s", volumeName)
                        // continue to try attachment with the next node
                        continue
                }
                remainingReplicas--
                if remainingReplicas <= 0 {
                        // no more remaining replicas, so no further replica AzVolumeAttachments need to be created
                        break
                }
        }

        if remainingReplicas > 0 {
                // no failed replica attachments, but there are still more replicas needed to reach MaxShares
                request := ReplicaRequest{VolumeName: volumeName, Priority: remainingReplicas}
                c.priorityReplicaRequestsQueue.Push(ctx, &request)
                message := fmt.Sprintf("Not enough suitable nodes to attach %d of %d replica mount(s) for volume %s", remainingReplicas, requiredReplicas, volumeName)
                podCopies := make([]runtime.Object, len(pods))
                for i, pod := range pods {
                        podCopies[i] = pod.DeepCopyObject()
                }
                timestamp := time.Now()
                for _, podCopy := range podCopies {
                        c.eventRecorder.Eventf(podCopy, v1.EventTypeWarning, consts.ReplicaAttachmentFailedEvent, message)
                }
                message += timestamp.UTC().Format(" [0102 15:04:05") // append the original timestamp
                c.persistAttachmentFailure(volumeName, message, podCopies, timestamp)
        }
        return nil
}

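// Editor's note (illustrative sketch, not part of the driver source): the per-pod locking above
// relies on sync.Map.LoadOrStore to lazily create one mutex per pod key, so concurrent callers
// racing on the same pod always lock the same *sync.Mutex. A minimal version of that idiom,
// with podLocks standing in for any sync.Map keyed by strings:
//
//        v, _ := podLocks.LoadOrStore("namespace/name", &sync.Mutex{})
//        lock := v.(*sync.Mutex)
//        lock.Lock()
//        defer lock.Unlock()
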
func (c *SharedState) persistAttachmentFailure(volumeName string, message string, podCopies []runtime.Object, timestamp time.Time) {
        c.eventsToPersistQueue <- ReplicaAttachmentFailureInfo{volumeName, message, podCopies, timestamp}
}

// Only requests an event un-persist if the event refresher is active
func (c *SharedState) unpersistAttachmentFailure(volumeName string) {
        c.eventsToUnpersistQueue <- volumeName
}

func (c *SharedState) createEventQueues() {
        c.eventsToPersistQueue = make(chan ReplicaAttachmentFailureInfo, c.config.ControllerConfig.WorkerThreads)
        c.eventsToUnpersistQueue = make(chan string, c.config.ControllerConfig.WorkerThreads*2)
        go c._eventRefresherRoutine()
}

func (c *SharedState) _eventRefresherRoutine() {
        type eventInfo struct {
                message   string
                timestamp time.Time
                objects   []runtime.Object
        }

        var eventTTL = time.Duration(c.config.ControllerConfig.EventTTLInSec) * time.Second
        eventMap := map[string]*circularLinkedListNode[eventInfo]{}

        var events circularLinkedList[eventInfo]
        var lastTime time.Time
        var delay time.Duration            // how long the alarm was last set to wait for
        expLatency := eventOverlapVariance // we request the alarm to wake us up earlier by this amount, to negate latency due to other operations and timer imprecision
        alarm := time.NewTimer(math.MaxInt64)
        if !alarm.Stop() {
                <-alarm.C
        }

        for {
                select {
                case newFailureInfo := <-c.eventsToPersistQueue:
                        // add the new failure
                        newEvent := &circularLinkedListNode[eventInfo]{
                                curr: eventInfo{
                                        message:   newFailureInfo.message,
                                        objects:   newFailureInfo.pods,
                                        timestamp: newFailureInfo.timestamp.Add(eventTTL - eventOverlapDuration),
                                },
                        }

                        if events.isEmpty() {
                                lastTime = time.Now()
                                delay = newEvent.curr.timestamp.Sub(lastTime) - expLatency
                                alarm.Reset(delay)
                        }

                        events.add(newEvent)

                        eventMap[newFailureInfo.volumeName].tryRemove()
                        eventMap[newFailureInfo.volumeName] = newEvent
                case newVolumeName := <-c.eventsToUnpersistQueue:
                        oldEvent := eventMap[newVolumeName]
                        if oldEvent != nil {
                                delete(eventMap, newVolumeName)
                                if oldEvent.next == oldEvent {
                                        // no more events to refresh!
                                        events.clear()
                                } else {
                                        oldEvent.remove()
                                }
                        }
                case <-alarm.C:
                        currTime := time.Now()
                        currLatency := expLatency + currTime.Sub(lastTime) - delay
                        expLatency = (expLatency*3 + currLatency) / 4 // update the expected latency with a 25% exponentially-weighted moving average
                        lastTime = currTime
                        for {
                                for _, object := range events.curr.objects { // can we keep this as is? or do we need to check / keep track of whether each pod is still relevant
                                        c.eventRecorder.Event(object, v1.EventTypeWarning, consts.ReplicaAttachmentFailedEvent, events.curr.message)
                                }
                                events.curr.timestamp = events.curr.timestamp.Add(eventTTL)
                                events.next()
                                delay = time.Until(events.curr.timestamp) - expLatency
                                if delay >= eventOverlapVariance {
                                        break
                                }
                                // events which are really close together will be processed at the same time, preventing the overuse of sleep()
                                // the event refresher's responsiveness may be more variable, so we'll increase its channel buffers as needed
                        }
                        alarm.Reset(delay)
                }
        }
}

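// Editor's note (illustrative, not part of the driver source): two patterns above are easy to miss.
// First, a timer created with a huge duration and immediately stopped (draining its channel when
// Stop reports it already fired) yields an armed-but-idle timer that can later be scheduled with
// Reset. Second, expLatency is a 25% exponentially weighted moving average: each new observation
// contributes one quarter, the history three quarters. A hedged sketch, where nextDeadline and
// observedLatency are hypothetical values:
//
//        alarm := time.NewTimer(math.MaxInt64)
//        if !alarm.Stop() {
//                <-alarm.C // drain so a later Reset starts from a clean state
//        }
//        // ... later, when a deadline is known ...
//        expLatency = (expLatency*3 + observedLatency) / 4
//        alarm.Reset(time.Until(nextDeadline) - expLatency)
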
func (c *SharedState) getNodesForReplica(ctx context.Context, volumeName string, pods []v1.Pod) ([]string, error) {
        var err error
        ctx, w := workflow.New(ctx)
        defer func() { w.Finish(err) }()

        if len(pods) == 0 {
                pods, err = c.getPodsFromVolume(ctx, c.cachedClient, volumeName)
                if err != nil {
                        return nil, err
                }
        }

        var volumes []string
        volumes, err = c.getVolumesForPodObjs(ctx, pods)
        if err != nil {
                return nil, err
        }

        var nodes []string
        nodes, err = c.getRankedNodesForReplicaAttachments(ctx, volumes, pods)
        if err != nil {
                return nil, err
        }

        var replicaNodes []string
        replicaNodes, err = c.getNodesWithReplica(ctx, volumeName)
        if err != nil {
                return nil, err
        }

        skipSet := map[string]bool{}
        for _, replicaNode := range replicaNodes {
                skipSet[replicaNode] = true
        }

        filtered := []string{}
        for _, node := range nodes {
                if skipSet[node] {
                        continue
                }
                // if the node has no capacity for disk attachment, we should skip it
                remainingCapacity, nodeExists := c.availableAttachmentsMap.Load(node)
                if !nodeExists || remainingCapacity == nil || remainingCapacity.(*atomic.Int32).Load() <= int32(0) {
                        w.Logger().V(5).Infof("skip node(%s) because it has no capacity for disk attachment", node)
                        continue
                }
                filtered = append(filtered, node)
        }

        return filtered, nil
}

func (c *SharedState) createAzVolumeFromPv(ctx context.Context, pv v1.PersistentVolume, annotations map[string]string) error {
        var err error
        ctx, w := workflow.New(ctx)
        defer func() { w.Finish(err) }()

        var desiredAzVolume *azdiskv1beta2.AzVolume
        requiredBytes, _ := pv.Spec.Capacity.Storage().AsInt64()
        volumeCapability := c.getVolumeCapabilityFromPv(&pv)

        // translate the in-tree PV to a CSI PV so it can be converted into an AzVolume resource
        if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) &&
                utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureDisk) &&
                pv.Spec.AzureDisk != nil {
                var transPV *v1.PersistentVolume
                // if an error occurs while translating, it's unrecoverable, so return no error
                if transPV, err = c.translateInTreePVToCSI(&pv); err != nil {
                        return err
                }
                pv = *transPV
        }

        // skip if PV is not managed by azuredisk driver
        if pv.Spec.CSI == nil || pv.Spec.CSI.Driver != c.config.DriverName {
                return nil
        }

        // create AzVolume CRI for CSI Volume Source
        desiredAzVolume, err = c.createAzVolumeFromCSISource(pv.Spec.CSI)
        if err != nil {
                return err
        }

        if pv.Spec.NodeAffinity != nil && pv.Spec.NodeAffinity.Required != nil {
                desiredAzVolume.Status.Detail.AccessibleTopology = azureutils.GetTopologyFromNodeSelector(*pv.Spec.NodeAffinity.Required, c.topologyKey)
        }
        if azureutils.IsMultiNodePersistentVolume(pv) {
                desiredAzVolume.Spec.MaxMountReplicaCount = 0
        }

        // if it's an inline volume, no pv label or pvc label should be added
        if !azureutils.MapContains(annotations, consts.InlineVolumeAnnotation) {
                desiredAzVolume.Labels = azureutils.AddToMap(desiredAzVolume.Labels, consts.PvNameLabel, pv.Name)

                if pv.Spec.ClaimRef != nil {
                        desiredAzVolume.Labels = azureutils.AddToMap(desiredAzVolume.Labels, consts.PvcNameLabel, pv.Spec.ClaimRef.Name)
                        desiredAzVolume.Labels = azureutils.AddToMap(desiredAzVolume.Labels, consts.PvcNamespaceLabel, pv.Spec.ClaimRef.Namespace)
                }
        }

        desiredAzVolume.Spec.VolumeCapability = volumeCapability
        desiredAzVolume.Spec.PersistentVolume = pv.Name
        desiredAzVolume.Spec.CapacityRange = &azdiskv1beta2.CapacityRange{RequiredBytes: requiredBytes}

        desiredAzVolume.Status.Detail.CapacityBytes = requiredBytes

        for k, v := range annotations {
                desiredAzVolume.Status.Annotations = azureutils.AddToMap(desiredAzVolume.Status.Annotations, k, v)
        }

        w.AddDetailToLogger(consts.PvNameKey, pv.Name, consts.VolumeNameLabel, desiredAzVolume.Name)

        if err = c.createAzVolume(ctx, desiredAzVolume); err != nil {
                err = status.Errorf(codes.Internal, "failed to create AzVolume (%s) for PV (%s): %v", desiredAzVolume.Name, pv.Name, err)
                return err
        }
        return nil
}

func (c *SharedState) getVolumeCapabilityFromPv(pv *v1.PersistentVolume) []azdiskv1beta2.VolumeCapability {
        volCaps := []azdiskv1beta2.VolumeCapability{}

        for _, accessMode := range pv.Spec.AccessModes {
                volCap := azdiskv1beta2.VolumeCapability{}
                // default to Mount
                if pv.Spec.VolumeMode != nil && *pv.Spec.VolumeMode == v1.PersistentVolumeBlock {
                        volCap.AccessType = azdiskv1beta2.VolumeCapabilityAccessBlock
                }
                switch accessMode {
                case v1.ReadWriteOnce:
                        volCap.AccessMode = azdiskv1beta2.VolumeCapabilityAccessModeSingleNodeSingleWriter
                case v1.ReadWriteMany:
                        volCap.AccessMode = azdiskv1beta2.VolumeCapabilityAccessModeMultiNodeMultiWriter
                case v1.ReadOnlyMany:
                        volCap.AccessMode = azdiskv1beta2.VolumeCapabilityAccessModeMultiNodeReaderOnly
                default:
                        volCap.AccessMode = azdiskv1beta2.VolumeCapabilityAccessModeUnknown
                }
                volCaps = append(volCaps, volCap)
        }
        return volCaps
}

func (c *SharedState) createAzVolumeFromCSISource(source *v1.CSIPersistentVolumeSource) (*azdiskv1beta2.AzVolume, error) {
        diskName, err := azureutils.GetDiskName(source.VolumeHandle)
        if err != nil {
                return nil, fmt.Errorf("failed to extract diskName from volume handle (%s): %v", source.VolumeHandle, err)
        }

        _, maxMountReplicaCount := azureutils.GetMaxSharesAndMaxMountReplicaCount(source.VolumeAttributes, false)

        diskParameters, _ := azureutils.ParseDiskParameters(source.VolumeAttributes, azureutils.IgnoreUnknown)
        volumeParams := diskParameters.VolumeContext

        azVolumeName := strings.ToLower(diskName)

        azVolume := azdiskv1beta2.AzVolume{
                ObjectMeta: metav1.ObjectMeta{
                        Name:       azVolumeName,
                        Finalizers: []string{consts.AzVolumeFinalizer},
                },
                Spec: azdiskv1beta2.AzVolumeSpec{
                        MaxMountReplicaCount: maxMountReplicaCount,
                        Parameters:           volumeParams,
                        VolumeName:           diskName,
                },
                Status: azdiskv1beta2.AzVolumeStatus{
                        Detail: &azdiskv1beta2.AzVolumeStatusDetail{
                                VolumeID:      source.VolumeHandle,
                                VolumeContext: source.VolumeAttributes,
                        },
                        State: azdiskv1beta2.VolumeCreated,
                },
        }
        azureutils.AnnotateAPIVersion(&azVolume)

        return &azVolume, nil
}

func (c *SharedState) createAzVolume(ctx context.Context, desiredAzVolume *azdiskv1beta2.AzVolume) error {
        w, _ := workflow.GetWorkflowFromContext(ctx)

        var err error
        var azVolume *azdiskv1beta2.AzVolume
        var updated *azdiskv1beta2.AzVolume

        azVolume, err = c.azClient.DiskV1beta2().AzVolumes(c.config.ObjectNamespace).Get(ctx, desiredAzVolume.Name, metav1.GetOptions{})
        if err != nil {
                if apiErrors.IsNotFound(err) {
                        azVolume, err = c.azClient.DiskV1beta2().AzVolumes(c.config.ObjectNamespace).Create(ctx, desiredAzVolume, metav1.CreateOptions{})
                        if err != nil {
                                return err
                        }
                        updated = azVolume.DeepCopy()
                        updated.Status = desiredAzVolume.Status
                } else {
                        return err
                }
        }

        if apiVersion, ok := azureutils.GetFromMap(azVolume.Annotations, consts.APIVersion); !ok || apiVersion != azdiskv1beta2.APIVersion {
                w.Logger().Infof("Found AzVolume (%s) with older api version. Converting to apiVersion(%s)", azVolume.Name, azdiskv1beta2.APIVersion)

                azVolume.Spec.PersistentVolume = desiredAzVolume.Spec.PersistentVolume

                for k, v := range desiredAzVolume.Labels {
                        azVolume.Labels = azureutils.AddToMap(azVolume.Labels, k, v)
                }

                // for now, we don't empty the meta annotations after migrating them to status annotations for safety.
                // note that this will leave some remnant garbage entries in the meta annotations
                var statusAnnotation []string
                for k, v := range azVolume.Annotations {
                        statusAnnotation = append(statusAnnotation, k, v)
                }

                for k, v := range desiredAzVolume.Annotations {
                        azVolume.Annotations = azureutils.AddToMap(azVolume.Annotations, k, v)
                }

                azVolume, err = c.azClient.DiskV1beta2().AzVolumes(c.config.ObjectNamespace).Update(ctx, azVolume, metav1.UpdateOptions{})
                if err != nil {
                        return err
                }
                updated = azVolume.DeepCopy()
                updated.Status.Annotations = azureutils.AddToMap(updated.Status.Annotations, statusAnnotation...)
        }

        if updated != nil {
                if _, err = azureutils.UpdateCRIWithRetry(ctx, nil, c.cachedClient, c.azClient, azVolume, func(obj client.Object) error {
                        azvolume := obj.(*azdiskv1beta2.AzVolume)
                        azvolume.Status = updated.Status
                        return nil
                }, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRIStatus); err != nil {
                        return err
                }
        }

        // if the AzVolume CRI was successfully recreated, also recreate the operation queue for the volume
        c.createOperationQueue(desiredAzVolume.Name)
        return nil
}

func (c *SharedState) translateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error) {
        var err error
        // translate the in-tree PV to a CSI PV so it can be converted into an AzVolume resource
        if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) &&
                utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureDisk) &&
                pv.Spec.AzureDisk != nil {
                // if an error occurs while translating, it's unrecoverable, so return no error
                if pv, err = c.azureDiskCSITranslator.TranslateInTreePVToCSI(pv); err != nil {
                } else if pv == nil {
                        err = status.Errorf(codes.Internal, "unexpected failure in translating inline volume to csi")
                }

        }
        return pv, err
}

// waitForVolumeAttachmentName waits for the VolumeAttachment name to be updated in the azVolumeAttachmentToVaMap by the volumeattachment controller
func (c *SharedState) waitForVolumeAttachmentName(ctx context.Context, azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment) (string, error) {
        var vaName string
        err := wait.PollImmediateUntilWithContext(ctx, consts.DefaultPollingRate, func(ctx context.Context) (bool, error) {
                val, exists := c.azVolumeAttachmentToVaMap.Load(azVolumeAttachment.Name)
                if exists {
                        vaName = val.(string)
                }
                return exists, nil
        })
        return vaName, err
}

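// Editor's note (illustrative sketch, not part of the driver source): wait.PollImmediateUntilWithContext
// runs the condition function immediately and then on every tick of the polling interval, until the
// condition returns true, returns an error, or the context is cancelled. A minimal standalone use,
// assuming a hypothetical isReady() check and a one-second interval:
//
//        err := wait.PollImmediateUntilWithContext(ctx, time.Second, func(ctx context.Context) (bool, error) {
//                return isReady(), nil
//        })
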
// Returns the set of node names that satisfy the pod affinity term and the set of node names with qualifying replica attachments.
func (c *SharedState) getQualifiedNodesForPodAffinityTerm(ctx context.Context, nodes []v1.Node, podNamespace string, affinityTerm v1.PodAffinityTerm) (podNodes, replicaNodes set) {
        var err error
        w, _ := workflow.GetWorkflowFromContext(ctx)
        candidateNodes := set{}
        for _, node := range nodes {
                candidateNodes.add(node.Name)
        }
        podNodes = set{}
        replicaNodes = set{}

        var podSelector labels.Selector
        podSelector, err = metav1.LabelSelectorAsSelector(affinityTerm.LabelSelector)
        // if the pod affinity label selector cannot be converted to a selector, log the error and skip
        if err != nil {
                w.Logger().Errorf(err, "failed to convert pod affinity (%v) to selector", affinityTerm.LabelSelector)
        }

        nsList := &v1.NamespaceList{}
        if affinityTerm.NamespaceSelector != nil {
                nsSelector, err := metav1.LabelSelectorAsSelector(affinityTerm.NamespaceSelector)
                // if the namespace selector cannot be converted to a selector, log the error and skip
                if err != nil {
                        w.Logger().Errorf(err, "failed to convert pod affinity (%v) to selector", affinityTerm.LabelSelector)
                } else {
                        if err = c.cachedClient.List(ctx, nsList, &client.ListOptions{LabelSelector: nsSelector}); err != nil {
                                w.Logger().Errorf(err, "failed to list namespaces with selector (%v)", nsSelector)
                                return
                        }

                }
        }

        namespaces := affinityTerm.Namespaces
        for _, ns := range nsList.Items {
                namespaces = append(namespaces, ns.Name)
        }

        pods := []v1.Pod{}
        if len(namespaces) > 0 {
                for _, namespace := range namespaces {
                        podList := &v1.PodList{}
                        if err = c.cachedClient.List(ctx, podList, &client.ListOptions{LabelSelector: podSelector, Namespace: namespace}); err != nil {
                                w.Logger().Errorf(err, "failed to retrieve pod list: %v", err)
                                pods = append(pods, podList.Items...)
                        }
                }
        } else {
                podList := &v1.PodList{}
                if err = c.cachedClient.List(ctx, podList, &client.ListOptions{LabelSelector: podSelector, Namespace: podNamespace}); err != nil {
                        w.Logger().Errorf(err, "failed to retrieve pod list: %v", err)
                }
                pods = podList.Items
        }

        // get replica nodes for pods that satisfy the pod label selector
        replicaNodes = c.getReplicaNodesForPods(ctx, pods)
        for replicaNode := range replicaNodes {
                if !candidateNodes.has(replicaNode) {
                        replicaNodes.remove(replicaNode)
                }
        }

        // get nodes with pods that share the same topology as the pods satisfying the pod label selector
        for _, pod := range pods {
                podNodes.add(pod.Spec.NodeName)
        }

        var podNodeObjs []v1.Node
        for node := range podNodes {
                var nodeObj v1.Node
                if err = c.cachedClient.Get(ctx, types.NamespacedName{Name: node.(string)}, &nodeObj); err != nil {
                        w.Logger().Errorf(err, "failed to get node (%s)", node.(string))
                        continue
                }
                podNodeObjs = append(podNodeObjs, nodeObj)
        }

        topologyLabel := c.getNodesTopologySelector(ctx, podNodeObjs, affinityTerm.TopologyKey)
        for _, node := range nodes {
                if topologyLabel != nil && topologyLabel.Matches(labels.Set(node.Labels)) {
                        podNodes.add(node.Name)
                }
        }
        return
}

// Returns the set of node names where replica mounts of the given pods can be found
func (c *SharedState) getReplicaNodesForPods(ctx context.Context, pods []v1.Pod) (replicaNodes set) {
        // add nodes, to which replica attachments of matching pods' volumes are attached, to replicaNodes
        replicaNodes = set{}
        if volumes, err := c.getVolumesForPodObjs(ctx, pods); err == nil {
                for _, volume := range volumes {
                        attachments, err := azureutils.GetAzVolumeAttachmentsForVolume(ctx, c.cachedClient, volume, azureutils.ReplicaOnly)
                        if err != nil {
                                continue
                        }
                        for _, attachment := range attachments {
                                if deleteRequested, _ := objectDeletionRequested(&attachment); !deleteRequested {
                                        node := attachment.Spec.NodeName
                                        replicaNodes.add(node)
                                }
                        }
                }
        }

        return replicaNodes
}

// Returns a label selector corresponding to a list of nodes and a topology key (aka label key)
func (c *SharedState) getNodesTopologySelector(ctx context.Context, nodes []v1.Node, topologyKey string) labels.Selector {
        w, _ := workflow.GetWorkflowFromContext(ctx)
        if len(nodes) == 0 {
                return nil
        }

        topologyValues := set{}
        for _, node := range nodes {
                nodeLabels := node.GetLabels()
                if topologyValue, exists := nodeLabels[topologyKey]; exists {
                        topologyValues.add(topologyValue)
                } else {
                        w.Logger().V(5).Infof("node (%s) doesn't have label value for topologyKey (%s)", node.Name, topologyKey)
                }
        }

        topologySelector := labels.NewSelector()
        topologyRequirement, err := azureutils.CreateLabelRequirements(topologyKey, selection.In, topologyValues.toStringSlice()...)
        // if the label requirement cannot be created, log the error and return an empty selector
        if err != nil {
                w.Logger().Errorf(err, "failed to create label requirement for topologyKey (%s)", topologyKey)
        } else {
                topologySelector = topologySelector.Add(*topologyRequirement)
        }
        return topologySelector
}

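// Editor's note (illustrative sketch, not part of the driver source): the selector built above is
// equivalent to the label query "<topologyKey> in (value1, value2, ...)". With the apimachinery
// packages already imported in this file, the same kind of selector can be built directly; the key
// and values below are hypothetical:
//
//        req, err := labels.NewRequirement("topology.disk.csi.azure.com/zone", selection.In, []string{"eastus-1", "eastus-2"})
//        if err == nil {
//                sel := labels.NewSelector().Add(*req)
//                _ = sel.Matches(labels.Set{"topology.disk.csi.azure.com/zone": "eastus-1"}) // true
//        }
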
// addNodeToAvailableAttachmentsMap returns true if the node is added to or already in the availableAttachmentsMap, and false otherwise.
func (c *SharedState) addNodeToAvailableAttachmentsMap(ctx context.Context, nodeName string, nodeLables map[string]string) bool {
        if _, ok := c.availableAttachmentsMap.Load(nodeName); !ok {
                capacity, err := azureutils.GetNodeRemainingDiskCountActual(ctx, c.cachedClient, nodeName)
                if err != nil {
                        klog.Errorf("Failed to get node(%s) remaining disk count with error: %v", nodeName, err)
                        // store the maximum capacity if an entry for the node doesn't exist.
                        capacity, err = azureutils.GetNodeMaxDiskCountWithLabels(nodeLables)
                        if err != nil {
                                klog.Errorf("Failed to add node(%s) in availableAttachmentsMap, because get capacity of available attachments is failed with error: %v", nodeName, err)
                                return false
                        }
                }
                var count atomic.Int32
                count.Store(int32(capacity))
                klog.Infof("Added node(%s) to availableAttachmentsMap with capacity: %d", nodeName, capacity)
                c.availableAttachmentsMap.LoadOrStore(nodeName, &count)
        }
        return true
}

func (c *SharedState) deleteNodeFromAvailableAttachmentsMap(ctx context.Context, node string) {
        klog.Infof("Deleted node(%s) from availableAttachmentsMap", node)
        c.availableAttachmentsMap.Delete(node)
}

func (c *SharedState) decrementNodeCapacity(ctx context.Context, node string) bool {
        remainingCapacity, nodeExists := c.availableAttachmentsMap.Load(node)
        if nodeExists && remainingCapacity != nil {
                for {
                        currentCapacity := remainingCapacity.(*atomic.Int32).Load()
                        if currentCapacity == int32(0) {
                                klog.Errorf("Failed to decrement disk capacity for node(%s) because no remaining capacity", node)
                                return false
                        }
                        if remainingCapacity.(*atomic.Int32).CompareAndSwap(currentCapacity, currentCapacity-1) {
                                return true
                        }
                }
        }

        klog.Errorf("Failed to decrement disk capacity because node(%s) not found", node)
        return false
}

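// Editor's note (illustrative sketch, not part of the driver source): the loop above is the classic
// compare-and-swap pattern for decrementing a counter without ever going below zero; if another
// goroutine changes the value between Load and CompareAndSwap, the swap fails and the loop retries.
// A minimal standalone version, starting from a capacity of 2:
//
//        var capacity atomic.Int32
//        capacity.Store(2)
//        for {
//                cur := capacity.Load()
//                if cur == 0 {
//                        break // nothing left to reserve
//                }
//                if capacity.CompareAndSwap(cur, cur-1) {
//                        break // reserved one attachment slot
//                }
//        }
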
func (c *SharedState) incrementNodeCapacity(ctx context.Context, node string) bool {
        remainingCapacity, nodeExists := c.availableAttachmentsMap.Load(node)
        if nodeExists && remainingCapacity != nil {
                remainingCapacity.(*atomic.Int32).Add(1)
                return true
        }

        klog.Errorf("Failed to increment disk capacity because node(%s) not found", node)
        return false
}