• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / kubevirt / 37199505-f7fc-48df-8efe-d8ccfaf333fc

05 Jun 2026 03:58AM UTC coverage: 71.676% (+0.03%) from 71.651%
37199505-f7fc-48df-8efe-d8ccfaf333fc

push

prow

web-flow
Merge pull request #17944 from 0xFelix/oci-export-phase1

Add OCI artifact export for VirtualMachines

336 of 444 new or added lines in 9 files covered. (75.68%)

2 existing lines in 2 files now uncovered.

78948 of 110145 relevant lines covered (71.68%)

589.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.34
/pkg/storage/export/virt-exportserver/exportserver.go
1
/*
2
 * This file is part of the KubeVirt project
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 *
16
 * Copyright The KubeVirt Authors.
17
 *
18
 */
19

20
package virtexportserver
21

22
import (
23
        "bytes"
24
        "context"
25
        "crypto/tls"
26
        "crypto/x509"
27
        "encoding/json"
28
        "errors"
29
        goflag "flag"
30
        "fmt"
31
        "io"
32
        golog "log"
33
        "net"
34
        "net/http"
35
        "os"
36
        "os/exec"
37
        "path"
38
        "path/filepath"
39
        "strconv"
40
        "strings"
41
        "sync"
42
        "time"
43

44
        gzip "github.com/klauspost/pgzip"
45
        flag "github.com/spf13/pflag"
46
        "golang.org/x/net/http2"
47
        "google.golang.org/grpc"
48
        "google.golang.org/grpc/credentials/insecure"
49
        "google.golang.org/grpc/keepalive"
50
        corev1 "k8s.io/api/core/v1"
51
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
52
        "k8s.io/apimachinery/pkg/runtime"
53
        "k8s.io/apimachinery/pkg/runtime/schema"
54
        "sigs.k8s.io/yaml"
55

56
        virtv1 "kubevirt.io/api/core/v1"
57
        "kubevirt.io/client-go/log"
58
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
59

60
        nbdv1 "kubevirt.io/kubevirt/pkg/storage/cbt/nbd/v1"
61

62
        backupv1 "kubevirt.io/api/backup/v1alpha1"
63

64
        "kubevirt.io/kubevirt/pkg/safepath"
65
        "kubevirt.io/kubevirt/pkg/service"
66
        "kubevirt.io/kubevirt/pkg/storage/export/export"
67
        "kubevirt.io/kubevirt/pkg/storage/oci"
68
        storageutils "kubevirt.io/kubevirt/pkg/storage/utils"
69
)
70

71
const (
	// authHeader is the name of the HTTP header, and of the URL query
	// parameter, from which the export token is read.
	authHeader = "x-kubevirt-export-token"

	// manifestCmBasePath is the directory prefix of every manifest file below.
	manifestCmBasePath = "/manifest_data/"
	// vmManifestPath is the JSON VirtualMachine loaded by getExpandedVM.
	vmManifestPath = manifestCmBasePath + "virtualmachine-manifest"
	// exportNamePath is the file read by getExportName.
	exportNamePath = manifestCmBasePath + "export-name"
	// internalLinkPath and externalLinkPath are the files read by
	// getInternalBasePath and getExternalBasePath.
	internalLinkPath = manifestCmBasePath + "internal_host"
	externalLinkPath = manifestCmBasePath + "external_host"
	// internalCaConfigMapPath and externalCaConfigMapPath are the JSON
	// ConfigMaps loaded through getCAConfigMap.
	internalCaConfigMapPath = manifestCmBasePath + "internal_ca_cm"
	externalCaConfigMapPath = manifestCmBasePath + "external_ca_cm"

	// external and internal are the URL prefixes under which initHandler
	// registers the VM manifest and token secret endpoints.
	external = "/external"
	internal = "/internal"

	// defaultMapPageSize is not referenced in this part of the file.
	// NOTE(review): presumably a page size for backup map responses - confirm.
	defaultMapPageSize = 512
	// tunnelIdleTimeout is the HTTP/2 idle timeout configured in Run.
	tunnelIdleTimeout = 60 * time.Second
)
87

88
var (
	// excludeMap is the set of directory entry names that newTarReader
	// passes to tar as --exclude arguments and that
	// checkDirectoryPermissions skips.
	excludeMap = map[string]struct{}{"lost+found": {}}

	// h2DummyAddr is a zero-valued TCP address; its consumer is not visible
	// in this part of the file.
	h2DummyAddr = &net.TCPAddr{}
)
94

95
// TokenGetterFunc returns the token that tokenChecker compares incoming
// requests against, or an error when the token cannot be obtained.
type TokenGetterFunc func() (string, error)
96

97
type ExportServerConfig struct {
98
        Deadline time.Time
99

100
        ListenAddr string
101

102
        CertFile, KeyFile string
103
        BackupCACert      []byte
104
        TLSMinVersion     uint16
105
        TLSCipherSuites   []uint16
106

107
        TokenFile string
108

109
        BackupUID        string
110
        BackupType       string
111
        BackupCheckpoint string
112

113
        Paths *export.ServerPaths
114

115
        // unit testing helpers
116
        ArchiveHandler     func(string) http.Handler
117
        DirHandler         func(string, string) http.Handler
118
        FileHandler        func(string) http.Handler
119
        GzipHandler        func(string) http.Handler
120
        VmHandler          func([]export.VolumeInfo, func() (string, error), func() (*corev1.ConfigMap, error)) http.Handler
121
        TokenSecretHandler func(TokenGetterFunc) http.Handler
122
        OCIHandler         func(*oci.Builder) http.Handler
123

124
        PermissionChecker func(string) bool
125

126
        TokenGetter TokenGetterFunc
127
}
128

129
// execReader exposes the stdout of a started subprocess as an io.ReadCloser.
// Its Read method waits for the subprocess once stdout reports EOF.
type execReader struct {
	// cmd is the subprocess whose output is being read.
	cmd *exec.Cmd
	// stdout is the stream that Read and Close operate on.
	stdout io.ReadCloser
	// stderr is read in full for the log message when the subprocess fails.
	stderr io.ReadCloser
}
134

135
type exportServer struct {
136
        ExportServerConfig
137
        handler http.Handler
138

139
        nbdClient  nbdv1.NBDClient
140
        nbdMu      sync.RWMutex
141
        ociBuilder *oci.Builder
142
}
143

144
func (er *execReader) Read(p []byte) (int, error) {
×
145
        n, err := er.stdout.Read(p)
×
146
        if err == io.EOF {
×
147
                if err2 := er.cmd.Wait(); err2 != nil {
×
148
                        errBytes, _ := io.ReadAll(er.stderr)
×
149
                        log.Log.Reason(err2).Errorf("Subprocess did not execute successfully, result is: %q\n%s", er.cmd.ProcessState.ExitCode(), string(errBytes))
×
150
                        return n, err2
×
151
                }
×
152
        }
153
        return n, err
×
154
}
155

156
func (er *execReader) Close() error {
×
157
        return er.stdout.Close()
×
158
}
×
159

160
func (s *exportServer) initHandler() {
29✔
161
        mux := http.NewServeMux()
29✔
162
        for _, vi := range s.Paths.Volumes {
45✔
163
                if hasPermissions := s.PermissionChecker(vi.Path); !hasPermissions {
16✔
164
                        golog.Fatalf("unable to manipulate %s's contents, exiting", vi.Path)
×
165
                }
×
166
                for path, handler := range s.getHandlerMap(vi) {
32✔
167
                        log.Log.Infof("Handling path %s\n", path)
16✔
168
                        mux.Handle(path, tokenChecker(s.TokenGetter, handler))
16✔
169
                }
16✔
170
        }
171
        for _, bi := range s.Paths.Backups {
29✔
172
                log.Log.Infof("Handling backup path %s (Map) and %s (Data)\n", bi.MapURI, bi.DataURI)
×
173
                mux.Handle(bi.MapURI, tokenChecker(s.TokenGetter, s.backupMapHandler(bi.Path)))
×
174
                mux.Handle(bi.DataURI, tokenChecker(s.TokenGetter, s.backupDataHandler(bi.Path)))
×
175
        }
×
176
        if s.Paths.VMURI != "" {
37✔
177
                mux.Handle(filepath.Join(internal, s.Paths.VMURI), tokenChecker(s.TokenGetter, s.VmHandler(s.Paths.Volumes, getInternalBasePath, getInternalCAConfigMap)))
8✔
178
                mux.Handle(filepath.Join(external, s.Paths.VMURI), tokenChecker(s.TokenGetter, s.VmHandler(s.Paths.Volumes, getExternalBasePath, getExternalCAConfigMap)))
8✔
179
        }
8✔
180
        if s.Paths.SecretURI != "" {
29✔
181
                mux.Handle(filepath.Join(internal, s.Paths.SecretURI), tokenChecker(s.TokenGetter, s.TokenSecretHandler(s.TokenGetter)))
×
182
                mux.Handle(filepath.Join(external, s.Paths.SecretURI), tokenChecker(s.TokenGetter, s.TokenSecretHandler(s.TokenGetter)))
×
183
        }
×
184
        if s.ociBuilder != nil && s.Paths.OCIURI != "" {
30✔
185
                mux.Handle(s.Paths.OCIURI, tokenChecker(s.TokenGetter, s.OCIHandler(s.ociBuilder)))
1✔
186
        }
1✔
187

188
        // Readiness probe
189
        mux.HandleFunc(export.ReadinessPath, s.readyHandler)
29✔
190

29✔
191
        s.handler = mux
29✔
192
}
193

194
func getInternalCAConfigMap() (*corev1.ConfigMap, error) {
×
195
        return getCAConfigMap(internalCaConfigMapPath)
×
196
}
×
197

198
func getExternalCAConfigMap() (*corev1.ConfigMap, error) {
×
199
        return getCAConfigMap(externalCaConfigMapPath)
×
200
}
×
201

202
func (s *exportServer) getHandlerMap(vi export.VolumeInfo) map[string]http.Handler {
16✔
203
        fi, err := os.Stat(vi.Path)
16✔
204
        if err != nil {
16✔
205
                log.Log.Reason(err).Errorf("error statting %s", vi.Path)
×
206
                return nil
×
207
        }
×
208

209
        var result = make(map[string]http.Handler)
16✔
210

16✔
211
        if vi.ArchiveURI != "" {
20✔
212
                result[vi.ArchiveURI] = s.ArchiveHandler(vi.Path)
4✔
213
        }
4✔
214

215
        if vi.DirURI != "" {
20✔
216
                result[vi.DirURI] = s.DirHandler(vi.DirURI, vi.Path)
4✔
217
        }
4✔
218

219
        p := vi.Path
16✔
220
        if fi.IsDir() {
32✔
221
                p = path.Join(p, "disk.img")
16✔
222
        }
16✔
223

224
        if vi.RawURI != "" {
20✔
225
                result[vi.RawURI] = s.FileHandler(p)
4✔
226
        }
4✔
227

228
        if vi.RawGzURI != "" {
20✔
229
                result[vi.RawGzURI] = s.GzipHandler(p)
4✔
230
        }
4✔
231

232
        return result
16✔
233
}
234

235
func (s *exportServer) Run() {
×
236
        s.initHandler()
×
237

×
NEW
238
        ctx, cancel := context.WithCancel(context.Background())
×
NEW
239
        if !s.Deadline.IsZero() {
×
NEW
240
                log.Log.Infof("Deadline set to %s", s.Deadline)
×
NEW
241
                ctx, cancel = context.WithDeadline(ctx, s.Deadline)
×
NEW
242
        }
×
NEW
243
        defer cancel()
×
NEW
244

×
NEW
245
        srv := s.buildServer(ctx)
×
246

×
247
        h2Server := &http2.Server{
×
248
                IdleTimeout: tunnelIdleTimeout,
×
249
        }
×
250
        if err := http2.ConfigureServer(srv, h2Server); err != nil {
×
251
                panic(err)
×
252
        }
253

254
        ch := make(chan error)
×
255

×
256
        go func() {
×
257
                err := srv.ListenAndServeTLS(s.CertFile, s.KeyFile)
×
258
                ch <- err
×
259
        }()
×
260

NEW
261
        if s.ociBuilder != nil {
×
NEW
262
                go func() {
×
NEW
263
                        if err := s.ociBuilder.Prepare(ctx); err != nil {
×
NEW
264
                                log.Log.Reason(err).Error("OCI Pass 1 failed")
×
NEW
265
                        }
×
266
                }()
267
        }
268

NEW
269
        select {
×
NEW
270
        case err := <-ch:
×
UNCOV
271
                panic(err)
×
NEW
272
        case <-ctx.Done():
×
NEW
273
                log.Log.Info("Deadline exceeded, shutting down")
×
NEW
274
                srv.Shutdown(context.Background())
×
275
        }
276
}
277

278
func (s *exportServer) buildServer(ctx context.Context) *http.Server {
3✔
279
        tlsConfig := &tls.Config{
3✔
280
                MinVersion:   s.TLSMinVersion,
3✔
281
                CipherSuites: s.TLSCipherSuites,
3✔
282
                NextProtos:   []string{"h2", "http/1.1"},
3✔
283
        }
3✔
284

3✔
285
        rootHandler := s.handler
3✔
286
        if s.BackupUID != "" {
5✔
287
                clientCAPool := x509.NewCertPool()
2✔
288
                if ok := clientCAPool.AppendCertsFromPEM(s.BackupCACert); !ok {
3✔
289
                        panic("failed to parse Backup CA")
1✔
290
                }
291
                tlsConfig.ClientCAs = clientCAPool
1✔
292
                tlsConfig.ClientAuth = tls.VerifyClientCertIfGiven
1✔
293

1✔
294
                rootHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
3✔
295
                        if r.Method == http.MethodConnect {
3✔
296
                                s.handleTunnel(w, r)
1✔
297
                                return
1✔
298
                        }
1✔
299
                        s.handler.ServeHTTP(w, r)
1✔
300
                })
301
        }
302

303
        return &http.Server{
2✔
304
                Addr:      s.ListenAddr,
2✔
305
                Handler:   rootHandler,
2✔
306
                TLSConfig: tlsConfig,
2✔
307
                BaseContext: func(_ net.Listener) context.Context {
2✔
NEW
308
                        return ctx
×
NEW
309
                },
×
310
        }
311
}
312

313
func (s *exportServer) AddFlags() {
×
314
        flag.CommandLine.AddGoFlag(goflag.CommandLine.Lookup("v"))
×
315
}
×
316

317
func NewExportServer(config ExportServerConfig) (service.Service, error) {
29✔
318
        es := &exportServer{
29✔
319
                ExportServerConfig: config,
29✔
320
        }
29✔
321

29✔
322
        if es.ArchiveHandler == nil {
29✔
323
                es.ArchiveHandler = archiveHandler
×
324
        }
×
325

326
        if es.DirHandler == nil {
29✔
327
                es.DirHandler = dirHandler
×
328
        }
×
329

330
        if es.FileHandler == nil {
29✔
331
                es.FileHandler = fileHandler
×
332
        }
×
333

334
        if es.GzipHandler == nil {
29✔
335
                es.GzipHandler = gzipHandler
×
336
        }
×
337

338
        if es.VmHandler == nil {
29✔
339
                es.VmHandler = vmHandler
×
340
        }
×
341

342
        if es.TokenSecretHandler == nil {
29✔
343
                es.TokenSecretHandler = secretHandler
×
344
        }
×
345

346
        if es.TokenGetter == nil {
29✔
347
                es.TokenGetter = func() (string, error) {
×
348
                        return getToken(es.TokenFile)
×
349
                }
×
350
        }
351

352
        if es.PermissionChecker == nil {
29✔
353
                es.PermissionChecker = checkVolumePermissions
×
354
        }
×
355

356
        if es.OCIHandler == nil {
58✔
357
                es.OCIHandler = ociHTTPHandler
29✔
358
        }
29✔
359

360
        if es.Paths != nil && es.Paths.OCIURI != "" {
29✔
NEW
361
                builder, err := newOCIBuilder(es.Paths)
×
NEW
362
                if err != nil {
×
NEW
363
                        return nil, fmt.Errorf("failed to construct OCI builder: %w", err)
×
NEW
364
                }
×
NEW
365
                if builder != nil {
×
NEW
366
                        es.ociBuilder = builder
×
NEW
367
                }
×
368
        }
369

370
        return es, nil
29✔
371
}
372

373
var getExpandedVM = func() *virtv1.VirtualMachine {
×
374
        f, err := os.Open(vmManifestPath)
×
375
        if err != nil {
×
376
                log.Log.Reason(err).Info("Unable to load VM manifest data")
×
377
                return nil
×
378
        }
×
379
        defer f.Close()
×
380
        fileinfo, err := f.Stat()
×
381
        if err != nil {
×
382
                log.Log.Reason(err).Info("Unable to load VM manifest data")
×
383
                return nil
×
384
        }
×
385
        buf := make([]byte, fileinfo.Size())
×
386
        _, err = f.Read(buf)
×
387
        if err != nil {
×
388
                log.Log.Reason(err).Info("Unable to load VM manifest data")
×
389
                return nil
×
390
        }
×
391

392
        vm := &virtv1.VirtualMachine{}
×
393
        if err := json.Unmarshal(buf, vm); err != nil {
×
394
                log.Log.Reason(err).Info("Unable to load VM manifest data")
×
395
                return nil
×
396
        }
×
397
        return vm
×
398
}
399

400
var getInternalBasePath = func() (string, error) {
×
401
        data, err := os.ReadFile(internalLinkPath)
×
402
        if err != nil {
×
403
                return "", err
×
404
        }
×
405
        return string(data), nil
×
406
}
407

408
var getExportName = func() (string, error) {
×
409
        data, err := os.ReadFile(exportNamePath)
×
410
        if err != nil {
×
411
                return "", err
×
412
        }
×
413
        return string(data), nil
×
414
}
415

416
var getExternalBasePath = func() (string, error) {
×
417
        data, err := os.ReadFile(externalLinkPath)
×
418
        if err != nil {
×
419
                return "", err
×
420
        }
×
421
        return string(data), nil
×
422
}
423

424
func GetTypeMetaString(gvk schema.GroupVersionKind) string {
×
425
        return fmt.Sprintf("apiVersion: %s\nkind: %s\n", gvk.GroupVersion().String(), gvk.Kind)
×
426
}
×
427

428
var getCAConfigMap = func(name string) (*corev1.ConfigMap, error) {
×
429
        f, err := os.Open(name)
×
430
        if err != nil {
×
431
                return nil, err
×
432
        }
×
433
        defer f.Close()
×
434
        fileinfo, err := f.Stat()
×
435
        if err != nil {
×
436
                return nil, err
×
437
        }
×
438
        buf := make([]byte, fileinfo.Size())
×
439
        _, err = f.Read(buf)
×
440
        if err != nil {
×
441
                return nil, err
×
442
        }
×
443

444
        cm := &corev1.ConfigMap{}
×
445
        if err := json.Unmarshal(buf, cm); err != nil {
×
446
                return nil, err
×
447
        }
×
448
        return cm, nil
×
449
}
450

451
var getCdiHeaderSecret = func(token, name string) *corev1.Secret {
2✔
452
        data := make(map[string]string)
2✔
453

2✔
454
        data["token"] = fmt.Sprintf("x-kubevirt-export-token:%s", token)
2✔
455
        return &corev1.Secret{
2✔
456
                ObjectMeta: metav1.ObjectMeta{
2✔
457
                        Name: name,
2✔
458
                },
2✔
459
                StringData: data,
2✔
460
        }
2✔
461
}
2✔
462

463
var getDataVolumes = func(vm *virtv1.VirtualMachine) ([]*cdiv1.DataVolume, error) {
×
464
        res := make([]*cdiv1.DataVolume, 0)
×
465
        volumes, err := storageutils.GetVolumes(vm, nil)
×
466
        if err != nil {
×
467
                return nil, err
×
468
        }
×
469
        for _, volume := range volumes {
×
470
                name := ""
×
471
                if volume.DataVolume != nil {
×
472
                        name = volume.DataVolume.Name
×
473
                } else if volume.PersistentVolumeClaim != nil {
×
474
                        name = volume.PersistentVolumeClaim.ClaimName
×
475
                }
×
476
                if name == "" {
×
477
                        continue
×
478
                }
479
                log.Log.V(1).Infof("Opening DV %s", filepath.Join(manifestCmBasePath, fmt.Sprintf("dv-%s", name)))
×
480
                f, err := os.Open(filepath.Join(manifestCmBasePath, fmt.Sprintf("dv-%s", name)))
×
481
                if err != nil {
×
482
                        if errors.Is(err, os.ErrNotExist) {
×
483
                                log.Log.V(1).Info("DV not found skipping")
×
484
                                continue
×
485
                        }
486
                        return nil, err
×
487
                }
488
                defer f.Close()
×
489
                fileinfo, err := f.Stat()
×
490
                if err != nil {
×
491
                        return nil, err
×
492
                }
×
493
                buf := make([]byte, fileinfo.Size())
×
494
                _, err = f.Read(buf)
×
495
                if err != nil {
×
496
                        return nil, err
×
497
                }
×
498
                dv := &cdiv1.DataVolume{}
×
499
                if err := json.Unmarshal(buf, dv); err != nil {
×
500
                        return nil, err
×
501
                }
×
502
                res = append(res, dv)
×
503
        }
504
        return res, nil
×
505
}
506

507
func newTarReader(mountPoint string) (io.ReadCloser, error) {
×
508
        var excludeArgs []string
×
509
        for name := range excludeMap {
×
510
                excludeArgs = append(excludeArgs, "--exclude="+name)
×
511
        }
×
512

513
        args := []string{"Scv"}
×
514
        args = append(args, excludeArgs...)
×
515
        args = append(args, ".")
×
516

×
517
        cmd := exec.Command("/usr/bin/tar", args...)
×
518
        cmd.Dir = mountPoint
×
519
        stdout, err := cmd.StdoutPipe()
×
520
        if err != nil {
×
521
                return nil, err
×
522
        }
×
523
        var stderr bytes.Buffer
×
524
        cmd.Stderr = &stderr
×
525
        if err = cmd.Start(); err != nil {
×
526
                return nil, err
×
527
        }
×
528
        return &execReader{cmd: cmd, stdout: stdout, stderr: io.NopCloser(&stderr)}, nil
×
529
}
530

531
func pipeToGzip(reader io.ReadCloser) io.ReadCloser {
×
532
        pr, pw := io.Pipe()
×
533
        zw := gzip.NewWriter(pw)
×
534

×
535
        go func() {
×
536
                n, err := io.Copy(zw, reader)
×
537
                if err != nil {
×
538
                        log.Log.Reason(err).Error("error piping to gzip")
×
539
                }
×
540
                if err = zw.Close(); err != nil {
×
541
                        log.Log.Reason(err).Error("error closing gzip writer")
×
542
                }
×
543
                if err = pw.Close(); err != nil {
×
544
                        log.Log.Reason(err).Error("error closing pipe writer")
×
545
                }
×
546
                log.Log.Infof("Wrote %d bytes\n", n)
×
547
        }()
548

549
        return pr
×
550
}
551

552
func getTokenQueryParam(r *http.Request) (token string) {
25✔
553
        q := r.URL.Query()
25✔
554
        if keys, ok := q[authHeader]; ok {
37✔
555
                token = keys[0]
12✔
556
                q.Del(authHeader)
12✔
557
                r.URL.RawQuery = q.Encode()
12✔
558
        }
12✔
559
        return
25✔
560
}
561

562
func getTokenHeader(r *http.Request) (token string) {
25✔
563
        if tok := r.Header.Get(authHeader); tok != "" {
38✔
564
                r.Header.Del(authHeader)
13✔
565
                token = tok
13✔
566
        }
13✔
567
        return
25✔
568
}
569

570
func tokenChecker(tokenGetter TokenGetterFunc, nextHandler http.Handler) http.Handler {
33✔
571
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
58✔
572
                token, err := tokenGetter()
25✔
573
                if err != nil {
25✔
574
                        w.WriteHeader(http.StatusInternalServerError)
×
575
                        return
×
576
                }
×
577
                for _, tok := range []string{getTokenQueryParam(r), getTokenHeader(r)} {
69✔
578
                        if tok == token {
57✔
579
                                nextHandler.ServeHTTP(w, r)
13✔
580
                                return
13✔
581
                        }
13✔
582
                }
583
                w.WriteHeader(http.StatusUnauthorized)
12✔
584
        })
585
}
586

587
func archiveHandler(mountPoint string) http.Handler {
×
588
        return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
×
589
                if req.Method != http.MethodGet {
×
590
                        w.WriteHeader(http.StatusBadRequest)
×
591
                        return
×
592
                }
×
593
                if hasPermissions := checkDirectoryPermissions(mountPoint); !hasPermissions {
×
594
                        w.WriteHeader(http.StatusInternalServerError)
×
595
                        return
×
596
                }
×
597

598
                tarReader, err := newTarReader(mountPoint)
×
599
                if err != nil {
×
600
                        log.Log.Reason(err).Error("error creating tar reader")
×
601
                        w.WriteHeader(http.StatusInternalServerError)
×
602
                        return
×
603
                }
×
604
                defer tarReader.Close()
×
605
                gzipReader := pipeToGzip(tarReader)
×
606
                defer gzipReader.Close()
×
607
                n, err := io.Copy(w, gzipReader)
×
608
                if err != nil {
×
609
                        log.Log.Reason(err).Error("error writing response body")
×
610
                }
×
611
                log.Log.Infof("Wrote %d bytes\n", n)
×
612
        })
613
}
614

615
func checkDirectoryPermissions(filePath string) bool {
×
616
        dir, err := os.Open(filePath)
×
617
        if err != nil {
×
618
                log.Log.Reason(err).Errorf("error opening %s", filePath)
×
619
                return false
×
620
        }
×
621
        defer dir.Close()
×
622

×
623
        // Read all filenames
×
624
        contents, err := dir.Readdirnames(-1)
×
625
        if err != nil {
×
626
                log.Log.Reason(err).Errorf("failed to read directory contents: %v", err)
×
627
                return false
×
628
        }
×
629

630
        for _, item := range contents {
×
631
                if _, ok := excludeMap[item]; ok {
×
632
                        continue
×
633
                }
634
                itemPath := filepath.Join(filePath, item)
×
635
                // Check if export server has permissions to manipulate the file
×
636
                file, err := os.Open(itemPath)
×
637
                if err != nil {
×
638
                        log.Log.Reason(err).Errorf("%s may lack read permissions", itemPath)
×
639
                        return false
×
640
                }
×
641
                file.Close()
×
642
        }
643
        return true
×
644
}
645

646
func checkVolumePermissions(path string) bool {
×
647
        fi, err := os.Stat(path)
×
648
        if err != nil {
×
649
                log.Log.Reason(err).Errorf("error statting %s", path)
×
650
                return false
×
651
        }
×
652
        if fi.IsDir() {
×
653
                return checkDirectoryPermissions(path)
×
654
        }
×
655
        f, err := os.Open(path)
×
656
        if err != nil {
×
657
                log.Log.Reason(err).Errorf("error opening %s", path)
×
658
                return false
×
659
        }
×
660
        f.Close()
×
661
        return true
×
662
}
663

664
func gzipHandler(filePath string) http.Handler {
×
665
        return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
×
666
                if req.Method != http.MethodGet {
×
667
                        w.WriteHeader(http.StatusBadRequest)
×
668
                        return
×
669
                }
×
670
                f, err := os.Open(filePath)
×
671
                if err != nil {
×
672
                        log.Log.Reason(err).Errorf("error opening %s", filePath)
×
673
                        w.WriteHeader(http.StatusInternalServerError)
×
674
                        return
×
675
                }
×
676
                defer f.Close()
×
677
                gzipReader := pipeToGzip(f)
×
678
                defer gzipReader.Close()
×
679
                n, err := io.Copy(w, gzipReader)
×
680
                if err != nil {
×
681
                        log.Log.Reason(err).Error("error writing response body")
×
682
                }
×
683
                log.Log.Infof("Wrote %d bytes\n", n)
×
684
        })
685
}
686

687
func vmHandler(vi []export.VolumeInfo, getBasePath func() (string, error), getCmFunc func() (*corev1.ConfigMap, error)) http.Handler {
14✔
688
        return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
28✔
689
                if req.Method != http.MethodGet {
18✔
690
                        w.WriteHeader(http.StatusBadRequest)
4✔
691
                        return
4✔
692
                }
4✔
693
                resources := make([]runtime.Object, 0)
10✔
694
                outputFunc := resourceToBytesJson
10✔
695
                contentType := req.Header.Get("Accept")
10✔
696
                if contentType == runtime.ContentTypeYAML {
13✔
697
                        outputFunc = resourceToBytesYaml
3✔
698
                }
3✔
699
                exportName, err := getExportName()
10✔
700
                if err != nil {
11✔
701
                        log.Log.Reason(err).Error("error reading export name")
1✔
702
                        w.WriteHeader(http.StatusInternalServerError)
1✔
703
                        return
1✔
704
                }
1✔
705
                headerSecretName := getSecretTokenName(exportName)
9✔
706
                path, err := getBasePath()
9✔
707
                if err != nil {
11✔
708
                        if errors.Is(err, os.ErrNotExist) {
2✔
709
                                log.Log.Reason(err).Info("path not found")
×
710
                                w.WriteHeader(http.StatusNotFound)
×
711
                        } else {
2✔
712
                                log.Log.Reason(err).Error("error reading path")
2✔
713
                                w.WriteHeader(http.StatusInternalServerError)
2✔
714
                        }
2✔
715
                        return
2✔
716
                }
717
                certCm, error := getCmFunc()
7✔
718
                if error != nil {
8✔
719
                        log.Log.Reason(err).Error("error reading ca configmap information")
1✔
720
                        w.WriteHeader(http.StatusInternalServerError)
1✔
721
                        return
1✔
722
                }
1✔
723
                certCm.TypeMeta = metav1.TypeMeta{
6✔
724
                        Kind:       "ConfigMap",
6✔
725
                        APIVersion: "v1",
6✔
726
                }
6✔
727
                resources = append(resources, certCm)
6✔
728
                expandedVm := getExpandedVM()
6✔
729
                if expandedVm == nil {
7✔
730
                        log.Log.Reason(err).Error("error getting VM definition")
1✔
731
                        w.WriteHeader(http.StatusInternalServerError)
1✔
732
                        return
1✔
733
                }
1✔
734
                expandedVm.TypeMeta = metav1.TypeMeta{
5✔
735
                        Kind:       virtv1.VirtualMachineGroupVersionKind.Kind,
5✔
736
                        APIVersion: virtv1.VirtualMachineGroupVersionKind.GroupVersion().String(),
5✔
737
                }
5✔
738
                for i, dvTemplate := range expandedVm.Spec.DataVolumeTemplates {
7✔
739
                        dvTemplate.Spec.Source.HTTP.URL = fmt.Sprintf("https://%s", filepath.Join(path, vi[i].RawGzURI))
2✔
740
                        dvTemplate.Spec.Source.HTTP.CertConfigMap = certCm.Name
2✔
741
                        dvTemplate.Spec.Source.HTTP.SecretExtraHeaders = []string{headerSecretName}
2✔
742
                }
2✔
743
                resources = append(resources, expandedVm)
5✔
744
                datavolumes, err := getDataVolumes(expandedVm)
5✔
745
                if err != nil {
5✔
746
                        log.Log.Reason(err).Error("error reading datavolumes information")
×
747
                        w.WriteHeader(http.StatusInternalServerError)
×
748
                        return
×
749
                }
×
750
                for _, dv := range datavolumes {
6✔
751
                        dv.TypeMeta = metav1.TypeMeta{
1✔
752
                                Kind:       "DataVolume",
1✔
753
                                APIVersion: "cdi.kubevirt.io/v1beta1",
1✔
754
                        }
1✔
755
                        for _, info := range vi {
2✔
756
                                if strings.Contains(info.RawGzURI, dv.Name) {
2✔
757
                                        dv.Spec.Source.HTTP.URL = fmt.Sprintf("https://%s", filepath.Join(path, info.RawGzURI))
1✔
758
                                }
1✔
759
                        }
760
                        dv.Spec.Source.HTTP.CertConfigMap = certCm.Name
1✔
761
                        dv.Spec.Source.HTTP.SecretExtraHeaders = []string{headerSecretName}
1✔
762
                        resources = append(resources, dv)
1✔
763
                }
764
                data, err := outputFunc(resources)
5✔
765
                if err != nil {
5✔
766
                        w.WriteHeader(http.StatusInternalServerError)
×
767
                        return
×
768
                }
×
769
                n, err := w.Write(data)
5✔
770
                if err != nil {
5✔
771
                        log.Log.Reason(err).Error("error writing manifests")
×
772
                        w.WriteHeader(http.StatusInternalServerError)
×
773
                        return
×
774
                }
×
775
                log.Log.Infof("Wrote %d bytes\n", n)
5✔
776
        })
777
}
778

779
func resourceToBytesJson(resources []runtime.Object) ([]byte, error) {
3✔
780
        list := corev1.List{
3✔
781
                TypeMeta: metav1.TypeMeta{
3✔
782
                        Kind:       "List",
3✔
783
                        APIVersion: "v1",
3✔
784
                },
3✔
785
                ListMeta: metav1.ListMeta{},
3✔
786
        }
3✔
787
        for _, resource := range resources {
8✔
788
                list.Items = append(list.Items, runtime.RawExtension{Object: resource})
5✔
789
        }
5✔
790
        resourceBytes, err := json.MarshalIndent(list, "", "    ")
3✔
791
        if err != nil {
3✔
792
                return nil, err
×
793
        }
×
794
        return resourceBytes, nil
3✔
795
}
796

797
func resourceToBytesYaml(resources []runtime.Object) ([]byte, error) {
4✔
798
        data := []byte{}
4✔
799
        for _, resource := range resources {
12✔
800
                resourceBytes, err := yaml.Marshal(resource)
8✔
801
                if err != nil {
8✔
802
                        return nil, err
×
803
                }
×
804
                data = append(data, resourceBytes...)
8✔
805
                data = append(data, []byte("---\n")...)
8✔
806
        }
807
        return data, nil
4✔
808
}
809

810
// symlinkSafeDir is an http.FileSystem that prevents symlink traversal outside root.
811
// Unlike http.Dir, it resolves symlinks via safepath and rejects any that escape the
812
// root boundary, preventing attackers from reading files outside the exported PVC.
813
type symlinkSafeDir struct {
814
        root string // base directory that Open joins requested names to and resolves them against
815
}
816

817
func (d symlinkSafeDir) Open(name string) (http.File, error) {
7✔
818
        cleanName := path.Clean("/" + name)
7✔
819
        if cleanName == "/" {
8✔
820
                cleanName = "."
1✔
821
        } else {
7✔
822
                cleanName = cleanName[1:]
6✔
823
        }
6✔
824

825
        resolved, err := safepath.JoinAndResolveWithRelativeRoot(d.root, cleanName)
7✔
826
        if err != nil {
11✔
827
                return nil, err
4✔
828
        }
4✔
829

830
        fd, err := safepath.OpenAtNoFollow(resolved)
3✔
831
        if err != nil {
3✔
832
                return nil, err
×
833
        }
×
834
        defer fd.Close()
3✔
835

3✔
836
        return os.Open(fd.SafePath())
3✔
837
}
838

839
func dirHandler(uri, mountPoint string) http.Handler {
6✔
840
        return http.StripPrefix(uri, http.FileServer(symlinkSafeDir{root: mountPoint}))
6✔
841
}
6✔
842

843
func fileHandler(file string) http.Handler {
×
844
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
×
845
                f, err := os.Open(file)
×
846
                if err != nil {
×
847
                        log.Log.Reason(err).Errorf("error opening %s", file)
×
848
                        w.WriteHeader(http.StatusInternalServerError)
×
849
                        return
×
850
                }
×
851
                defer f.Close()
×
852
                http.ServeContent(w, r, "disk.img", time.Time{}, f)
×
853
        })
854
}
855

856
// getToken returns the full contents of tokenFile as a string, without any
// trimming. On a read failure it returns an empty string and the error.
func getToken(tokenFile string) (string, error) {
	raw, readErr := os.ReadFile(tokenFile)
	if readErr == nil {
		return string(raw), nil
	}
	return "", readErr
}
864

865
// getSecretTokenName derives the header secret name for an export by
// prefixing the export name with "header-secret-". It is a variable, so it
// can be reassigned.
var getSecretTokenName = func(exportName string) string {
	const prefix = "header-secret-"
	return prefix + exportName
}
11✔
868

869
func secretHandler(tokenGetter TokenGetterFunc) http.Handler {
8✔
870
        return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
16✔
871
                if req.Method != http.MethodGet {
12✔
872
                        w.WriteHeader(http.StatusBadRequest)
4✔
873
                        return
4✔
874
                }
4✔
875
                resources := make([]runtime.Object, 0)
4✔
876
                outputFunc := resourceToBytesJson
4✔
877
                contentType := req.Header.Get("Accept")
4✔
878
                if contentType == runtime.ContentTypeYAML {
5✔
879
                        outputFunc = resourceToBytesYaml
1✔
880
                }
1✔
881
                token, err := tokenGetter()
4✔
882
                if err != nil {
5✔
883
                        log.Log.Reason(err).Error("error getting token")
1✔
884
                        w.WriteHeader(http.StatusInternalServerError)
1✔
885
                        return
1✔
886
                }
1✔
887
                exportName, err := getExportName()
3✔
888
                if err != nil {
4✔
889
                        log.Log.Reason(err).Error("error reading export name")
1✔
890
                        w.WriteHeader(http.StatusInternalServerError)
1✔
891
                        return
1✔
892
                }
1✔
893
                headerSecretName := getSecretTokenName(exportName)
2✔
894
                secret := getCdiHeaderSecret(token, headerSecretName)
2✔
895
                secret.TypeMeta = metav1.TypeMeta{
2✔
896
                        Kind:       "Secret",
2✔
897
                        APIVersion: "v1",
2✔
898
                }
2✔
899
                resources = append(resources, secret)
2✔
900
                data, err := outputFunc(resources)
2✔
901
                if err != nil {
2✔
902
                        log.Log.Reason(err).Errorf("error generating secret manifest")
×
903
                        w.WriteHeader(http.StatusInternalServerError)
×
904
                        return
×
905
                }
×
906
                n, err := w.Write(data)
2✔
907
                if err != nil {
2✔
908
                        log.Log.Reason(err).Error("error writing secret manifest")
×
909
                        w.WriteHeader(http.StatusInternalServerError)
×
910
                        return
×
911
                }
×
912
                log.Log.Infof("Wrote %d bytes\n", n)
2✔
913
        })
914
}
915

916
func (s *exportServer) readyHandler(w http.ResponseWriter, r *http.Request) {
3✔
917
        if s.ociBuilder != nil && !s.ociBuilder.Ready() {
4✔
918
                http.Error(w, "OCI digest computation in progress", http.StatusServiceUnavailable)
1✔
919
                return
1✔
920
        }
1✔
921
        io.WriteString(w, "OK")
2✔
922
}
923

924
func (s *exportServer) handleTunnel(w http.ResponseWriter, r *http.Request) {
5✔
925
        if r.TLS == nil || len(r.TLS.PeerCertificates) == 0 {
7✔
926
                log.Log.Error("tunnel rejected: no client certificate presented")
2✔
927
                http.Error(w, "mTLS required", http.StatusUnauthorized)
2✔
928
                return
2✔
929
        }
2✔
930

931
        expectedCN := fmt.Sprintf("kubevirt.io:system:client:%s", s.BackupUID)
3✔
932
        clientCN := r.TLS.PeerCertificates[0].Subject.CommonName
3✔
933
        if clientCN != expectedCN {
4✔
934
                log.Log.Errorf("identity mismatch, cert: %s, expected: %s", clientCN, expectedCN)
1✔
935
                http.Error(w, "Forbidden", http.StatusForbidden)
1✔
936
                return
1✔
937
        }
1✔
938

939
        s.nbdMu.Lock()
2✔
940
        if s.nbdClient != nil {
3✔
941
                s.nbdMu.Unlock()
1✔
942
                _ = r.Body.Close()
1✔
943
                log.Log.Warning("rejecting tunnel: active session already exists")
1✔
944
                http.Error(w, "Conflict", http.StatusConflict)
1✔
945
                return
1✔
946
        }
1✔
947

948
        ctx, cancel := context.WithCancel(r.Context())
1✔
949
        defer cancel()
1✔
950
        conn := newH2ServerConn(r.Body, w, cancel)
1✔
951

1✔
952
        var dialOnce sync.Once
1✔
953
        clientConn, err := grpc.NewClient(
1✔
954
                "passthrough:///backup",
1✔
955
                grpc.WithContextDialer(func(_ context.Context, _ string) (net.Conn, error) {
1✔
956
                        var c net.Conn
×
957
                        dialOnce.Do(func() { c = conn })
×
958
                        if c != nil {
×
959
                                return c, nil
×
960
                        }
×
961
                        return nil, fmt.Errorf("tunnel connection is single-use; reconnect not supported")
×
962
                }),
963
                grpc.WithTransportCredentials(insecure.NewCredentials()),
964
                grpc.WithKeepaliveParams(keepalive.ClientParameters{
965
                        Time:                10 * time.Second,
966
                        Timeout:             5 * time.Second,
967
                        PermitWithoutStream: true,
968
                }),
969
        )
970
        if err != nil {
1✔
971
                s.nbdMu.Unlock()
×
972
                log.Log.Reason(err).Error("failed to initialize gRPC client for tunnel")
×
973
                http.Error(w, "Internal Server Error", http.StatusInternalServerError)
×
974
                return
×
975
        }
×
976

977
        w.WriteHeader(http.StatusOK)
1✔
978
        w.(http.Flusher).Flush()
1✔
979

1✔
980
        s.nbdClient = nbdv1.NewNBDClient(clientConn)
1✔
981
        s.nbdMu.Unlock()
1✔
982

1✔
983
        log.Log.Infof("Exclusive backup tunnel established for %s", s.BackupUID)
1✔
984

1✔
985
        <-ctx.Done()
1✔
986

1✔
987
        s.nbdMu.Lock()
1✔
988
        clientConn.Close()
1✔
989
        s.nbdClient = nil
1✔
990
        s.nbdMu.Unlock()
1✔
991
        log.Log.Info("Backup tunnel disconnected, listener reset")
1✔
992
}
993

994
// h2ServerConn presents the two halves of an HTTP exchange as one connection:
// reads come from r, writes go to w, and closing it triggers cancel.
type h2ServerConn struct {
995
        r      io.ReadCloser // source for Read; closed by Close
996
        w      http.ResponseWriter // sink for Write; flushed after each write when it supports flushing
997
        cancel context.CancelFunc // invoked by Close
998
        once   sync.Once // ensures cancel is run at most once by Close
999
}
1000

1001
func newH2ServerConn(r io.ReadCloser, w http.ResponseWriter, cancel context.CancelFunc) *h2ServerConn {
2✔
1002
        return &h2ServerConn{r: r, w: w, cancel: cancel}
2✔
1003
}
2✔
1004

1005
// Read fills b from the underlying reader and reports its result unchanged.
func (c *h2ServerConn) Read(b []byte) (int, error) {
	src := c.r
	return src.Read(b)
}
1✔
1006

1007
func (c *h2ServerConn) Write(b []byte) (int, error) {
1✔
1008
        n, err := c.w.Write(b)
1✔
1009
        if f, ok := c.w.(http.Flusher); ok {
2✔
1010
                f.Flush()
1✔
1011
        }
1✔
1012
        return n, err
1✔
1013
}
1014

1015
func (c *h2ServerConn) Close() error {
1✔
1016
        c.once.Do(c.cancel)
1✔
1017
        return c.r.Close()
1✔
1018
}
1✔
1019

1020
func (c *h2ServerConn) LocalAddr() net.Addr                { return h2DummyAddr }
×
1021
func (c *h2ServerConn) RemoteAddr() net.Addr               { return h2DummyAddr }
×
1022
func (c *h2ServerConn) SetDeadline(_ time.Time) error      { return nil }
×
1023
func (c *h2ServerConn) SetReadDeadline(_ time.Time) error  { return nil }
×
1024
func (c *h2ServerConn) SetWriteDeadline(_ time.Time) error { return nil }
×
1025

1026
// ExportMapExtent is the JSON form of a single extent returned by the backup
// map endpoint. Its fields are copied from the extents received on the NBD
// map stream.
type ExportMapExtent struct {
1027
        Offset      uint64 `json:"offset"` // extent offset as reported by the stream
1028
        Length      uint64 `json:"length"` // extent length as reported by the stream
1029
        Type        uint64 `json:"type"` // populated from the stream extent's Flags value
1030
        Description string `json:"description"` // description text as reported by the stream
1031
}
1032

1033
// ExportMapResponse is one page of extents returned by the backup map
// endpoint.
type ExportMapResponse struct {
1034
        Extents    []ExportMapExtent `json:"extents"` // extents collected for this page
1035
        // NextOffset is the offset of the first extent that did not fit in this
        // page; it is nil (encoded as JSON null) when the stream ended.
        NextOffset *uint64           `json:"next_offset"`
1036
}
1037

1038
func (s *exportServer) backupMapHandler(exportName string) http.Handler {
15✔
1039
        return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
30✔
1040
                if req.Method != http.MethodGet {
19✔
1041
                        w.WriteHeader(http.StatusMethodNotAllowed)
4✔
1042
                        return
4✔
1043
                }
4✔
1044

1045
                s.nbdMu.RLock()
11✔
1046
                client := s.nbdClient
11✔
1047
                s.nbdMu.RUnlock()
11✔
1048
                if client == nil {
12✔
1049
                        http.Error(w, "Backup source (virt-launcher) not connected via tunnel", http.StatusServiceUnavailable)
1✔
1050
                        return
1✔
1051
                }
1✔
1052

1053
                offset := uint64(0)
10✔
1054
                length := uint64(0)
10✔
1055
                pageSize := defaultMapPageSize
10✔
1056
                query := req.URL.Query()
10✔
1057

10✔
1058
                if offsetStr := query.Get("offset"); offsetStr != "" {
12✔
1059
                        o, err := strconv.ParseUint(offsetStr, 10, 64)
2✔
1060
                        if err != nil {
3✔
1061
                                http.Error(w, fmt.Sprintf("invalid offset %q: %v", offsetStr, err), http.StatusBadRequest)
1✔
1062
                                return
1✔
1063
                        }
1✔
1064
                        offset = o
1✔
1065
                }
1066
                if lengthStr := query.Get("length"); lengthStr != "" {
11✔
1067
                        l, err := strconv.ParseUint(lengthStr, 10, 64)
2✔
1068
                        if err != nil {
3✔
1069
                                http.Error(w, fmt.Sprintf("invalid length %q: %v", lengthStr, err), http.StatusBadRequest)
1✔
1070
                                return
1✔
1071
                        }
1✔
1072
                        length = l
1✔
1073
                }
1074
                if pageSizeStr := query.Get("page_size"); pageSizeStr != "" {
11✔
1075
                        p, err := strconv.Atoi(pageSizeStr)
3✔
1076
                        if err != nil || p <= 0 {
5✔
1077
                                http.Error(w, fmt.Sprintf("invalid page_size %q", pageSizeStr), http.StatusBadRequest)
2✔
1078
                                return
2✔
1079
                        }
2✔
1080
                        pageSize = p
1✔
1081
                }
1082

1083
                var bitmapName string
6✔
1084
                if s.BackupType == string(backupv1.Incremental) && s.BackupCheckpoint != "" {
7✔
1085
                        bitmapName = s.BackupCheckpoint
1✔
1086
                }
1✔
1087

1088
                streamCtx, streamCancel := context.WithCancel(req.Context())
6✔
1089
                defer streamCancel()
6✔
1090

6✔
1091
                stream, err := client.Map(streamCtx, &nbdv1.MapRequest{
6✔
1092
                        ExportName: exportName,
6✔
1093
                        BitmapName: bitmapName,
6✔
1094
                        Offset:     offset,
6✔
1095
                        Length:     length,
6✔
1096
                })
6✔
1097
                if err != nil {
7✔
1098
                        errMsg := fmt.Sprintf("Failed to call map for export: %s", exportName)
1✔
1099
                        log.Log.Reason(err).Error(errMsg)
1✔
1100
                        http.Error(w, errMsg, http.StatusInternalServerError)
1✔
1101
                        return
1✔
1102
                }
1✔
1103

1104
                extents, nextOffsetPtr, err := collectMapPage(stream, pageSize)
5✔
1105
                if err != nil {
6✔
1106
                        errMsg := fmt.Sprintf("Failed to collect map extents for export: %s", exportName)
1✔
1107
                        log.Log.Reason(err).Error(errMsg)
1✔
1108
                        http.Error(w, errMsg, http.StatusInternalServerError)
1✔
1109
                        return
1✔
1110
                }
1✔
1111

1112
                page := ExportMapResponse{
4✔
1113
                        Extents:    extents,
4✔
1114
                        NextOffset: nextOffsetPtr,
4✔
1115
                }
4✔
1116

4✔
1117
                w.Header().Set("Content-Type", "application/json")
4✔
1118
                if err := json.NewEncoder(w).Encode(page); err != nil {
4✔
1119
                        log.Log.Reason(err).Errorf("failed to encode map page for export %s", exportName)
×
1120
                }
×
1121
        })
1122
}
1123

1124
func (s *exportServer) backupDataHandler(exportName string) http.Handler {
11✔
1125
        return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
22✔
1126
                if req.Method != http.MethodGet {
15✔
1127
                        w.WriteHeader(http.StatusMethodNotAllowed)
4✔
1128
                        return
4✔
1129
                }
4✔
1130

1131
                s.nbdMu.RLock()
7✔
1132
                client := s.nbdClient
7✔
1133
                s.nbdMu.RUnlock()
7✔
1134

7✔
1135
                if client == nil {
8✔
1136
                        http.Error(w, "Backup source not connected", http.StatusServiceUnavailable)
1✔
1137
                        return
1✔
1138
                }
1✔
1139

1140
                offset := uint64(0)
6✔
1141
                length := uint64(0)
6✔
1142

6✔
1143
                query := req.URL.Query()
6✔
1144
                if offsetStr := query.Get("offset"); offsetStr != "" {
9✔
1145
                        o, err := strconv.ParseUint(offsetStr, 10, 64)
3✔
1146
                        if err != nil {
4✔
1147
                                http.Error(w, fmt.Sprintf("invalid offset %q: %v", offsetStr, err), http.StatusBadRequest)
1✔
1148
                                return
1✔
1149
                        }
1✔
1150
                        offset = o
2✔
1151
                }
1152
                if lengthStr := query.Get("length"); lengthStr != "" {
8✔
1153
                        l, err := strconv.ParseUint(lengthStr, 10, 64)
3✔
1154
                        if err != nil {
4✔
1155
                                http.Error(w, fmt.Sprintf("invalid length %q: %v", lengthStr, err), http.StatusBadRequest)
1✔
1156
                                return
1✔
1157
                        }
1✔
1158
                        length = l
2✔
1159
                }
1160

1161
                stream, err := client.Read(req.Context(), &nbdv1.ReadRequest{
4✔
1162
                        ExportName: exportName,
4✔
1163
                        Offset:     offset,
4✔
1164
                        Length:     length,
4✔
1165
                })
4✔
1166
                if err != nil {
5✔
1167
                        http.Error(w, fmt.Sprintf("Failed to call read for export: %s", exportName), http.StatusInternalServerError)
1✔
1168
                        return
1✔
1169
                }
1✔
1170

1171
                w.Header().Set("Content-Type", "application/octet-stream")
3✔
1172
                for {
8✔
1173
                        chunk, err := stream.Recv()
5✔
1174
                        if errors.Is(err, io.EOF) {
8✔
1175
                                break
3✔
1176
                        }
1177
                        if err != nil {
2✔
1178
                                log.Log.Reason(err).Error("Tunnel stream interrupted")
×
1179
                                panic(http.ErrAbortHandler)
×
1180
                        }
1181
                        if _, err := w.Write(chunk.Data); err != nil {
2✔
1182
                                log.Log.Reason(err).Error("HTTP client disconnected during stream")
×
1183
                                return
×
1184
                        }
×
1185
                        if f, ok := w.(http.Flusher); ok {
4✔
1186
                                f.Flush()
2✔
1187
                        }
2✔
1188
                }
1189
        })
1190
}
1191

1192
func collectMapPage(stream nbdv1.NBD_MapClient, pageSize int) ([]ExportMapExtent, *uint64, error) {
9✔
1193
        var extents []ExportMapExtent
9✔
1194
        for {
26✔
1195
                msg, err := stream.Recv()
17✔
1196
                if errors.Is(err, io.EOF) {
22✔
1197
                        return extents, nil, nil
5✔
1198
                }
5✔
1199
                if err != nil {
14✔
1200
                        return nil, nil, err
2✔
1201
                }
2✔
1202
                for _, e := range msg.Extents {
20✔
1203
                        if len(extents) >= pageSize {
12✔
1204
                                return extents, &e.Offset, nil
2✔
1205
                        }
2✔
1206
                        extents = append(extents, ExportMapExtent{
8✔
1207
                                Offset:      e.Offset,
8✔
1208
                                Length:      e.Length,
8✔
1209
                                Type:        e.Flags,
8✔
1210
                                Description: e.Description,
8✔
1211
                        })
8✔
1212
                }
1213
        }
1214
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc