• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

u-wave / core / 23111234039

15 Mar 2026 01:22PM UTC coverage: 87.215% (+0.1%) from 87.12%
23111234039

Pull #734

github

web-flow
Merge b4680ec97 into 0d40676d8
Pull Request #734: Move away from Redis for key-value usages

1029 of 1218 branches covered (84.48%)

Branch coverage included in aggregate %.

43 of 50 new or added lines in 5 files covered. (86.0%)

4 existing lines in 2 files now uncovered.

10684 of 12212 relevant lines covered (87.49%)

108.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.89
/src/SocketServer.js
1
import { promisify } from 'node:util';
1✔
2
import lodash from 'lodash';
1✔
3
import { WebSocketServer } from 'ws';
1✔
4
import Ajv from 'ajv';
1✔
5
import { stdSerializers } from 'pino';
1✔
6
import { ulid, encodeTime } from 'ulid';
1✔
7
import { socketVote } from './controllers/booth.js';
1✔
8
import { disconnectUser } from './controllers/users.js';
1✔
9
import AuthRegistry from './AuthRegistry.js';
1✔
10
import GuestConnection from './sockets/GuestConnection.js';
1✔
11
import AuthedConnection from './sockets/AuthedConnection.js';
1✔
12
import LostConnection from './sockets/LostConnection.js';
1✔
13
import { serializeUser } from './utils/serialize.js';
1✔
14
import { jsonb } from './utils/sqlite.js';
1✔
15
import { subMinutes } from './utils/date.js';
1✔
16

1✔
17
const { isEmpty } = lodash;
1✔
18

1✔
19
export const KEY_ACTIVE_SESSIONS = 'users';
1✔
20

1✔
21
const PING_INTERVAL = 10_000;
1✔
22
const GUEST_COUNT_INTERVAL = 2_000;
1✔
23

1✔
24
/**
1✔
25
 * @typedef {import('./schema.js').User} User
1✔
26
 */
1✔
27

1✔
28
/**
1✔
29
 * @typedef {GuestConnection | AuthedConnection | LostConnection} Connection
1✔
30
 */
1✔
31

1✔
32
/**
1✔
33
 * @typedef {object} ClientActionParameters
1✔
34
 * @prop {string} sendChat
1✔
35
 * @prop {-1 | 1} vote
1✔
36
 * @prop {undefined} logout
1✔
37
 */
1✔
38

1✔
39
/**
1✔
40
 * @typedef {{
1✔
41
 *   [Name in keyof ClientActionParameters]: (
1✔
42
 *     user: User,
1✔
43
 *     parameter: ClientActionParameters[Name],
1✔
44
 *     connection: AuthedConnection
1✔
45
 *   ) => void
1✔
46
 * }} ClientActions
1✔
47
 */
1✔
48

1✔
49
const ajv = new Ajv({
1✔
50
  coerceTypes: false,
1✔
51
  ownProperties: true,
1✔
52
  removeAdditional: true,
1✔
53
  useDefaults: false,
1✔
54
});
1✔
55

1✔
56
/**
1✔
57
 * @template {object} T
1✔
58
 * @param {T} object
1✔
59
 * @param {PropertyKey} property
1✔
60
 * @returns {property is keyof T}
1✔
61
 */
1✔
62
function has(object, property) {
11✔
63
  return Object.prototype.hasOwnProperty.call(object, property);
11✔
64
}
11✔
65

1✔
66
class SocketServer {
1✔
67
  /**
180✔
68
   * @param {import('./Uwave.js').Boot} uw
180✔
69
   * @param {{ secret: Buffer|string }} options
180✔
70
   */
180✔
71
  static async plugin(uw, options) {
180✔
72
    uw.socketServer = new SocketServer(uw, {
180✔
73
      secret: options.secret,
180✔
74
      server: uw.server,
180✔
75
    });
180✔
76

180✔
77
    uw.after(async () => {
180✔
78
      try {
180✔
79
        await uw.socketServer.initLostConnections();
180✔
80
      } catch (err) {
180!
81
        // No need to prevent startup for this.
×
82
        // We do need to clear the `users` list because the lost connection handlers
×
83
        // will not do so.
×
84
        uw.socketServer.#logger.warn({ err }, 'could not initialise lost connections');
×
NEW
85
        await uw.keyv.delete(KEY_ACTIVE_SESSIONS);
×
86
      }
×
87
    });
180✔
88

180✔
89
    uw.onClose(async () => {
180✔
90
      await uw.socketServer.destroy();
180✔
91
    });
180✔
92
  }
180✔
93

180✔
94
  #uw;
180✔
95

180✔
96
  #logger;
180✔
97

180✔
98
  #wss;
180✔
99

180✔
100
  #closing = false;
180✔
101

180✔
102
  /** @type {Connection[]} */
180✔
103
  #connections = [];
180✔
104

180✔
105
  #pinger;
180✔
106

180✔
107
  /** Update online guests count and broadcast an update if necessary. */
180✔
108
  #guestCountInterval;
180✔
109

180✔
110
  #guestCountDirty = true;
180✔
111

180✔
112
  /**
180✔
113
   * Handlers for commands that come in from clients.
180✔
114
   *
180✔
115
   * @type {ClientActions}
180✔
116
   */
180✔
117
  #clientActions;
180✔
118

180✔
119
  /**
180✔
120
   * @type {{
180✔
121
   *   [K in keyof ClientActionParameters]:
180✔
122
   *     import('ajv').ValidateFunction<ClientActionParameters[K]>
180✔
123
   * }}
180✔
124
   */
180✔
125
  #clientActionSchemas;
180✔
126

180✔
127
  /**
180✔
128
   * Handlers for commands that come in from the server side.
180✔
129
   *
180✔
130
   * @type {import('./redisMessages.js').ServerActions}
180✔
131
   */
180✔
132
  #serverActions;
180✔
133

180✔
134
  #unsubscribe;
180✔
135

180✔
136
  /**
180✔
137
   * Create a socket server.
180✔
138
   *
180✔
139
   * @param {import('./Uwave.js').Boot} uw üWave Core instance.
180✔
140
   * @param {object} options Socket server options.
180✔
141
   * @param {number} [options.timeout] Time in seconds to wait for disconnected
180✔
142
   *     users to reconnect before removing them.
180✔
143
   * @param {Buffer|string} options.secret
180✔
144
   * @param {import('http').Server | import('https').Server} [options.server]
180✔
145
   * @param {number} [options.port]
180✔
146
   * @private
180✔
147
   */
180✔
148
  constructor(uw, options) {
180✔
149
    if (!uw) {
180!
150
      throw new TypeError(`The "uw" argument must be of type UwaveServer. Received ${typeof uw}`);
×
151
    }
×
152

180✔
153
    this.#uw = uw;
180✔
154
    this.#logger = uw.logger.child({ ns: 'uwave:sockets' }, {
180✔
155
      serializers: {
180✔
156
        req: stdSerializers.req,
180✔
157
      },
180✔
158
    });
180✔
159

180✔
160
    this.options = {
180✔
161
      /** @type {(_socket: import('ws').WebSocket | undefined, err: Error) => void} */
180✔
162
      onError: (_socket, err) => {
180✔
163
        throw err;
×
164
      },
180✔
165
      timeout: 30,
180✔
166
      ...options,
180✔
167
    };
180✔
168

180✔
169
    // TODO put this behind a symbol, it's just public for tests
180✔
170
    this.authRegistry = new AuthRegistry(uw.redis);
180✔
171

180✔
172
    this.#wss = new WebSocketServer({
180✔
173
      server: options.server,
180✔
174
      port: options.server ? undefined : options.port,
180!
175
    });
180✔
176

180✔
177
    this.#unsubscribe = uw.events.onAny((command, data) => {
180✔
178
      this.#onServerMessage(command, data);
262✔
179
    });
180✔
180

180✔
181
    this.#wss.on('error', (error) => {
180✔
182
      this.onError(error);
×
183
    });
180✔
184
    this.#wss.on('connection', (socket, request) => {
180✔
185
      this.onSocketConnected(socket, request);
29✔
186
    });
180✔
187

180✔
188
    this.#pinger = setInterval(() => {
180✔
189
      this.ping();
×
190
    }, PING_INTERVAL);
180✔
191

180✔
192
    this.#guestCountInterval = setInterval(() => {
180✔
193
      if (!this.#guestCountDirty) {
×
194
        return;
×
195
      }
×
196

×
NEW
197
      this.#recountGuests();
×
198
    }, GUEST_COUNT_INTERVAL);
180✔
199

180✔
200
    this.#clientActions = {
180✔
201
      sendChat: (user, message) => {
180✔
202
        this.#logger.trace({ user, message }, 'sendChat');
11✔
203
        this.#uw.chat.send(user, message);
11✔
204
      },
180✔
205
      vote: (user, direction) => {
180✔
206
        socketVote(this.#uw, user.id, direction);
×
207
      },
180✔
208
      logout: (user, _, connection) => {
180✔
209
        this.replace(connection, this.createGuestConnection(connection.socket));
×
210
        if (!this.connection(user)) {
×
211
          disconnectUser(this.#uw, user.id);
×
212
        }
×
213
      },
180✔
214
    };
180✔
215

180✔
216
    this.#clientActionSchemas = {
180✔
217
      sendChat: ajv.compile({
180✔
218
        type: 'string',
180✔
219
      }),
180✔
220
      vote: ajv.compile({
180✔
221
        type: 'integer',
180✔
222
        enum: [-1, 1],
180✔
223
      }),
180✔
224
      logout: ajv.compile(true),
180✔
225
    };
180✔
226

180✔
227
    this.#serverActions = {
180✔
228
      /**
180✔
229
       * Broadcast the next track.
180✔
230
       */
180✔
231
      'advance:complete': (next) => {
180✔
232
        if (next) {
18✔
233
          this.broadcast('advance', {
18✔
234
            historyID: next.historyID,
18✔
235
            userID: next.userID,
18✔
236
            media: next.media,
18✔
237
            playedAt: new Date(next.playedAt).getTime(),
18✔
238
          });
18✔
239
        } else {
18!
240
          this.broadcast('advance', null);
×
241
        }
×
242
      },
180✔
243
      /**
180✔
244
       * Broadcast a skip notification.
180✔
245
       */
180✔
246
      'booth:skip': ({ moderatorID, userID, reason }) => {
180✔
247
        this.broadcast('skip', { moderatorID, userID, reason });
×
248
      },
180✔
249
      /**
180✔
250
       * Broadcast a chat message.
180✔
251
       */
180✔
252
      'chat:message': (message) => {
180✔
253
        this.broadcast('chatMessage', message);
13✔
254
      },
180✔
255
      /**
180✔
256
       * Delete chat messages. The delete filter can have an _id property to
180✔
257
       * delete a specific message, a userID property to delete messages by a
180✔
258
       * user, or be empty to delete all messages.
180✔
259
       */
180✔
260
      'chat:delete': ({ moderatorID, filter }) => {
180✔
261
        if ('id' in filter) {
6✔
262
          this.broadcast('chatDeleteByID', {
2✔
263
            moderatorID,
2✔
264
            _id: filter.id,
2✔
265
          });
2✔
266
        } else if ('userID' in filter) {
6✔
267
          this.broadcast('chatDeleteByUser', {
2✔
268
            moderatorID,
2✔
269
            userID: filter.userID,
2✔
270
          });
2✔
271
        } else if (isEmpty(filter)) {
2✔
272
          this.broadcast('chatDelete', { moderatorID });
2✔
273
        }
2✔
274
      },
180✔
275
      /**
180✔
276
       * Broadcast that a user was muted in chat.
180✔
277
       */
180✔
278
      'chat:mute': ({ moderatorID, userID, duration }) => {
180✔
279
        this.broadcast('chatMute', {
2✔
280
          userID,
2✔
281
          moderatorID,
2✔
282
          expiresAt: Date.now() + duration,
2✔
283
        });
2✔
284
      },
180✔
285
      /**
180✔
286
       * Broadcast that a user was unmuted in chat.
180✔
287
       */
180✔
288
      'chat:unmute': ({ moderatorID, userID }) => {
180✔
289
        this.broadcast('chatUnmute', { userID, moderatorID });
×
290
      },
180✔
291
      /**
180✔
292
       * Broadcast a vote for the current track.
180✔
293
       */
180✔
294
      'booth:vote': ({ userID, direction }) => {
180✔
295
        this.broadcast('vote', {
2✔
296
          _id: userID,
2✔
297
          value: direction,
2✔
298
        });
2✔
299
      },
180✔
300
      /**
180✔
301
       * Broadcast a favorite for the current track.
180✔
302
       */
180✔
303
      'booth:favorite': ({ userID }) => {
180✔
304
        this.broadcast('favorite', { userID });
1✔
305
      },
180✔
306
      /**
180✔
307
       * Cycle a single user's playlist.
180✔
308
       */
180✔
309
      'playlist:cycle': ({ userID, playlistID }) => {
180✔
310
        this.sendTo(userID, 'playlistCycle', { playlistID });
18✔
311
      },
180✔
312
      /**
180✔
313
       * Broadcast that a user joined the waitlist.
180✔
314
       */
180✔
315
      'waitlist:join': ({ userID, waitlist }) => {
180✔
316
        this.broadcast('waitlistJoin', { userID, waitlist });
31✔
317
      },
180✔
318
      /**
180✔
319
       * Broadcast that a user left the waitlist.
180✔
320
       */
180✔
321
      'waitlist:leave': ({ userID, waitlist }) => {
180✔
322
        this.broadcast('waitlistLeave', { userID, waitlist });
1✔
323
      },
180✔
324
      /**
180✔
325
       * Broadcast that a user was added to the waitlist.
180✔
326
       */
180✔
327
      'waitlist:add': ({
180✔
328
        userID, moderatorID, position, waitlist,
1✔
329
      }) => {
1✔
330
        this.broadcast('waitlistAdd', {
1✔
331
          userID, moderatorID, position, waitlist,
1✔
332
        });
1✔
333
      },
180✔
334
      /**
180✔
335
       * Broadcast that a user was removed from the waitlist.
180✔
336
       */
180✔
337
      'waitlist:remove': ({ userID, moderatorID, waitlist }) => {
180✔
338
        this.broadcast('waitlistRemove', { userID, moderatorID, waitlist });
×
339
      },
180✔
340
      /**
180✔
341
       * Broadcast that a user was moved in the waitlist.
180✔
342
       */
180✔
343
      'waitlist:move': ({
180✔
344
        userID, moderatorID, position, waitlist,
3✔
345
      }) => {
3✔
346
        this.broadcast('waitlistMove', {
3✔
347
          userID, moderatorID, position, waitlist,
3✔
348
        });
3✔
349
      },
180✔
350
      /**
180✔
351
       * Broadcast a waitlist update.
180✔
352
       */
180✔
353
      'waitlist:update': (waitlist) => {
180✔
354
        this.broadcast('waitlistUpdate', waitlist);
18✔
355
      },
180✔
356
      /**
180✔
357
       * Broadcast that the waitlist was cleared.
180✔
358
       */
180✔
359
      'waitlist:clear': ({ moderatorID }) => {
180✔
360
        this.broadcast('waitlistClear', { moderatorID });
×
361
      },
180✔
362
      /**
180✔
363
       * Broadcast that the waitlist was locked.
180✔
364
       */
180✔
365
      'waitlist:lock': ({ moderatorID, locked }) => {
180✔
366
        this.broadcast('waitlistLock', { moderatorID, locked });
6✔
367
      },
180✔
368

180✔
369
      'acl:allow': ({ userID, roles }) => {
180✔
370
        this.broadcast('acl:allow', { userID, roles });
83✔
371
      },
180✔
372
      'acl:disallow': ({ userID, roles }) => {
180✔
373
        this.broadcast('acl:disallow', { userID, roles });
2✔
374
      },
180✔
375

180✔
376
      'user:update': ({ userID, moderatorID, new: update }) => {
180✔
377
        // TODO Remove this remnant of the old roles system
3✔
378
        if ('role' in update) {
3!
379
          this.broadcast('roleChange', {
×
380
            moderatorID,
×
381
            userID,
×
382
            role: update.role,
×
383
          });
×
384
        }
×
385
        if ('username' in update) {
3✔
386
          this.broadcast('nameChange', {
3✔
387
            moderatorID,
3✔
388
            userID,
3✔
389
            username: update.username,
3✔
390
          });
3✔
391
        }
3✔
392
      },
180✔
393
      'user:join': async ({ userID }) => {
180✔
394
        const { users, keyv } = this.#uw;
28✔
395
        const user = await users.getUser(userID);
28✔
396
        if (user) {
28✔
397
          // TODO this should not be the socket server code's responsibility
28✔
398
          const userIDs = /** @type {import('./schema').UserID[] | null} */ (
28✔
399
            await keyv.get(KEY_ACTIVE_SESSIONS)
28✔
400
          ) ?? [];
28✔
401
          userIDs.push(user.id);
28✔
402
          await keyv.set(KEY_ACTIVE_SESSIONS, userIDs);
28✔
403
          this.broadcast('join', serializeUser(user));
28✔
404
        }
28✔
405
      },
180✔
406
      /**
180✔
407
       * Broadcast that a user left the server.
180✔
408
       */
180✔
409
      'user:leave': ({ userID }) => {
180✔
410
        this.broadcast('leave', userID);
×
411
      },
180✔
412
      /**
180✔
413
       * Broadcast a ban event.
180✔
414
       */
180✔
415
      'user:ban': ({
180✔
416
        moderatorID, userID, permanent = false, duration, expiresAt,
3✔
417
      }) => {
3✔
418
        this.broadcast('ban', {
3✔
419
          moderatorID, userID, permanent, duration, expiresAt,
3✔
420
        });
3✔
421

3✔
422
        this.#connections.forEach((connection) => {
3✔
423
          if (connection instanceof AuthedConnection && connection.user.id === userID) {
×
424
            connection.ban();
×
425
          } else if (connection instanceof LostConnection && connection.user.id === userID) {
×
426
            connection.close();
×
427
          }
×
428
        });
3✔
429
      },
180✔
430
      /**
180✔
431
       * Broadcast an unban event.
180✔
432
       */
180✔
433
      'user:unban': ({ moderatorID, userID }) => {
180✔
434
        this.broadcast('unban', { moderatorID, userID });
1✔
435
      },
180✔
436
      /**
180✔
437
       * Force-close a connection.
180✔
438
       */
180✔
439
      'http-api:socket:close': (userID) => {
180✔
440
        this.#connections.forEach((connection) => {
×
441
          if ('user' in connection && connection.user.id === userID) {
×
442
            connection.close();
×
443
          }
×
444
        });
×
445
      },
180✔
446

180✔
447
      'emotes:reload': () => {
180✔
448
        this.broadcast('reloadEmotes', null);
×
449
      },
180✔
450
    };
180✔
451
  }
180✔
452

180✔
453
  /**
180✔
454
   * Create `LostConnection`s for every user that's known to be online, but that
180✔
455
   * is not currently connected to the socket server.
180✔
456
   *
180✔
457
   * @private
180✔
458
   */
180✔
459
  async initLostConnections() {
180✔
460
    const { db, keyv } = this.#uw;
180✔
461
    const userIDs = /** @type {import('./schema').UserID[] | null} */ (
180✔
462
      await keyv.get(KEY_ACTIVE_SESSIONS)
180✔
463
    ) ?? [];
180✔
464
    const disconnectedIDs = userIDs.filter((userID) => !this.connection(userID));
180✔
465

180✔
466
    if (disconnectedIDs.length === 0) {
180✔
467
      return;
180✔
468
    }
180✔
469

×
470
    const disconnectedUsers = await db.selectFrom('users')
×
471
      .where('id', 'in', disconnectedIDs)
×
472
      .selectAll()
×
473
      .execute();
×
474
    disconnectedUsers.forEach((user) => {
×
475
      this.add(this.createLostConnection(user, 'TODO: Actual session ID!!', null));
×
476
    });
×
477
  }
180✔
478

180✔
479
  /**
180✔
480
   * @param {import('ws').WebSocket} socket
180✔
481
   * @param {import('http').IncomingMessage} request
180✔
482
   * @private
180✔
483
   */
180✔
484
  onSocketConnected(socket, request) {
180✔
485
    this.#logger.info({ req: request }, 'new connection');
29✔
486

29✔
487
    socket.on('error', (error) => {
29✔
488
      this.onSocketError(socket, error);
×
489
    });
29✔
490
    this.add(this.createGuestConnection(socket));
29✔
491
  }
29✔
492

180✔
493
  /**
180✔
494
   * @param {import('ws').WebSocket} socket
180✔
495
   * @param {Error} error
180✔
496
   * @private
180✔
497
   */
180✔
498
  onSocketError(socket, error) {
180✔
499
    this.#logger.warn({ err: error }, 'socket error');
×
500

×
501
    this.options.onError(socket, error);
×
502
  }
×
503

180✔
504
  /**
180✔
505
   * @param {Error} error
180✔
506
   * @private
180✔
507
   */
180✔
508
  onError(error) {
180✔
509
    this.#logger.error({ err: error }, 'server error');
×
510

×
511
    this.options.onError(undefined, error);
×
512
  }
×
513

180✔
514
  /**
180✔
515
   * Get a LostConnection for a user, if one exists.
180✔
516
   *
180✔
517
   * @param {string} sessionID
180✔
518
   * @private
180✔
519
   */
180✔
520
  getLostConnection(sessionID) {
180✔
521
    return this.#connections.find((connection) => (
1✔
522
      connection instanceof LostConnection && connection.sessionID === sessionID
1✔
523
    ));
1✔
524
  }
1✔
525

180✔
526
  /**
180✔
527
   * Create a connection instance for an unauthenticated user.
180✔
528
   *
180✔
529
   * @param {import('ws').WebSocket} socket
180✔
530
   * @private
180✔
531
   */
180✔
532
  createGuestConnection(socket) {
180✔
533
    const connection = new GuestConnection(this.#uw, socket, {
29✔
534
      authRegistry: this.authRegistry,
29✔
535
    });
29✔
536
    connection.on('close', () => {
29✔
537
      this.remove(connection);
×
538
    });
29✔
539
    connection.on('authenticate', async ({ user, sessionID, lastEventID }) => {
29✔
540
      const isReconnect = await connection.isReconnect(sessionID);
29✔
541
      this.#logger.info({ userId: user.id, isReconnect, lastEventID }, 'authenticated socket');
29✔
542
      if (isReconnect) {
29✔
543
        const previousConnection = this.getLostConnection(sessionID);
1✔
544
        if (previousConnection) this.remove(previousConnection);
1✔
545
      }
1✔
546

29✔
547
      this.replace(connection, this.createAuthedConnection(socket, user, sessionID, lastEventID));
29✔
548

29✔
549
      if (!isReconnect) {
29✔
550
        this.#uw.publish('user:join', { userID: user.id });
28✔
551
      }
28✔
552
    });
29✔
553
    return connection;
29✔
554
  }
29✔
555

180✔
556
  /**
180✔
557
   * Create a connection instance for an authenticated user.
180✔
558
   *
180✔
559
   * @param {import('ws').WebSocket} socket
180✔
560
   * @param {User} user
180✔
561
   * @param {string} sessionID
180✔
562
   * @param {string|null} lastEventID
180✔
563
   * @returns {AuthedConnection}
180✔
564
   * @private
180✔
565
   */
180✔
566
  createAuthedConnection(socket, user, sessionID, lastEventID) {
180✔
567
    const connection = new AuthedConnection(this.#uw, socket, user, sessionID, lastEventID);
29✔
568
    connection.on('close', ({ banned, lastEventID }) => {
29✔
569
      if (banned) {
29!
570
        this.#logger.info({ userId: user.id }, 'removing connection after ban');
×
571
        disconnectUser(this.#uw, user.id);
×
572
      } else if (!this.#closing) {
29✔
573
        this.#logger.info({ userId: user.id }, 'lost connection');
2✔
574
        this.add(this.createLostConnection(user, sessionID, lastEventID));
2✔
575
      }
2✔
576
      this.remove(connection);
29✔
577
    });
29✔
578
    connection.on(
29✔
579
      'command',
29✔
580
      ({ command, data }) => {
29✔
581
        this.#logger.trace({ userId: user.id, command, data }, 'command');
11✔
582
        if (has(this.#clientActions, command)) {
11✔
583
          // Ignore incorrect input
11✔
584
          const validate = this.#clientActionSchemas[command];
11✔
585
          if (validate && !validate(data)) {
11!
586
            return;
×
587
          }
×
588

11✔
589
          const action = this.#clientActions[command];
11✔
590
          // @ts-expect-error TS2345 `data` is validated
11✔
591
          action(user, data, connection);
11✔
592
        }
11✔
593
      },
29✔
594
    );
29✔
595
    return connection;
29✔
596
  }
29✔
597

180✔
598
  /**
180✔
599
   * Create a connection instance for a user who disconnected.
180✔
600
   *
180✔
601
   * @param {User} user
180✔
602
   * @param {string} sessionID
180✔
603
   * @param {string|null} lastEventID
180✔
604
   * @returns {LostConnection}
180✔
605
   * @private
180✔
606
   */
180✔
607
  createLostConnection(user, sessionID, lastEventID) {
180✔
608
    const connection = new LostConnection(
2✔
609
      this.#uw,
2✔
610
      user,
2✔
611
      sessionID,
2✔
612
      lastEventID,
2✔
613
      this.options.timeout,
2✔
614
    );
2✔
615
    connection.on('close', () => {
2✔
616
      this.#logger.info({ userId: user.id }, 'user left');
1✔
617
      this.remove(connection);
1✔
618
      // Only register that the user left if they didn't have another connection
1✔
619
      // still open.
1✔
620
      if (!this.connection(user)) {
1✔
621
        disconnectUser(this.#uw, user.id);
1✔
622
      }
1✔
623
    });
2✔
624
    return connection;
2✔
625
  }
2✔
626

180✔
627
  /**
180✔
628
   * Add a connection.
180✔
629
   *
180✔
630
   * @param {Connection} connection
180✔
631
   * @private
180✔
632
   */
180✔
633
  add(connection) {
180✔
634
    const userID = 'user' in connection ? connection.user.id : null;
60✔
635
    const sessionID = 'sessionID' in connection ? connection.sessionID : null;
60✔
636
    this.#logger.trace({ type: connection.constructor.name, userID, sessionID }, 'add connection');
60✔
637

60✔
638
    this.#connections.push(connection);
60✔
639
    this.#guestCountDirty = true;
60✔
640
  }
60✔
641

180✔
642
  /**
180✔
643
   * Remove a connection.
180✔
644
   *
180✔
645
   * @param {Connection} connection
180✔
646
   * @private
180✔
647
   */
180✔
648
  remove(connection) {
180✔
649
    const userID = 'user' in connection ? connection.user.id : null;
60✔
650
    const sessionID = 'sessionID' in connection ? connection.sessionID : null;
60✔
651
    this.#logger.trace({ type: connection.constructor.name, userID, sessionID }, 'remove connection');
60✔
652

60✔
653
    const i = this.#connections.indexOf(connection);
60✔
654
    this.#connections.splice(i, 1);
60✔
655

60✔
656
    connection.removed();
60✔
657
    this.#guestCountDirty = true;
60✔
658
  }
60✔
659

180✔
660
  /**
180✔
661
   * Replace a connection instance with another connection instance. Useful when
180✔
662
   * a connection changes "type", like GuestConnection → AuthedConnection.
180✔
663
   *
180✔
664
   * @param {Connection} oldConnection
180✔
665
   * @param {Connection} newConnection
180✔
666
   * @private
180✔
667
   */
180✔
668
  replace(oldConnection, newConnection) {
180✔
669
    this.remove(oldConnection);
29✔
670
    this.add(newConnection);
29✔
671
  }
29✔
672

180✔
673
  /**
180✔
674
   * Handle command messages coming in from elsewhere in the app.
180✔
675
   * Some commands are intended to broadcast immediately to all connected
180✔
676
   * clients, but others require special action.
180✔
677
   *
180✔
678
   * @template {keyof import('./redisMessages.js').ServerActionParameters} K
180✔
679
   * @param {K} command
180✔
680
   * @param {import('./redisMessages.js').ServerActionParameters[K]} data
180✔
681
   */
180✔
682
  #onServerMessage(command, data) {
180✔
683
    this.#logger.trace({ channel: command, command, data }, 'server message');
262✔
684

262✔
685
    const action = this.#serverActions[command];
262✔
686
    if (action !== undefined) {
262✔
687
      action(data);
240✔
688
    }
240✔
689
  }
262✔
690

180✔
691
  /**
180✔
692
   * Stop the socket server.
180✔
693
   *
180✔
694
   * @returns {Promise<void>}
180✔
695
   */
180✔
696
  async destroy() {
180✔
697
    this.#closing = true;
180✔
698

180✔
699
    this.#unsubscribe();
180✔
700
    clearInterval(this.#pinger);
180✔
701
    clearInterval(this.#guestCountInterval);
180✔
702

180✔
703
    for (const connection of this.#connections) {
180✔
704
      connection.close();
28✔
705
    }
28✔
706

180✔
707
    const closeWsServer = promisify(this.#wss.close.bind(this.#wss));
180✔
708
    await closeWsServer();
180✔
709
  }
180✔
710

180✔
711
  /**
180✔
712
   * Get the connection instance for a specific user.
180✔
713
   *
180✔
714
   * @param {User|import('./schema.js').UserID} user The user.
180✔
715
   * @returns {Connection|undefined}
180✔
716
   */
180✔
717
  connection(user) {
180✔
718
    const userID = typeof user === 'object' ? user.id : user;
1!
719
    return this.#connections.find((connection) => 'user' in connection && connection.user.id === userID);
1✔
720
  }
1✔
721

180✔
722
  ping() {
180✔
723
    this.#connections.forEach((connection) => {
×
724
      connection.ping();
×
725
    });
×
726

×
727
    this.#cleanupMessageQueue().catch((err) => {
×
728
      this.#logger.error({ err }, 'failed to clean up socket message queue');
×
729
    });
×
730
  }
×
731

180✔
732
  async #cleanupMessageQueue() {
180✔
733
    const oldestID = encodeTime(subMinutes(new Date(), 10).getTime());
×
734

×
735
    await this.#uw.db.deleteFrom('socketMessageQueue')
×
736
      .where('id', '<', oldestID)
×
737
      .execute();
×
738
  }
×
739

180✔
740
  /**
180✔
741
   * Broadcast a command to all connected clients.
180✔
742
   *
180✔
743
   * @param {string} command Command name.
180✔
744
   * @param {import('type-fest').JsonValue} data Command data.
180✔
745
   * @param {import('./schema.js').UserID | null} targetUserID
180✔
746
   */
180✔
747
  #recordMessage(command, data, targetUserID = null) {
180✔
748
    const id = ulid();
240✔
749

240✔
750
    this.#uw.db.insertInto('socketMessageQueue')
240✔
751
      .values({
240✔
752
        id,
240✔
753
        command,
240✔
754
        data: jsonb(data),
240✔
755
        targetUserID,
240✔
756
      })
240✔
757
      .execute();
240✔
758

240✔
759
    return id;
240✔
760
  }
240✔
761

180✔
762
  /**
180✔
763
   * Broadcast a command to all connected clients.
180✔
764
   *
180✔
765
   * @param {string} command Command name.
180✔
766
   * @param {import('type-fest').JsonValue} data Command data.
180✔
767
   */
180✔
768
  broadcast(command, data) {
180✔
769
    const id = this.#recordMessage(command, data);
222✔
770

222✔
771
    this.#logger.trace({
222✔
772
      id,
222✔
773
      command,
222✔
774
      data,
222✔
775
      to: this.#connections.map((connection) => (
222✔
776
        'user' in connection ? connection.user.id : null
122!
777
      )),
222✔
778
    }, 'broadcast');
222✔
779

222✔
780
    this.#connections.forEach((connection) => {
222✔
781
      connection.send(id, command, data);
122✔
782
    });
222✔
783
  }
222✔
784

180✔
785
  /**
180✔
786
   * Send a command to a single user.
180✔
787
   *
180✔
788
   * @param {User|import('./schema.js').UserID} user User or user ID to send the command to.
180✔
789
   * @param {string} command Command name.
180✔
790
   * @param {import('type-fest').JsonValue} data Command data.
180✔
791
   */
180✔
792
  sendTo(user, command, data) {
180✔
793
    const userID = typeof user === 'object' ? user.id : user;
18!
794
    const id = this.#recordMessage(command, data, userID);
18✔
795

18✔
796
    this.#connections.forEach((connection) => {
18✔
797
      if ('user' in connection && connection.user.id === userID) {
16✔
798
        connection.send(id, command, data);
13✔
799
      }
13✔
800
    });
18✔
801
  }
18✔
802

180✔
803
  #lastGuestCount = 0;
180✔
804

180✔
805
  /** The number of unauthenticated connections. */
180✔
806
  get guestCount() {
180✔
807
    return this.#connections.reduce((acc, connection) => {
6✔
808
      if (connection instanceof GuestConnection) {
2!
NEW
809
        return acc + 1;
×
NEW
810
      }
×
811
      return acc;
2✔
812
    }, 0);
6✔
813
  }
6✔
814

180✔
815
  #recountGuests() {
180✔
NEW
816
    const guests = this.guestCount;
×
NEW
817
    if (guests !== this.#lastGuestCount) {
×
NEW
818
      this.#lastGuestCount = guests;
×
819
      this.broadcast('guests', guests);
×
820
    }
×
821
  }
×
822
}
180✔
823

1✔
824
export default SocketServer;
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc