• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5556

28 Aug 2025 08:03PM UTC coverage: 59.14% (-0.2%) from 59.319%
#5556

Pull #3876

travis-ci

Dsanatar
add new health and ready endpoints for probe instead of reusing /metrics

Signed-off-by: dsanatar <dsanatar@redhat.com>
Pull Request #3876: Add Readiness/Liveness Probes to Operator Pod

0 of 42 new or added lines in 2 files covered. (0.0%)

100 existing lines in 3 files now uncovered.

17173 of 29038 relevant lines covered (59.14%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

33.33
/pkg/controller/datasource-controller.go
1
/*
2
Copyright 2022 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
limitations under the License.
14
See the License for the specific language governing permissions and
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "errors"
22
        "fmt"
23
        "reflect"
24

25
        "github.com/go-logr/logr"
26
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
27

28
        corev1 "k8s.io/api/core/v1"
29
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
30
        "k8s.io/apimachinery/pkg/api/meta"
31
        "k8s.io/apimachinery/pkg/runtime"
32
        "k8s.io/apimachinery/pkg/types"
33
        "k8s.io/client-go/tools/record"
34

35
        "sigs.k8s.io/controller-runtime/pkg/client"
36
        "sigs.k8s.io/controller-runtime/pkg/controller"
37
        "sigs.k8s.io/controller-runtime/pkg/event"
38
        "sigs.k8s.io/controller-runtime/pkg/handler"
39
        "sigs.k8s.io/controller-runtime/pkg/manager"
40
        "sigs.k8s.io/controller-runtime/pkg/predicate"
41
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
42
        "sigs.k8s.io/controller-runtime/pkg/source"
43

44
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
45
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
46
)
47

48
// DataSourceReconciler members
49
type DataSourceReconciler struct {
50
        client          client.Client
51
        recorder        record.EventRecorder
52
        scheme          *runtime.Scheme
53
        log             logr.Logger
54
        installerLabels map[string]string
55
}
56

57
const (
58
        ready                    = "Ready"
59
        noSource                 = "NoSource"
60
        dataSourceControllerName = "datasource-controller"
61
        maxReferenceDepthReached = "MaxReferenceDepthReached"
62
        selfReference            = "SelfReference"
63
        crossNamespaceReference  = "CrossNamespaceReference"
64
)
65

66
// Reconcile loop for DataSourceReconciler
67
func (r *DataSourceReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
68
        dataSource := &cdiv1.DataSource{}
1✔
69
        if err := r.client.Get(ctx, req.NamespacedName, dataSource); err != nil {
2✔
70
                if k8serrors.IsNotFound(err) {
2✔
71
                        return reconcile.Result{}, nil
1✔
72
                }
1✔
73
                return reconcile.Result{}, err
×
74
        }
75
        if err := r.update(ctx, dataSource); err != nil {
1✔
76
                return reconcile.Result{}, err
×
77
        }
×
78
        return reconcile.Result{}, nil
1✔
79
}
80

81
func (r *DataSourceReconciler) update(ctx context.Context, dataSource *cdiv1.DataSource) error {
1✔
82
        dataSourceCopy := dataSource.DeepCopy()
1✔
83
        resolved, err := cc.ResolveDataSourceChain(ctx, r.client, dataSource)
1✔
84
        if err != nil {
2✔
85
                log := r.log.WithValues("datasource", dataSource.Name, "namespace", dataSource.Namespace)
1✔
86
                log.Info(err.Error())
1✔
87
                if err := handleDataSourceRefError(dataSource, err); err != nil {
1✔
88
                        return err
×
89
                }
×
90
                resolved = dataSource
1✔
91
        } else {
1✔
92
                resolved.Spec.Source.DeepCopyInto(&dataSource.Status.Source)
1✔
93
                dataSource.Status.Conditions = nil
1✔
94
        }
1✔
95

96
        switch {
1✔
97
        case resolved.Spec.Source.DataSource != nil:
1✔
98
                // Status condition handling already took place, continue to update
99
        case resolved.Spec.Source.PVC != nil:
1✔
100
                if err := r.handlePvcSource(ctx, resolved.Spec.Source.PVC, dataSource); err != nil {
1✔
101
                        return err
×
102
                }
×
103
        case resolved.Spec.Source.Snapshot != nil:
1✔
104
                if err := r.handleSnapshotSource(ctx, resolved.Spec.Source.Snapshot, dataSource); err != nil {
1✔
105
                        return err
×
106
                }
×
107
        default:
1✔
108
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "No source PVC set", noSource)
1✔
109
        }
110

111
        if dsCond := FindDataSourceConditionByType(dataSource, cdiv1.DataSourceReady); dsCond != nil && dsCond.Status == corev1.ConditionFalse {
2✔
112
                dataSource.Status.Source = cdiv1.DataSourceSource{}
1✔
113
        }
1✔
114

115
        if !reflect.DeepEqual(dataSource, dataSourceCopy) {
2✔
116
                if err := r.client.Update(ctx, dataSource); err != nil {
1✔
117
                        return err
×
118
                }
×
119
        }
120
        return nil
1✔
121
}
122

123
func (r *DataSourceReconciler) handlePvcSource(ctx context.Context, sourcePVC *cdiv1.DataVolumeSourcePVC, dataSource *cdiv1.DataSource) error {
1✔
124
        ns := cc.GetNamespace(sourcePVC.Namespace, dataSource.Namespace)
1✔
125
        isReady := false
1✔
126

1✔
127
        pvc := &corev1.PersistentVolumeClaim{}
1✔
128
        pvcErr := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: sourcePVC.Name}, pvc)
1✔
129
        if pvcErr != nil && !k8serrors.IsNotFound(pvcErr) {
1✔
130
                return pvcErr
×
131
        }
×
132

133
        dv := &cdiv1.DataVolume{}
1✔
134
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: sourcePVC.Name}, dv); err != nil {
2✔
135
                if !k8serrors.IsNotFound(err) {
1✔
136
                        return err
×
137
                }
×
138
                if pvcErr != nil {
2✔
139
                        r.log.Info("PVC not found", "name", sourcePVC.Name)
1✔
140
                        updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "PVC not found", cc.NotFound)
1✔
141
                } else {
2✔
142
                        isReady = true
1✔
143
                }
1✔
144
        } else if dv.Status.Phase == cdiv1.Succeeded {
2✔
145
                isReady = true
1✔
146
        } else {
2✔
147
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dv.Status.Phase), string(dv.Status.Phase))
1✔
148
        }
1✔
149

150
        if isReady {
2✔
151
                cc.CopyAllowedLabels(dv.GetLabels(), dataSource, true)
1✔
152
                cc.CopyAllowedLabels(pvc.GetLabels(), dataSource, true)
1✔
153
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionTrue, "DataSource is ready to be consumed", ready)
1✔
154
        }
1✔
155

156
        return nil
1✔
157
}
158

159
func (r *DataSourceReconciler) handleSnapshotSource(ctx context.Context, sourceSnapshot *cdiv1.DataVolumeSourceSnapshot, dataSource *cdiv1.DataSource) error {
1✔
160
        snapshot := &snapshotv1.VolumeSnapshot{}
1✔
161
        ns := cc.GetNamespace(sourceSnapshot.Namespace, dataSource.Namespace)
1✔
162
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: sourceSnapshot.Name}, snapshot); err != nil {
2✔
163
                if !k8serrors.IsNotFound(err) {
1✔
164
                        return err
×
165
                }
×
166
                r.log.Info("Snapshot not found", "name", sourceSnapshot.Name)
1✔
167
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "Snapshot not found", cc.NotFound)
1✔
168
        } else if cc.IsSnapshotReady(snapshot) {
2✔
169
                cc.CopyAllowedLabels(snapshot.GetLabels(), dataSource, true)
1✔
170
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionTrue, "DataSource is ready to be consumed", ready)
1✔
171
        } else {
2✔
172
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "Snapshot phase is not ready", "SnapshotNotReady")
1✔
173
        }
1✔
174

175
        return nil
1✔
176
}
177

178
func handleDataSourceRefError(dataSource *cdiv1.DataSource, err error) error {
1✔
179
        reason := ""
1✔
180
        switch {
1✔
181
        case errors.Is(err, cc.ErrDataSourceMaxDepthReached):
1✔
182
                reason = maxReferenceDepthReached
1✔
183
        case errors.Is(err, cc.ErrDataSourceSelfReference):
1✔
184
                reason = selfReference
1✔
185
        case errors.Is(err, cc.ErrDataSourceCrossNamespace):
1✔
186
                reason = crossNamespaceReference
1✔
187
        case k8serrors.IsNotFound(err):
1✔
188
                reason = cc.NotFound
1✔
189
        default:
×
190
                return err
×
191
        }
192
        updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, err.Error(), reason)
1✔
193
        return nil
1✔
194
}
195

196
func updateDataSourceCondition(ds *cdiv1.DataSource, conditionType cdiv1.DataSourceConditionType, status corev1.ConditionStatus, message, reason string) {
1✔
197
        if condition := FindDataSourceConditionByType(ds, conditionType); condition != nil {
1✔
198
                updateConditionState(&condition.ConditionState, status, message, reason)
×
199
        } else {
1✔
200
                condition = &cdiv1.DataSourceCondition{Type: conditionType}
1✔
201
                updateConditionState(&condition.ConditionState, status, message, reason)
1✔
202
                ds.Status.Conditions = append(ds.Status.Conditions, *condition)
1✔
203
        }
1✔
204
}
205

206
// FindDataSourceConditionByType finds DataSourceCondition by condition type
207
func FindDataSourceConditionByType(ds *cdiv1.DataSource, conditionType cdiv1.DataSourceConditionType) *cdiv1.DataSourceCondition {
1✔
208
        for i, condition := range ds.Status.Conditions {
2✔
209
                if condition.Type == conditionType {
2✔
210
                        return &ds.Status.Conditions[i]
1✔
211
                }
1✔
212
        }
213
        return nil
1✔
214
}
215

216
// NewDataSourceController creates a new instance of the DataSource controller
217
func NewDataSourceController(mgr manager.Manager, log logr.Logger, installerLabels map[string]string) (controller.Controller, error) {
×
218
        reconciler := &DataSourceReconciler{
×
219
                client:          mgr.GetClient(),
×
220
                recorder:        mgr.GetEventRecorderFor(dataSourceControllerName),
×
221
                scheme:          mgr.GetScheme(),
×
222
                log:             log.WithName(dataSourceControllerName),
×
223
                installerLabels: installerLabels,
×
224
        }
×
225
        DataSourceController, err := controller.New(dataSourceControllerName, mgr, controller.Options{
×
226
                MaxConcurrentReconciles: 3,
×
227
                Reconciler:              reconciler,
×
228
        })
×
229
        if err != nil {
×
230
                return nil, err
×
231
        }
×
232
        if err := addDataSourceControllerWatches(mgr, DataSourceController, log); err != nil {
×
233
                return nil, err
×
234
        }
×
235
        log.Info("Initialized DataSource controller")
×
236
        return DataSourceController, nil
×
237
}
238

239
func addDataSourceControllerWatches(mgr manager.Manager, c controller.Controller, log logr.Logger) error {
×
240
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataSource{}, &handler.TypedEnqueueRequestForObject[*cdiv1.DataSource]{},
×
241
                predicate.TypedFuncs[*cdiv1.DataSource]{
×
242
                        CreateFunc: func(e event.TypedCreateEvent[*cdiv1.DataSource]) bool { return true },
×
243
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataSource]) bool { return true },
×
244
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataSource]) bool {
×
245
                                return !sameSourceSpec(e.ObjectOld, e.ObjectNew)
×
246
                        },
×
247
                },
248
        )); err != nil {
×
249
                return err
×
250
        }
×
UNCOV
251

×
252
        const dataSourcePvcField = "spec.source.pvc"
×
253
        const dataSourceSnapshotField = "spec.source.snapshot"
×
254

×
255
        getKey := func(namespace, name string) string {
×
256
                return namespace + "/" + name
×
257
        }
×
UNCOV
258

×
259
        appendMatchingDataSourceRequests := func(ctx context.Context, indexingKey string, obj client.Object, reqs []reconcile.Request) []reconcile.Request {
260
                var dataSources cdiv1.DataSourceList
261
                matchingFields := client.MatchingFields{indexingKey: getKey(obj.GetNamespace(), obj.GetName())}
×
262
                if err := mgr.GetClient().List(ctx, &dataSources, matchingFields); err != nil {
×
263
                        log.Error(err, "Unable to list DataSources", "matchingFields", matchingFields)
×
264
                        return reqs
×
265
                }
×
266
                for _, ds := range dataSources.Items {
×
267
                        reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: ds.Namespace, Name: ds.Name}})
×
268
                }
×
269
                return reqs
×
UNCOV
270
        }
×
UNCOV
271

×
272
        if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &cdiv1.DataSource{}, dataSourcePvcField, func(obj client.Object) []string {
×
273
                if pvc := obj.(*cdiv1.DataSource).Spec.Source.PVC; pvc != nil {
274
                        ns := cc.GetNamespace(pvc.Namespace, obj.GetNamespace())
×
275
                        return []string{getKey(ns, pvc.Name)}
×
276
                }
×
277
                return nil
×
278
        }); err != nil {
×
279
                return err
×
280
        }
281
        if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &cdiv1.DataSource{}, dataSourceSnapshotField, func(obj client.Object) []string {
×
282
                if snapshot := obj.(*cdiv1.DataSource).Spec.Source.Snapshot; snapshot != nil {
×
283
                        ns := cc.GetNamespace(snapshot.Namespace, obj.GetNamespace())
×
284
                        return []string{getKey(ns, snapshot.Name)}
285
                }
×
286
                return nil
×
287
        }); err != nil {
×
288
                return err
×
289
        }
×
UNCOV
290

×
291
        mapToDataSource := func(ctx context.Context, obj client.Object) []reconcile.Request {
×
292
                reqs := appendMatchingDataSourceRequests(ctx, dataSourcePvcField, obj, nil)
×
293
                return appendMatchingDataSourceRequests(ctx, dataSourceSnapshotField, obj, reqs)
×
294
        }
UNCOV
295

×
296
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{},
×
297
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](func(ctx context.Context, obj *cdiv1.DataVolume) []reconcile.Request {
×
298
                        return mapToDataSource(ctx, obj)
×
299
                }),
×
UNCOV
300
                predicate.TypedFuncs[*cdiv1.DataVolume]{
×
301
                        CreateFunc: func(e event.TypedCreateEvent[*cdiv1.DataVolume]) bool { return true },
×
302
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataVolume]) bool { return true },
×
UNCOV
303
                        // Only DV status phase update is interesting to reconcile
×
304
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataVolume]) bool {
305
                                return e.ObjectOld.Status.Phase != e.ObjectNew.Status.Phase ||
×
306
                                        !reflect.DeepEqual(e.ObjectOld.Labels, e.ObjectNew.Labels)
×
307
                        },
×
UNCOV
308
                },
×
309
        )); err != nil {
×
310
                return err
×
311
        }
×
UNCOV
312

×
313
        if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{},
×
314
                handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](func(ctx context.Context, obj *corev1.PersistentVolumeClaim) []reconcile.Request {
×
315
                        return mapToDataSource(ctx, obj)
316
                }),
×
UNCOV
317
                predicate.TypedFuncs[*corev1.PersistentVolumeClaim]{
×
318
                        CreateFunc: func(e event.TypedCreateEvent[*corev1.PersistentVolumeClaim]) bool { return true },
×
319
                        DeleteFunc: func(e event.TypedDeleteEvent[*corev1.PersistentVolumeClaim]) bool { return true },
×
320
                        UpdateFunc: func(e event.TypedUpdateEvent[*corev1.PersistentVolumeClaim]) bool {
321
                                return e.ObjectOld.Status.Phase != e.ObjectNew.Status.Phase ||
×
322
                                        !reflect.DeepEqual(e.ObjectOld.Labels, e.ObjectNew.Labels)
×
323
                        },
×
UNCOV
324
                },
×
325
        )); err != nil {
326
                return err
×
327
        }
×
328

329
        if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
×
330
                if meta.IsNoMatchError(err) {
×
331
                        // Back out if there's no point to attempt watch
×
332
                        return nil
×
333
                }
334
                if !cc.IsErrCacheNotStarted(err) {
×
335
                        return err
×
336
                }
×
337
        }
338
        if err := c.Watch(source.Kind(mgr.GetCache(), &snapshotv1.VolumeSnapshot{},
×
339
                handler.TypedEnqueueRequestsFromMapFunc[*snapshotv1.VolumeSnapshot](func(ctx context.Context, obj *snapshotv1.VolumeSnapshot) []reconcile.Request {
×
340
                        return mapToDataSource(ctx, obj)
×
341
                }),
×
342
                predicate.TypedFuncs[*snapshotv1.VolumeSnapshot]{
343
                        CreateFunc: func(e event.TypedCreateEvent[*snapshotv1.VolumeSnapshot]) bool { return true },
×
344
                        DeleteFunc: func(e event.TypedDeleteEvent[*snapshotv1.VolumeSnapshot]) bool { return true },
×
345
                        UpdateFunc: func(e event.TypedUpdateEvent[*snapshotv1.VolumeSnapshot]) bool {
×
346
                                return !reflect.DeepEqual(e.ObjectOld.Status, e.ObjectNew.Status) ||
×
347
                                        !reflect.DeepEqual(e.ObjectOld.Labels, e.ObjectNew.Labels)
×
348
                        },
×
349
                },
350
        )); err != nil {
×
351
                return err
×
352
        }
×
353

354
        return nil
×
UNCOV
355
}
×
UNCOV
356

×
357
func sameSourceSpec(objOld, objNew client.Object) bool {
×
358
        dsOld, okOld := objOld.(*cdiv1.DataSource)
×
359
        dsNew, okNew := objNew.(*cdiv1.DataSource)
×
360

×
361
        if !okOld || !okNew {
×
362
                return false
363
        }
×
364
        if dsOld.Spec.Source.PVC != nil {
×
365
                return reflect.DeepEqual(dsOld.Spec.Source.PVC, dsNew.Spec.Source.PVC)
×
366
        }
×
367
        if dsOld.Spec.Source.Snapshot != nil {
368
                return reflect.DeepEqual(dsOld.Spec.Source.Snapshot, dsNew.Spec.Source.Snapshot)
×
369
        }
×
UNCOV
370

×
371
        return false
×
UNCOV
372
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc