• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

u-wave / core / 12200614762

06 Dec 2024 02:32PM UTC coverage: 84.144% (-0.2%) from 84.36%
12200614762

Pull #682

github

goto-bus-stop
Mimic session ID for JWT auth

The main point is to allow the tests to continue to use JWT auth,
while also testing session features such as missing messages for lost
connections
Pull Request #682: Improve session handling

906 of 1085 branches covered (83.5%)

Branch coverage included in aggregate %.

83 of 154 new or added lines in 11 files covered. (53.9%)

6 existing lines in 1 file now uncovered.

9867 of 11718 relevant lines covered (84.2%)

90.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.5
/src/SocketServer.js
1
import { promisify } from 'node:util';
1✔
2
import lodash from 'lodash';
1✔
3
import sjson from 'secure-json-parse';
1✔
4
import { WebSocketServer } from 'ws';
1✔
5
import Ajv from 'ajv';
1✔
6
import ms from 'ms';
1✔
7
import { stdSerializers } from 'pino';
1✔
8
import { socketVote } from './controllers/booth.js';
1✔
9
import { disconnectUser } from './controllers/users.js';
1✔
10
import AuthRegistry from './AuthRegistry.js';
1✔
11
import GuestConnection from './sockets/GuestConnection.js';
1✔
12
import AuthedConnection from './sockets/AuthedConnection.js';
1✔
13
import LostConnection from './sockets/LostConnection.js';
1✔
14
import { serializeUser } from './utils/serialize.js';
1✔
15

1✔
16
const { debounce, isEmpty } = lodash;
1✔
17

1✔
18
export const REDIS_ACTIVE_SESSIONS = 'users';
1✔
19

1✔
20
/**
1✔
21
 * @typedef {import('./schema.js').User} User
1✔
22
 */
1✔
23

1✔
24
/**
1✔
25
 * @typedef {GuestConnection | AuthedConnection | LostConnection} Connection
1✔
26
 */
1✔
27

1✔
28
/**
1✔
29
 * @typedef {object} ClientActionParameters
1✔
30
 * @prop {string} sendChat
1✔
31
 * @prop {-1 | 1} vote
1✔
32
 * @prop {undefined} logout
1✔
33
 */
1✔
34

1✔
35
/**
1✔
36
 * @typedef {{
1✔
37
 *   [Name in keyof ClientActionParameters]: (
1✔
38
 *     user: User,
1✔
39
 *     parameter: ClientActionParameters[Name],
1✔
40
 *     connection: AuthedConnection
1✔
41
 *   ) => void
1✔
42
 * }} ClientActions
1✔
43
 */
1✔
44

1✔
45
const ajv = new Ajv({
1✔
46
  coerceTypes: false,
1✔
47
  ownProperties: true,
1✔
48
  removeAdditional: true,
1✔
49
  useDefaults: false,
1✔
50
});
1✔
51

1✔
52
/**
 * Check whether `property` is an own (non-inherited) property of `object`.
 *
 * @template {object} T
 * @param {T} object
 * @param {PropertyKey} property
 * @returns {property is keyof T}
 */
function has(object, property) {
  return Object.hasOwn(object, property);
}
191✔
61

1✔
62
class SocketServer {
143✔
63
  /**
143✔
64
   * @param {import('./Uwave.js').Boot} uw
143✔
65
   * @param {{ secret: Buffer|string }} options
143✔
66
   */
143✔
67
  static async plugin(uw, options) {
143✔
68
    uw.socketServer = new SocketServer(uw, {
143✔
69
      secret: options.secret,
143✔
70
      server: uw.server,
143✔
71
    });
143✔
72

143✔
73
    uw.after(async () => {
143✔
74
      try {
143✔
75
        await uw.socketServer.initLostConnections();
143✔
76
      } catch (err) {
143!
77
        // No need to prevent startup for this.
×
78
        // We do need to clear the `users` list because the lost connection handlers
×
79
        // will not do so.
×
80
        uw.socketServer.#logger.warn({ err }, 'could not initialise lost connections');
×
NEW
81
        await uw.redis.del(REDIS_ACTIVE_SESSIONS);
×
82
      }
×
83
    });
143✔
84

143✔
85
    uw.onClose(async () => {
143✔
86
      await uw.socketServer.destroy();
143✔
87
    });
143✔
88
  }
143✔
89

143✔
90
  #uw;
143✔
91

143✔
92
  #logger;
143✔
93

143✔
94
  #redisSubscription;
143✔
95

143✔
96
  #wss;
143✔
97

143✔
98
  #closing = false;
143✔
99

143✔
100
  /** @type {Connection[]} */
143✔
101
  #connections = [];
143✔
102

143✔
103
  #pinger;
143✔
104

143✔
105
  /**
143✔
106
   * Update online guests count and broadcast an update if necessary.
143✔
107
   */
143✔
108
  #recountGuests;
143✔
109

143✔
110
  /**
143✔
111
   * Handlers for commands that come in from clients.
143✔
112
   *
143✔
113
   * @type {ClientActions}
143✔
114
   */
143✔
115
  #clientActions;
143✔
116

143✔
117
  /**
143✔
118
   * @type {{
143✔
119
   *   [K in keyof ClientActionParameters]:
143✔
120
   *     import('ajv').ValidateFunction<ClientActionParameters[K]>
143✔
121
   * }}
143✔
122
   */
143✔
123
  #clientActionSchemas;
143✔
124

143✔
125
  /**
143✔
126
   * Handlers for commands that come in from the server side.
143✔
127
   *
143✔
128
   * @type {import('./redisMessages.js').ServerActions}
143✔
129
   */
143✔
130
  #serverActions;
143✔
131

143✔
132
  /**
143✔
133
   * Create a socket server.
143✔
134
   *
143✔
135
   * @param {import('./Uwave.js').Boot} uw üWave Core instance.
143✔
136
   * @param {object} options Socket server options.
143✔
137
   * @param {number} [options.timeout] Time in seconds to wait for disconnected
143✔
138
   *     users to reconnect before removing them.
143✔
139
   * @param {Buffer|string} options.secret
143✔
140
   * @param {import('http').Server | import('https').Server} [options.server]
143✔
141
   * @param {number} [options.port]
143✔
142
   * @private
143✔
143
   */
143✔
144
  constructor(uw, options) {
143✔
145
    if (!uw) {
143!
146
      throw new TypeError(`The "uw" argument must be of type UwaveServer. Received ${typeof uw}`);
×
147
    }
×
148

143✔
149
    this.#uw = uw;
143✔
150
    this.#logger = uw.logger.child({ ns: 'uwave:sockets' }, {
143✔
151
      serializers: {
143✔
152
        req: stdSerializers.req,
143✔
153
      },
143✔
154
    });
143✔
155
    this.#redisSubscription = uw.redis.duplicate();
143✔
156

143✔
157
    this.options = {
143✔
158
      /** @type {(_socket: import('ws').WebSocket | undefined, err: Error) => void} */
143✔
159
      onError: (_socket, err) => {
143✔
160
        throw err;
×
161
      },
143✔
162
      timeout: 30,
143✔
163
      ...options,
143✔
164
    };
143✔
165

143✔
166
    // TODO put this behind a symbol, it's just public for tests
143✔
167
    this.authRegistry = new AuthRegistry(uw.redis);
143✔
168

143✔
169
    this.#wss = new WebSocketServer({
143✔
170
      server: options.server,
143✔
171
      port: options.server ? undefined : options.port,
143!
172
    });
143✔
173

143✔
174
    uw.use(() => this.#redisSubscription.subscribe('uwave'));
143✔
175
    this.#redisSubscription.on('message', (channel, command) => {
143✔
176
      // this returns a promise, but we don't handle the error case:
188✔
177
      // there is not much we can do, so just let node.js crash w/ an unhandled rejection
188✔
178
      this.onServerMessage(channel, command);
188✔
179
    });
143✔
180

143✔
181
    this.#wss.on('error', (error) => {
143✔
182
      this.onError(error);
×
183
    });
143✔
184
    this.#wss.on('connection', (socket, request) => {
143✔
185
      this.onSocketConnected(socket, request);
22✔
186
    });
143✔
187

143✔
188
    this.#pinger = setInterval(() => {
143✔
189
      this.ping();
×
190
    }, ms('10 seconds'));
143✔
191

143✔
192
    this.#recountGuests = debounce(() => {
143✔
193
      if (this.#closing) {
19✔
194
        return;
19✔
195
      }
19✔
196
      this.#recountGuestsInternal().catch((error) => {
×
197
        this.#logger.error({ err: error }, 'counting guests failed');
×
198
      });
×
199
    }, ms('2 seconds'));
143✔
200

143✔
201
    this.#clientActions = {
143✔
202
      sendChat: (user, message) => {
143✔
203
        this.#logger.trace({ user, message }, 'sendChat');
3✔
204
        this.#uw.chat.send(user, message);
3✔
205
      },
143✔
206
      vote: (user, direction) => {
143✔
207
        socketVote(this.#uw, user.id, direction);
×
208
      },
143✔
209
      logout: (user, _, connection) => {
143✔
210
        this.replace(connection, this.createGuestConnection(connection.socket));
×
211
        if (!this.connection(user)) {
×
212
          disconnectUser(this.#uw, user.id);
×
213
        }
×
214
      },
143✔
215
    };
143✔
216

143✔
217
    this.#clientActionSchemas = {
143✔
218
      sendChat: ajv.compile({
143✔
219
        type: 'string',
143✔
220
      }),
143✔
221
      vote: ajv.compile({
143✔
222
        type: 'integer',
143✔
223
        enum: [-1, 1],
143✔
224
      }),
143✔
225
      logout: ajv.compile(true),
143✔
226
    };
143✔
227

143✔
228
    this.#serverActions = {
143✔
229
      /**
143✔
230
       * Broadcast the next track.
143✔
231
       */
143✔
232
      'advance:complete': (next) => {
143✔
233
        if (next) {
15✔
234
          this.broadcast('advance', {
15✔
235
            historyID: next.historyID,
15✔
236
            userID: next.userID,
15✔
237
            media: next.media,
15✔
238
            playedAt: new Date(next.playedAt).getTime(),
15✔
239
          });
15✔
240
        } else {
15!
241
          this.broadcast('advance', null);
×
242
        }
×
243
      },
143✔
244
      /**
143✔
245
       * Broadcast a skip notification.
143✔
246
       */
143✔
247
      'booth:skip': ({ moderatorID, userID, reason }) => {
143✔
248
        this.broadcast('skip', { moderatorID, userID, reason });
×
249
      },
143✔
250
      /**
143✔
251
       * Broadcast a chat message.
143✔
252
       */
143✔
253
      'chat:message': (message) => {
143✔
254
        this.broadcast('chatMessage', message);
2✔
255
      },
143✔
256
      /**
143✔
257
       * Delete chat messages. The delete filter can have an _id property to
143✔
258
       * delete a specific message, a userID property to delete messages by a
143✔
259
       * user, or be empty to delete all messages.
143✔
260
       */
143✔
261
      'chat:delete': ({ moderatorID, filter }) => {
143✔
262
        if ('id' in filter) {
6✔
263
          this.broadcast('chatDeleteByID', {
2✔
264
            moderatorID,
2✔
265
            _id: filter.id,
2✔
266
          });
2✔
267
        } else if ('userID' in filter) {
6✔
268
          this.broadcast('chatDeleteByUser', {
2✔
269
            moderatorID,
2✔
270
            userID: filter.userID,
2✔
271
          });
2✔
272
        } else if (isEmpty(filter)) {
2✔
273
          this.broadcast('chatDelete', { moderatorID });
2✔
274
        }
2✔
275
      },
143✔
276
      /**
143✔
277
       * Broadcast that a user was muted in chat.
143✔
278
       */
143✔
279
      'chat:mute': ({ moderatorID, userID, duration }) => {
143✔
280
        this.broadcast('chatMute', {
1✔
281
          userID,
1✔
282
          moderatorID,
1✔
283
          expiresAt: Date.now() + duration,
1✔
284
        });
1✔
285
      },
143✔
286
      /**
143✔
287
       * Broadcast that a user was unmuted in chat.
143✔
288
       */
143✔
289
      'chat:unmute': ({ moderatorID, userID }) => {
143✔
290
        this.broadcast('chatUnmute', { userID, moderatorID });
×
291
      },
143✔
292
      /**
143✔
293
       * Broadcast a vote for the current track.
143✔
294
       */
143✔
295
      'booth:vote': ({ userID, direction }) => {
143✔
296
        this.broadcast('vote', {
2✔
297
          _id: userID,
2✔
298
          value: direction,
2✔
299
        });
2✔
300
      },
143✔
301
      /**
143✔
302
       * Broadcast a favorite for the current track.
143✔
303
       */
143✔
304
      'booth:favorite': ({ userID }) => {
143✔
305
        this.broadcast('favorite', { userID });
1✔
306
      },
143✔
307
      /**
143✔
308
       * Cycle a single user's playlist.
143✔
309
       */
143✔
310
      'playlist:cycle': ({ userID, playlistID }) => {
143✔
311
        this.sendTo(userID, 'playlistCycle', { playlistID });
15✔
312
      },
143✔
313
      /**
143✔
314
       * Broadcast that a user joined the waitlist.
143✔
315
       */
143✔
316
      'waitlist:join': ({ userID, waitlist }) => {
143✔
317
        this.broadcast('waitlistJoin', { userID, waitlist });
18✔
318
      },
143✔
319
      /**
143✔
320
       * Broadcast that a user left the waitlist.
143✔
321
       */
143✔
322
      'waitlist:leave': ({ userID, waitlist }) => {
143✔
323
        this.broadcast('waitlistLeave', { userID, waitlist });
1✔
324
      },
143✔
325
      /**
143✔
326
       * Broadcast that a user was added to the waitlist.
143✔
327
       */
143✔
328
      'waitlist:add': ({
143✔
329
        userID, moderatorID, position, waitlist,
1✔
330
      }) => {
1✔
331
        this.broadcast('waitlistAdd', {
1✔
332
          userID, moderatorID, position, waitlist,
1✔
333
        });
1✔
334
      },
143✔
335
      /**
143✔
336
       * Broadcast that a user was removed from the waitlist.
143✔
337
       */
143✔
338
      'waitlist:remove': ({ userID, moderatorID, waitlist }) => {
143✔
339
        this.broadcast('waitlistRemove', { userID, moderatorID, waitlist });
×
340
      },
143✔
341
      /**
143✔
342
       * Broadcast that a user was moved in the waitlist.
143✔
343
       */
143✔
344
      'waitlist:move': ({
143✔
345
        userID, moderatorID, position, waitlist,
×
346
      }) => {
×
347
        this.broadcast('waitlistMove', {
×
348
          userID, moderatorID, position, waitlist,
×
349
        });
×
350
      },
143✔
351
      /**
143✔
352
       * Broadcast a waitlist update.
143✔
353
       */
143✔
354
      'waitlist:update': (waitlist) => {
143✔
355
        this.broadcast('waitlistUpdate', waitlist);
15✔
356
      },
143✔
357
      /**
143✔
358
       * Broadcast that the waitlist was cleared.
143✔
359
       */
143✔
360
      'waitlist:clear': ({ moderatorID }) => {
143✔
361
        this.broadcast('waitlistClear', { moderatorID });
×
362
      },
143✔
363
      /**
143✔
364
       * Broadcast that the waitlist was locked.
143✔
365
       */
143✔
366
      'waitlist:lock': ({ moderatorID, locked }) => {
143✔
367
        this.broadcast('waitlistLock', { moderatorID, locked });
6✔
368
      },
143✔
369

143✔
370
      'acl:allow': ({ userID, roles }) => {
143✔
371
        this.broadcast('acl:allow', { userID, roles });
57✔
372
      },
143✔
373
      'acl:disallow': ({ userID, roles }) => {
143✔
374
        this.broadcast('acl:disallow', { userID, roles });
2✔
375
      },
143✔
376

143✔
377
      'user:update': ({ userID, moderatorID, new: update }) => {
143✔
378
        // TODO Remove this remnant of the old roles system
3✔
379
        if ('role' in update) {
3!
380
          this.broadcast('roleChange', {
×
381
            moderatorID,
×
382
            userID,
×
383
            role: update.role,
×
384
          });
×
385
        }
×
386
        if ('username' in update) {
3✔
387
          this.broadcast('nameChange', {
3✔
388
            moderatorID,
3✔
389
            userID,
3✔
390
            username: update.username,
3✔
391
          });
3✔
392
        }
3✔
393
      },
143✔
394
      'user:join': async ({ userID }) => {
143✔
395
        const { users, redis } = this.#uw;
22✔
396
        const user = await users.getUser(userID);
22✔
397
        if (user) {
22✔
398
          // TODO this should not be the socket server code's responsibility
22✔
399
          await redis.rpush(REDIS_ACTIVE_SESSIONS, user.id);
22✔
400
          this.broadcast('join', serializeUser(user));
22✔
401
        }
22✔
402
      },
143✔
403
      /**
143✔
404
       * Broadcast that a user left the server.
143✔
405
       */
143✔
406
      'user:leave': ({ userID }) => {
143✔
407
        this.broadcast('leave', userID);
×
408
      },
143✔
409
      /**
143✔
410
       * Broadcast a ban event.
143✔
411
       */
143✔
412
      'user:ban': ({
143✔
413
        moderatorID, userID, permanent = false, duration, expiresAt,
3✔
414
      }) => {
3✔
415
        this.broadcast('ban', {
3✔
416
          moderatorID, userID, permanent, duration, expiresAt,
3✔
417
        });
3✔
418

3✔
419
        this.#connections.forEach((connection) => {
3✔
420
          if (connection instanceof AuthedConnection && connection.user.id === userID) {
×
421
            connection.ban();
×
422
          } else if (connection instanceof LostConnection && connection.user.id === userID) {
×
423
            connection.close();
×
424
          }
×
425
        });
3✔
426
      },
143✔
427
      /**
143✔
428
       * Broadcast an unban event.
143✔
429
       */
143✔
430
      'user:unban': ({ moderatorID, userID }) => {
143✔
431
        this.broadcast('unban', { moderatorID, userID });
1✔
432
      },
143✔
433
      /**
143✔
434
       * Force-close a connection.
143✔
435
       */
143✔
436
      'http-api:socket:close': (userID) => {
143✔
437
        this.#connections.forEach((connection) => {
×
438
          if ('user' in connection && connection.user.id === userID) {
×
439
            connection.close();
×
440
          }
×
441
        });
×
442
      },
143✔
443

143✔
444
      'emotes:reload': () => {
143✔
445
        this.broadcast('reloadEmotes', null);
×
446
      },
143✔
447
    };
143✔
448
  }
143✔
449

143✔
450
  /**
143✔
451
   * Create `LostConnection`s for every user that's known to be online, but that
143✔
452
   * is not currently connected to the socket server.
143✔
453
   *
143✔
454
   * @private
143✔
455
   */
143✔
456
  async initLostConnections() {
143✔
457
    const { db, redis } = this.#uw;
143✔
458
    const userIDs = /** @type {import('./schema').UserID[]} */ (
143✔
459
      await redis.lrange(REDIS_ACTIVE_SESSIONS, 0, -1)
143✔
460
    );
143✔
461
    const disconnectedIDs = userIDs.filter((userID) => !this.connection(userID));
143✔
462

143✔
463
    if (disconnectedIDs.length === 0) {
143✔
464
      return;
143✔
465
    }
143✔
466

×
467
    const disconnectedUsers = await db.selectFrom('users')
×
468
      .where('id', 'in', disconnectedIDs)
×
469
      .selectAll()
×
470
      .execute();
×
471
    disconnectedUsers.forEach((user) => {
×
NEW
472
      this.add(this.createLostConnection(user, 'TODO: Actual session ID!!'));
×
473
    });
×
474
  }
143✔
475

143✔
476
  /**
143✔
477
   * @param {import('ws').WebSocket} socket
143✔
478
   * @param {import('http').IncomingMessage} request
143✔
479
   * @private
143✔
480
   */
143✔
481
  onSocketConnected(socket, request) {
143✔
482
    this.#logger.info({ req: request }, 'new connection');
22✔
483

22✔
484
    socket.on('error', (error) => {
22✔
485
      this.onSocketError(socket, error);
×
486
    });
22✔
487
    this.add(this.createGuestConnection(socket));
22✔
488
  }
22✔
489

143✔
490
  /**
143✔
491
   * @param {import('ws').WebSocket} socket
143✔
492
   * @param {Error} error
143✔
493
   * @private
143✔
494
   */
143✔
495
  onSocketError(socket, error) {
143✔
496
    this.#logger.warn({ err: error }, 'socket error');
×
497

×
498
    this.options.onError(socket, error);
×
499
  }
×
500

143✔
501
  /**
143✔
502
   * @param {Error} error
143✔
503
   * @private
143✔
504
   */
143✔
505
  onError(error) {
143✔
506
    this.#logger.error({ err: error }, 'server error');
×
507

×
508
    this.options.onError(undefined, error);
×
509
  }
×
510

143✔
511
  /**
143✔
512
   * Get a LostConnection for a user, if one exists.
143✔
513
   *
143✔
514
   * @param {User} user
143✔
515
   * @private
143✔
516
   */
143✔
517
  getLostConnection(user) {
143✔
518
    return this.#connections.find((connection) => (
×
519
      connection instanceof LostConnection && connection.user.id === user.id
×
520
    ));
×
521
  }
×
522

143✔
523
  /**
143✔
524
   * Create a connection instance for an unauthenticated user.
143✔
525
   *
143✔
526
   * @param {import('ws').WebSocket} socket
143✔
527
   * @private
143✔
528
   */
143✔
529
  createGuestConnection(socket) {
143✔
530
    const connection = new GuestConnection(this.#uw, socket, {
22✔
531
      authRegistry: this.authRegistry,
22✔
532
    });
22✔
533
    connection.on('close', () => {
22✔
534
      this.remove(connection);
×
535
    });
22✔
536
    connection.on('authenticate', async (user, sessionID) => {
22✔
537
      const isReconnect = await connection.isReconnect(sessionID);
22✔
538
      this.#logger.info({ userId: user.id, isReconnect }, 'authenticated socket');
22✔
539
      if (isReconnect) {
22!
NEW
540
        const previousConnection = this.getLostConnection(sessionID);
×
541
        if (previousConnection) this.remove(previousConnection);
×
542
      }
×
543

22✔
544
      this.replace(connection, this.createAuthedConnection(socket, user, sessionID));
22✔
545

22✔
546
      if (!isReconnect) {
22✔
547
        this.#uw.publish('user:join', { userID: user.id });
22✔
548
      }
22✔
549
    });
22✔
550
    return connection;
22✔
551
  }
22✔
552

143✔
553
  /**
143✔
554
   * Create a connection instance for an authenticated user.
143✔
555
   *
143✔
556
   * @param {import('ws').WebSocket} socket
143✔
557
   * @param {User} user
143✔
558
   * @param {string} sessionID
143✔
559
   * @returns {AuthedConnection}
143✔
560
   * @private
143✔
561
   */
143✔
562
  createAuthedConnection(socket, user, sessionID) {
143✔
563
    const connection = new AuthedConnection(this.#uw, socket, user, sessionID);
22✔
564
    connection.on('close', ({ banned }) => {
22✔
565
      if (banned) {
22!
566
        this.#logger.info({ userId: user.id }, 'removing connection after ban');
×
567
        disconnectUser(this.#uw, user.id);
×
568
      } else if (!this.#closing) {
22!
569
        this.#logger.info({ userId: user.id }, 'lost connection');
×
NEW
570
        this.add(this.createLostConnection(user, sessionID));
×
571
      }
×
572
      this.remove(connection);
22✔
573
    });
22✔
574
    connection.on(
22✔
575
      'command',
22✔
576
      /**
22✔
577
       * @param {string} command
22✔
578
       * @param {import('type-fest').JsonValue} data
22✔
579
       */
22✔
580
      (command, data) => {
22✔
581
        this.#logger.trace({ userId: user.id, command, data }, 'command');
3✔
582
        if (has(this.#clientActions, command)) {
3✔
583
          // Ignore incorrect input
3✔
584
          const validate = this.#clientActionSchemas[command];
3✔
585
          if (validate && !validate(data)) {
3!
586
            return;
×
587
          }
×
588

3✔
589
          const action = this.#clientActions[command];
3✔
590
          // @ts-expect-error TS2345 `data` is validated
3✔
591
          action(user, data, connection);
3✔
592
        }
3✔
593
      },
22✔
594
    );
22✔
595
    return connection;
22✔
596
  }
22✔
597

143✔
598
  /**
143✔
599
   * Create a connection instance for a user who disconnected.
143✔
600
   *
143✔
601
   * @param {User} user
143✔
602
   * @param {string} sessionID
143✔
603
   * @returns {LostConnection}
143✔
604
   * @private
143✔
605
   */
143✔
606
  createLostConnection(user, sessionID) {
143✔
NEW
607
    const connection = new LostConnection(this.#uw, user, sessionID, this.options.timeout);
×
608
    connection.on('close', () => {
×
609
      this.#logger.info({ userId: user.id }, 'user left');
×
610
      this.remove(connection);
×
611
      // Only register that the user left if they didn't have another connection
×
612
      // still open.
×
613
      if (!this.connection(user)) {
×
614
        disconnectUser(this.#uw, user.id);
×
615
      }
×
616
    });
×
617
    return connection;
×
618
  }
×
619

143✔
620
  /**
143✔
621
   * Add a connection.
143✔
622
   *
143✔
623
   * @param {Connection} connection
143✔
624
   * @private
143✔
625
   */
143✔
626
  add(connection) {
143✔
627
    const userID = 'user' in connection ? connection.user.id : null;
44✔
628
    const sessionID = 'sessionID' in connection ? connection.sessionID : null;
44✔
629
    this.#logger.trace({ type: connection.constructor.name, userID, sessionID }, 'add connection');
44✔
630

44✔
631
    this.#connections.push(connection);
44✔
632
    this.#recountGuests();
44✔
633
  }
44✔
634

143✔
635
  /**
143✔
636
   * Remove a connection.
143✔
637
   *
143✔
638
   * @param {Connection} connection
143✔
639
   * @private
143✔
640
   */
143✔
641
  remove(connection) {
143✔
642
    const userID = 'user' in connection ? connection.user.id : null;
44✔
643
    const sessionID = 'sessionID' in connection ? connection.sessionID : null;
44✔
644
    this.#logger.trace({ type: connection.constructor.name, userID, sessionID }, 'remove connection');
44✔
645

44✔
646
    const i = this.#connections.indexOf(connection);
44✔
647
    this.#connections.splice(i, 1);
44✔
648

44✔
649
    connection.removed();
44✔
650
    this.#recountGuests();
44✔
651
  }
44✔
652

143✔
653
  /**
143✔
654
   * Replace a connection instance with another connection instance. Useful when
143✔
655
   * a connection changes "type", like GuestConnection → AuthedConnection.
143✔
656
   *
143✔
657
   * @param {Connection} oldConnection
143✔
658
   * @param {Connection} newConnection
143✔
659
   * @private
143✔
660
   */
143✔
661
  replace(oldConnection, newConnection) {
143✔
662
    this.remove(oldConnection);
22✔
663
    this.add(newConnection);
22✔
664
  }
22✔
665

143✔
666
  /**
143✔
667
   * Handle command messages coming in from Redis.
143✔
668
   * Some commands are intended to broadcast immediately to all connected
143✔
669
   * clients, but others require special action.
143✔
670
   *
143✔
671
   * @param {string} channel
143✔
672
   * @param {string} rawCommand
143✔
673
   * @returns {Promise<void>}
143✔
674
   * @private
143✔
675
   */
143✔
676
  async onServerMessage(channel, rawCommand) {
143✔
677
    /**
188✔
678
     * @type {{ command: string, data: import('type-fest').JsonValue }|undefined}
188✔
679
     */
188✔
680
    const json = sjson.safeParse(rawCommand);
188✔
681
    if (!json) {
188!
682
      return;
×
683
    }
×
684
    const { command, data } = json;
188✔
685

188✔
686
    this.#logger.trace({ channel, command, data }, 'server message');
188✔
687

188✔
688
    if (has(this.#serverActions, command)) {
188✔
689
      const action = this.#serverActions[command];
171✔
690
      if (action !== undefined) { // the types for `ServerActions` allow undefined, so...
171✔
691
        // @ts-expect-error TS2345 `data` is validated
171✔
692
        action(data);
171✔
693
      }
171✔
694
    }
171✔
695
  }
188✔
696

143✔
697
  /**
143✔
698
   * Stop the socket server.
143✔
699
   *
143✔
700
   * @returns {Promise<void>}
143✔
701
   */
143✔
702
  async destroy() {
143✔
703
    clearInterval(this.#pinger);
143✔
704

143✔
705
    this.#closing = true;
143✔
706
    for (const connection of this.#wss.clients) {
143✔
707
      connection.close();
22✔
708
    }
22✔
709

143✔
710
    this.#recountGuests.cancel();
143✔
711

143✔
712
    const closeWsServer = promisify(this.#wss.close.bind(this.#wss));
143✔
713
    await closeWsServer();
143✔
714
    await this.#redisSubscription.quit();
143✔
715
  }
143✔
716

143✔
717
  /**
143✔
718
   * Get the connection instance for a specific user.
143✔
719
   *
143✔
720
   * @param {User|string} user The user.
143✔
721
   * @returns {Connection|undefined}
143✔
722
   */
143✔
723
  connection(user) {
143✔
724
    const userID = typeof user === 'object' ? user.id : user;
×
725
    return this.#connections.find((connection) => 'user' in connection && connection.user.id === userID);
×
726
  }
×
727

143✔
728
  ping() {
143✔
729
    this.#connections.forEach((connection) => {
×
730
      if ('socket' in connection) {
×
731
        connection.ping();
×
732
      }
×
733
    });
×
734
  }
×
735

143✔
736
  /**
143✔
737
   * Broadcast a command to all connected clients.
143✔
738
   *
143✔
739
   * @param {string} command Command name.
143✔
740
   * @param {import('type-fest').JsonValue} data Command data.
143✔
741
   */
143✔
742
  broadcast(command, data) {
143✔
743
    this.#logger.trace({
156✔
744
      command,
156✔
745
      data,
156✔
746
      to: this.#connections.map((connection) => (
156✔
747
        'user' in connection ? connection.user.id : null
98!
748
      )),
156✔
749
    }, 'broadcast');
156✔
750

156✔
751
    this.#connections.forEach((connection) => {
156✔
752
      connection.send(command, data);
98✔
753
    });
156✔
754
  }
156✔
755

143✔
756
  /**
143✔
757
   * Send a command to a single user.
143✔
758
   *
143✔
759
   * @param {User|string} user User or user ID to send the command to.
143✔
760
   * @param {string} command Command name.
143✔
761
   * @param {import('type-fest').JsonValue} data Command data.
143✔
762
   */
143✔
763
  sendTo(user, command, data) {
143✔
764
    const userID = typeof user === 'object' ? user.id : user;
15!
765

15✔
766
    this.#connections.forEach((connection) => {
15✔
767
      if ('user' in connection && connection.user.id === userID) {
16✔
768
        connection.send(command, data);
13✔
769
      }
13✔
770
    });
15✔
771
  }
15✔
772

143✔
773
  async getGuestCount() {
143✔
774
    const { redis } = this.#uw;
×
775
    const rawCount = await redis.get('http-api:guests');
×
776
    if (typeof rawCount !== 'string' || !/^\d+$/.test(rawCount)) {
×
777
      return 0;
×
778
    }
×
779
    return parseInt(rawCount, 10);
×
780
  }
×
781

143✔
782
  async #recountGuestsInternal() {
143✔
783
    const { redis } = this.#uw;
×
784
    const guests = this.#connections
×
785
      .filter((connection) => connection instanceof GuestConnection)
×
786
      .length;
×
787

×
788
    const lastGuestCount = await this.getGuestCount();
×
789
    if (guests !== lastGuestCount) {
×
790
      await redis.set('http-api:guests', guests);
×
791
      this.broadcast('guests', guests);
×
792
    }
×
793
  }
×
794
}
143✔
795

1✔
796
export default SocketServer;
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc