• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5793

26 Jan 2026 02:08AM UTC coverage: 49.624% (+0.1%) from 49.491%
#5793

Pull #4010

travis-ci

halfcrazy
feat: Add checksum validation for HTTP/HTTPS DataVolume sources

Introduces cryptographic hash validation for HTTP/HTTPS
import sources to prevent data tampering during download.

**Usage Example:**

```yaml
apiVersion: cdi.kubevirt.io/v1beta1
kind: DataVolume
metadata:
  name: fedora-dv
spec:
  source:
    http:
      url: "https://download.fedoraproject.org/pub/fedora/linux/releases/39/Cloud/x86_64/images/Fedora-Cloud-Base-39-1.5.x86_64.qcow2"
      checksum: "sha256:c5b50f903e39b3c5d3b7c7bb9a4c5e4f3"
  pvc:
    accessModes:
      - ReadWriteOnce
    resources:
      requests:
        storage: 10Gi
```

Signed-off-by: Yan Zhu <hackzhuyan@gmail.com>
Pull Request #4010: feat: Add checksum validation for HTTP/HTTPS DataVolume sources

154 of 212 new or added lines in 14 files covered. (72.64%)

13 existing lines in 2 files now uncovered.

14779 of 29782 relevant lines covered (49.62%)

0.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.42
/pkg/controller/datavolume/import-controller.go
1
/*
2
Copyright 2022 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
        http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package datavolume
18

19
import (
20
        "context"
21
        "fmt"
22
        "reflect"
23
        "strconv"
24

25
        "github.com/go-logr/logr"
26
        "github.com/pkg/errors"
27

28
        corev1 "k8s.io/api/core/v1"
29
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
30
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31
        "k8s.io/apimachinery/pkg/types"
32

33
        "sigs.k8s.io/controller-runtime/pkg/controller"
34
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
35
        "sigs.k8s.io/controller-runtime/pkg/handler"
36
        "sigs.k8s.io/controller-runtime/pkg/manager"
37
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
38
        "sigs.k8s.io/controller-runtime/pkg/source"
39

40
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
41
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
42
        "kubevirt.io/containerized-data-importer/pkg/controller/populators"
43
        featuregates "kubevirt.io/containerized-data-importer/pkg/feature-gates"
44
)
45

46
// Event reasons recorded on a DataVolume as its import advances.
const (
	// ImportScheduled is the event reason used once the import has been scheduled.
	ImportScheduled = "ImportScheduled"
	// ImportInProgress is the event reason used while the import is running.
	ImportInProgress = "ImportInProgress"
	// ImportFailed is the event reason used when the import has failed.
	ImportFailed = "ImportFailed"
	// ImportSucceeded is the event reason used when the import has completed.
	ImportSucceeded = "ImportSucceeded"
)

// Event message formats; each one takes the PVC name as its only argument.
const (
	// MessageImportScheduled is the message format for a scheduled import.
	MessageImportScheduled = "Import into %s scheduled"
	// MessageImportInProgress is the message format for a running import.
	MessageImportInProgress = "Import into %s in progress"
	// MessageImportFailed is the message format for a failed import.
	MessageImportFailed = "Failed to import into PVC %s"
	// MessageImportSucceeded is the message format for a completed import.
	MessageImportSucceeded = "Successfully imported into PVC %s"
)

const (
	// importControllerName names the controller, its logger and its event recorder.
	importControllerName = "datavolume-import-controller"

	// volumeImportSourcePrefix is prepended to the DataVolume UID to name its VolumeImportSource.
	volumeImportSourcePrefix = "volume-import-source"
)
69

70
// ImportReconciler members
71
type ImportReconciler struct {
72
        ReconcilerBase
73
}
74

75
// NewImportController creates a new instance of the datavolume import controller
76
func NewImportController(
77
        ctx context.Context,
78
        mgr manager.Manager,
79
        log logr.Logger,
80
        installerLabels map[string]string,
81
) (controller.Controller, error) {
×
82
        client := mgr.GetClient()
×
83
        reconciler := &ImportReconciler{
×
84
                ReconcilerBase: ReconcilerBase{
×
85
                        client:               client,
×
86
                        scheme:               mgr.GetScheme(),
×
87
                        log:                  log.WithName(importControllerName),
×
88
                        recorder:             mgr.GetEventRecorderFor(importControllerName),
×
89
                        featureGates:         featuregates.NewFeatureGates(client),
×
90
                        installerLabels:      installerLabels,
×
91
                        shouldUpdateProgress: true,
×
92
                },
×
93
        }
×
94

×
95
        datavolumeController, err := controller.New(importControllerName, mgr, controller.Options{
×
96
                MaxConcurrentReconciles: 3,
×
97
                Reconciler:              reconciler,
×
98
        })
×
99
        if err != nil {
×
100
                return nil, err
×
101
        }
×
102
        if err := addDataVolumeImportControllerWatches(mgr, datavolumeController); err != nil {
×
103
                return nil, err
×
104
        }
×
105

106
        return datavolumeController, nil
×
107
}
108

109
func addDataVolumeImportControllerWatches(mgr manager.Manager, datavolumeController controller.Controller) error {
×
110
        if err := addDataVolumeControllerCommonWatches(mgr, datavolumeController, dataVolumeImport); err != nil {
×
111
                return err
×
112
        }
×
113
        if err := datavolumeController.Watch(source.Kind(mgr.GetCache(), &cdiv1.VolumeImportSource{}, handler.TypedEnqueueRequestForOwner[*cdiv1.VolumeImportSource](
×
114
                mgr.GetScheme(), mgr.GetClient().RESTMapper(), &cdiv1.DataVolume{}, handler.OnlyControllerOwner()))); err != nil {
×
115
                return err
×
116
        }
×
117
        return nil
×
118
}
119

120
func (r *ImportReconciler) updatePVCForPopulation(dataVolume *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) error {
1✔
121
        if dataVolume.Spec.Source.HTTP == nil &&
1✔
122
                dataVolume.Spec.Source.S3 == nil &&
1✔
123
                dataVolume.Spec.Source.GCS == nil &&
1✔
124
                dataVolume.Spec.Source.Registry == nil &&
1✔
125
                dataVolume.Spec.Source.Imageio == nil &&
1✔
126
                dataVolume.Spec.Source.VDDK == nil &&
1✔
127
                dataVolume.Spec.Source.Blank == nil {
2✔
128
                return errors.Errorf("no source set for import datavolume")
1✔
129
        }
1✔
130
        if err := cc.AddImmediateBindingAnnotationIfWFFCDisabled(pvc, r.featureGates); err != nil {
1✔
131
                return err
×
132
        }
×
133
        apiGroup := cc.AnnAPIGroup
1✔
134
        pvc.Spec.DataSourceRef = &corev1.TypedObjectReference{
1✔
135
                APIGroup: &apiGroup,
1✔
136
                Kind:     cdiv1.VolumeImportSourceRef,
1✔
137
                Name:     volumeImportSourceName(dataVolume),
1✔
138
        }
1✔
139
        return nil
1✔
140
}
141

142
func (r *ImportReconciler) updateAnnotations(dataVolume *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) error {
1✔
143
        annotations := pvc.Annotations
1✔
144

1✔
145
        if checkpoint := cc.GetNextCheckpoint(pvc, r.getCheckpointArgs(dataVolume)); checkpoint != nil {
2✔
146
                annotations[cc.AnnCurrentCheckpoint] = checkpoint.Current
1✔
147
                annotations[cc.AnnPreviousCheckpoint] = checkpoint.Previous
1✔
148
                annotations[cc.AnnFinalCheckpoint] = strconv.FormatBool(checkpoint.IsFinal)
1✔
149
        }
1✔
150

151
        if http := dataVolume.Spec.Source.HTTP; http != nil {
2✔
152
                cc.UpdateHTTPAnnotations(annotations, http)
1✔
153
                return nil
1✔
154
        }
1✔
155
        if s3 := dataVolume.Spec.Source.S3; s3 != nil {
2✔
156
                cc.UpdateS3Annotations(annotations, s3)
1✔
157
                return nil
1✔
158
        }
1✔
159
        if gcs := dataVolume.Spec.Source.GCS; gcs != nil {
1✔
160
                cc.UpdateGCSAnnotations(annotations, gcs)
×
161
                return nil
×
162
        }
×
163
        if registry := dataVolume.Spec.Source.Registry; registry != nil {
1✔
164
                cc.UpdateRegistryAnnotations(annotations, registry)
×
165
                return nil
×
166
        }
×
167
        if imageio := dataVolume.Spec.Source.Imageio; imageio != nil {
1✔
168
                cc.UpdateImageIOAnnotations(annotations, imageio)
×
169
                return nil
×
170
        }
×
171
        if vddk := dataVolume.Spec.Source.VDDK; vddk != nil {
2✔
172
                cc.UpdateVDDKAnnotations(annotations, vddk)
1✔
173
                return nil
1✔
174
        }
1✔
175
        if dataVolume.Spec.Source.Blank != nil {
2✔
176
                annotations[cc.AnnSource] = cc.SourceNone
1✔
177
                return nil
1✔
178
        }
1✔
179
        return errors.Errorf("no source set for import datavolume")
×
180
}
181

182
// Reconcile loop for the import data volumes
183
func (r *ImportReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
184
        return r.reconcile(ctx, req, r)
1✔
185
}
1✔
186

187
func (r *ImportReconciler) sync(log logr.Logger, req reconcile.Request) (dvSyncResult, error) {
1✔
188
        syncState, err := r.syncImport(log, req)
1✔
189
        if err == nil {
2✔
190
                err = r.syncUpdate(log, &syncState)
1✔
191
        }
1✔
192
        return syncState.dvSyncResult, err
1✔
193
}
194

195
func (r *ImportReconciler) syncImport(log logr.Logger, req reconcile.Request) (dvSyncState, error) {
1✔
196
        syncState, syncErr := r.syncCommon(log, req, r.cleanup, nil)
1✔
197
        if syncErr != nil || syncState.result != nil {
2✔
198
                return syncState, syncErr
1✔
199
        }
1✔
200

201
        pvcModifier := r.updateAnnotations
1✔
202
        if syncState.usePopulator {
2✔
203
                if r.shouldReconcileVolumeSourceCR(&syncState) {
2✔
204
                        err := r.reconcileVolumeImportSourceCR(&syncState)
1✔
205
                        if err != nil {
1✔
206
                                return syncState, err
×
207
                        }
×
208
                }
209
                pvcModifier = r.updatePVCForPopulation
1✔
210
        }
211

212
        if err := r.handlePvcCreation(log, &syncState, pvcModifier); err != nil {
2✔
213
                syncErr = err
1✔
214
        }
1✔
215

216
        if syncState.pvc != nil && syncErr == nil && !syncState.usePopulator {
2✔
217
                r.setVddkAnnotations(&syncState)
1✔
218
                syncErr = cc.MaybeSetPvcMultiStageAnnotation(syncState.pvc, r.getCheckpointArgs(syncState.dvMutated))
1✔
219
        }
1✔
220
        return syncState, syncErr
1✔
221
}
222

223
func (r *ImportReconciler) cleanup(syncState *dvSyncState) error {
1✔
224
        // The cleanup is to delete the volumeImportSourceCR which is used only with populators,
1✔
225
        // it is owner by the DV so will be deleted when dv is deleted
1✔
226
        // also we can already delete once dv is succeeded
1✔
227
        usePopulator, err := checkDVUsingPopulators(syncState.dvMutated)
1✔
228
        if err != nil {
1✔
229
                return err
×
230
        }
×
231
        if usePopulator && !r.shouldReconcileVolumeSourceCR(syncState) {
2✔
232
                return r.deleteVolumeImportSourceCR(syncState)
1✔
233
        }
1✔
234

235
        return nil
1✔
236
}
237

238
func isPVCImportPopulation(pvc *corev1.PersistentVolumeClaim) bool {
1✔
239
        return populators.IsPVCDataSourceRefKind(pvc, cdiv1.VolumeImportSourceRef)
1✔
240
}
1✔
241

242
func (r *ImportReconciler) shouldUpdateStatusPhase(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) (bool, error) {
1✔
243
        pvcCopy := pvc.DeepCopy()
1✔
244
        requiresWork, err := r.pvcRequiresWork(pvcCopy, dv)
1✔
245
        if err != nil {
1✔
246
                return false, err
×
247
        }
×
248
        if isPVCImportPopulation(pvcCopy) {
2✔
249
                // Better to play it safe and check the PVC Prime too
1✔
250
                // before updating DV phase.
1✔
251
                nn := types.NamespacedName{Namespace: pvcCopy.Namespace, Name: populators.PVCPrimeName(pvcCopy)}
1✔
252
                err := r.client.Get(context.TODO(), nn, pvcCopy)
1✔
253
                if err != nil {
1✔
254
                        if k8serrors.IsNotFound(err) {
×
255
                                return false, nil
×
256
                        }
×
257
                        return false, err
×
258
                }
259
        }
260
        _, ok := pvcCopy.Annotations[cc.AnnImportPod]
1✔
261
        return ok && pvcCopy.Status.Phase == corev1.ClaimBound && requiresWork, nil
1✔
262
}
263

264
func (r *ImportReconciler) updateStatusPhase(pvc *corev1.PersistentVolumeClaim, dataVolumeCopy *cdiv1.DataVolume, event *Event) error {
1✔
265
        phase, ok := pvc.Annotations[cc.AnnPodPhase]
1✔
266
        if phase != string(corev1.PodSucceeded) {
2✔
267
                update, err := r.shouldUpdateStatusPhase(pvc, dataVolumeCopy)
1✔
268
                if !update || err != nil {
2✔
269
                        return err
1✔
270
                }
1✔
271
        }
272
        dataVolumeCopy.Status.Phase = cdiv1.ImportScheduled
1✔
273
        if !ok {
2✔
274
                return nil
1✔
275
        }
1✔
276

277
        switch phase {
1✔
278
        case string(corev1.PodPending):
1✔
279
                // TODO: Use a more generic Scheduled, like maybe TransferScheduled.
1✔
280
                dataVolumeCopy.Status.Phase = cdiv1.ImportScheduled
1✔
281
                event.eventType = corev1.EventTypeNormal
1✔
282
                event.reason = ImportScheduled
1✔
283
                event.message = fmt.Sprintf(MessageImportScheduled, pvc.Name)
1✔
284
        case string(corev1.PodRunning):
1✔
285
                // TODO: Use a more generic In Progess, like maybe TransferInProgress.
1✔
286
                dataVolumeCopy.Status.Phase = cdiv1.ImportInProgress
1✔
287
                event.eventType = corev1.EventTypeNormal
1✔
288
                event.reason = ImportInProgress
1✔
289
                event.message = fmt.Sprintf(MessageImportInProgress, pvc.Name)
1✔
290
        case string(corev1.PodFailed):
1✔
291
                // Only mark as Failed if it's a fatal error (e.g., checksum mismatch).
1✔
292
                // Otherwise, keep the current phase to allow retries.
1✔
293
                if pvc.Annotations[cc.AnnImportFatalError] == "true" {
1✔
NEW
294
                        dataVolumeCopy.Status.Phase = cdiv1.Failed
×
NEW
295
                }
×
296
                event.eventType = corev1.EventTypeWarning
1✔
297
                event.reason = ImportFailed
1✔
298
                event.message = fmt.Sprintf(MessageImportFailed, pvc.Name)
1✔
299
        case string(corev1.PodSucceeded):
1✔
300
                if cc.IsMultiStageImportInProgress(pvc) {
2✔
301
                        // Multi-stage annotations will be updated by import-populator if populators are in use
1✔
302
                        if !isPVCImportPopulation(pvc) {
2✔
303
                                if err := cc.UpdatesMultistageImportSucceeded(pvc, r.getCheckpointArgs(dataVolumeCopy)); err != nil {
1✔
304
                                        return err
×
305
                                }
×
306
                        }
307
                        // this is a multistage import, set the datavolume status to paused
308
                        dataVolumeCopy.Status.Phase = cdiv1.Paused
1✔
309
                        event.eventType = corev1.EventTypeNormal
1✔
310
                        event.reason = cc.ImportPaused
1✔
311
                        event.message = fmt.Sprintf(cc.MessageImportPaused, pvc.Name)
1✔
312
                        break
1✔
313
                }
314
                dataVolumeCopy.Status.Phase = cdiv1.Succeeded
1✔
315
                dataVolumeCopy.Status.Progress = cdiv1.DataVolumeProgress("100.0%")
1✔
316
                event.eventType = corev1.EventTypeNormal
1✔
317
                event.reason = ImportSucceeded
1✔
318
                event.message = fmt.Sprintf(MessageImportSucceeded, pvc.Name)
1✔
319
        }
320
        return nil
1✔
321
}
322

323
func (r *ImportReconciler) setVddkAnnotations(syncState *dvSyncState) {
1✔
324
        if cc.GetSource(syncState.pvc) != cc.SourceVDDK {
2✔
325
                return
1✔
326
        }
1✔
327
        if vddkHost := syncState.pvc.Annotations[cc.AnnVddkHostConnection]; vddkHost != "" {
2✔
328
                cc.AddAnnotation(syncState.dvMutated, cc.AnnVddkHostConnection, vddkHost)
1✔
329
        }
1✔
330
        if vddkVersion := syncState.pvc.Annotations[cc.AnnVddkVersion]; vddkVersion != "" {
2✔
331
                cc.AddAnnotation(syncState.dvMutated, cc.AnnVddkVersion, vddkVersion)
1✔
332
        }
1✔
333
}
334

335
func (r *ImportReconciler) getCheckpointArgs(dv *cdiv1.DataVolume) *cc.CheckpointArgs {
1✔
336
        return &cc.CheckpointArgs{
1✔
337
                Checkpoints: dv.Spec.Checkpoints,
1✔
338
                IsFinal:     dv.Spec.FinalCheckpoint,
1✔
339
                Client:      r.client,
1✔
340
                Log:         r.log,
1✔
341
        }
1✔
342
}
1✔
343

344
func volumeImportSourceName(dv *cdiv1.DataVolume) string {
1✔
345
        return fmt.Sprintf("%s-%s", volumeImportSourcePrefix, dv.UID)
1✔
346
}
1✔
347

348
func (r *ImportReconciler) reconcileVolumeImportSourceCR(syncState *dvSyncState) error {
1✔
349
        dv := syncState.dvMutated
1✔
350
        importSource := &cdiv1.VolumeImportSource{}
1✔
351
        importSourceName := volumeImportSourceName(dv)
1✔
352
        isMultiStage := dv.Spec.Source != nil && len(dv.Spec.Checkpoints) > 0 &&
1✔
353
                (dv.Spec.Source.VDDK != nil || dv.Spec.Source.Imageio != nil)
1✔
354

1✔
355
        // check if import source already exists
1✔
356
        if exists, err := cc.GetResource(context.TODO(), r.client, dv.Namespace, importSourceName, importSource); err != nil {
1✔
357
                return err
×
358
        } else if exists {
2✔
359
                return r.updateVolumeImportSourceIfNeeded(importSource, dv, isMultiStage)
1✔
360
        }
1✔
361

362
        source := &cdiv1.ImportSourceType{}
1✔
363
        if http := dv.Spec.Source.HTTP; http != nil {
2✔
364
                source.HTTP = http
1✔
365
        } else if s3 := dv.Spec.Source.S3; s3 != nil {
2✔
366
                source.S3 = s3
×
367
        } else if gcs := dv.Spec.Source.GCS; gcs != nil {
1✔
368
                source.GCS = gcs
×
369
        } else if registry := dv.Spec.Source.Registry; registry != nil {
1✔
370
                source.Registry = registry
×
371
        } else if imageio := dv.Spec.Source.Imageio; imageio != nil {
1✔
372
                source.Imageio = imageio
×
373
        } else if vddk := dv.Spec.Source.VDDK; vddk != nil {
1✔
374
                source.VDDK = vddk
×
375
        } else {
1✔
376
                // Our dv shouldn't be without source
1✔
377
                // Defaulting to Blank source
1✔
378
                source.Blank = &cdiv1.DataVolumeBlankImage{}
1✔
379
        }
1✔
380

381
        importSource = &cdiv1.VolumeImportSource{
1✔
382
                ObjectMeta: metav1.ObjectMeta{
1✔
383
                        Name:      importSourceName,
1✔
384
                        Namespace: dv.Namespace,
1✔
385
                },
1✔
386
                Spec: cdiv1.VolumeImportSourceSpec{
1✔
387
                        Source:        source,
1✔
388
                        ContentType:   dv.Spec.ContentType,
1✔
389
                        Preallocation: dv.Spec.Preallocation,
1✔
390
                },
1✔
391
        }
1✔
392

1✔
393
        if isMultiStage {
1✔
394
                importSource.Spec.TargetClaim = &dv.Name
×
395
                importSource.Spec.Checkpoints = dv.Spec.Checkpoints
×
396
                importSource.Spec.FinalCheckpoint = &dv.Spec.FinalCheckpoint
×
397
        }
×
398

399
        if err := controllerutil.SetControllerReference(dv, importSource, r.scheme); err != nil {
1✔
400
                return err
×
401
        }
×
402

403
        if err := r.client.Create(context.TODO(), importSource); err != nil {
1✔
404
                if !k8serrors.IsAlreadyExists(err) {
×
405
                        return err
×
406
                }
×
407
        }
408
        return nil
1✔
409
}
410

411
func (r *ImportReconciler) deleteVolumeImportSourceCR(syncState *dvSyncState) error {
1✔
412
        importSourceName := volumeImportSourceName(syncState.dvMutated)
1✔
413
        importSource := &cdiv1.VolumeImportSource{}
1✔
414
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: importSourceName, Namespace: syncState.dvMutated.Namespace}, importSource); err != nil {
1✔
415
                if !k8serrors.IsNotFound(err) {
×
416
                        return err
×
417
                }
×
418
        } else {
1✔
419
                if err := r.client.Delete(context.TODO(), importSource); err != nil {
1✔
420
                        if !k8serrors.IsNotFound(err) {
×
421
                                return err
×
422
                        }
×
423
                }
424
        }
425

426
        return nil
1✔
427
}
428

429
func (r *ImportReconciler) updateVolumeImportSourceIfNeeded(source *cdiv1.VolumeImportSource, dv *cdiv1.DataVolume, isMultiStage bool) error {
1✔
430
        // Updates are only needed in multistage imports
1✔
431
        if !isMultiStage {
2✔
432
                return nil
1✔
433
        }
1✔
434

435
        // Unchanged checkpoint API, no update needed
436
        finalCheckpoint := false
×
437
        if source.Spec.FinalCheckpoint != nil {
×
438
                finalCheckpoint = *source.Spec.FinalCheckpoint
×
439
        }
×
440
        if reflect.DeepEqual(source.Spec.Checkpoints, dv.Spec.Checkpoints) &&
×
441
                finalCheckpoint == dv.Spec.FinalCheckpoint {
×
442
                return nil
×
443
        }
×
444

445
        source.Spec.Checkpoints = dv.Spec.Checkpoints
×
446
        source.Spec.FinalCheckpoint = &dv.Spec.FinalCheckpoint
×
447
        return r.client.Update(context.TODO(), source)
×
448
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc