• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

valksor / go-mehrhof / 21342980857

26 Jan 2026 01:09AM UTC coverage: 40.055% (-0.5%) from 40.54%
21342980857

push

github

k0d3r1s
Update provider packages to use go-toolkit slug directly

Replaces internal/naming.Slugify wrapper with go-toolkit/slug.Slugify
across all provider packages. Updates mock event bus to use eventbus.

Changes:
- Update provider imports: remove internal/naming, add github.com/valksor/go-toolkit/slug
- Replace naming.Slugify() calls with slug.Slugify()
- Update mock_eventbus.go to use go-toolkit/eventbus types
- Rename local 'slug' variables to avoid shadowing package name

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

7 of 61 new or added lines in 31 files covered. (11.48%)

402 existing lines in 15 files now uncovered.

21307 of 53194 relevant lines covered (40.06%)

21.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

9.54
/internal/plugin/process.go
1
package plugin
2

3
import (
4
        "bufio"
5
        "context"
6
        "encoding/json"
7
        "errors"
8
        "fmt"
9
        "io"
10
        "log/slog"
11
        "os"
12
        "os/exec"
13
        "path/filepath"
14
        "strings"
15
        "sync"
16
        "sync/atomic"
17
        "time"
18

19
        "github.com/valksor/go-toolkit/jsonrpc"
20
)
21

22
const (
23
        // defaultPluginBufferSize is the buffer size for plugin stdout/stderr.
24
        defaultPluginBufferSize = 1024 * 1024 // 1MB
25
        // pluginStopTimeout is the maximum time to wait for a plugin to stop gracefully.
26
        pluginStopTimeout = 10 * time.Second
27
)
28

29
// Process represents a running plugin process.
30
type Process struct {
31
        stdin      io.WriteCloser
32
        stderr     io.ReadCloser
33
        stdoutPipe io.ReadCloser // Original stdout pipe for explicit cleanup
34
        //nolint:containedctx // stored for plugin process lifecycle management
35
        ctx      context.Context
36
        err      error
37
        done     chan struct{}
38
        cmd      *exec.Cmd
39
        stdout   *bufio.Reader
40
        cancel   context.CancelFunc
41
        manifest *Manifest
42
        pending  map[int64]chan *jsonrpc.Response
43
        streamCh chan json.RawMessage
44
        reqID    atomic.Int64
45
        mu       sync.Mutex
46
        stopping bool
47
        started  bool
48
}
49

50
// startProcess spawns the plugin executable and sets up communication.
51
func startProcess(ctx context.Context, manifest *Manifest) (*Process, error) {
2✔
52
        cmdArgs := manifest.ExecutableCommand()
2✔
53
        if len(cmdArgs) == 0 {
2✔
UNCOV
54
                return nil, fmt.Errorf("no executable configured for plugin %s", manifest.Name)
×
UNCOV
55
        }
×
56

57
        // Validate the executable path for security
58
        // Ensure it's either an absolute path or a relative path within the plugin directory
59
        execPath := cmdArgs[0]
2✔
60
        if !filepath.IsAbs(execPath) {
4✔
61
                // Relative path - must be within the plugin directory
2✔
62
                if manifest.Dir == "" {
4✔
63
                        return nil, fmt.Errorf("plugin %s: relative executable path requires a valid plugin directory", manifest.Name)
2✔
64
                }
2✔
65
                execPath = filepath.Join(manifest.Dir, execPath)
×
66
                // Clean the path to resolve any ".." components
×
67
                execPath = filepath.Clean(execPath)
×
68
                // Resolve symlinks to prevent symlink-based directory escapes
×
69
                resolved, err := filepath.EvalSymlinks(execPath)
×
70
                if err != nil {
×
71
                        return nil, fmt.Errorf("plugin %s: resolve path: %w", manifest.Name, err)
×
72
                }
×
73
                execPath = resolved
×
74
                // Verify the resolved path is still within the plugin directory
×
75
                rel, err := filepath.Rel(manifest.Dir, execPath)
×
76
                if err != nil || strings.HasPrefix(rel, "..") {
×
UNCOV
77
                        return nil, fmt.Errorf("plugin %s: executable path %q escapes plugin directory", manifest.Name, cmdArgs[0])
×
UNCOV
78
                }
×
79
        }
80

81
        cmd := exec.CommandContext(ctx, execPath, cmdArgs[1:]...)
×
82
        cmd.Dir = manifest.Dir
×
83

×
84
        // Inherit environment and add plugin-specific vars
×
85
        cmd.Env = os.Environ()
×
86

×
87
        stdin, err := cmd.StdinPipe()
×
88
        if err != nil {
×
UNCOV
89
                return nil, fmt.Errorf("create stdin pipe: %w", err)
×
90
        }
×
91

92
        stdout, err := cmd.StdoutPipe()
×
93
        if err != nil {
×
94
                _ = stdin.Close()
×
95

×
UNCOV
96
                return nil, fmt.Errorf("create stdout pipe: %w", err)
×
97
        }
×
98

99
        stderr, err := cmd.StderrPipe()
×
100
        if err != nil {
×
101
                _ = stdin.Close()
×
102
                _ = stdout.Close()
×
103

×
UNCOV
104
                return nil, fmt.Errorf("create stderr pipe: %w", err)
×
UNCOV
105
        }
×
106

107
        // Create a context for this process that can be cancelled on shutdown
108
        procCtx, procCancel := context.WithCancel(ctx)
×
109

×
110
        proc := &Process{
×
111
                manifest:   manifest,
×
112
                cmd:        cmd,
×
113
                stdin:      stdin,
×
114
                stdoutPipe: stdout, // Store original pipe for explicit cleanup
×
115
                stdout:     bufio.NewReaderSize(stdout, defaultPluginBufferSize),
×
116
                stderr:     stderr,
×
117
                pending:    make(map[int64]chan *jsonrpc.Response),
×
118
                done:       make(chan struct{}),
×
119
                ctx:        procCtx,
×
120
                cancel:     procCancel,
×
121
        }
×
122

×
123
        if err := cmd.Start(); err != nil {
×
124
                procCancel()
×
125

×
UNCOV
126
                return nil, fmt.Errorf("start plugin %s: %w", manifest.Name, err)
×
127
        }
×
128

129
        proc.started = true
×
130

×
131
        // Start response reader goroutine
×
132
        go proc.readResponses()
×
133

×
134
        // Start stderr reader goroutine (for logging)
×
135
        go proc.readStderr()
×
UNCOV
136

×
UNCOV
137
        return proc, nil
×
138
}
139

140
// readResponses reads JSON-RPC responses and notifications from stdout.
141
func (p *Process) readResponses() {
×
142
        defer close(p.done)
×
143

×
144
        for {
×
145
                // Check if context was cancelled (e.g., during Stop)
×
146
                select {
×
147
                case <-p.ctx.Done():
×
148
                        // Context cancelled, exit gracefully
×
149
                        p.mu.Lock()
×
150
                        for id, ch := range p.pending {
×
151
                                close(ch)
×
152
                                delete(p.pending, id)
×
153
                        }
×
154
                        if p.streamCh != nil {
×
155
                                close(p.streamCh)
×
156
                                p.streamCh = nil
×
157
                        }
×
158
                        p.mu.Unlock()
×
159

×
UNCOV
160
                        return
×
UNCOV
161
                default:
×
162
                        // Continue reading
163
                }
164

165
                line, err := p.stdout.ReadBytes('\n')
×
166
                if err != nil {
×
167
                        if err != io.EOF {
×
UNCOV
168
                                p.err = fmt.Errorf("read stdout: %w", err)
×
169
                        }
×
170
                        // Close all pending requests
171
                        p.mu.Lock()
×
172
                        for id, ch := range p.pending {
×
173
                                close(ch)
×
174
                                delete(p.pending, id)
×
175
                        }
×
176
                        if p.streamCh != nil {
×
177
                                close(p.streamCh)
×
178
                                p.streamCh = nil
×
179
                        }
×
180
                        p.mu.Unlock()
×
UNCOV
181

×
UNCOV
182
                        return
×
183
                }
184

185
                // Try to parse as response (has ID)
186
                var resp jsonrpc.Response
×
UNCOV
187
                if err := json.Unmarshal(line, &resp); err != nil {
×
UNCOV
188
                        continue // Skip malformed lines
×
189
                }
190

191
                if resp.ID != 0 {
×
192
                        // This is a response to a request
×
193
                        p.mu.Lock()
×
194
                        if ch, ok := p.pending[resp.ID]; ok {
×
195
                                ch <- &resp
×
196
                                delete(p.pending, resp.ID)
×
197
                        }
×
198
                        p.mu.Unlock()
×
199
                } else {
×
200
                        // This is a notification (streaming event)
×
201
                        var notif jsonrpc.Notification
×
UNCOV
202
                        if err := json.Unmarshal(line, &notif); err != nil {
×
UNCOV
203
                                continue
×
204
                        }
205

206
                        if notif.Method == "stream" {
×
207
                                p.mu.Lock()
×
208
                                if p.streamCh != nil {
×
209
                                        // Marshal params back to JSON for the stream channel
×
210
                                        if paramsJSON, err := json.Marshal(notif.Params); err == nil {
×
211
                                                select {
×
UNCOV
212
                                                case p.streamCh <- paramsJSON:
×
UNCOV
213
                                                default:
×
214
                                                        // Channel full, drop event
215
                                                }
216
                                        }
217
                                }
UNCOV
218
                                p.mu.Unlock()
×
219
                        }
220
                }
221
        }
222
}
223

224
// readStderr reads stderr output for logging.
225
func (p *Process) readStderr() {
×
226
        reader := bufio.NewReaderSize(p.stderr, defaultPluginBufferSize)
×
227
        for {
×
228
                line, err := reader.ReadBytes('\n')
×
229
                if err != nil {
×
230
                        return
×
231
                }
×
232
                slog.Debug("plugin stderr",
×
UNCOV
233
                        "plugin", p.manifest.Name,
×
UNCOV
234
                        "output", strings.TrimSpace(string(line)))
×
235
        }
236
}
237

238
// Call sends a JSON-RPC request and waits for a response.
239
func (p *Process) Call(ctx context.Context, method string, params any) (json.RawMessage, error) {
2✔
240
        p.mu.Lock()
2✔
241
        if p.stopping {
2✔
242
                p.mu.Unlock()
×
243

×
UNCOV
244
                return nil, errors.New("plugin is stopping")
×
UNCOV
245
        }
×
246

247
        id := p.reqID.Add(1)
2✔
248
        ch := make(chan *jsonrpc.Response, 1)
2✔
249
        p.pending[id] = ch
2✔
250
        p.mu.Unlock()
2✔
251

2✔
252
        req := jsonrpc.NewRequest(id, method, params)
2✔
253
        data, err := json.Marshal(req)
2✔
254
        if err != nil {
2✔
255
                p.mu.Lock()
×
256
                delete(p.pending, id)
×
257
                p.mu.Unlock()
×
258

×
259
                return nil, fmt.Errorf("marshal request: %w", err)
×
260
        }
×
261
        data = append(data, '\n')
×
262

×
263
        p.mu.Lock()
×
264
        _, err = p.stdin.Write(data)
×
265
        p.mu.Unlock()
×
266
        if err != nil {
×
267
                p.mu.Lock()
×
268
                delete(p.pending, id)
×
269
                p.mu.Unlock()
×
270

×
UNCOV
271
                return nil, fmt.Errorf("write request: %w", err)
×
UNCOV
272
        }
×
273

274
        // Wait for response with context timeout
275
        select {
×
276
        case resp, ok := <-ch:
×
277
                if !ok {
×
278
                        return nil, errors.New("plugin process closed")
×
279
                }
×
280
                if resp.Error != nil {
×
UNCOV
281
                        return nil, resp.Error
×
282
                }
×
283

284
                return resp.Result, nil
×
285
        case <-ctx.Done():
×
286
                p.mu.Lock()
×
287
                delete(p.pending, id)
×
288
                p.mu.Unlock()
×
UNCOV
289

×
UNCOV
290
                return nil, ctx.Err()
×
291
        }
292
}
293

294
// Stream sends a JSON-RPC request that returns streaming events.
295
// Returns a channel that receives stream events until completion or error.
296
func (p *Process) Stream(ctx context.Context, method string, params any) (<-chan json.RawMessage, error) {
×
297
        p.mu.Lock()
×
298
        if p.stopping {
×
299
                p.mu.Unlock()
×
300

×
UNCOV
301
                return nil, errors.New("plugin is stopping")
×
UNCOV
302
        }
×
303

304
        // Set up stream channel
305
        p.streamCh = make(chan json.RawMessage, 100)
×
306
        streamCh := p.streamCh
×
307
        p.mu.Unlock()
×
308

×
309
        req := jsonrpc.NewRequest(0, method, params) // ID 0 for streaming (no response expected)
×
310
        data, err := json.Marshal(req)
×
311
        if err != nil {
×
312
                return nil, fmt.Errorf("marshal request: %w", err)
×
313
        }
×
314
        data = append(data, '\n')
×
315

×
316
        p.mu.Lock()
×
317
        _, err = p.stdin.Write(data)
×
318
        p.mu.Unlock()
×
319
        if err != nil {
×
UNCOV
320
                return nil, fmt.Errorf("write request: %w", err)
×
UNCOV
321
        }
×
322

323
        // Wrap channel to handle context cancellation
324
        out := make(chan json.RawMessage, 100)
×
325
        go func() {
×
326
                defer close(out)
×
327
                // Drain remaining events on exit to prevent goroutine leak
×
328
                defer func() {
×
329
                        for range streamCh {
×
UNCOV
330
                                // Drain any remaining events
×
331
                        }
×
332
                }()
333
                for {
×
334
                        select {
×
335
                        case event, ok := <-streamCh:
×
336
                                if !ok {
×
337
                                        return
×
338
                                }
×
339
                                select {
×
340
                                case out <- event:
×
UNCOV
341
                                case <-ctx.Done():
×
342
                                        return
×
343
                                }
UNCOV
344
                        case <-ctx.Done():
×
UNCOV
345
                                return
×
346
                        }
347
                }
348
        }()
349

UNCOV
350
        return out, nil
×
351
}
352

353
// Stop gracefully stops the plugin process.
354
func (p *Process) Stop(ctx context.Context) error {
×
355
        p.mu.Lock()
×
356
        if p.stopping {
×
357
                p.mu.Unlock()
×
358
                <-p.done
×
359

×
360
                return p.err
×
361
        }
×
362
        p.stopping = true
×
363
        p.mu.Unlock()
×
364

×
365
        // Signal goroutines to stop (unblocks ReadBytes)
×
366
        if p.cancel != nil {
×
UNCOV
367
                p.cancel()
×
UNCOV
368
        }
×
369

370
        // Try to send shutdown request - log error but continue with cleanup
371
        shutdownCtx, cancel := context.WithTimeout(ctx, pluginStopTimeout)
×
372
        defer cancel()
×
373
        if _, err := p.Call(shutdownCtx, "shutdown", nil); err != nil {
×
UNCOV
374
                slog.Warn("plugin shutdown request failed", "plugin", p.manifest.Name, "error", err)
×
UNCOV
375
        }
×
376

377
        // Close stdin to signal EOF
378
        if err := p.stdin.Close(); err != nil {
×
UNCOV
379
                slog.Warn("failed to close plugin stdin", "plugin", p.manifest.Name, "error", err)
×
UNCOV
380
        }
×
381

382
        // Close stderr pipe
383
        if p.stderr != nil {
×
384
                if err := p.stderr.Close(); err != nil {
×
385
                        slog.Warn("failed to close plugin stderr", "plugin", p.manifest.Name, "error", err)
×
UNCOV
386
                }
×
UNCOV
387
                p.stderr = nil
×
388
        }
389

390
        // Close stdout pipe explicitly (the underlying io.ReadCloser, not the bufio wrapper)
391
        if p.stdoutPipe != nil {
×
392
                if err := p.stdoutPipe.Close(); err != nil {
×
393
                        slog.Warn("failed to close plugin stdout", "plugin", p.manifest.Name, "error", err)
×
UNCOV
394
                }
×
395
                p.stdoutPipe = nil
×
396
        }
397
        p.stdout = nil // Clear the bufio.Reader reference
×
398

×
399
        // Wait for process with timeout
×
400
        done := make(chan error, 1)
×
401
        go func() {
×
UNCOV
402
                done <- p.cmd.Wait()
×
403
        }()
×
404

405
        timer := time.NewTimer(pluginStopTimeout)
×
406
        defer timer.Stop()
×
407

×
408
        select {
×
409
        case err := <-done:
×
410
                return err
×
411
        case <-timer.C:
×
412
                // Force kill - log error but still return wait result
×
413
                if err := p.cmd.Process.Kill(); err != nil {
×
UNCOV
414
                        slog.Warn("failed to kill plugin process", "plugin", p.manifest.Name, "error", err)
×
415
                }
×
416
                // Wait returns immediately for killed process
UNCOV
417
                return <-done
×
418
        }
419
}
420

421
// Manifest returns the plugin manifest.
422
func (p *Process) Manifest() *Manifest {
2✔
423
        return p.manifest
2✔
424
}
2✔
425

426
// IsRunning returns true if the process is still running.
427
func (p *Process) IsRunning() bool {
8✔
428
        select {
8✔
429
        case <-p.done:
2✔
430
                return false
2✔
431
        default:
6✔
432
                return p.started && !p.stopping
6✔
433
        }
434
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc