• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / azuredisk-csi-driver / 4513378976

24 Mar 2023 04:48PM UTC coverage: 69.321% (+22.3%) from 47.059%
4513378976

push

github

GitHub
Merge pull request #1756 from hccheng72/replica-attach-retry-update

7 of 7 new or added lines in 2 files covered. (100.0%)

7174 of 10349 relevant lines covered (69.32%)

6.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.2
/pkg/controller/attach_detach.go
1
/*
2
Copyright 2021 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "errors"
22
        "strings"
23
        "sync"
24
        "sync/atomic"
25

26
        "github.com/go-logr/logr"
27
        "google.golang.org/grpc/codes"
28
        "google.golang.org/grpc/status"
29
        v1 "k8s.io/api/core/v1"
30
        storagev1 "k8s.io/api/storage/v1"
31
        apiErrors "k8s.io/apimachinery/pkg/api/errors"
32
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33
        "k8s.io/apimachinery/pkg/types"
34
        utilfeature "k8s.io/apiserver/pkg/util/feature"
35
        "k8s.io/kubernetes/pkg/features"
36
        azdiskv1beta2 "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/azuredisk/v1beta2"
37
        consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants"
38
        "sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils"
39
        "sigs.k8s.io/azuredisk-csi-driver/pkg/provisioner"
40
        "sigs.k8s.io/azuredisk-csi-driver/pkg/util"
41
        "sigs.k8s.io/azuredisk-csi-driver/pkg/workflow"
42

43
        volerr "k8s.io/cloud-provider/volume/errors"
44
        "sigs.k8s.io/cloud-provider-azure/pkg/retry"
45
        "sigs.k8s.io/controller-runtime/pkg/client"
46
        "sigs.k8s.io/controller-runtime/pkg/controller"
47
        "sigs.k8s.io/controller-runtime/pkg/handler"
48
        "sigs.k8s.io/controller-runtime/pkg/manager"
49
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
50
        "sigs.k8s.io/controller-runtime/pkg/source"
51
)
52

53
type CloudDiskAttachDetacher interface {
54
        PublishVolume(ctx context.Context, volumeID string, nodeID string, volumeContext map[string]string) provisioner.CloudAttachResult
55
        UnpublishVolume(ctx context.Context, volumeID string, nodeID string) error
56
}
57

58
type CrdDetacher interface {
59
        UnpublishVolume(ctx context.Context, volumeID string, nodeID string, secrets map[string]string, mode consts.UnpublishMode) error
60
        WaitForDetach(ctx context.Context, volumeID, nodeID string) error
61
}
62

63
/*
64
Attach Detach controller is responsible for
65
 1. attaching volume to a specified node upon creation of AzVolumeAttachment CRI
66
 2. promoting AzVolumeAttachment to primary upon spec update
67
 3. detaching volume upon deletions marked with certain annotations
68
*/
69
type ReconcileAttachDetach struct {
70
        *SharedState
71
        logger            logr.Logger
72
        crdDetacher       CrdDetacher
73
        cloudDiskAttacher CloudDiskAttachDetacher
74
        stateLock         *sync.Map
75
        retryInfo         *retryInfo
76
}
77

78
var _ reconcile.Reconciler = &ReconcileAttachDetach{}
79

80
var allowedTargetAttachmentStates = map[string][]string{
81
        "":                                       {string(azdiskv1beta2.AttachmentPending), string(azdiskv1beta2.Attaching), string(azdiskv1beta2.Detaching)},
82
        string(azdiskv1beta2.AttachmentPending):  {string(azdiskv1beta2.Attaching), string(azdiskv1beta2.Detaching)},
83
        string(azdiskv1beta2.Attaching):          {string(azdiskv1beta2.Attached), string(azdiskv1beta2.AttachmentFailed)},
84
        string(azdiskv1beta2.Detaching):          {string(azdiskv1beta2.Detached), string(azdiskv1beta2.DetachmentFailed)},
85
        string(azdiskv1beta2.Attached):           {string(azdiskv1beta2.Detaching)},
86
        string(azdiskv1beta2.Detached):           {},
87
        string(azdiskv1beta2.AttachmentFailed):   {string(azdiskv1beta2.Detaching)},
88
        string(azdiskv1beta2.DetachmentFailed):   {string(azdiskv1beta2.ForceDetachPending)},
89
        string(azdiskv1beta2.ForceDetachPending): {string(azdiskv1beta2.Detaching)},
90
}
91

92
func (r *ReconcileAttachDetach) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
6✔
93
        if !r.isRecoveryComplete() {
6✔
94
                return reconcile.Result{Requeue: true}, nil
×
95
        }
×
96

97
        azVolumeAttachment, err := azureutils.GetAzVolumeAttachment(ctx, r.cachedClient, r.azClient, request.Name, request.Namespace, true)
6✔
98
        // if object is not found, it means the object has been deleted. Log the deletion and do not requeue
6✔
99
        if apiErrors.IsNotFound(err) {
6✔
100
                r.azVolumeAttachmentToVaMap.Delete(request.Name)
×
101
                return reconcileReturnOnSuccess(request.Name, r.retryInfo)
×
102
        } else if err != nil {
6✔
103
                azVolumeAttachment.Name = request.Name
×
104
                return reconcileReturnOnError(ctx, azVolumeAttachment, "get", err, r.retryInfo)
×
105
        }
×
106

107
        ctx, _ = workflow.GetWorkflowFromObj(ctx, azVolumeAttachment)
6✔
108

6✔
109
        // if underlying cloud operation already in process, skip until operation is completed
6✔
110
        if isOperationInProcess(azVolumeAttachment) {
6✔
111
                return reconcileReturnOnSuccess(azVolumeAttachment.Name, r.retryInfo)
×
112
        }
×
113

114
        // deletion request
115
        if deleteRequested, deleteAfter := objectDeletionRequested(azVolumeAttachment); deleteRequested {
7✔
116
                if deleteAfter > 0 {
1✔
117
                        return reconcileAfter(deleteAfter, request.Name, r.retryInfo)
×
118
                }
×
119
                if err := r.removeFinalizer(ctx, azVolumeAttachment); err != nil {
1✔
120
                        return reconcileReturnOnError(ctx, azVolumeAttachment, "delete", err, r.retryInfo)
×
121
                }
×
122
                // deletion of azVolumeAttachment is succeeded, the node's remaining capacity of disk attachment should be increased by 1
123
                r.incrementAttachmentCount(ctx, azVolumeAttachment.Spec.NodeName)
1✔
124
                // detachment request
125
        } else if volumeDetachRequested(azVolumeAttachment) {
6✔
126
                if err := r.triggerDetach(ctx, azVolumeAttachment); err != nil {
1✔
127
                        return reconcileReturnOnError(ctx, azVolumeAttachment, "detach", err, r.retryInfo)
×
128
                }
×
129
                // attachment request
130
        } else if azVolumeAttachment.Status.Detail == nil {
6✔
131
                if err := r.triggerAttach(ctx, azVolumeAttachment); err != nil {
3✔
132
                        return reconcileReturnOnError(ctx, azVolumeAttachment, "attach", err, r.retryInfo)
1✔
133
                }
1✔
134
                // promotion/demotion request
135
        } else if azVolumeAttachment.Spec.RequestedRole != azVolumeAttachment.Status.Detail.Role {
4✔
136
                switch azVolumeAttachment.Spec.RequestedRole {
2✔
137
                case azdiskv1beta2.PrimaryRole:
1✔
138
                        if err := r.promote(ctx, azVolumeAttachment); err != nil {
1✔
139
                                return reconcileReturnOnError(ctx, azVolumeAttachment, "promote", err, r.retryInfo)
×
140
                        }
×
141
                case azdiskv1beta2.ReplicaRole:
1✔
142
                        if err := r.demote(ctx, azVolumeAttachment); err != nil {
1✔
143
                                return reconcileReturnOnError(ctx, azVolumeAttachment, "demote", err, r.retryInfo)
×
144
                        }
×
145
                }
146
        }
147

148
        return reconcileReturnOnSuccess(azVolumeAttachment.Name, r.retryInfo)
5✔
149
}
150

151
func (r *ReconcileAttachDetach) triggerAttach(ctx context.Context, azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment) error {
2✔
152
        var err error
2✔
153
        ctx, w := workflow.New(ctx)
2✔
154
        defer func() { w.Finish(err) }()
4✔
155

156
        // requeue if AzVolumeAttachment's state is being updated by a different worker
157
        if _, ok := r.stateLock.LoadOrStore(azVolumeAttachment.Name, nil); ok {
2✔
158
                err = getOperationRequeueError("attach", azVolumeAttachment)
×
159
                return err
×
160
        }
×
161
        defer r.stateLock.Delete(azVolumeAttachment.Name)
2✔
162

2✔
163
        if !volumeAttachRequested(azVolumeAttachment) {
3✔
164
                err = status.Errorf(codes.FailedPrecondition, "attach operation has not yet been requested")
1✔
165
                return err
1✔
166
        }
1✔
167

168
        var azVolume *azdiskv1beta2.AzVolume
1✔
169
        if azVolume, err = azureutils.GetAzVolume(ctx, r.cachedClient, r.azClient, strings.ToLower(azVolumeAttachment.Spec.VolumeName), r.config.ObjectNamespace, true); err != nil {
1✔
170
                if apiErrors.IsNotFound(err) {
×
171
                        w.Logger().V(5).Infof("Aborting attach operation for AzVolumeAttachment (%s): AzVolume (%s) not found", azVolumeAttachment.Name, azVolumeAttachment.Spec.VolumeName)
×
172
                        err = nil
×
173
                        return nil
×
174
                }
×
175
                return err
×
176
        } else if deleteRequested, _ := objectDeletionRequested(azVolume); deleteRequested {
1✔
177
                w.Logger().V(5).Infof("Aborting attach operation for AzVolumeAttachment (%s): AzVolume (%s) scheduled for deletion", azVolumeAttachment.Name, azVolumeAttachment.Spec.VolumeName)
×
178
                return nil
×
179
        }
×
180

181
        // update status block
182
        updateFunc := func(obj client.Object) error {
2✔
183
                azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
1✔
184
                // Update state to attaching, Initialize finalizer and add label to the object
1✔
185
                _, derr := updateState(azv, azdiskv1beta2.Attaching, normalUpdate)
1✔
186
                return derr
1✔
187
        }
1✔
188

189
        var updatedObj client.Object
1✔
190
        if updatedObj, err = azureutils.UpdateCRIWithRetry(ctx, nil, r.cachedClient, r.azClient, azVolumeAttachment, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRIStatus); err != nil {
1✔
191
                return err
×
192
        }
×
193
        azVolumeAttachment = updatedObj.(*azdiskv1beta2.AzVolumeAttachment)
1✔
194

1✔
195
        w.Logger().V(5).Info("Attaching volume")
1✔
196
        waitCh := make(chan goSignal)
1✔
197
        //nolint:contextcheck // call is asynchronous; context is not inherited by design
1✔
198
        go func() {
2✔
199
                var attachErr error
1✔
200
                _, goWorkflow := workflow.New(ctx)
1✔
201
                defer func() { goWorkflow.Finish(attachErr) }()
2✔
202
                waitCh <- goSignal{}
1✔
203

1✔
204
                goCtx := goWorkflow.SaveToContext(context.Background())
1✔
205
                cloudCtx, cloudCancel := context.WithTimeout(goCtx, cloudTimeout)
1✔
206
                defer cloudCancel()
1✔
207

1✔
208
                // attempt to attach the disk to a node
1✔
209
                var publishCtx map[string]string
1✔
210
                var pods []v1.Pod
1✔
211
                if azVolumeAttachment.Spec.RequestedRole == azdiskv1beta2.ReplicaRole {
1✔
212
                        var err error
×
213
                        pods, err = r.getPodsFromVolume(goCtx, r.cachedClient, azVolumeAttachment.Spec.VolumeName)
×
214
                        if err != nil {
×
215
                                goWorkflow.Logger().Error(err, "failed to list pods for volume")
×
216
                        }
×
217
                }
218

219
                var handleSuccess func(bool)
1✔
220
                var handleError func()
1✔
221

1✔
222
                attachAndUpdate := func() {
2✔
223
                        attachResult := r.attachVolume(cloudCtx, azVolumeAttachment.Spec.VolumeID, azVolumeAttachment.Spec.NodeName, azVolumeAttachment.Spec.VolumeContext)
1✔
224
                        if publishCtx = attachResult.PublishContext(); publishCtx != nil {
2✔
225
                                handleSuccess(false)
1✔
226
                        }
1✔
227
                        var ok bool
1✔
228
                        if attachErr, ok = <-attachResult.ResultChannel(); !ok || attachErr != nil {
1✔
229
                                handleError()
×
230
                        } else {
1✔
231
                                handleSuccess(true)
1✔
232
                        }
1✔
233
                }
234

235
                handleError = func() {
1✔
236
                        if len(pods) > 0 {
×
237
                                for _, pod := range pods {
×
238
                                        r.eventRecorder.Eventf(pod.DeepCopyObject(), v1.EventTypeWarning, consts.ReplicaAttachmentFailedEvent, "Replica mount for volume %s failed to be attached to node %s with error: %v", azVolumeAttachment.Spec.VolumeName, azVolumeAttachment.Spec.NodeName, attachErr)
×
239
                                }
×
240
                        }
241

242
                        // if the disk is attached to a different node
243
                        if danglingAttachErr, ok := attachErr.(*volerr.DanglingAttachError); ok {
×
244
                                // get disk, current node and attachment name
×
245
                                currentNodeName := string(danglingAttachErr.CurrentNode)
×
246
                                currentAttachmentName := azureutils.GetAzVolumeAttachmentName(azVolumeAttachment.Spec.VolumeName, currentNodeName)
×
247
                                goWorkflow.Logger().Infof("Dangling attach detected for %s", currentNodeName)
×
248

×
249
                                // check if AzVolumeAttachment exists for the existing attachment
×
250
                                _, err := r.azClient.DiskV1beta2().AzVolumeAttachments(r.config.ObjectNamespace).Get(cloudCtx, currentAttachmentName, metav1.GetOptions{})
×
251
                                var detachErr error
×
252
                                if apiErrors.IsNotFound(err) {
×
253
                                        // AzVolumeAttachment doesn't exist so we only need to detach disk from cloud
×
254
                                        detachErr = r.cloudDiskAttacher.UnpublishVolume(goCtx, azVolumeAttachment.Spec.VolumeID, currentNodeName)
×
255
                                        if detachErr != nil {
×
256
                                                goWorkflow.Logger().Errorf(detachErr, "failed to detach dangling volume (%s) from node (%s). Error: %v", azVolumeAttachment.Spec.VolumeID, currentNodeName, err)
×
257
                                        }
×
258
                                } else {
×
259
                                        // AzVolumeAttachment exist so we need to detach disk through crdProvisioner
×
260
                                        detachErr = r.crdDetacher.UnpublishVolume(goCtx, azVolumeAttachment.Spec.VolumeID, currentNodeName, make(map[string]string), consts.Detach)
×
261
                                        if detachErr != nil {
×
262
                                                goWorkflow.Logger().Errorf(detachErr, "failed to make a unpublish request dangling AzVolumeAttachment for volume (%s) and node (%s)", azVolumeAttachment.Spec.VolumeID, currentNodeName)
×
263
                                        }
×
264
                                        detachErr = r.crdDetacher.WaitForDetach(goCtx, azVolumeAttachment.Spec.VolumeID, currentNodeName)
×
265
                                        if detachErr != nil {
×
266
                                                goWorkflow.Logger().Errorf(detachErr, "failed to unpublish dangling AzVolumeAttachment for volume (%s) and node (%s)", azVolumeAttachment.Spec.VolumeID, currentNodeName)
×
267
                                        }
×
268
                                }
269
                                // attempt to attach the disk to a node after detach
270
                                if detachErr == nil {
×
271
                                        attachAndUpdate()
×
272
                                        return
×
273
                                }
×
274
                        }
275

276
                        updateFunc := func(obj client.Object) error {
×
277
                                azva := obj.(*azdiskv1beta2.AzVolumeAttachment)
×
278
                                // add retriable annotation if the replica attach error is PartialUpdateError or timeout
×
279
                                if azva.Status.Detail.Role == azdiskv1beta2.ReplicaRole {
×
280
                                        if _, ok := attachErr.(*retry.PartialUpdateError); ok || errors.Is(err, context.DeadlineExceeded) {
×
281
                                                azva.Status.Annotations = azureutils.AddToMap(azva.Status.Annotations, consts.ReplicaVolumeAttachRetryAnnotation, "true")
×
282
                                        }
×
283
                                }
284
                                _, uerr := reportError(azva, azdiskv1beta2.AttachmentFailed, attachErr)
×
285
                                return uerr
×
286
                        }
287
                        //nolint:contextcheck // final status update of the CRI must occur even when the current context's deadline passes.
288
                        _, _ = azureutils.UpdateCRIWithRetry(goCtx, nil, r.cachedClient, r.azClient, azVolumeAttachment, updateFunc, consts.ForcedUpdateMaxNetRetry, azureutils.UpdateCRIStatus)
×
289
                }
290
                handleSuccess = func(asyncComplete bool) {
3✔
291
                        // Publish event to indicate attachment success
2✔
292
                        if asyncComplete {
3✔
293
                                if len(pods) > 0 {
1✔
294
                                        for _, pod := range pods {
×
295
                                                r.eventRecorder.Eventf(pod.DeepCopyObject(), v1.EventTypeNormal, consts.ReplicaAttachmentSuccessEvent, "Replica mount for volume %s successfully attached to node %s", azVolumeAttachment.Spec.VolumeName, azVolumeAttachment.Spec.NodeName)
×
296
                                        }
×
297
                                }
298
                                // the node's remaining capacity of disk attachment should be decreased by 1, since the disk attachment is succeeded.
299
                                r.decrementAttachmentCount(ctx, azVolumeAttachment.Spec.NodeName)
1✔
300
                        }
301

302
                        updateFunc := func(obj client.Object) error {
4✔
303
                                azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
2✔
304
                                azv = updateStatusDetail(azv, publishCtx)
2✔
305
                                var uerr error
2✔
306
                                if asyncComplete {
3✔
307
                                        _, uerr = updateState(azv, azdiskv1beta2.Attached, forceUpdate)
1✔
308
                                }
1✔
309
                                return uerr
2✔
310
                        }
311

312
                        if asyncComplete && azVolumeAttachment.Spec.RequestedRole == azdiskv1beta2.PrimaryRole {
3✔
313
                                _ = r.updateVolumeAttachmentWithResult(goCtx, azVolumeAttachment)
1✔
314
                        }
1✔
315
                        updatedObj, _ = azureutils.UpdateCRIWithRetry(goCtx, nil, r.cachedClient, r.azClient, azVolumeAttachment, updateFunc, consts.ForcedUpdateMaxNetRetry, azureutils.UpdateCRIStatus)
2✔
316
                        azVolumeAttachment = updatedObj.(*azdiskv1beta2.AzVolumeAttachment)
2✔
317
                }
318

319
                attachAndUpdate()
1✔
320
        }()
321
        <-waitCh
1✔
322

1✔
323
        return nil
1✔
324
}
325

326
func (r *ReconcileAttachDetach) triggerDetach(ctx context.Context, azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment) error {
1✔
327
        var err error
1✔
328
        ctx, w := workflow.New(ctx)
1✔
329
        defer func() { w.Finish(err) }()
2✔
330

331
        if _, ok := r.stateLock.LoadOrStore(azVolumeAttachment.Name, nil); ok {
1✔
332
                return getOperationRequeueError("detach", azVolumeAttachment)
×
333
        }
×
334
        defer r.stateLock.Delete(azVolumeAttachment.Name)
1✔
335

1✔
336
        updateFunc := func(obj client.Object) error {
2✔
337
                azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
1✔
338
                // Update state to detaching
1✔
339
                _, derr := updateState(azv, azdiskv1beta2.Detaching, normalUpdate)
1✔
340
                return derr
1✔
341
        }
1✔
342

343
        var updatedObj client.Object
1✔
344
        if updatedObj, err = azureutils.UpdateCRIWithRetry(ctx, nil, r.cachedClient, r.azClient, azVolumeAttachment, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRIStatus); err != nil {
1✔
345
                return err
×
346
        }
×
347
        azVolumeAttachment = updatedObj.(*azdiskv1beta2.AzVolumeAttachment)
1✔
348

1✔
349
        w.Logger().V(5).Info("Detaching volume")
1✔
350
        waitCh := make(chan goSignal)
1✔
351
        //nolint:contextcheck // call is asynchronous; context is not inherited by design
1✔
352
        go func() {
2✔
353
                var detachErr error
1✔
354
                _, goWorkflow := workflow.New(ctx)
1✔
355
                defer func() { goWorkflow.Finish(detachErr) }()
2✔
356
                waitCh <- goSignal{}
1✔
357

1✔
358
                goCtx := goWorkflow.SaveToContext(context.Background())
1✔
359
                cloudCtx, cloudCancel := context.WithTimeout(goCtx, cloudTimeout)
1✔
360
                defer cloudCancel()
1✔
361

1✔
362
                var updateFunc func(obj client.Object) error
1✔
363
                detachErr = r.detachVolume(cloudCtx, azVolumeAttachment.Spec.VolumeID, azVolumeAttachment.Spec.NodeName)
1✔
364
                if detachErr != nil {
1✔
365
                        updateFunc = func(obj client.Object) error {
×
366
                                azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
×
367
                                _, derr := reportError(azv, azdiskv1beta2.DetachmentFailed, detachErr)
×
368
                                return derr
×
369
                        }
×
370
                        //nolint:contextcheck // final status update of the CRI must occur even when the current context's deadline passes.
371
                        if _, uerr := azureutils.UpdateCRIWithRetry(goCtx, nil, r.cachedClient, r.azClient, azVolumeAttachment, updateFunc, consts.ForcedUpdateMaxNetRetry, azureutils.UpdateCRIStatus); uerr != nil {
×
372
                                w.Logger().Errorf(uerr, "failed to update final status of AzVolumeAttachement")
×
373
                        }
×
374
                } else {
1✔
375
                        //nolint:contextcheck // delete of the CRI must occur even when the current context's deadline passes.
1✔
376
                        if derr := r.cachedClient.Delete(goCtx, azVolumeAttachment); derr != nil {
1✔
377
                                w.Logger().Error(derr, "failed to delete AzVolumeAttachment")
×
378
                        }
×
379
                }
380
        }()
381
        <-waitCh
1✔
382
        return nil
1✔
383
}
384

385
func (r *ReconcileAttachDetach) removeFinalizer(ctx context.Context, azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment) error {
1✔
386
        var err error
1✔
387
        ctx, w := workflow.New(ctx)
1✔
388
        defer func() { w.Finish(err) }()
2✔
389

390
        updateFunc := func(obj client.Object) error {
2✔
391
                azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
1✔
392
                // delete finalizer
1✔
393
                _ = r.deleteFinalizer(azv)
1✔
394
                return nil
1✔
395
        }
1✔
396

397
        _, err = azureutils.UpdateCRIWithRetry(ctx, nil, r.cachedClient, r.azClient, azVolumeAttachment, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRI)
1✔
398
        return err
1✔
399
}
400

401
func (r *ReconcileAttachDetach) promote(ctx context.Context, azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment) error {
1✔
402
        var err error
1✔
403
        ctx, w := workflow.New(ctx)
1✔
404
        defer func() { w.Finish(err) }()
2✔
405

406
        w.Logger().Infof("Promoting AzVolumeAttachment")
1✔
407
        if err = r.updateVolumeAttachmentWithResult(ctx, azVolumeAttachment); err != nil {
1✔
408
                return err
×
409
        }
×
410
        // initialize metadata and update status block
411
        updateFunc := func(obj client.Object) error {
2✔
412
                azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
1✔
413
                _ = updateRole(azv, azdiskv1beta2.PrimaryRole)
1✔
414
                return nil
1✔
415
        }
1✔
416
        if _, err = azureutils.UpdateCRIWithRetry(ctx, nil, r.cachedClient, r.azClient, azVolumeAttachment, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRIStatus); err != nil {
1✔
417
                return err
×
418
        }
×
419
        return nil
1✔
420
}
421

422
func (r *ReconcileAttachDetach) demote(ctx context.Context, azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment) error {
1✔
423
        var err error
1✔
424
        ctx, w := workflow.New(ctx)
1✔
425
        defer func() { w.Finish(err) }()
2✔
426

427
        w.Logger().V(5).Info("Demoting AzVolumeAttachment")
1✔
428
        // initialize metadata and update status block
1✔
429
        updateFunc := func(obj client.Object) error {
2✔
430
                azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
1✔
431
                delete(azv.Status.Annotations, consts.VolumeAttachmentKey)
1✔
432
                _ = updateRole(azv, azdiskv1beta2.ReplicaRole)
1✔
433
                return nil
1✔
434
        }
1✔
435

436
        if _, err = azureutils.UpdateCRIWithRetry(ctx, nil, r.cachedClient, r.azClient, azVolumeAttachment, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRIStatus); err != nil {
1✔
437
                return err
×
438
        }
×
439
        return nil
1✔
440
}
441

442
func (r *ReconcileAttachDetach) updateVolumeAttachmentWithResult(ctx context.Context, azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment) error {
2✔
443
        ctx, w := workflow.New(ctx)
2✔
444
        var vaName string
2✔
445
        vaName, err := r.waitForVolumeAttachmentName(ctx, azVolumeAttachment)
2✔
446
        if err != nil {
2✔
447
                return err
×
448
        }
×
449

450
        vaUpdateFunc := func(obj client.Object) error {
4✔
451
                va := obj.(*storagev1.VolumeAttachment)
2✔
452
                if azVolumeAttachment.Status.Detail != nil {
4✔
453
                        for key, value := range azVolumeAttachment.Status.Detail.PublishContext {
3✔
454
                                va.Status.AttachmentMetadata = azureutils.AddToMap(va.Status.AttachmentMetadata, key, value)
1✔
455
                        }
1✔
456
                }
457
                return nil
2✔
458
        }
459

460
        originalVA := &storagev1.VolumeAttachment{}
2✔
461
        if err = r.cachedClient.Get(ctx, types.NamespacedName{Namespace: azVolumeAttachment.Namespace, Name: vaName}, originalVA); err != nil {
2✔
462
                w.Logger().Errorf(err, "failed to get original VolumeAttachment (%s)", vaName)
×
463
                return err
×
464
        }
×
465
        _, err = azureutils.UpdateCRIWithRetry(ctx, nil, r.cachedClient, r.azClient, originalVA, vaUpdateFunc, consts.ForcedUpdateMaxNetRetry, azureutils.UpdateCRIStatus)
2✔
466
        return err
2✔
467
}
468

469
func (r *ReconcileAttachDetach) deleteFinalizer(azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment) *azdiskv1beta2.AzVolumeAttachment {
1✔
470
        if azVolumeAttachment == nil {
1✔
471
                return nil
×
472
        }
×
473

474
        if azVolumeAttachment.ObjectMeta.Finalizers == nil {
1✔
475
                return azVolumeAttachment
×
476
        }
×
477

478
        finalizers := []string{}
1✔
479
        for _, finalizer := range azVolumeAttachment.ObjectMeta.Finalizers {
2✔
480
                if finalizer == consts.AzVolumeAttachmentFinalizer {
2✔
481
                        continue
1✔
482
                }
483
                finalizers = append(finalizers, finalizer)
×
484
        }
485
        azVolumeAttachment.ObjectMeta.Finalizers = finalizers
1✔
486
        return azVolumeAttachment
1✔
487
}
488

489
func (r *ReconcileAttachDetach) attachVolume(ctx context.Context, volumeID, node string, volumeContext map[string]string) provisioner.CloudAttachResult {
1✔
490
        return r.cloudDiskAttacher.PublishVolume(ctx, volumeID, node, volumeContext)
1✔
491
}
1✔
492

493
func (r *ReconcileAttachDetach) detachVolume(ctx context.Context, volumeID, node string) error {
1✔
494
        return r.cloudDiskAttacher.UnpublishVolume(ctx, volumeID, node)
1✔
495
}
1✔
496

497
func (r *ReconcileAttachDetach) Recover(ctx context.Context, recoveryID string) error {
3✔
498
        var err error
3✔
499
        ctx, w := workflow.New(ctx)
3✔
500
        defer func() { w.Finish(err) }()
6✔
501

502
        w.Logger().V(5).Info("Recovering AzVolumeAttachment CRIs...")
3✔
503
        // try to recreate missing AzVolumeAttachment CRI
3✔
504
        var syncedVolumeAttachments, volumesToSync map[string]bool
3✔
505

3✔
506
        for i := 0; i < maxRetry; i++ {
6✔
507
                if syncedVolumeAttachments, volumesToSync, err = r.recreateAzVolumeAttachment(ctx, syncedVolumeAttachments, volumesToSync); err == nil {
6✔
508
                        break
3✔
509
                }
510
                w.Logger().Error(err, "failed to recreate missing AzVolumeAttachment CRI")
×
511
        }
512
        // retrigger any aborted operation from possible previous controller crash
513
        recovered := &sync.Map{}
3✔
514
        for i := 0; i < maxRetry; i++ {
6✔
515
                if err = r.recoverAzVolumeAttachment(ctx, recovered, recoveryID); err == nil {
6✔
516
                        break
3✔
517
                }
518
                w.Logger().Error(err, "failed to recover AzVolumeAttachment state")
×
519
        }
520

521
        return err
3✔
522
}
523

524
func updateRole(azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment, role azdiskv1beta2.Role) *azdiskv1beta2.AzVolumeAttachment {
4✔
525
        if azVolumeAttachment == nil {
4✔
526
                return nil
×
527
        }
×
528

529
        if azVolumeAttachment.Status.Detail == nil {
4✔
530
                return azVolumeAttachment
×
531
        }
×
532

533
        azVolumeAttachment.Status.Detail.PreviousRole = azVolumeAttachment.Status.Detail.Role
4✔
534
        azVolumeAttachment.Status.Detail.Role = role
4✔
535

4✔
536
        return azVolumeAttachment
4✔
537
}
538

539
func updateStatusDetail(azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment, status map[string]string) *azdiskv1beta2.AzVolumeAttachment {
2✔
540
        if azVolumeAttachment == nil {
2✔
541
                return nil
×
542
        }
×
543

544
        if azVolumeAttachment.Status.Detail == nil {
3✔
545
                azVolumeAttachment.Status.Detail = &azdiskv1beta2.AzVolumeAttachmentStatusDetail{}
1✔
546
        }
1✔
547

548
        azVolumeAttachment.Status.Detail.PreviousRole = azVolumeAttachment.Status.Detail.Role
2✔
549
        azVolumeAttachment.Status.Detail.Role = azVolumeAttachment.Spec.RequestedRole
2✔
550
        azVolumeAttachment.Status.Detail.PublishContext = status
2✔
551

2✔
552
        return azVolumeAttachment
2✔
553
}
554

555
func updateError(azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment, err error) *azdiskv1beta2.AzVolumeAttachment {
×
556
        if azVolumeAttachment == nil {
×
557
                return nil
×
558
        }
×
559

560
        if err != nil {
×
561
                azVolumeAttachment.Status.Error = util.NewAzError(err)
×
562
        }
×
563

564
        return azVolumeAttachment
×
565
}
566

567
func updateState(azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment, state azdiskv1beta2.AzVolumeAttachmentAttachmentState, mode updateMode) (*azdiskv1beta2.AzVolumeAttachment, error) {
6✔
568
        var err error
6✔
569
        if azVolumeAttachment == nil {
6✔
570
                return nil, status.Errorf(codes.FailedPrecondition, "function `updateState` requires non-nil AzVolumeAttachment object.")
×
571
        }
×
572
        if mode == normalUpdate {
9✔
573
                if expectedStates, exists := allowedTargetAttachmentStates[string(azVolumeAttachment.Status.State)]; !exists || !containsString(string(state), expectedStates) {
3✔
574
                        err = status.Error(codes.FailedPrecondition, formatUpdateStateError("azVolumeAttachment", string(azVolumeAttachment.Status.State), string(state), expectedStates...))
×
575
                }
×
576
        }
577
        if err == nil {
12✔
578
                azVolumeAttachment.Status.State = state
6✔
579
        }
6✔
580
        return azVolumeAttachment, err
6✔
581
}
582

583
func reportError(azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment, state azdiskv1beta2.AzVolumeAttachmentAttachmentState, err error) (*azdiskv1beta2.AzVolumeAttachment, error) {
×
584
        if azVolumeAttachment == nil {
×
585
                return nil, status.Errorf(codes.FailedPrecondition, "function `reportError` requires non-nil AzVolumeAttachment object.")
×
586
        }
×
587
        azVolumeAttachment = updateError(azVolumeAttachment, err)
×
588
        return updateState(azVolumeAttachment, state, forceUpdate)
×
589
}
590

591
func (r *ReconcileAttachDetach) recreateAzVolumeAttachment(ctx context.Context, syncedVolumeAttachments map[string]bool, volumesToSync map[string]bool) (map[string]bool, map[string]bool, error) {
3✔
592
        w, _ := workflow.GetWorkflowFromContext(ctx)
3✔
593
        // Get all volumeAttachments
3✔
594
        volumeAttachments, err := r.kubeClient.StorageV1().VolumeAttachments().List(ctx, metav1.ListOptions{})
3✔
595
        if err != nil {
3✔
596
                return syncedVolumeAttachments, volumesToSync, err
×
597
        }
×
598

599
        if syncedVolumeAttachments == nil {
6✔
600
                syncedVolumeAttachments = map[string]bool{}
3✔
601
        }
3✔
602
        if volumesToSync == nil {
6✔
603
                volumesToSync = map[string]bool{}
3✔
604
        }
3✔
605

606
        // Loop through volumeAttachments and create Primary AzVolumeAttachments in correspondence
607
        for _, volumeAttachment := range volumeAttachments.Items {
5✔
608
                // skip if sync has been completed volumeAttachment
2✔
609
                if syncedVolumeAttachments[volumeAttachment.Name] {
2✔
610
                        continue
×
611
                }
612
                if volumeAttachment.Spec.Attacher == r.config.DriverName {
4✔
613
                        pvName := volumeAttachment.Spec.Source.PersistentVolumeName
2✔
614
                        if pvName == nil {
2✔
615
                                continue
×
616
                        }
617
                        // get PV and retrieve diskName
618
                        pv, err := r.kubeClient.CoreV1().PersistentVolumes().Get(ctx, *pvName, metav1.GetOptions{})
2✔
619
                        if err != nil {
2✔
620
                                w.Logger().Errorf(err, "failed to get PV (%s)", *pvName)
×
621
                                return syncedVolumeAttachments, volumesToSync, err
×
622
                        }
×
623

624
                        // if pv is migrated intree pv, convert it to csi pv for processing
625
                        // translate intree pv to csi pv to convert them into AzVolume resource
626
                        if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) &&
2✔
627
                                utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureDisk) &&
2✔
628
                                pv.Spec.AzureDisk != nil {
2✔
629
                                // translate intree pv to csi pv to convert them into AzVolume resource
×
630
                                if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) &&
×
631
                                        utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureDisk) &&
×
632
                                        pv.Spec.AzureDisk != nil {
×
633
                                        if pv, err = r.translateInTreePVToCSI(pv); err != nil {
×
634
                                                w.Logger().V(5).Errorf(err, "skipping azVolumeAttachment creation for volumeAttachment (%s)", volumeAttachment.Name)
×
635
                                        }
×
636
                                }
637
                        }
638

639
                        if pv.Spec.CSI == nil || pv.Spec.CSI.Driver != r.config.DriverName {
2✔
640
                                continue
×
641
                        }
642
                        volumesToSync[pv.Spec.CSI.VolumeHandle] = true
2✔
643

2✔
644
                        diskName, err := azureutils.GetDiskName(pv.Spec.CSI.VolumeHandle)
2✔
645
                        if err != nil {
2✔
646
                                w.Logger().Errorf(err, "failed to extract disk name from volumehandle (%s)", pv.Spec.CSI.VolumeHandle)
×
647
                                delete(volumesToSync, pv.Spec.CSI.VolumeHandle)
×
648
                                continue
×
649
                        }
650
                        volumeName := strings.ToLower(diskName)
2✔
651
                        nodeName := volumeAttachment.Spec.NodeName
2✔
652
                        azVolumeAttachmentName := azureutils.GetAzVolumeAttachmentName(diskName, nodeName)
2✔
653
                        r.azVolumeAttachmentToVaMap.Store(azVolumeAttachmentName, volumeAttachment.Name)
2✔
654

2✔
655
                        desiredAzVolumeAttachment := &azdiskv1beta2.AzVolumeAttachment{
2✔
656
                                ObjectMeta: metav1.ObjectMeta{
2✔
657
                                        Name: azVolumeAttachmentName,
2✔
658
                                        Labels: map[string]string{
2✔
659
                                                consts.NodeNameLabel:   nodeName,
2✔
660
                                                consts.VolumeNameLabel: volumeName,
2✔
661
                                        },
2✔
662
                                        // if the volumeAttachment shows not yet attached, and VolumeAttachRequestAnnotation needs to be set from the controllerserver
2✔
663
                                        // if the volumeAttachment shows attached but the actual volume isn't due to controller restart, VolumeAttachRequestAnnotation needs to be set by the noderserver during NodeStageVolume
2✔
664
                                        Finalizers: []string{consts.AzVolumeAttachmentFinalizer},
2✔
665
                                },
2✔
666
                                Spec: azdiskv1beta2.AzVolumeAttachmentSpec{
2✔
667
                                        VolumeName:    volumeName,
2✔
668
                                        VolumeID:      pv.Spec.CSI.VolumeHandle,
2✔
669
                                        NodeName:      nodeName,
2✔
670
                                        RequestedRole: azdiskv1beta2.PrimaryRole,
2✔
671
                                        VolumeContext: pv.Spec.CSI.VolumeAttributes,
2✔
672
                                },
2✔
673
                        }
2✔
674
                        azureutils.AnnotateAPIVersion(desiredAzVolumeAttachment)
2✔
675

2✔
676
                        statusUpdateRequired := true
2✔
677
                        // check if the CRI exists already
2✔
678
                        azVolumeAttachment, err := azureutils.GetAzVolumeAttachment(ctx, r.cachedClient, r.azClient, azVolumeAttachmentName, r.config.ObjectNamespace, false)
2✔
679
                        if err != nil {
3✔
680
                                if apiErrors.IsNotFound(err) {
2✔
681
                                        w.Logger().Infof("Recreating AzVolumeAttachment(%s)", azVolumeAttachmentName)
1✔
682

1✔
683
                                        azVolumeAttachment, err = r.azClient.DiskV1beta2().AzVolumeAttachments(r.config.ObjectNamespace).Create(ctx, desiredAzVolumeAttachment, metav1.CreateOptions{})
1✔
684
                                        if err != nil {
1✔
685
                                                w.Logger().Errorf(err, "failed to create AzVolumeAttachment (%s) for volume (%s) and node (%s): %v", azVolumeAttachmentName, *pvName, nodeName, err)
×
686
                                                return syncedVolumeAttachments, volumesToSync, err
×
687
                                        }
×
688
                                } else {
×
689
                                        w.Logger().Errorf(err, "failed to get AzVolumeAttachment (%s): %v", azVolumeAttachmentName, err)
×
690
                                        return syncedVolumeAttachments, volumesToSync, err
×
691
                                }
×
692
                        } else if apiVersion, ok := azureutils.GetFromMap(azVolumeAttachment.Annotations, consts.APIVersion); !ok || apiVersion != azdiskv1beta2.APIVersion {
2✔
693
                                w.Logger().Infof("Found AzVolumeAttachment (%s) with older api version. Converting to apiVersion(%s)", azVolumeAttachmentName, azdiskv1beta2.APIVersion)
1✔
694

1✔
695
                                for k, v := range desiredAzVolumeAttachment.Labels {
3✔
696
                                        azVolumeAttachment.Labels = azureutils.AddToMap(azVolumeAttachment.Labels, k, v)
2✔
697
                                }
2✔
698

699
                                for k, v := range azVolumeAttachment.Annotations {
2✔
700
                                        azVolumeAttachment.Status.Annotations = azureutils.AddToMap(azVolumeAttachment.Status.Annotations, k, v)
1✔
701
                                }
1✔
702

703
                                // for now, we don't empty the meta annotatinos after migrating them to status annotation for safety.
704
                                // note that this will leave some remnant garbage entries in meta annotations
705

706
                                for k, v := range desiredAzVolumeAttachment.Annotations {
2✔
707
                                        azVolumeAttachment.Annotations = azureutils.AddToMap(azVolumeAttachment.Status.Annotations, k, v)
1✔
708
                                }
1✔
709

710
                                azVolumeAttachment, err = r.azClient.DiskV1beta2().AzVolumeAttachments(r.config.ObjectNamespace).Update(ctx, azVolumeAttachment, metav1.UpdateOptions{})
1✔
711
                                if err != nil {
1✔
712
                                        w.Logger().Errorf(err, "failed to update AzVolumeAttachment (%s) for volume (%s) and node (%s): %v", azVolumeAttachmentName, *pvName, nodeName, err)
×
713
                                        return syncedVolumeAttachments, volumesToSync, err
×
714
                                }
×
715
                        } else {
×
716
                                statusUpdateRequired = false
×
717
                        }
×
718

719
                        if statusUpdateRequired {
4✔
720
                                azVolumeAttachment.Status.Annotations = azureutils.AddToMap(azVolumeAttachment.Status.Annotations, consts.VolumeAttachmentKey, volumeAttachment.Name)
2✔
721

2✔
722
                                // update status
2✔
723
                                _, err = r.azClient.DiskV1beta2().AzVolumeAttachments(r.config.ObjectNamespace).UpdateStatus(ctx, azVolumeAttachment, metav1.UpdateOptions{})
2✔
724
                                if err != nil {
2✔
725
                                        w.Logger().Errorf(err, "failed to update status of AzVolumeAttachment (%s) for volume (%s) and node (%s): %v", azVolumeAttachmentName, *pvName, nodeName, err)
×
726
                                        return syncedVolumeAttachments, volumesToSync, err
×
727
                                }
×
728
                        }
729

730
                        syncedVolumeAttachments[volumeAttachment.Name] = true
2✔
731
                }
732
        }
733
        return syncedVolumeAttachments, volumesToSync, nil
3✔
734
}
735

736
func (r *ReconcileAttachDetach) recoverAzVolumeAttachment(ctx context.Context, recoveredAzVolumeAttachments *sync.Map, recoveryID string) error {
3✔
737
        w, _ := workflow.GetWorkflowFromContext(ctx)
3✔
738

3✔
739
        // list all AzVolumeAttachment
3✔
740
        azVolumeAttachments, err := r.azClient.DiskV1beta2().AzVolumeAttachments(r.config.ObjectNamespace).List(ctx, metav1.ListOptions{})
3✔
741
        if err != nil {
3✔
742
                w.Logger().Error(err, "failed to get list of existing AzVolumeAttachment CRI in controller recovery stage")
×
743
                return err
×
744
        }
×
745

746
        var wg sync.WaitGroup
3✔
747
        numRecovered := int32(0)
3✔
748

3✔
749
        for _, azVolumeAttachment := range azVolumeAttachments.Items {
7✔
750
                // skip if AzVolumeAttachment already recovered
4✔
751
                if _, ok := recoveredAzVolumeAttachments.Load(azVolumeAttachment.Name); ok {
4✔
752
                        numRecovered++
×
753
                        continue
×
754
                }
755

756
                wg.Add(1)
4✔
757
                go func(azv azdiskv1beta2.AzVolumeAttachment, azvMap *sync.Map) {
8✔
758
                        defer wg.Done()
4✔
759
                        var targetState azdiskv1beta2.AzVolumeAttachmentAttachmentState
4✔
760
                        updateMode := azureutils.UpdateCRIStatus
4✔
761
                        if azv.Spec.RequestedRole == azdiskv1beta2.ReplicaRole {
4✔
762
                                updateMode = azureutils.UpdateAll
×
763
                        }
×
764
                        updateFunc := func(obj client.Object) error {
8✔
765
                                var err error
4✔
766
                                azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
4✔
767
                                if azv.Spec.RequestedRole == azdiskv1beta2.ReplicaRole {
4✔
768
                                        // conversion logic from v1beta1 to v1beta2 for replicas come here
×
769
                                        azv.Status.Annotations = azv.ObjectMeta.Annotations
×
770
                                        azv.ObjectMeta.Annotations = map[string]string{consts.VolumeAttachRequestAnnotation: "CRI recovery"}
×
771
                                }
×
772
                                // add a recover annotation to the CRI so that reconciliation can be triggered for the CRI even if CRI's current state == target state
773
                                azv.Status.Annotations = azureutils.AddToMap(azv.Status.Annotations, consts.RecoverAnnotation, recoveryID)
4✔
774
                                if azv.Status.State != targetState {
6✔
775
                                        _, err = updateState(azv, targetState, forceUpdate)
2✔
776
                                }
2✔
777
                                return err
4✔
778
                        }
779
                        switch azv.Status.State {
4✔
780
                        case azdiskv1beta2.Attaching:
1✔
781
                                // reset state to Pending so Attach operation can be redone
1✔
782
                                targetState = azdiskv1beta2.AttachmentPending
1✔
783
                                updateFunc = azureutils.AppendToUpdateCRIFunc(updateFunc, func(obj client.Object) error {
2✔
784
                                        azv := obj.(*azdiskv1beta2.AzVolumeAttachment)
1✔
785
                                        azv.Status.Detail = nil
1✔
786
                                        azv.Status.Error = nil
1✔
787
                                        return nil
1✔
788
                                })
1✔
789
                        case azdiskv1beta2.Detaching:
1✔
790
                                // reset state to Attached so Detach operation can be redone
1✔
791
                                targetState = azdiskv1beta2.Attached
1✔
792
                        default:
2✔
793
                                targetState = azv.Status.State
2✔
794
                        }
795

796
                        if _, err := azureutils.UpdateCRIWithRetry(ctx, nil, r.cachedClient, r.azClient, &azv, updateFunc, consts.ForcedUpdateMaxNetRetry, updateMode); err != nil {
4✔
797
                                w.Logger().Errorf(err, "failed to update AzVolumeAttachment (%s) for recovery", azv.Name)
×
798
                        } else {
4✔
799
                                // if update succeeded, add the CRI to the recoveryComplete list
4✔
800
                                azvMap.Store(azv.Name, struct{}{})
4✔
801
                                atomic.AddInt32(&numRecovered, 1)
4✔
802
                        }
4✔
803
                }(azVolumeAttachment, recoveredAzVolumeAttachments)
804
        }
805
        wg.Wait()
3✔
806

3✔
807
        // if recovery has not been completed for all CRIs, return error
3✔
808
        if numRecovered < int32(len(azVolumeAttachments.Items)) {
3✔
809
                return status.Errorf(codes.Internal, "failed to recover some AzVolumeAttachment states")
×
810
        }
×
811
        return nil
3✔
812
}
813

814
func NewAttachDetachController(mgr manager.Manager, cloudDiskAttacher CloudDiskAttachDetacher, crdDetacher CrdDetacher, controllerSharedState *SharedState) (*ReconcileAttachDetach, error) {
×
815
        logger := mgr.GetLogger().WithValues("controller", "azvolumeattachment")
×
816
        reconciler := ReconcileAttachDetach{
×
817
                crdDetacher:       crdDetacher,
×
818
                cloudDiskAttacher: cloudDiskAttacher,
×
819
                stateLock:         &sync.Map{},
×
820
                retryInfo:         newRetryInfo(),
×
821
                SharedState:       controllerSharedState,
×
822
                logger:            logger,
×
823
        }
×
824

×
825
        c, err := controller.New("azvolumeattachment-controller", mgr, controller.Options{
×
826
                MaxConcurrentReconciles: controllerSharedState.config.ControllerConfig.WorkerThreads,
×
827
                Reconciler:              &reconciler,
×
828
                LogConstructor:          func(req *reconcile.Request) logr.Logger { return logger },
×
829
        })
830

831
        if err != nil {
×
832
                c.GetLogger().Error(err, "failed to create controller")
×
833
                return nil, err
×
834
        }
×
835

836
        c.GetLogger().Info("Starting to watch AzVolumeAttachments.")
×
837

×
838
        // Watch for CRUD events on azVolumeAttachment objects
×
839
        err = c.Watch(&source.Kind{Type: &azdiskv1beta2.AzVolumeAttachment{}}, &handler.EnqueueRequestForObject{})
×
840
        if err != nil {
×
841
                c.GetLogger().Error(err, "failed to initialize watch for AzVolumeAttachment CRI")
×
842
                return nil, err
×
843
        }
×
844

845
        c.GetLogger().Info("Controller set-up successful.")
×
846
        return &reconciler, nil
×
847
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc