• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

u-wave / core / 12201136025

06 Dec 2024 03:06PM UTC coverage: 85.316% (+1.0%) from 84.36%
12201136025

Pull #682

github

goto-bus-stop
use off the shelf redis store
Pull Request #682: Improve session handling

933 of 1112 branches covered (83.9%)

Branch coverage included in aggregate %.

79 of 120 new or added lines in 11 files covered. (65.83%)

6 existing lines in 1 file now uncovered.

9984 of 11684 relevant lines covered (85.45%)

90.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.51
/src/SocketServer.js
1
import { promisify } from 'node:util';
1✔
2
import lodash from 'lodash';
1✔
3
import sjson from 'secure-json-parse';
1✔
4
import { WebSocketServer } from 'ws';
1✔
5
import Ajv from 'ajv';
1✔
6
import ms from 'ms';
1✔
7
import { stdSerializers } from 'pino';
1✔
8
import { socketVote } from './controllers/booth.js';
1✔
9
import { disconnectUser } from './controllers/users.js';
1✔
10
import AuthRegistry from './AuthRegistry.js';
1✔
11
import GuestConnection from './sockets/GuestConnection.js';
1✔
12
import AuthedConnection from './sockets/AuthedConnection.js';
1✔
13
import LostConnection from './sockets/LostConnection.js';
1✔
14
import { serializeUser } from './utils/serialize.js';
1✔
15

1✔
16
const { debounce, isEmpty } = lodash;
1✔
17

1✔
18
export const REDIS_ACTIVE_SESSIONS = 'users';
1✔
19

1✔
20
/**
1✔
21
 * @typedef {import('./schema.js').User} User
1✔
22
 */
1✔
23

1✔
24
/**
1✔
25
 * @typedef {GuestConnection | AuthedConnection | LostConnection} Connection
1✔
26
 */
1✔
27

1✔
28
/**
1✔
29
 * @typedef {object} ClientActionParameters
1✔
30
 * @prop {string} sendChat
1✔
31
 * @prop {-1 | 1} vote
1✔
32
 * @prop {undefined} logout
1✔
33
 */
1✔
34

1✔
35
/**
1✔
36
 * @typedef {{
1✔
37
 *   [Name in keyof ClientActionParameters]: (
1✔
38
 *     user: User,
1✔
39
 *     parameter: ClientActionParameters[Name],
1✔
40
 *     connection: AuthedConnection
1✔
41
 *   ) => void
1✔
42
 * }} ClientActions
1✔
43
 */
1✔
44

1✔
45
// Validator instance used to compile the schemas for client command payloads.
// Payloads are checked as-is: no type coercion and no defaults are applied.
const ajv = new Ajv({
  ownProperties: true,
  removeAdditional: true,
  coerceTypes: false,
  useDefaults: false,
});
1✔
51

1✔
52
/**
 * Check whether `property` is an own (non-inherited) property of `object`.
 *
 * The lookup goes through `Object.prototype.hasOwnProperty`, so it also works
 * for objects that shadow `hasOwnProperty` or that have no prototype.
 *
 * @template {object} T
 * @param {T} object
 * @param {PropertyKey} property
 * @returns {property is keyof T}
 */
function has(object, property) {
  const { hasOwnProperty } = Object.prototype;
  return hasOwnProperty.call(object, property);
}
201✔
61

1✔
62
class SocketServer {
144✔
63
  /**
144✔
64
   * @param {import('./Uwave.js').Boot} uw
144✔
65
   * @param {{ secret: Buffer|string }} options
144✔
66
   */
144✔
67
  static async plugin(uw, options) {
144✔
68
    uw.socketServer = new SocketServer(uw, {
144✔
69
      secret: options.secret,
144✔
70
      server: uw.server,
144✔
71
    });
144✔
72

144✔
73
    uw.after(async () => {
144✔
74
      try {
144✔
75
        await uw.socketServer.initLostConnections();
144✔
76
      } catch (err) {
144!
77
        // No need to prevent startup for this.
×
78
        // We do need to clear the `users` list because the lost connection handlers
×
79
        // will not do so.
×
80
        uw.socketServer.#logger.warn({ err }, 'could not initialise lost connections');
×
NEW
81
        await uw.redis.del(REDIS_ACTIVE_SESSIONS);
×
82
      }
×
83
    });
144✔
84

144✔
85
    uw.onClose(async () => {
144✔
86
      await uw.socketServer.destroy();
144✔
87
    });
144✔
88
  }
144✔
89

144✔
90
  #uw;
144✔
91

144✔
92
  #logger;
144✔
93

144✔
94
  #redisSubscription;
144✔
95

144✔
96
  #wss;
144✔
97

144✔
98
  #closing = false;
144✔
99

144✔
100
  /** @type {Connection[]} */
144✔
101
  #connections = [];
144✔
102

144✔
103
  #pinger;
144✔
104

144✔
105
  /**
144✔
106
   * Update online guests count and broadcast an update if necessary.
144✔
107
   */
144✔
108
  #recountGuests;
144✔
109

144✔
110
  /**
144✔
111
   * Handlers for commands that come in from clients.
144✔
112
   *
144✔
113
   * @type {ClientActions}
144✔
114
   */
144✔
115
  #clientActions;
144✔
116

144✔
117
  /**
144✔
118
   * @type {{
144✔
119
   *   [K in keyof ClientActionParameters]:
144✔
120
   *     import('ajv').ValidateFunction<ClientActionParameters[K]>
144✔
121
   * }}
144✔
122
   */
144✔
123
  #clientActionSchemas;
144✔
124

144✔
125
  /**
144✔
126
   * Handlers for commands that come in from the server side.
144✔
127
   *
144✔
128
   * @type {import('./redisMessages.js').ServerActions}
144✔
129
   */
144✔
130
  #serverActions;
144✔
131

144✔
132
  /**
144✔
133
   * Create a socket server.
144✔
134
   *
144✔
135
   * @param {import('./Uwave.js').Boot} uw üWave Core instance.
144✔
136
   * @param {object} options Socket server options.
144✔
137
   * @param {number} [options.timeout] Time in seconds to wait for disconnected
144✔
138
   *     users to reconnect before removing them.
144✔
139
   * @param {Buffer|string} options.secret
144✔
140
   * @param {import('http').Server | import('https').Server} [options.server]
144✔
141
   * @param {number} [options.port]
144✔
142
   * @private
144✔
143
   */
144✔
144
  constructor(uw, options) {
144✔
145
    if (!uw) {
144!
146
      throw new TypeError(`The "uw" argument must be of type UwaveServer. Received ${typeof uw}`);
×
147
    }
×
148

144✔
149
    this.#uw = uw;
144✔
150
    this.#logger = uw.logger.child({ ns: 'uwave:sockets' }, {
144✔
151
      serializers: {
144✔
152
        req: stdSerializers.req,
144✔
153
      },
144✔
154
    });
144✔
155
    this.#redisSubscription = uw.redis.duplicate();
144✔
156

144✔
157
    this.options = {
144✔
158
      /** @type {(_socket: import('ws').WebSocket | undefined, err: Error) => void} */
144✔
159
      onError: (_socket, err) => {
144✔
160
        throw err;
×
161
      },
144✔
162
      timeout: 30,
144✔
163
      ...options,
144✔
164
    };
144✔
165

144✔
166
    // TODO put this behind a symbol, it's just public for tests
144✔
167
    this.authRegistry = new AuthRegistry(uw.redis);
144✔
168

144✔
169
    this.#wss = new WebSocketServer({
144✔
170
      server: options.server,
144✔
171
      port: options.server ? undefined : options.port,
144!
172
    });
144✔
173

144✔
174
    uw.use(() => this.#redisSubscription.subscribe('uwave'));
144✔
175
    this.#redisSubscription.on('message', (channel, command) => {
144✔
176
      // this returns a promise, but we don't handle the error case:
194✔
177
      // there is not much we can do, so just let node.js crash w/ an unhandled rejection
194✔
178
      this.onServerMessage(channel, command);
194✔
179
    });
144✔
180

144✔
181
    this.#wss.on('error', (error) => {
144✔
182
      this.onError(error);
×
183
    });
144✔
184
    this.#wss.on('connection', (socket, request) => {
144✔
185
      this.onSocketConnected(socket, request);
25✔
186
    });
144✔
187

144✔
188
    this.#pinger = setInterval(() => {
144✔
189
      this.ping();
×
190
    }, ms('10 seconds'));
144✔
191

144✔
192
    this.#recountGuests = debounce(() => {
144✔
193
      if (this.#closing) {
21✔
194
        return;
21✔
195
      }
21✔
196
      this.#recountGuestsInternal().catch((error) => {
×
197
        this.#logger.error({ err: error }, 'counting guests failed');
×
198
      });
×
199
    }, ms('2 seconds'));
144✔
200

144✔
201
    this.#clientActions = {
144✔
202
      sendChat: (user, message) => {
144✔
203
        this.#logger.trace({ user, message }, 'sendChat');
7✔
204
        this.#uw.chat.send(user, message);
7✔
205
      },
144✔
206
      vote: (user, direction) => {
144✔
207
        socketVote(this.#uw, user.id, direction);
×
208
      },
144✔
209
      logout: (user, _, connection) => {
144✔
210
        this.replace(connection, this.createGuestConnection(connection.socket));
×
211
        if (!this.connection(user)) {
×
212
          disconnectUser(this.#uw, user.id);
×
213
        }
×
214
      },
144✔
215
    };
144✔
216

144✔
217
    this.#clientActionSchemas = {
144✔
218
      sendChat: ajv.compile({
144✔
219
        type: 'string',
144✔
220
      }),
144✔
221
      vote: ajv.compile({
144✔
222
        type: 'integer',
144✔
223
        enum: [-1, 1],
144✔
224
      }),
144✔
225
      logout: ajv.compile(true),
144✔
226
    };
144✔
227

144✔
228
    this.#serverActions = {
144✔
229
      /**
144✔
230
       * Broadcast the next track.
144✔
231
       */
144✔
232
      'advance:complete': (next) => {
144✔
233
        if (next) {
15✔
234
          this.broadcast('advance', {
15✔
235
            historyID: next.historyID,
15✔
236
            userID: next.userID,
15✔
237
            media: next.media,
15✔
238
            playedAt: new Date(next.playedAt).getTime(),
15✔
239
          });
15✔
240
        } else {
15!
241
          this.broadcast('advance', null);
×
242
        }
×
243
      },
144✔
244
      /**
144✔
245
       * Broadcast a skip notification.
144✔
246
       */
144✔
247
      'booth:skip': ({ moderatorID, userID, reason }) => {
144✔
248
        this.broadcast('skip', { moderatorID, userID, reason });
×
249
      },
144✔
250
      /**
144✔
251
       * Broadcast a chat message.
144✔
252
       */
144✔
253
      'chat:message': (message) => {
144✔
254
        this.broadcast('chatMessage', message);
6✔
255
      },
144✔
256
      /**
144✔
257
       * Delete chat messages. The delete filter can have an _id property to
144✔
258
       * delete a specific message, a userID property to delete messages by a
144✔
259
       * user, or be empty to delete all messages.
144✔
260
       */
144✔
261
      'chat:delete': ({ moderatorID, filter }) => {
144✔
262
        if ('id' in filter) {
6✔
263
          this.broadcast('chatDeleteByID', {
2✔
264
            moderatorID,
2✔
265
            _id: filter.id,
2✔
266
          });
2✔
267
        } else if ('userID' in filter) {
6✔
268
          this.broadcast('chatDeleteByUser', {
2✔
269
            moderatorID,
2✔
270
            userID: filter.userID,
2✔
271
          });
2✔
272
        } else if (isEmpty(filter)) {
2✔
273
          this.broadcast('chatDelete', { moderatorID });
2✔
274
        }
2✔
275
      },
144✔
276
      /**
144✔
277
       * Broadcast that a user was muted in chat.
144✔
278
       */
144✔
279
      'chat:mute': ({ moderatorID, userID, duration }) => {
144✔
280
        this.broadcast('chatMute', {
1✔
281
          userID,
1✔
282
          moderatorID,
1✔
283
          expiresAt: Date.now() + duration,
1✔
284
        });
1✔
285
      },
144✔
286
      /**
144✔
287
       * Broadcast that a user was unmuted in chat.
144✔
288
       */
144✔
289
      'chat:unmute': ({ moderatorID, userID }) => {
144✔
290
        this.broadcast('chatUnmute', { userID, moderatorID });
×
291
      },
144✔
292
      /**
144✔
293
       * Broadcast a vote for the current track.
144✔
294
       */
144✔
295
      'booth:vote': ({ userID, direction }) => {
144✔
296
        this.broadcast('vote', {
2✔
297
          _id: userID,
2✔
298
          value: direction,
2✔
299
        });
2✔
300
      },
144✔
301
      /**
144✔
302
       * Broadcast a favorite for the current track.
144✔
303
       */
144✔
304
      'booth:favorite': ({ userID }) => {
144✔
305
        this.broadcast('favorite', { userID });
1✔
306
      },
144✔
307
      /**
144✔
308
       * Cycle a single user's playlist.
144✔
309
       */
144✔
310
      'playlist:cycle': ({ userID, playlistID }) => {
144✔
311
        this.sendTo(userID, 'playlistCycle', { playlistID });
15✔
312
      },
144✔
313
      /**
144✔
314
       * Broadcast that a user joined the waitlist.
144✔
315
       */
144✔
316
      'waitlist:join': ({ userID, waitlist }) => {
144✔
317
        this.broadcast('waitlistJoin', { userID, waitlist });
18✔
318
      },
144✔
319
      /**
144✔
320
       * Broadcast that a user left the waitlist.
144✔
321
       */
144✔
322
      'waitlist:leave': ({ userID, waitlist }) => {
144✔
323
        this.broadcast('waitlistLeave', { userID, waitlist });
1✔
324
      },
144✔
325
      /**
144✔
326
       * Broadcast that a user was added to the waitlist.
144✔
327
       */
144✔
328
      'waitlist:add': ({
144✔
329
        userID, moderatorID, position, waitlist,
1✔
330
      }) => {
1✔
331
        this.broadcast('waitlistAdd', {
1✔
332
          userID, moderatorID, position, waitlist,
1✔
333
        });
1✔
334
      },
144✔
335
      /**
144✔
336
       * Broadcast that a user was removed from the waitlist.
144✔
337
       */
144✔
338
      'waitlist:remove': ({ userID, moderatorID, waitlist }) => {
144✔
339
        this.broadcast('waitlistRemove', { userID, moderatorID, waitlist });
×
340
      },
144✔
341
      /**
144✔
342
       * Broadcast that a user was moved in the waitlist.
144✔
343
       */
144✔
344
      'waitlist:move': ({
144✔
345
        userID, moderatorID, position, waitlist,
×
346
      }) => {
×
347
        this.broadcast('waitlistMove', {
×
348
          userID, moderatorID, position, waitlist,
×
349
        });
×
350
      },
144✔
351
      /**
144✔
352
       * Broadcast a waitlist update.
144✔
353
       */
144✔
354
      'waitlist:update': (waitlist) => {
144✔
355
        this.broadcast('waitlistUpdate', waitlist);
15✔
356
      },
144✔
357
      /**
144✔
358
       * Broadcast that the waitlist was cleared.
144✔
359
       */
144✔
360
      'waitlist:clear': ({ moderatorID }) => {
144✔
361
        this.broadcast('waitlistClear', { moderatorID });
×
362
      },
144✔
363
      /**
144✔
364
       * Broadcast that the waitlist was locked.
144✔
365
       */
144✔
366
      'waitlist:lock': ({ moderatorID, locked }) => {
144✔
367
        this.broadcast('waitlistLock', { moderatorID, locked });
6✔
368
      },
144✔
369

144✔
370
      'acl:allow': ({ userID, roles }) => {
144✔
371
        this.broadcast('acl:allow', { userID, roles });
57✔
372
      },
144✔
373
      'acl:disallow': ({ userID, roles }) => {
144✔
374
        this.broadcast('acl:disallow', { userID, roles });
2✔
375
      },
144✔
376

144✔
377
      'user:update': ({ userID, moderatorID, new: update }) => {
144✔
378
        // TODO Remove this remnant of the old roles system
3✔
379
        if ('role' in update) {
3!
380
          this.broadcast('roleChange', {
×
381
            moderatorID,
×
382
            userID,
×
383
            role: update.role,
×
384
          });
×
385
        }
×
386
        if ('username' in update) {
3✔
387
          this.broadcast('nameChange', {
3✔
388
            moderatorID,
3✔
389
            userID,
3✔
390
            username: update.username,
3✔
391
          });
3✔
392
        }
3✔
393
      },
144✔
394
      'user:join': async ({ userID }) => {
144✔
395
        const { users, redis } = this.#uw;
24✔
396
        const user = await users.getUser(userID);
24✔
397
        if (user) {
24✔
398
          // TODO this should not be the socket server code's responsibility
24✔
399
          await redis.rpush(REDIS_ACTIVE_SESSIONS, user.id);
24✔
400
          this.broadcast('join', serializeUser(user));
24✔
401
        }
24✔
402
      },
144✔
403
      /**
144✔
404
       * Broadcast that a user left the server.
144✔
405
       */
144✔
406
      'user:leave': ({ userID }) => {
144✔
407
        this.broadcast('leave', userID);
×
408
      },
144✔
409
      /**
144✔
410
       * Broadcast a ban event.
144✔
411
       */
144✔
412
      'user:ban': ({
144✔
413
        moderatorID, userID, permanent = false, duration, expiresAt,
3✔
414
      }) => {
3✔
415
        this.broadcast('ban', {
3✔
416
          moderatorID, userID, permanent, duration, expiresAt,
3✔
417
        });
3✔
418

3✔
419
        this.#connections.forEach((connection) => {
3✔
420
          if (connection instanceof AuthedConnection && connection.user.id === userID) {
×
421
            connection.ban();
×
422
          } else if (connection instanceof LostConnection && connection.user.id === userID) {
×
423
            connection.close();
×
424
          }
×
425
        });
3✔
426
      },
144✔
427
      /**
144✔
428
       * Broadcast an unban event.
144✔
429
       */
144✔
430
      'user:unban': ({ moderatorID, userID }) => {
144✔
431
        this.broadcast('unban', { moderatorID, userID });
1✔
432
      },
144✔
433
      /**
144✔
434
       * Force-close a connection.
144✔
435
       */
144✔
436
      'http-api:socket:close': (userID) => {
144✔
437
        this.#connections.forEach((connection) => {
×
438
          if ('user' in connection && connection.user.id === userID) {
×
439
            connection.close();
×
440
          }
×
441
        });
×
442
      },
144✔
443

144✔
444
      'emotes:reload': () => {
144✔
445
        this.broadcast('reloadEmotes', null);
×
446
      },
144✔
447
    };
144✔
448
  }
144✔
449

144✔
450
  /**
144✔
451
   * Create `LostConnection`s for every user that's known to be online, but that
144✔
452
   * is not currently connected to the socket server.
144✔
453
   *
144✔
454
   * @private
144✔
455
   */
144✔
456
  async initLostConnections() {
144✔
457
    const { db, redis } = this.#uw;
144✔
458
    const userIDs = /** @type {import('./schema').UserID[]} */ (
144✔
459
      await redis.lrange(REDIS_ACTIVE_SESSIONS, 0, -1)
144✔
460
    );
144✔
461
    const disconnectedIDs = userIDs.filter((userID) => !this.connection(userID));
144✔
462

144✔
463
    if (disconnectedIDs.length === 0) {
144✔
464
      return;
144✔
465
    }
144✔
466

×
467
    const disconnectedUsers = await db.selectFrom('users')
×
468
      .where('id', 'in', disconnectedIDs)
×
469
      .selectAll()
×
470
      .execute();
×
471
    disconnectedUsers.forEach((user) => {
×
NEW
472
      this.add(this.createLostConnection(user, 'TODO: Actual session ID!!'));
×
473
    });
×
474
  }
144✔
475

144✔
476
  /**
144✔
477
   * @param {import('ws').WebSocket} socket
144✔
478
   * @param {import('http').IncomingMessage} request
144✔
479
   * @private
144✔
480
   */
144✔
481
  onSocketConnected(socket, request) {
144✔
482
    this.#logger.info({ req: request }, 'new connection');
25✔
483

25✔
484
    socket.on('error', (error) => {
25✔
485
      this.onSocketError(socket, error);
×
486
    });
25✔
487
    this.add(this.createGuestConnection(socket));
25✔
488
  }
25✔
489

144✔
490
  /**
144✔
491
   * @param {import('ws').WebSocket} socket
144✔
492
   * @param {Error} error
144✔
493
   * @private
144✔
494
   */
144✔
495
  onSocketError(socket, error) {
144✔
496
    this.#logger.warn({ err: error }, 'socket error');
×
497

×
498
    this.options.onError(socket, error);
×
499
  }
×
500

144✔
501
  /**
144✔
502
   * @param {Error} error
144✔
503
   * @private
144✔
504
   */
144✔
505
  onError(error) {
144✔
506
    this.#logger.error({ err: error }, 'server error');
×
507

×
508
    this.options.onError(undefined, error);
×
509
  }
×
510

144✔
511
  /**
144✔
512
   * Get a LostConnection for a user, if one exists.
144✔
513
   *
144✔
514
   * @param {User} user
144✔
515
   * @private
144✔
516
   */
144✔
517
  getLostConnection(user) {
144✔
518
    return this.#connections.find((connection) => (
1✔
519
      connection instanceof LostConnection && connection.user.id === user.id
3✔
520
    ));
1✔
521
  }
1✔
522

144✔
523
  /**
144✔
524
   * Create a connection instance for an unauthenticated user.
144✔
525
   *
144✔
526
   * @param {import('ws').WebSocket} socket
144✔
527
   * @private
144✔
528
   */
144✔
529
  createGuestConnection(socket) {
144✔
530
    const connection = new GuestConnection(this.#uw, socket, {
25✔
531
      authRegistry: this.authRegistry,
25✔
532
    });
25✔
533
    connection.on('close', () => {
25✔
534
      this.remove(connection);
×
535
    });
25✔
536
    connection.on('authenticate', async (user, sessionID) => {
25✔
537
      const isReconnect = await connection.isReconnect(sessionID);
25✔
538
      this.#logger.info({ userId: user.id, isReconnect }, 'authenticated socket');
25✔
539
      if (isReconnect) {
25✔
540
        const previousConnection = this.getLostConnection(sessionID);
1✔
541
        if (previousConnection) this.remove(previousConnection);
1!
542
      }
1✔
543

25✔
544
      this.replace(connection, this.createAuthedConnection(socket, user, sessionID));
25✔
545

25✔
546
      if (!isReconnect) {
25✔
547
        this.#uw.publish('user:join', { userID: user.id });
24✔
548
      }
24✔
549
    });
25✔
550
    return connection;
25✔
551
  }
25✔
552

144✔
553
  /**
144✔
554
   * Create a connection instance for an authenticated user.
144✔
555
   *
144✔
556
   * @param {import('ws').WebSocket} socket
144✔
557
   * @param {User} user
144✔
558
   * @param {string} sessionID
144✔
559
   * @returns {AuthedConnection}
144✔
560
   * @private
144✔
561
   */
144✔
562
  createAuthedConnection(socket, user, sessionID) {
144✔
563
    const connection = new AuthedConnection(this.#uw, socket, user, sessionID);
25✔
564
    connection.on('close', ({ banned }) => {
25✔
565
      if (banned) {
25!
566
        this.#logger.info({ userId: user.id }, 'removing connection after ban');
×
567
        disconnectUser(this.#uw, user.id);
×
568
      } else if (!this.#closing) {
25✔
569
        this.#logger.info({ userId: user.id }, 'lost connection');
2✔
570
        this.add(this.createLostConnection(user, sessionID));
2✔
571
      }
2✔
572
      this.remove(connection);
25✔
573
    });
25✔
574
    connection.on(
25✔
575
      'command',
25✔
576
      /**
25✔
577
       * @param {string} command
25✔
578
       * @param {import('type-fest').JsonValue} data
25✔
579
       */
25✔
580
      (command, data) => {
25✔
581
        this.#logger.trace({ userId: user.id, command, data }, 'command');
7✔
582
        if (has(this.#clientActions, command)) {
7✔
583
          // Ignore incorrect input
7✔
584
          const validate = this.#clientActionSchemas[command];
7✔
585
          if (validate && !validate(data)) {
7!
586
            return;
×
587
          }
×
588

7✔
589
          const action = this.#clientActions[command];
7✔
590
          // @ts-expect-error TS2345 `data` is validated
7✔
591
          action(user, data, connection);
7✔
592
        }
7✔
593
      },
25✔
594
    );
25✔
595
    return connection;
25✔
596
  }
25✔
597

144✔
598
  /**
144✔
599
   * Create a connection instance for a user who disconnected.
144✔
600
   *
144✔
601
   * @param {User} user
144✔
602
   * @param {string} sessionID
144✔
603
   * @returns {LostConnection}
144✔
604
   * @private
144✔
605
   */
144✔
606
  createLostConnection(user, sessionID) {
144✔
607
    const connection = new LostConnection(this.#uw, user, sessionID, this.options.timeout);
2✔
608
    connection.on('close', () => {
2✔
609
      this.#logger.info({ userId: user.id }, 'user left');
2✔
610
      this.remove(connection);
2✔
611
      // Only register that the user left if they didn't have another connection
2✔
612
      // still open.
2✔
613
      if (!this.connection(user)) {
2✔
614
        disconnectUser(this.#uw, user.id);
2✔
615
      }
2✔
616
    });
2✔
617
    return connection;
2✔
618
  }
2✔
619

144✔
620
  /**
144✔
621
   * Add a connection.
144✔
622
   *
144✔
623
   * @param {Connection} connection
144✔
624
   * @private
144✔
625
   */
144✔
626
  add(connection) {
144✔
627
    const userID = 'user' in connection ? connection.user.id : null;
52✔
628
    const sessionID = 'sessionID' in connection ? connection.sessionID : null;
52✔
629
    this.#logger.trace({ type: connection.constructor.name, userID, sessionID }, 'add connection');
52✔
630

52✔
631
    this.#connections.push(connection);
52✔
632
    this.#recountGuests();
52✔
633
  }
52✔
634

144✔
635
  /**
144✔
636
   * Remove a connection.
144✔
637
   *
144✔
638
   * @param {Connection} connection
144✔
639
   * @private
144✔
640
   */
144✔
641
  remove(connection) {
144✔
642
    const userID = 'user' in connection ? connection.user.id : null;
52✔
643
    const sessionID = 'sessionID' in connection ? connection.sessionID : null;
52✔
644
    this.#logger.trace({ type: connection.constructor.name, userID, sessionID }, 'remove connection');
52✔
645

52✔
646
    const i = this.#connections.indexOf(connection);
52✔
647
    this.#connections.splice(i, 1);
52✔
648

52✔
649
    connection.removed();
52✔
650
    this.#recountGuests();
52✔
651
  }
52✔
652

144✔
653
  /**
144✔
654
   * Replace a connection instance with another connection instance. Useful when
144✔
655
   * a connection changes "type", like GuestConnection → AuthedConnection.
144✔
656
   *
144✔
657
   * @param {Connection} oldConnection
144✔
658
   * @param {Connection} newConnection
144✔
659
   * @private
144✔
660
   */
144✔
661
  replace(oldConnection, newConnection) {
144✔
662
    this.remove(oldConnection);
25✔
663
    this.add(newConnection);
25✔
664
  }
25✔
665

144✔
666
  /**
144✔
667
   * Handle command messages coming in from Redis.
144✔
668
   * Some commands are intended to broadcast immediately to all connected
144✔
669
   * clients, but others require special action.
144✔
670
   *
144✔
671
   * @param {string} channel
144✔
672
   * @param {string} rawCommand
144✔
673
   * @returns {Promise<void>}
144✔
674
   * @private
144✔
675
   */
144✔
676
  async onServerMessage(channel, rawCommand) {
144✔
677
    /**
194✔
678
     * @type {{ command: string, data: import('type-fest').JsonValue }|undefined}
194✔
679
     */
194✔
680
    const json = sjson.safeParse(rawCommand);
194✔
681
    if (!json) {
194!
682
      return;
×
683
    }
×
684
    const { command, data } = json;
194✔
685

194✔
686
    this.#logger.trace({ channel, command, data }, 'server message');
194✔
687

194✔
688
    if (has(this.#serverActions, command)) {
194✔
689
      const action = this.#serverActions[command];
177✔
690
      if (action !== undefined) { // the types for `ServerActions` allow undefined, so...
177✔
691
        // @ts-expect-error TS2345 `data` is validated
177✔
692
        action(data);
177✔
693
      }
177✔
694
    }
177✔
695
  }
194✔
696

144✔
697
  /**
144✔
698
   * Stop the socket server.
144✔
699
   *
144✔
700
   * @returns {Promise<void>}
144✔
701
   */
144✔
702
  async destroy() {
144✔
703
    clearInterval(this.#pinger);
144✔
704

144✔
705
    this.#closing = true;
144✔
706
    for (const connection of this.#wss.clients) {
144✔
707
      connection.close();
23✔
708
    }
23✔
709

144✔
710
    this.#recountGuests.cancel();
144✔
711

144✔
712
    const closeWsServer = promisify(this.#wss.close.bind(this.#wss));
144✔
713
    await closeWsServer();
144✔
714
    await this.#redisSubscription.quit();
144✔
715
  }
144✔
716

144✔
717
  /**
144✔
718
   * Get the connection instance for a specific user.
144✔
719
   *
144✔
720
   * @param {User|string} user The user.
144✔
721
   * @returns {Connection|undefined}
144✔
722
   */
144✔
723
  connection(user) {
144✔
724
    const userID = typeof user === 'object' ? user.id : user;
2!
725
    return this.#connections.find((connection) => 'user' in connection && connection.user.id === userID);
2✔
726
  }
2✔
727

144✔
728
  ping() {
144✔
729
    this.#connections.forEach((connection) => {
×
730
      if ('socket' in connection) {
×
731
        connection.ping();
×
732
      }
×
733
    });
×
734
  }
×
735

144✔
736
  /**
144✔
737
   * Broadcast a command to all connected clients.
144✔
738
   *
144✔
739
   * @param {string} command Command name.
144✔
740
   * @param {import('type-fest').JsonValue} data Command data.
144✔
741
   */
144✔
742
  broadcast(command, data) {
144✔
743
    this.#logger.trace({
162✔
744
      command,
162✔
745
      data,
162✔
746
      to: this.#connections.map((connection) => (
162✔
747
        'user' in connection ? connection.user.id : null
109!
748
      )),
162✔
749
    }, 'broadcast');
162✔
750

162✔
751
    this.#connections.forEach((connection) => {
162✔
752
      connection.send(command, data);
109✔
753
    });
162✔
754
  }
162✔
755

144✔
756
  /**
144✔
757
   * Send a command to a single user.
144✔
758
   *
144✔
759
   * @param {User|string} user User or user ID to send the command to.
144✔
760
   * @param {string} command Command name.
144✔
761
   * @param {import('type-fest').JsonValue} data Command data.
144✔
762
   */
144✔
763
  sendTo(user, command, data) {
144✔
764
    const userID = typeof user === 'object' ? user.id : user;
15!
765

15✔
766
    this.#connections.forEach((connection) => {
15✔
767
      if ('user' in connection && connection.user.id === userID) {
16✔
768
        connection.send(command, data);
13✔
769
      }
13✔
770
    });
15✔
771
  }
15✔
772

144✔
773
  async getGuestCount() {
144✔
774
    const { redis } = this.#uw;
×
775
    const rawCount = await redis.get('http-api:guests');
×
776
    if (typeof rawCount !== 'string' || !/^\d+$/.test(rawCount)) {
×
777
      return 0;
×
778
    }
×
779
    return parseInt(rawCount, 10);
×
780
  }
×
781

144✔
782
  async #recountGuestsInternal() {
144✔
783
    const { redis } = this.#uw;
×
784
    const guests = this.#connections
×
785
      .filter((connection) => connection instanceof GuestConnection)
×
786
      .length;
×
787

×
788
    const lastGuestCount = await this.getGuestCount();
×
789
    if (guests !== lastGuestCount) {
×
790
      await redis.set('http-api:guests', guests);
×
791
      this.broadcast('guests', guests);
×
792
    }
×
793
  }
×
794
}
144✔
795

1✔
796
export default SocketServer;
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc