• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5403

24 Jun 2025 09:49AM UTC coverage: 59.378% (-0.04%) from 59.415%
#5403

Pull #3760

travis-ci

Acedus
testing: test pullMethod: node multi-arch import flake

Signed-off-by: Adi Aloni <aaloni@redhat.com>
Pull Request #3760: VEP48: Introduce DataSource Source DataSource (DataSource Pointers)

28 of 61 new or added lines in 5 files covered. (45.9%)

121 existing lines in 2 files now uncovered.

16969 of 28578 relevant lines covered (59.38%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

39.36
/pkg/controller/datasource-controller.go
1
/*
2
Copyright 2022 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "errors"
22
        "fmt"
23
        "reflect"
24

25
        "github.com/go-logr/logr"
26
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
27

28
        corev1 "k8s.io/api/core/v1"
29
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
30
        "k8s.io/apimachinery/pkg/api/meta"
31
        "k8s.io/apimachinery/pkg/runtime"
32
        "k8s.io/apimachinery/pkg/types"
33
        "k8s.io/client-go/tools/record"
34

35
        "sigs.k8s.io/controller-runtime/pkg/client"
36
        "sigs.k8s.io/controller-runtime/pkg/controller"
37
        "sigs.k8s.io/controller-runtime/pkg/event"
38
        "sigs.k8s.io/controller-runtime/pkg/handler"
39
        "sigs.k8s.io/controller-runtime/pkg/manager"
40
        "sigs.k8s.io/controller-runtime/pkg/predicate"
41
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
42
        "sigs.k8s.io/controller-runtime/pkg/source"
43

44
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
45
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
46
)
47

48
// DataSourceReconciler members
49
type DataSourceReconciler struct {
50
        client          client.Client
51
        recorder        record.EventRecorder
52
        scheme          *runtime.Scheme
53
        log             logr.Logger
54
        installerLabels map[string]string
55
}
56

57
const (
	// ready is the condition reason set when a DataSource is ready to be consumed.
	ready = "Ready"
	// noSource is the condition reason set when a DataSource has no usable source.
	noSource = "NoSource"
	// dataSourceControllerName names the controller, its logger and its event recorder.
	dataSourceControllerName = "datasource-controller"
	// maxReferenceDepthReached is the condition reason for cc.ErrDataSourceMaxDepthReached.
	maxReferenceDepthReached = "MaxReferenceDepthReached"
	// selfReference is the condition reason for cc.ErrDataSourceSelfReference.
	selfReference = "SelfReference"
)
64

65
// Reconcile loop for DataSourceReconciler
66
func (r *DataSourceReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
67
        dataSource := &cdiv1.DataSource{}
1✔
68
        if err := r.client.Get(ctx, req.NamespacedName, dataSource); err != nil {
2✔
69
                if k8serrors.IsNotFound(err) {
2✔
70
                        return reconcile.Result{}, nil
1✔
71
                }
1✔
72
                return reconcile.Result{}, err
×
73
        }
74
        if err := r.update(ctx, dataSource); err != nil {
1✔
75
                return reconcile.Result{}, err
×
76
        }
×
77
        return reconcile.Result{}, nil
1✔
78
}
79

80
func (r *DataSourceReconciler) update(ctx context.Context, dataSource *cdiv1.DataSource) error {
1✔
81
        dataSourceCopy := dataSource.DeepCopy()
1✔
82
        resolved, err := cc.ResolveDataSourceChain(ctx, r.client, dataSource)
1✔
83
        if err != nil {
2✔
84
                log := r.log.WithValues("datasource", dataSource.Name, "namespace", dataSource.Namespace)
1✔
85
                log.Info(err.Error())
1✔
86
                if err := handleDataSourceRefError(dataSource, err); err != nil {
1✔
NEW
87
                        return err
×
NEW
88
                }
×
89
                dataSource.Spec.Source.DeepCopyInto(&dataSource.Status.Source)
1✔
90
                resolved = dataSource
1✔
91
        } else if !reflect.DeepEqual(dataSource.Status.Source, resolved.Spec.Source) {
2✔
92
                resolved.Spec.Source.DeepCopyInto(&dataSource.Status.Source)
1✔
93
                dataSource.Status.Conditions = nil
1✔
94
        }
1✔
95

96
        switch {
1✔
97
        case resolved.Spec.Source.DataSource != nil:
1✔
98
                // Status condition handling already took place, continue to update
99
        case resolved.Spec.Source.PVC != nil:
1✔
100
                if err := r.handlePvcSource(ctx, resolved.Spec.Source.PVC, dataSource); err != nil {
1✔
101
                        return err
×
102
                }
×
103
        case resolved.Spec.Source.Snapshot != nil:
1✔
104
                if err := r.handleSnapshotSource(ctx, resolved.Spec.Source.Snapshot, dataSource); err != nil {
1✔
105
                        return err
×
106
                }
×
107
        default:
1✔
108
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "No source PVC set", noSource)
1✔
109
        }
110

111
        if !reflect.DeepEqual(dataSource, dataSourceCopy) {
2✔
112
                if err := r.client.Update(ctx, dataSource); err != nil {
1✔
113
                        return err
×
UNCOV
114
                }
×
115
        }
116
        return nil
1✔
117
}
118

119
func (r *DataSourceReconciler) handlePvcSource(ctx context.Context, sourcePVC *cdiv1.DataVolumeSourcePVC, dataSource *cdiv1.DataSource) error {
1✔
120
        ns := cc.GetNamespace(sourcePVC.Namespace, dataSource.Namespace)
1✔
121
        isReady := false
1✔
122

1✔
123
        pvc := &corev1.PersistentVolumeClaim{}
1✔
124
        pvcErr := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: sourcePVC.Name}, pvc)
1✔
125
        if pvcErr != nil && !k8serrors.IsNotFound(pvcErr) {
1✔
UNCOV
126
                return pvcErr
×
UNCOV
127
        }
×
128

129
        dv := &cdiv1.DataVolume{}
1✔
130
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: sourcePVC.Name}, dv); err != nil {
2✔
131
                if !k8serrors.IsNotFound(err) {
1✔
UNCOV
132
                        return err
×
UNCOV
133
                }
×
134
                if pvcErr != nil {
2✔
135
                        r.log.Info("PVC not found", "name", sourcePVC.Name)
1✔
136
                        updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "PVC not found", cc.NotFound)
1✔
137
                } else {
2✔
138
                        isReady = true
1✔
139
                }
1✔
140
        } else if dv.Status.Phase == cdiv1.Succeeded {
2✔
141
                isReady = true
1✔
142
        } else {
2✔
143
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dv.Status.Phase), string(dv.Status.Phase))
1✔
144
        }
1✔
145

146
        if isReady {
2✔
147
                cc.CopyAllowedLabels(dv.GetLabels(), dataSource, true)
1✔
148
                cc.CopyAllowedLabels(pvc.GetLabels(), dataSource, true)
1✔
149
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionTrue, "DataSource is ready to be consumed", ready)
1✔
150
        }
1✔
151

152
        return nil
1✔
153
}
154

155
func (r *DataSourceReconciler) handleSnapshotSource(ctx context.Context, sourceSnapshot *cdiv1.DataVolumeSourceSnapshot, dataSource *cdiv1.DataSource) error {
1✔
156
        snapshot := &snapshotv1.VolumeSnapshot{}
1✔
157
        ns := cc.GetNamespace(sourceSnapshot.Namespace, dataSource.Namespace)
1✔
158
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: sourceSnapshot.Name}, snapshot); err != nil {
2✔
159
                if !k8serrors.IsNotFound(err) {
1✔
UNCOV
160
                        return err
×
UNCOV
161
                }
×
162
                r.log.Info("Snapshot not found", "name", sourceSnapshot.Name)
1✔
163
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "Snapshot not found", cc.NotFound)
1✔
164
        } else if cc.IsSnapshotReady(snapshot) {
2✔
165
                cc.CopyAllowedLabels(snapshot.GetLabels(), dataSource, true)
1✔
166
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionTrue, "DataSource is ready to be consumed", ready)
1✔
167
        } else {
2✔
168
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "Snapshot phase is not ready", "SnapshotNotReady")
1✔
169
        }
1✔
170

171
        return nil
1✔
172
}
173

174
func handleDataSourceRefError(dataSource *cdiv1.DataSource, err error) error {
1✔
175
        reason := ""
1✔
176
        switch {
1✔
177
        case errors.Is(err, cc.ErrDataSourceMaxDepthReached):
1✔
178
                reason = maxReferenceDepthReached
1✔
179
        case errors.Is(err, cc.ErrDataSourceSelfReference):
1✔
180
                reason = selfReference
1✔
181
        case k8serrors.IsNotFound(err):
1✔
182
                reason = cc.NotFound
1✔
NEW
183
        default:
×
NEW
184
                return err
×
185
        }
186
        updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, err.Error(), reason)
1✔
187
        return nil
1✔
188
}
189

190
func updateDataSourceCondition(ds *cdiv1.DataSource, conditionType cdiv1.DataSourceConditionType, status corev1.ConditionStatus, message, reason string) {
1✔
191
        if condition := FindDataSourceConditionByType(ds, conditionType); condition != nil {
2✔
192
                updateConditionState(&condition.ConditionState, status, message, reason)
1✔
193
        } else {
2✔
194
                condition = &cdiv1.DataSourceCondition{Type: conditionType}
1✔
195
                updateConditionState(&condition.ConditionState, status, message, reason)
1✔
196
                ds.Status.Conditions = append(ds.Status.Conditions, *condition)
1✔
197
        }
1✔
198
}
199

200
// FindDataSourceConditionByType finds DataSourceCondition by condition type
201
func FindDataSourceConditionByType(ds *cdiv1.DataSource, conditionType cdiv1.DataSourceConditionType) *cdiv1.DataSourceCondition {
1✔
202
        for i, condition := range ds.Status.Conditions {
2✔
203
                if condition.Type == conditionType {
2✔
204
                        return &ds.Status.Conditions[i]
1✔
205
                }
1✔
206
        }
207
        return nil
1✔
208
}
209

210
// NewDataSourceController creates a new instance of the DataSource controller
UNCOV
211
func NewDataSourceController(mgr manager.Manager, log logr.Logger, installerLabels map[string]string) (controller.Controller, error) {
×
UNCOV
212
        reconciler := &DataSourceReconciler{
×
UNCOV
213
                client:          mgr.GetClient(),
×
214
                recorder:        mgr.GetEventRecorderFor(dataSourceControllerName),
×
215
                scheme:          mgr.GetScheme(),
×
216
                log:             log.WithName(dataSourceControllerName),
×
217
                installerLabels: installerLabels,
×
218
        }
×
219
        DataSourceController, err := controller.New(dataSourceControllerName, mgr, controller.Options{
×
220
                MaxConcurrentReconciles: 3,
×
221
                Reconciler:              reconciler,
×
222
        })
×
223
        if err != nil {
×
224
                return nil, err
×
225
        }
×
226
        if err := addDataSourceControllerWatches(mgr, DataSourceController, log); err != nil {
×
227
                return nil, err
×
228
        }
×
229
        log.Info("Initialized DataSource controller")
×
230
        return DataSourceController, nil
×
231
}
232

233
func addDataSourceControllerWatches(mgr manager.Manager, c controller.Controller, log logr.Logger) error {
×
UNCOV
234
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataSource{}, &handler.TypedEnqueueRequestForObject[*cdiv1.DataSource]{},
×
UNCOV
235
                predicate.TypedFuncs[*cdiv1.DataSource]{
×
236
                        CreateFunc: func(e event.TypedCreateEvent[*cdiv1.DataSource]) bool { return true },
×
237
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataSource]) bool { return true },
×
238
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataSource]) bool {
×
239
                                return !sameSourceSpec(e.ObjectOld, e.ObjectNew)
×
240
                        },
×
241
                },
242
        )); err != nil {
×
243
                return err
×
UNCOV
244
        }
×
245

246
        const dataSourcePvcField = "spec.source.pvc"
×
247
        const dataSourceSnapshotField = "spec.source.snapshot"
×
UNCOV
248

×
249
        getKey := func(namespace, name string) string {
×
250
                return namespace + "/" + name
×
251
        }
×
252

253
        appendMatchingDataSourceRequests := func(ctx context.Context, indexingKey string, obj client.Object, reqs []reconcile.Request) []reconcile.Request {
×
254
                var dataSources cdiv1.DataSourceList
×
UNCOV
255
                matchingFields := client.MatchingFields{indexingKey: getKey(obj.GetNamespace(), obj.GetName())}
×
256
                if err := mgr.GetClient().List(ctx, &dataSources, matchingFields); err != nil {
×
257
                        log.Error(err, "Unable to list DataSources", "matchingFields", matchingFields)
×
258
                        return reqs
×
259
                }
×
260
                for _, ds := range dataSources.Items {
×
261
                        reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: ds.Namespace, Name: ds.Name}})
×
262
                }
×
263
                return reqs
×
264
        }
265

266
        if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &cdiv1.DataSource{}, dataSourcePvcField, func(obj client.Object) []string {
×
UNCOV
267
                if pvc := obj.(*cdiv1.DataSource).Spec.Source.PVC; pvc != nil {
×
UNCOV
268
                        ns := cc.GetNamespace(pvc.Namespace, obj.GetNamespace())
×
269
                        return []string{getKey(ns, pvc.Name)}
×
270
                }
×
271
                return nil
×
272
        }); err != nil {
×
273
                return err
×
274
        }
×
275
        if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &cdiv1.DataSource{}, dataSourceSnapshotField, func(obj client.Object) []string {
×
276
                if snapshot := obj.(*cdiv1.DataSource).Spec.Source.Snapshot; snapshot != nil {
×
277
                        ns := cc.GetNamespace(snapshot.Namespace, obj.GetNamespace())
×
278
                        return []string{getKey(ns, snapshot.Name)}
×
279
                }
×
280
                return nil
×
281
        }); err != nil {
×
282
                return err
×
283
        }
×
284

285
        mapToDataSource := func(ctx context.Context, obj client.Object) []reconcile.Request {
×
286
                reqs := appendMatchingDataSourceRequests(ctx, dataSourcePvcField, obj, nil)
×
UNCOV
287
                return appendMatchingDataSourceRequests(ctx, dataSourceSnapshotField, obj, reqs)
×
288
        }
×
289

290
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{},
×
291
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](func(ctx context.Context, obj *cdiv1.DataVolume) []reconcile.Request {
×
UNCOV
292
                        return mapToDataSource(ctx, obj)
×
293
                }),
×
294
                predicate.TypedFuncs[*cdiv1.DataVolume]{
295
                        CreateFunc: func(e event.TypedCreateEvent[*cdiv1.DataVolume]) bool { return true },
×
296
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataVolume]) bool { return true },
×
297
                        // Only DV status phase update is interesting to reconcile
298
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataVolume]) bool {
×
299
                                return e.ObjectOld.Status.Phase != e.ObjectNew.Status.Phase ||
×
UNCOV
300
                                        !reflect.DeepEqual(e.ObjectOld.Labels, e.ObjectNew.Labels)
×
301
                        },
×
302
                },
303
        )); err != nil {
×
304
                return err
×
UNCOV
305
        }
×
306

307
        if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{},
×
308
                handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](func(ctx context.Context, obj *corev1.PersistentVolumeClaim) []reconcile.Request {
×
UNCOV
309
                        return mapToDataSource(ctx, obj)
×
310
                }),
×
311
                predicate.TypedFuncs[*corev1.PersistentVolumeClaim]{
312
                        CreateFunc: func(e event.TypedCreateEvent[*corev1.PersistentVolumeClaim]) bool { return true },
×
313
                        DeleteFunc: func(e event.TypedDeleteEvent[*corev1.PersistentVolumeClaim]) bool { return true },
×
UNCOV
314
                        UpdateFunc: func(e event.TypedUpdateEvent[*corev1.PersistentVolumeClaim]) bool {
×
315
                                return e.ObjectOld.Status.Phase != e.ObjectNew.Status.Phase ||
×
316
                                        !reflect.DeepEqual(e.ObjectOld.Labels, e.ObjectNew.Labels)
×
317
                        },
×
318
                },
319
        )); err != nil {
×
320
                return err
×
UNCOV
321
        }
×
322

323
        if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
×
324
                if meta.IsNoMatchError(err) {
×
UNCOV
325
                        // Back out if there's no point to attempt watch
×
326
                        return nil
×
327
                }
×
328
                if !cc.IsErrCacheNotStarted(err) {
×
329
                        return err
×
330
                }
×
331
        }
332
        if err := c.Watch(source.Kind(mgr.GetCache(), &snapshotv1.VolumeSnapshot{},
×
333
                handler.TypedEnqueueRequestsFromMapFunc[*snapshotv1.VolumeSnapshot](func(ctx context.Context, obj *snapshotv1.VolumeSnapshot) []reconcile.Request {
×
UNCOV
334
                        return mapToDataSource(ctx, obj)
×
335
                }),
×
336
                predicate.TypedFuncs[*snapshotv1.VolumeSnapshot]{
337
                        CreateFunc: func(e event.TypedCreateEvent[*snapshotv1.VolumeSnapshot]) bool { return true },
×
338
                        DeleteFunc: func(e event.TypedDeleteEvent[*snapshotv1.VolumeSnapshot]) bool { return true },
×
UNCOV
339
                        UpdateFunc: func(e event.TypedUpdateEvent[*snapshotv1.VolumeSnapshot]) bool {
×
340
                                return !reflect.DeepEqual(e.ObjectOld.Status, e.ObjectNew.Status) ||
×
341
                                        !reflect.DeepEqual(e.ObjectOld.Labels, e.ObjectNew.Labels)
×
342
                        },
×
343
                },
344
        )); err != nil {
×
345
                return err
×
UNCOV
346
        }
×
347

348
        return nil
×
349
}
350

351
func sameSourceSpec(objOld, objNew client.Object) bool {
×
UNCOV
352
        dsOld, okOld := objOld.(*cdiv1.DataSource)
×
UNCOV
353
        dsNew, okNew := objNew.(*cdiv1.DataSource)
×
354

×
355
        if !okOld || !okNew {
×
356
                return false
×
357
        }
×
358
        if dsOld.Spec.Source.PVC != nil {
×
359
                return reflect.DeepEqual(dsOld.Spec.Source.PVC, dsNew.Spec.Source.PVC)
×
360
        }
×
361
        if dsOld.Spec.Source.Snapshot != nil {
×
362
                return reflect.DeepEqual(dsOld.Spec.Source.Snapshot, dsNew.Spec.Source.Snapshot)
×
363
        }
×
364

365
        return false
×
366
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc