• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #4794

14 Jul 2024 06:12PM UTC coverage: 58.983% (+0.01%) from 58.972%
#4794

push

travis-ci

web-flow
update to k8s 1.30 libs and controller-runtime 0.18.4 (#3336)

* make deps-update

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* ReourceRequirements -> VolumeResourceRequirements

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* fix calls to controller.Watch()

controller-runtime changed the API!

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* Fix errors with actual openshift/library-go lib

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* make all works now and everything compiles

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* fix "make update-codegen" because generate_groups.sh deprecated

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* run "make generate"

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* fix transfer unittest because of change to controller-runtime

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

---------

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

6 of 238 new or added lines in 24 files covered. (2.52%)

10 existing lines in 4 files now uncovered.

16454 of 27896 relevant lines covered (58.98%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

33.94
/pkg/controller/datasource-controller.go
1
/*
2
Copyright 2022 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
limitations under the License.
14
See the License for the specific language governing permissions and
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "fmt"
22
        "reflect"
23

24
        "github.com/go-logr/logr"
25
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
26

27
        corev1 "k8s.io/api/core/v1"
28
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
29
        "k8s.io/apimachinery/pkg/api/meta"
30
        "k8s.io/apimachinery/pkg/runtime"
31
        "k8s.io/apimachinery/pkg/types"
32
        "k8s.io/client-go/tools/record"
33

34
        "sigs.k8s.io/controller-runtime/pkg/client"
35
        "sigs.k8s.io/controller-runtime/pkg/controller"
36
        "sigs.k8s.io/controller-runtime/pkg/event"
37
        "sigs.k8s.io/controller-runtime/pkg/handler"
38
        "sigs.k8s.io/controller-runtime/pkg/manager"
39
        "sigs.k8s.io/controller-runtime/pkg/predicate"
40
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
41
        "sigs.k8s.io/controller-runtime/pkg/source"
42

43
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
44
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
45
)
46

47
// DataSourceReconciler members
48
type DataSourceReconciler struct {
49
        client          client.Client
50
        recorder        record.EventRecorder
51
        scheme          *runtime.Scheme
52
        log             logr.Logger
53
        installerLabels map[string]string
54
}
55

56
const (
57
        ready                    = "Ready"
58
        noSource                 = "NoSource"
59
        dataSourceControllerName = "datasource-controller"
60
)
61

62
// Reconcile loop for DataSourceReconciler
63
func (r *DataSourceReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
64
        dataSource := &cdiv1.DataSource{}
1✔
65
        if err := r.client.Get(ctx, req.NamespacedName, dataSource); err != nil {
2✔
66
                if k8serrors.IsNotFound(err) {
2✔
67
                        return reconcile.Result{}, nil
1✔
68
                }
1✔
69
                return reconcile.Result{}, err
×
70
        }
71
        if err := r.update(ctx, dataSource); err != nil {
1✔
72
                return reconcile.Result{}, err
×
73
        }
×
74
        return reconcile.Result{}, nil
1✔
75
}
76

77
func (r *DataSourceReconciler) update(ctx context.Context, dataSource *cdiv1.DataSource) error {
1✔
78
        if !reflect.DeepEqual(dataSource.Status.Source, dataSource.Spec.Source) {
2✔
79
                dataSource.Spec.Source.DeepCopyInto(&dataSource.Status.Source)
1✔
80
                dataSource.Status.Conditions = nil
1✔
81
        }
1✔
82
        dataSourceCopy := dataSource.DeepCopy()
1✔
83
        if sourcePVC := dataSource.Spec.Source.PVC; sourcePVC != nil {
2✔
84
                if err := r.handlePvcSource(ctx, sourcePVC, dataSource); err != nil {
1✔
85
                        return err
×
86
                }
×
87
        } else if sourceSnapshot := dataSource.Spec.Source.Snapshot; sourceSnapshot != nil {
2✔
88
                if err := r.handleSnapshotSource(ctx, sourceSnapshot, dataSource); err != nil {
1✔
89
                        return err
×
90
                }
×
91
        } else {
1✔
92
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "No source PVC set", noSource)
1✔
93
        }
1✔
94

95
        if !reflect.DeepEqual(dataSource, dataSourceCopy) {
2✔
96
                if err := r.client.Update(ctx, dataSource); err != nil {
1✔
97
                        return err
×
98
                }
×
99
        }
100
        return nil
1✔
101
}
102

103
func (r *DataSourceReconciler) handlePvcSource(ctx context.Context, sourcePVC *cdiv1.DataVolumeSourcePVC, dataSource *cdiv1.DataSource) error {
1✔
104
        dv := &cdiv1.DataVolume{}
1✔
105
        ns := cc.GetNamespace(sourcePVC.Namespace, dataSource.Namespace)
1✔
106
        isReady := false
1✔
107
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: sourcePVC.Name}, dv); err != nil {
2✔
108
                if !k8serrors.IsNotFound(err) {
1✔
109
                        return err
×
110
                }
×
111
                pvc := &corev1.PersistentVolumeClaim{}
1✔
112
                if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: sourcePVC.Name}, pvc); err != nil {
2✔
113
                        if !k8serrors.IsNotFound(err) {
1✔
114
                                return err
×
115
                        }
×
116
                        r.log.Info("PVC not found", "name", sourcePVC.Name)
1✔
117
                        updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "PVC not found", cc.NotFound)
1✔
118
                } else {
1✔
119
                        isReady = true
1✔
120
                }
1✔
121
        } else if dv.Status.Phase == cdiv1.Succeeded {
2✔
122
                isReady = true
1✔
123
        } else {
2✔
124
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dv.Status.Phase), string(dv.Status.Phase))
1✔
125
        }
1✔
126
        if isReady {
2✔
127
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionTrue, "DataSource is ready to be consumed", ready)
1✔
128
        }
1✔
129

130
        return nil
1✔
131
}
132

133
func (r *DataSourceReconciler) handleSnapshotSource(ctx context.Context, sourceSnapshot *cdiv1.DataVolumeSourceSnapshot, dataSource *cdiv1.DataSource) error {
1✔
134
        snapshot := &snapshotv1.VolumeSnapshot{}
1✔
135
        ns := cc.GetNamespace(sourceSnapshot.Namespace, dataSource.Namespace)
1✔
136
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: sourceSnapshot.Name}, snapshot); err != nil {
2✔
137
                if !k8serrors.IsNotFound(err) {
1✔
138
                        return err
×
139
                }
×
140
                r.log.Info("Snapshot not found", "name", sourceSnapshot.Name)
1✔
141
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "Snapshot not found", cc.NotFound)
1✔
142
        } else if cc.IsSnapshotReady(snapshot) {
2✔
143
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionTrue, "DataSource is ready to be consumed", ready)
1✔
144
        } else {
2✔
145
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "Snapshot phase is not ready", "SnapshotNotReady")
1✔
146
        }
1✔
147

148
        return nil
1✔
149
}
150

151
func updateDataSourceCondition(ds *cdiv1.DataSource, conditionType cdiv1.DataSourceConditionType, status corev1.ConditionStatus, message, reason string) {
1✔
152
        if condition := FindDataSourceConditionByType(ds, conditionType); condition != nil {
2✔
153
                updateConditionState(&condition.ConditionState, status, message, reason)
1✔
154
        } else {
2✔
155
                condition = &cdiv1.DataSourceCondition{Type: conditionType}
1✔
156
                updateConditionState(&condition.ConditionState, status, message, reason)
1✔
157
                ds.Status.Conditions = append(ds.Status.Conditions, *condition)
1✔
158
        }
1✔
159
}
160

161
// FindDataSourceConditionByType finds DataSourceCondition by condition type
162
func FindDataSourceConditionByType(ds *cdiv1.DataSource, conditionType cdiv1.DataSourceConditionType) *cdiv1.DataSourceCondition {
1✔
163
        for i, condition := range ds.Status.Conditions {
2✔
164
                if condition.Type == conditionType {
2✔
165
                        return &ds.Status.Conditions[i]
1✔
166
                }
1✔
167
        }
168
        return nil
1✔
169
}
170

171
// NewDataSourceController creates a new instance of the DataSource controller
172
func NewDataSourceController(mgr manager.Manager, log logr.Logger, installerLabels map[string]string) (controller.Controller, error) {
×
173
        reconciler := &DataSourceReconciler{
×
174
                client:          mgr.GetClient(),
×
175
                recorder:        mgr.GetEventRecorderFor(dataSourceControllerName),
×
176
                scheme:          mgr.GetScheme(),
×
177
                log:             log.WithName(dataSourceControllerName),
×
178
                installerLabels: installerLabels,
×
179
        }
×
180
        DataSourceController, err := controller.New(dataSourceControllerName, mgr, controller.Options{
×
181
                MaxConcurrentReconciles: 3,
×
182
                Reconciler:              reconciler,
×
183
        })
×
184
        if err != nil {
×
185
                return nil, err
×
186
        }
×
187
        if err := addDataSourceControllerWatches(mgr, DataSourceController, log); err != nil {
×
188
                return nil, err
×
189
        }
×
190
        log.Info("Initialized DataSource controller")
×
191
        return DataSourceController, nil
×
192
}
193

194
func addDataSourceControllerWatches(mgr manager.Manager, c controller.Controller, log logr.Logger) error {
×
NEW
195
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataSource{}, &handler.TypedEnqueueRequestForObject[*cdiv1.DataSource]{},
×
NEW
196
                predicate.TypedFuncs[*cdiv1.DataSource]{
×
NEW
197
                        CreateFunc: func(e event.TypedCreateEvent[*cdiv1.DataSource]) bool { return true },
×
NEW
198
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataSource]) bool { return true },
×
NEW
199
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataSource]) bool {
×
NEW
200
                                return !sameSourceSpec(e.ObjectOld, e.ObjectNew)
×
NEW
201
                        },
×
202
                },
NEW
203
        )); err != nil {
×
204
                return err
×
205
        }
×
206

207
        const dataSourcePvcField = "spec.source.pvc"
×
208
        const dataSourceSnapshotField = "spec.source.snapshot"
×
209

×
210
        getKey := func(namespace, name string) string {
×
211
                return namespace + "/" + name
×
212
        }
×
213

214
        appendMatchingDataSourceRequests := func(ctx context.Context, indexingKey string, obj client.Object, reqs []reconcile.Request) []reconcile.Request {
×
215
                var dataSources cdiv1.DataSourceList
×
216
                matchingFields := client.MatchingFields{indexingKey: getKey(obj.GetNamespace(), obj.GetName())}
×
217
                if err := mgr.GetClient().List(ctx, &dataSources, matchingFields); err != nil {
×
218
                        log.Error(err, "Unable to list DataSources", "matchingFields", matchingFields)
×
219
                        return reqs
×
220
                }
×
221
                for _, ds := range dataSources.Items {
×
222
                        reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: ds.Namespace, Name: ds.Name}})
×
223
                }
×
224
                return reqs
×
225
        }
226

227
        if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &cdiv1.DataSource{}, dataSourcePvcField, func(obj client.Object) []string {
×
228
                if pvc := obj.(*cdiv1.DataSource).Spec.Source.PVC; pvc != nil {
×
229
                        ns := cc.GetNamespace(pvc.Namespace, obj.GetNamespace())
×
230
                        return []string{getKey(ns, pvc.Name)}
×
231
                }
×
232
                return nil
×
233
        }); err != nil {
×
234
                return err
×
235
        }
×
236
        if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &cdiv1.DataSource{}, dataSourceSnapshotField, func(obj client.Object) []string {
×
237
                if snapshot := obj.(*cdiv1.DataSource).Spec.Source.Snapshot; snapshot != nil {
×
238
                        ns := cc.GetNamespace(snapshot.Namespace, obj.GetNamespace())
×
239
                        return []string{getKey(ns, snapshot.Name)}
×
240
                }
×
241
                return nil
×
242
        }); err != nil {
×
243
                return err
×
244
        }
×
245

246
        mapToDataSource := func(ctx context.Context, obj client.Object) []reconcile.Request {
×
247
                reqs := appendMatchingDataSourceRequests(ctx, dataSourcePvcField, obj, nil)
×
248
                return appendMatchingDataSourceRequests(ctx, dataSourceSnapshotField, obj, reqs)
×
249
        }
×
250

NEW
251
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{},
×
NEW
252
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](func(ctx context.Context, obj *cdiv1.DataVolume) []reconcile.Request {
×
NEW
253
                        return mapToDataSource(ctx, obj)
×
NEW
254
                }),
×
255
                predicate.TypedFuncs[*cdiv1.DataVolume]{
NEW
256
                        CreateFunc: func(e event.TypedCreateEvent[*cdiv1.DataVolume]) bool { return true },
×
NEW
257
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataVolume]) bool { return true },
×
258
                        // Only DV status phase update is interesting to reconcile
NEW
259
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataVolume]) bool {
×
NEW
260
                                return e.ObjectOld.Status.Phase != e.ObjectNew.Status.Phase
×
UNCOV
261
                        },
×
262
                },
NEW
263
        )); err != nil {
×
264
                return err
×
265
        }
×
266

NEW
267
        if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{},
×
NEW
268
                handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](func(ctx context.Context, obj *corev1.PersistentVolumeClaim) []reconcile.Request {
×
NEW
269
                        return mapToDataSource(ctx, obj)
×
NEW
270
                }),
×
271
                predicate.TypedFuncs[*corev1.PersistentVolumeClaim]{
NEW
272
                        CreateFunc: func(e event.TypedCreateEvent[*corev1.PersistentVolumeClaim]) bool { return true },
×
NEW
273
                        DeleteFunc: func(e event.TypedDeleteEvent[*corev1.PersistentVolumeClaim]) bool { return true },
×
NEW
274
                        UpdateFunc: func(e event.TypedUpdateEvent[*corev1.PersistentVolumeClaim]) bool {
×
NEW
275
                                return e.ObjectOld.Status.Phase != e.ObjectNew.Status.Phase
×
UNCOV
276
                        },
×
277
                },
NEW
278
        )); err != nil {
×
279
                return err
×
280
        }
×
281

282
        if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
×
283
                if meta.IsNoMatchError(err) {
×
284
                        // Back out if there's no point to attempt watch
×
285
                        return nil
×
286
                }
×
287
                if !cc.IsErrCacheNotStarted(err) {
×
288
                        return err
×
289
                }
×
290
        }
NEW
291
        if err := c.Watch(source.Kind(mgr.GetCache(), &snapshotv1.VolumeSnapshot{},
×
NEW
292
                handler.TypedEnqueueRequestsFromMapFunc[*snapshotv1.VolumeSnapshot](func(ctx context.Context, obj *snapshotv1.VolumeSnapshot) []reconcile.Request {
×
NEW
293
                        return mapToDataSource(ctx, obj)
×
NEW
294
                }),
×
295
                predicate.TypedFuncs[*snapshotv1.VolumeSnapshot]{
NEW
296
                        CreateFunc: func(e event.TypedCreateEvent[*snapshotv1.VolumeSnapshot]) bool { return true },
×
NEW
297
                        DeleteFunc: func(e event.TypedDeleteEvent[*snapshotv1.VolumeSnapshot]) bool { return true },
×
NEW
298
                        UpdateFunc: func(e event.TypedUpdateEvent[*snapshotv1.VolumeSnapshot]) bool {
×
NEW
299
                                return !reflect.DeepEqual(e.ObjectOld.Status, e.ObjectNew.Status)
×
UNCOV
300
                        },
×
301
                },
NEW
302
        )); err != nil {
×
303
                return err
×
304
        }
×
305

306
        return nil
×
307
}
308

309
func sameSourceSpec(objOld, objNew client.Object) bool {
×
310
        dsOld, okOld := objOld.(*cdiv1.DataSource)
×
311
        dsNew, okNew := objNew.(*cdiv1.DataSource)
×
312

×
313
        if !okOld || !okNew {
×
314
                return false
×
315
        }
×
316
        if dsOld.Spec.Source.PVC != nil {
×
317
                return reflect.DeepEqual(dsOld.Spec.Source.PVC, dsNew.Spec.Source.PVC)
×
318
        }
×
319
        if dsOld.Spec.Source.Snapshot != nil {
×
320
                return reflect.DeepEqual(dsOld.Spec.Source.Snapshot, dsNew.Spec.Source.Snapshot)
×
321
        }
×
322

323
        return false
×
324
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc