• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5048

09 Dec 2024 05:21PM UTC coverage: 59.379% (+0.1%) from 59.276%
#5048

push

travis-ci

web-flow
Make deps update over apiserver change (#3560)

Not sure why the CI lane deps-verify didn't catch this.

Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>

16689 of 28106 relevant lines covered (59.38%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.31
/pkg/controller/datavolume/controller-base.go
1
/*
2
Copyright 2018 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package datavolume
18

19
import (
20
        "context"
21
        "crypto/rsa"
22
        "encoding/json"
23
        "fmt"
24
        "net/http"
25
        "reflect"
26
        "strconv"
27
        "time"
28

29
        "github.com/go-logr/logr"
30
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
31
        "github.com/pkg/errors"
32

33
        corev1 "k8s.io/api/core/v1"
34
        storagev1 "k8s.io/api/storage/v1"
35
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
36
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
37
        "k8s.io/apimachinery/pkg/runtime"
38
        "k8s.io/apimachinery/pkg/runtime/schema"
39
        "k8s.io/apimachinery/pkg/types"
40
        "k8s.io/client-go/tools/record"
41
        "k8s.io/utils/ptr"
42

43
        "sigs.k8s.io/controller-runtime/pkg/client"
44
        "sigs.k8s.io/controller-runtime/pkg/controller"
45
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
46
        "sigs.k8s.io/controller-runtime/pkg/handler"
47
        "sigs.k8s.io/controller-runtime/pkg/manager"
48
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
49
        "sigs.k8s.io/controller-runtime/pkg/source"
50

51
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
52
        "kubevirt.io/containerized-data-importer/pkg/common"
53
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
54
        featuregates "kubevirt.io/containerized-data-importer/pkg/feature-gates"
55
        cloneMetrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-cloner"
56
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
57
        importMetrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-importer"
58
        "kubevirt.io/containerized-data-importer/pkg/token"
59
        "kubevirt.io/containerized-data-importer/pkg/util"
60
)
61

62
const (
63
        // ErrResourceExists provides a const to indicate a resource exists error
64
        ErrResourceExists = "ErrResourceExists"
65
        // ErrResourceMarkedForDeletion provides a const to indicate a resource marked for deletion error
66
        ErrResourceMarkedForDeletion = "ErrResourceMarkedForDeletion"
67
        // ErrClaimLost provides a const to indicate a claim is lost
68
        ErrClaimLost = "ErrClaimLost"
69

70
        // MessageResourceMarkedForDeletion provides a const to form a resource marked for deletion error message
71
        MessageResourceMarkedForDeletion = "Resource %q marked for deletion"
72
        // MessageResourceExists provides a const to form a resource exists error message
73
        MessageResourceExists = "Resource %q already exists and is not managed by DataVolume"
74
        // MessageErrClaimLost provides a const to form claim lost message
75
        MessageErrClaimLost = "PVC %s lost"
76

77
        dvPhaseField = "status.phase"
78

79
        claimRefField = "spec.claimRef"
80

81
        claimStorageClassNameField = "spec.storageClassName"
82
)
83

84
var (
85
        httpClient *http.Client
86

87
        delayedAnnotations = []string{
88
                cc.AnnPopulatedFor,
89
        }
90
)
91

92
// Event represents DV controller event
93
type Event struct {
94
        eventType string
95
        reason    string
96
        message   string
97
}
98

99
type statusPhaseSync struct {
100
        phase  cdiv1.DataVolumePhase
101
        pvcKey *client.ObjectKey
102
        event  Event
103
}
104

105
type dvSyncResult struct {
106
        result    *reconcile.Result
107
        phaseSync *statusPhaseSync
108
}
109

110
type dvSyncState struct {
111
        dv        *cdiv1.DataVolume
112
        dvMutated *cdiv1.DataVolume
113
        pvc       *corev1.PersistentVolumeClaim
114
        pvcSpec   *corev1.PersistentVolumeClaimSpec
115
        snapshot  *snapshotv1.VolumeSnapshot
116
        dvSyncResult
117
        usePopulator bool
118
}
119

120
// ReconcilerBase members
121
type ReconcilerBase struct {
122
        client               client.Client
123
        recorder             record.EventRecorder
124
        scheme               *runtime.Scheme
125
        log                  logr.Logger
126
        featureGates         featuregates.FeatureGates
127
        installerLabels      map[string]string
128
        shouldUpdateProgress bool
129
}
130

131
func pvcIsPopulatedForDataVolume(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) bool {
1✔
132
        if pvc == nil || dv == nil {
1✔
133
                return false
×
134
        }
×
135
        dvName, ok := pvc.Annotations[cc.AnnPopulatedFor]
1✔
136
        return ok && dvName == dv.Name
1✔
137
}
138

139
func dvIsPrePopulated(dv *cdiv1.DataVolume) bool {
1✔
140
        _, ok := dv.Annotations[cc.AnnPrePopulated]
1✔
141
        return ok
1✔
142
}
1✔
143

144
func checkStaticProvisionPending(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) bool {
1✔
145
        if pvc == nil || dv == nil {
2✔
146
                return false
1✔
147
        }
1✔
148
        if _, ok := dv.Annotations[cc.AnnCheckStaticVolume]; !ok {
2✔
149
                return false
1✔
150
        }
1✔
151
        _, ok := pvc.Annotations[cc.AnnPersistentVolumeList]
1✔
152
        return ok
1✔
153
}
154

155
func shouldSetDataVolumePending(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) bool {
1✔
156
        if checkStaticProvisionPending(pvc, dv) {
2✔
157
                return true
1✔
158
        }
1✔
159

160
        if pvc != nil {
2✔
161
                return false
1✔
162
        }
1✔
163

164
        return dvIsPrePopulated(dv) || (dv.Status.Phase == cdiv1.PhaseUnset)
1✔
165
}
166

167
// dataVolumeOp is the datavolume's requested operation
168
type dataVolumeOp int
169

170
const (
171
        dataVolumeNop dataVolumeOp = iota
172
        dataVolumeImport
173
        dataVolumeUpload
174
        dataVolumePvcClone
175
        dataVolumeSnapshotClone
176
        dataVolumePopulator
177
)
178

179
type indexArgs struct {
180
        obj          client.Object
181
        field        string
182
        extractValue client.IndexerFunc
183
}
184

185
func getIndexArgs() []indexArgs {
1✔
186
        return []indexArgs{
1✔
187
                {
1✔
188
                        obj:   &cdiv1.DataVolume{},
1✔
189
                        field: dvPhaseField,
1✔
190
                        extractValue: func(obj client.Object) []string {
1✔
191
                                return []string{string(obj.(*cdiv1.DataVolume).Status.Phase)}
×
192
                        },
×
193
                },
194
                {
195
                        obj:   &corev1.PersistentVolume{},
196
                        field: claimRefField,
197
                        extractValue: func(obj client.Object) []string {
1✔
198
                                if pv, ok := obj.(*corev1.PersistentVolume); ok {
2✔
199
                                        if pv.Spec.ClaimRef != nil && pv.Spec.ClaimRef.Namespace != "" && pv.Spec.ClaimRef.Name != "" {
2✔
200
                                                return []string{claimRefIndexKeyFunc(pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name)}
1✔
201
                                        }
1✔
202
                                }
203
                                return nil
×
204
                        },
205
                },
206
                {
207
                        obj:          &corev1.PersistentVolume{},
208
                        field:        claimStorageClassNameField,
209
                        extractValue: extractAvailablePersistentVolumeStorageClassName,
210
                },
211
        }
212
}
213

214
func claimRefIndexKeyFunc(namespace, name string) string {
1✔
215
        return namespace + "/" + name
1✔
216
}
1✔
217

218
// CreateCommonIndexes creates indexes used by all controllers
219
func CreateCommonIndexes(mgr manager.Manager) error {
×
220
        for _, ia := range getIndexArgs() {
×
221
                if err := mgr.GetFieldIndexer().IndexField(context.TODO(), ia.obj, ia.field, ia.extractValue); err != nil {
×
222
                        return err
×
223
                }
×
224
        }
225
        return nil
×
226
}
227

228
// CreateAvailablePersistentVolumeIndex adds storage class name index for available PersistentVolumes
229
func CreateAvailablePersistentVolumeIndex(fieldIndexer client.FieldIndexer) error {
×
230
        return fieldIndexer.IndexField(context.TODO(), &corev1.PersistentVolume{},
×
231
                claimStorageClassNameField, extractAvailablePersistentVolumeStorageClassName)
×
232
}
×
233

234
func extractAvailablePersistentVolumeStorageClassName(obj client.Object) []string {
×
235
        if pv, ok := obj.(*corev1.PersistentVolume); ok && pv.Status.Phase == corev1.VolumeAvailable {
×
236
                return []string{pv.Spec.StorageClassName}
×
237
        }
×
238
        return nil
×
239
}
240

241
func addDataVolumeControllerCommonWatches(mgr manager.Manager, dataVolumeController controller.Controller, op dataVolumeOp) error {
×
242
        appendMatchingDataVolumeRequest := func(ctx context.Context, reqs []reconcile.Request, mgr manager.Manager, namespace, name string) []reconcile.Request {
×
243
                dvKey := types.NamespacedName{Namespace: namespace, Name: name}
×
244
                dv := &cdiv1.DataVolume{}
×
245
                if err := mgr.GetClient().Get(ctx, dvKey, dv); err != nil {
×
246
                        if !k8serrors.IsNotFound(err) {
×
247
                                mgr.GetLogger().Error(err, "Failed to get DV", "dvKey", dvKey)
×
248
                        }
×
249
                        return reqs
×
250
                }
251
                if getDataVolumeOp(ctx, mgr.GetLogger(), dv, mgr.GetClient()) == op {
×
252
                        reqs = append(reqs, reconcile.Request{NamespacedName: dvKey})
×
253
                }
×
254
                return reqs
×
255
        }
256

257
        // Setup watches
258
        if err := dataVolumeController.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{}, handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](
×
259
                func(ctx context.Context, dv *cdiv1.DataVolume) []reconcile.Request {
×
260
                        if getDataVolumeOp(ctx, mgr.GetLogger(), dv, mgr.GetClient()) != op {
×
261
                                return nil
×
262
                        }
×
263
                        updatePendingDataVolumesGauge(ctx, mgr.GetLogger(), dv, mgr.GetClient())
×
264
                        return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: dv.Namespace, Name: dv.Name}}}
×
265
                }),
266
        )); err != nil {
×
267
                return err
×
268
        }
×
269
        if err := dataVolumeController.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{}, handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](
×
270
                func(ctx context.Context, obj *corev1.PersistentVolumeClaim) []reconcile.Request {
×
271
                        var result []reconcile.Request
×
272
                        owner := metav1.GetControllerOf(obj)
×
273
                        if owner != nil && owner.Kind == "DataVolume" {
×
274
                                result = appendMatchingDataVolumeRequest(ctx, result, mgr, obj.GetNamespace(), owner.Name)
×
275
                        }
×
276
                        populatedFor := obj.GetAnnotations()[cc.AnnPopulatedFor]
×
277
                        if populatedFor != "" {
×
278
                                result = appendMatchingDataVolumeRequest(ctx, result, mgr, obj.GetNamespace(), populatedFor)
×
279
                        }
×
280
                        // it is okay if result contains the same entry twice, will be deduplicated by caller
281
                        return result
×
282
                }),
283
        )); err != nil {
×
284
                return err
×
285
        }
×
286
        if err := dataVolumeController.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, handler.TypedEnqueueRequestsFromMapFunc[*corev1.Pod](
×
287
                func(ctx context.Context, obj *corev1.Pod) []reconcile.Request {
×
288
                        owner := metav1.GetControllerOf(obj)
×
289
                        if owner == nil || owner.Kind != "DataVolume" {
×
290
                                return nil
×
291
                        }
×
292
                        return appendMatchingDataVolumeRequest(ctx, nil, mgr, obj.GetNamespace(), owner.Name)
×
293
                }),
294
        )); err != nil {
×
295
                return err
×
296
        }
×
297
        for _, k := range []client.Object{&corev1.PersistentVolumeClaim{}, &corev1.Pod{}, &cdiv1.ObjectTransfer{}} {
×
298
                if err := dataVolumeController.Watch(source.Kind(mgr.GetCache(), k, handler.EnqueueRequestsFromMapFunc(
×
299
                        func(ctx context.Context, obj client.Object) []reconcile.Request {
×
300
                                if !hasAnnOwnedByDataVolume(obj) {
×
301
                                        return nil
×
302
                                }
×
303
                                namespace, name, err := getAnnOwnedByDataVolume(obj)
×
304
                                if err != nil {
×
305
                                        return nil
×
306
                                }
×
307
                                return appendMatchingDataVolumeRequest(ctx, nil, mgr, namespace, name)
×
308
                        }),
309
                )); err != nil {
×
310
                        return err
×
311
                }
×
312
        }
313

314
        // Watch for StorageClass and StorageProfile updates and reconcile the DVs waiting for StorageClass or its complete StorageProfile.
315
        // Relevant only when the DV StorageSpec has no AccessModes set and no matching StorageClass with compelete StorageProfile yet,
316
        // so PVC cannot be created (test_id:9922).
317
        for _, k := range []client.Object{&storagev1.StorageClass{}, &cdiv1.StorageProfile{}} {
×
318
                if err := dataVolumeController.Watch(source.Kind(mgr.GetCache(), k, handler.EnqueueRequestsFromMapFunc(
×
319
                        func(ctx context.Context, obj client.Object) []reconcile.Request {
×
320
                                dvList := &cdiv1.DataVolumeList{}
×
321
                                if err := mgr.GetClient().List(ctx, dvList, client.MatchingFields{dvPhaseField: ""}); err != nil {
×
322
                                        return nil
×
323
                                }
×
324
                                var reqs []reconcile.Request
×
325
                                for _, dv := range dvList.Items {
×
326
                                        if getDataVolumeOp(ctx, mgr.GetLogger(), &dv, mgr.GetClient()) == op {
×
327
                                                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dv.Name, Namespace: dv.Namespace}})
×
328
                                        }
×
329
                                }
330
                                return reqs
×
331
                        },
332
                ),
333
                )); err != nil {
×
334
                        return err
×
335
                }
×
336
        }
337

338
        // Watch for PV updates to reconcile the DVs waiting for available PV
339
        // Relevant only when the DV StorageSpec has no AccessModes set and no matching StorageClass yet, so PVC cannot be created (test_id:9924,9925)
340
        if err := dataVolumeController.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolume{}, handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolume](
×
341
                func(ctx context.Context, pv *corev1.PersistentVolume) []reconcile.Request {
×
342
                        dvList := &cdiv1.DataVolumeList{}
×
343
                        if err := mgr.GetClient().List(ctx, dvList, client.MatchingFields{dvPhaseField: ""}); err != nil {
×
344
                                return nil
×
345
                        }
×
346
                        var reqs []reconcile.Request
×
347
                        for _, dv := range dvList.Items {
×
348
                                storage := dv.Spec.Storage
×
349
                                if storage != nil &&
×
350
                                        storage.StorageClassName != nil &&
×
351
                                        *storage.StorageClassName == pv.Spec.StorageClassName &&
×
352
                                        pv.Status.Phase == corev1.VolumeAvailable &&
×
353
                                        getDataVolumeOp(ctx, mgr.GetLogger(), &dv, mgr.GetClient()) == op {
×
354
                                        reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dv.Name, Namespace: dv.Namespace}})
×
355
                                }
×
356
                        }
357
                        return reqs
×
358
                },
359
        ),
360
        )); err != nil {
×
361
                return err
×
362
        }
×
363

364
        return nil
×
365
}
366

367
func getDataVolumeOp(ctx context.Context, log logr.Logger, dv *cdiv1.DataVolume, client client.Client) dataVolumeOp {
×
368
        src := dv.Spec.Source
×
369

×
370
        if dv.Spec.SourceRef != nil {
×
371
                return getSourceRefOp(ctx, log, dv, client)
×
372
        }
×
373
        if src != nil && src.PVC != nil {
×
374
                return dataVolumePvcClone
×
375
        }
×
376
        if src != nil && src.Snapshot != nil {
×
377
                return dataVolumeSnapshotClone
×
378
        }
×
379
        if src == nil {
×
380
                if dvUsesVolumePopulator(dv) {
×
381
                        return dataVolumePopulator
×
382
                }
×
383
                return dataVolumeNop
×
384
        }
385
        if src.Upload != nil {
×
386
                return dataVolumeUpload
×
387
        }
×
388
        if src.HTTP != nil || src.S3 != nil || src.GCS != nil || src.Registry != nil || src.Blank != nil || src.Imageio != nil || src.VDDK != nil {
×
389
                return dataVolumeImport
×
390
        }
×
391

392
        return dataVolumeNop
×
393
}
394

395
func getSourceRefOp(ctx context.Context, log logr.Logger, dv *cdiv1.DataVolume, client client.Client) dataVolumeOp {
×
396
        dataSource := &cdiv1.DataSource{}
×
397
        ns := dv.Namespace
×
398
        if dv.Spec.SourceRef.Namespace != nil && *dv.Spec.SourceRef.Namespace != "" {
×
399
                ns = *dv.Spec.SourceRef.Namespace
×
400
        }
×
401
        nn := types.NamespacedName{Namespace: ns, Name: dv.Spec.SourceRef.Name}
×
402
        if err := client.Get(ctx, nn, dataSource); err != nil {
×
403
                log.Error(err, "Unable to get DataSource", "namespacedName", nn)
×
404
                return dataVolumeNop
×
405
        }
×
406

407
        switch {
×
408
        case dataSource.Spec.Source.PVC != nil:
×
409
                return dataVolumePvcClone
×
410
        case dataSource.Spec.Source.Snapshot != nil:
×
411
                return dataVolumeSnapshotClone
×
412
        default:
×
413
                return dataVolumeNop
×
414
        }
415
}
416

417
func updatePendingDataVolumesGauge(ctx context.Context, log logr.Logger, dv *cdiv1.DataVolume, c client.Client) {
×
418
        if !cc.IsDataVolumeUsingDefaultStorageClass(dv) {
×
419
                return
×
420
        }
×
421

422
        countPending, err := getDefaultStorageClassDataVolumeCount(ctx, c, string(cdiv1.Pending))
×
423
        if err != nil {
×
424
                log.V(3).Error(err, "Failed listing the pending DataVolumes")
×
425
                return
×
426
        }
×
427
        countUnset, err := getDefaultStorageClassDataVolumeCount(ctx, c, string(cdiv1.PhaseUnset))
×
428
        if err != nil {
×
429
                log.V(3).Error(err, "Failed listing the unset DataVolumes")
×
430
                return
×
431
        }
×
432

433
        metrics.SetDataVolumePending(countPending + countUnset)
×
434
}
435

436
func getDefaultStorageClassDataVolumeCount(ctx context.Context, c client.Client, dvPhase string) (int, error) {
×
437
        dvList := &cdiv1.DataVolumeList{}
×
438
        if err := c.List(ctx, dvList, client.MatchingFields{dvPhaseField: dvPhase}); err != nil {
×
439
                return 0, err
×
440
        }
×
441

442
        dvCount := 0
×
443
        for _, dv := range dvList.Items {
×
444
                if cc.IsDataVolumeUsingDefaultStorageClass(&dv) {
×
445
                        dvCount++
×
446
                }
×
447
        }
448

449
        return dvCount, nil
×
450
}
451

452
type dvController interface {
453
        reconcile.Reconciler
454
        sync(log logr.Logger, req reconcile.Request) (dvSyncResult, error)
455
        updateStatusPhase(pvc *corev1.PersistentVolumeClaim, dataVolumeCopy *cdiv1.DataVolume, event *Event) error
456
}
457

458
func (r *ReconcilerBase) reconcile(ctx context.Context, req reconcile.Request, dvc dvController) (reconcile.Result, error) {
1✔
459
        log := r.log.WithValues("DataVolume", req.NamespacedName)
1✔
460
        syncRes, syncErr := dvc.sync(log, req)
1✔
461
        res, err := r.updateStatus(req, syncRes.phaseSync, dvc)
1✔
462
        if syncErr != nil {
2✔
463
                err = syncErr
1✔
464
        }
1✔
465
        if syncRes.result != nil {
2✔
466
                res = *syncRes.result
1✔
467
        }
1✔
468
        return res, err
1✔
469
}
470

471
type dvSyncStateFunc func(*dvSyncState) error
472

473
func (r *ReconcilerBase) syncCommon(log logr.Logger, req reconcile.Request, cleanup, prepare dvSyncStateFunc) (dvSyncState, error) {
1✔
474
        syncState, err := r.syncDvPvcState(log, req, cleanup, prepare)
1✔
475
        if err == nil {
2✔
476
                err = r.syncUpdate(log, &syncState)
1✔
477
        }
1✔
478
        return syncState, err
1✔
479
}
480

481
func (r *ReconcilerBase) syncDvPvcState(log logr.Logger, req reconcile.Request, cleanup, prepare dvSyncStateFunc) (dvSyncState, error) {
1✔
482
        syncState := dvSyncState{}
1✔
483
        dv, err := r.getDataVolume(req.NamespacedName)
1✔
484
        if dv == nil || err != nil {
2✔
485
                syncState.result = &reconcile.Result{}
1✔
486
                return syncState, err
1✔
487
        }
1✔
488
        syncState.dv = dv
1✔
489
        syncState.dvMutated = dv.DeepCopy()
1✔
490
        syncState.pvc, err = r.getPVC(req.NamespacedName)
1✔
491
        if err != nil {
1✔
492
                return syncState, err
×
493
        }
×
494

495
        if cleanup != nil {
2✔
496
                if err := cleanup(&syncState); err != nil {
1✔
497
                        return syncState, err
×
498
                }
×
499
        }
500

501
        if dv.DeletionTimestamp != nil {
1✔
502
                log.Info("DataVolume marked for deletion")
×
503
                syncState.result = &reconcile.Result{}
×
504
                return syncState, nil
×
505
        }
×
506

507
        if prepare != nil {
2✔
508
                if err := prepare(&syncState); err != nil {
1✔
509
                        return syncState, err
×
510
                }
×
511
        }
512

513
        syncState.pvcSpec, err = renderPvcSpec(r.client, r.recorder, log, syncState.dvMutated, syncState.pvc)
1✔
514
        if err != nil {
2✔
515
                if syncErr := r.syncDataVolumeStatusPhaseWithEvent(&syncState, cdiv1.PhaseUnset, nil,
1✔
516
                        Event{corev1.EventTypeWarning, cc.ErrClaimNotValid, err.Error()}); syncErr != nil {
1✔
517
                        log.Error(syncErr, "failed to sync DataVolume status with event")
×
518
                }
×
519
                if errors.Is(err, ErrStorageClassNotFound) {
2✔
520
                        syncState.result = &reconcile.Result{}
1✔
521
                        return syncState, nil
1✔
522
                }
1✔
523
                return syncState, err
1✔
524
        }
525

526
        syncState.usePopulator, err = r.shouldUseCDIPopulator(&syncState)
1✔
527
        if err != nil {
1✔
528
                return syncState, err
×
529
        }
×
530
        updateDataVolumeUseCDIPopulator(&syncState)
1✔
531

1✔
532
        if err := r.handleStaticVolume(&syncState, log); err != nil || syncState.result != nil {
2✔
533
                return syncState, err
1✔
534
        }
1✔
535

536
        if err := r.handleDelayedAnnotations(&syncState, log); err != nil || syncState.result != nil {
2✔
537
                return syncState, err
1✔
538
        }
1✔
539

540
        if err = updateDataVolumeDefaultInstancetypeLabels(r.client, &syncState); err != nil {
1✔
541
                return syncState, err
×
542
        }
×
543

544
        if syncState.pvc != nil {
2✔
545
                if err := r.validatePVC(dv, syncState.pvc); err != nil {
2✔
546
                        return syncState, err
1✔
547
                }
1✔
548
                r.handlePrePopulation(syncState.dvMutated, syncState.pvc)
1✔
549
        }
550

551
        return syncState, nil
1✔
552
}
553

554
func (r *ReconcilerBase) syncUpdate(log logr.Logger, syncState *dvSyncState) error {
1✔
555
        if syncState.dv == nil || syncState.dvMutated == nil {
2✔
556
                return nil
1✔
557
        }
1✔
558
        if !reflect.DeepEqual(syncState.dv.Status, syncState.dvMutated.Status) {
1✔
559
                return fmt.Errorf("status update is not allowed in sync phase")
×
560
        }
×
561
        if !reflect.DeepEqual(syncState.dv.ObjectMeta, syncState.dvMutated.ObjectMeta) {
2✔
562
                _, ok := syncState.dv.Annotations[cc.AnnExtendedCloneToken]
1✔
563
                _, ok2 := syncState.dvMutated.Annotations[cc.AnnExtendedCloneToken]
1✔
564
                if err := r.updateDataVolume(syncState.dvMutated); err != nil {
1✔
565
                        r.log.Error(err, "Unable to sync update dv meta", "name", syncState.dvMutated.Name)
×
566
                        return err
×
567
                }
×
568
                if !ok && ok2 {
2✔
569
                        delta := time.Since(syncState.dv.ObjectMeta.CreationTimestamp.Time)
1✔
570
                        log.V(3).Info("Adding extended DataVolume token took", "delta", delta)
1✔
571
                }
1✔
572
                syncState.dv = syncState.dvMutated.DeepCopy()
1✔
573
        }
574
        return nil
1✔
575
}
576

577
func (r *ReconcilerBase) handleStaticVolume(syncState *dvSyncState, log logr.Logger) error {
1✔
578
        if _, ok := syncState.dvMutated.Annotations[cc.AnnCheckStaticVolume]; !ok {
2✔
579
                return nil
1✔
580
        }
1✔
581

582
        if syncState.pvc == nil {
2✔
583
                volumes, err := r.getAvailableVolumesForDV(syncState, log)
1✔
584
                if err != nil {
1✔
585
                        return err
×
586
                }
×
587

588
                if len(volumes) == 0 {
2✔
589
                        log.Info("No PVs for DV")
1✔
590
                        return nil
1✔
591
                }
1✔
592

593
                if err := r.handlePvcCreation(log, syncState, func(_ *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) error {
2✔
594
                        bs, err := json.Marshal(volumes)
1✔
595
                        if err != nil {
1✔
596
                                return err
×
597
                        }
×
598
                        cc.AddAnnotation(pvc, cc.AnnPersistentVolumeList, string(bs))
1✔
599
                        return nil
1✔
600
                }); err != nil {
×
601
                        return err
×
602
                }
×
603

604
                // set result to make sure callers don't do anything else in sync
605
                syncState.result = &reconcile.Result{}
1✔
606
                return nil
1✔
607
        }
608

609
        volumeAnno, ok := syncState.pvc.Annotations[cc.AnnPersistentVolumeList]
1✔
610
        if !ok {
1✔
611
                // etiher did not create the PVC here OR bind to expected PV succeeded
×
612
                return nil
×
613
        }
×
614

615
        if cc.IsUnbound(syncState.pvc) {
2✔
616
                // set result to make sure callers don't do anything else in sync
1✔
617
                syncState.result = &reconcile.Result{}
1✔
618
                return nil
1✔
619
        }
1✔
620

621
        var volumes []string
1✔
622
        if err := json.Unmarshal([]byte(volumeAnno), &volumes); err != nil {
1✔
623
                return err
×
624
        }
×
625

626
        for _, v := range volumes {
2✔
627
                if v == syncState.pvc.Spec.VolumeName {
2✔
628
                        pvcCpy := syncState.pvc.DeepCopy()
1✔
629
                        // handle as "populatedFor" going forward
1✔
630
                        cc.AddAnnotation(pvcCpy, cc.AnnPopulatedFor, syncState.dvMutated.Name)
1✔
631
                        delete(pvcCpy.Annotations, cc.AnnPersistentVolumeList)
1✔
632
                        if err := r.updatePVC(pvcCpy); err != nil {
1✔
633
                                return err
×
634
                        }
×
635
                        syncState.pvc = pvcCpy
1✔
636
                        return nil
1✔
637
                }
638
        }
639

640
        // delete the pvc and hope for better luck...
641
        pvcCpy := syncState.pvc.DeepCopy()
1✔
642
        if err := r.client.Delete(context.TODO(), pvcCpy, &client.DeleteOptions{}); err != nil {
1✔
643
                return err
×
644
        }
×
645

646
        syncState.pvc = pvcCpy
1✔
647

1✔
648
        return fmt.Errorf("DataVolume bound to unexpected PV %s", syncState.pvc.Spec.VolumeName)
1✔
649
}
650

651
func (r *ReconcilerBase) handleDelayedAnnotations(syncState *dvSyncState, log logr.Logger) error {
1✔
652
        dataVolume := syncState.dv
1✔
653
        if dataVolume.Status.Phase != cdiv1.Succeeded {
2✔
654
                return nil
1✔
655
        }
1✔
656

657
        if syncState.pvc == nil {
1✔
658
                return nil
×
659
        }
×
660

661
        pvcCpy := syncState.pvc.DeepCopy()
1✔
662
        for _, anno := range delayedAnnotations {
2✔
663
                if val, ok := dataVolume.Annotations[anno]; ok {
2✔
664
                        // only add if not already present
1✔
665
                        if _, ok := pvcCpy.Annotations[anno]; !ok {
2✔
666
                                cc.AddAnnotation(pvcCpy, anno, val)
1✔
667
                        }
1✔
668
                }
669
        }
670

671
        if !reflect.DeepEqual(syncState.pvc, pvcCpy) {
2✔
672
                if err := r.updatePVC(pvcCpy); err != nil {
1✔
673
                        return err
×
674
                }
×
675
                syncState.pvc = pvcCpy
1✔
676
                syncState.result = &reconcile.Result{}
1✔
677
        }
678

679
        return nil
1✔
680
}
681

682
func (r *ReconcilerBase) getAvailableVolumesForDV(syncState *dvSyncState, log logr.Logger) ([]string, error) {
1✔
683
        pvList := &corev1.PersistentVolumeList{}
1✔
684
        fields := client.MatchingFields{claimRefField: claimRefIndexKeyFunc(syncState.dv.Namespace, syncState.dv.Name)}
1✔
685
        if err := r.client.List(context.TODO(), pvList, fields); err != nil {
1✔
686
                return nil, err
×
687
        }
×
688
        if syncState.pvcSpec == nil {
1✔
689
                return nil, fmt.Errorf("missing pvc spec")
×
690
        }
×
691
        var pvNames []string
1✔
692
        for _, pv := range pvList.Items {
2✔
693
                if pv.Status.Phase == corev1.VolumeAvailable {
2✔
694
                        pvc := &corev1.PersistentVolumeClaim{
1✔
695
                                Spec: *syncState.pvcSpec,
1✔
696
                        }
1✔
697
                        if err := CheckVolumeSatisfyClaim(&pv, pvc); err != nil {
1✔
698
                                continue
×
699
                        }
700
                        log.Info("Found matching volume for DV", "pv", pv.Name)
1✔
701
                        pvNames = append(pvNames, pv.Name)
1✔
702
                }
703
        }
704
        return pvNames, nil
1✔
705
}
706

707
func (r *ReconcilerBase) handlePrePopulation(dv *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) {
1✔
708
        if pvc.Status.Phase == corev1.ClaimBound && pvcIsPopulatedForDataVolume(pvc, dv) {
2✔
709
                cc.AddAnnotation(dv, cc.AnnPrePopulated, pvc.Name)
1✔
710
        }
1✔
711
}
712

713
func (r *ReconcilerBase) validatePVC(dv *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) error {
1✔
714
        // If the PVC is being deleted, we should log a warning to the event recorder and return to wait the deletion complete
1✔
715
        // don't bother with owner refs is the pvc is deleted
1✔
716
        if pvc.DeletionTimestamp != nil {
1✔
717
                msg := fmt.Sprintf(MessageResourceMarkedForDeletion, pvc.Name)
×
718
                r.recorder.Event(dv, corev1.EventTypeWarning, ErrResourceMarkedForDeletion, msg)
×
719
                return errors.Errorf(msg)
×
720
        }
×
721
        // If the PVC is not controlled by this DataVolume resource, we should log
722
        // a warning to the event recorder and return
723
        if !metav1.IsControlledBy(pvc, dv) {
2✔
724
                requiresWork, err := r.pvcRequiresWork(pvc, dv)
1✔
725
                if err != nil {
1✔
726
                        return err
×
727
                }
×
728
                if !requiresWork {
2✔
729
                        if err := r.addOwnerRef(pvc, dv); err != nil {
1✔
730
                                return err
×
731
                        }
×
732
                } else {
1✔
733
                        msg := fmt.Sprintf(MessageResourceExists, pvc.Name)
1✔
734
                        r.recorder.Event(dv, corev1.EventTypeWarning, ErrResourceExists, msg)
1✔
735
                        return errors.Errorf(msg)
1✔
736
                }
1✔
737
        }
738
        return nil
1✔
739
}
740

741
func (r *ReconcilerBase) getPVC(key types.NamespacedName) (*corev1.PersistentVolumeClaim, error) {
1✔
742
        pvc := &corev1.PersistentVolumeClaim{}
1✔
743
        if err := r.client.Get(context.TODO(), key, pvc); err != nil {
2✔
744
                if k8serrors.IsNotFound(err) {
2✔
745
                        return nil, nil
1✔
746
                }
1✔
747
                return nil, err
×
748
        }
749
        return pvc, nil
1✔
750
}
751

752
func (r *ReconcilerBase) getDataVolume(key types.NamespacedName) (*cdiv1.DataVolume, error) {
1✔
753
        dv := &cdiv1.DataVolume{}
1✔
754
        if err := r.client.Get(context.TODO(), key, dv); err != nil {
2✔
755
                if k8serrors.IsNotFound(err) {
2✔
756
                        return nil, nil
1✔
757
                }
1✔
758
                return nil, err
×
759
        }
760
        return dv, nil
1✔
761
}
762

763
type pvcModifierFunc func(datavolume *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) error
764

765
func (r *ReconcilerBase) createPvcForDatavolume(datavolume *cdiv1.DataVolume, pvcSpec *corev1.PersistentVolumeClaimSpec,
766
        pvcModifier pvcModifierFunc) (*corev1.PersistentVolumeClaim, error) {
1✔
767
        newPvc, err := r.newPersistentVolumeClaim(datavolume, pvcSpec, datavolume.Namespace, datavolume.Name, pvcModifier)
1✔
768
        if err != nil {
2✔
769
                return nil, err
1✔
770
        }
1✔
771
        util.SetRecommendedLabels(newPvc, r.installerLabels, "cdi-controller")
1✔
772
        if err := r.client.Create(context.TODO(), newPvc); err != nil {
1✔
773
                return nil, err
×
774
        }
×
775
        return newPvc, nil
1✔
776
}
777

778
func (r *ReconcilerBase) getStorageClassBindingMode(storageClassName *string) (*storagev1.VolumeBindingMode, error) {
1✔
779
        // Handle unspecified storage class name, fallback to default storage class
1✔
780
        storageClass, err := cc.GetStorageClassByNameWithK8sFallback(context.TODO(), r.client, storageClassName)
1✔
781
        if err != nil {
1✔
782
                return nil, err
×
783
        }
×
784

785
        if storageClass != nil && storageClass.VolumeBindingMode != nil {
2✔
786
                return storageClass.VolumeBindingMode, nil
1✔
787
        }
1✔
788

789
        // no storage class, then the assumption is immediate binding
790
        volumeBindingImmediate := storagev1.VolumeBindingImmediate
1✔
791
        return &volumeBindingImmediate, nil
1✔
792
}
793

794
func (r *ReconcilerBase) reconcileProgressUpdate(datavolume *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim, result *reconcile.Result) error {
1✔
795
        var podNamespace string
1✔
796
        if datavolume.Status.Progress == "" {
2✔
797
                datavolume.Status.Progress = "N/A"
1✔
798
        }
1✔
799

800
        if !r.shouldUpdateProgress {
2✔
801
                return nil
1✔
802
        }
1✔
803

804
        if usePopulator, _ := CheckPVCUsingPopulators(pvc); usePopulator {
2✔
805
                if progress, ok := pvc.Annotations[cc.AnnPopulatorProgress]; ok {
2✔
806
                        datavolume.Status.Progress = cdiv1.DataVolumeProgress(progress)
1✔
807
                } else {
2✔
808
                        datavolume.Status.Progress = "N/A"
1✔
809
                }
1✔
810
                return nil
1✔
811
        }
812

813
        if datavolume.Spec.Source != nil && datavolume.Spec.Source.PVC != nil {
2✔
814
                podNamespace = datavolume.Spec.Source.PVC.Namespace
1✔
815
        } else {
2✔
816
                podNamespace = datavolume.Namespace
1✔
817
        }
1✔
818

819
        if datavolume.Status.Phase == cdiv1.Succeeded || datavolume.Status.Phase == cdiv1.Failed {
2✔
820
                // Data volume completed progress, or failed, either way stop queueing the data volume.
1✔
821
                r.log.Info("Datavolume finished, no longer updating progress", "Namespace", datavolume.Namespace, "Name", datavolume.Name, "Phase", datavolume.Status.Phase)
1✔
822
                return nil
1✔
823
        }
1✔
824
        pod, err := cc.GetPodFromPvc(r.client, podNamespace, pvc)
1✔
825
        if err == nil {
1✔
826
                if pod.Status.Phase != corev1.PodRunning {
×
827
                        // Avoid long timeouts and error traces from HTTP get when pod is already gone
×
828
                        return nil
×
829
                }
×
830
                if err := updateProgressUsingPod(datavolume, pod); err != nil {
×
831
                        return err
×
832
                }
×
833
        }
834
        // We are not done yet, force a re-reconcile in 2 seconds to get an update.
835
        result.RequeueAfter = 2 * time.Second
1✔
836
        return nil
1✔
837
}
838

839
func (r *ReconcilerBase) syncDataVolumeStatusPhaseWithEvent(syncState *dvSyncState, phase cdiv1.DataVolumePhase, pvc *corev1.PersistentVolumeClaim, event Event) error {
1✔
840
        if syncState.phaseSync != nil {
1✔
841
                return fmt.Errorf("phaseSync is already set")
×
842
        }
×
843
        syncState.phaseSync = &statusPhaseSync{phase: phase, event: event}
1✔
844
        if pvc != nil {
1✔
845
                key := client.ObjectKeyFromObject(pvc)
×
846
                syncState.phaseSync.pvcKey = &key
×
847
        }
×
848
        return nil
1✔
849
}
850

851
func (r *ReconcilerBase) updateDataVolumeStatusPhaseSync(ps *statusPhaseSync, dv *cdiv1.DataVolume, dvCopy *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) error {
1✔
852
        var condPvc *corev1.PersistentVolumeClaim
1✔
853
        var err error
1✔
854
        if ps.pvcKey != nil {
1✔
855
                if pvc == nil || *ps.pvcKey != client.ObjectKeyFromObject(pvc) {
×
856
                        condPvc, err = r.getPVC(*ps.pvcKey)
×
857
                        if err != nil {
×
858
                                return err
×
859
                        }
×
860
                } else {
×
861
                        condPvc = pvc
×
862
                }
×
863
        }
864
        return r.updateDataVolumeStatusPhaseWithEvent(ps.phase, dv, dvCopy, condPvc, ps.event)
1✔
865
}
866

867
func (r *ReconcilerBase) updateDataVolumeStatusPhaseWithEvent(
868
        phase cdiv1.DataVolumePhase,
869
        dataVolume *cdiv1.DataVolume,
870
        dataVolumeCopy *cdiv1.DataVolume,
871
        pvc *corev1.PersistentVolumeClaim,
872
        event Event) error {
1✔
873
        if dataVolume == nil {
1✔
874
                return nil
×
875
        }
×
876

877
        curPhase := dataVolumeCopy.Status.Phase
1✔
878
        dataVolumeCopy.Status.Phase = phase
1✔
879

1✔
880
        reason := ""
1✔
881
        message := ""
1✔
882
        if pvc == nil {
2✔
883
                reason = event.reason
1✔
884
                message = event.message
1✔
885
        }
1✔
886
        r.updateConditions(dataVolumeCopy, pvc, reason, message)
1✔
887
        return r.emitEvent(dataVolume, dataVolumeCopy, curPhase, dataVolume.Status.Conditions, &event)
1✔
888
}
889

890
func (r *ReconcilerBase) updateStatus(req reconcile.Request, phaseSync *statusPhaseSync, dvc dvController) (reconcile.Result, error) {
1✔
891
        result := reconcile.Result{}
1✔
892
        dv, err := r.getDataVolume(req.NamespacedName)
1✔
893
        if dv == nil || err != nil {
2✔
894
                return reconcile.Result{}, err
1✔
895
        }
1✔
896

897
        dataVolumeCopy := dv.DeepCopy()
1✔
898

1✔
899
        pvc, err := r.getPVC(req.NamespacedName)
1✔
900
        if err != nil {
1✔
901
                return reconcile.Result{}, err
×
902
        }
×
903

904
        if phaseSync != nil {
2✔
905
                err = r.updateDataVolumeStatusPhaseSync(phaseSync, dv, dataVolumeCopy, pvc)
1✔
906
                return reconcile.Result{}, err
1✔
907
        }
1✔
908

909
        curPhase := dataVolumeCopy.Status.Phase
1✔
910
        var event Event
1✔
911

1✔
912
        if shouldSetDataVolumePending(pvc, dataVolumeCopy) {
2✔
913
                dataVolumeCopy.Status.Phase = cdiv1.Pending
1✔
914
        } else if pvc != nil {
3✔
915
                dataVolumeCopy.Status.ClaimName = pvc.Name
1✔
916

1✔
917
                phase := pvc.Annotations[cc.AnnPodPhase]
1✔
918
                requiresWork, err := r.pvcRequiresWork(pvc, dataVolumeCopy)
1✔
919
                if err != nil {
1✔
920
                        return reconcile.Result{}, err
×
921
                }
×
922
                if phase == string(cdiv1.Succeeded) && requiresWork {
2✔
923
                        if err := dvc.updateStatusPhase(pvc, dataVolumeCopy, &event); err != nil {
1✔
924
                                return reconcile.Result{}, err
×
925
                        }
×
926
                } else {
1✔
927
                        switch pvc.Status.Phase {
1✔
928
                        case corev1.ClaimPending:
1✔
929
                                if requiresWork {
2✔
930
                                        if err := r.updateStatusPVCPending(pvc, dvc, dataVolumeCopy, &event); err != nil {
1✔
931
                                                return reconcile.Result{}, err
×
932
                                        }
×
933
                                } else {
1✔
934
                                        dataVolumeCopy.Status.Phase = cdiv1.Succeeded
1✔
935
                                }
1✔
936
                        case corev1.ClaimBound:
1✔
937
                                switch dataVolumeCopy.Status.Phase {
1✔
938
                                case cdiv1.Pending:
1✔
939
                                        dataVolumeCopy.Status.Phase = cdiv1.PVCBound
1✔
940
                                case cdiv1.WaitForFirstConsumer:
×
941
                                        dataVolumeCopy.Status.Phase = cdiv1.PVCBound
×
942
                                case cdiv1.Unknown:
1✔
943
                                        dataVolumeCopy.Status.Phase = cdiv1.PVCBound
1✔
944
                                }
945

946
                                if requiresWork {
2✔
947
                                        if err := dvc.updateStatusPhase(pvc, dataVolumeCopy, &event); err != nil {
1✔
948
                                                return reconcile.Result{}, err
×
949
                                        }
×
950
                                } else {
1✔
951
                                        dataVolumeCopy.Status.Phase = cdiv1.Succeeded
1✔
952
                                }
1✔
953

954
                        case corev1.ClaimLost:
1✔
955
                                dataVolumeCopy.Status.Phase = cdiv1.Failed
1✔
956
                                event.eventType = corev1.EventTypeWarning
1✔
957
                                event.reason = ErrClaimLost
1✔
958
                                event.message = fmt.Sprintf(MessageErrClaimLost, pvc.Name)
1✔
959
                        default:
1✔
960
                                if pvc.Status.Phase != "" {
1✔
961
                                        dataVolumeCopy.Status.Phase = cdiv1.Unknown
×
962
                                }
×
963
                        }
964
                }
965

966
                if i, err := strconv.ParseInt(pvc.Annotations[cc.AnnPodRestarts], 10, 32); err == nil && i >= 0 {
2✔
967
                        dataVolumeCopy.Status.RestartCount = int32(i)
1✔
968
                }
1✔
969
                if err := r.reconcileProgressUpdate(dataVolumeCopy, pvc, &result); err != nil {
1✔
970
                        return result, err
×
971
                }
×
972
        }
973

974
        currentCond := make([]cdiv1.DataVolumeCondition, len(dataVolumeCopy.Status.Conditions))
1✔
975
        copy(currentCond, dataVolumeCopy.Status.Conditions)
1✔
976
        r.updateConditions(dataVolumeCopy, pvc, "", "")
1✔
977
        return result, r.emitEvent(dv, dataVolumeCopy, curPhase, currentCond, &event)
1✔
978
}
979

980
func (r ReconcilerBase) updateStatusPVCPending(pvc *corev1.PersistentVolumeClaim, dvc dvController, dataVolumeCopy *cdiv1.DataVolume, event *Event) error {
1✔
981
        usePopulator, err := CheckPVCUsingPopulators(pvc)
1✔
982
        if err != nil {
1✔
983
                return err
×
984
        }
×
985
        if usePopulator {
2✔
986
                // when using populators the target pvc phase will stay pending until the population completes,
1✔
987
                // hence if not wffc we should update the dv phase according to the pod phase
1✔
988
                shouldBeMarkedPendingPopulation, err := r.shouldBeMarkedPendingPopulation(pvc)
1✔
989
                if err != nil {
1✔
990
                        return err
×
991
                }
×
992
                if shouldBeMarkedPendingPopulation {
2✔
993
                        dataVolumeCopy.Status.Phase = cdiv1.PendingPopulation
1✔
994
                } else if err := dvc.updateStatusPhase(pvc, dataVolumeCopy, event); err != nil {
2✔
995
                        return err
×
996
                }
×
997
                return nil
1✔
998
        }
999

1000
        shouldBeMarkedWaitForFirstConsumer, err := r.shouldBeMarkedWaitForFirstConsumer(pvc)
1✔
1001
        if err != nil {
1✔
1002
                return err
×
1003
        }
×
1004
        if shouldBeMarkedWaitForFirstConsumer {
2✔
1005
                dataVolumeCopy.Status.Phase = cdiv1.WaitForFirstConsumer
1✔
1006
        } else {
2✔
1007
                dataVolumeCopy.Status.Phase = cdiv1.Pending
1✔
1008
        }
1✔
1009
        return nil
1✔
1010
}
1011

1012
func (r *ReconcilerBase) updateConditions(dataVolume *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim, reason, message string) {
1✔
1013
        var anno map[string]string
1✔
1014

1✔
1015
        if dataVolume.Status.Conditions == nil {
2✔
1016
                dataVolume.Status.Conditions = make([]cdiv1.DataVolumeCondition, 0)
1✔
1017
        }
1✔
1018

1019
        if pvc != nil {
2✔
1020
                anno = pvc.Annotations
1✔
1021
        } else {
2✔
1022
                anno = make(map[string]string)
1✔
1023
        }
1✔
1024

1025
        var readyStatus corev1.ConditionStatus
1✔
1026
        switch dataVolume.Status.Phase {
1✔
1027
        case cdiv1.Succeeded:
1✔
1028
                readyStatus = corev1.ConditionTrue
1✔
1029
        case cdiv1.Unknown:
×
1030
                readyStatus = corev1.ConditionUnknown
×
1031
        default:
1✔
1032
                readyStatus = corev1.ConditionFalse
1✔
1033
        }
1034

1035
        dataVolume.Status.Conditions = updateBoundCondition(dataVolume.Status.Conditions, pvc, message, reason)
1✔
1036
        dataVolume.Status.Conditions = UpdateReadyCondition(dataVolume.Status.Conditions, readyStatus, message, reason)
1✔
1037
        dataVolume.Status.Conditions = updateRunningCondition(dataVolume.Status.Conditions, anno)
1✔
1038
}
1039

1040
func (r *ReconcilerBase) emitConditionEvent(dataVolume *cdiv1.DataVolume, originalCond []cdiv1.DataVolumeCondition) {
1✔
1041
        r.emitBoundConditionEvent(dataVolume, FindConditionByType(cdiv1.DataVolumeBound, dataVolume.Status.Conditions), FindConditionByType(cdiv1.DataVolumeBound, originalCond))
1✔
1042
        r.emitFailureConditionEvent(dataVolume, originalCond)
1✔
1043
}
1✔
1044

1045
func (r *ReconcilerBase) emitBoundConditionEvent(dataVolume *cdiv1.DataVolume, current, original *cdiv1.DataVolumeCondition) {
1✔
1046
        // We know reason and message won't be empty for bound.
1✔
1047
        if current != nil && (original == nil || current.Status != original.Status || current.Reason != original.Reason || current.Message != original.Message) {
2✔
1048
                r.recorder.Event(dataVolume, corev1.EventTypeNormal, current.Reason, current.Message)
1✔
1049
        }
1✔
1050
}
1051

1052
func (r *ReconcilerBase) emitFailureConditionEvent(dataVolume *cdiv1.DataVolume, originalCond []cdiv1.DataVolumeCondition) {
1✔
1053
        curReady := FindConditionByType(cdiv1.DataVolumeReady, dataVolume.Status.Conditions)
1✔
1054
        curBound := FindConditionByType(cdiv1.DataVolumeBound, dataVolume.Status.Conditions)
1✔
1055
        curRunning := FindConditionByType(cdiv1.DataVolumeRunning, dataVolume.Status.Conditions)
1✔
1056
        orgRunning := FindConditionByType(cdiv1.DataVolumeRunning, originalCond)
1✔
1057

1✔
1058
        if curReady == nil || curBound == nil || curRunning == nil {
1✔
1059
                return
×
1060
        }
×
1061
        if curReady.Status == corev1.ConditionFalse && curRunning.Status == corev1.ConditionFalse &&
1✔
1062
                dvBoundOrPopulationInProgress(dataVolume, curBound) {
2✔
1063
                // Bound or in progress, not ready, and not running.
1✔
1064
                // Avoiding triggering an event for scratch space required since it will be addressed
1✔
1065
                // by CDI and sounds more drastic than it actually is.
1✔
1066
                if curRunning.Message != "" && curRunning.Message != common.ScratchSpaceRequired &&
1✔
1067
                        (orgRunning == nil || orgRunning.Message != curRunning.Message) {
1✔
1068
                        r.recorder.Event(dataVolume, corev1.EventTypeWarning, curRunning.Reason, curRunning.Message)
×
1069
                }
×
1070
        }
1071
}
1072

1073
func (r *ReconcilerBase) emitEvent(dataVolume *cdiv1.DataVolume, dataVolumeCopy *cdiv1.DataVolume, curPhase cdiv1.DataVolumePhase, originalCond []cdiv1.DataVolumeCondition, event *Event) error {
1✔
1074
        if !reflect.DeepEqual(dataVolume.ObjectMeta, dataVolumeCopy.ObjectMeta) {
1✔
1075
                return fmt.Errorf("meta update is not allowed in updateStatus phase")
×
1076
        }
×
1077
        // Update status subresource only if changed
1078
        if !reflect.DeepEqual(dataVolume.Status, dataVolumeCopy.Status) {
2✔
1079
                if err := r.client.Status().Update(context.TODO(), dataVolumeCopy); err != nil {
1✔
1080
                        r.log.Error(err, "unable to update datavolume status", "name", dataVolumeCopy.Name)
×
1081
                        return err
×
1082
                }
×
1083
                // Emit the event only on status phase change
1084
                if event.eventType != "" && curPhase != dataVolumeCopy.Status.Phase {
2✔
1085
                        r.recorder.Event(dataVolumeCopy, event.eventType, event.reason, event.message)
1✔
1086
                }
1✔
1087
                r.emitConditionEvent(dataVolumeCopy, originalCond)
1✔
1088
        }
1089
        return nil
1✔
1090
}
1091

1092
func (r *ReconcilerBase) addOwnerRef(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) error {
1✔
1093
        if err := controllerutil.SetControllerReference(dv, pvc, r.scheme); err != nil {
1✔
1094
                return err
×
1095
        }
×
1096

1097
        return r.updatePVC(pvc)
1✔
1098
}
1099

1100
func updateProgressUsingPod(dataVolumeCopy *cdiv1.DataVolume, pod *corev1.Pod) error {
1✔
1101
        httpClient = cc.BuildHTTPClient(httpClient)
1✔
1102
        url, err := cc.GetMetricsURL(pod)
1✔
1103
        if err != nil {
2✔
1104
                return err
1✔
1105
        }
1✔
1106
        if url == "" {
1✔
1107
                return nil
×
1108
        }
×
1109

1110
        // Used for both import and clone, so it should match both metric names
1111
        progressReport, err := cc.GetProgressReportFromURL(url, httpClient,
1✔
1112
                fmt.Sprintf("%s|%s", importMetrics.ImportProgressMetricName, cloneMetrics.CloneProgressMetricName),
1✔
1113
                string(dataVolumeCopy.UID))
1✔
1114
        if err != nil {
1✔
1115
                return err
×
1116
        }
×
1117
        if progressReport != "" {
2✔
1118
                if f, err := strconv.ParseFloat(progressReport, 64); err == nil {
2✔
1119
                        dataVolumeCopy.Status.Progress = cdiv1.DataVolumeProgress(fmt.Sprintf("%.2f%%", f))
1✔
1120
                }
1✔
1121
        }
1122
        return nil
1✔
1123
}
1124

1125
// newPersistentVolumeClaim creates a new PVC for the DataVolume resource.
1126
// It also sets the appropriate OwnerReferences on the resource
1127
// which allows handleObject to discover the DataVolume resource
1128
// that 'owns' it.
1129
func (r *ReconcilerBase) newPersistentVolumeClaim(dataVolume *cdiv1.DataVolume, targetPvcSpec *corev1.PersistentVolumeClaimSpec, namespace, name string, pvcModifier pvcModifierFunc) (*corev1.PersistentVolumeClaim, error) {
1✔
1130
        labels := map[string]string{
1✔
1131
                common.CDILabelKey: common.CDILabelValue,
1✔
1132
        }
1✔
1133
        if util.ResolveVolumeMode(targetPvcSpec.VolumeMode) == corev1.PersistentVolumeFilesystem {
2✔
1134
                labels[common.KubePersistentVolumeFillingUpSuppressLabelKey] = common.KubePersistentVolumeFillingUpSuppressLabelValue
1✔
1135
        }
1✔
1136
        for k, v := range dataVolume.Labels {
2✔
1137
                labels[k] = v
1✔
1138
        }
1✔
1139

1140
        annotations := make(map[string]string)
1✔
1141
        for k, v := range dataVolume.ObjectMeta.Annotations {
2✔
1142
                annotations[k] = v
1✔
1143
        }
1✔
1144
        annotations[cc.AnnPodRestarts] = "0"
1✔
1145
        annotations[cc.AnnContentType] = string(cc.GetContentType(dataVolume.Spec.ContentType))
1✔
1146
        if dataVolume.Spec.PriorityClassName != "" {
2✔
1147
                annotations[cc.AnnPriorityClassName] = dataVolume.Spec.PriorityClassName
1✔
1148
        }
1✔
1149
        annotations[cc.AnnPreallocationRequested] = strconv.FormatBool(cc.GetPreallocation(context.TODO(), r.client, dataVolume.Spec.Preallocation))
1✔
1150
        annotations[cc.AnnCreatedForDataVolume] = string(dataVolume.UID)
1✔
1151

1✔
1152
        if dataVolume.Spec.Storage != nil && labels[common.PvcApplyStorageProfileLabel] == "true" {
1✔
1153
                isWebhookPvcRenderingEnabled, err := featuregates.IsWebhookPvcRenderingEnabled(r.client)
×
1154
                if err != nil {
×
1155
                        return nil, err
×
1156
                }
×
1157
                if isWebhookPvcRenderingEnabled {
×
1158
                        labels[common.PvcApplyStorageProfileLabel] = "true"
×
1159
                        if targetPvcSpec.VolumeMode == nil {
×
1160
                                targetPvcSpec.VolumeMode = ptr.To[corev1.PersistentVolumeMode](cdiv1.PersistentVolumeFromStorageProfile)
×
1161
                        }
×
1162
                }
1163
        }
1164

1165
        pvc := &corev1.PersistentVolumeClaim{
1✔
1166
                ObjectMeta: metav1.ObjectMeta{
1✔
1167
                        Namespace:   namespace,
1✔
1168
                        Name:        name,
1✔
1169
                        Labels:      labels,
1✔
1170
                        Annotations: annotations,
1✔
1171
                },
1✔
1172
                Spec: *targetPvcSpec,
1✔
1173
        }
1✔
1174

1✔
1175
        if pvcModifier != nil {
2✔
1176
                if err := pvcModifier(dataVolume, pvc); err != nil {
2✔
1177
                        return nil, err
1✔
1178
                }
1✔
1179
        }
1180

1181
        if pvc.Namespace == dataVolume.Namespace {
2✔
1182
                pvc.OwnerReferences = []metav1.OwnerReference{
1✔
1183
                        *metav1.NewControllerRef(dataVolume, schema.GroupVersionKind{
1✔
1184
                                Group:   cdiv1.SchemeGroupVersion.Group,
1✔
1185
                                Version: cdiv1.SchemeGroupVersion.Version,
1✔
1186
                                Kind:    "DataVolume",
1✔
1187
                        }),
1✔
1188
                }
1✔
1189
        } else {
1✔
1190
                if err := setAnnOwnedByDataVolume(pvc, dataVolume); err != nil {
×
1191
                        return nil, err
×
1192
                }
×
1193
                pvc.Annotations[cc.AnnOwnerUID] = string(dataVolume.UID)
×
1194
        }
1195

1196
        for _, anno := range delayedAnnotations {
2✔
1197
                delete(pvc.Annotations, anno)
1✔
1198
        }
1✔
1199

1200
        return pvc, nil
1✔
1201
}
1202

1203
// Whenever the controller updates a DV, we must make sure to nil out spec.source when using other population methods
1204
func (r *ReconcilerBase) updateDataVolume(dv *cdiv1.DataVolume) error {
1✔
1205
        // Restore so we don't nil out the dv that is being worked on
1✔
1206
        var sourceCopy *cdiv1.DataVolumeSource
1✔
1207

1✔
1208
        if dv.Spec.SourceRef != nil || dvUsesVolumePopulator(dv) {
2✔
1209
                sourceCopy = dv.Spec.Source
1✔
1210
                dv.Spec.Source = nil
1✔
1211
        }
1✔
1212

1213
        err := r.client.Update(context.TODO(), dv)
1✔
1214
        if dv.Spec.SourceRef != nil || dvUsesVolumePopulator(dv) {
2✔
1215
                dv.Spec.Source = sourceCopy
1✔
1216
        }
1✔
1217
        return err
1✔
1218
}
1219

1220
func (r *ReconcilerBase) updatePVC(pvc *corev1.PersistentVolumeClaim) error {
1✔
1221
        return r.client.Update(context.TODO(), pvc)
1✔
1222
}
1✔
1223

1224
func newLongTermCloneTokenGenerator(key *rsa.PrivateKey) token.Generator {
×
1225
        return token.NewGenerator(common.ExtendedCloneTokenIssuer, key, 10*365*24*time.Hour)
×
1226
}
×
1227

1228
// storageClassWaitForFirstConsumer returns if the binding mode of a given storage class is WFFC
1229
func (r *ReconcilerBase) storageClassWaitForFirstConsumer(storageClass *string) (bool, error) {
1✔
1230
        storageClassBindingMode, err := r.getStorageClassBindingMode(storageClass)
1✔
1231
        if err != nil {
1✔
1232
                return false, err
×
1233
        }
×
1234

1235
        return storageClassBindingMode != nil && *storageClassBindingMode == storagev1.VolumeBindingWaitForFirstConsumer, nil
1✔
1236
}
1237

1238
// shouldBeMarkedWaitForFirstConsumer decided whether we should mark DV as WFFC
1239
func (r *ReconcilerBase) shouldBeMarkedWaitForFirstConsumer(pvc *corev1.PersistentVolumeClaim) (bool, error) {
1✔
1240
        wffc, err := r.storageClassWaitForFirstConsumer(pvc.Spec.StorageClassName)
1✔
1241
        if err != nil {
1✔
1242
                return false, err
×
1243
        }
×
1244

1245
        honorWaitForFirstConsumerEnabled, err := r.featureGates.HonorWaitForFirstConsumerEnabled()
1✔
1246
        if err != nil {
1✔
1247
                return false, err
×
1248
        }
×
1249

1250
        res := honorWaitForFirstConsumerEnabled && wffc &&
1✔
1251
                pvc.Status.Phase == corev1.ClaimPending
1✔
1252

1✔
1253
        return res, nil
1✔
1254
}
1255

1256
func (r *ReconcilerBase) shouldReconcileVolumeSourceCR(syncState *dvSyncState) bool {
1✔
1257
        if syncState.pvc == nil {
2✔
1258
                return true
1✔
1259
        }
1✔
1260
        phase := syncState.pvc.Annotations[cc.AnnPodPhase]
1✔
1261
        return phase != string(corev1.PodSucceeded) || syncState.dvMutated.Status.Phase != cdiv1.Succeeded
1✔
1262
}
1263

1264
// shouldBeMarkedPendingPopulation decides whether we should mark DV as PendingPopulation
1265
func (r *ReconcilerBase) shouldBeMarkedPendingPopulation(pvc *corev1.PersistentVolumeClaim) (bool, error) {
1✔
1266
        wffc, err := r.storageClassWaitForFirstConsumer(pvc.Spec.StorageClassName)
1✔
1267
        if err != nil {
1✔
1268
                return false, err
×
1269
        }
×
1270
        nodeName := pvc.Annotations[cc.AnnSelectedNode]
1✔
1271
        immediateBindingRequested := cc.ImmediateBindingRequested(pvc)
1✔
1272

1✔
1273
        return wffc && nodeName == "" && !immediateBindingRequested, nil
1✔
1274
}
1275

1276
// handlePvcCreation works as a wrapper for non-clone PVC creation and error handling
1277
func (r *ReconcilerBase) handlePvcCreation(log logr.Logger, syncState *dvSyncState, pvcModifier pvcModifierFunc) error {
1✔
1278
        if syncState.pvc != nil {
2✔
1279
                return nil
1✔
1280
        }
1✔
1281
        if dvIsPrePopulated(syncState.dvMutated) {
1✔
1282
                return nil
×
1283
        }
×
1284
        // Creating the PVC
1285
        newPvc, err := r.createPvcForDatavolume(syncState.dvMutated, syncState.pvcSpec, pvcModifier)
1✔
1286
        if err != nil {
2✔
1287
                if cc.ErrQuotaExceeded(err) {
1✔
1288
                        syncErr := r.syncDataVolumeStatusPhaseWithEvent(syncState, cdiv1.Pending, nil, Event{corev1.EventTypeWarning, cc.ErrExceededQuota, err.Error()})
×
1289
                        if syncErr != nil {
×
1290
                                log.Error(syncErr, "failed to sync DataVolume status with event")
×
1291
                        }
×
1292
                }
1293
                return err
1✔
1294
        }
1295
        syncState.pvc = newPvc
1✔
1296

1✔
1297
        return nil
1✔
1298
}
1299

1300
// shouldUseCDIPopulator returns if the population of the PVC should be done using
1301
// CDI populators.
1302
// Currently it will use populators only if:
1303
// * storageClass used is CSI storageClass
1304
// * annotation cdi.kubevirt.io/storage.usePopulator is not set by user to "false"
1305
func (r *ReconcilerBase) shouldUseCDIPopulator(syncState *dvSyncState) (bool, error) {
1✔
1306
        dv := syncState.dvMutated
1✔
1307
        if usePopulator, ok := dv.Annotations[cc.AnnUsePopulator]; ok {
2✔
1308
                boolUsePopulator, err := strconv.ParseBool(usePopulator)
1✔
1309
                if err != nil {
1✔
1310
                        return false, err
×
1311
                }
×
1312
                return boolUsePopulator, nil
1✔
1313
        }
1314
        log := r.log.WithValues("DataVolume", dv.Name, "Namespace", dv.Namespace)
1✔
1315
        usePopulator, err := storageClassCSIDriverExists(r.client, r.log, syncState.pvcSpec.StorageClassName)
1✔
1316
        if err != nil {
1✔
1317
                return false, err
×
1318
        }
×
1319
        if !usePopulator {
2✔
1320
                if syncState.pvcSpec.StorageClassName != nil {
2✔
1321
                        log.Info("Not using CDI populators, storage class is not a CSI storage", "storageClass", *syncState.pvcSpec.StorageClassName)
1✔
1322
                }
1✔
1323
        }
1324

1325
        return usePopulator, nil
1✔
1326
}
1327

1328
func (r *ReconcilerBase) pvcRequiresWork(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) (bool, error) {
1✔
1329
        if pvc == nil || dv == nil {
2✔
1330
                return true, nil
1✔
1331
        }
1✔
1332
        if pvcIsPopulatedForDataVolume(pvc, dv) {
2✔
1333
                return false, nil
1✔
1334
        }
1✔
1335
        canAdopt, err := cc.AllowClaimAdoption(r.client, pvc, dv)
1✔
1336
        if err != nil {
1✔
1337
                return true, err
×
1338
        }
×
1339
        if canAdopt {
2✔
1340
                return false, nil
1✔
1341
        }
1✔
1342
        return true, nil
1✔
1343
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc