• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

conneroisu / groq-go / 12357267349

16 Dec 2024 04:45PM UTC coverage: 48.59% (-0.04%) from 48.633%
12357267349

push

github

conneroisu
even better formatting and appearance for doubly linked list implementation

2275 of 4682 relevant lines covered (48.59%)

91.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.91
/extensions/e2b/sandbox.go
1
package e2b
2

3
import (
4
        "context"
5
        "encoding/base64"
6
        "encoding/json"
7
        "fmt"
8
        "io"
9
        "log/slog"
10
        "math/rand"
11
        "net/http"
12
        "sync"
13
        "time"
14

15
        "github.com/conneroisu/groq-go/pkg/builders"
16
        "github.com/gorilla/websocket"
17
)
18

19
type (
20
        // ProcessEvents is a process event type.
21
        // string
22
        ProcessEvents string
23
        // SandboxTemplate is a sandbox template.
24
        SandboxTemplate string
25
        // Sandbox is a code sandbox.
26
        //
27
        // The sandbox is like an isolated, but interactive system.
28
        Sandbox struct {
29
                ID       string                  `json:"sandboxID"`  // ID of the sandbox.
30
                ClientID string                  `json:"clientID"`   // ClientID of the sandbox.
31
                Cwd      string                  `json:"cwd"`        // Cwd is the sandbox's current working directory.
32
                apiKey   string                  `json:"-"`          // apiKey is the sandbox's api key.
33
                Template SandboxTemplate         `json:"templateID"` // Template of the sandbox.
34
                baseURL  string                  `json:"-"`          // baseAPIURL is the base api url of the sandbox.
35
                Metadata map[string]string       `json:"metadata"`   // Metadata of the sandbox.
36
                logger   *slog.Logger            `json:"-"`          // logger is the sandbox's logger.
37
                client   *http.Client            `json:"-"`          // client is the sandbox's http client.
38
                header   builders.Header         `json:"-"`          // header is the sandbox's request header builder.
39
                ws       *websocket.Conn         `json:"-"`          // ws is the sandbox's websocket connection.
40
                wsURL    func(s *Sandbox) string `json:"-"`          // wsURL is the sandbox's websocket url.
41
                Map      *sync.Map               `json:"-"`          // Map is the map of the sandbox.
42
                idCh     chan int                `json:"-"`          // idCh is the channel to generate ids for requests.
43
                toolW    ToolingWrapper          `json:"-"`          // toolW is the tooling wrapper for the sandbox.
44
        }
45
        // Option is an option for the sandbox.
46
        Option func(*Sandbox)
47
        // Process is a process in the sandbox.
48
        Process struct {
49
                id  string            // ID is process id.
50
                cmd string            // cmd is process's command.
51
                Cwd string            // cwd is process's current working directory.
52
                ctx context.Context   // ctx is the context for the process.
53
                sb  *Sandbox          // sb is the sandbox the process belongs to.
54
                Env map[string]string // env is process's environment variables.
55
        }
56
        // ProcessOption is an option for the process.
57
        ProcessOption func(*Process)
58
        // Event is a file system event.
59
        Event struct {
60
                Path      string      `json:"path"`      // Path is the path of the event.
61
                Name      string      `json:"name"`      // Name is the name of file or directory.
62
                Timestamp int64       `json:"timestamp"` // Timestamp is the timestamp of the event.
63
                Error     string      `json:"error"`     // Error is the possible error of the event.
64
                Params    EventParams `json:"params"`    // Params is the parameters of the event.
65
        }
66
        // EventParams is the params for subscribing to a process event.
67
        EventParams struct {
68
                Subscription string      `json:"subscription"` // Subscription is the subscription id of the event.
69
                Result       EventResult `json:"result"`       // Result is the result of the event.
70
        }
71
        // EventResult is a file system event response.
72
        EventResult struct {
73
                Type        string `json:"type"`
74
                Line        string `json:"line"`
75
                Timestamp   int64  `json:"timestamp"`
76
                IsDirectory bool   `json:"isDirectory"`
77
                Error       string `json:"error"`
78
        }
79
        // Request is a JSON-RPC request.
80
        Request struct {
81
                JSONRPC string `json:"jsonrpc"` // JSONRPC is the JSON-RPC version of the request.
82
                Method  Method `json:"method"`  // Method is the request method.
83
                ID      int    `json:"id"`      // ID of the request.
84
                Params  []any  `json:"params"`  // Params of the request.
85
        }
86
        // Response is a JSON-RPC response.
87
        Response[T any, Q any] struct {
88
                ID     int `json:"id"`     // ID of the response.
89
                Result T   `json:"result"` // Result of the response.
90
                Error  Q   `json:"error"`  // Error of the message.
91
        }
92
        // LsResult is a result of the list request.
93
        LsResult struct {
94
                Name  string `json:"name"`  // Name is the name of the file or directory.
95
                IsDir bool   `json:"isDir"` // isDir is true if the entry is a directory.
96
        }
97
        // APIError is the error of the API.
98
        APIError struct {
99
                Code    int    `json:"code,omitempty"` // Code is the code of the error.
100
                Message string `json:"message"`        // Message is the message of the error.
101
        }
102
        // Method is a JSON-RPC method.
103
        Method string
104
)
105

106
const (
107
        OnStdout ProcessEvents = "onStdout" // OnStdout is the event for the stdout.
108
        OnStderr ProcessEvents = "onStderr" // OnStderr is the event for the stderr.
109
        OnExit   ProcessEvents = "onExit"   // OnExit is the event for the exit.
110

111
        rpc                = "2.0"
112
        charset            = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
113
        defaultBaseURL     = "https://api.e2b.dev"
114
        defaultWSScheme    = "wss"
115
        wsRoute            = "/ws"
116
        fileRoute          = "/file"
117
        sandboxesRoute     = "/sandboxes"  // (GET/POST /sandboxes)
118
        deleteSandboxRoute = "/sandboxes/" // (DELETE /sandboxes/:id)
119

120
        filesystemWrite      Method = "filesystem_write"
121
        filesystemRead       Method = "filesystem_read"
122
        filesystemList       Method = "filesystem_list"
123
        filesystemRemove     Method = "filesystem_remove"
124
        filesystemMakeDir    Method = "filesystem_makeDir"
125
        filesystemReadBytes  Method = "filesystem_readBase64"
126
        filesystemWriteBytes Method = "filesystem_writeBase64"
127
        filesystemSubscribe  Method = "filesystem_subscribe"
128
        processSubscribe     Method = "process_subscribe"
129
        processUnsubscribe   Method = "process_unsubscribe"
130
        processStart         Method = "process_start"
131
)
132

133
// NewSandbox creates a new sandbox.
134
func NewSandbox(
135
        ctx context.Context,
136
        apiKey string,
137
        opts ...Option,
138
) (*Sandbox, error) {
1✔
139
        sb := Sandbox{
1✔
140
                apiKey:   apiKey,
1✔
141
                Template: "base",
1✔
142
                baseURL:  defaultBaseURL,
1✔
143
                Metadata: map[string]string{
1✔
144
                        "sdk": "groq-go v1",
1✔
145
                },
1✔
146
                client: http.DefaultClient,
1✔
147
                logger: slog.New(slog.NewJSONHandler(io.Discard, nil)),
1✔
148
                toolW:  defaultToolWrapper,
1✔
149
                idCh:   make(chan int),
1✔
150
                Map:    new(sync.Map),
1✔
151
                wsURL: func(s *Sandbox) string {
1✔
152
                        return fmt.Sprintf("wss://49982-%s-%s.e2b.dev/ws", s.ID, s.ClientID)
×
153
                },
×
154
                header: builders.Header{
155
                        SetCommonHeaders: func(req *http.Request) {
1✔
156
                                req.Header.Set("X-API-Key", apiKey)
1✔
157
                                req.Header.Set("Content-Type", "application/json")
1✔
158
                                req.Header.Set("Accept", "application/json")
1✔
159
                        },
1✔
160
                },
161
        }
162
        for _, opt := range opts {
4✔
163
                opt(&sb)
3✔
164
        }
3✔
165
        req, err := builders.NewRequest(
1✔
166
                ctx, sb.header, http.MethodPost,
1✔
167
                fmt.Sprintf("%s%s", sb.baseURL, sandboxesRoute),
1✔
168
                builders.WithBody(&sb),
1✔
169
        )
1✔
170
        if err != nil {
1✔
171
                return &sb, err
×
172
        }
×
173
        err = sb.sendRequest(req, &sb)
1✔
174
        if err != nil {
1✔
175
                return &sb, err
×
176
        }
×
177
        sb.ws, _, err = websocket.DefaultDialer.Dial(sb.wsURL(&sb), nil)
1✔
178
        if err != nil {
1✔
179
                return &sb, err
×
180
        }
×
181
        go sb.identify(ctx)
1✔
182
        go func() {
2✔
183
                err := sb.read(ctx)
1✔
184
                if err != nil {
1✔
185
                        sb.logger.Error("failed to read sandbox", "error", err)
×
186
                }
×
187
        }()
188
        return &sb, nil
1✔
189
}
190

191
// KeepAlive keeps the sandbox alive.
192
func (s *Sandbox) KeepAlive(ctx context.Context, timeout time.Duration) error {
×
193
        req, err := builders.NewRequest(
×
194
                ctx, s.header, http.MethodPost,
×
195
                fmt.Sprintf("%s/sandboxes/%s/refreshes", s.baseURL, s.ID),
×
196
                builders.WithBody(struct {
×
197
                        Duration int `json:"duration"`
×
198
                }{Duration: int(timeout.Seconds())}),
×
199
        )
×
200
        if err != nil {
×
201
                return err
×
202
        }
×
203
        resp, err := s.client.Do(req)
×
204
        if err != nil {
×
205
                return err
×
206
        }
×
207
        defer resp.Body.Close()
×
208
        if resp.StatusCode < http.StatusOK ||
×
209
                resp.StatusCode >= http.StatusBadRequest {
×
210
                return fmt.Errorf("request to create sandbox failed: %s", resp.Status)
×
211
        }
×
212
        return nil
×
213
}
214

215
// Reconnect reconnects to the sandbox.
216
func (s *Sandbox) Reconnect(ctx context.Context) (err error) {
×
217
        if err := s.ws.Close(); err != nil {
×
218
                return err
×
219
        }
×
220
        urlu := fmt.Sprintf("wss://49982-%s-%s.e2b.dev/ws", s.ID, s.ClientID)
×
221
        s.ws, _, err = websocket.DefaultDialer.Dial(urlu, nil)
×
222
        if err != nil {
×
223
                return err
×
224
        }
×
225
        go func() {
×
226
                err := s.read(ctx)
×
227
                if err != nil {
×
228
                        fmt.Println(err)
×
229
                }
×
230
        }()
231
        return err
×
232
}
233

234
// Stop stops the sandbox.
235
func (s *Sandbox) Stop(ctx context.Context) error {
×
236
        req, err := builders.NewRequest(
×
237
                ctx, s.header, http.MethodDelete,
×
238
                fmt.Sprintf("%s%s%s", s.baseURL, deleteSandboxRoute, s.ID),
×
239
                builders.WithBody(interface{}(nil)),
×
240
        )
×
241
        if err != nil {
×
242
                return err
×
243
        }
×
244
        resp, err := s.client.Do(req)
×
245
        if err != nil {
×
246
                return err
×
247
        }
×
248
        defer resp.Body.Close()
×
249
        if resp.StatusCode < http.StatusOK ||
×
250
                resp.StatusCode >= http.StatusBadRequest {
×
251
                return fmt.Errorf("request to delete sandbox failed: %s", resp.Status)
×
252
        }
×
253
        return nil
×
254
}
255

256
// Mkdir makes a directory in the sandbox file system.
257
func (s *Sandbox) Mkdir(ctx context.Context, path string) error {
1✔
258
        respCh := make(chan []byte)
1✔
259
        err := s.writeRequest(ctx, filesystemMakeDir, []any{path}, respCh)
1✔
260
        if err != nil {
1✔
261
                return err
×
262
        }
×
263
        select {
1✔
264
        case body := <-respCh:
1✔
265
                resp, err := decodeResponse[string, APIError](body)
1✔
266
                if err != nil {
1✔
267
                        return fmt.Errorf("failed to mkdir: %w", err)
×
268
                }
×
269
                if resp.Error.Code != 0 {
1✔
270
                        return fmt.Errorf("failed to write to file: %s", resp.Error.Message)
×
271
                }
×
272
                return nil
1✔
273
        case <-ctx.Done():
×
274
                return ctx.Err()
×
275
        }
276
}
277

278
// Ls lists the files and/or directories in the sandbox file system at
279
// the given path.
280
func (s *Sandbox) Ls(ctx context.Context, path string) ([]LsResult, error) {
1✔
281
        respCh := make(chan []byte)
1✔
282
        defer close(respCh)
1✔
283
        err := s.writeRequest(ctx, filesystemList, []any{path}, respCh)
1✔
284
        if err != nil {
1✔
285
                return nil, err
×
286
        }
×
287
        select {
1✔
288
        case body := <-respCh:
1✔
289
                res, err := decodeResponse[[]LsResult, string](body)
1✔
290
                if err != nil {
1✔
291
                        return nil, err
×
292
                }
×
293
                return res.Result, nil
1✔
294
        case <-ctx.Done():
×
295
                return nil, ctx.Err()
×
296
        }
297
}
298

299
// Read reads a file from the sandbox file system.
300
func (s *Sandbox) Read(
301
        ctx context.Context,
302
        path string,
303
) (string, error) {
1✔
304
        respCh := make(chan []byte)
1✔
305
        err := s.writeRequest(ctx, filesystemRead, []any{path}, respCh)
1✔
306
        if err != nil {
1✔
307
                return "", err
×
308
        }
×
309
        select {
1✔
310
        case <-ctx.Done():
×
311
                return "", ctx.Err()
×
312
        case body := <-respCh:
1✔
313
                res, err := decodeResponse[string, string](body)
1✔
314
                if err != nil {
1✔
315
                        return "", err
×
316
                }
×
317
                if res.Error != "" {
1✔
318
                        return "", fmt.Errorf("failed to read file: %s", res.Error)
×
319
                }
×
320
                return res.Result, nil
1✔
321
        }
322
}
323

324
// Write writes to a file to the sandbox file system.
325
func (s *Sandbox) Write(ctx context.Context, path string, data []byte) error {
1✔
326
        respCh := make(chan []byte)
1✔
327
        err := s.writeRequest(ctx, filesystemWrite, []any{path, string(data)}, respCh)
1✔
328
        if err != nil {
1✔
329
                return err
×
330
        }
×
331
        select {
1✔
332
        case <-ctx.Done():
×
333
                return ctx.Err()
×
334
        case resp := <-respCh:
1✔
335
                err = json.Unmarshal(resp, &Request{})
1✔
336
                if err != nil {
1✔
337
                        return err
×
338
                }
×
339
                return nil
1✔
340
        }
341
}
342

343
// ReadBytes reads a file from the sandbox file system.
344
func (s *Sandbox) ReadBytes(ctx context.Context, path string) ([]byte, error) {
×
345
        resCh := make(chan []byte)
×
346
        defer close(resCh)
×
347
        err := s.writeRequest(ctx, filesystemReadBytes, []any{path}, resCh)
×
348
        if err != nil {
×
349
                return nil, err
×
350
        }
×
351
        select {
×
352
        case body := <-resCh:
×
353
                res, err := decodeResponse[string, string](body)
×
354
                if err != nil {
×
355
                        return nil, err
×
356
                }
×
357
                sDec, err := base64.StdEncoding.DecodeString(res.Result)
×
358
                if err != nil {
×
359
                        return nil, err
×
360
                }
×
361
                return sDec, nil
×
362
        case <-ctx.Done():
×
363
                return nil, ctx.Err()
×
364
        }
365
}
366

367
// Watch watches a directory in the sandbox file system.
368
//
369
// This is intended to be run in a goroutine as it will block until the
370
// connection is closed, an error occurs, or the context is canceled.
371
//
372
// While blocking, filesystem events will be written to the provided channel.
373
func (s *Sandbox) Watch(
374
        ctx context.Context,
375
        path string,
376
        eCh chan<- Event,
377
) error {
×
378
        respCh := make(chan []byte)
×
379
        defer close(respCh)
×
380
        err := s.writeRequest(ctx, filesystemSubscribe, []any{"watchDir", path}, respCh)
×
381
        if err != nil {
×
382
                return err
×
383
        }
×
384
        res, err := decodeResponse[string, string](<-respCh)
×
385
        if err != nil {
×
386
                return err
×
387
        }
×
388
        s.Map.Store(res.Result, eCh)
×
389
        go func() {
×
390
                for {
×
391
                        select {
×
392
                        case <-ctx.Done():
×
393
                                return
×
394
                        default:
×
395
                                var event Event
×
396
                                err := json.Unmarshal(<-respCh, &event)
×
397
                                if err != nil {
×
398
                                        return
×
399
                                }
×
400
                                if event.Error != "" {
×
401
                                        return
×
402
                                }
×
403
                                if event.Params.Subscription != path {
×
404
                                        continue
×
405
                                }
406
                                eCh <- event
×
407
                        }
408
                }
409
        }()
410
        return nil
×
411
}
412

413
// NewProcess creates a new process startable in the sandbox.
414
func (s *Sandbox) NewProcess(
415
        cmd string,
416
        opts ...ProcessOption,
417
) (*Process, error) {
1✔
418
        b := make([]byte, 12)
1✔
419
        for i := range b {
13✔
420
                b[i] = charset[rand.Intn(len(charset))]
12✔
421
        }
12✔
422
        proc := &Process{
1✔
423
                id:  string(b),
1✔
424
                sb:  s,
1✔
425
                cmd: cmd,
1✔
426
        }
1✔
427
        for _, opt := range opts {
1✔
428
                opt(proc)
×
429
        }
×
430
        if proc.Cwd == "" {
2✔
431
                proc.Cwd = s.Cwd
1✔
432
        }
1✔
433
        return proc, nil
1✔
434
}
435

436
// Start starts a process in the sandbox.
437
func (p *Process) Start(ctx context.Context) (err error) {
1✔
438
        if p.Env == nil {
2✔
439
                p.Env = map[string]string{"PYTHONUNBUFFERED": "1"}
1✔
440
        }
1✔
441
        respCh := make(chan []byte)
1✔
442
        err = p.sb.writeRequest(
1✔
443
                ctx,
1✔
444
                processStart,
1✔
445
                []any{p.id, p.cmd, p.Env, p.Cwd},
1✔
446
                respCh,
1✔
447
        )
1✔
448
        if err != nil {
1✔
449
                return err
×
450
        }
×
451
        p.ctx = ctx
1✔
452
        select {
1✔
453
        case body := <-respCh:
1✔
454
                res, err := decodeResponse[string, APIError](body)
1✔
455
                if err != nil {
1✔
456
                        return err
×
457
                }
×
458
                if res.Error.Code != 0 {
1✔
459
                        return fmt.Errorf("process start failed(%d): %s", res.Error.Code, res.Error.Message)
×
460
                }
×
461
                if res.Result == "" || len(res.Result) == 0 {
1✔
462
                        return fmt.Errorf("process start failed got empty result id")
×
463
                }
×
464
                if p.id != res.Result {
1✔
465
                        return fmt.Errorf("process start failed got wrong result id; want %s, got %s", p.id, res.Result)
×
466
                }
×
467
                return nil
1✔
468
        case <-ctx.Done():
×
469
                return ctx.Err()
×
470
        }
471
}
472

473
// Done returns a channel that is closed when the process is done.
474
func (p *Process) Done() <-chan struct{} {
2✔
475
        rCh, ok := p.sb.Map.Load(p.id)
2✔
476
        if !ok {
4✔
477
                return nil
2✔
478
        }
2✔
479
        return rCh.(<-chan struct{})
×
480
}
481

482
// SubscribeStdout subscribes to the process's stdout.
483
func (p *Process) SubscribeStdout(ctx context.Context) (chan Event, chan error) {
1✔
484
        return p.subscribe(ctx, OnStdout)
1✔
485
}
1✔
486

487
// SubscribeStderr subscribes to the process's stderr.
488
func (p *Process) SubscribeStderr(ctx context.Context) (chan Event, chan error) {
×
489
        return p.subscribe(ctx, OnStderr)
×
490
}
×
491

492
// SubscribeExit subscribes to the process's exit.
493
func (p *Process) SubscribeExit(ctx context.Context) (chan Event, chan error) {
×
494
        return p.subscribe(ctx, OnExit)
×
495
}
×
496

497
// Subscribe subscribes to a process event.
498
//
499
// It creates a go routine to read the process events into the provided channel.
500
func (p *Process) subscribe(
501
        ctx context.Context,
502
        event ProcessEvents,
503
) (chan Event, chan error) {
1✔
504
        events := make(chan Event)
1✔
505
        errs := make(chan error)
1✔
506
        go func(errCh chan error) {
2✔
507
                respCh := make(chan []byte)
1✔
508
                defer close(respCh)
1✔
509
                err := p.sb.writeRequest(ctx, processSubscribe, []any{event, p.id}, respCh)
1✔
510
                if err != nil {
1✔
511
                        errCh <- err
×
512
                }
×
513
                res, err := decodeResponse[string, any](<-respCh)
1✔
514
                if err != nil {
1✔
515
                        errCh <- err
×
516
                }
×
517
                p.sb.Map.Store(res.Result, respCh)
1✔
518
        loop:
1✔
519
                for {
3✔
520
                        select {
2✔
521
                        case eventBd := <-respCh:
1✔
522
                                var event Event
1✔
523
                                _ = json.Unmarshal(eventBd, &event)
1✔
524
                                if event.Error != "" {
1✔
525
                                        p.sb.logger.Error("failed to read event", "error", event.Error)
×
526
                                        continue
×
527
                                }
528
                                events <- event
1✔
529
                        case <-ctx.Done():
1✔
530
                                break loop
1✔
531
                        case <-p.Done():
×
532
                                break loop
×
533
                        }
534
                }
535

536
                p.sb.Map.Delete(res.Result)
1✔
537
                finishCtx, cancel := context.WithCancel(context.Background())
1✔
538
                defer cancel()
1✔
539
                p.sb.logger.Debug("unsubscribing from process", "event", event, "id", res.Result)
1✔
540
                _ = p.sb.writeRequest(finishCtx, processUnsubscribe, []any{res.Result}, respCh)
1✔
541
                unsubRes, _ := decodeResponse[bool, string](<-respCh)
1✔
542
                if unsubRes.Error != "" || !unsubRes.Result {
1✔
543
                        p.sb.logger.Debug("failed to unsubscribe from process", "error", unsubRes.Error)
×
544
                }
×
545
        }(errs)
546
        return events, errs
1✔
547
}
548
func (s *Sandbox) sendRequest(req *http.Request, v interface{}) error {
1✔
549
        res, err := s.client.Do(req)
1✔
550
        if err != nil {
1✔
551
                return err
×
552
        }
×
553
        defer res.Body.Close()
1✔
554
        if res.StatusCode < http.StatusOK ||
1✔
555
                res.StatusCode >= http.StatusBadRequest {
1✔
556
                return fmt.Errorf("failed to create sandbox: %s", res.Status)
×
557
        }
×
558
        if v == nil {
1✔
559
                return nil
×
560
        }
×
561
        switch o := v.(type) {
1✔
562
        case *string:
×
563
                b, err := io.ReadAll(res.Body)
×
564
                if err != nil {
×
565
                        return err
×
566
                }
×
567
                *o = string(b)
×
568
                return nil
×
569
        default:
1✔
570
                return json.NewDecoder(res.Body).Decode(v)
1✔
571
        }
572
}
573
func decodeResponse[T any, Q any](body []byte) (*Response[T, Q], error) {
5✔
574
        decResp := new(Response[T, Q])
5✔
575
        err := json.Unmarshal(body, decResp)
5✔
576
        if err != nil {
5✔
577
                return nil, err
×
578
        }
×
579
        return decResp, nil
5✔
580
}
581
func (s *Sandbox) read(ctx context.Context) error {
1✔
582
        var body []byte
1✔
583
        var err error
1✔
584
        type decResp struct {
1✔
585
                Method string `json:"method"`
1✔
586
                ID     int    `json:"id"`
1✔
587
                Params struct {
1✔
588
                        Subscription string `json:"subscription"`
1✔
589
                }
1✔
590
        }
1✔
591
        defer func() {
1✔
592
                err := s.ws.Close()
×
593
                if err != nil {
×
594
                        s.logger.Error("failed to close sandbox", "error", err)
×
595
                }
×
596
        }()
597
        msgCh := make(chan []byte, 10)
1✔
598
        for {
16✔
599
                select {
15✔
600
                case body = <-msgCh:
7✔
601
                        var decResp decResp
7✔
602
                        err = json.Unmarshal(body, &decResp)
7✔
603
                        if err != nil {
7✔
604
                                return err
×
605
                        }
×
606
                        var key any
7✔
607
                        key = decResp.Params.Subscription
7✔
608
                        if decResp.ID != 0 {
13✔
609
                                key = decResp.ID
6✔
610
                        }
6✔
611
                        toR, ok := s.Map.Load(key)
7✔
612
                        if !ok {
7✔
613
                                msgCh <- body
×
614
                                continue
×
615
                        }
616
                        toRCh, ok := toR.(chan []byte)
7✔
617
                        if !ok {
7✔
618
                                msgCh <- body
×
619
                                continue
×
620
                        }
621
                        s.logger.Debug("read",
7✔
622
                                "subscription", decResp.Params.Subscription,
7✔
623
                                "body", body,
7✔
624
                                "sandbox", s.ID,
7✔
625
                        )
7✔
626
                        toRCh <- body
7✔
627
                case <-ctx.Done():
×
628
                        return ctx.Err()
×
629
                default:
8✔
630
                        _, msg, err := s.ws.ReadMessage()
8✔
631
                        if err != nil {
8✔
632
                                return err
×
633
                        }
×
634
                        msgCh <- msg
7✔
635
                }
636
        }
637
}
638
func (s *Sandbox) writeRequest(
639
        ctx context.Context,
640
        method Method,
641
        params []any,
642
        respCh chan []byte,
643
) error {
7✔
644
        select {
7✔
645
        case <-ctx.Done():
×
646
                return ctx.Err()
×
647
        case id := <-s.idCh:
7✔
648
                req := Request{
7✔
649
                        Method:  method,
7✔
650
                        JSONRPC: rpc,
7✔
651
                        Params:  params,
7✔
652
                        ID:      id,
7✔
653
                }
7✔
654
                s.logger.Debug("request",
7✔
655
                        "sandbox", id,
7✔
656
                        "method", method,
7✔
657
                        "id", id,
7✔
658
                        "params", params,
7✔
659
                )
7✔
660
                s.Map.Store(req.ID, respCh)
7✔
661
                jsVal, err := json.Marshal(req)
7✔
662
                if err != nil {
7✔
663
                        return err
×
664
                }
×
665
                err = s.ws.WriteMessage(websocket.TextMessage, jsVal)
7✔
666
                if err != nil {
7✔
667
                        return fmt.Errorf(
×
668
                                "writing %s request failed (%d): %w",
×
669
                                method,
×
670
                                req.ID,
×
671
                                err,
×
672
                        )
×
673
                }
×
674
                return nil
7✔
675
        }
676
}
677
func (s *Sandbox) identify(ctx context.Context) {
1✔
678
        id := 1
1✔
679
        for {
9✔
680
                select {
8✔
681
                case <-ctx.Done():
1✔
682
                        return
1✔
683
                default:
7✔
684
                        s.idCh <- id
7✔
685
                        id++
7✔
686
                }
687
        }
688
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc