• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

u-wave / core / 23240909129

18 Mar 2026 10:45AM UTC coverage: 87.374% (+0.1%) from 87.235%
23240909129

Pull #755

github

web-flow
Merge aa036ef51 into d68af310c
Pull Request #755: Remove Redis

1059 of 1253 branches covered (84.52%)

Branch coverage included in aggregate %.

106 of 118 new or added lines in 8 files covered. (89.83%)

18 existing lines in 2 files now uncovered.

10823 of 12346 relevant lines covered (87.66%)

108.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.97
/src/SocketServer.js
1
import { promisify } from 'node:util';
1✔
2
import lodash from 'lodash';
1✔
3
import { WebSocketServer } from 'ws';
1✔
4
import Ajv from 'ajv';
1✔
5
import { stdSerializers } from 'pino';
1✔
6
import { ulid, encodeTime } from 'ulid';
1✔
7
import { socketVote } from './controllers/booth.js';
1✔
8
import { disconnectUser } from './controllers/users.js';
1✔
9
import AuthRegistry from './AuthRegistry.js';
1✔
10
import GuestConnection from './sockets/GuestConnection.js';
1✔
11
import AuthedConnection from './sockets/AuthedConnection.js';
1✔
12
import LostConnection from './sockets/LostConnection.js';
1✔
13
import { serializeUser } from './utils/serialize.js';
1✔
14
import { jsonb } from './utils/sqlite.js';
1✔
15
import { subMinutes } from './utils/date.js';
1✔
16

1✔
17
const { isEmpty } = lodash;
1✔
18

1✔
19
export const KEY_ACTIVE_SESSIONS = 'users';
1✔
20

1✔
21
const PING_INTERVAL = 10_000;
1✔
22
const GUEST_COUNT_INTERVAL = 2_000;
1✔
23

1✔
24
/**
1✔
25
 * @typedef {import('./schema.js').User} User
1✔
26
 */
1✔
27

1✔
28
/**
1✔
29
 * @typedef {GuestConnection | AuthedConnection | LostConnection} Connection
1✔
30
 */
1✔
31

1✔
32
/**
1✔
33
 * @typedef {object} ClientActionParameters
1✔
34
 * @prop {string} sendChat
1✔
35
 * @prop {-1 | 1} vote
1✔
36
 * @prop {undefined} logout
1✔
37
 */
1✔
38

1✔
39
/**
1✔
40
 * @typedef {{
1✔
41
 *   [Name in keyof ClientActionParameters]: (
1✔
42
 *     user: User,
1✔
43
 *     parameter: ClientActionParameters[Name],
1✔
44
 *     connection: AuthedConnection
1✔
45
 *   ) => void
1✔
46
 * }} ClientActions
1✔
47
 */
1✔
48

1✔
49
const ajv = new Ajv({
1✔
50
  coerceTypes: false,
1✔
51
  ownProperties: true,
1✔
52
  removeAdditional: true,
1✔
53
  useDefaults: false,
1✔
54
});
1✔
55

1✔
56
/**
1✔
57
 * @template {object} T
1✔
58
 * @param {T} object
1✔
59
 * @param {PropertyKey} property
1✔
60
 * @returns {property is keyof T}
1✔
61
 */
1✔
62
function has(object, property) {
11✔
63
  return Object.prototype.hasOwnProperty.call(object, property);
11✔
64
}
11✔
65

1✔
66
class SocketServer {
1✔
67
  /**
180✔
68
   * @param {import('./Uwave.js').Boot} uw
180✔
69
   * @param {{ secret: Buffer|string, sessionStore: import('express-session').Store }} options
180✔
70
   */
180✔
71
  static async plugin(uw, options) {
180✔
72
    uw.socketServer = new SocketServer(uw, {
180✔
73
      secret: options.secret,
180✔
74
      sessionStore: options.sessionStore,
180✔
75
      server: uw.server,
180✔
76
    });
180✔
77

180✔
78
    uw.after(async () => {
180✔
79
      try {
180✔
80
        await uw.socketServer.initLostConnections();
180✔
81
      } catch (err) {
180!
82
        // No need to prevent startup for this.
×
83
        // We do need to clear the `users` list because the lost connection handlers
×
84
        // will not do so.
×
85
        uw.socketServer.#logger.warn({ err }, 'could not initialise lost connections');
×
86
        await uw.keyv.delete(KEY_ACTIVE_SESSIONS);
×
87
      }
×
88
    });
180✔
89

180✔
90
    uw.onClose(async () => {
180✔
91
      await uw.socketServer.destroy();
180✔
92
    });
180✔
93
  }
180✔
94

180✔
95
  #uw;
180✔
96

180✔
97
  #logger;
180✔
98

180✔
99
  #wss;
180✔
100

180✔
101
  #closing = false;
180✔
102

180✔
103
  /** @type {Connection[]} */
180✔
104
  #connections = [];
180✔
105

180✔
106
  #pinger;
180✔
107

180✔
108
  /** Update online guests count and broadcast an update if necessary. */
180✔
109
  #guestCountInterval;
180✔
110

180✔
111
  #guestCountDirty = true;
180✔
112

180✔
113
  /**
180✔
114
   * Handlers for commands that come in from clients.
180✔
115
   *
180✔
116
   * @type {ClientActions}
180✔
117
   */
180✔
118
  #clientActions;
180✔
119

180✔
120
  /**
180✔
121
   * @type {{
180✔
122
   *   [K in keyof ClientActionParameters]:
180✔
123
   *     import('ajv').ValidateFunction<ClientActionParameters[K]>
180✔
124
   * }}
180✔
125
   */
180✔
126
  #clientActionSchemas;
180✔
127

180✔
128
  /**
180✔
129
   * Handlers for commands that come in from the server side.
180✔
130
   *
180✔
131
   * @type {import('./redisMessages.js').ServerActions}
180✔
132
   */
180✔
133
  #serverActions;
180✔
134

180✔
135
  #unsubscribe;
180✔
136

180✔
137
  /**
180✔
138
   * Create a socket server.
180✔
139
   *
180✔
140
   * @param {import('./Uwave.js').Boot} uw üWave Core instance.
180✔
141
   * @param {object} options Socket server options.
180✔
142
   * @param {number} [options.timeout] Time in seconds to wait for disconnected
180✔
143
   *     users to reconnect before removing them.
180✔
144
   * @param {Buffer|string} options.secret
180✔
145
   * @param {import('express-session').Store} options.sessionStore
180✔
146
   * @param {import('http').Server | import('https').Server} [options.server]
180✔
147
   * @param {number} [options.port]
180✔
148
   * @private
180✔
149
   */
180✔
150
  constructor(uw, options) {
180✔
151
    if (!uw) {
180!
152
      throw new TypeError(`The "uw" argument must be of type UwaveServer. Received ${typeof uw}`);
×
153
    }
×
154

180✔
155
    this.#uw = uw;
180✔
156
    this.#logger = uw.logger.child({ ns: 'uwave:sockets' }, {
180✔
157
      serializers: {
180✔
158
        req: stdSerializers.req,
180✔
159
      },
180✔
160
    });
180✔
161

180✔
162
    this.options = {
180✔
163
      /** @type {(_socket: import('ws').WebSocket | undefined, err: Error) => void} */
180✔
164
      onError: (_socket, err) => {
180✔
165
        throw err;
×
166
      },
180✔
167
      timeout: 30,
180✔
168
      ...options,
180✔
169
    };
180✔
170

180✔
171
    // TODO put this behind a symbol, it's just public for tests
180✔
172
    this.authRegistry = new AuthRegistry(uw.db);
180✔
173

180✔
174
    this.#wss = new WebSocketServer({
180✔
175
      server: options.server,
180✔
176
      port: options.server ? undefined : options.port,
180!
177
    });
180✔
178

180✔
179
    this.#unsubscribe = uw.events.onAny((command, data) => {
180✔
180
      this.#onServerMessage(command, data);
263✔
181
    });
180✔
182

180✔
183
    this.#wss.on('error', (error) => {
180✔
184
      this.onError(error);
×
185
    });
180✔
186
    this.#wss.on('connection', (socket, request) => {
180✔
187
      this.onSocketConnected(socket, request);
29✔
188
    });
180✔
189

180✔
190
    this.#pinger = setInterval(() => {
180✔
191
      this.ping();
×
192
    }, PING_INTERVAL);
180✔
193

180✔
194
    this.#guestCountInterval = setInterval(() => {
180✔
195
      if (!this.#guestCountDirty) {
×
196
        return;
×
197
      }
×
198

×
199
      this.#recountGuests();
×
200
    }, GUEST_COUNT_INTERVAL);
180✔
201

180✔
202
    this.#clientActions = {
180✔
203
      sendChat: (user, message) => {
180✔
204
        this.#logger.trace({ user, message }, 'sendChat');
11✔
205
        this.#uw.chat.send(user, message);
11✔
206
      },
180✔
207
      vote: (user, direction) => {
180✔
208
        socketVote(this.#uw, user.id, direction);
×
209
      },
180✔
210
      logout: (user, _, connection) => {
180✔
211
        this.replace(connection, this.createGuestConnection(connection.socket));
×
212
        if (!this.connection(user)) {
×
213
          disconnectUser(this.#uw, user.id);
×
214
        }
×
215
      },
180✔
216
    };
180✔
217

180✔
218
    this.#clientActionSchemas = {
180✔
219
      sendChat: ajv.compile({
180✔
220
        type: 'string',
180✔
221
      }),
180✔
222
      vote: ajv.compile({
180✔
223
        type: 'integer',
180✔
224
        enum: [-1, 1],
180✔
225
      }),
180✔
226
      logout: ajv.compile(true),
180✔
227
    };
180✔
228

180✔
229
    this.#serverActions = {
180✔
230
      /**
180✔
231
       * Broadcast the next track.
180✔
232
       */
180✔
233
      'advance:complete': (next) => {
180✔
234
        if (next) {
18✔
235
          this.broadcast('advance', {
18✔
236
            historyID: next.historyID,
18✔
237
            userID: next.userID,
18✔
238
            media: next.media,
18✔
239
            playedAt: new Date(next.playedAt).getTime(),
18✔
240
          });
18✔
241
        } else {
18!
242
          this.broadcast('advance', null);
×
243
        }
×
244
      },
180✔
245
      /**
180✔
246
       * Broadcast a skip notification.
180✔
247
       */
180✔
248
      'booth:skip': ({ moderatorID, userID, reason }) => {
180✔
249
        this.broadcast('skip', { moderatorID, userID, reason });
×
250
      },
180✔
251
      /**
180✔
252
       * Broadcast a chat message.
180✔
253
       */
180✔
254
      'chat:message': (message) => {
180✔
255
        this.broadcast('chatMessage', message);
13✔
256
      },
180✔
257
      /**
180✔
258
       * Delete chat messages. The delete filter can have an _id property to
180✔
259
       * delete a specific message, a userID property to delete messages by a
180✔
260
       * user, or be empty to delete all messages.
180✔
261
       */
180✔
262
      'chat:delete': ({ moderatorID, filter }) => {
180✔
263
        if ('id' in filter) {
6✔
264
          this.broadcast('chatDeleteByID', {
2✔
265
            moderatorID,
2✔
266
            _id: filter.id,
2✔
267
          });
2✔
268
        } else if ('userID' in filter) {
6✔
269
          this.broadcast('chatDeleteByUser', {
2✔
270
            moderatorID,
2✔
271
            userID: filter.userID,
2✔
272
          });
2✔
273
        } else if (isEmpty(filter)) {
2✔
274
          this.broadcast('chatDelete', { moderatorID });
2✔
275
        }
2✔
276
      },
180✔
277
      /**
180✔
278
       * Broadcast that a user was muted in chat.
180✔
279
       */
180✔
280
      'chat:mute': ({ moderatorID, userID, duration }) => {
180✔
281
        this.broadcast('chatMute', {
2✔
282
          userID,
2✔
283
          moderatorID,
2✔
284
          expiresAt: Date.now() + duration,
2✔
285
        });
2✔
286
      },
180✔
287
      /**
180✔
288
       * Broadcast that a user was unmuted in chat.
180✔
289
       */
180✔
290
      'chat:unmute': ({ moderatorID, userID }) => {
180✔
291
        this.broadcast('chatUnmute', { userID, moderatorID });
×
292
      },
180✔
293
      /**
180✔
294
       * Broadcast a vote for the current track.
180✔
295
       */
180✔
296
      'booth:vote': ({ userID, direction }) => {
180✔
297
        this.broadcast('vote', {
2✔
298
          _id: userID,
2✔
299
          value: direction,
2✔
300
        });
2✔
301
      },
180✔
302
      /**
180✔
303
       * Broadcast a favorite for the current track.
180✔
304
       */
180✔
305
      'booth:favorite': ({ userID }) => {
180✔
306
        this.broadcast('favorite', { userID });
1✔
307
      },
180✔
308
      /**
180✔
309
       * Cycle a single user's playlist.
180✔
310
       */
180✔
311
      'playlist:cycle': ({ userID, playlistID }) => {
180✔
312
        this.sendTo(userID, 'playlistCycle', { playlistID });
18✔
313
      },
180✔
314
      /**
180✔
315
       * Broadcast that a user joined the waitlist.
180✔
316
       */
180✔
317
      'waitlist:join': ({ userID, waitlist }) => {
180✔
318
        this.broadcast('waitlistJoin', { userID, waitlist });
31✔
319
      },
180✔
320
      /**
180✔
321
       * Broadcast that a user left the waitlist.
180✔
322
       */
180✔
323
      'waitlist:leave': ({ userID, waitlist }) => {
180✔
324
        this.broadcast('waitlistLeave', { userID, waitlist });
1✔
325
      },
180✔
326
      /**
180✔
327
       * Broadcast that a user was added to the waitlist.
180✔
328
       */
180✔
329
      'waitlist:add': ({
180✔
330
        userID, moderatorID, position, waitlist,
1✔
331
      }) => {
1✔
332
        this.broadcast('waitlistAdd', {
1✔
333
          userID, moderatorID, position, waitlist,
1✔
334
        });
1✔
335
      },
180✔
336
      /**
180✔
337
       * Broadcast that a user was removed from the waitlist.
180✔
338
       */
180✔
339
      'waitlist:remove': ({ userID, moderatorID, waitlist }) => {
180✔
340
        this.broadcast('waitlistRemove', { userID, moderatorID, waitlist });
×
341
      },
180✔
342
      /**
180✔
343
       * Broadcast that a user was moved in the waitlist.
180✔
344
       */
180✔
345
      'waitlist:move': ({
180✔
346
        userID, moderatorID, position, waitlist,
3✔
347
      }) => {
3✔
348
        this.broadcast('waitlistMove', {
3✔
349
          userID, moderatorID, position, waitlist,
3✔
350
        });
3✔
351
      },
180✔
352
      /**
180✔
353
       * Broadcast a waitlist update.
180✔
354
       */
180✔
355
      'waitlist:update': (waitlist) => {
180✔
356
        this.broadcast('waitlistUpdate', waitlist);
18✔
357
      },
180✔
358
      /**
180✔
359
       * Broadcast that the waitlist was cleared.
180✔
360
       */
180✔
361
      'waitlist:clear': ({ moderatorID }) => {
180✔
362
        this.broadcast('waitlistClear', { moderatorID });
×
363
      },
180✔
364
      /**
180✔
365
       * Broadcast that the waitlist was locked.
180✔
366
       */
180✔
367
      'waitlist:lock': ({ moderatorID, locked }) => {
180✔
368
        this.broadcast('waitlistLock', { moderatorID, locked });
6✔
369
      },
180✔
370

180✔
371
      'acl:allow': ({ userID, roles }) => {
180✔
372
        this.broadcast('acl:allow', { userID, roles });
84✔
373
      },
180✔
374
      'acl:disallow': ({ userID, roles }) => {
180✔
375
        this.broadcast('acl:disallow', { userID, roles });
2✔
376
      },
180✔
377

180✔
378
      'user:update': ({ userID, moderatorID, new: update }) => {
180✔
379
        // TODO Remove this remnant of the old roles system
3✔
380
        if ('role' in update) {
3!
381
          this.broadcast('roleChange', {
×
382
            moderatorID,
×
383
            userID,
×
384
            role: update.role,
×
385
          });
×
386
        }
×
387
        if ('username' in update) {
3✔
388
          this.broadcast('nameChange', {
3✔
389
            moderatorID,
3✔
390
            userID,
3✔
391
            username: update.username,
3✔
392
          });
3✔
393
        }
3✔
394
      },
180✔
395
      'user:join': async ({ userID }) => {
180✔
396
        const { users, keyv } = this.#uw;
27✔
397
        const user = await users.getUser(userID);
27✔
398
        if (user) {
27✔
399
          // TODO this should not be the socket server code's responsibility
27✔
400
          const userIDs = /** @type {import('./schema').UserID[] | null} */ (
27✔
401
            await keyv.get(KEY_ACTIVE_SESSIONS)
27✔
402
          ) ?? [];
27✔
403
          userIDs.push(user.id);
27✔
404
          await keyv.set(KEY_ACTIVE_SESSIONS, userIDs);
27✔
405
          this.broadcast('join', serializeUser(user));
27✔
406
        }
27✔
407
      },
180✔
408
      /**
180✔
409
       * Broadcast that a user left the server.
180✔
410
       */
180✔
411
      'user:leave': ({ userID }) => {
180✔
412
        this.broadcast('leave', userID);
×
413
      },
180✔
414
      /**
180✔
415
       * Broadcast a ban event.
180✔
416
       */
180✔
417
      'user:ban': ({
180✔
418
        moderatorID, userID, permanent = false, duration, expiresAt,
3✔
419
      }) => {
3✔
420
        this.broadcast('ban', {
3✔
421
          moderatorID, userID, permanent, duration, expiresAt,
3✔
422
        });
3✔
423

3✔
424
        this.#connections.forEach((connection) => {
3✔
425
          if (connection instanceof AuthedConnection && connection.user.id === userID) {
×
426
            connection.ban();
×
427
          } else if (connection instanceof LostConnection && connection.user.id === userID) {
×
428
            connection.close();
×
429
          }
×
430
        });
3✔
431
      },
180✔
432
      /**
180✔
433
       * Broadcast an unban event.
180✔
434
       */
180✔
435
      'user:unban': ({ moderatorID, userID }) => {
180✔
436
        this.broadcast('unban', { moderatorID, userID });
1✔
437
      },
180✔
438
      /**
180✔
439
       * Force-close a connection.
180✔
440
       */
180✔
441
      'http-api:socket:close': (userID) => {
180✔
442
        this.#connections.forEach((connection) => {
×
443
          if ('user' in connection && connection.user.id === userID) {
×
444
            connection.close();
×
445
          }
×
446
        });
×
447
      },
180✔
448

180✔
449
      'emotes:reload': () => {
180✔
450
        this.broadcast('reloadEmotes', null);
×
451
      },
180✔
452
    };
180✔
453
  }
180✔
454

180✔
455
  /**
180✔
456
   * Create `LostConnection`s for every user that's known to be online, but that
180✔
457
   * is not currently connected to the socket server.
180✔
458
   *
180✔
459
   * @private
180✔
460
   */
180✔
461
  async initLostConnections() {
180✔
462
    const { db, keyv } = this.#uw;
180✔
463
    const userIDs = /** @type {import('./schema').UserID[] | null} */ (
180✔
464
      await keyv.get(KEY_ACTIVE_SESSIONS)
180✔
465
    ) ?? [];
180✔
466
    const disconnectedIDs = userIDs.filter((userID) => !this.connection(userID));
180✔
467

180✔
468
    if (disconnectedIDs.length === 0) {
180✔
469
      return;
180✔
470
    }
180✔
471

×
472
    const disconnectedUsers = await db.selectFrom('users')
×
473
      .where('id', 'in', disconnectedIDs)
×
474
      .selectAll()
×
475
      .execute();
×
NEW
476
    disconnectedUsers.forEach((_user) => {
×
NEW
477
      // TODO
×
NEW
478
      // this.add(this.createLostConnection(user, 'TODO: Actual session ID!!', null));
×
UNCOV
479
    });
×
480
  }
180✔
481

180✔
482
  /**
180✔
483
   * @param {import('ws').WebSocket} socket
180✔
484
   * @param {import('http').IncomingMessage} request
180✔
485
   * @private
180✔
486
   */
180✔
487
  onSocketConnected(socket, request) {
180✔
488
    this.#logger.info({ req: request }, 'new connection');
29✔
489

29✔
490
    socket.on('error', (error) => {
29✔
491
      this.onSocketError(socket, error);
×
492
    });
29✔
493
    this.add(this.createGuestConnection(socket));
29✔
494
  }
29✔
495

180✔
496
  /**
180✔
497
   * @param {import('ws').WebSocket} socket
180✔
498
   * @param {Error} error
180✔
499
   * @private
180✔
500
   */
180✔
501
  onSocketError(socket, error) {
180✔
502
    this.#logger.warn({ err: error }, 'socket error');
×
503

×
504
    this.options.onError(socket, error);
×
505
  }
×
506

180✔
507
  /**
180✔
508
   * @param {Error} error
180✔
509
   * @private
180✔
510
   */
180✔
511
  onError(error) {
180✔
512
    this.#logger.error({ err: error }, 'server error');
×
513

×
514
    this.options.onError(undefined, error);
×
515
  }
×
516

180✔
517
  /**
180✔
518
   * Get a LostConnection for a user, if one exists.
180✔
519
   *
180✔
520
   * @param {string} sessionID
180✔
521
   * @private
180✔
522
   */
180✔
523
  getLostConnection(sessionID) {
180✔
524
    return this.#connections.find((connection) => (
2✔
525
      connection instanceof LostConnection && connection.sessionID === sessionID
2✔
526
    ));
2✔
527
  }
2✔
528

180✔
529
  /**
180✔
530
   * Create a connection instance for an unauthenticated user.
180✔
531
   *
180✔
532
   * @param {import('ws').WebSocket} socket
180✔
533
   * @private
180✔
534
   */
180✔
535
  createGuestConnection(socket) {
180✔
536
    const connection = new GuestConnection(
29✔
537
      this.#uw,
29✔
538
      this.options.sessionStore,
29✔
539
      socket,
29✔
540
      { authRegistry: this.authRegistry },
29✔
541
    );
29✔
542
    connection.on('close', () => {
29✔
543
      this.remove(connection);
×
544
    });
29✔
545
    connection.on('authenticate', async ({ user, sessionID, lastEventID }) => {
29✔
546
      const isReconnect = await connection.isReconnect(sessionID);
29✔
547
      this.#logger.info({ userId: user.id, isReconnect, lastEventID }, 'authenticated socket');
29✔
548
      if (isReconnect) {
29✔
549
        const previousConnection = this.getLostConnection(sessionID);
2✔
550
        if (previousConnection) this.remove(previousConnection);
2✔
551
      }
2✔
552

29✔
553
      this.replace(connection, this.createAuthedConnection(socket, user, sessionID, lastEventID));
29✔
554

29✔
555
      if (!isReconnect) {
29✔
556
        this.#uw.publish('user:join', { userID: user.id });
27✔
557
      }
27✔
558
    });
29✔
559
    return connection;
29✔
560
  }
29✔
561

180✔
562
  /**
180✔
563
   * Create a connection instance for an authenticated user.
180✔
564
   *
180✔
565
   * @param {import('ws').WebSocket} socket
180✔
566
   * @param {User} user
180✔
567
   * @param {string} sessionID
180✔
568
   * @param {string|null} lastEventID
180✔
569
   * @returns {AuthedConnection}
180✔
570
   * @private
180✔
571
   */
180✔
572
  createAuthedConnection(socket, user, sessionID, lastEventID) {
180✔
573
    const connection = new AuthedConnection(
29✔
574
      this.#uw,
29✔
575
      this.options.sessionStore,
29✔
576
      socket,
29✔
577
      user,
29✔
578
      sessionID,
29✔
579
      lastEventID,
29✔
580
    );
29✔
581
    connection.on('close', ({ banned, lastEventID }) => {
29✔
582
      if (banned) {
29!
583
        this.#logger.info({ userId: user.id }, 'removing connection after ban');
×
584
        disconnectUser(this.#uw, user.id);
×
585
      } else if (!this.#closing) {
29✔
586
        this.#logger.info({ userId: user.id }, 'lost connection');
2✔
587
        this.add(this.createLostConnection(user, sessionID, lastEventID));
2✔
588
      }
2✔
589
      this.remove(connection);
29✔
590
    });
29✔
591
    connection.on(
29✔
592
      'command',
29✔
593
      ({ command, data }) => {
29✔
594
        this.#logger.trace({ userId: user.id, command, data }, 'command');
11✔
595
        if (has(this.#clientActions, command)) {
11✔
596
          // Ignore incorrect input
11✔
597
          const validate = this.#clientActionSchemas[command];
11✔
598
          if (validate && !validate(data)) {
11!
599
            return;
×
600
          }
×
601

11✔
602
          const action = this.#clientActions[command];
11✔
603
          // @ts-expect-error TS2345 `data` is validated
11✔
604
          action(user, data, connection);
11✔
605
        }
11✔
606
      },
29✔
607
    );
29✔
608
    return connection;
29✔
609
  }
29✔
610

180✔
611
  /**
180✔
612
   * Create a connection instance for a user who disconnected.
180✔
613
   *
180✔
614
   * @param {User} user
180✔
615
   * @param {string} sessionID
180✔
616
   * @param {string|null} lastEventID
180✔
617
   * @returns {LostConnection}
180✔
618
   * @private
180✔
619
   */
180✔
620
  createLostConnection(user, sessionID, lastEventID) {
180✔
621
    const connection = new LostConnection(
2✔
622
      this.#uw,
2✔
623
      this.options.sessionStore,
2✔
624
      user,
2✔
625
      sessionID,
2✔
626
      lastEventID,
2✔
627
      this.options.timeout,
2✔
628
    );
2✔
629
    connection.on('close', () => {
2✔
630
      this.#logger.info({ userId: user.id }, 'user left');
1✔
631
      this.remove(connection);
1✔
632
      // Only register that the user left if they didn't have another connection
1✔
633
      // still open.
1✔
634
      if (!this.connection(user)) {
1✔
635
        disconnectUser(this.#uw, user.id);
1✔
636
      }
1✔
637
    });
2✔
638
    return connection;
2✔
639
  }
2✔
640

180✔
641
  /**
180✔
642
   * Add a connection.
180✔
643
   *
180✔
644
   * @param {Connection} connection
180✔
645
   * @private
180✔
646
   */
180✔
647
  add(connection) {
180✔
648
    const userID = 'user' in connection ? connection.user.id : null;
60✔
649
    const sessionID = 'sessionID' in connection ? connection.sessionID : null;
60✔
650
    this.#logger.trace({ type: connection.constructor.name, userID, sessionID }, 'add connection');
60✔
651

60✔
652
    this.#connections.push(connection);
60✔
653
    this.#guestCountDirty = true;
60✔
654
  }
60✔
655

180✔
656
  /**
180✔
657
   * Remove a connection.
180✔
658
   *
180✔
659
   * @param {Connection} connection
180✔
660
   * @private
180✔
661
   */
180✔
662
  remove(connection) {
180✔
663
    const userID = 'user' in connection ? connection.user.id : null;
60✔
664
    const sessionID = 'sessionID' in connection ? connection.sessionID : null;
60✔
665
    this.#logger.trace({ type: connection.constructor.name, userID, sessionID }, 'remove connection');
60✔
666

60✔
667
    const i = this.#connections.indexOf(connection);
60✔
668
    this.#connections.splice(i, 1);
60✔
669

60✔
670
    connection.removed();
60✔
671
    this.#guestCountDirty = true;
60✔
672
  }
60✔
673

180✔
674
  /**
180✔
675
   * Replace a connection instance with another connection instance. Useful when
180✔
676
   * a connection changes "type", like GuestConnection → AuthedConnection.
180✔
677
   *
180✔
678
   * @param {Connection} oldConnection
180✔
679
   * @param {Connection} newConnection
180✔
680
   * @private
180✔
681
   */
180✔
682
  replace(oldConnection, newConnection) {
180✔
683
    this.remove(oldConnection);
29✔
684
    this.add(newConnection);
29✔
685
  }
29✔
686

180✔
687
  /**
180✔
688
   * Handle command messages coming in from elsewhere in the app.
180✔
689
   * Some commands are intended to broadcast immediately to all connected
180✔
690
   * clients, but others require special action.
180✔
691
   *
180✔
692
   * @template {keyof import('./redisMessages.js').ServerActionParameters} K
180✔
693
   * @param {K} command
180✔
694
   * @param {import('./redisMessages.js').ServerActionParameters[K]} data
180✔
695
   */
180✔
696
  #onServerMessage(command, data) {
180✔
697
    this.#logger.trace({ channel: command, command, data }, 'server message');
263✔
698

263✔
699
    const action = this.#serverActions[command];
263✔
700
    if (action !== undefined) {
263✔
701
      action(data);
240✔
702
    }
240✔
703
  }
263✔
704

180✔
705
  /**
180✔
706
   * Stop the socket server.
180✔
707
   *
180✔
708
   * @returns {Promise<void>}
180✔
709
   */
180✔
710
  async destroy() {
180✔
711
    this.#closing = true;
180✔
712

180✔
713
    this.#unsubscribe();
180✔
714
    clearInterval(this.#pinger);
180✔
715
    clearInterval(this.#guestCountInterval);
180✔
716

180✔
717
    for (const connection of this.#connections) {
180✔
718
      connection.close();
28✔
719
    }
28✔
720

180✔
721
    const closeWsServer = promisify(this.#wss.close.bind(this.#wss));
180✔
722
    await closeWsServer();
180✔
723
  }
180✔
724

180✔
725
  /**
180✔
726
   * Get the connection instance for a specific user.
180✔
727
   *
180✔
728
   * @param {User|import('./schema.js').UserID} user The user.
180✔
729
   * @returns {Connection|undefined}
180✔
730
   */
180✔
731
  connection(user) {
180✔
732
    const userID = typeof user === 'object' ? user.id : user;
1!
733
    return this.#connections.find((connection) => 'user' in connection && connection.user.id === userID);
1✔
734
  }
1✔
735

180✔
736
  ping() {
180✔
737
    this.#connections.forEach((connection) => {
×
738
      connection.ping();
×
739
    });
×
740

×
741
    this.#cleanupMessageQueue().catch((err) => {
×
742
      this.#logger.error({ err }, 'failed to clean up socket message queue');
×
743
    });
×
744
  }
×
745

180✔
746
  async #cleanupMessageQueue() {
180✔
747
    const oldestID = encodeTime(subMinutes(new Date(), 10).getTime());
×
748

×
749
    await this.#uw.db.deleteFrom('socketMessageQueue')
×
750
      .where('id', '<', oldestID)
×
751
      .execute();
×
752
  }
×
753

180✔
754
  /**
180✔
755
   * Broadcast a command to all connected clients.
180✔
756
   *
180✔
757
   * @param {string} command Command name.
180✔
758
   * @param {import('type-fest').JsonValue} data Command data.
180✔
759
   * @param {import('./schema.js').UserID | null} targetUserID
180✔
760
   */
180✔
761
  #recordMessage(command, data, targetUserID = null) {
180✔
762
    const id = ulid();
240✔
763

240✔
764
    this.#uw.db.insertInto('socketMessageQueue')
240✔
765
      .values({
240✔
766
        id,
240✔
767
        command,
240✔
768
        data: jsonb(data),
240✔
769
        targetUserID,
240✔
770
      })
240✔
771
      .execute();
240✔
772

240✔
773
    return id;
240✔
774
  }
240✔
775

180✔
776
  /**
180✔
777
   * Broadcast a command to all connected clients.
180✔
778
   *
180✔
779
   * @param {string} command Command name.
180✔
780
   * @param {import('type-fest').JsonValue} data Command data.
180✔
781
   */
180✔
782
  broadcast(command, data) {
180✔
783
    const id = this.#recordMessage(command, data);
222✔
784

222✔
785
    this.#logger.trace({
222✔
786
      id,
222✔
787
      command,
222✔
788
      data,
222✔
789
      to: this.#connections.map((connection) => (
222✔
790
        'user' in connection ? connection.user.id : null
121!
791
      )),
222✔
792
    }, 'broadcast');
222✔
793

222✔
794
    this.#connections.forEach((connection) => {
222✔
795
      connection.send(id, command, data);
121✔
796
    });
222✔
797
  }
222✔
798

180✔
799
  /**
180✔
800
   * Send a command to a single user.
180✔
801
   *
180✔
802
   * @param {User|import('./schema.js').UserID} user User or user ID to send the command to.
180✔
803
   * @param {string} command Command name.
180✔
804
   * @param {import('type-fest').JsonValue} data Command data.
180✔
805
   */
180✔
806
  sendTo(user, command, data) {
180✔
807
    const userID = typeof user === 'object' ? user.id : user;
18!
808
    const id = this.#recordMessage(command, data, userID);
18✔
809

18✔
810
    this.#connections.forEach((connection) => {
18✔
811
      if ('user' in connection && connection.user.id === userID) {
16✔
812
        connection.send(id, command, data);
13✔
813
      }
13✔
814
    });
18✔
815
  }
18✔
816

180✔
817
  #lastGuestCount = 0;
180✔
818

180✔
819
  /** The number of unauthenticated connections. */
180✔
820
  get guestCount() {
180✔
821
    return this.#connections.reduce((acc, connection) => {
7✔
822
      if (connection instanceof GuestConnection) {
4!
823
        return acc + 1;
×
824
      }
×
825
      return acc;
4✔
826
    }, 0);
7✔
827
  }
7✔
828

180✔
829
  #recountGuests() {
180✔
830
    const guests = this.guestCount;
×
831
    if (guests !== this.#lastGuestCount) {
×
832
      this.#lastGuestCount = guests;
×
833
      this.broadcast('guests', guests);
×
834
    }
×
835
  }
×
836
}
180✔
837

1✔
838
export default SocketServer;
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc