• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5247

23 Apr 2025 08:57PM UTC coverage: 59.172% (+0.008%) from 59.164%
#5247

push

travis-ci

web-flow
include error message for upload failure due to lack of space (#3704)

The uploadserver features two handler functions for sync and async
uploads.

The async upload handler correctly reports the failure when QEMU image
size validation fails due to lack of space in the target, by appending
the error message to the HTTP response. The sync upload handler,
however, does not.

This commit makes error handling for failed upload identical between
both handlers. Furthermore, it also checks for ENOSPC errors during
write as can happen when the image is streamed directly into target
instead of going through scratch space first (e.g., raw images). It also
consolidates the sync and async upload failure e2e tests into a single
DescribeTable and improves upon its clarity.

Signed-off-by: Adi Aloni <aaloni@redhat.com>

12 of 16 new or added lines in 1 file covered. (75.0%)

2 existing lines in 1 file now uncovered.

16815 of 28417 relevant lines covered (59.17%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.42
/pkg/uploadserver/uploadserver.go
1
/*
2
 * This file is part of the CDI project
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 *
16
 * Copyright 2018 Red Hat, Inc.
17
 *
18
 */
19

20
package uploadserver
21

22
import (
23
        "archive/tar"
24
        "context"
25
        "crypto/tls"
26
        "crypto/x509"
27
        "fmt"
28
        "io"
29
        "mime/multipart"
30
        "net"
31
        "net/http"
32
        "os"
33
        "strings"
34
        "sync"
35
        "time"
36

37
        "github.com/golang/snappy"
38
        "github.com/pkg/errors"
39

40
        "k8s.io/klog/v2"
41

42
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
43
        "kubevirt.io/containerized-data-importer/pkg/common"
44
        "kubevirt.io/containerized-data-importer/pkg/importer"
45
        "kubevirt.io/containerized-data-importer/pkg/util"
46
        cryptowatch "kubevirt.io/containerized-data-importer/pkg/util/tls-crypto-watch"
47
)
48

49
const (
50
        healthzPath = "/healthz"
51
)
52

53
// Config is the configuration consumed by the upload server.
type Config struct {
54
        // Insecure permits serving and accepting requests without TLS.
        Insecure    bool
55
        // BindAddress and BindPort form the TCP address the server listens on.
        BindAddress string
56
        // BindPort is overwritten with the actual listener port once Run has
        // started (relevant when it was 0).
        BindPort    int
57

58
        // Destination is handed to the upload processors as the write target.
        Destination string
59

60
        // ServerKeyFile and ServerCertFile are loaded as the server's X509 key
        // pair; if either is empty the server requires Insecure to be set.
        ServerKeyFile, ServerCertFile string
61
        // ClientCertFile is a PEM file used as the client CA pool. ClientName is
        // the CommonName a peer certificate must carry for a TLS request to be
        // accepted.
        ClientCertFile, ClientName    string
62

63
        // ImageSize, FilesystemOverhead and Preallocation are passed through
        // unchanged to the data processor.
        ImageSize          string
64
        FilesystemOverhead float64
65
        Preallocation      bool
66

67
        // Deadline, when non-nil, is the point in time after which Run shuts
        // down once no upload is in flight.
        Deadline *time.Time
68

69
        // CryptoConfig supplies the TLS cipher suites and minimum version.
        CryptoConfig cryptowatch.CryptoConfig
70
}
71

72
// RunResult describes how a call to UploadServer.Run ended.
73
type RunResult struct {
74
        // CloneTarget is true when the upload's content type header identified
        // it as a clone (see isCloneTarget).
        CloneTarget          bool
75
        // PreallocationApplied is the value reported by the upload processor.
        PreallocationApplied bool
76
        // DeadlinePassed is true when the configured deadline fired before an
        // upload completed.
        DeadlinePassed       bool
77
}
78

79
// UploadServer is the interface to uploadServerApp
80
type UploadServer interface {
81
        Run() (*RunResult, error)
82
}
83

84
// uploadServerApp is the UploadServer implementation. The uploading,
// processing and done flags are read and written under mutex by the handlers
// and by Run's deadline handling.
type uploadServerApp struct {
85
        config               *Config
86
        mux                  *http.ServeMux
87
        // uploading is set when a request has been accepted and cleared once
        // its stream processor has returned.
        uploading            bool
88
        // processing is set while an async upload is being finished in the
        // background.
        processing           bool
89
        // done is set after an upload completed successfully; later upload
        // requests are rejected.
        done                 bool
90
        preallocationApplied bool
91
        cloneTarget          bool
92
        // doneChan is closed after a successful upload to make Run shut down.
        doneChan             chan struct{}
93
        // errChan carries the HTTP server's exit error and async processing
        // errors to Run.
        errChan              chan error
94
        mutex                sync.Mutex
95
}
96

97
type imageReadCloser func(*http.Request) (io.ReadCloser, error)
98

99
// may be overridden in tests
100
var uploadProcessorFunc = newUploadStreamProcessor
101
var uploadProcessorFuncAsync = newAsyncUploadStreamProcessor
102

103
// bodyReadCloser is the imageReadCloser for plain uploads: the image data is
// the raw request body, which is handed back untouched. It never fails.
func bodyReadCloser(r *http.Request) (io.ReadCloser, error) {
	var body io.ReadCloser = r.Body
	return body, nil
}
1✔
106

107
func formReadCloser(r *http.Request) (io.ReadCloser, error) {
1✔
108
        multiReader, err := r.MultipartReader()
1✔
109
        if err != nil {
1✔
110
                return nil, err
×
111
        }
×
112

113
        var filePart *multipart.Part
1✔
114

1✔
115
        for {
2✔
116
                filePart, err = multiReader.NextPart()
1✔
117
                if err != nil || filePart.FormName() == "file" {
2✔
118
                        break
1✔
119
                }
120
                klog.Infof("Ignoring part %s", filePart.FormName())
×
121
        }
122

123
        // multiReader.NextPart() returns io.EOF when read everything
124
        if err != nil {
1✔
125
                return nil, err
×
126
        }
×
127

128
        return filePart, nil
1✔
129
}
130

131
func isCloneTarget(contentType string) bool {
1✔
132
        return contentType == common.BlockdeviceClone || contentType == common.FilesystemCloneContentType
1✔
133
}
1✔
134

135
// NewUploadServer returns a new instance of uploadServerApp
136
func NewUploadServer(config *Config) UploadServer {
1✔
137
        server := &uploadServerApp{
1✔
138
                config:    config,
1✔
139
                mux:       http.NewServeMux(),
1✔
140
                uploading: false,
1✔
141
                done:      false,
1✔
142
                doneChan:  make(chan struct{}),
1✔
143
                errChan:   make(chan error),
1✔
144
        }
1✔
145

1✔
146
        server.mux.HandleFunc(healthzPath, server.healthzHandler)
1✔
147
        for _, path := range common.SyncUploadPaths {
2✔
148
                server.mux.HandleFunc(path, server.uploadHandler(bodyReadCloser))
1✔
149
        }
1✔
150
        for _, path := range common.AsyncUploadPaths {
2✔
151
                server.mux.HandleFunc(path, server.uploadHandlerAsync(bodyReadCloser))
1✔
152
        }
1✔
153
        for _, path := range common.ArchiveUploadPaths {
2✔
154
                server.mux.HandleFunc(path, server.uploadArchiveHandler(bodyReadCloser))
1✔
155
        }
1✔
156
        for _, path := range common.SyncUploadFormPaths {
2✔
157
                server.mux.HandleFunc(path, server.uploadHandler(formReadCloser))
1✔
158
        }
1✔
159
        for _, path := range common.AsyncUploadFormPaths {
2✔
160
                server.mux.HandleFunc(path, server.uploadHandlerAsync(formReadCloser))
1✔
161
        }
1✔
162

163
        return server
1✔
164
}
165

166
func (app *uploadServerApp) Run() (*RunResult, error) {
1✔
167
        uploadServer := http.Server{
1✔
168
                Handler:           app,
1✔
169
                ReadHeaderTimeout: 10 * time.Second,
1✔
170
        }
1✔
171

1✔
172
        uploadListener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", app.config.BindAddress, app.config.BindPort))
1✔
173
        if err != nil {
1✔
174
                return nil, errors.Wrap(err, "Error creating upload listerner")
×
175
        }
×
176

177
        tlsConfig, err := app.getTLSConfig()
1✔
178
        if err != nil {
1✔
179
                return nil, errors.Wrap(err, "Error getting TLS config")
×
180
        }
×
181

182
        go func() {
2✔
183
                defer uploadListener.Close()
1✔
184

1✔
185
                // maybe bind port was 0 (unit tests) assign port here
1✔
186
                app.config.BindPort = uploadListener.Addr().(*net.TCPAddr).Port
1✔
187

1✔
188
                if tlsConfig != nil {
2✔
189
                        uploadServer.TLSConfig = tlsConfig
1✔
190
                        app.errChan <- uploadServer.ServeTLS(uploadListener, "", "")
1✔
191
                        return
1✔
192
                }
1✔
193

194
                // not sure we want to support this code path
195
                app.errChan <- uploadServer.Serve(uploadListener)
×
196
        }()
197

198
        var timeChan <-chan time.Time
1✔
199

1✔
200
        if app.config.Deadline != nil {
2✔
201
                timeChan = time.After(time.Until(*app.config.Deadline))
1✔
202
        } else {
2✔
203
                tc := make(chan time.Time)
1✔
204
                defer close(tc)
1✔
205
                timeChan = tc
1✔
206
        }
1✔
207

208
        select {
1✔
209
        case err = <-app.errChan:
×
210
                if err != nil {
×
211
                        klog.Errorf("HTTP server returned error %s", err.Error())
×
212
                        return nil, err
×
213
                }
×
214
        case <-app.doneChan:
1✔
215
                klog.Info("Shutting down http server after successful upload")
1✔
216
                if err := uploadServer.Shutdown(context.Background()); err != nil {
1✔
217
                        klog.Errorf("failed to shutdown uploadServer; %v", err)
×
218
                }
×
219
        case <-timeChan:
1✔
220
                klog.Info("deadline exceeded, shutting down")
1✔
221
                app.mutex.Lock()
1✔
222
                defer app.mutex.Unlock()
1✔
223
                for {
2✔
224
                        if app.uploading || app.processing {
1✔
225
                                klog.Info("waiting for upload to finish")
×
226
                                app.mutex.Unlock()
×
227
                                time.Sleep(2 * time.Second)
×
228
                                app.mutex.Lock()
×
229
                        } else {
1✔
230
                                break
1✔
231
                        }
232
                }
233
                if !app.done {
2✔
234
                        klog.Info("upload not done, process exiting")
1✔
235
                        return &RunResult{DeadlinePassed: true}, nil
1✔
236
                }
1✔
237
        }
238

239
        result := &RunResult{
1✔
240
                CloneTarget:          app.cloneTarget,
1✔
241
                PreallocationApplied: app.preallocationApplied,
1✔
242
        }
1✔
243

1✔
244
        return result, nil
1✔
245
}
246

247
func (app *uploadServerApp) getTLSConfig() (*tls.Config, error) {
1✔
248
        if app.config.ServerCertFile == "" || app.config.ServerKeyFile == "" {
1✔
249
                if !app.config.Insecure {
×
250
                        return nil, errors.New("invalid TLS config")
×
251
                }
×
252
                return nil, nil
×
253
        }
254

255
        //nolint:gosec // False positive: Min version is not known statically
256
        config := &tls.Config{
1✔
257
                CipherSuites: app.config.CryptoConfig.CipherSuites,
1✔
258
                ClientAuth:   tls.VerifyClientCertIfGiven,
1✔
259
                MinVersion:   app.config.CryptoConfig.MinVersion,
1✔
260
        }
1✔
261

1✔
262
        if app.config.ClientCertFile != "" {
2✔
263
                bs, err := os.ReadFile(app.config.ClientCertFile)
1✔
264
                if err != nil {
1✔
265
                        return nil, err
×
266
                }
×
267

268
                caCertPool := x509.NewCertPool()
1✔
269
                if ok := caCertPool.AppendCertsFromPEM(bs); !ok {
1✔
270
                        return nil, err
×
271
                }
×
272

273
                config.ClientCAs = caCertPool
1✔
274
        }
275

276
        cert, err := tls.LoadX509KeyPair(app.config.ServerCertFile, app.config.ServerKeyFile)
1✔
277
        if err != nil {
1✔
278
                return nil, err
×
279
        }
×
280

281
        config.Certificates = []tls.Certificate{cert}
1✔
282

1✔
283
        return config, nil
1✔
284
}
285

286
func (app *uploadServerApp) ServeHTTP(w http.ResponseWriter, r *http.Request) {
1✔
287
        app.mux.ServeHTTP(w, r)
1✔
288
}
1✔
289

290
func (app *uploadServerApp) healthzHandler(w http.ResponseWriter, r *http.Request) {
1✔
291
        if _, err := io.WriteString(w, "OK"); err != nil {
1✔
292
                klog.Errorf("healthzHandler: failed to send response; %v", err)
×
293
        }
×
294
}
295

296
func (app *uploadServerApp) validateShouldHandleRequest(w http.ResponseWriter, r *http.Request) bool {
1✔
297
        if r.Method != http.MethodPost {
2✔
298
                w.WriteHeader(http.StatusNotFound)
1✔
299
                return false
1✔
300
        }
1✔
301

302
        if r.TLS != nil {
2✔
303
                if len(r.TLS.VerifiedChains) == 0 {
1✔
304
                        w.WriteHeader(http.StatusUnauthorized)
×
305
                        return false
×
306
                }
×
307

308
                found := false
1✔
309
                for _, cert := range r.TLS.PeerCertificates {
2✔
310
                        if cert.Subject.CommonName == app.config.ClientName {
2✔
311
                                found = true
1✔
312
                                break
1✔
313
                        }
314
                }
315

316
                if !found {
2✔
317
                        w.WriteHeader(http.StatusUnauthorized)
1✔
318
                        return false
1✔
319
                }
1✔
320
        } else {
1✔
321
                if !app.config.Insecure {
1✔
322
                        w.WriteHeader(http.StatusUnauthorized)
×
323
                        return false
×
324
                }
×
325
                klog.V(3).Infof("Handling HTTP connection")
1✔
326
        }
327

328
        app.mutex.Lock()
1✔
329
        defer app.mutex.Unlock()
1✔
330

1✔
331
        if app.uploading || app.processing {
2✔
332
                klog.Warning("Got concurrent upload request")
1✔
333
                w.WriteHeader(http.StatusServiceUnavailable)
1✔
334
                return false
1✔
335
        }
1✔
336

337
        if app.done {
2✔
338
                klog.Warning("Got upload request after already done")
1✔
339
                w.WriteHeader(http.StatusConflict)
1✔
340
                return false
1✔
341
        }
1✔
342

343
        app.uploading = true
1✔
344

1✔
345
        return true
1✔
346
}
347

348
func (app *uploadServerApp) uploadHandlerAsync(irc imageReadCloser) http.HandlerFunc {
1✔
349
        return func(w http.ResponseWriter, r *http.Request) {
2✔
350
                if r.Method == http.MethodHead {
2✔
351
                        w.WriteHeader(http.StatusOK)
1✔
352
                        return
1✔
353
                }
1✔
354

355
                if !app.validateShouldHandleRequest(w, r) {
2✔
356
                        return
1✔
357
                }
1✔
358

359
                cdiContentType := r.Header.Get(common.UploadContentTypeHeader)
1✔
360

1✔
361
                klog.Infof("Content type header is %q\n", cdiContentType)
1✔
362

1✔
363
                readCloser, err := irc(r)
1✔
364
                if err != nil {
1✔
365
                        w.WriteHeader(http.StatusBadRequest)
×
366
                }
×
367

368
                processor, err := uploadProcessorFuncAsync(readCloser, app.config.Destination, app.config.ImageSize, app.config.FilesystemOverhead, app.config.Preallocation, cdiContentType)
1✔
369

1✔
370
                app.mutex.Lock()
1✔
371
                defer app.mutex.Unlock()
1✔
372
                app.uploading = false
1✔
373

1✔
374
                if err != nil {
2✔
375
                        handleStreamError(w, err)
1✔
376
                        return
1✔
377
                }
1✔
378

379
                app.processing = true
1✔
380

1✔
381
                // Start processing.
1✔
382
                go func() {
2✔
383
                        err := processor.ProcessDataResume()
1✔
384
                        app.mutex.Lock()
1✔
385
                        defer app.mutex.Unlock()
1✔
386
                        app.processing = false
1✔
387
                        if err != nil {
1✔
388
                                klog.Errorf("Error during resumed processing: %v", err)
×
389
                                app.errChan <- err
×
390
                                return
×
391
                        }
×
392
                        defer close(app.doneChan)
1✔
393
                        app.done = true
1✔
394
                        app.preallocationApplied = processor.PreallocationApplied()
1✔
395
                        app.cloneTarget = isCloneTarget(cdiContentType)
1✔
396
                        klog.Infof("Wrote data to %s", app.config.Destination)
1✔
397
                }()
398

399
                klog.Info("Returning success to caller, continue processing in background")
1✔
400
        }
401
}
402

403
func (app *uploadServerApp) processUpload(irc imageReadCloser, w http.ResponseWriter, r *http.Request, dvContentType cdiv1.DataVolumeContentType) {
1✔
404
        if !app.validateShouldHandleRequest(w, r) {
2✔
405
                return
1✔
406
        }
1✔
407

408
        cdiContentType := r.Header.Get(common.UploadContentTypeHeader)
1✔
409

1✔
410
        klog.Infof("Content type header is %q\n", cdiContentType)
1✔
411

1✔
412
        readCloser, err := irc(r)
1✔
413
        if err != nil {
1✔
414
                w.WriteHeader(http.StatusBadRequest)
×
415
        }
×
416

417
        preallocationApplied, err := uploadProcessorFunc(readCloser, app.config.Destination, app.config.ImageSize, app.config.FilesystemOverhead, app.config.Preallocation, cdiContentType, dvContentType)
1✔
418

1✔
419
        app.mutex.Lock()
1✔
420
        defer app.mutex.Unlock()
1✔
421
        app.uploading = false
1✔
422

1✔
423
        if err != nil {
2✔
424
                handleStreamError(w, err)
1✔
425
                return
1✔
426
        }
1✔
427

428
        app.done = true
1✔
429
        app.preallocationApplied = preallocationApplied
1✔
430
        app.cloneTarget = isCloneTarget(cdiContentType)
1✔
431
        close(app.doneChan)
1✔
432

1✔
433
        if dvContentType == cdiv1.DataVolumeArchive {
1✔
434
                klog.Infof("Wrote archive data")
×
435
        } else {
1✔
436
                klog.Infof("Wrote data to %s", app.config.Destination)
1✔
437
        }
1✔
438
}
439

440
func (app *uploadServerApp) uploadHandler(irc imageReadCloser) http.HandlerFunc {
1✔
441
        return func(w http.ResponseWriter, r *http.Request) {
2✔
442
                app.processUpload(irc, w, r, cdiv1.DataVolumeKubeVirt)
1✔
443
        }
1✔
444
}
445

446
func (app *uploadServerApp) uploadArchiveHandler(irc imageReadCloser) http.HandlerFunc {
1✔
447
        return func(w http.ResponseWriter, r *http.Request) {
2✔
448
                app.processUpload(irc, w, r, cdiv1.DataVolumeArchive)
1✔
449
        }
1✔
450
}
451

452
func newAsyncUploadStreamProcessor(stream io.ReadCloser, dest, imageSize string, filesystemOverhead float64, preallocation bool, sourceContentType string) (*importer.DataProcessor, error) {
×
453
        if isCloneTarget(sourceContentType) {
×
454
                return nil, fmt.Errorf("async clone not supported")
×
455
        }
×
456

457
        uds := importer.NewAsyncUploadDataSource(newContentReader(stream, sourceContentType))
×
458
        processor := importer.NewDataProcessor(uds, dest, common.ImporterVolumePath, common.ScratchDataDir, imageSize, filesystemOverhead, preallocation, "")
×
459
        return processor, processor.ProcessDataWithPause()
×
460
}
461

462
func newUploadStreamProcessor(stream io.ReadCloser, dest, imageSize string, filesystemOverhead float64, preallocation bool, sourceContentType string, dvContentType cdiv1.DataVolumeContentType) (bool, error) {
×
463
        stream = newContentReader(stream, sourceContentType)
×
464
        if isCloneTarget(sourceContentType) {
×
465
                return cloneProcessor(stream, sourceContentType, dest, preallocation)
×
466
        }
×
467

468
        // Clone block device to block device or file system
469
        uds := importer.NewUploadDataSource(stream, dvContentType)
×
470
        processor := importer.NewDataProcessor(uds, dest, common.ImporterVolumePath, common.ScratchDataDir, imageSize, filesystemOverhead, preallocation, "")
×
471
        err := processor.ProcessData()
×
472
        return processor.PreallocationApplied(), err
×
473
}
474

475
func cloneProcessor(stream io.ReadCloser, contentType, dest string, preallocate bool) (bool, error) {
×
476
        if contentType == common.FilesystemCloneContentType {
×
477
                if dest != common.WriteBlockPath {
×
478
                        return fileToFileCloneProcessor(stream)
×
479
                }
×
480

481
                tarImageReader, err := newTarDiskImageReader(stream)
×
482
                if err != nil {
×
483
                        stream.Close()
×
484
                        return false, err
×
485
                }
×
486
                stream = tarImageReader
×
487
        }
488

489
        defer stream.Close()
×
490
        bytesRead, bytesWrittenn, err := util.StreamDataToFile(stream, dest, preallocate)
×
491
        if err != nil {
×
492
                return false, err
×
493
        }
×
494

495
        klog.Infof("Read %d bytes, wrote %d bytes to %s", bytesRead, bytesWrittenn, dest)
×
496

×
497
        return false, nil
×
498
}
499

500
func fileToFileCloneProcessor(stream io.ReadCloser) (bool, error) {
×
501
        defer stream.Close()
×
502
        if err := util.UnArchiveTar(stream, common.ImporterVolumePath); err != nil {
×
503
                return false, errors.Wrapf(err, "error unarchiving to %s", common.ImporterVolumePath)
×
504
        }
×
505
        return true, nil
×
506
}
507

508
// closeWrapper pairs a Reader with the closers that must be released once the
// reader is no longer needed, turning the pair into an io.ReadCloser.
type closeWrapper struct {
	io.Reader
	closers []io.Closer
}

// Close closes every registered closer in order, regardless of earlier
// failures. If several closers fail, the error of the last failing one is
// returned; it returns nil when all succeed.
func (c *closeWrapper) Close() error {
	var lastErr error
	for i := range c.closers {
		closeErr := c.closers[i].Close()
		if closeErr == nil {
			continue
		}
		lastErr = closeErr
	}
	return lastErr
}
522

523
type tarDiskImageReader struct {
524
        tr           *tar.Reader
525
        size, offset int64
526
}
527

528
func (r *tarDiskImageReader) Read(p []byte) (int, error) {
×
529
        if r.offset >= r.size {
×
530
                return 0, io.EOF
×
531
        }
×
532
        remaining := r.size - r.offset
×
533
        if int(remaining) < len(p) {
×
534
                p = p[:remaining]
×
535
        }
×
536
        n, err := r.tr.Read(p)
×
537
        r.offset += int64(n)
×
538
        klog.V(3).Infof("Read %d bytes, offset %d, size %d", n, r.offset, r.size)
×
539
        return n, err
×
540
}
541

542
func newTarDiskImageReader(stream io.ReadCloser) (io.ReadCloser, error) {
×
543
        tr := tar.NewReader(stream)
×
544
        for {
×
545
                header, err := tr.Next()
×
546
                if err == io.EOF {
×
547
                        break
×
548
                }
549
                if err != nil {
×
550
                        return nil, err
×
551
                }
×
552
                if !strings.Contains(header.Name, common.DiskImageName) {
×
553
                        continue
×
554
                }
555
                return &closeWrapper{
×
556
                        Reader:  &tarDiskImageReader{tr: tr, size: header.Size},
×
557
                        closers: []io.Closer{stream},
×
558
                }, nil
×
559
        }
560
        return nil, fmt.Errorf("no disk image found in tar")
×
561
}
562

563
func newContentReader(stream io.ReadCloser, contentType string) io.ReadCloser {
×
564
        if isCloneTarget(contentType) {
×
565
                return newSnappyReadCloser(stream)
×
566
        }
×
567
        return stream
×
568
}
569

570
func newSnappyReadCloser(stream io.ReadCloser) io.ReadCloser {
×
571
        return &closeWrapper{
×
572
                Reader:  snappy.NewReader(stream),
×
573
                closers: []io.Closer{stream},
×
574
        }
×
575
}
×
576

577
func handleStreamError(w http.ResponseWriter, err error) {
1✔
578
        if errors.As(err, &importer.ValidationSizeError{}) || strings.Contains(err.Error(), "no space left on device") {
1✔
NEW
579
                w.WriteHeader(http.StatusBadRequest)
×
NEW
580
                err = errors.New("effective image size is larger than the reported available storage. A larger PVC is required")
×
581
        } else {
1✔
582
                w.WriteHeader(http.StatusInternalServerError)
1✔
583
        }
1✔
584
        klog.Errorf("Saving stream failed: %s", err)
1✔
585

1✔
586
        _, writeErr := fmt.Fprintf(w, "Saving stream failed: %s", err.Error())
1✔
587
        if writeErr != nil {
1✔
NEW
588
                klog.Errorf("failed to send response; %v", err)
×
NEW
589
        }
×
590
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc