• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #4816

29 Jul 2024 03:06PM UTC coverage: 59.15% (+0.02%) from 59.127%
#4816

Pull #3213

travis-ci

akalenyu
Use scratch space for non-archive uploads (with host assisted from block exception)

Basically a follow up to 3219 for upload sources,
which suffer from the same issue of losing sparseness.

Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>
Pull Request #3213: Handle lost sparseness in non http data sources

6 of 11 new or added lines in 3 files covered. (54.55%)

2 existing lines in 1 file now uncovered.

16458 of 27824 relevant lines covered (59.15%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.57
/pkg/uploadserver/uploadserver.go
1
/*
2
 * This file is part of the CDI project
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 *
16
 * Copyright 2018 Red Hat, Inc.
17
 *
18
 */
19

20
package uploadserver
21

22
import (
23
        "archive/tar"
24
        "context"
25
        "crypto/tls"
26
        "crypto/x509"
27
        "fmt"
28
        "io"
29
        "mime/multipart"
30
        "net"
31
        "net/http"
32
        "os"
33
        "strings"
34
        "sync"
35
        "time"
36

37
        "github.com/golang/snappy"
38
        "github.com/pkg/errors"
39

40
        "k8s.io/klog/v2"
41

42
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
43
        "kubevirt.io/containerized-data-importer/pkg/common"
44
        "kubevirt.io/containerized-data-importer/pkg/importer"
45
        "kubevirt.io/containerized-data-importer/pkg/util"
46
        cryptowatch "kubevirt.io/containerized-data-importer/pkg/util/tls-crypto-watch"
47
)
48

49
// Settings of the health check endpoint, which is served over plain HTTP on
// its own listener.
const (
50
        // healthzPort is the TCP port the healthz listener binds to.
        healthzPort = 8080
51
        // healthzPath is the URL path the healthz handler is registered on.
        healthzPath = "/healthz"
52
)
53

54
// Config holds the settings the upload server is started with.
type Config struct {
55
        // BindAddress and BindPort form the address the upload listener binds to.
        // BindPort is overwritten with the actual port once the listener is up,
        // so a value of 0 can be used to pick a free port.
        BindAddress string
56
        BindPort    int
57

58
        // Destination is handed to the data processor as the write target.
        Destination string
59

60
        // ServerKeyFile and ServerCertFile are the server key pair; TLS is only
        // configured when both are set.
        ServerKeyFile, ServerCertFile string
61
        // ClientCertFile is a PEM file loaded as the client CA pool. ClientName is
        // the certificate common name a TLS client must present.
        ClientCertFile, ClientName    string
62

63
        // ImageSize, FilesystemOverhead and Preallocation are passed through to
        // the data processor unchanged.
        ImageSize          string
64
        FilesystemOverhead float64
65
        Preallocation      bool
66

67
        // Deadline, when non-nil, is the time after which Run stops waiting for
        // a new upload.
        Deadline *time.Time
68

69
        // CryptoConfig supplies the cipher suites and minimum TLS version.
        CryptoConfig cryptowatch.CryptoConfig
70
}
71

72
// RunResult is the result of the upload server run
73
type RunResult struct {
74
        // CloneTarget is true when the upload carried one of the clone content types.
        CloneTarget          bool
75
        // PreallocationApplied is the value reported by the data processor.
        PreallocationApplied bool
76
        // DeadlinePassed is true when the deadline expired before an upload completed.
        DeadlinePassed       bool
77
}
78

79
// UploadServer is the interface to uploadServerApp
80
type UploadServer interface {
81
        // Run serves until an upload completes, the deadline passes or a server
        // fails, and reports the outcome.
        Run() (*RunResult, error)
82
}
83

84
// uploadServerApp is the UploadServer implementation. The boolean state
// fields are read and written under mutex by the request handlers and Run.
type uploadServerApp struct {
85
        config               *Config
86
        // mux routes the upload paths to their handlers.
        mux                  *http.ServeMux
87
        // uploading is set once a request has been accepted and cleared when its
        // stream has been saved or has failed.
        uploading            bool
88
        // processing is set while an async upload is processed in the background.
        processing           bool
89
        // done is set after an upload has completed successfully.
        done                 bool
90
        preallocationApplied bool
91
        cloneTarget          bool
92
        // doneChan is closed after a successful upload.
        doneChan             chan struct{}
93
        // errChan carries errors from the HTTP servers and from async processing.
        errChan              chan error
94
        mutex                sync.Mutex
95
}
96

97
// imageReadCloser extracts the stream holding the uploaded data from a request.
type imageReadCloser func(*http.Request) (io.ReadCloser, error)
98

99
// Stream processor constructors used by the sync and async upload handlers.
// They are variables so that they may be overridden in tests.
100
var uploadProcessorFunc = newUploadStreamProcessor
101
var uploadProcessorFuncAsync = newAsyncUploadStreamProcessor
102

103
// bodyReadCloser is the imageReadCloser for raw uploads: the upload stream is
// the request body itself. It never fails.
func bodyReadCloser(r *http.Request) (io.ReadCloser, error) {
	body := r.Body
	return body, nil
}
1✔
106

107
func formReadCloser(r *http.Request) (io.ReadCloser, error) {
1✔
108
        multiReader, err := r.MultipartReader()
1✔
109
        if err != nil {
1✔
110
                return nil, err
×
111
        }
×
112

113
        var filePart *multipart.Part
1✔
114

1✔
115
        for {
2✔
116
                filePart, err = multiReader.NextPart()
1✔
117
                if err != nil || filePart.FormName() == "file" {
2✔
118
                        break
1✔
119
                }
120
                klog.Infof("Ignoring part %s", filePart.FormName())
×
121
        }
122

123
        // multiReader.NextPart() returns io.EOF when read everything
124
        if err != nil {
1✔
125
                return nil, err
×
126
        }
×
127

128
        return filePart, nil
1✔
129
}
130

131
func isCloneTraget(contentType string) bool {
1✔
132
        return contentType == common.BlockdeviceClone || contentType == common.FilesystemCloneContentType
1✔
133
}
1✔
134

135
// NewUploadServer returns a new instance of uploadServerApp
136
func NewUploadServer(config *Config) UploadServer {
1✔
137
        server := &uploadServerApp{
1✔
138
                config:    config,
1✔
139
                mux:       http.NewServeMux(),
1✔
140
                uploading: false,
1✔
141
                done:      false,
1✔
142
                doneChan:  make(chan struct{}),
1✔
143
                errChan:   make(chan error),
1✔
144
        }
1✔
145

1✔
146
        for _, path := range common.SyncUploadPaths {
2✔
147
                server.mux.HandleFunc(path, server.uploadHandler(bodyReadCloser))
1✔
148
        }
1✔
149
        for _, path := range common.AsyncUploadPaths {
2✔
150
                server.mux.HandleFunc(path, server.uploadHandlerAsync(bodyReadCloser))
1✔
151
        }
1✔
152
        for _, path := range common.ArchiveUploadPaths {
2✔
153
                server.mux.HandleFunc(path, server.uploadArchiveHandler(bodyReadCloser))
1✔
154
        }
1✔
155
        for _, path := range common.SyncUploadFormPaths {
2✔
156
                server.mux.HandleFunc(path, server.uploadHandler(formReadCloser))
1✔
157
        }
1✔
158
        for _, path := range common.AsyncUploadFormPaths {
2✔
159
                server.mux.HandleFunc(path, server.uploadHandlerAsync(formReadCloser))
1✔
160
        }
1✔
161

162
        return server
1✔
163
}
164

165
func (app *uploadServerApp) Run() (*RunResult, error) {
1✔
166
        uploadServer := http.Server{
1✔
167
                Handler:           app,
1✔
168
                ReadHeaderTimeout: 10 * time.Second,
1✔
169
        }
1✔
170

1✔
171
        healthzServer, err := app.createHealthzServer()
1✔
172
        if err != nil {
1✔
173
                return nil, errors.Wrap(err, "Error creating healthz http server")
×
174
        }
×
175

176
        uploadListener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", app.config.BindAddress, app.config.BindPort))
1✔
177
        if err != nil {
1✔
178
                return nil, errors.Wrap(err, "Error creating upload listerner")
×
179
        }
×
180

181
        healthzListener, err := net.Listen("tcp", fmt.Sprintf(":%d", healthzPort))
1✔
182
        if err != nil {
1✔
183
                return nil, errors.Wrap(err, "Error creating healthz listerner")
×
184
        }
×
185

186
        tlsConfig, err := app.getTLSConfig()
1✔
187
        if err != nil {
1✔
188
                return nil, errors.Wrap(err, "Error getting TLS config")
×
189
        }
×
190

191
        go func() {
2✔
192
                defer uploadListener.Close()
1✔
193

1✔
194
                // maybe bind port was 0 (unit tests) assign port here
1✔
195
                app.config.BindPort = uploadListener.Addr().(*net.TCPAddr).Port
1✔
196

1✔
197
                if tlsConfig != nil {
2✔
198
                        uploadServer.TLSConfig = tlsConfig
1✔
199
                        app.errChan <- uploadServer.ServeTLS(uploadListener, "", "")
1✔
200
                        return
1✔
201
                }
1✔
202

203
                // not sure we want to support this code path
204
                app.errChan <- uploadServer.Serve(uploadListener)
×
205
        }()
206

207
        go func() {
2✔
208
                defer healthzServer.Close()
1✔
209

1✔
210
                app.errChan <- healthzServer.Serve(healthzListener)
1✔
211
        }()
1✔
212

213
        var timeChan <-chan time.Time
1✔
214

1✔
215
        if app.config.Deadline != nil {
2✔
216
                timeChan = time.After(time.Until(*app.config.Deadline))
1✔
217
        } else {
2✔
218
                tc := make(chan time.Time)
1✔
219
                defer close(tc)
1✔
220
                timeChan = tc
1✔
221
        }
1✔
222

223
        select {
1✔
224
        case err = <-app.errChan:
×
225
                if err != nil {
×
226
                        klog.Errorf("HTTP server returned error %s", err.Error())
×
227
                        return nil, err
×
228
                }
×
229
        case <-app.doneChan:
1✔
230
                klog.Info("Shutting down http server after successful upload")
1✔
231
                if err := healthzServer.Shutdown(context.Background()); err != nil {
1✔
232
                        klog.Errorf("failed to shutdown healthzServer; %v", err)
×
233
                }
×
234
                if err := uploadServer.Shutdown(context.Background()); err != nil {
1✔
235
                        klog.Errorf("failed to shutdown uploadServer; %v", err)
×
236
                }
×
237
        case <-timeChan:
1✔
238
                klog.Info("deadline exceeded, shutting down")
1✔
239
                app.mutex.Lock()
1✔
240
                defer app.mutex.Unlock()
1✔
241
                for {
2✔
242
                        if app.uploading || app.processing {
1✔
243
                                klog.Info("waiting for upload to finish")
×
244
                                app.mutex.Unlock()
×
245
                                time.Sleep(2 * time.Second)
×
246
                                app.mutex.Lock()
×
247
                        } else {
1✔
248
                                break
1✔
249
                        }
250
                }
251
                if !app.done {
2✔
252
                        klog.Info("upload not done, process exiting")
1✔
253
                        return &RunResult{DeadlinePassed: true}, nil
1✔
254
                }
1✔
255
        }
256

257
        result := &RunResult{
1✔
258
                CloneTarget:          app.cloneTarget,
1✔
259
                PreallocationApplied: app.preallocationApplied,
1✔
260
        }
1✔
261

1✔
262
        return result, nil
1✔
263
}
264

265
func (app *uploadServerApp) getTLSConfig() (*tls.Config, error) {
1✔
266
        if app.config.ServerCertFile == "" || app.config.ServerKeyFile == "" {
1✔
267
                return nil, nil
×
268
        }
×
269

270
        //nolint:gosec // False positive: Min version is not known statically
271
        config := &tls.Config{
1✔
272
                CipherSuites: app.config.CryptoConfig.CipherSuites,
1✔
273
                ClientAuth:   tls.RequireAndVerifyClientCert,
1✔
274
                MinVersion:   app.config.CryptoConfig.MinVersion,
1✔
275
        }
1✔
276

1✔
277
        if app.config.ClientCertFile != "" {
2✔
278
                bs, err := os.ReadFile(app.config.ClientCertFile)
1✔
279
                if err != nil {
1✔
280
                        return nil, err
×
281
                }
×
282

283
                caCertPool := x509.NewCertPool()
1✔
284
                if ok := caCertPool.AppendCertsFromPEM(bs); !ok {
1✔
285
                        return nil, err
×
286
                }
×
287

288
                config.ClientCAs = caCertPool
1✔
289
        }
290

291
        cert, err := tls.LoadX509KeyPair(app.config.ServerCertFile, app.config.ServerKeyFile)
1✔
292
        if err != nil {
1✔
293
                return nil, err
×
294
        }
×
295

296
        config.Certificates = []tls.Certificate{cert}
1✔
297

1✔
298
        return config, nil
1✔
299
}
300

301
func (app *uploadServerApp) createHealthzServer() (*http.Server, error) {
1✔
302
        mux := http.NewServeMux()
1✔
303
        mux.HandleFunc(healthzPath, app.healthzHandler)
1✔
304
        return &http.Server{
1✔
305
                Handler:           mux,
1✔
306
                ReadHeaderTimeout: 10 * time.Second,
1✔
307
        }, nil
1✔
308
}
1✔
309

310
func (app *uploadServerApp) ServeHTTP(w http.ResponseWriter, r *http.Request) {
1✔
311
        app.mux.ServeHTTP(w, r)
1✔
312
}
1✔
313

314
func (app *uploadServerApp) healthzHandler(w http.ResponseWriter, r *http.Request) {
1✔
315
        if _, err := io.WriteString(w, "OK"); err != nil {
1✔
316
                klog.Errorf("healthzHandler: failed to send response; %v", err)
×
317
        }
×
318
}
319

320
func (app *uploadServerApp) validateShouldHandleRequest(w http.ResponseWriter, r *http.Request) bool {
1✔
321
        if r.Method != http.MethodPost {
2✔
322
                w.WriteHeader(http.StatusNotFound)
1✔
323
                return false
1✔
324
        }
1✔
325

326
        if r.TLS != nil {
2✔
327
                found := false
1✔
328

1✔
329
                for _, cert := range r.TLS.PeerCertificates {
2✔
330
                        if cert.Subject.CommonName == app.config.ClientName {
2✔
331
                                found = true
1✔
332
                                break
1✔
333
                        }
334
                }
335

336
                if !found {
2✔
337
                        w.WriteHeader(http.StatusUnauthorized)
1✔
338
                        return false
1✔
339
                }
1✔
340
        } else {
1✔
341
                klog.V(3).Infof("Handling HTTP connection")
1✔
342
        }
1✔
343

344
        app.mutex.Lock()
1✔
345
        defer app.mutex.Unlock()
1✔
346

1✔
347
        if app.uploading || app.processing {
2✔
348
                klog.Warning("Got concurrent upload request")
1✔
349
                w.WriteHeader(http.StatusServiceUnavailable)
1✔
350
                return false
1✔
351
        }
1✔
352

353
        if app.done {
2✔
354
                klog.Warning("Got upload request after already done")
1✔
355
                w.WriteHeader(http.StatusConflict)
1✔
356
                return false
1✔
357
        }
1✔
358

359
        app.uploading = true
1✔
360

1✔
361
        return true
1✔
362
}
363

364
func (app *uploadServerApp) uploadHandlerAsync(irc imageReadCloser) http.HandlerFunc {
1✔
365
        return func(w http.ResponseWriter, r *http.Request) {
2✔
366
                if r.Method == http.MethodHead {
2✔
367
                        w.WriteHeader(http.StatusOK)
1✔
368
                        return
1✔
369
                }
1✔
370

371
                if !app.validateShouldHandleRequest(w, r) {
2✔
372
                        return
1✔
373
                }
1✔
374

375
                cdiContentType := r.Header.Get(common.UploadContentTypeHeader)
1✔
376

1✔
377
                klog.Infof("Content type header is %q\n", cdiContentType)
1✔
378

1✔
379
                readCloser, err := irc(r)
1✔
380
                if err != nil {
1✔
381
                        w.WriteHeader(http.StatusBadRequest)
×
382
                }
×
383

384
                processor, err := uploadProcessorFuncAsync(readCloser, app.config.Destination, app.config.ImageSize, app.config.FilesystemOverhead, app.config.Preallocation, cdiContentType)
1✔
385

1✔
386
                app.mutex.Lock()
1✔
387
                defer app.mutex.Unlock()
1✔
388

1✔
389
                if err != nil {
2✔
390
                        klog.Errorf("Saving stream failed: %s", err)
1✔
391
                        if errors.As(err, &importer.ValidationSizeError{}) {
1✔
392
                                w.WriteHeader(http.StatusBadRequest)
×
393
                        } else {
1✔
394
                                w.WriteHeader(http.StatusInternalServerError)
1✔
395
                        }
1✔
396

397
                        _, writeErr := fmt.Fprintf(w, "Saving stream failed: %s", err.Error())
1✔
398
                        if writeErr != nil {
1✔
399
                                klog.Errorf("failed to send response; %v", err)
×
400
                        }
×
401

402
                        app.uploading = false
1✔
403
                        return
1✔
404
                }
405

406
                app.uploading = false
1✔
407
                app.processing = true
1✔
408

1✔
409
                // Start processing.
1✔
410
                go func() {
2✔
411
                        err := processor.ProcessDataResume()
1✔
412
                        app.mutex.Lock()
1✔
413
                        defer app.mutex.Unlock()
1✔
414
                        app.processing = false
1✔
415
                        if err != nil {
1✔
416
                                klog.Errorf("Error during resumed processing: %v", err)
×
417
                                app.errChan <- err
×
418
                                return
×
419
                        }
×
420
                        defer close(app.doneChan)
1✔
421
                        app.done = true
1✔
422
                        app.preallocationApplied = processor.PreallocationApplied()
1✔
423
                        app.cloneTarget = isCloneTraget(cdiContentType)
1✔
424
                        klog.Infof("Wrote data to %s", app.config.Destination)
1✔
425
                }()
426

427
                klog.Info("Returning success to caller, continue processing in background")
1✔
428
        }
429
}
430

431
func (app *uploadServerApp) processUpload(irc imageReadCloser, w http.ResponseWriter, r *http.Request, dvContentType cdiv1.DataVolumeContentType) {
1✔
432
        if !app.validateShouldHandleRequest(w, r) {
2✔
433
                return
1✔
434
        }
1✔
435

436
        cdiContentType := r.Header.Get(common.UploadContentTypeHeader)
1✔
437

1✔
438
        klog.Infof("Content type header is %q\n", cdiContentType)
1✔
439

1✔
440
        readCloser, err := irc(r)
1✔
441
        if err != nil {
1✔
442
                w.WriteHeader(http.StatusBadRequest)
×
443
        }
×
444

445
        preallocationApplied, err := uploadProcessorFunc(readCloser, app.config.Destination, app.config.ImageSize, app.config.FilesystemOverhead, app.config.Preallocation, cdiContentType, dvContentType)
1✔
446

1✔
447
        app.mutex.Lock()
1✔
448
        defer app.mutex.Unlock()
1✔
449
        app.uploading = false
1✔
450

1✔
451
        if err != nil {
2✔
452
                klog.Errorf("Saving stream failed: %s", err)
1✔
453
                w.WriteHeader(http.StatusInternalServerError)
1✔
454
                return
1✔
455
        }
1✔
456

457
        app.done = true
1✔
458
        app.preallocationApplied = preallocationApplied
1✔
459
        app.cloneTarget = isCloneTraget(cdiContentType)
1✔
460
        close(app.doneChan)
1✔
461

1✔
462
        if dvContentType == cdiv1.DataVolumeArchive {
1✔
463
                klog.Infof("Wrote archive data")
×
464
        } else {
1✔
465
                klog.Infof("Wrote data to %s", app.config.Destination)
1✔
466
        }
1✔
467
}
468

469
func (app *uploadServerApp) uploadHandler(irc imageReadCloser) http.HandlerFunc {
1✔
470
        return func(w http.ResponseWriter, r *http.Request) {
2✔
471
                app.processUpload(irc, w, r, cdiv1.DataVolumeKubeVirt)
1✔
472
        }
1✔
473
}
474

475
func (app *uploadServerApp) uploadArchiveHandler(irc imageReadCloser) http.HandlerFunc {
1✔
476
        return func(w http.ResponseWriter, r *http.Request) {
2✔
477
                app.processUpload(irc, w, r, cdiv1.DataVolumeArchive)
1✔
478
        }
1✔
479
}
480

481
func newAsyncUploadStreamProcessor(stream io.ReadCloser, dest, imageSize string, filesystemOverhead float64, preallocation bool, sourceContentType string) (*importer.DataProcessor, error) {
×
482
        if sourceContentType == common.FilesystemCloneContentType {
×
483
                return nil, fmt.Errorf("async filesystem clone not supported")
×
484
        }
×
485

486
        uds := importer.NewAsyncUploadDataSource(newContentReader(stream, sourceContentType))
×
487
        processor := importer.NewDataProcessor(uds, dest, common.ImporterVolumePath, common.ScratchDataDir, imageSize, filesystemOverhead, preallocation, "")
×
488
        return processor, processor.ProcessDataWithPause()
×
489
}
490

491
func newUploadStreamProcessor(stream io.ReadCloser, dest, imageSize string, filesystemOverhead float64, preallocation bool, sourceContentType string, dvContentType cdiv1.DataVolumeContentType) (bool, error) {
×
492
        if sourceContentType == common.FilesystemCloneContentType {
×
493
                return false, filesystemCloneProcessor(stream, dest)
×
494
        }
×
495

496
        // Clone block device to block device or file system
NEW
497
        directWriteException := sourceContentType == common.BlockdeviceClone
×
NEW
498
        uds := importer.NewUploadDataSource(newContentReader(stream, sourceContentType), dvContentType, directWriteException)
×
499
        processor := importer.NewDataProcessor(uds, dest, common.ImporterVolumePath, common.ScratchDataDir, imageSize, filesystemOverhead, preallocation, "")
×
500
        err := processor.ProcessData()
×
501
        return processor.PreallocationApplied(), err
×
502
}
503

504
// Clone file system to block device or file system
505
func filesystemCloneProcessor(stream io.ReadCloser, dest string) error {
×
506
        // Clone to block device
×
507
        if dest == common.WriteBlockPath {
×
508
                if err := untarToBlockdev(newSnappyReadCloser(stream), dest); err != nil {
×
509
                        return errors.Wrapf(err, "error unarchiving to %s", dest)
×
510
                }
×
511
                return nil
×
512
        }
513

514
        // Clone to file system
515
        destDir := common.ImporterVolumePath
×
516
        if err := util.UnArchiveTar(newSnappyReadCloser(stream), destDir); err != nil {
×
517
                return errors.Wrapf(err, "error unarchiving to %s", destDir)
×
518
        }
×
519
        return nil
×
520
}
521

522
func untarToBlockdev(stream io.Reader, dest string) error {
×
523
        tr := tar.NewReader(stream)
×
524
        for {
×
525
                header, err := tr.Next()
×
526
                switch {
×
527
                case err == io.EOF:
×
528
                        return nil
×
529
                case err != nil:
×
530
                        return err
×
531
                case header == nil:
×
532
                        continue
×
533
                }
534
                if !strings.Contains(header.Name, common.DiskImageName) {
×
535
                        continue
×
536
                }
537
                switch header.Typeflag {
×
538
                case tar.TypeReg, tar.TypeGNUSparse:
×
539
                        klog.Infof("Untaring %d bytes to %s", header.Size, dest)
×
540
                        f, err := os.OpenFile(dest, os.O_APPEND|os.O_WRONLY, os.ModeDevice|os.ModePerm)
×
541
                        if err != nil {
×
542
                                return err
×
543
                        }
×
544
                        written, err := io.CopyN(f, tr, header.Size)
×
545
                        if err != nil {
×
546
                                return err
×
547
                        }
×
548
                        klog.Infof("Written %d", written)
×
549
                        f.Close()
×
550
                        return nil
×
551
                }
552
        }
553
}
554

555
func newContentReader(stream io.ReadCloser, contentType string) io.ReadCloser {
×
556
        if contentType == common.BlockdeviceClone {
×
557
                return newSnappyReadCloser(stream)
×
558
        }
×
559

560
        return stream
×
561
}
562

563
func newSnappyReadCloser(stream io.ReadCloser) io.ReadCloser {
×
564
        return io.NopCloser(snappy.NewReader(stream))
×
565
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc