• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

VolvoxLLC / volvox-bot / 25209905825

01 May 2026 09:39AM UTC coverage: 90.204% (+0.02%) from 90.188%
25209905825

push

github

BillChirico
fix: mark analytics KPI props readonly

9583 of 11243 branches covered (85.24%)

Branch coverage included in aggregate %.

15169 of 16197 relevant lines covered (93.65%)

172.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.22
/src/api/ws/logStream.js
1
/**
2
 * WebSocket Log Stream Server
3
 *
4
 * Manages WebSocket connections for real-time log streaming.
5
 * Handles auth, client lifecycle, per-client filtering, and heartbeat.
6
 */
7

8
import { createHmac, timingSafeEqual } from 'node:crypto';
9
import WebSocket, { WebSocketServer } from 'ws';
10
import { info, error as logError, warn } from '../../logger.js';
11

12
/** Maximum number of concurrent authenticated clients */
13
const MAX_CLIENTS = 10;
47✔
14

15
/** Heartbeat ping interval in milliseconds */
16
const HEARTBEAT_INTERVAL_MS = 30_000;
47✔
17

18
/** Auth timeout — clients must authenticate within this window */
19
const AUTH_TIMEOUT_MS = 10_000;
47✔
20

21
/**
22
 * @type {WebSocketServer | null}
23
 */
24
let wss = null;
47✔
25

26
/**
27
 * @type {ReturnType<typeof setInterval> | null}
28
 */
29
let heartbeatTimer = null;
47✔
30

31
/**
32
 * @type {import('../../transports/websocket.js').WebSocketTransport | null}
33
 */
34
let wsTransport = null;
47✔
35

36
/**
37
 * Count of currently authenticated clients.
38
 * @type {number}
39
 */
40
let authenticatedCount = 0;
47✔
41

42
/**
43
 * Set up the WebSocket server for log streaming.
44
 * Attaches to an existing HTTP server on path `/ws/logs`.
45
 *
46
 * @param {import('node:http').Server} httpServer - The HTTP server to attach to
47
 * @param {import('../../transports/websocket.js').WebSocketTransport} transport - The WebSocket Winston transport
48
 */
49
export function setupLogStream(httpServer, transport) {
50
  // Guard against double-call — cleanup previous instance first
51
  if (wss) {
55✔
52
    warn('setupLogStream called while already running — cleaning up previous instance');
2✔
53
    stopLogStream();
2✔
54
  }
55

56
  wsTransport = transport;
55✔
57

58
  wss = new WebSocketServer({
55✔
59
    server: httpServer,
60
    path: '/ws/logs',
61
  });
62

63
  wss.on('connection', handleConnection);
55✔
64

65
  // Heartbeat — ping all clients every 30s, terminate dead ones
66
  heartbeatTimer = setInterval(() => {
55✔
67
    if (!wss) return;
×
68

69
    for (const ws of wss.clients) {
×
70
      if (ws.isAlive === false) {
×
71
        info('Terminating dead WebSocket client', { reason: 'heartbeat timeout' });
×
72
        cleanupClient(ws);
×
73
        ws.terminate();
×
74
        continue;
×
75
      }
76
      ws.isAlive = false;
×
77
      ws.ping();
×
78
    }
79
  }, HEARTBEAT_INTERVAL_MS);
80

81
  if (heartbeatTimer.unref) {
55!
82
    heartbeatTimer.unref();
55✔
83
  }
84

85
  info('WebSocket log stream server started', { path: '/ws/logs' });
55✔
86
}
87

88
/**
89
 * Handle a new WebSocket connection.
90
 * Client must authenticate within AUTH_TIMEOUT_MS.
91
 *
92
 * @param {import('ws').WebSocket} ws
93
 */
94
function handleConnection(ws) {
95
  ws.isAlive = true;
62✔
96
  ws.authenticated = false;
62✔
97
  ws.guildId = null;
62✔
98
  ws.logFilter = null;
62✔
99

100
  // Set auth timeout
101
  ws.authTimeout = setTimeout(() => {
62✔
102
    if (!ws.authenticated) {
×
103
      ws.close(4001, 'Authentication timeout');
×
104
    }
105
  }, AUTH_TIMEOUT_MS);
106

107
  ws.on('pong', () => {
62✔
108
    ws.isAlive = true;
×
109
  });
110

111
  ws.on('message', (data) => {
62✔
112
    handleMessage(ws, data).catch((err) => {
79✔
113
      logError('Unhandled error in WebSocket message handler', { error: err.message });
×
114
    });
115
  });
116

117
  ws.on('close', () => {
62✔
118
    cleanupClient(ws);
62✔
119
  });
120

121
  ws.on('error', (err) => {
62✔
122
    logError('WebSocket client error', { error: err.message });
×
123
    cleanupClient(ws);
×
124
  });
125
}
126

127
/**
128
 * Handle an incoming message from a client.
129
 *
130
 * @param {import('ws').WebSocket} ws
131
 * @param {Buffer|string} data
132
 */
133
async function handleMessage(ws, data) {
134
  let msg;
135
  try {
79✔
136
    msg = JSON.parse(data.toString());
79✔
137
  } catch {
138
    sendError(ws, 'Invalid JSON');
2✔
139
    return;
2✔
140
  }
141

142
  if (!msg || typeof msg.type !== 'string') {
77✔
143
    sendError(ws, 'Missing message type');
2✔
144
    return;
2✔
145
  }
146

147
  switch (msg.type) {
75✔
148
    case 'auth':
149
      await handleAuth(ws, msg);
54✔
150
      break;
54✔
151

152
    case 'filter':
153
      handleFilter(ws, msg);
19✔
154
      break;
19✔
155

156
    default:
157
      sendError(ws, `Unknown message type: ${msg.type}`);
2✔
158
  }
159
}
160

161
/**
162
 * Validate and verify an HMAC-signed ticket and extract its bound guild ID when present.
163
 *
164
 * The ticket must be either `nonce.expiry.hmac` (legacy) or `nonce.expiry.guildId.hmac` (guild-bound).
165
 * Validation checks that the ticket is well-formed, not expired, and that the HMAC matches
166
 * the HMAC computed with `secret`.
167
 *
168
 * @param {string} ticket - The client-provided ticket string in one of the two allowed formats.
169
 * @param {string} secret - The shared secret used to derive and verify the HMAC.
170
 * @returns {{ valid: boolean, guildId: string | null }} `valid` is `true` when the ticket is well-formed, not expired, and its HMAC matches; `guildId` is the bound guild ID when present, otherwise `null`.
171
 */
172
function validateTicket(ticket, secret) {
173
  if (typeof ticket !== 'string' || typeof secret !== 'string') {
52✔
174
    return { valid: false, guildId: null };
1✔
175
  }
176

177
  const parts = ticket.split('.');
51✔
178
  if (parts.length !== 3 && parts.length !== 4) {
51✔
179
    return { valid: false, guildId: null };
1✔
180
  }
181

182
  const [nonce, expiry, maybeGuildId, maybeHmac] = parts;
50✔
183
  const guildId = parts.length === 4 ? maybeGuildId : null;
50✔
184
  const hmac = parts.length === 4 ? maybeHmac : maybeGuildId;
52✔
185
  if (!nonce || !expiry || !hmac) {
52!
186
    return { valid: false, guildId: null };
×
187
  }
188
  if (parts.length === 4 && !guildId) {
50!
189
    return { valid: false, guildId: null };
×
190
  }
191

192
  // Check expiry — guard against NaN from non-numeric strings
193
  const expiryNum = Number(expiry);
50✔
194
  if (!Number.isFinite(expiryNum) || expiryNum <= Date.now()) {
50✔
195
    return { valid: false, guildId: null };
3✔
196
  }
197

198
  // Re-derive HMAC and compare with timing-safe equality
199
  const payload = guildId ? `${nonce}.${expiry}.${guildId}` : `${nonce}.${expiry}`;
47✔
200
  const expected = createHmac('sha256', secret).update(payload).digest('hex');
52✔
201

202
  try {
52✔
203
    return {
52✔
204
      valid: timingSafeEqual(Buffer.from(expected, 'hex'), Buffer.from(hmac, 'hex')),
205
      guildId,
206
    };
207
  } catch {
208
    return { valid: false, guildId: null };
1✔
209
  }
210
}
211

212
/**
213
 * Authenticate a WebSocket client using a ticket and register it for real-time log delivery.
214
 *
215
 * Validates the provided `msg.ticket`, requires a guild-scoped ticket, enforces the maximum concurrent
216
 * authenticated client limit, marks the socket as authenticated (setting `ws.authenticated` and `ws.guildId`),
217
 * clears the socket's authentication timeout, increments the module-level authenticated client count,
218
 * sends an `auth_ok` acknowledgement and an empty `history` payload, and registers the socket with the real-time transport.
219
 *
220
 * Observable failure behaviour:
221
 * - If the socket is already authenticated, an error message is sent and no state changes occur.
222
 * - If the ticket is invalid or a legacy (non-guild) ticket, the connection is closed with code 4003.
223
 * - If the server is at capacity, the connection is closed with code 4029.
224
 *
225
 * @param {import('ws').WebSocket} ws - WebSocket to authenticate; mutated to record authentication state, guildId, and to clear auth timeout.
226
 * @param {Object} msg - Incoming message object; expected to contain a `ticket` string.
227
 */
228
async function handleAuth(ws, msg) {
229
  if (ws.authenticated) {
54✔
230
    sendError(ws, 'Already authenticated');
2✔
231
    return;
2✔
232
  }
233

234
  const authResult = validateTicket(msg.ticket, process.env.BOT_API_SECRET);
52✔
235
  if (!authResult.valid) {
52✔
236
    warn('WebSocket auth failed', { reason: 'invalid ticket' });
6✔
237
    ws.close(4003, 'Authentication failed');
6✔
238
    return;
6✔
239
  }
240

241
  // Reject legacy tickets without guild scope
242
  if (!authResult.guildId) {
46✔
243
    warn('WebSocket auth rejected — guild-scoped ticket required', { reason: 'legacy-ticket' });
2✔
244
    ws.close(4003, 'Guild-scoped ticket required');
2✔
245
    return;
2✔
246
  }
247

248
  // Check max client limit
249
  if (authenticatedCount >= MAX_CLIENTS) {
44✔
250
    warn('WebSocket max clients reached', { max: MAX_CLIENTS });
1✔
251
    ws.close(4029, 'Too many clients');
1✔
252
    return;
1✔
253
  }
254

255
  // Auth successful
256
  ws.authenticated = true;
43✔
257
  ws.guildId = authResult.guildId;
43✔
258
  authenticatedCount++;
43✔
259

260
  if (ws.authTimeout) {
43!
261
    clearTimeout(ws.authTimeout);
43✔
262
    ws.authTimeout = null;
43✔
263
  }
264

265
  sendJson(ws, { type: 'auth_ok' });
43✔
266

267
  info('WebSocket client authenticated', { totalClients: authenticatedCount });
43✔
268

269
  sendJson(ws, { type: 'history', logs: [] });
43✔
270

271
  // Register with transport for real-time log broadcasting AFTER history is sent
272
  if (wsTransport) {
43✔
273
    wsTransport.addClient(ws);
42✔
274
  }
275
}
276

277
/**
278
 * Update the client's log filter based on a received filter message.
279
 *
280
 * If the connection is not authenticated the message is rejected. If `msg.guildId`
281
 * is present it must match the authenticated guild for the connection; otherwise
282
 * the message is rejected. On success the connection's `ws.logFilter` is set to
283
 * an object with the following shape and an acknowledgment `{ type: 'filter_ok', filter }`
284
 * is sent to the client.
285
 *
286
 * @param {import('ws').WebSocket} ws - The client's WebSocket connection.
287
 * @param {Object} msg - Filter message payload.
288
 * @param {string} [msg.guildId] - Optional guild id; if provided must match the authenticated guild.
289
 * @param {Array<any>} [msg.channelIds] - Optional array whose string entries become `channelIds`; empty or absent results in `null`.
290
 * @param {string} [msg.level] - Optional log level to filter by; non-strings become `null`.
291
 * @param {string} [msg.module] - Optional module name to filter by; non-strings become `null`.
292
 * @param {string} [msg.search] - Optional search string to filter log messages; non-strings become `null`.
293
 */
294
function handleFilter(ws, msg) {
295
  if (!ws.authenticated) {
19✔
296
    sendError(ws, 'Not authenticated');
2✔
297
    return;
2✔
298
  }
299

300
  if (msg.guildId && msg.guildId !== ws.guildId) {
17✔
301
    sendError(ws, 'Guild filter does not match authenticated guild');
2✔
302
    return;
2✔
303
  }
304

305
  const validChannelIds = Array.isArray(msg.channelIds)
15✔
306
    ? msg.channelIds.filter((id) => typeof id === 'string')
7✔
307
    : [];
308

309
  ws.logFilter = {
19✔
310
    guildId: ws.guildId || null,
19!
311
    channelIds: validChannelIds.length > 0 ? validChannelIds : null,
15✔
312
    level: typeof msg.level === 'string' ? msg.level : null,
15✔
313
    module: typeof msg.module === 'string' ? msg.module : null,
15✔
314
    search: typeof msg.search === 'string' ? msg.search : null,
15✔
315
  };
316

317
  sendJson(ws, { type: 'filter_ok', filter: ws.logFilter });
19✔
318
}
319

320
/**
321
 * Perform cleanup for a disconnecting WebSocket client: clear its auth timeout, reset authentication state, decrement the authenticated client count, and unregister it from the broadcast transport.
322
 *
323
 * @param {import('ws').WebSocket} ws - The client WebSocket being cleaned up.
324
 */
325
function cleanupClient(ws) {
326
  if (ws.authTimeout) {
117✔
327
    clearTimeout(ws.authTimeout);
19✔
328
    ws.authTimeout = null;
19✔
329
  }
330

331
  if (ws.authenticated) {
117✔
332
    ws.authenticated = false;
43✔
333
    ws.guildId = null;
43✔
334
    authenticatedCount = Math.max(0, authenticatedCount - 1);
43✔
335

336
    if (wsTransport) {
43✔
337
      wsTransport.removeClient(ws);
42✔
338
    }
339

340
    info('WebSocket client disconnected', { totalClients: authenticatedCount });
43✔
341
  }
342
}
343

344
/**
345
 * Send a JSON message to a client.
346
 *
347
 * @param {import('ws').WebSocket} ws
348
 * @param {Object} data
349
 */
350
function sendJson(ws, data) {
351
  try {
113✔
352
    if (ws.readyState === WebSocket.OPEN) {
113!
353
      ws.send(JSON.stringify(data));
113✔
354
    }
355
  } catch {
356
    // Ignore send errors — client cleanup happens elsewhere
357
  }
358
}
359

360
/**
361
 * Send an error message to a client.
362
 *
363
 * @param {import('ws').WebSocket} ws
364
 * @param {string} message
365
 */
366
function sendError(ws, message) {
367
  sendJson(ws, { type: 'error', message });
12✔
368
}
369

370
/**
371
 * Shut down the WebSocket server.
372
 * Closes all client connections and cleans up resources.
373
 *
374
 * @returns {Promise<void>}
375
 */
376
export async function stopLogStream() {
377
  if (heartbeatTimer) {
66✔
378
    clearInterval(heartbeatTimer);
55✔
379
    heartbeatTimer = null;
55✔
380
  }
381

382
  if (wss) {
66✔
383
    // Close all connected clients
384
    for (const ws of wss.clients) {
53✔
385
      cleanupClient(ws);
55✔
386
      ws.close(1001, 'Server shutting down');
55✔
387
    }
388

389
    await new Promise((resolve) => {
53✔
390
      wss.close(() => resolve());
53✔
391
    });
392

393
    wss = null;
53✔
394
    wsTransport = null;
53✔
395
    authenticatedCount = 0;
53✔
396
    info('WebSocket log stream server stopped', { module: 'logStream' });
53✔
397
  }
398
}
399

400
/**
401
 * Get the current count of authenticated clients.
402
 * Useful for health checks and monitoring.
403
 *
404
 * @returns {number}
405
 */
406
export function getAuthenticatedClientCount() {
407
  return authenticatedCount;
13✔
408
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc