• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / azuredisk-csi-driver / 5733296808

02 Aug 2023 12:26AM UTC coverage: 68.743% (+24.2%) from 44.538%
5733296808

Pull #1915

github

edreed
fix: fix error handling of attach status updates
Pull Request #1915: [V2] fix: fix error handling of attach status updates

28 of 28 new or added lines in 2 files covered. (100.0%)

7260 of 10561 relevant lines covered (68.74%)

7.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.93
/pkg/controller/attach_detach.go
1
/*
2
Copyright 2021 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "errors"
22
        "strings"
23
        "sync"
24
        "sync/atomic"
25

26
        "github.com/go-logr/logr"
27
        "google.golang.org/grpc/codes"
28
        "google.golang.org/grpc/status"
29
        v1 "k8s.io/api/core/v1"
30
        storagev1 "k8s.io/api/storage/v1"
31
        apiErrors "k8s.io/apimachinery/pkg/api/errors"
32
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33
        "k8s.io/apimachinery/pkg/types"
34
        utilfeature "k8s.io/apiserver/pkg/util/feature"
35
        "k8s.io/kubernetes/pkg/features"
36
        azdiskv1beta2 "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/azuredisk/v1beta2"
37
        consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants"
38
        "sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils"
39
        "sigs.k8s.io/azuredisk-csi-driver/pkg/provisioner"
40
        "sigs.k8s.io/azuredisk-csi-driver/pkg/util"
41
        "sigs.k8s.io/azuredisk-csi-driver/pkg/workflow"
42

43
        volerr "k8s.io/cloud-provider/volume/errors"
44
        "sigs.k8s.io/cloud-provider-azure/pkg/retry"
45
        "sigs.k8s.io/controller-runtime/pkg/client"
46
        "sigs.k8s.io/controller-runtime/pkg/controller"
47
        "sigs.k8s.io/controller-runtime/pkg/handler"
48
        "sigs.k8s.io/controller-runtime/pkg/manager"
49
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
50
        "sigs.k8s.io/controller-runtime/pkg/source"
51
)
52

53
type CloudDiskAttachDetacher interface {
54
        PublishVolume(ctx context.Context, volumeID string, nodeID string, volumeContext map[string]string) provisioner.CloudAttachResult
55
        UnpublishVolume(ctx context.Context, volumeID string, nodeID string) error
56
}
57

58
type CrdDetacher interface {
59
        UnpublishVolume(ctx context.Context, volumeID string, nodeID string, secrets map[string]string, mode consts.UnpublishMode) error
60
        WaitForDetach(ctx context.Context, volumeID, nodeID string) error
61
}
62

63
/*
64
Attach Detach controller is responsible for
65
 1. attaching volume to a specified node upon creation of AzVolumeAttachment CRI
66
 2. promoting AzVolumeAttachment to primary upon spec update
67
 3. detaching volume upon deletions marked with certain annotations
68
*/
69
type ReconcileAttachDetach struct {
70
        *SharedState
71
        logger            logr.Logger
72
        crdDetacher       CrdDetacher
73
        cloudDiskAttacher CloudDiskAttachDetacher
74
        stateLock         *sync.Map
75
        retryInfo         *retryInfo
76
}
77

78
var _ reconcile.Reconciler = &ReconcileAttachDetach{}
79

80
var allowedTargetAttachmentStates = map[string][]string{
81
        "":                                       {string(azdiskv1beta2.AttachmentPending), string(azdiskv1beta2.Attaching), string(azdiskv1beta2.Detaching)},
82
        string(azdiskv1beta2.AttachmentPending):  {string(azdiskv1beta2.Attaching), string(azdiskv1beta2.Detaching)},
83
        string(azdiskv1beta2.Attaching):          {string(azdiskv1beta2.Attached), string(azdiskv1beta2.AttachmentFailed)},
84
        string(azdiskv1beta2.Detaching):          {string(azdiskv1beta2.Detached), string(azdiskv1beta2.DetachmentFailed)},
85
        string(azdiskv1beta2.Attached):           {string(azdiskv1beta2.Detaching)},
86
        string(azdiskv1beta2.Detached):           {},
87
        string(azdiskv1beta2.AttachmentFailed):   {string(azdiskv1beta2.Detaching)},
88
        string(azdiskv1beta2.DetachmentFailed):   {string(azdiskv1beta2.ForceDetachPending)},
89
        string(azdiskv1beta2.ForceDetachPending): {string(azdiskv1beta2.Detaching)},
90
}
91

92
func (r *ReconcileAttachDetach) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
6✔
93
        if !r.isRecoveryComplete() {
6✔
94
                return reconcile.Result{Requeue: true}, nil
×
95
        }
×
96

97
        azVolumeAttachment, err := azureutils.GetAzVolumeAttachment(ctx, r.cachedClient, r.azClient, request.Name, request.Namespace, true)
6✔
98
        // if object is not found, it means the object has been deleted. Log the deletion and do not requeue
6✔
99
        if apiErrors.IsNotFound(err) {
6✔
100
                r.azVolumeAttachmentToVaMap.Delete(request.Name)
×
101
                return reconcileReturnOnSuccess(request.Name, r.retryInfo)
×
102
        } else if err != nil {
6✔
103
                azVolumeAttachment.Name = request.Name
×
104
                return reconcileReturnOnError(ctx, azVolumeAttachment, "get", err, r.retryInfo)
×
105
        }
×
106

107
        ctx, _ = workflow.GetWorkflowFromObj(ctx, azVolumeAttachment)
6✔
108

6✔
109
        // if underlying cloud operation already in process, skip until operation is completed
6✔
110
        if isOperationInProcess(azVolumeAttachment) {
6✔
111
                return reconcileReturnOnSuccess(azVolumeAttachment.Name, r.retryInfo)
×
112
        }
×
113

114
        // deletion request
115
        if deleteRequested, deleteAfter := objectDeletionRequested(azVolumeAttachment); deleteRequested {
7✔
116
                if deleteAfter > 0 {
1✔
117
                        return reconcileAfter(deleteAfter, request.Name, r.retryInfo)
×
118
                }
×
119
                if err := r.removeFinalizer(ctx, azVolumeAttachment); err != nil {
1✔
120
                        return reconcileReturnOnError(ctx, azVolumeAttachment, "delete", err, r.retryInfo)
×
121
                }
×
122
                // detachment request
123
        } else if volumeDetachRequested(azVolumeAttachment) {
6✔
124
                if err := r.triggerDetach(ctx, azVolumeAttachment); err != nil {
1✔
125
                        return reconcileReturnOnError(ctx, azVolumeAttachment, "detach", err, r.retryInfo)
×
126
                }
×
127
                // attachment request
128
        } else if azVolumeAttachment.Status.Detail == nil {
6✔
129
                if err := r.triggerAttach(ctx, azVolumeAttachment); err != nil {
3✔
130
                        return reconcileReturnOnError(ctx, azVolumeAttachment, "attach", err, r.retryInfo)
1✔
131
                }
1✔
132
                // promotion/demotion request
133
        } else if azVolumeAttachment.Spec.RequestedRole != azVolumeAttachment.Status.Detail.Role {
4✔
134
                switch azVolumeAttachment.Spec.RequestedRole {
2✔
135
                case azdiskv1beta2.PrimaryRole:
1✔
136
                        if err := r.promote(ctx, azVolumeAttachment); err != nil {
1✔
137
                                return reconcileReturnOnError(ctx, azVolumeAttachment, "promote", err, r.retryInfo)
×
138
                        }
×
139
                case azdiskv1beta2.ReplicaRole:
1✔
140
                        if err := r.demote(ctx, azVolumeAttachment); err != nil {
1✔
141
                                return reconcileReturnOnError(ctx, azVolumeAttachment, "demote", err, r.retryInfo)
×
142
                        }
×
143
                }
144
        }
145

146
        return reconcileReturnOnSuccess(azVolumeAttachment.Name, r.retryInfo)
5✔
147
}
148

149
func (r *ReconcileAttachDetach) triggerAttach(ctx context.Context, azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment) error {
2✔
150
        var err error
2✔
151
        ctx, w := workflow.New(ctx)
2✔
152
        defer func() { w.Finish(err) }()
4✔
153

154
        // requeue if AzVolumeAttachment's state is being updated by a different worker
155
        if _, ok := r.stateLock.LoadOrStore(azVolumeAttachment.Name, nil); ok {
2✔
156
                err = getOperationRequeueError("attach", azVolumeAttachment)
×
157
                return err
×
158
        }
×
159
        defer r.stateLock.Delete(azVolumeAttachment.Name)
2✔
160

2✔
161
        if !volumeAttachRequested(azVolumeAttachment) {
3✔
162
                err = status.Errorf(codes.FailedPrecondition, "attach operation has not yet been requested")
1✔
163
                return err
1✔
164
        }
1✔
165

166
        var azVolume *azdiskv1beta2.AzVolume
1✔
167
        if azVolume, err = azureutils.GetAzVolume(ctx, r.cachedClient, r.azClient, strings.ToLower(azVolumeAttachment.Spec.VolumeName), r.config.ObjectNamespace, true); err != nil {
1✔
168
                if apiErrors.IsNotFound(err) {
×
169
                        w.Logger().V(5).Infof("Aborting attach operation for AzVolumeAttachment (%s): AzVolume (%s) not found", azVolumeAttachment.Name, azVolumeAttachment.Spec.VolumeName)
×
170
                        err = nil
×
171
                        return nil
×
172
                }
×
173
                return err
×
174
        } else if deleteRequested, _ := objectDeletionRequested(azVolume); deleteRequested {
1✔
175
                w.Logger().V(5).Infof("Aborting attach operation for AzVolumeAttachment (%s): AzVolume (%s) scheduled for deletion", azVolumeAttachment.Name, azVolumeAttachment.Spec.VolumeName)
×
176
                return nil
×
177
        }
×
178

179
        // update status block
180
        updateFunc := func(obj client.Object) error {
2✔
181
                azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
1✔
182
                // Update state to attaching, Initialize finalizer and add label to the object
1✔
183
                _, derr := updateState(azv, azdiskv1beta2.Attaching, normalUpdate)
1✔
184
                return derr
1✔
185
        }
1✔
186

187
        var updatedObj client.Object
1✔
188
        if updatedObj, err = azureutils.UpdateCRIWithRetry(ctx, nil, r.cachedClient, r.azClient, azVolumeAttachment, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRIStatus); err != nil {
1✔
189
                return err
×
190
        }
×
191
        azVolumeAttachment = updatedObj.(*azdiskv1beta2.AzVolumeAttachment)
1✔
192

1✔
193
        w.Logger().V(5).Info("Attaching volume")
1✔
194
        waitCh := make(chan goSignal)
1✔
195
        //nolint:contextcheck // call is asynchronous; context is not inherited by design
1✔
196
        go func() {
2✔
197
                var attachErr error
1✔
198
                _, goWorkflow := workflow.New(ctx)
1✔
199
                defer func() { goWorkflow.Finish(attachErr) }()
2✔
200
                waitCh <- goSignal{}
1✔
201

1✔
202
                goCtx := goWorkflow.SaveToContext(context.Background())
1✔
203
                cloudCtx, cloudCancel := context.WithTimeout(goCtx, cloudTimeout)
1✔
204
                defer cloudCancel()
1✔
205

1✔
206
                // attempt to attach the disk to a node
1✔
207
                var publishCtx map[string]string
1✔
208
                var pods []v1.Pod
1✔
209
                if azVolumeAttachment.Spec.RequestedRole == azdiskv1beta2.ReplicaRole {
1✔
210
                        var err error
×
211
                        pods, err = r.getPodsFromVolume(goCtx, r.cachedClient, azVolumeAttachment.Spec.VolumeName)
×
212
                        if err != nil {
×
213
                                goWorkflow.Logger().Error(err, "failed to list pods for volume")
×
214
                        }
×
215
                }
216

217
                var handleSuccess func(bool)
1✔
218
                var handleError func()
1✔
219

1✔
220
                attachAndUpdate := func() {
2✔
221
                        attachResult := r.attachVolume(cloudCtx, azVolumeAttachment.Spec.VolumeID, azVolumeAttachment.Spec.NodeName, azVolumeAttachment.Spec.VolumeContext)
1✔
222
                        if publishCtx = attachResult.PublishContext(); publishCtx != nil {
2✔
223
                                handleSuccess(false)
1✔
224
                        }
1✔
225
                        var ok bool
1✔
226
                        if attachErr, ok = <-attachResult.ResultChannel(); !ok || attachErr != nil {
1✔
227
                                handleError()
×
228
                        } else {
1✔
229
                                handleSuccess(true)
1✔
230
                        }
1✔
231
                }
232

233
                handleError = func() {
1✔
234
                        if len(pods) > 0 {
×
235
                                for _, pod := range pods {
×
236
                                        r.eventRecorder.Eventf(pod.DeepCopyObject(), v1.EventTypeWarning, consts.ReplicaAttachmentFailedEvent, "Replica mount for volume %s failed to be attached to node %s with error: %v", azVolumeAttachment.Spec.VolumeName, azVolumeAttachment.Spec.NodeName, attachErr)
×
237
                                }
×
238
                        }
239

240
                        // if the disk is attached to a different node
241
                        if danglingAttachErr, ok := attachErr.(*volerr.DanglingAttachError); ok {
×
242
                                // get disk, current node and attachment name
×
243
                                currentNodeName := string(danglingAttachErr.CurrentNode)
×
244
                                currentAttachmentName := azureutils.GetAzVolumeAttachmentName(azVolumeAttachment.Spec.VolumeName, currentNodeName)
×
245
                                goWorkflow.Logger().Infof("Dangling attach detected for %s", currentNodeName)
×
246

×
247
                                // check if AzVolumeAttachment exists for the existing attachment
×
248
                                _, err := r.azClient.DiskV1beta2().AzVolumeAttachments(r.config.ObjectNamespace).Get(cloudCtx, currentAttachmentName, metav1.GetOptions{})
×
249
                                var detachErr error
×
250
                                if apiErrors.IsNotFound(err) {
×
251
                                        // AzVolumeAttachment doesn't exist so we only need to detach disk from cloud
×
252
                                        detachErr = r.cloudDiskAttacher.UnpublishVolume(goCtx, azVolumeAttachment.Spec.VolumeID, currentNodeName)
×
253
                                        if detachErr != nil {
×
254
                                                goWorkflow.Logger().Errorf(detachErr, "failed to detach dangling volume (%s) from node (%s). Error: %v", azVolumeAttachment.Spec.VolumeID, currentNodeName, err)
×
255
                                        }
×
256
                                } else {
×
257
                                        // AzVolumeAttachment exist so we need to detach disk through crdProvisioner
×
258
                                        detachErr = r.crdDetacher.UnpublishVolume(goCtx, azVolumeAttachment.Spec.VolumeID, currentNodeName, make(map[string]string), consts.Detach)
×
259
                                        if detachErr != nil {
×
260
                                                goWorkflow.Logger().Errorf(detachErr, "failed to make a unpublish request dangling AzVolumeAttachment for volume (%s) and node (%s)", azVolumeAttachment.Spec.VolumeID, currentNodeName)
×
261
                                        }
×
262
                                        detachErr = r.crdDetacher.WaitForDetach(goCtx, azVolumeAttachment.Spec.VolumeID, currentNodeName)
×
263
                                        if detachErr != nil {
×
264
                                                goWorkflow.Logger().Errorf(detachErr, "failed to unpublish dangling AzVolumeAttachment for volume (%s) and node (%s)", azVolumeAttachment.Spec.VolumeID, currentNodeName)
×
265
                                        }
×
266
                                }
267
                                // attempt to attach the disk to a node after detach
268
                                if detachErr == nil {
×
269
                                        attachAndUpdate()
×
270
                                        return
×
271
                                }
×
272
                        }
273

274
                        updateFunc := func(obj client.Object) error {
×
275
                                azva := obj.(*azdiskv1beta2.AzVolumeAttachment)
×
276
                                // add retriable annotation if the replica attach error is PartialUpdateError or timeout
×
277
                                if azva.Spec.RequestedRole == azdiskv1beta2.ReplicaRole {
×
278
                                        if _, ok := attachErr.(*retry.PartialUpdateError); ok || errors.Is(err, context.DeadlineExceeded) {
×
279
                                                azva.Status.Annotations = azureutils.AddToMap(azva.Status.Annotations, consts.ReplicaVolumeAttachRetryAnnotation, "true")
×
280
                                        }
×
281
                                }
282
                                _, uerr := reportError(azva, azdiskv1beta2.AttachmentFailed, attachErr)
×
283
                                return uerr
×
284
                        }
285

286
                        //nolint:contextcheck // final status update of the CRI must occur even when the current context's deadline passes.
287
                        if _, err := azureutils.UpdateCRIWithRetry(goCtx, nil, r.cachedClient, r.azClient, azVolumeAttachment, updateFunc, consts.ForcedUpdateMaxNetRetry, azureutils.UpdateCRIStatus); err != nil {
×
288
                                // There's nothing much we can do in this case, so just log the error and move on.
×
289
                                goWorkflow.Logger().Errorf(err, "failed to update CRI with final attach error status")
×
290
                        }
×
291
                }
292

293
                handleSuccess = func(asyncComplete bool) {
3✔
294
                        // Publish event to indicate attachment success
2✔
295
                        if asyncComplete {
3✔
296
                                if len(pods) > 0 {
1✔
297
                                        for _, pod := range pods {
×
298
                                                r.eventRecorder.Eventf(pod.DeepCopyObject(), v1.EventTypeNormal, consts.ReplicaAttachmentSuccessEvent, "Replica mount for volume %s successfully attached to node %s", azVolumeAttachment.Spec.VolumeName, azVolumeAttachment.Spec.NodeName)
×
299
                                        }
×
300
                                }
301
                                // the node's remaining capacity of disk attachment should be decreased by 1, since the disk attachment is succeeded.
302
                                r.decrementNodeCapacity(ctx, azVolumeAttachment.Spec.NodeName)
1✔
303
                        }
304

305
                        updateFunc := func(obj client.Object) error {
4✔
306
                                azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
2✔
307
                                azv = updateStatusDetail(azv, publishCtx)
2✔
308
                                var uerr error
2✔
309
                                if asyncComplete {
3✔
310
                                        _, uerr = updateState(azv, azdiskv1beta2.Attached, forceUpdate)
1✔
311
                                }
1✔
312
                                return uerr
2✔
313
                        }
314

315
                        if asyncComplete && azVolumeAttachment.Spec.RequestedRole == azdiskv1beta2.PrimaryRole {
3✔
316
                                _ = r.updateVolumeAttachmentWithResult(goCtx, azVolumeAttachment)
1✔
317
                        }
1✔
318

319
                        if updatedObj, err := azureutils.UpdateCRIWithRetry(goCtx, nil, r.cachedClient, r.azClient, azVolumeAttachment, updateFunc, consts.ForcedUpdateMaxNetRetry, azureutils.UpdateCRIStatus); err != nil {
2✔
320
                                azVolumeAttachment = updatedObj.(*azdiskv1beta2.AzVolumeAttachment)
×
321
                        } else {
2✔
322
                                // There's nothing much we can do in this case, so just log the error and move on.
2✔
323
                                phase := "intermediate"
2✔
324
                                if asyncComplete {
3✔
325
                                        phase = "final"
1✔
326
                                }
1✔
327
                                goWorkflow.Logger().Errorf(err, "failed to update CRI with %s attach success status", phase)
2✔
328
                        }
329
                }
330

331
                attachAndUpdate()
1✔
332
        }()
333
        <-waitCh
1✔
334

1✔
335
        return nil
1✔
336
}
337

338
func (r *ReconcileAttachDetach) triggerDetach(ctx context.Context, azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment) error {
1✔
339
        var err error
1✔
340
        ctx, w := workflow.New(ctx)
1✔
341
        defer func() { w.Finish(err) }()
2✔
342

343
        if _, ok := r.stateLock.LoadOrStore(azVolumeAttachment.Name, nil); ok {
1✔
344
                return getOperationRequeueError("detach", azVolumeAttachment)
×
345
        }
×
346
        defer r.stateLock.Delete(azVolumeAttachment.Name)
1✔
347

1✔
348
        updateFunc := func(obj client.Object) error {
2✔
349
                azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
1✔
350
                // Update state to detaching
1✔
351
                _, derr := updateState(azv, azdiskv1beta2.Detaching, normalUpdate)
1✔
352
                return derr
1✔
353
        }
1✔
354

355
        var updatedObj client.Object
1✔
356
        if updatedObj, err = azureutils.UpdateCRIWithRetry(ctx, nil, r.cachedClient, r.azClient, azVolumeAttachment, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRIStatus); err != nil {
1✔
357
                return err
×
358
        }
×
359
        azVolumeAttachment = updatedObj.(*azdiskv1beta2.AzVolumeAttachment)
1✔
360

1✔
361
        w.Logger().V(5).Info("Detaching volume")
1✔
362
        waitCh := make(chan goSignal)
1✔
363
        //nolint:contextcheck // call is asynchronous; context is not inherited by design
1✔
364
        go func() {
2✔
365
                var detachErr error
1✔
366
                _, goWorkflow := workflow.New(ctx)
1✔
367
                defer func() { goWorkflow.Finish(detachErr) }()
2✔
368
                waitCh <- goSignal{}
1✔
369

1✔
370
                goCtx := goWorkflow.SaveToContext(context.Background())
1✔
371
                cloudCtx, cloudCancel := context.WithTimeout(goCtx, cloudTimeout)
1✔
372
                defer cloudCancel()
1✔
373

1✔
374
                var updateFunc func(obj client.Object) error
1✔
375
                detachErr = r.detachVolume(cloudCtx, azVolumeAttachment.Spec.VolumeID, azVolumeAttachment.Spec.NodeName)
1✔
376
                if detachErr != nil {
1✔
377
                        updateFunc = func(obj client.Object) error {
×
378
                                azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
×
379
                                _, derr := reportError(azv, azdiskv1beta2.DetachmentFailed, detachErr)
×
380
                                return derr
×
381
                        }
×
382
                        //nolint:contextcheck // final status update of the CRI must occur even when the current context's deadline passes.
383
                        if _, uerr := azureutils.UpdateCRIWithRetry(goCtx, nil, r.cachedClient, r.azClient, azVolumeAttachment, updateFunc, consts.ForcedUpdateMaxNetRetry, azureutils.UpdateCRIStatus); uerr != nil {
×
384
                                w.Logger().Errorf(uerr, "failed to update final status of AzVolumeAttachement")
×
385
                        }
×
386
                } else {
1✔
387
                        // detach of azVolumeAttachment is succeeded, the node's remaining capacity of disk attachment should be increased by 1
1✔
388
                        r.incrementNodeCapacity(ctx, azVolumeAttachment.Spec.NodeName)
1✔
389

1✔
390
                        //nolint:contextcheck // delete of the CRI must occur even when the current context's deadline passes.
1✔
391
                        if derr := r.cachedClient.Delete(goCtx, azVolumeAttachment); derr != nil {
1✔
392
                                w.Logger().Error(derr, "failed to delete AzVolumeAttachment")
×
393
                        }
×
394
                }
395
        }()
396
        <-waitCh
1✔
397
        return nil
1✔
398
}
399

400
func (r *ReconcileAttachDetach) removeFinalizer(ctx context.Context, azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment) error {
1✔
401
        var err error
1✔
402
        ctx, w := workflow.New(ctx)
1✔
403
        defer func() { w.Finish(err) }()
2✔
404

405
        updateFunc := func(obj client.Object) error {
2✔
406
                azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
1✔
407
                // delete finalizer
1✔
408
                _ = r.deleteFinalizer(azv)
1✔
409
                return nil
1✔
410
        }
1✔
411

412
        _, err = azureutils.UpdateCRIWithRetry(ctx, nil, r.cachedClient, r.azClient, azVolumeAttachment, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRI)
1✔
413
        return err
1✔
414
}
415

416
func (r *ReconcileAttachDetach) promote(ctx context.Context, azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment) error {
1✔
417
        var err error
1✔
418
        ctx, w := workflow.New(ctx)
1✔
419
        defer func() { w.Finish(err) }()
2✔
420

421
        w.Logger().Infof("Promoting AzVolumeAttachment")
1✔
422
        if err = r.updateVolumeAttachmentWithResult(ctx, azVolumeAttachment); err != nil {
1✔
423
                return err
×
424
        }
×
425
        // initialize metadata and update status block
426
        updateFunc := func(obj client.Object) error {
2✔
427
                azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
1✔
428
                _ = updateRole(azv, azdiskv1beta2.PrimaryRole)
1✔
429
                return nil
1✔
430
        }
1✔
431
        if _, err = azureutils.UpdateCRIWithRetry(ctx, nil, r.cachedClient, r.azClient, azVolumeAttachment, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRIStatus); err != nil {
1✔
432
                return err
×
433
        }
×
434
        return nil
1✔
435
}
436

437
func (r *ReconcileAttachDetach) demote(ctx context.Context, azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment) error {
1✔
438
        var err error
1✔
439
        ctx, w := workflow.New(ctx)
1✔
440
        defer func() { w.Finish(err) }()
2✔
441

442
        w.Logger().V(5).Info("Demoting AzVolumeAttachment")
1✔
443
        // initialize metadata and update status block
1✔
444
        updateFunc := func(obj client.Object) error {
2✔
445
                azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
1✔
446
                delete(azv.Status.Annotations, consts.VolumeAttachmentKey)
1✔
447
                _ = updateRole(azv, azdiskv1beta2.ReplicaRole)
1✔
448
                return nil
1✔
449
        }
1✔
450

451
        if _, err = azureutils.UpdateCRIWithRetry(ctx, nil, r.cachedClient, r.azClient, azVolumeAttachment, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRIStatus); err != nil {
1✔
452
                return err
×
453
        }
×
454
        return nil
1✔
455
}
456

457
func (r *ReconcileAttachDetach) updateVolumeAttachmentWithResult(ctx context.Context, azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment) error {
2✔
458
        ctx, w := workflow.New(ctx)
2✔
459
        var vaName string
2✔
460
        vaName, err := r.waitForVolumeAttachmentName(ctx, azVolumeAttachment)
2✔
461
        if err != nil {
2✔
462
                return err
×
463
        }
×
464

465
        vaUpdateFunc := func(obj client.Object) error {
4✔
466
                va := obj.(*storagev1.VolumeAttachment)
2✔
467
                if azVolumeAttachment.Status.Detail != nil {
3✔
468
                        for key, value := range azVolumeAttachment.Status.Detail.PublishContext {
1✔
469
                                va.Status.AttachmentMetadata = azureutils.AddToMap(va.Status.AttachmentMetadata, key, value)
×
470
                        }
×
471
                }
472
                return nil
2✔
473
        }
474

475
        originalVA := &storagev1.VolumeAttachment{}
2✔
476
        if err = r.cachedClient.Get(ctx, types.NamespacedName{Namespace: azVolumeAttachment.Namespace, Name: vaName}, originalVA); err != nil {
2✔
477
                w.Logger().Errorf(err, "failed to get original VolumeAttachment (%s)", vaName)
×
478
                return err
×
479
        }
×
480
        _, err = azureutils.UpdateCRIWithRetry(ctx, nil, r.cachedClient, r.azClient, originalVA, vaUpdateFunc, consts.ForcedUpdateMaxNetRetry, azureutils.UpdateCRIStatus)
2✔
481
        return err
2✔
482
}
483

484
func (r *ReconcileAttachDetach) deleteFinalizer(azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment) *azdiskv1beta2.AzVolumeAttachment {
1✔
485
        if azVolumeAttachment == nil {
1✔
486
                return nil
×
487
        }
×
488

489
        if azVolumeAttachment.ObjectMeta.Finalizers == nil {
1✔
490
                return azVolumeAttachment
×
491
        }
×
492

493
        finalizers := []string{}
1✔
494
        for _, finalizer := range azVolumeAttachment.ObjectMeta.Finalizers {
2✔
495
                if finalizer == consts.AzVolumeAttachmentFinalizer {
2✔
496
                        continue
1✔
497
                }
498
                finalizers = append(finalizers, finalizer)
×
499
        }
500
        azVolumeAttachment.ObjectMeta.Finalizers = finalizers
1✔
501
        return azVolumeAttachment
1✔
502
}
503

504
func (r *ReconcileAttachDetach) attachVolume(ctx context.Context, volumeID, node string, volumeContext map[string]string) provisioner.CloudAttachResult {
1✔
505
        return r.cloudDiskAttacher.PublishVolume(ctx, volumeID, node, volumeContext)
1✔
506
}
1✔
507

508
func (r *ReconcileAttachDetach) detachVolume(ctx context.Context, volumeID, node string) error {
1✔
509
        return r.cloudDiskAttacher.UnpublishVolume(ctx, volumeID, node)
1✔
510
}
1✔
511

512
func (r *ReconcileAttachDetach) Recover(ctx context.Context, recoveryID string) error {
3✔
513
        var err error
3✔
514
        ctx, w := workflow.New(ctx)
3✔
515
        defer func() { w.Finish(err) }()
6✔
516

517
        w.Logger().V(5).Info("Recovering AzVolumeAttachment CRIs...")
3✔
518
        // try to recreate missing AzVolumeAttachment CRI
3✔
519
        var syncedVolumeAttachments, volumesToSync map[string]bool
3✔
520

3✔
521
        for i := 0; i < maxRetry; i++ {
6✔
522
                if syncedVolumeAttachments, volumesToSync, err = r.recreateAzVolumeAttachment(ctx, syncedVolumeAttachments, volumesToSync); err == nil {
6✔
523
                        break
3✔
524
                }
525
                w.Logger().Error(err, "failed to recreate missing AzVolumeAttachment CRI")
×
526
        }
527
        // retrigger any aborted operation from possible previous controller crash
528
        recovered := &sync.Map{}
3✔
529
        for i := 0; i < maxRetry; i++ {
6✔
530
                if err = r.recoverAzVolumeAttachment(ctx, recovered, recoveryID); err == nil {
6✔
531
                        break
3✔
532
                }
533
                w.Logger().Error(err, "failed to recover AzVolumeAttachment state")
×
534
        }
535

536
        return err
3✔
537
}
538

539
func updateRole(azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment, role azdiskv1beta2.Role) *azdiskv1beta2.AzVolumeAttachment {
5✔
540
        if azVolumeAttachment == nil {
5✔
541
                return nil
×
542
        }
×
543

544
        if azVolumeAttachment.Status.Detail == nil {
5✔
545
                return azVolumeAttachment
×
546
        }
×
547

548
        azVolumeAttachment.Status.Detail.PreviousRole = azVolumeAttachment.Status.Detail.Role
5✔
549
        azVolumeAttachment.Status.Detail.Role = role
5✔
550

5✔
551
        return azVolumeAttachment
5✔
552
}
553

554
func updateStatusDetail(azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment, status map[string]string) *azdiskv1beta2.AzVolumeAttachment {
2✔
555
        if azVolumeAttachment == nil {
2✔
556
                return nil
×
557
        }
×
558

559
        if azVolumeAttachment.Status.Detail == nil {
4✔
560
                azVolumeAttachment.Status.Detail = &azdiskv1beta2.AzVolumeAttachmentStatusDetail{}
2✔
561
        }
2✔
562

563
        azVolumeAttachment.Status.Detail.PreviousRole = azVolumeAttachment.Status.Detail.Role
2✔
564
        azVolumeAttachment.Status.Detail.Role = azVolumeAttachment.Spec.RequestedRole
2✔
565
        azVolumeAttachment.Status.Detail.PublishContext = status
2✔
566

2✔
567
        return azVolumeAttachment
2✔
568
}
569

570
func updateError(azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment, err error) *azdiskv1beta2.AzVolumeAttachment {
×
571
        if azVolumeAttachment == nil {
×
572
                return nil
×
573
        }
×
574

575
        if err != nil {
×
576
                azVolumeAttachment.Status.Error = util.NewAzError(err)
×
577
        }
×
578

579
        return azVolumeAttachment
×
580
}
581

582
func updateState(azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment, state azdiskv1beta2.AzVolumeAttachmentAttachmentState, mode updateMode) (*azdiskv1beta2.AzVolumeAttachment, error) {
6✔
583
        var err error
6✔
584
        if azVolumeAttachment == nil {
6✔
585
                return nil, status.Errorf(codes.FailedPrecondition, "function `updateState` requires non-nil AzVolumeAttachment object.")
×
586
        }
×
587
        if mode == normalUpdate {
9✔
588
                if expectedStates, exists := allowedTargetAttachmentStates[string(azVolumeAttachment.Status.State)]; !exists || !containsString(string(state), expectedStates) {
3✔
589
                        err = status.Error(codes.FailedPrecondition, formatUpdateStateError("azVolumeAttachment", string(azVolumeAttachment.Status.State), string(state), expectedStates...))
×
590
                }
×
591
        }
592
        if err == nil {
12✔
593
                azVolumeAttachment.Status.State = state
6✔
594
        }
6✔
595
        return azVolumeAttachment, err
6✔
596
}
597

598
func reportError(azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment, state azdiskv1beta2.AzVolumeAttachmentAttachmentState, err error) (*azdiskv1beta2.AzVolumeAttachment, error) {
×
599
        if azVolumeAttachment == nil {
×
600
                return nil, status.Errorf(codes.FailedPrecondition, "function `reportError` requires non-nil AzVolumeAttachment object.")
×
601
        }
×
602
        azVolumeAttachment = updateError(azVolumeAttachment, err)
×
603
        return updateState(azVolumeAttachment, state, forceUpdate)
×
604
}
605

606
func (r *ReconcileAttachDetach) recreateAzVolumeAttachment(ctx context.Context, syncedVolumeAttachments map[string]bool, volumesToSync map[string]bool) (map[string]bool, map[string]bool, error) {
3✔
607
        w, _ := workflow.GetWorkflowFromContext(ctx)
3✔
608
        // Get all volumeAttachments
3✔
609
        volumeAttachments, err := r.kubeClient.StorageV1().VolumeAttachments().List(ctx, metav1.ListOptions{})
3✔
610
        if err != nil {
3✔
611
                return syncedVolumeAttachments, volumesToSync, err
×
612
        }
×
613

614
        if syncedVolumeAttachments == nil {
6✔
615
                syncedVolumeAttachments = map[string]bool{}
3✔
616
        }
3✔
617
        if volumesToSync == nil {
6✔
618
                volumesToSync = map[string]bool{}
3✔
619
        }
3✔
620

621
        // Loop through volumeAttachments and create Primary AzVolumeAttachments in correspondence
622
        for _, volumeAttachment := range volumeAttachments.Items {
5✔
623
                // skip if sync has been completed volumeAttachment
2✔
624
                if syncedVolumeAttachments[volumeAttachment.Name] {
2✔
625
                        continue
×
626
                }
627
                if volumeAttachment.Spec.Attacher == r.config.DriverName {
4✔
628
                        pvName := volumeAttachment.Spec.Source.PersistentVolumeName
2✔
629
                        if pvName == nil {
2✔
630
                                continue
×
631
                        }
632
                        // get PV and retrieve diskName
633
                        pv, err := r.kubeClient.CoreV1().PersistentVolumes().Get(ctx, *pvName, metav1.GetOptions{})
2✔
634
                        if err != nil {
2✔
635
                                w.Logger().Errorf(err, "failed to get PV (%s)", *pvName)
×
636
                                return syncedVolumeAttachments, volumesToSync, err
×
637
                        }
×
638

639
                        // if pv is migrated intree pv, convert it to csi pv for processing
640
                        // translate intree pv to csi pv to convert them into AzVolume resource
641
                        if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) &&
2✔
642
                                utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureDisk) &&
2✔
643
                                pv.Spec.AzureDisk != nil {
2✔
644
                                // translate intree pv to csi pv to convert them into AzVolume resource
×
645
                                if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) &&
×
646
                                        utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureDisk) &&
×
647
                                        pv.Spec.AzureDisk != nil {
×
648
                                        if pv, err = r.translateInTreePVToCSI(pv); err != nil {
×
649
                                                w.Logger().V(5).Errorf(err, "skipping azVolumeAttachment creation for volumeAttachment (%s)", volumeAttachment.Name)
×
650
                                        }
×
651
                                }
652
                        }
653

654
                        if pv.Spec.CSI == nil || pv.Spec.CSI.Driver != r.config.DriverName {
2✔
655
                                continue
×
656
                        }
657
                        volumesToSync[pv.Spec.CSI.VolumeHandle] = true
2✔
658

2✔
659
                        diskName, err := azureutils.GetDiskName(pv.Spec.CSI.VolumeHandle)
2✔
660
                        if err != nil {
2✔
661
                                w.Logger().Errorf(err, "failed to extract disk name from volumehandle (%s)", pv.Spec.CSI.VolumeHandle)
×
662
                                delete(volumesToSync, pv.Spec.CSI.VolumeHandle)
×
663
                                continue
×
664
                        }
665
                        volumeName := strings.ToLower(diskName)
2✔
666
                        nodeName := volumeAttachment.Spec.NodeName
2✔
667
                        azVolumeAttachmentName := azureutils.GetAzVolumeAttachmentName(diskName, nodeName)
2✔
668
                        r.azVolumeAttachmentToVaMap.Store(azVolumeAttachmentName, volumeAttachment.Name)
2✔
669

2✔
670
                        desiredAzVolumeAttachment := &azdiskv1beta2.AzVolumeAttachment{
2✔
671
                                ObjectMeta: metav1.ObjectMeta{
2✔
672
                                        Name: azVolumeAttachmentName,
2✔
673
                                        Labels: map[string]string{
2✔
674
                                                consts.NodeNameLabel:   nodeName,
2✔
675
                                                consts.VolumeNameLabel: volumeName,
2✔
676
                                        },
2✔
677
                                        // if the volumeAttachment shows not yet attached, and VolumeAttachRequestAnnotation needs to be set from the controllerserver
2✔
678
                                        // if the volumeAttachment shows attached but the actual volume isn't due to controller restart, VolumeAttachRequestAnnotation needs to be set by the noderserver during NodeStageVolume
2✔
679
                                        Finalizers: []string{consts.AzVolumeAttachmentFinalizer},
2✔
680
                                },
2✔
681
                                Spec: azdiskv1beta2.AzVolumeAttachmentSpec{
2✔
682
                                        VolumeName:    volumeName,
2✔
683
                                        VolumeID:      pv.Spec.CSI.VolumeHandle,
2✔
684
                                        NodeName:      nodeName,
2✔
685
                                        RequestedRole: azdiskv1beta2.PrimaryRole,
2✔
686
                                        VolumeContext: pv.Spec.CSI.VolumeAttributes,
2✔
687
                                },
2✔
688
                        }
2✔
689
                        azureutils.AnnotateAPIVersion(desiredAzVolumeAttachment)
2✔
690

2✔
691
                        statusUpdateRequired := true
2✔
692
                        // check if the CRI exists already
2✔
693
                        azVolumeAttachment, err := azureutils.GetAzVolumeAttachment(ctx, r.cachedClient, r.azClient, azVolumeAttachmentName, r.config.ObjectNamespace, false)
2✔
694
                        if err != nil {
3✔
695
                                if apiErrors.IsNotFound(err) {
2✔
696
                                        w.Logger().Infof("Recreating AzVolumeAttachment(%s)", azVolumeAttachmentName)
1✔
697

1✔
698
                                        azVolumeAttachment, err = r.azClient.DiskV1beta2().AzVolumeAttachments(r.config.ObjectNamespace).Create(ctx, desiredAzVolumeAttachment, metav1.CreateOptions{})
1✔
699
                                        if err != nil {
1✔
700
                                                w.Logger().Errorf(err, "failed to create AzVolumeAttachment (%s) for volume (%s) and node (%s): %v", azVolumeAttachmentName, *pvName, nodeName, err)
×
701
                                                return syncedVolumeAttachments, volumesToSync, err
×
702
                                        }
×
703
                                } else {
×
704
                                        w.Logger().Errorf(err, "failed to get AzVolumeAttachment (%s): %v", azVolumeAttachmentName, err)
×
705
                                        return syncedVolumeAttachments, volumesToSync, err
×
706
                                }
×
707
                        } else if apiVersion, ok := azureutils.GetFromMap(azVolumeAttachment.Annotations, consts.APIVersion); !ok || apiVersion != azdiskv1beta2.APIVersion {
2✔
708
                                w.Logger().Infof("Found AzVolumeAttachment (%s) with older api version. Converting to apiVersion(%s)", azVolumeAttachmentName, azdiskv1beta2.APIVersion)
1✔
709

1✔
710
                                for k, v := range desiredAzVolumeAttachment.Labels {
3✔
711
                                        azVolumeAttachment.Labels = azureutils.AddToMap(azVolumeAttachment.Labels, k, v)
2✔
712
                                }
2✔
713

714
                                for k, v := range azVolumeAttachment.Annotations {
2✔
715
                                        azVolumeAttachment.Status.Annotations = azureutils.AddToMap(azVolumeAttachment.Status.Annotations, k, v)
1✔
716
                                }
1✔
717

718
                                // for now, we don't empty the meta annotatinos after migrating them to status annotation for safety.
719
                                // note that this will leave some remnant garbage entries in meta annotations
720

721
                                for k, v := range desiredAzVolumeAttachment.Annotations {
2✔
722
                                        azVolumeAttachment.Annotations = azureutils.AddToMap(azVolumeAttachment.Status.Annotations, k, v)
1✔
723
                                }
1✔
724

725
                                azVolumeAttachment, err = r.azClient.DiskV1beta2().AzVolumeAttachments(r.config.ObjectNamespace).Update(ctx, azVolumeAttachment, metav1.UpdateOptions{})
1✔
726
                                if err != nil {
1✔
727
                                        w.Logger().Errorf(err, "failed to update AzVolumeAttachment (%s) for volume (%s) and node (%s): %v", azVolumeAttachmentName, *pvName, nodeName, err)
×
728
                                        return syncedVolumeAttachments, volumesToSync, err
×
729
                                }
×
730
                        } else {
×
731
                                statusUpdateRequired = false
×
732
                        }
×
733

734
                        if statusUpdateRequired {
4✔
735
                                azVolumeAttachment.Status.Annotations = azureutils.AddToMap(azVolumeAttachment.Status.Annotations, consts.VolumeAttachmentKey, volumeAttachment.Name)
2✔
736

2✔
737
                                // update status
2✔
738
                                _, err = r.azClient.DiskV1beta2().AzVolumeAttachments(r.config.ObjectNamespace).UpdateStatus(ctx, azVolumeAttachment, metav1.UpdateOptions{})
2✔
739
                                if err != nil {
2✔
740
                                        w.Logger().Errorf(err, "failed to update status of AzVolumeAttachment (%s) for volume (%s) and node (%s): %v", azVolumeAttachmentName, *pvName, nodeName, err)
×
741
                                        return syncedVolumeAttachments, volumesToSync, err
×
742
                                }
×
743
                        }
744

745
                        syncedVolumeAttachments[volumeAttachment.Name] = true
2✔
746
                }
747
        }
748
        return syncedVolumeAttachments, volumesToSync, nil
3✔
749
}
750

751
func (r *ReconcileAttachDetach) recoverAzVolumeAttachment(ctx context.Context, recoveredAzVolumeAttachments *sync.Map, recoveryID string) error {
3✔
752
        w, _ := workflow.GetWorkflowFromContext(ctx)
3✔
753

3✔
754
        // list all AzVolumeAttachment
3✔
755
        azVolumeAttachments, err := r.azClient.DiskV1beta2().AzVolumeAttachments(r.config.ObjectNamespace).List(ctx, metav1.ListOptions{})
3✔
756
        if err != nil {
3✔
757
                w.Logger().Error(err, "failed to get list of existing AzVolumeAttachment CRI in controller recovery stage")
×
758
                return err
×
759
        }
×
760

761
        var wg sync.WaitGroup
3✔
762
        numRecovered := int32(0)
3✔
763

3✔
764
        for _, azVolumeAttachment := range azVolumeAttachments.Items {
7✔
765
                // skip if AzVolumeAttachment already recovered
4✔
766
                if _, ok := recoveredAzVolumeAttachments.Load(azVolumeAttachment.Name); ok {
4✔
767
                        numRecovered++
×
768
                        continue
×
769
                }
770

771
                wg.Add(1)
4✔
772
                go func(azv azdiskv1beta2.AzVolumeAttachment, azvMap *sync.Map) {
8✔
773
                        defer wg.Done()
4✔
774
                        var targetState azdiskv1beta2.AzVolumeAttachmentAttachmentState
4✔
775
                        updateMode := azureutils.UpdateCRIStatus
4✔
776
                        if azv.Spec.RequestedRole == azdiskv1beta2.ReplicaRole {
4✔
777
                                updateMode = azureutils.UpdateAll
×
778
                        }
×
779
                        updateFunc := func(obj client.Object) error {
8✔
780
                                var err error
4✔
781
                                azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
4✔
782
                                if azv.Spec.RequestedRole == azdiskv1beta2.ReplicaRole {
4✔
783
                                        // conversion logic from v1beta1 to v1beta2 for replicas come here
×
784
                                        azv.Status.Annotations = azv.ObjectMeta.Annotations
×
785
                                        azv.ObjectMeta.Annotations = map[string]string{consts.VolumeAttachRequestAnnotation: "CRI recovery"}
×
786
                                }
×
787
                                // add a recover annotation to the CRI so that reconciliation can be triggered for the CRI even if CRI's current state == target state
788
                                azv.Status.Annotations = azureutils.AddToMap(azv.Status.Annotations, consts.RecoverAnnotation, recoveryID)
4✔
789
                                if azv.Status.State != targetState {
6✔
790
                                        _, err = updateState(azv, targetState, forceUpdate)
2✔
791
                                }
2✔
792
                                return err
4✔
793
                        }
794
                        switch azv.Status.State {
4✔
795
                        case azdiskv1beta2.Attaching:
1✔
796
                                // reset state to Pending so Attach operation can be redone
1✔
797
                                targetState = azdiskv1beta2.AttachmentPending
1✔
798
                                updateFunc = azureutils.AppendToUpdateCRIFunc(updateFunc, func(obj client.Object) error {
2✔
799
                                        azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
1✔
800
                                        azv.Status.Detail = nil
1✔
801
                                        azv.Status.Error = nil
1✔
802
                                        return nil
1✔
803
                                })
1✔
804
                        case azdiskv1beta2.Detaching:
1✔
805
                                // reset state to Attached so Detach operation can be redone
1✔
806
                                targetState = azdiskv1beta2.Attached
1✔
807
                        default:
2✔
808
                                targetState = azv.Status.State
2✔
809
                        }
810

811
                        if _, err := azureutils.UpdateCRIWithRetry(ctx, nil, r.cachedClient, r.azClient, &azv, updateFunc, consts.ForcedUpdateMaxNetRetry, updateMode); err != nil {
4✔
812
                                w.Logger().Errorf(err, "failed to update AzVolumeAttachment (%s) for recovery", azv.Name)
×
813
                        } else {
4✔
814
                                // if update succeeded, add the CRI to the recoveryComplete list
4✔
815
                                azvMap.Store(azv.Name, struct{}{})
4✔
816
                                atomic.AddInt32(&numRecovered, 1)
4✔
817
                        }
4✔
818
                }(azVolumeAttachment, recoveredAzVolumeAttachments)
819
        }
820
        wg.Wait()
3✔
821

3✔
822
        // if recovery has not been completed for all CRIs, return error
3✔
823
        if numRecovered < int32(len(azVolumeAttachments.Items)) {
3✔
824
                return status.Errorf(codes.Internal, "failed to recover some AzVolumeAttachment states")
×
825
        }
×
826
        return nil
3✔
827
}
828

829
func NewAttachDetachController(mgr manager.Manager, cloudDiskAttacher CloudDiskAttachDetacher, crdDetacher CrdDetacher, controllerSharedState *SharedState) (*ReconcileAttachDetach, error) {
×
830
        logger := mgr.GetLogger().WithValues("controller", "azvolumeattachment")
×
831
        reconciler := ReconcileAttachDetach{
×
832
                crdDetacher:       crdDetacher,
×
833
                cloudDiskAttacher: cloudDiskAttacher,
×
834
                stateLock:         &sync.Map{},
×
835
                retryInfo:         newRetryInfo(),
×
836
                SharedState:       controllerSharedState,
×
837
                logger:            logger,
×
838
        }
×
839

×
840
        c, err := controller.New("azvolumeattachment-controller", mgr, controller.Options{
×
841
                MaxConcurrentReconciles: controllerSharedState.config.ControllerConfig.WorkerThreads,
×
842
                Reconciler:              &reconciler,
×
843
                LogConstructor:          func(req *reconcile.Request) logr.Logger { return logger },
×
844
        })
845

846
        if err != nil {
×
847
                c.GetLogger().Error(err, "failed to create controller")
×
848
                return nil, err
×
849
        }
×
850

851
        c.GetLogger().Info("Starting to watch AzVolumeAttachments.")
×
852

×
853
        // Watch for CRUD events on azVolumeAttachment objects
×
854
        err = c.Watch(&source.Kind{Type: &azdiskv1beta2.AzVolumeAttachment{}}, &handler.EnqueueRequestForObject{})
×
855
        if err != nil {
×
856
                c.GetLogger().Error(err, "failed to initialize watch for AzVolumeAttachment CRI")
×
857
                return nil, err
×
858
        }
×
859

860
        c.GetLogger().Info("Controller set-up successful.")
×
861
        return &reconciler, nil
×
862
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc