• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

u-wave / core / 12200196837

06 Dec 2024 02:06PM UTC coverage: 84.237% (-0.1%) from 84.36%
12200196837

Pull #682

github

goto-bus-stop
Do not store JWT auth in session
Pull Request #682: Improve session handling

905 of 1083 branches covered (83.56%)

Branch coverage included in aggregate %.

76 of 123 new or added lines in 10 files covered. (61.79%)

6 existing lines in 1 file now uncovered.

9863 of 11700 relevant lines covered (84.3%)

90.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.5
/src/SocketServer.js
1
import { promisify } from 'node:util';
1✔
2
import lodash from 'lodash';
1✔
3
import sjson from 'secure-json-parse';
1✔
4
import { WebSocketServer } from 'ws';
1✔
5
import Ajv from 'ajv';
1✔
6
import ms from 'ms';
1✔
7
import { stdSerializers } from 'pino';
1✔
8
import { socketVote } from './controllers/booth.js';
1✔
9
import { disconnectUser } from './controllers/users.js';
1✔
10
import AuthRegistry from './AuthRegistry.js';
1✔
11
import GuestConnection from './sockets/GuestConnection.js';
1✔
12
import AuthedConnection from './sockets/AuthedConnection.js';
1✔
13
import LostConnection from './sockets/LostConnection.js';
1✔
14
import { serializeUser } from './utils/serialize.js';
1✔
15

1✔
16
const { debounce, isEmpty } = lodash;
1✔
17

1✔
18
export const REDIS_ACTIVE_SESSIONS = 'users';
1✔
19

1✔
20
/**
1✔
21
 * @typedef {import('./schema.js').User} User
1✔
22
 */
1✔
23

1✔
24
/**
1✔
25
 * @typedef {GuestConnection | AuthedConnection | LostConnection} Connection
1✔
26
 */
1✔
27

1✔
28
/**
1✔
29
 * @typedef {object} ClientActionParameters
1✔
30
 * @prop {string} sendChat
1✔
31
 * @prop {-1 | 1} vote
1✔
32
 * @prop {undefined} logout
1✔
33
 */
1✔
34

1✔
35
/**
1✔
36
 * @typedef {{
1✔
37
 *   [Name in keyof ClientActionParameters]: (
1✔
38
 *     user: User,
1✔
39
 *     parameter: ClientActionParameters[Name],
1✔
40
 *     connection: AuthedConnection
1✔
41
 *   ) => void
1✔
42
 * }} ClientActions
1✔
43
 */
1✔
44

1✔
45
const ajv = new Ajv({
1✔
46
  coerceTypes: false,
1✔
47
  ownProperties: true,
1✔
48
  removeAdditional: true,
1✔
49
  useDefaults: false,
1✔
50
});
1✔
51

1✔
52
/**
1✔
53
 * @template {object} T
1✔
54
 * @param {T} object
1✔
55
 * @param {PropertyKey} property
1✔
56
 * @returns {property is keyof T}
1✔
57
 */
1✔
58
function has(object, property) {
191✔
59
  return Object.prototype.hasOwnProperty.call(object, property);
191✔
60
}
191✔
61

1✔
62
class SocketServer {
143✔
63
  /**
143✔
64
   * @param {import('./Uwave.js').Boot} uw
143✔
65
   * @param {{ secret: Buffer|string }} options
143✔
66
   */
143✔
67
  static async plugin(uw, options) {
143✔
68
    uw.socketServer = new SocketServer(uw, {
143✔
69
      secret: options.secret,
143✔
70
      server: uw.server,
143✔
71
    });
143✔
72

143✔
73
    uw.after(async () => {
143✔
74
      try {
143✔
75
        await uw.socketServer.initLostConnections();
143✔
76
      } catch (err) {
143!
77
        // No need to prevent startup for this.
×
78
        // We do need to clear the `users` list because the lost connection handlers
×
79
        // will not do so.
×
80
        uw.socketServer.#logger.warn({ err }, 'could not initialise lost connections');
×
NEW
81
        await uw.redis.del(REDIS_ACTIVE_SESSIONS);
×
82
      }
×
83
    });
143✔
84

143✔
85
    uw.onClose(async () => {
143✔
86
      await uw.socketServer.destroy();
143✔
87
    });
143✔
88
  }
143✔
89

143✔
90
  #uw;
143✔
91

143✔
92
  #logger;
143✔
93

143✔
94
  #redisSubscription;
143✔
95

143✔
96
  #wss;
143✔
97

143✔
98
  #closing = false;
143✔
99

143✔
100
  /** @type {Connection[]} */
143✔
101
  #connections = [];
143✔
102

143✔
103
  #pinger;
143✔
104

143✔
105
  /**
143✔
106
   * Update online guests count and broadcast an update if necessary.
143✔
107
   */
143✔
108
  #recountGuests;
143✔
109

143✔
110
  /**
143✔
111
   * Handlers for commands that come in from clients.
143✔
112
   *
143✔
113
   * @type {ClientActions}
143✔
114
   */
143✔
115
  #clientActions;
143✔
116

143✔
117
  /**
143✔
118
   * @type {{
143✔
119
   *   [K in keyof ClientActionParameters]:
143✔
120
   *     import('ajv').ValidateFunction<ClientActionParameters[K]>
143✔
121
   * }}
143✔
122
   */
143✔
123
  #clientActionSchemas;
143✔
124

143✔
125
  /**
143✔
126
   * Handlers for commands that come in from the server side.
143✔
127
   *
143✔
128
   * @type {import('./redisMessages.js').ServerActions}
143✔
129
   */
143✔
130
  #serverActions;
143✔
131

143✔
132
  /**
143✔
133
   * Create a socket server.
143✔
134
   *
143✔
135
   * @param {import('./Uwave.js').Boot} uw üWave Core instance.
143✔
136
   * @param {object} options Socket server options.
143✔
137
   * @param {number} [options.timeout] Time in seconds to wait for disconnected
143✔
138
   *     users to reconnect before removing them.
143✔
139
   * @param {Buffer|string} options.secret
143✔
140
   * @param {import('http').Server | import('https').Server} [options.server]
143✔
141
   * @param {number} [options.port]
143✔
142
   * @private
143✔
143
   */
143✔
144
  constructor(uw, options) {
143✔
145
    if (!uw) {
143!
146
      throw new TypeError(`The "uw" argument must be of type UwaveServer. Received ${typeof uw}`);
×
147
    }
×
148

143✔
149
    this.#uw = uw;
143✔
150
    this.#logger = uw.logger.child({ ns: 'uwave:sockets' }, {
143✔
151
      serializers: {
143✔
152
        req: stdSerializers.req,
143✔
153
      },
143✔
154
    });
143✔
155
    this.#redisSubscription = uw.redis.duplicate();
143✔
156

143✔
157
    this.options = {
143✔
158
      /** @type {(_socket: import('ws').WebSocket | undefined, err: Error) => void} */
143✔
159
      onError: (_socket, err) => {
143✔
160
        throw err;
×
161
      },
143✔
162
      timeout: 30,
143✔
163
      ...options,
143✔
164
    };
143✔
165

143✔
166
    // TODO put this behind a symbol, it's just public for tests
143✔
167
    this.authRegistry = new AuthRegistry(uw.redis);
143✔
168

143✔
169
    this.#wss = new WebSocketServer({
143✔
170
      server: options.server,
143✔
171
      port: options.server ? undefined : options.port,
143!
172
    });
143✔
173

143✔
174
    uw.use(() => this.#redisSubscription.subscribe('uwave'));
143✔
175
    this.#redisSubscription.on('message', (channel, command) => {
143✔
176
      // this returns a promise, but we don't handle the error case:
188✔
177
      // there is not much we can do, so just let node.js crash w/ an unhandled rejection
188✔
178
      this.onServerMessage(channel, command);
188✔
179
    });
143✔
180

143✔
181
    this.#wss.on('error', (error) => {
143✔
182
      this.onError(error);
×
183
    });
143✔
184
    this.#wss.on('connection', (socket, request) => {
143✔
185
      this.onSocketConnected(socket, request);
22✔
186
    });
143✔
187

143✔
188
    this.#pinger = setInterval(() => {
143✔
189
      this.ping();
×
190
    }, ms('10 seconds'));
143✔
191

143✔
192
    this.#recountGuests = debounce(() => {
143✔
193
      if (this.#closing) {
19✔
194
        return;
19✔
195
      }
19✔
196
      this.#recountGuestsInternal().catch((error) => {
×
197
        this.#logger.error({ err: error }, 'counting guests failed');
×
198
      });
×
199
    }, ms('2 seconds'));
143✔
200

143✔
201
    this.#clientActions = {
143✔
202
      sendChat: (user, message) => {
143✔
203
        this.#logger.trace({ user, message }, 'sendChat');
3✔
204
        this.#uw.chat.send(user, message);
3✔
205
      },
143✔
206
      vote: (user, direction) => {
143✔
207
        socketVote(this.#uw, user.id, direction);
×
208
      },
143✔
209
      logout: (user, _, connection) => {
143✔
210
        this.replace(connection, this.createGuestConnection(connection.socket));
×
211
        if (!this.connection(user)) {
×
212
          disconnectUser(this.#uw, user.id);
×
213
        }
×
214
      },
143✔
215
    };
143✔
216

143✔
217
    this.#clientActionSchemas = {
143✔
218
      sendChat: ajv.compile({
143✔
219
        type: 'string',
143✔
220
      }),
143✔
221
      vote: ajv.compile({
143✔
222
        type: 'integer',
143✔
223
        enum: [-1, 1],
143✔
224
      }),
143✔
225
      logout: ajv.compile(true),
143✔
226
    };
143✔
227

143✔
228
    this.#serverActions = {
143✔
229
      /**
143✔
230
       * Broadcast the next track.
143✔
231
       */
143✔
232
      'advance:complete': (next) => {
143✔
233
        if (next) {
15✔
234
          this.broadcast('advance', {
15✔
235
            historyID: next.historyID,
15✔
236
            userID: next.userID,
15✔
237
            media: next.media,
15✔
238
            playedAt: new Date(next.playedAt).getTime(),
15✔
239
          });
15✔
240
        } else {
15!
241
          this.broadcast('advance', null);
×
242
        }
×
243
      },
143✔
244
      /**
143✔
245
       * Broadcast a skip notification.
143✔
246
       */
143✔
247
      'booth:skip': ({ moderatorID, userID, reason }) => {
143✔
248
        this.broadcast('skip', { moderatorID, userID, reason });
×
249
      },
143✔
250
      /**
143✔
251
       * Broadcast a chat message.
143✔
252
       */
143✔
253
      'chat:message': (message) => {
143✔
254
        this.broadcast('chatMessage', message);
2✔
255
      },
143✔
256
      /**
143✔
257
       * Delete chat messages. The delete filter can have an _id property to
143✔
258
       * delete a specific message, a userID property to delete messages by a
143✔
259
       * user, or be empty to delete all messages.
143✔
260
       */
143✔
261
      'chat:delete': ({ moderatorID, filter }) => {
143✔
262
        if ('id' in filter) {
6✔
263
          this.broadcast('chatDeleteByID', {
2✔
264
            moderatorID,
2✔
265
            _id: filter.id,
2✔
266
          });
2✔
267
        } else if ('userID' in filter) {
6✔
268
          this.broadcast('chatDeleteByUser', {
2✔
269
            moderatorID,
2✔
270
            userID: filter.userID,
2✔
271
          });
2✔
272
        } else if (isEmpty(filter)) {
2✔
273
          this.broadcast('chatDelete', { moderatorID });
2✔
274
        }
2✔
275
      },
143✔
276
      /**
143✔
277
       * Broadcast that a user was muted in chat.
143✔
278
       */
143✔
279
      'chat:mute': ({ moderatorID, userID, duration }) => {
143✔
280
        this.broadcast('chatMute', {
1✔
281
          userID,
1✔
282
          moderatorID,
1✔
283
          expiresAt: Date.now() + duration,
1✔
284
        });
1✔
285
      },
143✔
286
      /**
143✔
287
       * Broadcast that a user was unmuted in chat.
143✔
288
       */
143✔
289
      'chat:unmute': ({ moderatorID, userID }) => {
143✔
290
        this.broadcast('chatUnmute', { userID, moderatorID });
×
291
      },
143✔
292
      /**
143✔
293
       * Broadcast a vote for the current track.
143✔
294
       */
143✔
295
      'booth:vote': ({ userID, direction }) => {
143✔
296
        this.broadcast('vote', {
2✔
297
          _id: userID,
2✔
298
          value: direction,
2✔
299
        });
2✔
300
      },
143✔
301
      /**
143✔
302
       * Broadcast a favorite for the current track.
143✔
303
       */
143✔
304
      'booth:favorite': ({ userID }) => {
143✔
305
        this.broadcast('favorite', { userID });
1✔
306
      },
143✔
307
      /**
143✔
308
       * Cycle a single user's playlist.
143✔
309
       */
143✔
310
      'playlist:cycle': ({ userID, playlistID }) => {
143✔
311
        this.sendTo(userID, 'playlistCycle', { playlistID });
15✔
312
      },
143✔
313
      /**
143✔
314
       * Broadcast that a user joined the waitlist.
143✔
315
       */
143✔
316
      'waitlist:join': ({ userID, waitlist }) => {
143✔
317
        this.broadcast('waitlistJoin', { userID, waitlist });
18✔
318
      },
143✔
319
      /**
143✔
320
       * Broadcast that a user left the waitlist.
143✔
321
       */
143✔
322
      'waitlist:leave': ({ userID, waitlist }) => {
143✔
323
        this.broadcast('waitlistLeave', { userID, waitlist });
1✔
324
      },
143✔
325
      /**
143✔
326
       * Broadcast that a user was added to the waitlist.
143✔
327
       */
143✔
328
      'waitlist:add': ({
143✔
329
        userID, moderatorID, position, waitlist,
1✔
330
      }) => {
1✔
331
        this.broadcast('waitlistAdd', {
1✔
332
          userID, moderatorID, position, waitlist,
1✔
333
        });
1✔
334
      },
143✔
335
      /**
143✔
336
       * Broadcast that a user was removed from the waitlist.
143✔
337
       */
143✔
338
      'waitlist:remove': ({ userID, moderatorID, waitlist }) => {
143✔
339
        this.broadcast('waitlistRemove', { userID, moderatorID, waitlist });
×
340
      },
143✔
341
      /**
143✔
342
       * Broadcast that a user was moved in the waitlist.
143✔
343
       */
143✔
344
      'waitlist:move': ({
143✔
345
        userID, moderatorID, position, waitlist,
×
346
      }) => {
×
347
        this.broadcast('waitlistMove', {
×
348
          userID, moderatorID, position, waitlist,
×
349
        });
×
350
      },
143✔
351
      /**
143✔
352
       * Broadcast a waitlist update.
143✔
353
       */
143✔
354
      'waitlist:update': (waitlist) => {
143✔
355
        this.broadcast('waitlistUpdate', waitlist);
15✔
356
      },
143✔
357
      /**
143✔
358
       * Broadcast that the waitlist was cleared.
143✔
359
       */
143✔
360
      'waitlist:clear': ({ moderatorID }) => {
143✔
361
        this.broadcast('waitlistClear', { moderatorID });
×
362
      },
143✔
363
      /**
143✔
364
       * Broadcast that the waitlist was locked.
143✔
365
       */
143✔
366
      'waitlist:lock': ({ moderatorID, locked }) => {
143✔
367
        this.broadcast('waitlistLock', { moderatorID, locked });
6✔
368
      },
143✔
369

143✔
370
      'acl:allow': ({ userID, roles }) => {
143✔
371
        this.broadcast('acl:allow', { userID, roles });
57✔
372
      },
143✔
373
      'acl:disallow': ({ userID, roles }) => {
143✔
374
        this.broadcast('acl:disallow', { userID, roles });
2✔
375
      },
143✔
376

143✔
377
      'user:update': ({ userID, moderatorID, new: update }) => {
143✔
378
        // TODO Remove this remnant of the old roles system
3✔
379
        if ('role' in update) {
3!
380
          this.broadcast('roleChange', {
×
381
            moderatorID,
×
382
            userID,
×
383
            role: update.role,
×
384
          });
×
385
        }
×
386
        if ('username' in update) {
3✔
387
          this.broadcast('nameChange', {
3✔
388
            moderatorID,
3✔
389
            userID,
3✔
390
            username: update.username,
3✔
391
          });
3✔
392
        }
3✔
393
      },
143✔
394
      'user:join': async ({ userID }) => {
143✔
395
        const { users, redis } = this.#uw;
22✔
396
        const user = await users.getUser(userID);
22✔
397
        if (user) {
22✔
398
          // TODO this should not be the socket server code's responsibility
22✔
399
          await redis.rpush(REDIS_ACTIVE_SESSIONS, user.id);
22✔
400
          this.broadcast('join', serializeUser(user));
22✔
401
        }
22✔
402
      },
143✔
403
      /**
143✔
404
       * Broadcast that a user left the server.
143✔
405
       */
143✔
406
      'user:leave': ({ userID }) => {
143✔
407
        this.broadcast('leave', userID);
×
408
      },
143✔
409
      /**
143✔
410
       * Broadcast a ban event.
143✔
411
       */
143✔
412
      'user:ban': ({
143✔
413
        moderatorID, userID, permanent = false, duration, expiresAt,
3✔
414
      }) => {
3✔
415
        this.broadcast('ban', {
3✔
416
          moderatorID, userID, permanent, duration, expiresAt,
3✔
417
        });
3✔
418

3✔
419
        this.#connections.forEach((connection) => {
3✔
420
          if (connection instanceof AuthedConnection && connection.user.id === userID) {
×
421
            connection.ban();
×
422
          } else if (connection instanceof LostConnection && connection.user.id === userID) {
×
423
            connection.close();
×
424
          }
×
425
        });
3✔
426
      },
143✔
427
      /**
143✔
428
       * Broadcast an unban event.
143✔
429
       */
143✔
430
      'user:unban': ({ moderatorID, userID }) => {
143✔
431
        this.broadcast('unban', { moderatorID, userID });
1✔
432
      },
143✔
433
      /**
143✔
434
       * Force-close a connection.
143✔
435
       */
143✔
436
      'http-api:socket:close': (userID) => {
143✔
437
        this.#connections.forEach((connection) => {
×
438
          if ('user' in connection && connection.user.id === userID) {
×
439
            connection.close();
×
440
          }
×
441
        });
×
442
      },
143✔
443

143✔
444
      'emotes:reload': () => {
143✔
445
        this.broadcast('reloadEmotes', null);
×
446
      },
143✔
447
    };
143✔
448
  }
143✔
449

143✔
450
  /**
143✔
451
   * Create `LostConnection`s for every user that's known to be online, but that
143✔
452
   * is not currently connected to the socket server.
143✔
453
   *
143✔
454
   * @private
143✔
455
   */
143✔
456
  async initLostConnections() {
143✔
457
    const { db, redis } = this.#uw;
143✔
458
    const userIDs = /** @type {import('./schema').UserID[]} */ (
143✔
459
      await redis.lrange(REDIS_ACTIVE_SESSIONS, 0, -1)
143✔
460
    );
143✔
461
    const disconnectedIDs = userIDs.filter((userID) => !this.connection(userID));
143✔
462

143✔
463
    if (disconnectedIDs.length === 0) {
143✔
464
      return;
143✔
465
    }
143✔
466

×
467
    const disconnectedUsers = await db.selectFrom('users')
×
468
      .where('id', 'in', disconnectedIDs)
×
469
      .selectAll()
×
470
      .execute();
×
471
    disconnectedUsers.forEach((user) => {
×
NEW
472
      this.add(this.createLostConnection(user, 'TODO: Actual session ID!!'));
×
473
    });
×
474
  }
143✔
475

143✔
476
  /**
143✔
477
   * @param {import('ws').WebSocket} socket
143✔
478
   * @param {import('http').IncomingMessage} request
143✔
479
   * @private
143✔
480
   */
143✔
481
  onSocketConnected(socket, request) {
143✔
482
    this.#logger.info({ req: request }, 'new connection');
22✔
483

22✔
484
    socket.on('error', (error) => {
22✔
485
      this.onSocketError(socket, error);
×
486
    });
22✔
487
    this.add(this.createGuestConnection(socket));
22✔
488
  }
22✔
489

143✔
490
  /**
143✔
491
   * @param {import('ws').WebSocket} socket
143✔
492
   * @param {Error} error
143✔
493
   * @private
143✔
494
   */
143✔
495
  onSocketError(socket, error) {
143✔
496
    this.#logger.warn({ err: error }, 'socket error');
×
497

×
498
    this.options.onError(socket, error);
×
499
  }
×
500

143✔
501
  /**
143✔
502
   * @param {Error} error
143✔
503
   * @private
143✔
504
   */
143✔
505
  onError(error) {
143✔
506
    this.#logger.error({ err: error }, 'server error');
×
507

×
508
    this.options.onError(undefined, error);
×
509
  }
×
510

143✔
511
  /**
143✔
512
   * Get a LostConnection for a user, if one exists.
143✔
513
   *
143✔
514
   * @param {User} user
143✔
515
   * @private
143✔
516
   */
143✔
517
  getLostConnection(user) {
143✔
518
    return this.#connections.find((connection) => (
×
519
      connection instanceof LostConnection && connection.user.id === user.id
×
520
    ));
×
521
  }
×
522

143✔
523
  /**
143✔
524
   * Create a connection instance for an unauthenticated user.
143✔
525
   *
143✔
526
   * @param {import('ws').WebSocket} socket
143✔
527
   * @private
143✔
528
   */
143✔
529
  createGuestConnection(socket) {
143✔
530
    const connection = new GuestConnection(this.#uw, socket, {
22✔
531
      authRegistry: this.authRegistry,
22✔
532
    });
22✔
533
    connection.on('close', () => {
22✔
534
      this.remove(connection);
×
535
    });
22✔
536
    connection.on('authenticate', async (user, sessionID) => {
22✔
537
      const isReconnect = await connection.isReconnect(sessionID);
22✔
538
      this.#logger.info({ userId: user.id, isReconnect }, 'authenticated socket');
22✔
539
      if (isReconnect) {
22!
NEW
540
        const previousConnection = this.getLostConnection(sessionID);
×
541
        if (previousConnection) this.remove(previousConnection);
×
542
      }
×
543

22✔
544
      this.replace(connection, this.createAuthedConnection(socket, user, sessionID));
22✔
545

22✔
546
      if (!isReconnect) {
22✔
547
        this.#uw.publish('user:join', { userID: user.id });
22✔
548
      }
22✔
549
    });
22✔
550
    return connection;
22✔
551
  }
22✔
552

143✔
553
  /**
143✔
554
   * Create a connection instance for an authenticated user.
143✔
555
   *
143✔
556
   * @param {import('ws').WebSocket} socket
143✔
557
   * @param {User} user
143✔
558
   * @param {string} sessionID
143✔
559
   * @returns {AuthedConnection}
143✔
560
   * @private
143✔
561
   */
143✔
562
  createAuthedConnection(socket, user, sessionID) {
143✔
563
    const connection = new AuthedConnection(this.#uw, socket, user, sessionID);
22✔
564
    connection.on('close', ({ banned }) => {
22✔
565
      if (banned) {
22!
566
        this.#logger.info({ userId: user.id }, 'removing connection after ban');
×
567
        disconnectUser(this.#uw, user.id);
×
568
      } else if (!this.#closing) {
22!
569
        this.#logger.info({ userId: user.id }, 'lost connection');
×
NEW
570
        this.add(this.createLostConnection(user, sessionID));
×
571
      }
×
572
      this.remove(connection);
22✔
573
    });
22✔
574
    connection.on(
22✔
575
      'command',
22✔
576
      /**
22✔
577
       * @param {string} command
22✔
578
       * @param {import('type-fest').JsonValue} data
22✔
579
       */
22✔
580
      (command, data) => {
22✔
581
        this.#logger.trace({ userId: user.id, command, data }, 'command');
3✔
582
        if (has(this.#clientActions, command)) {
3✔
583
          // Ignore incorrect input
3✔
584
          const validate = this.#clientActionSchemas[command];
3✔
585
          if (validate && !validate(data)) {
3!
586
            return;
×
587
          }
×
588

3✔
589
          const action = this.#clientActions[command];
3✔
590
          // @ts-expect-error TS2345 `data` is validated
3✔
591
          action(user, data, connection);
3✔
592
        }
3✔
593
      },
22✔
594
    );
22✔
595
    return connection;
22✔
596
  }
22✔
597

143✔
598
  /**
143✔
599
   * Create a connection instance for a user who disconnected.
143✔
600
   *
143✔
601
   * @param {User} user
143✔
602
   * @param {string} sessionID
143✔
603
   * @returns {LostConnection}
143✔
604
   * @private
143✔
605
   */
143✔
606
  createLostConnection(user, sessionID) {
143✔
NEW
607
    const connection = new LostConnection(this.#uw, user, sessionID, this.options.timeout);
×
608
    connection.on('close', () => {
×
609
      this.#logger.info({ userId: user.id }, 'user left');
×
610
      this.remove(connection);
×
611
      // Only register that the user left if they didn't have another connection
×
612
      // still open.
×
613
      if (!this.connection(user)) {
×
614
        disconnectUser(this.#uw, user.id);
×
615
      }
×
616
    });
×
617
    return connection;
×
618
  }
×
619

143✔
620
  /**
143✔
621
   * Add a connection.
143✔
622
   *
143✔
623
   * @param {Connection} connection
143✔
624
   * @private
143✔
625
   */
143✔
626
  add(connection) {
143✔
627
    const userID = 'user' in connection ? connection.user.id : null;
44✔
628
    const sessionID = 'sessionID' in connection ? connection.sessionID : null;
44✔
629
    this.#logger.trace({ type: connection.constructor.name, userID, sessionID }, 'add connection');
44✔
630

44✔
631
    this.#connections.push(connection);
44✔
632
    this.#recountGuests();
44✔
633
  }
44✔
634

143✔
635
  /**
143✔
636
   * Remove a connection.
143✔
637
   *
143✔
638
   * @param {Connection} connection
143✔
639
   * @private
143✔
640
   */
143✔
641
  remove(connection) {
143✔
642
    const userID = 'user' in connection ? connection.user.id : null;
44✔
643
    const sessionID = 'sessionID' in connection ? connection.sessionID : null;
44✔
644
    this.#logger.trace({ type: connection.constructor.name, userID, sessionID }, 'remove connection');
44✔
645

44✔
646
    const i = this.#connections.indexOf(connection);
44✔
647
    this.#connections.splice(i, 1);
44✔
648

44✔
649
    connection.removed();
44✔
650
    this.#recountGuests();
44✔
651
  }
44✔
652

143✔
653
  /**
143✔
654
   * Replace a connection instance with another connection instance. Useful when
143✔
655
   * a connection changes "type", like GuestConnection → AuthedConnection.
143✔
656
   *
143✔
657
   * @param {Connection} oldConnection
143✔
658
   * @param {Connection} newConnection
143✔
659
   * @private
143✔
660
   */
143✔
661
  replace(oldConnection, newConnection) {
143✔
662
    this.remove(oldConnection);
22✔
663
    this.add(newConnection);
22✔
664
  }
22✔
665

143✔
666
  /**
143✔
667
   * Handle command messages coming in from Redis.
143✔
668
   * Some commands are intended to broadcast immediately to all connected
143✔
669
   * clients, but others require special action.
143✔
670
   *
143✔
671
   * @param {string} channel
143✔
672
   * @param {string} rawCommand
143✔
673
   * @returns {Promise<void>}
143✔
674
   * @private
143✔
675
   */
143✔
676
  async onServerMessage(channel, rawCommand) {
143✔
677
    /**
188✔
678
     * @type {{ command: string, data: import('type-fest').JsonValue }|undefined}
188✔
679
     */
188✔
680
    const json = sjson.safeParse(rawCommand);
188✔
681
    if (!json) {
188!
682
      return;
×
683
    }
×
684
    const { command, data } = json;
188✔
685

188✔
686
    this.#logger.trace({ channel, command, data }, 'server message');
188✔
687

188✔
688
    if (has(this.#serverActions, command)) {
188✔
689
      const action = this.#serverActions[command];
171✔
690
      if (action !== undefined) { // the types for `ServerActions` allow undefined, so...
171✔
691
        // @ts-expect-error TS2345 `data` is validated
171✔
692
        action(data);
171✔
693
      }
171✔
694
    }
171✔
695
  }
188✔
696

143✔
697
  /**
143✔
698
   * Stop the socket server.
143✔
699
   *
143✔
700
   * @returns {Promise<void>}
143✔
701
   */
143✔
702
  async destroy() {
143✔
703
    clearInterval(this.#pinger);
143✔
704

143✔
705
    this.#closing = true;
143✔
706
    for (const connection of this.#wss.clients) {
143✔
707
      connection.close();
22✔
708
    }
22✔
709

143✔
710
    this.#recountGuests.cancel();
143✔
711

143✔
712
    const closeWsServer = promisify(this.#wss.close.bind(this.#wss));
143✔
713
    await closeWsServer();
143✔
714
    await this.#redisSubscription.quit();
143✔
715
  }
143✔
716

143✔
717
  /**
143✔
718
   * Get the connection instance for a specific user.
143✔
719
   *
143✔
720
   * @param {User|string} user The user.
143✔
721
   * @returns {Connection|undefined}
143✔
722
   */
143✔
723
  connection(user) {
143✔
724
    const userID = typeof user === 'object' ? user.id : user;
×
725
    return this.#connections.find((connection) => 'user' in connection && connection.user.id === userID);
×
726
  }
×
727

143✔
728
  ping() {
143✔
729
    this.#connections.forEach((connection) => {
×
730
      if ('socket' in connection) {
×
731
        connection.ping();
×
732
      }
×
733
    });
×
734
  }
×
735

143✔
736
  /**
143✔
737
   * Broadcast a command to all connected clients.
143✔
738
   *
143✔
739
   * @param {string} command Command name.
143✔
740
   * @param {import('type-fest').JsonValue} data Command data.
143✔
741
   */
143✔
742
  broadcast(command, data) {
143✔
743
    this.#logger.trace({
156✔
744
      command,
156✔
745
      data,
156✔
746
      to: this.#connections.map((connection) => (
156✔
747
        'user' in connection ? connection.user.id : null
98!
748
      )),
156✔
749
    }, 'broadcast');
156✔
750

156✔
751
    this.#connections.forEach((connection) => {
156✔
752
      connection.send(command, data);
98✔
753
    });
156✔
754
  }
156✔
755

143✔
756
  /**
143✔
757
   * Send a command to a single user.
143✔
758
   *
143✔
759
   * @param {User|string} user User or user ID to send the command to.
143✔
760
   * @param {string} command Command name.
143✔
761
   * @param {import('type-fest').JsonValue} data Command data.
143✔
762
   */
143✔
763
  sendTo(user, command, data) {
143✔
764
    const userID = typeof user === 'object' ? user.id : user;
15!
765

15✔
766
    this.#connections.forEach((connection) => {
15✔
767
      if ('user' in connection && connection.user.id === userID) {
16✔
768
        connection.send(command, data);
13✔
769
      }
13✔
770
    });
15✔
771
  }
15✔
772

143✔
773
  async getGuestCount() {
143✔
774
    const { redis } = this.#uw;
×
775
    const rawCount = await redis.get('http-api:guests');
×
776
    if (typeof rawCount !== 'string' || !/^\d+$/.test(rawCount)) {
×
777
      return 0;
×
778
    }
×
779
    return parseInt(rawCount, 10);
×
780
  }
×
781

143✔
782
  async #recountGuestsInternal() {
143✔
783
    const { redis } = this.#uw;
×
784
    const guests = this.#connections
×
785
      .filter((connection) => connection instanceof GuestConnection)
×
786
      .length;
×
787

×
788
    const lastGuestCount = await this.getGuestCount();
×
789
    if (guests !== lastGuestCount) {
×
790
      await redis.set('http-api:guests', guests);
×
791
      this.broadcast('guests', guests);
×
792
    }
×
793
  }
×
794
}
143✔
795

1✔
796
export default SocketServer;
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc