• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

u-wave / core / 20918361171

12 Jan 2026 11:52AM UTC coverage: 86.641% (+0.04%) from 86.599%
20918361171

Pull #734

github

web-flow
Merge ce65fd2b6 into 2dd6aa2f8
Pull Request #734: Move away from Redis for key-value usages

1027 of 1222 branches covered (84.04%)

Branch coverage included in aggregate %.

35 of 50 new or added lines in 5 files covered. (70.0%)

5 existing lines in 2 files now uncovered.

10628 of 12230 relevant lines covered (86.9%)

106.2 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.34
/src/SocketServer.js
1
import { promisify } from 'node:util';
1✔
2
import lodash from 'lodash';
1✔
3
import sjson from 'secure-json-parse';
1✔
4
import { WebSocketServer } from 'ws';
1✔
5
import Ajv from 'ajv';
1✔
6
import { stdSerializers } from 'pino';
1✔
7
import { ulid, encodeTime } from 'ulid';
1✔
8
import { socketVote } from './controllers/booth.js';
1✔
9
import { disconnectUser } from './controllers/users.js';
1✔
10
import AuthRegistry from './AuthRegistry.js';
1✔
11
import GuestConnection from './sockets/GuestConnection.js';
1✔
12
import AuthedConnection from './sockets/AuthedConnection.js';
1✔
13
import LostConnection from './sockets/LostConnection.js';
1✔
14
import { serializeUser } from './utils/serialize.js';
1✔
15
import { jsonb } from './utils/sqlite.js';
1✔
16
import { subMinutes } from './utils/date.js';
1✔
17

1✔
18
const { isEmpty } = lodash;
1✔
19

1✔
20
export const KEY_ACTIVE_SESSIONS = 'users';
1✔
21

1✔
22
const PING_INTERVAL = 10_000;
1✔
23
const GUEST_COUNT_INTERVAL = 2_000;
1✔
24

1✔
25
/**
1✔
26
 * @typedef {import('./schema.js').User} User
1✔
27
 */
1✔
28

1✔
29
/**
1✔
30
 * @typedef {GuestConnection | AuthedConnection | LostConnection} Connection
1✔
31
 */
1✔
32

1✔
33
/**
1✔
34
 * @typedef {object} ClientActionParameters
1✔
35
 * @prop {string} sendChat
1✔
36
 * @prop {-1 | 1} vote
1✔
37
 * @prop {undefined} logout
1✔
38
 */
1✔
39

1✔
40
/**
1✔
41
 * @typedef {{
1✔
42
 *   [Name in keyof ClientActionParameters]: (
1✔
43
 *     user: User,
1✔
44
 *     parameter: ClientActionParameters[Name],
1✔
45
 *     connection: AuthedConnection
1✔
46
 *   ) => void
1✔
47
 * }} ClientActions
1✔
48
 */
1✔
49

1✔
50
const ajv = new Ajv({
1✔
51
  coerceTypes: false,
1✔
52
  ownProperties: true,
1✔
53
  removeAdditional: true,
1✔
54
  useDefaults: false,
1✔
55
});
1✔
56

1✔
57
/**
1✔
58
 * @template {object} T
1✔
59
 * @param {T} object
1✔
60
 * @param {PropertyKey} property
1✔
61
 * @returns {property is keyof T}
1✔
62
 */
1✔
63
function has(object, property) {
  // `Object.hasOwn` is the built-in form of
  // `Object.prototype.hasOwnProperty.call(object, property)`: it only reports
  // properties defined on `object` itself, never inherited ones, and it keeps
  // working when `object` shadows or lacks its own `hasOwnProperty`.
  return Object.hasOwn(object, property);
}
258✔
66

1✔
67
class SocketServer {
1✔
68
  /**
175✔
69
   * @param {import('./Uwave.js').Boot} uw
175✔
70
   * @param {{ secret: Buffer|string }} options
175✔
71
   */
175✔
72
  static async plugin(uw, options) {
175✔
73
    uw.socketServer = new SocketServer(uw, {
175✔
74
      secret: options.secret,
175✔
75
      server: uw.server,
175✔
76
    });
175✔
77

175✔
78
    uw.after(async () => {
175✔
79
      try {
175✔
80
        await uw.socketServer.initLostConnections();
175✔
81
      } catch (err) {
175!
82
        // No need to prevent startup for this.
×
83
        // We do need to clear the `users` list because the lost connection handlers
×
84
        // will not do so.
×
85
        uw.socketServer.#logger.warn({ err }, 'could not initialise lost connections');
×
NEW
86
        await uw.keyv.delete(KEY_ACTIVE_SESSIONS);
×
87
      }
×
88
    });
175✔
89

175✔
90
    uw.onClose(async () => {
175✔
91
      await uw.socketServer.destroy();
175✔
92
    });
175✔
93
  }
175✔
94

175✔
95
  #uw;
175✔
96

175✔
97
  #logger;
175✔
98

175✔
99
  #redisSubscription;
175✔
100

175✔
101
  #wss;
175✔
102

175✔
103
  #closing = false;
175✔
104

175✔
105
  /** @type {Connection[]} */
175✔
106
  #connections = [];
175✔
107

175✔
108
  #pinger;
175✔
109

175✔
110
  /** Update online guests count and broadcast an update if necessary. */
175✔
111
  #guestCountInterval;
175✔
112

175✔
113
  #guestCountDirty = true;
175✔
114

175✔
115
  /**
175✔
116
   * Handlers for commands that come in from clients.
175✔
117
   *
175✔
118
   * @type {ClientActions}
175✔
119
   */
175✔
120
  #clientActions;
175✔
121

175✔
122
  /**
175✔
123
   * @type {{
175✔
124
   *   [K in keyof ClientActionParameters]:
175✔
125
   *     import('ajv').ValidateFunction<ClientActionParameters[K]>
175✔
126
   * }}
175✔
127
   */
175✔
128
  #clientActionSchemas;
175✔
129

175✔
130
  /**
175✔
131
   * Handlers for commands that come in from the server side.
175✔
132
   *
175✔
133
   * @type {import('./redisMessages.js').ServerActions}
175✔
134
   */
175✔
135
  #serverActions;
175✔
136

175✔
137
  /**
175✔
138
   * Create a socket server.
175✔
139
   *
175✔
140
   * @param {import('./Uwave.js').Boot} uw üWave Core instance.
175✔
141
   * @param {object} options Socket server options.
175✔
142
   * @param {number} [options.timeout] Time in seconds to wait for disconnected
175✔
143
   *     users to reconnect before removing them.
175✔
144
   * @param {Buffer|string} options.secret
175✔
145
   * @param {import('http').Server | import('https').Server} [options.server]
175✔
146
   * @param {number} [options.port]
175✔
147
   * @private
175✔
148
   */
175✔
149
  constructor(uw, options) {
175✔
150
    if (!uw) {
175!
151
      throw new TypeError(`The "uw" argument must be of type UwaveServer. Received ${typeof uw}`);
×
152
    }
×
153

175✔
154
    this.#uw = uw;
175✔
155
    this.#logger = uw.logger.child({ ns: 'uwave:sockets' }, {
175✔
156
      serializers: {
175✔
157
        req: stdSerializers.req,
175✔
158
      },
175✔
159
    });
175✔
160
    this.#redisSubscription = uw.redis.duplicate();
175✔
161

175✔
162
    this.options = {
175✔
163
      /** @type {(_socket: import('ws').WebSocket | undefined, err: Error) => void} */
175✔
164
      onError: (_socket, err) => {
175✔
165
        throw err;
×
166
      },
175✔
167
      timeout: 30,
175✔
168
      ...options,
175✔
169
    };
175✔
170

175✔
171
    // TODO put this behind a symbol, it's just public for tests
175✔
172
    this.authRegistry = new AuthRegistry(uw.redis);
175✔
173

175✔
174
    this.#wss = new WebSocketServer({
175✔
175
      server: options.server,
175✔
176
      port: options.server ? undefined : options.port,
175!
177
    });
175✔
178

175✔
179
    uw.use(() => this.#redisSubscription.subscribe('uwave'));
175✔
180
    this.#redisSubscription.on('message', (channel, command) => {
175✔
181
      // this returns a promise, but we don't handle the error case:
247✔
182
      // there is not much we can do, so just let node.js crash w/ an unhandled rejection
247✔
183
      this.onServerMessage(channel, command);
247✔
184
    });
175✔
185

175✔
186
    this.#wss.on('error', (error) => {
175✔
187
      this.onError(error);
×
188
    });
175✔
189
    this.#wss.on('connection', (socket, request) => {
175✔
190
      this.onSocketConnected(socket, request);
29✔
191
    });
175✔
192

175✔
193
    this.#pinger = setInterval(() => {
175✔
194
      this.ping();
×
195
    }, PING_INTERVAL);
175✔
196

175✔
197
    this.#guestCountInterval = setInterval(() => {
175✔
198
      if (!this.#guestCountDirty) {
×
199
        return;
×
200
      }
×
201

×
NEW
202
      this.#recountGuests();
×
203
    }, GUEST_COUNT_INTERVAL);
175✔
204

175✔
205
    this.#clientActions = {
175✔
206
      sendChat: (user, message) => {
175✔
207
        this.#logger.trace({ user, message }, 'sendChat');
11✔
208
        this.#uw.chat.send(user, message);
11✔
209
      },
175✔
210
      vote: (user, direction) => {
175✔
211
        socketVote(this.#uw, user.id, direction);
×
212
      },
175✔
213
      logout: (user, _, connection) => {
175✔
214
        this.replace(connection, this.createGuestConnection(connection.socket));
×
215
        if (!this.connection(user)) {
×
216
          disconnectUser(this.#uw, user.id);
×
217
        }
×
218
      },
175✔
219
    };
175✔
220

175✔
221
    this.#clientActionSchemas = {
175✔
222
      sendChat: ajv.compile({
175✔
223
        type: 'string',
175✔
224
      }),
175✔
225
      vote: ajv.compile({
175✔
226
        type: 'integer',
175✔
227
        enum: [-1, 1],
175✔
228
      }),
175✔
229
      logout: ajv.compile(true),
175✔
230
    };
175✔
231

175✔
232
    this.#serverActions = {
175✔
233
      /**
175✔
234
       * Broadcast the next track.
175✔
235
       */
175✔
236
      'advance:complete': (next) => {
175✔
237
        if (next) {
18✔
238
          this.broadcast('advance', {
18✔
239
            historyID: next.historyID,
18✔
240
            userID: next.userID,
18✔
241
            media: next.media,
18✔
242
            playedAt: new Date(next.playedAt).getTime(),
18✔
243
          });
18✔
244
        } else {
18!
245
          this.broadcast('advance', null);
×
246
        }
×
247
      },
175✔
248
      /**
175✔
249
       * Broadcast a skip notification.
175✔
250
       */
175✔
251
      'booth:skip': ({ moderatorID, userID, reason }) => {
175✔
252
        this.broadcast('skip', { moderatorID, userID, reason });
×
253
      },
175✔
254
      /**
175✔
255
       * Broadcast a chat message.
175✔
256
       */
175✔
257
      'chat:message': (message) => {
175✔
258
        this.broadcast('chatMessage', message);
13✔
259
      },
175✔
260
      /**
175✔
261
       * Delete chat messages. The delete filter can have an _id property to
175✔
262
       * delete a specific message, a userID property to delete messages by a
175✔
263
       * user, or be empty to delete all messages.
175✔
264
       */
175✔
265
      'chat:delete': ({ moderatorID, filter }) => {
175✔
266
        if ('id' in filter) {
6✔
267
          this.broadcast('chatDeleteByID', {
2✔
268
            moderatorID,
2✔
269
            _id: filter.id,
2✔
270
          });
2✔
271
        } else if ('userID' in filter) {
6✔
272
          this.broadcast('chatDeleteByUser', {
2✔
273
            moderatorID,
2✔
274
            userID: filter.userID,
2✔
275
          });
2✔
276
        } else if (isEmpty(filter)) {
2✔
277
          this.broadcast('chatDelete', { moderatorID });
2✔
278
        }
2✔
279
      },
175✔
280
      /**
175✔
281
       * Broadcast that a user was muted in chat.
175✔
282
       */
175✔
283
      'chat:mute': ({ moderatorID, userID, duration }) => {
175✔
284
        this.broadcast('chatMute', {
2✔
285
          userID,
2✔
286
          moderatorID,
2✔
287
          expiresAt: Date.now() + duration,
2✔
288
        });
2✔
289
      },
175✔
290
      /**
175✔
291
       * Broadcast that a user was unmuted in chat.
175✔
292
       */
175✔
293
      'chat:unmute': ({ moderatorID, userID }) => {
175✔
294
        this.broadcast('chatUnmute', { userID, moderatorID });
×
295
      },
175✔
296
      /**
175✔
297
       * Broadcast a vote for the current track.
175✔
298
       */
175✔
299
      'booth:vote': ({ userID, direction }) => {
175✔
300
        this.broadcast('vote', {
2✔
301
          _id: userID,
2✔
302
          value: direction,
2✔
303
        });
2✔
304
      },
175✔
305
      /**
175✔
306
       * Broadcast a favorite for the current track.
175✔
307
       */
175✔
308
      'booth:favorite': ({ userID }) => {
175✔
309
        this.broadcast('favorite', { userID });
1✔
310
      },
175✔
311
      /**
175✔
312
       * Cycle a single user's playlist.
175✔
313
       */
175✔
314
      'playlist:cycle': ({ userID, playlistID }) => {
175✔
315
        this.sendTo(userID, 'playlistCycle', { playlistID });
18✔
316
      },
175✔
317
      /**
175✔
318
       * Broadcast that a user joined the waitlist.
175✔
319
       */
175✔
320
      'waitlist:join': ({ userID, waitlist }) => {
175✔
321
        this.broadcast('waitlistJoin', { userID, waitlist });
31✔
322
      },
175✔
323
      /**
175✔
324
       * Broadcast that a user left the waitlist.
175✔
325
       */
175✔
326
      'waitlist:leave': ({ userID, waitlist }) => {
175✔
327
        this.broadcast('waitlistLeave', { userID, waitlist });
1✔
328
      },
175✔
329
      /**
175✔
330
       * Broadcast that a user was added to the waitlist.
175✔
331
       */
175✔
332
      'waitlist:add': ({
175✔
333
        userID, moderatorID, position, waitlist,
1✔
334
      }) => {
1✔
335
        this.broadcast('waitlistAdd', {
1✔
336
          userID, moderatorID, position, waitlist,
1✔
337
        });
1✔
338
      },
175✔
339
      /**
175✔
340
       * Broadcast that a user was removed from the waitlist.
175✔
341
       */
175✔
342
      'waitlist:remove': ({ userID, moderatorID, waitlist }) => {
175✔
343
        this.broadcast('waitlistRemove', { userID, moderatorID, waitlist });
×
344
      },
175✔
345
      /**
175✔
346
       * Broadcast that a user was moved in the waitlist.
175✔
347
       */
175✔
348
      'waitlist:move': ({
175✔
349
        userID, moderatorID, position, waitlist,
3✔
350
      }) => {
3✔
351
        this.broadcast('waitlistMove', {
3✔
352
          userID, moderatorID, position, waitlist,
3✔
353
        });
3✔
354
      },
175✔
355
      /**
175✔
356
       * Broadcast a waitlist update.
175✔
357
       */
175✔
358
      'waitlist:update': (waitlist) => {
175✔
359
        this.broadcast('waitlistUpdate', waitlist);
18✔
360
      },
175✔
361
      /**
175✔
362
       * Broadcast that the waitlist was cleared.
175✔
363
       */
175✔
364
      'waitlist:clear': ({ moderatorID }) => {
175✔
365
        this.broadcast('waitlistClear', { moderatorID });
×
366
      },
175✔
367
      /**
175✔
368
       * Broadcast that the waitlist was locked.
175✔
369
       */
175✔
370
      'waitlist:lock': ({ moderatorID, locked }) => {
175✔
371
        this.broadcast('waitlistLock', { moderatorID, locked });
6✔
372
      },
175✔
373

175✔
374
      'acl:allow': ({ userID, roles }) => {
175✔
375
        this.broadcast('acl:allow', { userID, roles });
77✔
376
      },
175✔
377
      'acl:disallow': ({ userID, roles }) => {
175✔
378
        this.broadcast('acl:disallow', { userID, roles });
1✔
379
      },
175✔
380

175✔
381
      'user:update': ({ userID, moderatorID, new: update }) => {
175✔
382
        // TODO Remove this remnant of the old roles system
1✔
383
        if ('role' in update) {
1!
384
          this.broadcast('roleChange', {
×
385
            moderatorID,
×
386
            userID,
×
387
            role: update.role,
×
388
          });
×
389
        }
×
390
        if ('username' in update) {
1✔
391
          this.broadcast('nameChange', {
1✔
392
            moderatorID,
1✔
393
            userID,
1✔
394
            username: update.username,
1✔
395
          });
1✔
396
        }
1✔
397
      },
175✔
398
      'user:join': async ({ userID }) => {
175✔
399
        const { users, keyv } = this.#uw;
28✔
400
        const user = await users.getUser(userID);
28✔
401
        if (user) {
28✔
402
          // TODO this should not be the socket server code's responsibility
28✔
403
          const userIDs = /** @type {import('./schema').UserID[] | null} */ (
28✔
404
            await keyv.get(KEY_ACTIVE_SESSIONS)
28✔
405
          ) ?? [];
28✔
406
          userIDs.push(user.id);
28✔
407
          await keyv.set(KEY_ACTIVE_SESSIONS, userIDs);
28✔
408
          this.broadcast('join', serializeUser(user));
28✔
409
        }
28✔
410
      },
175✔
411
      /**
175✔
412
       * Broadcast that a user left the server.
175✔
413
       */
175✔
414
      'user:leave': ({ userID }) => {
175✔
415
        this.broadcast('leave', userID);
×
416
      },
175✔
417
      /**
175✔
418
       * Broadcast a ban event.
175✔
419
       */
175✔
420
      'user:ban': ({
175✔
421
        moderatorID, userID, permanent = false, duration, expiresAt,
1✔
422
      }) => {
1✔
423
        this.broadcast('ban', {
1✔
424
          moderatorID, userID, permanent, duration, expiresAt,
1✔
425
        });
1✔
426

1✔
427
        this.#connections.forEach((connection) => {
1✔
428
          if (connection instanceof AuthedConnection && connection.user.id === userID) {
×
429
            connection.ban();
×
430
          } else if (connection instanceof LostConnection && connection.user.id === userID) {
×
431
            connection.close();
×
432
          }
×
433
        });
1✔
434
      },
175✔
435
      /**
175✔
436
       * Broadcast an unban event.
175✔
437
       */
175✔
438
      'user:unban': ({ moderatorID, userID }) => {
175✔
UNCOV
439
        this.broadcast('unban', { moderatorID, userID });
×
440
      },
175✔
441
      /**
175✔
442
       * Force-close a connection.
175✔
443
       */
175✔
444
      'http-api:socket:close': (userID) => {
175✔
445
        this.#connections.forEach((connection) => {
×
446
          if ('user' in connection && connection.user.id === userID) {
×
447
            connection.close();
×
448
          }
×
449
        });
×
450
      },
175✔
451

175✔
452
      'emotes:reload': () => {
175✔
453
        this.broadcast('reloadEmotes', null);
×
454
      },
175✔
455
    };
175✔
456
  }
175✔
457

175✔
458
  /**
175✔
459
   * Create `LostConnection`s for every user that's known to be online, but that
175✔
460
   * is not currently connected to the socket server.
175✔
461
   *
175✔
462
   * @private
175✔
463
   */
175✔
464
  async initLostConnections() {
175✔
465
    const { db, keyv } = this.#uw;
175✔
466
    const userIDs = /** @type {import('./schema').UserID[] | null} */ (
175✔
467
      await keyv.get(KEY_ACTIVE_SESSIONS)
175✔
468
    ) ?? [];
175✔
469
    const disconnectedIDs = userIDs.filter((userID) => !this.connection(userID));
175✔
470

175✔
471
    if (disconnectedIDs.length === 0) {
175✔
472
      return;
175✔
473
    }
175✔
474

×
475
    const disconnectedUsers = await db.selectFrom('users')
×
476
      .where('id', 'in', disconnectedIDs)
×
477
      .selectAll()
×
478
      .execute();
×
479
    disconnectedUsers.forEach((user) => {
×
480
      this.add(this.createLostConnection(user, 'TODO: Actual session ID!!', null));
×
481
    });
×
482
  }
175✔
483

175✔
484
  /**
175✔
485
   * @param {import('ws').WebSocket} socket
175✔
486
   * @param {import('http').IncomingMessage} request
175✔
487
   * @private
175✔
488
   */
175✔
489
  onSocketConnected(socket, request) {
175✔
490
    this.#logger.info({ req: request }, 'new connection');
29✔
491

29✔
492
    socket.on('error', (error) => {
29✔
493
      this.onSocketError(socket, error);
×
494
    });
29✔
495
    this.add(this.createGuestConnection(socket));
29✔
496
  }
29✔
497

175✔
498
  /**
175✔
499
   * @param {import('ws').WebSocket} socket
175✔
500
   * @param {Error} error
175✔
501
   * @private
175✔
502
   */
175✔
503
  onSocketError(socket, error) {
    const logContext = { err: error };
    this.#logger.warn(logContext, 'socket error');

    // Pass the error on to the configured `onError` option, together with the
    // socket it happened on.
    this.options.onError(socket, error);
  }
×
508

175✔
509
  /**
175✔
510
   * @param {Error} error
175✔
511
   * @private
175✔
512
   */
175✔
513
  onError(error) {
    const logContext = { err: error };
    this.#logger.error(logContext, 'server error');

    // Server-level errors have no socket, so `undefined` is passed in its place.
    this.options.onError(undefined, error);
  }
×
518

175✔
519
  /**
175✔
520
   * Get a LostConnection for a user, if one exists.
175✔
521
   *
175✔
522
   * @param {User} user
175✔
523
   * @private
175✔
524
   */
175✔
525
  getLostConnection(user) {
175✔
526
    return this.#connections.find((connection) => (
1✔
527
      connection instanceof LostConnection && connection.user.id === user.id
3✔
528
    ));
1✔
529
  }
1✔
530

175✔
531
  /**
175✔
532
   * Create a connection instance for an unauthenticated user.
175✔
533
   *
175✔
534
   * @param {import('ws').WebSocket} socket
175✔
535
   * @private
175✔
536
   */
175✔
537
  createGuestConnection(socket) {
175✔
538
    const connection = new GuestConnection(this.#uw, socket, {
29✔
539
      authRegistry: this.authRegistry,
29✔
540
    });
29✔
541
    connection.on('close', () => {
29✔
542
      this.remove(connection);
×
543
    });
29✔
544
    connection.on('authenticate', async (user, sessionID, lastEventID) => {
29✔
545
      const isReconnect = await connection.isReconnect(sessionID);
29✔
546
      this.#logger.info({ userId: user.id, isReconnect, lastEventID }, 'authenticated socket');
29✔
547
      if (isReconnect) {
29✔
548
        const previousConnection = this.getLostConnection(sessionID);
1✔
549
        if (previousConnection) this.remove(previousConnection);
1!
550
      }
1✔
551

29✔
552
      this.replace(connection, this.createAuthedConnection(socket, user, sessionID, lastEventID));
29✔
553

29✔
554
      if (!isReconnect) {
29✔
555
        this.#uw.publish('user:join', { userID: user.id });
28✔
556
      }
28✔
557
    });
29✔
558
    return connection;
29✔
559
  }
29✔
560

175✔
561
  /**
175✔
562
   * Create a connection instance for an authenticated user.
175✔
563
   *
175✔
564
   * @param {import('ws').WebSocket} socket
175✔
565
   * @param {User} user
175✔
566
   * @param {string} sessionID
175✔
567
   * @param {string|null} lastEventID
175✔
568
   * @returns {AuthedConnection}
175✔
569
   * @private
175✔
570
   */
175✔
571
  createAuthedConnection(socket, user, sessionID, lastEventID) {
175✔
572
    const connection = new AuthedConnection(this.#uw, socket, user, sessionID, lastEventID);
29✔
573
    connection.on('close', ({ banned, lastEventID }) => {
29✔
574
      if (banned) {
29!
575
        this.#logger.info({ userId: user.id }, 'removing connection after ban');
×
576
        disconnectUser(this.#uw, user.id);
×
577
      } else if (!this.#closing) {
29✔
578
        this.#logger.info({ userId: user.id }, 'lost connection');
2✔
579
        this.add(this.createLostConnection(user, sessionID, lastEventID));
2✔
580
      }
2✔
581
      this.remove(connection);
29✔
582
    });
29✔
583
    connection.on(
29✔
584
      'command',
29✔
585
      /**
29✔
586
       * @param {string} command
29✔
587
       * @param {import('type-fest').JsonValue} data
29✔
588
       */
29✔
589
      (command, data) => {
29✔
590
        this.#logger.trace({ userId: user.id, command, data }, 'command');
11✔
591
        if (has(this.#clientActions, command)) {
11✔
592
          // Ignore incorrect input
11✔
593
          const validate = this.#clientActionSchemas[command];
11✔
594
          if (validate && !validate(data)) {
11!
595
            return;
×
596
          }
×
597

11✔
598
          const action = this.#clientActions[command];
11✔
599
          // @ts-expect-error TS2345 `data` is validated
11✔
600
          action(user, data, connection);
11✔
601
        }
11✔
602
      },
29✔
603
    );
29✔
604
    return connection;
29✔
605
  }
29✔
606

175✔
607
  /**
175✔
608
   * Create a connection instance for a user who disconnected.
175✔
609
   *
175✔
610
   * @param {User} user
175✔
611
   * @param {string} sessionID
175✔
612
   * @param {string|null} lastEventID
175✔
613
   * @returns {LostConnection}
175✔
614
   * @private
175✔
615
   */
175✔
616
  createLostConnection(user, sessionID, lastEventID) {
175✔
617
    const connection = new LostConnection(
2✔
618
      this.#uw,
2✔
619
      user,
2✔
620
      sessionID,
2✔
621
      lastEventID,
2✔
622
      this.options.timeout,
2✔
623
    );
2✔
624
    connection.on('close', () => {
2✔
625
      this.#logger.info({ userId: user.id }, 'user left');
1✔
626
      this.remove(connection);
1✔
627
      // Only register that the user left if they didn't have another connection
1✔
628
      // still open.
1✔
629
      if (!this.connection(user)) {
1!
630
        disconnectUser(this.#uw, user.id);
×
631
      }
×
632
    });
2✔
633
    return connection;
2✔
634
  }
2✔
635

175✔
636
  /**
175✔
637
   * Add a connection.
175✔
638
   *
175✔
639
   * @param {Connection} connection
175✔
640
   * @private
175✔
641
   */
175✔
642
  add(connection) {
175✔
643
    const userID = 'user' in connection ? connection.user.id : null;
60✔
644
    const sessionID = 'sessionID' in connection ? connection.sessionID : null;
60✔
645
    this.#logger.trace({ type: connection.constructor.name, userID, sessionID }, 'add connection');
60✔
646

60✔
647
    this.#connections.push(connection);
60✔
648
    this.#guestCountDirty = true;
60✔
649
  }
60✔
650

175✔
651
  /**
175✔
652
   * Remove a connection.
175✔
653
   *
175✔
654
   * @param {Connection} connection
175✔
655
   * @private
175✔
656
   */
175✔
657
  remove(connection) {
175✔
658
    const userID = 'user' in connection ? connection.user.id : null;
59✔
659
    const sessionID = 'sessionID' in connection ? connection.sessionID : null;
59✔
660
    this.#logger.trace({ type: connection.constructor.name, userID, sessionID }, 'remove connection');
59✔
661

59✔
662
    const i = this.#connections.indexOf(connection);
59✔
663
    this.#connections.splice(i, 1);
59✔
664

59✔
665
    connection.removed();
59✔
666
    this.#guestCountDirty = true;
59✔
667
  }
59✔
668

175✔
669
  /**
175✔
670
   * Replace a connection instance with another connection instance. Useful when
175✔
671
   * a connection changes "type", like GuestConnection → AuthedConnection.
175✔
672
   *
175✔
673
   * @param {Connection} oldConnection
175✔
674
   * @param {Connection} newConnection
175✔
675
   * @private
175✔
676
   */
175✔
677
  replace(oldConnection, newConnection) {
175✔
678
    this.remove(oldConnection);
29✔
679
    this.add(newConnection);
29✔
680
  }
29✔
681

175✔
682
  /**
175✔
683
   * Handle command messages coming in from Redis.
175✔
684
   * Some commands are intended to broadcast immediately to all connected
175✔
685
   * clients, but others require special action.
175✔
686
   *
175✔
687
   * @param {string} channel
175✔
688
   * @param {string} rawCommand
175✔
689
   * @returns {Promise<void>}
175✔
690
   * @private
175✔
691
   */
175✔
692
  async onServerMessage(channel, rawCommand) {
175✔
693
    /**
247✔
694
     * @type {{ command: string, data: import('type-fest').JsonValue }|undefined}
247✔
695
     */
247✔
696
    const json = sjson.safeParse(rawCommand);
247✔
697
    if (!json) {
247!
698
      return;
×
699
    }
×
700
    const { command, data } = json;
247✔
701

247✔
702
    this.#logger.trace({ channel, command, data }, 'server message');
247✔
703

247✔
704
    if (has(this.#serverActions, command)) {
247✔
705
      const action = this.#serverActions[command];
228✔
706
      if (action !== undefined) { // the types for `ServerActions` allow undefined, so...
228✔
707
        // @ts-expect-error TS2345 `data` is validated
228✔
708
        action(data);
228✔
709
      }
228✔
710
    }
228✔
711
  }
247✔
712

175✔
713
  /**
175✔
714
   * Stop the socket server.
175✔
715
   *
175✔
716
   * @returns {Promise<void>}
175✔
717
   */
175✔
718
  async destroy() {
175✔
719
    clearInterval(this.#pinger);
175✔
720

175✔
721
    this.#closing = true;
175✔
722
    clearInterval(this.#guestCountInterval);
175✔
723

175✔
724
    for (const connection of this.#connections) {
175✔
725
      connection.close();
28✔
726
    }
28✔
727

175✔
728
    const closeWsServer = promisify(this.#wss.close.bind(this.#wss));
175✔
729
    await closeWsServer();
175✔
730
    await this.#redisSubscription.quit();
175✔
731
  }
175✔
732

175✔
733
  /**
175✔
734
   * Get the connection instance for a specific user.
175✔
735
   *
175✔
736
   * @param {User|import('./schema.js').UserID} user The user.
175✔
737
   * @returns {Connection|undefined}
175✔
738
   */
175✔
739
  connection(user) {
175✔
740
    const userID = typeof user === 'object' ? user.id : user;
1!
741
    return this.#connections.find((connection) => 'user' in connection && connection.user.id === userID);
1✔
742
  }
1✔
743

175✔
744
  ping() {
175✔
745
    this.#connections.forEach((connection) => {
×
746
      connection.ping();
×
747
    });
×
748

×
749
    this.#cleanupMessageQueue().catch((err) => {
×
750
      this.#logger.error({ err }, 'failed to clean up socket message queue');
×
751
    });
×
752
  }
×
753

175✔
754
  async #cleanupMessageQueue() {
175✔
755
    const oldestID = encodeTime(subMinutes(new Date(), 10).getTime());
×
756

×
757
    await this.#uw.db.deleteFrom('socketMessageQueue')
×
758
      .where('id', '<', oldestID)
×
759
      .execute();
×
760
  }
×
761

175✔
762
  /**
175✔
763
   * Store a command in the socket message queue and return the ID generated for it.
175✔
764
   *
175✔
765
   * @param {string} command Command name.
175✔
766
   * @param {import('type-fest').JsonValue} data Command data.
175✔
767
   * @param {import('./schema.js').UserID | null} targetUserID
175✔
768
   */
175✔
769
  #recordMessage(command, data, targetUserID = null) {
175✔
770
    const id = ulid();
228✔
771

228✔
772
    this.#uw.db.insertInto('socketMessageQueue')
228✔
773
      .values({
228✔
774
        id,
228✔
775
        command,
228✔
776
        data: jsonb(data),
228✔
777
        targetUserID,
228✔
778
      })
228✔
779
      .execute();
228✔
780

228✔
781
    return id;
228✔
782
  }
228✔
783

175✔
784
  /**
175✔
785
   * Broadcast a command to all connected clients.
175✔
786
   *
175✔
787
   * @param {string} command Command name.
175✔
788
   * @param {import('type-fest').JsonValue} data Command data.
175✔
789
   */
175✔
790
  broadcast(command, data) {
175✔
791
    const id = this.#recordMessage(command, data);
210✔
792

210✔
793
    this.#logger.trace({
210✔
794
      id,
210✔
795
      command,
210✔
796
      data,
210✔
797
      to: this.#connections.map((connection) => (
210✔
798
        'user' in connection ? connection.user.id : null
122!
799
      )),
210✔
800
    }, 'broadcast');
210✔
801

210✔
802
    this.#connections.forEach((connection) => {
210✔
803
      connection.send(id, command, data);
122✔
804
    });
210✔
805
  }
210✔
806

175✔
807
  /**
175✔
808
   * Send a command to a single user.
175✔
809
   *
175✔
810
   * @param {User|import('./schema.js').UserID} user User or user ID to send the command to.
175✔
811
   * @param {string} command Command name.
175✔
812
   * @param {import('type-fest').JsonValue} data Command data.
175✔
813
   */
175✔
814
  sendTo(user, command, data) {
175✔
815
    const userID = typeof user === 'object' ? user.id : user;
18!
816
    const id = this.#recordMessage(command, data, userID);
18✔
817

18✔
818
    this.#connections.forEach((connection) => {
18✔
819
      if ('user' in connection && connection.user.id === userID) {
16✔
820
        connection.send(id, command, data);
13✔
821
      }
13✔
822
    });
18✔
823
  }
18✔
824

175✔
825
  #lastGuestCount = 0;
175✔
826

175✔
827
  /** The number of unauthenticated connections. */
175✔
828
  get guestCount() {
175✔
829
    return this.#connections.reduce((acc, connection) => {
5✔
830
      if (connection instanceof GuestConnection) {
2!
NEW
831
        return acc + 1;
×
NEW
832
      }
×
833
      return acc;
2✔
834
    }, 0);
5✔
835
  }
5✔
836

175✔
837
  #recountGuests() {
175✔
NEW
838
    const guests = this.guestCount;
×
NEW
839
    if (guests !== this.#lastGuestCount) {
×
NEW
840
      this.#lastGuestCount = guests;
×
841
      this.broadcast('guests', guests);
×
842
    }
×
843
  }
×
844
}
175✔
845

1✔
846
export default SocketServer;
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc