• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5422

30 Jun 2025 04:38PM UTC coverage: 59.374% (-0.05%) from 59.422%
#5422

push

travis-ci

web-flow
datasources: introduce datasource pointers (#3760)

DataSource Pointers are a new DataSource Source in addition to the
existing Snapshot and PVC sources.

Pointers (Source.DataSource) allow one DataSource to point to the source
of another DataSource with some limitations:
1. The DataSource may not self-reference, this is validated at the API
   level.
2. The DataSource pointer reference chain may not exceed a maxium depth
   of 1.

Signed-off-by: Adi Aloni <aaloni@redhat.com>

30 of 61 new or added lines in 4 files covered. (49.18%)

5 existing lines in 2 files now uncovered.

16968 of 28578 relevant lines covered (59.37%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.97
/pkg/controller/datavolume/controller-base.go
1
/*
2
Copyright 2018 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package datavolume
18

19
import (
20
        "context"
21
        "crypto/rsa"
22
        "encoding/json"
23
        "fmt"
24
        "net/http"
25
        "reflect"
26
        "strconv"
27
        "time"
28

29
        "github.com/go-logr/logr"
30
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
31
        "github.com/pkg/errors"
32

33
        corev1 "k8s.io/api/core/v1"
34
        storagev1 "k8s.io/api/storage/v1"
35
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
36
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
37
        "k8s.io/apimachinery/pkg/runtime"
38
        "k8s.io/apimachinery/pkg/runtime/schema"
39
        "k8s.io/apimachinery/pkg/types"
40
        "k8s.io/client-go/tools/record"
41
        "k8s.io/utils/ptr"
42

43
        "sigs.k8s.io/controller-runtime/pkg/client"
44
        "sigs.k8s.io/controller-runtime/pkg/controller"
45
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
46
        "sigs.k8s.io/controller-runtime/pkg/handler"
47
        "sigs.k8s.io/controller-runtime/pkg/manager"
48
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
49
        "sigs.k8s.io/controller-runtime/pkg/source"
50

51
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
52
        "kubevirt.io/containerized-data-importer/pkg/common"
53
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
54
        featuregates "kubevirt.io/containerized-data-importer/pkg/feature-gates"
55
        cloneMetrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-cloner"
56
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
57
        importMetrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-importer"
58
        "kubevirt.io/containerized-data-importer/pkg/token"
59
        "kubevirt.io/containerized-data-importer/pkg/util"
60
)
61

62
const (
63
        // ErrResourceExists provides a const to indicate a resource exists error
64
        ErrResourceExists = "ErrResourceExists"
65
        // ErrResourceMarkedForDeletion provides a const to indicate a resource marked for deletion error
66
        ErrResourceMarkedForDeletion = "ErrResourceMarkedForDeletion"
67
        // ErrClaimLost provides a const to indicate a claim is lost
68
        ErrClaimLost = "ErrClaimLost"
69

70
        // MessageResourceMarkedForDeletion provides a const to form a resource marked for deletion error message
71
        MessageResourceMarkedForDeletion = "Resource %q marked for deletion"
72
        // MessageResourceExists provides a const to form a resource exists error message
73
        MessageResourceExists = "Resource %q already exists and is not managed by DataVolume"
74
        // MessageErrClaimLost provides a const to form claim lost message
75
        MessageErrClaimLost = "PVC %s lost"
76

77
        dvPhaseField = "status.phase"
78

79
        claimRefField = "spec.claimRef"
80

81
        claimStorageClassNameField = "spec.storageClassName"
82
)
83

84
var (
85
        httpClient *http.Client
86

87
        delayedAnnotations = []string{
88
                cc.AnnPopulatedFor,
89
        }
90
)
91

92
// Event represents DV controller event
93
type Event struct {
94
        eventType string
95
        reason    string
96
        message   string
97
}
98

99
type statusPhaseSync struct {
100
        phase  cdiv1.DataVolumePhase
101
        pvcKey *client.ObjectKey
102
        event  Event
103
}
104

105
type dvSyncResult struct {
106
        result    *reconcile.Result
107
        phaseSync *statusPhaseSync
108
}
109

110
type dvSyncState struct {
111
        dv        *cdiv1.DataVolume
112
        dvMutated *cdiv1.DataVolume
113
        pvc       *corev1.PersistentVolumeClaim
114
        pvcSpec   *corev1.PersistentVolumeClaimSpec
115
        snapshot  *snapshotv1.VolumeSnapshot
116
        dvSyncResult
117
        usePopulator bool
118
}
119

120
// ReconcilerBase members
121
type ReconcilerBase struct {
122
        client               client.Client
123
        recorder             record.EventRecorder
124
        scheme               *runtime.Scheme
125
        log                  logr.Logger
126
        featureGates         featuregates.FeatureGates
127
        installerLabels      map[string]string
128
        shouldUpdateProgress bool
129
}
130

131
func pvcIsPopulatedForDataVolume(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) bool {
1✔
132
        if pvc == nil || dv == nil {
1✔
133
                return false
×
134
        }
×
135
        dvName, ok := pvc.Annotations[cc.AnnPopulatedFor]
1✔
136
        return ok && dvName == dv.Name
1✔
137
}
138

139
func dvIsPrePopulated(dv *cdiv1.DataVolume) bool {
1✔
140
        _, ok := dv.Annotations[cc.AnnPrePopulated]
1✔
141
        return ok
1✔
142
}
1✔
143

144
func checkStaticProvisionPending(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) bool {
1✔
145
        if pvc == nil || dv == nil {
2✔
146
                return false
1✔
147
        }
1✔
148
        if _, ok := dv.Annotations[cc.AnnCheckStaticVolume]; !ok {
2✔
149
                return false
1✔
150
        }
1✔
151
        _, ok := pvc.Annotations[cc.AnnPersistentVolumeList]
1✔
152
        return ok
1✔
153
}
154

155
func shouldSetDataVolumePending(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) bool {
1✔
156
        if checkStaticProvisionPending(pvc, dv) {
2✔
157
                return true
1✔
158
        }
1✔
159

160
        if pvc != nil {
2✔
161
                return false
1✔
162
        }
1✔
163

164
        return dvIsPrePopulated(dv) || (dv.Status.Phase == cdiv1.PhaseUnset)
1✔
165
}
166

167
// dataVolumeOp is the datavolume's requested operation
168
type dataVolumeOp int
169

170
const (
171
        dataVolumeNop dataVolumeOp = iota
172
        dataVolumeImport
173
        dataVolumeUpload
174
        dataVolumePvcClone
175
        dataVolumeSnapshotClone
176
        dataVolumePopulator
177
)
178

179
type indexArgs struct {
180
        obj          client.Object
181
        field        string
182
        extractValue client.IndexerFunc
183
}
184

185
func getIndexArgs() []indexArgs {
1✔
186
        return []indexArgs{
1✔
187
                {
1✔
188
                        obj:   &cdiv1.DataVolume{},
1✔
189
                        field: dvPhaseField,
1✔
190
                        extractValue: func(obj client.Object) []string {
1✔
191
                                return []string{string(obj.(*cdiv1.DataVolume).Status.Phase)}
×
192
                        },
×
193
                },
194
                {
195
                        obj:   &corev1.PersistentVolume{},
196
                        field: claimRefField,
197
                        extractValue: func(obj client.Object) []string {
1✔
198
                                if pv, ok := obj.(*corev1.PersistentVolume); ok {
2✔
199
                                        if pv.Spec.ClaimRef != nil && pv.Spec.ClaimRef.Namespace != "" && pv.Spec.ClaimRef.Name != "" {
2✔
200
                                                return []string{claimRefIndexKeyFunc(pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name)}
1✔
201
                                        }
1✔
202
                                }
203
                                return nil
×
204
                        },
205
                },
206
                {
207
                        obj:          &corev1.PersistentVolume{},
208
                        field:        claimStorageClassNameField,
209
                        extractValue: extractAvailablePersistentVolumeStorageClassName,
210
                },
211
        }
212
}
213

214
func claimRefIndexKeyFunc(namespace, name string) string {
1✔
215
        return namespace + "/" + name
1✔
216
}
1✔
217

218
// CreateCommonIndexes creates indexes used by all controllers
219
func CreateCommonIndexes(mgr manager.Manager) error {
×
220
        for _, ia := range getIndexArgs() {
×
221
                if err := mgr.GetFieldIndexer().IndexField(context.TODO(), ia.obj, ia.field, ia.extractValue); err != nil {
×
222
                        return err
×
223
                }
×
224
        }
225
        return nil
×
226
}
227

228
// CreateAvailablePersistentVolumeIndex adds storage class name index for available PersistentVolumes
229
func CreateAvailablePersistentVolumeIndex(fieldIndexer client.FieldIndexer) error {
×
230
        return fieldIndexer.IndexField(context.TODO(), &corev1.PersistentVolume{},
×
231
                claimStorageClassNameField, extractAvailablePersistentVolumeStorageClassName)
×
232
}
×
233

234
func extractAvailablePersistentVolumeStorageClassName(obj client.Object) []string {
×
235
        if pv, ok := obj.(*corev1.PersistentVolume); ok && pv.Status.Phase == corev1.VolumeAvailable {
×
236
                return []string{pv.Spec.StorageClassName}
×
237
        }
×
238
        return nil
×
239
}
240

241
func addDataVolumeControllerCommonWatches(mgr manager.Manager, dataVolumeController controller.Controller, op dataVolumeOp) error {
×
242
        appendMatchingDataVolumeRequest := func(ctx context.Context, reqs []reconcile.Request, mgr manager.Manager, namespace, name string) []reconcile.Request {
×
243
                dvKey := types.NamespacedName{Namespace: namespace, Name: name}
×
244
                dv := &cdiv1.DataVolume{}
×
245
                if err := mgr.GetClient().Get(ctx, dvKey, dv); err != nil {
×
246
                        if !k8serrors.IsNotFound(err) {
×
247
                                mgr.GetLogger().Error(err, "Failed to get DV", "dvKey", dvKey)
×
248
                        }
×
249
                        return reqs
×
250
                }
251
                if getDataVolumeOp(ctx, mgr.GetLogger(), dv, mgr.GetClient()) == op {
×
252
                        reqs = append(reqs, reconcile.Request{NamespacedName: dvKey})
×
253
                }
×
254
                return reqs
×
255
        }
256

257
        // Setup watches
258
        if err := dataVolumeController.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{}, handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](
×
259
                func(ctx context.Context, dv *cdiv1.DataVolume) []reconcile.Request {
×
260
                        if getDataVolumeOp(ctx, mgr.GetLogger(), dv, mgr.GetClient()) != op {
×
261
                                return nil
×
262
                        }
×
263
                        updatePendingDataVolumesGauge(ctx, mgr.GetLogger(), dv, mgr.GetClient())
×
264
                        return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: dv.Namespace, Name: dv.Name}}}
×
265
                }),
266
        )); err != nil {
×
267
                return err
×
268
        }
×
269
        if err := dataVolumeController.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{}, handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](
×
270
                func(ctx context.Context, obj *corev1.PersistentVolumeClaim) []reconcile.Request {
×
271
                        var result []reconcile.Request
×
272
                        owner := metav1.GetControllerOf(obj)
×
273
                        if owner != nil && owner.Kind == "DataVolume" {
×
274
                                result = appendMatchingDataVolumeRequest(ctx, result, mgr, obj.GetNamespace(), owner.Name)
×
275
                        }
×
276
                        populatedFor := obj.GetAnnotations()[cc.AnnPopulatedFor]
×
277
                        if populatedFor != "" {
×
278
                                result = appendMatchingDataVolumeRequest(ctx, result, mgr, obj.GetNamespace(), populatedFor)
×
279
                        }
×
280
                        // it is okay if result contains the same entry twice, will be deduplicated by caller
281
                        return result
×
282
                }),
283
        )); err != nil {
×
284
                return err
×
285
        }
×
286
        if err := dataVolumeController.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, handler.TypedEnqueueRequestsFromMapFunc[*corev1.Pod](
×
287
                func(ctx context.Context, obj *corev1.Pod) []reconcile.Request {
×
288
                        owner := metav1.GetControllerOf(obj)
×
289
                        if owner == nil || owner.Kind != "DataVolume" {
×
290
                                return nil
×
291
                        }
×
292
                        return appendMatchingDataVolumeRequest(ctx, nil, mgr, obj.GetNamespace(), owner.Name)
×
293
                }),
294
        )); err != nil {
×
295
                return err
×
296
        }
×
297
        for _, k := range []client.Object{&corev1.PersistentVolumeClaim{}, &corev1.Pod{}, &cdiv1.ObjectTransfer{}} {
×
298
                if err := dataVolumeController.Watch(source.Kind(mgr.GetCache(), k, handler.EnqueueRequestsFromMapFunc(
×
299
                        func(ctx context.Context, obj client.Object) []reconcile.Request {
×
300
                                if !hasAnnOwnedByDataVolume(obj) {
×
301
                                        return nil
×
302
                                }
×
303
                                namespace, name, err := getAnnOwnedByDataVolume(obj)
×
304
                                if err != nil {
×
305
                                        return nil
×
306
                                }
×
307
                                return appendMatchingDataVolumeRequest(ctx, nil, mgr, namespace, name)
×
308
                        }),
309
                )); err != nil {
×
310
                        return err
×
311
                }
×
312
        }
313

314
        // Watch for StorageClass and StorageProfile updates and reconcile the DVs waiting for StorageClass or its complete StorageProfile.
315
        // Relevant only when the DV StorageSpec has no AccessModes set and no matching StorageClass with compelete StorageProfile yet,
316
        // so PVC cannot be created (test_id:9922).
317
        for _, k := range []client.Object{&storagev1.StorageClass{}, &cdiv1.StorageProfile{}} {
×
318
                if err := dataVolumeController.Watch(source.Kind(mgr.GetCache(), k, handler.EnqueueRequestsFromMapFunc(
×
319
                        func(ctx context.Context, obj client.Object) []reconcile.Request {
×
320
                                dvList := &cdiv1.DataVolumeList{}
×
321
                                if err := mgr.GetClient().List(ctx, dvList, client.MatchingFields{dvPhaseField: ""}); err != nil {
×
322
                                        return nil
×
323
                                }
×
324
                                var reqs []reconcile.Request
×
325
                                for _, dv := range dvList.Items {
×
326
                                        if getDataVolumeOp(ctx, mgr.GetLogger(), &dv, mgr.GetClient()) == op {
×
327
                                                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dv.Name, Namespace: dv.Namespace}})
×
328
                                        }
×
329
                                }
330
                                return reqs
×
331
                        },
332
                ),
333
                )); err != nil {
×
334
                        return err
×
335
                }
×
336
        }
337

338
        // Watch for PV updates to reconcile the DVs waiting for available PV
339
        // Relevant only when the DV StorageSpec has no AccessModes set and no matching StorageClass yet, so PVC cannot be created (test_id:9924,9925)
340
        if err := dataVolumeController.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolume{}, handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolume](
×
341
                func(ctx context.Context, pv *corev1.PersistentVolume) []reconcile.Request {
×
342
                        dvList := &cdiv1.DataVolumeList{}
×
343
                        if err := mgr.GetClient().List(ctx, dvList, client.MatchingFields{dvPhaseField: ""}); err != nil {
×
344
                                return nil
×
345
                        }
×
346
                        var reqs []reconcile.Request
×
347
                        for _, dv := range dvList.Items {
×
348
                                storage := dv.Spec.Storage
×
349
                                if storage != nil &&
×
350
                                        storage.StorageClassName != nil &&
×
351
                                        *storage.StorageClassName == pv.Spec.StorageClassName &&
×
352
                                        pv.Status.Phase == corev1.VolumeAvailable &&
×
353
                                        getDataVolumeOp(ctx, mgr.GetLogger(), &dv, mgr.GetClient()) == op {
×
354
                                        reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dv.Name, Namespace: dv.Namespace}})
×
355
                                }
×
356
                        }
357
                        return reqs
×
358
                },
359
        ),
360
        )); err != nil {
×
361
                return err
×
362
        }
×
363

364
        return nil
×
365
}
366

367
func getDataVolumeOp(ctx context.Context, log logr.Logger, dv *cdiv1.DataVolume, client client.Client) dataVolumeOp {
×
368
        src := dv.Spec.Source
×
369

×
370
        if dv.Spec.SourceRef != nil {
×
371
                return getSourceRefOp(ctx, log, dv, client)
×
372
        }
×
373
        if src != nil && src.PVC != nil {
×
374
                return dataVolumePvcClone
×
375
        }
×
376
        if src != nil && src.Snapshot != nil {
×
377
                return dataVolumeSnapshotClone
×
378
        }
×
379
        if src == nil {
×
380
                if dvUsesVolumePopulator(dv) {
×
381
                        return dataVolumePopulator
×
382
                }
×
383
                return dataVolumeNop
×
384
        }
385
        if src.Upload != nil {
×
386
                return dataVolumeUpload
×
387
        }
×
388
        if src.HTTP != nil || src.S3 != nil || src.GCS != nil || src.Registry != nil || src.Blank != nil || src.Imageio != nil || src.VDDK != nil {
×
389
                return dataVolumeImport
×
390
        }
×
391

392
        return dataVolumeNop
×
393
}
394

395
func getSourceRefOp(ctx context.Context, log logr.Logger, dv *cdiv1.DataVolume, client client.Client) dataVolumeOp {
×
396
        dataSource := &cdiv1.DataSource{}
×
397
        ns := dv.Namespace
×
398
        if dv.Spec.SourceRef.Namespace != nil && *dv.Spec.SourceRef.Namespace != "" {
×
399
                ns = *dv.Spec.SourceRef.Namespace
×
400
        }
×
401
        nn := types.NamespacedName{Namespace: ns, Name: dv.Spec.SourceRef.Name}
×
402
        if err := client.Get(ctx, nn, dataSource); err != nil {
×
403
                log.Error(err, "Unable to get DataSource", "namespacedName", nn)
×
404
                return dataVolumeNop
×
405
        }
×
NEW
406
        resolved, err := cc.ResolveDataSourceChain(ctx, client, dataSource)
×
NEW
407
        if err != nil {
×
NEW
408
                log.Error(err, "Unable to resolve DataSource chain", "namespacedName", nn)
×
NEW
409
                return dataVolumeNop
×
NEW
410
        }
×
411

412
        switch {
×
NEW
413
        case resolved.Spec.Source.PVC != nil:
×
414
                return dataVolumePvcClone
×
NEW
415
        case resolved.Spec.Source.Snapshot != nil:
×
416
                return dataVolumeSnapshotClone
×
417
        default:
×
418
                return dataVolumeNop
×
419
        }
420
}
421

422
func updatePendingDataVolumesGauge(ctx context.Context, log logr.Logger, dv *cdiv1.DataVolume, c client.Client) {
×
423
        if !cc.IsDataVolumeUsingDefaultStorageClass(dv) {
×
424
                return
×
425
        }
×
426

427
        countPending, err := getDefaultStorageClassDataVolumeCount(ctx, c, string(cdiv1.Pending))
×
428
        if err != nil {
×
429
                log.V(3).Error(err, "Failed listing the pending DataVolumes")
×
430
                return
×
431
        }
×
432
        countUnset, err := getDefaultStorageClassDataVolumeCount(ctx, c, string(cdiv1.PhaseUnset))
×
433
        if err != nil {
×
434
                log.V(3).Error(err, "Failed listing the unset DataVolumes")
×
435
                return
×
436
        }
×
437

438
        metrics.SetDataVolumePending(countPending + countUnset)
×
439
}
440

441
func getDefaultStorageClassDataVolumeCount(ctx context.Context, c client.Client, dvPhase string) (int, error) {
×
442
        dvList := &cdiv1.DataVolumeList{}
×
443
        if err := c.List(ctx, dvList, client.MatchingFields{dvPhaseField: dvPhase}); err != nil {
×
444
                return 0, err
×
445
        }
×
446

447
        dvCount := 0
×
448
        for _, dv := range dvList.Items {
×
449
                if cc.IsDataVolumeUsingDefaultStorageClass(&dv) {
×
450
                        dvCount++
×
451
                }
×
452
        }
453

454
        return dvCount, nil
×
455
}
456

457
type dvController interface {
458
        reconcile.Reconciler
459
        sync(log logr.Logger, req reconcile.Request) (dvSyncResult, error)
460
        updateStatusPhase(pvc *corev1.PersistentVolumeClaim, dataVolumeCopy *cdiv1.DataVolume, event *Event) error
461
}
462

463
func (r *ReconcilerBase) reconcile(ctx context.Context, req reconcile.Request, dvc dvController) (reconcile.Result, error) {
1✔
464
        log := r.log.WithValues("DataVolume", req.NamespacedName)
1✔
465
        syncRes, syncErr := dvc.sync(log, req)
1✔
466
        res, err := r.updateStatus(req, syncRes.phaseSync, dvc)
1✔
467
        if syncErr != nil {
2✔
468
                err = syncErr
1✔
469
        }
1✔
470
        if syncRes.result != nil {
2✔
471
                res = *syncRes.result
1✔
472
        }
1✔
473
        return res, err
1✔
474
}
475

476
type dvSyncStateFunc func(*dvSyncState) error
477

478
func (r *ReconcilerBase) syncCommon(log logr.Logger, req reconcile.Request, cleanup, prepare dvSyncStateFunc) (dvSyncState, error) {
1✔
479
        syncState, err := r.syncDvPvcState(log, req, cleanup, prepare)
1✔
480
        if err == nil {
2✔
481
                err = r.syncUpdate(log, &syncState)
1✔
482
        }
1✔
483
        return syncState, err
1✔
484
}
485

486
func (r *ReconcilerBase) syncDvPvcState(log logr.Logger, req reconcile.Request, cleanup, prepare dvSyncStateFunc) (dvSyncState, error) {
1✔
487
        syncState := dvSyncState{}
1✔
488
        dv, err := r.getDataVolume(req.NamespacedName)
1✔
489
        if dv == nil || err != nil {
2✔
490
                syncState.result = &reconcile.Result{}
1✔
491
                return syncState, err
1✔
492
        }
1✔
493
        syncState.dv = dv
1✔
494
        syncState.dvMutated = dv.DeepCopy()
1✔
495
        syncState.pvc, err = r.getPVC(req.NamespacedName)
1✔
496
        if err != nil {
1✔
497
                return syncState, err
×
498
        }
×
499

500
        if cleanup != nil {
2✔
501
                if err := cleanup(&syncState); err != nil {
1✔
502
                        return syncState, err
×
503
                }
×
504
        }
505

506
        if dv.DeletionTimestamp != nil {
1✔
507
                log.Info("DataVolume marked for deletion")
×
508
                syncState.result = &reconcile.Result{}
×
509
                return syncState, nil
×
510
        }
×
511

512
        if prepare != nil {
2✔
513
                if err := prepare(&syncState); err != nil {
1✔
514
                        return syncState, err
×
515
                }
×
516
        }
517

518
        syncState.pvcSpec, err = renderPvcSpec(r.client, r.recorder, log, syncState.dvMutated, syncState.pvc)
1✔
519
        if err != nil {
2✔
520
                if syncErr := r.syncDataVolumeStatusPhaseWithEvent(&syncState, cdiv1.PhaseUnset, nil,
1✔
521
                        Event{corev1.EventTypeWarning, cc.ErrClaimNotValid, err.Error()}); syncErr != nil {
1✔
522
                        log.Error(syncErr, "failed to sync DataVolume status with event")
×
523
                }
×
524
                if errors.Is(err, ErrStorageClassNotFound) {
2✔
525
                        syncState.result = &reconcile.Result{}
1✔
526
                        return syncState, nil
1✔
527
                }
1✔
528
                return syncState, err
1✔
529
        }
530

531
        syncState.usePopulator, err = r.shouldUseCDIPopulator(&syncState)
1✔
532
        if err != nil {
1✔
533
                return syncState, err
×
534
        }
×
535
        updateDataVolumeUseCDIPopulator(&syncState)
1✔
536

1✔
537
        if err := r.handleStaticVolume(&syncState, log); err != nil || syncState.result != nil {
2✔
538
                return syncState, err
1✔
539
        }
1✔
540

541
        if err := r.handleDelayedAnnotations(&syncState, log); err != nil || syncState.result != nil {
2✔
542
                return syncState, err
1✔
543
        }
1✔
544

545
        if err = updateDataVolumeDefaultInstancetypeLabels(r.client, &syncState); err != nil {
1✔
546
                return syncState, err
×
547
        }
×
548

549
        if syncState.pvc != nil {
2✔
550
                if err := r.validatePVC(dv, syncState.pvc); err != nil {
2✔
551
                        return syncState, err
1✔
552
                }
1✔
553
                r.handlePrePopulation(syncState.dvMutated, syncState.pvc)
1✔
554
        }
555

556
        return syncState, nil
1✔
557
}
558

559
func (r *ReconcilerBase) syncUpdate(log logr.Logger, syncState *dvSyncState) error {
1✔
560
        if syncState.dv == nil || syncState.dvMutated == nil {
2✔
561
                return nil
1✔
562
        }
1✔
563
        if !reflect.DeepEqual(syncState.dv.Status, syncState.dvMutated.Status) {
1✔
564
                return fmt.Errorf("status update is not allowed in sync phase")
×
565
        }
×
566
        if !reflect.DeepEqual(syncState.dv.ObjectMeta, syncState.dvMutated.ObjectMeta) {
2✔
567
                _, ok := syncState.dv.Annotations[cc.AnnExtendedCloneToken]
1✔
568
                _, ok2 := syncState.dvMutated.Annotations[cc.AnnExtendedCloneToken]
1✔
569
                if err := r.updateDataVolume(syncState.dvMutated); err != nil {
1✔
570
                        r.log.Error(err, "Unable to sync update dv meta", "name", syncState.dvMutated.Name)
×
571
                        return err
×
572
                }
×
573
                if !ok && ok2 {
2✔
574
                        delta := time.Since(syncState.dv.ObjectMeta.CreationTimestamp.Time)
1✔
575
                        log.V(3).Info("Adding extended DataVolume token took", "delta", delta)
1✔
576
                }
1✔
577
                syncState.dv = syncState.dvMutated.DeepCopy()
1✔
578
        }
579
        return nil
1✔
580
}
581

582
func (r *ReconcilerBase) handleStaticVolume(syncState *dvSyncState, log logr.Logger) error {
1✔
583
        if _, ok := syncState.dvMutated.Annotations[cc.AnnCheckStaticVolume]; !ok {
2✔
584
                return nil
1✔
585
        }
1✔
586

587
        if syncState.pvc == nil {
2✔
588
                volumes, err := r.getAvailableVolumesForDV(syncState, log)
1✔
589
                if err != nil {
1✔
590
                        return err
×
591
                }
×
592

593
                if len(volumes) == 0 {
2✔
594
                        log.Info("No PVs for DV")
1✔
595
                        return nil
1✔
596
                }
1✔
597

598
                if err := r.handlePvcCreation(log, syncState, func(_ *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) error {
2✔
599
                        bs, err := json.Marshal(volumes)
1✔
600
                        if err != nil {
1✔
601
                                return err
×
602
                        }
×
603
                        cc.AddAnnotation(pvc, cc.AnnPersistentVolumeList, string(bs))
1✔
604
                        return nil
1✔
605
                }); err != nil {
×
606
                        return err
×
607
                }
×
608

609
                // set result to make sure callers don't do anything else in sync
610
                syncState.result = &reconcile.Result{}
1✔
611
                return nil
1✔
612
        }
613

614
        volumeAnno, ok := syncState.pvc.Annotations[cc.AnnPersistentVolumeList]
1✔
615
        if !ok {
1✔
616
                // etiher did not create the PVC here OR bind to expected PV succeeded
×
617
                return nil
×
618
        }
×
619

620
        if cc.IsUnbound(syncState.pvc) {
2✔
621
                // set result to make sure callers don't do anything else in sync
1✔
622
                syncState.result = &reconcile.Result{}
1✔
623
                return nil
1✔
624
        }
1✔
625

626
        var volumes []string
1✔
627
        if err := json.Unmarshal([]byte(volumeAnno), &volumes); err != nil {
1✔
628
                return err
×
629
        }
×
630

631
        for _, v := range volumes {
2✔
632
                if v == syncState.pvc.Spec.VolumeName {
2✔
633
                        pvcCpy := syncState.pvc.DeepCopy()
1✔
634
                        // handle as "populatedFor" going forward
1✔
635
                        cc.AddAnnotation(pvcCpy, cc.AnnPopulatedFor, syncState.dvMutated.Name)
1✔
636
                        delete(pvcCpy.Annotations, cc.AnnPersistentVolumeList)
1✔
637
                        if err := r.updatePVC(pvcCpy); err != nil {
1✔
638
                                return err
×
639
                        }
×
640
                        syncState.pvc = pvcCpy
1✔
641
                        return nil
1✔
642
                }
643
        }
644

645
        // delete the pvc and hope for better luck...
646
        pvcCpy := syncState.pvc.DeepCopy()
1✔
647
        if err := r.client.Delete(context.TODO(), pvcCpy, &client.DeleteOptions{}); err != nil {
1✔
648
                return err
×
649
        }
×
650

651
        syncState.pvc = pvcCpy
1✔
652

1✔
653
        return fmt.Errorf("DataVolume bound to unexpected PV %s", syncState.pvc.Spec.VolumeName)
1✔
654
}
655

656
func (r *ReconcilerBase) handleDelayedAnnotations(syncState *dvSyncState, log logr.Logger) error {
1✔
657
        dataVolume := syncState.dv
1✔
658
        if dataVolume.Status.Phase != cdiv1.Succeeded {
2✔
659
                return nil
1✔
660
        }
1✔
661

662
        if syncState.pvc == nil {
1✔
663
                return nil
×
664
        }
×
665

666
        pvcCpy := syncState.pvc.DeepCopy()
1✔
667
        for _, anno := range delayedAnnotations {
2✔
668
                if val, ok := dataVolume.Annotations[anno]; ok {
2✔
669
                        // only add if not already present
1✔
670
                        if _, ok := pvcCpy.Annotations[anno]; !ok {
2✔
671
                                cc.AddAnnotation(pvcCpy, anno, val)
1✔
672
                        }
1✔
673
                }
674
        }
675

676
        if !reflect.DeepEqual(syncState.pvc, pvcCpy) {
2✔
677
                if err := r.updatePVC(pvcCpy); err != nil {
1✔
678
                        return err
×
679
                }
×
680
                syncState.pvc = pvcCpy
1✔
681
                syncState.result = &reconcile.Result{}
1✔
682
        }
683

684
        return nil
1✔
685
}
686

687
func (r *ReconcilerBase) getAvailableVolumesForDV(syncState *dvSyncState, log logr.Logger) ([]string, error) {
1✔
688
        pvList := &corev1.PersistentVolumeList{}
1✔
689
        fields := client.MatchingFields{claimRefField: claimRefIndexKeyFunc(syncState.dv.Namespace, syncState.dv.Name)}
1✔
690
        if err := r.client.List(context.TODO(), pvList, fields); err != nil {
1✔
691
                return nil, err
×
692
        }
×
693
        if syncState.pvcSpec == nil {
1✔
694
                return nil, fmt.Errorf("missing pvc spec")
×
695
        }
×
696
        var pvNames []string
1✔
697
        for _, pv := range pvList.Items {
2✔
698
                if pv.Status.Phase == corev1.VolumeAvailable {
2✔
699
                        pvc := &corev1.PersistentVolumeClaim{
1✔
700
                                Spec: *syncState.pvcSpec,
1✔
701
                        }
1✔
702
                        if err := CheckVolumeSatisfyClaim(&pv, pvc); err != nil {
1✔
703
                                continue
×
704
                        }
705
                        log.Info("Found matching volume for DV", "pv", pv.Name)
1✔
706
                        pvNames = append(pvNames, pv.Name)
1✔
707
                }
708
        }
709
        return pvNames, nil
1✔
710
}
711

712
func (r *ReconcilerBase) handlePrePopulation(dv *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) {
1✔
713
        if pvc.Status.Phase == corev1.ClaimBound && pvcIsPopulatedForDataVolume(pvc, dv) {
2✔
714
                cc.AddAnnotation(dv, cc.AnnPrePopulated, pvc.Name)
1✔
715
        }
1✔
716
}
717

718
func (r *ReconcilerBase) validatePVC(dv *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) error {
1✔
719
        // If the PVC is being deleted, we should log a warning to the event recorder and return to wait the deletion complete
1✔
720
        // don't bother with owner refs is the pvc is deleted
1✔
721
        if pvc.DeletionTimestamp != nil {
1✔
722
                msg := fmt.Sprintf(MessageResourceMarkedForDeletion, pvc.Name)
×
723
                r.recorder.Event(dv, corev1.EventTypeWarning, ErrResourceMarkedForDeletion, msg)
×
724
                return errors.New(msg)
×
725
        }
×
726
        // If the PVC is not controlled by this DataVolume resource, we should log
727
        // a warning to the event recorder and return
728
        if !metav1.IsControlledBy(pvc, dv) {
2✔
729
                requiresWork, err := r.pvcRequiresWork(pvc, dv)
1✔
730
                if err != nil {
1✔
731
                        return err
×
732
                }
×
733
                if !requiresWork {
2✔
734
                        if err := r.addOwnerRef(pvc, dv); err != nil {
1✔
735
                                return err
×
736
                        }
×
737
                } else {
1✔
738
                        msg := fmt.Sprintf(MessageResourceExists, pvc.Name)
1✔
739
                        r.recorder.Event(dv, corev1.EventTypeWarning, ErrResourceExists, msg)
1✔
740
                        return errors.New(msg)
1✔
741
                }
1✔
742
        }
743
        return nil
1✔
744
}
745

746
func (r *ReconcilerBase) getPVC(key types.NamespacedName) (*corev1.PersistentVolumeClaim, error) {
1✔
747
        pvc := &corev1.PersistentVolumeClaim{}
1✔
748
        if err := r.client.Get(context.TODO(), key, pvc); err != nil {
2✔
749
                if k8serrors.IsNotFound(err) {
2✔
750
                        return nil, nil
1✔
751
                }
1✔
752
                return nil, err
×
753
        }
754
        return pvc, nil
1✔
755
}
756

757
func (r *ReconcilerBase) getDataVolume(key types.NamespacedName) (*cdiv1.DataVolume, error) {
1✔
758
        dv := &cdiv1.DataVolume{}
1✔
759
        if err := r.client.Get(context.TODO(), key, dv); err != nil {
2✔
760
                if k8serrors.IsNotFound(err) {
2✔
761
                        return nil, nil
1✔
762
                }
1✔
763
                return nil, err
×
764
        }
765
        return dv, nil
1✔
766
}
767

768
type pvcModifierFunc func(datavolume *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) error
769

770
func (r *ReconcilerBase) createPvcForDatavolume(datavolume *cdiv1.DataVolume, pvcSpec *corev1.PersistentVolumeClaimSpec,
771
        pvcModifier pvcModifierFunc) (*corev1.PersistentVolumeClaim, error) {
1✔
772
        newPvc, err := r.newPersistentVolumeClaim(datavolume, pvcSpec, datavolume.Namespace, datavolume.Name, pvcModifier)
1✔
773
        if err != nil {
2✔
774
                return nil, err
1✔
775
        }
1✔
776
        util.SetRecommendedLabels(newPvc, r.installerLabels, "cdi-controller")
1✔
777
        if err := r.client.Create(context.TODO(), newPvc); err != nil {
1✔
778
                return nil, err
×
779
        }
×
780
        return newPvc, nil
1✔
781
}
782

783
func (r *ReconcilerBase) getStorageClassBindingMode(storageClassName *string) (*storagev1.VolumeBindingMode, error) {
1✔
784
        // Handle unspecified storage class name, fallback to default storage class
1✔
785
        storageClass, err := cc.GetStorageClassByNameWithK8sFallback(context.TODO(), r.client, storageClassName)
1✔
786
        if err != nil {
1✔
787
                return nil, err
×
788
        }
×
789

790
        if storageClass != nil && storageClass.VolumeBindingMode != nil {
2✔
791
                return storageClass.VolumeBindingMode, nil
1✔
792
        }
1✔
793

794
        // no storage class, then the assumption is immediate binding
795
        volumeBindingImmediate := storagev1.VolumeBindingImmediate
1✔
796
        return &volumeBindingImmediate, nil
1✔
797
}
798

799
func (r *ReconcilerBase) reconcileProgressUpdate(datavolume *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim, result *reconcile.Result) error {
1✔
800
        var podNamespace string
1✔
801
        if datavolume.Status.Progress == "" {
2✔
802
                datavolume.Status.Progress = "N/A"
1✔
803
        }
1✔
804

805
        if !r.shouldUpdateProgress {
2✔
806
                return nil
1✔
807
        }
1✔
808

809
        if usePopulator, _ := CheckPVCUsingPopulators(pvc); usePopulator {
2✔
810
                if progress, ok := pvc.Annotations[cc.AnnPopulatorProgress]; ok {
2✔
811
                        datavolume.Status.Progress = cdiv1.DataVolumeProgress(progress)
1✔
812
                } else {
2✔
813
                        datavolume.Status.Progress = "N/A"
1✔
814
                }
1✔
815
                return nil
1✔
816
        }
817

818
        if datavolume.Spec.Source != nil && datavolume.Spec.Source.PVC != nil {
2✔
819
                podNamespace = datavolume.Spec.Source.PVC.Namespace
1✔
820
        } else {
2✔
821
                podNamespace = datavolume.Namespace
1✔
822
        }
1✔
823

824
        if datavolume.Status.Phase == cdiv1.Succeeded || datavolume.Status.Phase == cdiv1.Failed {
2✔
825
                // Data volume completed progress, or failed, either way stop queueing the data volume.
1✔
826
                r.log.Info("Datavolume finished, no longer updating progress", "Namespace", datavolume.Namespace, "Name", datavolume.Name, "Phase", datavolume.Status.Phase)
1✔
827
                return nil
1✔
828
        }
1✔
829
        pod, err := cc.GetPodFromPvc(r.client, podNamespace, pvc)
1✔
830
        if err == nil {
1✔
831
                if pod.Status.Phase != corev1.PodRunning {
×
832
                        // Avoid long timeouts and error traces from HTTP get when pod is already gone
×
833
                        return nil
×
834
                }
×
835
                if err := updateProgressUsingPod(datavolume, pod); err != nil {
×
836
                        return err
×
837
                }
×
838
        }
839
        // We are not done yet, force a re-reconcile in 2 seconds to get an update.
840
        result.RequeueAfter = 2 * time.Second
1✔
841
        return nil
1✔
842
}
843

844
func (r *ReconcilerBase) syncDataVolumeStatusPhaseWithEvent(syncState *dvSyncState, phase cdiv1.DataVolumePhase, pvc *corev1.PersistentVolumeClaim, event Event) error {
1✔
845
        if syncState.phaseSync != nil {
1✔
846
                return fmt.Errorf("phaseSync is already set")
×
847
        }
×
848
        syncState.phaseSync = &statusPhaseSync{phase: phase, event: event}
1✔
849
        if pvc != nil {
1✔
850
                key := client.ObjectKeyFromObject(pvc)
×
851
                syncState.phaseSync.pvcKey = &key
×
852
        }
×
853
        return nil
1✔
854
}
855

856
func (r *ReconcilerBase) updateDataVolumeStatusPhaseSync(ps *statusPhaseSync, dv *cdiv1.DataVolume, dvCopy *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) error {
1✔
857
        var condPvc *corev1.PersistentVolumeClaim
1✔
858
        var err error
1✔
859
        if ps.pvcKey != nil {
1✔
860
                if pvc == nil || *ps.pvcKey != client.ObjectKeyFromObject(pvc) {
×
861
                        condPvc, err = r.getPVC(*ps.pvcKey)
×
862
                        if err != nil {
×
863
                                return err
×
864
                        }
×
865
                } else {
×
866
                        condPvc = pvc
×
867
                }
×
868
        }
869
        return r.updateDataVolumeStatusPhaseWithEvent(ps.phase, dv, dvCopy, condPvc, ps.event)
1✔
870
}
871

872
func (r *ReconcilerBase) updateDataVolumeStatusPhaseWithEvent(
873
        phase cdiv1.DataVolumePhase,
874
        dataVolume *cdiv1.DataVolume,
875
        dataVolumeCopy *cdiv1.DataVolume,
876
        pvc *corev1.PersistentVolumeClaim,
877
        event Event) error {
1✔
878
        if dataVolume == nil {
1✔
879
                return nil
×
880
        }
×
881

882
        curPhase := dataVolumeCopy.Status.Phase
1✔
883
        dataVolumeCopy.Status.Phase = phase
1✔
884

1✔
885
        reason := ""
1✔
886
        message := ""
1✔
887
        if pvc == nil {
2✔
888
                reason = event.reason
1✔
889
                message = event.message
1✔
890
        }
1✔
891
        r.updateConditions(dataVolumeCopy, pvc, reason, message)
1✔
892
        return r.emitEvent(dataVolume, dataVolumeCopy, curPhase, dataVolume.Status.Conditions, &event)
1✔
893
}
894

895
func (r *ReconcilerBase) updateStatus(req reconcile.Request, phaseSync *statusPhaseSync, dvc dvController) (reconcile.Result, error) {
1✔
896
        result := reconcile.Result{}
1✔
897
        dv, err := r.getDataVolume(req.NamespacedName)
1✔
898
        if dv == nil || err != nil {
2✔
899
                return reconcile.Result{}, err
1✔
900
        }
1✔
901

902
        dataVolumeCopy := dv.DeepCopy()
1✔
903

1✔
904
        pvc, err := r.getPVC(req.NamespacedName)
1✔
905
        if err != nil {
1✔
906
                return reconcile.Result{}, err
×
907
        }
×
908

909
        if phaseSync != nil {
2✔
910
                err = r.updateDataVolumeStatusPhaseSync(phaseSync, dv, dataVolumeCopy, pvc)
1✔
911
                return reconcile.Result{}, err
1✔
912
        }
1✔
913

914
        curPhase := dataVolumeCopy.Status.Phase
1✔
915
        var event Event
1✔
916

1✔
917
        if shouldSetDataVolumePending(pvc, dataVolumeCopy) {
2✔
918
                dataVolumeCopy.Status.Phase = cdiv1.Pending
1✔
919
        } else if pvc != nil {
3✔
920
                dataVolumeCopy.Status.ClaimName = pvc.Name
1✔
921

1✔
922
                phase := pvc.Annotations[cc.AnnPodPhase]
1✔
923
                requiresWork, err := r.pvcRequiresWork(pvc, dataVolumeCopy)
1✔
924
                if err != nil {
1✔
925
                        return reconcile.Result{}, err
×
926
                }
×
927
                if phase == string(cdiv1.Succeeded) && requiresWork {
2✔
928
                        if err := dvc.updateStatusPhase(pvc, dataVolumeCopy, &event); err != nil {
1✔
929
                                return reconcile.Result{}, err
×
930
                        }
×
931
                } else {
1✔
932
                        switch pvc.Status.Phase {
1✔
933
                        case corev1.ClaimPending:
1✔
934
                                if requiresWork {
2✔
935
                                        if err := r.updateStatusPVCPending(pvc, dvc, dataVolumeCopy, &event); err != nil {
1✔
936
                                                return reconcile.Result{}, err
×
937
                                        }
×
938
                                } else {
1✔
939
                                        dataVolumeCopy.Status.Phase = cdiv1.Succeeded
1✔
940
                                }
1✔
941
                        case corev1.ClaimBound:
1✔
942
                                switch dataVolumeCopy.Status.Phase {
1✔
943
                                case cdiv1.Pending:
1✔
944
                                        dataVolumeCopy.Status.Phase = cdiv1.PVCBound
1✔
945
                                case cdiv1.WaitForFirstConsumer:
×
946
                                        dataVolumeCopy.Status.Phase = cdiv1.PVCBound
×
947
                                case cdiv1.Unknown:
1✔
948
                                        dataVolumeCopy.Status.Phase = cdiv1.PVCBound
1✔
949
                                }
950

951
                                if requiresWork {
2✔
952
                                        if err := dvc.updateStatusPhase(pvc, dataVolumeCopy, &event); err != nil {
1✔
953
                                                return reconcile.Result{}, err
×
954
                                        }
×
955
                                } else {
1✔
956
                                        dataVolumeCopy.Status.Phase = cdiv1.Succeeded
1✔
957
                                }
1✔
958

959
                        case corev1.ClaimLost:
1✔
960
                                dataVolumeCopy.Status.Phase = cdiv1.Failed
1✔
961
                                event.eventType = corev1.EventTypeWarning
1✔
962
                                event.reason = ErrClaimLost
1✔
963
                                event.message = fmt.Sprintf(MessageErrClaimLost, pvc.Name)
1✔
964
                        default:
1✔
965
                                if pvc.Status.Phase != "" {
1✔
966
                                        dataVolumeCopy.Status.Phase = cdiv1.Unknown
×
967
                                }
×
968
                        }
969
                }
970

971
                if i, err := strconv.ParseInt(pvc.Annotations[cc.AnnPodRestarts], 10, 32); err == nil && i >= 0 {
2✔
972
                        dataVolumeCopy.Status.RestartCount = int32(i)
1✔
973
                }
1✔
974
                if err := r.reconcileProgressUpdate(dataVolumeCopy, pvc, &result); err != nil {
1✔
975
                        return result, err
×
976
                }
×
977
        }
978

979
        currentCond := make([]cdiv1.DataVolumeCondition, len(dataVolumeCopy.Status.Conditions))
1✔
980
        copy(currentCond, dataVolumeCopy.Status.Conditions)
1✔
981
        r.updateConditions(dataVolumeCopy, pvc, "", "")
1✔
982
        return result, r.emitEvent(dv, dataVolumeCopy, curPhase, currentCond, &event)
1✔
983
}
984

985
func (r ReconcilerBase) updateStatusPVCPending(pvc *corev1.PersistentVolumeClaim, dvc dvController, dataVolumeCopy *cdiv1.DataVolume, event *Event) error {
1✔
986
        usePopulator, err := CheckPVCUsingPopulators(pvc)
1✔
987
        if err != nil {
1✔
988
                return err
×
989
        }
×
990
        if usePopulator {
2✔
991
                // when using populators the target pvc phase will stay pending until the population completes,
1✔
992
                // hence if not wffc we should update the dv phase according to the pod phase
1✔
993
                shouldBeMarkedPendingPopulation, err := r.shouldBeMarkedPendingPopulation(pvc)
1✔
994
                if err != nil {
1✔
995
                        return err
×
996
                }
×
997
                if shouldBeMarkedPendingPopulation {
2✔
998
                        dataVolumeCopy.Status.Phase = cdiv1.PendingPopulation
1✔
999
                } else if err := dvc.updateStatusPhase(pvc, dataVolumeCopy, event); err != nil {
2✔
1000
                        return err
×
1001
                }
×
1002
                return nil
1✔
1003
        }
1004

1005
        shouldBeMarkedWaitForFirstConsumer, err := r.shouldBeMarkedWaitForFirstConsumer(pvc)
1✔
1006
        if err != nil {
1✔
1007
                return err
×
1008
        }
×
1009
        if shouldBeMarkedWaitForFirstConsumer {
2✔
1010
                dataVolumeCopy.Status.Phase = cdiv1.WaitForFirstConsumer
1✔
1011
        } else {
2✔
1012
                dataVolumeCopy.Status.Phase = cdiv1.Pending
1✔
1013
        }
1✔
1014
        return nil
1✔
1015
}
1016

1017
func (r *ReconcilerBase) updateConditions(dataVolume *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim, reason, message string) {
1✔
1018
        var anno map[string]string
1✔
1019

1✔
1020
        if dataVolume.Status.Conditions == nil {
2✔
1021
                dataVolume.Status.Conditions = make([]cdiv1.DataVolumeCondition, 0)
1✔
1022
        }
1✔
1023

1024
        if pvc != nil {
2✔
1025
                anno = pvc.Annotations
1✔
1026
        } else {
2✔
1027
                anno = make(map[string]string)
1✔
1028
        }
1✔
1029

1030
        var readyStatus corev1.ConditionStatus
1✔
1031
        switch dataVolume.Status.Phase {
1✔
1032
        case cdiv1.Succeeded:
1✔
1033
                readyStatus = corev1.ConditionTrue
1✔
1034
        case cdiv1.Unknown:
×
1035
                readyStatus = corev1.ConditionUnknown
×
1036
        default:
1✔
1037
                readyStatus = corev1.ConditionFalse
1✔
1038
        }
1039

1040
        dataVolume.Status.Conditions = updateBoundCondition(dataVolume.Status.Conditions, pvc, message, reason)
1✔
1041
        dataVolume.Status.Conditions = UpdateReadyCondition(dataVolume.Status.Conditions, readyStatus, message, reason)
1✔
1042
        dataVolume.Status.Conditions = updateRunningCondition(dataVolume.Status.Conditions, anno)
1✔
1043
}
1044

1045
func (r *ReconcilerBase) emitConditionEvent(dataVolume *cdiv1.DataVolume, originalCond []cdiv1.DataVolumeCondition) {
1✔
1046
        r.emitBoundConditionEvent(dataVolume, FindConditionByType(cdiv1.DataVolumeBound, dataVolume.Status.Conditions), FindConditionByType(cdiv1.DataVolumeBound, originalCond))
1✔
1047
        r.emitFailureConditionEvent(dataVolume, originalCond)
1✔
1048
}
1✔
1049

1050
func (r *ReconcilerBase) emitBoundConditionEvent(dataVolume *cdiv1.DataVolume, current, original *cdiv1.DataVolumeCondition) {
1✔
1051
        // We know reason and message won't be empty for bound.
1✔
1052
        if current != nil && (original == nil || current.Status != original.Status || current.Reason != original.Reason || current.Message != original.Message) {
2✔
1053
                r.recorder.Event(dataVolume, corev1.EventTypeNormal, current.Reason, current.Message)
1✔
1054
        }
1✔
1055
}
1056

1057
func (r *ReconcilerBase) emitFailureConditionEvent(dataVolume *cdiv1.DataVolume, originalCond []cdiv1.DataVolumeCondition) {
1✔
1058
        curReady := FindConditionByType(cdiv1.DataVolumeReady, dataVolume.Status.Conditions)
1✔
1059
        curBound := FindConditionByType(cdiv1.DataVolumeBound, dataVolume.Status.Conditions)
1✔
1060
        curRunning := FindConditionByType(cdiv1.DataVolumeRunning, dataVolume.Status.Conditions)
1✔
1061
        orgRunning := FindConditionByType(cdiv1.DataVolumeRunning, originalCond)
1✔
1062

1✔
1063
        if curReady == nil || curBound == nil || curRunning == nil {
1✔
1064
                return
×
1065
        }
×
1066
        if curReady.Status == corev1.ConditionFalse && curRunning.Status == corev1.ConditionFalse &&
1✔
1067
                dvBoundOrPopulationInProgress(dataVolume, curBound) {
2✔
1068
                // Bound or in progress, not ready, and not running.
1✔
1069
                // Avoiding triggering an event for scratch space required since it will be addressed
1✔
1070
                // by CDI and sounds more drastic than it actually is.
1✔
1071
                if curRunning.Message != "" && curRunning.Message != common.ScratchSpaceRequired &&
1✔
1072
                        (orgRunning == nil || orgRunning.Message != curRunning.Message) {
1✔
1073
                        r.recorder.Event(dataVolume, corev1.EventTypeWarning, curRunning.Reason, curRunning.Message)
×
1074
                }
×
1075
        }
1076
}
1077

1078
func (r *ReconcilerBase) emitEvent(dataVolume *cdiv1.DataVolume, dataVolumeCopy *cdiv1.DataVolume, curPhase cdiv1.DataVolumePhase, originalCond []cdiv1.DataVolumeCondition, event *Event) error {
1✔
1079
        if !reflect.DeepEqual(dataVolume.ObjectMeta, dataVolumeCopy.ObjectMeta) {
1✔
1080
                return fmt.Errorf("meta update is not allowed in updateStatus phase")
×
1081
        }
×
1082
        // Update status subresource only if changed
1083
        if !reflect.DeepEqual(dataVolume.Status, dataVolumeCopy.Status) {
2✔
1084
                if err := r.client.Status().Update(context.TODO(), dataVolumeCopy); err != nil {
1✔
1085
                        r.log.Error(err, "unable to update datavolume status", "name", dataVolumeCopy.Name)
×
1086
                        return err
×
1087
                }
×
1088
                // Emit the event only on status phase change
1089
                if event.eventType != "" && curPhase != dataVolumeCopy.Status.Phase {
2✔
1090
                        r.recorder.Event(dataVolumeCopy, event.eventType, event.reason, event.message)
1✔
1091
                }
1✔
1092
                r.emitConditionEvent(dataVolumeCopy, originalCond)
1✔
1093
        }
1094
        return nil
1✔
1095
}
1096

1097
func (r *ReconcilerBase) addOwnerRef(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) error {
1✔
1098
        if err := controllerutil.SetControllerReference(dv, pvc, r.scheme); err != nil {
1✔
1099
                return err
×
1100
        }
×
1101

1102
        return r.updatePVC(pvc)
1✔
1103
}
1104

1105
func updateProgressUsingPod(dataVolumeCopy *cdiv1.DataVolume, pod *corev1.Pod) error {
1✔
1106
        httpClient = cc.BuildHTTPClient(httpClient)
1✔
1107
        url, err := cc.GetMetricsURL(pod)
1✔
1108
        if err != nil {
2✔
1109
                return err
1✔
1110
        }
1✔
1111
        if url == "" {
1✔
1112
                return nil
×
1113
        }
×
1114

1115
        // Used for both import and clone, so it should match both metric names
1116
        progressReport, err := cc.GetProgressReportFromURL(context.TODO(), url, httpClient,
1✔
1117
                fmt.Sprintf("%s|%s", importMetrics.ImportProgressMetricName, cloneMetrics.CloneProgressMetricName),
1✔
1118
                string(dataVolumeCopy.UID))
1✔
1119
        if err != nil {
1✔
1120
                return err
×
1121
        }
×
1122
        if progressReport != "" {
2✔
1123
                if f, err := strconv.ParseFloat(progressReport, 64); err == nil {
2✔
1124
                        dataVolumeCopy.Status.Progress = cdiv1.DataVolumeProgress(fmt.Sprintf("%.2f%%", f))
1✔
1125
                }
1✔
1126
        }
1127
        return nil
1✔
1128
}
1129

1130
// newPersistentVolumeClaim creates a new PVC for the DataVolume resource.
1131
// It also sets the appropriate OwnerReferences on the resource
1132
// which allows handleObject to discover the DataVolume resource
1133
// that 'owns' it.
1134
func (r *ReconcilerBase) newPersistentVolumeClaim(dataVolume *cdiv1.DataVolume, targetPvcSpec *corev1.PersistentVolumeClaimSpec, namespace, name string, pvcModifier pvcModifierFunc) (*corev1.PersistentVolumeClaim, error) {
1✔
1135
        labels := map[string]string{
1✔
1136
                common.CDILabelKey: common.CDILabelValue,
1✔
1137
        }
1✔
1138
        if util.ResolveVolumeMode(targetPvcSpec.VolumeMode) == corev1.PersistentVolumeFilesystem {
2✔
1139
                labels[common.KubePersistentVolumeFillingUpSuppressLabelKey] = common.KubePersistentVolumeFillingUpSuppressLabelValue
1✔
1140
        }
1✔
1141
        for k, v := range dataVolume.Labels {
2✔
1142
                labels[k] = v
1✔
1143
        }
1✔
1144

1145
        annotations := make(map[string]string)
1✔
1146
        for k, v := range dataVolume.ObjectMeta.Annotations {
2✔
1147
                annotations[k] = v
1✔
1148
        }
1✔
1149
        annotations[cc.AnnPodRestarts] = "0"
1✔
1150
        annotations[cc.AnnContentType] = string(cc.GetContentType(dataVolume.Spec.ContentType))
1✔
1151
        if dataVolume.Spec.PriorityClassName != "" {
2✔
1152
                annotations[cc.AnnPriorityClassName] = dataVolume.Spec.PriorityClassName
1✔
1153
        }
1✔
1154
        annotations[cc.AnnPreallocationRequested] = strconv.FormatBool(cc.GetPreallocation(context.TODO(), r.client, dataVolume.Spec.Preallocation))
1✔
1155
        annotations[cc.AnnCreatedForDataVolume] = string(dataVolume.UID)
1✔
1156

1✔
1157
        if dataVolume.Spec.Storage != nil && labels[common.PvcApplyStorageProfileLabel] == "true" {
1✔
1158
                isWebhookPvcRenderingEnabled, err := featuregates.IsWebhookPvcRenderingEnabled(r.client)
×
1159
                if err != nil {
×
1160
                        return nil, err
×
1161
                }
×
1162
                if isWebhookPvcRenderingEnabled {
×
1163
                        labels[common.PvcApplyStorageProfileLabel] = "true"
×
1164
                        if targetPvcSpec.VolumeMode == nil {
×
1165
                                targetPvcSpec.VolumeMode = ptr.To[corev1.PersistentVolumeMode](cdiv1.PersistentVolumeFromStorageProfile)
×
1166
                        }
×
1167
                }
1168
        }
1169

1170
        pvc := &corev1.PersistentVolumeClaim{
1✔
1171
                ObjectMeta: metav1.ObjectMeta{
1✔
1172
                        Namespace:   namespace,
1✔
1173
                        Name:        name,
1✔
1174
                        Labels:      labels,
1✔
1175
                        Annotations: annotations,
1✔
1176
                },
1✔
1177
                Spec: *targetPvcSpec,
1✔
1178
        }
1✔
1179

1✔
1180
        if pvcModifier != nil {
2✔
1181
                if err := pvcModifier(dataVolume, pvc); err != nil {
2✔
1182
                        return nil, err
1✔
1183
                }
1✔
1184
        }
1185

1186
        if pvc.Namespace == dataVolume.Namespace {
2✔
1187
                pvc.OwnerReferences = []metav1.OwnerReference{
1✔
1188
                        *metav1.NewControllerRef(dataVolume, schema.GroupVersionKind{
1✔
1189
                                Group:   cdiv1.SchemeGroupVersion.Group,
1✔
1190
                                Version: cdiv1.SchemeGroupVersion.Version,
1✔
1191
                                Kind:    "DataVolume",
1✔
1192
                        }),
1✔
1193
                }
1✔
1194
        } else {
1✔
1195
                if err := setAnnOwnedByDataVolume(pvc, dataVolume); err != nil {
×
1196
                        return nil, err
×
1197
                }
×
1198
                pvc.Annotations[cc.AnnOwnerUID] = string(dataVolume.UID)
×
1199
        }
1200

1201
        for _, anno := range delayedAnnotations {
2✔
1202
                delete(pvc.Annotations, anno)
1✔
1203
        }
1✔
1204

1205
        return pvc, nil
1✔
1206
}
1207

1208
// Whenever the controller updates a DV, we must make sure to nil out spec.source when using other population methods
1209
func (r *ReconcilerBase) updateDataVolume(dv *cdiv1.DataVolume) error {
1✔
1210
        // Restore so we don't nil out the dv that is being worked on
1✔
1211
        var sourceCopy *cdiv1.DataVolumeSource
1✔
1212

1✔
1213
        if dv.Spec.SourceRef != nil || dvUsesVolumePopulator(dv) {
2✔
1214
                sourceCopy = dv.Spec.Source
1✔
1215
                dv.Spec.Source = nil
1✔
1216
        }
1✔
1217

1218
        err := r.client.Update(context.TODO(), dv)
1✔
1219
        if dv.Spec.SourceRef != nil || dvUsesVolumePopulator(dv) {
2✔
1220
                dv.Spec.Source = sourceCopy
1✔
1221
        }
1✔
1222
        return err
1✔
1223
}
1224

1225
func (r *ReconcilerBase) updatePVC(pvc *corev1.PersistentVolumeClaim) error {
1✔
1226
        return r.client.Update(context.TODO(), pvc)
1✔
1227
}
1✔
1228

1229
func newLongTermCloneTokenGenerator(key *rsa.PrivateKey) token.Generator {
×
1230
        return token.NewGenerator(common.ExtendedCloneTokenIssuer, key, 10*365*24*time.Hour)
×
1231
}
×
1232

1233
// storageClassWaitForFirstConsumer returns if the binding mode of a given storage class is WFFC
1234
func (r *ReconcilerBase) storageClassWaitForFirstConsumer(storageClass *string) (bool, error) {
1✔
1235
        storageClassBindingMode, err := r.getStorageClassBindingMode(storageClass)
1✔
1236
        if err != nil {
1✔
1237
                return false, err
×
1238
        }
×
1239

1240
        return storageClassBindingMode != nil && *storageClassBindingMode == storagev1.VolumeBindingWaitForFirstConsumer, nil
1✔
1241
}
1242

1243
// shouldBeMarkedWaitForFirstConsumer decided whether we should mark DV as WFFC
1244
func (r *ReconcilerBase) shouldBeMarkedWaitForFirstConsumer(pvc *corev1.PersistentVolumeClaim) (bool, error) {
1✔
1245
        wffc, err := r.storageClassWaitForFirstConsumer(pvc.Spec.StorageClassName)
1✔
1246
        if err != nil {
1✔
1247
                return false, err
×
1248
        }
×
1249

1250
        honorWaitForFirstConsumerEnabled, err := r.featureGates.HonorWaitForFirstConsumerEnabled()
1✔
1251
        if err != nil {
1✔
1252
                return false, err
×
1253
        }
×
1254

1255
        res := honorWaitForFirstConsumerEnabled && wffc &&
1✔
1256
                pvc.Status.Phase == corev1.ClaimPending
1✔
1257

1✔
1258
        return res, nil
1✔
1259
}
1260

1261
func (r *ReconcilerBase) shouldReconcileVolumeSourceCR(syncState *dvSyncState) bool {
1✔
1262
        if syncState.pvc == nil {
2✔
1263
                return true
1✔
1264
        }
1✔
1265
        phase := syncState.pvc.Annotations[cc.AnnPodPhase]
1✔
1266
        return phase != string(corev1.PodSucceeded) || syncState.dvMutated.Status.Phase != cdiv1.Succeeded
1✔
1267
}
1268

1269
// shouldBeMarkedPendingPopulation decides whether we should mark DV as PendingPopulation
1270
func (r *ReconcilerBase) shouldBeMarkedPendingPopulation(pvc *corev1.PersistentVolumeClaim) (bool, error) {
1✔
1271
        wffc, err := r.storageClassWaitForFirstConsumer(pvc.Spec.StorageClassName)
1✔
1272
        if err != nil {
1✔
1273
                return false, err
×
1274
        }
×
1275
        nodeName := pvc.Annotations[cc.AnnSelectedNode]
1✔
1276
        immediateBindingRequested := cc.ImmediateBindingRequested(pvc)
1✔
1277

1✔
1278
        return wffc && nodeName == "" && !immediateBindingRequested, nil
1✔
1279
}
1280

1281
// handlePvcCreation works as a wrapper for non-clone PVC creation and error handling
1282
func (r *ReconcilerBase) handlePvcCreation(log logr.Logger, syncState *dvSyncState, pvcModifier pvcModifierFunc) error {
1✔
1283
        if syncState.pvc != nil {
2✔
1284
                return nil
1✔
1285
        }
1✔
1286
        if dvIsPrePopulated(syncState.dvMutated) {
1✔
1287
                return nil
×
1288
        }
×
1289
        // Creating the PVC
1290
        newPvc, err := r.createPvcForDatavolume(syncState.dvMutated, syncState.pvcSpec, pvcModifier)
1✔
1291
        if err != nil {
2✔
1292
                if cc.ErrQuotaExceeded(err) {
1✔
1293
                        syncErr := r.syncDataVolumeStatusPhaseWithEvent(syncState, cdiv1.Pending, nil, Event{corev1.EventTypeWarning, cc.ErrExceededQuota, err.Error()})
×
1294
                        if syncErr != nil {
×
1295
                                log.Error(syncErr, "failed to sync DataVolume status with event")
×
1296
                        }
×
1297
                }
1298
                return err
1✔
1299
        }
1300
        syncState.pvc = newPvc
1✔
1301

1✔
1302
        return nil
1✔
1303
}
1304

1305
// shouldUseCDIPopulator returns if the population of the PVC should be done using
1306
// CDI populators.
1307
// Currently it will use populators only if:
1308
// * storageClass used is CSI storageClass
1309
// * annotation cdi.kubevirt.io/storage.usePopulator is not set by user to "false"
1310
func (r *ReconcilerBase) shouldUseCDIPopulator(syncState *dvSyncState) (bool, error) {
1✔
1311
        dv := syncState.dvMutated
1✔
1312
        if usePopulator, ok := dv.Annotations[cc.AnnUsePopulator]; ok {
2✔
1313
                boolUsePopulator, err := strconv.ParseBool(usePopulator)
1✔
1314
                if err != nil {
1✔
1315
                        return false, err
×
1316
                }
×
1317
                return boolUsePopulator, nil
1✔
1318
        }
1319
        log := r.log.WithValues("DataVolume", dv.Name, "Namespace", dv.Namespace)
1✔
1320
        usePopulator, err := storageClassCSIDriverExists(r.client, r.log, syncState.pvcSpec.StorageClassName)
1✔
1321
        if err != nil {
1✔
1322
                return false, err
×
1323
        }
×
1324
        if !usePopulator {
2✔
1325
                if syncState.pvcSpec.StorageClassName != nil {
2✔
1326
                        log.Info("Not using CDI populators, storage class is not a CSI storage", "storageClass", *syncState.pvcSpec.StorageClassName)
1✔
1327
                }
1✔
1328
        }
1329

1330
        return usePopulator, nil
1✔
1331
}
1332

1333
func (r *ReconcilerBase) pvcRequiresWork(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) (bool, error) {
1✔
1334
        if pvc == nil || dv == nil {
2✔
1335
                return true, nil
1✔
1336
        }
1✔
1337
        if pvcIsPopulatedForDataVolume(pvc, dv) {
2✔
1338
                return false, nil
1✔
1339
        }
1✔
1340
        canAdopt, err := cc.AllowClaimAdoption(r.client, pvc, dv)
1✔
1341
        if err != nil {
1✔
1342
                return true, err
×
1343
        }
×
1344
        if canAdopt {
2✔
1345
                return false, nil
1✔
1346
        }
1✔
1347
        return true, nil
1✔
1348
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc