• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

u-wave / core / 23241056408

18 Mar 2026 10:49AM UTC coverage: 87.368% (+0.1%) from 87.235%
23241056408

push

github

web-flow
Remove Redis (#755)

1059 of 1253 branches covered (84.52%)

Branch coverage included in aggregate %.

106 of 119 new or added lines in 8 files covered. (89.08%)

18 existing lines in 2 files now uncovered.

10823 of 12347 relevant lines covered (87.66%)

108.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.88
/src/SocketServer.js
1
import { promisify } from 'node:util';
1✔
2
import lodash from 'lodash';
1✔
3
import { WebSocketServer } from 'ws';
1✔
4
import Ajv from 'ajv';
1✔
5
import { stdSerializers } from 'pino';
1✔
6
import { ulid, encodeTime } from 'ulid';
1✔
7
import { socketVote } from './controllers/booth.js';
1✔
8
import { disconnectUser } from './controllers/users.js';
1✔
9
import AuthRegistry from './AuthRegistry.js';
1✔
10
import GuestConnection from './sockets/GuestConnection.js';
1✔
11
import AuthedConnection from './sockets/AuthedConnection.js';
1✔
12
import LostConnection from './sockets/LostConnection.js';
1✔
13
import { serializeUser } from './utils/serialize.js';
1✔
14
import { jsonb } from './utils/sqlite.js';
1✔
15
import { subMinutes } from './utils/date.js';
1✔
16

1✔
17
// `isEmpty` is used below to recognise an empty chat-delete filter.
const { isEmpty } = lodash;
1✔
18

1✔
19
/** Key in the `keyv` store under which the array of active user IDs is kept. */
export const KEY_ACTIVE_SESSIONS = 'users';
1✔
20

1✔
21
/** Delay passed to `setInterval` between `ping()` rounds (milliseconds). */
const PING_INTERVAL = 10_000;
1✔
22
/** Delay passed to `setInterval` between guest count checks (milliseconds). */
const GUEST_COUNT_INTERVAL = 2_000;
1✔
23

1✔
24
/**
1✔
25
 * @typedef {import('./schema.js').User} User
1✔
26
 */
1✔
27

1✔
28
/**
1✔
29
 * @typedef {GuestConnection | AuthedConnection | LostConnection} Connection
1✔
30
 */
1✔
31

1✔
32
/**
1✔
33
 * @typedef {object} ClientActionParameters
1✔
34
 * @prop {string} sendChat
1✔
35
 * @prop {-1 | 1} vote
1✔
36
 * @prop {undefined} logout
1✔
37
 */
1✔
38

1✔
39
/**
1✔
40
 * @typedef {{
1✔
41
 *   [Name in keyof ClientActionParameters]: (
1✔
42
 *     user: User,
1✔
43
 *     parameter: ClientActionParameters[Name],
1✔
44
 *     connection: AuthedConnection
1✔
45
 *   ) => void
1✔
46
 * }} ClientActions
1✔
47
 */
1✔
48

1✔
49
// Module-level Ajv instance; the socket server uses it to compile the schemas
// that validate the payload of incoming client commands.
// NOTE(review): the option semantics are defined by Ajv — confirm against its
// options documentation before changing any of them.
const ajv = new Ajv({
1✔
50
  coerceTypes: false,
1✔
51
  ownProperties: true,
1✔
52
  removeAdditional: true,
1✔
53
  useDefaults: false,
1✔
54
});
1✔
55

1✔
56
/**
 * Check whether `property` is an own (non-inherited) property of `object`.
 *
 * Acts as a type guard so a dynamic key can be used to index `object` safely
 * after the check.
 *
 * @template {object} T
 * @param {T} object Object to inspect.
 * @param {PropertyKey} property Property name or symbol to look for.
 * @returns {property is keyof T} `true` when `object` itself defines `property`.
 */
function has(object, property) {
  // `Object.hasOwn` is the standard replacement for
  // `Object.prototype.hasOwnProperty.call`; it is not affected by objects that
  // shadow `hasOwnProperty` or that have no prototype.
  return Object.hasOwn(object, property);
}
11✔
65

1✔
66
class SocketServer {
1✔
67
  /**
180✔
68
   * @param {import('./Uwave.js').Boot} uw
180✔
69
   * @param {{ secret: Buffer|string, sessionStore: import('express-session').Store }} options
180✔
70
   */
180✔
71
  static async plugin(uw, options) {
180✔
72
    uw.socketServer = new SocketServer(uw, {
180✔
73
      secret: options.secret,
180✔
74
      sessionStore: options.sessionStore,
180✔
75
      server: uw.server,
180✔
76
    });
180✔
77

180✔
78
    uw.after(async () => {
180✔
79
      try {
180✔
80
        await uw.socketServer.initLostConnections();
180✔
81
      } catch (err) {
180!
82
        // No need to prevent startup for this.
×
83
        // We do need to clear the `users` list because the lost connection handlers
×
84
        // will not do so.
×
85
        uw.socketServer.#logger.warn({ err }, 'could not initialise lost connections');
×
86
        await uw.keyv.delete(KEY_ACTIVE_SESSIONS);
×
87
      }
×
88
    });
180✔
89

180✔
90
    uw.onClose(async () => {
180✔
91
      await uw.socketServer.destroy();
180✔
92
    });
180✔
93
  }
180✔
94

180✔
95
  #uw;
180✔
96

180✔
97
  #logger;
180✔
98

180✔
99
  #wss;
180✔
100

180✔
101
  #closing = false;
180✔
102

180✔
103
  /** @type {Connection[]} */
180✔
104
  #connections = [];
180✔
105

180✔
106
  #pinger;
180✔
107

180✔
108
  /** Update online guests count and broadcast an update if necessary. */
180✔
109
  #guestCountInterval;
180✔
110

180✔
111
  #guestCountDirty = true;
180✔
112

180✔
113
  /**
180✔
114
   * Handlers for commands that come in from clients.
180✔
115
   *
180✔
116
   * @type {ClientActions}
180✔
117
   */
180✔
118
  #clientActions;
180✔
119

180✔
120
  /**
180✔
121
   * @type {{
180✔
122
   *   [K in keyof ClientActionParameters]:
180✔
123
   *     import('ajv').ValidateFunction<ClientActionParameters[K]>
180✔
124
   * }}
180✔
125
   */
180✔
126
  #clientActionSchemas;
180✔
127

180✔
128
  /**
180✔
129
   * Handlers for commands that come in from the server side.
180✔
130
   *
180✔
131
   * @type {import('./redisMessages.js').ServerActions}
180✔
132
   */
180✔
133
  #serverActions;
180✔
134

180✔
135
  #unsubscribe;
180✔
136

180✔
137
  /**
180✔
138
   * Create a socket server.
180✔
139
   *
180✔
140
   * @param {import('./Uwave.js').Boot} uw üWave Core instance.
180✔
141
   * @param {object} options Socket server options.
180✔
142
   * @param {number} [options.timeout] Time in seconds to wait for disconnected
180✔
143
   *     users to reconnect before removing them.
180✔
144
   * @param {Buffer|string} options.secret
180✔
145
   * @param {import('express-session').Store} options.sessionStore
180✔
146
   * @param {import('http').Server | import('https').Server} [options.server]
180✔
147
   * @param {number} [options.port]
180✔
148
   * @private
180✔
149
   */
180✔
150
  constructor(uw, options) {
180✔
151
    if (!uw) {
180!
152
      throw new TypeError(`The "uw" argument must be of type UwaveServer. Received ${typeof uw}`);
×
153
    }
×
154

180✔
155
    this.#uw = uw;
180✔
156
    this.#logger = uw.logger.child({ ns: 'uwave:sockets' }, {
180✔
157
      serializers: {
180✔
158
        req: stdSerializers.req,
180✔
159
      },
180✔
160
    });
180✔
161

180✔
162
    this.options = {
180✔
163
      /** @type {(_socket: import('ws').WebSocket | undefined, err: Error) => void} */
180✔
164
      onError: (_socket, err) => {
180✔
165
        throw err;
×
166
      },
180✔
167
      timeout: 30,
180✔
168
      ...options,
180✔
169
    };
180✔
170

180✔
171
    // TODO put this behind a symbol, it's just public for tests
180✔
172
    this.authRegistry = new AuthRegistry(uw.db);
180✔
173

180✔
174
    this.#wss = new WebSocketServer({
180✔
175
      server: options.server,
180✔
176
      port: options.server ? undefined : options.port,
180!
177
    });
180✔
178

180✔
179
    this.#unsubscribe = uw.events.onAny((command, data) => {
180✔
180
      this.#onServerMessage(command, data);
263✔
181
    });
180✔
182

180✔
183
    this.#wss.on('error', (error) => {
180✔
184
      this.onError(error);
×
185
    });
180✔
186
    this.#wss.on('connection', (socket, request) => {
180✔
187
      this.onSocketConnected(socket, request);
29✔
188
    });
180✔
189

180✔
190
    this.#pinger = setInterval(() => {
180✔
191
      this.ping();
×
192
    }, PING_INTERVAL);
180✔
193

180✔
194
    this.#guestCountInterval = setInterval(() => {
180✔
195
      if (!this.#guestCountDirty) {
×
196
        return;
×
197
      }
×
198

×
199
      this.#recountGuests();
×
200
    }, GUEST_COUNT_INTERVAL);
180✔
201

180✔
202
    this.#clientActions = {
180✔
203
      sendChat: (user, message) => {
180✔
204
        this.#logger.trace({ user, message }, 'sendChat');
11✔
205
        this.#uw.chat.send(user, message);
11✔
206
      },
180✔
207
      vote: (user, direction) => {
180✔
208
        socketVote(this.#uw, user.id, direction);
×
209
      },
180✔
210
      logout: (user, _, connection) => {
180✔
211
        this.replace(connection, this.createGuestConnection(connection.socket));
×
212
        if (!this.connection(user)) {
×
213
          disconnectUser(this.#uw, user.id);
×
214
        }
×
215
      },
180✔
216
    };
180✔
217

180✔
218
    this.#clientActionSchemas = {
180✔
219
      sendChat: ajv.compile({
180✔
220
        type: 'string',
180✔
221
      }),
180✔
222
      vote: ajv.compile({
180✔
223
        type: 'integer',
180✔
224
        enum: [-1, 1],
180✔
225
      }),
180✔
226
      logout: ajv.compile(true),
180✔
227
    };
180✔
228

180✔
229
    this.#serverActions = {
180✔
230
      /**
180✔
231
       * Broadcast the next track.
180✔
232
       */
180✔
233
      'advance:complete': (next) => {
180✔
234
        if (next) {
18✔
235
          this.broadcast('advance', {
18✔
236
            historyID: next.historyID,
18✔
237
            userID: next.userID,
18✔
238
            media: next.media,
18✔
239
            playedAt: new Date(next.playedAt).getTime(),
18✔
240
          });
18✔
241
        } else {
18!
242
          this.broadcast('advance', null);
×
243
        }
×
244
      },
180✔
245
      /**
180✔
246
       * Broadcast a skip notification.
180✔
247
       */
180✔
248
      'booth:skip': ({ moderatorID, userID, reason }) => {
180✔
249
        this.broadcast('skip', { moderatorID, userID, reason });
×
250
      },
180✔
251
      /**
180✔
252
       * Broadcast a chat message.
180✔
253
       */
180✔
254
      'chat:message': (message) => {
180✔
255
        this.broadcast('chatMessage', message);
13✔
256
      },
180✔
257
      /**
180✔
258
       * Delete chat messages. The delete filter can have an _id property to
180✔
259
       * delete a specific message, a userID property to delete messages by a
180✔
260
       * user, or be empty to delete all messages.
180✔
261
       */
180✔
262
      'chat:delete': ({ moderatorID, filter }) => {
180✔
263
        if ('id' in filter) {
6✔
264
          this.broadcast('chatDeleteByID', {
2✔
265
            moderatorID,
2✔
266
            _id: filter.id,
2✔
267
          });
2✔
268
        } else if ('userID' in filter) {
6✔
269
          this.broadcast('chatDeleteByUser', {
2✔
270
            moderatorID,
2✔
271
            userID: filter.userID,
2✔
272
          });
2✔
273
        } else if (isEmpty(filter)) {
2✔
274
          this.broadcast('chatDelete', { moderatorID });
2✔
275
        }
2✔
276
      },
180✔
277
      /**
180✔
278
       * Broadcast that a user was muted in chat.
180✔
279
       */
180✔
280
      'chat:mute': ({ moderatorID, userID, duration }) => {
180✔
281
        this.broadcast('chatMute', {
2✔
282
          userID,
2✔
283
          moderatorID,
2✔
284
          expiresAt: Date.now() + duration,
2✔
285
        });
2✔
286
      },
180✔
287
      /**
180✔
288
       * Broadcast that a user was unmuted in chat.
180✔
289
       */
180✔
290
      'chat:unmute': ({ moderatorID, userID }) => {
180✔
291
        this.broadcast('chatUnmute', { userID, moderatorID });
×
292
      },
180✔
293
      /**
180✔
294
       * Broadcast a vote for the current track.
180✔
295
       */
180✔
296
      'booth:vote': ({ userID, direction }) => {
180✔
297
        this.broadcast('vote', {
2✔
298
          _id: userID,
2✔
299
          value: direction,
2✔
300
        });
2✔
301
      },
180✔
302
      /**
180✔
303
       * Broadcast a favorite for the current track.
180✔
304
       */
180✔
305
      'booth:favorite': ({ userID }) => {
180✔
306
        this.broadcast('favorite', { userID });
1✔
307
      },
180✔
308
      /**
180✔
309
       * Cycle a single user's playlist.
180✔
310
       */
180✔
311
      'playlist:cycle': ({ userID, playlistID }) => {
180✔
312
        this.sendTo(userID, 'playlistCycle', { playlistID });
18✔
313
      },
180✔
314
      /**
180✔
315
       * Broadcast that a user joined the waitlist.
180✔
316
       */
180✔
317
      'waitlist:join': ({ userID, waitlist }) => {
180✔
318
        this.broadcast('waitlistJoin', { userID, waitlist });
31✔
319
      },
180✔
320
      /**
180✔
321
       * Broadcast that a user left the waitlist.
180✔
322
       */
180✔
323
      'waitlist:leave': ({ userID, waitlist }) => {
180✔
324
        this.broadcast('waitlistLeave', { userID, waitlist });
1✔
325
      },
180✔
326
      /**
180✔
327
       * Broadcast that a user was added to the waitlist.
180✔
328
       */
180✔
329
      'waitlist:add': ({
180✔
330
        userID, moderatorID, position, waitlist,
1✔
331
      }) => {
1✔
332
        this.broadcast('waitlistAdd', {
1✔
333
          userID, moderatorID, position, waitlist,
1✔
334
        });
1✔
335
      },
180✔
336
      /**
180✔
337
       * Broadcast that a user was removed from the waitlist.
180✔
338
       */
180✔
339
      'waitlist:remove': ({ userID, moderatorID, waitlist }) => {
180✔
340
        this.broadcast('waitlistRemove', { userID, moderatorID, waitlist });
×
341
      },
180✔
342
      /**
180✔
343
       * Broadcast that a user was moved in the waitlist.
180✔
344
       */
180✔
345
      'waitlist:move': ({
180✔
346
        userID, moderatorID, position, waitlist,
3✔
347
      }) => {
3✔
348
        this.broadcast('waitlistMove', {
3✔
349
          userID, moderatorID, position, waitlist,
3✔
350
        });
3✔
351
      },
180✔
352
      /**
180✔
353
       * Broadcast a waitlist update.
180✔
354
       */
180✔
355
      'waitlist:update': (waitlist) => {
180✔
356
        this.broadcast('waitlistUpdate', waitlist);
18✔
357
      },
180✔
358
      /**
180✔
359
       * Broadcast that the waitlist was cleared.
180✔
360
       */
180✔
361
      'waitlist:clear': ({ moderatorID }) => {
180✔
362
        this.broadcast('waitlistClear', { moderatorID });
×
363
      },
180✔
364
      /**
180✔
365
       * Broadcast that the waitlist was locked.
180✔
366
       */
180✔
367
      'waitlist:lock': ({ moderatorID, locked }) => {
180✔
368
        this.broadcast('waitlistLock', { moderatorID, locked });
6✔
369
      },
180✔
370

180✔
371
      'acl:allow': ({ userID, roles }) => {
180✔
372
        this.broadcast('acl:allow', { userID, roles });
84✔
373
      },
180✔
374
      'acl:disallow': ({ userID, roles }) => {
180✔
375
        this.broadcast('acl:disallow', { userID, roles });
2✔
376
      },
180✔
377

180✔
378
      'user:update': ({ userID, moderatorID, new: update }) => {
180✔
379
        // TODO Remove this remnant of the old roles system
3✔
380
        if ('role' in update) {
3!
381
          this.broadcast('roleChange', {
×
382
            moderatorID,
×
383
            userID,
×
384
            role: update.role,
×
385
          });
×
386
        }
×
387
        if ('username' in update) {
3✔
388
          this.broadcast('nameChange', {
3✔
389
            moderatorID,
3✔
390
            userID,
3✔
391
            username: update.username,
3✔
392
          });
3✔
393
        }
3✔
394
      },
180✔
395
      'user:join': async ({ userID }) => {
180✔
396
        const { users, keyv } = this.#uw;
27✔
397
        const user = await users.getUser(userID);
27✔
398
        if (user) {
27✔
399
          // TODO this should not be the socket server code's responsibility
27✔
400
          const userIDs = /** @type {import('./schema').UserID[] | null} */ (
27✔
401
            await keyv.get(KEY_ACTIVE_SESSIONS)
27✔
402
          ) ?? [];
27✔
403
          userIDs.push(user.id);
27✔
404
          await keyv.set(KEY_ACTIVE_SESSIONS, userIDs);
27✔
405
          this.broadcast('join', serializeUser(user));
27✔
406
        }
27✔
407
      },
180✔
408
      /**
180✔
409
       * Broadcast that a user left the server.
180✔
410
       */
180✔
411
      'user:leave': ({ userID }) => {
180✔
412
        this.broadcast('leave', userID);
×
413
      },
180✔
414
      /**
180✔
415
       * Broadcast a ban event.
180✔
416
       */
180✔
417
      'user:ban': ({
180✔
418
        moderatorID, userID, permanent = false, duration, expiresAt,
3✔
419
      }) => {
3✔
420
        this.broadcast('ban', {
3✔
421
          moderatorID, userID, permanent, duration, expiresAt,
3✔
422
        });
3✔
423

3✔
424
        this.#connections.forEach((connection) => {
3✔
425
          if (connection instanceof AuthedConnection && connection.user.id === userID) {
×
426
            connection.ban();
×
427
          } else if (connection instanceof LostConnection && connection.user.id === userID) {
×
428
            connection.close();
×
429
          }
×
430
        });
3✔
431
      },
180✔
432
      /**
180✔
433
       * Broadcast an unban event.
180✔
434
       */
180✔
435
      'user:unban': ({ moderatorID, userID }) => {
180✔
436
        this.broadcast('unban', { moderatorID, userID });
1✔
437
      },
180✔
438
      /**
180✔
439
       * Force-close a connection.
180✔
440
       */
180✔
441
      'http-api:socket:close': (userID) => {
180✔
442
        this.#connections.forEach((connection) => {
×
443
          if ('user' in connection && connection.user.id === userID) {
×
444
            connection.close();
×
445
          }
×
446
        });
×
447
      },
180✔
448

180✔
449
      'emotes:reload': () => {
180✔
450
        this.broadcast('reloadEmotes', null);
×
451
      },
180✔
452
    };
180✔
453
  }
180✔
454

180✔
455
  /**
180✔
456
   * Create `LostConnection`s for every user that's known to be online, but that
180✔
457
   * is not currently connected to the socket server.
180✔
458
   *
180✔
459
   * @private
180✔
460
   */
180✔
461
  async initLostConnections() {
180✔
462
    const { db, keyv } = this.#uw;
180✔
463
    const userIDs = /** @type {import('./schema').UserID[] | null} */ (
180✔
464
      await keyv.get(KEY_ACTIVE_SESSIONS)
180✔
465
    ) ?? [];
180✔
466
    const disconnectedIDs = userIDs.filter((userID) => !this.connection(userID));
180✔
467

180✔
468
    if (disconnectedIDs.length === 0) {
180✔
469
      return;
180✔
470
    }
180✔
471

×
472
    const disconnectedUsers = await db.selectFrom('users')
×
473
      .where('id', 'in', disconnectedIDs)
×
474
      .selectAll()
×
475
      .execute();
×
NEW
476
    disconnectedUsers.forEach((_user) => {
×
NEW
477
      // TODO (commented out but it already didn't really work)
×
NEW
478
      void _user;
×
NEW
479
      // this.add(this.createLostConnection(user, 'TODO: Actual session ID!!', null));
×
UNCOV
480
    });
×
481
  }
180✔
482

180✔
483
  /**
180✔
484
   * @param {import('ws').WebSocket} socket
180✔
485
   * @param {import('http').IncomingMessage} request
180✔
486
   * @private
180✔
487
   */
180✔
488
  onSocketConnected(socket, request) {
180✔
489
    this.#logger.info({ req: request }, 'new connection');
29✔
490

29✔
491
    socket.on('error', (error) => {
29✔
492
      this.onSocketError(socket, error);
×
493
    });
29✔
494
    this.add(this.createGuestConnection(socket));
29✔
495
  }
29✔
496

180✔
497
  /**
180✔
498
   * @param {import('ws').WebSocket} socket
180✔
499
   * @param {Error} error
180✔
500
   * @private
180✔
501
   */
180✔
502
  onSocketError(socket, error) {
180✔
503
    this.#logger.warn({ err: error }, 'socket error');
×
504

×
505
    this.options.onError(socket, error);
×
506
  }
×
507

180✔
508
  /**
180✔
509
   * @param {Error} error
180✔
510
   * @private
180✔
511
   */
180✔
512
  onError(error) {
180✔
513
    this.#logger.error({ err: error }, 'server error');
×
514

×
515
    this.options.onError(undefined, error);
×
516
  }
×
517

180✔
518
  /**
180✔
519
   * Get a LostConnection for a user, if one exists.
180✔
520
   *
180✔
521
   * @param {string} sessionID
180✔
522
   * @private
180✔
523
   */
180✔
524
  getLostConnection(sessionID) {
180✔
525
    return this.#connections.find((connection) => (
2✔
526
      connection instanceof LostConnection && connection.sessionID === sessionID
2✔
527
    ));
2✔
528
  }
2✔
529

180✔
530
  /**
180✔
531
   * Create a connection instance for an unauthenticated user.
180✔
532
   *
180✔
533
   * @param {import('ws').WebSocket} socket
180✔
534
   * @private
180✔
535
   */
180✔
536
  createGuestConnection(socket) {
180✔
537
    const connection = new GuestConnection(
29✔
538
      this.#uw,
29✔
539
      this.options.sessionStore,
29✔
540
      socket,
29✔
541
      { authRegistry: this.authRegistry },
29✔
542
    );
29✔
543
    connection.on('close', () => {
29✔
544
      this.remove(connection);
×
545
    });
29✔
546
    connection.on('authenticate', async ({ user, sessionID, lastEventID }) => {
29✔
547
      const isReconnect = await connection.isReconnect(sessionID);
29✔
548
      this.#logger.info({ userId: user.id, isReconnect, lastEventID }, 'authenticated socket');
29✔
549
      if (isReconnect) {
29✔
550
        const previousConnection = this.getLostConnection(sessionID);
2✔
551
        if (previousConnection) this.remove(previousConnection);
2✔
552
      }
2✔
553

29✔
554
      this.replace(connection, this.createAuthedConnection(socket, user, sessionID, lastEventID));
29✔
555

29✔
556
      if (!isReconnect) {
29✔
557
        this.#uw.publish('user:join', { userID: user.id });
27✔
558
      }
27✔
559
    });
29✔
560
    return connection;
29✔
561
  }
29✔
562

180✔
563
  /**
180✔
564
   * Create a connection instance for an authenticated user.
180✔
565
   *
180✔
566
   * @param {import('ws').WebSocket} socket
180✔
567
   * @param {User} user
180✔
568
   * @param {string} sessionID
180✔
569
   * @param {string|null} lastEventID
180✔
570
   * @returns {AuthedConnection}
180✔
571
   * @private
180✔
572
   */
180✔
573
  createAuthedConnection(socket, user, sessionID, lastEventID) {
180✔
574
    const connection = new AuthedConnection(
29✔
575
      this.#uw,
29✔
576
      this.options.sessionStore,
29✔
577
      socket,
29✔
578
      user,
29✔
579
      sessionID,
29✔
580
      lastEventID,
29✔
581
    );
29✔
582
    connection.on('close', ({ banned, lastEventID }) => {
29✔
583
      if (banned) {
29!
584
        this.#logger.info({ userId: user.id }, 'removing connection after ban');
×
585
        disconnectUser(this.#uw, user.id);
×
586
      } else if (!this.#closing) {
29✔
587
        this.#logger.info({ userId: user.id }, 'lost connection');
2✔
588
        this.add(this.createLostConnection(user, sessionID, lastEventID));
2✔
589
      }
2✔
590
      this.remove(connection);
29✔
591
    });
29✔
592
    connection.on(
29✔
593
      'command',
29✔
594
      ({ command, data }) => {
29✔
595
        this.#logger.trace({ userId: user.id, command, data }, 'command');
11✔
596
        if (has(this.#clientActions, command)) {
11✔
597
          // Ignore incorrect input
11✔
598
          const validate = this.#clientActionSchemas[command];
11✔
599
          if (validate && !validate(data)) {
11!
600
            return;
×
601
          }
×
602

11✔
603
          const action = this.#clientActions[command];
11✔
604
          // @ts-expect-error TS2345 `data` is validated
11✔
605
          action(user, data, connection);
11✔
606
        }
11✔
607
      },
29✔
608
    );
29✔
609
    return connection;
29✔
610
  }
29✔
611

180✔
612
  /**
180✔
613
   * Create a connection instance for a user who disconnected.
180✔
614
   *
180✔
615
   * @param {User} user
180✔
616
   * @param {string} sessionID
180✔
617
   * @param {string|null} lastEventID
180✔
618
   * @returns {LostConnection}
180✔
619
   * @private
180✔
620
   */
180✔
621
  createLostConnection(user, sessionID, lastEventID) {
180✔
622
    const connection = new LostConnection(
2✔
623
      this.#uw,
2✔
624
      this.options.sessionStore,
2✔
625
      user,
2✔
626
      sessionID,
2✔
627
      lastEventID,
2✔
628
      this.options.timeout,
2✔
629
    );
2✔
630
    connection.on('close', () => {
2✔
631
      this.#logger.info({ userId: user.id }, 'user left');
1✔
632
      this.remove(connection);
1✔
633
      // Only register that the user left if they didn't have another connection
1✔
634
      // still open.
1✔
635
      if (!this.connection(user)) {
1✔
636
        disconnectUser(this.#uw, user.id);
1✔
637
      }
1✔
638
    });
2✔
639
    return connection;
2✔
640
  }
2✔
641

180✔
642
  /**
180✔
643
   * Add a connection.
180✔
644
   *
180✔
645
   * @param {Connection} connection
180✔
646
   * @private
180✔
647
   */
180✔
648
  add(connection) {
180✔
649
    const userID = 'user' in connection ? connection.user.id : null;
60✔
650
    const sessionID = 'sessionID' in connection ? connection.sessionID : null;
60✔
651
    this.#logger.trace({ type: connection.constructor.name, userID, sessionID }, 'add connection');
60✔
652

60✔
653
    this.#connections.push(connection);
60✔
654
    this.#guestCountDirty = true;
60✔
655
  }
60✔
656

180✔
657
  /**
180✔
658
   * Remove a connection.
180✔
659
   *
180✔
660
   * @param {Connection} connection
180✔
661
   * @private
180✔
662
   */
180✔
663
  remove(connection) {
180✔
664
    const userID = 'user' in connection ? connection.user.id : null;
60✔
665
    const sessionID = 'sessionID' in connection ? connection.sessionID : null;
60✔
666
    this.#logger.trace({ type: connection.constructor.name, userID, sessionID }, 'remove connection');
60✔
667

60✔
668
    const i = this.#connections.indexOf(connection);
60✔
669
    this.#connections.splice(i, 1);
60✔
670

60✔
671
    connection.removed();
60✔
672
    this.#guestCountDirty = true;
60✔
673
  }
60✔
674

180✔
675
  /**
180✔
676
   * Replace a connection instance with another connection instance. Useful when
180✔
677
   * a connection changes "type", like GuestConnection → AuthedConnection.
180✔
678
   *
180✔
679
   * @param {Connection} oldConnection
180✔
680
   * @param {Connection} newConnection
180✔
681
   * @private
180✔
682
   */
180✔
683
  replace(oldConnection, newConnection) {
180✔
684
    this.remove(oldConnection);
29✔
685
    this.add(newConnection);
29✔
686
  }
29✔
687

180✔
688
  /**
180✔
689
   * Handle command messages coming in from elsewhere in the app.
180✔
690
   * Some commands are intended to broadcast immediately to all connected
180✔
691
   * clients, but others require special action.
180✔
692
   *
180✔
693
   * @template {keyof import('./redisMessages.js').ServerActionParameters} K
180✔
694
   * @param {K} command
180✔
695
   * @param {import('./redisMessages.js').ServerActionParameters[K]} data
180✔
696
   */
180✔
697
  #onServerMessage(command, data) {
180✔
698
    this.#logger.trace({ channel: command, command, data }, 'server message');
263✔
699

263✔
700
    const action = this.#serverActions[command];
263✔
701
    if (action !== undefined) {
263✔
702
      action(data);
240✔
703
    }
240✔
704
  }
263✔
705

180✔
706
  /**
180✔
707
   * Stop the socket server.
180✔
708
   *
180✔
709
   * @returns {Promise<void>}
180✔
710
   */
180✔
711
  async destroy() {
180✔
712
    this.#closing = true;
180✔
713

180✔
714
    this.#unsubscribe();
180✔
715
    clearInterval(this.#pinger);
180✔
716
    clearInterval(this.#guestCountInterval);
180✔
717

180✔
718
    for (const connection of this.#connections) {
180✔
719
      connection.close();
28✔
720
    }
28✔
721

180✔
722
    const closeWsServer = promisify(this.#wss.close.bind(this.#wss));
180✔
723
    await closeWsServer();
180✔
724
  }
180✔
725

180✔
726
  /**
180✔
727
   * Get the connection instance for a specific user.
180✔
728
   *
180✔
729
   * @param {User|import('./schema.js').UserID} user The user.
180✔
730
   * @returns {Connection|undefined}
180✔
731
   */
180✔
732
  connection(user) {
180✔
733
    const userID = typeof user === 'object' ? user.id : user;
1!
734
    return this.#connections.find((connection) => 'user' in connection && connection.user.id === userID);
1✔
735
  }
1✔
736

180✔
737
  ping() {
180✔
738
    this.#connections.forEach((connection) => {
×
739
      connection.ping();
×
740
    });
×
741

×
742
    this.#cleanupMessageQueue().catch((err) => {
×
743
      this.#logger.error({ err }, 'failed to clean up socket message queue');
×
744
    });
×
745
  }
×
746

180✔
747
  async #cleanupMessageQueue() {
180✔
748
    const oldestID = encodeTime(subMinutes(new Date(), 10).getTime());
×
749

×
750
    await this.#uw.db.deleteFrom('socketMessageQueue')
×
751
      .where('id', '<', oldestID)
×
752
      .execute();
×
753
  }
×
754

180✔
755
  /**
180✔
756
   * Broadcast a command to all connected clients.
180✔
757
   *
180✔
758
   * @param {string} command Command name.
180✔
759
   * @param {import('type-fest').JsonValue} data Command data.
180✔
760
   * @param {import('./schema.js').UserID | null} targetUserID
180✔
761
   */
180✔
762
  #recordMessage(command, data, targetUserID = null) {
180✔
763
    const id = ulid();
240✔
764

240✔
765
    this.#uw.db.insertInto('socketMessageQueue')
240✔
766
      .values({
240✔
767
        id,
240✔
768
        command,
240✔
769
        data: jsonb(data),
240✔
770
        targetUserID,
240✔
771
      })
240✔
772
      .execute();
240✔
773

240✔
774
    return id;
240✔
775
  }
240✔
776

180✔
777
  /**
180✔
778
   * Broadcast a command to all connected clients.
180✔
779
   *
180✔
780
   * @param {string} command Command name.
180✔
781
   * @param {import('type-fest').JsonValue} data Command data.
180✔
782
   */
180✔
783
  broadcast(command, data) {
180✔
784
    const id = this.#recordMessage(command, data);
222✔
785

222✔
786
    this.#logger.trace({
222✔
787
      id,
222✔
788
      command,
222✔
789
      data,
222✔
790
      to: this.#connections.map((connection) => (
222✔
791
        'user' in connection ? connection.user.id : null
121!
792
      )),
222✔
793
    }, 'broadcast');
222✔
794

222✔
795
    this.#connections.forEach((connection) => {
222✔
796
      connection.send(id, command, data);
121✔
797
    });
222✔
798
  }
222✔
799

180✔
800
  /**
180✔
801
   * Send a command to a single user.
180✔
802
   *
180✔
803
   * @param {User|import('./schema.js').UserID} user User or user ID to send the command to.
180✔
804
   * @param {string} command Command name.
180✔
805
   * @param {import('type-fest').JsonValue} data Command data.
180✔
806
   */
180✔
807
  sendTo(user, command, data) {
180✔
808
    const userID = typeof user === 'object' ? user.id : user;
18!
809
    const id = this.#recordMessage(command, data, userID);
18✔
810

18✔
811
    this.#connections.forEach((connection) => {
18✔
812
      if ('user' in connection && connection.user.id === userID) {
16✔
813
        connection.send(id, command, data);
13✔
814
      }
13✔
815
    });
18✔
816
  }
18✔
817

180✔
818
  #lastGuestCount = 0;
180✔
819

180✔
820
  /** The number of unauthenticated connections. */
180✔
821
  get guestCount() {
180✔
822
    return this.#connections.reduce((acc, connection) => {
7✔
823
      if (connection instanceof GuestConnection) {
4!
824
        return acc + 1;
×
825
      }
×
826
      return acc;
4✔
827
    }, 0);
7✔
828
  }
7✔
829

180✔
830
  #recountGuests() {
180✔
831
    const guests = this.guestCount;
×
832
    if (guests !== this.#lastGuestCount) {
×
833
      this.#lastGuestCount = guests;
×
834
      this.broadcast('guests', guests);
×
835
    }
×
836
  }
×
837
}
180✔
838

1✔
839
export default SocketServer;
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc