kubernetes-sigs / azuredisk-csi-driver / 5603182988

19 Jul 2023 07:05PM UTC coverage: 68.751% (+24.2%) from 44.538%

Pull Request #1911 (edreed): [V2] fix: fix segfault due to empty operation queue

15 of 15 new or added lines in 2 files covered. (100.0%)

7245 of 10538 relevant lines covered (68.75%)

7.52 hits per line


Source file: /pkg/controller/shared_state.go (70.85% covered)
1
/*
2
Copyright 2021 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controller
18

19
import (
20
        "container/list"
21
        "context"
22
        "errors"
23
        "fmt"
24
        "reflect"
25
        "sort"
26
        "strings"
27
        "sync"
28
        "sync/atomic"
29

30
        "google.golang.org/grpc/codes"
31
        "google.golang.org/grpc/status"
32
        v1 "k8s.io/api/core/v1"
33
        crdClientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
34
        apiErrors "k8s.io/apimachinery/pkg/api/errors"
35
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36
        "k8s.io/apimachinery/pkg/labels"
37
        "k8s.io/apimachinery/pkg/selection"
38
        "k8s.io/apimachinery/pkg/types"
39
        "k8s.io/apimachinery/pkg/util/wait"
40
        utilfeature "k8s.io/apiserver/pkg/util/feature"
41
        "k8s.io/client-go/kubernetes"
42
        cache "k8s.io/client-go/tools/cache"
43
        "k8s.io/client-go/tools/record"
44
        "k8s.io/client-go/util/retry"
45
        csitranslator "k8s.io/csi-translation-lib/plugins"
46
        "k8s.io/klog/v2"
47
        "k8s.io/kubernetes/pkg/features"
48
        azdiskv1beta2 "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/azuredisk/v1beta2"
49
        azdisk "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/client/clientset/versioned"
50
        consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants"
51
        "sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils"
52
        "sigs.k8s.io/azuredisk-csi-driver/pkg/watcher"
53
        "sigs.k8s.io/azuredisk-csi-driver/pkg/workflow"
54
        "sigs.k8s.io/controller-runtime/pkg/client"
55
)
56

57
type DriverLifecycle interface {
58
        GetDiskClientSet() azdisk.Interface
59
        GetConditionWatcher() *watcher.ConditionWatcher
60
        IsDriverUninstall() bool
61
}
62

63
type SharedState struct {
64
        recoveryComplete              uint32
65
        config                        *azdiskv1beta2.AzDiskDriverConfiguration
66
        topologyKey                   string
67
        podToClaimsMap                sync.Map
68
        podToInlineMap                sync.Map
69
        claimToPodsMap                sync.Map
70
        volumeToClaimMap              sync.Map
71
        claimToVolumeMap              sync.Map
72
        azVolumeAttachmentToVaMap     sync.Map
73
        pvToVolumeMap                 sync.Map
74
        podLocks                      sync.Map
75
        visitedVolumes                sync.Map
76
        volumeOperationQueues         sync.Map
77
        cleanUpMap                    sync.Map
78
        priorityReplicaRequestsQueue  *VolumeReplicaRequestsPriorityQueue
79
        processingReplicaRequestQueue int32
80
        eventRecorder                 record.EventRecorder
81
        cachedClient                  client.Client
82
        azClient                      azdisk.Interface
83
        kubeClient                    kubernetes.Interface
84
        crdClient                     crdClientset.Interface
85
        conditionWatcher              *watcher.ConditionWatcher
86
        azureDiskCSITranslator        csitranslator.InTreePlugin
87
        availableAttachmentsMap       sync.Map
88
        driverLifecycle               DriverLifecycle
89
}
90

91
func NewSharedState(config *azdiskv1beta2.AzDiskDriverConfiguration, topologyKey string, eventRecorder record.EventRecorder, cachedClient client.Client, crdClient crdClientset.Interface, kubeClient kubernetes.Interface, driverLifecycle DriverLifecycle) *SharedState {
86✔
92
        newSharedState := &SharedState{
86✔
93
                config:                 config,
86✔
94
                topologyKey:            topologyKey,
86✔
95
                eventRecorder:          eventRecorder,
86✔
96
                cachedClient:           cachedClient,
86✔
97
                crdClient:              crdClient,
86✔
98
                azClient:               driverLifecycle.GetDiskClientSet(),
86✔
99
                kubeClient:             kubeClient,
86✔
100
                conditionWatcher:       driverLifecycle.GetConditionWatcher(),
86✔
101
                azureDiskCSITranslator: csitranslator.NewAzureDiskCSITranslator(),
86✔
102
                driverLifecycle:        driverLifecycle,
86✔
103
        }
86✔
104
        newSharedState.createReplicaRequestsQueue()
86✔
105

86✔
106
        return newSharedState
86✔
107
}
86✔
108

109
func (c *SharedState) isRecoveryComplete() bool {
41✔
110
        return atomic.LoadUint32(&c.recoveryComplete) == 1
41✔
111
}
41✔
112

113
func (c *SharedState) MarkRecoveryComplete() {
87✔
114
        atomic.StoreUint32(&c.recoveryComplete, 1)
87✔
115
}
87✔
116

117
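// DeleteAPIVersion removes deleteVersion from the stored versions in the status of the driver's
// CRDs (AzDriverNode, AzVolume, AzVolumeAttachment), retrying each status update on conflict.
// Failures are logged and the remaining CRDs are still processed.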
func (c *SharedState) DeleteAPIVersion(ctx context.Context, deleteVersion string) error {
×
118
        w, _ := workflow.GetWorkflowFromContext(ctx)
×
119
        crdNames := []string{consts.AzDriverNodeCRDName, consts.AzVolumeCRDName, consts.AzVolumeAttachmentCRDName}
×
120
        for _, crdName := range crdNames {
×
121
                err := retry.RetryOnConflict(retry.DefaultBackoff,
×
122
                        func() error {
×
123
                                crd, err := c.crdClient.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, crdName, metav1.GetOptions{})
×
124
                                if err != nil {
×
125
                                        if apiErrors.IsNotFound(err) {
×
126
                                                return err
×
127
                                        }
×
128
                                        return nil
×
129
                                }
130

131
                                updated := crd.DeepCopy()
×
132
                                var storedVersions []string
×
133
                                // remove version from status stored versions
×
134
                                for _, version := range updated.Status.StoredVersions {
×
135
                                        if version == deleteVersion {
×
136
                                                continue
×
137
                                        }
138
                                        storedVersions = append(storedVersions, version)
×
139
                                }
140
                                updated.Status.StoredVersions = storedVersions
×
141
                                _, err = c.crdClient.ApiextensionsV1().CustomResourceDefinitions().UpdateStatus(ctx, updated, metav1.UpdateOptions{})
×
142
                                if err != nil {
×
143
                                        // return the error; RetryOnConflict retries conflicts and the failure is logged below
×
144
                                        return err
×
145
                                }
×
146
                                return nil
×
147
                        })
148

149
                if err != nil {
×
150
                        w.Logger().Errorf(err, "failed to delete %s api version from CRD (%s)", deleteVersion, crdName)
×
151
                }
×
152

153
                // Uncomment when all deployments have rolled over to v1beta1.
154
                // updated = crd.DeepCopy()
155
                // // remove version from spec versions
156
                // var specVersions []crdv1.CustomResourceDefinitionVersion
157
                // for _, version := range updated.Spec.Versions {
158
                //         if version.Name == deleteVersion {
159
                //                 continue
160
                //         }
161
                //         specVersions = append(specVersions, version)
162
                // }
163
                // updated.Spec.Versions = specVersions
164

165
                // // update the crd
166
                // crd, err = c.crdClient.ApiextensionsV1().CustomResourceDefinitions().Update(ctx, updated, metav1.UpdateOptions{})
167
                // if err != nil {
168
                //         // log the error and continue
169
                //         w.Logger().Errorf(err, "failed to remove %s spec version from CRD (%s)", deleteVersion, crd.Name)
170
                //         continue
171
                // }
172
        }
173
        return nil
×
174
}
175

176
func (c *SharedState) createOperationQueue(volumeName string) {
62✔
177
        _, _ = c.volumeOperationQueues.LoadOrStore(volumeName, newLockableEntry(newOperationQueue()))
62✔
178
}
62✔
179

180
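// addToOperationQueue appends the operation to the per-volume operation queue. If the queue was
// previously empty, a goroutine is started to drain it one operation at a time; operations that
// shouldRequeueReplicaOperation deems retriable are pushed back while the queue is still active.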
func (c *SharedState) addToOperationQueue(ctx context.Context, volumeName string, requester operationRequester, operationFunc func(context.Context) error, isReplicaGarbageCollection bool) {
17✔
181
        // The caller is expected to provide the parent workflow via the context.
17✔
182
        // The child workflow is created below and fed to the queued operation to carry the necessary workflow information.
17✔
183
        ctx, w := workflow.New(ctx, workflow.WithDetails(consts.VolumeNameLabel, volumeName))
17✔
184

17✔
185
        v, ok := c.volumeOperationQueues.Load(volumeName)
17✔
186
        if !ok {
17✔
187
                return
×
188
        }
×
189
        lockable := v.(*lockableEntry)
17✔
190
        lockable.Lock()
17✔
191
        isFirst := lockable.entry.(*operationQueue).Len() <= 0
17✔
192
        _ = lockable.entry.(*operationQueue).PushBack(&replicaOperation{
17✔
193
                ctx:       ctx,
17✔
194
                requester: requester,
17✔
195
                operationFunc: func(ctx context.Context) (err error) {
34✔
196
                        defer func() {
34✔
197
                                if !shouldRequeueReplicaOperation(isReplicaGarbageCollection, err) {
34✔
198
                                        w.Finish(err)
17✔
199
                                }
17✔
200
                        }()
201
                        err = operationFunc(ctx)
17✔
202
                        return
17✔
203
                },
204
                isReplicaGarbageCollection: isReplicaGarbageCollection,
205
        })
206
        lockable.Unlock()
17✔
207

17✔
208
        // If this is the first operation, start the goroutine
17✔
209
        if isFirst {
33✔
210
                go func() {
32✔
211
                        lockable.Lock()
16✔
212
                        defer lockable.Unlock()
16✔
213
                        operationQueue := lockable.entry.(*operationQueue)
16✔
214

16✔
215
                        for {
49✔
216
                                // Get the first operation, exiting the loop if the queue is empty.
33✔
217
                                front := operationQueue.Front()
33✔
218
                                if front == nil {
49✔
219
                                        break
16✔
220
                                }
221

222
                                operation := front.Value.(*replicaOperation)
17✔
223

17✔
224
                                // Only run the operation if its requester is not in the gc exclusion list
17✔
225
                                if !operationQueue.gcExclusionList.has(operation.requester) {
34✔
226

17✔
227
                                        // Release the lock while executing the operation to avoid deadlocks.
17✔
228
                                        lockable.Unlock()
17✔
229
                                        err := operation.operationFunc(operation.ctx)
17✔
230
                                        lockable.Lock()
17✔
231

17✔
232
                                        if shouldRequeueReplicaOperation(operation.isReplicaGarbageCollection, err) {
17✔
233
                                                // If operation failed, push it to the end of the queue if the queue is
×
234
                                                // still active.
×
235
                                                if operationQueue.isActive {
×
236
                                                        operationQueue.PushBack(operation)
×
237
                                                }
×
238
                                        }
239
                                }
240

241
                                // Remove the operation from the queue.
242
                                operationQueue.safeRemove(front)
17✔
243
                        }
244
                }()
245
        }
246
}
247

248
func (c *SharedState) deleteOperationQueue(volumeName string) {
1✔
249
        v, ok := c.volumeOperationQueues.LoadAndDelete(volumeName)
1✔
250
        // if operation queue has already been deleted, return
1✔
251
        if !ok {
1✔
252
                return
×
253
        }
×
254
        // clear the queue in case there is still an entry in it
255
        lockable := v.(*lockableEntry)
1✔
256
        lockable.Lock()
1✔
257
        defer lockable.Unlock()
1✔
258
        lockable.entry.(*operationQueue).Init()
1✔
259
}
260

261
func (c *SharedState) closeOperationQueue(volumeName string) func() {
2✔
262
        v, ok := c.volumeOperationQueues.Load(volumeName)
2✔
263
        if !ok {
4✔
264
                return nil
2✔
265
        }
2✔
266
        lockable := v.(*lockableEntry)
×
267

×
268
        lockable.Lock()
×
269
        lockable.entry.(*operationQueue).isActive = false
×
270
        lockable.entry.(*operationQueue).Init()
×
271
        return lockable.Unlock
×
272
}
273

274
func (c *SharedState) addToGcExclusionList(volumeName string, target operationRequester) {
3✔
275
        v, ok := c.volumeOperationQueues.Load(volumeName)
3✔
276
        if !ok {
3✔
277
                return
×
278
        }
×
279
        lockable := v.(*lockableEntry)
3✔
280
        lockable.Lock()
3✔
281
        defer lockable.Unlock()
3✔
282
        lockable.entry.(*operationQueue).gcExclusionList.add(target)
3✔
283
}
284

285
func (c *SharedState) removeFromExclusionList(volumeName string, target operationRequester) {
7✔
286
        v, ok := c.volumeOperationQueues.Load(volumeName)
7✔
287
        if !ok {
7✔
288
                return
×
289
        }
×
290
        lockable := v.(*lockableEntry)
7✔
291
        lockable.Lock()
7✔
292
        defer lockable.Unlock()
7✔
293
        delete(lockable.entry.(*operationQueue).gcExclusionList, target)
7✔
294
}
295

296
func (c *SharedState) dequeueGarbageCollection(volumeName string) {
5✔
297
        v, ok := c.volumeOperationQueues.Load(volumeName)
5✔
298
        if !ok {
5✔
299
                return
×
300
        }
×
301
        lockable := v.(*lockableEntry)
5✔
302
        lockable.Lock()
5✔
303
        defer lockable.Unlock()
5✔
304
        queue := lockable.entry.(*operationQueue)
5✔
305
        // look for garbage collection operations in the queue and remove them
5✔
306
        var next *list.Element
5✔
307
        for cur := queue.Front(); cur != nil; cur = next {
8✔
308
                next = cur.Next()
3✔
309
                if cur.Value.(*replicaOperation).isReplicaGarbageCollection {
6✔
310
                        queue.safeRemove(cur)
3✔
311
                }
3✔
312
        }
313
}
314

315
func (c *SharedState) getVolumesFromPod(ctx context.Context, podName string) ([]string, error) {
26✔
316
        w, _ := workflow.GetWorkflowFromContext(ctx)
26✔
317

26✔
318
        var claims []string
26✔
319
        w.Logger().V(5).Infof("Getting requested volumes for pod (%s).", podName)
26✔
320
        value, ok := c.podToClaimsMap.Load(podName)
26✔
321
        if !ok {
27✔
322
                return nil, status.Errorf(codes.NotFound, "unable to find an entry for pod (%s) in podToClaims map", podName)
1✔
323
        }
1✔
324
        claims, ok = value.([]string)
25✔
325
        if !ok {
26✔
326
                return nil, status.Errorf(codes.Internal, "wrong output type: expected []string")
1✔
327
        }
1✔
328

329
        volumes := []string{}
24✔
330
        for _, claim := range claims {
59✔
331
                value, ok := c.claimToVolumeMap.Load(claim)
35✔
332
                if !ok {
40✔
333
                        // the pvc entry is not an azure resource
5✔
334
                        w.Logger().V(5).Infof("Requested volume %s for pod %s is not an azure resource", value, podName)
5✔
335
                        continue
5✔
336
                }
337
                volume, ok := value.(string)
30✔
338
                if !ok {
31✔
339
                        return nil, status.Errorf(codes.Internal, "wrong output type: expected string")
1✔
340
                }
1✔
341
                volumes = append(volumes, volume)
29✔
342
                w.Logger().V(5).Infof("Requested volumes for pod %s are now the following: Volumes: %v, Len: %d", podName, volumes, len(volumes))
29✔
343
        }
344
        return volumes, nil
23✔
345
}
346

347
func (c *SharedState) getPodsFromVolume(ctx context.Context, client client.Client, volumeName string) ([]v1.Pod, error) {
14✔
348
        w, _ := workflow.GetWorkflowFromContext(ctx)
14✔
349
        pods, err := c.getPodNamesFromVolume(volumeName)
14✔
350
        if err != nil {
15✔
351
                return nil, err
1✔
352
        }
1✔
353
        podObjs := []v1.Pod{}
13✔
354
        for _, pod := range pods {
27✔
355
                namespace, name, err := parseQualifiedName(pod)
14✔
356
                if err != nil {
14✔
357
                        w.Logger().Errorf(err, "cannot get podObj for pod (%s)", pod)
×
358
                        continue
×
359
                }
360
                var podObj v1.Pod
14✔
361
                if err := client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, &podObj); err != nil {
14✔
362
                        return nil, err
×
363
                }
×
364
                podObjs = append(podObjs, podObj)
14✔
365
        }
366
        return podObjs, nil
13✔
367
}
368

369
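// getPodNamesFromVolume resolves an AzVolume name to the qualified names of the pods using it by
// walking volumeToClaimMap and then claimToPodsMap.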
func (c *SharedState) getPodNamesFromVolume(volumeName string) ([]string, error) {
14✔
370
        v, ok := c.volumeToClaimMap.Load(volumeName)
14✔
371
        if !ok {
15✔
372
                return nil, status.Errorf(codes.NotFound, "no bound persistent volume claim was found for AzVolume (%s)", volumeName)
1✔
373
        }
1✔
374
        claimName, ok := v.(string)
13✔
375
        if !ok {
13✔
376
                return nil, status.Errorf(codes.Internal, "volumeToClaimMap should should hold string")
×
377
        }
×
378

379
        value, ok := c.claimToPodsMap.Load(claimName)
13✔
380
        if !ok {
13✔
381
                return nil, status.Errorf(codes.NotFound, "no pods found for PVC (%s)", claimName)
×
382
        }
×
383
        lockable, ok := value.(*lockableEntry)
13✔
384
        if !ok {
13✔
385
                return nil, status.Errorf(codes.Internal, "claimToPodsMap should hold lockable entry")
×
386
        }
×
387

388
        lockable.RLock()
13✔
389
        defer lockable.RUnlock()
13✔
390

13✔
391
        podMap, ok := lockable.entry.(set)
13✔
392
        if !ok {
13✔
393
                return nil, status.Errorf(codes.Internal, "claimToPodsMap entry should hold a set")
×
394
        }
×
395

396
        pods := make([]string, len(podMap))
13✔
397
        i := 0
13✔
398
        for v := range podMap {
27✔
399
                pod := v.(string)
14✔
400
                pods[i] = pod
14✔
401
                i++
14✔
402
        }
14✔
403

404
        return pods, nil
13✔
405
}
406

407
func (c *SharedState) getVolumesForPodObjs(ctx context.Context, pods []v1.Pod) ([]string, error) {
17✔
408
        volumes := []string{}
17✔
409
        for _, pod := range pods {
34✔
410
                podVolumes, err := c.getVolumesFromPod(ctx, getQualifiedName(pod.Namespace, pod.Name))
17✔
411
                if err != nil {
17✔
412
                        return nil, err
×
413
                }
×
414
                volumes = append(volumes, podVolumes...)
17✔
415
        }
416
        return volumes, nil
17✔
417
}
418

419
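// addPod records the persistent volume claims referenced by the pod in podToClaimsMap and
// claimToPodsMap. When CSI migration is enabled, it also creates AzVolume CRIs for in-tree inline
// Azure disk volumes and tracks them in podToInlineMap.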
func (c *SharedState) addPod(ctx context.Context, pod *v1.Pod, updateOption updateWithLock) error {
7✔
420
        var err error
7✔
421
        w, _ := workflow.GetWorkflowFromContext(ctx)
7✔
422
        podKey := getQualifiedName(pod.Namespace, pod.Name)
7✔
423
        v, _ := c.podLocks.LoadOrStore(podKey, &sync.Mutex{})
7✔
424

7✔
425
        w.Logger().V(5).Infof("Adding pod %s to shared map with keyName %s.", pod.Name, podKey)
7✔
426
        podLock := v.(*sync.Mutex)
7✔
427
        if updateOption == acquireLock {
13✔
428
                podLock.Lock()
6✔
429
                defer podLock.Unlock()
6✔
430
        }
6✔
431
        w.Logger().V(5).Infof("Pod spec of pod %s is: %+v. With volumes: %+v", pod.Name, pod.Spec, pod.Spec.Volumes)
7✔
432

7✔
433
        // If the claims already exist for the podKey, add them to a set
7✔
434
        value, _ := c.podToClaimsMap.LoadOrStore(podKey, []string{})
7✔
435
        claims := value.([]string)
7✔
436
        claimSet := set{}
7✔
437
        for _, claim := range claims {
17✔
438
                claimSet.add(claim)
10✔
439
        }
10✔
440

441
        for _, volume := range pod.Spec.Volumes {
18✔
442
                // TODO: investigate if we need special support for CSI ephemeral volume or generic ephemeral volume
11✔
443
                // if csiMigration is enabled and there is an inline volume, create AzVolume CRI for the inline volume.
11✔
444
                if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) &&
11✔
445
                        utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureDisk) &&
11✔
446
                        volume.AzureDisk != nil {
12✔
447
                        // inline volume: create AzVolume resource
1✔
448
                        var pv *v1.PersistentVolume
1✔
449
                        if pv, err = c.azureDiskCSITranslator.TranslateInTreeInlineVolumeToCSI(&volume, pod.Namespace); err != nil {
1✔
450
                                w.Logger().V(5).Errorf(err, "failed to translate inline volume to csi")
×
451
                                continue
×
452
                        } else if pv == nil {
1✔
453
                                w.Logger().V(5).Errorf(status.Errorf(codes.Internal, "unexpected failure in translating inline volume to csi"), "nil pv returned")
×
454
                                continue
×
455
                        }
456
                        w.Logger().V(5).Infof("Creating AzVolume instance for inline volume %s.", volume.AzureDisk.DiskName)
1✔
457
                        if err := c.createAzVolumeFromPv(ctx, *pv, map[string]string{consts.InlineVolumeAnnotation: volume.AzureDisk.DataDiskURI}); err != nil {
1✔
458
                                return err
×
459
                        }
×
460
                        v, exists := c.podToInlineMap.Load(podKey)
1✔
461
                        var inlines []string
1✔
462
                        if exists {
1✔
463
                                inlines = v.([]string)
×
464
                        }
×
465
                        inlines = append(inlines, volume.AzureDisk.DiskName)
1✔
466
                        c.podToInlineMap.Store(podKey, inlines)
1✔
467
                }
468
                if volume.PersistentVolumeClaim == nil {
12✔
469
                        w.Logger().V(5).Infof("Pod %s: Skipping Volume %s. No persistent volume exists.", pod.Name, volume)
1✔
470
                        continue
1✔
471
                }
472
                namespacedClaimName := getQualifiedName(pod.Namespace, volume.PersistentVolumeClaim.ClaimName)
10✔
473
                if _, ok := c.claimToVolumeMap.Load(namespacedClaimName); !ok {
11✔
474
                        // Log message if the Pod status is Running
1✔
475
                        if pod.Status.Phase == v1.PodRunning {
1✔
476
                                w.Logger().V(5).Infof("Skipping Pod %s. Volume %s not csi. Driver: %+v", pod.Name, volume.Name, volume.CSI)
×
477
                        }
×
478
                        continue
1✔
479
                }
480
                w.Logger().V(5).Infof("Pod %s. Volume %v is csi.", pod.Name, volume)
9✔
481
                claimSet.add(namespacedClaimName)
9✔
482
                v, _ := c.claimToPodsMap.LoadOrStore(namespacedClaimName, newLockableEntry(set{}))
9✔
483

9✔
484
                lockable := v.(*lockableEntry)
9✔
485
                lockable.Lock()
9✔
486
                pods := lockable.entry.(set)
9✔
487
                if !pods.has(podKey) {
9✔
488
                        pods.add(podKey)
×
489
                }
×
490
                // No need to restore the amended set to claimToPodsMap because set is a reference type
491
                lockable.Unlock()
9✔
492

9✔
493
                w.Logger().V(5).Infof("Storing pod %s and claim %s to claimToPodsMap map.", pod.Name, namespacedClaimName)
9✔
494
        }
495
        w.Logger().V(5).Infof("Storing pod %s and claim %s to podToClaimsMap map.", pod.Name, claims)
7✔
496

7✔
497
        allClaims := []string{}
7✔
498
        for key := range claimSet {
17✔
499
                allClaims = append(allClaims, key.(string))
10✔
500
        }
10✔
501
        c.podToClaimsMap.Store(podKey, allClaims)
7✔
502
        return nil
7✔
503
}
504

505
func (c *SharedState) deletePod(ctx context.Context, podKey string) error {
1✔
506
        w, _ := workflow.GetWorkflowFromContext(ctx)
1✔
507
        value, exists := c.podLocks.LoadAndDelete(podKey)
1✔
508
        if !exists {
1✔
509
                return nil
×
510
        }
×
511
        podLock := value.(*sync.Mutex)
1✔
512
        podLock.Lock()
1✔
513
        defer podLock.Unlock()
1✔
514

1✔
515
        value, exists = c.podToInlineMap.LoadAndDelete(podKey)
1✔
516
        if exists {
1✔
517
                inlines := value.([]string)
×
518

×
519
                for _, inline := range inlines {
×
520
                        _, err := c.cleanUpAzVolumeAttachmentByVolume(ctx, inline, pod, azureutils.AllRoles, cleanUpAttachment, deleteAndWait)
×
521
                        if err != nil && !apiErrors.IsNotFound(err) {
×
522
                                w.Logger().Errorf(err, "failed to list AzVolumeAttachments (%s) for inline (%s): %v", inline, inline, err)
×
523
                                return err
×
524
                        }
×
525
                        if err := c.azClient.DiskV1beta2().AzVolumes(c.config.ObjectNamespace).Delete(ctx, inline, metav1.DeleteOptions{}); err != nil && !apiErrors.IsNotFound(err) {
×
526
                                w.Logger().Errorf(err, "failed to delete AzVolume (%s) for inline (%s): %v", inline, inline, err)
×
527
                                return err
×
528
                        }
×
529
                }
530
        }
531

532
        value, exists = c.podToClaimsMap.LoadAndDelete(podKey)
1✔
533
        if !exists {
1✔
534
                return nil
×
535
        }
×
536
        claims := value.([]string)
1✔
537

1✔
538
        for _, claim := range claims {
2✔
539
                value, ok := c.claimToPodsMap.Load(claim)
1✔
540
                if !ok {
1✔
541
                        w.Logger().Errorf(nil, "No pods found for PVC (%s)", claim)
×
542
                }
×
543

544
                // Scope the duration that we hold the lockable lock using a function.
545
                func() {
2✔
546
                        lockable, ok := value.(*lockableEntry)
1✔
547
                        if !ok {
1✔
548
                                w.Logger().Error(nil, "claimToPodsMap should hold lockable entry")
×
549
                                return
×
550
                        }
×
551

552
                        lockable.Lock()
1✔
553
                        defer lockable.Unlock()
1✔
554

1✔
555
                        podSet, ok := lockable.entry.(set)
1✔
556
                        if !ok {
1✔
557
                                w.Logger().Error(nil, "claimToPodsMap entry should hold a set")
×
558
                        }
×
559

560
                        podSet.remove(podKey)
1✔
561
                        if len(podSet) == 0 {
2✔
562
                                c.claimToPodsMap.Delete(claim)
1✔
563
                        }
1✔
564
                }()
565
        }
566
        return nil
1✔
567
}
568

569
func (c *SharedState) addVolumeAndClaim(azVolumeName, pvName, pvClaimName string) {
1✔
570
        c.pvToVolumeMap.Store(pvName, azVolumeName)
1✔
571
        c.volumeToClaimMap.Store(azVolumeName, pvClaimName)
1✔
572
        c.claimToVolumeMap.Store(pvClaimName, azVolumeName)
1✔
573
}
1✔
574

575
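// deletePV deletes the AzVolume CRI backing a pre-provisioned persistent volume and asynchronously
// waits for the CRI to be deleted, resetting its state if the deletion fails so it can be retried.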
func (c *SharedState) deletePV(pvName string) error {
2✔
576
        var err error
2✔
577

2✔
578
        ctx := context.Background()
2✔
579
        ctx, w := workflow.New(ctx, workflow.WithDetails())
2✔
580
        defer func() { w.Finish(err) }()
4✔
581
        defer func() {
4✔
582
                if apiErrors.IsNotFound(err) {
2✔
583
                        err = nil
×
584
                }
×
585
        }()
586

587
        var azVolume azdiskv1beta2.AzVolume
2✔
588
        if val, ok := c.pvToVolumeMap.Load(pvName); ok {
3✔
589
                volumeName := val.(string)
1✔
590
                err = c.cachedClient.Get(ctx, types.NamespacedName{Namespace: c.config.ObjectNamespace, Name: volumeName}, &azVolume)
1✔
591
                if err != nil {
1✔
592
                        if !apiErrors.IsNotFound(err) {
×
593
                                return err
×
594
                        }
×
595
                        return nil
×
596
                }
597
        } else {
1✔
598
                // if no volume name can be found for PV, try fetching azVolume using labels
1✔
599
                var azVolumeList azdiskv1beta2.AzVolumeList
1✔
600
                req, err := azureutils.CreateLabelRequirements(consts.PvNameLabel, selection.Equals, pvName)
1✔
601
                if err != nil {
1✔
602
                        return err
×
603
                }
×
604
                err = c.cachedClient.List(ctx, &azVolumeList, &client.ListOptions{LabelSelector: labels.NewSelector().Add(*req)})
1✔
605
                if err != nil && !apiErrors.IsNotFound(err) {
1✔
606
                        return err
×
607
                } else if apiErrors.IsNotFound(err) || len(azVolumeList.Items) == 0 {
2✔
608
                        return nil
1✔
609
                }
1✔
610
                azVolume = azVolumeList.Items[0]
×
611
        }
612

613
        // The deletion timestamp is set and the AzVolume reconciler will handle the delete request.
614
        // The volume itself will not be deleted.
615
        w.AddDetailToLogger(workflow.GetObjectDetails(&azVolume)...)
1✔
616

1✔
617
        if !isPreProvisioned(&azVolume) {
1✔
618
                return nil
×
619
        }
×
620

621
        err = c.cachedClient.Delete(ctx, &azVolume)
1✔
622
        if err != nil {
1✔
623
                if apiErrors.IsNotFound(err) {
×
624
                        return nil
×
625
                }
×
626
                return err
×
627
        }
628

629
        waitCh := make(chan goSignal)
1✔
630
        go func() {
2✔
631
                goCtx, w := workflow.New(ctx)
1✔
632
                defer func() { w.Finish(err) }()
2✔
633
                waitCh <- goSignal{}
1✔
634

1✔
635
                waiter := c.conditionWatcher.NewConditionWaiter(ctx, watcher.AzVolumeType, azVolume.Name, verifyObjectFailedOrDeleted)
1✔
636

1✔
637
                for {
2✔
638
                        // if AzVolume was successfully deleted
1✔
639
                        obj, err := waiter.Wait(goCtx)
1✔
640
                        if err == nil {
2✔
641
                                // remove the entry from pvToVolumeMap once the AzVolume CRI is deleted
1✔
642
                                c.pvToVolumeMap.Delete(pvName)
1✔
643
                                return
1✔
644
                        }
1✔
645

646
                        azVolume := obj.(*azdiskv1beta2.AzVolume)
×
647

×
648
                        if azVolume.Status.State == azdiskv1beta2.VolumeDeletionFailed || azVolume.Status.Error != nil {
×
649
                                updateFunc := func(obj client.Object) error {
×
650
                                        azVolume := obj.(*azdiskv1beta2.AzVolume)
×
651
                                        azVolume.Status.Error = nil
×
652
                                        azVolume.Status.State = azdiskv1beta2.VolumeCreated
×
653
                                        return nil
×
654
                                }
×
655
                                _, _ = azureutils.UpdateCRIWithRetry(goCtx, nil, c.cachedClient, c.azClient, azVolume, updateFunc, consts.ForcedUpdateMaxNetRetry, azureutils.UpdateCRIStatus)
×
656
                        }
657
                }
658
        }()
659
        <-waitCh
1✔
660

1✔
661
        return nil
1✔
662
}
663

664
func (c *SharedState) deleteVolumeAndClaim(azVolumeName string) {
2✔
665
        v, ok := c.volumeToClaimMap.LoadAndDelete(azVolumeName)
2✔
666
        if ok {
4✔
667
                pvClaimName := v.(string)
2✔
668
                c.claimToVolumeMap.Delete(pvClaimName)
2✔
669
        }
2✔
670
}
671

672
func (c *SharedState) markVolumeVisited(azVolumeName string) {
9✔
673
        c.visitedVolumes.Store(azVolumeName, struct{}{})
9✔
674
}
9✔
675

676
func (c *SharedState) unmarkVolumeVisited(azVolumeName string) {
9✔
677
        c.visitedVolumes.Delete(azVolumeName)
9✔
678
}
9✔
679

680
func (c *SharedState) isVolumeVisited(azVolumeName string) bool {
10✔
681
        _, visited := c.visitedVolumes.Load(azVolumeName)
10✔
682
        return visited
10✔
683
}
10✔
684

685
func (c *SharedState) getRankedNodesForReplicaAttachments(ctx context.Context, volumes []string, podObjs []v1.Pod) ([]string, error) {
14✔
686
        var err error
14✔
687
        ctx, w := workflow.New(ctx)
14✔
688
        defer func() { w.Finish(err) }()
28✔
689

690
        w.Logger().V(5).Info("Getting ranked list of nodes for creating AzVolumeAttachments")
14✔
691

14✔
692
        nodeList := &v1.NodeList{}
14✔
693
        if err := c.cachedClient.List(ctx, nodeList); err != nil {
14✔
694
                return nil, err
×
695
        }
×
696

697
        var selectedNodeObjs []v1.Node
14✔
698
        selectedNodeObjs, err = c.selectNodesPerTopology(ctx, nodeList.Items, podObjs, volumes)
14✔
699
        if err != nil {
14✔
700
                w.Logger().Errorf(err, "failed to select nodes for volumes (%+v)", volumes)
×
701
                return nil, err
×
702
        }
×
703

704
        selectedNodes := make([]string, len(selectedNodeObjs))
14✔
705
        for i, selectedNodeObj := range selectedNodeObjs {
34✔
706
                selectedNodes[i] = selectedNodeObj.Name
20✔
707
        }
20✔
708

709
        w.Logger().V(5).Infof("Selected nodes (%+v) for replica AzVolumeAttachments for volumes (%+v)", selectedNodes, volumes)
14✔
710
        return selectedNodes, nil
14✔
711
}
712

713
func (c *SharedState) filterNodes(ctx context.Context, nodes []v1.Node, pods []v1.Pod, volumes []string) ([]v1.Node, error) {
28✔
714
        var err error
28✔
715
        ctx, w := workflow.New(ctx)
28✔
716
        defer func() { w.Finish(err) }()
56✔
717

718
        pvs := make([]*v1.PersistentVolume, len(volumes))
28✔
719
        for i, volume := range volumes {
64✔
720
                var azVolume *azdiskv1beta2.AzVolume
36✔
721
                azVolume, err = azureutils.GetAzVolume(ctx, c.cachedClient, c.azClient, volume, c.config.ObjectNamespace, true)
36✔
722
                if err != nil {
38✔
723
                        w.Logger().V(5).Errorf(err, "AzVolume for volume %s is not found.", volume)
2✔
724
                        return nil, err
2✔
725
                }
2✔
726

727
                var pv v1.PersistentVolume
34✔
728
                if err = c.cachedClient.Get(ctx, types.NamespacedName{Name: azVolume.Spec.PersistentVolume}, &pv); err != nil {
34✔
729
                        return nil, err
×
730
                }
×
731
                pvs[i] = &pv
34✔
732
        }
733

734
        var filterPlugins = []filterPlugin{
26✔
735
                &interPodAffinityFilter{},
26✔
736
                &interPodAntiAffinityFilter{},
26✔
737
                &podTolerationFilter{},
26✔
738
                &podNodeAffinityFilter{},
26✔
739
                &podNodeSelectorFilter{},
26✔
740
                &volumeNodeSelectorFilter{},
26✔
741
        }
26✔
742

26✔
743
        filteredNodes := nodes
26✔
744
        for _, filterPlugin := range filterPlugins {
182✔
745
                filterPlugin.setup(pods, pvs, c)
156✔
746
                if updatedFilteredNodes, err := filterPlugin.filter(ctx, filteredNodes); err != nil {
156✔
747
                        w.Logger().Errorf(err, "failed to filter node with filter plugin (%s). Ignoring filtered results.", filterPlugin.name())
×
748
                } else {
156✔
749
                        filteredNodes = updatedFilteredNodes
156✔
750
                        nodeStrs := make([]string, len(filteredNodes))
156✔
751
                        for i, filteredNode := range filteredNodes {
489✔
752
                                nodeStrs[i] = filteredNode.Name
333✔
753
                        }
333✔
754
                        w.Logger().V(10).Infof("Filtered node list from filter plugin (%s): %+v", filterPlugin.name(), nodeStrs)
156✔
755
                }
756
        }
757

758
        return filteredNodes, nil
26✔
759
}
760

761
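// prioritizeNodes scores the candidate nodes with the node scorer plugins, assigns a negative
// score to any node dropped by a plugin, and returns the nodes sorted by descending score with
// the dropped nodes trimmed off.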
func (c *SharedState) prioritizeNodes(ctx context.Context, pods []v1.Pod, volumes []string, nodes []v1.Node) []v1.Node {
20✔
762
        ctx, w := workflow.New(ctx)
20✔
763
        defer w.Finish(nil)
20✔
764

20✔
765
        nodeScores := map[string]int{}
20✔
766
        for _, node := range nodes {
66✔
767
                nodeScores[node.Name] = 0
46✔
768
        }
46✔
769

770
        var nodeScorerPlugins = []nodeScorerPlugin{
20✔
771
                &scoreByNodeCapacity{},
20✔
772
                &scoreByReplicaCount{},
20✔
773
                &scoreByInterPodAffinity{},
20✔
774
                &scoreByInterPodAntiAffinity{},
20✔
775
                &scoreByPodNodeAffinity{},
20✔
776
        }
20✔
777

20✔
778
        for _, nodeScorerPlugin := range nodeScorerPlugins {
120✔
779
                nodeScorerPlugin.setup(nodes, pods, volumes, c)
100✔
780
                if updatedNodeScores, err := nodeScorerPlugin.score(ctx, nodeScores); err != nil {
100✔
781
                        w.Logger().Errorf(err, "failed to score nodes by node scorer (%s)", nodeScorerPlugin.name())
×
782
                } else {
100✔
783
                        // update node scores if scorer plugin returned success
100✔
784
                        nodeScores = updatedNodeScores
100✔
785
                }
100✔
786
                var nodeScoreResult string
100✔
787
                for nodeName, score := range nodeScores {
280✔
788
                        nodeScoreResult += fmt.Sprintf("<%s: %d> ", nodeName, score)
180✔
789
                }
180✔
790
                w.Logger().V(10).Infof("node score after node score plugin (%s): %s", nodeScorerPlugin.name(), nodeScoreResult)
100✔
791
        }
792

793
        // give nodes dropped by the scorer plugins a negative score so they sort last and can be trimmed
794
        numFiltered := 0
20✔
795
        for _, node := range nodes {
66✔
796
                if _, exists := nodeScores[node.Name]; !exists {
58✔
797
                        nodeScores[node.Name] = -1
12✔
798
                        numFiltered++
12✔
799
                }
12✔
800
        }
801

802
        sort.Slice(nodes[:], func(i, j int) bool {
51✔
803
                return nodeScores[nodes[i].Name] > nodeScores[nodes[j].Name]
31✔
804
        })
31✔
805

806
        return nodes[:len(nodes)-numFiltered]
20✔
807
}
808

809
func (c *SharedState) filterAndSortNodes(ctx context.Context, nodes []v1.Node, pods []v1.Pod, volumes []string) ([]v1.Node, error) {
16✔
810
        var err error
16✔
811
        ctx, w := workflow.New(ctx)
16✔
812
        defer func() { w.Finish(err) }()
32✔
813

814
        var filteredNodes []v1.Node
16✔
815
        filteredNodes, err = c.filterNodes(ctx, nodes, pods, volumes)
16✔
816
        if err != nil {
17✔
817
                w.Logger().Errorf(err, "failed to filter nodes for volumes (%+v): %v", volumes, err)
1✔
818
                return nil, err
1✔
819
        }
1✔
820
        sortedNodes := c.prioritizeNodes(ctx, pods, volumes, filteredNodes)
15✔
821
        return sortedNodes, nil
15✔
822
}
823

824
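// selectNodesPerTopology selects candidate nodes for replica attachments. When the volumes carry
// zonal node affinity, nodes are picked round-robin from the zones compatible with every volume,
// visiting the primary node's zone last; otherwise the full node list is filtered and sorted.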
func (c *SharedState) selectNodesPerTopology(ctx context.Context, nodes []v1.Node, pods []v1.Pod, volumes []string) ([]v1.Node, error) {
14✔
825
        var err error
14✔
826
        ctx, w := workflow.New(ctx)
14✔
827
        defer func() { w.Finish(err) }()
28✔
828

829
        selectedNodes := []v1.Node{}
14✔
830
        numReplicas := 0
14✔
831

14✔
832
        // disperse node topology if possible
14✔
833
        compatibleZonesSet := set{}
14✔
834
        var primaryNode string
14✔
835
        for i, volume := range volumes {
35✔
836
                var azVolume *azdiskv1beta2.AzVolume
21✔
837
                azVolume, err = azureutils.GetAzVolume(ctx, c.cachedClient, c.azClient, volume, c.config.ObjectNamespace, true)
21✔
838
                if err != nil {
21✔
839
                        err = status.Errorf(codes.Aborted, "failed to get AzVolume CRI (%s)", volume)
×
840
                        return nil, err
×
841
                }
×
842

843
                numReplicas = max(numReplicas, azVolume.Spec.MaxMountReplicaCount)
21✔
844
                w.Logger().V(5).Infof("Number of requested replicas for Azvolume (%s) is: %d. Max replica count is: %d.",
21✔
845
                        volume, numReplicas, azVolume.Spec.MaxMountReplicaCount)
21✔
846

21✔
847
                var pv v1.PersistentVolume
21✔
848
                if err = c.cachedClient.Get(ctx, types.NamespacedName{Name: azVolume.Spec.PersistentVolume}, &pv); err != nil {
21✔
849
                        return nil, err
×
850
                }
×
851

852
                if pv.Spec.NodeAffinity == nil || pv.Spec.NodeAffinity.Required == nil {
41✔
853
                        continue
20✔
854
                }
855

856
                // Find the intersection of the zones for all the volumes
857
                topologyKey := c.topologyKey
1✔
858
                if i == 0 {
2✔
859
                        compatibleZonesSet = getSupportedZones(pv.Spec.NodeAffinity.Required.NodeSelectorTerms, topologyKey)
1✔
860
                } else {
1✔
861
                        listOfZones := getSupportedZones(pv.Spec.NodeAffinity.Required.NodeSelectorTerms, topologyKey)
×
862
                        for key := range compatibleZonesSet {
×
863
                                if !listOfZones.has(key) {
×
864
                                        compatibleZonesSet.remove(key)
×
865
                                }
×
866
                        }
867
                }
868

869
                // find primary node if not already found
870
                if primaryNode == "" {
2✔
871
                        if primaryAttachment, err := azureutils.GetAzVolumeAttachmentsForVolume(ctx, c.cachedClient, volume, azureutils.PrimaryOnly); err != nil || len(primaryAttachment) == 0 {
2✔
872
                                continue
1✔
873
                        } else {
×
874
                                primaryNode = primaryAttachment[0].Spec.NodeName
×
875
                        }
×
876
                }
877
        }
878

879
        var compatibleZones []string
14✔
880
        if len(compatibleZonesSet) > 0 {
14✔
881
                for key := range compatibleZonesSet {
×
882
                        compatibleZones = append(compatibleZones, key.(string))
×
883
                }
×
884
        }
885

886
        if len(compatibleZones) == 0 {
28✔
887
                selectedNodes, err = c.filterAndSortNodes(ctx, nodes, pods, volumes)
14✔
888
                if err != nil {
14✔
889
                        err = status.Errorf(codes.Aborted, "failed to select nodes for volumes (%+v): %v", volumes, err)
×
890
                        return nil, err
×
891
                }
×
892
        } else {
×
893
                w.Logger().V(5).Infof("The list of zones to select nodes from is: %s", strings.Join(compatibleZones, ","))
×
894

×
895
                var primaryNodeZone string
×
896
                if primaryNode != "" {
×
897
                        nodeObj := &v1.Node{}
×
898
                        err = c.cachedClient.Get(ctx, types.NamespacedName{Name: primaryNode}, nodeObj)
×
899
                        if err != nil {
×
900
                                w.Logger().Errorf(err, "failed to retrieve the primary node")
×
901
                        }
×
902

903
                        var ok bool
×
904
                        if primaryNodeZone, ok = nodeObj.Labels[consts.WellKnownTopologyKey]; !ok {
×
905
                                w.Logger().V(5).Infof("failed to find zone annotations for primary node")
×
906
                        }
×
907
                }
908

909
                nodeSelector := labels.NewSelector()
×
910
                zoneRequirement, _ := labels.NewRequirement(consts.WellKnownTopologyKey, selection.In, compatibleZones)
×
911
                nodeSelector = nodeSelector.Add(*zoneRequirement)
×
912

×
913
                compatibleNodes := &v1.NodeList{}
×
914
                if err = c.cachedClient.List(ctx, compatibleNodes, &client.ListOptions{LabelSelector: nodeSelector}); err != nil {
×
915
                        err = status.Errorf(codes.Aborted, "failed to retrieve node list: %v", err)
×
916
                        return nodes, err
×
917
                }
×
918

919
                // Create a zone to node map
920
                zoneToNodeMap := map[string][]v1.Node{}
×
921
                for _, node := range compatibleNodes.Items {
×
922
                        zoneName := node.Labels[consts.WellKnownTopologyKey]
×
923
                        zoneToNodeMap[zoneName] = append(zoneToNodeMap[zoneName], node)
×
924
                }
×
925

926
                // Get prioritized nodes per zone
927
                nodesPerZone := [][]v1.Node{}
×
928
                primaryZoneNodes := []v1.Node{}
×
929
                totalCount := 0
×
930
                for zone, nodeList := range zoneToNodeMap {
×
931
                        var sortedNodes []v1.Node
×
932
                        sortedNodes, err = c.filterAndSortNodes(ctx, nodeList, pods, volumes)
×
933
                        if err != nil {
×
934
                                err = status.Errorf(codes.Aborted, "failed to select nodes for volumes (%+v): %v", volumes, err)
×
935
                                return nil, err
×
936
                        }
×
937

938
                        totalCount += len(sortedNodes)
×
939
                        if zone == primaryNodeZone {
×
940
                                primaryZoneNodes = sortedNodes
×
941
                                continue
×
942
                        }
943
                        nodesPerZone = append(nodesPerZone, sortedNodes)
×
944
                }
945
                // Append the nodes from the primary node's zone last
946
                if len(primaryZoneNodes) > 0 {
×
947
                        nodesPerZone = append(nodesPerZone, primaryZoneNodes)
×
948
                }
×
949
                // Select the nodes from each of the zones one by one and append to the list
950
                i, j, countSoFar := 0, 0, 0
×
951
                for len(selectedNodes) < numReplicas && countSoFar < totalCount {
×
952
                        if len(nodesPerZone[i]) > j {
×
953
                                selectedNodes = append(selectedNodes, nodesPerZone[i][j])
×
954
                                countSoFar++
×
955
                        }
×
956
                        if i < len(nodesPerZone)-1 {
×
957
                                i++
×
958
                        } else {
×
959
                                i = 0
×
960
                                j++
×
961
                        }
×
962
                }
963
        }
964

965
        return selectedNodes, nil
14✔
966
}
967

968
func (c *SharedState) getNodesWithReplica(ctx context.Context, volumeName string) ([]string, error) {
13✔
969
        w, _ := workflow.GetWorkflowFromContext(ctx)
13✔
970
        w.Logger().V(5).Infof("Getting nodes with replica AzVolumeAttachments for volume %s.", volumeName)
13✔
971
        azVolumeAttachments, err := azureutils.GetAzVolumeAttachmentsForVolume(ctx, c.cachedClient, volumeName, azureutils.ReplicaOnly)
13✔
972
        if err != nil {
13✔
973
                w.Logger().V(5).Errorf(err, "failed to get AzVolumeAttachments for volume %s.", volumeName)
×
974
                return nil, err
×
975
        }
×
976

977
        nodes := []string{}
13✔
978
        for _, azVolumeAttachment := range azVolumeAttachments {
13✔
979
                if deleteRequested, _ := objectDeletionRequested(&azVolumeAttachment); !deleteRequested {
×
980
                        nodes = append(nodes, azVolumeAttachment.Spec.NodeName)
×
981
                }
×
982
        }
983
        w.Logger().V(5).Infof("Nodes with replica AzVolumeAttachments for volume %s are: %v, Len: %d", volumeName, nodes, len(nodes))
13✔
984
        return nodes, nil
13✔
985
}
986

987
func (c *SharedState) createReplicaAzVolumeAttachment(ctx context.Context, volumeID, node string, volumeContext map[string]string) error {
12✔
988
        var err error
12✔
989
        ctx, w := workflow.New(ctx, workflow.WithDetails(consts.NodeNameLabel, node))
12✔
990
        defer func() { w.Finish(err) }()
24✔
991

992
        var diskName string
12✔
993
        diskName, err = azureutils.GetDiskName(volumeID)
12✔
994
        if err != nil {
12✔
995
                err = status.Errorf(codes.Internal, "failed to extract volume name from volumeID (%s)", volumeID)
×
996
                return err
×
997
        }
×
998
        w.AddDetailToLogger(consts.VolumeNameLabel, diskName)
12✔
999

12✔
1000
        w.Logger().V(5).Info("Creating replica AzVolumeAttachments")
12✔
1001
        if volumeContext == nil {
24✔
1002
                volumeContext = make(map[string]string)
12✔
1003
        }
12✔
1004
        // creating azvolumeattachment
1005
        volumeName := strings.ToLower(diskName)
12✔
1006
        replicaName := azureutils.GetAzVolumeAttachmentName(volumeName, node)
12✔
1007
        azVolumeAttachment := azdiskv1beta2.AzVolumeAttachment{
12✔
1008
                ObjectMeta: metav1.ObjectMeta{
12✔
1009
                        Name:      replicaName,
12✔
1010
                        Namespace: c.config.ObjectNamespace,
12✔
1011
                        Labels: map[string]string{
12✔
1012
                                consts.NodeNameLabel:   node,
12✔
1013
                                consts.VolumeNameLabel: volumeName,
12✔
1014
                                consts.RoleLabel:       string(azdiskv1beta2.ReplicaRole),
12✔
1015
                        },
12✔
1016
                        Annotations: map[string]string{consts.VolumeAttachRequestAnnotation: "controller"},
12✔
1017
                        Finalizers:  []string{consts.AzVolumeAttachmentFinalizer},
12✔
1018
                },
12✔
1019
                Spec: azdiskv1beta2.AzVolumeAttachmentSpec{
12✔
1020
                        NodeName:      node,
12✔
1021
                        VolumeID:      volumeID,
12✔
1022
                        VolumeName:    volumeName,
12✔
1023
                        RequestedRole: azdiskv1beta2.ReplicaRole,
12✔
1024
                        VolumeContext: volumeContext,
12✔
1025
                },
12✔
1026
        }
12✔
1027
        w.AnnotateObject(&azVolumeAttachment)
12✔
1028
        azureutils.AnnotateAPIVersion(&azVolumeAttachment)
12✔
1029

12✔
1030
        _, err = c.azClient.DiskV1beta2().AzVolumeAttachments(c.config.ObjectNamespace).Create(ctx, &azVolumeAttachment, metav1.CreateOptions{})
12✔
1031
        if err != nil {
12✔
1032
                err = status.Errorf(codes.Internal, "failed to create replica AzVolumeAttachment %s.", replicaName)
×
1033
                return err
×
1034
        }
×
1035
        return nil
12✔
1036
}
1037

1038
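// cleanUpAzVolumeAttachmentByVolume cleans up the AzVolumeAttachments of the given volume that
// match the requested role. When attachmentDeleteMode is deleteAndWait, it waits until every
// attachment CRI is deleted or has failed, aggregating any errors into the returned error.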
func (c *SharedState) cleanUpAzVolumeAttachmentByVolume(ctx context.Context, azVolumeName string, caller operationRequester, role azureutils.AttachmentRoleMode, cleanupMode attachmentCleanUpMode, attachmentDeleteMode deleteMode) ([]azdiskv1beta2.AzVolumeAttachment, error) {
5✔
1039
        var err error
5✔
1040
        ctx, w := workflow.New(ctx, workflow.WithDetails(consts.VolumeNameLabel, azVolumeName))
5✔
1041
        defer func() { w.Finish(err) }()
10✔
1042

1043
        w.Logger().Infof("AzVolumeAttachment clean up requested by %s for AzVolume (%s)", caller, azVolumeName)
5✔
1044

5✔
1045
        var attachments []azdiskv1beta2.AzVolumeAttachment
5✔
1046
        attachments, err = azureutils.GetAzVolumeAttachmentsForVolume(ctx, c.cachedClient, azVolumeName, role)
5✔
1047
        if err != nil {
5✔
1048
                if apiErrors.IsNotFound(err) {
×
1049
                        err = nil
×
1050
                        return nil, nil
×
1051
                }
×
1052
                err = status.Errorf(codes.Aborted, "failed to get AzVolumeAttachments: %v", err)
×
1053
                return nil, err
×
1054
        }
1055

1056
        if err = c.cleanUpAzVolumeAttachments(ctx, attachments, cleanupMode, caller); err != nil {
5✔
1057
                return attachments, err
×
1058
        }
×
1059
        c.unmarkVolumeVisited(azVolumeName)
5✔
1060

5✔
1061
        if attachmentDeleteMode == deleteAndWait {
7✔
1062
                attachmentsCount := len(attachments)
2✔
1063
                errorMessageCh := make(chan string, attachmentsCount)
2✔
1064

2✔
1065
                // start waiting for replica AzVolumeAttachment CRIs to be deleted
2✔
1066
                for _, attachment := range attachments {
3✔
1067
                        // wait async and report error to go channel
1✔
1068
                        go func(ctx context.Context, attachment azdiskv1beta2.AzVolumeAttachment) {
2✔
1069
                                waiter := c.conditionWatcher.NewConditionWaiter(ctx, watcher.AzVolumeAttachmentType, attachment.Name, verifyObjectFailedOrDeleted)
1✔
1070
                                defer waiter.Close()
1✔
1071

1✔
1072
                                _, derr := waiter.Wait(ctx)
1✔
1073
                                if derr != nil {
1✔
1074
                                        errorMessageCh <- fmt.Sprintf("%s: %v", attachment.Name, derr)
×
1075
                                } else {
1✔
1076
                                        errorMessageCh <- ""
1✔
1077
                                }
1✔
1078
                        }(ctx, attachment)
1079
                }
1080

1081
                // if errors were reported by the condition watchers, aggregate them into the returned error
1082
                var errMsgs []string
2✔
1083
                for i := 0; i < attachmentsCount; i++ {
3✔
1084
                        v, ok := <-errorMessageCh
1✔
1085
                        if ok && v != "" {
1✔
1086
                                errMsgs = append(errMsgs, v)
×
1087
                        }
×
1088
                }
1089
                close(errorMessageCh)
2✔
1090
                if len(errMsgs) > 0 {
2✔
1091
                        err = status.Errorf(codes.Internal, strings.Join(errMsgs, ", "))
×
1092
                }
×
1093
        }
1094

1095
        return attachments, err
5✔
1096
}
1097

1098
func (c *SharedState) cleanUpAzVolumeAttachmentByNode(ctx context.Context, azDriverNodeName string, caller operationRequester, role azureutils.AttachmentRoleMode, cleanupMode attachmentCleanUpMode, attachmentDeleteMode deleteMode) ([]azdiskv1beta2.AzVolumeAttachment, error) {
2✔
1099
        var err error
2✔
1100
        ctx, w := workflow.New(ctx, workflow.WithDetails(consts.NodeNameLabel, azDriverNodeName))
2✔
1101
        defer func() { w.Finish(err) }()
4✔
1102
        w.Logger().Infof("AzVolumeAttachment clean up requested by %s for AzDriverNode (%s)", caller, azDriverNodeName)
2✔
1103

2✔
1104
        var nodeRequirement *labels.Requirement
2✔
1105
        nodeRequirement, err = azureutils.CreateLabelRequirements(consts.NodeNameLabel, selection.Equals, azDriverNodeName)
2✔
1106
        if err != nil {
2✔
1107
                return nil, err
×
1108
        }
×
1109
        labelSelector := labels.NewSelector().Add(*nodeRequirement)
2✔
1110

2✔
1111
        var attachments *azdiskv1beta2.AzVolumeAttachmentList
2✔
1112
        attachments, err = c.azClient.DiskV1beta2().AzVolumeAttachments(c.config.ObjectNamespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector.String()})
2✔
1113
        if err != nil {
2✔
1114
                if apiErrors.IsNotFound(err) {
×
1115
                        err = nil
×
1116
                        return nil, nil
×
1117
                }
×
1118
                err = status.Errorf(codes.Aborted, "failed to get AzVolumeAttachments: %v", err)
×
1119
                return nil, err
×
1120
        }
1121

1122
        cleanUpMap := map[string][]azdiskv1beta2.AzVolumeAttachment{}
2✔
1123
        for _, attachment := range attachments.Items {
3✔
1124
                if shouldCleanUp(attachment, role) {
2✔
1125
                        cleanUpMap[attachment.Spec.VolumeName] = append(cleanUpMap[attachment.Spec.VolumeName], attachment)
1✔
1126
                }
1✔
1127
        }
1128

1129
        for volumeName, cleanUps := range cleanUpMap {
3✔
1130
                volumeName := volumeName
1✔
1131
                c.addToOperationQueue(ctx,
1✔
1132
                        volumeName,
1✔
1133
                        caller,
1✔
1134
                        func(ctx context.Context) error {
2✔
1135
                                return c.cleanUpAzVolumeAttachments(ctx, cleanUps, cleanupMode, caller)
1✔
1136
                        },
1✔
1137
                        false)
1138
        }
1139
        return attachments.Items, nil
2✔
1140
}
1141

1142
func (c *SharedState) cleanUpAzVolumeAttachments(ctx context.Context, attachments []azdiskv1beta2.AzVolumeAttachment, cleanUpMode attachmentCleanUpMode, caller operationRequester) error {
6✔
1143
        var err error
6✔
1144

6✔
1145
        for _, attachment := range attachments {
11✔
1146
                patched := attachment.DeepCopy()
5✔
1147

5✔
1148
                if attachment.Spec.RequestedRole == azdiskv1beta2.PrimaryRole {
5✔
1149
                        if cleanUpMode == cleanUpAttachment && !volumeDetachRequested(patched) {
×
1150
                                markDetachRequest(patched, caller)
×
1151
                        } else if deleteRequested, _ := objectDeletionRequested(&attachment); !deleteRequested {
×
1152
                                // if primary azvolumeattachments are being cleaned up for driver uninstall, issue a DELETE call and continue
×
1153
                                // note that this DELETE request will remove AzVolumeAttachment CRI without detaching the volume from node
×
1154
                                if err = c.cachedClient.Delete(ctx, patched); err != nil {
×
1155
                                        return err
×
1156
                                }
×
1157
                        }
1158
                } else {
5✔
1159
                        // replica mounts should always be detached during cleanup, regardless of the cleanup mode
5✔
1160
                        if !volumeDetachRequested(patched) {
10✔
1161
                                markDetachRequest(patched, caller)
5✔
1162
                        }
5✔
1163

1164
                        // append the cleanup annotation to prevent replica re-creation, except when the cleanup was triggered by the node controller due to node failure.
1165
                        if caller != node && !metav1.HasAnnotation(patched.ObjectMeta, consts.CleanUpAnnotation) {
9✔
1166
                                markCleanUp(patched, caller)
4✔
1167
                        }
4✔
1168
                }
1169

1170
                if !reflect.DeepEqual(attachment.Status, patched.Status) {
10✔
1171
                        if err = c.cachedClient.Status().Patch(ctx, patched, client.MergeFrom(&attachment)); err != nil && !apiErrors.IsNotFound(err) {
5✔
1172
                                err = status.Errorf(codes.Internal, "failed to patch AzVolumeAttachment (%s)", attachment.Name)
×
1173
                                return err
×
1174
                        }
×
1175
                }
1176
        }
1177
        return nil
6✔
1178
}
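
// cleanUpAzVolumeAttachments above mutates a DeepCopy of each attachment and then patches only the
// status subresource with a merge patch computed against the unmodified original, so only the
// changed fields travel to the API server. A minimal sketch of that controller-runtime pattern,
// assuming a client.Client named cl; the helper name and mutate callback are hypothetical.
func patchAttachmentStatus(ctx context.Context, cl client.Client, original *azdiskv1beta2.AzVolumeAttachment, mutate func(*azdiskv1beta2.AzVolumeAttachment)) error {
        patched := original.DeepCopy()
        mutate(patched)
        if reflect.DeepEqual(original.Status, patched.Status) {
                return nil // nothing changed; skip the round trip
        }
        // MergeFrom diffs patched against original and sends only the delta to the status subresource.
        return cl.Status().Patch(ctx, patched, client.MergeFrom(original))
}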
1179

1180
func (c *SharedState) createReplicaRequestsQueue() {
86✔
1181
        c.priorityReplicaRequestsQueue = &VolumeReplicaRequestsPriorityQueue{}
86✔
1182
        c.priorityReplicaRequestsQueue.queue = cache.NewHeap(
86✔
1183
                func(obj interface{}) (string, error) {
87✔
1184
                        return obj.(*ReplicaRequest).VolumeName, nil
1✔
1185
                },
1✔
1186
                func(left, right interface{}) bool {
×
1187
                        return left.(*ReplicaRequest).Priority > right.(*ReplicaRequest).Priority
×
1188
                })
×
1189
}
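
// The heap built above is keyed by volume name and ordered so that requests missing more replicas
// (higher Priority) pop first. A minimal sketch using the same client-go cache.Heap API with two
// hypothetical volumes; it is illustrative only and not part of the driver.
func exampleReplicaHeap() (*ReplicaRequest, error) {
        h := cache.NewHeap(
                func(obj interface{}) (string, error) { return obj.(*ReplicaRequest).VolumeName, nil },
                func(left, right interface{}) bool {
                        return left.(*ReplicaRequest).Priority > right.(*ReplicaRequest).Priority
                })
        _ = h.Add(&ReplicaRequest{VolumeName: "vol-a", Priority: 1}) // hypothetical volume names
        _ = h.Add(&ReplicaRequest{VolumeName: "vol-b", Priority: 3})
        obj, err := h.Pop() // "vol-b" pops first because it is missing more replicas
        if err != nil {
                return nil, err
        }
        return obj.(*ReplicaRequest), nil
}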
1190

1191
// Removes replica requests from the priority queue and adds them to the operation queue.
1192
func (c *SharedState) tryCreateFailedReplicas(ctx context.Context, requester operationRequester) {
2✔
1193
        if atomic.SwapInt32(&c.processingReplicaRequestQueue, 1) == 0 {
4✔
1194
                ctx, w := workflow.New(ctx)
2✔
1195
                defer w.Finish(nil)
2✔
1196
                requests := c.priorityReplicaRequestsQueue.DrainQueue()
2✔
1197
                for i := 0; i < len(requests); i++ {
3✔
1198
                        replicaRequest := requests[i]
1✔
1199
                        c.addToOperationQueue(ctx,
1✔
1200
                                replicaRequest.VolumeName,
1✔
1201
                                requester,
1✔
1202
                                func(ctx context.Context) error {
2✔
1203
                                        return c.manageReplicas(ctx, replicaRequest.VolumeName)
1✔
1204
                                },
1✔
1205
                                false,
1206
                        )
1207
                }
1208
                atomic.StoreInt32(&c.processingReplicaRequestQueue, 0)
2✔
1209
        }
1210
}
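
// tryCreateFailedReplicas above uses atomic.SwapInt32 as a single-flight guard: the first caller
// flips the flag from 0 to 1 and drains the queue, while concurrent callers see 1 and return
// immediately. A minimal sketch of that guard with hypothetical names; not part of the driver.
var draining int32

func drainOnce(drain func()) {
        if atomic.SwapInt32(&draining, 1) != 0 {
                return // another goroutine is already draining
        }
        defer atomic.StoreInt32(&draining, 0) // reset the flag even if drain panics
        drain()
}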
1211

1212
func (c *SharedState) garbageCollectReplicas(ctx context.Context, volumeName string, requester operationRequester) {
3✔
1213
        c.addToOperationQueue(
3✔
1214
                ctx,
3✔
1215
                volumeName,
3✔
1216
                replica,
3✔
1217
                func(ctx context.Context) error {
6✔
1218
                        if _, err := c.cleanUpAzVolumeAttachmentByVolume(ctx, volumeName, requester, azureutils.ReplicaOnly, cleanUpAttachment, deleteOnly); err != nil {
3✔
1219
                                return err
×
1220
                        }
×
1221
                        c.addToGcExclusionList(volumeName, requester)
3✔
1222
                        c.removeGarbageCollection(volumeName)
3✔
1223
                        c.unmarkVolumeVisited(volumeName)
3✔
1224
                        return nil
3✔
1225
                },
1226
                true,
1227
        )
1228
}
1229

1230
func (c *SharedState) removeGarbageCollection(volumeName string) {
5✔
1231
        v, ok := c.cleanUpMap.LoadAndDelete(volumeName)
5✔
1232
        if ok {
8✔
1233
                cancelFunc := v.(context.CancelFunc)
3✔
1234
                cancelFunc()
3✔
1235
        }
3✔
1236
        // if there is any garbage collection enqueued in operation queue, remove it
1237
        c.dequeueGarbageCollection(volumeName)
5✔
1238
}
1239

1240
func (c *SharedState) manageReplicas(ctx context.Context, volumeName string) error {
12✔
1241
        var err error
12✔
1242
        ctx, w := workflow.New(ctx)
12✔
1243
        defer func() { w.Finish(err) }()
24✔
1244

1245
        var azVolume *azdiskv1beta2.AzVolume
12✔
1246
        azVolume, err = azureutils.GetAzVolume(ctx, c.cachedClient, c.azClient, volumeName, c.config.ObjectNamespace, true)
12✔
1247
        if apiErrors.IsNotFound(err) {
12✔
1248
                w.Logger().V(5).Info("Volume no longer exists. Aborting manage replica operation")
×
1249
                return nil
×
1250
        } else if err != nil {
12✔
1251
                w.Logger().Error(err, "failed to get AzVolume")
×
1252
                return err
×
1253
        }
×
1254

1255
        // replica management should not be executed or retried if AzVolume is scheduled for a deletion or not created.
1256
        deleteRequested, _ := objectDeletionRequested(azVolume)
12✔
1257
        if !isCreated(azVolume) || deleteRequested {
12✔
1258
                w.Logger().Errorf(errors.New("no valid azVolume"), "azVolume (%s) is scheduled for deletion or has no underlying volume object", azVolume.Name)
×
1259
                return nil
×
1260
        }
×
1261

1262
        currentReplicaCount, err := c.countValidReplicasForVolume(ctx, volumeName)
12✔
1263
        if err != nil {
12✔
1264
                return err
×
1265
        }
×
1266

1267
        desiredReplicaCount := azVolume.Spec.MaxMountReplicaCount
12✔
1268
        w.Logger().Infof("Controlling the number of replicas for volume (%s): desired=%d, current=%d", azVolume.Spec.VolumeName, desiredReplicaCount, currentReplicaCount)
12✔
1269

12✔
1270
        if desiredReplicaCount > currentReplicaCount {
24✔
1271
                w.Logger().Infof("Need %d more replicas for volume (%s)", desiredReplicaCount-currentReplicaCount, azVolume.Spec.VolumeName)
12✔
1272
                if azVolume.Status.Detail == nil || azVolume.Status.State == azdiskv1beta2.VolumeDeleting || azVolume.Status.State == azdiskv1beta2.VolumeDeleted {
12✔
1273
                        // underlying volume does not exist, so volume attachment cannot be made
×
1274
                        return nil
×
1275
                }
×
1276
                if err = c.createReplicas(ctx, desiredReplicaCount-currentReplicaCount, azVolume.Name, azVolume.Status.Detail.VolumeID, azVolume.Spec.Parameters); err != nil {
12✔
1277
                        w.Logger().Errorf(err, "failed to create %d replicas for volume (%s): %v", desiredReplicaCount-currentReplicaCount, azVolume.Spec.VolumeName, err)
×
1278
                        return err
×
1279
                }
×
1280
        }
1281
        return nil
12✔
1282
}
1283

1284
// Count the number of replica attachments that aren't scheduled for deletion for a given volume
1285
func (c *SharedState) countValidReplicasForVolume(ctx context.Context, volumeName string) (int, error) {
12✔
1286
        w, _ := workflow.GetWorkflowFromContext(ctx)
12✔
1287
        validReplicaCount := 0
12✔
1288

12✔
1289
        azVolumeAttachments, err := azureutils.GetAzVolumeAttachmentsForVolume(ctx, c.cachedClient, volumeName, azureutils.ReplicaOnly)
12✔
1290
        if err != nil {
12✔
1291
                w.Logger().Errorf(err, "failed to list replica AzVolumeAttachments")
×
1292
                return validReplicaCount, err
×
1293
        }
×
1294

1295
        for _, azVolumeAttachment := range azVolumeAttachments {
12✔
1296
                if deleteRequested, _ := objectDeletionRequested(&azVolumeAttachment); !deleteRequested {
×
1297
                        validReplicaCount++
×
1298
                }
×
1299
        }
1300
        return validReplicaCount, nil
12✔
1301
}
1302

1303
func (c *SharedState) createReplicas(ctx context.Context, remainingReplicas int, volumeName, volumeID string, volumeContext map[string]string) error {
12✔
1304
        var err error
12✔
1305
        ctx, w := workflow.New(ctx)
12✔
1306
        defer func() { w.Finish(err) }()
24✔
1307

1308
        // if volume is scheduled for clean up, skip replica creation
1309
        if _, cleanUpScheduled := c.cleanUpMap.Load(volumeName); cleanUpScheduled {
12✔
1310
                return nil
×
1311
        }
×
1312

1313
        // get pods linked to the volume
1314
        var pods []v1.Pod
12✔
1315
        pods, err = c.getPodsFromVolume(ctx, c.cachedClient, volumeName)
12✔
1316
        if err != nil {
12✔
1317
                return err
×
1318
        }
×
1319

1320
        // acquire per-pod lock to be released upon creation of replica AzVolumeAttachment CRIs
1321
        for _, pod := range pods {
25✔
1322
                podKey := getQualifiedName(pod.Namespace, pod.Name)
13✔
1323
                v, _ := c.podLocks.LoadOrStore(podKey, &sync.Mutex{})
13✔
1324
                podLock := v.(*sync.Mutex)
13✔
1325
                podLock.Lock()
13✔
1326
                defer podLock.Unlock()
13✔
1327
        }
13✔
1328

1329
        var nodes []string
12✔
1330
        nodes, err = c.getNodesForReplica(ctx, volumeName, pods)
12✔
1331
        if err != nil {
12✔
1332
                w.Logger().Errorf(err, "failed to get a list of nodes for replica attachment")
×
1333
                return err
×
1334
        }
×
1335

1336
        requiredReplicas := remainingReplicas
12✔
1337
        for _, node := range nodes {
24✔
1338
                if err = c.createReplicaAzVolumeAttachment(ctx, volumeID, node, volumeContext); err != nil {
12✔
1339
                        w.Logger().Errorf(err, "failed to create replica AzVolumeAttachment for volume %s", volumeName)
×
1340
                        // continue to try attachment with next node
×
1341
                        continue
×
1342
                }
1343
                remainingReplicas--
12✔
1344
                if remainingReplicas <= 0 {
24✔
1345
                        // no more remainingReplicas, don't need to create replica AzVolumeAttachment
12✔
1346
                        break
12✔
1347
                }
1348
        }
1349

1350
        if remainingReplicas > 0 {
12✔
1351
                // no failed replica attachments, but more replicas are still needed to reach MaxShares
×
1352
                request := ReplicaRequest{VolumeName: volumeName, Priority: remainingReplicas}
×
1353
                c.priorityReplicaRequestsQueue.Push(ctx, &request)
×
1354
                for _, pod := range pods {
×
1355
                        c.eventRecorder.Eventf(pod.DeepCopyObject(), v1.EventTypeWarning, consts.ReplicaAttachmentFailedEvent, "Not enough suitable nodes to attach %d of %d replica mount(s) for volume %s", remainingReplicas, requiredReplicas, volumeName)
×
1356
                }
×
1357
        }
1358
        return nil
12✔
1359
}
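
// createReplicas above serializes against other per-pod work by lazily storing one mutex per pod
// key in a sync.Map; because the Unlock calls are deferred inside the loop, every acquired lock is
// held until the function returns. A minimal sketch of that LoadOrStore locking idiom with a
// hypothetical helper name; not part of the driver.
func lockKey(locks *sync.Map, key string) (unlock func()) {
        v, _ := locks.LoadOrStore(key, &sync.Mutex{})
        mu := v.(*sync.Mutex)
        mu.Lock()
        return mu.Unlock // caller defers the returned unlock
}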
1360

1361
func (c *SharedState) getNodesForReplica(ctx context.Context, volumeName string, pods []v1.Pod) ([]string, error) {
12✔
1362
        var err error
12✔
1363
        ctx, w := workflow.New(ctx)
12✔
1364
        defer func() { w.Finish(err) }()
24✔
1365

1366
        if len(pods) == 0 {
12✔
1367
                pods, err = c.getPodsFromVolume(ctx, c.cachedClient, volumeName)
×
1368
                if err != nil {
×
1369
                        return nil, err
×
1370
                }
×
1371
        }
1372

1373
        var volumes []string
12✔
1374
        volumes, err = c.getVolumesForPodObjs(ctx, pods)
12✔
1375
        if err != nil {
12✔
1376
                return nil, err
×
1377
        }
×
1378

1379
        var nodes []string
12✔
1380
        nodes, err = c.getRankedNodesForReplicaAttachments(ctx, volumes, pods)
12✔
1381
        if err != nil {
12✔
1382
                return nil, err
×
1383
        }
×
1384

1385
        var replicaNodes []string
12✔
1386
        replicaNodes, err = c.getNodesWithReplica(ctx, volumeName)
12✔
1387
        if err != nil {
12✔
1388
                return nil, err
×
1389
        }
×
1390

1391
        skipSet := map[string]bool{}
12✔
1392
        for _, replicaNode := range replicaNodes {
12✔
1393
                skipSet[replicaNode] = true
×
1394
        }
×
1395

1396
        filtered := []string{}
12✔
1397
        for _, node := range nodes {
30✔
1398
                if skipSet[node] {
18✔
1399
                        continue
×
1400
                }
1401
                // if the node has no capacity for disk attachment, we should skip it
1402
                remainingCapacity, nodeExists := c.availableAttachmentsMap.Load(node)
18✔
1403
                if !nodeExists || remainingCapacity == nil || remainingCapacity.(*atomic.Int32).Load() <= int32(0) {
18✔
1404
                        w.Logger().V(5).Infof("skip node(%s) because it has no capacity for disk attachment", node)
×
1405
                        continue
×
1406
                }
1407
                filtered = append(filtered, node)
18✔
1408
        }
1409

1410
        return filtered, nil
12✔
1411
}
1412

1413
func (c *SharedState) createAzVolumeFromPv(ctx context.Context, pv v1.PersistentVolume, annotations map[string]string) error {
5✔
1414
        var err error
5✔
1415
        ctx, w := workflow.New(ctx)
5✔
1416
        defer func() { w.Finish(err) }()
10✔
1417

1418
        var desiredAzVolume *azdiskv1beta2.AzVolume
5✔
1419
        requiredBytes, _ := pv.Spec.Capacity.Storage().AsInt64()
5✔
1420
        volumeCapability := c.getVolumeCapabilityFromPv(&pv)
5✔
1421

5✔
1422
        // translate the in-tree PV to a CSI PV so it can be converted into an AzVolume resource
5✔
1423
        if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) &&
5✔
1424
                utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureDisk) &&
5✔
1425
                pv.Spec.AzureDisk != nil {
5✔
1426
                var transPV *v1.PersistentVolume
×
1427
                // if an error occurs while translating, it's unrecoverable, so return the error
×
1428
                if transPV, err = c.translateInTreePVToCSI(&pv); err != nil {
×
1429
                        return err
×
1430
                }
×
1431
                pv = *transPV
×
1432
        }
1433

1434
        // skip if PV is not managed by azuredisk driver
1435
        if pv.Spec.CSI == nil || pv.Spec.CSI.Driver != c.config.DriverName {
5✔
1436
                return nil
×
1437
        }
×
1438

1439
        // create AzVolume CRI for CSI Volume Source
1440
        desiredAzVolume, err = c.createAzVolumeFromCSISource(pv.Spec.CSI)
5✔
1441
        if err != nil {
5✔
1442
                return err
×
1443
        }
×
1444

1445
        if pv.Spec.NodeAffinity != nil && pv.Spec.NodeAffinity.Required != nil {
5✔
1446
                desiredAzVolume.Status.Detail.AccessibleTopology = azureutils.GetTopologyFromNodeSelector(*pv.Spec.NodeAffinity.Required, c.topologyKey)
×
1447
        }
×
1448
        if azureutils.IsMultiNodePersistentVolume(pv) {
5✔
1449
                desiredAzVolume.Spec.MaxMountReplicaCount = 0
×
1450
        }
×
1451

1452
        // if it's an inline volume, no pv label or pvc label should be added
1453
        if !azureutils.MapContains(annotations, consts.InlineVolumeAnnotation) {
9✔
1454
                desiredAzVolume.Labels = azureutils.AddToMap(desiredAzVolume.Labels, consts.PvNameLabel, pv.Name)
4✔
1455

4✔
1456
                if pv.Spec.ClaimRef != nil {
8✔
1457
                        desiredAzVolume.Labels = azureutils.AddToMap(desiredAzVolume.Labels, consts.PvcNameLabel, pv.Spec.ClaimRef.Name)
4✔
1458
                        desiredAzVolume.Labels = azureutils.AddToMap(desiredAzVolume.Labels, consts.PvcNamespaceLabel, pv.Spec.ClaimRef.Namespace)
4✔
1459
                }
4✔
1460
        }
1461

1462
        desiredAzVolume.Spec.VolumeCapability = volumeCapability
5✔
1463
        desiredAzVolume.Spec.PersistentVolume = pv.Name
5✔
1464
        desiredAzVolume.Spec.CapacityRange = &azdiskv1beta2.CapacityRange{RequiredBytes: requiredBytes}
5✔
1465

5✔
1466
        desiredAzVolume.Status.Detail.CapacityBytes = requiredBytes
5✔
1467

5✔
1468
        for k, v := range annotations {
7✔
1469
                desiredAzVolume.Status.Annotations = azureutils.AddToMap(desiredAzVolume.Status.Annotations, k, v)
2✔
1470
        }
2✔
1471

1472
        w.AddDetailToLogger(consts.PvNameKey, pv.Name, consts.VolumeNameLabel, desiredAzVolume.Name)
5✔
1473

5✔
1474
        if err = c.createAzVolume(ctx, desiredAzVolume); err != nil {
5✔
1475
                err = status.Errorf(codes.Internal, "failed to create AzVolume (%s) for PV (%s): %v", desiredAzVolume.Name, pv.Name, err)
×
1476
                return err
×
1477
        }
×
1478
        return nil
5✔
1479
}
1480

1481
func (c *SharedState) getVolumeCapabilityFromPv(pv *v1.PersistentVolume) []azdiskv1beta2.VolumeCapability {
5✔
1482
        volCaps := []azdiskv1beta2.VolumeCapability{}
5✔
1483

5✔
1484
        for _, accessMode := range pv.Spec.AccessModes {
6✔
1485
                volCap := azdiskv1beta2.VolumeCapability{}
1✔
1486
                // default to Mount
1✔
1487
                if pv.Spec.VolumeMode != nil && *pv.Spec.VolumeMode == v1.PersistentVolumeBlock {
1✔
1488
                        volCap.AccessType = azdiskv1beta2.VolumeCapabilityAccessBlock
×
1489
                }
×
1490
                switch accessMode {
1✔
1491
                case v1.ReadWriteOnce:
1✔
1492
                        volCap.AccessMode = azdiskv1beta2.VolumeCapabilityAccessModeSingleNodeSingleWriter
1✔
1493
                case v1.ReadWriteMany:
×
1494
                        volCap.AccessMode = azdiskv1beta2.VolumeCapabilityAccessModeMultiNodeMultiWriter
×
1495
                case v1.ReadOnlyMany:
×
1496
                        volCap.AccessMode = azdiskv1beta2.VolumeCapabilityAccessModeMultiNodeReaderOnly
×
1497
                default:
×
1498
                        volCap.AccessMode = azdiskv1beta2.VolumeCapabilityAccessModeUnknown
×
1499
                }
1500
                volCaps = append(volCaps, volCap)
1✔
1501
        }
1502
        return volCaps
5✔
1503
}
1504

1505
func (c *SharedState) createAzVolumeFromCSISource(source *v1.CSIPersistentVolumeSource) (*azdiskv1beta2.AzVolume, error) {
5✔
1506
        diskName, err := azureutils.GetDiskName(source.VolumeHandle)
5✔
1507
        if err != nil {
5✔
1508
                return nil, fmt.Errorf("failed to extract diskName from volume handle (%s): %v", source.VolumeHandle, err)
×
1509
        }
×
1510

1511
        _, maxMountReplicaCount := azureutils.GetMaxSharesAndMaxMountReplicaCount(source.VolumeAttributes, false)
5✔
1512

5✔
1513
        diskParameters, _ := azureutils.ParseDiskParameters(source.VolumeAttributes, azureutils.IgnoreUnknown)
5✔
1514
        volumeParams := diskParameters.VolumeContext
5✔
1515

5✔
1516
        azVolumeName := strings.ToLower(diskName)
5✔
1517

5✔
1518
        azVolume := azdiskv1beta2.AzVolume{
5✔
1519
                ObjectMeta: metav1.ObjectMeta{
5✔
1520
                        Name:       azVolumeName,
5✔
1521
                        Finalizers: []string{consts.AzVolumeFinalizer},
5✔
1522
                },
5✔
1523
                Spec: azdiskv1beta2.AzVolumeSpec{
5✔
1524
                        MaxMountReplicaCount: maxMountReplicaCount,
5✔
1525
                        Parameters:           volumeParams,
5✔
1526
                        VolumeName:           diskName,
5✔
1527
                },
5✔
1528
                Status: azdiskv1beta2.AzVolumeStatus{
5✔
1529
                        Detail: &azdiskv1beta2.AzVolumeStatusDetail{
5✔
1530
                                VolumeID:      source.VolumeHandle,
5✔
1531
                                VolumeContext: source.VolumeAttributes,
5✔
1532
                        },
5✔
1533
                        State: azdiskv1beta2.VolumeCreated,
5✔
1534
                },
5✔
1535
        }
5✔
1536
        azureutils.AnnotateAPIVersion(&azVolume)
5✔
1537

5✔
1538
        return &azVolume, nil
5✔
1539
}
1540

1541
func (c *SharedState) createAzVolume(ctx context.Context, desiredAzVolume *azdiskv1beta2.AzVolume) error {
5✔
1542
        w, _ := workflow.GetWorkflowFromContext(ctx)
5✔
1543

5✔
1544
        var err error
5✔
1545
        var azVolume *azdiskv1beta2.AzVolume
5✔
1546
        var updated *azdiskv1beta2.AzVolume
5✔
1547

5✔
1548
        azVolume, err = c.azClient.DiskV1beta2().AzVolumes(c.config.ObjectNamespace).Get(ctx, desiredAzVolume.Name, metav1.GetOptions{})
5✔
1549
        if err != nil {
9✔
1550
                if apiErrors.IsNotFound(err) {
8✔
1551
                        azVolume, err = c.azClient.DiskV1beta2().AzVolumes(c.config.ObjectNamespace).Create(ctx, desiredAzVolume, metav1.CreateOptions{})
4✔
1552
                        if err != nil {
4✔
1553
                                return err
×
1554
                        }
×
1555
                        updated = azVolume.DeepCopy()
4✔
1556
                        updated.Status = desiredAzVolume.Status
4✔
1557
                } else {
×
1558
                        return err
×
1559
                }
×
1560
        }
1561

1562
        if apiVersion, ok := azureutils.GetFromMap(azVolume.Annotations, consts.APIVersion); !ok || apiVersion != azdiskv1beta2.APIVersion {
6✔
1563
                w.Logger().Infof("Found AzVolume (%s) with older api version. Converting to apiVersion(%s)", azVolume.Name, azdiskv1beta2.APIVersion)
1✔
1564

1✔
1565
                azVolume.Spec.PersistentVolume = desiredAzVolume.Spec.PersistentVolume
1✔
1566

1✔
1567
                for k, v := range desiredAzVolume.Labels {
4✔
1568
                        azVolume.Labels = azureutils.AddToMap(azVolume.Labels, k, v)
3✔
1569
                }
3✔
1570

1571
                // for now, we don't empty the meta annotations after migrating them to status annotation for safety.
1572
                // note that this will leave some remnant garbage entries in meta annotations
1573
                var statusAnnotation []string
1✔
1574
                for k, v := range azVolume.Annotations {
2✔
1575
                        statusAnnotation = append(statusAnnotation, k, v)
1✔
1576
                }
1✔
1577

1578
                for k, v := range desiredAzVolume.Annotations {
2✔
1579
                        azVolume.Annotations = azureutils.AddToMap(azVolume.Annotations, k, v)
1✔
1580
                }
1✔
1581

1582
                azVolume, err = c.azClient.DiskV1beta2().AzVolumes(c.config.ObjectNamespace).Update(ctx, azVolume, metav1.UpdateOptions{})
1✔
1583
                if err != nil {
1✔
1584
                        return err
×
1585
                }
×
1586
                updated = azVolume.DeepCopy()
1✔
1587
                updated.Status.Annotations = azureutils.AddToMap(updated.Status.Annotations, statusAnnotation...)
1✔
1588
        }
1589

1590
        if updated != nil {
10✔
1591
                if _, err = azureutils.UpdateCRIWithRetry(ctx, nil, c.cachedClient, c.azClient, azVolume, func(obj client.Object) error {
10✔
1592
                        azvolume := obj.(*azdiskv1beta2.AzVolume)
5✔
1593
                        azvolume.Status = updated.Status
5✔
1594
                        return nil
5✔
1595
                }, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRIStatus); err != nil {
5✔
1596
                        return err
×
1597
                }
×
1598
        }
1599

1600
        // if AzVolume CRI successfully recreated, also recreate the operation queue for the volume
1601
        c.createOperationQueue(desiredAzVolume.Name)
5✔
1602
        return nil
5✔
1603
}
1604

1605
func (c *SharedState) translateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error) {
×
1606
        var err error
×
1607
        // translate the in-tree PV to a CSI PV so it can be converted into an AzVolume resource
×
1608
        if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) &&
×
1609
                utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureDisk) &&
×
1610
                pv.Spec.AzureDisk != nil {
×
1611
                // if an error occurs while translating, it's unrecoverable, so return the error
×
1612
                if pv, err = c.azureDiskCSITranslator.TranslateInTreePVToCSI(pv); err != nil {
×
1613
                } else if pv == nil {
×
1614
                        err = status.Errorf(codes.Internal, "unexpected failure in translating inline volume to csi")
×
1615
                }
×
1616

1617
        }
1618
        return pv, err
×
1619
}
1620

1621
// waitForVolumeAttachmentName waits for the VolumeAttachment name to be recorded in azVolumeAttachmentToVaMap by the volumeattachment controller
1622
func (c *SharedState) waitForVolumeAttachmentName(ctx context.Context, azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment) (string, error) {
2✔
1623
        var vaName string
2✔
1624
        err := wait.PollImmediateUntilWithContext(ctx, consts.DefaultPollingRate, func(ctx context.Context) (bool, error) {
4✔
1625
                val, exists := c.azVolumeAttachmentToVaMap.Load(azVolumeAttachment.Name)
2✔
1626
                if exists {
4✔
1627
                        vaName = val.(string)
2✔
1628
                }
2✔
1629
                return exists, nil
2✔
1630
        })
1631
        return vaName, err
2✔
1632
}
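
// waitForVolumeAttachmentName above simply polls the shared map on a fixed interval until the
// volumeattachment controller records the VolumeAttachment name or the context ends. A minimal
// sketch of the same wait.PollImmediateUntilWithContext pattern against any sync.Map key;
// waitForEntry is a hypothetical name and not part of the driver.
func waitForEntry(ctx context.Context, m *sync.Map, key string) (string, error) {
        var result string
        err := wait.PollImmediateUntilWithContext(ctx, consts.DefaultPollingRate, func(ctx context.Context) (bool, error) {
                val, exists := m.Load(key)
                if exists {
                        result = val.(string)
                }
                return exists, nil // returning false keeps polling; a non-nil error would stop it
        })
        return result, err
}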
1633

1634
// Returns set of node names that qualify pod affinity term and set of node names with qualifying replica attachments.
1635
func (c *SharedState) getQualifiedNodesForPodAffinityTerm(ctx context.Context, nodes []v1.Node, podNamespace string, affinityTerm v1.PodAffinityTerm) (podNodes, replicaNodes set) {
5✔
1636
        var err error
5✔
1637
        w, _ := workflow.GetWorkflowFromContext(ctx)
5✔
1638
        candidateNodes := set{}
5✔
1639
        for _, node := range nodes {
20✔
1640
                candidateNodes.add(node.Name)
15✔
1641
        }
15✔
1642
        podNodes = set{}
5✔
1643
        replicaNodes = set{}
5✔
1644

5✔
1645
        var podSelector labels.Selector
5✔
1646
        podSelector, err = metav1.LabelSelectorAsSelector(affinityTerm.LabelSelector)
5✔
1647
        // if failed to convert pod affinity label selector to selector, log error and skip
5✔
1648
        if err != nil {
5✔
1649
                w.Logger().Errorf(err, "failed to convert pod affinity (%v) to selector", affinityTerm.LabelSelector)
×
1650
        }
×
1651

1652
        nsList := &v1.NamespaceList{}
5✔
1653
        if affinityTerm.NamespaceSelector != nil {
5✔
1654
                nsSelector, err := metav1.LabelSelectorAsSelector(affinityTerm.NamespaceSelector)
×
1655
                // if the pod affinity namespace selector cannot be converted to a selector, log the error and skip
×
1656
                if err != nil {
×
1657
                        w.Logger().Errorf(err, "failed to convert pod affinity namespace selector (%v) to selector", affinityTerm.NamespaceSelector)
×
1658
                } else {
×
1659
                        if err = c.cachedClient.List(ctx, nsList, &client.ListOptions{LabelSelector: nsSelector}); err != nil {
×
1660
                                w.Logger().Errorf(err, "failed to list namespaces with selector (%v)", nsSelector)
×
1661
                                return
×
1662
                        }
×
1663

1664
                }
1665
        }
1666

1667
        namespaces := affinityTerm.Namespaces
5✔
1668
        for _, ns := range nsList.Items {
5✔
1669
                namespaces = append(namespaces, ns.Name)
×
1670
        }
×
1671

1672
        pods := []v1.Pod{}
5✔
1673
        if len(namespaces) > 0 {
5✔
1674
                for _, namespace := range namespaces {
×
1675
                        podList := &v1.PodList{}
×
1676
                        if err = c.cachedClient.List(ctx, podList, &client.ListOptions{LabelSelector: podSelector, Namespace: namespace}); err != nil {
                                w.Logger().Errorf(err, "failed to retrieve pod list: %v", err)
                                continue
                        }
                        pods = append(pods, podList.Items...)
×
1680
                }
1681
        } else {
5✔
1682
                podList := &v1.PodList{}
5✔
1683
                if err = c.cachedClient.List(ctx, podList, &client.ListOptions{LabelSelector: podSelector, Namespace: podNamespace}); err != nil {
5✔
1684
                        w.Logger().Errorf(err, "failed to retrieve pod list: %v", err)
×
1685
                }
×
1686
                pods = podList.Items
5✔
1687
        }
1688

1689
        // get replica nodes for pods that satisfy pod label selector
1690
        replicaNodes = c.getReplicaNodesForPods(ctx, pods)
5✔
1691
        for replicaNode := range replicaNodes {
5✔
1692
                if !candidateNodes.has(replicaNode) {
×
1693
                        replicaNodes.remove(replicaNode)
×
1694
                }
×
1695
        }
1696

1697
        // get nodes with pod that share the same topology as pods satisfying pod label selector
1698
        for _, pod := range pods {
9✔
1699
                podNodes.add(pod.Spec.NodeName)
4✔
1700
        }
4✔
1701

1702
        var podNodeObjs []v1.Node
5✔
1703
        for node := range podNodes {
9✔
1704
                var nodeObj v1.Node
4✔
1705
                if err = c.cachedClient.Get(ctx, types.NamespacedName{Name: node.(string)}, &nodeObj); err != nil {
4✔
1706
                        w.Logger().Errorf(err, "failed to get node (%s)", node.(string))
×
1707
                        continue
×
1708
                }
1709
                podNodeObjs = append(podNodeObjs, nodeObj)
4✔
1710
        }
1711

1712
        topologyLabel := c.getNodesTopologySelector(ctx, podNodeObjs, affinityTerm.TopologyKey)
5✔
1713
        for _, node := range nodes {
20✔
1714
                if topologyLabel != nil && topologyLabel.Matches(labels.Set(node.Labels)) {
23✔
1715
                        podNodes.add(node.Name)
8✔
1716
                }
8✔
1717
        }
1718
        return
5✔
1719
}
1720

1721
// Returns set of node names where replica mounts of given pod can be found
1722
func (c *SharedState) getReplicaNodesForPods(ctx context.Context, pods []v1.Pod) (replicaNodes set) {
5✔
1723
        // add nodes, to which replica attachments of matching pods' volumes are attached, to replicaNodes
5✔
1724
        replicaNodes = set{}
5✔
1725
        if volumes, err := c.getVolumesForPodObjs(ctx, pods); err == nil {
10✔
1726
                for _, volume := range volumes {
5✔
1727
                        attachments, err := azureutils.GetAzVolumeAttachmentsForVolume(ctx, c.cachedClient, volume, azureutils.ReplicaOnly)
×
1728
                        if err != nil {
×
1729
                                continue
×
1730
                        }
1731
                        for _, attachment := range attachments {
×
1732
                                if deleteRequested, _ := objectDeletionRequested(&attachment); !deleteRequested {
×
1733
                                        node := attachment.Spec.NodeName
×
1734
                                        replicaNodes.add(node)
×
1735
                                }
×
1736
                        }
1737
                }
1738
        }
1739

1740
        return replicaNodes
5✔
1741
}
1742

1743
// Returns a label selector corresponding to a list of nodes and a topology key (aka label key)
1744
func (c *SharedState) getNodesTopologySelector(ctx context.Context, nodes []v1.Node, topologyKey string) labels.Selector {
5✔
1745
        w, _ := workflow.GetWorkflowFromContext(ctx)
5✔
1746
        if len(nodes) == 0 {
6✔
1747
                return nil
1✔
1748
        }
1✔
1749

1750
        topologyValues := set{}
4✔
1751
        for _, node := range nodes {
8✔
1752
                nodeLabels := node.GetLabels()
4✔
1753
                if topologyValue, exists := nodeLabels[topologyKey]; exists {
8✔
1754
                        topologyValues.add(topologyValue)
4✔
1755
                } else {
4✔
1756
                        w.Logger().V(5).Infof("node (%s) doesn't have label value for topologyKey (%s)", node.Name, topologyKey)
×
1757
                }
×
1758
        }
1759

1760
        topologySelector := labels.NewSelector()
4✔
1761
        topologyRequirement, err := azureutils.CreateLabelRequirements(topologyKey, selection.In, topologyValues.toStringSlice()...)
4✔
1762
        // if failed to create label requirement, log error and return empty selector
4✔
1763
        if err != nil {
4✔
1764
                w.Logger().Errorf(err, "failed to create label requirement for topologyKey (%s)", topologyKey)
×
1765
        } else {
4✔
1766
                topologySelector = topologySelector.Add(*topologyRequirement)
4✔
1767
        }
4✔
1768
        return topologySelector
4✔
1769
}
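
// getNodesTopologySelector above collapses the topology values seen on the given nodes into a
// single "key in (values...)" selector. A minimal sketch showing how such a selector is built and
// matched, using labels.NewRequirement directly rather than the driver's azureutils helper; the
// topology key and zone values are hypothetical examples.
func exampleTopologySelector() (bool, error) {
        req, err := labels.NewRequirement("topology.kubernetes.io/zone", selection.In, []string{"zone-1", "zone-2"})
        if err != nil {
                return false, err
        }
        sel := labels.NewSelector().Add(*req)
        nodeLabels := labels.Set{"topology.kubernetes.io/zone": "zone-1"} // hypothetical node labels
        return sel.Matches(nodeLabels), nil                               // true: the node is in an allowed zone
}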
1770

1771
// addNodeToAvailableAttachmentsMap returns true if the node is added to or already in the availableAttachmentsMap, and false otherwise.
1772
func (c *SharedState) addNodeToAvailableAttachmentsMap(ctx context.Context, nodeName string, nodeLabels map[string]string) bool {
8✔
1773
        if _, ok := c.availableAttachmentsMap.Load(nodeName); !ok {
16✔
1774
                capacity, err := azureutils.GetNodeRemainingDiskCountActual(ctx, c.cachedClient, nodeName)
8✔
1775
                if err != nil {
10✔
1776
                        klog.Errorf("Failed to get node(%s) remaining disk count with error: %v", nodeName, err)
2✔
1777
                        // fall back to the maximum disk count derived from the node's labels
2✔
1778
                        capacity, err = azureutils.GetNodeMaxDiskCountWithLabels(nodeLabels)
2✔
1779
                        if err != nil {
3✔
1780
                                klog.Errorf("Failed to add node(%s) to availableAttachmentsMap because getting the available attachment capacity failed with error: %v", nodeName, err)
1✔
1781
                                return false
1✔
1782
                        }
1✔
1783
                }
1784
                var count atomic.Int32
7✔
1785
                count.Store(int32(capacity))
7✔
1786
                klog.Infof("Added node(%s) to availableAttachmentsMap with capacity: %d", nodeName, capacity)
7✔
1787
                c.availableAttachmentsMap.LoadOrStore(nodeName, &count)
7✔
1788
        }
1789
        return true
7✔
1790
}
1791

1792
func (c *SharedState) deleteNodeFromAvailableAttachmentsMap(ctx context.Context, node string) {
3✔
1793
        klog.Infof("Deleted node(%s) from availableAttachmentsMap", node)
3✔
1794
        c.availableAttachmentsMap.Delete(node)
3✔
1795
}
3✔
1796

1797
func (c *SharedState) decrementNodeCapacity(ctx context.Context, node string) bool {
4✔
1798
        remainingCapacity, nodeExists := c.availableAttachmentsMap.Load(node)
4✔
1799
        if nodeExists && remainingCapacity != nil {
7✔
1800
                for {
6✔
1801
                        currentCapacity := remainingCapacity.(*atomic.Int32).Load()
3✔
1802
                        if currentCapacity == int32(0) {
4✔
1803
                                klog.Errorf("Failed to decrement disk capacity for node(%s) because no remaining capacity", node)
1✔
1804
                                return false
1✔
1805
                        }
1✔
1806
                        if remainingCapacity.(*atomic.Int32).CompareAndSwap(currentCapacity, currentCapacity-1) {
4✔
1807
                                return true
2✔
1808
                        }
2✔
1809
                }
1810
        }
1811

1812
        klog.Errorf("Failed to decrement disk capacity because node(%s) not found", node)
1✔
1813
        return false
1✔
1814
}
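
// decrementNodeCapacity above uses a compare-and-swap loop so the per-node counter can never go
// below zero under concurrent attach bookkeeping, while incrementNodeCapacity below can use a
// plain Add because there is no lower bound to protect. A minimal sketch of the bounded-decrement
// idiom on an atomic.Int32; tryDecrement is a hypothetical name and not part of the driver.
func tryDecrement(counter *atomic.Int32) bool {
        for {
                current := counter.Load()
                if current == 0 {
                        return false // no capacity left; do not go negative
                }
                if counter.CompareAndSwap(current, current-1) {
                        return true
                }
                // another goroutine changed the counter between Load and CompareAndSwap; retry
        }
}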
1815

1816
func (c *SharedState) incrementNodeCapacity(ctx context.Context, node string) bool {
3✔
1817
        remainingCapacity, nodeExists := c.availableAttachmentsMap.Load(node)
3✔
1818
        if nodeExists && remainingCapacity != nil {
5✔
1819
                remainingCapacity.(*atomic.Int32).Add(1)
2✔
1820
                return true
2✔
1821
        }
2✔
1822

1823
        klog.Errorf("Failed to increment disk capacity because node(%s) not found", node)
1✔
1824
        return false
1✔
1825
}