• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5598

18 Sep 2025 02:27PM UTC coverage: 59.217% (+0.01%) from 59.206%
#5598

push

travis-ci

web-flow
Oracle Cloud: Set default clone strategy to csi-clone (#3882)

Signed-off-by: Alvaro Romero <alromero@redhat.com>

17177 of 29007 relevant lines covered (59.22%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

33.33
/pkg/controller/datasource-controller.go
1
/*
2
Copyright 2022 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "errors"
22
        "fmt"
23
        "reflect"
24

25
        "github.com/go-logr/logr"
26
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
27

28
        corev1 "k8s.io/api/core/v1"
29
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
30
        "k8s.io/apimachinery/pkg/api/meta"
31
        "k8s.io/apimachinery/pkg/runtime"
32
        "k8s.io/apimachinery/pkg/types"
33
        "k8s.io/client-go/tools/record"
34

35
        "sigs.k8s.io/controller-runtime/pkg/client"
36
        "sigs.k8s.io/controller-runtime/pkg/controller"
37
        "sigs.k8s.io/controller-runtime/pkg/event"
38
        "sigs.k8s.io/controller-runtime/pkg/handler"
39
        "sigs.k8s.io/controller-runtime/pkg/manager"
40
        "sigs.k8s.io/controller-runtime/pkg/predicate"
41
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
42
        "sigs.k8s.io/controller-runtime/pkg/source"
43

44
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
45
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
46
)
47

48
// DataSourceReconciler members
49
type DataSourceReconciler struct {
50
        client          client.Client
51
        recorder        record.EventRecorder
52
        scheme          *runtime.Scheme
53
        log             logr.Logger
54
        installerLabels map[string]string
55
}
56

57
// Controller name and the condition reasons reported on a DataSource.
const (
	// dataSourceControllerName names the controller, its logger and its event recorder.
	dataSourceControllerName = "datasource-controller"

	// ready is the reason used when the Ready condition is true.
	ready = "Ready"
	// noSource is the reason used when the resolved spec has no PVC, snapshot or DataSource set.
	noSource = "NoSource"

	// The reasons below are used when following a DataSource reference fails.
	maxReferenceDepthReached = "MaxReferenceDepthReached"
	selfReference            = "SelfReference"
	crossNamespaceReference  = "CrossNamespaceReference"
)
65

66
// Reconcile loop for DataSourceReconciler
67
func (r *DataSourceReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
68
        dataSource := &cdiv1.DataSource{}
1✔
69
        if err := r.client.Get(ctx, req.NamespacedName, dataSource); err != nil {
2✔
70
                if k8serrors.IsNotFound(err) {
2✔
71
                        return reconcile.Result{}, nil
1✔
72
                }
1✔
73
                return reconcile.Result{}, err
×
74
        }
75
        if err := r.update(ctx, dataSource); err != nil {
1✔
76
                return reconcile.Result{}, err
×
77
        }
×
78
        return reconcile.Result{}, nil
1✔
79
}
80

81
func (r *DataSourceReconciler) update(ctx context.Context, dataSource *cdiv1.DataSource) error {
1✔
82
        dataSourceCopy := dataSource.DeepCopy()
1✔
83
        resolved, err := cc.ResolveDataSourceChain(ctx, r.client, dataSource)
1✔
84
        if err != nil {
2✔
85
                log := r.log.WithValues("datasource", dataSource.Name, "namespace", dataSource.Namespace)
1✔
86
                log.Info(err.Error())
1✔
87
                if err := handleDataSourceRefError(dataSource, err); err != nil {
1✔
88
                        return err
×
89
                }
×
90
                resolved = dataSource
1✔
91
        } else {
1✔
92
                resolved.Spec.Source.DeepCopyInto(&dataSource.Status.Source)
1✔
93
                dataSource.Status.Conditions = nil
1✔
94
        }
1✔
95

96
        switch {
1✔
97
        case resolved.Spec.Source.DataSource != nil:
1✔
98
                // Status condition handling already took place, continue to update
99
        case resolved.Spec.Source.PVC != nil:
1✔
100
                if err := r.handlePvcSource(ctx, resolved.Spec.Source.PVC, dataSource); err != nil {
1✔
101
                        return err
×
102
                }
×
103
        case resolved.Spec.Source.Snapshot != nil:
1✔
104
                if err := r.handleSnapshotSource(ctx, resolved.Spec.Source.Snapshot, dataSource); err != nil {
1✔
105
                        return err
×
106
                }
×
107
        default:
1✔
108
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "No source PVC set", noSource)
1✔
109
        }
110

111
        if dsCond := FindDataSourceConditionByType(dataSource, cdiv1.DataSourceReady); dsCond != nil && dsCond.Status == corev1.ConditionFalse {
2✔
112
                dataSource.Status.Source = cdiv1.DataSourceSource{}
1✔
113
        }
1✔
114

115
        if !reflect.DeepEqual(dataSource, dataSourceCopy) {
2✔
116
                if err := r.client.Update(ctx, dataSource); err != nil {
1✔
117
                        return err
×
118
                }
×
119
        }
120
        return nil
1✔
121
}
122

123
func (r *DataSourceReconciler) handlePvcSource(ctx context.Context, sourcePVC *cdiv1.DataVolumeSourcePVC, dataSource *cdiv1.DataSource) error {
1✔
124
        ns := cc.GetNamespace(sourcePVC.Namespace, dataSource.Namespace)
1✔
125
        isReady := false
1✔
126

1✔
127
        pvc := &corev1.PersistentVolumeClaim{}
1✔
128
        pvcErr := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: sourcePVC.Name}, pvc)
1✔
129
        if pvcErr != nil && !k8serrors.IsNotFound(pvcErr) {
1✔
130
                return pvcErr
×
131
        }
×
132

133
        dv := &cdiv1.DataVolume{}
1✔
134
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: sourcePVC.Name}, dv); err != nil {
2✔
135
                if !k8serrors.IsNotFound(err) {
1✔
136
                        return err
×
137
                }
×
138
                if pvcErr != nil {
2✔
139
                        r.log.Info("PVC not found", "name", sourcePVC.Name)
1✔
140
                        updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "PVC not found", cc.NotFound)
1✔
141
                } else {
2✔
142
                        isReady = true
1✔
143
                }
1✔
144
        } else if dv.Status.Phase == cdiv1.Succeeded {
2✔
145
                isReady = true
1✔
146
        } else {
2✔
147
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dv.Status.Phase), string(dv.Status.Phase))
1✔
148
        }
1✔
149

150
        if isReady {
2✔
151
                cc.CopyAllowedLabels(dv.GetLabels(), dataSource, true)
1✔
152
                cc.CopyAllowedLabels(pvc.GetLabels(), dataSource, true)
1✔
153
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionTrue, "DataSource is ready to be consumed", ready)
1✔
154
        }
1✔
155

156
        return nil
1✔
157
}
158

159
func (r *DataSourceReconciler) handleSnapshotSource(ctx context.Context, sourceSnapshot *cdiv1.DataVolumeSourceSnapshot, dataSource *cdiv1.DataSource) error {
1✔
160
        snapshot := &snapshotv1.VolumeSnapshot{}
1✔
161
        ns := cc.GetNamespace(sourceSnapshot.Namespace, dataSource.Namespace)
1✔
162
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: sourceSnapshot.Name}, snapshot); err != nil {
2✔
163
                if !k8serrors.IsNotFound(err) {
1✔
164
                        return err
×
165
                }
×
166
                r.log.Info("Snapshot not found", "name", sourceSnapshot.Name)
1✔
167
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "Snapshot not found", cc.NotFound)
1✔
168
        } else if cc.IsSnapshotReady(snapshot) {
2✔
169
                cc.CopyAllowedLabels(snapshot.GetLabels(), dataSource, true)
1✔
170
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionTrue, "DataSource is ready to be consumed", ready)
1✔
171
        } else {
2✔
172
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "Snapshot phase is not ready", "SnapshotNotReady")
1✔
173
        }
1✔
174

175
        return nil
1✔
176
}
177

178
func handleDataSourceRefError(dataSource *cdiv1.DataSource, err error) error {
1✔
179
        reason := ""
1✔
180
        switch {
1✔
181
        case errors.Is(err, cc.ErrDataSourceMaxDepthReached):
1✔
182
                reason = maxReferenceDepthReached
1✔
183
        case errors.Is(err, cc.ErrDataSourceSelfReference):
1✔
184
                reason = selfReference
1✔
185
        case errors.Is(err, cc.ErrDataSourceCrossNamespace):
1✔
186
                reason = crossNamespaceReference
1✔
187
        case k8serrors.IsNotFound(err):
1✔
188
                reason = cc.NotFound
1✔
189
        default:
×
190
                return err
×
191
        }
192
        updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, err.Error(), reason)
1✔
193
        return nil
1✔
194
}
195

196
func updateDataSourceCondition(ds *cdiv1.DataSource, conditionType cdiv1.DataSourceConditionType, status corev1.ConditionStatus, message, reason string) {
1✔
197
        if condition := FindDataSourceConditionByType(ds, conditionType); condition != nil {
1✔
198
                updateConditionState(&condition.ConditionState, status, message, reason)
×
199
        } else {
1✔
200
                condition = &cdiv1.DataSourceCondition{Type: conditionType}
1✔
201
                updateConditionState(&condition.ConditionState, status, message, reason)
1✔
202
                ds.Status.Conditions = append(ds.Status.Conditions, *condition)
1✔
203
        }
1✔
204
}
205

206
// FindDataSourceConditionByType finds DataSourceCondition by condition type
207
func FindDataSourceConditionByType(ds *cdiv1.DataSource, conditionType cdiv1.DataSourceConditionType) *cdiv1.DataSourceCondition {
1✔
208
        for i, condition := range ds.Status.Conditions {
2✔
209
                if condition.Type == conditionType {
2✔
210
                        return &ds.Status.Conditions[i]
1✔
211
                }
1✔
212
        }
213
        return nil
1✔
214
}
215

216
// NewDataSourceController creates a new instance of the DataSource controller
217
func NewDataSourceController(mgr manager.Manager, log logr.Logger, installerLabels map[string]string) (controller.Controller, error) {
×
218
        reconciler := &DataSourceReconciler{
×
219
                client:          mgr.GetClient(),
×
220
                recorder:        mgr.GetEventRecorderFor(dataSourceControllerName),
×
221
                scheme:          mgr.GetScheme(),
×
222
                log:             log.WithName(dataSourceControllerName),
×
223
                installerLabels: installerLabels,
×
224
        }
×
225
        DataSourceController, err := controller.New(dataSourceControllerName, mgr, controller.Options{
×
226
                MaxConcurrentReconciles: 3,
×
227
                Reconciler:              reconciler,
×
228
        })
×
229
        if err != nil {
×
230
                return nil, err
×
231
        }
×
232
        if err := addDataSourceControllerWatches(mgr, DataSourceController, log); err != nil {
×
233
                return nil, err
×
234
        }
×
235
        log.Info("Initialized DataSource controller")
×
236
        return DataSourceController, nil
×
237
}
238

239
func addDataSourceControllerWatches(mgr manager.Manager, c controller.Controller, log logr.Logger) error {
×
240
        const dataSourcePvcField = "spec.source.pvc"
×
241
        const dataSourceSnapshotField = "spec.source.snapshot"
×
242
        const dataSourceDataSourceField = "spec.source.dataSource"
×
243

×
244
        getKey := func(namespace, name string) string {
×
245
                return namespace + "/" + name
×
246
        }
×
247

248
        appendMatchingDataSourceRequests := func(ctx context.Context, indexingKey string, obj client.Object, reqs []reconcile.Request) []reconcile.Request {
×
249
                var dataSources cdiv1.DataSourceList
×
250
                matchingFields := client.MatchingFields{indexingKey: getKey(obj.GetNamespace(), obj.GetName())}
×
251
                if err := mgr.GetClient().List(ctx, &dataSources, matchingFields); err != nil {
×
252
                        log.Error(err, "Unable to list DataSources", "matchingFields", matchingFields)
×
253
                        return reqs
×
254
                }
×
255
                for _, ds := range dataSources.Items {
×
256
                        reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: ds.Namespace, Name: ds.Name}})
×
257
                }
×
258
                return reqs
×
259
        }
260

261
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataSource{},
×
262
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataSource](func(ctx context.Context, obj *cdiv1.DataSource) []reconcile.Request {
×
263
                        reqs := []reconcile.Request{
×
264
                                {
×
265
                                        NamespacedName: types.NamespacedName{
×
266
                                                Name:      obj.Name,
×
267
                                                Namespace: obj.Namespace,
×
268
                                        },
×
269
                                },
×
270
                        }
×
271
                        return appendMatchingDataSourceRequests(ctx, dataSourceDataSourceField, obj, reqs)
×
272
                }),
×
273
                predicate.TypedFuncs[*cdiv1.DataSource]{
274
                        CreateFunc: func(e event.TypedCreateEvent[*cdiv1.DataSource]) bool { return true },
×
275
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataSource]) bool { return true },
×
276
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataSource]) bool {
×
277
                                return !sameSourceSpec(e.ObjectOld, e.ObjectNew) ||
×
278
                                        !sameConditions(e.ObjectOld, e.ObjectNew)
×
279
                        },
×
280
                },
281
        )); err != nil {
×
282
                return err
×
283
        }
×
284

285
        if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &cdiv1.DataSource{}, dataSourcePvcField, func(obj client.Object) []string {
×
286
                if pvc := obj.(*cdiv1.DataSource).Spec.Source.PVC; pvc != nil {
×
287
                        ns := cc.GetNamespace(pvc.Namespace, obj.GetNamespace())
×
288
                        return []string{getKey(ns, pvc.Name)}
×
289
                }
×
290
                return nil
×
291
        }); err != nil {
×
292
                return err
×
293
        }
×
294

295
        if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &cdiv1.DataSource{}, dataSourceSnapshotField, func(obj client.Object) []string {
×
296
                if snapshot := obj.(*cdiv1.DataSource).Spec.Source.Snapshot; snapshot != nil {
×
297
                        ns := cc.GetNamespace(snapshot.Namespace, obj.GetNamespace())
×
298
                        return []string{getKey(ns, snapshot.Name)}
×
299
                }
×
300
                return nil
×
301
        }); err != nil {
×
302
                return err
×
303
        }
×
304

305
        if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &cdiv1.DataSource{}, dataSourceDataSourceField, func(obj client.Object) []string {
×
306
                ds := obj.(*cdiv1.DataSource)
×
307
                if sourceDS := ds.Spec.Source.DataSource; sourceDS != nil {
×
308
                        ns := cc.GetNamespace(sourceDS.Namespace, ds.GetNamespace())
×
309
                        return []string{getKey(ns, sourceDS.Name)}
×
310
                }
×
311
                return nil
×
312
        }); err != nil {
×
313
                return err
×
314
        }
×
315

316
        mapToDataSource := func(ctx context.Context, obj client.Object) []reconcile.Request {
×
317
                reqs := appendMatchingDataSourceRequests(ctx, dataSourcePvcField, obj, nil)
×
318
                return appendMatchingDataSourceRequests(ctx, dataSourceSnapshotField, obj, reqs)
×
319
        }
×
320

321
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{},
×
322
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](func(ctx context.Context, obj *cdiv1.DataVolume) []reconcile.Request {
×
323
                        return mapToDataSource(ctx, obj)
×
324
                }),
×
325
                predicate.TypedFuncs[*cdiv1.DataVolume]{
326
                        CreateFunc: func(e event.TypedCreateEvent[*cdiv1.DataVolume]) bool { return true },
×
327
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataVolume]) bool { return true },
×
328
                        // Only DV status phase update is interesting to reconcile
329
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataVolume]) bool {
×
330
                                return e.ObjectOld.Status.Phase != e.ObjectNew.Status.Phase ||
×
331
                                        !reflect.DeepEqual(e.ObjectOld.Labels, e.ObjectNew.Labels)
×
332
                        },
×
333
                },
334
        )); err != nil {
×
335
                return err
×
336
        }
×
337

338
        if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{},
×
339
                handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](func(ctx context.Context, obj *corev1.PersistentVolumeClaim) []reconcile.Request {
×
340
                        return mapToDataSource(ctx, obj)
×
341
                }),
×
342
                predicate.TypedFuncs[*corev1.PersistentVolumeClaim]{
343
                        CreateFunc: func(e event.TypedCreateEvent[*corev1.PersistentVolumeClaim]) bool { return true },
×
344
                        DeleteFunc: func(e event.TypedDeleteEvent[*corev1.PersistentVolumeClaim]) bool { return true },
×
345
                        UpdateFunc: func(e event.TypedUpdateEvent[*corev1.PersistentVolumeClaim]) bool {
×
346
                                return e.ObjectOld.Status.Phase != e.ObjectNew.Status.Phase ||
×
347
                                        !reflect.DeepEqual(e.ObjectOld.Labels, e.ObjectNew.Labels)
×
348
                        },
×
349
                },
350
        )); err != nil {
×
351
                return err
×
352
        }
×
353

354
        if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
×
355
                if meta.IsNoMatchError(err) {
×
356
                        // Back out if there's no point to attempt watch
×
357
                        return nil
×
358
                }
×
359
                if !cc.IsErrCacheNotStarted(err) {
×
360
                        return err
×
361
                }
×
362
        }
363
        if err := c.Watch(source.Kind(mgr.GetCache(), &snapshotv1.VolumeSnapshot{},
×
364
                handler.TypedEnqueueRequestsFromMapFunc[*snapshotv1.VolumeSnapshot](func(ctx context.Context, obj *snapshotv1.VolumeSnapshot) []reconcile.Request {
×
365
                        return mapToDataSource(ctx, obj)
×
366
                }),
×
367
                predicate.TypedFuncs[*snapshotv1.VolumeSnapshot]{
368
                        CreateFunc: func(e event.TypedCreateEvent[*snapshotv1.VolumeSnapshot]) bool { return true },
×
369
                        DeleteFunc: func(e event.TypedDeleteEvent[*snapshotv1.VolumeSnapshot]) bool { return true },
×
370
                        UpdateFunc: func(e event.TypedUpdateEvent[*snapshotv1.VolumeSnapshot]) bool {
×
371
                                return !reflect.DeepEqual(e.ObjectOld.Status, e.ObjectNew.Status) ||
×
372
                                        !reflect.DeepEqual(e.ObjectOld.Labels, e.ObjectNew.Labels)
×
373
                        },
×
374
                },
375
        )); err != nil {
×
376
                return err
×
377
        }
×
378

379
        return nil
×
380
}
381

382
func sameSourceSpec(objOld, objNew client.Object) bool {
×
383
        dsOld, okOld := objOld.(*cdiv1.DataSource)
×
384
        dsNew, okNew := objNew.(*cdiv1.DataSource)
×
385

×
386
        if !okOld || !okNew {
×
387
                return false
×
388
        }
×
389
        if dsOld.Spec.Source.PVC != nil {
×
390
                return reflect.DeepEqual(dsOld.Spec.Source.PVC, dsNew.Spec.Source.PVC)
×
391
        }
×
392
        if dsOld.Spec.Source.Snapshot != nil {
×
393
                return reflect.DeepEqual(dsOld.Spec.Source.Snapshot, dsNew.Spec.Source.Snapshot)
×
394
        }
×
395
        if dsOld.Spec.Source.DataSource != nil {
×
396
                return reflect.DeepEqual(dsOld.Spec.Source.DataSource, dsNew.Spec.Source.DataSource)
×
397
        }
×
398

399
        return false
×
400
}
401

402
func sameConditions(objOld, objNew client.Object) bool {
×
403
        dsOld, okOld := objOld.(*cdiv1.DataSource)
×
404
        dsNew, okNew := objNew.(*cdiv1.DataSource)
×
405

×
406
        if !okOld || !okNew {
×
407
                return false
×
408
        }
×
409

410
        oldConditions := dsOld.Status.Conditions
×
411
        newConditions := dsNew.Status.Conditions
×
412

×
413
        if len(oldConditions) != len(newConditions) {
×
414
                return false
×
415
        }
×
416

417
        condMap := make(map[cdiv1.DataSourceConditionType]cdiv1.DataSourceCondition, len(oldConditions))
×
418
        for _, c := range oldConditions {
×
419
                condMap[c.Type] = c
×
420
        }
×
421

422
        for _, c := range newConditions {
×
423
                if oldC, ok := condMap[c.Type]; !ok ||
×
424
                        oldC.Reason != c.Reason ||
×
425
                        oldC.Message != c.Message ||
×
426
                        oldC.Status != c.Status {
×
427
                        return false
×
428
                }
×
429
        }
430

431
        return true
×
432
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc