• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / kubevirt / 3e2e8a29-09c8-4f6e-ae24-caee5cf9a628

28 Jun 2026 02:11AM UTC coverage: 72.164% (+0.004%) from 72.16%
3e2e8a29-09c8-4f6e-ae24-caee5cf9a628

push

prow

web-flow
Merge pull request #18116 from brianmcarey/go-1.26

Build KubeVirt with go 1.26.4

8 of 11 new or added lines in 7 files covered. (72.73%)

4 existing lines in 1 file now uncovered.

82476 of 114289 relevant lines covered (72.16%)

616.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.34
/pkg/storage/export/virt-exportserver/exportserver.go
1
/*
2
 * This file is part of the KubeVirt project
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 *
16
 * Copyright The KubeVirt Authors.
17
 *
18
 */
19

20
package virtexportserver
21

22
import (
	"bytes"
	"context"
	"crypto/subtle"
	"crypto/tls"
	"crypto/x509"
	"encoding/json"
	"errors"
	goflag "flag"
	"fmt"
	"io"
	golog "log"
	"net"
	"net/http"
	"os"
	"os/exec"
	"path"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"

	gzip "github.com/klauspost/pgzip"
	flag "github.com/spf13/pflag"
	"golang.org/x/net/http2"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/keepalive"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"sigs.k8s.io/yaml"

	virtv1 "kubevirt.io/api/core/v1"
	"kubevirt.io/client-go/log"
	cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"

	nbdv1 "kubevirt.io/kubevirt/pkg/storage/cbt/nbd/v1"

	backupv1 "kubevirt.io/api/backup/v1alpha1"

	"kubevirt.io/kubevirt/pkg/safepath"
	"kubevirt.io/kubevirt/pkg/service"
	"kubevirt.io/kubevirt/pkg/storage/export/export"
	"kubevirt.io/kubevirt/pkg/storage/oci"
	storageutils "kubevirt.io/kubevirt/pkg/storage/utils"
)
70

71
// authHeader is the name under which the export token is looked up, both as an
// HTTP request header and as a URL query parameter.
const authHeader = "x-kubevirt-export-token"

// Locations of the files read from the manifest data directory.
const (
	// manifestCmBasePath is the directory that holds every manifest file below.
	manifestCmBasePath = "/manifest_data/"

	// vmManifestPath is the JSON-encoded VirtualMachine manifest.
	vmManifestPath = manifestCmBasePath + "virtualmachine-manifest"
	// NOTE(review): not referenced in the visible part of the file; presumably the
	// VirtualMachineTemplate counterpart of vmManifestPath - confirm.
	vmTemplateManifestPath = manifestCmBasePath + "virtualmachinetemplate-manifest"
	// internalLinkPath / externalLinkPath hold the base path used to build the
	// internal and external download URLs.
	internalLinkPath = manifestCmBasePath + "internal_host"
	externalLinkPath = manifestCmBasePath + "external_host"
	// internalCaConfigMapPath / externalCaConfigMapPath hold the JSON-encoded CA
	// ConfigMap for the internal and external links.
	internalCaConfigMapPath = manifestCmBasePath + "internal_ca_cm"
	externalCaConfigMapPath = manifestCmBasePath + "external_ca_cm"
	// exportNamePath holds the name of the export.
	exportNamePath = manifestCmBasePath + "export-name"
)

// Route prefixes prepended to the VM manifest and token secret URIs.
const (
	external = "/external"
	internal = "/internal"
)

const (
	// NOTE(review): not referenced in the visible part of the file; presumably the
	// default page size of the backup map endpoint - confirm.
	defaultMapPageSize = 512
	// tunnelIdleTimeout is the idle timeout configured on the HTTP/2 server.
	tunnelIdleTimeout = 60 * time.Second
)
88

89
// excludeMap is the set of directory entry names that are skipped by the
// directory permission check and passed to tar as --exclude arguments.
var excludeMap = map[string]struct{}{"lost+found": {}}

// h2DummyAddr is a zero-valued TCP address.
// NOTE(review): not referenced in the visible part of the file; presumably used
// as a placeholder address for tunnelled HTTP/2 connections - confirm.
var h2DummyAddr = new(net.TCPAddr)
95

96
type TokenGetterFunc func() (string, error)
97

98
type ExportServerConfig struct {
99
        Deadline time.Time
100

101
        ListenAddr string
102

103
        CertFile, KeyFile string
104
        BackupCACert      []byte
105
        TLSMinVersion     uint16
106
        TLSCipherSuites   []uint16
107

108
        TokenFile string
109

110
        BackupUID        string
111
        BackupType       string
112
        BackupCheckpoint string
113

114
        Paths *export.ServerPaths
115

116
        // unit testing helpers
117
        ArchiveHandler     func(string) http.Handler
118
        DirHandler         func(string, string) http.Handler
119
        FileHandler        func(string) http.Handler
120
        GzipHandler        func(string) http.Handler
121
        VmHandler          func([]export.VolumeInfo, func() (string, error), func() (*corev1.ConfigMap, error)) http.Handler
122
        TokenSecretHandler func(TokenGetterFunc) http.Handler
123
        OCIHandler         func(*oci.Builder) http.Handler
124

125
        PermissionChecker func(string) bool
126

127
        TokenGetter TokenGetterFunc
128
}
129

130
// execReader exposes the standard output of a running subprocess as an
// io.ReadCloser and reports the subprocess failure once the output is drained.
type execReader struct {
	// cmd is the started subprocess; it is waited on when stdout reaches EOF.
	cmd *exec.Cmd
	// stdout is the pipe connected to the subprocess standard output.
	stdout io.ReadCloser
	// stderr yields the captured standard error, read only for the failure log.
	stderr io.ReadCloser
}
135

136
type exportServer struct {
137
        ExportServerConfig
138
        handler http.Handler
139

140
        nbdClient  nbdv1.NBDClient
141
        nbdMu      sync.RWMutex
142
        ociBuilder *oci.Builder
143
}
144

145
func (er *execReader) Read(p []byte) (int, error) {
×
146
        n, err := er.stdout.Read(p)
×
147
        if err == io.EOF {
×
148
                if err2 := er.cmd.Wait(); err2 != nil {
×
149
                        errBytes, _ := io.ReadAll(er.stderr)
×
NEW
150
                        log.Log.Reason(err2).Errorf("Subprocess did not execute successfully, result is: %d\n%s", er.cmd.ProcessState.ExitCode(), string(errBytes))
×
151
                        return n, err2
×
152
                }
×
153
        }
154
        return n, err
×
155
}
156

157
func (er *execReader) Close() error {
×
158
        return er.stdout.Close()
×
159
}
×
160

161
func (s *exportServer) initHandler() {
29✔
162
        mux := http.NewServeMux()
29✔
163
        for _, vi := range s.Paths.Volumes {
45✔
164
                if hasPermissions := s.PermissionChecker(vi.Path); !hasPermissions {
16✔
165
                        golog.Fatalf("unable to manipulate %s's contents, exiting", vi.Path)
×
166
                }
×
167
                for path, handler := range s.getHandlerMap(vi) {
32✔
168
                        log.Log.Infof("Handling path %s\n", path)
16✔
169
                        mux.Handle(path, tokenChecker(s.TokenGetter, handler))
16✔
170
                }
16✔
171
        }
172
        for _, bi := range s.Paths.Backups {
29✔
173
                log.Log.Infof("Handling backup path %s (Map) and %s (Data)\n", bi.MapURI, bi.DataURI)
×
174
                mux.Handle(bi.MapURI, tokenChecker(s.TokenGetter, s.backupMapHandler(bi.Path)))
×
175
                mux.Handle(bi.DataURI, tokenChecker(s.TokenGetter, s.backupDataHandler(bi.Path)))
×
176
        }
×
177
        if s.Paths.VMURI != "" {
37✔
178
                mux.Handle(filepath.Join(internal, s.Paths.VMURI), tokenChecker(s.TokenGetter, s.VmHandler(s.Paths.Volumes, getInternalBasePath, getInternalCAConfigMap)))
8✔
179
                mux.Handle(filepath.Join(external, s.Paths.VMURI), tokenChecker(s.TokenGetter, s.VmHandler(s.Paths.Volumes, getExternalBasePath, getExternalCAConfigMap)))
8✔
180
        }
8✔
181
        if s.Paths.SecretURI != "" {
29✔
182
                mux.Handle(filepath.Join(internal, s.Paths.SecretURI), tokenChecker(s.TokenGetter, s.TokenSecretHandler(s.TokenGetter)))
×
183
                mux.Handle(filepath.Join(external, s.Paths.SecretURI), tokenChecker(s.TokenGetter, s.TokenSecretHandler(s.TokenGetter)))
×
184
        }
×
185
        if s.ociBuilder != nil && s.Paths.OCIURI != "" {
30✔
186
                mux.Handle(s.Paths.OCIURI, tokenChecker(s.TokenGetter, s.OCIHandler(s.ociBuilder)))
1✔
187
        }
1✔
188

189
        // Readiness probe
190
        mux.HandleFunc(export.ReadinessPath, s.readyHandler)
29✔
191

29✔
192
        s.handler = mux
29✔
193
}
194

195
func getInternalCAConfigMap() (*corev1.ConfigMap, error) {
×
196
        return getCAConfigMap(internalCaConfigMapPath)
×
197
}
×
198

199
func getExternalCAConfigMap() (*corev1.ConfigMap, error) {
×
200
        return getCAConfigMap(externalCaConfigMapPath)
×
201
}
×
202

203
func (s *exportServer) getHandlerMap(vi export.VolumeInfo) map[string]http.Handler {
16✔
204
        fi, err := os.Stat(vi.Path)
16✔
205
        if err != nil {
16✔
206
                log.Log.Reason(err).Errorf("error statting %s", vi.Path)
×
207
                return nil
×
208
        }
×
209

210
        var result = make(map[string]http.Handler)
16✔
211

16✔
212
        if vi.ArchiveURI != "" {
20✔
213
                result[vi.ArchiveURI] = s.ArchiveHandler(vi.Path)
4✔
214
        }
4✔
215

216
        if vi.DirURI != "" {
20✔
217
                result[vi.DirURI] = s.DirHandler(vi.DirURI, vi.Path)
4✔
218
        }
4✔
219

220
        p := vi.Path
16✔
221
        if fi.IsDir() {
32✔
222
                p = path.Join(p, "disk.img")
16✔
223
        }
16✔
224

225
        if vi.RawURI != "" {
20✔
226
                result[vi.RawURI] = s.FileHandler(p)
4✔
227
        }
4✔
228

229
        if vi.RawGzURI != "" {
20✔
230
                result[vi.RawGzURI] = s.GzipHandler(p)
4✔
231
        }
4✔
232

233
        return result
16✔
234
}
235

236
func (s *exportServer) Run() {
×
237
        s.initHandler()
×
238

×
239
        ctx, cancel := context.WithCancel(context.Background())
×
240
        if !s.Deadline.IsZero() {
×
241
                log.Log.Infof("Deadline set to %s", s.Deadline)
×
242
                ctx, cancel = context.WithDeadline(ctx, s.Deadline)
×
243
        }
×
244
        defer cancel()
×
245

×
246
        srv := s.buildServer(ctx)
×
247

×
248
        h2Server := &http2.Server{
×
249
                IdleTimeout: tunnelIdleTimeout,
×
250
        }
×
251
        if err := http2.ConfigureServer(srv, h2Server); err != nil {
×
252
                panic(err)
×
253
        }
254

255
        ch := make(chan error)
×
256

×
257
        go func() {
×
258
                err := srv.ListenAndServeTLS(s.CertFile, s.KeyFile)
×
259
                ch <- err
×
260
        }()
×
261

262
        if s.ociBuilder != nil {
×
263
                go func() {
×
264
                        if err := s.ociBuilder.Prepare(ctx); err != nil {
×
265
                                log.Log.Reason(err).Error("OCI Pass 1 failed")
×
266
                        }
×
267
                }()
268
        }
269

270
        select {
×
271
        case err := <-ch:
×
272
                panic(err)
×
273
        case <-ctx.Done():
×
274
                log.Log.Info("Deadline exceeded, shutting down")
×
275
                srv.Shutdown(context.Background())
×
276
        }
277
}
278

279
func (s *exportServer) buildServer(ctx context.Context) *http.Server {
3✔
280
        tlsConfig := &tls.Config{
3✔
281
                MinVersion:   s.TLSMinVersion,
3✔
282
                CipherSuites: s.TLSCipherSuites,
3✔
283
                NextProtos:   []string{"h2", "http/1.1"},
3✔
284
        }
3✔
285

3✔
286
        rootHandler := s.handler
3✔
287
        if s.BackupUID != "" {
5✔
288
                clientCAPool := x509.NewCertPool()
2✔
289
                if ok := clientCAPool.AppendCertsFromPEM(s.BackupCACert); !ok {
3✔
290
                        panic("failed to parse Backup CA")
1✔
291
                }
292
                tlsConfig.ClientCAs = clientCAPool
1✔
293
                tlsConfig.ClientAuth = tls.VerifyClientCertIfGiven
1✔
294

1✔
295
                rootHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
3✔
296
                        if r.Method == http.MethodConnect {
3✔
297
                                s.handleTunnel(w, r)
1✔
298
                                return
1✔
299
                        }
1✔
300
                        s.handler.ServeHTTP(w, r)
1✔
301
                })
302
        }
303

304
        return &http.Server{
2✔
305
                Addr:      s.ListenAddr,
2✔
306
                Handler:   rootHandler,
2✔
307
                TLSConfig: tlsConfig,
2✔
308
                BaseContext: func(_ net.Listener) context.Context {
2✔
309
                        return ctx
×
310
                },
×
311
        }
312
}
313

314
func (s *exportServer) AddFlags() {
×
315
        flag.CommandLine.AddGoFlag(goflag.CommandLine.Lookup("v"))
×
316
}
×
317

318
func NewExportServer(config ExportServerConfig) (service.Service, error) {
29✔
319
        es := &exportServer{
29✔
320
                ExportServerConfig: config,
29✔
321
        }
29✔
322

29✔
323
        if es.ArchiveHandler == nil {
29✔
324
                es.ArchiveHandler = archiveHandler
×
325
        }
×
326

327
        if es.DirHandler == nil {
29✔
328
                es.DirHandler = dirHandler
×
329
        }
×
330

331
        if es.FileHandler == nil {
29✔
332
                es.FileHandler = fileHandler
×
333
        }
×
334

335
        if es.GzipHandler == nil {
29✔
336
                es.GzipHandler = gzipHandler
×
337
        }
×
338

339
        if es.VmHandler == nil {
29✔
340
                es.VmHandler = vmHandler
×
341
        }
×
342

343
        if es.TokenSecretHandler == nil {
29✔
344
                es.TokenSecretHandler = secretHandler
×
345
        }
×
346

347
        if es.TokenGetter == nil {
29✔
348
                es.TokenGetter = func() (string, error) {
×
349
                        return getToken(es.TokenFile)
×
350
                }
×
351
        }
352

353
        if es.PermissionChecker == nil {
29✔
354
                es.PermissionChecker = checkVolumePermissions
×
355
        }
×
356

357
        if es.OCIHandler == nil {
58✔
358
                es.OCIHandler = ociHTTPHandler
29✔
359
        }
29✔
360

361
        if es.Paths != nil && es.Paths.OCIURI != "" {
29✔
362
                builder, err := newOCIBuilder(es.Paths)
×
363
                if err != nil {
×
364
                        return nil, fmt.Errorf("failed to construct OCI builder: %w", err)
×
365
                }
×
366
                if builder != nil {
×
367
                        es.ociBuilder = builder
×
368
                }
×
369
        }
370

371
        return es, nil
29✔
372
}
373

374
var getExpandedVM = func() *virtv1.VirtualMachine {
×
375
        f, err := os.Open(vmManifestPath)
×
376
        if err != nil {
×
377
                log.Log.Reason(err).Info("Unable to load VM manifest data")
×
378
                return nil
×
379
        }
×
380
        defer f.Close()
×
381
        fileinfo, err := f.Stat()
×
382
        if err != nil {
×
383
                log.Log.Reason(err).Info("Unable to load VM manifest data")
×
384
                return nil
×
385
        }
×
386
        buf := make([]byte, fileinfo.Size())
×
387
        _, err = f.Read(buf)
×
388
        if err != nil {
×
389
                log.Log.Reason(err).Info("Unable to load VM manifest data")
×
390
                return nil
×
391
        }
×
392

393
        vm := &virtv1.VirtualMachine{}
×
394
        if err := json.Unmarshal(buf, vm); err != nil {
×
395
                log.Log.Reason(err).Info("Unable to load VM manifest data")
×
396
                return nil
×
397
        }
×
398
        return vm
×
399
}
400

401
var getInternalBasePath = func() (string, error) {
×
402
        data, err := os.ReadFile(internalLinkPath)
×
403
        if err != nil {
×
404
                return "", err
×
405
        }
×
406
        return string(data), nil
×
407
}
408

409
var getExportName = func() (string, error) {
×
410
        data, err := os.ReadFile(exportNamePath)
×
411
        if err != nil {
×
412
                return "", err
×
413
        }
×
414
        return string(data), nil
×
415
}
416

417
var getExternalBasePath = func() (string, error) {
×
418
        data, err := os.ReadFile(externalLinkPath)
×
419
        if err != nil {
×
420
                return "", err
×
421
        }
×
422
        return string(data), nil
×
423
}
424

425
func GetTypeMetaString(gvk schema.GroupVersionKind) string {
×
426
        return fmt.Sprintf("apiVersion: %s\nkind: %s\n", gvk.GroupVersion().String(), gvk.Kind)
×
427
}
×
428

429
var getCAConfigMap = func(name string) (*corev1.ConfigMap, error) {
×
430
        f, err := os.Open(name)
×
431
        if err != nil {
×
432
                return nil, err
×
433
        }
×
434
        defer f.Close()
×
435
        fileinfo, err := f.Stat()
×
436
        if err != nil {
×
437
                return nil, err
×
438
        }
×
439
        buf := make([]byte, fileinfo.Size())
×
440
        _, err = f.Read(buf)
×
441
        if err != nil {
×
442
                return nil, err
×
443
        }
×
444

445
        cm := &corev1.ConfigMap{}
×
446
        if err := json.Unmarshal(buf, cm); err != nil {
×
447
                return nil, err
×
448
        }
×
449
        return cm, nil
×
450
}
451

452
var getCdiHeaderSecret = func(token, name string) *corev1.Secret {
2✔
453
        data := make(map[string]string)
2✔
454

2✔
455
        data["token"] = fmt.Sprintf("x-kubevirt-export-token:%s", token)
2✔
456
        return &corev1.Secret{
2✔
457
                ObjectMeta: metav1.ObjectMeta{
2✔
458
                        Name: name,
2✔
459
                },
2✔
460
                StringData: data,
2✔
461
        }
2✔
462
}
2✔
463

464
var getDataVolumes = func(vm *virtv1.VirtualMachine) ([]*cdiv1.DataVolume, error) {
×
465
        res := make([]*cdiv1.DataVolume, 0)
×
466
        volumes, err := storageutils.GetVolumes(vm, nil)
×
467
        if err != nil {
×
468
                return nil, err
×
469
        }
×
470
        for _, volume := range volumes {
×
471
                name := ""
×
472
                if volume.DataVolume != nil {
×
473
                        name = volume.DataVolume.Name
×
474
                } else if volume.PersistentVolumeClaim != nil {
×
475
                        name = volume.PersistentVolumeClaim.ClaimName
×
476
                }
×
477
                if name == "" {
×
478
                        continue
×
479
                }
480
                log.Log.V(1).Infof("Opening DV %s", filepath.Join(manifestCmBasePath, fmt.Sprintf("dv-%s", name)))
×
481
                f, err := os.Open(filepath.Join(manifestCmBasePath, fmt.Sprintf("dv-%s", name)))
×
482
                if err != nil {
×
483
                        if errors.Is(err, os.ErrNotExist) {
×
484
                                log.Log.V(1).Info("DV not found skipping")
×
485
                                continue
×
486
                        }
487
                        return nil, err
×
488
                }
489
                defer f.Close()
×
490
                fileinfo, err := f.Stat()
×
491
                if err != nil {
×
492
                        return nil, err
×
493
                }
×
494
                buf := make([]byte, fileinfo.Size())
×
495
                _, err = f.Read(buf)
×
496
                if err != nil {
×
497
                        return nil, err
×
498
                }
×
499
                dv := &cdiv1.DataVolume{}
×
500
                if err := json.Unmarshal(buf, dv); err != nil {
×
501
                        return nil, err
×
502
                }
×
503
                res = append(res, dv)
×
504
        }
505
        return res, nil
×
506
}
507

508
func newTarReader(mountPoint string) (io.ReadCloser, error) {
×
509
        var excludeArgs []string
×
510
        for name := range excludeMap {
×
511
                excludeArgs = append(excludeArgs, "--exclude="+name)
×
512
        }
×
513

514
        args := []string{"Scv"}
×
515
        args = append(args, excludeArgs...)
×
516
        args = append(args, ".")
×
517

×
518
        cmd := exec.Command("/usr/bin/tar", args...)
×
519
        cmd.Dir = mountPoint
×
520
        stdout, err := cmd.StdoutPipe()
×
521
        if err != nil {
×
522
                return nil, err
×
523
        }
×
524
        var stderr bytes.Buffer
×
525
        cmd.Stderr = &stderr
×
526
        if err = cmd.Start(); err != nil {
×
527
                return nil, err
×
528
        }
×
529
        return &execReader{cmd: cmd, stdout: stdout, stderr: io.NopCloser(&stderr)}, nil
×
530
}
531

532
func pipeToGzip(reader io.ReadCloser) io.ReadCloser {
×
533
        pr, pw := io.Pipe()
×
534
        zw := gzip.NewWriter(pw)
×
535

×
536
        go func() {
×
537
                n, err := io.Copy(zw, reader)
×
538
                if err != nil {
×
539
                        log.Log.Reason(err).Error("error piping to gzip")
×
540
                }
×
541
                if err = zw.Close(); err != nil {
×
542
                        log.Log.Reason(err).Error("error closing gzip writer")
×
543
                }
×
544
                if err = pw.Close(); err != nil {
×
545
                        log.Log.Reason(err).Error("error closing pipe writer")
×
546
                }
×
547
                log.Log.Infof("Wrote %d bytes\n", n)
×
548
        }()
549

550
        return pr
×
551
}
552

553
func getTokenQueryParam(r *http.Request) (token string) {
25✔
554
        q := r.URL.Query()
25✔
555
        if keys, ok := q[authHeader]; ok {
37✔
556
                token = keys[0]
12✔
557
                q.Del(authHeader)
12✔
558
                r.URL.RawQuery = q.Encode()
12✔
559
        }
12✔
560
        return
25✔
561
}
562

563
func getTokenHeader(r *http.Request) (token string) {
25✔
564
        if tok := r.Header.Get(authHeader); tok != "" {
38✔
565
                r.Header.Del(authHeader)
13✔
566
                token = tok
13✔
567
        }
13✔
568
        return
25✔
569
}
570

571
func tokenChecker(tokenGetter TokenGetterFunc, nextHandler http.Handler) http.Handler {
33✔
572
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
58✔
573
                token, err := tokenGetter()
25✔
574
                if err != nil {
25✔
575
                        w.WriteHeader(http.StatusInternalServerError)
×
576
                        return
×
577
                }
×
578
                for _, tok := range []string{getTokenQueryParam(r), getTokenHeader(r)} {
69✔
579
                        if tok == token {
57✔
580
                                nextHandler.ServeHTTP(w, r)
13✔
581
                                return
13✔
582
                        }
13✔
583
                }
584
                w.WriteHeader(http.StatusUnauthorized)
12✔
585
        })
586
}
587

588
func archiveHandler(mountPoint string) http.Handler {
×
589
        return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
×
590
                if req.Method != http.MethodGet {
×
591
                        w.WriteHeader(http.StatusBadRequest)
×
592
                        return
×
593
                }
×
594
                if hasPermissions := checkDirectoryPermissions(mountPoint); !hasPermissions {
×
595
                        w.WriteHeader(http.StatusInternalServerError)
×
596
                        return
×
597
                }
×
598

599
                tarReader, err := newTarReader(mountPoint)
×
600
                if err != nil {
×
601
                        log.Log.Reason(err).Error("error creating tar reader")
×
602
                        w.WriteHeader(http.StatusInternalServerError)
×
603
                        return
×
604
                }
×
605
                defer tarReader.Close()
×
606
                gzipReader := pipeToGzip(tarReader)
×
607
                defer gzipReader.Close()
×
608
                n, err := io.Copy(w, gzipReader)
×
609
                if err != nil {
×
610
                        log.Log.Reason(err).Error("error writing response body")
×
611
                }
×
612
                log.Log.Infof("Wrote %d bytes\n", n)
×
613
        })
614
}
615

616
func checkDirectoryPermissions(filePath string) bool {
×
617
        dir, err := os.Open(filePath)
×
618
        if err != nil {
×
619
                log.Log.Reason(err).Errorf("error opening %s", filePath)
×
620
                return false
×
621
        }
×
622
        defer dir.Close()
×
623

×
624
        // Read all filenames
×
625
        contents, err := dir.Readdirnames(-1)
×
626
        if err != nil {
×
627
                log.Log.Reason(err).Errorf("failed to read directory contents: %v", err)
×
628
                return false
×
629
        }
×
630

631
        for _, item := range contents {
×
632
                if _, ok := excludeMap[item]; ok {
×
633
                        continue
×
634
                }
635
                itemPath := filepath.Join(filePath, item)
×
636
                // Check if export server has permissions to manipulate the file
×
637
                file, err := os.Open(itemPath)
×
638
                if err != nil {
×
639
                        log.Log.Reason(err).Errorf("%s may lack read permissions", itemPath)
×
640
                        return false
×
641
                }
×
642
                file.Close()
×
643
        }
644
        return true
×
645
}
646

647
func checkVolumePermissions(path string) bool {
×
648
        fi, err := os.Stat(path)
×
649
        if err != nil {
×
650
                log.Log.Reason(err).Errorf("error statting %s", path)
×
651
                return false
×
652
        }
×
653
        if fi.IsDir() {
×
654
                return checkDirectoryPermissions(path)
×
655
        }
×
656
        f, err := os.Open(path)
×
657
        if err != nil {
×
658
                log.Log.Reason(err).Errorf("error opening %s", path)
×
659
                return false
×
660
        }
×
661
        f.Close()
×
662
        return true
×
663
}
664

665
func gzipHandler(filePath string) http.Handler {
×
666
        return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
×
667
                if req.Method != http.MethodGet {
×
668
                        w.WriteHeader(http.StatusBadRequest)
×
669
                        return
×
670
                }
×
671
                f, err := os.Open(filePath)
×
672
                if err != nil {
×
673
                        log.Log.Reason(err).Errorf("error opening %s", filePath)
×
674
                        w.WriteHeader(http.StatusInternalServerError)
×
675
                        return
×
676
                }
×
677
                defer f.Close()
×
678
                gzipReader := pipeToGzip(f)
×
679
                defer gzipReader.Close()
×
680
                n, err := io.Copy(w, gzipReader)
×
681
                if err != nil {
×
682
                        log.Log.Reason(err).Error("error writing response body")
×
683
                }
×
684
                log.Log.Infof("Wrote %d bytes\n", n)
×
685
        })
686
}
687

688
func vmHandler(vi []export.VolumeInfo, getBasePath func() (string, error), getCmFunc func() (*corev1.ConfigMap, error)) http.Handler {
14✔
689
        return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
28✔
690
                if req.Method != http.MethodGet {
18✔
691
                        w.WriteHeader(http.StatusBadRequest)
4✔
692
                        return
4✔
693
                }
4✔
694
                resources := make([]runtime.Object, 0)
10✔
695
                outputFunc := resourceToBytesJson
10✔
696
                contentType := req.Header.Get("Accept")
10✔
697
                if contentType == runtime.ContentTypeYAML {
13✔
698
                        outputFunc = resourceToBytesYaml
3✔
699
                }
3✔
700
                exportName, err := getExportName()
10✔
701
                if err != nil {
11✔
702
                        log.Log.Reason(err).Error("error reading export name")
1✔
703
                        w.WriteHeader(http.StatusInternalServerError)
1✔
704
                        return
1✔
705
                }
1✔
706
                headerSecretName := getSecretTokenName(exportName)
9✔
707
                path, err := getBasePath()
9✔
708
                if err != nil {
11✔
709
                        if errors.Is(err, os.ErrNotExist) {
2✔
710
                                log.Log.Reason(err).Info("path not found")
×
711
                                w.WriteHeader(http.StatusNotFound)
×
712
                        } else {
2✔
713
                                log.Log.Reason(err).Error("error reading path")
2✔
714
                                w.WriteHeader(http.StatusInternalServerError)
2✔
715
                        }
2✔
716
                        return
2✔
717
                }
718
                certCm, error := getCmFunc()
7✔
719
                if error != nil {
8✔
720
                        log.Log.Reason(err).Error("error reading ca configmap information")
1✔
721
                        w.WriteHeader(http.StatusInternalServerError)
1✔
722
                        return
1✔
723
                }
1✔
724
                certCm.TypeMeta = metav1.TypeMeta{
6✔
725
                        Kind:       "ConfigMap",
6✔
726
                        APIVersion: "v1",
6✔
727
                }
6✔
728
                resources = append(resources, certCm)
6✔
729
                expandedVm := getExpandedVM()
6✔
730
                if expandedVm == nil {
7✔
731
                        log.Log.Reason(err).Error("error getting VM definition")
1✔
732
                        w.WriteHeader(http.StatusInternalServerError)
1✔
733
                        return
1✔
734
                }
1✔
735
                expandedVm.TypeMeta = metav1.TypeMeta{
5✔
736
                        Kind:       virtv1.VirtualMachineGroupVersionKind.Kind,
5✔
737
                        APIVersion: virtv1.VirtualMachineGroupVersionKind.GroupVersion().String(),
5✔
738
                }
5✔
739
                for i, dvTemplate := range expandedVm.Spec.DataVolumeTemplates {
7✔
740
                        dvTemplate.Spec.Source.HTTP.URL = fmt.Sprintf("https://%s", filepath.Join(path, vi[i].RawGzURI))
2✔
741
                        dvTemplate.Spec.Source.HTTP.CertConfigMap = certCm.Name
2✔
742
                        dvTemplate.Spec.Source.HTTP.SecretExtraHeaders = []string{headerSecretName}
2✔
743
                }
2✔
744
                resources = append(resources, expandedVm)
5✔
745
                datavolumes, err := getDataVolumes(expandedVm)
5✔
746
                if err != nil {
5✔
747
                        log.Log.Reason(err).Error("error reading datavolumes information")
×
748
                        w.WriteHeader(http.StatusInternalServerError)
×
749
                        return
×
750
                }
×
751
                for _, dv := range datavolumes {
6✔
752
                        dv.TypeMeta = metav1.TypeMeta{
1✔
753
                                Kind:       "DataVolume",
1✔
754
                                APIVersion: "cdi.kubevirt.io/v1beta1",
1✔
755
                        }
1✔
756
                        for _, info := range vi {
2✔
757
                                if strings.Contains(info.RawGzURI, dv.Name) {
2✔
758
                                        dv.Spec.Source.HTTP.URL = fmt.Sprintf("https://%s", filepath.Join(path, info.RawGzURI))
1✔
759
                                }
1✔
760
                        }
761
                        dv.Spec.Source.HTTP.CertConfigMap = certCm.Name
1✔
762
                        dv.Spec.Source.HTTP.SecretExtraHeaders = []string{headerSecretName}
1✔
763
                        resources = append(resources, dv)
1✔
764
                }
765
                data, err := outputFunc(resources)
5✔
766
                if err != nil {
5✔
767
                        w.WriteHeader(http.StatusInternalServerError)
×
768
                        return
×
769
                }
×
770
                n, err := w.Write(data)
5✔
771
                if err != nil {
5✔
772
                        log.Log.Reason(err).Error("error writing manifests")
×
773
                        w.WriteHeader(http.StatusInternalServerError)
×
774
                        return
×
775
                }
×
776
                log.Log.Infof("Wrote %d bytes\n", n)
5✔
777
        })
778
}
779

780
func resourceToBytesJson(resources []runtime.Object) ([]byte, error) {
3✔
781
        list := corev1.List{
3✔
782
                TypeMeta: metav1.TypeMeta{
3✔
783
                        Kind:       "List",
3✔
784
                        APIVersion: "v1",
3✔
785
                },
3✔
786
                ListMeta: metav1.ListMeta{},
3✔
787
        }
3✔
788
        for _, resource := range resources {
8✔
789
                list.Items = append(list.Items, runtime.RawExtension{Object: resource})
5✔
790
        }
5✔
791
        resourceBytes, err := json.MarshalIndent(list, "", "    ")
3✔
792
        if err != nil {
3✔
793
                return nil, err
×
794
        }
×
795
        return resourceBytes, nil
3✔
796
}
797

798
func resourceToBytesYaml(resources []runtime.Object) ([]byte, error) {
4✔
799
        data := []byte{}
4✔
800
        for _, resource := range resources {
12✔
801
                resourceBytes, err := yaml.Marshal(resource)
8✔
802
                if err != nil {
8✔
803
                        return nil, err
×
804
                }
×
805
                data = append(data, resourceBytes...)
8✔
806
                data = append(data, []byte("---\n")...)
8✔
807
        }
808
        return data, nil
4✔
809
}
810

811
// symlinkSafeDir is an http.FileSystem that prevents symlink traversal outside root.
812
// Unlike http.Dir, it resolves symlinks via safepath and rejects any that escape the
813
// root boundary, preventing attackers from reading files outside the exported PVC.
814
type symlinkSafeDir struct {
815
        root string
816
}
817

818
func (d symlinkSafeDir) Open(name string) (http.File, error) {
7✔
819
        cleanName := path.Clean("/" + name)
7✔
820
        if cleanName == "/" {
8✔
821
                cleanName = "."
1✔
822
        } else {
7✔
823
                cleanName = cleanName[1:]
6✔
824
        }
6✔
825

826
        resolved, err := safepath.JoinAndResolveWithRelativeRoot(d.root, cleanName)
7✔
827
        if err != nil {
11✔
828
                return nil, err
4✔
829
        }
4✔
830

831
        fd, err := safepath.OpenAtNoFollow(resolved)
3✔
832
        if err != nil {
3✔
833
                return nil, err
×
834
        }
×
835
        defer fd.Close()
3✔
836

3✔
837
        return os.Open(fd.SafePath())
3✔
838
}
839

840
func dirHandler(uri, mountPoint string) http.Handler {
6✔
841
        return http.StripPrefix(uri, http.FileServer(symlinkSafeDir{root: mountPoint}))
6✔
842
}
6✔
843

844
func fileHandler(file string) http.Handler {
×
845
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
×
846
                f, err := os.Open(file)
×
847
                if err != nil {
×
848
                        log.Log.Reason(err).Errorf("error opening %s", file)
×
849
                        w.WriteHeader(http.StatusInternalServerError)
×
850
                        return
×
851
                }
×
852
                defer f.Close()
×
853
                http.ServeContent(w, r, "disk.img", time.Time{}, f)
×
854
        })
855
}
856

857
// getToken returns the full contents of tokenFile as a string. The bytes are
// returned verbatim (no trimming). If the file cannot be read, the empty
// string and the read error are returned.
func getToken(tokenFile string) (string, error) {
	raw, readErr := os.ReadFile(tokenFile)
	if readErr == nil {
		return string(raw), nil
	}
	return "", readErr
}
865

866
// getSecretTokenName derives the name of the header secret for an export by
// prefixing the export name with "header-secret-". It is declared as a
// variable holding a function rather than as a plain function.
var getSecretTokenName = func(exportName string) string {
	secretName := fmt.Sprintf("header-secret-%s", exportName)
	return secretName
}
11✔
869

870
func secretHandler(tokenGetter TokenGetterFunc) http.Handler {
8✔
871
        return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
16✔
872
                if req.Method != http.MethodGet {
12✔
873
                        w.WriteHeader(http.StatusBadRequest)
4✔
874
                        return
4✔
875
                }
4✔
876
                resources := make([]runtime.Object, 0)
4✔
877
                outputFunc := resourceToBytesJson
4✔
878
                contentType := req.Header.Get("Accept")
4✔
879
                if contentType == runtime.ContentTypeYAML {
5✔
880
                        outputFunc = resourceToBytesYaml
1✔
881
                }
1✔
882
                token, err := tokenGetter()
4✔
883
                if err != nil {
5✔
884
                        log.Log.Reason(err).Error("error getting token")
1✔
885
                        w.WriteHeader(http.StatusInternalServerError)
1✔
886
                        return
1✔
887
                }
1✔
888
                exportName, err := getExportName()
3✔
889
                if err != nil {
4✔
890
                        log.Log.Reason(err).Error("error reading export name")
1✔
891
                        w.WriteHeader(http.StatusInternalServerError)
1✔
892
                        return
1✔
893
                }
1✔
894
                headerSecretName := getSecretTokenName(exportName)
2✔
895
                secret := getCdiHeaderSecret(token, headerSecretName)
2✔
896
                secret.TypeMeta = metav1.TypeMeta{
2✔
897
                        Kind:       "Secret",
2✔
898
                        APIVersion: "v1",
2✔
899
                }
2✔
900
                resources = append(resources, secret)
2✔
901
                data, err := outputFunc(resources)
2✔
902
                if err != nil {
2✔
903
                        log.Log.Reason(err).Errorf("error generating secret manifest")
×
904
                        w.WriteHeader(http.StatusInternalServerError)
×
905
                        return
×
906
                }
×
907
                n, err := w.Write(data)
2✔
908
                if err != nil {
2✔
909
                        log.Log.Reason(err).Error("error writing secret manifest")
×
910
                        w.WriteHeader(http.StatusInternalServerError)
×
911
                        return
×
912
                }
×
913
                log.Log.Infof("Wrote %d bytes\n", n)
2✔
914
        })
915
}
916

917
func (s *exportServer) readyHandler(w http.ResponseWriter, r *http.Request) {
3✔
918
        if s.ociBuilder != nil && !s.ociBuilder.Ready() {
4✔
919
                http.Error(w, "OCI digest computation in progress", http.StatusServiceUnavailable)
1✔
920
                return
1✔
921
        }
1✔
922
        io.WriteString(w, "OK")
2✔
923
}
924

925
func (s *exportServer) handleTunnel(w http.ResponseWriter, r *http.Request) {
5✔
926
        if r.TLS == nil || len(r.TLS.PeerCertificates) == 0 {
7✔
927
                log.Log.Error("tunnel rejected: no client certificate presented")
2✔
928
                http.Error(w, "mTLS required", http.StatusUnauthorized)
2✔
929
                return
2✔
930
        }
2✔
931

932
        expectedCN := fmt.Sprintf("kubevirt.io:system:client:%s", s.BackupUID)
3✔
933
        clientCN := r.TLS.PeerCertificates[0].Subject.CommonName
3✔
934
        if clientCN != expectedCN {
4✔
935
                log.Log.Errorf("identity mismatch, cert: %s, expected: %s", clientCN, expectedCN)
1✔
936
                http.Error(w, "Forbidden", http.StatusForbidden)
1✔
937
                return
1✔
938
        }
1✔
939

940
        s.nbdMu.Lock()
2✔
941
        if s.nbdClient != nil {
3✔
942
                s.nbdMu.Unlock()
1✔
943
                _ = r.Body.Close()
1✔
944
                log.Log.Warning("rejecting tunnel: active session already exists")
1✔
945
                http.Error(w, "Conflict", http.StatusConflict)
1✔
946
                return
1✔
947
        }
1✔
948

949
        ctx, cancel := context.WithCancel(r.Context())
1✔
950
        defer cancel()
1✔
951
        conn := newH2ServerConn(r.Body, w, cancel)
1✔
952

1✔
953
        var dialOnce sync.Once
1✔
954
        clientConn, err := grpc.NewClient(
1✔
955
                "passthrough:///backup",
1✔
956
                grpc.WithContextDialer(func(_ context.Context, _ string) (net.Conn, error) {
1✔
957
                        var c net.Conn
×
958
                        dialOnce.Do(func() { c = conn })
×
959
                        if c != nil {
×
960
                                return c, nil
×
961
                        }
×
962
                        return nil, fmt.Errorf("tunnel connection is single-use; reconnect not supported")
×
963
                }),
964
                grpc.WithTransportCredentials(insecure.NewCredentials()),
965
                grpc.WithKeepaliveParams(keepalive.ClientParameters{
966
                        Time:                10 * time.Second,
967
                        Timeout:             5 * time.Second,
968
                        PermitWithoutStream: true,
969
                }),
970
        )
971
        if err != nil {
1✔
972
                s.nbdMu.Unlock()
×
973
                log.Log.Reason(err).Error("failed to initialize gRPC client for tunnel")
×
974
                http.Error(w, "Internal Server Error", http.StatusInternalServerError)
×
975
                return
×
976
        }
×
977

978
        w.WriteHeader(http.StatusOK)
1✔
979
        w.(http.Flusher).Flush()
1✔
980

1✔
981
        s.nbdClient = nbdv1.NewNBDClient(clientConn)
1✔
982
        s.nbdMu.Unlock()
1✔
983

1✔
984
        log.Log.Infof("Exclusive backup tunnel established for %s", s.BackupUID)
1✔
985

1✔
986
        <-ctx.Done()
1✔
987

1✔
988
        s.nbdMu.Lock()
1✔
989
        clientConn.Close()
1✔
990
        s.nbdClient = nil
1✔
991
        s.nbdMu.Unlock()
1✔
992
        log.Log.Info("Backup tunnel disconnected, listener reset")
1✔
993
}
994

995
// h2ServerConn presents the two halves of an HTTP exchange as one
// connection-like object: reads come from the request body r and writes go to
// the response writer w. cancel is invoked (at most once, guarded by once)
// when the connection is closed.
type h2ServerConn struct {
	r      io.ReadCloser
	w      http.ResponseWriter
	cancel context.CancelFunc
	once   sync.Once
}

// newH2ServerConn builds an h2ServerConn over the given body, response writer
// and cancel function.
func newH2ServerConn(r io.ReadCloser, w http.ResponseWriter, cancel context.CancelFunc) *h2ServerConn {
	conn := new(h2ServerConn)
	conn.r, conn.w, conn.cancel = r, w, cancel
	return conn
}

// Read reads from the wrapped request body.
func (c *h2ServerConn) Read(b []byte) (int, error) {
	return c.r.Read(b)
}

// Write writes b to the response writer and, when the writer supports
// flushing, flushes right away (regardless of the write result). The byte
// count and error of the underlying write are returned.
func (c *h2ServerConn) Write(b []byte) (int, error) {
	written, writeErr := c.w.Write(b)
	flusher, canFlush := c.w.(http.Flusher)
	if canFlush {
		flusher.Flush()
	}
	return written, writeErr
}

// Close invokes the cancel function the first time it is called and closes
// the request body on every call, returning the body's close error.
func (c *h2ServerConn) Close() error {
	c.once.Do(c.cancel)
	return c.r.Close()
}
1✔
1020

1021
func (c *h2ServerConn) LocalAddr() net.Addr                { return h2DummyAddr }
×
1022
func (c *h2ServerConn) RemoteAddr() net.Addr               { return h2DummyAddr }
×
1023
func (c *h2ServerConn) SetDeadline(_ time.Time) error      { return nil }
×
1024
func (c *h2ServerConn) SetReadDeadline(_ time.Time) error  { return nil }
×
1025
func (c *h2ServerConn) SetWriteDeadline(_ time.Time) error { return nil }
×
1026

1027
// ExportMapExtent is one extent entry of a backup map page as serialized in
// the JSON response of the backup map endpoint.
type ExportMapExtent struct {
	// Offset of the extent, as reported by the map stream.
	Offset uint64 `json:"offset"`
	// Length of the extent, as reported by the map stream.
	Length uint64 `json:"length"`
	// Type carries the flags value the map stream reported for the extent.
	Type uint64 `json:"type"`
	// Description is the textual description reported for the extent.
	Description string `json:"description"`
}
1033

1034
type ExportMapResponse struct {
1035
        Extents    []ExportMapExtent `json:"extents"`
1036
        NextOffset *uint64           `json:"next_offset"`
1037
}
1038

1039
func (s *exportServer) backupMapHandler(exportName string) http.Handler {
15✔
1040
        return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
30✔
1041
                if req.Method != http.MethodGet {
19✔
1042
                        w.WriteHeader(http.StatusMethodNotAllowed)
4✔
1043
                        return
4✔
1044
                }
4✔
1045

1046
                s.nbdMu.RLock()
11✔
1047
                client := s.nbdClient
11✔
1048
                s.nbdMu.RUnlock()
11✔
1049
                if client == nil {
12✔
1050
                        http.Error(w, "Backup source (virt-launcher) not connected via tunnel", http.StatusServiceUnavailable)
1✔
1051
                        return
1✔
1052
                }
1✔
1053

1054
                offset := uint64(0)
10✔
1055
                length := uint64(0)
10✔
1056
                pageSize := defaultMapPageSize
10✔
1057
                query := req.URL.Query()
10✔
1058

10✔
1059
                if offsetStr := query.Get("offset"); offsetStr != "" {
12✔
1060
                        o, err := strconv.ParseUint(offsetStr, 10, 64)
2✔
1061
                        if err != nil {
3✔
1062
                                http.Error(w, fmt.Sprintf("invalid offset %q: %v", offsetStr, err), http.StatusBadRequest)
1✔
1063
                                return
1✔
1064
                        }
1✔
1065
                        offset = o
1✔
1066
                }
1067
                if lengthStr := query.Get("length"); lengthStr != "" {
11✔
1068
                        l, err := strconv.ParseUint(lengthStr, 10, 64)
2✔
1069
                        if err != nil {
3✔
1070
                                http.Error(w, fmt.Sprintf("invalid length %q: %v", lengthStr, err), http.StatusBadRequest)
1✔
1071
                                return
1✔
1072
                        }
1✔
1073
                        length = l
1✔
1074
                }
1075
                if pageSizeStr := query.Get("page_size"); pageSizeStr != "" {
11✔
1076
                        p, err := strconv.Atoi(pageSizeStr)
3✔
1077
                        if err != nil || p <= 0 {
5✔
1078
                                http.Error(w, fmt.Sprintf("invalid page_size %q", pageSizeStr), http.StatusBadRequest)
2✔
1079
                                return
2✔
1080
                        }
2✔
1081
                        pageSize = p
1✔
1082
                }
1083

1084
                var bitmapName string
6✔
1085
                if s.BackupType == string(backupv1.Incremental) && s.BackupCheckpoint != "" {
7✔
1086
                        bitmapName = s.BackupCheckpoint
1✔
1087
                }
1✔
1088

1089
                streamCtx, streamCancel := context.WithCancel(req.Context())
6✔
1090
                defer streamCancel()
6✔
1091

6✔
1092
                stream, err := client.Map(streamCtx, &nbdv1.MapRequest{
6✔
1093
                        ExportName: exportName,
6✔
1094
                        BitmapName: bitmapName,
6✔
1095
                        Offset:     offset,
6✔
1096
                        Length:     length,
6✔
1097
                })
6✔
1098
                if err != nil {
7✔
1099
                        errMsg := fmt.Sprintf("Failed to call map for export: %s", exportName)
1✔
1100
                        log.Log.Reason(err).Error(errMsg)
1✔
1101
                        http.Error(w, errMsg, http.StatusInternalServerError)
1✔
1102
                        return
1✔
1103
                }
1✔
1104

1105
                extents, nextOffsetPtr, err := collectMapPage(stream, pageSize)
5✔
1106
                if err != nil {
6✔
1107
                        errMsg := fmt.Sprintf("Failed to collect map extents for export: %s", exportName)
1✔
1108
                        log.Log.Reason(err).Error(errMsg)
1✔
1109
                        http.Error(w, errMsg, http.StatusInternalServerError)
1✔
1110
                        return
1✔
1111
                }
1✔
1112

1113
                page := ExportMapResponse{
4✔
1114
                        Extents:    extents,
4✔
1115
                        NextOffset: nextOffsetPtr,
4✔
1116
                }
4✔
1117

4✔
1118
                w.Header().Set("Content-Type", "application/json")
4✔
1119
                if err := json.NewEncoder(w).Encode(page); err != nil {
4✔
1120
                        log.Log.Reason(err).Errorf("failed to encode map page for export %s", exportName)
×
1121
                }
×
1122
        })
1123
}
1124

1125
func (s *exportServer) backupDataHandler(exportName string) http.Handler {
11✔
1126
        return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
22✔
1127
                if req.Method != http.MethodGet {
15✔
1128
                        w.WriteHeader(http.StatusMethodNotAllowed)
4✔
1129
                        return
4✔
1130
                }
4✔
1131

1132
                s.nbdMu.RLock()
7✔
1133
                client := s.nbdClient
7✔
1134
                s.nbdMu.RUnlock()
7✔
1135

7✔
1136
                if client == nil {
8✔
1137
                        http.Error(w, "Backup source not connected", http.StatusServiceUnavailable)
1✔
1138
                        return
1✔
1139
                }
1✔
1140

1141
                offset := uint64(0)
6✔
1142
                length := uint64(0)
6✔
1143

6✔
1144
                query := req.URL.Query()
6✔
1145
                if offsetStr := query.Get("offset"); offsetStr != "" {
9✔
1146
                        o, err := strconv.ParseUint(offsetStr, 10, 64)
3✔
1147
                        if err != nil {
4✔
1148
                                http.Error(w, fmt.Sprintf("invalid offset %q: %v", offsetStr, err), http.StatusBadRequest)
1✔
1149
                                return
1✔
1150
                        }
1✔
1151
                        offset = o
2✔
1152
                }
1153
                if lengthStr := query.Get("length"); lengthStr != "" {
8✔
1154
                        l, err := strconv.ParseUint(lengthStr, 10, 64)
3✔
1155
                        if err != nil {
4✔
1156
                                http.Error(w, fmt.Sprintf("invalid length %q: %v", lengthStr, err), http.StatusBadRequest)
1✔
1157
                                return
1✔
1158
                        }
1✔
1159
                        length = l
2✔
1160
                }
1161

1162
                stream, err := client.Read(req.Context(), &nbdv1.ReadRequest{
4✔
1163
                        ExportName: exportName,
4✔
1164
                        Offset:     offset,
4✔
1165
                        Length:     length,
4✔
1166
                })
4✔
1167
                if err != nil {
5✔
1168
                        http.Error(w, fmt.Sprintf("Failed to call read for export: %s", exportName), http.StatusInternalServerError)
1✔
1169
                        return
1✔
1170
                }
1✔
1171

1172
                w.Header().Set("Content-Type", "application/octet-stream")
3✔
1173
                for {
8✔
1174
                        chunk, err := stream.Recv()
5✔
1175
                        if errors.Is(err, io.EOF) {
8✔
1176
                                break
3✔
1177
                        }
1178
                        if err != nil {
2✔
1179
                                log.Log.Reason(err).Error("Tunnel stream interrupted")
×
1180
                                panic(http.ErrAbortHandler)
×
1181
                        }
1182
                        if _, err := w.Write(chunk.Data); err != nil {
2✔
1183
                                log.Log.Reason(err).Error("HTTP client disconnected during stream")
×
1184
                                return
×
1185
                        }
×
1186
                        if f, ok := w.(http.Flusher); ok {
4✔
1187
                                f.Flush()
2✔
1188
                        }
2✔
1189
                }
1190
        })
1191
}
1192

1193
func collectMapPage(stream nbdv1.NBD_MapClient, pageSize int) ([]ExportMapExtent, *uint64, error) {
9✔
1194
        var extents []ExportMapExtent
9✔
1195
        for {
26✔
1196
                msg, err := stream.Recv()
17✔
1197
                if errors.Is(err, io.EOF) {
22✔
1198
                        return extents, nil, nil
5✔
1199
                }
5✔
1200
                if err != nil {
14✔
1201
                        return nil, nil, err
2✔
1202
                }
2✔
1203
                for _, e := range msg.Extents {
20✔
1204
                        if len(extents) >= pageSize {
12✔
1205
                                return extents, &e.Offset, nil
2✔
1206
                        }
2✔
1207
                        extents = append(extents, ExportMapExtent{
8✔
1208
                                Offset:      e.Offset,
8✔
1209
                                Length:      e.Length,
8✔
1210
                                Type:        e.Flags,
8✔
1211
                                Description: e.Description,
8✔
1212
                        })
8✔
1213
                }
1214
        }
1215
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc