• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #4706

03 Jun 2024 11:25AM UTC coverage: 58.918% (+0.01%) from 58.904%
#4706

push

travis-ci

web-flow
Fix progress metric registration and parsing (#3292)

* Fix progress metric registration and parsing

Use default metric registration. We shouldn't use the controller-runtime
registration as we have no controller here and it will not register the
metric correctly.

Fix the metric parsing to match its new name. Otherwise the DV progress
will not be updated until it's 100%.

Regression introduced in #3254

Signed-off-by: Arnon Gilboa <agilboa@redhat.com>

* Add kubevirt_cdi_import_progress_total metric

Use it in the importer instead of kubevirt_cdi_clone_progress_total and
fix metric parsing accordingly.

Signed-off-by: Arnon Gilboa <agilboa@redhat.com>

* Move ProgressFromClaim to host-clone

Nobody else is using it.

Signed-off-by: Arnon Gilboa <agilboa@redhat.com>

* Add ProgressMetric interface

ProgressReader can now work with either import or clone progress metric.
FIXME: consider removing the direct Add/Get and use only via interface.

Signed-off-by: Arnon Gilboa <agilboa@redhat.com>

* Refactor ProgressMetric interface

Signed-off-by: Arnon Gilboa <agilboa@redhat.com>

* Refactor progress parsing

Signed-off-by: Arnon Gilboa <agilboa@redhat.com>

* Refer metric names from the metrics package

Signed-off-by: Arnon Gilboa <agilboa@redhat.com>

---------

Signed-off-by: Arnon Gilboa <agilboa@redhat.com>

47 of 91 new or added lines in 13 files covered. (51.65%)

9 existing lines in 3 files now uncovered.

16166 of 27438 relevant lines covered (58.92%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.27
/pkg/controller/datavolume/controller-base.go
1
/*
2
Copyright 2018 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package datavolume
18

19
import (
20
        "context"
21
        "crypto/rsa"
22
        "encoding/json"
23
        "fmt"
24
        "net/http"
25
        "reflect"
26
        "strconv"
27
        "time"
28

29
        "github.com/go-logr/logr"
30
        "github.com/pkg/errors"
31

32
        corev1 "k8s.io/api/core/v1"
33
        storagev1 "k8s.io/api/storage/v1"
34
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
35
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36
        "k8s.io/apimachinery/pkg/runtime"
37
        "k8s.io/apimachinery/pkg/runtime/schema"
38
        "k8s.io/apimachinery/pkg/types"
39
        "k8s.io/client-go/tools/record"
40
        "k8s.io/utils/ptr"
41

42
        "sigs.k8s.io/controller-runtime/pkg/client"
43
        "sigs.k8s.io/controller-runtime/pkg/controller"
44
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
45
        "sigs.k8s.io/controller-runtime/pkg/handler"
46
        "sigs.k8s.io/controller-runtime/pkg/manager"
47
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
48
        "sigs.k8s.io/controller-runtime/pkg/source"
49

50
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
51
        "kubevirt.io/containerized-data-importer/pkg/common"
52
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
53
        featuregates "kubevirt.io/containerized-data-importer/pkg/feature-gates"
54
        cloneMetrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-cloner"
55
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
56
        importMetrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-importer"
57
        "kubevirt.io/containerized-data-importer/pkg/token"
58
        "kubevirt.io/containerized-data-importer/pkg/util"
59
)
60

61
const (
62
        // ErrResourceExists provides a const to indicate a resource exists error
63
        ErrResourceExists = "ErrResourceExists"
64
        // ErrResourceMarkedForDeletion provides a const to indicate a resource marked for deletion error
65
        ErrResourceMarkedForDeletion = "ErrResourceMarkedForDeletion"
66
        // ErrClaimLost provides a const to indicate a claim is lost
67
        ErrClaimLost = "ErrClaimLost"
68

69
        // MessageResourceMarkedForDeletion provides a const to form a resource marked for deletion error message
70
        MessageResourceMarkedForDeletion = "Resource %q marked for deletion"
71
        // MessageResourceExists provides a const to form a resource exists error message
72
        MessageResourceExists = "Resource %q already exists and is not managed by DataVolume"
73
        // MessageErrClaimLost provides a const to form claim lost message
74
        MessageErrClaimLost = "PVC %s lost"
75

        // dvPhaseField is the field-index name under which getIndexArgs indexes DataVolumes by their status phase
76
        dvPhaseField = "status.phase"
77

        // claimRefField is the field-index name under which getIndexArgs indexes PersistentVolumes by their claim reference ("namespace/name")
78
        claimRefField = "spec.claimRef"
79

        // claimStorageClassNameField is the field-index name under which PersistentVolumes in the Available phase are indexed by storage class name
80
        claimStorageClassNameField = "spec.storageClassName"
81
)
82

83
var (
84
        // httpClient is a package-level HTTP client; its initialization and use are not in the visible part of this file
        httpClient *http.Client
85

        // delayedAnnotations lists the DataVolume annotations that handleDelayedAnnotations copies
        // to the PVC once the DataVolume phase is Succeeded
86
        delayedAnnotations = []string{
87
                cc.AnnPopulatedFor,
88
        }
89
)
90

91
// Event represents DV controller event.
// The fields are positional in composite literals used in this file
// (event type, reason, message), so their order must not change.
92
type Event struct {
93
        eventType string
94
        reason    string
95
        message   string
96
}
97

98
// statusPhaseSync bundles a DataVolume phase, an optional PVC key and an Event.
// reconcile hands it (via dvSyncResult.phaseSync) to updateStatus.
type statusPhaseSync struct {
99
        phase  cdiv1.DataVolumePhase
100
        pvcKey *client.ObjectKey
101
        event  Event
102
}
103

104
// dvSyncResult is what a controller's sync returns to reconcile.
type dvSyncResult struct {
105
        // result, when non-nil, replaces the reconcile.Result produced by updateStatus
        result    *reconcile.Result
106
        // phaseSync is passed to updateStatus by reconcile; it may be nil
        phaseSync *statusPhaseSync
107
}
108

109
// dvSyncState is the working state built by syncDvPvcState for one sync pass.
type dvSyncState struct {
110
        // dv is the DataVolume as fetched
        dv        *cdiv1.DataVolume
111
        // dvMutated starts as a deep copy of dv; syncUpdate persists its metadata changes
        dvMutated *cdiv1.DataVolume
112
        // pvc is the PVC fetched with the same namespaced name as the DataVolume request
        pvc       *corev1.PersistentVolumeClaim
113
        // pvcSpec is the spec produced by renderPvcSpec
        pvcSpec   *corev1.PersistentVolumeClaimSpec
114
        dvSyncResult
115
        // usePopulator holds the value returned by shouldUseCDIPopulator
        usePopulator bool
116
}
117

118
// ReconcilerBase holds the members shared by the DataVolume reconcilers
// whose common sync logic is implemented in this file.
119
type ReconcilerBase struct {
120
        // client is used for the List/Delete calls made by the sync helpers
        client               client.Client
121
        // recorder is passed to renderPvcSpec
        recorder             record.EventRecorder
122
        scheme               *runtime.Scheme
123
        // log is the base logger; reconcile derives a per-DataVolume logger from it
        log                  logr.Logger
124
        featureGates         featuregates.FeatureGates
125
        installerLabels      map[string]string
126
        // NOTE(review): shouldUpdateProgress is not read in the visible part of this file
        shouldUpdateProgress bool
127
}
128

129
// pvcIsPopulatedForDataVolume reports whether pvc carries the cc.AnnPopulatedFor
// annotation with a value equal to dv.Name. It returns false if either argument is nil.
func pvcIsPopulatedForDataVolume(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) bool {
1✔
130
        if pvc == nil || dv == nil {
1✔
131
                return false
×
132
        }
×
133
        dvName, ok := pvc.Annotations[cc.AnnPopulatedFor]
1✔
134
        return ok && dvName == dv.Name
1✔
135
}
136

137
// dvIsPrePopulated reports whether dv has the cc.AnnPrePopulated annotation,
// regardless of its value. dv must not be nil.
func dvIsPrePopulated(dv *cdiv1.DataVolume) bool {
1✔
138
        _, ok := dv.Annotations[cc.AnnPrePopulated]
1✔
139
        return ok
1✔
140
}
1✔
141

142
// checkStaticProvisionPending reports whether dv has the cc.AnnCheckStaticVolume
// annotation and pvc still has the cc.AnnPersistentVolumeList annotation.
// It returns false if either argument is nil.
func checkStaticProvisionPending(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) bool {
1✔
143
        if pvc == nil || dv == nil {
2✔
144
                return false
1✔
145
        }
1✔
146
        if _, ok := dv.Annotations[cc.AnnCheckStaticVolume]; !ok {
2✔
147
                return false
1✔
148
        }
1✔
149
        _, ok := pvc.Annotations[cc.AnnPersistentVolumeList]
1✔
150
        return ok
1✔
151
}
152

153
// shouldSetDataVolumePending returns true when checkStaticProvisionPending holds,
// or when there is no PVC and dv is either pre-populated or has an unset phase.
// With a non-nil pvc that is not pending static provisioning it returns false.
// dv must not be nil when pvc is nil, since it is dereferenced on that path.
func shouldSetDataVolumePending(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) bool {
1✔
154
        if checkStaticProvisionPending(pvc, dv) {
2✔
155
                return true
1✔
156
        }
1✔
157

158
        if pvc != nil {
2✔
159
                return false
1✔
160
        }
1✔
161

162
        return dvIsPrePopulated(dv) || (dv.Status.Phase == cdiv1.PhaseUnset)
1✔
163
}
164

165
// dataVolumeOp is the datavolume's requested operation, as computed by getDataVolumeOp
166
type dataVolumeOp int
167

// The dataVolumeOp values. dataVolumeNop is the zero value and is what
// getDataVolumeOp returns when no other operation applies.
168
const (
169
        dataVolumeNop dataVolumeOp = iota
170
        dataVolumeImport
171
        dataVolumeUpload
172
        dataVolumePvcClone
173
        dataVolumeSnapshotClone
174
        dataVolumePopulator
175
)
176

177
// indexArgs holds the three arguments CreateCommonIndexes passes to
// FieldIndexer.IndexField for one index: object type, field name and extractor.
type indexArgs struct {
178
        obj          client.Object
179
        field        string
180
        extractValue client.IndexerFunc
181
}
182

183
// getIndexArgs returns the index definitions registered by CreateCommonIndexes:
// DataVolumes by status phase, PersistentVolumes by claim reference, and
// available PersistentVolumes by storage class name.
func getIndexArgs() []indexArgs {
1✔
184
        return []indexArgs{
1✔
185
                {
1✔
186
                        obj:   &cdiv1.DataVolume{},
1✔
187
                        field: dvPhaseField,
1✔
188
                        extractValue: func(obj client.Object) []string {
1✔
189
                                // unchecked type assertion: panics if obj is not a *cdiv1.DataVolume
                                return []string{string(obj.(*cdiv1.DataVolume).Status.Phase)}
×
190
                        },
×
191
                },
192
                {
193
                        obj:   &corev1.PersistentVolume{},
194
                        field: claimRefField,
195
                        extractValue: func(obj client.Object) []string {
1✔
196
                                if pv, ok := obj.(*corev1.PersistentVolume); ok {
2✔
197
                                        // only PVs with a fully specified claim reference are indexed
                                        if pv.Spec.ClaimRef != nil && pv.Spec.ClaimRef.Namespace != "" && pv.Spec.ClaimRef.Name != "" {
2✔
198
                                                return []string{claimRefIndexKeyFunc(pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name)}
1✔
199
                                        }
1✔
200
                                }
201
                                return nil
×
202
                        },
203
                },
204
                {
205
                        obj:          &corev1.PersistentVolume{},
206
                        field:        claimStorageClassNameField,
207
                        extractValue: extractAvailablePersistentVolumeStorageClassName,
208
                },
209
        }
210
}
211

212
// claimRefIndexKeyFunc builds the claimRefField index key "namespace/name".
// The same function is used when indexing PVs and when querying the index.
func claimRefIndexKeyFunc(namespace, name string) string {
1✔
213
        return namespace + "/" + name
1✔
214
}
1✔
215

216
// CreateCommonIndexes creates indexes used by all controllers.
// It registers every entry returned by getIndexArgs with the manager's field
// indexer and returns the first error encountered, or nil.
217
func CreateCommonIndexes(mgr manager.Manager) error {
×
218
        for _, ia := range getIndexArgs() {
×
219
                if err := mgr.GetFieldIndexer().IndexField(context.TODO(), ia.obj, ia.field, ia.extractValue); err != nil {
×
220
                        return err
×
221
                }
×
222
        }
223
        return nil
×
224
}
225

226
// CreateAvailablePersistentVolumeIndex adds storage class name index for available PersistentVolumes.
// It registers the same field name and extractor as the last entry of getIndexArgs,
// but on the given fieldIndexer.
227
func CreateAvailablePersistentVolumeIndex(fieldIndexer client.FieldIndexer) error {
×
228
        return fieldIndexer.IndexField(context.TODO(), &corev1.PersistentVolume{},
×
229
                claimStorageClassNameField, extractAvailablePersistentVolumeStorageClassName)
×
230
}
×
231

232
// extractAvailablePersistentVolumeStorageClassName is an index extractor that
// returns the storage class name of obj when obj is a PersistentVolume in the
// Available phase, and nil (not indexed) otherwise.
func extractAvailablePersistentVolumeStorageClassName(obj client.Object) []string {
×
233
        if pv, ok := obj.(*corev1.PersistentVolume); ok && pv.Status.Phase == corev1.VolumeAvailable {
×
234
                return []string{pv.Spec.StorageClassName}
×
235
        }
×
236
        return nil
×
237
}
238

239
// addDataVolumeControllerCommonWatches registers on dataVolumeController the
// watches common to the DataVolume controllers. Every mapping function enqueues
// a DataVolume only when getDataVolumeOp reports op for it, so each controller
// reacts only to the DataVolumes of its own operation type.
// It returns the first error returned by a Watch call, or nil.
func addDataVolumeControllerCommonWatches(mgr manager.Manager, dataVolumeController controller.Controller, op dataVolumeOp) error {
×
240
        // appendMatchingDataVolumeRequest fetches the DataVolume namespace/name and appends a
        // request for it to reqs if its operation equals op. When the Get fails, reqs is
        // returned unchanged; the error is logged unless it is NotFound.
        appendMatchingDataVolumeRequest := func(ctx context.Context, reqs []reconcile.Request, mgr manager.Manager, namespace, name string) []reconcile.Request {
×
241
                dvKey := types.NamespacedName{Namespace: namespace, Name: name}
×
242
                dv := &cdiv1.DataVolume{}
×
243
                if err := mgr.GetClient().Get(ctx, dvKey, dv); err != nil {
×
244
                        if !k8serrors.IsNotFound(err) {
×
245
                                mgr.GetLogger().Error(err, "Failed to get DV", "dvKey", dvKey)
×
246
                        }
×
247
                        return reqs
×
248
                }
249
                if getDataVolumeOp(ctx, mgr.GetLogger(), dv, mgr.GetClient()) == op {
×
250
                        reqs = append(reqs, reconcile.Request{NamespacedName: dvKey})
×
251
                }
×
252
                return reqs
×
253
        }
254

255
        // Setup watches
        // DataVolume events: enqueue the DataVolume itself when its op matches, and
        // refresh the pending DataVolumes gauge as a side effect.
256
        if err := dataVolumeController.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{}), handler.EnqueueRequestsFromMapFunc(
×
257
                func(ctx context.Context, obj client.Object) []reconcile.Request {
×
258
                        dv := obj.(*cdiv1.DataVolume)
×
259
                        if getDataVolumeOp(ctx, mgr.GetLogger(), dv, mgr.GetClient()) != op {
×
260
                                return nil
×
261
                        }
×
262
                        updatePendingDataVolumesGauge(ctx, mgr.GetLogger(), dv, mgr.GetClient())
×
263
                        return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: dv.Namespace, Name: dv.Name}}}
×
264
                }),
265
        ); err != nil {
×
266
                return err
×
267
        }
×
        // PVC events: map to the controlling DataVolume owner and/or to the DataVolume
        // named by the cc.AnnPopulatedFor annotation (same namespace as the PVC).
268
        if err := dataVolumeController.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{}), handler.EnqueueRequestsFromMapFunc(
×
269
                func(ctx context.Context, obj client.Object) []reconcile.Request {
×
270
                        var result []reconcile.Request
×
271
                        owner := metav1.GetControllerOf(obj)
×
272
                        if owner != nil && owner.Kind == "DataVolume" {
×
273
                                result = appendMatchingDataVolumeRequest(ctx, result, mgr, obj.GetNamespace(), owner.Name)
×
274
                        }
×
275
                        populatedFor := obj.GetAnnotations()[cc.AnnPopulatedFor]
×
276
                        if populatedFor != "" {
×
277
                                result = appendMatchingDataVolumeRequest(ctx, result, mgr, obj.GetNamespace(), populatedFor)
×
278
                        }
×
279
                        // it is okay if result contains the same entry twice, will be deduplicated by caller
280
                        return result
×
281
                }),
282
        ); err != nil {
×
283
                return err
×
284
        }
×
        // Pod events: map to the controlling DataVolume owner, if any.
285
        if err := dataVolumeController.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}), handler.EnqueueRequestsFromMapFunc(
×
286
                func(ctx context.Context, obj client.Object) []reconcile.Request {
×
287
                        owner := metav1.GetControllerOf(obj)
×
288
                        if owner == nil || owner.Kind != "DataVolume" {
×
289
                                return nil
×
290
                        }
×
291
                        return appendMatchingDataVolumeRequest(ctx, nil, mgr, obj.GetNamespace(), owner.Name)
×
292
                }),
293
        ); err != nil {
×
294
                return err
×
295
        }
×
        // PVC, Pod and ObjectTransfer events: map to the DataVolume resolved by
        // getAnnOwnedByDataVolume; objects for which hasAnnOwnedByDataVolume is false,
        // or whose annotation cannot be resolved, are ignored.
296
        for _, k := range []client.Object{&corev1.PersistentVolumeClaim{}, &corev1.Pod{}, &cdiv1.ObjectTransfer{}} {
×
297
                if err := dataVolumeController.Watch(source.Kind(mgr.GetCache(), k), handler.EnqueueRequestsFromMapFunc(
×
298
                        func(ctx context.Context, obj client.Object) []reconcile.Request {
×
299
                                if !hasAnnOwnedByDataVolume(obj) {
×
300
                                        return nil
×
301
                                }
×
302
                                namespace, name, err := getAnnOwnedByDataVolume(obj)
×
303
                                if err != nil {
×
304
                                        return nil
×
305
                                }
×
306
                                return appendMatchingDataVolumeRequest(ctx, nil, mgr, namespace, name)
×
307
                        }),
308
                ); err != nil {
×
309
                        return err
×
310
                }
×
311
        }
312

313
        // Watch for SC updates and reconcile the DVs waiting for default SC
314
        // Relevant only when the DV StorageSpec has no AccessModes set and no matching StorageClass yet, so PVC cannot be created (test_id:9922)
        // Candidates are the DataVolumes with an empty phase, looked up through the
        // dvPhaseField index. A List error is not logged; the event is simply dropped.
315
        if err := dataVolumeController.Watch(source.Kind(mgr.GetCache(), &storagev1.StorageClass{}), handler.EnqueueRequestsFromMapFunc(
×
316
                func(ctx context.Context, obj client.Object) []reconcile.Request {
×
317
                        dvList := &cdiv1.DataVolumeList{}
×
318
                        if err := mgr.GetClient().List(ctx, dvList, client.MatchingFields{dvPhaseField: ""}); err != nil {
×
319
                                return nil
×
320
                        }
×
321
                        var reqs []reconcile.Request
×
322
                        for _, dv := range dvList.Items {
×
323
                                if getDataVolumeOp(ctx, mgr.GetLogger(), &dv, mgr.GetClient()) == op {
×
324
                                        reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dv.Name, Namespace: dv.Namespace}})
×
325
                                }
×
326
                        }
327
                        return reqs
×
328
                },
329
        ),
330
        ); err != nil {
×
331
                return err
×
332
        }
×
333

334
        // Watch for PV updates to reconcile the DVs waiting for available PV
335
        // Relevant only when the DV StorageSpec has no AccessModes set and no matching StorageClass yet, so PVC cannot be created (test_id:9924,9925)
        // Among the empty-phase DataVolumes, only those whose spec.storage names the
        // PV's storage class are enqueued, and only while the PV is Available.
336
        if err := dataVolumeController.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolume{}), handler.EnqueueRequestsFromMapFunc(
×
337
                func(ctx context.Context, obj client.Object) []reconcile.Request {
×
338
                        pv := obj.(*corev1.PersistentVolume)
×
339
                        dvList := &cdiv1.DataVolumeList{}
×
340
                        if err := mgr.GetClient().List(ctx, dvList, client.MatchingFields{dvPhaseField: ""}); err != nil {
×
341
                                return nil
×
342
                        }
×
343
                        var reqs []reconcile.Request
×
344
                        for _, dv := range dvList.Items {
×
345
                                storage := dv.Spec.Storage
×
346
                                if storage != nil &&
×
347
                                        storage.StorageClassName != nil &&
×
348
                                        *storage.StorageClassName == pv.Spec.StorageClassName &&
×
349
                                        pv.Status.Phase == corev1.VolumeAvailable &&
×
350
                                        getDataVolumeOp(ctx, mgr.GetLogger(), &dv, mgr.GetClient()) == op {
×
351
                                        reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dv.Name, Namespace: dv.Namespace}})
×
352
                                }
×
353
                        }
354
                        return reqs
×
355
                },
356
        ),
357
        ); err != nil {
×
358
                return err
×
359
        }
×
360

361
        return nil
×
362
}
363

364
// getDataVolumeOp derives the requested operation from the DataVolume spec.
// Checks are made in this order, first match wins:
//   - spec.sourceRef set: delegated to getSourceRefOp
//   - source PVC: dataVolumePvcClone
//   - source Snapshot: dataVolumeSnapshotClone
//   - no source: dataVolumePopulator if dvUsesVolumePopulator(dv), else dataVolumeNop
//   - source Upload: dataVolumeUpload
//   - source HTTP, S3, GCS, Registry, Blank, Imageio or VDDK: dataVolumeImport
//
// Anything else yields dataVolumeNop. ctx, log and client are only used on the
// sourceRef path.
func getDataVolumeOp(ctx context.Context, log logr.Logger, dv *cdiv1.DataVolume, client client.Client) dataVolumeOp {
×
365
        src := dv.Spec.Source
×
366

×
367
        if dv.Spec.SourceRef != nil {
×
368
                return getSourceRefOp(ctx, log, dv, client)
×
369
        }
×
370
        if src != nil && src.PVC != nil {
×
371
                return dataVolumePvcClone
×
372
        }
×
373
        if src != nil && src.Snapshot != nil {
×
374
                return dataVolumeSnapshotClone
×
375
        }
×
376
        if src == nil {
×
377
                if dvUsesVolumePopulator(dv) {
×
378
                        return dataVolumePopulator
×
379
                }
×
380
                return dataVolumeNop
×
381
        }
382
        if src.Upload != nil {
×
383
                return dataVolumeUpload
×
384
        }
×
385
        if src.HTTP != nil || src.S3 != nil || src.GCS != nil || src.Registry != nil || src.Blank != nil || src.Imageio != nil || src.VDDK != nil {
×
386
                return dataVolumeImport
×
387
        }
×
388

389
        return dataVolumeNop
×
390
}
391

392
// getSourceRefOp resolves the operation of a DataVolume that uses spec.sourceRef
// by fetching the referenced DataSource. The DataSource is looked up in
// sourceRef.Namespace when that is set and non-empty, otherwise in the
// DataVolume's namespace. A DataSource with a PVC source yields
// dataVolumePvcClone, one with a Snapshot source yields dataVolumeSnapshotClone;
// anything else, including any Get error (which is logged), yields dataVolumeNop.
// dv.Spec.SourceRef must be non-nil; it is dereferenced without a check.
func getSourceRefOp(ctx context.Context, log logr.Logger, dv *cdiv1.DataVolume, client client.Client) dataVolumeOp {
×
393
        dataSource := &cdiv1.DataSource{}
×
394
        ns := dv.Namespace
×
395
        if dv.Spec.SourceRef.Namespace != nil && *dv.Spec.SourceRef.Namespace != "" {
×
396
                ns = *dv.Spec.SourceRef.Namespace
×
397
        }
×
398
        nn := types.NamespacedName{Namespace: ns, Name: dv.Spec.SourceRef.Name}
×
399
        if err := client.Get(ctx, nn, dataSource); err != nil {
×
400
                log.Error(err, "Unable to get DataSource", "namespacedName", nn)
×
401
                return dataVolumeNop
×
402
        }
×
403

404
        switch {
×
405
        case dataSource.Spec.Source.PVC != nil:
×
406
                return dataVolumePvcClone
×
407
        case dataSource.Spec.Source.Snapshot != nil:
×
408
                return dataVolumeSnapshotClone
×
409
        default:
×
410
                return dataVolumeNop
×
411
        }
412
}
413

414
// updatePendingDataVolumesGauge sets the pending-DataVolumes metric to the number
// of DataVolumes using the default storage class whose phase is Pending or unset.
// It does nothing when dv itself is not using the default storage class, and
// leaves the metric untouched (logging at verbosity 3) when either listing fails.
func updatePendingDataVolumesGauge(ctx context.Context, log logr.Logger, dv *cdiv1.DataVolume, c client.Client) {
×
415
        if !cc.IsDataVolumeUsingDefaultStorageClass(dv) {
×
416
                return
×
417
        }
×
418

419
        countPending, err := getDefaultStorageClassDataVolumeCount(ctx, c, string(cdiv1.Pending))
×
420
        if err != nil {
×
421
                log.V(3).Error(err, "Failed listing the pending DataVolumes")
×
422
                return
×
423
        }
×
424
        countUnset, err := getDefaultStorageClassDataVolumeCount(ctx, c, string(cdiv1.PhaseUnset))
×
425
        if err != nil {
×
426
                log.V(3).Error(err, "Failed listing the unset DataVolumes")
×
427
                return
×
428
        }
×
429

430
        metrics.SetDataVolumePending(countPending + countUnset)
×
431
}
432

433
// getDefaultStorageClassDataVolumeCount lists the DataVolumes whose phase equals
// dvPhase (through the dvPhaseField index) and returns how many of them satisfy
// cc.IsDataVolumeUsingDefaultStorageClass. On a List error it returns 0 and the error.
func getDefaultStorageClassDataVolumeCount(ctx context.Context, c client.Client, dvPhase string) (int, error) {
×
434
        dvList := &cdiv1.DataVolumeList{}
×
435
        if err := c.List(ctx, dvList, client.MatchingFields{dvPhaseField: dvPhase}); err != nil {
×
436
                return 0, err
×
437
        }
×
438

439
        dvCount := 0
×
440
        for _, dv := range dvList.Items {
×
441
                if cc.IsDataVolumeUsingDefaultStorageClass(&dv) {
×
442
                        dvCount++
×
443
                }
×
444
        }
445

446
        return dvCount, nil
×
447
}
448

449
// dvController is what ReconcilerBase.reconcile needs from a concrete DataVolume
// controller: a sync step producing a dvSyncResult, and a status-phase update hook.
type dvController interface {
450
        reconcile.Reconciler
451
        sync(log logr.Logger, req reconcile.Request) (dvSyncResult, error)
452
        updateStatusPhase(pvc *corev1.PersistentVolumeClaim, dataVolumeCopy *cdiv1.DataVolume, event *Event) error
453
}
454

455
// reconcile runs dvc.sync and then updateStatus for the requested DataVolume.
// updateStatus is called even when sync returned an error. The returned error is
// the sync error if there is one, otherwise the updateStatus error; the returned
// result is the one set by sync if non-nil, otherwise the updateStatus result.
func (r *ReconcilerBase) reconcile(ctx context.Context, req reconcile.Request, dvc dvController) (reconcile.Result, error) {
1✔
456
        log := r.log.WithValues("DataVolume", req.NamespacedName)
1✔
457
        syncRes, syncErr := dvc.sync(log, req)
1✔
458
        res, err := r.updateStatus(req, syncRes.phaseSync, dvc)
1✔
459
        // a sync error takes precedence over (and hides) an updateStatus error
        if syncErr != nil {
2✔
460
                err = syncErr
1✔
461
        }
1✔
462
        if syncRes.result != nil {
2✔
463
                res = *syncRes.result
1✔
464
        }
1✔
465
        return res, err
1✔
466
}
467

468
// dvSyncStateFunc is a hook operating on the sync state; syncDvPvcState accepts
// two of them (cleanup and prepare), either of which may be nil.
type dvSyncStateFunc func(*dvSyncState) error
469

470
// syncCommon builds the sync state with syncDvPvcState and, if that succeeded,
// persists DataVolume metadata changes with syncUpdate. The state is returned
// together with the first error from either step.
func (r *ReconcilerBase) syncCommon(log logr.Logger, req reconcile.Request, cleanup, prepare dvSyncStateFunc) (dvSyncState, error) {
1✔
471
        syncState, err := r.syncDvPvcState(log, req, cleanup, prepare)
1✔
472
        if err == nil {
2✔
473
                err = r.syncUpdate(log, &syncState)
1✔
474
        }
1✔
475
        return syncState, err
1✔
476
}
477

478
// syncDvPvcState fetches the DataVolume and its PVC (same namespaced name) and
// runs the common sync steps on them, returning the resulting state.
// A non-nil syncState.result on return signals the caller to stop syncing.
// cleanup (if non-nil) runs before the deletion check, so it also runs for
// DataVolumes being deleted; prepare (if non-nil) runs only for live ones.
func (r *ReconcilerBase) syncDvPvcState(log logr.Logger, req reconcile.Request, cleanup, prepare dvSyncStateFunc) (dvSyncState, error) {
1✔
479
        syncState := dvSyncState{}
1✔
480
        dv, err := r.getDataVolume(req.NamespacedName)
1✔
481
        // a nil dv with nil err ends the sync with an empty result and no error
        if dv == nil || err != nil {
2✔
482
                syncState.result = &reconcile.Result{}
1✔
483
                return syncState, err
1✔
484
        }
1✔
485
        syncState.dv = dv
1✔
486
        syncState.dvMutated = dv.DeepCopy()
1✔
487
        syncState.pvc, err = r.getPVC(req.NamespacedName)
1✔
488
        if err != nil {
1✔
489
                return syncState, err
×
490
        }
×
491

492
        if cleanup != nil {
2✔
493
                if err := cleanup(&syncState); err != nil {
1✔
494
                        return syncState, err
×
495
                }
×
496
        }
497

498
        if dv.DeletionTimestamp != nil {
1✔
499
                log.Info("DataVolume marked for deletion")
×
500
                syncState.result = &reconcile.Result{}
×
501
                return syncState, nil
×
502
        }
×
503

504
        if prepare != nil {
2✔
505
                if err := prepare(&syncState); err != nil {
1✔
506
                        return syncState, err
×
507
                }
×
508
        }
509

510
        syncState.pvcSpec, err = renderPvcSpec(r.client, r.recorder, log, syncState.dvMutated, syncState.pvc)
1✔
511
        if err != nil {
2✔
512
                // record the render failure as a warning event with an unset phase;
                // a failure to record it is only logged
                if syncErr := r.syncDataVolumeStatusPhaseWithEvent(&syncState, cdiv1.PhaseUnset, nil,
1✔
513
                        Event{corev1.EventTypeWarning, cc.ErrClaimNotValid, err.Error()}); syncErr != nil {
1✔
514
                        log.Error(syncErr, "failed to sync DataVolume status with event")
×
515
                }
×
516
                // a missing storage class is not returned as an error: sync stops with an empty result
                if errors.Is(err, ErrStorageClassNotFound) {
2✔
517
                        syncState.result = &reconcile.Result{}
1✔
518
                        return syncState, nil
1✔
519
                }
1✔
520
                return syncState, err
1✔
521
        }
522

523
        syncState.usePopulator, err = r.shouldUseCDIPopulator(&syncState)
1✔
524
        if err != nil {
1✔
525
                return syncState, err
×
526
        }
×
527
        updateDataVolumeUseCDIPopulator(&syncState)
1✔
528

1✔
529
        // each of the next two steps may set syncState.result to end the sync early
        if err := r.handleStaticVolume(&syncState, log); err != nil || syncState.result != nil {
2✔
530
                return syncState, err
1✔
531
        }
1✔
532

533
        if err := r.handleDelayedAnnotations(&syncState, log); err != nil || syncState.result != nil {
2✔
534
                return syncState, err
1✔
535
        }
1✔
536

537
        if err = updateDataVolumeDefaultInstancetypeLabels(r.client, &syncState); err != nil {
1✔
538
                return syncState, err
×
539
        }
×
540

541
        if syncState.pvc != nil {
2✔
542
                if err := r.garbageCollect(&syncState, log); err != nil {
1✔
543
                        return syncState, err
×
544
                }
×
545
                // garbageCollect may have set a result or cleared syncState.dv; stop in either case
                if syncState.result != nil || syncState.dv == nil {
2✔
546
                        return syncState, nil
1✔
547
                }
1✔
548
                if err := r.validatePVC(dv, syncState.pvc); err != nil {
2✔
549
                        return syncState, err
1✔
550
                }
1✔
551
                r.handlePrePopulation(syncState.dvMutated, syncState.pvc)
1✔
552
        }
553

554
        return syncState, nil
1✔
555
}
556

557
// syncUpdate persists the metadata changes accumulated in syncState.dvMutated.
// It is a no-op when dv or dvMutated is nil or when ObjectMeta is unchanged, and
// it rejects any Status difference between dv and dvMutated with an error.
// After a successful update, syncState.dv is replaced by a deep copy of dvMutated.
func (r *ReconcilerBase) syncUpdate(log logr.Logger, syncState *dvSyncState) error {
1✔
558
        if syncState.dv == nil || syncState.dvMutated == nil {
2✔
559
                return nil
1✔
560
        }
1✔
561
        if !reflect.DeepEqual(syncState.dv.Status, syncState.dvMutated.Status) {
1✔
562
                return fmt.Errorf("status update is not allowed in sync phase")
×
563
        }
×
564
        if !reflect.DeepEqual(syncState.dv.ObjectMeta, syncState.dvMutated.ObjectMeta) {
2✔
565
                // ok/ok2: whether the extended clone token annotation was present before/after mutation
                _, ok := syncState.dv.Annotations[cc.AnnExtendedCloneToken]
1✔
566
                _, ok2 := syncState.dvMutated.Annotations[cc.AnnExtendedCloneToken]
1✔
567
                if err := r.updateDataVolume(syncState.dvMutated); err != nil {
1✔
568
                        // NOTE(review): logged through r.log rather than the request-scoped log parameter
                        r.log.Error(err, "Unable to sync update dv meta", "name", syncState.dvMutated.Name)
×
569
                        return err
×
570
                }
×
571
                // the token annotation was added by this update: log the time elapsed since DV creation
                if !ok && ok2 {
2✔
572
                        delta := time.Since(syncState.dv.ObjectMeta.CreationTimestamp.Time)
1✔
573
                        log.V(3).Info("Adding extended DataVolume token took", "delta", delta)
1✔
574
                }
1✔
575
                syncState.dv = syncState.dvMutated.DeepCopy()
1✔
576
        }
577
        return nil
1✔
578
}
579

580
// handleStaticVolume handles DataVolumes carrying the cc.AnnCheckStaticVolume
// annotation; for any other DataVolume it is a no-op.
//
// Without a PVC: if getAvailableVolumesForDV returns candidate PV names, the PVC
// is created with those names stored as JSON in cc.AnnPersistentVolumeList, and
// syncState.result is set to stop the sync.
//
// With a PVC that still has cc.AnnPersistentVolumeList: while the PVC is unbound
// the sync is stopped; once bound to one of the listed PVs the PVC gets
// cc.AnnPopulatedFor and loses the list annotation; if bound to any other PV the
// PVC is deleted and an error is returned.
func (r *ReconcilerBase) handleStaticVolume(syncState *dvSyncState, log logr.Logger) error {
1✔
581
        if _, ok := syncState.dvMutated.Annotations[cc.AnnCheckStaticVolume]; !ok {
2✔
582
                return nil
1✔
583
        }
1✔
584

585
        if syncState.pvc == nil {
2✔
586
                volumes, err := r.getAvailableVolumesForDV(syncState, log)
1✔
587
                if err != nil {
1✔
588
                        return err
×
589
                }
×
590

591
                if len(volumes) == 0 {
2✔
592
                        log.Info("No PVs for DV")
1✔
593
                        return nil
1✔
594
                }
1✔
595

596
                // create the PVC, recording the candidate PV names on it as a JSON list
                if err := r.handlePvcCreation(log, syncState, func(_ *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) error {
2✔
597
                        bs, err := json.Marshal(volumes)
1✔
598
                        if err != nil {
1✔
599
                                return err
×
600
                        }
×
601
                        cc.AddAnnotation(pvc, cc.AnnPersistentVolumeList, string(bs))
1✔
602
                        return nil
1✔
603
                }); err != nil {
×
604
                        return err
×
605
                }
×
606

607
                // set result to make sure callers don't do anything else in sync
608
                syncState.result = &reconcile.Result{}
1✔
609
                return nil
1✔
610
        }
611

612
        volumeAnno, ok := syncState.pvc.Annotations[cc.AnnPersistentVolumeList]
1✔
613
        if !ok {
1✔
614
                // either did not create the PVC here OR bind to expected PV succeeded
×
615
                return nil
×
616
        }
×
617

618
        if cc.IsUnbound(syncState.pvc) {
2✔
619
                // set result to make sure callers don't do anything else in sync
1✔
620
                syncState.result = &reconcile.Result{}
1✔
621
                return nil
1✔
622
        }
1✔
623

624
        var volumes []string
1✔
625
        if err := json.Unmarshal([]byte(volumeAnno), &volumes); err != nil {
1✔
626
                return err
×
627
        }
×
628

629
        // the PVC is bound: accept it only if it is bound to one of the recorded PVs
        for _, v := range volumes {
2✔
630
                if v == syncState.pvc.Spec.VolumeName {
2✔
631
                        pvcCpy := syncState.pvc.DeepCopy()
1✔
632
                        // handle as "populatedFor" going forward
1✔
633
                        cc.AddAnnotation(pvcCpy, cc.AnnPopulatedFor, syncState.dvMutated.Name)
1✔
634
                        delete(pvcCpy.Annotations, cc.AnnPersistentVolumeList)
1✔
635
                        if err := r.updatePVC(pvcCpy); err != nil {
1✔
636
                                return err
×
637
                        }
×
638
                        syncState.pvc = pvcCpy
1✔
639
                        return nil
1✔
640
                }
641
        }
642

643
        // delete the pvc and hope for better luck...
644
        pvcCpy := syncState.pvc.DeepCopy()
1✔
645
        if err := r.client.Delete(context.TODO(), pvcCpy, &client.DeleteOptions{}); err != nil {
1✔
646
                return err
×
647
        }
×
648

649
        syncState.pvc = pvcCpy
1✔
650

1✔
651
        return fmt.Errorf("DataVolume bound to unexpected PV %s", syncState.pvc.Spec.VolumeName)
1✔
652
}
653

654
func (r *ReconcilerBase) handleDelayedAnnotations(syncState *dvSyncState, log logr.Logger) error {
1✔
655
        dataVolume := syncState.dv
1✔
656
        if dataVolume.Status.Phase != cdiv1.Succeeded {
2✔
657
                return nil
1✔
658
        }
1✔
659

660
        if syncState.pvc == nil {
1✔
661
                return nil
×
662
        }
×
663

664
        pvcCpy := syncState.pvc.DeepCopy()
1✔
665
        for _, anno := range delayedAnnotations {
2✔
666
                if val, ok := dataVolume.Annotations[anno]; ok {
2✔
667
                        // only add if not already present
1✔
668
                        if _, ok := pvcCpy.Annotations[anno]; !ok {
2✔
669
                                cc.AddAnnotation(pvcCpy, anno, val)
1✔
670
                        }
1✔
671
                }
672
        }
673

674
        if !reflect.DeepEqual(syncState.pvc, pvcCpy) {
2✔
675
                if err := r.updatePVC(pvcCpy); err != nil {
1✔
676
                        return err
×
677
                }
×
678
                syncState.pvc = pvcCpy
1✔
679
                syncState.result = &reconcile.Result{}
1✔
680
        }
681

682
        return nil
1✔
683
}
684

685
func (r *ReconcilerBase) getAvailableVolumesForDV(syncState *dvSyncState, log logr.Logger) ([]string, error) {
1✔
686
        pvList := &corev1.PersistentVolumeList{}
1✔
687
        fields := client.MatchingFields{claimRefField: claimRefIndexKeyFunc(syncState.dv.Namespace, syncState.dv.Name)}
1✔
688
        if err := r.client.List(context.TODO(), pvList, fields); err != nil {
1✔
689
                return nil, err
×
690
        }
×
691
        if syncState.pvcSpec == nil {
1✔
692
                return nil, fmt.Errorf("missing pvc spec")
×
693
        }
×
694
        var pvNames []string
1✔
695
        for _, pv := range pvList.Items {
2✔
696
                if pv.Status.Phase == corev1.VolumeAvailable {
2✔
697
                        pvc := &corev1.PersistentVolumeClaim{
1✔
698
                                Spec: *syncState.pvcSpec,
1✔
699
                        }
1✔
700
                        if err := CheckVolumeSatisfyClaim(&pv, pvc); err != nil {
1✔
701
                                continue
×
702
                        }
703
                        log.Info("Found matching volume for DV", "pv", pv.Name)
1✔
704
                        pvNames = append(pvNames, pv.Name)
1✔
705
                }
706
        }
707
        return pvNames, nil
1✔
708
}
709

710
func (r *ReconcilerBase) handlePrePopulation(dv *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) {
1✔
711
        if pvc.Status.Phase == corev1.ClaimBound && pvcIsPopulatedForDataVolume(pvc, dv) {
2✔
712
                cc.AddAnnotation(dv, cc.AnnPrePopulated, pvc.Name)
1✔
713
        }
1✔
714
}
715

716
func (r *ReconcilerBase) validatePVC(dv *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) error {
1✔
717
        // If the PVC is being deleted, we should log a warning to the event recorder and return to wait the deletion complete
1✔
718
        // don't bother with owner refs is the pvc is deleted
1✔
719
        if pvc.DeletionTimestamp != nil {
1✔
720
                msg := fmt.Sprintf(MessageResourceMarkedForDeletion, pvc.Name)
×
721
                r.recorder.Event(dv, corev1.EventTypeWarning, ErrResourceMarkedForDeletion, msg)
×
722
                return errors.Errorf(msg)
×
723
        }
×
724
        // If the PVC is not controlled by this DataVolume resource, we should log
725
        // a warning to the event recorder and return
726
        if !metav1.IsControlledBy(pvc, dv) {
2✔
727
                requiresWork, err := r.pvcRequiresWork(pvc, dv)
1✔
728
                if err != nil {
1✔
729
                        return err
×
730
                }
×
731
                if !requiresWork {
2✔
732
                        if err := r.addOwnerRef(pvc, dv); err != nil {
1✔
733
                                return err
×
734
                        }
×
735
                } else {
1✔
736
                        msg := fmt.Sprintf(MessageResourceExists, pvc.Name)
1✔
737
                        r.recorder.Event(dv, corev1.EventTypeWarning, ErrResourceExists, msg)
1✔
738
                        return errors.Errorf(msg)
1✔
739
                }
1✔
740
        }
741
        return nil
1✔
742
}
743

744
func (r *ReconcilerBase) getPVC(key types.NamespacedName) (*corev1.PersistentVolumeClaim, error) {
1✔
745
        pvc := &corev1.PersistentVolumeClaim{}
1✔
746
        if err := r.client.Get(context.TODO(), key, pvc); err != nil {
2✔
747
                if k8serrors.IsNotFound(err) {
2✔
748
                        return nil, nil
1✔
749
                }
1✔
750
                return nil, err
×
751
        }
752
        return pvc, nil
1✔
753
}
754

755
func (r *ReconcilerBase) getDataVolume(key types.NamespacedName) (*cdiv1.DataVolume, error) {
1✔
756
        dv := &cdiv1.DataVolume{}
1✔
757
        if err := r.client.Get(context.TODO(), key, dv); err != nil {
2✔
758
                if k8serrors.IsNotFound(err) {
2✔
759
                        return nil, nil
1✔
760
                }
1✔
761
                return nil, err
×
762
        }
763
        return dv, nil
1✔
764
}
765

766
// pvcModifierFunc is a hook that receives a DataVolume and a PVC and reports
// an error. newPersistentVolumeClaim invokes it (when non-nil) on the PVC it
// has just assembled, before ownership information is attached, and aborts
// PVC construction if the hook returns an error.
type pvcModifierFunc func(datavolume *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) error
767

768
func (r *ReconcilerBase) createPvcForDatavolume(datavolume *cdiv1.DataVolume, pvcSpec *corev1.PersistentVolumeClaimSpec,
769
        pvcModifier pvcModifierFunc) (*corev1.PersistentVolumeClaim, error) {
1✔
770
        newPvc, err := r.newPersistentVolumeClaim(datavolume, pvcSpec, datavolume.Namespace, datavolume.Name, pvcModifier)
1✔
771
        if err != nil {
2✔
772
                return nil, err
1✔
773
        }
1✔
774
        util.SetRecommendedLabels(newPvc, r.installerLabels, "cdi-controller")
1✔
775
        if err := r.client.Create(context.TODO(), newPvc); err != nil {
1✔
776
                return nil, err
×
777
        }
×
778
        return newPvc, nil
1✔
779
}
780

781
func (r *ReconcilerBase) getStorageClassBindingMode(storageClassName *string) (*storagev1.VolumeBindingMode, error) {
1✔
782
        // Handle unspecified storage class name, fallback to default storage class
1✔
783
        storageClass, err := cc.GetStorageClassByNameWithK8sFallback(context.TODO(), r.client, storageClassName)
1✔
784
        if err != nil {
1✔
785
                return nil, err
×
786
        }
×
787

788
        if storageClass != nil && storageClass.VolumeBindingMode != nil {
2✔
789
                return storageClass.VolumeBindingMode, nil
1✔
790
        }
1✔
791

792
        // no storage class, then the assumption is immediate binding
793
        volumeBindingImmediate := storagev1.VolumeBindingImmediate
1✔
794
        return &volumeBindingImmediate, nil
1✔
795
}
796

797
func (r *ReconcilerBase) reconcileProgressUpdate(datavolume *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim, result *reconcile.Result) error {
1✔
798
        var podNamespace string
1✔
799
        if datavolume.Status.Progress == "" {
2✔
800
                datavolume.Status.Progress = "N/A"
1✔
801
        }
1✔
802

803
        if !r.shouldUpdateProgress {
2✔
804
                return nil
1✔
805
        }
1✔
806

807
        if usePopulator, _ := CheckPVCUsingPopulators(pvc); usePopulator {
2✔
808
                if progress, ok := pvc.Annotations[cc.AnnPopulatorProgress]; ok {
2✔
809
                        datavolume.Status.Progress = cdiv1.DataVolumeProgress(progress)
1✔
810
                } else {
2✔
811
                        datavolume.Status.Progress = "N/A"
1✔
812
                }
1✔
813
                return nil
1✔
814
        }
815

816
        if datavolume.Spec.Source != nil && datavolume.Spec.Source.PVC != nil {
2✔
817
                podNamespace = datavolume.Spec.Source.PVC.Namespace
1✔
818
        } else {
2✔
819
                podNamespace = datavolume.Namespace
1✔
820
        }
1✔
821

822
        if datavolume.Status.Phase == cdiv1.Succeeded || datavolume.Status.Phase == cdiv1.Failed {
2✔
823
                // Data volume completed progress, or failed, either way stop queueing the data volume.
1✔
824
                r.log.Info("Datavolume finished, no longer updating progress", "Namespace", datavolume.Namespace, "Name", datavolume.Name, "Phase", datavolume.Status.Phase)
1✔
825
                return nil
1✔
826
        }
1✔
827
        pod, err := cc.GetPodFromPvc(r.client, podNamespace, pvc)
1✔
828
        if err == nil {
1✔
829
                if pod.Status.Phase != corev1.PodRunning {
×
830
                        // Avoid long timeouts and error traces from HTTP get when pod is already gone
×
831
                        return nil
×
832
                }
×
833
                if err := updateProgressUsingPod(datavolume, pod); err != nil {
×
834
                        return err
×
835
                }
×
836
        }
837
        // We are not done yet, force a re-reconcile in 2 seconds to get an update.
838
        result.RequeueAfter = 2 * time.Second
1✔
839
        return nil
1✔
840
}
841

842
func (r *ReconcilerBase) syncDataVolumeStatusPhaseWithEvent(syncState *dvSyncState, phase cdiv1.DataVolumePhase, pvc *corev1.PersistentVolumeClaim, event Event) error {
1✔
843
        if syncState.phaseSync != nil {
1✔
844
                return fmt.Errorf("phaseSync is already set")
×
845
        }
×
846
        syncState.phaseSync = &statusPhaseSync{phase: phase, event: event}
1✔
847
        if pvc != nil {
1✔
848
                key := client.ObjectKeyFromObject(pvc)
×
849
                syncState.phaseSync.pvcKey = &key
×
850
        }
×
851
        return nil
1✔
852
}
853

854
func (r *ReconcilerBase) updateDataVolumeStatusPhaseSync(ps *statusPhaseSync, dv *cdiv1.DataVolume, dvCopy *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) error {
1✔
855
        var condPvc *corev1.PersistentVolumeClaim
1✔
856
        var err error
1✔
857
        if ps.pvcKey != nil {
1✔
858
                if pvc == nil || *ps.pvcKey != client.ObjectKeyFromObject(pvc) {
×
859
                        condPvc, err = r.getPVC(*ps.pvcKey)
×
860
                        if err != nil {
×
861
                                return err
×
862
                        }
×
863
                } else {
×
864
                        condPvc = pvc
×
865
                }
×
866
        }
867
        return r.updateDataVolumeStatusPhaseWithEvent(ps.phase, dv, dvCopy, condPvc, ps.event)
1✔
868
}
869

870
func (r *ReconcilerBase) updateDataVolumeStatusPhaseWithEvent(
871
        phase cdiv1.DataVolumePhase,
872
        dataVolume *cdiv1.DataVolume,
873
        dataVolumeCopy *cdiv1.DataVolume,
874
        pvc *corev1.PersistentVolumeClaim,
875
        event Event) error {
1✔
876
        if dataVolume == nil {
1✔
877
                return nil
×
878
        }
×
879

880
        curPhase := dataVolumeCopy.Status.Phase
1✔
881
        dataVolumeCopy.Status.Phase = phase
1✔
882

1✔
883
        reason := ""
1✔
884
        message := ""
1✔
885
        if pvc == nil {
2✔
886
                reason = event.reason
1✔
887
                message = event.message
1✔
888
        }
1✔
889
        r.updateConditions(dataVolumeCopy, pvc, reason, message)
1✔
890
        return r.emitEvent(dataVolume, dataVolumeCopy, curPhase, dataVolume.Status.Conditions, &event)
1✔
891
}
892

893
func (r *ReconcilerBase) updateStatus(req reconcile.Request, phaseSync *statusPhaseSync, dvc dvController) (reconcile.Result, error) {
1✔
894
        result := reconcile.Result{}
1✔
895
        dv, err := r.getDataVolume(req.NamespacedName)
1✔
896
        if dv == nil || err != nil {
2✔
897
                return reconcile.Result{}, err
1✔
898
        }
1✔
899

900
        dataVolumeCopy := dv.DeepCopy()
1✔
901

1✔
902
        pvc, err := r.getPVC(req.NamespacedName)
1✔
903
        if err != nil {
1✔
904
                return reconcile.Result{}, err
×
905
        }
×
906

907
        if phaseSync != nil {
2✔
908
                err = r.updateDataVolumeStatusPhaseSync(phaseSync, dv, dataVolumeCopy, pvc)
1✔
909
                return reconcile.Result{}, err
1✔
910
        }
1✔
911

912
        curPhase := dataVolumeCopy.Status.Phase
1✔
913
        var event Event
1✔
914

1✔
915
        if shouldSetDataVolumePending(pvc, dataVolumeCopy) {
2✔
916
                dataVolumeCopy.Status.Phase = cdiv1.Pending
1✔
917
        } else if pvc != nil {
3✔
918
                dataVolumeCopy.Status.ClaimName = pvc.Name
1✔
919

1✔
920
                phase := pvc.Annotations[cc.AnnPodPhase]
1✔
921
                requiresWork, err := r.pvcRequiresWork(pvc, dataVolumeCopy)
1✔
922
                if err != nil {
1✔
923
                        return reconcile.Result{}, err
×
924
                }
×
925
                if phase == string(cdiv1.Succeeded) && requiresWork {
2✔
926
                        if err := dvc.updateStatusPhase(pvc, dataVolumeCopy, &event); err != nil {
1✔
927
                                return reconcile.Result{}, err
×
928
                        }
×
929
                } else {
1✔
930
                        switch pvc.Status.Phase {
1✔
931
                        case corev1.ClaimPending:
1✔
932
                                if requiresWork {
2✔
933
                                        if err := r.updateStatusPVCPending(pvc, dvc, dataVolumeCopy, &event); err != nil {
1✔
934
                                                return reconcile.Result{}, err
×
935
                                        }
×
936
                                } else {
1✔
937
                                        dataVolumeCopy.Status.Phase = cdiv1.Succeeded
1✔
938
                                }
1✔
939
                        case corev1.ClaimBound:
1✔
940
                                switch dataVolumeCopy.Status.Phase {
1✔
941
                                case cdiv1.Pending:
1✔
942
                                        dataVolumeCopy.Status.Phase = cdiv1.PVCBound
1✔
943
                                case cdiv1.WaitForFirstConsumer:
×
944
                                        dataVolumeCopy.Status.Phase = cdiv1.PVCBound
×
945
                                case cdiv1.Unknown:
1✔
946
                                        dataVolumeCopy.Status.Phase = cdiv1.PVCBound
1✔
947
                                }
948

949
                                if requiresWork {
2✔
950
                                        if err := dvc.updateStatusPhase(pvc, dataVolumeCopy, &event); err != nil {
1✔
951
                                                return reconcile.Result{}, err
×
952
                                        }
×
953
                                } else {
1✔
954
                                        dataVolumeCopy.Status.Phase = cdiv1.Succeeded
1✔
955
                                }
1✔
956

957
                        case corev1.ClaimLost:
1✔
958
                                dataVolumeCopy.Status.Phase = cdiv1.Failed
1✔
959
                                event.eventType = corev1.EventTypeWarning
1✔
960
                                event.reason = ErrClaimLost
1✔
961
                                event.message = fmt.Sprintf(MessageErrClaimLost, pvc.Name)
1✔
962
                        default:
1✔
963
                                if pvc.Status.Phase != "" {
1✔
964
                                        dataVolumeCopy.Status.Phase = cdiv1.Unknown
×
965
                                }
×
966
                        }
967
                }
968

969
                if i, err := strconv.ParseInt(pvc.Annotations[cc.AnnPodRestarts], 10, 32); err == nil && i >= 0 {
2✔
970
                        dataVolumeCopy.Status.RestartCount = int32(i)
1✔
971
                }
1✔
972
                if err := r.reconcileProgressUpdate(dataVolumeCopy, pvc, &result); err != nil {
1✔
973
                        return result, err
×
974
                }
×
975
        }
976

977
        currentCond := make([]cdiv1.DataVolumeCondition, len(dataVolumeCopy.Status.Conditions))
1✔
978
        copy(currentCond, dataVolumeCopy.Status.Conditions)
1✔
979
        r.updateConditions(dataVolumeCopy, pvc, "", "")
1✔
980
        return result, r.emitEvent(dv, dataVolumeCopy, curPhase, currentCond, &event)
1✔
981
}
982

983
func (r ReconcilerBase) updateStatusPVCPending(pvc *corev1.PersistentVolumeClaim, dvc dvController, dataVolumeCopy *cdiv1.DataVolume, event *Event) error {
1✔
984
        usePopulator, err := CheckPVCUsingPopulators(pvc)
1✔
985
        if err != nil {
1✔
986
                return err
×
987
        }
×
988
        if usePopulator {
2✔
989
                // when using populators the target pvc phase will stay pending until the population completes,
1✔
990
                // hence if not wffc we should update the dv phase according to the pod phase
1✔
991
                shouldBeMarkedPendingPopulation, err := r.shouldBeMarkedPendingPopulation(pvc)
1✔
992
                if err != nil {
1✔
993
                        return err
×
994
                }
×
995
                if shouldBeMarkedPendingPopulation {
2✔
996
                        dataVolumeCopy.Status.Phase = cdiv1.PendingPopulation
1✔
997
                } else if err := dvc.updateStatusPhase(pvc, dataVolumeCopy, event); err != nil {
2✔
998
                        return err
×
999
                }
×
1000
                return nil
1✔
1001
        }
1002

1003
        shouldBeMarkedWaitForFirstConsumer, err := r.shouldBeMarkedWaitForFirstConsumer(pvc)
1✔
1004
        if err != nil {
1✔
1005
                return err
×
1006
        }
×
1007
        if shouldBeMarkedWaitForFirstConsumer {
2✔
1008
                dataVolumeCopy.Status.Phase = cdiv1.WaitForFirstConsumer
1✔
1009
        } else {
2✔
1010
                dataVolumeCopy.Status.Phase = cdiv1.Pending
1✔
1011
        }
1✔
1012
        return nil
1✔
1013
}
1014

1015
func (r *ReconcilerBase) updateConditions(dataVolume *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim, reason, message string) {
1✔
1016
        var anno map[string]string
1✔
1017

1✔
1018
        if dataVolume.Status.Conditions == nil {
2✔
1019
                dataVolume.Status.Conditions = make([]cdiv1.DataVolumeCondition, 0)
1✔
1020
        }
1✔
1021

1022
        if pvc != nil {
2✔
1023
                anno = pvc.Annotations
1✔
1024
        } else {
2✔
1025
                anno = make(map[string]string)
1✔
1026
        }
1✔
1027

1028
        var readyStatus corev1.ConditionStatus
1✔
1029
        switch dataVolume.Status.Phase {
1✔
1030
        case cdiv1.Succeeded:
1✔
1031
                readyStatus = corev1.ConditionTrue
1✔
1032
        case cdiv1.Unknown:
×
1033
                readyStatus = corev1.ConditionUnknown
×
1034
        default:
1✔
1035
                readyStatus = corev1.ConditionFalse
1✔
1036
        }
1037

1038
        dataVolume.Status.Conditions = updateBoundCondition(dataVolume.Status.Conditions, pvc, message, reason)
1✔
1039
        dataVolume.Status.Conditions = UpdateReadyCondition(dataVolume.Status.Conditions, readyStatus, message, reason)
1✔
1040
        dataVolume.Status.Conditions = updateRunningCondition(dataVolume.Status.Conditions, anno)
1✔
1041
}
1042

1043
func (r *ReconcilerBase) emitConditionEvent(dataVolume *cdiv1.DataVolume, originalCond []cdiv1.DataVolumeCondition) {
1✔
1044
        r.emitBoundConditionEvent(dataVolume, FindConditionByType(cdiv1.DataVolumeBound, dataVolume.Status.Conditions), FindConditionByType(cdiv1.DataVolumeBound, originalCond))
1✔
1045
        r.emitFailureConditionEvent(dataVolume, originalCond)
1✔
1046
}
1✔
1047

1048
func (r *ReconcilerBase) emitBoundConditionEvent(dataVolume *cdiv1.DataVolume, current, original *cdiv1.DataVolumeCondition) {
1✔
1049
        // We know reason and message won't be empty for bound.
1✔
1050
        if current != nil && (original == nil || current.Status != original.Status || current.Reason != original.Reason || current.Message != original.Message) {
2✔
1051
                r.recorder.Event(dataVolume, corev1.EventTypeNormal, current.Reason, current.Message)
1✔
1052
        }
1✔
1053
}
1054

1055
func (r *ReconcilerBase) emitFailureConditionEvent(dataVolume *cdiv1.DataVolume, originalCond []cdiv1.DataVolumeCondition) {
1✔
1056
        curReady := FindConditionByType(cdiv1.DataVolumeReady, dataVolume.Status.Conditions)
1✔
1057
        curBound := FindConditionByType(cdiv1.DataVolumeBound, dataVolume.Status.Conditions)
1✔
1058
        curRunning := FindConditionByType(cdiv1.DataVolumeRunning, dataVolume.Status.Conditions)
1✔
1059
        orgRunning := FindConditionByType(cdiv1.DataVolumeRunning, originalCond)
1✔
1060

1✔
1061
        if curReady == nil || curBound == nil || curRunning == nil {
1✔
1062
                return
×
1063
        }
×
1064
        if curReady.Status == corev1.ConditionFalse && curRunning.Status == corev1.ConditionFalse &&
1✔
1065
                dvBoundOrPopulationInProgress(dataVolume, curBound) {
2✔
1066
                // Bound or in progress, not ready, and not running.
1✔
1067
                // Avoiding triggering an event for scratch space required since it will be addressed
1✔
1068
                // by CDI and sounds more drastic than it actually is.
1✔
1069
                if curRunning.Message != "" && curRunning.Message != common.ScratchSpaceRequired &&
1✔
1070
                        (orgRunning == nil || orgRunning.Message != curRunning.Message) {
1✔
1071
                        r.recorder.Event(dataVolume, corev1.EventTypeWarning, curRunning.Reason, curRunning.Message)
×
1072
                }
×
1073
        }
1074
}
1075

1076
func (r *ReconcilerBase) emitEvent(dataVolume *cdiv1.DataVolume, dataVolumeCopy *cdiv1.DataVolume, curPhase cdiv1.DataVolumePhase, originalCond []cdiv1.DataVolumeCondition, event *Event) error {
1✔
1077
        if !reflect.DeepEqual(dataVolume.ObjectMeta, dataVolumeCopy.ObjectMeta) {
1✔
1078
                return fmt.Errorf("meta update is not allowed in updateStatus phase")
×
1079
        }
×
1080
        // Update status subresource only if changed
1081
        if !reflect.DeepEqual(dataVolume.Status, dataVolumeCopy.Status) {
2✔
1082
                if err := r.client.Status().Update(context.TODO(), dataVolumeCopy); err != nil {
1✔
1083
                        r.log.Error(err, "unable to update datavolume status", "name", dataVolumeCopy.Name)
×
1084
                        return err
×
1085
                }
×
1086
                // Emit the event only on status phase change
1087
                if event.eventType != "" && curPhase != dataVolumeCopy.Status.Phase {
2✔
1088
                        r.recorder.Event(dataVolumeCopy, event.eventType, event.reason, event.message)
1✔
1089
                }
1✔
1090
                r.emitConditionEvent(dataVolumeCopy, originalCond)
1✔
1091
        }
1092
        return nil
1✔
1093
}
1094

1095
func (r *ReconcilerBase) addOwnerRef(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) error {
1✔
1096
        if err := controllerutil.SetControllerReference(dv, pvc, r.scheme); err != nil {
1✔
1097
                return err
×
1098
        }
×
1099

1100
        return r.updatePVC(pvc)
1✔
1101
}
1102

1103
func updateProgressUsingPod(dataVolumeCopy *cdiv1.DataVolume, pod *corev1.Pod) error {
1✔
1104
        httpClient = cc.BuildHTTPClient(httpClient)
1✔
1105
        url, err := cc.GetMetricsURL(pod)
1✔
1106
        if err != nil {
2✔
1107
                return err
1✔
1108
        }
1✔
1109
        if url == "" {
1✔
1110
                return nil
×
1111
        }
×
1112

1113
        // Used for both import and clone, so it should match both metric names
1114
        progressReport, err := cc.GetProgressReportFromURL(url, httpClient,
1✔
1115
                fmt.Sprintf("%s|%s", importMetrics.ImportProgressMetricName, cloneMetrics.CloneProgressMetricName),
1✔
1116
                string(dataVolumeCopy.UID))
1✔
1117
        if err != nil {
1✔
UNCOV
1118
                return err
×
NEW
1119
        }
×
1120
        if progressReport != "" {
2✔
1121
                if f, err := strconv.ParseFloat(progressReport, 64); err == nil {
2✔
1122
                        dataVolumeCopy.Status.Progress = cdiv1.DataVolumeProgress(fmt.Sprintf("%.2f%%", f))
1✔
1123
                }
1✔
1124
        }
1125
        return nil
1✔
1126
}
1127

1128
// newPersistentVolumeClaim creates a new PVC for the DataVolume resource.
1129
// It also sets the appropriate OwnerReferences on the resource
1130
// which allows handleObject to discover the DataVolume resource
1131
// that 'owns' it.
1132
func (r *ReconcilerBase) newPersistentVolumeClaim(dataVolume *cdiv1.DataVolume, targetPvcSpec *corev1.PersistentVolumeClaimSpec, namespace, name string, pvcModifier pvcModifierFunc) (*corev1.PersistentVolumeClaim, error) {
1✔
1133
        labels := map[string]string{
1✔
1134
                common.CDILabelKey: common.CDILabelValue,
1✔
1135
        }
1✔
1136
        if util.ResolveVolumeMode(targetPvcSpec.VolumeMode) == corev1.PersistentVolumeFilesystem {
2✔
1137
                labels[common.KubePersistentVolumeFillingUpSuppressLabelKey] = common.KubePersistentVolumeFillingUpSuppressLabelValue
1✔
1138
        }
1✔
1139
        for k, v := range dataVolume.Labels {
2✔
1140
                labels[k] = v
1✔
1141
        }
1✔
1142

1143
        annotations := make(map[string]string)
1✔
1144
        for k, v := range dataVolume.ObjectMeta.Annotations {
2✔
1145
                annotations[k] = v
1✔
1146
        }
1✔
1147
        annotations[cc.AnnPodRestarts] = "0"
1✔
1148
        annotations[cc.AnnContentType] = string(cc.GetContentType(dataVolume.Spec.ContentType))
1✔
1149
        if dataVolume.Spec.PriorityClassName != "" {
2✔
1150
                annotations[cc.AnnPriorityClassName] = dataVolume.Spec.PriorityClassName
1✔
1151
        }
1✔
1152
        annotations[cc.AnnPreallocationRequested] = strconv.FormatBool(cc.GetPreallocation(context.TODO(), r.client, dataVolume.Spec.Preallocation))
1✔
1153
        annotations[cc.AnnCreatedForDataVolume] = string(dataVolume.UID)
1✔
1154

1✔
1155
        if dataVolume.Spec.Storage != nil && labels[common.PvcApplyStorageProfileLabel] == "true" {
1✔
1156
                isWebhookPvcRenderingEnabled, err := featuregates.IsWebhookPvcRenderingEnabled(r.client)
×
1157
                if err != nil {
×
1158
                        return nil, err
×
1159
                }
×
1160
                if isWebhookPvcRenderingEnabled {
×
1161
                        labels[common.PvcApplyStorageProfileLabel] = "true"
×
1162
                        if targetPvcSpec.VolumeMode == nil {
×
1163
                                targetPvcSpec.VolumeMode = ptr.To[corev1.PersistentVolumeMode](cdiv1.PersistentVolumeFromStorageProfile)
×
1164
                        }
×
1165
                }
1166
        }
1167

1168
        pvc := &corev1.PersistentVolumeClaim{
1✔
1169
                ObjectMeta: metav1.ObjectMeta{
1✔
1170
                        Namespace:   namespace,
1✔
1171
                        Name:        name,
1✔
1172
                        Labels:      labels,
1✔
1173
                        Annotations: annotations,
1✔
1174
                },
1✔
1175
                Spec: *targetPvcSpec,
1✔
1176
        }
1✔
1177

1✔
1178
        if pvcModifier != nil {
2✔
1179
                if err := pvcModifier(dataVolume, pvc); err != nil {
2✔
1180
                        return nil, err
1✔
1181
                }
1✔
1182
        }
1183

1184
        if pvc.Namespace == dataVolume.Namespace {
2✔
1185
                pvc.OwnerReferences = []metav1.OwnerReference{
1✔
1186
                        *metav1.NewControllerRef(dataVolume, schema.GroupVersionKind{
1✔
1187
                                Group:   cdiv1.SchemeGroupVersion.Group,
1✔
1188
                                Version: cdiv1.SchemeGroupVersion.Version,
1✔
1189
                                Kind:    "DataVolume",
1✔
1190
                        }),
1✔
1191
                }
1✔
1192
        } else {
1✔
1193
                if err := setAnnOwnedByDataVolume(pvc, dataVolume); err != nil {
×
1194
                        return nil, err
×
1195
                }
×
1196
                pvc.Annotations[cc.AnnOwnerUID] = string(dataVolume.UID)
×
1197
        }
1198

1199
        for _, anno := range delayedAnnotations {
2✔
1200
                delete(pvc.Annotations, anno)
1✔
1201
        }
1✔
1202

1203
        return pvc, nil
1✔
1204
}
1205

1206
// Whenever the controller updates a DV, we must make sure to nil out spec.source when using other population methods
1207
func (r *ReconcilerBase) updateDataVolume(dv *cdiv1.DataVolume) error {
1✔
1208
        // Restore so we don't nil out the dv that is being worked on
1✔
1209
        var sourceCopy *cdiv1.DataVolumeSource
1✔
1210

1✔
1211
        if dv.Spec.SourceRef != nil || dvUsesVolumePopulator(dv) {
2✔
1212
                sourceCopy = dv.Spec.Source
1✔
1213
                dv.Spec.Source = nil
1✔
1214
        }
1✔
1215

1216
        err := r.client.Update(context.TODO(), dv)
1✔
1217
        if dv.Spec.SourceRef != nil || dvUsesVolumePopulator(dv) {
2✔
1218
                dv.Spec.Source = sourceCopy
1✔
1219
        }
1✔
1220
        return err
1✔
1221
}
1222

1223
func (r *ReconcilerBase) updatePVC(pvc *corev1.PersistentVolumeClaim) error {
1✔
1224
        return r.client.Update(context.TODO(), pvc)
1✔
1225
}
1✔
1226

1227
func newLongTermCloneTokenGenerator(key *rsa.PrivateKey) token.Generator {
×
1228
        return token.NewGenerator(common.ExtendedCloneTokenIssuer, key, 10*365*24*time.Hour)
×
1229
}
×
1230

1231
// storageClassWaitForFirstConsumer returns if the binding mode of a given storage class is WFFC
1232
func (r *ReconcilerBase) storageClassWaitForFirstConsumer(storageClass *string) (bool, error) {
1✔
1233
        storageClassBindingMode, err := r.getStorageClassBindingMode(storageClass)
1✔
1234
        if err != nil {
1✔
1235
                return false, err
×
1236
        }
×
1237

1238
        return storageClassBindingMode != nil && *storageClassBindingMode == storagev1.VolumeBindingWaitForFirstConsumer, nil
1✔
1239
}
1240

1241
// shouldBeMarkedWaitForFirstConsumer decided whether we should mark DV as WFFC
1242
func (r *ReconcilerBase) shouldBeMarkedWaitForFirstConsumer(pvc *corev1.PersistentVolumeClaim) (bool, error) {
1✔
1243
        wffc, err := r.storageClassWaitForFirstConsumer(pvc.Spec.StorageClassName)
1✔
1244
        if err != nil {
1✔
1245
                return false, err
×
1246
        }
×
1247

1248
        honorWaitForFirstConsumerEnabled, err := r.featureGates.HonorWaitForFirstConsumerEnabled()
1✔
1249
        if err != nil {
1✔
1250
                return false, err
×
1251
        }
×
1252

1253
        res := honorWaitForFirstConsumerEnabled && wffc &&
1✔
1254
                pvc.Status.Phase == corev1.ClaimPending
1✔
1255

1✔
1256
        return res, nil
1✔
1257
}
1258

1259
func (r *ReconcilerBase) shouldReconcileVolumeSourceCR(syncState *dvSyncState) bool {
1✔
1260
        if syncState.pvc == nil {
2✔
1261
                return true
1✔
1262
        }
1✔
1263
        phase := syncState.pvc.Annotations[cc.AnnPodPhase]
1✔
1264
        return phase != string(corev1.PodSucceeded) || syncState.dvMutated.Status.Phase != cdiv1.Succeeded
1✔
1265
}
1266

1267
// shouldBeMarkedPendingPopulation decides whether we should mark DV as PendingPopulation
1268
func (r *ReconcilerBase) shouldBeMarkedPendingPopulation(pvc *corev1.PersistentVolumeClaim) (bool, error) {
1✔
1269
        wffc, err := r.storageClassWaitForFirstConsumer(pvc.Spec.StorageClassName)
1✔
1270
        if err != nil {
1✔
1271
                return false, err
×
1272
        }
×
1273
        nodeName := pvc.Annotations[cc.AnnSelectedNode]
1✔
1274
        immediateBindingRequested := cc.ImmediateBindingRequested(pvc)
1✔
1275

1✔
1276
        return wffc && nodeName == "" && !immediateBindingRequested, nil
1✔
1277
}
1278

1279
// handlePvcCreation works as a wrapper for non-clone PVC creation and error handling
1280
func (r *ReconcilerBase) handlePvcCreation(log logr.Logger, syncState *dvSyncState, pvcModifier pvcModifierFunc) error {
1✔
1281
        if syncState.pvc != nil {
2✔
1282
                return nil
1✔
1283
        }
1✔
1284
        if dvIsPrePopulated(syncState.dvMutated) {
1✔
1285
                return nil
×
1286
        }
×
1287
        // Creating the PVC
1288
        newPvc, err := r.createPvcForDatavolume(syncState.dvMutated, syncState.pvcSpec, pvcModifier)
1✔
1289
        if err != nil {
2✔
1290
                if cc.ErrQuotaExceeded(err) {
1✔
1291
                        syncErr := r.syncDataVolumeStatusPhaseWithEvent(syncState, cdiv1.Pending, nil, Event{corev1.EventTypeWarning, cc.ErrExceededQuota, err.Error()})
×
1292
                        if syncErr != nil {
×
1293
                                log.Error(syncErr, "failed to sync DataVolume status with event")
×
1294
                        }
×
1295
                }
1296
                return err
1✔
1297
        }
1298
        syncState.pvc = newPvc
1✔
1299

1✔
1300
        return nil
1✔
1301
}
1302

1303
// shouldUseCDIPopulator returns if the population of the PVC should be done using
1304
// CDI populators.
1305
// Currently it will use populators only if:
1306
// * storageClass used is CSI storageClass
1307
// * annotation cdi.kubevirt.io/storage.usePopulator is not set by user to "false"
1308
func (r *ReconcilerBase) shouldUseCDIPopulator(syncState *dvSyncState) (bool, error) {
1✔
1309
        dv := syncState.dvMutated
1✔
1310
        if usePopulator, ok := dv.Annotations[cc.AnnUsePopulator]; ok {
2✔
1311
                boolUsePopulator, err := strconv.ParseBool(usePopulator)
1✔
1312
                if err != nil {
1✔
1313
                        return false, err
×
1314
                }
×
1315
                return boolUsePopulator, nil
1✔
1316
        }
1317
        log := r.log.WithValues("DataVolume", dv.Name, "Namespace", dv.Namespace)
1✔
1318
        usePopulator, err := storageClassCSIDriverExists(r.client, r.log, syncState.pvcSpec.StorageClassName)
1✔
1319
        if err != nil {
1✔
1320
                return false, err
×
1321
        }
×
1322
        if !usePopulator {
2✔
1323
                if syncState.pvcSpec.StorageClassName != nil {
2✔
1324
                        log.Info("Not using CDI populators, storage class is not a CSI storage", "storageClass", *syncState.pvcSpec.StorageClassName)
1✔
1325
                }
1✔
1326
        }
1327

1328
        return usePopulator, nil
1✔
1329
}
1330

1331
func (r *ReconcilerBase) pvcRequiresWork(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) (bool, error) {
1✔
1332
        if pvc == nil || dv == nil {
2✔
1333
                return true, nil
1✔
1334
        }
1✔
1335
        if pvcIsPopulatedForDataVolume(pvc, dv) {
2✔
1336
                return false, nil
1✔
1337
        }
1✔
1338
        canAdopt, err := cc.AllowClaimAdoption(r.client, pvc, dv)
1✔
1339
        if err != nil {
1✔
1340
                return true, err
×
1341
        }
×
1342
        if canAdopt {
2✔
1343
                return false, nil
1✔
1344
        }
1✔
1345
        return true, nil
1✔
1346
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc