• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #4794

14 Jul 2024 06:12PM UTC coverage: 58.983% (+0.01%) from 58.972%
#4794

push

travis-ci

web-flow
update to k8s 1.30 libs and controller-runtime 0.18.4 (#3336)

* make deps-update

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* ResourceRequirements -> VolumeResourceRequirements

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* fix calls to controller.Watch()

controller-runtime changed the API!

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* Fix errors with actual openshift/library-go lib

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* make all works now and everything compiles

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* fix "make update-codegen" because generate_groups.sh deprecated

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* run "make generate"

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* fix transfer unittest because of change to controller-runtime

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

---------

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

6 of 238 new or added lines in 24 files covered. (2.52%)

10 existing lines in 4 files now uncovered.

16454 of 27896 relevant lines covered (58.98%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.59
/pkg/controller/datavolume/import-controller.go
1
/*
2
Copyright 2022 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
        http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package datavolume
18

19
import (
20
        "context"
21
        "fmt"
22
        "reflect"
23
        "strconv"
24

25
        "github.com/go-logr/logr"
26
        "github.com/pkg/errors"
27

28
        corev1 "k8s.io/api/core/v1"
29
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
30
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31
        "k8s.io/apimachinery/pkg/types"
32

33
        "sigs.k8s.io/controller-runtime/pkg/controller"
34
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
35
        "sigs.k8s.io/controller-runtime/pkg/handler"
36
        "sigs.k8s.io/controller-runtime/pkg/manager"
37
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
38
        "sigs.k8s.io/controller-runtime/pkg/source"
39

40
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
41
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
42
        "kubevirt.io/containerized-data-importer/pkg/controller/populators"
43
        featuregates "kubevirt.io/containerized-data-importer/pkg/feature-gates"
44
)
45

46
// Event reasons reported while an import moves through its phases.
const (
	// ImportScheduled is the reason used when an import has been scheduled.
	ImportScheduled = "ImportScheduled"
	// ImportInProgress is the reason used while an import is running.
	ImportInProgress = "ImportInProgress"
	// ImportFailed is the reason used when an import has failed.
	ImportFailed = "ImportFailed"
	// ImportSucceeded is the reason used when an import has succeeded.
	ImportSucceeded = "ImportSucceeded"
)

// Event message formats; each one takes a single %s argument.
const (
	// MessageImportScheduled is the format of the "import scheduled" message.
	MessageImportScheduled = "Import into %s scheduled"
	// MessageImportInProgress is the format of the "import in progress" message.
	MessageImportInProgress = "Import into %s in progress"
	// MessageImportFailed is the format of the "import failed" message.
	MessageImportFailed = "Failed to import into PVC %s"
	// MessageImportSucceeded is the format of the "import succeeded" message.
	MessageImportSucceeded = "Successfully imported into PVC %s"
)

// Names private to the import controller.
const (
	// importControllerName names the controller, its logger and its event recorder.
	importControllerName = "datavolume-import-controller"

	// volumeImportSourcePrefix prefixes the name of every VolumeImportSource
	// built by volumeImportSourceName.
	volumeImportSourcePrefix = "volume-import-source"
)
69

70
// ImportReconciler members
71
type ImportReconciler struct {
72
        ReconcilerBase
73
}
74

75
// NewImportController creates a new instance of the datavolume import controller
76
func NewImportController(
77
        ctx context.Context,
78
        mgr manager.Manager,
79
        log logr.Logger,
80
        installerLabels map[string]string,
81
) (controller.Controller, error) {
×
82
        client := mgr.GetClient()
×
83
        reconciler := &ImportReconciler{
×
84
                ReconcilerBase: ReconcilerBase{
×
85
                        client:               client,
×
86
                        scheme:               mgr.GetScheme(),
×
87
                        log:                  log.WithName(importControllerName),
×
88
                        recorder:             mgr.GetEventRecorderFor(importControllerName),
×
89
                        featureGates:         featuregates.NewFeatureGates(client),
×
90
                        installerLabels:      installerLabels,
×
91
                        shouldUpdateProgress: true,
×
92
                },
×
93
        }
×
94

×
95
        datavolumeController, err := controller.New(importControllerName, mgr, controller.Options{
×
96
                MaxConcurrentReconciles: 3,
×
97
                Reconciler:              reconciler,
×
98
        })
×
99
        if err != nil {
×
100
                return nil, err
×
101
        }
×
102
        if err := addDataVolumeImportControllerWatches(mgr, datavolumeController); err != nil {
×
103
                return nil, err
×
104
        }
×
105

106
        return datavolumeController, nil
×
107
}
108

109
func addDataVolumeImportControllerWatches(mgr manager.Manager, datavolumeController controller.Controller) error {
×
110
        if err := addDataVolumeControllerCommonWatches(mgr, datavolumeController, dataVolumeImport); err != nil {
×
111
                return err
×
112
        }
×
NEW
113
        if err := datavolumeController.Watch(source.Kind(mgr.GetCache(), &cdiv1.VolumeImportSource{}, handler.TypedEnqueueRequestForOwner[*cdiv1.VolumeImportSource](
×
NEW
114
                mgr.GetScheme(), mgr.GetClient().RESTMapper(), &cdiv1.DataVolume{}, handler.OnlyControllerOwner()))); err != nil {
×
115
                return err
×
116
        }
×
117
        return nil
×
118
}
119

120
func (r *ImportReconciler) updatePVCForPopulation(dataVolume *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) error {
1✔
121
        if dataVolume.Spec.Source.HTTP == nil &&
1✔
122
                dataVolume.Spec.Source.S3 == nil &&
1✔
123
                dataVolume.Spec.Source.GCS == nil &&
1✔
124
                dataVolume.Spec.Source.Registry == nil &&
1✔
125
                dataVolume.Spec.Source.Imageio == nil &&
1✔
126
                dataVolume.Spec.Source.VDDK == nil &&
1✔
127
                dataVolume.Spec.Source.Blank == nil {
2✔
128
                return errors.Errorf("no source set for import datavolume")
1✔
129
        }
1✔
130
        if err := cc.AddImmediateBindingAnnotationIfWFFCDisabled(pvc, r.featureGates); err != nil {
1✔
131
                return err
×
132
        }
×
133
        apiGroup := cc.AnnAPIGroup
1✔
134
        pvc.Spec.DataSourceRef = &corev1.TypedObjectReference{
1✔
135
                APIGroup: &apiGroup,
1✔
136
                Kind:     cdiv1.VolumeImportSourceRef,
1✔
137
                Name:     volumeImportSourceName(dataVolume),
1✔
138
        }
1✔
139
        return nil
1✔
140
}
141

142
func (r *ImportReconciler) updateAnnotations(dataVolume *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) error {
1✔
143
        annotations := pvc.Annotations
1✔
144

1✔
145
        if checkpoint := cc.GetNextCheckpoint(pvc, r.getCheckpointArgs(dataVolume)); checkpoint != nil {
2✔
146
                annotations[cc.AnnCurrentCheckpoint] = checkpoint.Current
1✔
147
                annotations[cc.AnnPreviousCheckpoint] = checkpoint.Previous
1✔
148
                annotations[cc.AnnFinalCheckpoint] = strconv.FormatBool(checkpoint.IsFinal)
1✔
149
        }
1✔
150

151
        if http := dataVolume.Spec.Source.HTTP; http != nil {
2✔
152
                cc.UpdateHTTPAnnotations(annotations, http)
1✔
153
                return nil
1✔
154
        }
1✔
155
        if s3 := dataVolume.Spec.Source.S3; s3 != nil {
2✔
156
                cc.UpdateS3Annotations(annotations, s3)
1✔
157
                return nil
1✔
158
        }
1✔
159
        if gcs := dataVolume.Spec.Source.GCS; gcs != nil {
1✔
160
                cc.UpdateGCSAnnotations(annotations, gcs)
×
161
                return nil
×
162
        }
×
163
        if registry := dataVolume.Spec.Source.Registry; registry != nil {
1✔
164
                cc.UpdateRegistryAnnotations(annotations, registry)
×
165
                return nil
×
166
        }
×
167
        if imageio := dataVolume.Spec.Source.Imageio; imageio != nil {
1✔
168
                cc.UpdateImageIOAnnotations(annotations, imageio)
×
169
                return nil
×
170
        }
×
171
        if vddk := dataVolume.Spec.Source.VDDK; vddk != nil {
2✔
172
                cc.UpdateVDDKAnnotations(annotations, vddk)
1✔
173
                return nil
1✔
174
        }
1✔
175
        if dataVolume.Spec.Source.Blank != nil {
2✔
176
                annotations[cc.AnnSource] = cc.SourceNone
1✔
177
                return nil
1✔
178
        }
1✔
179
        return errors.Errorf("no source set for import datavolume")
×
180
}
181

182
// Reconcile loop for the import data volumes
183
func (r *ImportReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
184
        return r.reconcile(ctx, req, r)
1✔
185
}
1✔
186

187
func (r *ImportReconciler) sync(log logr.Logger, req reconcile.Request) (dvSyncResult, error) {
1✔
188
        syncState, err := r.syncImport(log, req)
1✔
189
        if err == nil {
2✔
190
                err = r.syncUpdate(log, &syncState)
1✔
191
        }
1✔
192
        return syncState.dvSyncResult, err
1✔
193
}
194

195
func (r *ImportReconciler) syncImport(log logr.Logger, req reconcile.Request) (dvSyncState, error) {
1✔
196
        syncState, syncErr := r.syncCommon(log, req, r.cleanup, nil)
1✔
197
        if syncErr != nil || syncState.result != nil {
2✔
198
                return syncState, syncErr
1✔
199
        }
1✔
200

201
        pvcModifier := r.updateAnnotations
1✔
202
        if syncState.usePopulator {
2✔
203
                if r.shouldReconcileVolumeSourceCR(&syncState) {
2✔
204
                        err := r.reconcileVolumeImportSourceCR(&syncState)
1✔
205
                        if err != nil {
1✔
206
                                return syncState, err
×
207
                        }
×
208
                }
209
                pvcModifier = r.updatePVCForPopulation
1✔
210
        }
211

212
        if err := r.handlePvcCreation(log, &syncState, pvcModifier); err != nil {
2✔
213
                syncErr = err
1✔
214
        }
1✔
215

216
        if syncState.pvc != nil && syncErr == nil && !syncState.usePopulator {
2✔
217
                r.setVddkAnnotations(&syncState)
1✔
218
                syncErr = cc.MaybeSetPvcMultiStageAnnotation(syncState.pvc, r.getCheckpointArgs(syncState.dvMutated))
1✔
219
        }
1✔
220
        return syncState, syncErr
1✔
221
}
222

223
func (r *ImportReconciler) cleanup(syncState *dvSyncState) error {
1✔
224
        // The cleanup is to delete the volumeImportSourceCR which is used only with populators,
1✔
225
        // it is owner by the DV so will be deleted when dv is deleted
1✔
226
        // also we can already delete once dv is succeeded
1✔
227
        usePopulator, err := checkDVUsingPopulators(syncState.dvMutated)
1✔
228
        if err != nil {
1✔
229
                return err
×
230
        }
×
231
        if usePopulator && !r.shouldReconcileVolumeSourceCR(syncState) {
2✔
232
                return r.deleteVolumeImportSourceCR(syncState)
1✔
233
        }
1✔
234

235
        return nil
1✔
236
}
237

238
func isPVCImportPopulation(pvc *corev1.PersistentVolumeClaim) bool {
1✔
239
        return populators.IsPVCDataSourceRefKind(pvc, cdiv1.VolumeImportSourceRef)
1✔
240
}
1✔
241

242
func (r *ImportReconciler) shouldUpdateStatusPhase(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) (bool, error) {
1✔
243
        pvcCopy := pvc.DeepCopy()
1✔
244
        requiresWork, err := r.pvcRequiresWork(pvcCopy, dv)
1✔
245
        if err != nil {
1✔
246
                return false, err
×
247
        }
×
248
        if isPVCImportPopulation(pvcCopy) {
2✔
249
                // Better to play it safe and check the PVC Prime too
1✔
250
                // before updating DV phase.
1✔
251
                nn := types.NamespacedName{Namespace: pvcCopy.Namespace, Name: populators.PVCPrimeName(pvcCopy)}
1✔
252
                err := r.client.Get(context.TODO(), nn, pvcCopy)
1✔
253
                if err != nil {
1✔
254
                        if k8serrors.IsNotFound(err) {
×
255
                                return false, nil
×
256
                        }
×
257
                        return false, err
×
258
                }
259
        }
260
        _, ok := pvcCopy.Annotations[cc.AnnImportPod]
1✔
261
        return ok && pvcCopy.Status.Phase == corev1.ClaimBound && requiresWork, nil
1✔
262
}
263

264
func (r *ImportReconciler) updateStatusPhase(pvc *corev1.PersistentVolumeClaim, dataVolumeCopy *cdiv1.DataVolume, event *Event) error {
1✔
265
        phase, ok := pvc.Annotations[cc.AnnPodPhase]
1✔
266
        if phase != string(corev1.PodSucceeded) {
2✔
267
                update, err := r.shouldUpdateStatusPhase(pvc, dataVolumeCopy)
1✔
268
                if !update || err != nil {
2✔
269
                        return err
1✔
270
                }
1✔
271
        }
272
        dataVolumeCopy.Status.Phase = cdiv1.ImportScheduled
1✔
273
        if !ok {
2✔
274
                return nil
1✔
275
        }
1✔
276

277
        switch phase {
1✔
278
        case string(corev1.PodPending):
1✔
279
                // TODO: Use a more generic Scheduled, like maybe TransferScheduled.
1✔
280
                dataVolumeCopy.Status.Phase = cdiv1.ImportScheduled
1✔
281
                event.eventType = corev1.EventTypeNormal
1✔
282
                event.reason = ImportScheduled
1✔
283
                event.message = fmt.Sprintf(MessageImportScheduled, pvc.Name)
1✔
284
        case string(corev1.PodRunning):
1✔
285
                // TODO: Use a more generic In Progess, like maybe TransferInProgress.
1✔
286
                dataVolumeCopy.Status.Phase = cdiv1.ImportInProgress
1✔
287
                event.eventType = corev1.EventTypeNormal
1✔
288
                event.reason = ImportInProgress
1✔
289
                event.message = fmt.Sprintf(MessageImportInProgress, pvc.Name)
1✔
290
        case string(corev1.PodFailed):
1✔
291
                event.eventType = corev1.EventTypeWarning
1✔
292
                event.reason = ImportFailed
1✔
293
                event.message = fmt.Sprintf(MessageImportFailed, pvc.Name)
1✔
294
        case string(corev1.PodSucceeded):
1✔
295
                if cc.IsMultiStageImportInProgress(pvc) {
2✔
296
                        // Multi-stage annotations will be updated by import-populator if populators are in use
1✔
297
                        if !isPVCImportPopulation(pvc) {
2✔
298
                                if err := cc.UpdatesMultistageImportSucceeded(pvc, r.getCheckpointArgs(dataVolumeCopy)); err != nil {
1✔
299
                                        return err
×
300
                                }
×
301
                        }
302
                        // this is a multistage import, set the datavolume status to paused
303
                        dataVolumeCopy.Status.Phase = cdiv1.Paused
1✔
304
                        event.eventType = corev1.EventTypeNormal
1✔
305
                        event.reason = cc.ImportPaused
1✔
306
                        event.message = fmt.Sprintf(cc.MessageImportPaused, pvc.Name)
1✔
307
                        break
1✔
308
                }
309
                dataVolumeCopy.Status.Phase = cdiv1.Succeeded
1✔
310
                dataVolumeCopy.Status.Progress = cdiv1.DataVolumeProgress("100.0%")
1✔
311
                event.eventType = corev1.EventTypeNormal
1✔
312
                event.reason = ImportSucceeded
1✔
313
                event.message = fmt.Sprintf(MessageImportSucceeded, pvc.Name)
1✔
314
        }
315
        return nil
1✔
316
}
317

318
func (r *ImportReconciler) setVddkAnnotations(syncState *dvSyncState) {
1✔
319
        if cc.GetSource(syncState.pvc) != cc.SourceVDDK {
2✔
320
                return
1✔
321
        }
1✔
322
        if vddkHost := syncState.pvc.Annotations[cc.AnnVddkHostConnection]; vddkHost != "" {
2✔
323
                cc.AddAnnotation(syncState.dvMutated, cc.AnnVddkHostConnection, vddkHost)
1✔
324
        }
1✔
325
        if vddkVersion := syncState.pvc.Annotations[cc.AnnVddkVersion]; vddkVersion != "" {
2✔
326
                cc.AddAnnotation(syncState.dvMutated, cc.AnnVddkVersion, vddkVersion)
1✔
327
        }
1✔
328
}
329

330
func (r *ImportReconciler) getCheckpointArgs(dv *cdiv1.DataVolume) *cc.CheckpointArgs {
1✔
331
        return &cc.CheckpointArgs{
1✔
332
                Checkpoints: dv.Spec.Checkpoints,
1✔
333
                IsFinal:     dv.Spec.FinalCheckpoint,
1✔
334
                Client:      r.client,
1✔
335
                Log:         r.log,
1✔
336
        }
1✔
337
}
1✔
338

339
func volumeImportSourceName(dv *cdiv1.DataVolume) string {
1✔
340
        return fmt.Sprintf("%s-%s", volumeImportSourcePrefix, dv.UID)
1✔
341
}
1✔
342

343
func (r *ImportReconciler) reconcileVolumeImportSourceCR(syncState *dvSyncState) error {
1✔
344
        dv := syncState.dvMutated
1✔
345
        importSource := &cdiv1.VolumeImportSource{}
1✔
346
        importSourceName := volumeImportSourceName(dv)
1✔
347
        isMultiStage := dv.Spec.Source != nil && len(dv.Spec.Checkpoints) > 0 &&
1✔
348
                (dv.Spec.Source.VDDK != nil || dv.Spec.Source.Imageio != nil)
1✔
349

1✔
350
        // check if import source already exists
1✔
351
        if exists, err := cc.GetResource(context.TODO(), r.client, dv.Namespace, importSourceName, importSource); err != nil {
1✔
352
                return err
×
353
        } else if exists {
2✔
354
                return r.updateVolumeImportSourceIfNeeded(importSource, dv, isMultiStage)
1✔
355
        }
1✔
356

357
        source := &cdiv1.ImportSourceType{}
1✔
358
        if http := dv.Spec.Source.HTTP; http != nil {
2✔
359
                source.HTTP = http
1✔
360
        } else if s3 := dv.Spec.Source.S3; s3 != nil {
2✔
361
                source.S3 = s3
×
362
        } else if gcs := dv.Spec.Source.GCS; gcs != nil {
1✔
363
                source.GCS = gcs
×
364
        } else if registry := dv.Spec.Source.Registry; registry != nil {
1✔
365
                source.Registry = registry
×
366
        } else if imageio := dv.Spec.Source.Imageio; imageio != nil {
1✔
367
                source.Imageio = imageio
×
368
        } else if vddk := dv.Spec.Source.VDDK; vddk != nil {
1✔
369
                source.VDDK = vddk
×
370
        } else {
1✔
371
                // Our dv shouldn't be without source
1✔
372
                // Defaulting to Blank source
1✔
373
                source.Blank = &cdiv1.DataVolumeBlankImage{}
1✔
374
        }
1✔
375

376
        importSource = &cdiv1.VolumeImportSource{
1✔
377
                ObjectMeta: metav1.ObjectMeta{
1✔
378
                        Name:      importSourceName,
1✔
379
                        Namespace: dv.Namespace,
1✔
380
                },
1✔
381
                Spec: cdiv1.VolumeImportSourceSpec{
1✔
382
                        Source:        source,
1✔
383
                        ContentType:   dv.Spec.ContentType,
1✔
384
                        Preallocation: dv.Spec.Preallocation,
1✔
385
                },
1✔
386
        }
1✔
387

1✔
388
        if isMultiStage {
1✔
389
                importSource.Spec.TargetClaim = &dv.Name
×
390
                importSource.Spec.Checkpoints = dv.Spec.Checkpoints
×
391
                importSource.Spec.FinalCheckpoint = &dv.Spec.FinalCheckpoint
×
392
        }
×
393

394
        if err := controllerutil.SetControllerReference(dv, importSource, r.scheme); err != nil {
1✔
395
                return err
×
396
        }
×
397

398
        if err := r.client.Create(context.TODO(), importSource); err != nil {
1✔
399
                if !k8serrors.IsAlreadyExists(err) {
×
400
                        return err
×
401
                }
×
402
        }
403
        return nil
1✔
404
}
405

406
func (r *ImportReconciler) deleteVolumeImportSourceCR(syncState *dvSyncState) error {
1✔
407
        importSourceName := volumeImportSourceName(syncState.dvMutated)
1✔
408
        importSource := &cdiv1.VolumeImportSource{}
1✔
409
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: importSourceName, Namespace: syncState.dvMutated.Namespace}, importSource); err != nil {
1✔
410
                if !k8serrors.IsNotFound(err) {
×
411
                        return err
×
412
                }
×
413
        } else {
1✔
414
                if err := r.client.Delete(context.TODO(), importSource); err != nil {
1✔
415
                        if !k8serrors.IsNotFound(err) {
×
416
                                return err
×
417
                        }
×
418
                }
419
        }
420

421
        return nil
1✔
422
}
423

424
func (r *ImportReconciler) updateVolumeImportSourceIfNeeded(source *cdiv1.VolumeImportSource, dv *cdiv1.DataVolume, isMultiStage bool) error {
1✔
425
        // Updates are only needed in multistage imports
1✔
426
        if !isMultiStage {
2✔
427
                return nil
1✔
428
        }
1✔
429

430
        // Unchanged checkpoint API, no update needed
431
        finalCheckpoint := false
×
432
        if source.Spec.FinalCheckpoint != nil {
×
433
                finalCheckpoint = *source.Spec.FinalCheckpoint
×
434
        }
×
435
        if reflect.DeepEqual(source.Spec.Checkpoints, dv.Spec.Checkpoints) &&
×
436
                finalCheckpoint == dv.Spec.FinalCheckpoint {
×
437
                return nil
×
438
        }
×
439

440
        source.Spec.Checkpoints = dv.Spec.Checkpoints
×
441
        source.Spec.FinalCheckpoint = &dv.Spec.FinalCheckpoint
×
442
        return r.client.Update(context.TODO(), source)
×
443
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc