• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

valksor / go-assern / 26285265118

22 May 2026 11:31AM UTC coverage: 51.35% (+5.7%) from 45.617%
26285265118

push

github

k0d3r1s
Ignores local Claude memory files

The local memory server creates files that are not meant for version control. These rules ensure they stay out of the repository.

2909 of 5665 relevant lines covered (51.35%)

75.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.93
/internal/instance/socket.go
1
package instance
2

3
import (
4
        "bufio"
5
        "bytes"
6
        "context"
7
        "encoding/json"
8
        "fmt"
9
        "io"
10
        "log/slog"
11
        "net"
12
        "os"
13
        "sync"
14
        "time"
15

16
        "github.com/mark3labs/mcp-go/mcp"
17
        "github.com/mark3labs/mcp-go/server"
18

19
        "github.com/valksor/go-assern/internal/aggregator"
20
)
21

22
// handshakeTimeout bounds how long a freshly accepted connection is given to
// send its first line. That line decides whether the peer issued an internal
// command (such as ping) or is a regular MCP client.
const handshakeTimeout time.Duration = 100 * time.Millisecond
25

26
// JSON-RPC field names and values shared across the instance socket protocol.
const (
	// keyJSONRPC is the envelope field that carries the protocol version.
	keyJSONRPC = "jsonrpc"
	// jsonrpcVersion is the version string written into the keyJSONRPC field.
	jsonrpcVersion = "2.0"
	// keyMethod is the envelope field that names the requested method.
	keyMethod = "method"
)
32

33
// Server manages the Unix socket for instance sharing.
34
type Server struct {
35
        socketPath string
36
        mcpServer  *server.MCPServer
37
        aggregator *aggregator.Aggregator
38
        logger     *slog.Logger
39
        info       *Info
40

41
        listener net.Listener
42
        clients  map[net.Conn]struct{}
43
        mu       sync.Mutex
44
        wg       sync.WaitGroup
45
        done     chan struct{}
46
}
47

48
// NewServer creates a new instance sharing server.
49
func NewServer(socketPath string, mcpServer *server.MCPServer, agg *aggregator.Aggregator, logger *slog.Logger) *Server {
23✔
50
        cwd, _ := os.Getwd()
23✔
51

23✔
52
        return &Server{
23✔
53
                socketPath: socketPath,
23✔
54
                mcpServer:  mcpServer,
23✔
55
                aggregator: agg,
23✔
56
                logger:     logger,
23✔
57
                info: &Info{
23✔
58
                        PID:        os.Getpid(),
23✔
59
                        SocketPath: socketPath,
23✔
60
                        StartTime:  time.Now(),
23✔
61
                        WorkDir:    cwd,
23✔
62
                },
23✔
63
                clients: make(map[net.Conn]struct{}),
23✔
64
                done:    make(chan struct{}),
23✔
65
        }
23✔
66
}
23✔
67

68
// Start begins listening on the Unix socket.
69
func (s *Server) Start() error {
22✔
70
        // Remove stale socket if exists
22✔
71
        if err := os.Remove(s.socketPath); err != nil && !os.IsNotExist(err) {
22✔
72
                s.logger.Debug("failed to remove existing socket", "error", err)
×
73
        }
×
74

75
        var lc net.ListenConfig
22✔
76
        listener, err := lc.Listen(context.Background(), "unix", s.socketPath)
22✔
77
        if err != nil {
22✔
78
                return err
×
79
        }
×
80

81
        // Set socket permissions to owner only
82
        if err := os.Chmod(s.socketPath, 0o600); err != nil {
22✔
83
                _ = listener.Close()
×
84

×
85
                return err
×
86
        }
×
87

88
        s.listener = listener
22✔
89
        s.logger.Info("instance sharing socket listening", "path", s.socketPath)
22✔
90

22✔
91
        // Accept connections
22✔
92
        s.wg.Add(1)
22✔
93
        go s.acceptLoop()
22✔
94

22✔
95
        return nil
22✔
96
}
97

98
// Stop closes the socket and all client connections.
99
func (s *Server) Stop() error {
22✔
100
        close(s.done)
22✔
101

22✔
102
        if s.listener != nil {
44✔
103
                _ = s.listener.Close()
22✔
104
        }
22✔
105

106
        // Close all client connections
107
        s.mu.Lock()
22✔
108
        for conn := range s.clients {
31✔
109
                _ = conn.Close()
9✔
110
        }
9✔
111
        s.clients = make(map[net.Conn]struct{})
22✔
112
        s.mu.Unlock()
22✔
113

22✔
114
        s.wg.Wait()
22✔
115

22✔
116
        // Clean up socket file
22✔
117
        _ = os.Remove(s.socketPath)
22✔
118

22✔
119
        return nil
22✔
120
}
121

122
func (s *Server) acceptLoop() {
22✔
123
        defer s.wg.Done()
22✔
124

22✔
125
        for {
83✔
126
                conn, err := s.listener.Accept()
61✔
127
                if err != nil {
83✔
128
                        select {
22✔
129
                        case <-s.done:
22✔
130
                                return
22✔
131
                        default:
×
132
                                s.logger.Debug("accept error", "error", err)
×
133

×
134
                                continue
×
135
                        }
136
                }
137

138
                s.mu.Lock()
39✔
139
                s.clients[conn] = struct{}{}
39✔
140
                s.mu.Unlock()
39✔
141

39✔
142
                s.wg.Add(1)
39✔
143
                go s.handleConnection(conn)
39✔
144
        }
145
}
146

147
func (s *Server) handleConnection(conn net.Conn) {
39✔
148
        defer s.wg.Done()
39✔
149
        defer func() {
78✔
150
                s.mu.Lock()
39✔
151
                delete(s.clients, conn)
39✔
152
                s.mu.Unlock()
39✔
153
                _ = conn.Close()
39✔
154
        }()
39✔
155

156
        s.logger.Debug("client connected", "remote", conn.RemoteAddr())
39✔
157

39✔
158
        // Check for internal handshake command (ping/info) before starting MCP
39✔
159
        // This allows the detector to quickly check if an instance is running
39✔
160
        reader, handled := s.tryHandleInternalCommand(conn)
39✔
161
        if handled {
59✔
162
                s.logger.Debug("handled internal command, closing connection")
20✔
163

20✔
164
                return
20✔
165
        }
20✔
166

167
        // Not an internal command - proceed with MCP protocol
168
        // reader may contain buffered data from the handshake check
169
        s.serveMCP(conn, reader)
19✔
170
}
171

172
// tryHandleInternalCommand checks if the first message is an internal command.
173
// Returns the reader to use for subsequent reads and whether the command was handled.
174
// If handled is true, the connection should be closed.
175
// If handled is false, the returned reader should be used for MCP serving.
176
func (s *Server) tryHandleInternalCommand(conn net.Conn) (io.Reader, bool) {
39✔
177
        // Set deadline for reading first message
39✔
178
        if err := conn.SetReadDeadline(time.Now().Add(handshakeTimeout)); err != nil {
41✔
179
                s.logger.Debug("failed to set read deadline", "error", err)
2✔
180

2✔
181
                return conn, false
2✔
182
        }
2✔
183

184
        // Read first line (newline-delimited JSON)
185
        reader := bufio.NewReader(conn)
37✔
186
        line, err := reader.ReadBytes('\n')
37✔
187

37✔
188
        // Clear deadline for subsequent operations
37✔
189
        _ = conn.SetReadDeadline(time.Time{})
37✔
190

37✔
191
        if err != nil {
54✔
192
                // Timeout or error - not an internal command
17✔
193
                // ReadBytes may have read partial data into `line` before the error
17✔
194
                if len(line) > 0 {
17✔
195
                        // Prepend any data read so far, then continue reading from the buffered reader
×
196
                        return io.MultiReader(bytes.NewReader(line), reader), false
×
197
                }
×
198

199
                // No data was read - use the buffered reader directly
200
                return reader, false
17✔
201
        }
202

203
        // Try to parse as internal command
204
        var req struct {
20✔
205
                JSONRPC string `json:"jsonrpc"`
20✔
206
                ID      any    `json:"id"`
20✔
207
                Method  string `json:"method"`
20✔
208
        }
20✔
209

20✔
210
        if err := json.Unmarshal(line, &req); err != nil {
20✔
211
                // Not valid JSON - prepend the line and continue with MCP
×
212
                return io.MultiReader(bytes.NewReader(line), reader), false
×
213
        }
×
214

215
        // Check if it's an internal command
216
        switch req.Method {
20✔
217
        case "assern/ping", "assern/info":
19✔
218
                s.sendInternalResponse(conn, req.ID, s.info)
19✔
219

19✔
220
                return nil, true
19✔
221
        case "assern/reload":
1✔
222
                if s.aggregator == nil {
1✔
223
                        s.sendInternalError(conn, req.ID, "aggregator not available")
×
224
                } else {
1✔
225
                        ctx := context.Background()
1✔
226
                        result, err := s.aggregator.Reload(ctx)
1✔
227
                        if err != nil {
1✔
228
                                s.sendInternalError(conn, req.ID, err.Error())
×
229
                        } else {
1✔
230
                                s.sendInternalResponse(conn, req.ID, result)
1✔
231
                        }
1✔
232
                }
233

234
                return nil, true
1✔
235
        }
236

237
        // Not an internal command - prepend the message for MCP to process
238
        return io.MultiReader(bytes.NewReader(line), reader), false
×
239
}
240

241
func (s *Server) sendInternalResponse(conn net.Conn, id any, result any) {
20✔
242
        resp := map[string]any{
20✔
243
                keyJSONRPC: jsonrpcVersion,
20✔
244
                "id":       id,
20✔
245
                "result":   result,
20✔
246
        }
20✔
247

20✔
248
        data, err := json.Marshal(resp)
20✔
249
        if err != nil {
20✔
250
                s.logger.Debug("failed to marshal response", "error", err)
×
251

×
252
                return
×
253
        }
×
254

255
        data = append(data, '\n')
20✔
256

20✔
257
        if _, err := conn.Write(data); err != nil {
20✔
258
                s.logger.Debug("failed to write response", "error", err)
×
259
        }
×
260
}
261

262
func (s *Server) sendInternalError(conn net.Conn, id any, message string) {
×
263
        resp := map[string]any{
×
264
                keyJSONRPC: jsonrpcVersion,
×
265
                "id":       id,
×
266
                "error": map[string]any{
×
267
                        "code":    -32603, // Internal error
×
268
                        "message": message,
×
269
                },
×
270
        }
×
271

×
272
        data, err := json.Marshal(resp)
×
273
        if err != nil {
×
274
                s.logger.Debug("failed to marshal error response", "error", err)
×
275

×
276
                return
×
277
        }
×
278

279
        data = append(data, '\n')
×
280

×
281
        if _, err := conn.Write(data); err != nil {
×
282
                s.logger.Debug("failed to write error response", "error", err)
×
283
        }
×
284
}
285

286
func (s *Server) serveMCP(conn net.Conn, reader io.Reader) {
19✔
287
        // Create a context that cancels when server stops
19✔
288
        ctx, cancel := context.WithCancel(context.Background())
19✔
289
        go func() {
38✔
290
                <-s.done
19✔
291
                cancel()
19✔
292
        }()
19✔
293

294
        // Create a unique session for this socket connection.
295
        // This avoids conflicts with the "stdio" session used by the primary instance.
296
        session := newSocketSession()
19✔
297

19✔
298
        if err := s.mcpServer.RegisterSession(ctx, session); err != nil {
19✔
299
                s.logger.Debug("failed to register session", "error", err)
×
300

×
301
                return
×
302
        }
×
303
        defer func() {
38✔
304
                s.mcpServer.UnregisterSession(ctx, session.SessionID())
19✔
305
                session.close()
19✔
306
        }()
19✔
307

308
        // Add session to context for message handling
309
        ctx = s.mcpServer.WithContext(ctx, session)
19✔
310

19✔
311
        // Handle notifications from server to client in background
19✔
312
        go s.handleNotifications(ctx, session, conn)
19✔
313

19✔
314
        // Read and process MCP messages
19✔
315
        bufReader := bufio.NewReader(reader)
19✔
316

19✔
317
        for {
63✔
318
                // Check for context cancellation
44✔
319
                select {
44✔
320
                case <-ctx.Done():
1✔
321
                        return
1✔
322
                default:
43✔
323
                }
324

325
                line, err := bufReader.ReadString('\n')
43✔
326
                if err != nil {
61✔
327
                        if err != io.EOF && !s.isStopped() {
18✔
328
                                s.logger.Debug("client read error", "error", err)
×
329
                        }
×
330

331
                        return
18✔
332
                }
333

334
                if len(line) == 0 {
25✔
335
                        continue
×
336
                }
337

338
                // Parse as JSON-RPC message
339
                var rawMsg json.RawMessage
25✔
340
                if err := json.Unmarshal([]byte(line), &rawMsg); err != nil {
25✔
341
                        s.logger.Debug("invalid JSON message", "error", err)
×
342
                        s.writeErrorResponse(conn, nil, mcp.PARSE_ERROR, "Parse error")
×
343

×
344
                        continue
×
345
                }
346

347
                // Handle the message
348
                response := s.mcpServer.HandleMessage(ctx, rawMsg)
25✔
349
                if response != nil {
43✔
350
                        if err := s.writeJSONResponse(conn, response); err != nil {
18✔
351
                                s.logger.Debug("failed to write response", "error", err)
×
352

×
353
                                return
×
354
                        }
×
355
                }
356
        }
357
}
358

359
// handleNotifications forwards server notifications to the client connection.
360
func (s *Server) handleNotifications(ctx context.Context, session *socketSession, conn net.Conn) {
19✔
361
        for {
38✔
362
                select {
19✔
363
                case notification, ok := <-session.notifications:
11✔
364
                        if !ok {
22✔
365
                                return
11✔
366
                        }
11✔
367

368
                        if err := s.writeJSONResponse(conn, notification); err != nil {
×
369
                                s.logger.Debug("failed to write notification", "error", err)
×
370

×
371
                                return
×
372
                        }
×
373
                case <-ctx.Done():
8✔
374
                        return
8✔
375
                }
376
        }
377
}
378

379
// writeJSONResponse writes a JSON-RPC response followed by newline.
380
func (s *Server) writeJSONResponse(conn net.Conn, response any) error {
18✔
381
        data, err := json.Marshal(response)
18✔
382
        if err != nil {
18✔
383
                return fmt.Errorf("marshal response: %w", err)
×
384
        }
×
385

386
        data = append(data, '\n')
18✔
387

18✔
388
        if _, err := conn.Write(data); err != nil {
18✔
389
                return fmt.Errorf("write response: %w", err)
×
390
        }
×
391

392
        return nil
18✔
393
}
394

395
// writeErrorResponse writes a JSON-RPC error response.
396
func (s *Server) writeErrorResponse(conn net.Conn, id any, code int, message string) {
×
397
        response := map[string]any{
×
398
                keyJSONRPC: jsonrpcVersion,
×
399
                "id":       id,
×
400
                "error": map[string]any{
×
401
                        "code":    code,
×
402
                        "message": message,
×
403
                },
×
404
        }
×
405

×
406
        _ = s.writeJSONResponse(conn, response)
×
407
}
×
408

409
func (s *Server) isStopped() bool {
6✔
410
        select {
6✔
411
        case <-s.done:
4✔
412
                return true
4✔
413
        default:
2✔
414
                return false
2✔
415
        }
416
}
417

418
// extractJSONMessage splits buf at the first newline, the delimiter between
// JSON-RPC messages on the socket. Used by tests.
//
// When a newline is present it returns the bytes before it, the bytes after
// it, and true. Otherwise it returns nil, the unchanged buf, and false. The
// returned slices alias buf; no data is copied.
func extractJSONMessage(buf []byte) ([]byte, []byte, bool) {
	i := bytes.IndexByte(buf, '\n')
	if i < 0 {
		return nil, buf, false
	}

	return buf[:i], buf[i+1:], true
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc