• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / azuredisk-csi-driver / 4358802581

07 Mar 2023 09:50PM UTC coverage: 69.495% (+25.0%) from 44.538%
4358802581

push

github

GitHub
Merge pull request #1739 from alice-zheyan-yu/UpdateCRIWithRetry_optimization

90 of 90 new or added lines in 3 files covered. (100.0%)

7142 of 10277 relevant lines covered (69.49%)

6.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.31
/pkg/controller/attach_detach.go
1
/*
2
Copyright 2021 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "errors"
22
        "strings"
23
        "sync"
24
        "sync/atomic"
25

26
        "github.com/go-logr/logr"
27
        "google.golang.org/grpc/codes"
28
        "google.golang.org/grpc/status"
29
        v1 "k8s.io/api/core/v1"
30
        storagev1 "k8s.io/api/storage/v1"
31
        apiErrors "k8s.io/apimachinery/pkg/api/errors"
32
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33
        "k8s.io/apimachinery/pkg/types"
34
        utilfeature "k8s.io/apiserver/pkg/util/feature"
35
        "k8s.io/kubernetes/pkg/features"
36
        azdiskv1beta2 "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/azuredisk/v1beta2"
37
        consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants"
38
        "sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils"
39
        "sigs.k8s.io/azuredisk-csi-driver/pkg/provisioner"
40
        "sigs.k8s.io/azuredisk-csi-driver/pkg/util"
41
        "sigs.k8s.io/azuredisk-csi-driver/pkg/workflow"
42

43
        volerr "k8s.io/cloud-provider/volume/errors"
44
        "sigs.k8s.io/cloud-provider-azure/pkg/retry"
45
        "sigs.k8s.io/controller-runtime/pkg/client"
46
        "sigs.k8s.io/controller-runtime/pkg/controller"
47
        "sigs.k8s.io/controller-runtime/pkg/handler"
48
        "sigs.k8s.io/controller-runtime/pkg/manager"
49
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
50
        "sigs.k8s.io/controller-runtime/pkg/source"
51
)
52

53
type CloudDiskAttachDetacher interface {
54
        PublishVolume(ctx context.Context, volumeID string, nodeID string, volumeContext map[string]string) provisioner.CloudAttachResult
55
        UnpublishVolume(ctx context.Context, volumeID string, nodeID string) error
56
}
57

58
type CrdDetacher interface {
59
        UnpublishVolume(ctx context.Context, volumeID string, nodeID string, secrets map[string]string, mode consts.UnpublishMode) error
60
        WaitForDetach(ctx context.Context, volumeID, nodeID string) error
61
}
62

63
/*
64
Attach Detach controller is responsible for
65
 1. attaching volume to a specified node upon creation of AzVolumeAttachment CRI
66
 2. promoting AzVolumeAttachment to primary upon spec update
67
 3. detaching volume upon deletions marked with certain annotations
68
*/
69
type ReconcileAttachDetach struct {
70
        *SharedState
71
        logger            logr.Logger
72
        crdDetacher       CrdDetacher
73
        cloudDiskAttacher CloudDiskAttachDetacher
74
        stateLock         *sync.Map
75
        retryInfo         *retryInfo
76
}
77

78
// Compile-time check that *ReconcileAttachDetach satisfies reconcile.Reconciler.
var _ reconcile.Reconciler = (*ReconcileAttachDetach)(nil)
79

80
var allowedTargetAttachmentStates = map[string][]string{
81
        "":                                       {string(azdiskv1beta2.AttachmentPending), string(azdiskv1beta2.Attaching), string(azdiskv1beta2.Detaching)},
82
        string(azdiskv1beta2.AttachmentPending):  {string(azdiskv1beta2.Attaching), string(azdiskv1beta2.Detaching)},
83
        string(azdiskv1beta2.Attaching):          {string(azdiskv1beta2.Attached), string(azdiskv1beta2.AttachmentFailed)},
84
        string(azdiskv1beta2.Detaching):          {string(azdiskv1beta2.Detached), string(azdiskv1beta2.DetachmentFailed)},
85
        string(azdiskv1beta2.Attached):           {string(azdiskv1beta2.Detaching)},
86
        string(azdiskv1beta2.Detached):           {},
87
        string(azdiskv1beta2.AttachmentFailed):   {string(azdiskv1beta2.Detaching)},
88
        string(azdiskv1beta2.DetachmentFailed):   {string(azdiskv1beta2.ForceDetachPending)},
89
        string(azdiskv1beta2.ForceDetachPending): {string(azdiskv1beta2.Detaching)},
90
}
91

92
func (r *ReconcileAttachDetach) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
6✔
93
        if !r.isRecoveryComplete() {
6✔
94
                return reconcile.Result{Requeue: true}, nil
×
95
        }
×
96

97
        azVolumeAttachment, err := azureutils.GetAzVolumeAttachment(ctx, r.cachedClient, r.azClient, request.Name, request.Namespace, true)
6✔
98
        // if object is not found, it means the object has been deleted. Log the deletion and do not requeue
6✔
99
        if apiErrors.IsNotFound(err) {
6✔
100
                r.azVolumeAttachmentToVaMap.Delete(request.Name)
×
101
                return reconcileReturnOnSuccess(request.Name, r.retryInfo)
×
102
        } else if err != nil {
6✔
103
                azVolumeAttachment.Name = request.Name
×
104
                return reconcileReturnOnError(ctx, azVolumeAttachment, "get", err, r.retryInfo)
×
105
        }
×
106

107
        ctx, _ = workflow.GetWorkflowFromObj(ctx, azVolumeAttachment)
6✔
108

6✔
109
        // if underlying cloud operation already in process, skip until operation is completed
6✔
110
        if isOperationInProcess(azVolumeAttachment) {
6✔
111
                return reconcileReturnOnSuccess(azVolumeAttachment.Name, r.retryInfo)
×
112
        }
×
113

114
        // deletion request
115
        if deleteRequested, deleteAfter := objectDeletionRequested(azVolumeAttachment); deleteRequested {
7✔
116
                if deleteAfter > 0 {
1✔
117
                        return reconcileAfter(deleteAfter, request.Name, r.retryInfo)
×
118
                }
×
119
                if err := r.removeFinalizer(ctx, azVolumeAttachment); err != nil {
1✔
120
                        return reconcileReturnOnError(ctx, azVolumeAttachment, "delete", err, r.retryInfo)
×
121
                }
×
122
                // deletion of azVolumeAttachment is succeeded, the node's remaining capacity of disk attachment should be increased by 1
123
                r.incrementAttachmentCount(ctx, azVolumeAttachment.Spec.NodeName)
1✔
124
                // detachment request
125
        } else if volumeDetachRequested(azVolumeAttachment) {
6✔
126
                if err := r.triggerDetach(ctx, azVolumeAttachment); err != nil {
1✔
127
                        return reconcileReturnOnError(ctx, azVolumeAttachment, "detach", err, r.retryInfo)
×
128
                }
×
129
                // attachment request
130
        } else if azVolumeAttachment.Status.Detail == nil {
6✔
131
                if err := r.triggerAttach(ctx, azVolumeAttachment); err != nil {
3✔
132
                        return reconcileReturnOnError(ctx, azVolumeAttachment, "attach", err, r.retryInfo)
1✔
133
                }
1✔
134
                // promotion/demotion request
135
        } else if azVolumeAttachment.Spec.RequestedRole != azVolumeAttachment.Status.Detail.Role {
4✔
136
                switch azVolumeAttachment.Spec.RequestedRole {
2✔
137
                case azdiskv1beta2.PrimaryRole:
1✔
138
                        if err := r.promote(ctx, azVolumeAttachment); err != nil {
1✔
139
                                return reconcileReturnOnError(ctx, azVolumeAttachment, "promote", err, r.retryInfo)
×
140
                        }
×
141
                case azdiskv1beta2.ReplicaRole:
1✔
142
                        if err := r.demote(ctx, azVolumeAttachment); err != nil {
1✔
143
                                return reconcileReturnOnError(ctx, azVolumeAttachment, "demote", err, r.retryInfo)
×
144
                        }
×
145
                }
146
        }
147

148
        return reconcileReturnOnSuccess(azVolumeAttachment.Name, r.retryInfo)
5✔
149
}
150

151
func (r *ReconcileAttachDetach) triggerAttach(ctx context.Context, azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment) error {
2✔
152
        var err error
2✔
153
        ctx, w := workflow.New(ctx)
2✔
154
        defer func() { w.Finish(err) }()
4✔
155

156
        // requeue if AzVolumeAttachment's state is being updated by a different worker
157
        if _, ok := r.stateLock.LoadOrStore(azVolumeAttachment.Name, nil); ok {
2✔
158
                err = getOperationRequeueError("attach", azVolumeAttachment)
×
159
                return err
×
160
        }
×
161
        defer r.stateLock.Delete(azVolumeAttachment.Name)
2✔
162

2✔
163
        if !volumeAttachRequested(azVolumeAttachment) {
3✔
164
                err = status.Errorf(codes.FailedPrecondition, "attach operation has not yet been requested")
1✔
165
                return err
1✔
166
        }
1✔
167

168
        var azVolume *azdiskv1beta2.AzVolume
1✔
169
        if azVolume, err = azureutils.GetAzVolume(ctx, r.cachedClient, r.azClient, strings.ToLower(azVolumeAttachment.Spec.VolumeName), r.config.ObjectNamespace, true); err != nil {
1✔
170
                if apiErrors.IsNotFound(err) {
×
171
                        w.Logger().V(5).Infof("Aborting attach operation for AzVolumeAttachment (%s): AzVolume (%s) not found", azVolumeAttachment.Name, azVolumeAttachment.Spec.VolumeName)
×
172
                        err = nil
×
173
                        return nil
×
174
                }
×
175
                return err
×
176
        } else if deleteRequested, _ := objectDeletionRequested(azVolume); deleteRequested {
1✔
177
                w.Logger().V(5).Infof("Aborting attach operation for AzVolumeAttachment (%s): AzVolume (%s) scheduled for deletion", azVolumeAttachment.Name, azVolumeAttachment.Spec.VolumeName)
×
178
                return nil
×
179
        }
×
180

181
        // update status block
182
        updateFunc := func(obj client.Object) error {
2✔
183
                azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
1✔
184
                // Update state to attaching, Initialize finalizer and add label to the object
1✔
185
                _, derr := updateState(azv, azdiskv1beta2.Attaching, normalUpdate)
1✔
186
                return derr
1✔
187
        }
1✔
188

189
        var updatedObj client.Object
1✔
190
        if updatedObj, err = azureutils.UpdateCRIWithRetry(ctx, nil, r.cachedClient, r.azClient, azVolumeAttachment, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRIStatus); err != nil {
1✔
191
                return err
×
192
        }
×
193
        azVolumeAttachment = updatedObj.(*azdiskv1beta2.AzVolumeAttachment)
1✔
194

1✔
195
        w.Logger().V(5).Info("Attaching volume")
1✔
196
        waitCh := make(chan goSignal)
1✔
197
        //nolint:contextcheck // call is asynchronous; context is not inherited by design
1✔
198
        go func() {
2✔
199
                var attachErr error
1✔
200
                _, goWorkflow := workflow.New(ctx)
1✔
201
                defer func() { goWorkflow.Finish(attachErr) }()
2✔
202
                waitCh <- goSignal{}
1✔
203

1✔
204
                goCtx := goWorkflow.SaveToContext(context.Background())
1✔
205
                cloudCtx, cloudCancel := context.WithTimeout(goCtx, cloudTimeout)
1✔
206
                defer cloudCancel()
1✔
207

1✔
208
                // attempt to attach the disk to a node
1✔
209
                var publishCtx map[string]string
1✔
210
                var pods []v1.Pod
1✔
211
                if azVolumeAttachment.Spec.RequestedRole == azdiskv1beta2.ReplicaRole {
1✔
212
                        var err error
×
213
                        pods, err = r.getPodsFromVolume(goCtx, r.cachedClient, azVolumeAttachment.Spec.VolumeName)
×
214
                        if err != nil {
×
215
                                goWorkflow.Logger().Error(err, "failed to list pods for volume")
×
216
                        }
×
217
                }
218

219
                var handleSuccess func(bool)
1✔
220
                var handleError func()
1✔
221

1✔
222
                attachAndUpdate := func() {
2✔
223
                        attachResult := r.attachVolume(cloudCtx, azVolumeAttachment.Spec.VolumeID, azVolumeAttachment.Spec.NodeName, azVolumeAttachment.Spec.VolumeContext)
1✔
224
                        if publishCtx = attachResult.PublishContext(); publishCtx != nil {
2✔
225
                                handleSuccess(false)
1✔
226
                        }
1✔
227
                        var ok bool
1✔
228
                        if attachErr, ok = <-attachResult.ResultChannel(); !ok || attachErr != nil {
1✔
229
                                handleError()
×
230
                        } else {
1✔
231
                                handleSuccess(true)
1✔
232
                        }
1✔
233
                }
234

235
                handleError = func() {
1✔
236
                        if len(pods) > 0 {
×
237
                                for _, pod := range pods {
×
238
                                        r.eventRecorder.Eventf(pod.DeepCopyObject(), v1.EventTypeWarning, consts.ReplicaAttachmentFailedEvent, "Replica mount for volume %s failed to be attached to node %s with error: %v", azVolumeAttachment.Spec.VolumeName, azVolumeAttachment.Spec.NodeName, attachErr)
×
239
                                }
×
240
                        }
241

242
                        // if the disk is attached to a different node
243
                        if danglingAttachErr, ok := attachErr.(*volerr.DanglingAttachError); ok {
×
244
                                // get disk, current node and attachment name
×
245
                                currentNodeName := string(danglingAttachErr.CurrentNode)
×
246
                                currentAttachmentName := azureutils.GetAzVolumeAttachmentName(azVolumeAttachment.Spec.VolumeName, currentNodeName)
×
247
                                goWorkflow.Logger().Infof("Dangling attach detected for %s", currentNodeName)
×
248

×
249
                                // check if AzVolumeAttachment exists for the existing attachment
×
250
                                _, err := r.azClient.DiskV1beta2().AzVolumeAttachments(r.config.ObjectNamespace).Get(cloudCtx, currentAttachmentName, metav1.GetOptions{})
×
251
                                var detachErr error
×
252
                                if apiErrors.IsNotFound(err) {
×
253
                                        // AzVolumeAttachment doesn't exist so we only need to detach disk from cloud
×
254
                                        detachErr = r.cloudDiskAttacher.UnpublishVolume(goCtx, azVolumeAttachment.Spec.VolumeID, currentNodeName)
×
255
                                        if detachErr != nil {
×
256
                                                goWorkflow.Logger().Errorf(detachErr, "failed to detach dangling volume (%s) from node (%s). Error: %v", azVolumeAttachment.Spec.VolumeID, currentNodeName, err)
×
257
                                        }
×
258
                                } else {
×
259
                                        // AzVolumeAttachment exist so we need to detach disk through crdProvisioner
×
260
                                        detachErr = r.crdDetacher.UnpublishVolume(goCtx, azVolumeAttachment.Spec.VolumeID, currentNodeName, make(map[string]string), consts.Detach)
×
261
                                        if detachErr != nil {
×
262
                                                goWorkflow.Logger().Errorf(detachErr, "failed to make a unpublish request dangling AzVolumeAttachment for volume (%s) and node (%s)", azVolumeAttachment.Spec.VolumeID, currentNodeName)
×
263
                                        }
×
264
                                        detachErr = r.crdDetacher.WaitForDetach(goCtx, azVolumeAttachment.Spec.VolumeID, currentNodeName)
×
265
                                        if detachErr != nil {
×
266
                                                goWorkflow.Logger().Errorf(detachErr, "failed to unpublish dangling AzVolumeAttachment for volume (%s) and node (%s)", azVolumeAttachment.Spec.VolumeID, currentNodeName)
×
267
                                        }
×
268
                                }
269
                                // attempt to attach the disk to a node after detach
270
                                if detachErr == nil {
×
271
                                        attachAndUpdate()
×
272
                                        return
×
273
                                }
×
274
                        }
275

276
                        updateFunc := func(obj client.Object) error {
×
277
                                azva := obj.(*azdiskv1beta2.AzVolumeAttachment)
×
278
                                // add retriable annotation if the attach error is PartialUpdateError or timeout
×
279
                                if _, ok := attachErr.(*retry.PartialUpdateError); ok || errors.Is(err, context.DeadlineExceeded) {
×
280
                                        azva.Status.Annotations = azureutils.AddToMap(azva.Status.Annotations, consts.ReplicaVolumeAttachRetryAnnotation, "true")
×
281
                                }
×
282

283
                                _, uerr := reportError(azva, azdiskv1beta2.AttachmentFailed, attachErr)
×
284
                                return uerr
×
285
                        }
286
                        //nolint:contextcheck // final status update of the CRI must occur even when the current context's deadline passes.
287
                        _, _ = azureutils.UpdateCRIWithRetry(goCtx, nil, r.cachedClient, r.azClient, azVolumeAttachment, updateFunc, consts.ForcedUpdateMaxNetRetry, azureutils.UpdateCRIStatus)
×
288
                }
289
                handleSuccess = func(asyncComplete bool) {
3✔
290
                        // Publish event to indicate attachment success
2✔
291
                        if asyncComplete {
3✔
292
                                if len(pods) > 0 {
1✔
293
                                        for _, pod := range pods {
×
294
                                                r.eventRecorder.Eventf(pod.DeepCopyObject(), v1.EventTypeNormal, consts.ReplicaAttachmentSuccessEvent, "Replica mount for volume %s successfully attached to node %s", azVolumeAttachment.Spec.VolumeName, azVolumeAttachment.Spec.NodeName)
×
295
                                        }
×
296
                                }
297
                                // the node's remaining capacity of disk attachment should be decreased by 1, since the disk attachment is succeeded.
298
                                r.decrementAttachmentCount(ctx, azVolumeAttachment.Spec.NodeName)
1✔
299
                        }
300

301
                        updateFunc := func(obj client.Object) error {
4✔
302
                                azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
2✔
303
                                azv = updateStatusDetail(azv, publishCtx)
2✔
304
                                var uerr error
2✔
305
                                if asyncComplete {
3✔
306
                                        _, uerr = updateState(azv, azdiskv1beta2.Attached, forceUpdate)
1✔
307
                                }
1✔
308
                                return uerr
2✔
309
                        }
310

311
                        if asyncComplete && azVolumeAttachment.Spec.RequestedRole == azdiskv1beta2.PrimaryRole {
3✔
312
                                _ = r.updateVolumeAttachmentWithResult(goCtx, azVolumeAttachment)
1✔
313
                        }
1✔
314
                        updatedObj, _ = azureutils.UpdateCRIWithRetry(goCtx, nil, r.cachedClient, r.azClient, azVolumeAttachment, updateFunc, consts.ForcedUpdateMaxNetRetry, azureutils.UpdateCRIStatus)
2✔
315
                        azVolumeAttachment = updatedObj.(*azdiskv1beta2.AzVolumeAttachment)
2✔
316
                }
317

318
                attachAndUpdate()
1✔
319
        }()
320
        <-waitCh
1✔
321

1✔
322
        return nil
1✔
323
}
324

325
func (r *ReconcileAttachDetach) triggerDetach(ctx context.Context, azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment) error {
1✔
326
        var err error
1✔
327
        ctx, w := workflow.New(ctx)
1✔
328
        defer func() { w.Finish(err) }()
2✔
329

330
        if _, ok := r.stateLock.LoadOrStore(azVolumeAttachment.Name, nil); ok {
1✔
331
                return getOperationRequeueError("detach", azVolumeAttachment)
×
332
        }
×
333
        defer r.stateLock.Delete(azVolumeAttachment.Name)
1✔
334

1✔
335
        updateFunc := func(obj client.Object) error {
2✔
336
                azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
1✔
337
                // Update state to detaching
1✔
338
                _, derr := updateState(azv, azdiskv1beta2.Detaching, normalUpdate)
1✔
339
                return derr
1✔
340
        }
1✔
341

342
        var updatedObj client.Object
1✔
343
        if updatedObj, err = azureutils.UpdateCRIWithRetry(ctx, nil, r.cachedClient, r.azClient, azVolumeAttachment, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRIStatus); err != nil {
1✔
344
                return err
×
345
        }
×
346
        azVolumeAttachment = updatedObj.(*azdiskv1beta2.AzVolumeAttachment)
1✔
347

1✔
348
        w.Logger().V(5).Info("Detaching volume")
1✔
349
        waitCh := make(chan goSignal)
1✔
350
        //nolint:contextcheck // call is asynchronous; context is not inherited by design
1✔
351
        go func() {
2✔
352
                var detachErr error
1✔
353
                _, goWorkflow := workflow.New(ctx)
1✔
354
                defer func() { goWorkflow.Finish(detachErr) }()
2✔
355
                waitCh <- goSignal{}
1✔
356

1✔
357
                goCtx := goWorkflow.SaveToContext(context.Background())
1✔
358
                cloudCtx, cloudCancel := context.WithTimeout(goCtx, cloudTimeout)
1✔
359
                defer cloudCancel()
1✔
360

1✔
361
                var updateFunc func(obj client.Object) error
1✔
362
                detachErr = r.detachVolume(cloudCtx, azVolumeAttachment.Spec.VolumeID, azVolumeAttachment.Spec.NodeName)
1✔
363
                if detachErr != nil {
1✔
364
                        updateFunc = func(obj client.Object) error {
×
365
                                azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
×
366
                                _, derr := reportError(azv, azdiskv1beta2.DetachmentFailed, detachErr)
×
367
                                return derr
×
368
                        }
×
369
                        //nolint:contextcheck // final status update of the CRI must occur even when the current context's deadline passes.
370
                        if _, uerr := azureutils.UpdateCRIWithRetry(goCtx, nil, r.cachedClient, r.azClient, azVolumeAttachment, updateFunc, consts.ForcedUpdateMaxNetRetry, azureutils.UpdateCRIStatus); uerr != nil {
×
371
                                w.Logger().Errorf(uerr, "failed to update final status of AzVolumeAttachement")
×
372
                        }
×
373
                } else {
1✔
374
                        //nolint:contextcheck // delete of the CRI must occur even when the current context's deadline passes.
1✔
375
                        if derr := r.cachedClient.Delete(goCtx, azVolumeAttachment); derr != nil {
1✔
376
                                w.Logger().Error(derr, "failed to delete AzVolumeAttachment")
×
377
                        }
×
378
                }
379
        }()
380
        <-waitCh
1✔
381
        return nil
1✔
382
}
383

384
func (r *ReconcileAttachDetach) removeFinalizer(ctx context.Context, azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment) error {
1✔
385
        var err error
1✔
386
        ctx, w := workflow.New(ctx)
1✔
387
        defer func() { w.Finish(err) }()
2✔
388

389
        updateFunc := func(obj client.Object) error {
2✔
390
                azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
1✔
391
                // delete finalizer
1✔
392
                _ = r.deleteFinalizer(azv)
1✔
393
                return nil
1✔
394
        }
1✔
395

396
        _, err = azureutils.UpdateCRIWithRetry(ctx, nil, r.cachedClient, r.azClient, azVolumeAttachment, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRI)
1✔
397
        return err
1✔
398
}
399

400
func (r *ReconcileAttachDetach) promote(ctx context.Context, azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment) error {
1✔
401
        var err error
1✔
402
        ctx, w := workflow.New(ctx)
1✔
403
        defer func() { w.Finish(err) }()
2✔
404

405
        w.Logger().Infof("Promoting AzVolumeAttachment")
1✔
406
        if err = r.updateVolumeAttachmentWithResult(ctx, azVolumeAttachment); err != nil {
1✔
407
                return err
×
408
        }
×
409
        // initialize metadata and update status block
410
        updateFunc := func(obj client.Object) error {
2✔
411
                azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
1✔
412
                _ = updateRole(azv, azdiskv1beta2.PrimaryRole)
1✔
413
                return nil
1✔
414
        }
1✔
415
        if _, err = azureutils.UpdateCRIWithRetry(ctx, nil, r.cachedClient, r.azClient, azVolumeAttachment, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRIStatus); err != nil {
1✔
416
                return err
×
417
        }
×
418
        return nil
1✔
419
}
420

421
func (r *ReconcileAttachDetach) demote(ctx context.Context, azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment) error {
1✔
422
        var err error
1✔
423
        ctx, w := workflow.New(ctx)
1✔
424
        defer func() { w.Finish(err) }()
2✔
425

426
        w.Logger().V(5).Info("Demoting AzVolumeAttachment")
1✔
427
        // initialize metadata and update status block
1✔
428
        updateFunc := func(obj client.Object) error {
2✔
429
                azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
1✔
430
                delete(azv.Status.Annotations, consts.VolumeAttachmentKey)
1✔
431
                _ = updateRole(azv, azdiskv1beta2.ReplicaRole)
1✔
432
                return nil
1✔
433
        }
1✔
434

435
        if _, err = azureutils.UpdateCRIWithRetry(ctx, nil, r.cachedClient, r.azClient, azVolumeAttachment, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRIStatus); err != nil {
1✔
436
                return err
×
437
        }
×
438
        return nil
1✔
439
}
440

441
func (r *ReconcileAttachDetach) updateVolumeAttachmentWithResult(ctx context.Context, azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment) error {
2✔
442
        ctx, w := workflow.New(ctx)
2✔
443
        var vaName string
2✔
444
        vaName, err := r.waitForVolumeAttachmentName(ctx, azVolumeAttachment)
2✔
445
        if err != nil {
2✔
446
                return err
×
447
        }
×
448

449
        vaUpdateFunc := func(obj client.Object) error {
4✔
450
                va := obj.(*storagev1.VolumeAttachment)
2✔
451
                if azVolumeAttachment.Status.Detail != nil {
4✔
452
                        for key, value := range azVolumeAttachment.Status.Detail.PublishContext {
3✔
453
                                va.Status.AttachmentMetadata = azureutils.AddToMap(va.Status.AttachmentMetadata, key, value)
1✔
454
                        }
1✔
455
                }
456
                return nil
2✔
457
        }
458

459
        originalVA := &storagev1.VolumeAttachment{}
2✔
460
        if err = r.cachedClient.Get(ctx, types.NamespacedName{Namespace: azVolumeAttachment.Namespace, Name: vaName}, originalVA); err != nil {
2✔
461
                w.Logger().Errorf(err, "failed to get original VolumeAttachment (%s)", vaName)
×
462
                return err
×
463
        }
×
464
        _, err = azureutils.UpdateCRIWithRetry(ctx, nil, r.cachedClient, r.azClient, originalVA, vaUpdateFunc, consts.ForcedUpdateMaxNetRetry, azureutils.UpdateCRIStatus)
2✔
465
        return err
2✔
466
}
467

468
func (r *ReconcileAttachDetach) deleteFinalizer(azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment) *azdiskv1beta2.AzVolumeAttachment {
1✔
469
        if azVolumeAttachment == nil {
1✔
470
                return nil
×
471
        }
×
472

473
        if azVolumeAttachment.ObjectMeta.Finalizers == nil {
1✔
474
                return azVolumeAttachment
×
475
        }
×
476

477
        finalizers := []string{}
1✔
478
        for _, finalizer := range azVolumeAttachment.ObjectMeta.Finalizers {
2✔
479
                if finalizer == consts.AzVolumeAttachmentFinalizer {
2✔
480
                        continue
1✔
481
                }
482
                finalizers = append(finalizers, finalizer)
×
483
        }
484
        azVolumeAttachment.ObjectMeta.Finalizers = finalizers
1✔
485
        return azVolumeAttachment
1✔
486
}
487

488
func (r *ReconcileAttachDetach) attachVolume(ctx context.Context, volumeID, node string, volumeContext map[string]string) provisioner.CloudAttachResult {
1✔
489
        return r.cloudDiskAttacher.PublishVolume(ctx, volumeID, node, volumeContext)
1✔
490
}
1✔
491

492
func (r *ReconcileAttachDetach) detachVolume(ctx context.Context, volumeID, node string) error {
1✔
493
        return r.cloudDiskAttacher.UnpublishVolume(ctx, volumeID, node)
1✔
494
}
1✔
495

496
func (r *ReconcileAttachDetach) Recover(ctx context.Context, recoveryID string) error {
3✔
497
        var err error
3✔
498
        ctx, w := workflow.New(ctx)
3✔
499
        defer func() { w.Finish(err) }()
6✔
500

501
        w.Logger().V(5).Info("Recovering AzVolumeAttachment CRIs...")
3✔
502
        // try to recreate missing AzVolumeAttachment CRI
3✔
503
        var syncedVolumeAttachments, volumesToSync map[string]bool
3✔
504

3✔
505
        for i := 0; i < maxRetry; i++ {
6✔
506
                if syncedVolumeAttachments, volumesToSync, err = r.recreateAzVolumeAttachment(ctx, syncedVolumeAttachments, volumesToSync); err == nil {
6✔
507
                        break
3✔
508
                }
509
                w.Logger().Error(err, "failed to recreate missing AzVolumeAttachment CRI")
×
510
        }
511
        // retrigger any aborted operation from possible previous controller crash
512
        recovered := &sync.Map{}
3✔
513
        for i := 0; i < maxRetry; i++ {
6✔
514
                if err = r.recoverAzVolumeAttachment(ctx, recovered, recoveryID); err == nil {
6✔
515
                        break
3✔
516
                }
517
                w.Logger().Error(err, "failed to recover AzVolumeAttachment state")
×
518
        }
519

520
        return err
3✔
521
}
522

523
func updateRole(azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment, role azdiskv1beta2.Role) *azdiskv1beta2.AzVolumeAttachment {
4✔
524
        if azVolumeAttachment == nil {
4✔
525
                return nil
×
526
        }
×
527

528
        if azVolumeAttachment.Status.Detail == nil {
4✔
529
                return azVolumeAttachment
×
530
        }
×
531

532
        azVolumeAttachment.Status.Detail.PreviousRole = azVolumeAttachment.Status.Detail.Role
4✔
533
        azVolumeAttachment.Status.Detail.Role = role
4✔
534

4✔
535
        return azVolumeAttachment
4✔
536
}
537

538
func updateStatusDetail(azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment, status map[string]string) *azdiskv1beta2.AzVolumeAttachment {
2✔
539
        if azVolumeAttachment == nil {
2✔
540
                return nil
×
541
        }
×
542

543
        if azVolumeAttachment.Status.Detail == nil {
3✔
544
                azVolumeAttachment.Status.Detail = &azdiskv1beta2.AzVolumeAttachmentStatusDetail{}
1✔
545
        }
1✔
546

547
        azVolumeAttachment.Status.Detail.PreviousRole = azVolumeAttachment.Status.Detail.Role
2✔
548
        azVolumeAttachment.Status.Detail.Role = azVolumeAttachment.Spec.RequestedRole
2✔
549
        azVolumeAttachment.Status.Detail.PublishContext = status
2✔
550

2✔
551
        return azVolumeAttachment
2✔
552
}
553

554
func updateError(azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment, err error) *azdiskv1beta2.AzVolumeAttachment {
×
555
        if azVolumeAttachment == nil {
×
556
                return nil
×
557
        }
×
558

559
        if err != nil {
×
560
                azVolumeAttachment.Status.Error = util.NewAzError(err)
×
561
        }
×
562

563
        return azVolumeAttachment
×
564
}
565

566
func updateState(azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment, state azdiskv1beta2.AzVolumeAttachmentAttachmentState, mode updateMode) (*azdiskv1beta2.AzVolumeAttachment, error) {
6✔
567
        var err error
6✔
568
        if azVolumeAttachment == nil {
6✔
569
                return nil, status.Errorf(codes.FailedPrecondition, "function `updateState` requires non-nil AzVolumeAttachment object.")
×
570
        }
×
571
        if mode == normalUpdate {
9✔
572
                if expectedStates, exists := allowedTargetAttachmentStates[string(azVolumeAttachment.Status.State)]; !exists || !containsString(string(state), expectedStates) {
3✔
573
                        err = status.Error(codes.FailedPrecondition, formatUpdateStateError("azVolumeAttachment", string(azVolumeAttachment.Status.State), string(state), expectedStates...))
×
574
                }
×
575
        }
576
        if err == nil {
12✔
577
                azVolumeAttachment.Status.State = state
6✔
578
        }
6✔
579
        return azVolumeAttachment, err
6✔
580
}
581

582
func reportError(azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment, state azdiskv1beta2.AzVolumeAttachmentAttachmentState, err error) (*azdiskv1beta2.AzVolumeAttachment, error) {
×
583
        if azVolumeAttachment == nil {
×
584
                return nil, status.Errorf(codes.FailedPrecondition, "function `reportError` requires non-nil AzVolumeAttachment object.")
×
585
        }
×
586
        azVolumeAttachment = updateError(azVolumeAttachment, err)
×
587
        return updateState(azVolumeAttachment, state, forceUpdate)
×
588
}
589

590
func (r *ReconcileAttachDetach) recreateAzVolumeAttachment(ctx context.Context, syncedVolumeAttachments map[string]bool, volumesToSync map[string]bool) (map[string]bool, map[string]bool, error) {
3✔
591
        w, _ := workflow.GetWorkflowFromContext(ctx)
3✔
592
        // Get all volumeAttachments
3✔
593
        volumeAttachments, err := r.kubeClient.StorageV1().VolumeAttachments().List(ctx, metav1.ListOptions{})
3✔
594
        if err != nil {
3✔
595
                return syncedVolumeAttachments, volumesToSync, err
×
596
        }
×
597

598
        if syncedVolumeAttachments == nil {
6✔
599
                syncedVolumeAttachments = map[string]bool{}
3✔
600
        }
3✔
601
        if volumesToSync == nil {
6✔
602
                volumesToSync = map[string]bool{}
3✔
603
        }
3✔
604

605
        // Loop through volumeAttachments and create Primary AzVolumeAttachments in correspondence
606
        for _, volumeAttachment := range volumeAttachments.Items {
5✔
607
                // skip if sync has been completed volumeAttachment
2✔
608
                if syncedVolumeAttachments[volumeAttachment.Name] {
2✔
609
                        continue
×
610
                }
611
                if volumeAttachment.Spec.Attacher == r.config.DriverName {
4✔
612
                        pvName := volumeAttachment.Spec.Source.PersistentVolumeName
2✔
613
                        if pvName == nil {
2✔
614
                                continue
×
615
                        }
616
                        // get PV and retrieve diskName
617
                        pv, err := r.kubeClient.CoreV1().PersistentVolumes().Get(ctx, *pvName, metav1.GetOptions{})
2✔
618
                        if err != nil {
2✔
619
                                w.Logger().Errorf(err, "failed to get PV (%s)", *pvName)
×
620
                                return syncedVolumeAttachments, volumesToSync, err
×
621
                        }
×
622

623
                        // if pv is migrated intree pv, convert it to csi pv for processing
624
                        // translate intree pv to csi pv to convert them into AzVolume resource
625
                        if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) &&
2✔
626
                                utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureDisk) &&
2✔
627
                                pv.Spec.AzureDisk != nil {
2✔
628
                                // translate intree pv to csi pv to convert them into AzVolume resource
×
629
                                if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) &&
×
630
                                        utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureDisk) &&
×
631
                                        pv.Spec.AzureDisk != nil {
×
632
                                        if pv, err = r.translateInTreePVToCSI(pv); err != nil {
×
633
                                                w.Logger().V(5).Errorf(err, "skipping azVolumeAttachment creation for volumeAttachment (%s)", volumeAttachment.Name)
×
634
                                        }
×
635
                                }
636
                        }
637

638
                        if pv.Spec.CSI == nil || pv.Spec.CSI.Driver != r.config.DriverName {
2✔
639
                                continue
×
640
                        }
641
                        volumesToSync[pv.Spec.CSI.VolumeHandle] = true
2✔
642

2✔
643
                        diskName, err := azureutils.GetDiskName(pv.Spec.CSI.VolumeHandle)
2✔
644
                        if err != nil {
2✔
645
                                w.Logger().Errorf(err, "failed to extract disk name from volumehandle (%s)", pv.Spec.CSI.VolumeHandle)
×
646
                                delete(volumesToSync, pv.Spec.CSI.VolumeHandle)
×
647
                                continue
×
648
                        }
649
                        volumeName := strings.ToLower(diskName)
2✔
650
                        nodeName := volumeAttachment.Spec.NodeName
2✔
651
                        azVolumeAttachmentName := azureutils.GetAzVolumeAttachmentName(diskName, nodeName)
2✔
652
                        r.azVolumeAttachmentToVaMap.Store(azVolumeAttachmentName, volumeAttachment.Name)
2✔
653

2✔
654
                        desiredAzVolumeAttachment := &azdiskv1beta2.AzVolumeAttachment{
2✔
655
                                ObjectMeta: metav1.ObjectMeta{
2✔
656
                                        Name: azVolumeAttachmentName,
2✔
657
                                        Labels: map[string]string{
2✔
658
                                                consts.NodeNameLabel:   nodeName,
2✔
659
                                                consts.VolumeNameLabel: volumeName,
2✔
660
                                        },
2✔
661
                                        // if the volumeAttachment shows not yet attached, and VolumeAttachRequestAnnotation needs to be set from the controllerserver
2✔
662
                                        // if the volumeAttachment shows attached but the actual volume isn't due to controller restart, VolumeAttachRequestAnnotation needs to be set by the noderserver during NodeStageVolume
2✔
663
                                        Finalizers: []string{consts.AzVolumeAttachmentFinalizer},
2✔
664
                                },
2✔
665
                                Spec: azdiskv1beta2.AzVolumeAttachmentSpec{
2✔
666
                                        VolumeName:    volumeName,
2✔
667
                                        VolumeID:      pv.Spec.CSI.VolumeHandle,
2✔
668
                                        NodeName:      nodeName,
2✔
669
                                        RequestedRole: azdiskv1beta2.PrimaryRole,
2✔
670
                                        VolumeContext: pv.Spec.CSI.VolumeAttributes,
2✔
671
                                },
2✔
672
                        }
2✔
673
                        azureutils.AnnotateAPIVersion(desiredAzVolumeAttachment)
2✔
674

2✔
675
                        statusUpdateRequired := true
2✔
676
                        // check if the CRI exists already
2✔
677
                        azVolumeAttachment, err := azureutils.GetAzVolumeAttachment(ctx, r.cachedClient, r.azClient, azVolumeAttachmentName, r.config.ObjectNamespace, false)
2✔
678
                        if err != nil {
3✔
679
                                if apiErrors.IsNotFound(err) {
2✔
680
                                        w.Logger().Infof("Recreating AzVolumeAttachment(%s)", azVolumeAttachmentName)
1✔
681

1✔
682
                                        azVolumeAttachment, err = r.azClient.DiskV1beta2().AzVolumeAttachments(r.config.ObjectNamespace).Create(ctx, desiredAzVolumeAttachment, metav1.CreateOptions{})
1✔
683
                                        if err != nil {
1✔
684
                                                w.Logger().Errorf(err, "failed to create AzVolumeAttachment (%s) for volume (%s) and node (%s): %v", azVolumeAttachmentName, *pvName, nodeName, err)
×
685
                                                return syncedVolumeAttachments, volumesToSync, err
×
686
                                        }
×
687
                                } else {
×
688
                                        w.Logger().Errorf(err, "failed to get AzVolumeAttachment (%s): %v", azVolumeAttachmentName, err)
×
689
                                        return syncedVolumeAttachments, volumesToSync, err
×
690
                                }
×
691
                        } else if apiVersion, ok := azureutils.GetFromMap(azVolumeAttachment.Annotations, consts.APIVersion); !ok || apiVersion != azdiskv1beta2.APIVersion {
2✔
692
                                w.Logger().Infof("Found AzVolumeAttachment (%s) with older api version. Converting to apiVersion(%s)", azVolumeAttachmentName, azdiskv1beta2.APIVersion)
1✔
693

1✔
694
                                for k, v := range desiredAzVolumeAttachment.Labels {
3✔
695
                                        azVolumeAttachment.Labels = azureutils.AddToMap(azVolumeAttachment.Labels, k, v)
2✔
696
                                }
2✔
697

698
                                for k, v := range azVolumeAttachment.Annotations {
2✔
699
                                        azVolumeAttachment.Status.Annotations = azureutils.AddToMap(azVolumeAttachment.Status.Annotations, k, v)
1✔
700
                                }
1✔
701

702
                                // for now, we don't empty the meta annotatinos after migrating them to status annotation for safety.
703
                                // note that this will leave some remnant garbage entries in meta annotations
704

705
                                for k, v := range desiredAzVolumeAttachment.Annotations {
2✔
706
                                        azVolumeAttachment.Annotations = azureutils.AddToMap(azVolumeAttachment.Status.Annotations, k, v)
1✔
707
                                }
1✔
708

709
                                azVolumeAttachment, err = r.azClient.DiskV1beta2().AzVolumeAttachments(r.config.ObjectNamespace).Update(ctx, azVolumeAttachment, metav1.UpdateOptions{})
1✔
710
                                if err != nil {
1✔
711
                                        w.Logger().Errorf(err, "failed to update AzVolumeAttachment (%s) for volume (%s) and node (%s): %v", azVolumeAttachmentName, *pvName, nodeName, err)
×
712
                                        return syncedVolumeAttachments, volumesToSync, err
×
713
                                }
×
714
                        } else {
×
715
                                statusUpdateRequired = false
×
716
                        }
×
717

718
                        if statusUpdateRequired {
4✔
719
                                azVolumeAttachment.Status.Annotations = azureutils.AddToMap(azVolumeAttachment.Status.Annotations, consts.VolumeAttachmentKey, volumeAttachment.Name)
2✔
720

2✔
721
                                // update status
2✔
722
                                _, err = r.azClient.DiskV1beta2().AzVolumeAttachments(r.config.ObjectNamespace).UpdateStatus(ctx, azVolumeAttachment, metav1.UpdateOptions{})
2✔
723
                                if err != nil {
2✔
724
                                        w.Logger().Errorf(err, "failed to update status of AzVolumeAttachment (%s) for volume (%s) and node (%s): %v", azVolumeAttachmentName, *pvName, nodeName, err)
×
725
                                        return syncedVolumeAttachments, volumesToSync, err
×
726
                                }
×
727
                        }
728

729
                        syncedVolumeAttachments[volumeAttachment.Name] = true
2✔
730
                }
731
        }
732
        return syncedVolumeAttachments, volumesToSync, nil
3✔
733
}
734

735
func (r *ReconcileAttachDetach) recoverAzVolumeAttachment(ctx context.Context, recoveredAzVolumeAttachments *sync.Map, recoveryID string) error {
3✔
736
        w, _ := workflow.GetWorkflowFromContext(ctx)
3✔
737

3✔
738
        // list all AzVolumeAttachment
3✔
739
        azVolumeAttachments, err := r.azClient.DiskV1beta2().AzVolumeAttachments(r.config.ObjectNamespace).List(ctx, metav1.ListOptions{})
3✔
740
        if err != nil {
3✔
741
                w.Logger().Error(err, "failed to get list of existing AzVolumeAttachment CRI in controller recovery stage")
×
742
                return err
×
743
        }
×
744

745
        var wg sync.WaitGroup
3✔
746
        numRecovered := int32(0)
3✔
747

3✔
748
        for _, azVolumeAttachment := range azVolumeAttachments.Items {
7✔
749
                // skip if AzVolumeAttachment already recovered
4✔
750
                if _, ok := recoveredAzVolumeAttachments.Load(azVolumeAttachment.Name); ok {
4✔
751
                        numRecovered++
×
752
                        continue
×
753
                }
754

755
                wg.Add(1)
4✔
756
                go func(azv azdiskv1beta2.AzVolumeAttachment, azvMap *sync.Map) {
8✔
757
                        defer wg.Done()
4✔
758
                        var targetState azdiskv1beta2.AzVolumeAttachmentAttachmentState
4✔
759
                        updateMode := azureutils.UpdateCRIStatus
4✔
760
                        if azv.Spec.RequestedRole == azdiskv1beta2.ReplicaRole {
4✔
761
                                updateMode = azureutils.UpdateAll
×
762
                        }
×
763
                        updateFunc := func(obj client.Object) error {
8✔
764
                                var err error
4✔
765
                                azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
4✔
766
                                if azv.Spec.RequestedRole == azdiskv1beta2.ReplicaRole {
4✔
767
                                        // conversion logic from v1beta1 to v1beta2 for replicas come here
×
768
                                        azv.Status.Annotations = azv.ObjectMeta.Annotations
×
769
                                        azv.ObjectMeta.Annotations = map[string]string{consts.VolumeAttachRequestAnnotation: "CRI recovery"}
×
770
                                }
×
771
                                // add a recover annotation to the CRI so that reconciliation can be triggered for the CRI even if CRI's current state == target state
772
                                azv.Status.Annotations = azureutils.AddToMap(azv.Status.Annotations, consts.RecoverAnnotation, recoveryID)
4✔
773
                                if azv.Status.State != targetState {
6✔
774
                                        _, err = updateState(azv, targetState, forceUpdate)
2✔
775
                                }
2✔
776
                                return err
4✔
777
                        }
778
                        switch azv.Status.State {
4✔
779
                        case azdiskv1beta2.Attaching:
1✔
780
                                // reset state to Pending so Attach operation can be redone
1✔
781
                                targetState = azdiskv1beta2.AttachmentPending
1✔
782
                                updateFunc = azureutils.AppendToUpdateCRIFunc(updateFunc, func(obj client.Object) error {
2✔
783
                                        azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
1✔
784
                                        azv.Status.Detail = nil
1✔
785
                                        azv.Status.Error = nil
1✔
786
                                        return nil
1✔
787
                                })
1✔
788
                        case azdiskv1beta2.Detaching:
1✔
789
                                // reset state to Attached so Detach operation can be redone
1✔
790
                                targetState = azdiskv1beta2.Attached
1✔
791
                        default:
2✔
792
                                targetState = azv.Status.State
2✔
793
                        }
794

795
                        if _, err := azureutils.UpdateCRIWithRetry(ctx, nil, r.cachedClient, r.azClient, &azv, updateFunc, consts.ForcedUpdateMaxNetRetry, updateMode); err != nil {
4✔
796
                                w.Logger().Errorf(err, "failed to update AzVolumeAttachment (%s) for recovery", azv.Name)
×
797
                        } else {
4✔
798
                                // if update succeeded, add the CRI to the recoveryComplete list
4✔
799
                                azvMap.Store(azv.Name, struct{}{})
4✔
800
                                atomic.AddInt32(&numRecovered, 1)
4✔
801
                        }
4✔
802
                }(azVolumeAttachment, recoveredAzVolumeAttachments)
803
        }
804
        wg.Wait()
3✔
805

3✔
806
        // if recovery has not been completed for all CRIs, return error
3✔
807
        if numRecovered < int32(len(azVolumeAttachments.Items)) {
3✔
808
                return status.Errorf(codes.Internal, "failed to recover some AzVolumeAttachment states")
×
809
        }
×
810
        return nil
3✔
811
}
812

813
func NewAttachDetachController(mgr manager.Manager, cloudDiskAttacher CloudDiskAttachDetacher, crdDetacher CrdDetacher, controllerSharedState *SharedState) (*ReconcileAttachDetach, error) {
×
814
        logger := mgr.GetLogger().WithValues("controller", "azvolumeattachment")
×
815
        reconciler := ReconcileAttachDetach{
×
816
                crdDetacher:       crdDetacher,
×
817
                cloudDiskAttacher: cloudDiskAttacher,
×
818
                stateLock:         &sync.Map{},
×
819
                retryInfo:         newRetryInfo(),
×
820
                SharedState:       controllerSharedState,
×
821
                logger:            logger,
×
822
        }
×
823

×
824
        c, err := controller.New("azvolumeattachment-controller", mgr, controller.Options{
×
825
                MaxConcurrentReconciles: controllerSharedState.config.ControllerConfig.WorkerThreads,
×
826
                Reconciler:              &reconciler,
×
827
                LogConstructor:          func(req *reconcile.Request) logr.Logger { return logger },
×
828
        })
829

830
        if err != nil {
×
831
                c.GetLogger().Error(err, "failed to create controller")
×
832
                return nil, err
×
833
        }
×
834

835
        c.GetLogger().Info("Starting to watch AzVolumeAttachments.")
×
836

×
837
        // Watch for CRUD events on azVolumeAttachment objects
×
838
        err = c.Watch(&source.Kind{Type: &azdiskv1beta2.AzVolumeAttachment{}}, &handler.EnqueueRequestForObject{})
×
839
        if err != nil {
×
840
                c.GetLogger().Error(err, "failed to initialize watch for AzVolumeAttachment CRI")
×
841
                return nil, err
×
842
        }
×
843

844
        c.GetLogger().Info("Controller set-up successful.")
×
845
        return &reconciler, nil
×
846
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc