• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5267

29 Apr 2025 10:55AM UTC coverage: 59.216% (+0.04%) from 59.18%
#5267

Pull #3712

travis-ci

Acedus
http-datasource: validate image size before transfer

Leverage nbdkit to make a best effort check of whether the effective
size of the target VM image is bigger than available scratch space and
return early if it is.

Signed-off-by: Adi Aloni <aaloni@redhat.com>
Pull Request #3712: http-datasource: validate image size before transfer

25 of 28 new or added lines in 3 files covered. (89.29%)

27 existing lines in 2 files now uncovered.

16857 of 28467 relevant lines covered (59.22%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.03
/pkg/importer/data-processor.go
1
/*
2
Copyright 2018 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package importer
18

19
import (
20
        "net/url"
21
        "os"
22

23
        "github.com/pkg/errors"
24

25
        "k8s.io/apimachinery/pkg/api/resource"
26
        "k8s.io/klog/v2"
27

28
        "kubevirt.io/containerized-data-importer/pkg/common"
29
        "kubevirt.io/containerized-data-importer/pkg/image"
30
        "kubevirt.io/containerized-data-importer/pkg/util"
31
)
32

33
// qemuOperations is the package-level handle through which this file performs
// its image operations (Validate, Info, Resize, ConvertToRawStream, Rebase and Commit).
var qemuOperations = image.NewQEMUOperations()
34

35
// ProcessingPhase is the current phase being processed.
type ProcessingPhase string

const (
	// ProcessingPhaseInfo is the first phase, during this phase the source obtains information needed to determine which phase to go to next.
	ProcessingPhaseInfo ProcessingPhase = "Info"
	// ProcessingPhaseTransferScratch is the phase in which the data source writes data to the scratch space.
	ProcessingPhaseTransferScratch ProcessingPhase = "TransferScratch"
	// ProcessingPhaseTransferDataDir is the phase in which the data source writes data directly to the target path without conversion.
	ProcessingPhaseTransferDataDir ProcessingPhase = "TransferDataDir"
	// ProcessingPhaseTransferDataFile is the phase in which the data source writes data directly to the target file without conversion.
	ProcessingPhaseTransferDataFile ProcessingPhase = "TransferDataFile"
	// ProcessingPhaseValidatePause is the phase in which the data processor should validate and then pause.
	ProcessingPhaseValidatePause ProcessingPhase = "ValidatePause"
	// ProcessingPhaseValidateScratch is the phase in which the data processor should validate the source image
	// and then continue with the transfer to scratch space (ProcessingPhaseTransferScratch).
	ProcessingPhaseValidateScratch ProcessingPhase = "ValidateScratch"
	// ProcessingPhaseConvert is the phase in which the data is taken from the url provided by the source, and it is converted to the target RAW disk image format.
	// The url can be an http end point or file system end point.
	ProcessingPhaseConvert ProcessingPhase = "Convert"
	// ProcessingPhaseResize is the phase in which the disk image is resized; this is only needed when the target contains a file system (block devices do not need a resize).
	ProcessingPhaseResize ProcessingPhase = "Resize"
	// ProcessingPhaseComplete is the phase where the entire process completed successfully and we can exit gracefully.
	ProcessingPhaseComplete ProcessingPhase = "Complete"
	// ProcessingPhasePause is the phase where we pause processing and end the loop, and expect something to call the process loop again.
	ProcessingPhasePause ProcessingPhase = "Pause"
	// ProcessingPhaseError is the phase in which we encountered an error and need to exit ungracefully.
	ProcessingPhaseError ProcessingPhase = common.GenericError
	// ProcessingPhaseMergeDelta is the phase in a multi-stage import where a delta image downloaded to scratch is applied to the base image.
	ProcessingPhaseMergeDelta ProcessingPhase = "MergeDelta"
)
65

66
// getAvailableSpaceBlockFunc and getAvailableSpaceFunc are indirections over the
// util space helpers; they may be overridden in tests.
var getAvailableSpaceBlockFunc = util.GetAvailableSpaceBlock
var getAvailableSpaceFunc = util.GetAvailableSpace
69

70
// DataSourceInterface is the interface all data sources should implement.
// The methods returning a ProcessingPhase tell the DataProcessor which phase to run next.
type DataSourceInterface interface {
	// Info is called to get initial information about the data.
	Info() (ProcessingPhase, error)
	// Transfer is called to transfer the data from the source to the path passed in.
	Transfer(path string) (ProcessingPhase, error)
	// TransferFile is called to transfer the data from the source to the file passed in.
	TransferFile(fileName string) (ProcessingPhase, error)
	// GetURL returns the url that the data processor can use when converting the data.
	GetURL() *url.URL
	// GetTerminationMessage returns data to be serialized and used as the termination message of the importer.
	GetTerminationMessage() *common.TerminationMessage
	// Close closes any readers or other open resources.
	Close() error
}
85

86
// ResumableDataSource is the interface all resumable data sources should implement.
type ResumableDataSource interface {
	DataSourceInterface
	// GetResumePhase returns the phase at which ProcessDataResume restarts processing.
	GetResumePhase() ProcessingPhase
}
91

92
// DataProcessor holds the fields needed to process data from a data provider.
type DataProcessor struct {
	// currentPhase is the phase the processing is in currently.
	currentPhase ProcessingPhase
	// source provides the data for processing.
	source DataSourceInterface
	// dataFile is the destination file. It will be DataDir/disk.img if file system, or a block device (if a block device, then DataDir will not exist).
	dataFile string
	// dataDir is the path to the target directory if it contains a file system.
	dataDir string
	// scratchDataDir is the path to the scratch space.
	scratchDataDir string
	// requestImageSize is the size we want the resulting image to be.
	requestImageSize string
	// availableSpace is the target size computed once by calculateTargetSize when the processor is created, before any data is downloaded.
	availableSpace int64
	// filesystemOverhead is the amount of overhead of the storage used.
	filesystemOverhead float64
	// preallocation is the flag controlling preallocation setting of qemu-img.
	preallocation bool
	// preallocationApplied is used to pass information whether preallocation has been performed, or not.
	preallocationApplied bool
	// phaseExecutors is a mapping from the given processing phase to its execution function. The function returns the next processing phase or error.
	phaseExecutors map[ProcessingPhase]func() (ProcessingPhase, error)
	// cacheMode is the mode in which we choose the qemu-img cache mode:
	// TRY_NONE = bypass page cache if the target supports it, otherwise, fall back to using page cache
	cacheMode string
}
120

121
// NewDataProcessor create a new instance of a data processor using the passed in data provider.
122
func NewDataProcessor(dataSource DataSourceInterface, dataFile, dataDir, scratchDataDir, requestImageSize string, filesystemOverhead float64, preallocation bool, cacheMode string) *DataProcessor {
1✔
123
        dp := &DataProcessor{
1✔
124
                currentPhase:       ProcessingPhaseInfo,
1✔
125
                source:             dataSource,
1✔
126
                dataFile:           dataFile,
1✔
127
                dataDir:            dataDir,
1✔
128
                scratchDataDir:     scratchDataDir,
1✔
129
                requestImageSize:   requestImageSize,
1✔
130
                filesystemOverhead: filesystemOverhead,
1✔
131
                preallocation:      preallocation,
1✔
132
                cacheMode:          cacheMode,
1✔
133
        }
1✔
134
        // Calculate available space before doing anything.
1✔
135
        dp.availableSpace = dp.calculateTargetSize()
1✔
136
        dp.initDefaultPhases()
1✔
137
        return dp
1✔
138
}
1✔
139

140
// RegisterPhaseExecutor registers an execution function for the given phase.
141
// If there is already an function registered, override it with the new function.
142
func (dp *DataProcessor) RegisterPhaseExecutor(pp ProcessingPhase, executor func() (ProcessingPhase, error)) {
1✔
143
        if _, ok := dp.phaseExecutors[pp]; ok {
1✔
144
                klog.Warningf("Executor already exists at phase %s. Override it.", pp)
×
145
        }
×
146
        dp.phaseExecutors[pp] = executor
1✔
147
}
148

149
// ProcessData is the main synchronous processing loop
150
func (dp *DataProcessor) ProcessData() error {
1✔
151
        return dp.ProcessDataWithPause()
1✔
152
}
1✔
153

154
// ProcessDataResume Resume a paused processor, assumes the provided data source is ResumableDataSource
155
func (dp *DataProcessor) ProcessDataResume() error {
1✔
156
        rds, ok := dp.source.(ResumableDataSource)
1✔
157
        if !ok {
2✔
158
                return errors.New("Datasource not resumable")
1✔
159
        }
1✔
160
        klog.Infof("Resuming processing at phase %s", rds.GetResumePhase())
1✔
161
        dp.currentPhase = rds.GetResumePhase()
1✔
162
        return dp.ProcessDataWithPause()
1✔
163
}
164

165
func (dp *DataProcessor) initDefaultPhases() {
1✔
166
        dp.phaseExecutors = make(map[ProcessingPhase]func() (ProcessingPhase, error))
1✔
167
        dp.RegisterPhaseExecutor(ProcessingPhaseInfo, func() (ProcessingPhase, error) {
2✔
168
                pp, err := dp.source.Info()
1✔
169
                if err != nil {
1✔
170
                        err = errors.Wrap(err, "Unable to obtain information about data source")
×
171
                }
×
172
                return pp, err
1✔
173
        })
174
        dp.RegisterPhaseExecutor(ProcessingPhaseTransferScratch, func() (ProcessingPhase, error) {
2✔
175
                pp, err := dp.source.Transfer(dp.scratchDataDir)
1✔
176
                if errors.Is(err, ErrInvalidPath) {
2✔
177
                        // Passed in invalid scratch space path, return scratch space needed error.
1✔
178
                        err = ErrRequiresScratchSpace
1✔
179
                } else if err != nil {
3✔
180
                        err = errors.Wrap(err, "Unable to transfer source data to scratch space")
1✔
181
                }
1✔
182
                return pp, err
1✔
183
        })
184
        dp.RegisterPhaseExecutor(ProcessingPhaseTransferDataDir, func() (ProcessingPhase, error) {
2✔
185
                pp, err := dp.source.Transfer(dp.dataDir)
1✔
186
                if err != nil {
1✔
187
                        err = errors.Wrap(err, "Unable to transfer source data to target directory")
×
188
                }
×
189
                return pp, err
1✔
190
        })
191
        dp.RegisterPhaseExecutor(ProcessingPhaseTransferDataFile, func() (ProcessingPhase, error) {
2✔
192
                pp, err := dp.source.TransferFile(dp.dataFile)
1✔
193
                if err != nil {
2✔
194
                        err = errors.Wrap(err, "Unable to transfer source data to target file")
1✔
195
                }
1✔
196
                return pp, err
1✔
197
        })
198
        dp.RegisterPhaseExecutor(ProcessingPhaseValidatePause, func() (ProcessingPhase, error) {
1✔
199
                pp := ProcessingPhasePause
×
200
                err := dp.validate(dp.source.GetURL())
×
201
                if err != nil {
×
202
                        pp = ProcessingPhaseError
×
203
                }
×
204
                return pp, err
×
205
        })
206
        dp.RegisterPhaseExecutor(ProcessingPhaseValidateScratch, func() (ProcessingPhase, error) {
2✔
207
                pp := ProcessingPhaseTransferScratch
1✔
208
                err := dp.validate(dp.source.GetURL())
1✔
209
                if err != nil && errors.Is(err, ValidationSizeError{image.ErrLargerPVCRequired}) {
2✔
210
                        pp = ProcessingPhaseError
1✔
211
                }
1✔
212
                return pp, err
1✔
213
        })
214
        dp.RegisterPhaseExecutor(ProcessingPhaseConvert, func() (ProcessingPhase, error) {
2✔
215
                pp, err := dp.convert(dp.source.GetURL())
1✔
216
                if err != nil {
1✔
UNCOV
217
                        err = errors.Wrap(err, "Unable to convert source data to target format")
×
UNCOV
218
                }
×
219
                return pp, err
1✔
220
        })
221
        dp.RegisterPhaseExecutor(ProcessingPhaseResize, func() (ProcessingPhase, error) {
2✔
222
                pp, err := dp.resize()
1✔
223
                if err != nil {
1✔
UNCOV
224
                        err = errors.Wrap(err, "Unable to resize disk image to requested size")
×
UNCOV
225
                }
×
226
                return pp, err
1✔
227
        })
228
        dp.RegisterPhaseExecutor(ProcessingPhaseMergeDelta, func() (ProcessingPhase, error) {
2✔
229
                pp, err := dp.merge()
1✔
230
                if err != nil {
1✔
UNCOV
231
                        err = errors.Wrap(err, "Unable to apply delta to base image")
×
UNCOV
232
                }
×
233
                return pp, err
1✔
234
        })
235
}
236

237
// ProcessDataWithPause is the main processing loop.
238
func (dp *DataProcessor) ProcessDataWithPause() error {
1✔
239
        visited := make(map[ProcessingPhase]bool, len(dp.phaseExecutors))
1✔
240
        for dp.currentPhase != ProcessingPhaseComplete && dp.currentPhase != ProcessingPhasePause {
2✔
241
                if visited[dp.currentPhase] {
2✔
242
                        err := errors.Errorf("loop detected on phase %s", dp.currentPhase)
1✔
243
                        klog.Errorf("%+v", err)
1✔
244
                        return err
1✔
245
                }
1✔
246
                executor, ok := dp.phaseExecutors[dp.currentPhase]
1✔
247
                if !ok {
2✔
248
                        return errors.Errorf("Unknown processing phase %s", dp.currentPhase)
1✔
249
                }
1✔
250
                nextPhase, err := executor()
1✔
251
                visited[dp.currentPhase] = true
1✔
252
                if err != nil {
2✔
253
                        klog.Errorf("%+v", err)
1✔
254
                        return err
1✔
255
                }
1✔
256
                dp.currentPhase = nextPhase
1✔
257
                klog.V(1).Infof("New phase: %s\n", dp.currentPhase)
1✔
258
        }
259
        return nil
1✔
260
}
261

262
func (dp *DataProcessor) validate(url *url.URL) error {
1✔
263
        klog.V(1).Infoln("Validating image")
1✔
264
        err := qemuOperations.Validate(url, dp.availableSpace)
1✔
265
        if err != nil {
2✔
266
                return ValidationSizeError{err: err}
1✔
267
        }
1✔
268
        return nil
1✔
269
}
270

271
// convert is called when convert the image from the url to a RAW disk image. Source formats include RAW/QCOW2 (Raw to raw conversion is a copy)
272
func (dp *DataProcessor) convert(url *url.URL) (ProcessingPhase, error) {
1✔
273
        err := dp.validate(url)
1✔
274
        if err != nil {
2✔
275
                return ProcessingPhaseError, err
1✔
276
        }
1✔
277
        err = CleanAll(dp.dataFile)
1✔
278
        if err != nil {
1✔
UNCOV
279
                return ProcessingPhaseError, err
×
UNCOV
280
        }
×
281
        klog.V(3).Infoln("Converting to Raw")
1✔
282
        err = qemuOperations.ConvertToRawStream(url, dp.dataFile, dp.preallocation, dp.cacheMode)
1✔
283
        if err != nil {
2✔
284
                return ProcessingPhaseError, errors.Wrap(err, "Conversion to Raw failed")
1✔
285
        }
1✔
286
        dp.preallocationApplied = dp.preallocation
1✔
287

1✔
288
        return ProcessingPhaseResize, nil
1✔
289
}
290

291
func (dp *DataProcessor) resize() (ProcessingPhase, error) {
1✔
292
        size, _ := getAvailableSpaceBlockFunc(dp.dataFile)
1✔
293
        klog.V(3).Infof("Available space in dataFile: %d", size)
1✔
294
        isBlockDev := size >= int64(0)
1✔
295
        if !isBlockDev {
2✔
296
                if dp.requestImageSize != "" {
2✔
297
                        klog.V(3).Infoln("Resizing image")
1✔
298
                        err := ResizeImage(dp.dataFile, dp.requestImageSize, dp.getUsableSpace(), dp.preallocation)
1✔
299
                        if err != nil {
2✔
300
                                return ProcessingPhaseError, errors.Wrap(err, "Resize of image failed")
1✔
301
                        }
1✔
302
                }
303
                // Validate that a sparse file will fit even as it fills out.
304
                dataFileURL, err := url.Parse(dp.dataFile)
1✔
305
                if err != nil {
1✔
UNCOV
306
                        return ProcessingPhaseError, err
×
UNCOV
307
                }
×
308
                err = dp.validate(dataFileURL)
1✔
309
                if err != nil {
1✔
310
                        return ProcessingPhaseError, err
×
UNCOV
311
                }
×
312
        }
313
        dp.preallocationApplied = dp.preallocation
1✔
314
        if dp.dataFile != "" && !isBlockDev {
2✔
315
                // Change permissions to 0660
1✔
316
                err := os.Chmod(dp.dataFile, 0660)
1✔
317
                if err != nil {
1✔
UNCOV
318
                        return ProcessingPhaseError, errors.Wrap(err, "Unable to change permissions of target file")
×
UNCOV
319
                }
×
320
        }
321

322
        return ProcessingPhaseComplete, nil
1✔
323
}
324

325
// ResizeImage resizes the images to match the requested size. Sometimes provisioners misbehave and the available space
326
// is not the same as the requested space. For those situations we compare the available space to the requested space and
327
// use the smallest of the two values.
328
func ResizeImage(dataFile, imageSize string, totalTargetSpace int64, preallocation bool) error {
1✔
329
        dataFileURL, _ := url.Parse(dataFile)
1✔
330
        info, err := qemuOperations.Info(dataFileURL)
1✔
331
        if err != nil {
2✔
332
                return err
1✔
333
        }
1✔
334
        if imageSize != "" {
2✔
335
                currentImageSizeQuantity := resource.NewScaledQuantity(info.VirtualSize, 0)
1✔
336
                newImageSizeQuantity := resource.MustParse(imageSize)
1✔
337
                minSizeQuantity := util.MinQuantity(resource.NewScaledQuantity(totalTargetSpace, 0), &newImageSizeQuantity)
1✔
338
                if minSizeQuantity.Cmp(newImageSizeQuantity) != 0 {
2✔
339
                        // Available destination space is smaller than the size we want to resize to
1✔
340
                        klog.Warningf("Available space less than requested size, resizing image to available space %s.\n", minSizeQuantity.String())
1✔
341
                }
1✔
342
                if currentImageSizeQuantity.Cmp(minSizeQuantity) == 0 {
2✔
343
                        klog.V(1).Infof("No need to resize image. Requested size: %s, Image size: %d.\n", imageSize, info.VirtualSize)
1✔
344
                        return nil
1✔
345
                }
1✔
346
                // Check if calculated size is < imageSize, and return error if so.
347
                if currentImageSizeQuantity.Cmp(minSizeQuantity) == 1 {
1✔
UNCOV
348
                        klog.V(1).Infof("Calculated new size is < than current size, not resizing: requested size %s, virtual size: %d.\n", minSizeQuantity.String(), info.VirtualSize)
×
UNCOV
349
                        return nil
×
UNCOV
350
                }
×
351
                klog.V(1).Infof("Expanding image size to: %s\n", minSizeQuantity.String())
1✔
352
                return qemuOperations.Resize(dataFile, minSizeQuantity, preallocation)
1✔
353
        }
354
        return errors.New("Image resize called with blank resize")
1✔
355
}
356

357
func (dp *DataProcessor) calculateTargetSize() int64 {
1✔
358
        klog.V(1).Infof("Calculating available size\n")
1✔
359
        var targetQuantity *resource.Quantity
1✔
360
        size, err := getAvailableSpaceBlockFunc(dp.dataFile)
1✔
361
        if err != nil {
2✔
362
                klog.Error(err)
1✔
363
        }
1✔
364
        if size >= int64(0) {
2✔
365
                // Block volume.
1✔
366
                klog.V(1).Infof("Checking out block volume size.\n")
1✔
367
                targetQuantity = resource.NewScaledQuantity(size, 0)
1✔
368
        } else {
2✔
369
                // File system volume.
1✔
370
                klog.V(1).Infof("Checking out file system volume size.\n")
1✔
371
                size, err := getAvailableSpaceFunc(dp.dataDir)
1✔
372
                if err != nil {
2✔
373
                        klog.Error(err)
1✔
374
                }
1✔
375
                targetQuantity = resource.NewScaledQuantity(size, 0)
1✔
376
                if dp.requestImageSize != "" {
2✔
377
                        klog.V(1).Infof("Request image size not empty.\n")
1✔
378
                        newImageSizeQuantity := resource.MustParse(dp.requestImageSize)
1✔
379
                        minQuantity := util.MinQuantity(targetQuantity, &newImageSizeQuantity)
1✔
380
                        targetQuantity = &minQuantity
1✔
381
                }
1✔
382
        }
383
        klog.V(1).Infof("Target size %s.\n", targetQuantity.String())
1✔
384
        targetSize := targetQuantity.Value()
1✔
385
        return targetSize
1✔
386
}
387

388
// PreallocationApplied returns true if data processing path included preallocation step
UNCOV
389
func (dp *DataProcessor) PreallocationApplied() bool {
×
UNCOV
390
        return dp.preallocationApplied
×
UNCOV
391
}
×
392

393
func (dp *DataProcessor) getUsableSpace() int64 {
1✔
394
        return util.GetUsableSpace(dp.filesystemOverhead, dp.availableSpace)
1✔
395
}
1✔
396

397
// Rebase and commit a delta image to its backing file
398
func (dp *DataProcessor) merge() (ProcessingPhase, error) {
1✔
399
        klog.V(1).Info("Merging QCOW to base image.")
1✔
400
        imageURL := dp.source.GetURL()
1✔
401
        if imageURL == nil {
1✔
UNCOV
402
                return ProcessingPhaseError, errors.New("bad URL in data source")
×
UNCOV
403
        }
×
404
        if err := qemuOperations.Rebase(dp.dataFile, imageURL.String()); err != nil {
1✔
405
                return ProcessingPhaseError, errors.Wrap(err, "error rebasing image")
×
406
        }
×
407
        if err := qemuOperations.Commit(imageURL.String()); err != nil {
1✔
408
                return ProcessingPhaseError, errors.Wrap(err, "error committing image")
×
409
        }
×
410
        return ProcessingPhaseComplete, nil
1✔
411
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc