• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

u-wave / core / 12212259011

07 Dec 2024 10:47AM UTC coverage: 85.282% (-0.03%) from 85.316%
12212259011

Pull #683

github

goto-bus-stop
Tweak guest counting
Pull Request #683: Various tweaks to make the socket server shut down cleanly

936 of 1115 branches covered (83.95%)

Branch coverage included in aggregate %.

66 of 95 new or added lines in 4 files covered. (69.47%)

4 existing lines in 3 files now uncovered.

10015 of 11726 relevant lines covered (85.41%)

90.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.2
/src/SocketServer.js
1
import { promisify } from 'node:util';
1✔
2
import lodash from 'lodash';
1✔
3
import sjson from 'secure-json-parse';
1✔
4
import { WebSocketServer } from 'ws';
1✔
5
import Ajv from 'ajv';
1✔
6
import { stdSerializers } from 'pino';
1✔
7
import { socketVote } from './controllers/booth.js';
1✔
8
import { disconnectUser } from './controllers/users.js';
1✔
9
import AuthRegistry from './AuthRegistry.js';
1✔
10
import GuestConnection from './sockets/GuestConnection.js';
1✔
11
import AuthedConnection from './sockets/AuthedConnection.js';
1✔
12
import LostConnection from './sockets/LostConnection.js';
1✔
13
import { serializeUser } from './utils/serialize.js';
1✔
14

1✔
15
const { isEmpty } = lodash;
1✔
16

1✔
17
export const REDIS_ACTIVE_SESSIONS = 'users';
1✔
18

1✔
19
const PING_INTERVAL = 10_000;
1✔
20
const GUEST_COUNT_INTERVAL = 2_000;
1✔
21

1✔
22
/**
1✔
23
 * @typedef {import('./schema.js').User} User
1✔
24
 */
1✔
25

1✔
26
/**
1✔
27
 * @typedef {GuestConnection | AuthedConnection | LostConnection} Connection
1✔
28
 */
1✔
29

1✔
30
/**
1✔
31
 * @typedef {object} ClientActionParameters
1✔
32
 * @prop {string} sendChat
1✔
33
 * @prop {-1 | 1} vote
1✔
34
 * @prop {undefined} logout
1✔
35
 */
1✔
36

1✔
37
/**
1✔
38
 * @typedef {{
1✔
39
 *   [Name in keyof ClientActionParameters]: (
1✔
40
 *     user: User,
1✔
41
 *     parameter: ClientActionParameters[Name],
1✔
42
 *     connection: AuthedConnection
1✔
43
 *   ) => void
1✔
44
 * }} ClientActions
1✔
45
 */
1✔
46

1✔
47
const ajv = new Ajv({
1✔
48
  coerceTypes: false,
1✔
49
  ownProperties: true,
1✔
50
  removeAdditional: true,
1✔
51
  useDefaults: false,
1✔
52
});
1✔
53

1✔
54
/**
 * Check whether `property` is an own (non-inherited) key of `object`.
 *
 * Used to look up command names in the handler tables without ever matching
 * something inherited from `Object.prototype`.
 *
 * @template {object} T
 * @param {T} object
 * @param {PropertyKey} property
 * @returns {property is keyof T}
 */
function has(object, property) {
  return Object.hasOwn(object, property);
}
201✔
63

1✔
64
class SocketServer {
144✔
65
  /**
144✔
66
   * @param {import('./Uwave.js').Boot} uw
144✔
67
   * @param {{ secret: Buffer|string }} options
144✔
68
   */
144✔
69
  static async plugin(uw, options) {
144✔
70
    uw.socketServer = new SocketServer(uw, {
144✔
71
      secret: options.secret,
144✔
72
      server: uw.server,
144✔
73
    });
144✔
74

144✔
75
    uw.after(async () => {
144✔
76
      try {
144✔
77
        await uw.socketServer.initLostConnections();
144✔
78
      } catch (err) {
144!
79
        // No need to prevent startup for this.
×
80
        // We do need to clear the `users` list because the lost connection handlers
×
81
        // will not do so.
×
82
        uw.socketServer.#logger.warn({ err }, 'could not initialise lost connections');
×
83
        await uw.redis.del(REDIS_ACTIVE_SESSIONS);
×
84
      }
×
85
    });
144✔
86

144✔
87
    uw.onClose(async () => {
144✔
88
      await uw.socketServer.destroy();
144✔
89
    });
144✔
90
  }
144✔
91

144✔
92
  #uw;
144✔
93

144✔
94
  #logger;
144✔
95

144✔
96
  #redisSubscription;
144✔
97

144✔
98
  #wss;
144✔
99

144✔
100
  #closing = false;
144✔
101

144✔
102
  /** @type {Connection[]} */
144✔
103
  #connections = [];
144✔
104

144✔
105
  #pinger;
144✔
106

144✔
107
  /** Update online guests count and broadcast an update if necessary. */
144✔
108
  #guestCountInterval;
144✔
109

144✔
110
  #guestCountDirty = true;
144✔
111

144✔
112
  /**
144✔
113
   * Handlers for commands that come in from clients.
144✔
114
   *
144✔
115
   * @type {ClientActions}
144✔
116
   */
144✔
117
  #clientActions;
144✔
118

144✔
119
  /**
144✔
120
   * @type {{
144✔
121
   *   [K in keyof ClientActionParameters]:
144✔
122
   *     import('ajv').ValidateFunction<ClientActionParameters[K]>
144✔
123
   * }}
144✔
124
   */
144✔
125
  #clientActionSchemas;
144✔
126

144✔
127
  /**
144✔
128
   * Handlers for commands that come in from the server side.
144✔
129
   *
144✔
130
   * @type {import('./redisMessages.js').ServerActions}
144✔
131
   */
144✔
132
  #serverActions;
144✔
133

144✔
134
  /**
144✔
135
   * Create a socket server.
144✔
136
   *
144✔
137
   * @param {import('./Uwave.js').Boot} uw üWave Core instance.
144✔
138
   * @param {object} options Socket server options.
144✔
139
   * @param {number} [options.timeout] Time in seconds to wait for disconnected
144✔
140
   *     users to reconnect before removing them.
144✔
141
   * @param {Buffer|string} options.secret
144✔
142
   * @param {import('http').Server | import('https').Server} [options.server]
144✔
143
   * @param {number} [options.port]
144✔
144
   * @private
144✔
145
   */
144✔
146
  constructor(uw, options) {
144✔
147
    if (!uw) {
144!
148
      throw new TypeError(`The "uw" argument must be of type UwaveServer. Received ${typeof uw}`);
×
149
    }
×
150

144✔
151
    this.#uw = uw;
144✔
152
    this.#logger = uw.logger.child({ ns: 'uwave:sockets' }, {
144✔
153
      serializers: {
144✔
154
        req: stdSerializers.req,
144✔
155
      },
144✔
156
    });
144✔
157
    this.#redisSubscription = uw.redis.duplicate();
144✔
158

144✔
159
    this.options = {
144✔
160
      /** @type {(_socket: import('ws').WebSocket | undefined, err: Error) => void} */
144✔
161
      onError: (_socket, err) => {
144✔
162
        throw err;
×
163
      },
144✔
164
      timeout: 30,
144✔
165
      ...options,
144✔
166
    };
144✔
167

144✔
168
    // TODO put this behind a symbol, it's just public for tests
144✔
169
    this.authRegistry = new AuthRegistry(uw.redis);
144✔
170

144✔
171
    this.#wss = new WebSocketServer({
144✔
172
      server: options.server,
144✔
173
      port: options.server ? undefined : options.port,
144!
174
    });
144✔
175

144✔
176
    uw.use(() => this.#redisSubscription.subscribe('uwave'));
144✔
177
    this.#redisSubscription.on('message', (channel, command) => {
144✔
178
      // this returns a promise, but we don't handle the error case:
194✔
179
      // there is not much we can do, so just let node.js crash w/ an unhandled rejection
194✔
180
      this.onServerMessage(channel, command);
194✔
181
    });
144✔
182

144✔
183
    this.#wss.on('error', (error) => {
144✔
184
      this.onError(error);
×
185
    });
144✔
186
    this.#wss.on('connection', (socket, request) => {
144✔
187
      this.onSocketConnected(socket, request);
25✔
188
    });
144✔
189

144✔
190
    this.#pinger = setInterval(() => {
144✔
191
      this.ping();
×
192
    }, PING_INTERVAL);
144✔
193

144✔
194
    this.#guestCountInterval = setInterval(() => {
144✔
NEW
195
      if (!this.#guestCountDirty) {
×
UNCOV
196
        return;
×
UNCOV
197
      }
×
NEW
198

×
NEW
199
      this.#recountGuests().catch((error) => {
×
200
        this.#logger.error({ err: error }, 'counting guests failed');
×
201
      });
×
202
    }, GUEST_COUNT_INTERVAL);
144✔
203

144✔
204
    this.#clientActions = {
144✔
205
      sendChat: (user, message) => {
144✔
206
        this.#logger.trace({ user, message }, 'sendChat');
7✔
207
        this.#uw.chat.send(user, message);
7✔
208
      },
144✔
209
      vote: (user, direction) => {
144✔
210
        socketVote(this.#uw, user.id, direction);
×
211
      },
144✔
212
      logout: (user, _, connection) => {
144✔
213
        this.replace(connection, this.createGuestConnection(connection.socket));
×
214
        if (!this.connection(user)) {
×
215
          disconnectUser(this.#uw, user.id);
×
216
        }
×
217
      },
144✔
218
    };
144✔
219

144✔
220
    this.#clientActionSchemas = {
144✔
221
      sendChat: ajv.compile({
144✔
222
        type: 'string',
144✔
223
      }),
144✔
224
      vote: ajv.compile({
144✔
225
        type: 'integer',
144✔
226
        enum: [-1, 1],
144✔
227
      }),
144✔
228
      logout: ajv.compile(true),
144✔
229
    };
144✔
230

144✔
231
    this.#serverActions = {
144✔
232
      /**
144✔
233
       * Broadcast the next track.
144✔
234
       */
144✔
235
      'advance:complete': (next) => {
144✔
236
        if (next) {
15✔
237
          this.broadcast('advance', {
15✔
238
            historyID: next.historyID,
15✔
239
            userID: next.userID,
15✔
240
            media: next.media,
15✔
241
            playedAt: new Date(next.playedAt).getTime(),
15✔
242
          });
15✔
243
        } else {
15!
244
          this.broadcast('advance', null);
×
245
        }
×
246
      },
144✔
247
      /**
144✔
248
       * Broadcast a skip notification.
144✔
249
       */
144✔
250
      'booth:skip': ({ moderatorID, userID, reason }) => {
144✔
251
        this.broadcast('skip', { moderatorID, userID, reason });
×
252
      },
144✔
253
      /**
144✔
254
       * Broadcast a chat message.
144✔
255
       */
144✔
256
      'chat:message': (message) => {
144✔
257
        this.broadcast('chatMessage', message);
6✔
258
      },
144✔
259
      /**
144✔
260
       * Delete chat messages. The delete filter can have an _id property to
144✔
261
       * delete a specific message, a userID property to delete messages by a
144✔
262
       * user, or be empty to delete all messages.
144✔
263
       */
144✔
264
      'chat:delete': ({ moderatorID, filter }) => {
144✔
265
        if ('id' in filter) {
6✔
266
          this.broadcast('chatDeleteByID', {
2✔
267
            moderatorID,
2✔
268
            _id: filter.id,
2✔
269
          });
2✔
270
        } else if ('userID' in filter) {
6✔
271
          this.broadcast('chatDeleteByUser', {
2✔
272
            moderatorID,
2✔
273
            userID: filter.userID,
2✔
274
          });
2✔
275
        } else if (isEmpty(filter)) {
2✔
276
          this.broadcast('chatDelete', { moderatorID });
2✔
277
        }
2✔
278
      },
144✔
279
      /**
144✔
280
       * Broadcast that a user was muted in chat.
144✔
281
       */
144✔
282
      'chat:mute': ({ moderatorID, userID, duration }) => {
144✔
283
        this.broadcast('chatMute', {
1✔
284
          userID,
1✔
285
          moderatorID,
1✔
286
          expiresAt: Date.now() + duration,
1✔
287
        });
1✔
288
      },
144✔
289
      /**
144✔
290
       * Broadcast that a user was unmuted in chat.
144✔
291
       */
144✔
292
      'chat:unmute': ({ moderatorID, userID }) => {
144✔
293
        this.broadcast('chatUnmute', { userID, moderatorID });
×
294
      },
144✔
295
      /**
144✔
296
       * Broadcast a vote for the current track.
144✔
297
       */
144✔
298
      'booth:vote': ({ userID, direction }) => {
144✔
299
        this.broadcast('vote', {
2✔
300
          _id: userID,
2✔
301
          value: direction,
2✔
302
        });
2✔
303
      },
144✔
304
      /**
144✔
305
       * Broadcast a favorite for the current track.
144✔
306
       */
144✔
307
      'booth:favorite': ({ userID }) => {
144✔
308
        this.broadcast('favorite', { userID });
1✔
309
      },
144✔
310
      /**
144✔
311
       * Cycle a single user's playlist.
144✔
312
       */
144✔
313
      'playlist:cycle': ({ userID, playlistID }) => {
144✔
314
        this.sendTo(userID, 'playlistCycle', { playlistID });
15✔
315
      },
144✔
316
      /**
144✔
317
       * Broadcast that a user joined the waitlist.
144✔
318
       */
144✔
319
      'waitlist:join': ({ userID, waitlist }) => {
144✔
320
        this.broadcast('waitlistJoin', { userID, waitlist });
18✔
321
      },
144✔
322
      /**
144✔
323
       * Broadcast that a user left the waitlist.
144✔
324
       */
144✔
325
      'waitlist:leave': ({ userID, waitlist }) => {
144✔
326
        this.broadcast('waitlistLeave', { userID, waitlist });
1✔
327
      },
144✔
328
      /**
144✔
329
       * Broadcast that a user was added to the waitlist.
144✔
330
       */
144✔
331
      'waitlist:add': ({
144✔
332
        userID, moderatorID, position, waitlist,
1✔
333
      }) => {
1✔
334
        this.broadcast('waitlistAdd', {
1✔
335
          userID, moderatorID, position, waitlist,
1✔
336
        });
1✔
337
      },
144✔
338
      /**
144✔
339
       * Broadcast that a user was removed from the waitlist.
144✔
340
       */
144✔
341
      'waitlist:remove': ({ userID, moderatorID, waitlist }) => {
144✔
342
        this.broadcast('waitlistRemove', { userID, moderatorID, waitlist });
×
343
      },
144✔
344
      /**
144✔
345
       * Broadcast that a user was moved in the waitlist.
144✔
346
       */
144✔
347
      'waitlist:move': ({
144✔
348
        userID, moderatorID, position, waitlist,
×
349
      }) => {
×
350
        this.broadcast('waitlistMove', {
×
351
          userID, moderatorID, position, waitlist,
×
352
        });
×
353
      },
144✔
354
      /**
144✔
355
       * Broadcast a waitlist update.
144✔
356
       */
144✔
357
      'waitlist:update': (waitlist) => {
144✔
358
        this.broadcast('waitlistUpdate', waitlist);
15✔
359
      },
144✔
360
      /**
144✔
361
       * Broadcast that the waitlist was cleared.
144✔
362
       */
144✔
363
      'waitlist:clear': ({ moderatorID }) => {
144✔
364
        this.broadcast('waitlistClear', { moderatorID });
×
365
      },
144✔
366
      /**
144✔
367
       * Broadcast that the waitlist was locked.
144✔
368
       */
144✔
369
      'waitlist:lock': ({ moderatorID, locked }) => {
144✔
370
        this.broadcast('waitlistLock', { moderatorID, locked });
6✔
371
      },
144✔
372

144✔
373
      'acl:allow': ({ userID, roles }) => {
144✔
374
        this.broadcast('acl:allow', { userID, roles });
57✔
375
      },
144✔
376
      'acl:disallow': ({ userID, roles }) => {
144✔
377
        this.broadcast('acl:disallow', { userID, roles });
2✔
378
      },
144✔
379

144✔
380
      'user:update': ({ userID, moderatorID, new: update }) => {
144✔
381
        // TODO Remove this remnant of the old roles system
3✔
382
        if ('role' in update) {
3!
383
          this.broadcast('roleChange', {
×
384
            moderatorID,
×
385
            userID,
×
386
            role: update.role,
×
387
          });
×
388
        }
×
389
        if ('username' in update) {
3✔
390
          this.broadcast('nameChange', {
3✔
391
            moderatorID,
3✔
392
            userID,
3✔
393
            username: update.username,
3✔
394
          });
3✔
395
        }
3✔
396
      },
144✔
397
      'user:join': async ({ userID }) => {
144✔
398
        const { users, redis } = this.#uw;
24✔
399
        const user = await users.getUser(userID);
24✔
400
        if (user) {
24✔
401
          // TODO this should not be the socket server code's responsibility
24✔
402
          await redis.rpush(REDIS_ACTIVE_SESSIONS, user.id);
24✔
403
          this.broadcast('join', serializeUser(user));
24✔
404
        }
24✔
405
      },
144✔
406
      /**
144✔
407
       * Broadcast that a user left the server.
144✔
408
       */
144✔
409
      'user:leave': ({ userID }) => {
144✔
410
        this.broadcast('leave', userID);
×
411
      },
144✔
412
      /**
144✔
413
       * Broadcast a ban event.
144✔
414
       */
144✔
415
      'user:ban': ({
144✔
416
        moderatorID, userID, permanent = false, duration, expiresAt,
3✔
417
      }) => {
3✔
418
        this.broadcast('ban', {
3✔
419
          moderatorID, userID, permanent, duration, expiresAt,
3✔
420
        });
3✔
421

3✔
422
        this.#connections.forEach((connection) => {
3✔
423
          if (connection instanceof AuthedConnection && connection.user.id === userID) {
×
424
            connection.ban();
×
425
          } else if (connection instanceof LostConnection && connection.user.id === userID) {
×
426
            connection.close();
×
427
          }
×
428
        });
3✔
429
      },
144✔
430
      /**
144✔
431
       * Broadcast an unban event.
144✔
432
       */
144✔
433
      'user:unban': ({ moderatorID, userID }) => {
144✔
434
        this.broadcast('unban', { moderatorID, userID });
1✔
435
      },
144✔
436
      /**
144✔
437
       * Force-close a connection.
144✔
438
       */
144✔
439
      'http-api:socket:close': (userID) => {
144✔
440
        this.#connections.forEach((connection) => {
×
441
          if ('user' in connection && connection.user.id === userID) {
×
442
            connection.close();
×
443
          }
×
444
        });
×
445
      },
144✔
446

144✔
447
      'emotes:reload': () => {
144✔
448
        this.broadcast('reloadEmotes', null);
×
449
      },
144✔
450
    };
144✔
451
  }
144✔
452

144✔
453
  /**
144✔
454
   * Create `LostConnection`s for every user that's known to be online, but that
144✔
455
   * is not currently connected to the socket server.
144✔
456
   *
144✔
457
   * @private
144✔
458
   */
144✔
459
  async initLostConnections() {
144✔
460
    const { db, redis } = this.#uw;
144✔
461
    const userIDs = /** @type {import('./schema').UserID[]} */ (
144✔
462
      await redis.lrange(REDIS_ACTIVE_SESSIONS, 0, -1)
144✔
463
    );
144✔
464
    const disconnectedIDs = userIDs.filter((userID) => !this.connection(userID));
144✔
465

144✔
466
    if (disconnectedIDs.length === 0) {
144✔
467
      return;
144✔
468
    }
144✔
469

×
470
    const disconnectedUsers = await db.selectFrom('users')
×
471
      .where('id', 'in', disconnectedIDs)
×
472
      .selectAll()
×
473
      .execute();
×
474
    disconnectedUsers.forEach((user) => {
×
475
      this.add(this.createLostConnection(user, 'TODO: Actual session ID!!'));
×
476
    });
×
477
  }
144✔
478

144✔
479
  /**
144✔
480
   * @param {import('ws').WebSocket} socket
144✔
481
   * @param {import('http').IncomingMessage} request
144✔
482
   * @private
144✔
483
   */
144✔
484
  onSocketConnected(socket, request) {
144✔
485
    this.#logger.info({ req: request }, 'new connection');
25✔
486

25✔
487
    socket.on('error', (error) => {
25✔
488
      this.onSocketError(socket, error);
×
489
    });
25✔
490
    this.add(this.createGuestConnection(socket));
25✔
491
  }
25✔
492

144✔
493
  /**
144✔
494
   * @param {import('ws').WebSocket} socket
144✔
495
   * @param {Error} error
144✔
496
   * @private
144✔
497
   */
144✔
498
  onSocketError(socket, error) {
144✔
499
    this.#logger.warn({ err: error }, 'socket error');
×
500

×
501
    this.options.onError(socket, error);
×
502
  }
×
503

144✔
504
  /**
144✔
505
   * @param {Error} error
144✔
506
   * @private
144✔
507
   */
144✔
508
  onError(error) {
144✔
509
    this.#logger.error({ err: error }, 'server error');
×
510

×
511
    this.options.onError(undefined, error);
×
512
  }
×
513

144✔
514
  /**
144✔
515
   * Get a LostConnection for a user, if one exists.
144✔
516
   *
144✔
517
   * @param {User} user
144✔
518
   * @private
144✔
519
   */
144✔
520
  getLostConnection(user) {
144✔
521
    return this.#connections.find((connection) => (
1✔
522
      connection instanceof LostConnection && connection.user.id === user.id
3✔
523
    ));
1✔
524
  }
1✔
525

144✔
526
  /**
144✔
527
   * Create a connection instance for an unauthenticated user.
144✔
528
   *
144✔
529
   * @param {import('ws').WebSocket} socket
144✔
530
   * @private
144✔
531
   */
144✔
532
  createGuestConnection(socket) {
144✔
533
    const connection = new GuestConnection(this.#uw, socket, {
25✔
534
      authRegistry: this.authRegistry,
25✔
535
    });
25✔
536
    connection.on('close', () => {
25✔
537
      this.remove(connection);
×
538
    });
25✔
539
    connection.on('authenticate', async (user, sessionID) => {
25✔
540
      const isReconnect = await connection.isReconnect(sessionID);
25✔
541
      this.#logger.info({ userId: user.id, isReconnect }, 'authenticated socket');
25✔
542
      if (isReconnect) {
25✔
543
        const previousConnection = this.getLostConnection(sessionID);
1✔
544
        if (previousConnection) this.remove(previousConnection);
1!
545
      }
1✔
546

25✔
547
      this.replace(connection, this.createAuthedConnection(socket, user, sessionID));
25✔
548

25✔
549
      if (!isReconnect) {
25✔
550
        this.#uw.publish('user:join', { userID: user.id });
24✔
551
      }
24✔
552
    });
25✔
553
    return connection;
25✔
554
  }
25✔
555

144✔
556
  /**
144✔
557
   * Create a connection instance for an authenticated user.
144✔
558
   *
144✔
559
   * @param {import('ws').WebSocket} socket
144✔
560
   * @param {User} user
144✔
561
   * @param {string} sessionID
144✔
562
   * @returns {AuthedConnection}
144✔
563
   * @private
144✔
564
   */
144✔
565
  createAuthedConnection(socket, user, sessionID) {
144✔
566
    const connection = new AuthedConnection(this.#uw, socket, user, sessionID);
25✔
567
    connection.on('close', ({ banned }) => {
25✔
568
      if (banned) {
25!
569
        this.#logger.info({ userId: user.id }, 'removing connection after ban');
×
570
        disconnectUser(this.#uw, user.id);
×
571
      } else if (!this.#closing) {
25✔
572
        this.#logger.info({ userId: user.id }, 'lost connection');
2✔
573
        this.add(this.createLostConnection(user, sessionID));
2✔
574
      }
2✔
575
      this.remove(connection);
25✔
576
    });
25✔
577
    connection.on(
25✔
578
      'command',
25✔
579
      /**
25✔
580
       * @param {string} command
25✔
581
       * @param {import('type-fest').JsonValue} data
25✔
582
       */
25✔
583
      (command, data) => {
25✔
584
        this.#logger.trace({ userId: user.id, command, data }, 'command');
7✔
585
        if (has(this.#clientActions, command)) {
7✔
586
          // Ignore incorrect input
7✔
587
          const validate = this.#clientActionSchemas[command];
7✔
588
          if (validate && !validate(data)) {
7!
589
            return;
×
590
          }
×
591

7✔
592
          const action = this.#clientActions[command];
7✔
593
          // @ts-expect-error TS2345 `data` is validated
7✔
594
          action(user, data, connection);
7✔
595
        }
7✔
596
      },
25✔
597
    );
25✔
598
    return connection;
25✔
599
  }
25✔
600

144✔
601
  /**
144✔
602
   * Create a connection instance for a user who disconnected.
144✔
603
   *
144✔
604
   * @param {User} user
144✔
605
   * @param {string} sessionID
144✔
606
   * @returns {LostConnection}
144✔
607
   * @private
144✔
608
   */
144✔
609
  createLostConnection(user, sessionID) {
144✔
610
    const connection = new LostConnection(this.#uw, user, sessionID, this.options.timeout);
2✔
611
    connection.on('close', () => {
2✔
612
      this.#logger.info({ userId: user.id }, 'user left');
2✔
613
      this.remove(connection);
2✔
614
      // Only register that the user left if they didn't have another connection
2✔
615
      // still open.
2✔
616
      if (!this.connection(user)) {
2✔
617
        disconnectUser(this.#uw, user.id);
1✔
618
      }
1✔
619
    });
2✔
620
    return connection;
2✔
621
  }
2✔
622

144✔
623
  /**
144✔
624
   * Add a connection.
144✔
625
   *
144✔
626
   * @param {Connection} connection
144✔
627
   * @private
144✔
628
   */
144✔
629
  add(connection) {
144✔
630
    const userID = 'user' in connection ? connection.user.id : null;
52✔
631
    const sessionID = 'sessionID' in connection ? connection.sessionID : null;
52✔
632
    this.#logger.trace({ type: connection.constructor.name, userID, sessionID }, 'add connection');
52✔
633

52✔
634
    this.#connections.push(connection);
52✔
635
    this.#guestCountDirty = true;
52✔
636
  }
52✔
637

144✔
638
  /**
144✔
639
   * Remove a connection.
144✔
640
   *
144✔
641
   * @param {Connection} connection
144✔
642
   * @private
144✔
643
   */
144✔
644
  remove(connection) {
144✔
645
    const userID = 'user' in connection ? connection.user.id : null;
52✔
646
    const sessionID = 'sessionID' in connection ? connection.sessionID : null;
52✔
647
    this.#logger.trace({ type: connection.constructor.name, userID, sessionID }, 'remove connection');
52✔
648

52✔
649
    const i = this.#connections.indexOf(connection);
52✔
650
    this.#connections.splice(i, 1);
52✔
651

52✔
652
    connection.removed();
52✔
653
    this.#guestCountDirty = true;
52✔
654
  }
52✔
655

144✔
656
  /**
144✔
657
   * Replace a connection instance with another connection instance. Useful when
144✔
658
   * a connection changes "type", like GuestConnection → AuthedConnection.
144✔
659
   *
144✔
660
   * @param {Connection} oldConnection
144✔
661
   * @param {Connection} newConnection
144✔
662
   * @private
144✔
663
   */
144✔
664
  replace(oldConnection, newConnection) {
144✔
665
    this.remove(oldConnection);
25✔
666
    this.add(newConnection);
25✔
667
  }
25✔
668

144✔
669
  /**
144✔
670
   * Handle command messages coming in from Redis.
144✔
671
   * Some commands are intended to broadcast immediately to all connected
144✔
672
   * clients, but others require special action.
144✔
673
   *
144✔
674
   * @param {string} channel
144✔
675
   * @param {string} rawCommand
144✔
676
   * @returns {Promise<void>}
144✔
677
   * @private
144✔
678
   */
144✔
679
  async onServerMessage(channel, rawCommand) {
144✔
680
    /**
194✔
681
     * @type {{ command: string, data: import('type-fest').JsonValue }|undefined}
194✔
682
     */
194✔
683
    const json = sjson.safeParse(rawCommand);
194✔
684
    if (!json) {
194!
685
      return;
×
686
    }
×
687
    const { command, data } = json;
194✔
688

194✔
689
    this.#logger.trace({ channel, command, data }, 'server message');
194✔
690

194✔
691
    if (has(this.#serverActions, command)) {
194✔
692
      const action = this.#serverActions[command];
177✔
693
      if (action !== undefined) { // the types for `ServerActions` allow undefined, so...
177✔
694
        // @ts-expect-error TS2345 `data` is validated
177✔
695
        action(data);
177✔
696
      }
177✔
697
    }
177✔
698
  }
194✔
699

144✔
700
  /**
144✔
701
   * Stop the socket server.
144✔
702
   *
144✔
703
   * @returns {Promise<void>}
144✔
704
   */
144✔
705
  async destroy() {
144✔
706
    clearInterval(this.#pinger);
144✔
707

144✔
708
    this.#closing = true;
144✔
709
    clearInterval(this.#guestCountInterval);
144✔
710

144✔
711
    for (const connection of this.#connections) {
144✔
712
      connection.close();
25✔
713
    }
25✔
714

144✔
715
    const closeWsServer = promisify(this.#wss.close.bind(this.#wss));
144✔
716
    await closeWsServer();
144✔
717
    await this.#redisSubscription.quit();
144✔
718
  }
144✔
719

144✔
720
  /**
144✔
721
   * Get the connection instance for a specific user.
144✔
722
   *
144✔
723
   * @param {User|string} user The user.
144✔
724
   * @returns {Connection|undefined}
144✔
725
   */
144✔
726
  connection(user) {
144✔
727
    const userID = typeof user === 'object' ? user.id : user;
2!
728
    return this.#connections.find((connection) => 'user' in connection && connection.user.id === userID);
2✔
729
  }
2✔
730

144✔
731
  ping() {
144✔
732
    this.#connections.forEach((connection) => {
×
733
      if ('socket' in connection) {
×
734
        connection.ping();
×
735
      }
×
736
    });
×
737
  }
×
738

144✔
739
  /**
144✔
740
   * Broadcast a command to all connected clients.
144✔
741
   *
144✔
742
   * @param {string} command Command name.
144✔
743
   * @param {import('type-fest').JsonValue} data Command data.
144✔
744
   */
144✔
745
  broadcast(command, data) {
144✔
746
    this.#logger.trace({
162✔
747
      command,
162✔
748
      data,
162✔
749
      to: this.#connections.map((connection) => (
162✔
750
        'user' in connection ? connection.user.id : null
109!
751
      )),
162✔
752
    }, 'broadcast');
162✔
753

162✔
754
    this.#connections.forEach((connection) => {
162✔
755
      connection.send(command, data);
109✔
756
    });
162✔
757
  }
162✔
758

144✔
759
  /**
144✔
760
   * Send a command to a single user.
144✔
761
   *
144✔
762
   * @param {User|string} user User or user ID to send the command to.
144✔
763
   * @param {string} command Command name.
144✔
764
   * @param {import('type-fest').JsonValue} data Command data.
144✔
765
   */
144✔
766
  sendTo(user, command, data) {
144✔
767
    const userID = typeof user === 'object' ? user.id : user;
15!
768

15✔
769
    this.#connections.forEach((connection) => {
15✔
770
      if ('user' in connection && connection.user.id === userID) {
16✔
771
        connection.send(command, data);
13✔
772
      }
13✔
773
    });
15✔
774
  }
15✔
775

144✔
776
  async getGuestCount() {
144✔
777
    const { redis } = this.#uw;
×
778
    const rawCount = await redis.get('http-api:guests');
×
779
    if (typeof rawCount !== 'string' || !/^\d+$/.test(rawCount)) {
×
780
      return 0;
×
781
    }
×
782
    return parseInt(rawCount, 10);
×
783
  }
×
784

144✔
785
  async #recountGuests() {
144✔
786
    const { redis } = this.#uw;
×
787
    const guests = this.#connections
×
788
      .filter((connection) => connection instanceof GuestConnection)
×
789
      .length;
×
790

×
791
    const lastGuestCount = await this.getGuestCount();
×
792
    if (guests !== lastGuestCount) {
×
793
      await redis.set('http-api:guests', guests);
×
794
      this.broadcast('guests', guests);
×
795
    }
×
796
  }
×
797
}
144✔
798

1✔
799
export default SocketServer;
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc