• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

valksor / go-assern / 21338598851

25 Jan 2026 07:54PM UTC coverage: 45.704% (-6.8%) from 52.533%
21338598851

push

github

k0d3r1s
Updates go.sum dependencies

Removes an outdated dependency and updates to a newer version.
This ensures that the project uses the most recent and compatible version.

2101 of 4597 relevant lines covered (45.7%)

88.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.93
/internal/instance/socket.go
1
package instance
2

3
import (
4
        "bufio"
5
        "bytes"
6
        "context"
7
        "encoding/json"
8
        "fmt"
9
        "io"
10
        "log/slog"
11
        "net"
12
        "os"
13
        "sync"
14
        "time"
15

16
        "github.com/mark3labs/mcp-go/mcp"
17
        "github.com/mark3labs/mcp-go/server"
18

19
        "github.com/valksor/go-assern/internal/aggregator"
20
)
21

22
// handshakeTimeout is the time to wait for the first message to determine
23
// if this is an internal command (ping) or an MCP client connection.
24
const handshakeTimeout time.Duration = 100 * time.Millisecond
25

26
// Server manages the Unix socket for instance sharing.
27
type Server struct {
28
        socketPath string
29
        mcpServer  *server.MCPServer
30
        aggregator *aggregator.Aggregator
31
        logger     *slog.Logger
32
        info       *Info
33

34
        listener net.Listener
35
        clients  map[net.Conn]struct{}
36
        mu       sync.Mutex
37
        wg       sync.WaitGroup
38
        done     chan struct{}
39
}
40

41
// NewServer creates a new instance sharing server.
42
func NewServer(socketPath string, mcpServer *server.MCPServer, agg *aggregator.Aggregator, logger *slog.Logger) *Server {
23✔
43
        cwd, _ := os.Getwd()
23✔
44

23✔
45
        return &Server{
23✔
46
                socketPath: socketPath,
23✔
47
                mcpServer:  mcpServer,
23✔
48
                aggregator: agg,
23✔
49
                logger:     logger,
23✔
50
                info: &Info{
23✔
51
                        PID:        os.Getpid(),
23✔
52
                        SocketPath: socketPath,
23✔
53
                        StartTime:  time.Now(),
23✔
54
                        WorkDir:    cwd,
23✔
55
                },
23✔
56
                clients: make(map[net.Conn]struct{}),
23✔
57
                done:    make(chan struct{}),
23✔
58
        }
23✔
59
}
23✔
60

61
// Start begins listening on the Unix socket.
62
func (s *Server) Start() error {
22✔
63
        // Remove stale socket if exists
22✔
64
        if err := os.Remove(s.socketPath); err != nil && !os.IsNotExist(err) {
22✔
65
                s.logger.Debug("failed to remove existing socket", "error", err)
×
66
        }
×
67

68
        var lc net.ListenConfig
22✔
69
        listener, err := lc.Listen(context.Background(), "unix", s.socketPath)
22✔
70
        if err != nil {
22✔
71
                return err
×
72
        }
×
73

74
        // Set socket permissions to owner only
75
        if err := os.Chmod(s.socketPath, 0o600); err != nil {
22✔
76
                _ = listener.Close()
×
77

×
78
                return err
×
79
        }
×
80

81
        s.listener = listener
22✔
82
        s.logger.Info("instance sharing socket listening", "path", s.socketPath)
22✔
83

22✔
84
        // Accept connections
22✔
85
        s.wg.Add(1)
22✔
86
        go s.acceptLoop()
22✔
87

22✔
88
        return nil
22✔
89
}
90

91
// Stop closes the socket and all client connections.
92
func (s *Server) Stop() error {
22✔
93
        close(s.done)
22✔
94

22✔
95
        if s.listener != nil {
44✔
96
                _ = s.listener.Close()
22✔
97
        }
22✔
98

99
        // Close all client connections
100
        s.mu.Lock()
22✔
101
        for conn := range s.clients {
30✔
102
                _ = conn.Close()
8✔
103
        }
8✔
104
        s.clients = make(map[net.Conn]struct{})
22✔
105
        s.mu.Unlock()
22✔
106

22✔
107
        s.wg.Wait()
22✔
108

22✔
109
        // Clean up socket file
22✔
110
        _ = os.Remove(s.socketPath)
22✔
111

22✔
112
        return nil
22✔
113
}
114

115
func (s *Server) acceptLoop() {
22✔
116
        defer s.wg.Done()
22✔
117

22✔
118
        for {
84✔
119
                conn, err := s.listener.Accept()
62✔
120
                if err != nil {
84✔
121
                        select {
22✔
122
                        case <-s.done:
22✔
123
                                return
22✔
124
                        default:
×
125
                                s.logger.Debug("accept error", "error", err)
×
126

×
127
                                continue
×
128
                        }
129
                }
130

131
                s.mu.Lock()
40✔
132
                s.clients[conn] = struct{}{}
40✔
133
                s.mu.Unlock()
40✔
134

40✔
135
                s.wg.Add(1)
40✔
136
                go s.handleConnection(conn)
40✔
137
        }
138
}
139

140
func (s *Server) handleConnection(conn net.Conn) {
40✔
141
        defer s.wg.Done()
40✔
142
        defer func() {
80✔
143
                s.mu.Lock()
40✔
144
                delete(s.clients, conn)
40✔
145
                s.mu.Unlock()
40✔
146
                _ = conn.Close()
40✔
147
        }()
40✔
148

149
        s.logger.Debug("client connected", "remote", conn.RemoteAddr())
40✔
150

40✔
151
        // Check for internal handshake command (ping/info) before starting MCP
40✔
152
        // This allows the detector to quickly check if an instance is running
40✔
153
        reader, handled := s.tryHandleInternalCommand(conn)
40✔
154
        if handled {
60✔
155
                s.logger.Debug("handled internal command, closing connection")
20✔
156

20✔
157
                return
20✔
158
        }
20✔
159

160
        // Not an internal command - proceed with MCP protocol
161
        // reader may contain buffered data from the handshake check
162
        s.serveMCP(conn, reader)
20✔
163
}
164

165
// tryHandleInternalCommand checks if the first message is an internal command.
166
// Returns the reader to use for subsequent reads and whether the command was handled.
167
// If handled is true, the connection should be closed.
168
// If handled is false, the returned reader should be used for MCP serving.
169
func (s *Server) tryHandleInternalCommand(conn net.Conn) (io.Reader, bool) {
40✔
170
        // Set deadline for reading first message
40✔
171
        if err := conn.SetReadDeadline(time.Now().Add(handshakeTimeout)); err != nil {
41✔
172
                s.logger.Debug("failed to set read deadline", "error", err)
1✔
173

1✔
174
                return conn, false
1✔
175
        }
1✔
176

177
        // Read first line (newline-delimited JSON)
178
        reader := bufio.NewReader(conn)
39✔
179
        line, err := reader.ReadBytes('\n')
39✔
180

39✔
181
        // Clear deadline for subsequent operations
39✔
182
        _ = conn.SetReadDeadline(time.Time{})
39✔
183

39✔
184
        if err != nil {
58✔
185
                // Timeout or error - not an internal command
19✔
186
                // ReadBytes may have read partial data into `line` before the error
19✔
187
                if len(line) > 0 {
19✔
188
                        // Prepend any data read so far, then continue reading from the buffered reader
×
189
                        return io.MultiReader(bytes.NewReader(line), reader), false
×
190
                }
×
191

192
                // No data was read - use the buffered reader directly
193
                return reader, false
19✔
194
        }
195

196
        // Try to parse as internal command
197
        var req struct {
20✔
198
                JSONRPC string `json:"jsonrpc"`
20✔
199
                ID      any    `json:"id"`
20✔
200
                Method  string `json:"method"`
20✔
201
        }
20✔
202

20✔
203
        if err := json.Unmarshal(line, &req); err != nil {
20✔
204
                // Not valid JSON - prepend the line and continue with MCP
×
205
                return io.MultiReader(bytes.NewReader(line), reader), false
×
206
        }
×
207

208
        // Check if it's an internal command
209
        switch req.Method {
20✔
210
        case "assern/ping", "assern/info":
19✔
211
                s.sendInternalResponse(conn, req.ID, s.info)
19✔
212

19✔
213
                return nil, true
19✔
214
        case "assern/reload":
1✔
215
                if s.aggregator == nil {
1✔
216
                        s.sendInternalError(conn, req.ID, "aggregator not available")
×
217
                } else {
1✔
218
                        ctx := context.Background()
1✔
219
                        result, err := s.aggregator.Reload(ctx)
1✔
220
                        if err != nil {
1✔
221
                                s.sendInternalError(conn, req.ID, err.Error())
×
222
                        } else {
1✔
223
                                s.sendInternalResponse(conn, req.ID, result)
1✔
224
                        }
1✔
225
                }
226

227
                return nil, true
1✔
228
        }
229

230
        // Not an internal command - prepend the message for MCP to process
231
        return io.MultiReader(bytes.NewReader(line), reader), false
×
232
}
233

234
func (s *Server) sendInternalResponse(conn net.Conn, id any, result any) {
20✔
235
        resp := map[string]any{
20✔
236
                "jsonrpc": "2.0",
20✔
237
                "id":      id,
20✔
238
                "result":  result,
20✔
239
        }
20✔
240

20✔
241
        data, err := json.Marshal(resp)
20✔
242
        if err != nil {
20✔
243
                s.logger.Debug("failed to marshal response", "error", err)
×
244

×
245
                return
×
246
        }
×
247

248
        data = append(data, '\n')
20✔
249

20✔
250
        if _, err := conn.Write(data); err != nil {
20✔
251
                s.logger.Debug("failed to write response", "error", err)
×
252
        }
×
253
}
254

255
func (s *Server) sendInternalError(conn net.Conn, id any, message string) {
×
256
        resp := map[string]any{
×
257
                "jsonrpc": "2.0",
×
258
                "id":      id,
×
259
                "error": map[string]any{
×
260
                        "code":    -32603, // Internal error
×
261
                        "message": message,
×
262
                },
×
263
        }
×
264

×
265
        data, err := json.Marshal(resp)
×
266
        if err != nil {
×
267
                s.logger.Debug("failed to marshal error response", "error", err)
×
268

×
269
                return
×
270
        }
×
271

272
        data = append(data, '\n')
×
273

×
274
        if _, err := conn.Write(data); err != nil {
×
275
                s.logger.Debug("failed to write error response", "error", err)
×
276
        }
×
277
}
278

279
func (s *Server) serveMCP(conn net.Conn, reader io.Reader) {
20✔
280
        // Create a context that cancels when server stops
20✔
281
        ctx, cancel := context.WithCancel(context.Background())
20✔
282
        go func() {
40✔
283
                <-s.done
20✔
284
                cancel()
20✔
285
        }()
20✔
286

287
        // Create a unique session for this socket connection.
288
        // This avoids conflicts with the "stdio" session used by the primary instance.
289
        session := newSocketSession()
20✔
290

20✔
291
        if err := s.mcpServer.RegisterSession(ctx, session); err != nil {
20✔
292
                s.logger.Debug("failed to register session", "error", err)
×
293

×
294
                return
×
295
        }
×
296
        defer func() {
40✔
297
                s.mcpServer.UnregisterSession(ctx, session.SessionID())
20✔
298
                session.close()
20✔
299
        }()
20✔
300

301
        // Add session to context for message handling
302
        ctx = s.mcpServer.WithContext(ctx, session)
20✔
303

20✔
304
        // Handle notifications from server to client in background
20✔
305
        go s.handleNotifications(ctx, session, conn)
20✔
306

20✔
307
        // Read and process MCP messages
20✔
308
        bufReader := bufio.NewReader(reader)
20✔
309

20✔
310
        for {
65✔
311
                // Check for context cancellation
45✔
312
                select {
45✔
313
                case <-ctx.Done():
3✔
314
                        return
3✔
315
                default:
42✔
316
                }
317

318
                line, err := bufReader.ReadString('\n')
42✔
319
                if err != nil {
59✔
320
                        if err != io.EOF && !s.isStopped() {
17✔
321
                                s.logger.Debug("client read error", "error", err)
×
322
                        }
×
323

324
                        return
17✔
325
                }
326

327
                if len(line) == 0 {
25✔
328
                        continue
×
329
                }
330

331
                // Parse as JSON-RPC message
332
                var rawMsg json.RawMessage
25✔
333
                if err := json.Unmarshal([]byte(line), &rawMsg); err != nil {
25✔
334
                        s.logger.Debug("invalid JSON message", "error", err)
×
335
                        s.writeErrorResponse(conn, nil, mcp.PARSE_ERROR, "Parse error")
×
336

×
337
                        continue
×
338
                }
339

340
                // Handle the message
341
                response := s.mcpServer.HandleMessage(ctx, rawMsg)
25✔
342
                if response != nil {
43✔
343
                        if err := s.writeJSONResponse(conn, response); err != nil {
18✔
344
                                s.logger.Debug("failed to write response", "error", err)
×
345

×
346
                                return
×
347
                        }
×
348
                }
349
        }
350
}
351

352
// handleNotifications forwards server notifications to the client connection.
353
func (s *Server) handleNotifications(ctx context.Context, session *socketSession, conn net.Conn) {
20✔
354
        for {
40✔
355
                select {
20✔
356
                case notification, ok := <-session.notifications:
14✔
357
                        if !ok {
28✔
358
                                return
14✔
359
                        }
14✔
360

361
                        if err := s.writeJSONResponse(conn, notification); err != nil {
×
362
                                s.logger.Debug("failed to write notification", "error", err)
×
363

×
364
                                return
×
365
                        }
×
366
                case <-ctx.Done():
6✔
367
                        return
6✔
368
                }
369
        }
370
}
371

372
// writeJSONResponse writes a JSON-RPC response followed by newline.
373
func (s *Server) writeJSONResponse(conn net.Conn, response any) error {
18✔
374
        data, err := json.Marshal(response)
18✔
375
        if err != nil {
18✔
376
                return fmt.Errorf("marshal response: %w", err)
×
377
        }
×
378

379
        data = append(data, '\n')
18✔
380

18✔
381
        if _, err := conn.Write(data); err != nil {
18✔
382
                return fmt.Errorf("write response: %w", err)
×
383
        }
×
384

385
        return nil
18✔
386
}
387

388
// writeErrorResponse writes a JSON-RPC error response.
389
func (s *Server) writeErrorResponse(conn net.Conn, id any, code int, message string) {
×
390
        response := map[string]any{
×
391
                "jsonrpc": "2.0",
×
392
                "id":      id,
×
393
                "error": map[string]any{
×
394
                        "code":    code,
×
395
                        "message": message,
×
396
                },
×
397
        }
×
398

×
399
        _ = s.writeJSONResponse(conn, response)
×
400
}
×
401

402
func (s *Server) isStopped() bool {
5✔
403
        select {
5✔
404
        case <-s.done:
3✔
405
                return true
3✔
406
        default:
2✔
407
                return false
2✔
408
        }
409
}
410

411
// extractJSONMessage splits buf at the first newline, the delimiter between
// JSON-RPC messages. Used by tests.
//
// When a newline is present it returns the bytes before it, the bytes after
// it, and true. Otherwise it returns nil, buf unchanged, and false. The
// returned slices alias buf.
func extractJSONMessage(buf []byte) ([]byte, []byte, bool) {
	idx := bytes.IndexByte(buf, '\n')
	if idx < 0 {
		return nil, buf, false
	}

	return buf[:idx], buf[idx+1:], true
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc