• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

valksor / go-assern / 21232054917

22 Jan 2026 01:08AM UTC coverage: 52.533% (+4.6%) from 47.928%
21232054917

push

github

k0d3r1s
Add tool aliases, CLI disambiguation, and performance optimizations
Adds tool aliasing system, CLI colon notation, and caching for better performance:
- cmd: Add Execute() with colon notation support (e.g., list:servers)
- cmd: Integrate go-toolkit disambiguate for interactive command selection
- tools: Add alias management (SetAliases, AddAlias, RemoveAlias, ResolveAlias)
- tools: Improve ParsePrefixedName() to return errors instead of empty strings
- registry: Add double-checked locking cache to generic registry
- registry: Invalidate cache on mutations for consistency
- resources: Add proper error handling for ParsePrefixedURI()
- prompts: Add proper error handling for ParsePrefixedPromptName()
- instance: Add buffer pool (256KB) for proxy I/O operations
- instance: Replace time-based session IDs with atomic counter
- transport: Make ParseLogLevel() case-insensitive
- aggregator: Add benchmark_test.go with 8 benchmark functions
- aggregator: Add registry_test.go with 16 registry cache tests

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

101 of 153 new or added lines in 8 files covered. (66.01%)

475 existing lines in 8 files now uncovered.

1649 of 3139 relevant lines covered (52.53%)

127.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.08
/internal/instance/socket.go
1
package instance
2

3
import (
4
        "bufio"
5
        "bytes"
6
        "context"
7
        "encoding/json"
8
        "fmt"
9
        "io"
10
        "log/slog"
11
        "net"
12
        "os"
13
        "sync"
14
        "time"
15

16
        "github.com/mark3labs/mcp-go/mcp"
17
        "github.com/mark3labs/mcp-go/server"
18
)
19

20
// handshakeTimeout bounds how long a freshly accepted connection is given to
// send its first line. That first line decides whether the peer issued an
// internal command (assern/ping, assern/info) or is a regular MCP client.
const handshakeTimeout = 100 * time.Millisecond
23

24
// Server manages the Unix socket for instance sharing.
25
type Server struct {
26
        socketPath string
27
        mcpServer  *server.MCPServer
28
        logger     *slog.Logger
29
        info       *Info
30

31
        listener net.Listener
32
        clients  map[net.Conn]struct{}
33
        mu       sync.Mutex
34
        wg       sync.WaitGroup
35
        done     chan struct{}
36
}
37

38
// NewServer creates a new instance sharing server.
39
func NewServer(socketPath string, mcpServer *server.MCPServer, logger *slog.Logger) *Server {
22✔
40
        cwd, _ := os.Getwd()
22✔
41

22✔
42
        return &Server{
22✔
43
                socketPath: socketPath,
22✔
44
                mcpServer:  mcpServer,
22✔
45
                logger:     logger,
22✔
46
                info: &Info{
22✔
47
                        PID:        os.Getpid(),
22✔
48
                        SocketPath: socketPath,
22✔
49
                        StartTime:  time.Now(),
22✔
50
                        WorkDir:    cwd,
22✔
51
                },
22✔
52
                clients: make(map[net.Conn]struct{}),
22✔
53
                done:    make(chan struct{}),
22✔
54
        }
22✔
55
}
22✔
56

57
// Start begins listening on the Unix socket.
58
func (s *Server) Start() error {
21✔
59
        // Remove stale socket if exists
21✔
60
        if err := os.Remove(s.socketPath); err != nil && !os.IsNotExist(err) {
21✔
61
                s.logger.Debug("failed to remove existing socket", "error", err)
×
UNCOV
62
        }
×
63

64
        var lc net.ListenConfig
21✔
65
        listener, err := lc.Listen(context.Background(), "unix", s.socketPath)
21✔
66
        if err != nil {
21✔
67
                return err
×
UNCOV
68
        }
×
69

70
        // Set socket permissions to owner only
71
        if err := os.Chmod(s.socketPath, 0o600); err != nil {
21✔
72
                _ = listener.Close()
×
73

×
74
                return err
×
UNCOV
75
        }
×
76

77
        s.listener = listener
21✔
78
        s.logger.Info("instance sharing socket listening", "path", s.socketPath)
21✔
79

21✔
80
        // Accept connections
21✔
81
        s.wg.Add(1)
21✔
82
        go s.acceptLoop()
21✔
83

21✔
84
        return nil
21✔
85
}
86

87
// Stop closes the socket and all client connections.
88
func (s *Server) Stop() error {
21✔
89
        close(s.done)
21✔
90

21✔
91
        if s.listener != nil {
42✔
92
                _ = s.listener.Close()
21✔
93
        }
21✔
94

95
        // Close all client connections
96
        s.mu.Lock()
21✔
97
        for conn := range s.clients {
30✔
98
                _ = conn.Close()
9✔
99
        }
9✔
100
        s.clients = make(map[net.Conn]struct{})
21✔
101
        s.mu.Unlock()
21✔
102

21✔
103
        s.wg.Wait()
21✔
104

21✔
105
        // Clean up socket file
21✔
106
        _ = os.Remove(s.socketPath)
21✔
107

21✔
108
        return nil
21✔
109
}
110

111
func (s *Server) acceptLoop() {
21✔
112
        defer s.wg.Done()
21✔
113

21✔
114
        for {
81✔
115
                conn, err := s.listener.Accept()
60✔
116
                if err != nil {
81✔
117
                        select {
21✔
118
                        case <-s.done:
21✔
119
                                return
21✔
120
                        default:
×
121
                                s.logger.Debug("accept error", "error", err)
×
122

×
UNCOV
123
                                continue
×
124
                        }
125
                }
126

127
                s.mu.Lock()
39✔
128
                s.clients[conn] = struct{}{}
39✔
129
                s.mu.Unlock()
39✔
130

39✔
131
                s.wg.Add(1)
39✔
132
                go s.handleConnection(conn)
39✔
133
        }
134
}
135

136
func (s *Server) handleConnection(conn net.Conn) {
39✔
137
        defer s.wg.Done()
39✔
138
        defer func() {
78✔
139
                s.mu.Lock()
39✔
140
                delete(s.clients, conn)
39✔
141
                s.mu.Unlock()
39✔
142
                _ = conn.Close()
39✔
143
        }()
39✔
144

145
        s.logger.Debug("client connected", "remote", conn.RemoteAddr())
39✔
146

39✔
147
        // Check for internal handshake command (ping/info) before starting MCP
39✔
148
        // This allows the detector to quickly check if an instance is running
39✔
149
        reader, handled := s.tryHandleInternalCommand(conn)
39✔
150
        if handled {
58✔
151
                s.logger.Debug("handled internal command, closing connection")
19✔
152

19✔
153
                return
19✔
154
        }
19✔
155

156
        // Not an internal command - proceed with MCP protocol
157
        // reader may contain buffered data from the handshake check
158
        s.serveMCP(conn, reader)
20✔
159
}
160

161
// tryHandleInternalCommand checks if the first message is an internal command.
162
// Returns the reader to use for subsequent reads and whether the command was handled.
163
// If handled is true, the connection should be closed.
164
// If handled is false, the returned reader should be used for MCP serving.
165
func (s *Server) tryHandleInternalCommand(conn net.Conn) (io.Reader, bool) {
39✔
166
        // Set deadline for reading first message
39✔
167
        if err := conn.SetReadDeadline(time.Now().Add(handshakeTimeout)); err != nil {
40✔
168
                s.logger.Debug("failed to set read deadline", "error", err)
1✔
169

1✔
170
                return conn, false
1✔
171
        }
1✔
172

173
        // Read first line (newline-delimited JSON)
174
        reader := bufio.NewReader(conn)
38✔
175
        line, err := reader.ReadBytes('\n')
38✔
176

38✔
177
        // Clear deadline for subsequent operations
38✔
178
        _ = conn.SetReadDeadline(time.Time{})
38✔
179

38✔
180
        if err != nil {
57✔
181
                // Timeout or error - not an internal command
19✔
182
                // ReadBytes may have read partial data into `line` before the error
19✔
183
                if len(line) > 0 {
19✔
184
                        // Prepend any data read so far, then continue reading from the buffered reader
×
UNCOV
185
                        return io.MultiReader(bytes.NewReader(line), reader), false
×
UNCOV
186
                }
×
187

188
                // No data was read - use the buffered reader directly
189
                return reader, false
19✔
190
        }
191

192
        // Try to parse as internal command
193
        var req struct {
19✔
194
                JSONRPC string `json:"jsonrpc"`
19✔
195
                ID      any    `json:"id"`
19✔
196
                Method  string `json:"method"`
19✔
197
        }
19✔
198

19✔
199
        if err := json.Unmarshal(line, &req); err != nil {
19✔
UNCOV
200
                // Not valid JSON - prepend the line and continue with MCP
×
UNCOV
201
                return io.MultiReader(bytes.NewReader(line), reader), false
×
UNCOV
202
        }
×
203

204
        // Check if it's an internal command
205
        switch req.Method {
19✔
206
        case "assern/ping", "assern/info":
19✔
207
                s.sendInternalResponse(conn, req.ID, s.info)
19✔
208

19✔
209
                return nil, true
19✔
210
        }
211

212
        // Not an internal command - prepend the message for MCP to process
UNCOV
213
        return io.MultiReader(bytes.NewReader(line), reader), false
×
214
}
215

216
func (s *Server) sendInternalResponse(conn net.Conn, id any, result any) {
19✔
217
        resp := map[string]any{
19✔
218
                "jsonrpc": "2.0",
19✔
219
                "id":      id,
19✔
220
                "result":  result,
19✔
221
        }
19✔
222

19✔
223
        data, err := json.Marshal(resp)
19✔
224
        if err != nil {
19✔
225
                s.logger.Debug("failed to marshal response", "error", err)
×
UNCOV
226

×
UNCOV
227
                return
×
UNCOV
228
        }
×
229

230
        data = append(data, '\n')
19✔
231

19✔
232
        if _, err := conn.Write(data); err != nil {
19✔
UNCOV
233
                s.logger.Debug("failed to write response", "error", err)
×
UNCOV
234
        }
×
235
}
236

237
func (s *Server) serveMCP(conn net.Conn, reader io.Reader) {
20✔
238
        // Create a context that cancels when server stops
20✔
239
        ctx, cancel := context.WithCancel(context.Background())
20✔
240
        go func() {
40✔
241
                <-s.done
20✔
242
                cancel()
20✔
243
        }()
20✔
244

245
        // Create a unique session for this socket connection.
246
        // This avoids conflicts with the "stdio" session used by the primary instance.
247
        session := newSocketSession()
20✔
248

20✔
249
        if err := s.mcpServer.RegisterSession(ctx, session); err != nil {
20✔
UNCOV
250
                s.logger.Debug("failed to register session", "error", err)
×
UNCOV
251

×
UNCOV
252
                return
×
UNCOV
253
        }
×
254
        defer func() {
40✔
255
                s.mcpServer.UnregisterSession(ctx, session.SessionID())
20✔
256
                session.close()
20✔
257
        }()
20✔
258

259
        // Add session to context for message handling
260
        ctx = s.mcpServer.WithContext(ctx, session)
20✔
261

20✔
262
        // Handle notifications from server to client in background
20✔
263
        go s.handleNotifications(ctx, session, conn)
20✔
264

20✔
265
        // Read and process MCP messages
20✔
266
        bufReader := bufio.NewReader(reader)
20✔
267

20✔
268
        for {
65✔
269
                // Check for context cancellation
45✔
270
                select {
45✔
271
                case <-ctx.Done():
2✔
272
                        return
2✔
273
                default:
43✔
274
                }
275

276
                line, err := bufReader.ReadString('\n')
43✔
277
                if err != nil {
61✔
278
                        if err != io.EOF && !s.isStopped() {
18✔
UNCOV
279
                                s.logger.Debug("client read error", "error", err)
×
UNCOV
280
                        }
×
281

282
                        return
18✔
283
                }
284

285
                if len(line) == 0 {
25✔
UNCOV
286
                        continue
×
287
                }
288

289
                // Parse as JSON-RPC message
290
                var rawMsg json.RawMessage
25✔
291
                if err := json.Unmarshal([]byte(line), &rawMsg); err != nil {
25✔
UNCOV
292
                        s.logger.Debug("invalid JSON message", "error", err)
×
UNCOV
293
                        s.writeErrorResponse(conn, nil, mcp.PARSE_ERROR, "Parse error")
×
UNCOV
294

×
UNCOV
295
                        continue
×
296
                }
297

298
                // Handle the message
299
                response := s.mcpServer.HandleMessage(ctx, rawMsg)
25✔
300
                if response != nil {
43✔
301
                        if err := s.writeJSONResponse(conn, response); err != nil {
18✔
UNCOV
302
                                s.logger.Debug("failed to write response", "error", err)
×
UNCOV
303

×
UNCOV
304
                                return
×
UNCOV
305
                        }
×
306
                }
307
        }
308
}
309

310
// handleNotifications forwards server notifications to the client connection.
311
func (s *Server) handleNotifications(ctx context.Context, session *socketSession, conn net.Conn) {
20✔
312
        for {
40✔
313
                select {
20✔
314
                case notification, ok := <-session.notifications:
14✔
315
                        if !ok {
28✔
316
                                return
14✔
317
                        }
14✔
318

UNCOV
319
                        if err := s.writeJSONResponse(conn, notification); err != nil {
×
UNCOV
320
                                s.logger.Debug("failed to write notification", "error", err)
×
UNCOV
321

×
UNCOV
322
                                return
×
UNCOV
323
                        }
×
324
                case <-ctx.Done():
6✔
325
                        return
6✔
326
                }
327
        }
328
}
329

330
// writeJSONResponse writes a JSON-RPC response followed by newline.
331
func (s *Server) writeJSONResponse(conn net.Conn, response any) error {
18✔
332
        data, err := json.Marshal(response)
18✔
333
        if err != nil {
18✔
UNCOV
334
                return fmt.Errorf("marshal response: %w", err)
×
UNCOV
335
        }
×
336

337
        data = append(data, '\n')
18✔
338

18✔
339
        if _, err := conn.Write(data); err != nil {
18✔
UNCOV
340
                return fmt.Errorf("write response: %w", err)
×
UNCOV
341
        }
×
342

343
        return nil
18✔
344
}
345

346
// writeErrorResponse writes a JSON-RPC error response.
UNCOV
347
func (s *Server) writeErrorResponse(conn net.Conn, id any, code int, message string) {
×
UNCOV
348
        response := map[string]any{
×
UNCOV
349
                "jsonrpc": "2.0",
×
UNCOV
350
                "id":      id,
×
UNCOV
351
                "error": map[string]any{
×
UNCOV
352
                        "code":    code,
×
UNCOV
353
                        "message": message,
×
UNCOV
354
                },
×
UNCOV
355
        }
×
UNCOV
356

×
UNCOV
357
        _ = s.writeJSONResponse(conn, response)
×
UNCOV
358
}
×
359

360
func (s *Server) isStopped() bool {
5✔
361
        select {
5✔
362
        case <-s.done:
3✔
363
                return true
3✔
364
        default:
2✔
365
                return false
2✔
366
        }
367
}
368

369
// extractJSONMessage splits buf at its first newline, the delimiter between
// JSON-RPC messages.
//
// When a newline is present it returns the bytes before it, the bytes after
// it, and true. Otherwise it returns nil, buf unchanged, and false. The
// returned slices alias buf. Used by tests.
func extractJSONMessage(buf []byte) ([]byte, []byte, bool) {
	end := -1

	for i := range buf {
		if buf[i] == '\n' {
			end = i

			break
		}
	}

	if end < 0 {
		return nil, buf, false
	}

	return buf[:end], buf[end+1:], true
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc