
kubernetes-sigs / azuredisk-csi-driver / build 4515997592

24 Mar 2023 11:08PM UTC coverage: 69.198% (-0.1%) from 69.304%

Event: push (GitHub)
Commit: Merge pull request #1772 from alice-zheyan-yu/annotation_doc

7162 of 10350 relevant lines covered (69.2%)

6.73 hits per line

Source File: /pkg/controller/shared_state.go (70.89%)
1
/*
2
Copyright 2021 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controller
18

19
import (
20
        "container/list"
21
        "context"
22
        "errors"
23
        "fmt"
24
        "reflect"
25
        "sort"
26
        "strings"
27
        "sync"
28
        "sync/atomic"
29

30
        "google.golang.org/grpc/codes"
31
        "google.golang.org/grpc/status"
32
        v1 "k8s.io/api/core/v1"
33
        crdClientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
34
        apiErrors "k8s.io/apimachinery/pkg/api/errors"
35
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36
        "k8s.io/apimachinery/pkg/labels"
37
        "k8s.io/apimachinery/pkg/selection"
38
        "k8s.io/apimachinery/pkg/types"
39
        "k8s.io/apimachinery/pkg/util/wait"
40
        utilfeature "k8s.io/apiserver/pkg/util/feature"
41
        "k8s.io/client-go/kubernetes"
42
        cache "k8s.io/client-go/tools/cache"
43
        "k8s.io/client-go/tools/record"
44
        "k8s.io/client-go/util/retry"
45
        csitranslator "k8s.io/csi-translation-lib/plugins"
46
        "k8s.io/klog/v2"
47
        "k8s.io/kubernetes/pkg/features"
48
        azdiskv1beta2 "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/azuredisk/v1beta2"
49
        azdisk "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/client/clientset/versioned"
50
        consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants"
51
        "sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils"
52
        "sigs.k8s.io/azuredisk-csi-driver/pkg/watcher"
53
        "sigs.k8s.io/azuredisk-csi-driver/pkg/workflow"
54
        "sigs.k8s.io/controller-runtime/pkg/client"
55
)
56

57
type DriverLifecycle interface {
58
        GetDiskClientSet() azdisk.Interface
59
        GetConditionWatcher() *watcher.ConditionWatcher
60
        IsDriverUninstall() bool
61
}
62

63
type SharedState struct {
64
        recoveryComplete              uint32
65
        config                        *azdiskv1beta2.AzDiskDriverConfiguration
66
        topologyKey                   string
67
        podToClaimsMap                sync.Map
68
        podToInlineMap                sync.Map
69
        claimToPodsMap                sync.Map
70
        volumeToClaimMap              sync.Map
71
        claimToVolumeMap              sync.Map
72
        azVolumeAttachmentToVaMap     sync.Map
73
        pvToVolumeMap                 sync.Map
74
        podLocks                      sync.Map
75
        visitedVolumes                sync.Map
76
        volumeOperationQueues         sync.Map
77
        cleanUpMap                    sync.Map
78
        priorityReplicaRequestsQueue  *VolumeReplicaRequestsPriorityQueue
79
        processingReplicaRequestQueue int32
80
        eventRecorder                 record.EventRecorder
81
        cachedClient                  client.Client
82
        azClient                      azdisk.Interface
83
        kubeClient                    kubernetes.Interface
84
        crdClient                     crdClientset.Interface
85
        conditionWatcher              *watcher.ConditionWatcher
86
        azureDiskCSITranslator        csitranslator.InTreePlugin
87
        availableAttachmentsMap       sync.Map
88
        driverLifecycle               DriverLifecycle
89
}
90

91
func NewSharedState(config *azdiskv1beta2.AzDiskDriverConfiguration, topologyKey string, eventRecorder record.EventRecorder, cachedClient client.Client, crdClient crdClientset.Interface, kubeClient kubernetes.Interface, driverLifecycle DriverLifecycle) *SharedState {
85✔
92
        newSharedState := &SharedState{
85✔
93
                config:                 config,
85✔
94
                topologyKey:            topologyKey,
85✔
95
                eventRecorder:          eventRecorder,
85✔
96
                cachedClient:           cachedClient,
85✔
97
                crdClient:              crdClient,
85✔
98
                azClient:               driverLifecycle.GetDiskClientSet(),
85✔
99
                kubeClient:             kubeClient,
85✔
100
                conditionWatcher:       driverLifecycle.GetConditionWatcher(),
85✔
101
                azureDiskCSITranslator: csitranslator.NewAzureDiskCSITranslator(),
85✔
102
                driverLifecycle:        driverLifecycle,
85✔
103
        }
85✔
104
        newSharedState.createReplicaRequestsQueue()
85✔
105

85✔
106
        return newSharedState
85✔
107
}
85✔
108

109
func (c *SharedState) isRecoveryComplete() bool {
41✔
110
        return atomic.LoadUint32(&c.recoveryComplete) == 1
41✔
111
}
41✔
112

113
func (c *SharedState) MarkRecoveryComplete() {
86✔
114
        atomic.StoreUint32(&c.recoveryComplete, 1)
86✔
115
}
86✔
116

117
func (c *SharedState) DeleteAPIVersion(ctx context.Context, deleteVersion string) error {
×
118
        w, _ := workflow.GetWorkflowFromContext(ctx)
×
119
        crdNames := []string{consts.AzDriverNodeCRDName, consts.AzVolumeCRDName, consts.AzVolumeAttachmentCRDName}
×
120
        for _, crdName := range crdNames {
×
121
                err := retry.RetryOnConflict(retry.DefaultBackoff,
×
122
                        func() error {
×
123
                                crd, err := c.crdClient.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, crdName, metav1.GetOptions{})
×
124
                                if err != nil {
×
125
                                        if apiErrors.IsNotFound(err) {
×
126
                                                return err
×
127
                                        }
×
128
                                        return nil
×
129
                                }
130

131
                                updated := crd.DeepCopy()
×
132
                                var storedVersions []string
×
133
                                // remove version from status stored versions
×
134
                                for _, version := range updated.Status.StoredVersions {
×
135
                                        if version == deleteVersion {
×
136
                                                continue
×
137
                                        }
138
                                        storedVersions = append(storedVersions, version)
×
139
                                }
140
                                updated.Status.StoredVersions = storedVersions
×
141
                                _, err = c.crdClient.ApiextensionsV1().CustomResourceDefinitions().UpdateStatus(ctx, updated, metav1.UpdateOptions{})
×
142
                                if err != nil {
×
143
                                        // log the error and continue
×
144
                                        return err
×
145
                                }
×
146
                                return nil
×
147
                        })
148

149
                if err != nil {
×
150
                        w.Logger().Errorf(err, "failed to delete %s api version from CRD (%s)", deleteVersion, crdName)
×
151
                }
×
152

153
                // Uncomment when all deployments have rolled over to v1beta1.
154
                // updated = crd.DeepCopy()
155
                // // remove version from spec versions
156
                // var specVersions []crdv1.CustomResourceDefinitionVersion
157
                // for _, version := range updated.Spec.Versions {
158
                //         if version.Name == deleteVersion {
159
                //                 continue
160
                //         }
161
                //         specVersions = append(specVersions, version)
162
                // }
163
                // updated.Spec.Versions = specVersions
164

165
                // // update the crd
166
                // crd, err = c.crdClient.ApiextensionsV1().CustomResourceDefinitions().Update(ctx, updated, metav1.UpdateOptions{})
167
                // if err != nil {
168
                //         // log the error and continue
169
                //         w.Logger().Errorf(err, "failed to remove %s spec version from CRD (%s)", deleteVersion, crd.Name)
170
                //         continue
171
                // }
172
        }
173
        return nil
×
174
}
175

176
func (c *SharedState) createOperationQueue(volumeName string) {
62✔
177
        _, _ = c.volumeOperationQueues.LoadOrStore(volumeName, newLockableEntry(newOperationQueue()))
62✔
178
}
62✔
179

180
func (c *SharedState) addToOperationQueue(ctx context.Context, volumeName string, requester operationRequester, operationFunc func(context.Context) error, isReplicaGarbageCollection bool) {
17✔
181
        // It is expected for caller to provide parent workflow via context.
17✔
182
        // The child workflow will be created below and be fed to the queued operation for necessary workflow information.
17✔
183
        ctx, w := workflow.New(ctx, workflow.WithDetails(consts.VolumeNameLabel, volumeName))
17✔
184

17✔
185
        v, ok := c.volumeOperationQueues.Load(volumeName)
17✔
186
        if !ok {
17✔
187
                return
×
188
        }
×
189
        lockable := v.(*lockableEntry)
17✔
190
        lockable.Lock()
17✔
191
        isFirst := lockable.entry.(*operationQueue).Len() <= 0
17✔
192
        _ = lockable.entry.(*operationQueue).PushBack(&replicaOperation{
17✔
193
                ctx:       ctx,
17✔
194
                requester: requester,
17✔
195
                operationFunc: func(ctx context.Context) (err error) {
34✔
196
                        defer func() {
34✔
197
                                if !shouldRequeueReplicaOperation(isReplicaGarbageCollection, err) {
34✔
198
                                        w.Finish(err)
17✔
199
                                }
17✔
200
                        }()
201
                        err = operationFunc(ctx)
17✔
202
                        return
17✔
203
                },
204
                isReplicaGarbageCollection: isReplicaGarbageCollection,
205
        })
206
        lockable.Unlock()
17✔
207

17✔
208
        // if first operation, start goroutine
17✔
209
        if isFirst {
33✔
210
                go func() {
32✔
211
                        lockable.Lock()
16✔
212
                        defer lockable.Unlock()
16✔
213
                        for {
33✔
214
                                operationQueue := lockable.entry.(*operationQueue)
17✔
215
                                // pop the first operation
17✔
216
                                front := operationQueue.Front()
17✔
217
                                operation := front.Value.(*replicaOperation)
17✔
218

17✔
219
                                // only run the operation if the requester is not in the GC exclusion list
17✔
220
                                if !operationQueue.gcExclusionList.has(operation.requester) {
34✔
221
                                        lockable.Unlock()
17✔
222
                                        err := operation.operationFunc(operation.ctx)
17✔
223
                                        lockable.Lock()
17✔
224
                                        if shouldRequeueReplicaOperation(operation.isReplicaGarbageCollection, err) {
17✔
225
                                                // if operation failed, push it to the end of the queue
×
226
                                                if operationQueue.isActive {
×
227
                                                        operationQueue.PushBack(operation)
×
228
                                                }
×
229
                                        }
230
                                }
231

232
                                operationQueue.remove(front)
17✔
233
                                // if no entries remain, exit the loop
17✔
234
                                if operationQueue.Front() == nil {
33✔
235
                                        break
16✔
236
                                }
237
                        }
238
                }()
239
        }
240
}
241

242
func (c *SharedState) deleteOperationQueue(volumeName string) {
1✔
243
        v, ok := c.volumeOperationQueues.LoadAndDelete(volumeName)
1✔
244
        // if operation queue has already been deleted, return
1✔
245
        if !ok {
1✔
246
                return
×
247
        }
×
248
        // clear the queue in case there is still an entry in the queue
249
        lockable := v.(*lockableEntry)
1✔
250
        lockable.Lock()
1✔
251
        defer lockable.Unlock()
1✔
252
        lockable.entry.(*operationQueue).Init()
1✔
253
}
254

255
func (c *SharedState) closeOperationQueue(volumeName string) func() {
2✔
256
        v, ok := c.volumeOperationQueues.Load(volumeName)
2✔
257
        if !ok {
4✔
258
                return nil
2✔
259
        }
2✔
260
        lockable := v.(*lockableEntry)
×
261

×
262
        lockable.Lock()
×
263
        lockable.entry.(*operationQueue).isActive = false
×
264
        lockable.entry.(*operationQueue).Init()
×
265
        return lockable.Unlock
×
266
}
267

268
func (c *SharedState) addToGcExclusionList(volumeName string, target operationRequester) {
3✔
269
        v, ok := c.volumeOperationQueues.Load(volumeName)
3✔
270
        if !ok {
3✔
271
                return
×
272
        }
×
273
        lockable := v.(*lockableEntry)
3✔
274
        lockable.Lock()
3✔
275
        defer lockable.Unlock()
3✔
276
        lockable.entry.(*operationQueue).gcExclusionList.add(target)
3✔
277
}
278

279
func (c *SharedState) removeFromExclusionList(volumeName string, target operationRequester) {
7✔
280
        v, ok := c.volumeOperationQueues.Load(volumeName)
7✔
281
        if !ok {
7✔
282
                return
×
283
        }
×
284
        lockable := v.(*lockableEntry)
7✔
285
        lockable.Lock()
7✔
286
        defer lockable.Unlock()
7✔
287
        delete(lockable.entry.(*operationQueue).gcExclusionList, target)
7✔
288
}
289

290
func (c *SharedState) dequeueGarbageCollection(volumeName string) {
5✔
291
        v, ok := c.volumeOperationQueues.Load(volumeName)
5✔
292
        if !ok {
5✔
293
                return
×
294
        }
×
295
        lockable := v.(*lockableEntry)
5✔
296
        lockable.Lock()
5✔
297
        defer lockable.Unlock()
5✔
298
        queue := lockable.entry.(*operationQueue)
5✔
299
        // look for garbage collection operations in the queue and remove them
5✔
300
        var next *list.Element
5✔
301
        for cur := queue.Front(); cur != nil; cur = next {
8✔
302
                next = cur.Next()
3✔
303
                if cur.Value.(*replicaOperation).isReplicaGarbageCollection {
6✔
304
                        queue.remove(cur)
3✔
305
                }
3✔
306
        }
307
}
308

309
func (c *SharedState) getVolumesFromPod(ctx context.Context, podName string) ([]string, error) {
26✔
310
        w, _ := workflow.GetWorkflowFromContext(ctx)
26✔
311

26✔
312
        var claims []string
26✔
313
        w.Logger().V(5).Infof("Getting requested volumes for pod (%s).", podName)
26✔
314
        value, ok := c.podToClaimsMap.Load(podName)
26✔
315
        if !ok {
27✔
316
                return nil, status.Errorf(codes.NotFound, "unable to find an entry for pod (%s) in podToClaims map", podName)
1✔
317
        }
1✔
318
        claims, ok = value.([]string)
25✔
319
        if !ok {
26✔
320
                return nil, status.Errorf(codes.Internal, "wrong output type: expected []string")
1✔
321
        }
1✔
322

323
        volumes := []string{}
24✔
324
        for _, claim := range claims {
59✔
325
                value, ok := c.claimToVolumeMap.Load(claim)
35✔
326
                if !ok {
40✔
327
                        // the pvc entry is not an azure resource
5✔
328
                        w.Logger().V(5).Infof("Requested volume %s for pod %s is not an azure resource", value, podName)
5✔
329
                        continue
5✔
330
                }
331
                volume, ok := value.(string)
30✔
332
                if !ok {
31✔
333
                        return nil, status.Errorf(codes.Internal, "wrong output type: expected string")
1✔
334
                }
1✔
335
                volumes = append(volumes, volume)
29✔
336
                w.Logger().V(5).Infof("Requested volumes for pod %s are now the following: Volumes: %v, Len: %d", podName, volumes, len(volumes))
29✔
337
        }
338
        return volumes, nil
23✔
339
}
340

341
func (c *SharedState) getPodsFromVolume(ctx context.Context, client client.Client, volumeName string) ([]v1.Pod, error) {
14✔
342
        w, _ := workflow.GetWorkflowFromContext(ctx)
14✔
343
        pods, err := c.getPodNamesFromVolume(volumeName)
14✔
344
        if err != nil {
15✔
345
                return nil, err
1✔
346
        }
1✔
347
        podObjs := []v1.Pod{}
13✔
348
        for _, pod := range pods {
27✔
349
                namespace, name, err := parseQualifiedName(pod)
14✔
350
                if err != nil {
14✔
351
                        w.Logger().Errorf(err, "cannot get podObj for pod (%s)", pod)
×
352
                        continue
×
353
                }
354
                var podObj v1.Pod
14✔
355
                if err := client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, &podObj); err != nil {
14✔
356
                        return nil, err
×
357
                }
×
358
                podObjs = append(podObjs, podObj)
14✔
359
        }
360
        return podObjs, nil
13✔
361
}
362

363
func (c *SharedState) getPodNamesFromVolume(volumeName string) ([]string, error) {
14✔
364
        v, ok := c.volumeToClaimMap.Load(volumeName)
14✔
365
        if !ok {
15✔
366
                return nil, status.Errorf(codes.NotFound, "no bound persistent volume claim was found for AzVolume (%s)", volumeName)
1✔
367
        }
1✔
368
        claimName, ok := v.(string)
13✔
369
        if !ok {
13✔
370
                return nil, status.Errorf(codes.Internal, "volumeToClaimMap should should hold string")
×
371
        }
×
372

373
        value, ok := c.claimToPodsMap.Load(claimName)
13✔
374
        if !ok {
13✔
375
                return nil, status.Errorf(codes.NotFound, "no pods found for PVC (%s)", claimName)
×
376
        }
×
377
        lockable, ok := value.(*lockableEntry)
13✔
378
        if !ok {
13✔
379
                return nil, status.Errorf(codes.Internal, "claimToPodsMap should hold lockable entry")
×
380
        }
×
381

382
        lockable.RLock()
13✔
383
        defer lockable.RUnlock()
13✔
384

13✔
385
        podMap, ok := lockable.entry.(set)
13✔
386
        if !ok {
13✔
387
                return nil, status.Errorf(codes.Internal, "claimToPodsMap entry should hold a set")
×
388
        }
×
389

390
        pods := make([]string, len(podMap))
13✔
391
        i := 0
13✔
392
        for v := range podMap {
27✔
393
                pod := v.(string)
14✔
394
                pods[i] = pod
14✔
395
                i++
14✔
396
        }
14✔
397

398
        return pods, nil
13✔
399
}
400

401
func (c *SharedState) getVolumesForPodObjs(ctx context.Context, pods []v1.Pod) ([]string, error) {
17✔
402
        volumes := []string{}
17✔
403
        for _, pod := range pods {
34✔
404
                podVolumes, err := c.getVolumesFromPod(ctx, getQualifiedName(pod.Namespace, pod.Name))
17✔
405
                if err != nil {
17✔
406
                        return nil, err
×
407
                }
×
408
                volumes = append(volumes, podVolumes...)
17✔
409
        }
410
        return volumes, nil
17✔
411
}
412

413
func (c *SharedState) addPod(ctx context.Context, pod *v1.Pod, updateOption updateWithLock) error {
7✔
414
        var err error
7✔
415
        w, _ := workflow.GetWorkflowFromContext(ctx)
7✔
416
        podKey := getQualifiedName(pod.Namespace, pod.Name)
7✔
417
        v, _ := c.podLocks.LoadOrStore(podKey, &sync.Mutex{})
7✔
418

7✔
419
        w.Logger().V(5).Infof("Adding pod %s to shared map with keyName %s.", pod.Name, podKey)
7✔
420
        podLock := v.(*sync.Mutex)
7✔
421
        if updateOption == acquireLock {
13✔
422
                podLock.Lock()
6✔
423
                defer podLock.Unlock()
6✔
424
        }
6✔
425
        w.Logger().V(5).Infof("Pod spec of pod %s is: %+v. With volumes: %+v", pod.Name, pod.Spec, pod.Spec.Volumes)
7✔
426

7✔
427
        // If the claims already exist for the podKey, add them to a set
7✔
428
        value, _ := c.podToClaimsMap.LoadOrStore(podKey, []string{})
7✔
429
        claims := value.([]string)
7✔
430
        claimSet := set{}
7✔
431
        for _, claim := range claims {
17✔
432
                claimSet.add(claim)
10✔
433
        }
10✔
434

435
        for _, volume := range pod.Spec.Volumes {
18✔
436
                // TODO: investigate if we need special support for CSI ephemeral volume or generic ephemeral volume
11✔
437
                // if csiMigration is enabled and there is an inline volume, create AzVolume CRI for the inline volume.
11✔
438
                if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) &&
11✔
439
                        utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureDisk) &&
11✔
440
                        volume.AzureDisk != nil {
12✔
441
                        // inline volume: create AzVolume resource
1✔
442
                        var pv *v1.PersistentVolume
1✔
443
                        if pv, err = c.azureDiskCSITranslator.TranslateInTreeInlineVolumeToCSI(&volume, pod.Namespace); err != nil {
1✔
444
                                w.Logger().V(5).Errorf(err, "failed to translate inline volume to csi")
×
445
                                continue
×
446
                        } else if pv == nil {
1✔
447
                                w.Logger().V(5).Errorf(status.Errorf(codes.Internal, "unexpected failure in translating inline volume to csi"), "nil pv returned")
×
448
                                continue
×
449
                        }
450
                        w.Logger().V(5).Infof("Creating AzVolume instance for inline volume %s.", volume.AzureDisk.DiskName)
1✔
451
                        if err := c.createAzVolumeFromPv(ctx, *pv, map[string]string{consts.InlineVolumeAnnotation: volume.AzureDisk.DataDiskURI}); err != nil {
1✔
452
                                return err
×
453
                        }
×
454
                        v, exists := c.podToInlineMap.Load(podKey)
1✔
455
                        var inlines []string
1✔
456
                        if exists {
1✔
457
                                inlines = v.([]string)
×
458
                        }
×
459
                        inlines = append(inlines, volume.AzureDisk.DiskName)
1✔
460
                        c.podToInlineMap.Store(podKey, inlines)
1✔
461
                }
462
                if volume.PersistentVolumeClaim == nil {
12✔
463
                        w.Logger().V(5).Infof("Pod %s: Skipping Volume %s. No persistent volume exists.", pod.Name, volume)
1✔
464
                        continue
1✔
465
                }
466
                namespacedClaimName := getQualifiedName(pod.Namespace, volume.PersistentVolumeClaim.ClaimName)
10✔
467
                if _, ok := c.claimToVolumeMap.Load(namespacedClaimName); !ok {
11✔
468
                        // Log message if the Pod status is Running
1✔
469
                        if pod.Status.Phase == v1.PodRunning {
1✔
470
                                w.Logger().V(5).Infof("Skipping Pod %s. Volume %s not csi. Driver: %+v", pod.Name, volume.Name, volume.CSI)
×
471
                        }
×
472
                        continue
1✔
473
                }
474
                w.Logger().V(5).Infof("Pod %s. Volume %v is csi.", pod.Name, volume)
9✔
475
                claimSet.add(namespacedClaimName)
9✔
476
                v, _ := c.claimToPodsMap.LoadOrStore(namespacedClaimName, newLockableEntry(set{}))
9✔
477

9✔
478
                lockable := v.(*lockableEntry)
9✔
479
                lockable.Lock()
9✔
480
                pods := lockable.entry.(set)
9✔
481
                if !pods.has(podKey) {
9✔
482
                        pods.add(podKey)
×
483
                }
×
484
                // No need to restore the amended set to claimToPodsMap because set is a reference type
485
                lockable.Unlock()
9✔
486

9✔
487
                w.Logger().V(5).Infof("Storing pod %s and claim %s to claimToPodsMap map.", pod.Name, namespacedClaimName)
9✔
488
        }
489
        w.Logger().V(5).Infof("Storing pod %s and claim %s to podToClaimsMap map.", pod.Name, claims)
7✔
490

7✔
491
        allClaims := []string{}
7✔
492
        for key := range claimSet {
17✔
493
                allClaims = append(allClaims, key.(string))
10✔
494
        }
10✔
495
        c.podToClaimsMap.Store(podKey, allClaims)
7✔
496
        return nil
7✔
497
}
498

499
func (c *SharedState) deletePod(ctx context.Context, podKey string) error {
1✔
500
        w, _ := workflow.GetWorkflowFromContext(ctx)
1✔
501
        value, exists := c.podLocks.LoadAndDelete(podKey)
1✔
502
        if !exists {
1✔
503
                return nil
×
504
        }
×
505
        podLock := value.(*sync.Mutex)
1✔
506
        podLock.Lock()
1✔
507
        defer podLock.Unlock()
1✔
508

1✔
509
        value, exists = c.podToInlineMap.LoadAndDelete(podKey)
1✔
510
        if exists {
1✔
511
                inlines := value.([]string)
×
512

×
513
                for _, inline := range inlines {
×
514
                        _, err := c.cleanUpAzVolumeAttachmentByVolume(ctx, inline, pod, azureutils.AllRoles, cleanUpAttachment, deleteAndWait)
×
515
                        if err != nil && !apiErrors.IsNotFound(err) {
×
516
                                w.Logger().Errorf(err, "failed to list AzVolumeAttachments (%s) for inline (%s): %v", inline, inline, err)
×
517
                                return err
×
518
                        }
×
519
                        if err := c.azClient.DiskV1beta2().AzVolumes(c.config.ObjectNamespace).Delete(ctx, inline, metav1.DeleteOptions{}); err != nil && !apiErrors.IsNotFound(err) {
×
520
                                w.Logger().Errorf(err, "failed to delete AzVolume (%s) for inline (%s): %v", inline, inline, err)
×
521
                                return err
×
522
                        }
×
523
                }
524
        }
525

526
        value, exists = c.podToClaimsMap.LoadAndDelete(podKey)
1✔
527
        if !exists {
1✔
528
                return nil
×
529
        }
×
530
        claims := value.([]string)
1✔
531

1✔
532
        for _, claim := range claims {
2✔
533
                value, ok := c.claimToPodsMap.Load(claim)
1✔
534
                if !ok {
1✔
535
                        w.Logger().Errorf(nil, "No pods found for PVC (%s)", claim)
×
536
                }
×
537

538
                // Scope the duration that we hold the lockable lock using a function.
539
                func() {
2✔
540
                        lockable, ok := value.(*lockableEntry)
1✔
541
                        if !ok {
1✔
542
                                w.Logger().Error(nil, "claimToPodsMap should hold lockable entry")
×
543
                                return
×
544
                        }
×
545

546
                        lockable.Lock()
1✔
547
                        defer lockable.Unlock()
1✔
548

1✔
549
                        podSet, ok := lockable.entry.(set)
1✔
550
                        if !ok {
1✔
551
                                w.Logger().Error(nil, "claimToPodsMap entry should hold a set")
×
552
                        }
×
553

554
                        podSet.remove(podKey)
1✔
555
                        if len(podSet) == 0 {
2✔
556
                                c.claimToPodsMap.Delete(claim)
1✔
557
                        }
1✔
558
                }()
559
        }
560
        return nil
1✔
561
}
562

563
func (c *SharedState) addVolumeAndClaim(azVolumeName, pvName, pvClaimName string) {
1✔
564
        c.pvToVolumeMap.Store(pvName, azVolumeName)
1✔
565
        c.volumeToClaimMap.Store(azVolumeName, pvClaimName)
1✔
566
        c.claimToVolumeMap.Store(pvClaimName, azVolumeName)
1✔
567
}
1✔
568

569
func (c *SharedState) deletePV(pvName string) error {
2✔
570
        var err error
2✔
571

2✔
572
        ctx := context.Background()
2✔
573
        ctx, w := workflow.New(ctx, workflow.WithDetails())
2✔
574
        defer func() { w.Finish(err) }()
4✔
575
        defer func() {
4✔
576
                if apiErrors.IsNotFound(err) {
2✔
577
                        err = nil
×
578
                }
×
579
        }()
580

581
        var azVolume azdiskv1beta2.AzVolume
2✔
582
        if val, ok := c.pvToVolumeMap.Load(pvName); ok {
3✔
583
                volumeName := val.(string)
1✔
584
                err = c.cachedClient.Get(ctx, types.NamespacedName{Namespace: c.config.ObjectNamespace, Name: volumeName}, &azVolume)
1✔
585
                if err != nil {
1✔
586
                        if !apiErrors.IsNotFound(err) {
×
587
                                return err
×
588
                        }
×
589
                        return nil
×
590
                }
591
        } else {
1✔
592
                // if no volume name can be found for the PV, try fetching the AzVolume using labels
1✔
593
                var azVolumeList azdiskv1beta2.AzVolumeList
1✔
594
                req, err := azureutils.CreateLabelRequirements(consts.PvNameLabel, selection.Equals, pvName)
1✔
595
                if err != nil {
1✔
596
                        return err
×
597
                }
×
598
                err = c.cachedClient.List(ctx, &azVolumeList, &client.ListOptions{LabelSelector: labels.NewSelector().Add(*req)})
1✔
599
                if err != nil && !apiErrors.IsNotFound(err) {
1✔
600
                        return err
×
601
                } else if apiErrors.IsNotFound(err) || len(azVolumeList.Items) == 0 {
2✔
602
                        return nil
1✔
603
                }
1✔
604
                azVolume = azVolumeList.Items[0]
×
605
        }
606

607
        // deletion timestamp is set and the AzVolume reconciler will handle the delete request.
608
        // The volume itself will not be deleted.
609
        w.AddDetailToLogger(workflow.GetObjectDetails(&azVolume)...)
1✔
610

1✔
611
        if !isPreProvisioned(&azVolume) {
1✔
612
                return nil
×
613
        }
×
614

615
        err = c.cachedClient.Delete(ctx, &azVolume)
1✔
616
        if err != nil {
1✔
617
                if apiErrors.IsNotFound(err) {
×
618
                        return nil
×
619
                }
×
620
                return err
×
621
        }
622

623
        waitCh := make(chan goSignal)
1✔
624
        go func() {
2✔
625
                goCtx, w := workflow.New(ctx)
1✔
626
                defer func() { w.Finish(err) }()
2✔
627
                waitCh <- goSignal{}
1✔
628

1✔
629
                waiter := c.conditionWatcher.NewConditionWaiter(ctx, watcher.AzVolumeType, azVolume.Name, verifyObjectFailedOrDeleted)
1✔
630

1✔
631
                for {
2✔
632
                        // if AzVolume was successfully deleted
1✔
633
                        obj, err := waiter.Wait(goCtx)
1✔
634
                        if err == nil {
2✔
635
                                // remove the entry from the pv-to-volume map once the AzVolume CRI is deleted
1✔
636
                                c.pvToVolumeMap.Delete(pvName)
1✔
637
                                return
1✔
638
                        }
1✔
639

640
                        azVolume := obj.(*azdiskv1beta2.AzVolume)
×
641

×
642
                        if azVolume.Status.State == azdiskv1beta2.VolumeDeletionFailed || azVolume.Status.Error != nil {
×
643
                                updateFunc := func(obj client.Object) error {
×
644
                                        azVolume := obj.(*azdiskv1beta2.AzVolume)
×
645
                                        azVolume.Status.Error = nil
×
646
                                        azVolume.Status.State = azdiskv1beta2.VolumeCreated
×
647
                                        return nil
×
648
                                }
×
649
                                _, _ = azureutils.UpdateCRIWithRetry(goCtx, nil, c.cachedClient, c.azClient, azVolume, updateFunc, consts.ForcedUpdateMaxNetRetry, azureutils.UpdateCRIStatus)
×
650
                        }
651
                }
652
        }()
653
        <-waitCh
1✔
654

1✔
655
        return nil
1✔
656
}
657

658
func (c *SharedState) deleteVolumeAndClaim(azVolumeName string) {
2✔
659
        v, ok := c.volumeToClaimMap.LoadAndDelete(azVolumeName)
2✔
660
        if ok {
4✔
661
                pvClaimName := v.(string)
2✔
662
                c.claimToVolumeMap.Delete(pvClaimName)
2✔
663
        }
2✔
664
}
665

666
func (c *SharedState) markVolumeVisited(azVolumeName string) {
9✔
667
        c.visitedVolumes.Store(azVolumeName, struct{}{})
9✔
668
}
9✔
669

670
func (c *SharedState) unmarkVolumeVisited(azVolumeName string) {
9✔
671
        c.visitedVolumes.Delete(azVolumeName)
9✔
672
}
9✔
673

674
func (c *SharedState) isVolumeVisited(azVolumeName string) bool {
10✔
675
        _, visited := c.visitedVolumes.Load(azVolumeName)
10✔
676
        return visited
10✔
677
}
10✔
678

679
func (c *SharedState) getRankedNodesForReplicaAttachments(ctx context.Context, volumes []string, podObjs []v1.Pod) ([]string, error) {
14✔
680
        var err error
14✔
681
        ctx, w := workflow.New(ctx)
14✔
682
        defer func() { w.Finish(err) }()
28✔
683

684
        w.Logger().V(5).Info("Getting ranked list of nodes for creating AzVolumeAttachments")
14✔
685

14✔
686
        nodeList := &v1.NodeList{}
14✔
687
        if err := c.cachedClient.List(ctx, nodeList); err != nil {
14✔
688
                return nil, err
×
689
        }
×
690

691
        var selectedNodeObjs []v1.Node
14✔
692
        selectedNodeObjs, err = c.selectNodesPerTopology(ctx, nodeList.Items, podObjs, volumes)
14✔
693
        if err != nil {
14✔
694
                w.Logger().Errorf(err, "failed to select nodes for volumes (%+v)", volumes)
×
695
                return nil, err
×
696
        }
×
697

698
        selectedNodes := make([]string, len(selectedNodeObjs))
14✔
699
        for i, selectedNodeObj := range selectedNodeObjs {
36✔
700
                selectedNodes[i] = selectedNodeObj.Name
22✔
701
        }
22✔
702

703
        w.Logger().V(5).Infof("Selected nodes (%+v) for replica AzVolumeAttachments for volumes (%+v)", selectedNodes, volumes)
14✔
704
        return selectedNodes, nil
14✔
705
}
706

707
func (c *SharedState) filterNodes(ctx context.Context, nodes []v1.Node, pods []v1.Pod, volumes []string) ([]v1.Node, error) {
28✔
708
        var err error
28✔
709
        ctx, w := workflow.New(ctx)
28✔
710
        defer func() { w.Finish(err) }()
56✔
711

712
        pvs := make([]*v1.PersistentVolume, len(volumes))
28✔
713
        for i, volume := range volumes {
64✔
714
                var azVolume *azdiskv1beta2.AzVolume
36✔
715
                azVolume, err = azureutils.GetAzVolume(ctx, c.cachedClient, c.azClient, volume, c.config.ObjectNamespace, true)
36✔
716
                if err != nil {
38✔
717
                        w.Logger().V(5).Errorf(err, "AzVolume for volume %s is not found.", volume)
2✔
718
                        return nil, err
2✔
719
                }
2✔
720

721
                var pv v1.PersistentVolume
34✔
722
                if err = c.cachedClient.Get(ctx, types.NamespacedName{Name: azVolume.Spec.PersistentVolume}, &pv); err != nil {
34✔
723
                        return nil, err
×
724
                }
×
725
                pvs[i] = &pv
34✔
726
        }
727

728
        var filterPlugins = []filterPlugin{
26✔
729
                &interPodAffinityFilter{},
26✔
730
                &interPodAntiAffinityFilter{},
26✔
731
                &podTolerationFilter{},
26✔
732
                &podNodeAffinityFilter{},
26✔
733
                &podNodeSelectorFilter{},
26✔
734
                &volumeNodeSelectorFilter{},
26✔
735
        }
26✔
736

26✔
737
        filteredNodes := nodes
26✔
738
        for _, filterPlugin := range filterPlugins {
182✔
739
                filterPlugin.setup(pods, pvs, c)
156✔
740
                if updatedFilteredNodes, err := filterPlugin.filter(ctx, filteredNodes); err != nil {
156✔
741
                        w.Logger().Errorf(err, "failed to filter node with filter plugin (%s). Ignoring filtered results.", filterPlugin.name())
×
742
                } else {
156✔
743
                        filteredNodes = updatedFilteredNodes
156✔
744
                        nodeStrs := make([]string, len(filteredNodes))
156✔
745
                        for i, filteredNode := range filteredNodes {
489✔
746
                                nodeStrs[i] = filteredNode.Name
333✔
747
                        }
333✔
748
                        w.Logger().V(10).Infof("Filtered node list from filter plugin (%s): %+v", filterPlugin.name(), nodeStrs)
156✔
749
                }
750
        }
751

752
        return filteredNodes, nil
26✔
753
}
754

755
func (c *SharedState) prioritizeNodes(ctx context.Context, pods []v1.Pod, volumes []string, nodes []v1.Node) []v1.Node {
20✔
756
        ctx, w := workflow.New(ctx)
20✔
757
        defer w.Finish(nil)
20✔
758

20✔
759
        nodeScores := map[string]int{}
20✔
760
        for _, node := range nodes {
66✔
761
                nodeScores[node.Name] = 0
46✔
762
        }
46✔
763

764
        var nodeScorerPlugins = []nodeScorerPlugin{
20✔
765
                &scoreByNodeCapacity{},
20✔
766
                &scoreByReplicaCount{},
20✔
767
                &scoreByInterPodAffinity{},
20✔
768
                &scoreByInterPodAntiAffinity{},
20✔
769
                &scoreByPodNodeAffinity{},
20✔
770
        }
20✔
771

20✔
772
        for _, nodeScorerPlugin := range nodeScorerPlugins {
120✔
773
                nodeScorerPlugin.setup(nodes, pods, volumes, c)
100✔
774
                if updatedNodeScores, err := nodeScorerPlugin.score(ctx, nodeScores); err != nil {
100✔
775
                        w.Logger().Errorf(err, "failed to score nodes by node scorer (%s)", nodeScorerPlugin.name())
×
776
                } else {
100✔
777
                        // update node scores if scorer plugin returned success
100✔
778
                        nodeScores = updatedNodeScores
100✔
779
                }
100✔
780
                var nodeScoreResult string
100✔
781
                for nodeName, score := range nodeScores {
288✔
782
                        nodeScoreResult += fmt.Sprintf("<%s: %d> ", nodeName, score)
188✔
783
                }
188✔
784
                w.Logger().V(10).Infof("node score after node score plugin (%s): %s", nodeScorerPlugin.name(), nodeScoreResult)
100✔
785
        }
786

787
        // normalize score
788
        numFiltered := 0
20✔
789
        for _, node := range nodes {
66✔
790
                if _, exists := nodeScores[node.Name]; !exists {
56✔
791
                        nodeScores[node.Name] = -1
10✔
792
                        numFiltered++
10✔
793
                }
10✔
794
        }
795

796
        sort.Slice(nodes[:], func(i, j int) bool {
49✔
797
                return nodeScores[nodes[i].Name] > nodeScores[nodes[j].Name]
29✔
798
        })
29✔
799

800
        return nodes[:len(nodes)-numFiltered]
20✔
801
}
802

803
func (c *SharedState) filterAndSortNodes(ctx context.Context, nodes []v1.Node, pods []v1.Pod, volumes []string) ([]v1.Node, error) {
16✔
804
        var err error
16✔
805
        ctx, w := workflow.New(ctx)
16✔
806
        defer func() { w.Finish(err) }()
32✔
807

808
        var filteredNodes []v1.Node
16✔
809
        filteredNodes, err = c.filterNodes(ctx, nodes, pods, volumes)
16✔
810
        if err != nil {
17✔
811
                w.Logger().Errorf(err, "failed to filter nodes for volumes (%+v): %v", volumes, err)
1✔
812
                return nil, err
1✔
813
        }
1✔
814
        sortedNodes := c.prioritizeNodes(ctx, pods, volumes, filteredNodes)
15✔
815
        return sortedNodes, nil
15✔
816
}
817

818
func (c *SharedState) selectNodesPerTopology(ctx context.Context, nodes []v1.Node, pods []v1.Pod, volumes []string) ([]v1.Node, error) {
14✔
819
        var err error
14✔
820
        ctx, w := workflow.New(ctx)
14✔
821
        defer func() { w.Finish(err) }()
28✔
822

823
        selectedNodes := []v1.Node{}
14✔
824
        numReplicas := 0
14✔
825

14✔
826
        // disperse node topology if possible
14✔
827
        compatibleZonesSet := set{}
14✔
828
        var primaryNode string
14✔
829
        for i, volume := range volumes {
35✔
830
                var azVolume *azdiskv1beta2.AzVolume
21✔
831
                azVolume, err = azureutils.GetAzVolume(ctx, c.cachedClient, c.azClient, volume, c.config.ObjectNamespace, true)
21✔
832
                if err != nil {
21✔
833
                        err = status.Errorf(codes.Aborted, "failed to get AzVolume CRI (%s)", volume)
×
834
                        return nil, err
×
835
                }
×
836

837
                numReplicas = max(numReplicas, azVolume.Spec.MaxMountReplicaCount)
21✔
838
                w.Logger().V(5).Infof("Number of requested replicas for Azvolume (%s) is: %d. Max replica count is: %d.",
21✔
839
                        volume, numReplicas, azVolume.Spec.MaxMountReplicaCount)
21✔
840

21✔
841
                var pv v1.PersistentVolume
21✔
842
                if err = c.cachedClient.Get(ctx, types.NamespacedName{Name: azVolume.Spec.PersistentVolume}, &pv); err != nil {
21✔
843
                        return nil, err
×
844
                }
×
845

846
                if pv.Spec.NodeAffinity == nil || pv.Spec.NodeAffinity.Required == nil {
41✔
847
                        continue
20✔
848
                }
849

850
                // Find the intersection of the zones for all the volumes
851
                topologyKey := c.topologyKey
1✔
852
                if i == 0 {
2✔
853
                        compatibleZonesSet = getSupportedZones(pv.Spec.NodeAffinity.Required.NodeSelectorTerms, topologyKey)
1✔
854
                } else {
1✔
855
                        listOfZones := getSupportedZones(pv.Spec.NodeAffinity.Required.NodeSelectorTerms, topologyKey)
×
856
                        for key := range compatibleZonesSet {
×
857
                                if !listOfZones.has(key) {
×
858
                                        compatibleZonesSet.remove(key)
×
859
                                }
×
860
                        }
861
                }
862

863
                // find primary node if not already found
864
                if primaryNode == "" {
2✔
865
                        if primaryAttachment, err := azureutils.GetAzVolumeAttachmentsForVolume(ctx, c.cachedClient, volume, azureutils.PrimaryOnly); err != nil || len(primaryAttachment) == 0 {
2✔
866
                                continue
1✔
867
                        } else {
×
868
                                primaryNode = primaryAttachment[0].Spec.NodeName
×
869
                        }
×
870
                }
871
        }
872

873
        var compatibleZones []string
14✔
874
        if len(compatibleZonesSet) > 0 {
14✔
875
                for key := range compatibleZonesSet {
×
876
                        compatibleZones = append(compatibleZones, key.(string))
×
877
                }
×
878
        }
879

880
        if len(compatibleZones) == 0 {
28✔
881
                selectedNodes, err = c.filterAndSortNodes(ctx, nodes, pods, volumes)
14✔
882
                if err != nil {
14✔
883
                        err = status.Errorf(codes.Aborted, "failed to select nodes for volumes (%+v): %v", volumes, err)
×
884
                        return nil, err
×
885
                }
×
886
        } else {
×
887
                w.Logger().V(5).Infof("The list of zones to select nodes from is: %s", strings.Join(compatibleZones, ","))
×
888

×
889
                var primaryNodeZone string
×
890
                if primaryNode != "" {
×
891
                        nodeObj := &v1.Node{}
×
892
                        err = c.cachedClient.Get(ctx, types.NamespacedName{Name: primaryNode}, nodeObj)
×
893
                        if err != nil {
×
894
                                w.Logger().Errorf(err, "failed to retrieve the primary node")
×
895
                        }
×
896

897
                        var ok bool
×
898
                        if primaryNodeZone, ok = nodeObj.Labels[consts.WellKnownTopologyKey]; ok {
×
899
                                w.Logger().V(5).Infof("failed to find zone annotations for primary node")
×
900
                        }
×
901
                }
902

903
                nodeSelector := labels.NewSelector()
×
904
                zoneRequirement, _ := labels.NewRequirement(consts.WellKnownTopologyKey, selection.In, compatibleZones)
×
905
                nodeSelector = nodeSelector.Add(*zoneRequirement)
×
906

×
907
                compatibleNodes := &v1.NodeList{}
×
908
                if err = c.cachedClient.List(ctx, compatibleNodes, &client.ListOptions{LabelSelector: nodeSelector}); err != nil {
×
909
                        err = status.Errorf(codes.Aborted, "failed to retrieve node list: %v", err)
×
910
                        return nodes, err
×
911
                }
×
912

913
                // Create a zone to node map
914
                zoneToNodeMap := map[string][]v1.Node{}
×
915
                for _, node := range compatibleNodes.Items {
×
916
                        zoneName := node.Labels[consts.WellKnownTopologyKey]
×
917
                        zoneToNodeMap[zoneName] = append(zoneToNodeMap[zoneName], node)
×
918
                }
×
919

920
                // Get prioritized nodes per zone
921
                nodesPerZone := [][]v1.Node{}
×
922
                primaryZoneNodes := []v1.Node{}
×
923
                totalCount := 0
×
924
                for zone, nodeList := range zoneToNodeMap {
×
925
                        var sortedNodes []v1.Node
×
926
                        sortedNodes, err = c.filterAndSortNodes(ctx, nodeList, pods, volumes)
×
927
                        if err != nil {
×
928
                                err = status.Errorf(codes.Aborted, "failed to select nodes for volumes (%+v): %v", volumes, err)
×
929
                                return nil, err
×
930
                        }
×
931

932
                        totalCount += len(sortedNodes)
×
933
                        if zone == primaryNodeZone {
×
934
                                primaryZoneNodes = sortedNodes
×
935
                                continue
×
936
                        }
937
                        nodesPerZone = append(nodesPerZone, sortedNodes)
×
938
                }
939
                // Append the nodes from the primary node's zone last
940
                if len(primaryZoneNodes) > 0 {
×
941
                        nodesPerZone = append(nodesPerZone, primaryZoneNodes)
×
942
                }
×
943
                // Select the nodes from each of the zones one by one and append to the list
944
                i, j, countSoFar := 0, 0, 0
×
945
                for len(selectedNodes) < numReplicas && countSoFar < totalCount {
×
946
                        if len(nodesPerZone[i]) > j {
×
947
                                selectedNodes = append(selectedNodes, nodesPerZone[i][j])
×
948
                                countSoFar++
×
949
                        }
×
950
                        if i < len(nodesPerZone)-1 {
×
951
                                i++
×
952
                        } else {
×
953
                                i = 0
×
954
                                j++
×
955
                        }
×
956
                }
957
        }
958

959
        return selectedNodes, nil
14✔
960
}
961

962
func (c *SharedState) getNodesWithReplica(ctx context.Context, volumeName string) ([]string, error) {
13✔
963
        w, _ := workflow.GetWorkflowFromContext(ctx)
13✔
964
        w.Logger().V(5).Infof("Getting nodes with replica AzVolumeAttachments for volume %s.", volumeName)
13✔
965
        azVolumeAttachments, err := azureutils.GetAzVolumeAttachmentsForVolume(ctx, c.cachedClient, volumeName, azureutils.ReplicaOnly)
13✔
966
        if err != nil {
13✔
967
                w.Logger().V(5).Errorf(err, "failed to get AzVolumeAttachments for volume %s.", volumeName)
×
968
                return nil, err
×
969
        }
×
970

971
        nodes := []string{}
13✔
972
        for _, azVolumeAttachment := range azVolumeAttachments {
13✔
973
                if deleteRequested, _ := objectDeletionRequested(&azVolumeAttachment); !deleteRequested {
×
974
                        nodes = append(nodes, azVolumeAttachment.Spec.NodeName)
×
975
                }
×
976
        }
977
        w.Logger().V(5).Infof("Nodes with replica AzVolumeAttachments for volume %s are: %v, Len: %d", volumeName, nodes, len(nodes))
13✔
978
        return nodes, nil
13✔
979
}
980

981
func (c *SharedState) createReplicaAzVolumeAttachment(ctx context.Context, volumeID, node string, volumeContext map[string]string) error {
12✔
982
        var err error
12✔
983
        ctx, w := workflow.New(ctx, workflow.WithDetails(consts.NodeNameLabel, node))
12✔
984
        defer func() { w.Finish(err) }()
24✔
985

986
        var diskName string
12✔
987
        diskName, err = azureutils.GetDiskName(volumeID)
12✔
988
        if err != nil {
12✔
989
                err = status.Errorf(codes.Internal, "failed to extract volume name from volumeID (%s)", volumeID)
×
990
                return err
×
991
        }
×
992
        w.AddDetailToLogger(consts.VolumeNameLabel, diskName)
12✔
993

12✔
994
        w.Logger().V(5).Info("Creating replica AzVolumeAttachments")
12✔
995
        if volumeContext == nil {
24✔
996
                volumeContext = make(map[string]string)
12✔
997
        }
12✔
998
        // create the AzVolumeAttachment
999
        volumeName := strings.ToLower(diskName)
12✔
1000
        replicaName := azureutils.GetAzVolumeAttachmentName(volumeName, node)
12✔
1001
        azVolumeAttachment := azdiskv1beta2.AzVolumeAttachment{
12✔
1002
                ObjectMeta: metav1.ObjectMeta{
12✔
1003
                        Name:      replicaName,
12✔
1004
                        Namespace: c.config.ObjectNamespace,
12✔
1005
                        Labels: map[string]string{
12✔
1006
                                consts.NodeNameLabel:   node,
12✔
1007
                                consts.VolumeNameLabel: volumeName,
12✔
1008
                                consts.RoleLabel:       string(azdiskv1beta2.ReplicaRole),
12✔
1009
                        },
12✔
1010
                        Annotations: map[string]string{consts.VolumeAttachRequestAnnotation: "controller"},
12✔
1011
                        Finalizers:  []string{consts.AzVolumeAttachmentFinalizer},
12✔
1012
                },
12✔
1013
                Spec: azdiskv1beta2.AzVolumeAttachmentSpec{
12✔
1014
                        NodeName:      node,
12✔
1015
                        VolumeID:      volumeID,
12✔
1016
                        VolumeName:    volumeName,
12✔
1017
                        RequestedRole: azdiskv1beta2.ReplicaRole,
12✔
1018
                        VolumeContext: volumeContext,
12✔
1019
                },
12✔
1020
        }
12✔
1021
        w.AnnotateObject(&azVolumeAttachment)
12✔
1022
        azureutils.AnnotateAPIVersion(&azVolumeAttachment)
12✔
1023

12✔
1024
        _, err = c.azClient.DiskV1beta2().AzVolumeAttachments(c.config.ObjectNamespace).Create(ctx, &azVolumeAttachment, metav1.CreateOptions{})
12✔
1025
        if err != nil {
12✔
1026
                err = status.Errorf(codes.Internal, "failed to create replica AzVolumeAttachment %s.", replicaName)
×
1027
                return err
×
1028
        }
×
1029
        return nil
12✔
1030
}
1031

1032
func (c *SharedState) cleanUpAzVolumeAttachmentByVolume(ctx context.Context, azVolumeName string, caller operationRequester, role azureutils.AttachmentRoleMode, cleanupMode attachmentCleanUpMode, attachmentDeleteMode deleteMode) ([]azdiskv1beta2.AzVolumeAttachment, error) {
5✔
1033
        var err error
5✔
1034
        ctx, w := workflow.New(ctx, workflow.WithDetails(consts.VolumeNameLabel, azVolumeName))
5✔
1035
        defer func() { w.Finish(err) }()
10✔
1036

1037
        w.Logger().Infof("AzVolumeAttachment clean up requested by %s for AzVolume (%s)", caller, azVolumeName)
5✔
1038

5✔
1039
        var attachments []azdiskv1beta2.AzVolumeAttachment
5✔
1040
        attachments, err = azureutils.GetAzVolumeAttachmentsForVolume(ctx, c.cachedClient, azVolumeName, role)
5✔
1041
        if err != nil {
5✔
1042
                if apiErrors.IsNotFound(err) {
×
1043
                        err = nil
×
1044
                        return nil, nil
×
1045
                }
×
1046
                err = status.Errorf(codes.Aborted, "failed to get AzVolumeAttachments: %v", err)
×
1047
                return nil, err
×
1048
        }
1049

1050
        if err = c.cleanUpAzVolumeAttachments(ctx, attachments, cleanupMode, caller); err != nil {
5✔
1051
                return attachments, err
×
1052
        }
×
1053
        c.unmarkVolumeVisited(azVolumeName)
5✔
1054

5✔
1055
        if attachmentDeleteMode == deleteAndWait {
7✔
1056
                attachmentsCount := len(attachments)
2✔
1057
                errorMessageCh := make(chan string, attachmentsCount)
2✔
1058

2✔
1059
                // start waiting for replica AzVolumeAttachment CRIs to be deleted
2✔
1060
                for _, attachment := range attachments {
3✔
1061
                        // wait async and report error to go channel
1✔
1062
                        go func(ctx context.Context, attachment azdiskv1beta2.AzVolumeAttachment) {
2✔
1063
                                waiter := c.conditionWatcher.NewConditionWaiter(ctx, watcher.AzVolumeAttachmentType, attachment.Name, verifyObjectFailedOrDeleted)
1✔
1064
                                defer waiter.Close()
1✔
1065

1✔
1066
                                _, derr := waiter.Wait(ctx)
1✔
1067
                                if derr != nil {
1✔
1068
                                        errorMessageCh <- fmt.Sprintf("%s: %v", attachment.Name, derr)
×
1069
                                } else {
1✔
1070
                                        errorMessageCh <- ""
1✔
1071
                                }
1✔
1072
                        }(ctx, attachment)
1073
                }
1074

1075
                // if errors have been found with the conditionWatcher calls, format the error msg and report via CRI
1076
                var errMsgs []string
2✔
1077
                for i := 0; i < attachmentsCount; i++ {
3✔
1078
                        v, ok := <-errorMessageCh
1✔
1079
                        if ok && v != "" {
1✔
1080
                                errMsgs = append(errMsgs, v)
×
1081
                        }
×
1082
                }
1083
                close(errorMessageCh)
2✔
1084
                if len(errMsgs) > 0 {
2✔
1085
                        err = status.Errorf(codes.Internal, strings.Join(errMsgs, ", "))
×
1086
                }
×
1087
        }
1088

1089
        return attachments, err
5✔
1090
}
1091

1092
func (c *SharedState) cleanUpAzVolumeAttachmentByNode(ctx context.Context, azDriverNodeName string, caller operationRequester, role azureutils.AttachmentRoleMode, cleanupMode attachmentCleanUpMode, attachmentDeleteMode deleteMode) ([]azdiskv1beta2.AzVolumeAttachment, error) {
	var err error
	ctx, w := workflow.New(ctx, workflow.WithDetails(consts.NodeNameLabel, azDriverNodeName))
	defer func() { w.Finish(err) }()
	w.Logger().Infof("AzVolumeAttachment clean up requested by %s for AzDriverNode (%s)", caller, azDriverNodeName)

	var nodeRequirement *labels.Requirement
	nodeRequirement, err = azureutils.CreateLabelRequirements(consts.NodeNameLabel, selection.Equals, azDriverNodeName)
	if err != nil {
		return nil, err
	}
	labelSelector := labels.NewSelector().Add(*nodeRequirement)

	var attachments *azdiskv1beta2.AzVolumeAttachmentList
	attachments, err = c.azClient.DiskV1beta2().AzVolumeAttachments(c.config.ObjectNamespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector.String()})
	if err != nil {
		if apiErrors.IsNotFound(err) {
			err = nil
			return nil, nil
		}
		err = status.Errorf(codes.Aborted, "failed to get AzVolumeAttachments: %v", err)
		return nil, err
	}

	cleanUpMap := map[string][]azdiskv1beta2.AzVolumeAttachment{}
	for _, attachment := range attachments.Items {
		if shouldCleanUp(attachment, role) {
			cleanUpMap[attachment.Spec.VolumeName] = append(cleanUpMap[attachment.Spec.VolumeName], attachment)
		}
	}

	for volumeName, cleanUps := range cleanUpMap {
		volumeName := volumeName
		c.addToOperationQueue(ctx,
			volumeName,
			caller,
			func(ctx context.Context) error {
				return c.cleanUpAzVolumeAttachments(ctx, cleanUps, cleanupMode, caller)
			},
			false)
	}
	return attachments.Items, nil
}

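// cleanUpAzVolumeAttachments requests detach for the given AzVolumeAttachments (or deletes primary attachments
// outright when they are not being detached, e.g. on driver uninstall) and patches the updated status of each CRI.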
func (c *SharedState) cleanUpAzVolumeAttachments(ctx context.Context, attachments []azdiskv1beta2.AzVolumeAttachment, cleanUpMode attachmentCleanUpMode, caller operationRequester) error {
	var err error

	for _, attachment := range attachments {
		patched := attachment.DeepCopy()

		if attachment.Spec.RequestedRole == azdiskv1beta2.PrimaryRole {
			if cleanUpMode == cleanUpAttachment && !volumeDetachRequested(patched) {
				markDetachRequest(patched, caller)
			} else if deleteRequested, _ := objectDeletionRequested(&attachment); !deleteRequested {
				// if primary azvolumeattachments are being cleaned up for driver uninstall, issue a DELETE call and continue
				// note that this DELETE request will remove the AzVolumeAttachment CRI without detaching the volume from the node
				if err = c.cachedClient.Delete(ctx, patched); err != nil {
					return err
				}
			}
		} else {
			// replica mounts should always be detached during cleanup regardless of the cleanup mode
			if !volumeDetachRequested(patched) {
				markDetachRequest(patched, caller)
			}

			// append the cleanup annotation to prevent replica recreation, except when the cleanup was triggered by the node controller due to node failure.
			if caller != azdrivernode && !metav1.HasAnnotation(patched.ObjectMeta, consts.CleanUpAnnotation) {
				markCleanUp(patched, caller)
			}
		}

		if !reflect.DeepEqual(attachment.Status, patched.Status) {
			if err = c.cachedClient.Status().Patch(ctx, patched, client.MergeFrom(&attachment)); err != nil && apiErrors.IsNotFound(err) {
				err = status.Errorf(codes.Internal, "failed to patch AzVolumeAttachment (%s)", attachment.Name)
				return err
			}
		}
	}
	return nil
}

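// createReplicaRequestsQueue initializes the priority queue of pending replica requests, keyed by volume name and
// ordered by descending priority.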
func (c *SharedState) createReplicaRequestsQueue() {
	c.priorityReplicaRequestsQueue = &VolumeReplicaRequestsPriorityQueue{}
	c.priorityReplicaRequestsQueue.queue = cache.NewHeap(
		func(obj interface{}) (string, error) {
			return obj.(*ReplicaRequest).VolumeName, nil
		},
		func(left, right interface{}) bool {
			return left.(*ReplicaRequest).Priority > right.(*ReplicaRequest).Priority
		})
}

// Removes replica requests from the priority queue and adds them to the operation queue.
func (c *SharedState) tryCreateFailedReplicas(ctx context.Context, requestor operationRequester) {
	if atomic.SwapInt32(&c.processingReplicaRequestQueue, 1) == 0 {
		ctx, w := workflow.New(ctx)
		defer w.Finish(nil)
		requests := c.priorityReplicaRequestsQueue.DrainQueue()
		for i := 0; i < len(requests); i++ {
			replicaRequest := requests[i]
			c.addToOperationQueue(ctx,
				replicaRequest.VolumeName,
				requestor,
				func(ctx context.Context) error {
					return c.manageReplicas(ctx, replicaRequest.VolumeName)
				},
				false,
			)
		}
		atomic.StoreInt32(&c.processingReplicaRequestQueue, 0)
	}
}

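// garbageCollectReplicas queues an operation that cleans up the replica AzVolumeAttachments of the given volume,
// adds the requester to the garbage collection exclusion list, and clears any pending garbage collection for the volume.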
func (c *SharedState) garbageCollectReplicas(ctx context.Context, volumeName string, requester operationRequester) {
	c.addToOperationQueue(
		ctx,
		volumeName,
		replica,
		func(ctx context.Context) error {
			if _, err := c.cleanUpAzVolumeAttachmentByVolume(ctx, volumeName, requester, azureutils.ReplicaOnly, cleanUpAttachment, deleteOnly); err != nil {
				return err
			}
			c.addToGcExclusionList(volumeName, requester)
			c.removeGarbageCollection(volumeName)
			c.unmarkVolumeVisited(volumeName)
			return nil
		},
		true,
	)
}

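// removeGarbageCollection cancels any scheduled garbage collection for the volume and removes it from the operation queue.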
func (c *SharedState) removeGarbageCollection(volumeName string) {
	v, ok := c.cleanUpMap.LoadAndDelete(volumeName)
	if ok {
		cancelFunc := v.(context.CancelFunc)
		cancelFunc()
	}
	// if there is any garbage collection enqueued in the operation queue, remove it
	c.dequeueGarbageCollection(volumeName)
}

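// manageReplicas reconciles the number of replica attachments for the volume, creating additional replicas when the
// current count is below the volume's MaxMountReplicaCount.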
func (c *SharedState) manageReplicas(ctx context.Context, volumeName string) error {
	var err error
	ctx, w := workflow.New(ctx)
	defer func() { w.Finish(err) }()

	var azVolume *azdiskv1beta2.AzVolume
	azVolume, err = azureutils.GetAzVolume(ctx, c.cachedClient, c.azClient, volumeName, c.config.ObjectNamespace, true)
	if apiErrors.IsNotFound(err) {
		w.Logger().V(5).Info("Volume no longer exists. Aborting manage replica operation")
		return nil
	} else if err != nil {
		w.Logger().Error(err, "failed to get AzVolume")
		return err
	}

	// replica management should not be executed or retried if AzVolume is scheduled for deletion or not yet created.
	deleteRequested, _ := objectDeletionRequested(azVolume)
	if !isCreated(azVolume) || deleteRequested {
		w.Logger().Errorf(errors.New("no valid azVolume"), "azVolume (%s) is scheduled for deletion or has no underlying volume object", azVolume.Name)
		return nil
	}

	currentReplicaCount, err := c.countValidReplicasForVolume(ctx, volumeName)
	if err != nil {
		return err
	}

	desiredReplicaCount := azVolume.Spec.MaxMountReplicaCount
	w.Logger().Infof("Control number of replicas for volume (%s): desired=%d, current=%d", azVolume.Spec.VolumeName, desiredReplicaCount, currentReplicaCount)

	if desiredReplicaCount > currentReplicaCount {
		w.Logger().Infof("Need %d more replicas for volume (%s)", desiredReplicaCount-currentReplicaCount, azVolume.Spec.VolumeName)
		if azVolume.Status.Detail == nil || azVolume.Status.State == azdiskv1beta2.VolumeDeleting || azVolume.Status.State == azdiskv1beta2.VolumeDeleted {
			// underlying volume does not exist, so volume attachment cannot be made
			return nil
		}
		if err = c.createReplicas(ctx, desiredReplicaCount-currentReplicaCount, azVolume.Name, azVolume.Status.Detail.VolumeID, azVolume.Spec.Parameters); err != nil {
			w.Logger().Errorf(err, "failed to create %d replicas for volume (%s): %v", desiredReplicaCount-currentReplicaCount, azVolume.Spec.VolumeName, err)
			return err
		}
	}
	return nil
}

// Count the number of replica attachments that aren't scheduled for deletion for a given volume
func (c *SharedState) countValidReplicasForVolume(ctx context.Context, volumeName string) (int, error) {
	w, _ := workflow.GetWorkflowFromContext(ctx)
	validReplicaCount := 0

	azVolumeAttachments, err := azureutils.GetAzVolumeAttachmentsForVolume(ctx, c.cachedClient, volumeName, azureutils.ReplicaOnly)
	if err != nil {
		w.Logger().Errorf(err, "failed to list replica AzVolumeAttachments")
		return validReplicaCount, err
	}

	for _, azVolumeAttachment := range azVolumeAttachments {
		if deleteRequested, _ := objectDeletionRequested(&azVolumeAttachment); !deleteRequested {
			validReplicaCount++
		}
	}
	return validReplicaCount, nil
}

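// createReplicas attempts to create the requested number of replica AzVolumeAttachments for the volume on the ranked
// candidate nodes; any shortfall is pushed onto the priority replica requests queue and reported on the affected pods.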
func (c *SharedState) createReplicas(ctx context.Context, remainingReplicas int, volumeName, volumeID string, volumeContext map[string]string) error {
	var err error
	ctx, w := workflow.New(ctx)
	defer func() { w.Finish(err) }()

	// if volume is scheduled for clean up, skip replica creation
	if _, cleanUpScheduled := c.cleanUpMap.Load(volumeName); cleanUpScheduled {
		return nil
	}

	// get pods linked to the volume
	var pods []v1.Pod
	pods, err = c.getPodsFromVolume(ctx, c.cachedClient, volumeName)
	if err != nil {
		return err
	}

	// acquire per-pod lock to be released upon creation of replica AzVolumeAttachment CRIs
	for _, pod := range pods {
		podKey := getQualifiedName(pod.Namespace, pod.Name)
		v, _ := c.podLocks.LoadOrStore(podKey, &sync.Mutex{})
		podLock := v.(*sync.Mutex)
		podLock.Lock()
		defer podLock.Unlock()
	}

	var nodes []string
	nodes, err = c.getNodesForReplica(ctx, volumeName, pods)
	if err != nil {
		w.Logger().Errorf(err, "failed to get a list of nodes for replica attachment")
		return err
	}

	requiredReplicas := remainingReplicas
	for _, node := range nodes {
		if err = c.createReplicaAzVolumeAttachment(ctx, volumeID, node, volumeContext); err != nil {
			w.Logger().Errorf(err, "failed to create replica AzVolumeAttachment for volume %s", volumeName)
			// continue to try attachment with next node
			continue
		}
		remainingReplicas--
		if remainingReplicas <= 0 {
			// no more remainingReplicas, don't need to create replica AzVolumeAttachment
			break
		}
	}

	if remainingReplicas > 0 {
		// no failed replica attachments, but there are still more replicas to reach MaxShares
		request := ReplicaRequest{VolumeName: volumeName, Priority: remainingReplicas}
		c.priorityReplicaRequestsQueue.Push(ctx, &request)
		for _, pod := range pods {
			c.eventRecorder.Eventf(pod.DeepCopyObject(), v1.EventTypeWarning, consts.ReplicaAttachmentFailedEvent, "Not enough suitable nodes to attach %d of %d replica mount(s) for volume %s", remainingReplicas, requiredReplicas, volumeName)
		}
	}
	return nil
}

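// getNodesForReplica returns candidate nodes for replica attachments of the volume, ranked for the pods using it and
// filtered to exclude nodes that already host a replica or have no remaining attachment capacity.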
func (c *SharedState) getNodesForReplica(ctx context.Context, volumeName string, pods []v1.Pod) ([]string, error) {
	var err error
	ctx, w := workflow.New(ctx)
	defer func() { w.Finish(err) }()

	if len(pods) == 0 {
		pods, err = c.getPodsFromVolume(ctx, c.cachedClient, volumeName)
		if err != nil {
			return nil, err
		}
	}

	var volumes []string
	volumes, err = c.getVolumesForPodObjs(ctx, pods)
	if err != nil {
		return nil, err
	}

	var nodes []string
	nodes, err = c.getRankedNodesForReplicaAttachments(ctx, volumes, pods)
	if err != nil {
		return nil, err
	}

	var replicaNodes []string
	replicaNodes, err = c.getNodesWithReplica(ctx, volumeName)
	if err != nil {
		return nil, err
	}

	skipSet := map[string]bool{}
	for _, replicaNode := range replicaNodes {
		skipSet[replicaNode] = true
	}

	filtered := []string{}
	for _, node := range nodes {
		if skipSet[node] {
			continue
		}
		// if the node has no capacity for disk attachment, we should skip it
		remainingCapacity, nodeExists := c.availableAttachmentsMap.Load(node)
		if !nodeExists || remainingCapacity == nil || remainingCapacity.(*atomic.Int32).Load() <= int32(0) {
			w.Logger().V(5).Infof("skip node(%s) because it has no capacity for disk attachment", node)
			continue
		}
		filtered = append(filtered, node)
	}

	return filtered, nil
}

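// createAzVolumeFromPv builds an AzVolume CRI from a PersistentVolume backed by the azuredisk driver (translating
// in-tree volumes when CSI migration is enabled) and creates it via createAzVolume.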
func (c *SharedState) createAzVolumeFromPv(ctx context.Context, pv v1.PersistentVolume, annotations map[string]string) error {
	var err error
	ctx, w := workflow.New(ctx)
	defer func() { w.Finish(err) }()

	var desiredAzVolume *azdiskv1beta2.AzVolume
	requiredBytes, _ := pv.Spec.Capacity.Storage().AsInt64()
	volumeCapability := c.getVolumeCapabilityFromPv(&pv)

	// translate intree pv to csi pv to convert them into AzVolume resource
	if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) &&
		utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureDisk) &&
		pv.Spec.AzureDisk != nil {
		var transPV *v1.PersistentVolume
		// if an error occurs while translating, it's unrecoverable, so return no error
		if transPV, err = c.translateInTreePVToCSI(&pv); err != nil {
			return err
		}
		pv = *transPV
	}

	// skip if PV is not managed by azuredisk driver
	if pv.Spec.CSI == nil || pv.Spec.CSI.Driver != c.config.DriverName {
		return nil
	}

	// create AzVolume CRI for CSI Volume Source
	desiredAzVolume, err = c.createAzVolumeFromCSISource(pv.Spec.CSI)
	if err != nil {
		return err
	}

	if pv.Spec.NodeAffinity != nil && pv.Spec.NodeAffinity.Required != nil {
		desiredAzVolume.Status.Detail.AccessibleTopology = azureutils.GetTopologyFromNodeSelector(*pv.Spec.NodeAffinity.Required, c.topologyKey)
	}
	if azureutils.IsMultiNodePersistentVolume(pv) {
		desiredAzVolume.Spec.MaxMountReplicaCount = 0
	}

	// if it's an inline volume, no pv label or pvc label should be added
	if !azureutils.MapContains(annotations, consts.InlineVolumeAnnotation) {
		desiredAzVolume.Labels = azureutils.AddToMap(desiredAzVolume.Labels, consts.PvNameLabel, pv.Name)

		if pv.Spec.ClaimRef != nil {
			desiredAzVolume.Labels = azureutils.AddToMap(desiredAzVolume.Labels, consts.PvcNameLabel, pv.Spec.ClaimRef.Name)
			desiredAzVolume.Labels = azureutils.AddToMap(desiredAzVolume.Labels, consts.PvcNamespaceLabel, pv.Spec.ClaimRef.Namespace)
		}
	}

	desiredAzVolume.Spec.VolumeCapability = volumeCapability
	desiredAzVolume.Spec.PersistentVolume = pv.Name
	desiredAzVolume.Spec.CapacityRange = &azdiskv1beta2.CapacityRange{RequiredBytes: requiredBytes}

	desiredAzVolume.Status.Detail.CapacityBytes = requiredBytes

	for k, v := range annotations {
		desiredAzVolume.Status.Annotations = azureutils.AddToMap(desiredAzVolume.Status.Annotations, k, v)
	}

	w.AddDetailToLogger(consts.PvNameKey, pv.Name, consts.VolumeNameLabel, desiredAzVolume.Name)

	if err = c.createAzVolume(ctx, desiredAzVolume); err != nil {
		err = status.Errorf(codes.Internal, "failed to create AzVolume (%s) for PV (%s): %v", desiredAzVolume.Name, pv.Name, err)
		return err
	}
	return nil
}

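// getVolumeCapabilityFromPv converts the PV's access modes and volume mode into AzVolume volume capabilities.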
func (c *SharedState) getVolumeCapabilityFromPv(pv *v1.PersistentVolume) []azdiskv1beta2.VolumeCapability {
	volCaps := []azdiskv1beta2.VolumeCapability{}

	for _, accessMode := range pv.Spec.AccessModes {
		volCap := azdiskv1beta2.VolumeCapability{}
		// default to Mount
		if pv.Spec.VolumeMode != nil && *pv.Spec.VolumeMode == v1.PersistentVolumeBlock {
			volCap.AccessType = azdiskv1beta2.VolumeCapabilityAccessBlock
		}
		switch accessMode {
		case v1.ReadWriteOnce:
			volCap.AccessMode = azdiskv1beta2.VolumeCapabilityAccessModeSingleNodeSingleWriter
		case v1.ReadWriteMany:
			volCap.AccessMode = azdiskv1beta2.VolumeCapabilityAccessModeMultiNodeMultiWriter
		case v1.ReadOnlyMany:
			volCap.AccessMode = azdiskv1beta2.VolumeCapabilityAccessModeMultiNodeReaderOnly
		default:
			volCap.AccessMode = azdiskv1beta2.VolumeCapabilityAccessModeUnknown
		}
		volCaps = append(volCaps, volCap)
	}
	return volCaps
}

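// createAzVolumeFromCSISource constructs an AzVolume CRI from a CSI persistent volume source, deriving the disk name,
// mount replica count, and volume parameters from the volume handle and attributes.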
func (c *SharedState) createAzVolumeFromCSISource(source *v1.CSIPersistentVolumeSource) (*azdiskv1beta2.AzVolume, error) {
	diskName, err := azureutils.GetDiskName(source.VolumeHandle)
	if err != nil {
		return nil, fmt.Errorf("failed to extract diskName from volume handle (%s): %v", source.VolumeHandle, err)
	}

	_, maxMountReplicaCount := azureutils.GetMaxSharesAndMaxMountReplicaCount(source.VolumeAttributes, false)

	diskParameters, _ := azureutils.ParseDiskParameters(source.VolumeAttributes, azureutils.IgnoreUnknown)
	volumeParams := diskParameters.VolumeContext

	azVolumeName := strings.ToLower(diskName)

	azVolume := azdiskv1beta2.AzVolume{
		ObjectMeta: metav1.ObjectMeta{
			Name:       azVolumeName,
			Finalizers: []string{consts.AzVolumeFinalizer},
		},
		Spec: azdiskv1beta2.AzVolumeSpec{
			MaxMountReplicaCount: maxMountReplicaCount,
			Parameters:           volumeParams,
			VolumeName:           diskName,
		},
		Status: azdiskv1beta2.AzVolumeStatus{
			Detail: &azdiskv1beta2.AzVolumeStatusDetail{
				VolumeID:      source.VolumeHandle,
				VolumeContext: source.VolumeAttributes,
			},
			State: azdiskv1beta2.VolumeCreated,
		},
	}
	azureutils.AnnotateAPIVersion(&azVolume)

	return &azVolume, nil
}

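// createAzVolume creates the AzVolume CRI if it does not exist, upgrades CRIs created with an older API version, and
// syncs the desired status before recreating the volume's operation queue.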
func (c *SharedState) createAzVolume(ctx context.Context, desiredAzVolume *azdiskv1beta2.AzVolume) error {
	w, _ := workflow.GetWorkflowFromContext(ctx)

	var err error
	var azVolume *azdiskv1beta2.AzVolume
	var updated *azdiskv1beta2.AzVolume

	azVolume, err = c.azClient.DiskV1beta2().AzVolumes(c.config.ObjectNamespace).Get(ctx, desiredAzVolume.Name, metav1.GetOptions{})
	if err != nil {
		if apiErrors.IsNotFound(err) {
			azVolume, err = c.azClient.DiskV1beta2().AzVolumes(c.config.ObjectNamespace).Create(ctx, desiredAzVolume, metav1.CreateOptions{})
			if err != nil {
				return err
			}
			updated = azVolume.DeepCopy()
			updated.Status = desiredAzVolume.Status
		} else {
			return err
		}
	}

	if apiVersion, ok := azureutils.GetFromMap(azVolume.Annotations, consts.APIVersion); !ok || apiVersion != azdiskv1beta2.APIVersion {
		w.Logger().Infof("Found AzVolume (%s) with older api version. Converting to apiVersion(%s)", azVolume.Name, azdiskv1beta2.APIVersion)

		azVolume.Spec.PersistentVolume = desiredAzVolume.Spec.PersistentVolume

		for k, v := range desiredAzVolume.Labels {
			azVolume.Labels = azureutils.AddToMap(azVolume.Labels, k, v)
		}

		// for now, we don't empty the meta annotations after migrating them to status annotation for safety.
		// note that this will leave some remnant garbage entries in meta annotations
		var statusAnnotation []string
		for k, v := range azVolume.Annotations {
			statusAnnotation = append(statusAnnotation, k, v)
		}

		for k, v := range desiredAzVolume.Annotations {
			azVolume.Annotations = azureutils.AddToMap(azVolume.Annotations, k, v)
		}

		azVolume, err = c.azClient.DiskV1beta2().AzVolumes(c.config.ObjectNamespace).Update(ctx, azVolume, metav1.UpdateOptions{})
		if err != nil {
			return err
		}
		updated = azVolume.DeepCopy()
		updated.Status.Annotations = azureutils.AddToMap(updated.Status.Annotations, statusAnnotation...)
	}

	if updated != nil {
		if _, err = azureutils.UpdateCRIWithRetry(ctx, nil, c.cachedClient, c.azClient, azVolume, func(obj client.Object) error {
			azvolume := obj.(*azdiskv1beta2.AzVolume)
			azvolume.Status = updated.Status
			return nil
		}, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRIStatus); err != nil {
			return err
		}
	}

	// if AzVolume CRI successfully recreated, also recreate the operation queue for the volume
	c.createOperationQueue(desiredAzVolume.Name)
	return nil
}

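// translateInTreePVToCSI translates an in-tree AzureDisk PersistentVolume to its CSI counterpart when CSI migration is
// enabled; otherwise the PV is returned unchanged.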
func (c *SharedState) translateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error) {
	var err error
	// translate intree pv to csi pv to convert them into AzVolume resource
	if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) &&
		utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureDisk) &&
		pv.Spec.AzureDisk != nil {
		// if an error occurs while translating, it's unrecoverable, so return no error
		if pv, err = c.azureDiskCSITranslator.TranslateInTreePVToCSI(pv); err != nil {
		} else if pv == nil {
			err = status.Errorf(codes.Internal, "unexpected failure in translating inline volume to csi")
		}

	}
	return pv, err
}

// waitForVolumeAttachmentName waits for the VolumeAttachment name to be updated in the azVolumeAttachmentToVaMap by the volumeattachment controller
func (c *SharedState) waitForVolumeAttachmentName(ctx context.Context, azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment) (string, error) {
	var vaName string
	err := wait.PollImmediateUntilWithContext(ctx, consts.DefaultPollingRate, func(ctx context.Context) (bool, error) {
		val, exists := c.azVolumeAttachmentToVaMap.Load(azVolumeAttachment.Name)
		if exists {
			vaName = val.(string)
		}
		return exists, nil
	})
	return vaName, err
}

// Returns the set of node names satisfying the pod affinity term and the set of node names with qualifying replica attachments.
func (c *SharedState) getQualifiedNodesForPodAffinityTerm(ctx context.Context, nodes []v1.Node, podNamespace string, affinityTerm v1.PodAffinityTerm) (podNodes, replicaNodes set) {
	var err error
	w, _ := workflow.GetWorkflowFromContext(ctx)
	candidateNodes := set{}
	for _, node := range nodes {
		candidateNodes.add(node.Name)
	}
	podNodes = set{}
	replicaNodes = set{}

	var podSelector labels.Selector
	podSelector, err = metav1.LabelSelectorAsSelector(affinityTerm.LabelSelector)
	// if failed to convert the pod affinity label selector to a selector, log the error and skip
	if err != nil {
		w.Logger().Errorf(err, "failed to convert pod affinity (%v) to selector", affinityTerm.LabelSelector)
	}

	nsList := &v1.NamespaceList{}
	if affinityTerm.NamespaceSelector != nil {
		nsSelector, err := metav1.LabelSelectorAsSelector(affinityTerm.NamespaceSelector)
		// if failed to convert the pod affinity namespace selector to a selector, log the error and skip
		if err != nil {
			w.Logger().Errorf(err, "failed to convert pod affinity (%v) to selector", affinityTerm.LabelSelector)
		} else {
			if err = c.cachedClient.List(ctx, nsList, &client.ListOptions{LabelSelector: nsSelector}); err != nil {
				w.Logger().Errorf(err, "failed to list namespaces with selector (%v)", nsSelector)
				return
			}

		}
	}

	namespaces := affinityTerm.Namespaces
	for _, ns := range nsList.Items {
		namespaces = append(namespaces, ns.Name)
	}

	pods := []v1.Pod{}
	if len(namespaces) > 0 {
		for _, namespace := range namespaces {
			podList := &v1.PodList{}
			if err = c.cachedClient.List(ctx, podList, &client.ListOptions{LabelSelector: podSelector, Namespace: namespace}); err != nil {
				w.Logger().Errorf(err, "failed to retrieve pod list: %v", err)
				pods = append(pods, podList.Items...)
			}
		}
	} else {
		podList := &v1.PodList{}
		if err = c.cachedClient.List(ctx, podList, &client.ListOptions{LabelSelector: podSelector, Namespace: podNamespace}); err != nil {
			w.Logger().Errorf(err, "failed to retrieve pod list: %v", err)
		}
		pods = podList.Items
	}

	// get replica nodes for pods that satisfy the pod label selector
	replicaNodes = c.getReplicaNodesForPods(ctx, pods)
	for replicaNode := range replicaNodes {
		if !candidateNodes.has(replicaNode) {
			replicaNodes.remove(replicaNode)
		}
	}

	// get nodes with pods that share the same topology as the pods satisfying the pod label selector
	for _, pod := range pods {
		podNodes.add(pod.Spec.NodeName)
	}

	var podNodeObjs []v1.Node
	for node := range podNodes {
		var nodeObj v1.Node
		if err = c.cachedClient.Get(ctx, types.NamespacedName{Name: node.(string)}, &nodeObj); err != nil {
			w.Logger().Errorf(err, "failed to get node (%s)", node.(string))
			continue
		}
		podNodeObjs = append(podNodeObjs, nodeObj)
	}

	topologyLabel := c.getNodesTopologySelector(ctx, podNodeObjs, affinityTerm.TopologyKey)
	for _, node := range nodes {
		if topologyLabel != nil && topologyLabel.Matches(labels.Set(node.Labels)) {
			podNodes.add(node.Name)
		}
	}
	return
}

// Returns the set of node names where replica mounts of the given pods can be found
func (c *SharedState) getReplicaNodesForPods(ctx context.Context, pods []v1.Pod) (replicaNodes set) {
	// add the nodes to which replica attachments of the matching pods' volumes are attached to replicaNodes
	replicaNodes = set{}
	if volumes, err := c.getVolumesForPodObjs(ctx, pods); err == nil {
		for _, volume := range volumes {
			attachments, err := azureutils.GetAzVolumeAttachmentsForVolume(ctx, c.cachedClient, volume, azureutils.ReplicaOnly)
			if err != nil {
				continue
			}
			for _, attachment := range attachments {
				if deleteRequested, _ := objectDeletionRequested(&attachment); !deleteRequested {
					node := attachment.Spec.NodeName
					replicaNodes.add(node)
				}
			}
		}
	}

	return replicaNodes
}

// Returns a label selector corresponding to a list of nodes and a topology key (aka label key)
func (c *SharedState) getNodesTopologySelector(ctx context.Context, nodes []v1.Node, topologyKey string) labels.Selector {
	w, _ := workflow.GetWorkflowFromContext(ctx)
	if len(nodes) == 0 {
		return nil
	}

	topologyValues := set{}
	for _, node := range nodes {
		nodeLabels := node.GetLabels()
		if topologyValue, exists := nodeLabels[topologyKey]; exists {
			topologyValues.add(topologyValue)
		} else {
			w.Logger().V(5).Infof("node (%s) doesn't have label value for topologyKey (%s)", node.Name, topologyKey)
		}
	}

	topologySelector := labels.NewSelector()
	topologyRequirement, err := azureutils.CreateLabelRequirements(topologyKey, selection.In, topologyValues.toStringSlice()...)
	// if failed to create label requirement, log error and return empty selector
	if err != nil {
		w.Logger().Errorf(err, "failed to create label requirement for topologyKey (%s)", topologyKey)
	} else {
		topologySelector = topologySelector.Add(*topologyRequirement)
	}
	return topologySelector
}

// addNodeToAvailableAttachmentsMap returns true if the node is added to or already in the availableAttachmentsMap, and false otherwise.
func (c *SharedState) addNodeToAvailableAttachmentsMap(ctx context.Context, nodeName string, nodeLables map[string]string) bool {
	if _, ok := c.availableAttachmentsMap.Load(nodeName); !ok {
		capacity, err := azureutils.GetNodeRemainingDiskCount(ctx, c.cachedClient, nodeName)
		if err != nil {
			klog.Errorf("Failed to get node(%s) remaining disk count with error: %v", nodeName, err)
			// fall back to the maximum capacity derived from the node labels if the remaining count is unavailable.
			capacity, err = azureutils.GetNodeMaxDiskCountWithLabels(nodeLables)
			if err != nil {
				klog.Errorf("Failed to add node(%s) to availableAttachmentsMap because getting the available attachment capacity failed with error: %v", nodeName, err)
				return false
			}
		}
		var count atomic.Int32
		count.Store(int32(capacity))
		klog.Infof("Added node(%s) to availableAttachmentsMap with capacity: %d", nodeName, capacity)
		c.availableAttachmentsMap.LoadOrStore(nodeName, &count)
	}
	return true
}

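// deleteNodeFromAvailableAttachmentsMap removes the node's entry from availableAttachmentsMap.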
func (c *SharedState) deleteNodeFromAvailableAttachmentsMap(ctx context.Context, node string) {
	klog.Infof("Deleted node(%s) from availableAttachmentsMap", node)
	c.availableAttachmentsMap.Delete(node)
}

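// decrementAttachmentCount atomically consumes one attachment slot for the node, returning false when the node is
// unknown or has no remaining capacity.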
func (c *SharedState) decrementAttachmentCount(ctx context.Context, node string) bool {
	remainingCapacity, nodeExists := c.availableAttachmentsMap.Load(node)
	if nodeExists && remainingCapacity != nil {
		currentCapacity := int32(0)
		for {
			currentCapacity = remainingCapacity.(*atomic.Int32).Load()
			if currentCapacity == int32(0) || remainingCapacity.(*atomic.Int32).CompareAndSwap(currentCapacity, currentCapacity-1) {
				if currentCapacity == int32(0) {
					klog.Errorf("Failed to decrement attachment count for node(%s), because no available attachment", node)
					return false
				}
				return true
			}
		}
	}

	klog.Errorf("Failed to decrement attachment count, because node(%s) not found", node)

	return false
}

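// incrementAttachmentCount releases one attachment slot for the node, returning false when the node is not tracked in
// availableAttachmentsMap.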
func (c *SharedState) incrementAttachmentCount(ctx context.Context, node string) bool {
	remainingCapacity, nodeExists := c.availableAttachmentsMap.Load(node)
	if nodeExists && remainingCapacity != nil {
		remainingCapacity.(*atomic.Int32).Add(1)
		return true
	}

	klog.Errorf("Failed to increment attachment count, because node(%s) not found", node)

	return false
}