• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

romshark / plugger / 15238555594

25 May 2025 01:49PM UTC coverage: 47.388%. Remained the same
15238555594

push

github

romshark
feat: Add ErrGoToolchainNotFound

1 of 2 new or added lines in 1 file covered. (50.0%)

56 existing lines in 1 file now uncovered.

127 of 268 relevant lines covered (47.39%)

3.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

47.39
/plugger.go
1
package plugger
2

3
import (
4
        "bufio"
5
        "context"
6
        "encoding/json"
7
        "errors"
8
        "fmt"
9
        "io"
10
        "os"
11
        "os/exec"
12
        "path/filepath"
13
        "regexp"
14
        "runtime"
15
        "sync"
16
        "sync/atomic"
17
)
18

19
// envelope defines the JSON based wire format.
20
type envelope struct {
21
        Cancel string          `json:"cancel"`           // Request ID to cancel
22
        ID     string          `json:"id"`               // Unique per request
23
        Method string          `json:"method,omitempty"` // Request side only
24
        Error  string          `json:"err,omitempty"`    // Set on error responses
25
        Data   json.RawMessage `json:"data,omitempty"`   // Payload
26
}
27

28
type Host struct {
29
        idCounter atomic.Uint64
30
        running   atomic.Bool
31
        wgRun     sync.WaitGroup
32
        enc       *json.Encoder
33
        dec       *json.Decoder
34
        cmd       *exec.Cmd
35
        closer    io.Closer // plugin stdin
36
        mu        sync.Mutex
37
        pending   map[string]chan envelope
38
}
39

40
// NewHost creates an empty host. Call RunPlugin afterwards.
41
func NewHost() *Host {
5✔
42
        h := &Host{pending: map[string]chan envelope{}}
5✔
43
        h.wgRun.Add(1)
5✔
44
        return h
5✔
45
}
5✔
46

47
var (
48
        ErrAlreadyRunning      = errors.New("plugin already running")
49
        ErrGoToolchainNotFound = errors.New("go toolchain not in PATH")
50
        ErrClosed              = errors.New("closed")
51
        ErrMalformedResponse   = errors.New("malformed response")
52
)
53

54
// ErrorResponse is a copy of the "err" field in the plugin response JSON.
55
type ErrorResponse string
56

57
func (e ErrorResponse) Error() string { return string(e) }
6✔
58

59
// RunPlugin executes a plugin executable or Go file/package/module.
60
func (h *Host) RunPlugin(
61
        ctx context.Context, plugin string, pluginStderr io.WriteCloser,
62
) error {
5✔
63
        if h.running.Load() {
5✔
64
                return ErrAlreadyRunning
×
65
        }
×
66
        cmd, err := spawn(plugin)
5✔
67
        if err != nil {
5✔
68
                return err
×
69
        }
×
70
        stdin, err := cmd.StdinPipe()
5✔
71
        if err != nil {
5✔
72
                return fmt.Errorf("getting stdin pipe: %w", err)
×
73
        }
×
74
        stdout, err := cmd.StdoutPipe()
5✔
75
        if err != nil {
5✔
76
                return fmt.Errorf("getting stdout pipe: %w", err)
×
77
        }
×
78
        if pluginStderr == nil {
5✔
79
                pluginStderr = os.Stderr
×
80
        }
×
81
        cmd.Stderr = pluginStderr
5✔
82
        defer func() {
10✔
83
                _ = pluginStderr.Close() // Signal no more logs.
5✔
84
        }()
5✔
85

86
        if err := cmd.Start(); err != nil {
5✔
87
                return err
×
88
        }
×
89

90
        h.enc = json.NewEncoder(stdin)
5✔
91
        h.dec = json.NewDecoder(bufio.NewReader(stdout))
5✔
92
        h.cmd = cmd
5✔
93
        h.closer = stdin
5✔
94
        h.running.Store(true)
5✔
95
        h.wgRun.Done()
5✔
96
        return h.run(ctx)
5✔
97
}
98

99
// Call sends a typed request and waits for the typed response.
100
// Returns ErrMalformedResponse if plugin returns a malformed JSON response.
101
// Returns ErrClosed if the plugin is closed.
102
func Call[Req any, Resp any](
103
        ctx context.Context, h *Host, method string, req Req,
104
) (Resp, error) {
14✔
105
        // Wait for the plugin to start.
14✔
106
        h.wgRun.Wait()
14✔
107

14✔
108
        var zero Resp
14✔
109
        if !h.running.Load() {
14✔
UNCOV
110
                return zero, ErrClosed
×
UNCOV
111
        }
×
112

113
        id := fmt.Sprintf("%x", h.idCounter.Add(1))
14✔
114
        raw, err := json.Marshal(req)
14✔
115
        if err != nil {
14✔
UNCOV
116
                return zero, fmt.Errorf("marshaling request: %w", err)
×
UNCOV
117
        }
×
118

119
        wait := make(chan envelope, 1)
14✔
120
        h.mu.Lock()
14✔
121
        h.pending[id] = wait
14✔
122
        h.mu.Unlock()
14✔
123

14✔
124
        if err := h.enc.Encode(envelope{ID: id, Method: method, Data: raw}); err != nil {
14✔
UNCOV
125
                return zero, err
×
UNCOV
126
        }
×
127

128
        select {
14✔
129
        case ev, ok := <-wait:
13✔
130
                h.mu.Lock()
13✔
131
                delete(h.pending, id)
13✔
132
                h.mu.Unlock()
13✔
133
                if !ok {
13✔
UNCOV
134
                        return zero, ErrClosed
×
UNCOV
135
                }
×
136
                if ev.Error != "" {
22✔
137
                        return zero, ErrorResponse(ev.Error)
9✔
138
                }
9✔
139
                if err := json.Unmarshal(ev.Data, &zero); err != nil {
5✔
140
                        return zero, fmt.Errorf("%w: %w", ErrMalformedResponse, err)
1✔
141
                }
1✔
142
                return zero, nil
3✔
143
        case <-ctx.Done():
1✔
144
                h.mu.Lock()
1✔
145
                delete(h.pending, id)
1✔
146
                h.mu.Unlock()
1✔
147
                // Send cancelation message.
1✔
148
                if err := h.enc.Encode(envelope{Cancel: id}); err != nil {
1✔
UNCOV
149
                        return zero, err
×
UNCOV
150
                }
×
151
                return zero, ctx.Err()
1✔
152
        }
153
}
154

155
// Close closes stdin (signals EOF) and waits for plugin exit.
156
// No-op if already closed.
157
func (h *Host) Close() error {
5✔
158
        wasRunning := h.running.Swap(false)
5✔
159
        if !wasRunning {
5✔
UNCOV
160
                return nil
×
UNCOV
161
        }
×
162
        if h.closer != nil {
10✔
163
                _ = h.closer.Close()
5✔
164
        }
5✔
165
        if h.cmd != nil {
10✔
166
                return h.cmd.Wait()
5✔
167
        }
5✔
UNCOV
168
        return nil
×
169
}
170

171
func (h *Host) run(ctx context.Context) error {
5✔
172
        for {
24✔
173
                var ev envelope
19✔
174
                if err := h.dec.Decode(&ev); err != nil {
24✔
175
                        // broadcast EOF to waiters
5✔
176
                        h.mu.Lock()
5✔
177
                        for _, ch := range h.pending {
5✔
UNCOV
178
                                close(ch)
×
UNCOV
179
                        }
×
180
                        h.mu.Unlock()
5✔
181
                        return err
5✔
182
                }
183
                h.mu.Lock()
14✔
184
                ch := h.pending[ev.ID]
14✔
185
                h.mu.Unlock()
14✔
186
                if ch != nil {
27✔
187
                        select {
13✔
188
                        case ch <- ev:
13✔
UNCOV
189
                        case <-ctx.Done():
×
UNCOV
190
                                return ctx.Err()
×
191
                        }
192
                }
193
        }
194
}
195

196
type Plugin struct {
197
        enc          *json.Encoder
198
        dec          *json.Decoder
199
        endpoints    map[string]func(context.Context, json.RawMessage) (any, error)
200
        running      atomic.Bool
201
        wgDispatcher sync.WaitGroup
202
        lockCancel   sync.Mutex                    // protects cancel
203
        cancel       map[string]context.CancelFunc // id → cancel func
204
}
205

206
// NewPlugin binds to the process’ own stdin/stdout.
207
func NewPlugin() *Plugin {
×
208
        return &Plugin{
×
209
                enc:       json.NewEncoder(os.Stdout),
×
210
                dec:       json.NewDecoder(bufio.NewReader(os.Stdin)),
×
211
                endpoints: map[string]func(context.Context, json.RawMessage) (any, error){},
×
UNCOV
212
                cancel:    make(map[string]context.CancelFunc),
×
UNCOV
213
        }
×
UNCOV
214
}
×
215

216
// Handle registers an RPC endpoint overwriting any existing endpoint.
217
// Must be used before Run is invoked!
218
//
219
// WARNING: Logs must be written to os.Stderr because os.Stdout is reserved
220
// for host-plugin communication!
221
func Handle[Req any, Resp any](
222
        p *Plugin,
223
        name string,
224
        fn func(context.Context, Req) (Resp, error),
UNCOV
225
) {
×
226
        if p.running.Load() {
×
227
                panic("add handlers before invoking Run")
×
228
        }
229
        p.endpoints[name] = func(ctx context.Context, raw json.RawMessage) (any, error) {
×
230
                var req Req
×
231
                if err := json.Unmarshal(raw, &req); err != nil {
×
232
                        var zero Resp
×
UNCOV
233
                        return zero, err
×
UNCOV
234
                }
×
UNCOV
235
                return fn(ctx, req)
×
236
        }
237
}
238

239
// Run blocks handling requests until stdin closes or ctx is done.
240
// Return value is suitable for os.Exit().
UNCOV
241
func (p *Plugin) Run(ctx context.Context) (osReturnCode int) {
×
242
        if wasRunning := p.running.Swap(true); wasRunning {
×
243
                panic("plugin is already running")
×
244
        }
245
        for {
×
246
                if ctx.Err() != nil {
×
247
                        // Run canceled.
×
248
                        return 0
×
249
                }
×
250
                var e envelope
×
251
                if err := p.dec.Decode(&e); err != nil {
×
UNCOV
252
                        // stdin closed – clean exit
×
253
                        return 0
×
254
                }
×
255

256
                switch {
×
257
                case e.Cancel != "":
×
258
                        // Cancelation message received.
×
259
                        p.lockCancel.Lock()
×
260
                        if cancelFn, ok := p.cancel[e.Cancel]; ok {
×
261
                                cancelFn() // Abort the worker goroutine.
×
262
                                delete(p.cancel, e.Cancel)
×
263
                        }
×
264
                        p.lockCancel.Unlock()
×
UNCOV
265
                        continue // No reply for cancel.
×
UNCOV
266
                case e.ID == "":
×
267
                        panic(`protocol violation: both "id" and "cancel" empty`)
×
268
                }
269

270
                ctxCancelable, cancelFn := context.WithCancel(ctx)
×
271

×
272
                p.lockCancel.Lock()
×
273
                p.cancel[e.ID] = cancelFn
×
274
                p.lockCancel.Unlock()
×
UNCOV
275

×
UNCOV
276
                p.wgDispatcher.Add(1)
×
UNCOV
277
                go p.dispatch(ctxCancelable, e)
×
278
        }
279
}
280

281
func (p *Plugin) dispatch(ctx context.Context, ev envelope) {
×
282
        // Register cancelation function.
×
283
        ctx, cancelFn := context.WithCancel(ctx)
×
284

×
285
        p.lockCancel.Lock()
×
286
        p.cancel[ev.ID] = cancelFn
×
287
        p.lockCancel.Unlock()
×
288

×
289
        defer func() {
×
290
                // Clean up cancelation function and release dispatcher slot.
×
291
                p.lockCancel.Lock()
×
292
                delete(p.cancel, ev.ID)
×
293
                p.lockCancel.Unlock()
×
UNCOV
294
                cancelFn()
×
295
                p.wgDispatcher.Done()
×
296
        }()
×
297

298
        fn := p.endpoints[ev.Method]
×
299

×
300
        out := envelope{ID: ev.ID}
×
301

×
302
        if fn == nil {
×
UNCOV
303
                out.Error = "unknown method: " + ev.Method
×
304
                if err := p.enc.Encode(out); err != nil {
×
UNCOV
305
                        panic(fmt.Errorf("encoding unknown method response: %w", err))
×
306
                }
307
                return
×
308
        }
309
        data, err := fn(ctx, ev.Data)
×
310
        if err != nil {
×
311
                out.Error = err.Error()
×
312
        } else if data != nil {
×
313
                out.Data, _ = json.Marshal(data)
×
UNCOV
314
        }
×
UNCOV
315
        if err := p.enc.Encode(out); err != nil {
×
UNCOV
316
                panic(fmt.Errorf("encoding response: %w", err))
×
317
        }
318
}
319

320
var reModule = regexp.MustCompile(`^[\w.\-]+(\.[\w.\-]+)+/[\w.\-/]+(@[\w.\-]+)?$`)
321

322
func spawn(plugin string) (*exec.Cmd, error) {
5✔
323
        switch {
5✔
324
        case reModule.MatchString(plugin):
×
325
                if err := requireGo(); err != nil {
×
UNCOV
326
                        return nil, err
×
UNCOV
327
                }
×
328
                return exec.Command("go", "run", plugin), nil
×
329
        case isGoFile(plugin):
1✔
330
                if err := requireGo(); err != nil {
1✔
UNCOV
331
                        return nil, err
×
UNCOV
332
                }
×
333
                cmd := exec.Command("go", "run", plugin)
1✔
334
                return cmd, nil
1✔
335
        case isLocalGoPackage(plugin):
3✔
336
                if err := requireGo(); err != nil {
3✔
UNCOV
337
                        return nil, err
×
UNCOV
338
                }
×
339
                cmd := exec.Command("go", "run", ".")
3✔
340
                cmd.Dir = plugin
3✔
341
                return cmd, nil
3✔
342
        case isExecutable(plugin):
1✔
343
                return exec.Command(plugin), nil
1✔
UNCOV
344
        default:
×
UNCOV
345
                return nil, errors.New("invalid plugin path")
×
346
        }
347
}
348

349
func isGoFile(p string) bool {
5✔
350
        abs, err := filepath.Abs(p)
5✔
351
        if err != nil {
5✔
UNCOV
352
                return false
×
353
        }
×
354
        info, err := os.Stat(abs)
5✔
355
        if err != nil {
5✔
UNCOV
356
                return false
×
UNCOV
357
        }
×
358
        return !info.IsDir() && filepath.Ext(abs) == ".go"
5✔
359
}
360

361
func isLocalGoPackage(p string) bool {
4✔
362
        abs, err := filepath.Abs(p)
4✔
363
        if err != nil {
4✔
UNCOV
364
                return false
×
365
        }
×
366
        info, err := os.Stat(abs)
4✔
367
        if err != nil {
4✔
UNCOV
368
                return false
×
UNCOV
369
        }
×
370
        if !info.IsDir() {
5✔
371
                return false
1✔
372
        }
1✔
373
        cmd := exec.Command("go", "list", "-m")
3✔
374
        cmd.Dir = abs
3✔
375
        err = cmd.Run()
3✔
376
        return err == nil
3✔
377
}
378

379
func isExecutable(p string) bool {
1✔
380
        abs, err := filepath.Abs(p)
1✔
381
        if err != nil {
1✔
UNCOV
382
                return false
×
383
        }
×
384
        info, err := os.Stat(abs)
1✔
385
        if err != nil || info.IsDir() {
1✔
386
                return false
×
387
        }
×
388
        if runtime.GOOS == "windows" {
1✔
UNCOV
389
                ext := filepath.Ext(abs)
×
UNCOV
390
                return ext == ".exe" || ext == ".bat" || ext == ".cmd"
×
UNCOV
391
        }
×
392
        return info.Mode().Perm()&0o111 != 0
1✔
393
}
394

395
func requireGo() error {
4✔
396
        if _, err := exec.LookPath("go"); err != nil {
4✔
NEW
397
                return ErrGoToolchainNotFound
×
UNCOV
398
        }
×
399
        return nil
4✔
400
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc