• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5248

23 Apr 2025 10:13AM UTC coverage: 59.204% (+0.03%) from 59.175%
#5248

Pull #3710

travis-ci

SkalaNetworks
chore(doc): wrong source used for pvc in docs

Signed-off-by: SkalaNetworks <contact@skala.network>
Pull Request #3710: chore(doc): wrong source used for pvc in docs

16824 of 28417 relevant lines covered (59.2%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.42
/pkg/uploadserver/uploadserver.go
1
/*
2
 * This file is part of the CDI project
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 *
16
 * Copyright 2018 Red Hat, Inc.
17
 *
18
 */
19

20
package uploadserver
21

22
import (
23
        "archive/tar"
24
        "context"
25
        "crypto/tls"
26
        "crypto/x509"
27
        "fmt"
28
        "io"
29
        "mime/multipart"
30
        "net"
31
        "net/http"
32
        "os"
33
        "strings"
34
        "sync"
35
        "time"
36

37
        "github.com/golang/snappy"
38
        "github.com/pkg/errors"
39

40
        "k8s.io/klog/v2"
41

42
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
43
        "kubevirt.io/containerized-data-importer/pkg/common"
44
        "kubevirt.io/containerized-data-importer/pkg/importer"
45
        "kubevirt.io/containerized-data-importer/pkg/util"
46
        cryptowatch "kubevirt.io/containerized-data-importer/pkg/util/tls-crypto-watch"
47
)
48

49
// healthzPath is the URL path on which the health check handler is registered.
const healthzPath = "/healthz"
52

53
type Config struct {
54
        Insecure    bool
55
        BindAddress string
56
        BindPort    int
57

58
        Destination string
59

60
        ServerKeyFile, ServerCertFile string
61
        ClientCertFile, ClientName    string
62

63
        ImageSize          string
64
        FilesystemOverhead float64
65
        Preallocation      bool
66

67
        Deadline *time.Time
68

69
        CryptoConfig cryptowatch.CryptoConfig
70
}
71

72
// RunResult reports the outcome of an upload server run.
type RunResult struct {
	// CloneTarget is set when the accepted upload used one of the clone
	// content types.
	CloneTarget bool
	// PreallocationApplied carries the value reported by the upload processor.
	PreallocationApplied bool
	// DeadlinePassed is set when the deadline expired before an upload completed.
	DeadlinePassed bool
}
78

79
// UploadServer is the interface to uploadServerApp
80
type UploadServer interface {
81
        Run() (*RunResult, error)
82
}
83

84
type uploadServerApp struct {
85
        config               *Config
86
        mux                  *http.ServeMux
87
        uploading            bool
88
        processing           bool
89
        done                 bool
90
        preallocationApplied bool
91
        cloneTarget          bool
92
        doneChan             chan struct{}
93
        errChan              chan error
94
        mutex                sync.Mutex
95
}
96

97
// imageReadCloser extracts the stream carrying the uploaded data from a request.
type imageReadCloser func(r *http.Request) (io.ReadCloser, error)
98

99
// may be overridden in tests
100
var uploadProcessorFunc = newUploadStreamProcessor
101
var uploadProcessorFuncAsync = newAsyncUploadStreamProcessor
102

103
// bodyReadCloser is an imageReadCloser that uses the raw request body as the
// upload stream. It never fails.
func bodyReadCloser(r *http.Request) (io.ReadCloser, error) {
	body := r.Body
	return body, nil
}
1✔
106

107
func formReadCloser(r *http.Request) (io.ReadCloser, error) {
1✔
108
        multiReader, err := r.MultipartReader()
1✔
109
        if err != nil {
1✔
110
                return nil, err
×
111
        }
×
112

113
        var filePart *multipart.Part
1✔
114

1✔
115
        for {
2✔
116
                filePart, err = multiReader.NextPart()
1✔
117
                if err != nil || filePart.FormName() == "file" {
2✔
118
                        break
1✔
119
                }
120
                klog.Infof("Ignoring part %s", filePart.FormName())
×
121
        }
122

123
        // multiReader.NextPart() returns io.EOF when read everything
124
        if err != nil {
1✔
125
                return nil, err
×
126
        }
×
127

128
        return filePart, nil
1✔
129
}
130

131
func isCloneTarget(contentType string) bool {
1✔
132
        return contentType == common.BlockdeviceClone || contentType == common.FilesystemCloneContentType
1✔
133
}
1✔
134

135
// NewUploadServer returns a new instance of uploadServerApp
136
func NewUploadServer(config *Config) UploadServer {
1✔
137
        server := &uploadServerApp{
1✔
138
                config:    config,
1✔
139
                mux:       http.NewServeMux(),
1✔
140
                uploading: false,
1✔
141
                done:      false,
1✔
142
                doneChan:  make(chan struct{}),
1✔
143
                errChan:   make(chan error),
1✔
144
        }
1✔
145

1✔
146
        server.mux.HandleFunc(healthzPath, server.healthzHandler)
1✔
147
        for _, path := range common.SyncUploadPaths {
2✔
148
                server.mux.HandleFunc(path, server.uploadHandler(bodyReadCloser))
1✔
149
        }
1✔
150
        for _, path := range common.AsyncUploadPaths {
2✔
151
                server.mux.HandleFunc(path, server.uploadHandlerAsync(bodyReadCloser))
1✔
152
        }
1✔
153
        for _, path := range common.ArchiveUploadPaths {
2✔
154
                server.mux.HandleFunc(path, server.uploadArchiveHandler(bodyReadCloser))
1✔
155
        }
1✔
156
        for _, path := range common.SyncUploadFormPaths {
2✔
157
                server.mux.HandleFunc(path, server.uploadHandler(formReadCloser))
1✔
158
        }
1✔
159
        for _, path := range common.AsyncUploadFormPaths {
2✔
160
                server.mux.HandleFunc(path, server.uploadHandlerAsync(formReadCloser))
1✔
161
        }
1✔
162

163
        return server
1✔
164
}
165

166
func (app *uploadServerApp) Run() (*RunResult, error) {
1✔
167
        uploadServer := http.Server{
1✔
168
                Handler:           app,
1✔
169
                ReadHeaderTimeout: 10 * time.Second,
1✔
170
        }
1✔
171

1✔
172
        uploadListener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", app.config.BindAddress, app.config.BindPort))
1✔
173
        if err != nil {
1✔
174
                return nil, errors.Wrap(err, "Error creating upload listerner")
×
175
        }
×
176

177
        tlsConfig, err := app.getTLSConfig()
1✔
178
        if err != nil {
1✔
179
                return nil, errors.Wrap(err, "Error getting TLS config")
×
180
        }
×
181

182
        go func() {
2✔
183
                defer uploadListener.Close()
1✔
184

1✔
185
                // maybe bind port was 0 (unit tests) assign port here
1✔
186
                app.config.BindPort = uploadListener.Addr().(*net.TCPAddr).Port
1✔
187

1✔
188
                if tlsConfig != nil {
2✔
189
                        uploadServer.TLSConfig = tlsConfig
1✔
190
                        app.errChan <- uploadServer.ServeTLS(uploadListener, "", "")
1✔
191
                        return
1✔
192
                }
1✔
193

194
                // not sure we want to support this code path
195
                app.errChan <- uploadServer.Serve(uploadListener)
×
196
        }()
197

198
        var timeChan <-chan time.Time
1✔
199

1✔
200
        if app.config.Deadline != nil {
2✔
201
                timeChan = time.After(time.Until(*app.config.Deadline))
1✔
202
        } else {
2✔
203
                tc := make(chan time.Time)
1✔
204
                defer close(tc)
1✔
205
                timeChan = tc
1✔
206
        }
1✔
207

208
        select {
1✔
209
        case err = <-app.errChan:
×
210
                if err != nil {
×
211
                        klog.Errorf("HTTP server returned error %s", err.Error())
×
212
                        return nil, err
×
213
                }
×
214
        case <-app.doneChan:
1✔
215
                klog.Info("Shutting down http server after successful upload")
1✔
216
                if err := uploadServer.Shutdown(context.Background()); err != nil {
1✔
217
                        klog.Errorf("failed to shutdown uploadServer; %v", err)
×
218
                }
×
219
        case <-timeChan:
1✔
220
                klog.Info("deadline exceeded, shutting down")
1✔
221
                app.mutex.Lock()
1✔
222
                defer app.mutex.Unlock()
1✔
223
                for {
2✔
224
                        if app.uploading || app.processing {
1✔
225
                                klog.Info("waiting for upload to finish")
×
226
                                app.mutex.Unlock()
×
227
                                time.Sleep(2 * time.Second)
×
228
                                app.mutex.Lock()
×
229
                        } else {
1✔
230
                                break
1✔
231
                        }
232
                }
233
                if !app.done {
2✔
234
                        klog.Info("upload not done, process exiting")
1✔
235
                        return &RunResult{DeadlinePassed: true}, nil
1✔
236
                }
1✔
237
        }
238

239
        result := &RunResult{
1✔
240
                CloneTarget:          app.cloneTarget,
1✔
241
                PreallocationApplied: app.preallocationApplied,
1✔
242
        }
1✔
243

1✔
244
        return result, nil
1✔
245
}
246

247
func (app *uploadServerApp) getTLSConfig() (*tls.Config, error) {
1✔
248
        if app.config.ServerCertFile == "" || app.config.ServerKeyFile == "" {
1✔
249
                if !app.config.Insecure {
×
250
                        return nil, errors.New("invalid TLS config")
×
251
                }
×
252
                return nil, nil
×
253
        }
254

255
        //nolint:gosec // False positive: Min version is not known statically
256
        config := &tls.Config{
1✔
257
                CipherSuites: app.config.CryptoConfig.CipherSuites,
1✔
258
                ClientAuth:   tls.VerifyClientCertIfGiven,
1✔
259
                MinVersion:   app.config.CryptoConfig.MinVersion,
1✔
260
        }
1✔
261

1✔
262
        if app.config.ClientCertFile != "" {
2✔
263
                bs, err := os.ReadFile(app.config.ClientCertFile)
1✔
264
                if err != nil {
1✔
265
                        return nil, err
×
266
                }
×
267

268
                caCertPool := x509.NewCertPool()
1✔
269
                if ok := caCertPool.AppendCertsFromPEM(bs); !ok {
1✔
270
                        return nil, err
×
271
                }
×
272

273
                config.ClientCAs = caCertPool
1✔
274
        }
275

276
        cert, err := tls.LoadX509KeyPair(app.config.ServerCertFile, app.config.ServerKeyFile)
1✔
277
        if err != nil {
1✔
278
                return nil, err
×
279
        }
×
280

281
        config.Certificates = []tls.Certificate{cert}
1✔
282

1✔
283
        return config, nil
1✔
284
}
285

286
func (app *uploadServerApp) ServeHTTP(w http.ResponseWriter, r *http.Request) {
1✔
287
        app.mux.ServeHTTP(w, r)
1✔
288
}
1✔
289

290
func (app *uploadServerApp) healthzHandler(w http.ResponseWriter, r *http.Request) {
1✔
291
        if _, err := io.WriteString(w, "OK"); err != nil {
1✔
292
                klog.Errorf("healthzHandler: failed to send response; %v", err)
×
293
        }
×
294
}
295

296
func (app *uploadServerApp) validateShouldHandleRequest(w http.ResponseWriter, r *http.Request) bool {
1✔
297
        if r.Method != http.MethodPost {
2✔
298
                w.WriteHeader(http.StatusNotFound)
1✔
299
                return false
1✔
300
        }
1✔
301

302
        if r.TLS != nil {
2✔
303
                if len(r.TLS.VerifiedChains) == 0 {
1✔
304
                        w.WriteHeader(http.StatusUnauthorized)
×
305
                        return false
×
306
                }
×
307

308
                found := false
1✔
309
                for _, cert := range r.TLS.PeerCertificates {
2✔
310
                        if cert.Subject.CommonName == app.config.ClientName {
2✔
311
                                found = true
1✔
312
                                break
1✔
313
                        }
314
                }
315

316
                if !found {
2✔
317
                        w.WriteHeader(http.StatusUnauthorized)
1✔
318
                        return false
1✔
319
                }
1✔
320
        } else {
1✔
321
                if !app.config.Insecure {
1✔
322
                        w.WriteHeader(http.StatusUnauthorized)
×
323
                        return false
×
324
                }
×
325
                klog.V(3).Infof("Handling HTTP connection")
1✔
326
        }
327

328
        app.mutex.Lock()
1✔
329
        defer app.mutex.Unlock()
1✔
330

1✔
331
        if app.uploading || app.processing {
2✔
332
                klog.Warning("Got concurrent upload request")
1✔
333
                w.WriteHeader(http.StatusServiceUnavailable)
1✔
334
                return false
1✔
335
        }
1✔
336

337
        if app.done {
2✔
338
                klog.Warning("Got upload request after already done")
1✔
339
                w.WriteHeader(http.StatusConflict)
1✔
340
                return false
1✔
341
        }
1✔
342

343
        app.uploading = true
1✔
344

1✔
345
        return true
1✔
346
}
347

348
func (app *uploadServerApp) uploadHandlerAsync(irc imageReadCloser) http.HandlerFunc {
1✔
349
        return func(w http.ResponseWriter, r *http.Request) {
2✔
350
                if r.Method == http.MethodHead {
2✔
351
                        w.WriteHeader(http.StatusOK)
1✔
352
                        return
1✔
353
                }
1✔
354

355
                if !app.validateShouldHandleRequest(w, r) {
2✔
356
                        return
1✔
357
                }
1✔
358

359
                cdiContentType := r.Header.Get(common.UploadContentTypeHeader)
1✔
360

1✔
361
                klog.Infof("Content type header is %q\n", cdiContentType)
1✔
362

1✔
363
                readCloser, err := irc(r)
1✔
364
                if err != nil {
1✔
365
                        w.WriteHeader(http.StatusBadRequest)
×
366
                }
×
367

368
                processor, err := uploadProcessorFuncAsync(readCloser, app.config.Destination, app.config.ImageSize, app.config.FilesystemOverhead, app.config.Preallocation, cdiContentType)
1✔
369

1✔
370
                app.mutex.Lock()
1✔
371
                defer app.mutex.Unlock()
1✔
372

1✔
373
                if err != nil {
1✔
374
                        klog.Errorf("Saving stream failed: %s", err)
2✔
375
                        if errors.As(err, &importer.ValidationSizeError{}) {
1✔
376
                                w.WriteHeader(http.StatusBadRequest)
1✔
377
                        } else {
1✔
378
                                w.WriteHeader(http.StatusInternalServerError)
379
                        }
1✔
380

1✔
381
                        _, writeErr := fmt.Fprintf(w, "Saving stream failed: %s", err.Error())
1✔
382
                        if writeErr != nil {
2✔
383
                                klog.Errorf("failed to send response; %v", err)
1✔
384
                        }
1✔
385

1✔
386
                        app.uploading = false
1✔
387
                        return
1✔
388
                }
×
389

×
390
                app.uploading = false
×
391
                app.processing = true
×
392

1✔
393
                // Start processing.
1✔
394
                go func() {
1✔
395
                        err := processor.ProcessDataResume()
1✔
396
                        app.mutex.Lock()
1✔
397
                        defer app.mutex.Unlock()
398
                        app.processing = false
399
                        if err != nil {
1✔
400
                                klog.Errorf("Error during resumed processing: %v", err)
401
                                app.errChan <- err
402
                                return
403
                        }
1✔
404
                        defer close(app.doneChan)
2✔
405
                        app.done = true
1✔
406
                        app.preallocationApplied = processor.PreallocationApplied()
1✔
407
                        app.cloneTarget = isCloneTarget(cdiContentType)
408
                        klog.Infof("Wrote data to %s", app.config.Destination)
1✔
409
                }()
1✔
410

1✔
411
                klog.Info("Returning success to caller, continue processing in background")
1✔
412
        }
1✔
413
}
1✔
414

×
415
func (app *uploadServerApp) processUpload(irc imageReadCloser, w http.ResponseWriter, r *http.Request, dvContentType cdiv1.DataVolumeContentType) {
×
416
        if !app.validateShouldHandleRequest(w, r) {
417
                return
1✔
418
        }
1✔
419

1✔
420
        cdiContentType := r.Header.Get(common.UploadContentTypeHeader)
1✔
421

1✔
422
        klog.Infof("Content type header is %q\n", cdiContentType)
1✔
423

2✔
424
        readCloser, err := irc(r)
1✔
425
        if err != nil {
1✔
426
                w.WriteHeader(http.StatusBadRequest)
1✔
427
        }
428

1✔
429
        preallocationApplied, err := uploadProcessorFunc(readCloser, app.config.Destination, app.config.ImageSize, app.config.FilesystemOverhead, app.config.Preallocation, cdiContentType, dvContentType)
1✔
430

1✔
431
        app.mutex.Lock()
1✔
432
        defer app.mutex.Unlock()
1✔
433
        app.uploading = false
1✔
434

×
435
        if err != nil {
1✔
436
                klog.Errorf("Saving stream failed: %s", err)
1✔
437
                w.WriteHeader(http.StatusInternalServerError)
1✔
438
                return
439
        }
440

1✔
441
        app.done = true
2✔
442
        app.preallocationApplied = preallocationApplied
1✔
443
        app.cloneTarget = isCloneTarget(cdiContentType)
1✔
444
        close(app.doneChan)
445

446
        if dvContentType == cdiv1.DataVolumeArchive {
1✔
447
                klog.Infof("Wrote archive data")
2✔
448
        } else {
1✔
449
                klog.Infof("Wrote data to %s", app.config.Destination)
1✔
450
        }
451
}
452

×
453
func (app *uploadServerApp) uploadHandler(irc imageReadCloser) http.HandlerFunc {
×
454
        return func(w http.ResponseWriter, r *http.Request) {
×
455
                app.processUpload(irc, w, r, cdiv1.DataVolumeKubeVirt)
×
456
        }
457
}
×
458

×
459
func (app *uploadServerApp) uploadArchiveHandler(irc imageReadCloser) http.HandlerFunc {
×
460
        return func(w http.ResponseWriter, r *http.Request) {
461
                app.processUpload(irc, w, r, cdiv1.DataVolumeArchive)
462
        }
×
463
}
×
464

×
465
func newAsyncUploadStreamProcessor(stream io.ReadCloser, dest, imageSize string, filesystemOverhead float64, preallocation bool, sourceContentType string) (*importer.DataProcessor, error) {
×
466
        if isCloneTarget(sourceContentType) {
×
467
                return nil, fmt.Errorf("async clone not supported")
468
        }
469

×
470
        uds := importer.NewAsyncUploadDataSource(newContentReader(stream, sourceContentType))
×
471
        processor := importer.NewDataProcessor(uds, dest, common.ImporterVolumePath, common.ScratchDataDir, imageSize, filesystemOverhead, preallocation, "")
×
472
        return processor, processor.ProcessDataWithPause()
×
473
}
474

475
func newUploadStreamProcessor(stream io.ReadCloser, dest, imageSize string, filesystemOverhead float64, preallocation bool, sourceContentType string, dvContentType cdiv1.DataVolumeContentType) (bool, error) {
×
476
        stream = newContentReader(stream, sourceContentType)
×
477
        if isCloneTarget(sourceContentType) {
×
478
                return cloneProcessor(stream, sourceContentType, dest, preallocation)
×
479
        }
×
480

481
        // Clone block device to block device or file system
×
482
        uds := importer.NewUploadDataSource(stream, dvContentType)
×
483
        processor := importer.NewDataProcessor(uds, dest, common.ImporterVolumePath, common.ScratchDataDir, imageSize, filesystemOverhead, preallocation, "")
×
484
        err := processor.ProcessData()
×
485
        return processor.PreallocationApplied(), err
×
486
}
×
487

488
func cloneProcessor(stream io.ReadCloser, contentType, dest string, preallocate bool) (bool, error) {
489
        if contentType == common.FilesystemCloneContentType {
×
490
                if dest != common.WriteBlockPath {
×
491
                        return fileToFileCloneProcessor(stream)
×
492
                }
×
493

×
494
                tarImageReader, err := newTarDiskImageReader(stream)
495
                if err != nil {
×
496
                        stream.Close()
×
497
                        return false, err
×
498
                }
499
                stream = tarImageReader
500
        }
×
501

×
502
        defer stream.Close()
×
503
        bytesRead, bytesWrittenn, err := util.StreamDataToFile(stream, dest, preallocate)
×
504
        if err != nil {
×
505
                return false, err
×
506
        }
507

508
        klog.Infof("Read %d bytes, wrote %d bytes to %s", bytesRead, bytesWrittenn, dest)
509

510
        return false, nil
511
}
512

513
func fileToFileCloneProcessor(stream io.ReadCloser) (bool, error) {
×
514
        defer stream.Close()
×
515
        if err := util.UnArchiveTar(stream, common.ImporterVolumePath); err != nil {
×
516
                return false, errors.Wrapf(err, "error unarchiving to %s", common.ImporterVolumePath)
×
517
        }
×
518
        return true, nil
×
519
}
520

×
521
// closeWrapper pairs a Reader with the closers that must be closed once
// reading is finished, turning the pair into an io.ReadCloser.
type closeWrapper struct {
	io.Reader
	closers []io.Closer
}

// Close closes every registered closer in order, even if an earlier one
// fails, and returns the error of the last closer that failed (nil if none).
func (c *closeWrapper) Close() error {
	var lastErr error
	for i := 0; i < len(c.closers); i++ {
		if closeErr := c.closers[i].Close(); closeErr != nil {
			lastErr = closeErr
		}
	}
	return lastErr
}
×
535

×
536
type tarDiskImageReader struct {
×
537
        tr           *tar.Reader
×
538
        size, offset int64
×
539
}
×
540

541
func (r *tarDiskImageReader) Read(p []byte) (int, error) {
542
        if r.offset >= r.size {
×
543
                return 0, io.EOF
×
544
        }
×
545
        remaining := r.size - r.offset
×
546
        if int(remaining) < len(p) {
×
547
                p = p[:remaining]
×
548
        }
549
        n, err := r.tr.Read(p)
×
550
        r.offset += int64(n)
×
551
        klog.V(3).Infof("Read %d bytes, offset %d, size %d", n, r.offset, r.size)
×
552
        return n, err
×
553
}
×
554

555
func newTarDiskImageReader(stream io.ReadCloser) (io.ReadCloser, error) {
×
556
        tr := tar.NewReader(stream)
×
557
        for {
×
558
                header, err := tr.Next()
×
559
                if err == io.EOF {
560
                        break
×
561
                }
562
                if err != nil {
563
                        return nil, err
×
564
                }
×
565
                if !strings.Contains(header.Name, common.DiskImageName) {
×
566
                        continue
×
567
                }
×
568
                return &closeWrapper{
569
                        Reader:  &tarDiskImageReader{tr: tr, size: header.Size},
570
                        closers: []io.Closer{stream},
×
571
                }, nil
×
572
        }
×
573
        return nil, fmt.Errorf("no disk image found in tar")
×
574
}
×
575

×
576
func newContentReader(stream io.ReadCloser, contentType string) io.ReadCloser {
577
        if isCloneTarget(contentType) {
1✔
578
                return newSnappyReadCloser(stream)
1✔
579
        }
×
580
        return stream
×
581
}
1✔
582

1✔
583
func newSnappyReadCloser(stream io.ReadCloser) io.ReadCloser {
1✔
584
        return &closeWrapper{
1✔
585
                Reader:  snappy.NewReader(stream),
1✔
586
                closers: []io.Closer{stream},
1✔
587
        }
1✔
588
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc