• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #4881

16 Aug 2024 03:54AM UTC coverage: 59.19% (+0.01%) from 59.177%
#4881

push

travis-ci

web-flow
feat: Copy labels from source to DataSource (#3377)

* cleanup: Extract label copying logic into common pkg

Extract the label copying logic from populator-base.go into the common
pkg as CopyAllowedLabels func.

Signed-off-by: Felix Matouschek <fmatouschek@redhat.com>

* fix(import-populator): Make copying of labels more robust

Make copying of labels from a prime PVC to the target PVC more robust,
by moving it before rebinding the PV from prime to target. This way we
can ensure the labels are already present once the PVC becomes ready.

Signed-off-by: Felix Matouschek <fmatouschek@redhat.com>

* cleanup: Do not pass labels from DIC to DS anymore

Do not pass labels from a DataImportCron to a DataSource in the
dataimportcron-controller anymore. In the future this will be
handled by the datasource-controller.

Signed-off-by: Felix Matouschek <fmatouschek@redhat.com>

* feat: Copy labels from source to DataSource

Copy labels from the source of a DataSource to the labels of the
DataSource in the datasource-controller.

Signed-off-by: Felix Matouschek <fmatouschek@redhat.com>

* tests: Add e2e tests for copying labels to DataSources

Add e2e tests that cover all scenarios where labels should be copied
from the source of a DataSource to the DataSource itself.

Signed-off-by: Felix Matouschek <fmatouschek@redhat.com>

* feat(dataimportcron-controller): Copy labels to VolumeSnapshots

When using VolumeSnapshots copy the labels found on the source PVC to
the created or an existing VolumeSnapshot.

Signed-off-by: Felix Matouschek <fmatouschek@redhat.com>

---------

Signed-off-by: Felix Matouschek <fmatouschek@redhat.com>

42 of 62 new or added lines in 5 files covered. (67.74%)

10 existing lines in 2 files now uncovered.

16610 of 28062 relevant lines covered (59.19%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.96
/pkg/controller/datasource-controller.go
1
/*
2
Copyright 2022 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
limitations under the License.
14
See the License for the specific language governing permissions and
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "fmt"
22
        "reflect"
23

24
        "github.com/go-logr/logr"
25
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
26

27
        corev1 "k8s.io/api/core/v1"
28
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
29
        "k8s.io/apimachinery/pkg/api/meta"
30
        "k8s.io/apimachinery/pkg/runtime"
31
        "k8s.io/apimachinery/pkg/types"
32
        "k8s.io/client-go/tools/record"
33

34
        "sigs.k8s.io/controller-runtime/pkg/client"
35
        "sigs.k8s.io/controller-runtime/pkg/controller"
36
        "sigs.k8s.io/controller-runtime/pkg/event"
37
        "sigs.k8s.io/controller-runtime/pkg/handler"
38
        "sigs.k8s.io/controller-runtime/pkg/manager"
39
        "sigs.k8s.io/controller-runtime/pkg/predicate"
40
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
41
        "sigs.k8s.io/controller-runtime/pkg/source"
42

43
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
44
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
45
)
46

47
// DataSourceReconciler members
48
type DataSourceReconciler struct {
49
        client          client.Client
50
        recorder        record.EventRecorder
51
        scheme          *runtime.Scheme
52
        log             logr.Logger
53
        installerLabels map[string]string
54
}
55

56
const (
57
        ready                    = "Ready"
58
        noSource                 = "NoSource"
59
        dataSourceControllerName = "datasource-controller"
60
)
61

62
// Reconcile loop for DataSourceReconciler
63
func (r *DataSourceReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
64
        dataSource := &cdiv1.DataSource{}
1✔
65
        if err := r.client.Get(ctx, req.NamespacedName, dataSource); err != nil {
2✔
66
                if k8serrors.IsNotFound(err) {
2✔
67
                        return reconcile.Result{}, nil
1✔
68
                }
1✔
69
                return reconcile.Result{}, err
×
70
        }
71
        if err := r.update(ctx, dataSource); err != nil {
1✔
72
                return reconcile.Result{}, err
×
73
        }
×
74
        return reconcile.Result{}, nil
1✔
75
}
76

77
func (r *DataSourceReconciler) update(ctx context.Context, dataSource *cdiv1.DataSource) error {
1✔
78
        if !reflect.DeepEqual(dataSource.Status.Source, dataSource.Spec.Source) {
2✔
79
                dataSource.Spec.Source.DeepCopyInto(&dataSource.Status.Source)
1✔
80
                dataSource.Status.Conditions = nil
1✔
81
        }
1✔
82
        dataSourceCopy := dataSource.DeepCopy()
1✔
83
        if sourcePVC := dataSource.Spec.Source.PVC; sourcePVC != nil {
2✔
84
                if err := r.handlePvcSource(ctx, sourcePVC, dataSource); err != nil {
1✔
85
                        return err
×
86
                }
×
87
        } else if sourceSnapshot := dataSource.Spec.Source.Snapshot; sourceSnapshot != nil {
2✔
88
                if err := r.handleSnapshotSource(ctx, sourceSnapshot, dataSource); err != nil {
1✔
89
                        return err
×
90
                }
×
91
        } else {
1✔
92
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "No source PVC set", noSource)
1✔
93
        }
1✔
94

95
        if !reflect.DeepEqual(dataSource, dataSourceCopy) {
2✔
96
                if err := r.client.Update(ctx, dataSource); err != nil {
1✔
97
                        return err
×
98
                }
×
99
        }
100
        return nil
1✔
101
}
102

103
func (r *DataSourceReconciler) handlePvcSource(ctx context.Context, sourcePVC *cdiv1.DataVolumeSourcePVC, dataSource *cdiv1.DataSource) error {
1✔
104
        ns := cc.GetNamespace(sourcePVC.Namespace, dataSource.Namespace)
1✔
105
        isReady := false
1✔
106

1✔
107
        pvc := &corev1.PersistentVolumeClaim{}
1✔
108
        pvcErr := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: sourcePVC.Name}, pvc)
1✔
109
        if pvcErr != nil && !k8serrors.IsNotFound(pvcErr) {
1✔
NEW
110
                return pvcErr
×
NEW
111
        }
×
112

113
        dv := &cdiv1.DataVolume{}
1✔
114
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: sourcePVC.Name}, dv); err != nil {
2✔
115
                if !k8serrors.IsNotFound(err) {
1✔
116
                        return err
×
117
                }
×
118
                if pvcErr != nil {
2✔
119
                        r.log.Info("PVC not found", "name", sourcePVC.Name)
1✔
120
                        updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "PVC not found", cc.NotFound)
1✔
121
                } else {
2✔
122
                        isReady = true
1✔
123
                }
1✔
124
        } else if dv.Status.Phase == cdiv1.Succeeded {
2✔
125
                isReady = true
1✔
126
        } else {
2✔
127
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dv.Status.Phase), string(dv.Status.Phase))
1✔
128
        }
1✔
129

130
        if isReady {
2✔
131
                cc.CopyAllowedLabels(dv.GetLabels(), dataSource, true)
1✔
132
                cc.CopyAllowedLabels(pvc.GetLabels(), dataSource, true)
1✔
133
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionTrue, "DataSource is ready to be consumed", ready)
1✔
134
        }
1✔
135

136
        return nil
1✔
137
}
138

139
func (r *DataSourceReconciler) handleSnapshotSource(ctx context.Context, sourceSnapshot *cdiv1.DataVolumeSourceSnapshot, dataSource *cdiv1.DataSource) error {
1✔
140
        snapshot := &snapshotv1.VolumeSnapshot{}
1✔
141
        ns := cc.GetNamespace(sourceSnapshot.Namespace, dataSource.Namespace)
1✔
142
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: sourceSnapshot.Name}, snapshot); err != nil {
2✔
143
                if !k8serrors.IsNotFound(err) {
1✔
144
                        return err
×
145
                }
×
146
                r.log.Info("Snapshot not found", "name", sourceSnapshot.Name)
1✔
147
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "Snapshot not found", cc.NotFound)
1✔
148
        } else if cc.IsSnapshotReady(snapshot) {
2✔
149
                cc.CopyAllowedLabels(snapshot.GetLabels(), dataSource, true)
1✔
150
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionTrue, "DataSource is ready to be consumed", ready)
1✔
151
        } else {
2✔
152
                updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "Snapshot phase is not ready", "SnapshotNotReady")
1✔
153
        }
1✔
154

155
        return nil
1✔
156
}
157

158
func updateDataSourceCondition(ds *cdiv1.DataSource, conditionType cdiv1.DataSourceConditionType, status corev1.ConditionStatus, message, reason string) {
1✔
159
        if condition := FindDataSourceConditionByType(ds, conditionType); condition != nil {
2✔
160
                updateConditionState(&condition.ConditionState, status, message, reason)
1✔
161
        } else {
2✔
162
                condition = &cdiv1.DataSourceCondition{Type: conditionType}
1✔
163
                updateConditionState(&condition.ConditionState, status, message, reason)
1✔
164
                ds.Status.Conditions = append(ds.Status.Conditions, *condition)
1✔
165
        }
1✔
166
}
167

168
// FindDataSourceConditionByType finds DataSourceCondition by condition type
169
func FindDataSourceConditionByType(ds *cdiv1.DataSource, conditionType cdiv1.DataSourceConditionType) *cdiv1.DataSourceCondition {
1✔
170
        for i, condition := range ds.Status.Conditions {
2✔
171
                if condition.Type == conditionType {
2✔
172
                        return &ds.Status.Conditions[i]
1✔
173
                }
1✔
174
        }
175
        return nil
1✔
176
}
177

178
// NewDataSourceController creates a new instance of the DataSource controller
179
func NewDataSourceController(mgr manager.Manager, log logr.Logger, installerLabels map[string]string) (controller.Controller, error) {
×
180
        reconciler := &DataSourceReconciler{
×
181
                client:          mgr.GetClient(),
×
182
                recorder:        mgr.GetEventRecorderFor(dataSourceControllerName),
×
183
                scheme:          mgr.GetScheme(),
×
184
                log:             log.WithName(dataSourceControllerName),
×
185
                installerLabels: installerLabels,
×
186
        }
×
187
        DataSourceController, err := controller.New(dataSourceControllerName, mgr, controller.Options{
×
188
                MaxConcurrentReconciles: 3,
×
189
                Reconciler:              reconciler,
×
190
        })
×
191
        if err != nil {
×
192
                return nil, err
×
193
        }
×
194
        if err := addDataSourceControllerWatches(mgr, DataSourceController, log); err != nil {
×
195
                return nil, err
×
196
        }
×
197
        log.Info("Initialized DataSource controller")
×
198
        return DataSourceController, nil
×
199
}
200

201
func addDataSourceControllerWatches(mgr manager.Manager, c controller.Controller, log logr.Logger) error {
×
202
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataSource{}, &handler.TypedEnqueueRequestForObject[*cdiv1.DataSource]{},
×
203
                predicate.TypedFuncs[*cdiv1.DataSource]{
×
204
                        CreateFunc: func(e event.TypedCreateEvent[*cdiv1.DataSource]) bool { return true },
×
205
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataSource]) bool { return true },
×
206
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataSource]) bool {
×
207
                                return !sameSourceSpec(e.ObjectOld, e.ObjectNew)
×
208
                        },
×
209
                },
210
        )); err != nil {
×
211
                return err
×
212
        }
×
213

214
        const dataSourcePvcField = "spec.source.pvc"
×
215
        const dataSourceSnapshotField = "spec.source.snapshot"
×
216

×
217
        getKey := func(namespace, name string) string {
×
218
                return namespace + "/" + name
×
219
        }
×
220

221
        appendMatchingDataSourceRequests := func(ctx context.Context, indexingKey string, obj client.Object, reqs []reconcile.Request) []reconcile.Request {
×
222
                var dataSources cdiv1.DataSourceList
×
223
                matchingFields := client.MatchingFields{indexingKey: getKey(obj.GetNamespace(), obj.GetName())}
×
224
                if err := mgr.GetClient().List(ctx, &dataSources, matchingFields); err != nil {
×
225
                        log.Error(err, "Unable to list DataSources", "matchingFields", matchingFields)
×
226
                        return reqs
×
227
                }
×
228
                for _, ds := range dataSources.Items {
×
229
                        reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: ds.Namespace, Name: ds.Name}})
×
230
                }
×
231
                return reqs
×
232
        }
233

234
        if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &cdiv1.DataSource{}, dataSourcePvcField, func(obj client.Object) []string {
×
235
                if pvc := obj.(*cdiv1.DataSource).Spec.Source.PVC; pvc != nil {
×
236
                        ns := cc.GetNamespace(pvc.Namespace, obj.GetNamespace())
×
237
                        return []string{getKey(ns, pvc.Name)}
×
238
                }
×
239
                return nil
×
240
        }); err != nil {
×
241
                return err
×
242
        }
×
243
        if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &cdiv1.DataSource{}, dataSourceSnapshotField, func(obj client.Object) []string {
×
244
                if snapshot := obj.(*cdiv1.DataSource).Spec.Source.Snapshot; snapshot != nil {
×
245
                        ns := cc.GetNamespace(snapshot.Namespace, obj.GetNamespace())
×
246
                        return []string{getKey(ns, snapshot.Name)}
×
247
                }
×
248
                return nil
×
249
        }); err != nil {
×
250
                return err
×
251
        }
×
252

253
        mapToDataSource := func(ctx context.Context, obj client.Object) []reconcile.Request {
×
254
                reqs := appendMatchingDataSourceRequests(ctx, dataSourcePvcField, obj, nil)
×
255
                return appendMatchingDataSourceRequests(ctx, dataSourceSnapshotField, obj, reqs)
×
256
        }
×
257

258
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{},
×
259
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](func(ctx context.Context, obj *cdiv1.DataVolume) []reconcile.Request {
×
260
                        return mapToDataSource(ctx, obj)
×
261
                }),
×
262
                predicate.TypedFuncs[*cdiv1.DataVolume]{
263
                        CreateFunc: func(e event.TypedCreateEvent[*cdiv1.DataVolume]) bool { return true },
×
264
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataVolume]) bool { return true },
×
265
                        // Only DV status phase update is interesting to reconcile
266
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataVolume]) bool {
×
NEW
267
                                return e.ObjectOld.Status.Phase != e.ObjectNew.Status.Phase ||
×
NEW
268
                                        !reflect.DeepEqual(e.ObjectOld.Labels, e.ObjectNew.Labels)
×
UNCOV
269
                        },
×
270
                },
271
        )); err != nil {
×
272
                return err
×
273
        }
×
274

275
        if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{},
×
276
                handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](func(ctx context.Context, obj *corev1.PersistentVolumeClaim) []reconcile.Request {
×
277
                        return mapToDataSource(ctx, obj)
×
278
                }),
×
279
                predicate.TypedFuncs[*corev1.PersistentVolumeClaim]{
280
                        CreateFunc: func(e event.TypedCreateEvent[*corev1.PersistentVolumeClaim]) bool { return true },
×
281
                        DeleteFunc: func(e event.TypedDeleteEvent[*corev1.PersistentVolumeClaim]) bool { return true },
×
282
                        UpdateFunc: func(e event.TypedUpdateEvent[*corev1.PersistentVolumeClaim]) bool {
×
NEW
283
                                return e.ObjectOld.Status.Phase != e.ObjectNew.Status.Phase ||
×
NEW
284
                                        !reflect.DeepEqual(e.ObjectOld.Labels, e.ObjectNew.Labels)
×
UNCOV
285
                        },
×
286
                },
287
        )); err != nil {
×
288
                return err
×
289
        }
×
290

291
        if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
×
292
                if meta.IsNoMatchError(err) {
×
293
                        // Back out if there's no point to attempt watch
×
294
                        return nil
×
295
                }
×
296
                if !cc.IsErrCacheNotStarted(err) {
×
297
                        return err
×
298
                }
×
299
        }
300
        if err := c.Watch(source.Kind(mgr.GetCache(), &snapshotv1.VolumeSnapshot{},
×
301
                handler.TypedEnqueueRequestsFromMapFunc[*snapshotv1.VolumeSnapshot](func(ctx context.Context, obj *snapshotv1.VolumeSnapshot) []reconcile.Request {
×
302
                        return mapToDataSource(ctx, obj)
×
303
                }),
×
304
                predicate.TypedFuncs[*snapshotv1.VolumeSnapshot]{
305
                        CreateFunc: func(e event.TypedCreateEvent[*snapshotv1.VolumeSnapshot]) bool { return true },
×
306
                        DeleteFunc: func(e event.TypedDeleteEvent[*snapshotv1.VolumeSnapshot]) bool { return true },
×
307
                        UpdateFunc: func(e event.TypedUpdateEvent[*snapshotv1.VolumeSnapshot]) bool {
×
NEW
308
                                return !reflect.DeepEqual(e.ObjectOld.Status, e.ObjectNew.Status) ||
×
NEW
309
                                        !reflect.DeepEqual(e.ObjectOld.Labels, e.ObjectNew.Labels)
×
UNCOV
310
                        },
×
311
                },
312
        )); err != nil {
×
313
                return err
×
314
        }
×
315

316
        return nil
×
317
}
318

319
func sameSourceSpec(objOld, objNew client.Object) bool {
×
320
        dsOld, okOld := objOld.(*cdiv1.DataSource)
×
321
        dsNew, okNew := objNew.(*cdiv1.DataSource)
×
322

×
323
        if !okOld || !okNew {
×
324
                return false
×
325
        }
×
326
        if dsOld.Spec.Source.PVC != nil {
×
327
                return reflect.DeepEqual(dsOld.Spec.Source.PVC, dsNew.Spec.Source.PVC)
×
328
        }
×
329
        if dsOld.Spec.Source.Snapshot != nil {
×
330
                return reflect.DeepEqual(dsOld.Spec.Source.Snapshot, dsNew.Spec.Source.Snapshot)
×
331
        }
×
332

333
        return false
×
334
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc