• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

VolvoxLLC / volvox-bot / 23973509372

04 Apr 2026 06:45AM UTC coverage: 90.634% (+0.06%) from 90.57%
23973509372

Pull #427

github

web-flow
Merge 30cdfe79c into 4c34f9b07
Pull Request #427: feat(logs): guild-scoped log streaming with channel filtering

6779 of 7926 branches covered (85.53%)

Branch coverage included in aggregate %.

187 of 199 new or added lines in 30 files covered. (93.97%)

3 existing lines in 3 files now uncovered.

11471 of 12210 relevant lines covered (93.95%)

218.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.7
/src/api/ws/logStream.js
1
/**
2
 * WebSocket Log Stream Server
3
 *
4
 * Manages WebSocket connections for real-time log streaming.
5
 * Handles auth, client lifecycle, per-client filtering, and heartbeat.
6
 */
7

8
import { createHmac, timingSafeEqual } from 'node:crypto';
9
import WebSocket, { WebSocketServer } from 'ws';
10
import { info, error as logError, warn } from '../../logger.js';
11
import { queryLogs } from '../../utils/logQuery.js';
12

13
/** Upper bound on simultaneously authenticated clients. */
const MAX_CLIENTS = 10;

/** Delay between heartbeat ping rounds, in milliseconds. */
const HEARTBEAT_INTERVAL_MS = 30 * 1000;

/** How long, in milliseconds, a new connection has to authenticate. */
const AUTH_TIMEOUT_MS = 10 * 1000;

/** How many historical log entries are replayed to a client after it authenticates. */
const HISTORY_LIMIT = 100;
24

25
/** Sensitive metadata keys to strip before broadcasting */
const SENSITIVE_KEYS = new Set([
  'ip',
  'accessToken',
  'secret',
  'apiKey',
  'authorization',
  'password',
  'token',
  'stack',
  'cookie',
]);

/**
 * Lower-cased view of SENSITIVE_KEYS, built once, so key matching can be
 * case-insensitive (`Authorization`, `COOKIE`, `ApiKey`, ...).
 */
const SENSITIVE_KEYS_LOWER = new Set([...SENSITIVE_KEYS].map((key) => key.toLowerCase()));

/**
 * Strip sensitive keys from a metadata object.
 *
 * Matching is case-insensitive. Only top-level keys are inspected; nested
 * objects are copied by reference and are not traversed. The input object is
 * never mutated.
 *
 * @param {Object} metadata - Raw metadata from log entry
 * @returns {Object} New object holding the non-sensitive entries; `{}` when
 *   `metadata` is not an object (including `null`/`undefined`).
 */
function sanitizeMetadata(metadata) {
  if (!metadata || typeof metadata !== 'object') return {};

  // Object.fromEntries defines own data properties, so an own "__proto__" key
  // in the input is copied as plain data instead of replacing the result's
  // prototype (which is what `result[key] = value` would do).
  return Object.fromEntries(
    Object.entries(metadata).filter(([key]) => !SENSITIVE_KEYS_LOWER.has(key.toLowerCase())),
  );
}
54

55
/**
 * The running WebSocket server, or `null` while the log stream is stopped.
 * @type {WebSocketServer | null}
 */
let wss = null;

/**
 * Handle of the heartbeat interval, or `null` when no heartbeat is scheduled.
 * @type {ReturnType<typeof setInterval> | null}
 */
let heartbeatTimer = null;

/**
 * Transport that authenticated clients are registered with for live logs.
 * @type {import('../../transports/websocket.js').WebSocketTransport | null}
 */
let wsTransport = null;

/**
 * Number of clients that are currently authenticated.
 * @type {number}
 */
let authenticatedCount = 0;
75

76
/**
 * Set up the WebSocket server for log streaming.
 * Attaches to an existing HTTP server on path `/ws/logs`, wires the
 * connection handler and starts the heartbeat interval.
 *
 * Calling this while a server is already running logs a warning and tears the
 * previous instance down via `stopLogStream()` before the new one is created.
 *
 * @param {import('node:http').Server} httpServer - The HTTP server to attach to
 * @param {import('../../transports/websocket.js').WebSocketTransport} transport - The WebSocket Winston transport
 */
export function setupLogStream(httpServer, transport) {
  // Guard against double-call — cleanup previous instance first
  if (wss) {
    warn('setupLogStream called while already running — cleaning up previous instance');
    // stopLogStream() is async but this function is synchronous, so the
    // teardown cannot be awaited. Attach a rejection handler so a failure is
    // logged instead of surfacing as an unhandled promise rejection.
    stopLogStream().catch((err) => {
      logError('Failed to stop previous WebSocket log stream', {
        error: err instanceof Error ? err.message : String(err),
      });
    });
  }

  wsTransport = transport;

  wss = new WebSocketServer({
    server: httpServer,
    path: '/ws/logs',
  });

  wss.on('connection', handleConnection);

  // Heartbeat — every interval, terminate clients that never answered the
  // previous ping, then ping the rest. `isAlive` is flipped back to true by
  // each client's 'pong' handler.
  heartbeatTimer = setInterval(() => {
    if (!wss) return;

    for (const ws of wss.clients) {
      if (ws.isAlive === false) {
        info('Terminating dead WebSocket client', { reason: 'heartbeat timeout' });
        cleanupClient(ws);
        ws.terminate();
        continue;
      }
      ws.isAlive = false;
      ws.ping();
    }
  }, HEARTBEAT_INTERVAL_MS);

  // Do not let the heartbeat alone keep the process alive.
  if (heartbeatTimer.unref) {
    heartbeatTimer.unref();
  }

  info('WebSocket log stream server started', { path: '/ws/logs' });
}
121

122
/**
 * Initialise per-connection state and wire up the socket's event handlers.
 *
 * A fresh connection starts unauthenticated with no guild and no filter, and
 * is closed with code 4001 if it has not authenticated within AUTH_TIMEOUT_MS.
 *
 * @param {import('ws').WebSocket} ws
 */
function handleConnection(ws) {
  Object.assign(ws, {
    isAlive: true,
    authenticated: false,
    guildId: null,
    logFilter: null,
  });

  // Drop the connection if it never authenticates.
  const closeIfUnauthenticated = () => {
    if (ws.authenticated) return;
    ws.close(4001, 'Authentication timeout');
  };
  ws.authTimeout = setTimeout(closeIfUnauthenticated, AUTH_TIMEOUT_MS);

  // A pong marks the client as alive for the next heartbeat round.
  const markAlive = () => {
    ws.isAlive = true;
  };

  const reportHandlerFailure = (err) => {
    logError('Unhandled error in WebSocket message handler', { error: err.message });
  };

  ws.on('pong', markAlive);

  ws.on('message', (data) => {
    handleMessage(ws, data).catch(reportHandlerFailure);
  });

  ws.on('close', () => {
    cleanupClient(ws);
  });

  ws.on('error', (err) => {
    logError('WebSocket client error', { error: err.message });
    cleanupClient(ws);
  });
}
160

161
/**
 * Parse one raw client frame and dispatch it by its `type` field.
 *
 * Replies with an error message for malformed JSON, for a missing or
 * non-string `type`, and for unknown types. `auth` messages are awaited;
 * `filter` messages are handled synchronously.
 *
 * @param {import('ws').WebSocket} ws
 * @param {Buffer|string} data
 */
async function handleMessage(ws, data) {
  let parsed;
  try {
    parsed = JSON.parse(data.toString());
  } catch {
    sendError(ws, 'Invalid JSON');
    return;
  }

  const hasType = Boolean(parsed) && typeof parsed.type === 'string';
  if (!hasType) {
    sendError(ws, 'Missing message type');
    return;
  }

  const { type } = parsed;

  if (type === 'auth') {
    await handleAuth(ws, parsed);
    return;
  }

  if (type === 'filter') {
    handleFilter(ws, parsed);
    return;
  }

  sendError(ws, `Unknown message type: ${type}`);
}
194

195
/**
 * Exact shape of a hex-encoded SHA-256 digest. Checked up front because
 * `Buffer.from(str, 'hex')` silently stops decoding at the first non-hex
 * character, which would otherwise let `<valid hmac>` + garbage validate.
 */
const HMAC_HEX_PATTERN = /^[0-9a-f]{64}$/i;

/**
 * Validate an HMAC ticket of the form `nonce.expiry.hmac` (legacy)
 * or `nonce.expiry.guildId.hmac` (guild-bound).
 *
 * A ticket is valid when it has the expected number of non-empty parts, its
 * expiry is a finite number in the future (compared against `Date.now()`),
 * and its HMAC is the hex SHA-256 HMAC of the dot-joined preceding parts under
 * `secret`. An empty secret is rejected outright, since an HMAC keyed with the
 * empty string can be computed by anyone.
 *
 * @param {string} ticket - The ticket string from the client
 * @param {string} secret - The BOT_API_SECRET used to derive the HMAC
 * @returns {{ valid: boolean, guildId: string | null }} Validation result and
 *   bound guild; `guildId` is always `null` when `valid` is `false`.
 */
function validateTicket(ticket, secret) {
  // Fresh object per call so callers can never share/mutate a common result.
  const rejected = () => ({ valid: false, guildId: null });

  if (typeof ticket !== 'string' || typeof secret !== 'string' || secret.length === 0) {
    return rejected();
  }

  const parts = ticket.split('.');
  if (parts.length !== 3 && parts.length !== 4) {
    return rejected();
  }

  const isGuildBound = parts.length === 4;
  const [nonce, expiry] = parts;
  const guildId = isGuildBound ? parts[2] : null;
  const hmac = parts[parts.length - 1];

  if (!nonce || !expiry || (isGuildBound && !guildId)) {
    return rejected();
  }

  // The HMAC's format is not secret, so an early exit here leaks nothing.
  if (!HMAC_HEX_PATTERN.test(hmac)) {
    return rejected();
  }

  // Check expiry — guard against NaN from non-numeric strings
  const expiryNum = Number(expiry);
  if (!Number.isFinite(expiryNum) || expiryNum <= Date.now()) {
    return rejected();
  }

  // Re-derive HMAC and compare with timing-safe equality. Both buffers are
  // exactly 32 bytes here, so timingSafeEqual cannot throw on length mismatch.
  const payload = isGuildBound ? `${nonce}.${expiry}.${guildId}` : `${nonce}.${expiry}`;
  const expected = createHmac('sha256', secret).update(payload).digest();
  const provided = Buffer.from(hmac, 'hex');

  return timingSafeEqual(expected, provided) ? { valid: true, guildId } : rejected();
}
242

243
/**
 * Authenticate a WebSocket client using a ticket and deliver recent logs.
 *
 * Validates `msg.ticket` against `process.env.BOT_API_SECRET`, requires a
 * guild-scoped ticket, enforces MAX_CLIENTS, marks the socket as authenticated,
 * clears the auth timeout, sends `auth_ok`, sends up to `HISTORY_LIMIT`
 * historical entries for the authenticated guild (oldest first), and finally
 * registers the socket with the real-time transport.
 *
 * Invalid and legacy tickets close the connection with code 4003; a full
 * server closes it with code 4029. A failing history query is non-fatal and
 * results in an empty history being sent.
 *
 * If the socket is cleaned up while the history query is in flight (client
 * disconnect, heartbeat timeout, server shutdown), nothing further is sent and
 * the socket is NOT registered with the transport.
 *
 * @param {import('ws').WebSocket} ws - The WebSocket connection to authenticate; mutated to record authentication state.
 * @param {Object} msg - The incoming message object; expected to contain a `ticket` string.
 * @returns {Promise<void>}
 */
async function handleAuth(ws, msg) {
  if (ws.authenticated) {
    sendError(ws, 'Already authenticated');
    return;
  }

  const authResult = validateTicket(msg.ticket, process.env.BOT_API_SECRET);
  if (!authResult.valid) {
    warn('WebSocket auth failed', { reason: 'invalid ticket' });
    ws.close(4003, 'Authentication failed');
    return;
  }

  // Reject legacy tickets without guild scope
  if (!authResult.guildId) {
    warn('WebSocket auth rejected — guild-scoped ticket required', { reason: 'legacy-ticket' });
    ws.close(4003, 'Guild-scoped ticket required');
    return;
  }

  // Check max client limit
  if (authenticatedCount >= MAX_CLIENTS) {
    warn('WebSocket max clients reached', { max: MAX_CLIENTS });
    ws.close(4029, 'Too many clients');
    return;
  }

  // Auth successful. The flag is set synchronously, before any await, so a
  // second `auth` message racing with this one is rejected above.
  ws.authenticated = true;
  ws.guildId = authResult.guildId;
  authenticatedCount++;

  if (ws.authTimeout) {
    clearTimeout(ws.authTimeout);
    ws.authTimeout = null;
  }

  sendJson(ws, { type: 'auth_ok' });

  info('WebSocket client authenticated', { totalClients: authenticatedCount });

  // Load historical logs BEFORE registering for real-time broadcast
  // to prevent race where live logs arrive before history and get overwritten.
  // NOTE: Historical replay is guild-scoped only — channel filtering from a
  // subsequent filter message applies to the live stream only.
  let logs = [];
  try {
    const { rows } = await queryLogs({ limit: HISTORY_LIMIT, guildId: ws.guildId });
    // Reverse so oldest comes first (queryLogs returns DESC order)
    logs = rows.reverse().map((row) => {
      const meta = sanitizeMetadata(row.metadata);
      return {
        level: row.level,
        message: row.message,
        metadata: meta,
        timestamp: row.timestamp,
        module: meta.module || null,
      };
    });
  } catch (err) {
    // Non-fatal — real-time streaming still works; the client gets an empty history.
    logError('Failed to send historical logs', { error: err.message });
  }

  // cleanupClient() clears `ws.authenticated` when the socket goes away. If
  // that happened during the await above, its removeClient() call has already
  // run, so registering now would leave a dead socket in the transport forever.
  if (!ws.authenticated) {
    return;
  }

  sendJson(ws, { type: 'history', logs });

  // Register with transport for real-time log broadcasting AFTER history is sent
  if (wsTransport) {
    wsTransport.addClient(ws);
  }
}
328

329
/**
 * Replace the connection's live-stream filter with the one described by `msg`.
 *
 * Unauthenticated connections are rejected with an error message, as is a
 * truthy `msg.guildId` that differs from the connection's authenticated guild.
 * Otherwise `ws.logFilter` is rebuilt from scratch and echoed back to the
 * client as `{ type: 'filter_ok', filter }`.
 *
 * @param {import('ws').WebSocket} ws - The client's WebSocket connection.
 * @param {Object} msg - Filter message payload.
 * @param {string} [msg.guildId] - Optional guild id; if provided must match the authenticated guild.
 * @param {Array<any>} [msg.channelIds] - Only string entries are kept; a non-array or an array with no strings yields `null`.
 * @param {string} [msg.level] - Log level to filter by; non-strings become `null`.
 * @param {string} [msg.module] - Module name to filter by; non-strings become `null`.
 * @param {string} [msg.search] - Search text to filter by; non-strings become `null`.
 */
function handleFilter(ws, msg) {
  if (!ws.authenticated) {
    sendError(ws, 'Not authenticated');
    return;
  }

  const requestedGuild = msg.guildId;
  if (requestedGuild && requestedGuild !== ws.guildId) {
    sendError(ws, 'Guild filter does not match authenticated guild');
    return;
  }

  const isString = (value) => typeof value === 'string';
  const stringOrNull = (value) => (isString(value) ? value : null);

  let channelIds = null;
  if (Array.isArray(msg.channelIds)) {
    const onlyStrings = msg.channelIds.filter(isString);
    if (onlyStrings.length > 0) {
      channelIds = onlyStrings;
    }
  }

  // The guild always comes from the authenticated socket, never from the message.
  const filter = {
    guildId: ws.guildId || null,
    channelIds,
    level: stringOrNull(msg.level),
    module: stringOrNull(msg.module),
    search: stringOrNull(msg.search),
  };
  ws.logFilter = filter;

  sendJson(ws, { type: 'filter_ok', filter: ws.logFilter });
}
371

372
/**
 * Release everything the server holds for a client socket.
 *
 * Always cancels a pending auth timeout. For a socket that was authenticated
 * it additionally clears the authentication state, decrements the
 * authenticated-client counter (never below zero), unregisters the socket from
 * the broadcast transport and logs the disconnect. Calling it again for the
 * same socket does nothing further.
 *
 * @param {import('ws').WebSocket} ws - The client WebSocket being cleaned up.
 */
function cleanupClient(ws) {
  const pendingTimeout = ws.authTimeout;
  if (pendingTimeout) {
    clearTimeout(pendingTimeout);
    ws.authTimeout = null;
  }

  // Sockets that never authenticated hold no further state.
  if (!ws.authenticated) {
    return;
  }

  ws.authenticated = false;
  ws.guildId = null;
  authenticatedCount = Math.max(0, authenticatedCount - 1);

  if (wsTransport) {
    wsTransport.removeClient(ws);
  }

  info('WebSocket client disconnected', { totalClients: authenticatedCount });
}
395

396
/**
 * Serialise `data` as JSON and send it, but only while the socket is open.
 *
 * Best-effort: any error thrown while checking state, serialising or sending
 * is swallowed on purpose.
 *
 * @param {import('ws').WebSocket} ws
 * @param {Object} data
 */
function sendJson(ws, data) {
  try {
    if (ws.readyState !== WebSocket.OPEN) {
      return;
    }
    const payload = JSON.stringify(data);
    ws.send(payload);
  } catch {
    // Ignore send errors — client cleanup happens elsewhere
  }
}
411

412
/**
 * Send an `{ type: 'error', message }` frame to a client.
 *
 * @param {import('ws').WebSocket} ws
 * @param {string} message - Human-readable description of what went wrong.
 */
function sendError(ws, message) {
  const payload = { type: 'error', message };
  sendJson(ws, payload);
}
421

422
/**
 * Shut down the WebSocket server.
 * Stops the heartbeat, cleans up and closes every client connection (close
 * code 1001), resets the module state, and then waits for the server to
 * finish closing.
 *
 * All module state is reset synchronously, before the first `await`. A caller
 * that does not await the returned promise (for example a restart that
 * immediately creates a new server) therefore never has its fresh state
 * overwritten once the old server finishes closing.
 *
 * Does nothing beyond clearing the heartbeat when no server is running.
 *
 * @returns {Promise<void>} Resolves once the server's close callback has fired.
 */
export async function stopLogStream() {
  if (heartbeatTimer) {
    clearInterval(heartbeatTimer);
    heartbeatTimer = null;
  }

  const server = wss;
  if (!server) {
    return;
  }

  // Close all connected clients. cleanupClient() needs the current transport,
  // so this must run before the state reset below.
  for (const ws of server.clients) {
    cleanupClient(ws);
    ws.close(1001, 'Server shutting down');
  }

  // Detach module state now; only the captured `server` is used after the await.
  wss = null;
  wsTransport = null;
  authenticatedCount = 0;

  await new Promise((resolve) => {
    server.close(() => resolve());
  });

  info('WebSocket log stream server stopped', { module: 'logStream' });
}
451

452
/**
 * Report how many clients are authenticated right now.
 * Intended for health checks and monitoring.
 *
 * @returns {number} Current value of the authenticated-client counter.
 */
export function getAuthenticatedClientCount() {
  const currentCount = authenticatedCount;
  return currentCount;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc