• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

fornellas / resonance / 14681039432

26 Apr 2025 12:05PM UTC coverage: 57.102% (+0.3%) from 56.807%
14681039432

Pull #262

github

web-flow
Merge d5a56d147 into 194d32297
Pull Request #262: Refactor Logger

662 of 831 new or added lines in 29 files covered. (79.66%)

8 existing lines in 5 files now uncovered.

3791 of 6639 relevant lines covered (57.1%)

22.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.65
/host/agent_client_wrapper.go
1
package host
2

3
import (
4
        "bytes"
5
        "compress/gzip"
6
        "context"
7
        "errors"
8
        "fmt"
9
        "io"
10
        "io/fs"
11
        "log/slog"
12
        "net"
13
        "os"
14
        userPkg "os/user"
15
        "regexp"
16
        "sort"
17
        "strings"
18
        "sync"
19
        "syscall"
20

21
        "al.essio.dev/pkg/shellescape"
22
        "google.golang.org/grpc"
23
        "google.golang.org/grpc/credentials/insecure"
24
        "google.golang.org/grpc/status"
25

26
        "github.com/fornellas/resonance/host/lib"
27
        hostNet "github.com/fornellas/resonance/host/net"
28

29
        "github.com/fornellas/resonance/host/agent_server/proto"
30
        "github.com/fornellas/resonance/host/types"
31
        "github.com/fornellas/resonance/log"
32
)
33

34
func unwrapGrpcStatusErrno(err error) error {
52✔
35
        st := status.Convert(err)
52✔
36
        for _, detail := range st.Details() {
79✔
37
                if errno, ok := detail.(*proto.Errno); ok {
54✔
38
                        return syscall.Errno(errno.Errno)
27✔
39
                }
27✔
40
        }
41
        return err
27✔
42
}
43

44
type AgentClientWrapperReadFileReadCloser struct {
45
        Op         string
46
        Path       string
47
        Stream     grpc.ServerStreamingClient[proto.ReadFileResponse]
48
        CancelFunc context.CancelFunc
49
        Data       []byte
50
}
51

52
func (rc *AgentClientWrapperReadFileReadCloser) Read(p []byte) (int, error) {
4✔
53
        if len(rc.Data) > 0 {
7✔
54
                n := copy(p, rc.Data)
3✔
55
                if n < len(rc.Data) {
3✔
56
                        rc.Data = rc.Data[n:]
×
57
                } else {
3✔
58
                        rc.Data = nil
3✔
59
                }
3✔
60
                return n, nil
3✔
61
        }
62

63
        readFileResponse, err := rc.Stream.Recv()
3✔
64
        if err != nil {
6✔
65
                if err == io.EOF {
6✔
66
                        return 0, err
3✔
67
                }
3✔
68
                return 0, &fs.PathError{
×
69
                        Op:   rc.Op,
×
70
                        Path: rc.Path,
×
71
                        Err:  unwrapGrpcStatusErrno(err),
×
72
                }
×
73
        }
74

75
        n := copy(p, readFileResponse.Chunk)
×
76
        if n < len(readFileResponse.Chunk) {
×
77
                rc.Data = readFileResponse.Chunk[n:]
×
78
        } else {
×
79
                rc.Data = nil
×
80
        }
×
81

82
        return n, nil
×
83
}
84

85
func (rc *AgentClientWrapperReadFileReadCloser) Close() error {
3✔
86
        rc.CancelFunc()
3✔
87
        return nil
3✔
88
}
3✔
89

90
// AgentGrpcBinGz is a map from string keys to byte slices. It is not referenced anywhere
// else in this file. NOTE(review): presumably a counterpart of AgentBinGz populated or
// consumed elsewhere in the package — confirm whether it is still needed.
var AgentGrpcBinGz = map[string][]byte{}
91

92
// WriterLogger is an io.Writer that forwards everything written to it to Logger, one
// Error-level record per line.
type WriterLogger struct {
	Logger *slog.Logger
}

// Write splits b on "\n" and logs each line at Error level. A single trailing newline does
// not produce an extra empty record, and an empty write logs nothing. It always reports
// len(b) bytes written and a nil error.
func (wl WriterLogger) Write(b []byte) (int, error) {
	text := string(b)
	if text == "" {
		return len(b), nil
	}
	// Drop only the final line terminator; inner empty lines are still logged.
	text = strings.TrimSuffix(text, "\n")
	for _, line := range strings.Split(text, "\n") {
		wl.Logger.Error(line)
	}
	return len(b), nil
}
106

107
// AgentBinGz holds gzip-compressed agent binaries keyed by "<goos>.<goarch>" (for example
// "linux.amd64"). This file only reads from it; entries are expected to be registered
// elsewhere.
var AgentBinGz = map[string][]byte{}
108

109
// AgentClientWrapper wraps a BaseHost and provides a full Host implementation with the
110
// use of an ephemeral agent.
111
type AgentClientWrapper struct {
112
        // BaseHost runs the agent binary; String and Type delegate to it and Close closes it.
        BaseHost          types.BaseHost
113
        // path is the location of the agent binary on the host, as executed by spawn.
        path              string
114
        // grpcClientConn is the gRPC connection created by spawn and closed by Close.
        grpcClientConn    *grpc.ClientConn
115
        // hostServiceClient issues every RPC to the agent over grpcClientConn.
        hostServiceClient proto.HostServiceClient
116
        // spawnErrCh receives the joined error from the goroutine that runs the agent; Close
        // reads it after a successful Shutdown RPC.
        spawnErrCh        chan error
117
}
118

119
func getTmpFile(ctx context.Context, baseHost types.BaseHost, template string) (string, error) {
3✔
120
        cmd := types.Cmd{
3✔
121
                Path: "mktemp",
3✔
122
                Args: []string{"-t", fmt.Sprintf("%s.XXXXXXXX", template)},
3✔
123
        }
3✔
124
        waitStatus, stdout, stderr, err := lib.SimpleRun(ctx, baseHost, cmd)
3✔
125
        if err != nil {
3✔
126
                return "", err
×
127
        }
×
128
        if !waitStatus.Success() {
3✔
129
                return "", fmt.Errorf(
×
130
                        "failed to run %s: %s\nstdout:\n%s\nstderr:\n%s",
×
131
                        cmd, waitStatus.String(), stdout, stderr,
×
132
                )
×
133
        }
×
134
        return strings.TrimRight(stdout, "\n"), nil
3✔
135
}
136

137
func chmod(ctx context.Context, baseHost types.BaseHost, name string, mode types.FileMode) error {
3✔
138
        cmd := types.Cmd{
3✔
139
                Path: "chmod",
3✔
140
                Args: []string{fmt.Sprintf("%o", mode), name},
3✔
141
        }
3✔
142
        waitStatus, stdout, stderr, err := lib.SimpleRun(ctx, baseHost, cmd)
3✔
143
        if err != nil {
3✔
144
                return err
×
145
        }
×
146
        if waitStatus.Success() {
6✔
147
                return nil
3✔
148
        }
3✔
149

150
        return fmt.Errorf(
×
151
                "failed to run %s: %s\nstdout:\n%s\nstderr:\n%s",
×
152
                cmd, waitStatus.String(), stdout, stderr,
×
153
        )
×
154
}
155

156
func getGoOs(ctx context.Context, baseHost types.BaseHost) (string, error) {
3✔
157
        cmd := types.Cmd{
3✔
158
                Path: "uname",
3✔
159
                Args: []string{"-o"},
3✔
160
        }
3✔
161
        waitStatus, stdout, stderr, err := lib.SimpleRun(ctx, baseHost, cmd)
3✔
162
        if err != nil {
3✔
163
                return "", err
×
164
        }
×
165
        if !waitStatus.Success() {
3✔
166
                return "", fmt.Errorf(
×
167
                        "failed to run %s: %s\nstdout:\n%s\nstderr:\n%s",
×
168
                        cmd, waitStatus.String(), stdout, stderr,
×
169
                )
×
170
        }
×
171
        os := strings.TrimRight(stdout, "\n")
3✔
172

3✔
173
        switch os {
3✔
174
        case "GNU/Linux":
3✔
175
                return "linux", nil
3✔
176
        default:
×
177
                return "", fmt.Errorf("operating system not recognized: %#v", os)
×
178
        }
179
}
180

181
func getGoArch(ctx context.Context, baseHost types.BaseHost) (string, error) {
3✔
182
        cmd := types.Cmd{
3✔
183
                Path: "uname",
3✔
184
                Args: []string{"-m"},
3✔
185
        }
3✔
186
        waitStatus, stdout, stderr, err := lib.SimpleRun(ctx, baseHost, cmd)
3✔
187
        if err != nil {
3✔
188
                return "", err
×
189
        }
×
190
        if !waitStatus.Success() {
3✔
191
                return "", fmt.Errorf(
×
192
                        "failed to run %s: %s\nstdout:\n%s\nstderr:\n%s",
×
193
                        cmd, waitStatus.String(), stdout, stderr,
×
194
                )
×
195
        }
×
196
        machine := strings.TrimRight(stdout, "\n")
3✔
197

3✔
198
        matched, err := regexp.MatchString("^i[23456]86$", machine)
3✔
199
        if err != nil {
3✔
200
                panic(err)
×
201
        }
202
        if matched {
3✔
203
                return "386", nil
×
204
        }
×
205
        matched, err = regexp.MatchString("^x86_64$", machine)
3✔
206
        if err != nil {
3✔
207
                panic(err)
×
208
        }
209
        if matched {
5✔
210
                return "amd64", nil
2✔
211
        }
2✔
212
        matched, err = regexp.MatchString("^armv6l|armv7l$", machine)
1✔
213
        if err != nil {
1✔
214
                panic(err)
×
215
        }
216
        if matched {
1✔
217
                return "arm", nil
×
218
        }
×
219
        matched, err = regexp.MatchString("^aarch64$", machine)
1✔
220
        if err != nil {
1✔
221
                panic(err)
×
222
        }
223
        if matched {
2✔
224
                return "arm64", nil
1✔
225
        }
1✔
226
        return "", fmt.Errorf("machine not recognized: %#v", machine)
×
227
}
228

229
func getAgentBinGz(ctx context.Context, baseHost types.BaseHost) ([]byte, error) {
3✔
230
        goos, err := getGoOs(ctx, baseHost)
3✔
231
        if err != nil {
3✔
232
                return nil, err
×
233
        }
×
234

235
        goarch, err := getGoArch(ctx, baseHost)
3✔
236
        if err != nil {
3✔
237
                return nil, err
×
238
        }
×
239
        osArch := fmt.Sprintf("%s.%s", goos, goarch)
3✔
240

3✔
241
        agentBinGz, ok := AgentBinGz[osArch]
3✔
242
        if !ok {
3✔
243
                vaildOsArch := []string{}
×
244
                for osArch := range AgentBinGz {
×
245
                        vaildOsArch = append(vaildOsArch, osArch)
×
246
                }
×
247
                sort.Strings(vaildOsArch)
×
248
                return nil, fmt.Errorf("%#v not supported by agent, supported options: %v", osArch, vaildOsArch)
×
249
        }
250
        return agentBinGz, nil
3✔
251
}
252

253
func copyReader(ctx context.Context, baseHost types.BaseHost, reader io.Reader, path string) error {
3✔
254
        cmd := types.Cmd{
3✔
255
                Path:  "sh",
3✔
256
                Args:  []string{"-c", fmt.Sprintf("cat > %s", shellescape.Quote(path))},
3✔
257
                Stdin: reader,
3✔
258
        }
3✔
259
        waitStatus, stdout, stderr, err := lib.SimpleRun(ctx, baseHost, cmd)
3✔
260
        if err != nil {
3✔
261
                return err
×
262
        }
×
263
        if !waitStatus.Success() {
3✔
264
                return fmt.Errorf(
×
265
                        "failed to run %s: %s\nstdout:\n%s\nstderr:\n%s",
×
266
                        cmd, waitStatus.String(), stdout, stderr,
×
267
                )
×
268
        }
×
269
        return nil
3✔
270
}
271

272
func NewAgentClientWrapper(ctx context.Context, baseHost types.BaseHost) (*AgentClientWrapper, error) {
3✔
273
        ctx, _ = log.MustWithGroup(ctx, "🐈 Agent")
3✔
274

3✔
275
        agentPath, err := getTmpFile(ctx, baseHost, "resonance_agent")
3✔
276
        if err != nil {
3✔
277
                return nil, err
×
278
        }
×
279

280
        if err := chmod(ctx, baseHost, agentPath, 0755); err != nil {
3✔
281
                return nil, err
×
282
        }
×
283

284
        agentBinGz, err := getAgentBinGz(ctx, baseHost)
3✔
285
        if err != nil {
3✔
286
                return nil, err
×
287
        }
×
288

289
        agentReader, err := gzip.NewReader(bytes.NewReader(agentBinGz))
3✔
290
        if err != nil {
3✔
291
                return nil, err
×
292
        }
×
293

294
        if err := copyReader(ctx, baseHost, agentReader, agentPath); err != nil {
3✔
295
                return nil, err
×
296
        }
×
297

298
        agent := AgentClientWrapper{
3✔
299
                BaseHost:   baseHost,
3✔
300
                path:       agentPath,
3✔
301
                spawnErrCh: make(chan error),
3✔
302
        }
3✔
303

3✔
304
        if err := agent.spawn(ctx); err != nil {
3✔
305
                return nil, err
×
306
        }
×
307

308
        return &agent, nil
3✔
309
}
310

311
func (h *AgentClientWrapper) spawn(ctx context.Context) error {
3✔
312
        stdinReader, stdinWriter, err := os.Pipe()
3✔
313
        if err != nil {
3✔
314
                return err
×
315
        }
×
316

317
        stdoutReader, stdoutWriter, err := os.Pipe()
3✔
318
        if err != nil {
3✔
319
                return err
×
320
        }
×
321

322
        // We just pass "127.0.0.1" to avoid issues with dns resolution, this value is not used
323
        h.grpcClientConn, err = grpc.NewClient(
3✔
324
                "127.0.0.1",
3✔
325
                grpc.WithTransportCredentials(insecure.NewCredentials()),
3✔
326
                grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
6✔
327
                        return hostNet.IOConn{
3✔
328
                                Reader: stdoutReader,
3✔
329
                                Writer: stdinWriter,
3✔
330
                        }, nil
3✔
331
                }),
3✔
332
        )
333
        if err != nil {
3✔
334
                return err
×
335
        }
×
336

337
        go func() {
6✔
338
                waitStatus, runErr := h.BaseHost.Run(ctx, types.Cmd{
3✔
339
                        Path:   h.path,
3✔
340
                        Stdin:  stdinReader,
3✔
341
                        Stdout: stdoutWriter,
3✔
342
                        Stderr: WriterLogger{
3✔
343
                                Logger: log.MustLogger(ctx).WithGroup("🔗 Agent Server"),
3✔
344
                        },
3✔
345
                })
3✔
346

3✔
347
                var waitStatusErr error
3✔
348
                if !waitStatus.Success() {
3✔
349
                        waitStatusErr = errors.New(waitStatus.String())
×
350
                }
×
351

352
                stdinReaderErr := stdinReader.Close()
3✔
353

3✔
354
                stdoutWriterErr := stdoutWriter.Close()
3✔
355

3✔
356
                h.spawnErrCh <- errors.Join(
3✔
357
                        runErr,
3✔
358
                        waitStatusErr,
3✔
359
                        stdinReaderErr,
3✔
360
                        stdoutWriterErr,
3✔
361
                )
3✔
362
        }()
363

364
        h.hostServiceClient = proto.NewHostServiceClient(h.grpcClientConn)
3✔
365
        resp, err := h.hostServiceClient.Ping(ctx, &proto.PingRequest{})
3✔
366

3✔
367
        if err != nil {
3✔
368
                return errors.Join(err, h.Close(ctx))
×
369
        }
×
370

371
        if resp.Message != "Pong" {
3✔
372
                defer h.Close(ctx)
×
373
                return fmt.Errorf("unexpected response from agent: %s", resp.Message)
×
374
        }
×
375

376
        return nil
3✔
377
}
378

379
func (h *AgentClientWrapper) Geteuid(ctx context.Context) (uint64, error) {
3✔
380
        getuidResponse, err := h.hostServiceClient.Geteuid(ctx, &proto.Empty{})
3✔
381
        if err != nil {
3✔
382
                return 0, unwrapGrpcStatusErrno(err)
×
383
        }
×
384

385
        return getuidResponse.Uid, nil
3✔
386
}
387

388
func (h *AgentClientWrapper) Getegid(ctx context.Context) (uint64, error) {
3✔
389
        getgidResponse, err := h.hostServiceClient.Getegid(ctx, &proto.Empty{})
3✔
390
        if err != nil {
3✔
391
                return 0, unwrapGrpcStatusErrno(err)
×
392
        }
×
393

394
        return getgidResponse.Gid, nil
3✔
395
}
396

397
func (h *AgentClientWrapper) Chmod(ctx context.Context, name string, mode types.FileMode) error {
6✔
398
        _, err := h.hostServiceClient.Chmod(ctx, &proto.ChmodRequest{
6✔
399
                Name: name,
6✔
400
                Mode: uint32(mode),
6✔
401
        })
6✔
402

6✔
403
        if err != nil {
11✔
404
                return &fs.PathError{
5✔
405
                        Op:   "Chmod",
5✔
406
                        Path: name,
5✔
407
                        Err:  unwrapGrpcStatusErrno(err),
5✔
408
                }
5✔
409
        }
5✔
410

411
        return nil
3✔
412
}
413

414
func (h *AgentClientWrapper) Lchown(ctx context.Context, name string, uid, gid uint32) error {
6✔
415
        _, err := h.hostServiceClient.Lchown(ctx, &proto.LchownRequest{
6✔
416
                Name: name,
6✔
417
                Uid:  int64(uid),
6✔
418
                Gid:  int64(gid),
6✔
419
        })
6✔
420

6✔
421
        if err != nil {
11✔
422
                return &fs.PathError{
5✔
423
                        Op:   "Lchown",
5✔
424
                        Path: name,
5✔
425
                        Err:  unwrapGrpcStatusErrno(err),
5✔
426
                }
5✔
427
        }
5✔
428

429
        return nil
3✔
430
}
431

432
func (h *AgentClientWrapper) Lookup(ctx context.Context, username string) (*userPkg.User, error) {
4✔
433
        resp, err := h.hostServiceClient.Lookup(ctx, &proto.LookupRequest{
4✔
434
                Username: username,
4✔
435
        })
4✔
436

4✔
437
        if err != nil {
7✔
438
                st := status.Convert(err)
3✔
439
                for _, detail := range st.Details() {
6✔
440
                        if protoUnknownUserError, ok := detail.(*proto.UnknownUserError); ok {
6✔
441
                                return nil, userPkg.UnknownUserError(protoUnknownUserError.Username)
3✔
442
                        }
3✔
443
                }
444
                return nil, unwrapGrpcStatusErrno(err)
×
445
        }
446

447
        return &userPkg.User{
3✔
448
                Uid:      resp.Uid,
3✔
449
                Gid:      resp.Gid,
3✔
450
                Username: resp.Username,
3✔
451
                Name:     resp.Name,
3✔
452
                HomeDir:  resp.Homedir,
3✔
453
        }, nil
3✔
454
}
455

456
func (h *AgentClientWrapper) LookupGroup(ctx context.Context, name string) (*userPkg.Group, error) {
4✔
457
        resp, err := h.hostServiceClient.LookupGroup(ctx, &proto.LookupGroupRequest{
4✔
458
                Name: name,
4✔
459
        })
4✔
460
        if err != nil {
7✔
461
                st := status.Convert(err)
3✔
462
                for _, detail := range st.Details() {
6✔
463
                        if protoUnknownGroupError, ok := detail.(*proto.UnknownGroupError); ok {
6✔
464
                                return nil, userPkg.UnknownGroupError(protoUnknownGroupError.Name)
3✔
465
                        }
3✔
466
                }
467
                return nil, unwrapGrpcStatusErrno(err)
×
468
        }
469

470
        return &userPkg.Group{
3✔
471
                Gid:  resp.Gid,
3✔
472
                Name: resp.Name,
3✔
473
        }, nil
3✔
474
}
475

476
func (h *AgentClientWrapper) Lstat(ctx context.Context, name string) (*types.Stat_t, error) {
23✔
477
        resp, err := h.hostServiceClient.Lstat(ctx, &proto.LstatRequest{
23✔
478
                Name: name,
23✔
479
        })
23✔
480
        if err != nil {
28✔
481
                return nil, &fs.PathError{
5✔
482
                        Op:   "Lstat",
5✔
483
                        Path: name,
5✔
484
                        Err:  unwrapGrpcStatusErrno(err),
5✔
485
                }
5✔
486
        }
5✔
487

488
        stat_t := types.Stat_t{
20✔
489
                Dev:     resp.Dev,
20✔
490
                Ino:     resp.Ino,
20✔
491
                Mode:    resp.Mode,
20✔
492
                Nlink:   uint64(resp.Nlink),
20✔
493
                Uid:     resp.Uid,
20✔
494
                Gid:     resp.Gid,
20✔
495
                Rdev:    resp.Rdev,
20✔
496
                Size:    resp.Size,
20✔
497
                Blksize: int64(resp.Blksize),
20✔
498
                Blocks:  resp.Blocks,
20✔
499
                Atim: types.Timespec{
20✔
500
                        Sec:  resp.Atim.Sec,
20✔
501
                        Nsec: resp.Atim.Nsec,
20✔
502
                },
20✔
503
                Mtim: types.Timespec{
20✔
504
                        Sec:  resp.Mtim.Sec,
20✔
505
                        Nsec: resp.Mtim.Nsec,
20✔
506
                },
20✔
507
                Ctim: types.Timespec{
20✔
508
                        Sec:  resp.Ctim.Sec,
20✔
509
                        Nsec: resp.Ctim.Nsec,
20✔
510
                },
20✔
511
        }
20✔
512

20✔
513
        return &stat_t, nil
20✔
514
}
515

516
func (h *AgentClientWrapper) ReadDir(ctx context.Context, name string) (<-chan types.DirEntResult, func()) {
6✔
517
        ctx, cancel := context.WithCancel(ctx)
6✔
518

6✔
519
        dirEntResultCh := make(chan types.DirEntResult, 100)
6✔
520

6✔
521
        go func() {
12✔
522
                stream, err := h.hostServiceClient.ReadDir(ctx, &proto.ReadDirRequest{
6✔
523
                        Name: name,
6✔
524
                })
6✔
525
                if err != nil {
6✔
526
                        dirEntResultCh <- types.DirEntResult{
×
527
                                Error: &fs.PathError{
×
528
                                        Op:   "ReadDir",
×
529
                                        Path: name,
×
530
                                        Err:  unwrapGrpcStatusErrno(err),
×
531
                                },
×
532
                        }
×
533
                        close(dirEntResultCh)
×
534
                        return
×
535
                }
×
536

537
                for {
17✔
538
                        dirEnt, err := stream.Recv()
11✔
539
                        if err == io.EOF {
14✔
540
                                break
3✔
541
                        }
542
                        if err != nil {
15✔
543
                                dirEntResultCh <- types.DirEntResult{
5✔
544
                                        Error: &fs.PathError{
5✔
545
                                                Op:   "ReadDir",
5✔
546
                                                Path: name,
5✔
547
                                                Err:  unwrapGrpcStatusErrno(err),
5✔
548
                                        },
5✔
549
                                }
5✔
550
                                close(dirEntResultCh)
5✔
551
                                return
5✔
552
                        }
5✔
553

554
                        dirEntResultCh <- types.DirEntResult{
7✔
555
                                DirEnt: types.DirEnt{
7✔
556
                                        Ino:  dirEnt.Ino,
7✔
557
                                        Type: uint8(dirEnt.Type),
7✔
558
                                        Name: dirEnt.Name,
7✔
559
                                },
7✔
560
                        }
7✔
561
                }
562

563
                close(dirEntResultCh)
3✔
564
        }()
565

566
        return dirEntResultCh, cancel
6✔
567
}
568

569
func (h *AgentClientWrapper) Mkdir(ctx context.Context, name string, mode types.FileMode) error {
19✔
570
        _, err := h.hostServiceClient.Mkdir(ctx, &proto.MkdirRequest{
19✔
571
                Name: name,
19✔
572
                Mode: uint32(mode),
19✔
573
        })
19✔
574
        if err != nil {
25✔
575
                return &fs.PathError{
6✔
576
                        Op:   "Mkdir",
6✔
577
                        Path: name,
6✔
578
                        Err:  unwrapGrpcStatusErrno(err),
6✔
579
                }
6✔
580
        }
6✔
581

582
        return nil
15✔
583
}
584

585
func (h *AgentClientWrapper) ReadFile(ctx context.Context, name string) (io.ReadCloser, error) {
7✔
586
        ctx, cancel := context.WithCancel(ctx)
7✔
587

7✔
588
        stream, err := h.hostServiceClient.ReadFile(ctx, &proto.ReadFileRequest{Name: name})
7✔
589
        if err != nil {
7✔
590
                cancel()
×
591
                return nil, &fs.PathError{
×
592
                        Op:   "ReadFile",
×
593
                        Path: name,
×
594
                        Err:  unwrapGrpcStatusErrno(err),
×
595
                }
×
596
        }
×
597

598
        // ReadFile will succeeds to create the stream before the server function is called.
599
        // Because of this, we require to read the first element of the stream here, as it
600
        // enables to catch the various errors we're expected to return.
601
        readFileResponse, err := stream.Recv()
7✔
602
        if err == io.EOF {
10✔
603
                cancel()
3✔
604
                return io.NopCloser(bytes.NewReader([]byte{})), nil
3✔
605
        }
3✔
606
        if err != nil {
11✔
607
                cancel()
5✔
608
                return nil, &fs.PathError{
5✔
609
                        Op:   "ReadFile",
5✔
610
                        Path: name,
5✔
611
                        Err:  unwrapGrpcStatusErrno(err),
5✔
612
                }
5✔
613
        }
5✔
614

615
        return &AgentClientWrapperReadFileReadCloser{
3✔
616
                Stream:     stream,
3✔
617
                CancelFunc: cancel,
3✔
618
                Data:       readFileResponse.Chunk,
3✔
619
        }, nil
3✔
620
}
621

622
func (h *AgentClientWrapper) Symlink(ctx context.Context, oldname, newname string) error {
8✔
623
        _, err := h.hostServiceClient.Symlink(ctx, &proto.SymlinkRequest{
8✔
624
                Oldname: oldname,
8✔
625
                Newname: newname,
8✔
626
        })
8✔
627

8✔
628
        if err != nil {
14✔
629
                return &fs.PathError{
6✔
630
                        Op:   "Symlink",
6✔
631
                        Path: newname,
6✔
632
                        Err:  unwrapGrpcStatusErrno(err),
6✔
633
                }
6✔
634
        }
6✔
635

636
        return nil
4✔
637
}
638

639
func (h *AgentClientWrapper) Readlink(ctx context.Context, name string) (string, error) {
5✔
640
        resp, err := h.hostServiceClient.ReadLink(ctx, &proto.ReadLinkRequest{
5✔
641
                Name: name,
5✔
642
        })
5✔
643

5✔
644
        if err != nil {
9✔
645
                return "", &fs.PathError{
4✔
646
                        Op:   "Readlink",
4✔
647
                        Path: name,
4✔
648
                        Err:  unwrapGrpcStatusErrno(err),
4✔
649
                }
4✔
650
        }
4✔
651

652
        return resp.Destination, nil
3✔
653
}
654

655
func (h *AgentClientWrapper) Remove(ctx context.Context, name string) error {
8✔
656
        _, err := h.hostServiceClient.Remove(ctx, &proto.RemoveRequest{
8✔
657
                Name: name,
8✔
658
        })
8✔
659
        if err != nil {
14✔
660
                return &fs.PathError{
6✔
661
                        Op:   "Remove",
6✔
662
                        Path: name,
6✔
663
                        Err:  unwrapGrpcStatusErrno(err),
6✔
664
                }
6✔
665
        }
6✔
666

667
        return nil
4✔
668
}
669

670
func (h *AgentClientWrapper) Mknod(ctx context.Context, pathName string, mode types.FileMode, dev types.FileDevice) error {
19✔
671
        _, err := h.hostServiceClient.Mknod(ctx, &proto.MknodRequest{
19✔
672
                Path: pathName,
19✔
673
                Mode: uint32(mode),
19✔
674
                Dev:  uint64(dev),
19✔
675
        })
19✔
676
        if err != nil {
23✔
677
                return &fs.PathError{
4✔
678
                        Op:   "Mknod",
4✔
679
                        Path: pathName,
4✔
680
                        Err:  unwrapGrpcStatusErrno(err),
4✔
681
                }
4✔
682
        }
4✔
683
        return nil
17✔
684
}
685

686
func (h *AgentClientWrapper) runStdinCopier(
687
        stdinReader io.Reader,
688
        stream grpc.BidiStreamingClient[proto.RunRequest, proto.RunResponse],
689
) error {
3✔
690
        buffer := make([]byte, 8196)
3✔
691
        for {
7✔
692
                n, err := stdinReader.Read(buffer)
4✔
693
                if err == io.EOF {
7✔
694
                        break
3✔
695
                }
696
                if err != nil {
3✔
697
                        return unwrapGrpcStatusErrno(err)
×
698
                }
×
699

700
                err = stream.Send(
3✔
701
                        &proto.RunRequest{
3✔
702
                                Data: &proto.RunRequest_StdinChunk{
3✔
703
                                        StdinChunk: buffer[:n],
3✔
704
                                },
3✔
705
                        },
3✔
706
                )
3✔
707
                if err != nil {
3✔
708
                        return unwrapGrpcStatusErrno(err)
×
709
                }
×
710
        }
711
        return nil
3✔
712
}
713

714
func (h *AgentClientWrapper) Run(ctx context.Context, cmd types.Cmd) (types.WaitStatus, error) {
8✔
715
        stream, err := h.hostServiceClient.Run(ctx)
8✔
716
        if err != nil {
8✔
717
                return types.WaitStatus{}, unwrapGrpcStatusErrno(err)
×
718
        }
×
719

720
        err = stream.Send(
8✔
721
                &proto.RunRequest{
8✔
722
                        Data: &proto.RunRequest_Cmd{
8✔
723
                                Cmd: &proto.Cmd{
8✔
724
                                        Path:    cmd.Path,
8✔
725
                                        Args:    cmd.Args,
8✔
726
                                        EnvVars: cmd.Env,
8✔
727
                                        Dir:     cmd.Dir,
8✔
728
                                        Stdin:   cmd.Stdin != nil,
8✔
729
                                        Stdout:  cmd.Stdout != nil,
8✔
730
                                        Stderr:  cmd.Stderr != nil,
8✔
731
                                },
8✔
732
                        },
8✔
733
                },
8✔
734
        )
8✔
735
        if err != nil {
8✔
736
                return types.WaitStatus{}, errors.Join(
×
737
                        unwrapGrpcStatusErrno(err),
×
738
                        stream.CloseSend(),
×
739
                )
×
740
        }
×
741

742
        var wg sync.WaitGroup
8✔
743

8✔
744
        var stdinErr error
8✔
745
        if cmd.Stdin != nil {
11✔
746
                wg.Add(1)
3✔
747
                go func() {
6✔
748
                        defer wg.Done()
3✔
749
                        stdinErr = h.runStdinCopier(cmd.Stdin, stream)
3✔
750
                }()
3✔
751
        }
752

753
        var waitStatus types.WaitStatus
8✔
754
        var recvError error
8✔
755
        for {
22✔
756
                resp, err := stream.Recv()
14✔
757
                if err == io.EOF {
14✔
758
                        break
×
759
                }
760
                if err != nil {
17✔
761
                        recvError = unwrapGrpcStatusErrno(err)
3✔
762
                        break
3✔
763
                }
764

765
                if respData, ok := resp.Data.(*proto.RunResponse_Waitstatus); ok {
20✔
766
                        waitStatus.ExitCode = int(respData.Waitstatus.Exitcode)
7✔
767
                        waitStatus.Exited = respData.Waitstatus.Exited
7✔
768
                        waitStatus.Signal = respData.Waitstatus.Signal
7✔
769
                        break
7✔
770
                } else if respData, ok := resp.Data.(*proto.RunResponse_StdoutChunk); ok {
15✔
771
                        if cmd.Stdout == nil {
7✔
772
                                panic("bug: received stdout chunk for nil stdout")
×
773
                        }
774
                        if _, err := cmd.Stdout.Write(respData.StdoutChunk); err != nil {
7✔
775
                                recvError = unwrapGrpcStatusErrno(err)
×
776
                                break
×
777
                        }
778
                } else if respData, ok := resp.Data.(*proto.RunResponse_StderrChunk); ok {
6✔
779
                        if cmd.Stderr == nil {
3✔
780
                                panic("bug: received stderr chunk for nil stderr")
×
781
                        }
782
                        if _, err := cmd.Stderr.Write(respData.StderrChunk); err != nil {
3✔
783
                                recvError = unwrapGrpcStatusErrno(err)
×
784
                                break
×
785
                        }
786
                } else {
×
787
                        panic(fmt.Errorf("bug: unexpected response data: %#v", resp.Data))
×
788
                }
789
        }
790

791
        closeSendErr := stream.CloseSend()
8✔
792

8✔
793
        wg.Wait()
8✔
794

8✔
795
        err = errors.Join(
8✔
796
                stdinErr,
8✔
797
                recvError,
8✔
798
                closeSendErr,
8✔
799
        )
8✔
800
        if err != nil {
11✔
801
                return types.WaitStatus{}, err
3✔
802
        }
3✔
803

804
        return waitStatus, nil
7✔
805
}
806

807
func (h *AgentClientWrapper) WriteFile(ctx context.Context, name string, data io.Reader, perm types.FileMode) error {
20✔
808
        stream, err := h.hostServiceClient.WriteFile(ctx)
20✔
809
        if err != nil {
20✔
810
                return &fs.PathError{
×
811
                        Op:   "WriteFile",
×
812
                        Path: name,
×
813
                        Err:  unwrapGrpcStatusErrno(err),
×
814
                }
×
815
        }
×
816

817
        err = stream.Send(
20✔
818
                &proto.WriteFileRequest{
20✔
819
                        Data: &proto.WriteFileRequest_Metadata{
20✔
820
                                Metadata: &proto.FileMetadata{
20✔
821
                                        Name: name,
20✔
822
                                        Perm: uint32(perm),
20✔
823
                                },
20✔
824
                        },
20✔
825
                },
20✔
826
        )
20✔
827
        if err != nil {
20✔
828
                _, closeAndRecvErr := stream.CloseAndRecv()
×
829
                return &fs.PathError{
×
830
                        Op:   "WriteFile",
×
831
                        Path: name,
×
832
                        Err: errors.Join(
×
833
                                unwrapGrpcStatusErrno(err),
×
834
                                closeAndRecvErr,
×
835
                        ),
×
836
                }
×
837
        }
×
838

839
        var sendErr error
20✔
840
        buffer := make([]byte, 1024)
20✔
841
        for {
53✔
842
                n, err := data.Read(buffer)
33✔
843
                if err == io.EOF {
53✔
844
                        break
20✔
845
                }
846
                if err != nil {
15✔
847
                        sendErr = unwrapGrpcStatusErrno(err)
×
848
                        break
×
849
                }
850

851
                err = stream.Send(
15✔
852
                        &proto.WriteFileRequest{
15✔
853
                                Data: &proto.WriteFileRequest_Chunk{
15✔
854
                                        Chunk: buffer[:n],
15✔
855
                                },
15✔
856
                        },
15✔
857
                )
15✔
858
                if err != nil {
15✔
859
                        sendErr = unwrapGrpcStatusErrno(err)
×
860
                        break
×
861
                }
862
        }
863

864
        _, closeAndRecvErr := stream.CloseAndRecv()
20✔
865
        err = errors.Join(
20✔
866
                sendErr,
20✔
867
                unwrapGrpcStatusErrno(closeAndRecvErr),
20✔
868
        )
20✔
869
        if err != nil {
26✔
870
                return &fs.PathError{
6✔
871
                        Op:   "WriteFile",
6✔
872
                        Path: name,
6✔
873
                        Err:  err,
6✔
874
                }
6✔
875
        }
6✔
876
        return nil
16✔
877
}
878

879
func (h *AgentClientWrapper) String() string {
3✔
880
        return h.BaseHost.String()
3✔
881
}
3✔
882

883
func (h *AgentClientWrapper) Type() string {
3✔
884
        return h.BaseHost.Type()
3✔
885
}
3✔
886

887
func (h *AgentClientWrapper) Close(ctx context.Context) error {
3✔
888

3✔
889
        _, shutdownErr := h.hostServiceClient.Shutdown(ctx, &proto.Empty{})
3✔
890

3✔
891
        var spawnErr error
3✔
892
        if shutdownErr == nil {
6✔
893
                spawnErr = <-h.spawnErrCh
3✔
894
        }
3✔
895

896
        grpcClientConnErr := h.grpcClientConn.Close()
3✔
897

3✔
898
        hostCloseErr := h.BaseHost.Close(ctx)
3✔
899

3✔
900
        return errors.Join(
3✔
901
                shutdownErr,
3✔
902
                grpcClientConnErr,
3✔
903
                spawnErr,
3✔
904
                hostCloseErr,
3✔
905
        )
3✔
906
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc