• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5422

30 Jun 2025 04:38PM UTC coverage: 59.374% (-0.05%) from 59.422%
#5422

push

travis-ci

web-flow
datasources: introduce datasource pointers (#3760)

DataSource Pointers are a new DataSource Source in addition to the
existing Snapshot and PVC sources.

Pointers (Source.DataSource) allow one DataSource to point to the source
of another DataSource with some limitations:
1. The DataSource may not self-reference, this is validated at the API
   level.
2. The DataSource pointer reference chain may not exceed a maxium depth
   of 1.

Signed-off-by: Adi Aloni <aaloni@redhat.com>

30 of 61 new or added lines in 4 files covered. (49.18%)

5 existing lines in 2 files now uncovered.

16968 of 28578 relevant lines covered (59.37%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

39.44
/pkg/controller/datasource-controller.go
1
/*
2
Copyright 2022 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
limitations under the License.
14
See the License for the specific language governing permissions and
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "errors"
22
        "fmt"
23
        "reflect"
24

25
        "github.com/go-logr/logr"
26
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
27

28
        corev1 "k8s.io/api/core/v1"
29
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
30
        "k8s.io/apimachinery/pkg/api/meta"
31
        "k8s.io/apimachinery/pkg/runtime"
32
        "k8s.io/apimachinery/pkg/types"
33
        "k8s.io/client-go/tools/record"
34

35
        "sigs.k8s.io/controller-runtime/pkg/client"
36
        "sigs.k8s.io/controller-runtime/pkg/controller"
37
        "sigs.k8s.io/controller-runtime/pkg/event"
38
        "sigs.k8s.io/controller-runtime/pkg/handler"
39
        "sigs.k8s.io/controller-runtime/pkg/manager"
40
        "sigs.k8s.io/controller-runtime/pkg/predicate"
41
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
42
        "sigs.k8s.io/controller-runtime/pkg/source"
43

44
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
45
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
46
)
47

48
// DataSourceReconciler members
49
type DataSourceReconciler struct {
50
        client          client.Client
51
        recorder        record.EventRecorder
52
        scheme          *runtime.Scheme
53
        log             logr.Logger
54
        installerLabels map[string]string
55
}
56

57
const (
58
        ready                    = "Ready"
59
        noSource                 = "NoSource"
60
        dataSourceControllerName = "datasource-controller"
61
        maxReferenceDepthReached = "MaxReferenceDepthReached"
62
        selfReference            = "SelfReference"
63
)
64

65
// Reconcile loop for DataSourceReconciler
66
func (r *DataSourceReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
67
        dataSource := &cdiv1.DataSource{}
1✔
68
        if err := r.client.Get(ctx, req.NamespacedName, dataSource); err != nil {
2✔
69
                if k8serrors.IsNotFound(err) {
2✔
70
                        return reconcile.Result{}, nil
1✔
71
                }
1✔
72
                return reconcile.Result{}, err
×
73
        }
74
        if err := r.update(ctx, dataSource); err != nil {
1✔
75
                return reconcile.Result{}, err
×
76
        }
×
77
        return reconcile.Result{}, nil
1✔
78
}
79

80
func (r *DataSourceReconciler) update(ctx context.Context, dataSource *cdiv1.DataSource) error {
1✔
81
        dataSourceCopy := dataSource.DeepCopy()
1✔
82
        resolved, err := cc.ResolveDataSourceChain(ctx, r.client, dataSource)
1✔
83
        if err != nil {
2✔
84
                log := r.log.WithValues("datasource", dataSource.Name, "namespace", dataSource.Namespace)
1✔
85
                log.Info(err.Error())
1✔
86
                if err := handleDataSourceRefError(dataSource, err); err != nil {
1✔
NEW
87
                        return err
×
NEW
88
                }
×
89
                resolved = dataSource
1✔
90
        } else {
1✔
91
                resolved.Spec.Source.DeepCopyInto(&dataSource.Status.Source)
1✔
92
                dataSource.Status.Conditions = nil
1✔
93
        }
1✔
94

95
        switch {
1✔
96
        case resolved.Spec.Source.DataSource != nil:
1✔
97
                // Status condition handling already took place, continue to update
98
        case resolved.Spec.Source.PVC != nil:
1✔
99
                if err := r.handlePvcSource(ctx, resolved.Spec.Source.PVC, dataSource); err != nil {
1✔
100
                        return err
×
101
                }
×
102
        case resolved.Spec.Source.Snapshot != nil:
1✔
103
                if err := r.handleSnapshotSource(ctx, resolved.Spec.Source.Snapshot, dataSource); err != nil {
1✔
104
                        return err
×
105
                }
×
106
        default:
1✔
107
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "No source PVC set", noSource)
1✔
108
        }
109

110
        if dsCond := FindDataSourceConditionByType(dataSource, cdiv1.DataSourceReady); dsCond != nil && dsCond.Status == corev1.ConditionFalse {
2✔
111
                dataSource.Status.Source = cdiv1.DataSourceSource{}
1✔
112
        }
1✔
113

114
        if !reflect.DeepEqual(dataSource, dataSourceCopy) {
2✔
115
                if err := r.client.Update(ctx, dataSource); err != nil {
1✔
116
                        return err
×
117
                }
×
118
        }
119
        return nil
1✔
120
}
121

122
func (r *DataSourceReconciler) handlePvcSource(ctx context.Context, sourcePVC *cdiv1.DataVolumeSourcePVC, dataSource *cdiv1.DataSource) error {
1✔
123
        ns := cc.GetNamespace(sourcePVC.Namespace, dataSource.Namespace)
1✔
124
        isReady := false
1✔
125

1✔
126
        pvc := &corev1.PersistentVolumeClaim{}
1✔
127
        pvcErr := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: sourcePVC.Name}, pvc)
1✔
128
        if pvcErr != nil && !k8serrors.IsNotFound(pvcErr) {
1✔
129
                return pvcErr
×
130
        }
×
131

132
        dv := &cdiv1.DataVolume{}
1✔
133
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: sourcePVC.Name}, dv); err != nil {
2✔
134
                if !k8serrors.IsNotFound(err) {
1✔
135
                        return err
×
136
                }
×
137
                if pvcErr != nil {
2✔
138
                        r.log.Info("PVC not found", "name", sourcePVC.Name)
1✔
139
                        updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "PVC not found", cc.NotFound)
1✔
140
                } else {
2✔
141
                        isReady = true
1✔
142
                }
1✔
143
        } else if dv.Status.Phase == cdiv1.Succeeded {
2✔
144
                isReady = true
1✔
145
        } else {
2✔
146
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dv.Status.Phase), string(dv.Status.Phase))
1✔
147
        }
1✔
148

149
        if isReady {
2✔
150
                cc.CopyAllowedLabels(dv.GetLabels(), dataSource, true)
1✔
151
                cc.CopyAllowedLabels(pvc.GetLabels(), dataSource, true)
1✔
152
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionTrue, "DataSource is ready to be consumed", ready)
1✔
153
        }
1✔
154

155
        return nil
1✔
156
}
157

158
func (r *DataSourceReconciler) handleSnapshotSource(ctx context.Context, sourceSnapshot *cdiv1.DataVolumeSourceSnapshot, dataSource *cdiv1.DataSource) error {
1✔
159
        snapshot := &snapshotv1.VolumeSnapshot{}
1✔
160
        ns := cc.GetNamespace(sourceSnapshot.Namespace, dataSource.Namespace)
1✔
161
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: sourceSnapshot.Name}, snapshot); err != nil {
2✔
162
                if !k8serrors.IsNotFound(err) {
1✔
163
                        return err
×
164
                }
×
165
                r.log.Info("Snapshot not found", "name", sourceSnapshot.Name)
1✔
166
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "Snapshot not found", cc.NotFound)
1✔
167
        } else if cc.IsSnapshotReady(snapshot) {
2✔
168
                cc.CopyAllowedLabels(snapshot.GetLabels(), dataSource, true)
1✔
169
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionTrue, "DataSource is ready to be consumed", ready)
1✔
170
        } else {
2✔
171
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "Snapshot phase is not ready", "SnapshotNotReady")
1✔
172
        }
1✔
173

174
        return nil
1✔
175
}
176

177
func handleDataSourceRefError(dataSource *cdiv1.DataSource, err error) error {
1✔
178
        reason := ""
1✔
179
        switch {
1✔
180
        case errors.Is(err, cc.ErrDataSourceMaxDepthReached):
1✔
181
                reason = maxReferenceDepthReached
1✔
182
        case errors.Is(err, cc.ErrDataSourceSelfReference):
1✔
183
                reason = selfReference
1✔
184
        case k8serrors.IsNotFound(err):
1✔
185
                reason = cc.NotFound
1✔
NEW
186
        default:
×
NEW
187
                return err
×
188
        }
189
        updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, err.Error(), reason)
1✔
190
        return nil
1✔
191
}
192

193
func updateDataSourceCondition(ds *cdiv1.DataSource, conditionType cdiv1.DataSourceConditionType, status corev1.ConditionStatus, message, reason string) {
1✔
194
        if condition := FindDataSourceConditionByType(ds, conditionType); condition != nil {
1✔
UNCOV
195
                updateConditionState(&condition.ConditionState, status, message, reason)
×
196
        } else {
1✔
197
                condition = &cdiv1.DataSourceCondition{Type: conditionType}
1✔
198
                updateConditionState(&condition.ConditionState, status, message, reason)
1✔
199
                ds.Status.Conditions = append(ds.Status.Conditions, *condition)
1✔
200
        }
1✔
201
}
202

203
// FindDataSourceConditionByType finds DataSourceCondition by condition type
204
func FindDataSourceConditionByType(ds *cdiv1.DataSource, conditionType cdiv1.DataSourceConditionType) *cdiv1.DataSourceCondition {
1✔
205
        for i, condition := range ds.Status.Conditions {
2✔
206
                if condition.Type == conditionType {
2✔
207
                        return &ds.Status.Conditions[i]
1✔
208
                }
1✔
209
        }
210
        return nil
1✔
211
}
212

213
// NewDataSourceController creates a new instance of the DataSource controller
214
func NewDataSourceController(mgr manager.Manager, log logr.Logger, installerLabels map[string]string) (controller.Controller, error) {
×
215
        reconciler := &DataSourceReconciler{
×
216
                client:          mgr.GetClient(),
×
217
                recorder:        mgr.GetEventRecorderFor(dataSourceControllerName),
×
218
                scheme:          mgr.GetScheme(),
×
219
                log:             log.WithName(dataSourceControllerName),
×
220
                installerLabels: installerLabels,
×
221
        }
×
222
        DataSourceController, err := controller.New(dataSourceControllerName, mgr, controller.Options{
×
223
                MaxConcurrentReconciles: 3,
×
224
                Reconciler:              reconciler,
×
225
        })
×
226
        if err != nil {
×
227
                return nil, err
×
228
        }
×
229
        if err := addDataSourceControllerWatches(mgr, DataSourceController, log); err != nil {
×
230
                return nil, err
×
231
        }
×
232
        log.Info("Initialized DataSource controller")
×
233
        return DataSourceController, nil
×
234
}
235

236
func addDataSourceControllerWatches(mgr manager.Manager, c controller.Controller, log logr.Logger) error {
×
237
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataSource{}, &handler.TypedEnqueueRequestForObject[*cdiv1.DataSource]{},
×
238
                predicate.TypedFuncs[*cdiv1.DataSource]{
×
239
                        CreateFunc: func(e event.TypedCreateEvent[*cdiv1.DataSource]) bool { return true },
×
240
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataSource]) bool { return true },
×
241
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataSource]) bool {
×
242
                                return !sameSourceSpec(e.ObjectOld, e.ObjectNew)
×
243
                        },
×
244
                },
245
        )); err != nil {
×
246
                return err
×
247
        }
×
248

249
        const dataSourcePvcField = "spec.source.pvc"
×
250
        const dataSourceSnapshotField = "spec.source.snapshot"
×
251

×
252
        getKey := func(namespace, name string) string {
×
253
                return namespace + "/" + name
×
254
        }
×
255

256
        appendMatchingDataSourceRequests := func(ctx context.Context, indexingKey string, obj client.Object, reqs []reconcile.Request) []reconcile.Request {
×
257
                var dataSources cdiv1.DataSourceList
×
258
                matchingFields := client.MatchingFields{indexingKey: getKey(obj.GetNamespace(), obj.GetName())}
×
259
                if err := mgr.GetClient().List(ctx, &dataSources, matchingFields); err != nil {
×
260
                        log.Error(err, "Unable to list DataSources", "matchingFields", matchingFields)
×
261
                        return reqs
×
262
                }
×
263
                for _, ds := range dataSources.Items {
×
264
                        reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: ds.Namespace, Name: ds.Name}})
×
265
                }
×
266
                return reqs
×
267
        }
268

269
        if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &cdiv1.DataSource{}, dataSourcePvcField, func(obj client.Object) []string {
×
270
                if pvc := obj.(*cdiv1.DataSource).Spec.Source.PVC; pvc != nil {
×
271
                        ns := cc.GetNamespace(pvc.Namespace, obj.GetNamespace())
×
272
                        return []string{getKey(ns, pvc.Name)}
×
273
                }
×
274
                return nil
×
275
        }); err != nil {
×
276
                return err
×
277
        }
×
278
        if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &cdiv1.DataSource{}, dataSourceSnapshotField, func(obj client.Object) []string {
×
279
                if snapshot := obj.(*cdiv1.DataSource).Spec.Source.Snapshot; snapshot != nil {
×
280
                        ns := cc.GetNamespace(snapshot.Namespace, obj.GetNamespace())
×
281
                        return []string{getKey(ns, snapshot.Name)}
×
282
                }
×
283
                return nil
×
284
        }); err != nil {
×
285
                return err
×
286
        }
×
287

288
        mapToDataSource := func(ctx context.Context, obj client.Object) []reconcile.Request {
×
289
                reqs := appendMatchingDataSourceRequests(ctx, dataSourcePvcField, obj, nil)
×
290
                return appendMatchingDataSourceRequests(ctx, dataSourceSnapshotField, obj, reqs)
×
291
        }
×
292

293
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{},
×
294
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](func(ctx context.Context, obj *cdiv1.DataVolume) []reconcile.Request {
×
295
                        return mapToDataSource(ctx, obj)
×
296
                }),
×
297
                predicate.TypedFuncs[*cdiv1.DataVolume]{
298
                        CreateFunc: func(e event.TypedCreateEvent[*cdiv1.DataVolume]) bool { return true },
×
299
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataVolume]) bool { return true },
×
300
                        // Only DV status phase update is interesting to reconcile
301
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataVolume]) bool {
×
302
                                return e.ObjectOld.Status.Phase != e.ObjectNew.Status.Phase ||
×
303
                                        !reflect.DeepEqual(e.ObjectOld.Labels, e.ObjectNew.Labels)
×
304
                        },
×
305
                },
306
        )); err != nil {
×
307
                return err
×
308
        }
×
309

310
        if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{},
×
311
                handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](func(ctx context.Context, obj *corev1.PersistentVolumeClaim) []reconcile.Request {
×
312
                        return mapToDataSource(ctx, obj)
×
313
                }),
×
314
                predicate.TypedFuncs[*corev1.PersistentVolumeClaim]{
315
                        CreateFunc: func(e event.TypedCreateEvent[*corev1.PersistentVolumeClaim]) bool { return true },
×
316
                        DeleteFunc: func(e event.TypedDeleteEvent[*corev1.PersistentVolumeClaim]) bool { return true },
×
317
                        UpdateFunc: func(e event.TypedUpdateEvent[*corev1.PersistentVolumeClaim]) bool {
×
318
                                return e.ObjectOld.Status.Phase != e.ObjectNew.Status.Phase ||
×
319
                                        !reflect.DeepEqual(e.ObjectOld.Labels, e.ObjectNew.Labels)
×
320
                        },
×
321
                },
322
        )); err != nil {
×
323
                return err
×
324
        }
×
325

326
        if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
×
327
                if meta.IsNoMatchError(err) {
×
328
                        // Back out if there's no point to attempt watch
×
329
                        return nil
×
330
                }
×
331
                if !cc.IsErrCacheNotStarted(err) {
×
332
                        return err
×
333
                }
×
334
        }
335
        if err := c.Watch(source.Kind(mgr.GetCache(), &snapshotv1.VolumeSnapshot{},
×
336
                handler.TypedEnqueueRequestsFromMapFunc[*snapshotv1.VolumeSnapshot](func(ctx context.Context, obj *snapshotv1.VolumeSnapshot) []reconcile.Request {
×
337
                        return mapToDataSource(ctx, obj)
×
338
                }),
×
339
                predicate.TypedFuncs[*snapshotv1.VolumeSnapshot]{
340
                        CreateFunc: func(e event.TypedCreateEvent[*snapshotv1.VolumeSnapshot]) bool { return true },
×
341
                        DeleteFunc: func(e event.TypedDeleteEvent[*snapshotv1.VolumeSnapshot]) bool { return true },
×
342
                        UpdateFunc: func(e event.TypedUpdateEvent[*snapshotv1.VolumeSnapshot]) bool {
×
343
                                return !reflect.DeepEqual(e.ObjectOld.Status, e.ObjectNew.Status) ||
×
344
                                        !reflect.DeepEqual(e.ObjectOld.Labels, e.ObjectNew.Labels)
×
345
                        },
×
346
                },
347
        )); err != nil {
×
348
                return err
×
349
        }
×
350

351
        return nil
×
352
}
353

354
func sameSourceSpec(objOld, objNew client.Object) bool {
×
355
        dsOld, okOld := objOld.(*cdiv1.DataSource)
×
356
        dsNew, okNew := objNew.(*cdiv1.DataSource)
×
357

×
358
        if !okOld || !okNew {
×
359
                return false
×
360
        }
×
361
        if dsOld.Spec.Source.PVC != nil {
×
362
                return reflect.DeepEqual(dsOld.Spec.Source.PVC, dsNew.Spec.Source.PVC)
×
363
        }
×
364
        if dsOld.Spec.Source.Snapshot != nil {
×
365
                return reflect.DeepEqual(dsOld.Spec.Source.Snapshot, dsNew.Spec.Source.Snapshot)
×
366
        }
×
367

368
        return false
×
369
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc