• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #4878

15 Aug 2024 11:00PM UTC coverage: 59.27% (+0.08%) from 59.188%
#4878

push

travis-ci

web-flow
Properly handle DataVolume preallocation setting for all host assisted (copy) clones (#3352)

* add file.go in pkg/util

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* add tarDiskImageReader to uploadserver

also don't support async endpoint for clone ops (never used anyway)

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* don't use dataprocessor API for block source clones

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* implement StreamDataToFile with sparse/preallocation support

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* unit test for StreamDataToFile

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* add sparse/preallocation check to block->fs to existing functests

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

---------

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

139 of 304 new or added lines in 2 files covered. (45.72%)

9 existing lines in 3 files now uncovered.

16607 of 28019 relevant lines covered (59.27%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.31
/pkg/uploadserver/uploadserver.go
1
/*
2
 * This file is part of the CDI project
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 *
16
 * Copyright 2018 Red Hat, Inc.
17
 *
18
 */
19

20
package uploadserver
21

22
import (
23
        "archive/tar"
24
        "context"
25
        "crypto/tls"
26
        "crypto/x509"
27
        "fmt"
28
        "io"
29
        "mime/multipart"
30
        "net"
31
        "net/http"
32
        "os"
33
        "strings"
34
        "sync"
35
        "time"
36

37
        "github.com/golang/snappy"
38
        "github.com/pkg/errors"
39

40
        "k8s.io/klog/v2"
41

42
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
43
        "kubevirt.io/containerized-data-importer/pkg/common"
44
        "kubevirt.io/containerized-data-importer/pkg/importer"
45
        "kubevirt.io/containerized-data-importer/pkg/util"
46
        cryptowatch "kubevirt.io/containerized-data-importer/pkg/util/tls-crypto-watch"
47
)
48

49
const (
50
        healthzPort = 8080
51
        healthzPath = "/healthz"
52
)
53

54
type Config struct {
55
        BindAddress string
56
        BindPort    int
57

58
        Destination string
59

60
        ServerKeyFile, ServerCertFile string
61
        ClientCertFile, ClientName    string
62

63
        ImageSize          string
64
        FilesystemOverhead float64
65
        Preallocation      bool
66

67
        Deadline *time.Time
68

69
        CryptoConfig cryptowatch.CryptoConfig
70
}
71

72
// RunResult is the result of the upload server run
73
type RunResult struct {
74
        CloneTarget          bool
75
        PreallocationApplied bool
76
        DeadlinePassed       bool
77
}
78

79
// UploadServer is the interface to uploadServerApp
80
type UploadServer interface {
81
        Run() (*RunResult, error)
82
}
83

84
type uploadServerApp struct {
85
        config               *Config
86
        mux                  *http.ServeMux
87
        uploading            bool
88
        processing           bool
89
        done                 bool
90
        preallocationApplied bool
91
        cloneTarget          bool
92
        doneChan             chan struct{}
93
        errChan              chan error
94
        mutex                sync.Mutex
95
}
96

97
type imageReadCloser func(*http.Request) (io.ReadCloser, error)
98

99
// may be overridden in tests
100
var uploadProcessorFunc = newUploadStreamProcessor
101
var uploadProcessorFuncAsync = newAsyncUploadStreamProcessor
102

103
func bodyReadCloser(r *http.Request) (io.ReadCloser, error) {
1✔
104
        return r.Body, nil
1✔
105
}
1✔
106

107
func formReadCloser(r *http.Request) (io.ReadCloser, error) {
1✔
108
        multiReader, err := r.MultipartReader()
1✔
109
        if err != nil {
1✔
110
                return nil, err
×
111
        }
×
112

113
        var filePart *multipart.Part
1✔
114

1✔
115
        for {
2✔
116
                filePart, err = multiReader.NextPart()
1✔
117
                if err != nil || filePart.FormName() == "file" {
2✔
118
                        break
1✔
119
                }
120
                klog.Infof("Ignoring part %s", filePart.FormName())
×
121
        }
122

123
        // multiReader.NextPart() returns io.EOF when read everything
124
        if err != nil {
1✔
125
                return nil, err
×
126
        }
×
127

128
        return filePart, nil
1✔
129
}
130

131
func isCloneTarget(contentType string) bool {
1✔
132
        return contentType == common.BlockdeviceClone || contentType == common.FilesystemCloneContentType
1✔
133
}
1✔
134

135
// NewUploadServer returns a new instance of uploadServerApp
136
func NewUploadServer(config *Config) UploadServer {
1✔
137
        server := &uploadServerApp{
1✔
138
                config:    config,
1✔
139
                mux:       http.NewServeMux(),
1✔
140
                uploading: false,
1✔
141
                done:      false,
1✔
142
                doneChan:  make(chan struct{}),
1✔
143
                errChan:   make(chan error),
1✔
144
        }
1✔
145

1✔
146
        for _, path := range common.SyncUploadPaths {
2✔
147
                server.mux.HandleFunc(path, server.uploadHandler(bodyReadCloser))
1✔
148
        }
1✔
149
        for _, path := range common.AsyncUploadPaths {
2✔
150
                server.mux.HandleFunc(path, server.uploadHandlerAsync(bodyReadCloser))
1✔
151
        }
1✔
152
        for _, path := range common.ArchiveUploadPaths {
2✔
153
                server.mux.HandleFunc(path, server.uploadArchiveHandler(bodyReadCloser))
1✔
154
        }
1✔
155
        for _, path := range common.SyncUploadFormPaths {
2✔
156
                server.mux.HandleFunc(path, server.uploadHandler(formReadCloser))
1✔
157
        }
1✔
158
        for _, path := range common.AsyncUploadFormPaths {
2✔
159
                server.mux.HandleFunc(path, server.uploadHandlerAsync(formReadCloser))
1✔
160
        }
1✔
161

162
        return server
1✔
163
}
164

165
func (app *uploadServerApp) Run() (*RunResult, error) {
1✔
166
        uploadServer := http.Server{
1✔
167
                Handler:           app,
1✔
168
                ReadHeaderTimeout: 10 * time.Second,
1✔
169
        }
1✔
170

1✔
171
        healthzServer, err := app.createHealthzServer()
1✔
172
        if err != nil {
1✔
173
                return nil, errors.Wrap(err, "Error creating healthz http server")
×
174
        }
×
175

176
        uploadListener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", app.config.BindAddress, app.config.BindPort))
1✔
177
        if err != nil {
1✔
178
                return nil, errors.Wrap(err, "Error creating upload listerner")
×
179
        }
×
180

181
        healthzListener, err := net.Listen("tcp", fmt.Sprintf(":%d", healthzPort))
1✔
182
        if err != nil {
1✔
183
                return nil, errors.Wrap(err, "Error creating healthz listerner")
×
184
        }
×
185

186
        tlsConfig, err := app.getTLSConfig()
1✔
187
        if err != nil {
1✔
188
                return nil, errors.Wrap(err, "Error getting TLS config")
×
189
        }
×
190

191
        go func() {
2✔
192
                defer uploadListener.Close()
1✔
193

1✔
194
                // maybe bind port was 0 (unit tests) assign port here
1✔
195
                app.config.BindPort = uploadListener.Addr().(*net.TCPAddr).Port
1✔
196

1✔
197
                if tlsConfig != nil {
2✔
198
                        uploadServer.TLSConfig = tlsConfig
1✔
199
                        app.errChan <- uploadServer.ServeTLS(uploadListener, "", "")
1✔
200
                        return
1✔
201
                }
1✔
202

203
                // not sure we want to support this code path
204
                app.errChan <- uploadServer.Serve(uploadListener)
×
205
        }()
206

207
        go func() {
2✔
208
                defer healthzServer.Close()
1✔
209

1✔
210
                app.errChan <- healthzServer.Serve(healthzListener)
1✔
211
        }()
1✔
212

213
        var timeChan <-chan time.Time
1✔
214

1✔
215
        if app.config.Deadline != nil {
2✔
216
                timeChan = time.After(time.Until(*app.config.Deadline))
1✔
217
        } else {
2✔
218
                tc := make(chan time.Time)
1✔
219
                defer close(tc)
1✔
220
                timeChan = tc
1✔
221
        }
1✔
222

223
        select {
1✔
224
        case err = <-app.errChan:
×
225
                if err != nil {
×
226
                        klog.Errorf("HTTP server returned error %s", err.Error())
×
227
                        return nil, err
×
228
                }
×
229
        case <-app.doneChan:
1✔
230
                klog.Info("Shutting down http server after successful upload")
1✔
231
                if err := healthzServer.Shutdown(context.Background()); err != nil {
1✔
232
                        klog.Errorf("failed to shutdown healthzServer; %v", err)
×
233
                }
×
234
                if err := uploadServer.Shutdown(context.Background()); err != nil {
1✔
235
                        klog.Errorf("failed to shutdown uploadServer; %v", err)
×
236
                }
×
237
        case <-timeChan:
1✔
238
                klog.Info("deadline exceeded, shutting down")
1✔
239
                app.mutex.Lock()
1✔
240
                defer app.mutex.Unlock()
1✔
241
                for {
2✔
242
                        if app.uploading || app.processing {
1✔
243
                                klog.Info("waiting for upload to finish")
×
244
                                app.mutex.Unlock()
×
245
                                time.Sleep(2 * time.Second)
×
246
                                app.mutex.Lock()
×
247
                        } else {
1✔
248
                                break
1✔
249
                        }
250
                }
251
                if !app.done {
2✔
252
                        klog.Info("upload not done, process exiting")
1✔
253
                        return &RunResult{DeadlinePassed: true}, nil
1✔
254
                }
1✔
255
        }
256

257
        result := &RunResult{
1✔
258
                CloneTarget:          app.cloneTarget,
1✔
259
                PreallocationApplied: app.preallocationApplied,
1✔
260
        }
1✔
261

1✔
262
        return result, nil
1✔
263
}
264

265
func (app *uploadServerApp) getTLSConfig() (*tls.Config, error) {
1✔
266
        if app.config.ServerCertFile == "" || app.config.ServerKeyFile == "" {
1✔
267
                return nil, nil
×
268
        }
×
269

270
        //nolint:gosec // False positive: Min version is not known statically
271
        config := &tls.Config{
1✔
272
                CipherSuites: app.config.CryptoConfig.CipherSuites,
1✔
273
                ClientAuth:   tls.RequireAndVerifyClientCert,
1✔
274
                MinVersion:   app.config.CryptoConfig.MinVersion,
1✔
275
        }
1✔
276

1✔
277
        if app.config.ClientCertFile != "" {
2✔
278
                bs, err := os.ReadFile(app.config.ClientCertFile)
1✔
279
                if err != nil {
1✔
280
                        return nil, err
×
281
                }
×
282

283
                caCertPool := x509.NewCertPool()
1✔
284
                if ok := caCertPool.AppendCertsFromPEM(bs); !ok {
1✔
285
                        return nil, err
×
286
                }
×
287

288
                config.ClientCAs = caCertPool
1✔
289
        }
290

291
        cert, err := tls.LoadX509KeyPair(app.config.ServerCertFile, app.config.ServerKeyFile)
1✔
292
        if err != nil {
1✔
293
                return nil, err
×
294
        }
×
295

296
        config.Certificates = []tls.Certificate{cert}
1✔
297

1✔
298
        return config, nil
1✔
299
}
300

301
func (app *uploadServerApp) createHealthzServer() (*http.Server, error) {
1✔
302
        mux := http.NewServeMux()
1✔
303
        mux.HandleFunc(healthzPath, app.healthzHandler)
1✔
304
        return &http.Server{
1✔
305
                Handler:           mux,
1✔
306
                ReadHeaderTimeout: 10 * time.Second,
1✔
307
        }, nil
1✔
308
}
1✔
309

310
func (app *uploadServerApp) ServeHTTP(w http.ResponseWriter, r *http.Request) {
1✔
311
        app.mux.ServeHTTP(w, r)
1✔
312
}
1✔
313

314
func (app *uploadServerApp) healthzHandler(w http.ResponseWriter, r *http.Request) {
1✔
315
        if _, err := io.WriteString(w, "OK"); err != nil {
1✔
316
                klog.Errorf("healthzHandler: failed to send response; %v", err)
×
317
        }
×
318
}
319

320
func (app *uploadServerApp) validateShouldHandleRequest(w http.ResponseWriter, r *http.Request) bool {
1✔
321
        if r.Method != http.MethodPost {
2✔
322
                w.WriteHeader(http.StatusNotFound)
1✔
323
                return false
1✔
324
        }
1✔
325

326
        if r.TLS != nil {
2✔
327
                found := false
1✔
328

1✔
329
                for _, cert := range r.TLS.PeerCertificates {
2✔
330
                        if cert.Subject.CommonName == app.config.ClientName {
2✔
331
                                found = true
1✔
332
                                break
1✔
333
                        }
334
                }
335

336
                if !found {
2✔
337
                        w.WriteHeader(http.StatusUnauthorized)
1✔
338
                        return false
1✔
339
                }
1✔
340
        } else {
1✔
341
                klog.V(3).Infof("Handling HTTP connection")
1✔
342
        }
1✔
343

344
        app.mutex.Lock()
1✔
345
        defer app.mutex.Unlock()
1✔
346

1✔
347
        if app.uploading || app.processing {
2✔
348
                klog.Warning("Got concurrent upload request")
1✔
349
                w.WriteHeader(http.StatusServiceUnavailable)
1✔
350
                return false
1✔
351
        }
1✔
352

353
        if app.done {
2✔
354
                klog.Warning("Got upload request after already done")
1✔
355
                w.WriteHeader(http.StatusConflict)
1✔
356
                return false
1✔
357
        }
1✔
358

359
        app.uploading = true
1✔
360

1✔
361
        return true
1✔
362
}
363

364
func (app *uploadServerApp) uploadHandlerAsync(irc imageReadCloser) http.HandlerFunc {
1✔
365
        return func(w http.ResponseWriter, r *http.Request) {
2✔
366
                if r.Method == http.MethodHead {
2✔
367
                        w.WriteHeader(http.StatusOK)
1✔
368
                        return
1✔
369
                }
1✔
370

371
                if !app.validateShouldHandleRequest(w, r) {
2✔
372
                        return
1✔
373
                }
1✔
374

375
                cdiContentType := r.Header.Get(common.UploadContentTypeHeader)
1✔
376

1✔
377
                klog.Infof("Content type header is %q\n", cdiContentType)
1✔
378

1✔
379
                readCloser, err := irc(r)
1✔
380
                if err != nil {
1✔
381
                        w.WriteHeader(http.StatusBadRequest)
×
382
                }
×
383

384
                processor, err := uploadProcessorFuncAsync(readCloser, app.config.Destination, app.config.ImageSize, app.config.FilesystemOverhead, app.config.Preallocation, cdiContentType)
1✔
385

1✔
386
                app.mutex.Lock()
1✔
387
                defer app.mutex.Unlock()
1✔
388

1✔
389
                if err != nil {
2✔
390
                        klog.Errorf("Saving stream failed: %s", err)
1✔
391
                        if errors.As(err, &importer.ValidationSizeError{}) {
1✔
392
                                w.WriteHeader(http.StatusBadRequest)
×
393
                        } else {
1✔
394
                                w.WriteHeader(http.StatusInternalServerError)
1✔
395
                        }
1✔
396

397
                        _, writeErr := fmt.Fprintf(w, "Saving stream failed: %s", err.Error())
1✔
398
                        if writeErr != nil {
1✔
399
                                klog.Errorf("failed to send response; %v", err)
×
400
                        }
×
401

402
                        app.uploading = false
1✔
403
                        return
1✔
404
                }
405

406
                app.uploading = false
1✔
407
                app.processing = true
1✔
408

1✔
409
                // Start processing.
1✔
410
                go func() {
2✔
411
                        err := processor.ProcessDataResume()
1✔
412
                        app.mutex.Lock()
1✔
413
                        defer app.mutex.Unlock()
1✔
414
                        app.processing = false
1✔
415
                        if err != nil {
1✔
416
                                klog.Errorf("Error during resumed processing: %v", err)
×
417
                                app.errChan <- err
×
418
                                return
×
419
                        }
×
420
                        defer close(app.doneChan)
1✔
421
                        app.done = true
1✔
422
                        app.preallocationApplied = processor.PreallocationApplied()
1✔
423
                        app.cloneTarget = isCloneTarget(cdiContentType)
1✔
424
                        klog.Infof("Wrote data to %s", app.config.Destination)
1✔
425
                }()
426

427
                klog.Info("Returning success to caller, continue processing in background")
1✔
428
        }
429
}
430

431
func (app *uploadServerApp) processUpload(irc imageReadCloser, w http.ResponseWriter, r *http.Request, dvContentType cdiv1.DataVolumeContentType) {
1✔
432
        if !app.validateShouldHandleRequest(w, r) {
2✔
433
                return
1✔
434
        }
1✔
435

436
        cdiContentType := r.Header.Get(common.UploadContentTypeHeader)
1✔
437

1✔
438
        klog.Infof("Content type header is %q\n", cdiContentType)
1✔
439

1✔
440
        readCloser, err := irc(r)
1✔
441
        if err != nil {
1✔
442
                w.WriteHeader(http.StatusBadRequest)
×
443
        }
×
444

445
        preallocationApplied, err := uploadProcessorFunc(readCloser, app.config.Destination, app.config.ImageSize, app.config.FilesystemOverhead, app.config.Preallocation, cdiContentType, dvContentType)
1✔
446

1✔
447
        app.mutex.Lock()
1✔
448
        defer app.mutex.Unlock()
1✔
449
        app.uploading = false
1✔
450

1✔
451
        if err != nil {
2✔
452
                klog.Errorf("Saving stream failed: %s", err)
1✔
453
                w.WriteHeader(http.StatusInternalServerError)
1✔
454
                return
1✔
455
        }
1✔
456

457
        app.done = true
1✔
458
        app.preallocationApplied = preallocationApplied
1✔
459
        app.cloneTarget = isCloneTarget(cdiContentType)
1✔
460
        close(app.doneChan)
1✔
461

1✔
462
        if dvContentType == cdiv1.DataVolumeArchive {
1✔
463
                klog.Infof("Wrote archive data")
×
464
        } else {
1✔
465
                klog.Infof("Wrote data to %s", app.config.Destination)
1✔
466
        }
1✔
467
}
468

469
func (app *uploadServerApp) uploadHandler(irc imageReadCloser) http.HandlerFunc {
1✔
470
        return func(w http.ResponseWriter, r *http.Request) {
2✔
471
                app.processUpload(irc, w, r, cdiv1.DataVolumeKubeVirt)
1✔
472
        }
1✔
473
}
474

475
func (app *uploadServerApp) uploadArchiveHandler(irc imageReadCloser) http.HandlerFunc {
1✔
476
        return func(w http.ResponseWriter, r *http.Request) {
2✔
477
                app.processUpload(irc, w, r, cdiv1.DataVolumeArchive)
1✔
478
        }
1✔
479
}
480

481
func newAsyncUploadStreamProcessor(stream io.ReadCloser, dest, imageSize string, filesystemOverhead float64, preallocation bool, sourceContentType string) (*importer.DataProcessor, error) {
×
NEW
482
        if isCloneTarget(sourceContentType) {
×
NEW
483
                return nil, fmt.Errorf("async clone not supported")
×
UNCOV
484
        }
×
485

486
        uds := importer.NewAsyncUploadDataSource(newContentReader(stream, sourceContentType))
×
487
        processor := importer.NewDataProcessor(uds, dest, common.ImporterVolumePath, common.ScratchDataDir, imageSize, filesystemOverhead, preallocation, "")
×
488
        return processor, processor.ProcessDataWithPause()
×
489
}
490

491
func newUploadStreamProcessor(stream io.ReadCloser, dest, imageSize string, filesystemOverhead float64, preallocation bool, sourceContentType string, dvContentType cdiv1.DataVolumeContentType) (bool, error) {
×
NEW
492
        stream = newContentReader(stream, sourceContentType)
×
NEW
493
        if isCloneTarget(sourceContentType) {
×
NEW
494
                return cloneProcessor(stream, sourceContentType, dest, preallocation)
×
UNCOV
495
        }
×
496

497
        // Clone block device to block device or file system
NEW
498
        uds := importer.NewUploadDataSource(stream, dvContentType)
×
499
        processor := importer.NewDataProcessor(uds, dest, common.ImporterVolumePath, common.ScratchDataDir, imageSize, filesystemOverhead, preallocation, "")
×
500
        err := processor.ProcessData()
×
501
        return processor.PreallocationApplied(), err
×
502
}
503

NEW
504
func cloneProcessor(stream io.ReadCloser, contentType, dest string, preallocate bool) (bool, error) {
×
NEW
505
        if contentType == common.FilesystemCloneContentType {
×
NEW
506
                if dest != common.WriteBlockPath {
×
NEW
507
                        return fileToFileCloneProcessor(stream)
×
508
                }
×
509

NEW
510
                tarImageReader, err := newTarDiskImageReader(stream)
×
NEW
511
                if err != nil {
×
NEW
512
                        stream.Close()
×
NEW
513
                        return false, err
×
NEW
514
                }
×
NEW
515
                stream = tarImageReader
×
516
        }
517

NEW
518
        defer stream.Close()
×
NEW
519
        bytesRead, bytesWrittenn, err := util.StreamDataToFile(stream, dest, preallocate)
×
NEW
520
        if err != nil {
×
NEW
521
                return false, err
×
NEW
522
        }
×
523

NEW
524
        klog.Infof("Read %d bytes, wrote %d bytes to %s", bytesRead, bytesWrittenn, dest)
×
NEW
525

×
NEW
526
        return false, nil
×
527
}
528

NEW
529
func fileToFileCloneProcessor(stream io.ReadCloser) (bool, error) {
×
NEW
530
        defer stream.Close()
×
NEW
531
        if err := util.UnArchiveTar(stream, common.ImporterVolumePath); err != nil {
×
NEW
532
                return false, errors.Wrapf(err, "error unarchiving to %s", common.ImporterVolumePath)
×
533
        }
×
NEW
534
        return true, nil
×
535
}
536

537
type closeWrapper struct {
538
        io.Reader
539
        closers []io.Closer
540
}
541

NEW
542
func (c *closeWrapper) Close() error {
×
NEW
543
        var err error
×
NEW
544
        for _, closer := range c.closers {
×
NEW
545
                if e := closer.Close(); e != nil {
×
NEW
546
                        err = e
×
NEW
547
                }
×
548
        }
NEW
549
        return err
×
550
}
551

552
type tarDiskImageReader struct {
553
        tr           *tar.Reader
554
        size, offset int64
555
}
556

NEW
557
func (r *tarDiskImageReader) Read(p []byte) (int, error) {
×
NEW
558
        if r.offset >= r.size {
×
NEW
559
                return 0, io.EOF
×
NEW
560
        }
×
NEW
561
        remaining := r.size - r.offset
×
NEW
562
        if int(remaining) < len(p) {
×
NEW
563
                p = p[:remaining]
×
NEW
564
        }
×
NEW
565
        n, err := r.tr.Read(p)
×
NEW
566
        r.offset += int64(n)
×
NEW
567
        klog.V(3).Infof("Read %d bytes, offset %d, size %d", n, r.offset, r.size)
×
NEW
568
        return n, err
×
569
}
570

NEW
571
func newTarDiskImageReader(stream io.ReadCloser) (io.ReadCloser, error) {
×
572
        tr := tar.NewReader(stream)
×
573
        for {
×
574
                header, err := tr.Next()
×
NEW
575
                if err == io.EOF {
×
NEW
576
                        break
×
577
                }
NEW
578
                if err != nil {
×
NEW
579
                        return nil, err
×
580
                }
×
581
                if !strings.Contains(header.Name, common.DiskImageName) {
×
582
                        continue
×
583
                }
NEW
584
                return &closeWrapper{
×
NEW
585
                        Reader:  &tarDiskImageReader{tr: tr, size: header.Size},
×
NEW
586
                        closers: []io.Closer{stream},
×
NEW
587
                }, nil
×
588
        }
NEW
589
        return nil, fmt.Errorf("no disk image found in tar")
×
590
}
591

592
func newContentReader(stream io.ReadCloser, contentType string) io.ReadCloser {
×
NEW
593
        if isCloneTarget(contentType) {
×
594
                return newSnappyReadCloser(stream)
×
595
        }
×
UNCOV
596
        return stream
×
597
}
598

599
func newSnappyReadCloser(stream io.ReadCloser) io.ReadCloser {
×
NEW
600
        return &closeWrapper{
×
NEW
601
                Reader:  snappy.NewReader(stream),
×
NEW
602
                closers: []io.Closer{stream},
×
NEW
603
        }
×
UNCOV
604
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc