• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #4794

14 Jul 2024 06:12PM UTC coverage: 58.983% (+0.01%) from 58.972%
#4794

push

travis-ci

web-flow
update to k8s 1.30 libs and controller-runtime 0.18.4 (#3336)

* make deps-update

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* ReourceRequirements -> VolumeResourceRequirements

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* fix calls to controller.Watch()

controller-runtime changed the API!

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* Fix errors with actual openshift/library-go lib

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* make all works now and everything compiles

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* fix "make update-codegen" because generate_groups.sh deprecated

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* run "make generate"

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* fix transfer unittest because of change to controller-runtime

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

---------

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

6 of 238 new or added lines in 24 files covered. (2.52%)

10 existing lines in 4 files now uncovered.

16454 of 27896 relevant lines covered (58.98%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.41
/pkg/controller/datavolume/controller-base.go
1
/*
2
Copyright 2018 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package datavolume
18

19
import (
20
        "context"
21
        "crypto/rsa"
22
        "encoding/json"
23
        "fmt"
24
        "net/http"
25
        "reflect"
26
        "strconv"
27
        "time"
28

29
        "github.com/go-logr/logr"
30
        "github.com/pkg/errors"
31

32
        corev1 "k8s.io/api/core/v1"
33
        storagev1 "k8s.io/api/storage/v1"
34
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
35
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36
        "k8s.io/apimachinery/pkg/runtime"
37
        "k8s.io/apimachinery/pkg/runtime/schema"
38
        "k8s.io/apimachinery/pkg/types"
39
        "k8s.io/client-go/tools/record"
40
        "k8s.io/utils/ptr"
41

42
        "sigs.k8s.io/controller-runtime/pkg/client"
43
        "sigs.k8s.io/controller-runtime/pkg/controller"
44
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
45
        "sigs.k8s.io/controller-runtime/pkg/handler"
46
        "sigs.k8s.io/controller-runtime/pkg/manager"
47
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
48
        "sigs.k8s.io/controller-runtime/pkg/source"
49

50
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
51
        "kubevirt.io/containerized-data-importer/pkg/common"
52
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
53
        featuregates "kubevirt.io/containerized-data-importer/pkg/feature-gates"
54
        cloneMetrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-cloner"
55
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
56
        importMetrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-importer"
57
        "kubevirt.io/containerized-data-importer/pkg/token"
58
        "kubevirt.io/containerized-data-importer/pkg/util"
59
)
60

61
const (
62
        // ErrResourceExists provides a const to indicate a resource exists error
63
        ErrResourceExists = "ErrResourceExists"
64
        // ErrResourceMarkedForDeletion provides a const to indicate a resource marked for deletion error
65
        ErrResourceMarkedForDeletion = "ErrResourceMarkedForDeletion"
66
        // ErrClaimLost provides a const to indicate a claim is lost
67
        ErrClaimLost = "ErrClaimLost"
68

69
        // MessageResourceMarkedForDeletion provides a const to form a resource marked for deletion error message
70
        MessageResourceMarkedForDeletion = "Resource %q marked for deletion"
71
        // MessageResourceExists provides a const to form a resource exists error message
72
        MessageResourceExists = "Resource %q already exists and is not managed by DataVolume"
73
        // MessageErrClaimLost provides a const to form claim lost message
74
        MessageErrClaimLost = "PVC %s lost"
75

76
        dvPhaseField = "status.phase"
77

78
        claimRefField = "spec.claimRef"
79

80
        claimStorageClassNameField = "spec.storageClassName"
81
)
82

83
var (
84
        httpClient *http.Client
85

86
        delayedAnnotations = []string{
87
                cc.AnnPopulatedFor,
88
        }
89
)
90

91
// Event represents DV controller event
92
type Event struct {
93
        eventType string
94
        reason    string
95
        message   string
96
}
97

98
type statusPhaseSync struct {
99
        phase  cdiv1.DataVolumePhase
100
        pvcKey *client.ObjectKey
101
        event  Event
102
}
103

104
type dvSyncResult struct {
105
        result    *reconcile.Result
106
        phaseSync *statusPhaseSync
107
}
108

109
type dvSyncState struct {
110
        dv        *cdiv1.DataVolume
111
        dvMutated *cdiv1.DataVolume
112
        pvc       *corev1.PersistentVolumeClaim
113
        pvcSpec   *corev1.PersistentVolumeClaimSpec
114
        dvSyncResult
115
        usePopulator bool
116
}
117

118
// ReconcilerBase members
119
type ReconcilerBase struct {
120
        client               client.Client
121
        recorder             record.EventRecorder
122
        scheme               *runtime.Scheme
123
        log                  logr.Logger
124
        featureGates         featuregates.FeatureGates
125
        installerLabels      map[string]string
126
        shouldUpdateProgress bool
127
}
128

129
func pvcIsPopulatedForDataVolume(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) bool {
1✔
130
        if pvc == nil || dv == nil {
1✔
131
                return false
×
132
        }
×
133
        dvName, ok := pvc.Annotations[cc.AnnPopulatedFor]
1✔
134
        return ok && dvName == dv.Name
1✔
135
}
136

137
func dvIsPrePopulated(dv *cdiv1.DataVolume) bool {
1✔
138
        _, ok := dv.Annotations[cc.AnnPrePopulated]
1✔
139
        return ok
1✔
140
}
1✔
141

142
func checkStaticProvisionPending(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) bool {
1✔
143
        if pvc == nil || dv == nil {
2✔
144
                return false
1✔
145
        }
1✔
146
        if _, ok := dv.Annotations[cc.AnnCheckStaticVolume]; !ok {
2✔
147
                return false
1✔
148
        }
1✔
149
        _, ok := pvc.Annotations[cc.AnnPersistentVolumeList]
1✔
150
        return ok
1✔
151
}
152

153
func shouldSetDataVolumePending(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) bool {
1✔
154
        if checkStaticProvisionPending(pvc, dv) {
2✔
155
                return true
1✔
156
        }
1✔
157

158
        if pvc != nil {
2✔
159
                return false
1✔
160
        }
1✔
161

162
        return dvIsPrePopulated(dv) || (dv.Status.Phase == cdiv1.PhaseUnset)
1✔
163
}
164

165
// dataVolumeOp is the datavolume's requested operation
166
type dataVolumeOp int
167

168
const (
169
        dataVolumeNop dataVolumeOp = iota
170
        dataVolumeImport
171
        dataVolumeUpload
172
        dataVolumePvcClone
173
        dataVolumeSnapshotClone
174
        dataVolumePopulator
175
)
176

177
type indexArgs struct {
178
        obj          client.Object
179
        field        string
180
        extractValue client.IndexerFunc
181
}
182

183
func getIndexArgs() []indexArgs {
1✔
184
        return []indexArgs{
1✔
185
                {
1✔
186
                        obj:   &cdiv1.DataVolume{},
1✔
187
                        field: dvPhaseField,
1✔
188
                        extractValue: func(obj client.Object) []string {
1✔
189
                                return []string{string(obj.(*cdiv1.DataVolume).Status.Phase)}
×
190
                        },
×
191
                },
192
                {
193
                        obj:   &corev1.PersistentVolume{},
194
                        field: claimRefField,
195
                        extractValue: func(obj client.Object) []string {
1✔
196
                                if pv, ok := obj.(*corev1.PersistentVolume); ok {
2✔
197
                                        if pv.Spec.ClaimRef != nil && pv.Spec.ClaimRef.Namespace != "" && pv.Spec.ClaimRef.Name != "" {
2✔
198
                                                return []string{claimRefIndexKeyFunc(pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name)}
1✔
199
                                        }
1✔
200
                                }
201
                                return nil
×
202
                        },
203
                },
204
                {
205
                        obj:          &corev1.PersistentVolume{},
206
                        field:        claimStorageClassNameField,
207
                        extractValue: extractAvailablePersistentVolumeStorageClassName,
208
                },
209
        }
210
}
211

212
func claimRefIndexKeyFunc(namespace, name string) string {
1✔
213
        return namespace + "/" + name
1✔
214
}
1✔
215

216
// CreateCommonIndexes creates indexes used by all controllers
217
func CreateCommonIndexes(mgr manager.Manager) error {
×
218
        for _, ia := range getIndexArgs() {
×
219
                if err := mgr.GetFieldIndexer().IndexField(context.TODO(), ia.obj, ia.field, ia.extractValue); err != nil {
×
220
                        return err
×
221
                }
×
222
        }
223
        return nil
×
224
}
225

226
// CreateAvailablePersistentVolumeIndex adds storage class name index for available PersistentVolumes
227
func CreateAvailablePersistentVolumeIndex(fieldIndexer client.FieldIndexer) error {
×
228
        return fieldIndexer.IndexField(context.TODO(), &corev1.PersistentVolume{},
×
229
                claimStorageClassNameField, extractAvailablePersistentVolumeStorageClassName)
×
230
}
×
231

232
func extractAvailablePersistentVolumeStorageClassName(obj client.Object) []string {
×
233
        if pv, ok := obj.(*corev1.PersistentVolume); ok && pv.Status.Phase == corev1.VolumeAvailable {
×
234
                return []string{pv.Spec.StorageClassName}
×
235
        }
×
236
        return nil
×
237
}
238

239
func addDataVolumeControllerCommonWatches(mgr manager.Manager, dataVolumeController controller.Controller, op dataVolumeOp) error {
×
240
        appendMatchingDataVolumeRequest := func(ctx context.Context, reqs []reconcile.Request, mgr manager.Manager, namespace, name string) []reconcile.Request {
×
241
                dvKey := types.NamespacedName{Namespace: namespace, Name: name}
×
242
                dv := &cdiv1.DataVolume{}
×
243
                if err := mgr.GetClient().Get(ctx, dvKey, dv); err != nil {
×
244
                        if !k8serrors.IsNotFound(err) {
×
245
                                mgr.GetLogger().Error(err, "Failed to get DV", "dvKey", dvKey)
×
246
                        }
×
247
                        return reqs
×
248
                }
249
                if getDataVolumeOp(ctx, mgr.GetLogger(), dv, mgr.GetClient()) == op {
×
250
                        reqs = append(reqs, reconcile.Request{NamespacedName: dvKey})
×
251
                }
×
252
                return reqs
×
253
        }
254

255
        // Setup watches
NEW
256
        if err := dataVolumeController.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{}, handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](
×
NEW
257
                func(ctx context.Context, dv *cdiv1.DataVolume) []reconcile.Request {
×
258
                        if getDataVolumeOp(ctx, mgr.GetLogger(), dv, mgr.GetClient()) != op {
×
259
                                return nil
×
260
                        }
×
261
                        updatePendingDataVolumesGauge(ctx, mgr.GetLogger(), dv, mgr.GetClient())
×
262
                        return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: dv.Namespace, Name: dv.Name}}}
×
263
                }),
NEW
264
        )); err != nil {
×
265
                return err
×
266
        }
×
NEW
267
        if err := dataVolumeController.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{}, handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](
×
NEW
268
                func(ctx context.Context, obj *corev1.PersistentVolumeClaim) []reconcile.Request {
×
269
                        var result []reconcile.Request
×
270
                        owner := metav1.GetControllerOf(obj)
×
271
                        if owner != nil && owner.Kind == "DataVolume" {
×
272
                                result = appendMatchingDataVolumeRequest(ctx, result, mgr, obj.GetNamespace(), owner.Name)
×
273
                        }
×
274
                        populatedFor := obj.GetAnnotations()[cc.AnnPopulatedFor]
×
275
                        if populatedFor != "" {
×
276
                                result = appendMatchingDataVolumeRequest(ctx, result, mgr, obj.GetNamespace(), populatedFor)
×
277
                        }
×
278
                        // it is okay if result contains the same entry twice, will be deduplicated by caller
279
                        return result
×
280
                }),
NEW
281
        )); err != nil {
×
282
                return err
×
283
        }
×
NEW
284
        if err := dataVolumeController.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, handler.TypedEnqueueRequestsFromMapFunc[*corev1.Pod](
×
NEW
285
                func(ctx context.Context, obj *corev1.Pod) []reconcile.Request {
×
286
                        owner := metav1.GetControllerOf(obj)
×
287
                        if owner == nil || owner.Kind != "DataVolume" {
×
288
                                return nil
×
289
                        }
×
290
                        return appendMatchingDataVolumeRequest(ctx, nil, mgr, obj.GetNamespace(), owner.Name)
×
291
                }),
NEW
292
        )); err != nil {
×
293
                return err
×
294
        }
×
295
        for _, k := range []client.Object{&corev1.PersistentVolumeClaim{}, &corev1.Pod{}, &cdiv1.ObjectTransfer{}} {
×
NEW
296
                if err := dataVolumeController.Watch(source.Kind(mgr.GetCache(), k, handler.EnqueueRequestsFromMapFunc(
×
297
                        func(ctx context.Context, obj client.Object) []reconcile.Request {
×
298
                                if !hasAnnOwnedByDataVolume(obj) {
×
299
                                        return nil
×
300
                                }
×
301
                                namespace, name, err := getAnnOwnedByDataVolume(obj)
×
302
                                if err != nil {
×
303
                                        return nil
×
304
                                }
×
305
                                return appendMatchingDataVolumeRequest(ctx, nil, mgr, namespace, name)
×
306
                        }),
NEW
307
                )); err != nil {
×
308
                        return err
×
309
                }
×
310
        }
311

312
        // Watch for SC updates and reconcile the DVs waiting for default SC
313
        // Relevant only when the DV StorageSpec has no AccessModes set and no matching StorageClass yet, so PVC cannot be created (test_id:9922)
NEW
314
        if err := dataVolumeController.Watch(source.Kind(mgr.GetCache(), &storagev1.StorageClass{}, handler.TypedEnqueueRequestsFromMapFunc[*storagev1.StorageClass](
×
NEW
315
                func(ctx context.Context, obj *storagev1.StorageClass) []reconcile.Request {
×
316
                        dvList := &cdiv1.DataVolumeList{}
×
317
                        if err := mgr.GetClient().List(ctx, dvList, client.MatchingFields{dvPhaseField: ""}); err != nil {
×
318
                                return nil
×
319
                        }
×
320
                        var reqs []reconcile.Request
×
321
                        for _, dv := range dvList.Items {
×
322
                                if getDataVolumeOp(ctx, mgr.GetLogger(), &dv, mgr.GetClient()) == op {
×
323
                                        reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dv.Name, Namespace: dv.Namespace}})
×
324
                                }
×
325
                        }
326
                        return reqs
×
327
                },
328
        ),
NEW
329
        )); err != nil {
×
330
                return err
×
331
        }
×
332

333
        // Watch for PV updates to reconcile the DVs waiting for available PV
334
        // Relevant only when the DV StorageSpec has no AccessModes set and no matching StorageClass yet, so PVC cannot be created (test_id:9924,9925)
NEW
335
        if err := dataVolumeController.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolume{}, handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolume](
×
NEW
336
                func(ctx context.Context, pv *corev1.PersistentVolume) []reconcile.Request {
×
337
                        dvList := &cdiv1.DataVolumeList{}
×
338
                        if err := mgr.GetClient().List(ctx, dvList, client.MatchingFields{dvPhaseField: ""}); err != nil {
×
339
                                return nil
×
340
                        }
×
341
                        var reqs []reconcile.Request
×
342
                        for _, dv := range dvList.Items {
×
343
                                storage := dv.Spec.Storage
×
344
                                if storage != nil &&
×
345
                                        storage.StorageClassName != nil &&
×
346
                                        *storage.StorageClassName == pv.Spec.StorageClassName &&
×
347
                                        pv.Status.Phase == corev1.VolumeAvailable &&
×
348
                                        getDataVolumeOp(ctx, mgr.GetLogger(), &dv, mgr.GetClient()) == op {
×
349
                                        reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dv.Name, Namespace: dv.Namespace}})
×
350
                                }
×
351
                        }
352
                        return reqs
×
353
                },
354
        ),
NEW
355
        )); err != nil {
×
356
                return err
×
357
        }
×
358

359
        return nil
×
360
}
361

362
func getDataVolumeOp(ctx context.Context, log logr.Logger, dv *cdiv1.DataVolume, client client.Client) dataVolumeOp {
×
363
        src := dv.Spec.Source
×
364

×
365
        if dv.Spec.SourceRef != nil {
×
366
                return getSourceRefOp(ctx, log, dv, client)
×
367
        }
×
368
        if src != nil && src.PVC != nil {
×
369
                return dataVolumePvcClone
×
370
        }
×
371
        if src != nil && src.Snapshot != nil {
×
372
                return dataVolumeSnapshotClone
×
373
        }
×
374
        if src == nil {
×
375
                if dvUsesVolumePopulator(dv) {
×
376
                        return dataVolumePopulator
×
377
                }
×
378
                return dataVolumeNop
×
379
        }
380
        if src.Upload != nil {
×
381
                return dataVolumeUpload
×
382
        }
×
383
        if src.HTTP != nil || src.S3 != nil || src.GCS != nil || src.Registry != nil || src.Blank != nil || src.Imageio != nil || src.VDDK != nil {
×
384
                return dataVolumeImport
×
385
        }
×
386

387
        return dataVolumeNop
×
388
}
389

390
func getSourceRefOp(ctx context.Context, log logr.Logger, dv *cdiv1.DataVolume, client client.Client) dataVolumeOp {
×
391
        dataSource := &cdiv1.DataSource{}
×
392
        ns := dv.Namespace
×
393
        if dv.Spec.SourceRef.Namespace != nil && *dv.Spec.SourceRef.Namespace != "" {
×
394
                ns = *dv.Spec.SourceRef.Namespace
×
395
        }
×
396
        nn := types.NamespacedName{Namespace: ns, Name: dv.Spec.SourceRef.Name}
×
397
        if err := client.Get(ctx, nn, dataSource); err != nil {
×
398
                log.Error(err, "Unable to get DataSource", "namespacedName", nn)
×
399
                return dataVolumeNop
×
400
        }
×
401

402
        switch {
×
403
        case dataSource.Spec.Source.PVC != nil:
×
404
                return dataVolumePvcClone
×
405
        case dataSource.Spec.Source.Snapshot != nil:
×
406
                return dataVolumeSnapshotClone
×
407
        default:
×
408
                return dataVolumeNop
×
409
        }
410
}
411

412
func updatePendingDataVolumesGauge(ctx context.Context, log logr.Logger, dv *cdiv1.DataVolume, c client.Client) {
×
413
        if !cc.IsDataVolumeUsingDefaultStorageClass(dv) {
×
414
                return
×
415
        }
×
416

417
        countPending, err := getDefaultStorageClassDataVolumeCount(ctx, c, string(cdiv1.Pending))
×
418
        if err != nil {
×
419
                log.V(3).Error(err, "Failed listing the pending DataVolumes")
×
420
                return
×
421
        }
×
422
        countUnset, err := getDefaultStorageClassDataVolumeCount(ctx, c, string(cdiv1.PhaseUnset))
×
423
        if err != nil {
×
424
                log.V(3).Error(err, "Failed listing the unset DataVolumes")
×
425
                return
×
426
        }
×
427

428
        metrics.SetDataVolumePending(countPending + countUnset)
×
429
}
430

431
func getDefaultStorageClassDataVolumeCount(ctx context.Context, c client.Client, dvPhase string) (int, error) {
×
432
        dvList := &cdiv1.DataVolumeList{}
×
433
        if err := c.List(ctx, dvList, client.MatchingFields{dvPhaseField: dvPhase}); err != nil {
×
434
                return 0, err
×
435
        }
×
436

437
        dvCount := 0
×
438
        for _, dv := range dvList.Items {
×
439
                if cc.IsDataVolumeUsingDefaultStorageClass(&dv) {
×
440
                        dvCount++
×
441
                }
×
442
        }
443

444
        return dvCount, nil
×
445
}
446

447
type dvController interface {
448
        reconcile.Reconciler
449
        sync(log logr.Logger, req reconcile.Request) (dvSyncResult, error)
450
        updateStatusPhase(pvc *corev1.PersistentVolumeClaim, dataVolumeCopy *cdiv1.DataVolume, event *Event) error
451
}
452

453
func (r *ReconcilerBase) reconcile(ctx context.Context, req reconcile.Request, dvc dvController) (reconcile.Result, error) {
1✔
454
        log := r.log.WithValues("DataVolume", req.NamespacedName)
1✔
455
        syncRes, syncErr := dvc.sync(log, req)
1✔
456
        res, err := r.updateStatus(req, syncRes.phaseSync, dvc)
1✔
457
        if syncErr != nil {
2✔
458
                err = syncErr
1✔
459
        }
1✔
460
        if syncRes.result != nil {
2✔
461
                res = *syncRes.result
1✔
462
        }
1✔
463
        return res, err
1✔
464
}
465

466
type dvSyncStateFunc func(*dvSyncState) error
467

468
func (r *ReconcilerBase) syncCommon(log logr.Logger, req reconcile.Request, cleanup, prepare dvSyncStateFunc) (dvSyncState, error) {
1✔
469
        syncState, err := r.syncDvPvcState(log, req, cleanup, prepare)
1✔
470
        if err == nil {
2✔
471
                err = r.syncUpdate(log, &syncState)
1✔
472
        }
1✔
473
        return syncState, err
1✔
474
}
475

476
func (r *ReconcilerBase) syncDvPvcState(log logr.Logger, req reconcile.Request, cleanup, prepare dvSyncStateFunc) (dvSyncState, error) {
1✔
477
        syncState := dvSyncState{}
1✔
478
        dv, err := r.getDataVolume(req.NamespacedName)
1✔
479
        if dv == nil || err != nil {
2✔
480
                syncState.result = &reconcile.Result{}
1✔
481
                return syncState, err
1✔
482
        }
1✔
483
        syncState.dv = dv
1✔
484
        syncState.dvMutated = dv.DeepCopy()
1✔
485
        syncState.pvc, err = r.getPVC(req.NamespacedName)
1✔
486
        if err != nil {
1✔
487
                return syncState, err
×
488
        }
×
489

490
        if cleanup != nil {
2✔
491
                if err := cleanup(&syncState); err != nil {
1✔
492
                        return syncState, err
×
493
                }
×
494
        }
495

496
        if dv.DeletionTimestamp != nil {
1✔
497
                log.Info("DataVolume marked for deletion")
×
498
                syncState.result = &reconcile.Result{}
×
499
                return syncState, nil
×
500
        }
×
501

502
        if prepare != nil {
2✔
503
                if err := prepare(&syncState); err != nil {
1✔
504
                        return syncState, err
×
505
                }
×
506
        }
507

508
        syncState.pvcSpec, err = renderPvcSpec(r.client, r.recorder, log, syncState.dvMutated, syncState.pvc)
1✔
509
        if err != nil {
2✔
510
                if syncErr := r.syncDataVolumeStatusPhaseWithEvent(&syncState, cdiv1.PhaseUnset, nil,
1✔
511
                        Event{corev1.EventTypeWarning, cc.ErrClaimNotValid, err.Error()}); syncErr != nil {
1✔
512
                        log.Error(syncErr, "failed to sync DataVolume status with event")
×
513
                }
×
514
                if errors.Is(err, ErrStorageClassNotFound) {
2✔
515
                        syncState.result = &reconcile.Result{}
1✔
516
                        return syncState, nil
1✔
517
                }
1✔
518
                return syncState, err
1✔
519
        }
520

521
        syncState.usePopulator, err = r.shouldUseCDIPopulator(&syncState)
1✔
522
        if err != nil {
1✔
523
                return syncState, err
×
524
        }
×
525
        updateDataVolumeUseCDIPopulator(&syncState)
1✔
526

1✔
527
        if err := r.handleStaticVolume(&syncState, log); err != nil || syncState.result != nil {
2✔
528
                return syncState, err
1✔
529
        }
1✔
530

531
        if err := r.handleDelayedAnnotations(&syncState, log); err != nil || syncState.result != nil {
2✔
532
                return syncState, err
1✔
533
        }
1✔
534

535
        if err = updateDataVolumeDefaultInstancetypeLabels(r.client, &syncState); err != nil {
1✔
536
                return syncState, err
×
537
        }
×
538

539
        if syncState.pvc != nil {
2✔
540
                if err := r.garbageCollect(&syncState, log); err != nil {
1✔
541
                        return syncState, err
×
542
                }
×
543
                if syncState.result != nil || syncState.dv == nil {
2✔
544
                        return syncState, nil
1✔
545
                }
1✔
546
                if err := r.validatePVC(dv, syncState.pvc); err != nil {
2✔
547
                        return syncState, err
1✔
548
                }
1✔
549
                r.handlePrePopulation(syncState.dvMutated, syncState.pvc)
1✔
550
        }
551

552
        return syncState, nil
1✔
553
}
554

555
func (r *ReconcilerBase) syncUpdate(log logr.Logger, syncState *dvSyncState) error {
1✔
556
        if syncState.dv == nil || syncState.dvMutated == nil {
2✔
557
                return nil
1✔
558
        }
1✔
559
        if !reflect.DeepEqual(syncState.dv.Status, syncState.dvMutated.Status) {
1✔
560
                return fmt.Errorf("status update is not allowed in sync phase")
×
561
        }
×
562
        if !reflect.DeepEqual(syncState.dv.ObjectMeta, syncState.dvMutated.ObjectMeta) {
2✔
563
                _, ok := syncState.dv.Annotations[cc.AnnExtendedCloneToken]
1✔
564
                _, ok2 := syncState.dvMutated.Annotations[cc.AnnExtendedCloneToken]
1✔
565
                if err := r.updateDataVolume(syncState.dvMutated); err != nil {
1✔
566
                        r.log.Error(err, "Unable to sync update dv meta", "name", syncState.dvMutated.Name)
×
567
                        return err
×
568
                }
×
569
                if !ok && ok2 {
2✔
570
                        delta := time.Since(syncState.dv.ObjectMeta.CreationTimestamp.Time)
1✔
571
                        log.V(3).Info("Adding extended DataVolume token took", "delta", delta)
1✔
572
                }
1✔
573
                syncState.dv = syncState.dvMutated.DeepCopy()
1✔
574
        }
575
        return nil
1✔
576
}
577

578
func (r *ReconcilerBase) handleStaticVolume(syncState *dvSyncState, log logr.Logger) error {
1✔
579
        if _, ok := syncState.dvMutated.Annotations[cc.AnnCheckStaticVolume]; !ok {
2✔
580
                return nil
1✔
581
        }
1✔
582

583
        if syncState.pvc == nil {
2✔
584
                volumes, err := r.getAvailableVolumesForDV(syncState, log)
1✔
585
                if err != nil {
1✔
586
                        return err
×
587
                }
×
588

589
                if len(volumes) == 0 {
2✔
590
                        log.Info("No PVs for DV")
1✔
591
                        return nil
1✔
592
                }
1✔
593

594
                if err := r.handlePvcCreation(log, syncState, func(_ *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) error {
2✔
595
                        bs, err := json.Marshal(volumes)
1✔
596
                        if err != nil {
1✔
597
                                return err
×
598
                        }
×
599
                        cc.AddAnnotation(pvc, cc.AnnPersistentVolumeList, string(bs))
1✔
600
                        return nil
1✔
601
                }); err != nil {
×
602
                        return err
×
603
                }
×
604

605
                // set result to make sure callers don't do anything else in sync
606
                syncState.result = &reconcile.Result{}
1✔
607
                return nil
1✔
608
        }
609

610
        volumeAnno, ok := syncState.pvc.Annotations[cc.AnnPersistentVolumeList]
1✔
611
        if !ok {
1✔
612
                // etiher did not create the PVC here OR bind to expected PV succeeded
×
613
                return nil
×
614
        }
×
615

616
        if cc.IsUnbound(syncState.pvc) {
2✔
617
                // set result to make sure callers don't do anything else in sync
1✔
618
                syncState.result = &reconcile.Result{}
1✔
619
                return nil
1✔
620
        }
1✔
621

622
        var volumes []string
1✔
623
        if err := json.Unmarshal([]byte(volumeAnno), &volumes); err != nil {
1✔
624
                return err
×
625
        }
×
626

627
        for _, v := range volumes {
2✔
628
                if v == syncState.pvc.Spec.VolumeName {
2✔
629
                        pvcCpy := syncState.pvc.DeepCopy()
1✔
630
                        // handle as "populatedFor" going forward
1✔
631
                        cc.AddAnnotation(pvcCpy, cc.AnnPopulatedFor, syncState.dvMutated.Name)
1✔
632
                        delete(pvcCpy.Annotations, cc.AnnPersistentVolumeList)
1✔
633
                        if err := r.updatePVC(pvcCpy); err != nil {
1✔
634
                                return err
×
635
                        }
×
636
                        syncState.pvc = pvcCpy
1✔
637
                        return nil
1✔
638
                }
639
        }
640

641
        // delete the pvc and hope for better luck...
642
        pvcCpy := syncState.pvc.DeepCopy()
1✔
643
        if err := r.client.Delete(context.TODO(), pvcCpy, &client.DeleteOptions{}); err != nil {
1✔
644
                return err
×
645
        }
×
646

647
        syncState.pvc = pvcCpy
1✔
648

1✔
649
        return fmt.Errorf("DataVolume bound to unexpected PV %s", syncState.pvc.Spec.VolumeName)
1✔
650
}
651

652
func (r *ReconcilerBase) handleDelayedAnnotations(syncState *dvSyncState, log logr.Logger) error {
1✔
653
        dataVolume := syncState.dv
1✔
654
        if dataVolume.Status.Phase != cdiv1.Succeeded {
2✔
655
                return nil
1✔
656
        }
1✔
657

658
        if syncState.pvc == nil {
1✔
659
                return nil
×
660
        }
×
661

662
        pvcCpy := syncState.pvc.DeepCopy()
1✔
663
        for _, anno := range delayedAnnotations {
2✔
664
                if val, ok := dataVolume.Annotations[anno]; ok {
2✔
665
                        // only add if not already present
1✔
666
                        if _, ok := pvcCpy.Annotations[anno]; !ok {
2✔
667
                                cc.AddAnnotation(pvcCpy, anno, val)
1✔
668
                        }
1✔
669
                }
670
        }
671

672
        if !reflect.DeepEqual(syncState.pvc, pvcCpy) {
2✔
673
                if err := r.updatePVC(pvcCpy); err != nil {
1✔
674
                        return err
×
675
                }
×
676
                syncState.pvc = pvcCpy
1✔
677
                syncState.result = &reconcile.Result{}
1✔
678
        }
679

680
        return nil
1✔
681
}
682

683
func (r *ReconcilerBase) getAvailableVolumesForDV(syncState *dvSyncState, log logr.Logger) ([]string, error) {
1✔
684
        pvList := &corev1.PersistentVolumeList{}
1✔
685
        fields := client.MatchingFields{claimRefField: claimRefIndexKeyFunc(syncState.dv.Namespace, syncState.dv.Name)}
1✔
686
        if err := r.client.List(context.TODO(), pvList, fields); err != nil {
1✔
687
                return nil, err
×
688
        }
×
689
        if syncState.pvcSpec == nil {
1✔
690
                return nil, fmt.Errorf("missing pvc spec")
×
691
        }
×
692
        var pvNames []string
1✔
693
        for _, pv := range pvList.Items {
2✔
694
                if pv.Status.Phase == corev1.VolumeAvailable {
2✔
695
                        pvc := &corev1.PersistentVolumeClaim{
1✔
696
                                Spec: *syncState.pvcSpec,
1✔
697
                        }
1✔
698
                        if err := CheckVolumeSatisfyClaim(&pv, pvc); err != nil {
1✔
699
                                continue
×
700
                        }
701
                        log.Info("Found matching volume for DV", "pv", pv.Name)
1✔
702
                        pvNames = append(pvNames, pv.Name)
1✔
703
                }
704
        }
705
        return pvNames, nil
1✔
706
}
707

708
func (r *ReconcilerBase) handlePrePopulation(dv *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) {
1✔
709
        if pvc.Status.Phase == corev1.ClaimBound && pvcIsPopulatedForDataVolume(pvc, dv) {
2✔
710
                cc.AddAnnotation(dv, cc.AnnPrePopulated, pvc.Name)
1✔
711
        }
1✔
712
}
713

714
func (r *ReconcilerBase) validatePVC(dv *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) error {
1✔
715
        // If the PVC is being deleted, we should log a warning to the event recorder and return to wait the deletion complete
1✔
716
        // don't bother with owner refs is the pvc is deleted
1✔
717
        if pvc.DeletionTimestamp != nil {
1✔
718
                msg := fmt.Sprintf(MessageResourceMarkedForDeletion, pvc.Name)
×
719
                r.recorder.Event(dv, corev1.EventTypeWarning, ErrResourceMarkedForDeletion, msg)
×
720
                return errors.Errorf(msg)
×
721
        }
×
722
        // If the PVC is not controlled by this DataVolume resource, we should log
723
        // a warning to the event recorder and return
724
        if !metav1.IsControlledBy(pvc, dv) {
2✔
725
                requiresWork, err := r.pvcRequiresWork(pvc, dv)
1✔
726
                if err != nil {
1✔
727
                        return err
×
728
                }
×
729
                if !requiresWork {
2✔
730
                        if err := r.addOwnerRef(pvc, dv); err != nil {
1✔
731
                                return err
×
732
                        }
×
733
                } else {
1✔
734
                        msg := fmt.Sprintf(MessageResourceExists, pvc.Name)
1✔
735
                        r.recorder.Event(dv, corev1.EventTypeWarning, ErrResourceExists, msg)
1✔
736
                        return errors.Errorf(msg)
1✔
737
                }
1✔
738
        }
739
        return nil
1✔
740
}
741

742
func (r *ReconcilerBase) getPVC(key types.NamespacedName) (*corev1.PersistentVolumeClaim, error) {
1✔
743
        pvc := &corev1.PersistentVolumeClaim{}
1✔
744
        if err := r.client.Get(context.TODO(), key, pvc); err != nil {
2✔
745
                if k8serrors.IsNotFound(err) {
2✔
746
                        return nil, nil
1✔
747
                }
1✔
748
                return nil, err
×
749
        }
750
        return pvc, nil
1✔
751
}
752

753
func (r *ReconcilerBase) getDataVolume(key types.NamespacedName) (*cdiv1.DataVolume, error) {
1✔
754
        dv := &cdiv1.DataVolume{}
1✔
755
        if err := r.client.Get(context.TODO(), key, dv); err != nil {
2✔
756
                if k8serrors.IsNotFound(err) {
2✔
757
                        return nil, nil
1✔
758
                }
1✔
759
                return nil, err
×
760
        }
761
        return dv, nil
1✔
762
}
763

764
type pvcModifierFunc func(datavolume *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) error
765

766
func (r *ReconcilerBase) createPvcForDatavolume(datavolume *cdiv1.DataVolume, pvcSpec *corev1.PersistentVolumeClaimSpec,
767
        pvcModifier pvcModifierFunc) (*corev1.PersistentVolumeClaim, error) {
1✔
768
        newPvc, err := r.newPersistentVolumeClaim(datavolume, pvcSpec, datavolume.Namespace, datavolume.Name, pvcModifier)
1✔
769
        if err != nil {
2✔
770
                return nil, err
1✔
771
        }
1✔
772
        util.SetRecommendedLabels(newPvc, r.installerLabels, "cdi-controller")
1✔
773
        if err := r.client.Create(context.TODO(), newPvc); err != nil {
1✔
774
                return nil, err
×
775
        }
×
776
        return newPvc, nil
1✔
777
}
778

779
func (r *ReconcilerBase) getStorageClassBindingMode(storageClassName *string) (*storagev1.VolumeBindingMode, error) {
1✔
780
        // Handle unspecified storage class name, fallback to default storage class
1✔
781
        storageClass, err := cc.GetStorageClassByNameWithK8sFallback(context.TODO(), r.client, storageClassName)
1✔
782
        if err != nil {
1✔
783
                return nil, err
×
784
        }
×
785

786
        if storageClass != nil && storageClass.VolumeBindingMode != nil {
2✔
787
                return storageClass.VolumeBindingMode, nil
1✔
788
        }
1✔
789

790
        // no storage class, then the assumption is immediate binding
791
        volumeBindingImmediate := storagev1.VolumeBindingImmediate
1✔
792
        return &volumeBindingImmediate, nil
1✔
793
}
794

795
func (r *ReconcilerBase) reconcileProgressUpdate(datavolume *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim, result *reconcile.Result) error {
1✔
796
        var podNamespace string
1✔
797
        if datavolume.Status.Progress == "" {
2✔
798
                datavolume.Status.Progress = "N/A"
1✔
799
        }
1✔
800

801
        if !r.shouldUpdateProgress {
2✔
802
                return nil
1✔
803
        }
1✔
804

805
        if usePopulator, _ := CheckPVCUsingPopulators(pvc); usePopulator {
2✔
806
                if progress, ok := pvc.Annotations[cc.AnnPopulatorProgress]; ok {
2✔
807
                        datavolume.Status.Progress = cdiv1.DataVolumeProgress(progress)
1✔
808
                } else {
2✔
809
                        datavolume.Status.Progress = "N/A"
1✔
810
                }
1✔
811
                return nil
1✔
812
        }
813

814
        if datavolume.Spec.Source != nil && datavolume.Spec.Source.PVC != nil {
2✔
815
                podNamespace = datavolume.Spec.Source.PVC.Namespace
1✔
816
        } else {
2✔
817
                podNamespace = datavolume.Namespace
1✔
818
        }
1✔
819

820
        if datavolume.Status.Phase == cdiv1.Succeeded || datavolume.Status.Phase == cdiv1.Failed {
2✔
821
                // Data volume completed progress, or failed, either way stop queueing the data volume.
1✔
822
                r.log.Info("Datavolume finished, no longer updating progress", "Namespace", datavolume.Namespace, "Name", datavolume.Name, "Phase", datavolume.Status.Phase)
1✔
823
                return nil
1✔
824
        }
1✔
825
        pod, err := cc.GetPodFromPvc(r.client, podNamespace, pvc)
1✔
826
        if err == nil {
1✔
827
                if pod.Status.Phase != corev1.PodRunning {
×
828
                        // Avoid long timeouts and error traces from HTTP get when pod is already gone
×
829
                        return nil
×
830
                }
×
831
                if err := updateProgressUsingPod(datavolume, pod); err != nil {
×
832
                        return err
×
833
                }
×
834
        }
835
        // We are not done yet, force a re-reconcile in 2 seconds to get an update.
836
        result.RequeueAfter = 2 * time.Second
1✔
837
        return nil
1✔
838
}
839

840
func (r *ReconcilerBase) syncDataVolumeStatusPhaseWithEvent(syncState *dvSyncState, phase cdiv1.DataVolumePhase, pvc *corev1.PersistentVolumeClaim, event Event) error {
1✔
841
        if syncState.phaseSync != nil {
1✔
842
                return fmt.Errorf("phaseSync is already set")
×
843
        }
×
844
        syncState.phaseSync = &statusPhaseSync{phase: phase, event: event}
1✔
845
        if pvc != nil {
1✔
846
                key := client.ObjectKeyFromObject(pvc)
×
847
                syncState.phaseSync.pvcKey = &key
×
848
        }
×
849
        return nil
1✔
850
}
851

852
func (r *ReconcilerBase) updateDataVolumeStatusPhaseSync(ps *statusPhaseSync, dv *cdiv1.DataVolume, dvCopy *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) error {
1✔
853
        var condPvc *corev1.PersistentVolumeClaim
1✔
854
        var err error
1✔
855
        if ps.pvcKey != nil {
1✔
856
                if pvc == nil || *ps.pvcKey != client.ObjectKeyFromObject(pvc) {
×
857
                        condPvc, err = r.getPVC(*ps.pvcKey)
×
858
                        if err != nil {
×
859
                                return err
×
860
                        }
×
861
                } else {
×
862
                        condPvc = pvc
×
863
                }
×
864
        }
865
        return r.updateDataVolumeStatusPhaseWithEvent(ps.phase, dv, dvCopy, condPvc, ps.event)
1✔
866
}
867

868
func (r *ReconcilerBase) updateDataVolumeStatusPhaseWithEvent(
869
        phase cdiv1.DataVolumePhase,
870
        dataVolume *cdiv1.DataVolume,
871
        dataVolumeCopy *cdiv1.DataVolume,
872
        pvc *corev1.PersistentVolumeClaim,
873
        event Event) error {
1✔
874
        if dataVolume == nil {
1✔
875
                return nil
×
876
        }
×
877

878
        curPhase := dataVolumeCopy.Status.Phase
1✔
879
        dataVolumeCopy.Status.Phase = phase
1✔
880

1✔
881
        reason := ""
1✔
882
        message := ""
1✔
883
        if pvc == nil {
2✔
884
                reason = event.reason
1✔
885
                message = event.message
1✔
886
        }
1✔
887
        r.updateConditions(dataVolumeCopy, pvc, reason, message)
1✔
888
        return r.emitEvent(dataVolume, dataVolumeCopy, curPhase, dataVolume.Status.Conditions, &event)
1✔
889
}
890

891
func (r *ReconcilerBase) updateStatus(req reconcile.Request, phaseSync *statusPhaseSync, dvc dvController) (reconcile.Result, error) {
1✔
892
        result := reconcile.Result{}
1✔
893
        dv, err := r.getDataVolume(req.NamespacedName)
1✔
894
        if dv == nil || err != nil {
2✔
895
                return reconcile.Result{}, err
1✔
896
        }
1✔
897

898
        dataVolumeCopy := dv.DeepCopy()
1✔
899

1✔
900
        pvc, err := r.getPVC(req.NamespacedName)
1✔
901
        if err != nil {
1✔
902
                return reconcile.Result{}, err
×
903
        }
×
904

905
        if phaseSync != nil {
2✔
906
                err = r.updateDataVolumeStatusPhaseSync(phaseSync, dv, dataVolumeCopy, pvc)
1✔
907
                return reconcile.Result{}, err
1✔
908
        }
1✔
909

910
        curPhase := dataVolumeCopy.Status.Phase
1✔
911
        var event Event
1✔
912

1✔
913
        if shouldSetDataVolumePending(pvc, dataVolumeCopy) {
2✔
914
                dataVolumeCopy.Status.Phase = cdiv1.Pending
1✔
915
        } else if pvc != nil {
3✔
916
                dataVolumeCopy.Status.ClaimName = pvc.Name
1✔
917

1✔
918
                phase := pvc.Annotations[cc.AnnPodPhase]
1✔
919
                requiresWork, err := r.pvcRequiresWork(pvc, dataVolumeCopy)
1✔
920
                if err != nil {
1✔
921
                        return reconcile.Result{}, err
×
922
                }
×
923
                if phase == string(cdiv1.Succeeded) && requiresWork {
2✔
924
                        if err := dvc.updateStatusPhase(pvc, dataVolumeCopy, &event); err != nil {
1✔
925
                                return reconcile.Result{}, err
×
926
                        }
×
927
                } else {
1✔
928
                        switch pvc.Status.Phase {
1✔
929
                        case corev1.ClaimPending:
1✔
930
                                if requiresWork {
2✔
931
                                        if err := r.updateStatusPVCPending(pvc, dvc, dataVolumeCopy, &event); err != nil {
1✔
932
                                                return reconcile.Result{}, err
×
933
                                        }
×
934
                                } else {
1✔
935
                                        dataVolumeCopy.Status.Phase = cdiv1.Succeeded
1✔
936
                                }
1✔
937
                        case corev1.ClaimBound:
1✔
938
                                switch dataVolumeCopy.Status.Phase {
1✔
939
                                case cdiv1.Pending:
1✔
940
                                        dataVolumeCopy.Status.Phase = cdiv1.PVCBound
1✔
941
                                case cdiv1.WaitForFirstConsumer:
×
942
                                        dataVolumeCopy.Status.Phase = cdiv1.PVCBound
×
943
                                case cdiv1.Unknown:
1✔
944
                                        dataVolumeCopy.Status.Phase = cdiv1.PVCBound
1✔
945
                                }
946

947
                                if requiresWork {
2✔
948
                                        if err := dvc.updateStatusPhase(pvc, dataVolumeCopy, &event); err != nil {
1✔
949
                                                return reconcile.Result{}, err
×
950
                                        }
×
951
                                } else {
1✔
952
                                        dataVolumeCopy.Status.Phase = cdiv1.Succeeded
1✔
953
                                }
1✔
954

955
                        case corev1.ClaimLost:
1✔
956
                                dataVolumeCopy.Status.Phase = cdiv1.Failed
1✔
957
                                event.eventType = corev1.EventTypeWarning
1✔
958
                                event.reason = ErrClaimLost
1✔
959
                                event.message = fmt.Sprintf(MessageErrClaimLost, pvc.Name)
1✔
960
                        default:
1✔
961
                                if pvc.Status.Phase != "" {
1✔
962
                                        dataVolumeCopy.Status.Phase = cdiv1.Unknown
×
963
                                }
×
964
                        }
965
                }
966

967
                if i, err := strconv.ParseInt(pvc.Annotations[cc.AnnPodRestarts], 10, 32); err == nil && i >= 0 {
2✔
968
                        dataVolumeCopy.Status.RestartCount = int32(i)
1✔
969
                }
1✔
970
                if err := r.reconcileProgressUpdate(dataVolumeCopy, pvc, &result); err != nil {
1✔
971
                        return result, err
×
972
                }
×
973
        }
974

975
        currentCond := make([]cdiv1.DataVolumeCondition, len(dataVolumeCopy.Status.Conditions))
1✔
976
        copy(currentCond, dataVolumeCopy.Status.Conditions)
1✔
977
        r.updateConditions(dataVolumeCopy, pvc, "", "")
1✔
978
        return result, r.emitEvent(dv, dataVolumeCopy, curPhase, currentCond, &event)
1✔
979
}
980

981
func (r ReconcilerBase) updateStatusPVCPending(pvc *corev1.PersistentVolumeClaim, dvc dvController, dataVolumeCopy *cdiv1.DataVolume, event *Event) error {
1✔
982
        usePopulator, err := CheckPVCUsingPopulators(pvc)
1✔
983
        if err != nil {
1✔
984
                return err
×
985
        }
×
986
        if usePopulator {
2✔
987
                // when using populators the target pvc phase will stay pending until the population completes,
1✔
988
                // hence if not wffc we should update the dv phase according to the pod phase
1✔
989
                shouldBeMarkedPendingPopulation, err := r.shouldBeMarkedPendingPopulation(pvc)
1✔
990
                if err != nil {
1✔
991
                        return err
×
992
                }
×
993
                if shouldBeMarkedPendingPopulation {
2✔
994
                        dataVolumeCopy.Status.Phase = cdiv1.PendingPopulation
1✔
995
                } else if err := dvc.updateStatusPhase(pvc, dataVolumeCopy, event); err != nil {
2✔
996
                        return err
×
997
                }
×
998
                return nil
1✔
999
        }
1000

1001
        shouldBeMarkedWaitForFirstConsumer, err := r.shouldBeMarkedWaitForFirstConsumer(pvc)
1✔
1002
        if err != nil {
1✔
1003
                return err
×
1004
        }
×
1005
        if shouldBeMarkedWaitForFirstConsumer {
2✔
1006
                dataVolumeCopy.Status.Phase = cdiv1.WaitForFirstConsumer
1✔
1007
        } else {
2✔
1008
                dataVolumeCopy.Status.Phase = cdiv1.Pending
1✔
1009
        }
1✔
1010
        return nil
1✔
1011
}
1012

1013
func (r *ReconcilerBase) updateConditions(dataVolume *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim, reason, message string) {
1✔
1014
        var anno map[string]string
1✔
1015

1✔
1016
        if dataVolume.Status.Conditions == nil {
2✔
1017
                dataVolume.Status.Conditions = make([]cdiv1.DataVolumeCondition, 0)
1✔
1018
        }
1✔
1019

1020
        if pvc != nil {
2✔
1021
                anno = pvc.Annotations
1✔
1022
        } else {
2✔
1023
                anno = make(map[string]string)
1✔
1024
        }
1✔
1025

1026
        var readyStatus corev1.ConditionStatus
1✔
1027
        switch dataVolume.Status.Phase {
1✔
1028
        case cdiv1.Succeeded:
1✔
1029
                readyStatus = corev1.ConditionTrue
1✔
1030
        case cdiv1.Unknown:
×
1031
                readyStatus = corev1.ConditionUnknown
×
1032
        default:
1✔
1033
                readyStatus = corev1.ConditionFalse
1✔
1034
        }
1035

1036
        dataVolume.Status.Conditions = updateBoundCondition(dataVolume.Status.Conditions, pvc, message, reason)
1✔
1037
        dataVolume.Status.Conditions = UpdateReadyCondition(dataVolume.Status.Conditions, readyStatus, message, reason)
1✔
1038
        dataVolume.Status.Conditions = updateRunningCondition(dataVolume.Status.Conditions, anno)
1✔
1039
}
1040

1041
func (r *ReconcilerBase) emitConditionEvent(dataVolume *cdiv1.DataVolume, originalCond []cdiv1.DataVolumeCondition) {
1✔
1042
        r.emitBoundConditionEvent(dataVolume, FindConditionByType(cdiv1.DataVolumeBound, dataVolume.Status.Conditions), FindConditionByType(cdiv1.DataVolumeBound, originalCond))
1✔
1043
        r.emitFailureConditionEvent(dataVolume, originalCond)
1✔
1044
}
1✔
1045

1046
func (r *ReconcilerBase) emitBoundConditionEvent(dataVolume *cdiv1.DataVolume, current, original *cdiv1.DataVolumeCondition) {
1✔
1047
        // We know reason and message won't be empty for bound.
1✔
1048
        if current != nil && (original == nil || current.Status != original.Status || current.Reason != original.Reason || current.Message != original.Message) {
2✔
1049
                r.recorder.Event(dataVolume, corev1.EventTypeNormal, current.Reason, current.Message)
1✔
1050
        }
1✔
1051
}
1052

1053
func (r *ReconcilerBase) emitFailureConditionEvent(dataVolume *cdiv1.DataVolume, originalCond []cdiv1.DataVolumeCondition) {
1✔
1054
        curReady := FindConditionByType(cdiv1.DataVolumeReady, dataVolume.Status.Conditions)
1✔
1055
        curBound := FindConditionByType(cdiv1.DataVolumeBound, dataVolume.Status.Conditions)
1✔
1056
        curRunning := FindConditionByType(cdiv1.DataVolumeRunning, dataVolume.Status.Conditions)
1✔
1057
        orgRunning := FindConditionByType(cdiv1.DataVolumeRunning, originalCond)
1✔
1058

1✔
1059
        if curReady == nil || curBound == nil || curRunning == nil {
1✔
1060
                return
×
1061
        }
×
1062
        if curReady.Status == corev1.ConditionFalse && curRunning.Status == corev1.ConditionFalse &&
1✔
1063
                dvBoundOrPopulationInProgress(dataVolume, curBound) {
2✔
1064
                // Bound or in progress, not ready, and not running.
1✔
1065
                // Avoiding triggering an event for scratch space required since it will be addressed
1✔
1066
                // by CDI and sounds more drastic than it actually is.
1✔
1067
                if curRunning.Message != "" && curRunning.Message != common.ScratchSpaceRequired &&
1✔
1068
                        (orgRunning == nil || orgRunning.Message != curRunning.Message) {
1✔
1069
                        r.recorder.Event(dataVolume, corev1.EventTypeWarning, curRunning.Reason, curRunning.Message)
×
1070
                }
×
1071
        }
1072
}
1073

1074
func (r *ReconcilerBase) emitEvent(dataVolume *cdiv1.DataVolume, dataVolumeCopy *cdiv1.DataVolume, curPhase cdiv1.DataVolumePhase, originalCond []cdiv1.DataVolumeCondition, event *Event) error {
1✔
1075
        if !reflect.DeepEqual(dataVolume.ObjectMeta, dataVolumeCopy.ObjectMeta) {
1✔
1076
                return fmt.Errorf("meta update is not allowed in updateStatus phase")
×
1077
        }
×
1078
        // Update status subresource only if changed
1079
        if !reflect.DeepEqual(dataVolume.Status, dataVolumeCopy.Status) {
2✔
1080
                if err := r.client.Status().Update(context.TODO(), dataVolumeCopy); err != nil {
1✔
1081
                        r.log.Error(err, "unable to update datavolume status", "name", dataVolumeCopy.Name)
×
1082
                        return err
×
1083
                }
×
1084
                // Emit the event only on status phase change
1085
                if event.eventType != "" && curPhase != dataVolumeCopy.Status.Phase {
2✔
1086
                        r.recorder.Event(dataVolumeCopy, event.eventType, event.reason, event.message)
1✔
1087
                }
1✔
1088
                r.emitConditionEvent(dataVolumeCopy, originalCond)
1✔
1089
        }
1090
        return nil
1✔
1091
}
1092

1093
func (r *ReconcilerBase) addOwnerRef(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) error {
1✔
1094
        if err := controllerutil.SetControllerReference(dv, pvc, r.scheme); err != nil {
1✔
1095
                return err
×
1096
        }
×
1097

1098
        return r.updatePVC(pvc)
1✔
1099
}
1100

1101
func updateProgressUsingPod(dataVolumeCopy *cdiv1.DataVolume, pod *corev1.Pod) error {
1✔
1102
        httpClient = cc.BuildHTTPClient(httpClient)
1✔
1103
        url, err := cc.GetMetricsURL(pod)
1✔
1104
        if err != nil {
2✔
1105
                return err
1✔
1106
        }
1✔
1107
        if url == "" {
1✔
1108
                return nil
×
1109
        }
×
1110

1111
        // Used for both import and clone, so it should match both metric names
1112
        progressReport, err := cc.GetProgressReportFromURL(url, httpClient,
1✔
1113
                fmt.Sprintf("%s|%s", importMetrics.ImportProgressMetricName, cloneMetrics.CloneProgressMetricName),
1✔
1114
                string(dataVolumeCopy.UID))
1✔
1115
        if err != nil {
1✔
1116
                return err
×
1117
        }
×
1118
        if progressReport != "" {
2✔
1119
                if f, err := strconv.ParseFloat(progressReport, 64); err == nil {
2✔
1120
                        dataVolumeCopy.Status.Progress = cdiv1.DataVolumeProgress(fmt.Sprintf("%.2f%%", f))
1✔
1121
                }
1✔
1122
        }
1123
        return nil
1✔
1124
}
1125

1126
// newPersistentVolumeClaim creates a new PVC for the DataVolume resource.
1127
// It also sets the appropriate OwnerReferences on the resource
1128
// which allows handleObject to discover the DataVolume resource
1129
// that 'owns' it.
1130
func (r *ReconcilerBase) newPersistentVolumeClaim(dataVolume *cdiv1.DataVolume, targetPvcSpec *corev1.PersistentVolumeClaimSpec, namespace, name string, pvcModifier pvcModifierFunc) (*corev1.PersistentVolumeClaim, error) {
1✔
1131
        labels := map[string]string{
1✔
1132
                common.CDILabelKey: common.CDILabelValue,
1✔
1133
        }
1✔
1134
        if util.ResolveVolumeMode(targetPvcSpec.VolumeMode) == corev1.PersistentVolumeFilesystem {
2✔
1135
                labels[common.KubePersistentVolumeFillingUpSuppressLabelKey] = common.KubePersistentVolumeFillingUpSuppressLabelValue
1✔
1136
        }
1✔
1137
        for k, v := range dataVolume.Labels {
2✔
1138
                labels[k] = v
1✔
1139
        }
1✔
1140

1141
        annotations := make(map[string]string)
1✔
1142
        for k, v := range dataVolume.ObjectMeta.Annotations {
2✔
1143
                annotations[k] = v
1✔
1144
        }
1✔
1145
        annotations[cc.AnnPodRestarts] = "0"
1✔
1146
        annotations[cc.AnnContentType] = string(cc.GetContentType(dataVolume.Spec.ContentType))
1✔
1147
        if dataVolume.Spec.PriorityClassName != "" {
2✔
1148
                annotations[cc.AnnPriorityClassName] = dataVolume.Spec.PriorityClassName
1✔
1149
        }
1✔
1150
        annotations[cc.AnnPreallocationRequested] = strconv.FormatBool(cc.GetPreallocation(context.TODO(), r.client, dataVolume.Spec.Preallocation))
1✔
1151
        annotations[cc.AnnCreatedForDataVolume] = string(dataVolume.UID)
1✔
1152

1✔
1153
        if dataVolume.Spec.Storage != nil && labels[common.PvcApplyStorageProfileLabel] == "true" {
1✔
1154
                isWebhookPvcRenderingEnabled, err := featuregates.IsWebhookPvcRenderingEnabled(r.client)
×
1155
                if err != nil {
×
1156
                        return nil, err
×
1157
                }
×
1158
                if isWebhookPvcRenderingEnabled {
×
1159
                        labels[common.PvcApplyStorageProfileLabel] = "true"
×
1160
                        if targetPvcSpec.VolumeMode == nil {
×
1161
                                targetPvcSpec.VolumeMode = ptr.To[corev1.PersistentVolumeMode](cdiv1.PersistentVolumeFromStorageProfile)
×
1162
                        }
×
1163
                }
1164
        }
1165

1166
        pvc := &corev1.PersistentVolumeClaim{
1✔
1167
                ObjectMeta: metav1.ObjectMeta{
1✔
1168
                        Namespace:   namespace,
1✔
1169
                        Name:        name,
1✔
1170
                        Labels:      labels,
1✔
1171
                        Annotations: annotations,
1✔
1172
                },
1✔
1173
                Spec: *targetPvcSpec,
1✔
1174
        }
1✔
1175

1✔
1176
        if pvcModifier != nil {
2✔
1177
                if err := pvcModifier(dataVolume, pvc); err != nil {
2✔
1178
                        return nil, err
1✔
1179
                }
1✔
1180
        }
1181

1182
        if pvc.Namespace == dataVolume.Namespace {
2✔
1183
                pvc.OwnerReferences = []metav1.OwnerReference{
1✔
1184
                        *metav1.NewControllerRef(dataVolume, schema.GroupVersionKind{
1✔
1185
                                Group:   cdiv1.SchemeGroupVersion.Group,
1✔
1186
                                Version: cdiv1.SchemeGroupVersion.Version,
1✔
1187
                                Kind:    "DataVolume",
1✔
1188
                        }),
1✔
1189
                }
1✔
1190
        } else {
1✔
1191
                if err := setAnnOwnedByDataVolume(pvc, dataVolume); err != nil {
×
1192
                        return nil, err
×
1193
                }
×
1194
                pvc.Annotations[cc.AnnOwnerUID] = string(dataVolume.UID)
×
1195
        }
1196

1197
        for _, anno := range delayedAnnotations {
2✔
1198
                delete(pvc.Annotations, anno)
1✔
1199
        }
1✔
1200

1201
        return pvc, nil
1✔
1202
}
1203

1204
// Whenever the controller updates a DV, we must make sure to nil out spec.source when using other population methods
1205
func (r *ReconcilerBase) updateDataVolume(dv *cdiv1.DataVolume) error {
1✔
1206
        // Restore so we don't nil out the dv that is being worked on
1✔
1207
        var sourceCopy *cdiv1.DataVolumeSource
1✔
1208

1✔
1209
        if dv.Spec.SourceRef != nil || dvUsesVolumePopulator(dv) {
2✔
1210
                sourceCopy = dv.Spec.Source
1✔
1211
                dv.Spec.Source = nil
1✔
1212
        }
1✔
1213

1214
        err := r.client.Update(context.TODO(), dv)
1✔
1215
        if dv.Spec.SourceRef != nil || dvUsesVolumePopulator(dv) {
2✔
1216
                dv.Spec.Source = sourceCopy
1✔
1217
        }
1✔
1218
        return err
1✔
1219
}
1220

1221
func (r *ReconcilerBase) updatePVC(pvc *corev1.PersistentVolumeClaim) error {
1✔
1222
        return r.client.Update(context.TODO(), pvc)
1✔
1223
}
1✔
1224

1225
func newLongTermCloneTokenGenerator(key *rsa.PrivateKey) token.Generator {
×
1226
        return token.NewGenerator(common.ExtendedCloneTokenIssuer, key, 10*365*24*time.Hour)
×
1227
}
×
1228

1229
// storageClassWaitForFirstConsumer returns if the binding mode of a given storage class is WFFC
1230
func (r *ReconcilerBase) storageClassWaitForFirstConsumer(storageClass *string) (bool, error) {
1✔
1231
        storageClassBindingMode, err := r.getStorageClassBindingMode(storageClass)
1✔
1232
        if err != nil {
1✔
1233
                return false, err
×
1234
        }
×
1235

1236
        return storageClassBindingMode != nil && *storageClassBindingMode == storagev1.VolumeBindingWaitForFirstConsumer, nil
1✔
1237
}
1238

1239
// shouldBeMarkedWaitForFirstConsumer decided whether we should mark DV as WFFC
1240
func (r *ReconcilerBase) shouldBeMarkedWaitForFirstConsumer(pvc *corev1.PersistentVolumeClaim) (bool, error) {
1✔
1241
        wffc, err := r.storageClassWaitForFirstConsumer(pvc.Spec.StorageClassName)
1✔
1242
        if err != nil {
1✔
1243
                return false, err
×
1244
        }
×
1245

1246
        honorWaitForFirstConsumerEnabled, err := r.featureGates.HonorWaitForFirstConsumerEnabled()
1✔
1247
        if err != nil {
1✔
1248
                return false, err
×
1249
        }
×
1250

1251
        res := honorWaitForFirstConsumerEnabled && wffc &&
1✔
1252
                pvc.Status.Phase == corev1.ClaimPending
1✔
1253

1✔
1254
        return res, nil
1✔
1255
}
1256

1257
func (r *ReconcilerBase) shouldReconcileVolumeSourceCR(syncState *dvSyncState) bool {
1✔
1258
        if syncState.pvc == nil {
2✔
1259
                return true
1✔
1260
        }
1✔
1261
        phase := syncState.pvc.Annotations[cc.AnnPodPhase]
1✔
1262
        return phase != string(corev1.PodSucceeded) || syncState.dvMutated.Status.Phase != cdiv1.Succeeded
1✔
1263
}
1264

1265
// shouldBeMarkedPendingPopulation decides whether we should mark DV as PendingPopulation
1266
func (r *ReconcilerBase) shouldBeMarkedPendingPopulation(pvc *corev1.PersistentVolumeClaim) (bool, error) {
1✔
1267
        wffc, err := r.storageClassWaitForFirstConsumer(pvc.Spec.StorageClassName)
1✔
1268
        if err != nil {
1✔
1269
                return false, err
×
1270
        }
×
1271
        nodeName := pvc.Annotations[cc.AnnSelectedNode]
1✔
1272
        immediateBindingRequested := cc.ImmediateBindingRequested(pvc)
1✔
1273

1✔
1274
        return wffc && nodeName == "" && !immediateBindingRequested, nil
1✔
1275
}
1276

1277
// handlePvcCreation works as a wrapper for non-clone PVC creation and error handling
1278
func (r *ReconcilerBase) handlePvcCreation(log logr.Logger, syncState *dvSyncState, pvcModifier pvcModifierFunc) error {
1✔
1279
        if syncState.pvc != nil {
2✔
1280
                return nil
1✔
1281
        }
1✔
1282
        if dvIsPrePopulated(syncState.dvMutated) {
1✔
1283
                return nil
×
1284
        }
×
1285
        // Creating the PVC
1286
        newPvc, err := r.createPvcForDatavolume(syncState.dvMutated, syncState.pvcSpec, pvcModifier)
1✔
1287
        if err != nil {
2✔
1288
                if cc.ErrQuotaExceeded(err) {
1✔
1289
                        syncErr := r.syncDataVolumeStatusPhaseWithEvent(syncState, cdiv1.Pending, nil, Event{corev1.EventTypeWarning, cc.ErrExceededQuota, err.Error()})
×
1290
                        if syncErr != nil {
×
1291
                                log.Error(syncErr, "failed to sync DataVolume status with event")
×
1292
                        }
×
1293
                }
1294
                return err
1✔
1295
        }
1296
        syncState.pvc = newPvc
1✔
1297

1✔
1298
        return nil
1✔
1299
}
1300

1301
// shouldUseCDIPopulator returns if the population of the PVC should be done using
1302
// CDI populators.
1303
// Currently it will use populators only if:
1304
// * storageClass used is CSI storageClass
1305
// * annotation cdi.kubevirt.io/storage.usePopulator is not set by user to "false"
1306
func (r *ReconcilerBase) shouldUseCDIPopulator(syncState *dvSyncState) (bool, error) {
1✔
1307
        dv := syncState.dvMutated
1✔
1308
        if usePopulator, ok := dv.Annotations[cc.AnnUsePopulator]; ok {
2✔
1309
                boolUsePopulator, err := strconv.ParseBool(usePopulator)
1✔
1310
                if err != nil {
1✔
1311
                        return false, err
×
1312
                }
×
1313
                return boolUsePopulator, nil
1✔
1314
        }
1315
        log := r.log.WithValues("DataVolume", dv.Name, "Namespace", dv.Namespace)
1✔
1316
        usePopulator, err := storageClassCSIDriverExists(r.client, r.log, syncState.pvcSpec.StorageClassName)
1✔
1317
        if err != nil {
1✔
1318
                return false, err
×
1319
        }
×
1320
        if !usePopulator {
2✔
1321
                if syncState.pvcSpec.StorageClassName != nil {
2✔
1322
                        log.Info("Not using CDI populators, storage class is not a CSI storage", "storageClass", *syncState.pvcSpec.StorageClassName)
1✔
1323
                }
1✔
1324
        }
1325

1326
        return usePopulator, nil
1✔
1327
}
1328

1329
func (r *ReconcilerBase) pvcRequiresWork(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) (bool, error) {
1✔
1330
        if pvc == nil || dv == nil {
2✔
1331
                return true, nil
1✔
1332
        }
1✔
1333
        if pvcIsPopulatedForDataVolume(pvc, dv) {
2✔
1334
                return false, nil
1✔
1335
        }
1✔
1336
        canAdopt, err := cc.AllowClaimAdoption(r.client, pvc, dv)
1✔
1337
        if err != nil {
1✔
1338
                return true, err
×
1339
        }
×
1340
        if canAdopt {
2✔
1341
                return false, nil
1✔
1342
        }
1✔
1343
        return true, nil
1✔
1344
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc