• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5266

29 Apr 2025 10:51AM UTC coverage: 59.237% (+0.06%) from 59.18%
#5266

Pull #3712

travis-ci

Acedus
http-datasource: validate image size before transfer

Leverage nbdkit to make a best-effort check of whether the effective
size of the target VM image exceeds the available scratch space, and
return early if it does.

Signed-off-by: Adi Aloni <aaloni@redhat.com>
Pull Request #3712: http-datasource: validate image size before transfer

18 of 24 new or added lines in 3 files covered. (75.0%)

87 existing lines in 2 files now uncovered.

16857 of 28457 relevant lines covered (59.24%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.55
/pkg/importer/data-processor.go
1
/*
2
Copyright 2018 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package importer
18

19
import (
20
        "net/url"
21
        "os"
22

23
        "github.com/pkg/errors"
24

25
        "k8s.io/apimachinery/pkg/api/resource"
26
        "k8s.io/klog/v2"
27

28
        "kubevirt.io/containerized-data-importer/pkg/common"
29
        "kubevirt.io/containerized-data-importer/pkg/image"
30
        "kubevirt.io/containerized-data-importer/pkg/util"
31
)
32

33
var qemuOperations = image.NewQEMUOperations()
34

35
// ProcessingPhase is the current phase being processed.
36
type ProcessingPhase string
37

38
const (
39
        // ProcessingPhaseInfo is the first phase, during this phase the source obtains information needed to determine which phase to go to next.
40
        ProcessingPhaseInfo ProcessingPhase = "Info"
41
        // ProcessingPhaseTransferScratch is the phase in which the data source writes data to the scratch space.
42
        ProcessingPhaseTransferScratch ProcessingPhase = "TransferScratch"
43
        // ProcessingPhaseTransferDataDir is the phase in which the data source writes data directly to the target path without conversion.
44
        ProcessingPhaseTransferDataDir ProcessingPhase = "TransferDataDir"
45
        // ProcessingPhaseTransferDataFile is the phase in which the data source writes data directly to the target file without conversion.
46
        ProcessingPhaseTransferDataFile ProcessingPhase = "TransferDataFile"
47
        // ProcessingPhaseValidatePause is the phase in which the data processor should validate and then pause.
48
        ProcessingPhaseValidatePause ProcessingPhase = "ValidatePause"
49
        // ProcessingPhaseConvert is the phase in which the data is taken from the url provided by the source, and it is converted to the target RAW disk image format.
50
        // The url can be an http end point or file system end point.
51
        ProcessingPhaseConvert ProcessingPhase = "Convert"
52
        // ProcessingPhaseResize the disk image, this is only needed when the target contains a file system (block device do not need a resize)
53
        ProcessingPhaseResize ProcessingPhase = "Resize"
54
        // ProcessingPhaseComplete is the phase where the entire process completed successfully and we can exit gracefully.
55
        ProcessingPhaseComplete ProcessingPhase = "Complete"
56
        // ProcessingPhasePause is the phase where we pause processing and end the loop, and expect something to call the process loop again.
57
        ProcessingPhasePause ProcessingPhase = "Pause"
58
        // ProcessingPhaseError is the phase in which we encountered an error and need to exit ungracefully.
59
        ProcessingPhaseError ProcessingPhase = common.GenericError
60
        // ProcessingPhaseMergeDelta is the phase in a multi-stage import where a delta image downloaded to scratch is applied to the base image
61
        ProcessingPhaseMergeDelta ProcessingPhase = "MergeDelta"
62
)
63

64
// may be overridden in tests
65
var getAvailableSpaceBlockFunc = util.GetAvailableSpaceBlock
66
var getAvailableSpaceFunc = util.GetAvailableSpace
67

68
// DataSourceInterface is the interface all data sources should implement.
69
type DataSourceInterface interface {
70
        // Info is called to get initial information about the data.
71
        Info() (ProcessingPhase, error)
72
        // Transfer is called to transfer the data from the source to the path passed in.
73
        Transfer(path string) (ProcessingPhase, error)
74
        // TransferFile is called to transfer the data from the source to the file passed in.
75
        TransferFile(fileName string) (ProcessingPhase, error)
76
        // Geturl returns the url that the data processor can use when converting the data.
77
        GetURL() *url.URL
78
        // GetTerminationMessage returns data to be serialized and used as the termination message of the importer.
79
        GetTerminationMessage() *common.TerminationMessage
80
        // Close closes any readers or other open resources.
81
        Close() error
82
}
83

84
// ResumableDataSource is the interface all resumeable data sources should implement
85
type ResumableDataSource interface {
86
        DataSourceInterface
87
        GetResumePhase() ProcessingPhase
88
}
89

90
// DataProcessor holds the fields needed to process data from a data provider.
91
type DataProcessor struct {
92
        // currentPhase is the phase the processing is in currently.
93
        currentPhase ProcessingPhase
94
        // provider provides the data for processing.
95
        source DataSourceInterface
96
        // destination file. will be DataDir/disk.img if file system, or a block device (if a block device, then DataDir will not exist).
97
        dataFile string
98
        // dataDir path to target directory if it contains a file system.
99
        dataDir string
100
        // scratchDataDir path to the scratch space.
101
        scratchDataDir string
102
        // requestImageSize is the size we want the resulting image to be.
103
        requestImageSize string
104
        // available space is the available space before downloading the image
105
        availableSpace int64
106
        // storage overhead is the amount of overhead of the storage used
107
        filesystemOverhead float64
108
        // preallocation is the flag controlling preallocation setting of qemu-img
109
        preallocation bool
110
        // preallocationApplied is used to pass information whether preallocation has been performed, or not
111
        preallocationApplied bool
112
        // phaseExecutors is a mapping from the given processing phase to its execution function. The function returns the next processing phase or error.
113
        phaseExecutors map[ProcessingPhase]func() (ProcessingPhase, error)
114
        // cacheMode is the mode in which we choose the qemu-img cache mode:
115
        // TRY_NONE = bypass page cache if the target supports it, otherwise, fall back to using page cache
116
        cacheMode string
117
}
118

119
// NewDataProcessor create a new instance of a data processor using the passed in data provider.
120
func NewDataProcessor(dataSource DataSourceInterface, dataFile, dataDir, scratchDataDir, requestImageSize string, filesystemOverhead float64, preallocation bool, cacheMode string) *DataProcessor {
1✔
121
        dp := &DataProcessor{
1✔
122
                currentPhase:       ProcessingPhaseInfo,
1✔
123
                source:             dataSource,
1✔
124
                dataFile:           dataFile,
1✔
125
                dataDir:            dataDir,
1✔
126
                scratchDataDir:     scratchDataDir,
1✔
127
                requestImageSize:   requestImageSize,
1✔
128
                filesystemOverhead: filesystemOverhead,
1✔
129
                preallocation:      preallocation,
1✔
130
                cacheMode:          cacheMode,
1✔
131
        }
1✔
132
        // Calculate available space before doing anything.
1✔
133
        dp.availableSpace = dp.calculateTargetSize()
1✔
134
        dp.initDefaultPhases()
1✔
135
        return dp
1✔
136
}
1✔
137

138
// RegisterPhaseExecutor registers an execution function for the given phase.
139
// If there is already an function registered, override it with the new function.
140
func (dp *DataProcessor) RegisterPhaseExecutor(pp ProcessingPhase, executor func() (ProcessingPhase, error)) {
1✔
141
        if _, ok := dp.phaseExecutors[pp]; ok {
1✔
UNCOV
142
                klog.Warningf("Executor already exists at phase %s. Override it.", pp)
×
UNCOV
143
        }
×
144
        dp.phaseExecutors[pp] = executor
1✔
145
}
146

147
// ProcessData is the main synchronous processing loop
148
func (dp *DataProcessor) ProcessData() error {
1✔
149
        return dp.ProcessDataWithPause()
1✔
150
}
1✔
151

152
// ProcessDataResume Resume a paused processor, assumes the provided data source is ResumableDataSource
153
func (dp *DataProcessor) ProcessDataResume() error {
1✔
154
        rds, ok := dp.source.(ResumableDataSource)
1✔
155
        if !ok {
2✔
156
                return errors.New("Datasource not resumable")
1✔
157
        }
1✔
158
        klog.Infof("Resuming processing at phase %s", rds.GetResumePhase())
1✔
159
        dp.currentPhase = rds.GetResumePhase()
1✔
160
        return dp.ProcessDataWithPause()
1✔
161
}
162

163
func (dp *DataProcessor) initDefaultPhases() {
1✔
164
        dp.phaseExecutors = make(map[ProcessingPhase]func() (ProcessingPhase, error))
1✔
165
        dp.RegisterPhaseExecutor(ProcessingPhaseInfo, func() (ProcessingPhase, error) {
2✔
166
                pp, err := dp.source.Info()
1✔
167
                if err != nil {
1✔
UNCOV
168
                        err = errors.Wrap(err, "Unable to obtain information about data source")
×
UNCOV
169
                }
×
170
                return pp, err
1✔
171
        })
172
        dp.RegisterPhaseExecutor(ProcessingPhaseTransferScratch, func() (ProcessingPhase, error) {
2✔
173
                pp, err := dp.source.Transfer(dp.scratchDataDir)
1✔
174
                if errors.Is(err, ErrInvalidPath) {
2✔
175
                        // Passed in invalid scratch space path, return scratch space needed error.
1✔
176
                        err = ErrRequiresScratchSpace
1✔
177
                } else if err != nil {
3✔
178
                        err = errors.Wrap(err, "Unable to transfer source data to scratch space")
1✔
179
                }
1✔
180
                return pp, err
1✔
181
        })
182
        dp.RegisterPhaseExecutor(ProcessingPhaseTransferDataDir, func() (ProcessingPhase, error) {
2✔
183
                pp, err := dp.source.Transfer(dp.dataDir)
1✔
184
                if err != nil {
1✔
UNCOV
185
                        err = errors.Wrap(err, "Unable to transfer source data to target directory")
×
UNCOV
186
                }
×
187
                return pp, err
1✔
188
        })
189
        dp.RegisterPhaseExecutor(ProcessingPhaseTransferDataFile, func() (ProcessingPhase, error) {
2✔
190
                pp, err := dp.source.TransferFile(dp.dataFile)
1✔
191
                if err != nil {
2✔
192
                        err = errors.Wrap(err, "Unable to transfer source data to target file")
1✔
193
                }
1✔
194
                return pp, err
1✔
195
        })
196
        dp.RegisterPhaseExecutor(ProcessingPhaseValidatePause, func() (ProcessingPhase, error) {
1✔
UNCOV
197
                pp := ProcessingPhasePause
×
UNCOV
198
                err := dp.validate(dp.source.GetURL())
×
199
                if err != nil {
×
200
                        pp = ProcessingPhaseError
×
201
                }
×
202
                return pp, err
×
203
        })
204
        dp.RegisterPhaseExecutor(ProcessingPhaseConvert, func() (ProcessingPhase, error) {
2✔
205
                pp, err := dp.convert(dp.source.GetURL())
1✔
206
                if err != nil {
1✔
NEW
207
                        err = errors.Wrap(err, "Unable to convert source data to target format")
×
NEW
208
                }
×
209
                return pp, err
1✔
210
        })
211
        dp.RegisterPhaseExecutor(ProcessingPhaseResize, func() (ProcessingPhase, error) {
2✔
212
                pp, err := dp.resize()
1✔
213
                if err != nil {
1✔
UNCOV
214
                        err = errors.Wrap(err, "Unable to resize disk image to requested size")
×
UNCOV
215
                }
×
216
                return pp, err
1✔
217
        })
218
        dp.RegisterPhaseExecutor(ProcessingPhaseMergeDelta, func() (ProcessingPhase, error) {
2✔
219
                pp, err := dp.merge()
1✔
220
                if err != nil {
1✔
UNCOV
221
                        err = errors.Wrap(err, "Unable to apply delta to base image")
×
UNCOV
222
                }
×
223
                return pp, err
1✔
224
        })
225
}
226

227
// ProcessDataWithPause is the main processing loop.
228
func (dp *DataProcessor) ProcessDataWithPause() error {
1✔
229
        visited := make(map[ProcessingPhase]bool, len(dp.phaseExecutors))
1✔
230
        for dp.currentPhase != ProcessingPhaseComplete && dp.currentPhase != ProcessingPhasePause {
2✔
231
                if visited[dp.currentPhase] {
2✔
232
                        err := errors.Errorf("loop detected on phase %s", dp.currentPhase)
1✔
233
                        klog.Errorf("%+v", err)
1✔
234
                        return err
1✔
235
                }
1✔
236
                executor, ok := dp.phaseExecutors[dp.currentPhase]
1✔
237
                if !ok {
2✔
238
                        return errors.Errorf("Unknown processing phase %s", dp.currentPhase)
1✔
239
                }
1✔
240
                nextPhase, err := executor()
1✔
241
                visited[dp.currentPhase] = true
1✔
242
                if err != nil {
2✔
243
                        klog.Errorf("%+v", err)
1✔
244
                        return err
1✔
245
                }
1✔
246
                dp.currentPhase = nextPhase
1✔
247
                klog.V(1).Infof("New phase: %s\n", dp.currentPhase)
1✔
248
        }
249
        return nil
1✔
250
}
251

252
func (dp *DataProcessor) validate(url *url.URL) error {
1✔
253
        klog.V(1).Infoln("Validating image")
1✔
254
        err := qemuOperations.Validate(url, dp.availableSpace)
1✔
255
        if err != nil {
2✔
256
                return ValidationSizeError{err: err}
1✔
257
        }
1✔
258
        return nil
1✔
259
}
260

261
// convert is called when convert the image from the url to a RAW disk image. Source formats include RAW/QCOW2 (Raw to raw conversion is a copy)
262
func (dp *DataProcessor) convert(url *url.URL) (ProcessingPhase, error) {
1✔
263
        err := dp.validate(url)
1✔
264
        if err != nil {
2✔
265
                return ProcessingPhaseError, err
1✔
266
        }
1✔
267
        err = CleanAll(dp.dataFile)
1✔
268
        if err != nil {
1✔
UNCOV
269
                return ProcessingPhaseError, err
×
UNCOV
270
        }
×
271
        klog.V(3).Infoln("Converting to Raw")
1✔
272
        err = qemuOperations.ConvertToRawStream(url, dp.dataFile, dp.preallocation, dp.cacheMode)
1✔
273
        if err != nil {
2✔
274
                return ProcessingPhaseError, errors.Wrap(err, "Conversion to Raw failed")
1✔
275
        }
1✔
276
        dp.preallocationApplied = dp.preallocation
1✔
277

1✔
278
        return ProcessingPhaseResize, nil
1✔
279
}
280

281
func (dp *DataProcessor) resize() (ProcessingPhase, error) {
1✔
282
        size, _ := getAvailableSpaceBlockFunc(dp.dataFile)
1✔
283
        klog.V(3).Infof("Available space in dataFile: %d", size)
1✔
284
        isBlockDev := size >= int64(0)
1✔
285
        if !isBlockDev {
2✔
286
                if dp.requestImageSize != "" {
2✔
287
                        klog.V(3).Infoln("Resizing image")
1✔
288
                        err := ResizeImage(dp.dataFile, dp.requestImageSize, dp.getUsableSpace(), dp.preallocation)
1✔
289
                        if err != nil {
2✔
290
                                return ProcessingPhaseError, errors.Wrap(err, "Resize of image failed")
1✔
291
                        }
1✔
292
                }
293
                // Validate that a sparse file will fit even as it fills out.
294
                dataFileURL, err := url.Parse(dp.dataFile)
1✔
295
                if err != nil {
1✔
UNCOV
296
                        return ProcessingPhaseError, err
×
UNCOV
297
                }
×
298
                err = dp.validate(dataFileURL)
1✔
299
                if err != nil {
1✔
UNCOV
300
                        return ProcessingPhaseError, err
×
UNCOV
301
                }
×
302
        }
303
        dp.preallocationApplied = dp.preallocation
1✔
304
        if dp.dataFile != "" && !isBlockDev {
2✔
305
                // Change permissions to 0660
1✔
306
                err := os.Chmod(dp.dataFile, 0660)
1✔
307
                if err != nil {
1✔
UNCOV
308
                        return ProcessingPhaseError, errors.Wrap(err, "Unable to change permissions of target file")
×
UNCOV
309
                }
×
310
        }
311

312
        return ProcessingPhaseComplete, nil
1✔
313
}
314

315
// ResizeImage resizes the images to match the requested size. Sometimes provisioners misbehave and the available space
316
// is not the same as the requested space. For those situations we compare the available space to the requested space and
317
// use the smallest of the two values.
318
func ResizeImage(dataFile, imageSize string, totalTargetSpace int64, preallocation bool) error {
1✔
319
        dataFileURL, _ := url.Parse(dataFile)
1✔
320
        info, err := qemuOperations.Info(dataFileURL)
1✔
321
        if err != nil {
2✔
322
                return err
1✔
323
        }
1✔
324
        if imageSize != "" {
2✔
325
                currentImageSizeQuantity := resource.NewScaledQuantity(info.VirtualSize, 0)
1✔
326
                newImageSizeQuantity := resource.MustParse(imageSize)
1✔
327
                minSizeQuantity := util.MinQuantity(resource.NewScaledQuantity(totalTargetSpace, 0), &newImageSizeQuantity)
1✔
328
                if minSizeQuantity.Cmp(newImageSizeQuantity) != 0 {
2✔
329
                        // Available destination space is smaller than the size we want to resize to
1✔
330
                        klog.Warningf("Available space less than requested size, resizing image to available space %s.\n", minSizeQuantity.String())
1✔
331
                }
1✔
332
                if currentImageSizeQuantity.Cmp(minSizeQuantity) == 0 {
2✔
333
                        klog.V(1).Infof("No need to resize image. Requested size: %s, Image size: %d.\n", imageSize, info.VirtualSize)
1✔
334
                        return nil
1✔
335
                }
1✔
336
                // Check if calculated size is < imageSize, and return error if so.
337
                if currentImageSizeQuantity.Cmp(minSizeQuantity) == 1 {
1✔
UNCOV
338
                        klog.V(1).Infof("Calculated new size is < than current size, not resizing: requested size %s, virtual size: %d.\n", minSizeQuantity.String(), info.VirtualSize)
×
UNCOV
339
                        return nil
×
UNCOV
340
                }
×
341
                klog.V(1).Infof("Expanding image size to: %s\n", minSizeQuantity.String())
1✔
342
                return qemuOperations.Resize(dataFile, minSizeQuantity, preallocation)
1✔
343
        }
344
        return errors.New("Image resize called with blank resize")
1✔
345
}
346

347
func (dp *DataProcessor) calculateTargetSize() int64 {
1✔
348
        klog.V(1).Infof("Calculating available size\n")
1✔
349
        var targetQuantity *resource.Quantity
1✔
350
        size, err := getAvailableSpaceBlockFunc(dp.dataFile)
1✔
351
        if err != nil {
2✔
352
                klog.Error(err)
1✔
353
        }
1✔
354
        if size >= int64(0) {
2✔
355
                // Block volume.
1✔
356
                klog.V(1).Infof("Checking out block volume size.\n")
1✔
357
                targetQuantity = resource.NewScaledQuantity(size, 0)
1✔
358
        } else {
2✔
359
                // File system volume.
1✔
360
                klog.V(1).Infof("Checking out file system volume size.\n")
1✔
361
                size, err := getAvailableSpaceFunc(dp.dataDir)
1✔
362
                if err != nil {
2✔
363
                        klog.Error(err)
1✔
364
                }
1✔
365
                targetQuantity = resource.NewScaledQuantity(size, 0)
1✔
366
                if dp.requestImageSize != "" {
2✔
367
                        klog.V(1).Infof("Request image size not empty.\n")
1✔
368
                        newImageSizeQuantity := resource.MustParse(dp.requestImageSize)
1✔
369
                        minQuantity := util.MinQuantity(targetQuantity, &newImageSizeQuantity)
1✔
370
                        targetQuantity = &minQuantity
1✔
371
                }
1✔
372
        }
373
        klog.V(1).Infof("Target size %s.\n", targetQuantity.String())
1✔
374
        targetSize := targetQuantity.Value()
1✔
375
        return targetSize
1✔
376
}
377

378
// PreallocationApplied returns true if data processing path included preallocation step
UNCOV
379
func (dp *DataProcessor) PreallocationApplied() bool {
×
UNCOV
380
        return dp.preallocationApplied
×
UNCOV
381
}
×
382

383
func (dp *DataProcessor) getUsableSpace() int64 {
1✔
384
        return util.GetUsableSpace(dp.filesystemOverhead, dp.availableSpace)
1✔
385
}
1✔
386

387
// Rebase and commit a delta image to its backing file
388
func (dp *DataProcessor) merge() (ProcessingPhase, error) {
1✔
389
        klog.V(1).Info("Merging QCOW to base image.")
1✔
390
        imageURL := dp.source.GetURL()
1✔
391
        if imageURL == nil {
1✔
UNCOV
392
                return ProcessingPhaseError, errors.New("bad URL in data source")
×
UNCOV
393
        }
×
394
        if err := qemuOperations.Rebase(dp.dataFile, imageURL.String()); err != nil {
1✔
UNCOV
395
                return ProcessingPhaseError, errors.Wrap(err, "error rebasing image")
×
UNCOV
396
        }
×
397
        if err := qemuOperations.Commit(imageURL.String()); err != nil {
1✔
UNCOV
398
                return ProcessingPhaseError, errors.Wrap(err, "error committing image")
×
UNCOV
399
        }
×
400
        return ProcessingPhaseComplete, nil
1✔
401
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc