• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

u-wave / core / 12201221070

06 Dec 2024 03:11PM UTC coverage: 85.316% (+1.0%) from 84.36%
12201221070

push

github

web-flow
Improve session handling (#682)

* Use the session ID as the key for lost socket messages

This supports multiple connections per user.

* Restore login state from session

* No longer set uwsession

* Continue to support existing uwsession tokens

* lint

* Do not store JWT auth in session

* Mimic session ID for JWT auth

The main point is to allow the tests to continue to use JWT auth,
while also testing session features such as missing messages for lost
connections

* Test for lost connection

* use off the shelf redis store

933 of 1112 branches covered (83.9%)

Branch coverage included in aggregate %.

79 of 120 new or added lines in 11 files covered. (65.83%)

6 existing lines in 1 file now uncovered.

9984 of 11684 relevant lines covered (85.45%)

90.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.51
/src/SocketServer.js
1
import { promisify } from 'node:util';
1✔
2
import lodash from 'lodash';
1✔
3
import sjson from 'secure-json-parse';
1✔
4
import { WebSocketServer } from 'ws';
1✔
5
import Ajv from 'ajv';
1✔
6
import ms from 'ms';
1✔
7
import { stdSerializers } from 'pino';
1✔
8
import { socketVote } from './controllers/booth.js';
1✔
9
import { disconnectUser } from './controllers/users.js';
1✔
10
import AuthRegistry from './AuthRegistry.js';
1✔
11
import GuestConnection from './sockets/GuestConnection.js';
1✔
12
import AuthedConnection from './sockets/AuthedConnection.js';
1✔
13
import LostConnection from './sockets/LostConnection.js';
1✔
14
import { serializeUser } from './utils/serialize.js';
1✔
15

1✔
16
const { debounce, isEmpty } = lodash;
1✔
17

1✔
18
export const REDIS_ACTIVE_SESSIONS = 'users';
1✔
19

1✔
20
/**
1✔
21
 * @typedef {import('./schema.js').User} User
1✔
22
 */
1✔
23

1✔
24
/**
1✔
25
 * @typedef {GuestConnection | AuthedConnection | LostConnection} Connection
1✔
26
 */
1✔
27

1✔
28
/**
1✔
29
 * @typedef {object} ClientActionParameters
1✔
30
 * @prop {string} sendChat
1✔
31
 * @prop {-1 | 1} vote
1✔
32
 * @prop {undefined} logout
1✔
33
 */
1✔
34

1✔
35
/**
1✔
36
 * @typedef {{
1✔
37
 *   [Name in keyof ClientActionParameters]: (
1✔
38
 *     user: User,
1✔
39
 *     parameter: ClientActionParameters[Name],
1✔
40
 *     connection: AuthedConnection
1✔
41
 *   ) => void
1✔
42
 * }} ClientActions
1✔
43
 */
1✔
44

1✔
45
const ajv = new Ajv({
1✔
46
  coerceTypes: false,
1✔
47
  ownProperties: true,
1✔
48
  removeAdditional: true,
1✔
49
  useDefaults: false,
1✔
50
});
1✔
51

1✔
52
/**
 * Check whether `property` is an own (non-inherited) property of `object`.
 *
 * This guards lookups of externally supplied command names in the handler
 * tables, so inherited names such as `constructor` or `toString` are never
 * mistaken for a registered command.
 *
 * @template {object} T
 * @param {T} object
 * @param {PropertyKey} property
 * @returns {property is keyof T}
 */
function has(object, property) {
  return Object.hasOwn(object, property);
}
201✔
61

1✔
62
class SocketServer {
144✔
63
  /**
144✔
64
   * @param {import('./Uwave.js').Boot} uw
144✔
65
   * @param {{ secret: Buffer|string }} options
144✔
66
   */
144✔
67
  static async plugin(uw, options) {
144✔
68
    uw.socketServer = new SocketServer(uw, {
144✔
69
      secret: options.secret,
144✔
70
      server: uw.server,
144✔
71
    });
144✔
72

144✔
73
    uw.after(async () => {
144✔
74
      try {
144✔
75
        await uw.socketServer.initLostConnections();
144✔
76
      } catch (err) {
144!
77
        // No need to prevent startup for this.
×
78
        // We do need to clear the `users` list because the lost connection handlers
×
79
        // will not do so.
×
80
        uw.socketServer.#logger.warn({ err }, 'could not initialise lost connections');
×
NEW
81
        await uw.redis.del(REDIS_ACTIVE_SESSIONS);
×
82
      }
×
83
    });
144✔
84

144✔
85
    uw.onClose(async () => {
144✔
86
      await uw.socketServer.destroy();
144✔
87
    });
144✔
88
  }
144✔
89

144✔
90
  #uw;
144✔
91

144✔
92
  #logger;
144✔
93

144✔
94
  #redisSubscription;
144✔
95

144✔
96
  #wss;
144✔
97

144✔
98
  #closing = false;
144✔
99

144✔
100
  /** @type {Connection[]} */
144✔
101
  #connections = [];
144✔
102

144✔
103
  #pinger;
144✔
104

144✔
105
  /**
144✔
106
   * Update online guests count and broadcast an update if necessary.
144✔
107
   */
144✔
108
  #recountGuests;
144✔
109

144✔
110
  /**
144✔
111
   * Handlers for commands that come in from clients.
144✔
112
   *
144✔
113
   * @type {ClientActions}
144✔
114
   */
144✔
115
  #clientActions;
144✔
116

144✔
117
  /**
144✔
118
   * @type {{
144✔
119
   *   [K in keyof ClientActionParameters]:
144✔
120
   *     import('ajv').ValidateFunction<ClientActionParameters[K]>
144✔
121
   * }}
144✔
122
   */
144✔
123
  #clientActionSchemas;
144✔
124

144✔
125
  /**
144✔
126
   * Handlers for commands that come in from the server side.
144✔
127
   *
144✔
128
   * @type {import('./redisMessages.js').ServerActions}
144✔
129
   */
144✔
130
  #serverActions;
144✔
131

144✔
132
  /**
144✔
133
   * Create a socket server.
144✔
134
   *
144✔
135
   * @param {import('./Uwave.js').Boot} uw üWave Core instance.
144✔
136
   * @param {object} options Socket server options.
144✔
137
   * @param {number} [options.timeout] Time in seconds to wait for disconnected
144✔
138
   *     users to reconnect before removing them.
144✔
139
   * @param {Buffer|string} options.secret
144✔
140
   * @param {import('http').Server | import('https').Server} [options.server]
144✔
141
   * @param {number} [options.port]
144✔
142
   * @private
144✔
143
   */
144✔
144
  constructor(uw, options) {
144✔
145
    if (!uw) {
144!
146
      throw new TypeError(`The "uw" argument must be of type UwaveServer. Received ${typeof uw}`);
×
147
    }
×
148

144✔
149
    this.#uw = uw;
144✔
150
    this.#logger = uw.logger.child({ ns: 'uwave:sockets' }, {
144✔
151
      serializers: {
144✔
152
        req: stdSerializers.req,
144✔
153
      },
144✔
154
    });
144✔
155
    this.#redisSubscription = uw.redis.duplicate();
144✔
156

144✔
157
    this.options = {
144✔
158
      /** @type {(_socket: import('ws').WebSocket | undefined, err: Error) => void} */
144✔
159
      onError: (_socket, err) => {
144✔
160
        throw err;
×
161
      },
144✔
162
      timeout: 30,
144✔
163
      ...options,
144✔
164
    };
144✔
165

144✔
166
    // TODO put this behind a symbol, it's just public for tests
144✔
167
    this.authRegistry = new AuthRegistry(uw.redis);
144✔
168

144✔
169
    this.#wss = new WebSocketServer({
144✔
170
      server: options.server,
144✔
171
      port: options.server ? undefined : options.port,
144!
172
    });
144✔
173

144✔
174
    uw.use(() => this.#redisSubscription.subscribe('uwave'));
144✔
175
    this.#redisSubscription.on('message', (channel, command) => {
144✔
176
      // this returns a promise, but we don't handle the error case:
194✔
177
      // there is not much we can do, so just let node.js crash w/ an unhandled rejection
194✔
178
      this.onServerMessage(channel, command);
194✔
179
    });
144✔
180

144✔
181
    this.#wss.on('error', (error) => {
144✔
182
      this.onError(error);
×
183
    });
144✔
184
    this.#wss.on('connection', (socket, request) => {
144✔
185
      this.onSocketConnected(socket, request);
25✔
186
    });
144✔
187

144✔
188
    this.#pinger = setInterval(() => {
144✔
189
      this.ping();
×
190
    }, ms('10 seconds'));
144✔
191

144✔
192
    this.#recountGuests = debounce(() => {
144✔
193
      if (this.#closing) {
21✔
194
        return;
21✔
195
      }
21✔
196
      this.#recountGuestsInternal().catch((error) => {
×
197
        this.#logger.error({ err: error }, 'counting guests failed');
×
198
      });
×
199
    }, ms('2 seconds'));
144✔
200

144✔
201
    this.#clientActions = {
144✔
202
      sendChat: (user, message) => {
144✔
203
        this.#logger.trace({ user, message }, 'sendChat');
7✔
204
        this.#uw.chat.send(user, message);
7✔
205
      },
144✔
206
      vote: (user, direction) => {
144✔
207
        socketVote(this.#uw, user.id, direction);
×
208
      },
144✔
209
      logout: (user, _, connection) => {
144✔
210
        this.replace(connection, this.createGuestConnection(connection.socket));
×
211
        if (!this.connection(user)) {
×
212
          disconnectUser(this.#uw, user.id);
×
213
        }
×
214
      },
144✔
215
    };
144✔
216

144✔
217
    this.#clientActionSchemas = {
144✔
218
      sendChat: ajv.compile({
144✔
219
        type: 'string',
144✔
220
      }),
144✔
221
      vote: ajv.compile({
144✔
222
        type: 'integer',
144✔
223
        enum: [-1, 1],
144✔
224
      }),
144✔
225
      logout: ajv.compile(true),
144✔
226
    };
144✔
227

144✔
228
    this.#serverActions = {
144✔
229
      /**
144✔
230
       * Broadcast the next track.
144✔
231
       */
144✔
232
      'advance:complete': (next) => {
144✔
233
        if (next) {
15✔
234
          this.broadcast('advance', {
15✔
235
            historyID: next.historyID,
15✔
236
            userID: next.userID,
15✔
237
            media: next.media,
15✔
238
            playedAt: new Date(next.playedAt).getTime(),
15✔
239
          });
15✔
240
        } else {
15!
241
          this.broadcast('advance', null);
×
242
        }
×
243
      },
144✔
244
      /**
144✔
245
       * Broadcast a skip notification.
144✔
246
       */
144✔
247
      'booth:skip': ({ moderatorID, userID, reason }) => {
144✔
248
        this.broadcast('skip', { moderatorID, userID, reason });
×
249
      },
144✔
250
      /**
144✔
251
       * Broadcast a chat message.
144✔
252
       */
144✔
253
      'chat:message': (message) => {
144✔
254
        this.broadcast('chatMessage', message);
6✔
255
      },
144✔
256
      /**
144✔
257
       * Delete chat messages. The delete filter can have an _id property to
144✔
258
       * delete a specific message, a userID property to delete messages by a
144✔
259
       * user, or be empty to delete all messages.
144✔
260
       */
144✔
261
      'chat:delete': ({ moderatorID, filter }) => {
144✔
262
        if ('id' in filter) {
6✔
263
          this.broadcast('chatDeleteByID', {
2✔
264
            moderatorID,
2✔
265
            _id: filter.id,
2✔
266
          });
2✔
267
        } else if ('userID' in filter) {
6✔
268
          this.broadcast('chatDeleteByUser', {
2✔
269
            moderatorID,
2✔
270
            userID: filter.userID,
2✔
271
          });
2✔
272
        } else if (isEmpty(filter)) {
2✔
273
          this.broadcast('chatDelete', { moderatorID });
2✔
274
        }
2✔
275
      },
144✔
276
      /**
144✔
277
       * Broadcast that a user was muted in chat.
144✔
278
       */
144✔
279
      'chat:mute': ({ moderatorID, userID, duration }) => {
144✔
280
        this.broadcast('chatMute', {
1✔
281
          userID,
1✔
282
          moderatorID,
1✔
283
          expiresAt: Date.now() + duration,
1✔
284
        });
1✔
285
      },
144✔
286
      /**
144✔
287
       * Broadcast that a user was unmuted in chat.
144✔
288
       */
144✔
289
      'chat:unmute': ({ moderatorID, userID }) => {
144✔
290
        this.broadcast('chatUnmute', { userID, moderatorID });
×
291
      },
144✔
292
      /**
144✔
293
       * Broadcast a vote for the current track.
144✔
294
       */
144✔
295
      'booth:vote': ({ userID, direction }) => {
144✔
296
        this.broadcast('vote', {
2✔
297
          _id: userID,
2✔
298
          value: direction,
2✔
299
        });
2✔
300
      },
144✔
301
      /**
144✔
302
       * Broadcast a favorite for the current track.
144✔
303
       */
144✔
304
      'booth:favorite': ({ userID }) => {
144✔
305
        this.broadcast('favorite', { userID });
1✔
306
      },
144✔
307
      /**
144✔
308
       * Cycle a single user's playlist.
144✔
309
       */
144✔
310
      'playlist:cycle': ({ userID, playlistID }) => {
144✔
311
        this.sendTo(userID, 'playlistCycle', { playlistID });
15✔
312
      },
144✔
313
      /**
144✔
314
       * Broadcast that a user joined the waitlist.
144✔
315
       */
144✔
316
      'waitlist:join': ({ userID, waitlist }) => {
144✔
317
        this.broadcast('waitlistJoin', { userID, waitlist });
18✔
318
      },
144✔
319
      /**
144✔
320
       * Broadcast that a user left the waitlist.
144✔
321
       */
144✔
322
      'waitlist:leave': ({ userID, waitlist }) => {
144✔
323
        this.broadcast('waitlistLeave', { userID, waitlist });
1✔
324
      },
144✔
325
      /**
144✔
326
       * Broadcast that a user was added to the waitlist.
144✔
327
       */
144✔
328
      'waitlist:add': ({
144✔
329
        userID, moderatorID, position, waitlist,
1✔
330
      }) => {
1✔
331
        this.broadcast('waitlistAdd', {
1✔
332
          userID, moderatorID, position, waitlist,
1✔
333
        });
1✔
334
      },
144✔
335
      /**
144✔
336
       * Broadcast that a user was removed from the waitlist.
144✔
337
       */
144✔
338
      'waitlist:remove': ({ userID, moderatorID, waitlist }) => {
144✔
339
        this.broadcast('waitlistRemove', { userID, moderatorID, waitlist });
×
340
      },
144✔
341
      /**
144✔
342
       * Broadcast that a user was moved in the waitlist.
144✔
343
       */
144✔
344
      'waitlist:move': ({
144✔
345
        userID, moderatorID, position, waitlist,
×
346
      }) => {
×
347
        this.broadcast('waitlistMove', {
×
348
          userID, moderatorID, position, waitlist,
×
349
        });
×
350
      },
144✔
351
      /**
144✔
352
       * Broadcast a waitlist update.
144✔
353
       */
144✔
354
      'waitlist:update': (waitlist) => {
144✔
355
        this.broadcast('waitlistUpdate', waitlist);
15✔
356
      },
144✔
357
      /**
144✔
358
       * Broadcast that the waitlist was cleared.
144✔
359
       */
144✔
360
      'waitlist:clear': ({ moderatorID }) => {
144✔
361
        this.broadcast('waitlistClear', { moderatorID });
×
362
      },
144✔
363
      /**
144✔
364
       * Broadcast that the waitlist was locked.
144✔
365
       */
144✔
366
      'waitlist:lock': ({ moderatorID, locked }) => {
144✔
367
        this.broadcast('waitlistLock', { moderatorID, locked });
6✔
368
      },
144✔
369

144✔
370
      'acl:allow': ({ userID, roles }) => {
144✔
371
        this.broadcast('acl:allow', { userID, roles });
57✔
372
      },
144✔
373
      'acl:disallow': ({ userID, roles }) => {
144✔
374
        this.broadcast('acl:disallow', { userID, roles });
2✔
375
      },
144✔
376

144✔
377
      'user:update': ({ userID, moderatorID, new: update }) => {
144✔
378
        // TODO Remove this remnant of the old roles system
3✔
379
        if ('role' in update) {
3!
380
          this.broadcast('roleChange', {
×
381
            moderatorID,
×
382
            userID,
×
383
            role: update.role,
×
384
          });
×
385
        }
×
386
        if ('username' in update) {
3✔
387
          this.broadcast('nameChange', {
3✔
388
            moderatorID,
3✔
389
            userID,
3✔
390
            username: update.username,
3✔
391
          });
3✔
392
        }
3✔
393
      },
144✔
394
      'user:join': async ({ userID }) => {
144✔
395
        const { users, redis } = this.#uw;
24✔
396
        const user = await users.getUser(userID);
24✔
397
        if (user) {
24✔
398
          // TODO this should not be the socket server code's responsibility
24✔
399
          await redis.rpush(REDIS_ACTIVE_SESSIONS, user.id);
24✔
400
          this.broadcast('join', serializeUser(user));
24✔
401
        }
24✔
402
      },
144✔
403
      /**
144✔
404
       * Broadcast that a user left the server.
144✔
405
       */
144✔
406
      'user:leave': ({ userID }) => {
144✔
407
        this.broadcast('leave', userID);
×
408
      },
144✔
409
      /**
144✔
410
       * Broadcast a ban event.
144✔
411
       */
144✔
412
      'user:ban': ({
144✔
413
        moderatorID, userID, permanent = false, duration, expiresAt,
3✔
414
      }) => {
3✔
415
        this.broadcast('ban', {
3✔
416
          moderatorID, userID, permanent, duration, expiresAt,
3✔
417
        });
3✔
418

3✔
419
        this.#connections.forEach((connection) => {
3✔
420
          if (connection instanceof AuthedConnection && connection.user.id === userID) {
×
421
            connection.ban();
×
422
          } else if (connection instanceof LostConnection && connection.user.id === userID) {
×
423
            connection.close();
×
424
          }
×
425
        });
3✔
426
      },
144✔
427
      /**
144✔
428
       * Broadcast an unban event.
144✔
429
       */
144✔
430
      'user:unban': ({ moderatorID, userID }) => {
144✔
431
        this.broadcast('unban', { moderatorID, userID });
1✔
432
      },
144✔
433
      /**
144✔
434
       * Force-close a connection.
144✔
435
       */
144✔
436
      'http-api:socket:close': (userID) => {
144✔
437
        this.#connections.forEach((connection) => {
×
438
          if ('user' in connection && connection.user.id === userID) {
×
439
            connection.close();
×
440
          }
×
441
        });
×
442
      },
144✔
443

144✔
444
      'emotes:reload': () => {
144✔
445
        this.broadcast('reloadEmotes', null);
×
446
      },
144✔
447
    };
144✔
448
  }
144✔
449

144✔
450
  /**
144✔
451
   * Create `LostConnection`s for every user that's known to be online, but that
144✔
452
   * is not currently connected to the socket server.
144✔
453
   *
144✔
454
   * @private
144✔
455
   */
144✔
456
  async initLostConnections() {
144✔
457
    const { db, redis } = this.#uw;
144✔
458
    const userIDs = /** @type {import('./schema').UserID[]} */ (
144✔
459
      await redis.lrange(REDIS_ACTIVE_SESSIONS, 0, -1)
144✔
460
    );
144✔
461
    const disconnectedIDs = userIDs.filter((userID) => !this.connection(userID));
144✔
462

144✔
463
    if (disconnectedIDs.length === 0) {
144✔
464
      return;
144✔
465
    }
144✔
466

×
467
    const disconnectedUsers = await db.selectFrom('users')
×
468
      .where('id', 'in', disconnectedIDs)
×
469
      .selectAll()
×
470
      .execute();
×
471
    disconnectedUsers.forEach((user) => {
×
NEW
472
      this.add(this.createLostConnection(user, 'TODO: Actual session ID!!'));
×
473
    });
×
474
  }
144✔
475

144✔
476
  /**
144✔
477
   * @param {import('ws').WebSocket} socket
144✔
478
   * @param {import('http').IncomingMessage} request
144✔
479
   * @private
144✔
480
   */
144✔
481
  onSocketConnected(socket, request) {
144✔
482
    this.#logger.info({ req: request }, 'new connection');
25✔
483

25✔
484
    socket.on('error', (error) => {
25✔
485
      this.onSocketError(socket, error);
×
486
    });
25✔
487
    this.add(this.createGuestConnection(socket));
25✔
488
  }
25✔
489

144✔
490
  /**
144✔
491
   * @param {import('ws').WebSocket} socket
144✔
492
   * @param {Error} error
144✔
493
   * @private
144✔
494
   */
144✔
495
  onSocketError(socket, error) {
144✔
496
    this.#logger.warn({ err: error }, 'socket error');
×
497

×
498
    this.options.onError(socket, error);
×
499
  }
×
500

144✔
501
  /**
144✔
502
   * @param {Error} error
144✔
503
   * @private
144✔
504
   */
144✔
505
  onError(error) {
144✔
506
    this.#logger.error({ err: error }, 'server error');
×
507

×
508
    this.options.onError(undefined, error);
×
509
  }
×
510

144✔
511
  /**
144✔
512
   * Get a LostConnection for a user, if one exists.
144✔
513
   *
144✔
514
   * @param {User} user
144✔
515
   * @private
144✔
516
   */
144✔
517
  getLostConnection(user) {
144✔
518
    return this.#connections.find((connection) => (
1✔
519
      connection instanceof LostConnection && connection.user.id === user.id
3✔
520
    ));
1✔
521
  }
1✔
522

144✔
523
  /**
144✔
524
   * Create a connection instance for an unauthenticated user.
144✔
525
   *
144✔
526
   * @param {import('ws').WebSocket} socket
144✔
527
   * @private
144✔
528
   */
144✔
529
  createGuestConnection(socket) {
144✔
530
    const connection = new GuestConnection(this.#uw, socket, {
25✔
531
      authRegistry: this.authRegistry,
25✔
532
    });
25✔
533
    connection.on('close', () => {
25✔
534
      this.remove(connection);
×
535
    });
25✔
536
    connection.on('authenticate', async (user, sessionID) => {
25✔
537
      const isReconnect = await connection.isReconnect(sessionID);
25✔
538
      this.#logger.info({ userId: user.id, isReconnect }, 'authenticated socket');
25✔
539
      if (isReconnect) {
25✔
540
        const previousConnection = this.getLostConnection(sessionID);
1✔
541
        if (previousConnection) this.remove(previousConnection);
1!
542
      }
1✔
543

25✔
544
      this.replace(connection, this.createAuthedConnection(socket, user, sessionID));
25✔
545

25✔
546
      if (!isReconnect) {
25✔
547
        this.#uw.publish('user:join', { userID: user.id });
24✔
548
      }
24✔
549
    });
25✔
550
    return connection;
25✔
551
  }
25✔
552

144✔
553
  /**
144✔
554
   * Create a connection instance for an authenticated user.
144✔
555
   *
144✔
556
   * @param {import('ws').WebSocket} socket
144✔
557
   * @param {User} user
144✔
558
   * @param {string} sessionID
144✔
559
   * @returns {AuthedConnection}
144✔
560
   * @private
144✔
561
   */
144✔
562
  createAuthedConnection(socket, user, sessionID) {
144✔
563
    const connection = new AuthedConnection(this.#uw, socket, user, sessionID);
25✔
564
    connection.on('close', ({ banned }) => {
25✔
565
      if (banned) {
25!
566
        this.#logger.info({ userId: user.id }, 'removing connection after ban');
×
567
        disconnectUser(this.#uw, user.id);
×
568
      } else if (!this.#closing) {
25✔
569
        this.#logger.info({ userId: user.id }, 'lost connection');
2✔
570
        this.add(this.createLostConnection(user, sessionID));
2✔
571
      }
2✔
572
      this.remove(connection);
25✔
573
    });
25✔
574
    connection.on(
25✔
575
      'command',
25✔
576
      /**
25✔
577
       * @param {string} command
25✔
578
       * @param {import('type-fest').JsonValue} data
25✔
579
       */
25✔
580
      (command, data) => {
25✔
581
        this.#logger.trace({ userId: user.id, command, data }, 'command');
7✔
582
        if (has(this.#clientActions, command)) {
7✔
583
          // Ignore incorrect input
7✔
584
          const validate = this.#clientActionSchemas[command];
7✔
585
          if (validate && !validate(data)) {
7!
586
            return;
×
587
          }
×
588

7✔
589
          const action = this.#clientActions[command];
7✔
590
          // @ts-expect-error TS2345 `data` is validated
7✔
591
          action(user, data, connection);
7✔
592
        }
7✔
593
      },
25✔
594
    );
25✔
595
    return connection;
25✔
596
  }
25✔
597

144✔
598
  /**
144✔
599
   * Create a connection instance for a user who disconnected.
144✔
600
   *
144✔
601
   * @param {User} user
144✔
602
   * @param {string} sessionID
144✔
603
   * @returns {LostConnection}
144✔
604
   * @private
144✔
605
   */
144✔
606
  createLostConnection(user, sessionID) {
144✔
607
    const connection = new LostConnection(this.#uw, user, sessionID, this.options.timeout);
2✔
608
    connection.on('close', () => {
2✔
609
      this.#logger.info({ userId: user.id }, 'user left');
2✔
610
      this.remove(connection);
2✔
611
      // Only register that the user left if they didn't have another connection
2✔
612
      // still open.
2✔
613
      if (!this.connection(user)) {
2✔
614
        disconnectUser(this.#uw, user.id);
2✔
615
      }
2✔
616
    });
2✔
617
    return connection;
2✔
618
  }
2✔
619

144✔
620
  /**
144✔
621
   * Add a connection.
144✔
622
   *
144✔
623
   * @param {Connection} connection
144✔
624
   * @private
144✔
625
   */
144✔
626
  add(connection) {
144✔
627
    const userID = 'user' in connection ? connection.user.id : null;
52✔
628
    const sessionID = 'sessionID' in connection ? connection.sessionID : null;
52✔
629
    this.#logger.trace({ type: connection.constructor.name, userID, sessionID }, 'add connection');
52✔
630

52✔
631
    this.#connections.push(connection);
52✔
632
    this.#recountGuests();
52✔
633
  }
52✔
634

144✔
635
  /**
144✔
636
   * Remove a connection.
144✔
637
   *
144✔
638
   * @param {Connection} connection
144✔
639
   * @private
144✔
640
   */
144✔
641
  remove(connection) {
144✔
642
    const userID = 'user' in connection ? connection.user.id : null;
52✔
643
    const sessionID = 'sessionID' in connection ? connection.sessionID : null;
52✔
644
    this.#logger.trace({ type: connection.constructor.name, userID, sessionID }, 'remove connection');
52✔
645

52✔
646
    const i = this.#connections.indexOf(connection);
52✔
647
    this.#connections.splice(i, 1);
52✔
648

52✔
649
    connection.removed();
52✔
650
    this.#recountGuests();
52✔
651
  }
52✔
652

144✔
653
  /**
144✔
654
   * Replace a connection instance with another connection instance. Useful when
144✔
655
   * a connection changes "type", like GuestConnection → AuthedConnection.
144✔
656
   *
144✔
657
   * @param {Connection} oldConnection
144✔
658
   * @param {Connection} newConnection
144✔
659
   * @private
144✔
660
   */
144✔
661
  replace(oldConnection, newConnection) {
144✔
662
    this.remove(oldConnection);
25✔
663
    this.add(newConnection);
25✔
664
  }
25✔
665

144✔
666
  /**
144✔
667
   * Handle command messages coming in from Redis.
144✔
668
   * Some commands are intended to broadcast immediately to all connected
144✔
669
   * clients, but others require special action.
144✔
670
   *
144✔
671
   * @param {string} channel
144✔
672
   * @param {string} rawCommand
144✔
673
   * @returns {Promise<void>}
144✔
674
   * @private
144✔
675
   */
144✔
676
  async onServerMessage(channel, rawCommand) {
144✔
677
    /**
194✔
678
     * @type {{ command: string, data: import('type-fest').JsonValue }|undefined}
194✔
679
     */
194✔
680
    const json = sjson.safeParse(rawCommand);
194✔
681
    if (!json) {
194!
682
      return;
×
683
    }
×
684
    const { command, data } = json;
194✔
685

194✔
686
    this.#logger.trace({ channel, command, data }, 'server message');
194✔
687

194✔
688
    if (has(this.#serverActions, command)) {
194✔
689
      const action = this.#serverActions[command];
177✔
690
      if (action !== undefined) { // the types for `ServerActions` allow undefined, so...
177✔
691
        // @ts-expect-error TS2345 `data` is validated
177✔
692
        action(data);
177✔
693
      }
177✔
694
    }
177✔
695
  }
194✔
696

144✔
697
  /**
144✔
698
   * Stop the socket server.
144✔
699
   *
144✔
700
   * @returns {Promise<void>}
144✔
701
   */
144✔
702
  async destroy() {
144✔
703
    clearInterval(this.#pinger);
144✔
704

144✔
705
    this.#closing = true;
144✔
706
    for (const connection of this.#wss.clients) {
144✔
707
      connection.close();
23✔
708
    }
23✔
709

144✔
710
    this.#recountGuests.cancel();
144✔
711

144✔
712
    const closeWsServer = promisify(this.#wss.close.bind(this.#wss));
144✔
713
    await closeWsServer();
144✔
714
    await this.#redisSubscription.quit();
144✔
715
  }
144✔
716

144✔
717
  /**
144✔
718
   * Get the connection instance for a specific user.
144✔
719
   *
144✔
720
   * @param {User|string} user The user.
144✔
721
   * @returns {Connection|undefined}
144✔
722
   */
144✔
723
  connection(user) {
144✔
724
    const userID = typeof user === 'object' ? user.id : user;
2!
725
    return this.#connections.find((connection) => 'user' in connection && connection.user.id === userID);
2✔
726
  }
2✔
727

144✔
728
  ping() {
144✔
729
    this.#connections.forEach((connection) => {
×
730
      if ('socket' in connection) {
×
731
        connection.ping();
×
732
      }
×
733
    });
×
734
  }
×
735

144✔
736
  /**
144✔
737
   * Broadcast a command to all connected clients.
144✔
738
   *
144✔
739
   * @param {string} command Command name.
144✔
740
   * @param {import('type-fest').JsonValue} data Command data.
144✔
741
   */
144✔
742
  broadcast(command, data) {
144✔
743
    this.#logger.trace({
162✔
744
      command,
162✔
745
      data,
162✔
746
      to: this.#connections.map((connection) => (
162✔
747
        'user' in connection ? connection.user.id : null
109!
748
      )),
162✔
749
    }, 'broadcast');
162✔
750

162✔
751
    this.#connections.forEach((connection) => {
162✔
752
      connection.send(command, data);
109✔
753
    });
162✔
754
  }
162✔
755

144✔
756
  /**
144✔
757
   * Send a command to a single user.
144✔
758
   *
144✔
759
   * @param {User|string} user User or user ID to send the command to.
144✔
760
   * @param {string} command Command name.
144✔
761
   * @param {import('type-fest').JsonValue} data Command data.
144✔
762
   */
144✔
763
  sendTo(user, command, data) {
144✔
764
    const userID = typeof user === 'object' ? user.id : user;
15!
765

15✔
766
    this.#connections.forEach((connection) => {
15✔
767
      if ('user' in connection && connection.user.id === userID) {
16✔
768
        connection.send(command, data);
13✔
769
      }
13✔
770
    });
15✔
771
  }
15✔
772

144✔
773
  async getGuestCount() {
144✔
774
    const { redis } = this.#uw;
×
775
    const rawCount = await redis.get('http-api:guests');
×
776
    if (typeof rawCount !== 'string' || !/^\d+$/.test(rawCount)) {
×
777
      return 0;
×
778
    }
×
779
    return parseInt(rawCount, 10);
×
780
  }
×
781

144✔
782
  async #recountGuestsInternal() {
144✔
783
    const { redis } = this.#uw;
×
784
    const guests = this.#connections
×
785
      .filter((connection) => connection instanceof GuestConnection)
×
786
      .length;
×
787

×
788
    const lastGuestCount = await this.getGuestCount();
×
789
    if (guests !== lastGuestCount) {
×
790
      await redis.set('http-api:guests', guests);
×
791
      this.broadcast('guests', guests);
×
792
    }
×
793
  }
×
794
}
144✔
795

1✔
796
export default SocketServer;
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc