• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5400

24 Jun 2025 07:02AM UTC coverage: 59.418% (+0.003%) from 59.415%
#5400

Pull #3760

travis-ci

Acedus
testing: test pullMethod: node multi-arch import flake

Signed-off-by: Adi Aloni <aaloni@redhat.com>
Pull Request #3760: VEP48: Introduce DataSource Source DataSource (DataSource Pointers)

27 of 59 new or added lines in 5 files covered. (45.76%)

83 existing lines in 2 files now uncovered.

16980 of 28577 relevant lines covered (59.42%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.16
/pkg/controller/datavolume/clone-controller-base.go
1
/*
2
Copyright 2022 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
        http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package datavolume
18

19
import (
20
        "context"
21
        "fmt"
22
        "reflect"
23

24
        "github.com/pkg/errors"
25

26
        corev1 "k8s.io/api/core/v1"
27
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
28
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29
        "k8s.io/apimachinery/pkg/types"
30

31
        "sigs.k8s.io/controller-runtime/pkg/client"
32
        "sigs.k8s.io/controller-runtime/pkg/controller"
33
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
34
        "sigs.k8s.io/controller-runtime/pkg/event"
35
        "sigs.k8s.io/controller-runtime/pkg/handler"
36
        "sigs.k8s.io/controller-runtime/pkg/manager"
37
        "sigs.k8s.io/controller-runtime/pkg/predicate"
38
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
39
        "sigs.k8s.io/controller-runtime/pkg/source"
40

41
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
42
        "kubevirt.io/containerized-data-importer/pkg/controller/clone"
43
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
44
        "kubevirt.io/containerized-data-importer/pkg/controller/populators"
45
        "kubevirt.io/containerized-data-importer/pkg/token"
46
)
47

48
const (
49
        // ErrUnableToClone provides a const to indicate some errors are blocking the clone
50
        ErrUnableToClone = "ErrUnableToClone"
51

52
        // CloneScheduled provides a const to indicate clone is scheduled
53
        CloneScheduled = "CloneScheduled"
54
        // CloneInProgress provides a const to indicate clone is in progress
55
        CloneInProgress = "CloneInProgress"
56
        // SnapshotForSmartCloneInProgress provides a const to indicate snapshot creation for smart-clone is in progress
57
        SnapshotForSmartCloneInProgress = "SnapshotForSmartCloneInProgress"
58
        // CloneFromSnapshotSourceInProgress provides a const to indicate clone from snapshot source is in progress
59
        CloneFromSnapshotSourceInProgress = "CloneFromSnapshotSourceInProgress"
60
        // SnapshotForSmartCloneCreated provides a const to indicate snapshot creation for smart-clone has been completed
61
        SnapshotForSmartCloneCreated = "SnapshotForSmartCloneCreated"
62
        // CSICloneInProgress provides a const to indicate  csi volume clone is in progress
63
        CSICloneInProgress = "CSICloneInProgress"
64
        // CloneFailed provides a const to indicate clone has failed
65
        CloneFailed = "CloneFailed"
66
        // CloneSucceeded provides a const to indicate clone has succeeded
67
        CloneSucceeded = "CloneSucceeded"
68

69
        // MessageCloneScheduled provides a const to form clone is scheduled message
70
        MessageCloneScheduled = "Cloning from %s/%s into %s/%s scheduled"
71
        // MessageCloneInProgress provides a const to form clone is in progress message
72
        MessageCloneInProgress = "Cloning from %s/%s into %s/%s in progress"
73
        // MessageCloneFailed provides a const to form clone has failed message
74
        MessageCloneFailed = "Cloning from %s/%s into %s/%s failed"
75
        // MessageCloneSucceeded provides a const to form clone has succeeded message
76
        MessageCloneSucceeded = "Successfully cloned from %s/%s into %s/%s"
77
        // MessageSmartCloneInProgress provides a const to form snapshot for smart-clone is in progress message
78
        MessageSmartCloneInProgress = "Creating snapshot for smart-clone is in progress (for pvc %s/%s)"
79
        // MessageCloneFromSnapshotSourceInProgress provides a const to form clone from snapshot source is in progress message
80
        MessageCloneFromSnapshotSourceInProgress = "Creating PVC from snapshot source is in progress (for %s %s/%s)"
81
        // MessageCsiCloneInProgress provides a const to form a CSI Volume Clone in progress message
82
        MessageCsiCloneInProgress = "CSI Volume clone in progress (for pvc %s/%s)"
83

84
        // ExpansionInProgress is const representing target PVC expansion
85
        ExpansionInProgress = "ExpansionInProgress"
86
        // MessageExpansionInProgress is a const for reporting target expansion
87
        MessageExpansionInProgress = "Expanding PersistentVolumeClaim for DataVolume %s/%s"
88
        // NamespaceTransferInProgress is const representing target PVC transfer
89
        NamespaceTransferInProgress = "NamespaceTransferInProgress"
90
        // MessageNamespaceTransferInProgress is a const for reporting target transfer
91
        MessageNamespaceTransferInProgress = "Transferring PersistentVolumeClaim for DataVolume %s/%s"
92
        // SizeDetectionPodCreated provides a const to indicate that the size-detection pod has been created (reason)
93
        SizeDetectionPodCreated = "SizeDetectionPodCreated"
94
        // MessageSizeDetectionPodCreated provides a const to indicate that the size-detection pod has been created (message)
95
        MessageSizeDetectionPodCreated = "Size-detection pod created"
96
        // SizeDetectionPodNotReady reports that the size-detection pod has not finished its execution (reason)
97
        SizeDetectionPodNotReady = "SizeDetectionPodNotReady"
98
        // MessageSizeDetectionPodNotReady reports that the size-detection pod has not finished its execution (message)
99
        MessageSizeDetectionPodNotReady = "The size detection pod is not finished yet"
100
        // ImportPVCNotReady reports that it's not yet possible to access the source PVC (reason)
101
        ImportPVCNotReady = "ImportPVCNotReady"
102
        // MessageImportPVCNotReady reports that it's not yet possible to access the source PVC (message)
103
        MessageImportPVCNotReady = "The source PVC is not fully imported"
104
        // CloneValidationFailed reports that a clone wasn't admitted by our validation mechanism (reason)
105
        CloneValidationFailed = "CloneValidationFailed"
106
        // MessageCloneValidationFailed reports that a clone wasn't admitted by our validation mechanism (message)
107
        MessageCloneValidationFailed = "The clone doesn't meet the validation requirements: %s"
108
        // CloneWithoutSource reports that the source of a clone doesn't exists (reason)
109
        CloneWithoutSource = "CloneWithoutSource"
110
        // MessageCloneWithoutSource reports that the source of a clone doesn't exists (message)
111
        MessageCloneWithoutSource = "The source %s %s doesn't exist"
112
        // PrepClaimInProgress is const representing target PVC prep
113
        PrepClaimInProgress = "PrepClaimInProgress"
114
        // MessagePrepClaimInProgress is a const for reporting target prep
115
        MessagePrepClaimInProgress = "Prepping PersistentVolumeClaim for DataVolume %s/%s"
116
        // RebindInProgress is const representing target PVC rebind
117
        RebindInProgress = "RebindInProgress"
118
        // MessageRebindInProgress is a const for reporting target rebind
119
        MessageRebindInProgress = "Rebinding PersistentVolumeClaim for DataVolume %s/%s"
120
        // NoPopulator reports CDI populator is not used so we fallback to host-assisted cloning (reason)
121
        NoPopulator = "NoPopulator"
122
        // NoPopulatorMessage reports CDI populator is not used so we fallback to host-assisted cloning (message)
123
        NoPopulatorMessage = "In tree storage class does not support snapshot/clone"
124

125
        // AnnCSICloneRequest annotation associates object with CSI Clone Request
126
        AnnCSICloneRequest = "cdi.kubevirt.io/CSICloneRequest"
127

128
        // AnnVirtualImageSize annotation contains the Virtual Image size of a PVC used for host-assisted cloning
129
        AnnVirtualImageSize = "cdi.Kubervirt.io/virtualSize"
130

131
        // AnnSourceCapacity annotation contains the storage capacity of a PVC used for host-assisted cloning
132
        AnnSourceCapacity = "cdi.Kubervirt.io/sourceCapacity"
133

134
        crossNamespaceFinalizer = "cdi.kubevirt.io/dataVolumeFinalizer"
135
)
136

137
// CloneReconcilerBase members
138
type CloneReconcilerBase struct {
139
        ReconcilerBase
140
        clonerImage         string
141
        importerImage       string
142
        pullPolicy          string
143
        cloneSourceAPIGroup *string
144
        cloneSourceKind     string
145
        shortTokenValidator token.Validator
146
        longTokenValidator  token.Validator
147
        tokenGenerator      token.Generator
148
}
149

150
func (r *CloneReconcilerBase) addVolumeCloneSourceWatch(mgr manager.Manager, datavolumeController controller.Controller) error {
×
151
        return datavolumeController.Watch(source.Kind(mgr.GetCache(), &cdiv1.VolumeCloneSource{}, handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.VolumeCloneSource](
×
152
                func(ctx context.Context, obj *cdiv1.VolumeCloneSource) []reconcile.Request {
×
153
                        var err error
×
154
                        var hasDataVolumeOwner bool
×
155
                        var ownerNamespace, ownerName string
×
156
                        ownerRef := metav1.GetControllerOf(obj)
×
157
                        if ownerRef != nil && ownerRef.Kind == "DataVolume" {
×
158
                                hasDataVolumeOwner = true
×
159
                                ownerNamespace = obj.GetNamespace()
×
160
                                ownerName = ownerRef.Name
×
161
                        } else if hasAnnOwnedByDataVolume(obj) {
×
162
                                hasDataVolumeOwner = true
×
163
                                ownerNamespace, ownerName, err = getAnnOwnedByDataVolume(obj)
×
164
                                if err != nil {
×
165
                                        return nil
×
166
                                }
×
167
                        }
168
                        if !hasDataVolumeOwner {
×
169
                                return nil
×
170
                        }
×
171
                        dv := &cdiv1.DataVolume{
×
172
                                ObjectMeta: metav1.ObjectMeta{
×
173
                                        Namespace: ownerNamespace,
×
174
                                        Name:      ownerName,
×
175
                                },
×
176
                        }
×
177
                        if err = r.client.Get(ctx, client.ObjectKeyFromObject(dv), dv); err != nil {
×
178
                                r.log.Info("Failed to get DataVolume", "error", err)
×
179
                                return nil
×
180
                        }
×
181
                        if err := r.populateSourceIfSourceRef(dv); err != nil {
×
182
                                r.log.Info("Failed to check DataSource", "error", err)
×
183
                                return nil
×
184
                        }
×
185
                        if (r.cloneSourceKind == "PersistentVolumeClaim" && dv.Spec.Source.PVC != nil) ||
×
186
                                (r.cloneSourceKind == "VolumeSnapshot" && dv.Spec.Source.Snapshot != nil) {
×
187
                                return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: ownerNamespace, Name: ownerName}}}
×
188
                        }
×
189
                        return nil
×
190
                }),
191
        ))
192
}
193

194
func (r *CloneReconcilerBase) updatePVCForPopulation(dataVolume *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) error {
1✔
195
        if dataVolume.Spec.Source.PVC == nil && dataVolume.Spec.Source.Snapshot == nil {
1✔
196
                return errors.Errorf("no source set for clone datavolume")
×
197
        }
×
198
        if err := addCloneToken(dataVolume, pvc); err != nil {
1✔
199
                return err
×
200
        }
×
201
        if err := cc.AddImmediateBindingAnnotationIfWFFCDisabled(pvc, r.featureGates); err != nil {
1✔
202
                return err
×
203
        }
×
204
        if isCrossNamespaceClone(dataVolume) {
2✔
205
                _, _, sourcNamespace := cc.GetCloneSourceInfo(dataVolume)
1✔
206
                cc.AddAnnotation(pvc, populators.AnnDataSourceNamespace, sourcNamespace)
1✔
207
        }
1✔
208
        apiGroup := cc.AnnAPIGroup
1✔
209
        pvc.Spec.DataSourceRef = &corev1.TypedObjectReference{
1✔
210
                APIGroup: &apiGroup,
1✔
211
                Kind:     cdiv1.VolumeCloneSourceRef,
1✔
212
                Name:     volumeCloneSourceName(dataVolume),
1✔
213
        }
1✔
214
        return nil
1✔
215
}
216

217
func (r *CloneReconcilerBase) ensureExtendedTokenDV(dv *cdiv1.DataVolume) (bool, error) {
1✔
218
        if !isCrossNamespaceClone(dv) {
2✔
219
                return false, nil
1✔
220
        }
1✔
221

222
        _, ok := dv.Annotations[cc.AnnExtendedCloneToken]
1✔
223
        if ok {
2✔
224
                return false, nil
1✔
225
        }
1✔
226

227
        token, ok := dv.Annotations[cc.AnnCloneToken]
1✔
228
        if !ok {
1✔
229
                return false, fmt.Errorf("token missing")
×
230
        }
×
231

232
        payload, err := r.shortTokenValidator.Validate(token)
1✔
233
        if err != nil {
1✔
234
                return false, err
×
235
        }
×
236

237
        if payload.Params == nil {
2✔
238
                payload.Params = make(map[string]string)
1✔
239
        }
1✔
240
        payload.Params["uid"] = string(dv.UID)
1✔
241

1✔
242
        newToken, err := r.tokenGenerator.Generate(payload)
1✔
243
        if err != nil {
1✔
244
                return false, err
×
245
        }
×
246

247
        dv.Annotations[cc.AnnExtendedCloneToken] = newToken
1✔
248

1✔
249
        return true, nil
1✔
250
}
251

252
func (r *CloneReconcilerBase) fallbackToHostAssisted(pvc *corev1.PersistentVolumeClaim) error {
1✔
253
        pvcCpy := pvc.DeepCopy()
1✔
254

1✔
255
        cc.AddAnnotation(pvcCpy, cc.AnnCloneType, string(cdiv1.CloneStrategyHostAssisted))
1✔
256
        cc.AddAnnotation(pvcCpy, populators.AnnCloneFallbackReason, NoPopulatorMessage)
1✔
257

1✔
258
        if !reflect.DeepEqual(pvc, pvcCpy) {
2✔
259
                r.recorder.Event(pvcCpy, corev1.EventTypeWarning, NoPopulator, NoPopulatorMessage)
1✔
260
                if err := r.updatePVC(pvcCpy); err != nil {
1✔
261
                        return err
×
262
                }
×
263
        }
264

265
        return nil
1✔
266
}
267

268
func (r *CloneReconcilerBase) ensureExtendedTokenPVC(dv *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) error {
1✔
269
        if !isCrossNamespaceClone(dv) {
2✔
270
                return nil
1✔
271
        }
1✔
272

273
        _, ok := pvc.Annotations[cc.AnnExtendedCloneToken]
1✔
274
        if ok {
1✔
275
                return nil
×
276
        }
×
277

278
        token, ok := dv.Annotations[cc.AnnExtendedCloneToken]
1✔
279
        if !ok {
1✔
280
                return fmt.Errorf("token missing")
×
281
        }
×
282

283
        payload, err := r.longTokenValidator.Validate(token)
1✔
284
        if err != nil {
1✔
285
                return err
×
286
        }
×
287

288
        if payload.Params["uid"] != string(dv.UID) {
1✔
289
                return fmt.Errorf("token uid mismatch")
×
290
        }
×
291

292
        // now use pvc uid
293
        payload.Params["uid"] = string(pvc.UID)
1✔
294

1✔
295
        newToken, err := r.tokenGenerator.Generate(payload)
1✔
296
        if err != nil {
1✔
297
                return err
×
298
        }
×
299

300
        pvc.Annotations[cc.AnnExtendedCloneToken] = newToken
1✔
301

1✔
302
        if err := r.updatePVC(pvc); err != nil {
1✔
303
                return err
×
304
        }
×
305

306
        return nil
1✔
307
}
308

309
func (r *CloneReconcilerBase) reconcileVolumeCloneSourceCR(syncState *dvSyncState) error {
1✔
310
        dv := syncState.dvMutated
1✔
311
        volumeCloneSource := &cdiv1.VolumeCloneSource{}
1✔
312
        volumeCloneSourceName := volumeCloneSourceName(dv)
1✔
313
        _, sourceName, sourceNamespace := cc.GetCloneSourceInfo(dv)
1✔
314
        deletedOrSucceeded := dv.DeletionTimestamp != nil || dv.Status.Phase == cdiv1.Succeeded
1✔
315
        exists, err := cc.GetResource(context.TODO(), r.client, sourceNamespace, volumeCloneSourceName, volumeCloneSource)
1✔
316
        if err != nil {
1✔
317
                return err
×
318
        }
×
319

320
        if deletedOrSucceeded || exists {
2✔
321
                if deletedOrSucceeded && exists {
2✔
322
                        if err := r.client.Delete(context.TODO(), volumeCloneSource); err != nil {
1✔
323
                                if !k8serrors.IsNotFound(err) {
×
324
                                        return err
×
325
                                }
×
326
                        }
327
                }
328

329
                if deletedOrSucceeded {
2✔
330
                        cc.RemoveFinalizer(dv, crossNamespaceFinalizer)
1✔
331
                }
1✔
332

333
                return nil
1✔
334
        }
335

336
        volumeCloneSource = &cdiv1.VolumeCloneSource{
1✔
337
                ObjectMeta: metav1.ObjectMeta{
1✔
338
                        Name:      volumeCloneSourceName,
1✔
339
                        Namespace: sourceNamespace,
1✔
340
                },
1✔
341
                Spec: cdiv1.VolumeCloneSourceSpec{
1✔
342
                        Source: corev1.TypedLocalObjectReference{
1✔
343
                                APIGroup: r.cloneSourceAPIGroup,
1✔
344
                                Kind:     r.cloneSourceKind,
1✔
345
                                Name:     sourceName,
1✔
346
                        },
1✔
347
                        Preallocation: dv.Spec.Preallocation,
1✔
348
                },
1✔
349
        }
1✔
350

1✔
351
        if dv.Spec.PriorityClassName != "" {
2✔
352
                volumeCloneSource.Spec.PriorityClassName = &dv.Spec.PriorityClassName
1✔
353
        }
1✔
354

355
        if sourceNamespace == dv.Namespace {
2✔
356
                if err := controllerutil.SetControllerReference(dv, volumeCloneSource, r.scheme); err != nil {
1✔
357
                        return err
×
358
                }
×
359
        } else {
1✔
360
                if err := setAnnOwnedByDataVolume(volumeCloneSource, dv); err != nil {
1✔
361
                        return err
×
362
                }
×
363
        }
364

365
        if err := r.client.Create(context.TODO(), volumeCloneSource); err != nil {
1✔
366
                if !k8serrors.IsAlreadyExists(err) {
×
367
                        return err
×
368
                }
×
369
        }
370

371
        return nil
1✔
372
}
373

374
func (r *CloneReconcilerBase) syncCloneStatusPhase(syncState *dvSyncState, phase cdiv1.DataVolumePhase, pvc *corev1.PersistentVolumeClaim) error {
1✔
375
        var event Event
1✔
376
        dataVolume := syncState.dvMutated
1✔
377
        r.setEventForPhase(dataVolume, phase, &event)
1✔
378
        return r.syncDataVolumeStatusPhaseWithEvent(syncState, phase, pvc, event)
1✔
379
}
1✔
380

381
func (r *CloneReconcilerBase) setEventForPhase(dataVolume *cdiv1.DataVolume, phase cdiv1.DataVolumePhase, event *Event) {
1✔
382
        sourceType, sourceName, sourceNamespace := cc.GetCloneSourceInfo(dataVolume)
1✔
383
        switch phase {
1✔
384
        case cdiv1.CloneScheduled:
1✔
385
                event.eventType = corev1.EventTypeNormal
1✔
386
                event.reason = CloneScheduled
1✔
387
                event.message = fmt.Sprintf(MessageCloneScheduled, sourceNamespace, sourceName, dataVolume.Namespace, dataVolume.Name)
1✔
388
        case cdiv1.SnapshotForSmartCloneInProgress:
1✔
389
                event.eventType = corev1.EventTypeNormal
1✔
390
                event.reason = SnapshotForSmartCloneInProgress
1✔
391
                event.message = fmt.Sprintf(MessageSmartCloneInProgress, sourceNamespace, sourceName)
1✔
392
        case cdiv1.CloneFromSnapshotSourceInProgress:
1✔
393
                event.eventType = corev1.EventTypeNormal
1✔
394
                event.reason = CloneFromSnapshotSourceInProgress
1✔
395
                event.message = fmt.Sprintf(MessageCloneFromSnapshotSourceInProgress, sourceType, sourceNamespace, sourceName)
1✔
396
        case cdiv1.CSICloneInProgress:
1✔
397
                event.eventType = corev1.EventTypeNormal
1✔
398
                event.reason = CSICloneInProgress
1✔
399
                event.message = fmt.Sprintf(MessageCsiCloneInProgress, sourceNamespace, sourceName)
1✔
400
        case cdiv1.Succeeded:
1✔
401
                event.eventType = corev1.EventTypeNormal
1✔
402
                event.reason = CloneSucceeded
1✔
403
                event.message = fmt.Sprintf(MessageCloneSucceeded, sourceNamespace, sourceName, dataVolume.Namespace, dataVolume.Name)
1✔
404
        case cdiv1.CloneInProgress:
1✔
405
                event.eventType = corev1.EventTypeNormal
1✔
406
                event.reason = CloneInProgress
1✔
407
                event.message = fmt.Sprintf(MessageCloneInProgress, sourceNamespace, sourceName, dataVolume.Namespace, dataVolume.Name)
1✔
408
        case cdiv1.PrepClaimInProgress:
1✔
409
                event.eventType = corev1.EventTypeNormal
1✔
410
                event.reason = PrepClaimInProgress
1✔
411
                event.message = fmt.Sprintf(MessagePrepClaimInProgress, dataVolume.Namespace, dataVolume.Name)
1✔
412
        case cdiv1.RebindInProgress:
1✔
413
                event.eventType = corev1.EventTypeNormal
1✔
414
                event.reason = RebindInProgress
1✔
415
                event.message = fmt.Sprintf(MessageRebindInProgress, dataVolume.Namespace, dataVolume.Name)
1✔
416
        default:
×
417
                r.log.V(3).Info("No event set for phase", "phase", phase)
×
418
        }
419
}
420

421
var populatorPhaseMap = map[string]cdiv1.DataVolumePhase{
422
        "":                           cdiv1.CloneScheduled,
423
        clone.PendingPhaseName:       cdiv1.CloneScheduled,
424
        clone.SucceededPhaseName:     cdiv1.Succeeded,
425
        clone.CSIClonePhaseName:      cdiv1.CSICloneInProgress,
426
        clone.HostClonePhaseName:     cdiv1.CloneInProgress,
427
        clone.PrepClaimPhaseName:     cdiv1.PrepClaimInProgress,
428
        clone.RebindPhaseName:        cdiv1.RebindInProgress,
429
        clone.SnapshotClonePhaseName: cdiv1.CloneFromSnapshotSourceInProgress,
430
        clone.SnapshotPhaseName:      cdiv1.SnapshotForSmartCloneInProgress,
431
        clone.ErrorPhaseName:         cdiv1.Failed,
432
}
433

434
func (r *CloneReconcilerBase) updateStatusPhaseForPopulator(pvc *corev1.PersistentVolumeClaim, dataVolumeCopy *cdiv1.DataVolume, event *Event) error {
1✔
435
        popPhase := pvc.Annotations[populators.AnnClonePhase]
1✔
436
        dvPhase, ok := populatorPhaseMap[popPhase]
1✔
437
        if !ok {
1✔
438
                r.log.V(1).Info("Unknown populator phase", "phase", popPhase)
×
439
                //dataVolumeCopy.Status.Phase = cdiv1.Unknown // hold off on this for now
×
440
                return nil
×
441
        }
×
442
        // Avoid setting DV to failed for consistency with non-populator flow
443
        if dvPhase != cdiv1.Failed {
2✔
444
                dataVolumeCopy.Status.Phase = dvPhase
1✔
445
        }
1✔
446
        r.setEventForPhase(dataVolumeCopy, dvPhase, event)
1✔
447
        return nil
1✔
448
}
449

450
func (r *CloneReconcilerBase) updateStatusPhase(pvc *corev1.PersistentVolumeClaim, dataVolumeCopy *cdiv1.DataVolume, event *Event) error {
1✔
451
        if err := r.populateSourceIfSourceRef(dataVolumeCopy); err != nil {
1✔
452
                return err
×
453
        }
×
454
        _, sourceName, sourceNamespace := cc.GetCloneSourceInfo(dataVolumeCopy)
1✔
455

1✔
456
        usePopulator, err := CheckPVCUsingPopulators(pvc)
1✔
457
        if err != nil {
1✔
458
                return err
×
459
        }
×
460
        if usePopulator {
2✔
461
                return r.updateStatusPhaseForPopulator(pvc, dataVolumeCopy, event)
1✔
462
        }
1✔
463

464
        phase, ok := pvc.Annotations[cc.AnnPodPhase]
1✔
465
        if phase != string(corev1.PodSucceeded) {
2✔
466
                _, ok = pvc.Annotations[cc.AnnCloneRequest]
1✔
467
                requiresWork, err := r.pvcRequiresWork(pvc, dataVolumeCopy)
1✔
468
                if err != nil {
1✔
469
                        return err
×
470
                }
×
471
                if !ok || pvc.Status.Phase != corev1.ClaimBound || !requiresWork {
2✔
472
                        return nil
1✔
473
                }
1✔
474
                dataVolumeCopy.Status.Phase = cdiv1.CloneScheduled
1✔
475
        }
476
        if !ok {
1✔
477
                return nil
×
478
        }
×
479

480
        switch phase {
1✔
481
        case string(corev1.PodPending):
1✔
482
                dataVolumeCopy.Status.Phase = cdiv1.CloneScheduled
1✔
483
                event.eventType = corev1.EventTypeNormal
1✔
484
                event.reason = CloneScheduled
1✔
485
                event.message = fmt.Sprintf(MessageCloneScheduled, sourceNamespace, sourceName, pvc.Namespace, pvc.Name)
1✔
486
        case string(corev1.PodRunning):
1✔
487
                dataVolumeCopy.Status.Phase = cdiv1.CloneInProgress
1✔
488
                event.eventType = corev1.EventTypeNormal
1✔
489
                event.reason = CloneInProgress
1✔
490
                event.message = fmt.Sprintf(MessageCloneInProgress, sourceNamespace, sourceName, pvc.Namespace, pvc.Name)
1✔
491
        case string(corev1.PodFailed):
1✔
492
                event.eventType = corev1.EventTypeWarning
1✔
493
                event.reason = CloneFailed
1✔
494
                event.message = fmt.Sprintf(MessageCloneFailed, sourceNamespace, sourceName, pvc.Namespace, pvc.Name)
1✔
495
        case string(corev1.PodSucceeded):
1✔
496
                dataVolumeCopy.Status.Phase = cdiv1.Succeeded
1✔
497
                dataVolumeCopy.Status.Progress = cdiv1.DataVolumeProgress("100.0%")
1✔
498
                event.eventType = corev1.EventTypeNormal
1✔
499
                event.reason = CloneSucceeded
1✔
500
                event.message = fmt.Sprintf(MessageCloneSucceeded, sourceNamespace, sourceName, pvc.Namespace, pvc.Name)
1✔
501
        }
502
        return nil
1✔
503
}
504

505
// If SourceRef is set, populate spec.Source with data from the DataSource
506
// Note that when the controller actually updates the DV (updateDataVolume), we nil out spec.Source when SourceRef is set
507
func (r *CloneReconcilerBase) populateSourceIfSourceRef(dv *cdiv1.DataVolume) error {
1✔
508
        if dv.Spec.SourceRef == nil {
2✔
509
                return nil
1✔
510
        }
1✔
511
        if dv.Spec.SourceRef.Kind != cdiv1.DataVolumeDataSource {
×
512
                return errors.Errorf("Unsupported sourceRef kind %s, currently only %s is supported", dv.Spec.SourceRef.Kind, cdiv1.DataVolumeDataSource)
×
513
        }
×
514
        ns := dv.Namespace
×
515
        if dv.Spec.SourceRef.Namespace != nil && *dv.Spec.SourceRef.Namespace != "" {
×
516
                ns = *dv.Spec.SourceRef.Namespace
×
517
        }
×
518
        dataSource := &cdiv1.DataSource{}
×
519
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: dv.Spec.SourceRef.Name, Namespace: ns}, dataSource); err != nil {
×
520
                return err
×
521
        }
×
NEW
522
        if dataSource.Spec.Source.DataSource != nil {
×
NEW
523
                dataSource.Status.Source.DeepCopyInto(&dataSource.Spec.Source)
×
NEW
524
        }
×
525
        if dataSource.Spec.Source.PVC == nil && dataSource.Spec.Source.Snapshot == nil {
×
526
                return errors.Errorf("Empty source field in '%s'. DataSource may not be ready yet", dataSource.Name)
×
527
        }
×
528

529
        dv.Spec.Source = &cdiv1.DataVolumeSource{
×
530
                PVC:      dataSource.Spec.Source.PVC,
×
531
                Snapshot: dataSource.Spec.Source.Snapshot,
×
532
        }
×
533
        return nil
×
534
}
535

536
func isCrossNamespaceClone(dv *cdiv1.DataVolume) bool {
1✔
537
        _, _, sourceNamespace := cc.GetCloneSourceInfo(dv)
1✔
538

1✔
539
        return sourceNamespace != "" && sourceNamespace != dv.Namespace
1✔
540
}
1✔
541

542
// addCloneWithoutSourceWatch reconciles clones created without source once the matching PVC is created
543
func addCloneWithoutSourceWatch(mgr manager.Manager, datavolumeController controller.Controller, typeToWatch client.Object, indexingKey string, op dataVolumeOp) error {
×
544
        getKey := func(namespace, name string) string {
×
545
                return namespace + "/" + name
×
546
        }
×
547

548
        if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &cdiv1.DataVolume{}, indexingKey, func(obj client.Object) []string {
×
549
                dv := obj.(*cdiv1.DataVolume)
×
550
                if source := dv.Spec.Source; source != nil {
×
551
                        _, sourceName, sourceNamespace := cc.GetCloneSourceInfo(dv)
×
552
                        if getDataVolumeOp(context.TODO(), mgr.GetLogger(), dv, mgr.GetClient()) == op && sourceName != "" {
×
553
                                ns := cc.GetNamespace(sourceNamespace, obj.GetNamespace())
×
554
                                return []string{getKey(ns, sourceName)}
×
555
                        }
×
556
                }
557
                return nil
×
558
        }); err != nil {
×
559
                return err
×
560
        }
×
561

562
        // Function to reconcile DVs that match the selected fields
563
        dataVolumeMapper := func(ctx context.Context, obj client.Object) []reconcile.Request {
×
564
                dvList := &cdiv1.DataVolumeList{}
×
565
                namespacedName := types.NamespacedName{Namespace: obj.GetNamespace(), Name: obj.GetName()}
×
566
                matchingFields := client.MatchingFields{indexingKey: namespacedName.String()}
×
567
                if err := mgr.GetClient().List(ctx, dvList, matchingFields); err != nil {
×
568
                        return nil
×
569
                }
×
570
                var reqs []reconcile.Request
×
571
                for _, dv := range dvList.Items {
×
572
                        reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: dv.Namespace, Name: dv.Name}})
×
573
                }
×
574
                return reqs
×
575
        }
576

577
        if err := datavolumeController.Watch(source.Kind(mgr.GetCache(), typeToWatch,
×
578
                handler.EnqueueRequestsFromMapFunc(dataVolumeMapper),
×
579
                predicate.Funcs{
×
580
                        CreateFunc: func(e event.CreateEvent) bool { return true },
×
581
                        DeleteFunc: func(e event.DeleteEvent) bool { return false },
×
582
                        UpdateFunc: func(e event.UpdateEvent) bool { return true },
×
583
                })); err != nil {
×
584
                return err
×
585
        }
×
586

587
        return nil
×
588
}
589

590
func addDataSourceWatch(mgr manager.Manager, c controller.Controller, indexingKey string, op dataVolumeOp) error {
×
591
        getKey := func(namespace, name string) string {
×
592
                return namespace + "/" + name
×
593
        }
×
594

595
        if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &cdiv1.DataVolume{}, indexingKey, func(obj client.Object) []string {
×
596
                dv := obj.(*cdiv1.DataVolume)
×
597
                if sourceRef := dv.Spec.SourceRef; sourceRef != nil && sourceRef.Kind == cdiv1.DataVolumeDataSource {
×
598
                        ns := obj.GetNamespace()
×
599
                        if sourceRef.Namespace != nil && *sourceRef.Namespace != "" {
×
600
                                ns = *sourceRef.Namespace
×
601
                        }
×
602
                        if getDataVolumeOp(context.TODO(), mgr.GetLogger(), dv, mgr.GetClient()) == op && sourceRef.Name != "" {
×
603
                                return []string{getKey(ns, sourceRef.Name)}
×
604
                        }
×
605
                }
606
                return nil
×
607
        }); err != nil {
×
608
                return err
×
609
        }
×
610

611
        mapToDataVolume := func(ctx context.Context, obj *cdiv1.DataSource) []reconcile.Request {
×
612
                var dvs cdiv1.DataVolumeList
×
613
                matchingFields := client.MatchingFields{indexingKey: getKey(obj.GetNamespace(), obj.GetName())}
×
614
                if err := mgr.GetClient().List(ctx, &dvs, matchingFields); err != nil {
×
615
                        c.GetLogger().Error(err, "Unable to list DataVolumes", "matchingFields", matchingFields)
×
616
                        return nil
×
617
                }
×
618
                var reqs []reconcile.Request
×
619
                for _, dv := range dvs.Items {
×
620
                        if getDataVolumeOp(ctx, c.GetLogger(), &dv, mgr.GetClient()) != op {
×
621
                                continue
×
622
                        }
623
                        reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: dv.Namespace, Name: dv.Name}})
×
624
                }
625
                return reqs
×
626
        }
627

628
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataSource{},
×
629
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataSource](mapToDataVolume),
×
630
        )); err != nil {
×
631
                return err
×
632
        }
×
633

634
        return nil
×
635
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc