• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5033

02 Dec 2024 02:31PM UTC coverage: 59.276% (+0.02%) from 59.254%
#5033

push

travis-ci

web-flow
Watch for StorageProfile updates to reconcile DVs (#3547)

Currently, completing an incomplete StorageProfile does not reconcile
the relevant incomplete DVs waiting fro it, unlike the case of missing
StorageClass.

Even after updating the StorageProfile claimPropertySets to have
relevant AccessMode and VolumeMode, the DV is stuck with incomplete
AccessMode and VolumeMode until its exponential back-off reconcile, or
until it's manually updated (e.g. annotated) and reconciled.

Signed-off-by: Arnon Gilboa <agilboa@redhat.com>

0 of 15 new or added lines in 1 file covered. (0.0%)

1 existing line in 1 file now uncovered.

16729 of 28222 relevant lines covered (59.28%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.34
/pkg/controller/datavolume/controller-base.go
1
/*
2
Copyright 2018 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package datavolume
18

19
import (
20
        "context"
21
        "crypto/rsa"
22
        "encoding/json"
23
        "fmt"
24
        "net/http"
25
        "reflect"
26
        "strconv"
27
        "time"
28

29
        "github.com/go-logr/logr"
30
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
31
        "github.com/pkg/errors"
32

33
        corev1 "k8s.io/api/core/v1"
34
        storagev1 "k8s.io/api/storage/v1"
35
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
36
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
37
        "k8s.io/apimachinery/pkg/runtime"
38
        "k8s.io/apimachinery/pkg/runtime/schema"
39
        "k8s.io/apimachinery/pkg/types"
40
        "k8s.io/client-go/tools/record"
41
        "k8s.io/utils/ptr"
42

43
        "sigs.k8s.io/controller-runtime/pkg/client"
44
        "sigs.k8s.io/controller-runtime/pkg/controller"
45
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
46
        "sigs.k8s.io/controller-runtime/pkg/handler"
47
        "sigs.k8s.io/controller-runtime/pkg/manager"
48
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
49
        "sigs.k8s.io/controller-runtime/pkg/source"
50

51
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
52
        "kubevirt.io/containerized-data-importer/pkg/common"
53
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
54
        featuregates "kubevirt.io/containerized-data-importer/pkg/feature-gates"
55
        cloneMetrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-cloner"
56
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
57
        importMetrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-importer"
58
        "kubevirt.io/containerized-data-importer/pkg/token"
59
        "kubevirt.io/containerized-data-importer/pkg/util"
60
)
61

62
const (
63
        // ErrResourceExists provides a const to indicate a resource exists error
64
        ErrResourceExists = "ErrResourceExists"
65
        // ErrResourceMarkedForDeletion provides a const to indicate a resource marked for deletion error
66
        ErrResourceMarkedForDeletion = "ErrResourceMarkedForDeletion"
67
        // ErrClaimLost provides a const to indicate a claim is lost
68
        ErrClaimLost = "ErrClaimLost"
69

70
        // MessageResourceMarkedForDeletion provides a const to form a resource marked for deletion error message
71
        MessageResourceMarkedForDeletion = "Resource %q marked for deletion"
72
        // MessageResourceExists provides a const to form a resource exists error message
73
        MessageResourceExists = "Resource %q already exists and is not managed by DataVolume"
74
        // MessageErrClaimLost provides a const to form claim lost message
75
        MessageErrClaimLost = "PVC %s lost"
76

77
        dvPhaseField = "status.phase"
78

79
        claimRefField = "spec.claimRef"
80

81
        claimStorageClassNameField = "spec.storageClassName"
82
)
83

84
var (
85
        httpClient *http.Client
86

87
        delayedAnnotations = []string{
88
                cc.AnnPopulatedFor,
89
        }
90
)
91

92
// Event represents DV controller event
93
type Event struct {
94
        eventType string
95
        reason    string
96
        message   string
97
}
98

99
type statusPhaseSync struct {
100
        phase  cdiv1.DataVolumePhase
101
        pvcKey *client.ObjectKey
102
        event  Event
103
}
104

105
type dvSyncResult struct {
106
        result    *reconcile.Result
107
        phaseSync *statusPhaseSync
108
}
109

110
type dvSyncState struct {
111
        dv        *cdiv1.DataVolume
112
        dvMutated *cdiv1.DataVolume
113
        pvc       *corev1.PersistentVolumeClaim
114
        pvcSpec   *corev1.PersistentVolumeClaimSpec
115
        snapshot  *snapshotv1.VolumeSnapshot
116
        dvSyncResult
117
        usePopulator bool
118
}
119

120
// ReconcilerBase members
121
type ReconcilerBase struct {
122
        client               client.Client
123
        recorder             record.EventRecorder
124
        scheme               *runtime.Scheme
125
        log                  logr.Logger
126
        featureGates         featuregates.FeatureGates
127
        installerLabels      map[string]string
128
        shouldUpdateProgress bool
129
}
130

131
func pvcIsPopulatedForDataVolume(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) bool {
1✔
132
        if pvc == nil || dv == nil {
1✔
133
                return false
×
134
        }
×
135
        dvName, ok := pvc.Annotations[cc.AnnPopulatedFor]
1✔
136
        return ok && dvName == dv.Name
1✔
137
}
138

139
func dvIsPrePopulated(dv *cdiv1.DataVolume) bool {
1✔
140
        _, ok := dv.Annotations[cc.AnnPrePopulated]
1✔
141
        return ok
1✔
142
}
1✔
143

144
func checkStaticProvisionPending(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) bool {
1✔
145
        if pvc == nil || dv == nil {
2✔
146
                return false
1✔
147
        }
1✔
148
        if _, ok := dv.Annotations[cc.AnnCheckStaticVolume]; !ok {
2✔
149
                return false
1✔
150
        }
1✔
151
        _, ok := pvc.Annotations[cc.AnnPersistentVolumeList]
1✔
152
        return ok
1✔
153
}
154

155
func shouldSetDataVolumePending(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) bool {
1✔
156
        if checkStaticProvisionPending(pvc, dv) {
2✔
157
                return true
1✔
158
        }
1✔
159

160
        if pvc != nil {
2✔
161
                return false
1✔
162
        }
1✔
163

164
        return dvIsPrePopulated(dv) || (dv.Status.Phase == cdiv1.PhaseUnset)
1✔
165
}
166

167
// dataVolumeOp is the datavolume's requested operation
168
type dataVolumeOp int
169

170
const (
171
        dataVolumeNop dataVolumeOp = iota
172
        dataVolumeImport
173
        dataVolumeUpload
174
        dataVolumePvcClone
175
        dataVolumeSnapshotClone
176
        dataVolumePopulator
177
)
178

179
type indexArgs struct {
180
        obj          client.Object
181
        field        string
182
        extractValue client.IndexerFunc
183
}
184

185
func getIndexArgs() []indexArgs {
1✔
186
        return []indexArgs{
1✔
187
                {
1✔
188
                        obj:   &cdiv1.DataVolume{},
1✔
189
                        field: dvPhaseField,
1✔
190
                        extractValue: func(obj client.Object) []string {
1✔
191
                                return []string{string(obj.(*cdiv1.DataVolume).Status.Phase)}
×
192
                        },
×
193
                },
194
                {
195
                        obj:   &corev1.PersistentVolume{},
196
                        field: claimRefField,
197
                        extractValue: func(obj client.Object) []string {
1✔
198
                                if pv, ok := obj.(*corev1.PersistentVolume); ok {
2✔
199
                                        if pv.Spec.ClaimRef != nil && pv.Spec.ClaimRef.Namespace != "" && pv.Spec.ClaimRef.Name != "" {
2✔
200
                                                return []string{claimRefIndexKeyFunc(pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name)}
1✔
201
                                        }
1✔
202
                                }
203
                                return nil
×
204
                        },
205
                },
206
                {
207
                        obj:          &corev1.PersistentVolume{},
208
                        field:        claimStorageClassNameField,
209
                        extractValue: extractAvailablePersistentVolumeStorageClassName,
210
                },
211
        }
212
}
213

214
func claimRefIndexKeyFunc(namespace, name string) string {
1✔
215
        return namespace + "/" + name
1✔
216
}
1✔
217

218
// CreateCommonIndexes creates indexes used by all controllers
219
func CreateCommonIndexes(mgr manager.Manager) error {
×
220
        for _, ia := range getIndexArgs() {
×
221
                if err := mgr.GetFieldIndexer().IndexField(context.TODO(), ia.obj, ia.field, ia.extractValue); err != nil {
×
222
                        return err
×
223
                }
×
224
        }
225
        return nil
×
226
}
227

228
// CreateAvailablePersistentVolumeIndex adds storage class name index for available PersistentVolumes
229
func CreateAvailablePersistentVolumeIndex(fieldIndexer client.FieldIndexer) error {
×
230
        return fieldIndexer.IndexField(context.TODO(), &corev1.PersistentVolume{},
×
231
                claimStorageClassNameField, extractAvailablePersistentVolumeStorageClassName)
×
232
}
×
233

234
func extractAvailablePersistentVolumeStorageClassName(obj client.Object) []string {
×
235
        if pv, ok := obj.(*corev1.PersistentVolume); ok && pv.Status.Phase == corev1.VolumeAvailable {
×
236
                return []string{pv.Spec.StorageClassName}
×
237
        }
×
238
        return nil
×
239
}
240

241
func addDataVolumeControllerCommonWatches(mgr manager.Manager, dataVolumeController controller.Controller, op dataVolumeOp) error {
×
242
        appendMatchingDataVolumeRequest := func(ctx context.Context, reqs []reconcile.Request, mgr manager.Manager, namespace, name string) []reconcile.Request {
×
243
                dvKey := types.NamespacedName{Namespace: namespace, Name: name}
×
244
                dv := &cdiv1.DataVolume{}
×
245
                if err := mgr.GetClient().Get(ctx, dvKey, dv); err != nil {
×
246
                        if !k8serrors.IsNotFound(err) {
×
247
                                mgr.GetLogger().Error(err, "Failed to get DV", "dvKey", dvKey)
×
248
                        }
×
249
                        return reqs
×
250
                }
251
                if getDataVolumeOp(ctx, mgr.GetLogger(), dv, mgr.GetClient()) == op {
×
252
                        reqs = append(reqs, reconcile.Request{NamespacedName: dvKey})
×
253
                }
×
254
                return reqs
×
255
        }
256

257
        // Setup watches
258
        if err := dataVolumeController.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{}, handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](
×
259
                func(ctx context.Context, dv *cdiv1.DataVolume) []reconcile.Request {
×
260
                        if getDataVolumeOp(ctx, mgr.GetLogger(), dv, mgr.GetClient()) != op {
×
261
                                return nil
×
262
                        }
×
263
                        updatePendingDataVolumesGauge(ctx, mgr.GetLogger(), dv, mgr.GetClient())
×
264
                        return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: dv.Namespace, Name: dv.Name}}}
×
265
                }),
266
        )); err != nil {
×
267
                return err
×
268
        }
×
269
        if err := dataVolumeController.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{}, handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](
×
270
                func(ctx context.Context, obj *corev1.PersistentVolumeClaim) []reconcile.Request {
×
271
                        var result []reconcile.Request
×
272
                        owner := metav1.GetControllerOf(obj)
×
273
                        if owner != nil && owner.Kind == "DataVolume" {
×
274
                                result = appendMatchingDataVolumeRequest(ctx, result, mgr, obj.GetNamespace(), owner.Name)
×
275
                        }
×
276
                        populatedFor := obj.GetAnnotations()[cc.AnnPopulatedFor]
×
277
                        if populatedFor != "" {
×
278
                                result = appendMatchingDataVolumeRequest(ctx, result, mgr, obj.GetNamespace(), populatedFor)
×
279
                        }
×
280
                        // it is okay if result contains the same entry twice, will be deduplicated by caller
281
                        return result
×
282
                }),
283
        )); err != nil {
×
284
                return err
×
285
        }
×
286
        if err := dataVolumeController.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, handler.TypedEnqueueRequestsFromMapFunc[*corev1.Pod](
×
287
                func(ctx context.Context, obj *corev1.Pod) []reconcile.Request {
×
288
                        owner := metav1.GetControllerOf(obj)
×
289
                        if owner == nil || owner.Kind != "DataVolume" {
×
290
                                return nil
×
291
                        }
×
292
                        return appendMatchingDataVolumeRequest(ctx, nil, mgr, obj.GetNamespace(), owner.Name)
×
293
                }),
294
        )); err != nil {
×
295
                return err
×
296
        }
×
297
        for _, k := range []client.Object{&corev1.PersistentVolumeClaim{}, &corev1.Pod{}, &cdiv1.ObjectTransfer{}} {
×
298
                if err := dataVolumeController.Watch(source.Kind(mgr.GetCache(), k, handler.EnqueueRequestsFromMapFunc(
×
299
                        func(ctx context.Context, obj client.Object) []reconcile.Request {
×
300
                                if !hasAnnOwnedByDataVolume(obj) {
×
301
                                        return nil
×
302
                                }
×
303
                                namespace, name, err := getAnnOwnedByDataVolume(obj)
×
304
                                if err != nil {
×
305
                                        return nil
×
306
                                }
×
307
                                return appendMatchingDataVolumeRequest(ctx, nil, mgr, namespace, name)
×
308
                        }),
309
                )); err != nil {
×
310
                        return err
×
311
                }
×
312
        }
313

314
        // Watch for StorageClass and StorageProfile updates and reconcile the DVs waiting for StorageClass or its complete StorageProfile.
315
        // Relevant only when the DV StorageSpec has no AccessModes set and no matching StorageClass with compelete StorageProfile yet,
316
        // so PVC cannot be created (test_id:9922).
NEW
317
        for _, k := range []client.Object{&storagev1.StorageClass{}, &cdiv1.StorageProfile{}} {
×
NEW
318
                if err := dataVolumeController.Watch(source.Kind(mgr.GetCache(), k, handler.EnqueueRequestsFromMapFunc(
×
NEW
319
                        func(ctx context.Context, obj client.Object) []reconcile.Request {
×
NEW
320
                                dvList := &cdiv1.DataVolumeList{}
×
NEW
321
                                if err := mgr.GetClient().List(ctx, dvList, client.MatchingFields{dvPhaseField: ""}); err != nil {
×
NEW
322
                                        return nil
×
UNCOV
323
                                }
×
NEW
324
                                var reqs []reconcile.Request
×
NEW
325
                                for _, dv := range dvList.Items {
×
NEW
326
                                        if getDataVolumeOp(ctx, mgr.GetLogger(), &dv, mgr.GetClient()) == op {
×
NEW
327
                                                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dv.Name, Namespace: dv.Namespace}})
×
NEW
328
                                        }
×
329
                                }
NEW
330
                                return reqs
×
331
                        },
332
                ),
NEW
333
                )); err != nil {
×
NEW
334
                        return err
×
NEW
335
                }
×
336
        }
337

338
        // Watch for PV updates to reconcile the DVs waiting for available PV
339
        // Relevant only when the DV StorageSpec has no AccessModes set and no matching StorageClass yet, so PVC cannot be created (test_id:9924,9925)
340
        if err := dataVolumeController.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolume{}, handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolume](
×
341
                func(ctx context.Context, pv *corev1.PersistentVolume) []reconcile.Request {
×
342
                        dvList := &cdiv1.DataVolumeList{}
×
343
                        if err := mgr.GetClient().List(ctx, dvList, client.MatchingFields{dvPhaseField: ""}); err != nil {
×
344
                                return nil
×
345
                        }
×
346
                        var reqs []reconcile.Request
×
347
                        for _, dv := range dvList.Items {
×
348
                                storage := dv.Spec.Storage
×
349
                                if storage != nil &&
×
350
                                        storage.StorageClassName != nil &&
×
351
                                        *storage.StorageClassName == pv.Spec.StorageClassName &&
×
352
                                        pv.Status.Phase == corev1.VolumeAvailable &&
×
353
                                        getDataVolumeOp(ctx, mgr.GetLogger(), &dv, mgr.GetClient()) == op {
×
354
                                        reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dv.Name, Namespace: dv.Namespace}})
×
355
                                }
×
356
                        }
357
                        return reqs
×
358
                },
359
        ),
360
        )); err != nil {
×
361
                return err
×
362
        }
×
363

364
        return nil
×
365
}
366

367
func getDataVolumeOp(ctx context.Context, log logr.Logger, dv *cdiv1.DataVolume, client client.Client) dataVolumeOp {
×
368
        src := dv.Spec.Source
×
369

×
370
        if dv.Spec.SourceRef != nil {
×
371
                return getSourceRefOp(ctx, log, dv, client)
×
372
        }
×
373
        if src != nil && src.PVC != nil {
×
374
                return dataVolumePvcClone
×
375
        }
×
376
        if src != nil && src.Snapshot != nil {
×
377
                return dataVolumeSnapshotClone
×
378
        }
×
379
        if src == nil {
×
380
                if dvUsesVolumePopulator(dv) {
×
381
                        return dataVolumePopulator
×
382
                }
×
383
                return dataVolumeNop
×
384
        }
385
        if src.Upload != nil {
×
386
                return dataVolumeUpload
×
387
        }
×
388
        if src.HTTP != nil || src.S3 != nil || src.GCS != nil || src.Registry != nil || src.Blank != nil || src.Imageio != nil || src.VDDK != nil {
×
389
                return dataVolumeImport
×
390
        }
×
391

392
        return dataVolumeNop
×
393
}
394

395
func getSourceRefOp(ctx context.Context, log logr.Logger, dv *cdiv1.DataVolume, client client.Client) dataVolumeOp {
×
396
        dataSource := &cdiv1.DataSource{}
×
397
        ns := dv.Namespace
×
398
        if dv.Spec.SourceRef.Namespace != nil && *dv.Spec.SourceRef.Namespace != "" {
×
399
                ns = *dv.Spec.SourceRef.Namespace
×
400
        }
×
401
        nn := types.NamespacedName{Namespace: ns, Name: dv.Spec.SourceRef.Name}
×
402
        if err := client.Get(ctx, nn, dataSource); err != nil {
×
403
                log.Error(err, "Unable to get DataSource", "namespacedName", nn)
×
404
                return dataVolumeNop
×
405
        }
×
406

407
        switch {
×
408
        case dataSource.Spec.Source.PVC != nil:
×
409
                return dataVolumePvcClone
×
410
        case dataSource.Spec.Source.Snapshot != nil:
×
411
                return dataVolumeSnapshotClone
×
412
        default:
×
413
                return dataVolumeNop
×
414
        }
415
}
416

417
func updatePendingDataVolumesGauge(ctx context.Context, log logr.Logger, dv *cdiv1.DataVolume, c client.Client) {
×
418
        if !cc.IsDataVolumeUsingDefaultStorageClass(dv) {
×
419
                return
×
420
        }
×
421

422
        countPending, err := getDefaultStorageClassDataVolumeCount(ctx, c, string(cdiv1.Pending))
×
423
        if err != nil {
×
424
                log.V(3).Error(err, "Failed listing the pending DataVolumes")
×
425
                return
×
426
        }
×
427
        countUnset, err := getDefaultStorageClassDataVolumeCount(ctx, c, string(cdiv1.PhaseUnset))
×
428
        if err != nil {
×
429
                log.V(3).Error(err, "Failed listing the unset DataVolumes")
×
430
                return
×
431
        }
×
432

433
        metrics.SetDataVolumePending(countPending + countUnset)
×
434
}
435

436
func getDefaultStorageClassDataVolumeCount(ctx context.Context, c client.Client, dvPhase string) (int, error) {
×
437
        dvList := &cdiv1.DataVolumeList{}
×
438
        if err := c.List(ctx, dvList, client.MatchingFields{dvPhaseField: dvPhase}); err != nil {
×
439
                return 0, err
×
440
        }
×
441

442
        dvCount := 0
×
443
        for _, dv := range dvList.Items {
×
444
                if cc.IsDataVolumeUsingDefaultStorageClass(&dv) {
×
445
                        dvCount++
×
446
                }
×
447
        }
448

449
        return dvCount, nil
×
450
}
451

452
type dvController interface {
453
        reconcile.Reconciler
454
        sync(log logr.Logger, req reconcile.Request) (dvSyncResult, error)
455
        updateStatusPhase(pvc *corev1.PersistentVolumeClaim, dataVolumeCopy *cdiv1.DataVolume, event *Event) error
456
}
457

458
func (r *ReconcilerBase) reconcile(ctx context.Context, req reconcile.Request, dvc dvController) (reconcile.Result, error) {
1✔
459
        log := r.log.WithValues("DataVolume", req.NamespacedName)
1✔
460
        syncRes, syncErr := dvc.sync(log, req)
1✔
461
        res, err := r.updateStatus(req, syncRes.phaseSync, dvc)
1✔
462
        if syncErr != nil {
2✔
463
                err = syncErr
1✔
464
        }
1✔
465
        if syncRes.result != nil {
2✔
466
                res = *syncRes.result
1✔
467
        }
1✔
468
        return res, err
1✔
469
}
470

471
type dvSyncStateFunc func(*dvSyncState) error
472

473
func (r *ReconcilerBase) syncCommon(log logr.Logger, req reconcile.Request, cleanup, prepare dvSyncStateFunc) (dvSyncState, error) {
1✔
474
        syncState, err := r.syncDvPvcState(log, req, cleanup, prepare)
1✔
475
        if err == nil {
2✔
476
                err = r.syncUpdate(log, &syncState)
1✔
477
        }
1✔
478
        return syncState, err
1✔
479
}
480

481
func (r *ReconcilerBase) syncDvPvcState(log logr.Logger, req reconcile.Request, cleanup, prepare dvSyncStateFunc) (dvSyncState, error) {
1✔
482
        syncState := dvSyncState{}
1✔
483
        dv, err := r.getDataVolume(req.NamespacedName)
1✔
484
        if dv == nil || err != nil {
2✔
485
                syncState.result = &reconcile.Result{}
1✔
486
                return syncState, err
1✔
487
        }
1✔
488
        syncState.dv = dv
1✔
489
        syncState.dvMutated = dv.DeepCopy()
1✔
490
        syncState.pvc, err = r.getPVC(req.NamespacedName)
1✔
491
        if err != nil {
1✔
492
                return syncState, err
×
493
        }
×
494

495
        if cleanup != nil {
2✔
496
                if err := cleanup(&syncState); err != nil {
1✔
497
                        return syncState, err
×
498
                }
×
499
        }
500

501
        if dv.DeletionTimestamp != nil {
1✔
502
                log.Info("DataVolume marked for deletion")
×
503
                syncState.result = &reconcile.Result{}
×
504
                return syncState, nil
×
505
        }
×
506

507
        if prepare != nil {
2✔
508
                if err := prepare(&syncState); err != nil {
1✔
509
                        return syncState, err
×
510
                }
×
511
        }
512

513
        syncState.pvcSpec, err = renderPvcSpec(r.client, r.recorder, log, syncState.dvMutated, syncState.pvc)
1✔
514
        if err != nil {
2✔
515
                if syncErr := r.syncDataVolumeStatusPhaseWithEvent(&syncState, cdiv1.PhaseUnset, nil,
1✔
516
                        Event{corev1.EventTypeWarning, cc.ErrClaimNotValid, err.Error()}); syncErr != nil {
1✔
517
                        log.Error(syncErr, "failed to sync DataVolume status with event")
×
518
                }
×
519
                if errors.Is(err, ErrStorageClassNotFound) {
2✔
520
                        syncState.result = &reconcile.Result{}
1✔
521
                        return syncState, nil
1✔
522
                }
1✔
523
                return syncState, err
1✔
524
        }
525

526
        syncState.usePopulator, err = r.shouldUseCDIPopulator(&syncState)
1✔
527
        if err != nil {
1✔
528
                return syncState, err
×
529
        }
×
530
        updateDataVolumeUseCDIPopulator(&syncState)
1✔
531

1✔
532
        if err := r.handleStaticVolume(&syncState, log); err != nil || syncState.result != nil {
2✔
533
                return syncState, err
1✔
534
        }
1✔
535

536
        if err := r.handleDelayedAnnotations(&syncState, log); err != nil || syncState.result != nil {
2✔
537
                return syncState, err
1✔
538
        }
1✔
539

540
        if err = updateDataVolumeDefaultInstancetypeLabels(r.client, &syncState); err != nil {
1✔
541
                return syncState, err
×
542
        }
×
543

544
        if syncState.pvc != nil {
2✔
545
                if err := r.garbageCollect(&syncState, log); err != nil {
1✔
546
                        return syncState, err
×
547
                }
×
548
                if syncState.result != nil || syncState.dv == nil {
2✔
549
                        return syncState, nil
1✔
550
                }
1✔
551
                if err := r.validatePVC(dv, syncState.pvc); err != nil {
2✔
552
                        return syncState, err
1✔
553
                }
1✔
554
                r.handlePrePopulation(syncState.dvMutated, syncState.pvc)
1✔
555
        }
556

557
        return syncState, nil
1✔
558
}
559

560
func (r *ReconcilerBase) syncUpdate(log logr.Logger, syncState *dvSyncState) error {
1✔
561
        if syncState.dv == nil || syncState.dvMutated == nil {
2✔
562
                return nil
1✔
563
        }
1✔
564
        if !reflect.DeepEqual(syncState.dv.Status, syncState.dvMutated.Status) {
1✔
565
                return fmt.Errorf("status update is not allowed in sync phase")
×
566
        }
×
567
        if !reflect.DeepEqual(syncState.dv.ObjectMeta, syncState.dvMutated.ObjectMeta) {
2✔
568
                _, ok := syncState.dv.Annotations[cc.AnnExtendedCloneToken]
1✔
569
                _, ok2 := syncState.dvMutated.Annotations[cc.AnnExtendedCloneToken]
1✔
570
                if err := r.updateDataVolume(syncState.dvMutated); err != nil {
1✔
571
                        r.log.Error(err, "Unable to sync update dv meta", "name", syncState.dvMutated.Name)
×
572
                        return err
×
573
                }
×
574
                if !ok && ok2 {
2✔
575
                        delta := time.Since(syncState.dv.ObjectMeta.CreationTimestamp.Time)
1✔
576
                        log.V(3).Info("Adding extended DataVolume token took", "delta", delta)
1✔
577
                }
1✔
578
                syncState.dv = syncState.dvMutated.DeepCopy()
1✔
579
        }
580
        return nil
1✔
581
}
582

583
func (r *ReconcilerBase) handleStaticVolume(syncState *dvSyncState, log logr.Logger) error {
1✔
584
        if _, ok := syncState.dvMutated.Annotations[cc.AnnCheckStaticVolume]; !ok {
2✔
585
                return nil
1✔
586
        }
1✔
587

588
        if syncState.pvc == nil {
2✔
589
                volumes, err := r.getAvailableVolumesForDV(syncState, log)
1✔
590
                if err != nil {
1✔
591
                        return err
×
592
                }
×
593

594
                if len(volumes) == 0 {
2✔
595
                        log.Info("No PVs for DV")
1✔
596
                        return nil
1✔
597
                }
1✔
598

599
                if err := r.handlePvcCreation(log, syncState, func(_ *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) error {
2✔
600
                        bs, err := json.Marshal(volumes)
1✔
601
                        if err != nil {
1✔
602
                                return err
×
603
                        }
×
604
                        cc.AddAnnotation(pvc, cc.AnnPersistentVolumeList, string(bs))
1✔
605
                        return nil
1✔
606
                }); err != nil {
×
607
                        return err
×
608
                }
×
609

610
                // set result to make sure callers don't do anything else in sync
611
                syncState.result = &reconcile.Result{}
1✔
612
                return nil
1✔
613
        }
614

615
        volumeAnno, ok := syncState.pvc.Annotations[cc.AnnPersistentVolumeList]
1✔
616
        if !ok {
1✔
617
                // etiher did not create the PVC here OR bind to expected PV succeeded
×
618
                return nil
×
619
        }
×
620

621
        if cc.IsUnbound(syncState.pvc) {
2✔
622
                // set result to make sure callers don't do anything else in sync
1✔
623
                syncState.result = &reconcile.Result{}
1✔
624
                return nil
1✔
625
        }
1✔
626

627
        var volumes []string
1✔
628
        if err := json.Unmarshal([]byte(volumeAnno), &volumes); err != nil {
1✔
629
                return err
×
630
        }
×
631

632
        for _, v := range volumes {
2✔
633
                if v == syncState.pvc.Spec.VolumeName {
2✔
634
                        pvcCpy := syncState.pvc.DeepCopy()
1✔
635
                        // handle as "populatedFor" going forward
1✔
636
                        cc.AddAnnotation(pvcCpy, cc.AnnPopulatedFor, syncState.dvMutated.Name)
1✔
637
                        delete(pvcCpy.Annotations, cc.AnnPersistentVolumeList)
1✔
638
                        if err := r.updatePVC(pvcCpy); err != nil {
1✔
639
                                return err
×
640
                        }
×
641
                        syncState.pvc = pvcCpy
1✔
642
                        return nil
1✔
643
                }
644
        }
645

646
        // delete the pvc and hope for better luck...
647
        pvcCpy := syncState.pvc.DeepCopy()
1✔
648
        if err := r.client.Delete(context.TODO(), pvcCpy, &client.DeleteOptions{}); err != nil {
1✔
649
                return err
×
650
        }
×
651

652
        syncState.pvc = pvcCpy
1✔
653

1✔
654
        return fmt.Errorf("DataVolume bound to unexpected PV %s", syncState.pvc.Spec.VolumeName)
1✔
655
}
656

657
func (r *ReconcilerBase) handleDelayedAnnotations(syncState *dvSyncState, log logr.Logger) error {
1✔
658
        dataVolume := syncState.dv
1✔
659
        if dataVolume.Status.Phase != cdiv1.Succeeded {
2✔
660
                return nil
1✔
661
        }
1✔
662

663
        if syncState.pvc == nil {
1✔
664
                return nil
×
665
        }
×
666

667
        pvcCpy := syncState.pvc.DeepCopy()
1✔
668
        for _, anno := range delayedAnnotations {
2✔
669
                if val, ok := dataVolume.Annotations[anno]; ok {
2✔
670
                        // only add if not already present
1✔
671
                        if _, ok := pvcCpy.Annotations[anno]; !ok {
2✔
672
                                cc.AddAnnotation(pvcCpy, anno, val)
1✔
673
                        }
1✔
674
                }
675
        }
676

677
        if !reflect.DeepEqual(syncState.pvc, pvcCpy) {
2✔
678
                if err := r.updatePVC(pvcCpy); err != nil {
1✔
679
                        return err
×
680
                }
×
681
                syncState.pvc = pvcCpy
1✔
682
                syncState.result = &reconcile.Result{}
1✔
683
        }
684

685
        return nil
1✔
686
}
687

688
func (r *ReconcilerBase) getAvailableVolumesForDV(syncState *dvSyncState, log logr.Logger) ([]string, error) {
1✔
689
        pvList := &corev1.PersistentVolumeList{}
1✔
690
        fields := client.MatchingFields{claimRefField: claimRefIndexKeyFunc(syncState.dv.Namespace, syncState.dv.Name)}
1✔
691
        if err := r.client.List(context.TODO(), pvList, fields); err != nil {
1✔
692
                return nil, err
×
693
        }
×
694
        if syncState.pvcSpec == nil {
1✔
695
                return nil, fmt.Errorf("missing pvc spec")
×
696
        }
×
697
        var pvNames []string
1✔
698
        for _, pv := range pvList.Items {
2✔
699
                if pv.Status.Phase == corev1.VolumeAvailable {
2✔
700
                        pvc := &corev1.PersistentVolumeClaim{
1✔
701
                                Spec: *syncState.pvcSpec,
1✔
702
                        }
1✔
703
                        if err := CheckVolumeSatisfyClaim(&pv, pvc); err != nil {
1✔
704
                                continue
×
705
                        }
706
                        log.Info("Found matching volume for DV", "pv", pv.Name)
1✔
707
                        pvNames = append(pvNames, pv.Name)
1✔
708
                }
709
        }
710
        return pvNames, nil
1✔
711
}
712

713
func (r *ReconcilerBase) handlePrePopulation(dv *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) {
1✔
714
        if pvc.Status.Phase == corev1.ClaimBound && pvcIsPopulatedForDataVolume(pvc, dv) {
2✔
715
                cc.AddAnnotation(dv, cc.AnnPrePopulated, pvc.Name)
1✔
716
        }
1✔
717
}
718

719
func (r *ReconcilerBase) validatePVC(dv *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) error {
1✔
720
        // If the PVC is being deleted, we should log a warning to the event recorder and return to wait the deletion complete
1✔
721
        // don't bother with owner refs is the pvc is deleted
1✔
722
        if pvc.DeletionTimestamp != nil {
1✔
723
                msg := fmt.Sprintf(MessageResourceMarkedForDeletion, pvc.Name)
×
724
                r.recorder.Event(dv, corev1.EventTypeWarning, ErrResourceMarkedForDeletion, msg)
×
725
                return errors.Errorf(msg)
×
726
        }
×
727
        // If the PVC is not controlled by this DataVolume resource, we should log
728
        // a warning to the event recorder and return
729
        if !metav1.IsControlledBy(pvc, dv) {
2✔
730
                requiresWork, err := r.pvcRequiresWork(pvc, dv)
1✔
731
                if err != nil {
1✔
732
                        return err
×
733
                }
×
734
                if !requiresWork {
2✔
735
                        if err := r.addOwnerRef(pvc, dv); err != nil {
1✔
736
                                return err
×
737
                        }
×
738
                } else {
1✔
739
                        msg := fmt.Sprintf(MessageResourceExists, pvc.Name)
1✔
740
                        r.recorder.Event(dv, corev1.EventTypeWarning, ErrResourceExists, msg)
1✔
741
                        return errors.Errorf(msg)
1✔
742
                }
1✔
743
        }
744
        return nil
1✔
745
}
746

747
func (r *ReconcilerBase) getPVC(key types.NamespacedName) (*corev1.PersistentVolumeClaim, error) {
1✔
748
        pvc := &corev1.PersistentVolumeClaim{}
1✔
749
        if err := r.client.Get(context.TODO(), key, pvc); err != nil {
2✔
750
                if k8serrors.IsNotFound(err) {
2✔
751
                        return nil, nil
1✔
752
                }
1✔
753
                return nil, err
×
754
        }
755
        return pvc, nil
1✔
756
}
757

758
func (r *ReconcilerBase) getDataVolume(key types.NamespacedName) (*cdiv1.DataVolume, error) {
1✔
759
        dv := &cdiv1.DataVolume{}
1✔
760
        if err := r.client.Get(context.TODO(), key, dv); err != nil {
2✔
761
                if k8serrors.IsNotFound(err) {
2✔
762
                        return nil, nil
1✔
763
                }
1✔
764
                return nil, err
×
765
        }
766
        return dv, nil
1✔
767
}
768

769
type pvcModifierFunc func(datavolume *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) error
770

771
func (r *ReconcilerBase) createPvcForDatavolume(datavolume *cdiv1.DataVolume, pvcSpec *corev1.PersistentVolumeClaimSpec,
772
        pvcModifier pvcModifierFunc) (*corev1.PersistentVolumeClaim, error) {
1✔
773
        newPvc, err := r.newPersistentVolumeClaim(datavolume, pvcSpec, datavolume.Namespace, datavolume.Name, pvcModifier)
1✔
774
        if err != nil {
2✔
775
                return nil, err
1✔
776
        }
1✔
777
        util.SetRecommendedLabels(newPvc, r.installerLabels, "cdi-controller")
1✔
778
        if err := r.client.Create(context.TODO(), newPvc); err != nil {
1✔
779
                return nil, err
×
780
        }
×
781
        return newPvc, nil
1✔
782
}
783

784
func (r *ReconcilerBase) getStorageClassBindingMode(storageClassName *string) (*storagev1.VolumeBindingMode, error) {
1✔
785
        // Handle unspecified storage class name, fallback to default storage class
1✔
786
        storageClass, err := cc.GetStorageClassByNameWithK8sFallback(context.TODO(), r.client, storageClassName)
1✔
787
        if err != nil {
1✔
788
                return nil, err
×
789
        }
×
790

791
        if storageClass != nil && storageClass.VolumeBindingMode != nil {
2✔
792
                return storageClass.VolumeBindingMode, nil
1✔
793
        }
1✔
794

795
        // no storage class, then the assumption is immediate binding
796
        volumeBindingImmediate := storagev1.VolumeBindingImmediate
1✔
797
        return &volumeBindingImmediate, nil
1✔
798
}
799

800
func (r *ReconcilerBase) reconcileProgressUpdate(datavolume *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim, result *reconcile.Result) error {
1✔
801
        var podNamespace string
1✔
802
        if datavolume.Status.Progress == "" {
2✔
803
                datavolume.Status.Progress = "N/A"
1✔
804
        }
1✔
805

806
        if !r.shouldUpdateProgress {
2✔
807
                return nil
1✔
808
        }
1✔
809

810
        if usePopulator, _ := CheckPVCUsingPopulators(pvc); usePopulator {
2✔
811
                if progress, ok := pvc.Annotations[cc.AnnPopulatorProgress]; ok {
2✔
812
                        datavolume.Status.Progress = cdiv1.DataVolumeProgress(progress)
1✔
813
                } else {
2✔
814
                        datavolume.Status.Progress = "N/A"
1✔
815
                }
1✔
816
                return nil
1✔
817
        }
818

819
        if datavolume.Spec.Source != nil && datavolume.Spec.Source.PVC != nil {
2✔
820
                podNamespace = datavolume.Spec.Source.PVC.Namespace
1✔
821
        } else {
2✔
822
                podNamespace = datavolume.Namespace
1✔
823
        }
1✔
824

825
        if datavolume.Status.Phase == cdiv1.Succeeded || datavolume.Status.Phase == cdiv1.Failed {
2✔
826
                // Data volume completed progress, or failed, either way stop queueing the data volume.
1✔
827
                r.log.Info("Datavolume finished, no longer updating progress", "Namespace", datavolume.Namespace, "Name", datavolume.Name, "Phase", datavolume.Status.Phase)
1✔
828
                return nil
1✔
829
        }
1✔
830
        pod, err := cc.GetPodFromPvc(r.client, podNamespace, pvc)
1✔
831
        if err == nil {
1✔
832
                if pod.Status.Phase != corev1.PodRunning {
×
833
                        // Avoid long timeouts and error traces from HTTP get when pod is already gone
×
834
                        return nil
×
835
                }
×
836
                if err := updateProgressUsingPod(datavolume, pod); err != nil {
×
837
                        return err
×
838
                }
×
839
        }
840
        // We are not done yet, force a re-reconcile in 2 seconds to get an update.
841
        result.RequeueAfter = 2 * time.Second
1✔
842
        return nil
1✔
843
}
844

845
func (r *ReconcilerBase) syncDataVolumeStatusPhaseWithEvent(syncState *dvSyncState, phase cdiv1.DataVolumePhase, pvc *corev1.PersistentVolumeClaim, event Event) error {
1✔
846
        if syncState.phaseSync != nil {
1✔
847
                return fmt.Errorf("phaseSync is already set")
×
848
        }
×
849
        syncState.phaseSync = &statusPhaseSync{phase: phase, event: event}
1✔
850
        if pvc != nil {
1✔
851
                key := client.ObjectKeyFromObject(pvc)
×
852
                syncState.phaseSync.pvcKey = &key
×
853
        }
×
854
        return nil
1✔
855
}
856

857
func (r *ReconcilerBase) updateDataVolumeStatusPhaseSync(ps *statusPhaseSync, dv *cdiv1.DataVolume, dvCopy *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) error {
1✔
858
        var condPvc *corev1.PersistentVolumeClaim
1✔
859
        var err error
1✔
860
        if ps.pvcKey != nil {
1✔
861
                if pvc == nil || *ps.pvcKey != client.ObjectKeyFromObject(pvc) {
×
862
                        condPvc, err = r.getPVC(*ps.pvcKey)
×
863
                        if err != nil {
×
864
                                return err
×
865
                        }
×
866
                } else {
×
867
                        condPvc = pvc
×
868
                }
×
869
        }
870
        return r.updateDataVolumeStatusPhaseWithEvent(ps.phase, dv, dvCopy, condPvc, ps.event)
1✔
871
}
872

873
func (r *ReconcilerBase) updateDataVolumeStatusPhaseWithEvent(
874
        phase cdiv1.DataVolumePhase,
875
        dataVolume *cdiv1.DataVolume,
876
        dataVolumeCopy *cdiv1.DataVolume,
877
        pvc *corev1.PersistentVolumeClaim,
878
        event Event) error {
1✔
879
        if dataVolume == nil {
1✔
880
                return nil
×
881
        }
×
882

883
        curPhase := dataVolumeCopy.Status.Phase
1✔
884
        dataVolumeCopy.Status.Phase = phase
1✔
885

1✔
886
        reason := ""
1✔
887
        message := ""
1✔
888
        if pvc == nil {
2✔
889
                reason = event.reason
1✔
890
                message = event.message
1✔
891
        }
1✔
892
        r.updateConditions(dataVolumeCopy, pvc, reason, message)
1✔
893
        return r.emitEvent(dataVolume, dataVolumeCopy, curPhase, dataVolume.Status.Conditions, &event)
1✔
894
}
895

896
func (r *ReconcilerBase) updateStatus(req reconcile.Request, phaseSync *statusPhaseSync, dvc dvController) (reconcile.Result, error) {
1✔
897
        result := reconcile.Result{}
1✔
898
        dv, err := r.getDataVolume(req.NamespacedName)
1✔
899
        if dv == nil || err != nil {
2✔
900
                return reconcile.Result{}, err
1✔
901
        }
1✔
902

903
        dataVolumeCopy := dv.DeepCopy()
1✔
904

1✔
905
        pvc, err := r.getPVC(req.NamespacedName)
1✔
906
        if err != nil {
1✔
907
                return reconcile.Result{}, err
×
908
        }
×
909

910
        if phaseSync != nil {
2✔
911
                err = r.updateDataVolumeStatusPhaseSync(phaseSync, dv, dataVolumeCopy, pvc)
1✔
912
                return reconcile.Result{}, err
1✔
913
        }
1✔
914

915
        curPhase := dataVolumeCopy.Status.Phase
1✔
916
        var event Event
1✔
917

1✔
918
        if shouldSetDataVolumePending(pvc, dataVolumeCopy) {
2✔
919
                dataVolumeCopy.Status.Phase = cdiv1.Pending
1✔
920
        } else if pvc != nil {
3✔
921
                dataVolumeCopy.Status.ClaimName = pvc.Name
1✔
922

1✔
923
                phase := pvc.Annotations[cc.AnnPodPhase]
1✔
924
                requiresWork, err := r.pvcRequiresWork(pvc, dataVolumeCopy)
1✔
925
                if err != nil {
1✔
926
                        return reconcile.Result{}, err
×
927
                }
×
928
                if phase == string(cdiv1.Succeeded) && requiresWork {
2✔
929
                        if err := dvc.updateStatusPhase(pvc, dataVolumeCopy, &event); err != nil {
1✔
930
                                return reconcile.Result{}, err
×
931
                        }
×
932
                } else {
1✔
933
                        switch pvc.Status.Phase {
1✔
934
                        case corev1.ClaimPending:
1✔
935
                                if requiresWork {
2✔
936
                                        if err := r.updateStatusPVCPending(pvc, dvc, dataVolumeCopy, &event); err != nil {
1✔
937
                                                return reconcile.Result{}, err
×
938
                                        }
×
939
                                } else {
1✔
940
                                        dataVolumeCopy.Status.Phase = cdiv1.Succeeded
1✔
941
                                }
1✔
942
                        case corev1.ClaimBound:
1✔
943
                                switch dataVolumeCopy.Status.Phase {
1✔
944
                                case cdiv1.Pending:
1✔
945
                                        dataVolumeCopy.Status.Phase = cdiv1.PVCBound
1✔
946
                                case cdiv1.WaitForFirstConsumer:
×
947
                                        dataVolumeCopy.Status.Phase = cdiv1.PVCBound
×
948
                                case cdiv1.Unknown:
1✔
949
                                        dataVolumeCopy.Status.Phase = cdiv1.PVCBound
1✔
950
                                }
951

952
                                if requiresWork {
2✔
953
                                        if err := dvc.updateStatusPhase(pvc, dataVolumeCopy, &event); err != nil {
1✔
954
                                                return reconcile.Result{}, err
×
955
                                        }
×
956
                                } else {
1✔
957
                                        dataVolumeCopy.Status.Phase = cdiv1.Succeeded
1✔
958
                                }
1✔
959

960
                        case corev1.ClaimLost:
1✔
961
                                dataVolumeCopy.Status.Phase = cdiv1.Failed
1✔
962
                                event.eventType = corev1.EventTypeWarning
1✔
963
                                event.reason = ErrClaimLost
1✔
964
                                event.message = fmt.Sprintf(MessageErrClaimLost, pvc.Name)
1✔
965
                        default:
1✔
966
                                if pvc.Status.Phase != "" {
1✔
967
                                        dataVolumeCopy.Status.Phase = cdiv1.Unknown
×
968
                                }
×
969
                        }
970
                }
971

972
                if i, err := strconv.ParseInt(pvc.Annotations[cc.AnnPodRestarts], 10, 32); err == nil && i >= 0 {
2✔
973
                        dataVolumeCopy.Status.RestartCount = int32(i)
1✔
974
                }
1✔
975
                if err := r.reconcileProgressUpdate(dataVolumeCopy, pvc, &result); err != nil {
1✔
976
                        return result, err
×
977
                }
×
978
        }
979

980
        currentCond := make([]cdiv1.DataVolumeCondition, len(dataVolumeCopy.Status.Conditions))
1✔
981
        copy(currentCond, dataVolumeCopy.Status.Conditions)
1✔
982
        r.updateConditions(dataVolumeCopy, pvc, "", "")
1✔
983
        return result, r.emitEvent(dv, dataVolumeCopy, curPhase, currentCond, &event)
1✔
984
}
985

986
func (r ReconcilerBase) updateStatusPVCPending(pvc *corev1.PersistentVolumeClaim, dvc dvController, dataVolumeCopy *cdiv1.DataVolume, event *Event) error {
1✔
987
        usePopulator, err := CheckPVCUsingPopulators(pvc)
1✔
988
        if err != nil {
1✔
989
                return err
×
990
        }
×
991
        if usePopulator {
2✔
992
                // when using populators the target pvc phase will stay pending until the population completes,
1✔
993
                // hence if not wffc we should update the dv phase according to the pod phase
1✔
994
                shouldBeMarkedPendingPopulation, err := r.shouldBeMarkedPendingPopulation(pvc)
1✔
995
                if err != nil {
1✔
996
                        return err
×
997
                }
×
998
                if shouldBeMarkedPendingPopulation {
2✔
999
                        dataVolumeCopy.Status.Phase = cdiv1.PendingPopulation
1✔
1000
                } else if err := dvc.updateStatusPhase(pvc, dataVolumeCopy, event); err != nil {
2✔
1001
                        return err
×
1002
                }
×
1003
                return nil
1✔
1004
        }
1005

1006
        shouldBeMarkedWaitForFirstConsumer, err := r.shouldBeMarkedWaitForFirstConsumer(pvc)
1✔
1007
        if err != nil {
1✔
1008
                return err
×
1009
        }
×
1010
        if shouldBeMarkedWaitForFirstConsumer {
2✔
1011
                dataVolumeCopy.Status.Phase = cdiv1.WaitForFirstConsumer
1✔
1012
        } else {
2✔
1013
                dataVolumeCopy.Status.Phase = cdiv1.Pending
1✔
1014
        }
1✔
1015
        return nil
1✔
1016
}
1017

1018
func (r *ReconcilerBase) updateConditions(dataVolume *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim, reason, message string) {
1✔
1019
        var anno map[string]string
1✔
1020

1✔
1021
        if dataVolume.Status.Conditions == nil {
2✔
1022
                dataVolume.Status.Conditions = make([]cdiv1.DataVolumeCondition, 0)
1✔
1023
        }
1✔
1024

1025
        if pvc != nil {
2✔
1026
                anno = pvc.Annotations
1✔
1027
        } else {
2✔
1028
                anno = make(map[string]string)
1✔
1029
        }
1✔
1030

1031
        var readyStatus corev1.ConditionStatus
1✔
1032
        switch dataVolume.Status.Phase {
1✔
1033
        case cdiv1.Succeeded:
1✔
1034
                readyStatus = corev1.ConditionTrue
1✔
1035
        case cdiv1.Unknown:
×
1036
                readyStatus = corev1.ConditionUnknown
×
1037
        default:
1✔
1038
                readyStatus = corev1.ConditionFalse
1✔
1039
        }
1040

1041
        dataVolume.Status.Conditions = updateBoundCondition(dataVolume.Status.Conditions, pvc, message, reason)
1✔
1042
        dataVolume.Status.Conditions = UpdateReadyCondition(dataVolume.Status.Conditions, readyStatus, message, reason)
1✔
1043
        dataVolume.Status.Conditions = updateRunningCondition(dataVolume.Status.Conditions, anno)
1✔
1044
}
1045

1046
func (r *ReconcilerBase) emitConditionEvent(dataVolume *cdiv1.DataVolume, originalCond []cdiv1.DataVolumeCondition) {
1✔
1047
        r.emitBoundConditionEvent(dataVolume, FindConditionByType(cdiv1.DataVolumeBound, dataVolume.Status.Conditions), FindConditionByType(cdiv1.DataVolumeBound, originalCond))
1✔
1048
        r.emitFailureConditionEvent(dataVolume, originalCond)
1✔
1049
}
1✔
1050

1051
func (r *ReconcilerBase) emitBoundConditionEvent(dataVolume *cdiv1.DataVolume, current, original *cdiv1.DataVolumeCondition) {
1✔
1052
        // We know reason and message won't be empty for bound.
1✔
1053
        if current != nil && (original == nil || current.Status != original.Status || current.Reason != original.Reason || current.Message != original.Message) {
2✔
1054
                r.recorder.Event(dataVolume, corev1.EventTypeNormal, current.Reason, current.Message)
1✔
1055
        }
1✔
1056
}
1057

1058
func (r *ReconcilerBase) emitFailureConditionEvent(dataVolume *cdiv1.DataVolume, originalCond []cdiv1.DataVolumeCondition) {
1✔
1059
        curReady := FindConditionByType(cdiv1.DataVolumeReady, dataVolume.Status.Conditions)
1✔
1060
        curBound := FindConditionByType(cdiv1.DataVolumeBound, dataVolume.Status.Conditions)
1✔
1061
        curRunning := FindConditionByType(cdiv1.DataVolumeRunning, dataVolume.Status.Conditions)
1✔
1062
        orgRunning := FindConditionByType(cdiv1.DataVolumeRunning, originalCond)
1✔
1063

1✔
1064
        if curReady == nil || curBound == nil || curRunning == nil {
1✔
1065
                return
×
1066
        }
×
1067
        if curReady.Status == corev1.ConditionFalse && curRunning.Status == corev1.ConditionFalse &&
1✔
1068
                dvBoundOrPopulationInProgress(dataVolume, curBound) {
2✔
1069
                // Bound or in progress, not ready, and not running.
1✔
1070
                // Avoiding triggering an event for scratch space required since it will be addressed
1✔
1071
                // by CDI and sounds more drastic than it actually is.
1✔
1072
                if curRunning.Message != "" && curRunning.Message != common.ScratchSpaceRequired &&
1✔
1073
                        (orgRunning == nil || orgRunning.Message != curRunning.Message) {
1✔
1074
                        r.recorder.Event(dataVolume, corev1.EventTypeWarning, curRunning.Reason, curRunning.Message)
×
1075
                }
×
1076
        }
1077
}
1078

1079
func (r *ReconcilerBase) emitEvent(dataVolume *cdiv1.DataVolume, dataVolumeCopy *cdiv1.DataVolume, curPhase cdiv1.DataVolumePhase, originalCond []cdiv1.DataVolumeCondition, event *Event) error {
1✔
1080
        if !reflect.DeepEqual(dataVolume.ObjectMeta, dataVolumeCopy.ObjectMeta) {
1✔
1081
                return fmt.Errorf("meta update is not allowed in updateStatus phase")
×
1082
        }
×
1083
        // Update status subresource only if changed
1084
        if !reflect.DeepEqual(dataVolume.Status, dataVolumeCopy.Status) {
2✔
1085
                if err := r.client.Status().Update(context.TODO(), dataVolumeCopy); err != nil {
1✔
1086
                        r.log.Error(err, "unable to update datavolume status", "name", dataVolumeCopy.Name)
×
1087
                        return err
×
1088
                }
×
1089
                // Emit the event only on status phase change
1090
                if event.eventType != "" && curPhase != dataVolumeCopy.Status.Phase {
2✔
1091
                        r.recorder.Event(dataVolumeCopy, event.eventType, event.reason, event.message)
1✔
1092
                }
1✔
1093
                r.emitConditionEvent(dataVolumeCopy, originalCond)
1✔
1094
        }
1095
        return nil
1✔
1096
}
1097

1098
func (r *ReconcilerBase) addOwnerRef(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) error {
1✔
1099
        if err := controllerutil.SetControllerReference(dv, pvc, r.scheme); err != nil {
1✔
1100
                return err
×
1101
        }
×
1102

1103
        return r.updatePVC(pvc)
1✔
1104
}
1105

1106
func updateProgressUsingPod(dataVolumeCopy *cdiv1.DataVolume, pod *corev1.Pod) error {
1✔
1107
        httpClient = cc.BuildHTTPClient(httpClient)
1✔
1108
        url, err := cc.GetMetricsURL(pod)
1✔
1109
        if err != nil {
2✔
1110
                return err
1✔
1111
        }
1✔
1112
        if url == "" {
1✔
1113
                return nil
×
1114
        }
×
1115

1116
        // Used for both import and clone, so it should match both metric names
1117
        progressReport, err := cc.GetProgressReportFromURL(url, httpClient,
1✔
1118
                fmt.Sprintf("%s|%s", importMetrics.ImportProgressMetricName, cloneMetrics.CloneProgressMetricName),
1✔
1119
                string(dataVolumeCopy.UID))
1✔
1120
        if err != nil {
1✔
1121
                return err
×
1122
        }
×
1123
        if progressReport != "" {
2✔
1124
                if f, err := strconv.ParseFloat(progressReport, 64); err == nil {
2✔
1125
                        dataVolumeCopy.Status.Progress = cdiv1.DataVolumeProgress(fmt.Sprintf("%.2f%%", f))
1✔
1126
                }
1✔
1127
        }
1128
        return nil
1✔
1129
}
1130

1131
// newPersistentVolumeClaim creates a new PVC for the DataVolume resource.
1132
// It also sets the appropriate OwnerReferences on the resource
1133
// which allows handleObject to discover the DataVolume resource
1134
// that 'owns' it.
1135
func (r *ReconcilerBase) newPersistentVolumeClaim(dataVolume *cdiv1.DataVolume, targetPvcSpec *corev1.PersistentVolumeClaimSpec, namespace, name string, pvcModifier pvcModifierFunc) (*corev1.PersistentVolumeClaim, error) {
1✔
1136
        labels := map[string]string{
1✔
1137
                common.CDILabelKey: common.CDILabelValue,
1✔
1138
        }
1✔
1139
        if util.ResolveVolumeMode(targetPvcSpec.VolumeMode) == corev1.PersistentVolumeFilesystem {
2✔
1140
                labels[common.KubePersistentVolumeFillingUpSuppressLabelKey] = common.KubePersistentVolumeFillingUpSuppressLabelValue
1✔
1141
        }
1✔
1142
        for k, v := range dataVolume.Labels {
2✔
1143
                labels[k] = v
1✔
1144
        }
1✔
1145

1146
        annotations := make(map[string]string)
1✔
1147
        for k, v := range dataVolume.ObjectMeta.Annotations {
2✔
1148
                annotations[k] = v
1✔
1149
        }
1✔
1150
        annotations[cc.AnnPodRestarts] = "0"
1✔
1151
        annotations[cc.AnnContentType] = string(cc.GetContentType(dataVolume.Spec.ContentType))
1✔
1152
        if dataVolume.Spec.PriorityClassName != "" {
2✔
1153
                annotations[cc.AnnPriorityClassName] = dataVolume.Spec.PriorityClassName
1✔
1154
        }
1✔
1155
        annotations[cc.AnnPreallocationRequested] = strconv.FormatBool(cc.GetPreallocation(context.TODO(), r.client, dataVolume.Spec.Preallocation))
1✔
1156
        annotations[cc.AnnCreatedForDataVolume] = string(dataVolume.UID)
1✔
1157

1✔
1158
        if dataVolume.Spec.Storage != nil && labels[common.PvcApplyStorageProfileLabel] == "true" {
1✔
1159
                isWebhookPvcRenderingEnabled, err := featuregates.IsWebhookPvcRenderingEnabled(r.client)
×
1160
                if err != nil {
×
1161
                        return nil, err
×
1162
                }
×
1163
                if isWebhookPvcRenderingEnabled {
×
1164
                        labels[common.PvcApplyStorageProfileLabel] = "true"
×
1165
                        if targetPvcSpec.VolumeMode == nil {
×
1166
                                targetPvcSpec.VolumeMode = ptr.To[corev1.PersistentVolumeMode](cdiv1.PersistentVolumeFromStorageProfile)
×
1167
                        }
×
1168
                }
1169
        }
1170

1171
        pvc := &corev1.PersistentVolumeClaim{
1✔
1172
                ObjectMeta: metav1.ObjectMeta{
1✔
1173
                        Namespace:   namespace,
1✔
1174
                        Name:        name,
1✔
1175
                        Labels:      labels,
1✔
1176
                        Annotations: annotations,
1✔
1177
                },
1✔
1178
                Spec: *targetPvcSpec,
1✔
1179
        }
1✔
1180

1✔
1181
        if pvcModifier != nil {
2✔
1182
                if err := pvcModifier(dataVolume, pvc); err != nil {
2✔
1183
                        return nil, err
1✔
1184
                }
1✔
1185
        }
1186

1187
        if pvc.Namespace == dataVolume.Namespace {
2✔
1188
                pvc.OwnerReferences = []metav1.OwnerReference{
1✔
1189
                        *metav1.NewControllerRef(dataVolume, schema.GroupVersionKind{
1✔
1190
                                Group:   cdiv1.SchemeGroupVersion.Group,
1✔
1191
                                Version: cdiv1.SchemeGroupVersion.Version,
1✔
1192
                                Kind:    "DataVolume",
1✔
1193
                        }),
1✔
1194
                }
1✔
1195
        } else {
1✔
1196
                if err := setAnnOwnedByDataVolume(pvc, dataVolume); err != nil {
×
1197
                        return nil, err
×
1198
                }
×
1199
                pvc.Annotations[cc.AnnOwnerUID] = string(dataVolume.UID)
×
1200
        }
1201

1202
        for _, anno := range delayedAnnotations {
2✔
1203
                delete(pvc.Annotations, anno)
1✔
1204
        }
1✔
1205

1206
        return pvc, nil
1✔
1207
}
1208

1209
// Whenever the controller updates a DV, we must make sure to nil out spec.source when using other population methods
1210
func (r *ReconcilerBase) updateDataVolume(dv *cdiv1.DataVolume) error {
1✔
1211
        // Restore so we don't nil out the dv that is being worked on
1✔
1212
        var sourceCopy *cdiv1.DataVolumeSource
1✔
1213

1✔
1214
        if dv.Spec.SourceRef != nil || dvUsesVolumePopulator(dv) {
2✔
1215
                sourceCopy = dv.Spec.Source
1✔
1216
                dv.Spec.Source = nil
1✔
1217
        }
1✔
1218

1219
        err := r.client.Update(context.TODO(), dv)
1✔
1220
        if dv.Spec.SourceRef != nil || dvUsesVolumePopulator(dv) {
2✔
1221
                dv.Spec.Source = sourceCopy
1✔
1222
        }
1✔
1223
        return err
1✔
1224
}
1225

1226
func (r *ReconcilerBase) updatePVC(pvc *corev1.PersistentVolumeClaim) error {
1✔
1227
        return r.client.Update(context.TODO(), pvc)
1✔
1228
}
1✔
1229

1230
func newLongTermCloneTokenGenerator(key *rsa.PrivateKey) token.Generator {
×
1231
        return token.NewGenerator(common.ExtendedCloneTokenIssuer, key, 10*365*24*time.Hour)
×
1232
}
×
1233

1234
// storageClassWaitForFirstConsumer returns if the binding mode of a given storage class is WFFC
1235
func (r *ReconcilerBase) storageClassWaitForFirstConsumer(storageClass *string) (bool, error) {
1✔
1236
        storageClassBindingMode, err := r.getStorageClassBindingMode(storageClass)
1✔
1237
        if err != nil {
1✔
1238
                return false, err
×
1239
        }
×
1240

1241
        return storageClassBindingMode != nil && *storageClassBindingMode == storagev1.VolumeBindingWaitForFirstConsumer, nil
1✔
1242
}
1243

1244
// shouldBeMarkedWaitForFirstConsumer decided whether we should mark DV as WFFC
1245
func (r *ReconcilerBase) shouldBeMarkedWaitForFirstConsumer(pvc *corev1.PersistentVolumeClaim) (bool, error) {
1✔
1246
        wffc, err := r.storageClassWaitForFirstConsumer(pvc.Spec.StorageClassName)
1✔
1247
        if err != nil {
1✔
1248
                return false, err
×
1249
        }
×
1250

1251
        honorWaitForFirstConsumerEnabled, err := r.featureGates.HonorWaitForFirstConsumerEnabled()
1✔
1252
        if err != nil {
1✔
1253
                return false, err
×
1254
        }
×
1255

1256
        res := honorWaitForFirstConsumerEnabled && wffc &&
1✔
1257
                pvc.Status.Phase == corev1.ClaimPending
1✔
1258

1✔
1259
        return res, nil
1✔
1260
}
1261

1262
func (r *ReconcilerBase) shouldReconcileVolumeSourceCR(syncState *dvSyncState) bool {
1✔
1263
        if syncState.pvc == nil {
2✔
1264
                return true
1✔
1265
        }
1✔
1266
        phase := syncState.pvc.Annotations[cc.AnnPodPhase]
1✔
1267
        return phase != string(corev1.PodSucceeded) || syncState.dvMutated.Status.Phase != cdiv1.Succeeded
1✔
1268
}
1269

1270
// shouldBeMarkedPendingPopulation decides whether we should mark DV as PendingPopulation
1271
func (r *ReconcilerBase) shouldBeMarkedPendingPopulation(pvc *corev1.PersistentVolumeClaim) (bool, error) {
1✔
1272
        wffc, err := r.storageClassWaitForFirstConsumer(pvc.Spec.StorageClassName)
1✔
1273
        if err != nil {
1✔
1274
                return false, err
×
1275
        }
×
1276
        nodeName := pvc.Annotations[cc.AnnSelectedNode]
1✔
1277
        immediateBindingRequested := cc.ImmediateBindingRequested(pvc)
1✔
1278

1✔
1279
        return wffc && nodeName == "" && !immediateBindingRequested, nil
1✔
1280
}
1281

1282
// handlePvcCreation works as a wrapper for non-clone PVC creation and error handling
1283
func (r *ReconcilerBase) handlePvcCreation(log logr.Logger, syncState *dvSyncState, pvcModifier pvcModifierFunc) error {
1✔
1284
        if syncState.pvc != nil {
2✔
1285
                return nil
1✔
1286
        }
1✔
1287
        if dvIsPrePopulated(syncState.dvMutated) {
1✔
1288
                return nil
×
1289
        }
×
1290
        // Creating the PVC
1291
        newPvc, err := r.createPvcForDatavolume(syncState.dvMutated, syncState.pvcSpec, pvcModifier)
1✔
1292
        if err != nil {
2✔
1293
                if cc.ErrQuotaExceeded(err) {
1✔
1294
                        syncErr := r.syncDataVolumeStatusPhaseWithEvent(syncState, cdiv1.Pending, nil, Event{corev1.EventTypeWarning, cc.ErrExceededQuota, err.Error()})
×
1295
                        if syncErr != nil {
×
1296
                                log.Error(syncErr, "failed to sync DataVolume status with event")
×
1297
                        }
×
1298
                }
1299
                return err
1✔
1300
        }
1301
        syncState.pvc = newPvc
1✔
1302

1✔
1303
        return nil
1✔
1304
}
1305

1306
// shouldUseCDIPopulator returns if the population of the PVC should be done using
1307
// CDI populators.
1308
// Currently it will use populators only if:
1309
// * storageClass used is CSI storageClass
1310
// * annotation cdi.kubevirt.io/storage.usePopulator is not set by user to "false"
1311
func (r *ReconcilerBase) shouldUseCDIPopulator(syncState *dvSyncState) (bool, error) {
1✔
1312
        dv := syncState.dvMutated
1✔
1313
        if usePopulator, ok := dv.Annotations[cc.AnnUsePopulator]; ok {
2✔
1314
                boolUsePopulator, err := strconv.ParseBool(usePopulator)
1✔
1315
                if err != nil {
1✔
1316
                        return false, err
×
1317
                }
×
1318
                return boolUsePopulator, nil
1✔
1319
        }
1320
        log := r.log.WithValues("DataVolume", dv.Name, "Namespace", dv.Namespace)
1✔
1321
        usePopulator, err := storageClassCSIDriverExists(r.client, r.log, syncState.pvcSpec.StorageClassName)
1✔
1322
        if err != nil {
1✔
1323
                return false, err
×
1324
        }
×
1325
        if !usePopulator {
2✔
1326
                if syncState.pvcSpec.StorageClassName != nil {
2✔
1327
                        log.Info("Not using CDI populators, storage class is not a CSI storage", "storageClass", *syncState.pvcSpec.StorageClassName)
1✔
1328
                }
1✔
1329
        }
1330

1331
        return usePopulator, nil
1✔
1332
}
1333

1334
func (r *ReconcilerBase) pvcRequiresWork(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) (bool, error) {
1✔
1335
        if pvc == nil || dv == nil {
2✔
1336
                return true, nil
1✔
1337
        }
1✔
1338
        if pvcIsPopulatedForDataVolume(pvc, dv) {
2✔
1339
                return false, nil
1✔
1340
        }
1✔
1341
        canAdopt, err := cc.AllowClaimAdoption(r.client, pvc, dv)
1✔
1342
        if err != nil {
1✔
1343
                return true, err
×
1344
        }
×
1345
        if canAdopt {
2✔
1346
                return false, nil
1✔
1347
        }
1✔
1348
        return true, nil
1✔
1349
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc