• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

u-wave / core / 7050118218

30 Nov 2023 05:31PM UTC coverage: 80.477% (-0.02%) from 80.492%
7050118218

push

github

goto-bus-stop
Fix dupe check

640 of 776 branches covered (0.0%)

Branch coverage included in aggregate %.

8305 of 10339 relevant lines covered (80.33%)

44.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.24
/src/SocketServer.js
1
import { promisify } from 'node:util';
1✔
2
import mongoose from 'mongoose';
1✔
3
import lodash from 'lodash';
1✔
4
import sjson from 'secure-json-parse';
1✔
5
import { WebSocketServer } from 'ws';
1✔
6
import Ajv from 'ajv';
1✔
7
import ms from 'ms';
1✔
8
import { stdSerializers } from 'pino';
1✔
9
import { socketVote } from './controllers/booth.js';
1✔
10
import { disconnectUser } from './controllers/users.js';
1✔
11
import AuthRegistry from './AuthRegistry.js';
1✔
12
import GuestConnection from './sockets/GuestConnection.js';
1✔
13
import AuthedConnection from './sockets/AuthedConnection.js';
1✔
14
import LostConnection from './sockets/LostConnection.js';
1✔
15
import { serializeUser } from './utils/serialize.js';
1✔
16

1✔
17
const { debounce, isEmpty } = lodash;
1✔
18
const { ObjectId } = mongoose.mongo;
1✔
19

1✔
20
/**
1✔
21
 * @typedef {import('./models/index.js').User} User
1✔
22
 */
1✔
23

1✔
24
/**
1✔
25
 * @typedef {GuestConnection | AuthedConnection | LostConnection} Connection
1✔
26
 */
1✔
27

1✔
28
/**
1✔
29
 * @typedef {object} ClientActionParameters
1✔
30
 * @prop {string} sendChat
1✔
31
 * @prop {-1 | 1} vote
1✔
32
 * @prop {undefined} logout
1✔
33
 */
1✔
34

1✔
35
/**
1✔
36
 * @typedef {{
1✔
37
 *   [Name in keyof ClientActionParameters]: (
1✔
38
 *     user: User,
1✔
39
 *     parameter: ClientActionParameters[Name],
1✔
40
 *     connection: AuthedConnection
1✔
41
 *   ) => void
1✔
42
 * }} ClientActions
1✔
43
 */
1✔
44

1✔
45
const ajv = new Ajv({
1✔
46
  coerceTypes: false,
1✔
47
  ownProperties: true,
1✔
48
  removeAdditional: true,
1✔
49
  useDefaults: false,
1✔
50
});
1✔
51

1✔
52
/**
1✔
53
 * @template {object} T
1✔
54
 * @param {T} object
1✔
55
 * @param {PropertyKey} property
1✔
56
 * @returns {property is keyof T}
1✔
57
 */
1✔
58
function has(object, property) {
56✔
59
  return Object.prototype.hasOwnProperty.call(object, property);
56✔
60
}
56✔
61

1✔
62
class SocketServer {
92✔
63
  /**
92✔
64
   * @param {import('./Uwave.js').Boot} uw
92✔
65
   * @param {{ secret: Buffer|string }} options
92✔
66
   */
92✔
67
  static async plugin(uw, options) {
92✔
68
    uw.socketServer = new SocketServer(uw, {
92✔
69
      secret: options.secret,
92✔
70
      server: uw.server,
92✔
71
    });
92✔
72

92✔
73
    uw.after(async () => {
92✔
74
      try {
92✔
75
        await uw.socketServer.initLostConnections();
92✔
76
      } catch (err) {
92!
77
        // No need to prevent startup for this
×
78
        uw.socketServer.#logger.warn({ err }, 'could not initialise lost connections');
×
79
      }
×
80
    });
92✔
81

92✔
82
    uw.onClose(async () => {
92✔
83
      await uw.socketServer.destroy();
92✔
84
    });
92✔
85
  }
92✔
86

92✔
87
  #uw;
92✔
88

92✔
89
  #logger;
92✔
90

92✔
91
  #redisSubscription;
92✔
92

92✔
93
  #wss;
92✔
94

92✔
95
  #closing = false;
92✔
96

92✔
97
  /** @type {Connection[]} */
92✔
98
  #connections = [];
92✔
99

92✔
100
  #pinger;
92✔
101

92✔
102
  /**
92✔
103
   * Update online guests count and broadcast an update if necessary.
92✔
104
   */
92✔
105
  #recountGuests;
92✔
106

92✔
107
  /**
92✔
108
   * Handlers for commands that come in from clients.
92✔
109
   * @type {ClientActions}
92✔
110
   */
92✔
111
  #clientActions;
92✔
112

92✔
113
  /**
92✔
114
   * @type {{
92✔
115
   *   [K in keyof ClientActionParameters]:
92✔
116
   *     import('ajv').ValidateFunction<ClientActionParameters[K]>
92✔
117
   * }}
92✔
118
   */
92✔
119
  #clientActionSchemas;
92✔
120

92✔
121
  /**
92✔
122
   * Handlers for commands that come in from the server side.
92✔
123
   *
92✔
124
   * @type {import('./redisMessages.js').ServerActions}
92✔
125
   */
92✔
126
  #serverActions;
92✔
127

92✔
128
  /**
92✔
129
   * Create a socket server.
92✔
130
   *
92✔
131
   * @param {import('./Uwave.js').default} uw üWave Core instance.
92✔
132
   * @param {object} options Socket server options.
92✔
133
   * @param {number} [options.timeout] Time in seconds to wait for disconnected
92✔
134
   *     users to reconnect before removing them.
92✔
135
   * @param {Buffer|string} options.secret
92✔
136
   * @param {import('http').Server | import('https').Server} [options.server]
92✔
137
   * @param {number} [options.port]
92✔
138
   */
92✔
139
  constructor(uw, options) {
92✔
140
    if (!uw) {
92!
141
      throw new TypeError(`The "uw" argument must be of type UwaveServer. Received ${typeof uw}`);
×
142
    }
×
143

92✔
144
    this.#uw = uw;
92✔
145
    this.#logger = uw.logger.child({ ns: 'uwave:sockets' }, {
92✔
146
      serializers: {
92✔
147
        req: stdSerializers.req,
92✔
148
      },
92✔
149
    });
92✔
150
    this.#redisSubscription = uw.redis.duplicate();
92✔
151

92✔
152
    this.options = {
92✔
153
      /** @type {(_socket: import('ws').WebSocket | undefined, err: Error) => void} */
92✔
154
      onError: (_socket, err) => {
92✔
155
        throw err;
×
156
      },
92✔
157
      timeout: 30,
92✔
158
      ...options,
92✔
159
    };
92✔
160

92✔
161
    // TODO put this behind a symbol, it's just public for tests
92✔
162
    this.authRegistry = new AuthRegistry(uw.redis);
92✔
163

92✔
164
    this.#wss = new WebSocketServer({
92✔
165
      server: options.server,
92✔
166
      port: options.server ? undefined : options.port,
92!
167
    });
92✔
168

92✔
169
    this.#redisSubscription.subscribe('uwave').catch((error) => {
92✔
170
      this.#logger.error(error);
×
171
    });
92✔
172
    this.#redisSubscription.on('message', (channel, command) => {
92✔
173
      // this returns a promise, but we don't handle the error case:
56✔
174
      // there is not much we can do, so just let node.js crash w/ an unhandled rejection
56✔
175
      this.onServerMessage(channel, command);
56✔
176
    });
92✔
177

92✔
178
    this.#wss.on('error', (error) => {
92✔
179
      this.onError(error);
×
180
    });
92✔
181
    this.#wss.on('connection', (socket, request) => {
92✔
182
      this.onSocketConnected(socket, request);
5✔
183
    });
92✔
184

92✔
185
    this.#pinger = setInterval(() => {
92✔
186
      this.ping();
×
187
    }, ms('10 seconds'));
92✔
188

92✔
189
    this.#recountGuests = debounce(() => {
92✔
190
      this.#recountGuestsInternal().catch((error) => {
×
191
        this.#logger.error({ err: error }, 'counting guests failed');
×
192
      });
×
193
    }, ms('2 seconds'));
92✔
194

92✔
195
    this.#clientActions = {
92✔
196
      sendChat: (user, message) => {
92✔
197
        this.#logger.trace({ user, message }, 'sendChat');
×
198
        this.#uw.chat.send(user, message);
×
199
      },
92✔
200
      vote: (user, direction) => {
92✔
201
        socketVote(this.#uw, user.id, direction);
×
202
      },
92✔
203
      logout: (user, _, connection) => {
92✔
204
        this.replace(connection, this.createGuestConnection(connection.socket));
×
205
        if (!this.connection(user)) {
×
206
          disconnectUser(this.#uw, user._id);
×
207
        }
×
208
      },
92✔
209
    };
92✔
210

92✔
211
    this.#clientActionSchemas = {
92✔
212
      sendChat: ajv.compile({
92✔
213
        type: 'string',
92✔
214
      }),
92✔
215
      vote: ajv.compile({
92✔
216
        type: 'integer',
92✔
217
        enum: [-1, 1],
92✔
218
      }),
92✔
219
      logout: ajv.compile(true),
92✔
220
    };
92✔
221

92✔
222
    this.#serverActions = {
92✔
223
      /**
92✔
224
       * Broadcast the next track.
92✔
225
       */
92✔
226
      'advance:complete': (next) => {
92✔
227
        if (next) {
5✔
228
          this.broadcast('advance', {
5✔
229
            historyID: next.historyID,
5✔
230
            userID: next.userID,
5✔
231
            itemID: next.itemID,
5✔
232
            media: next.media,
5✔
233
            playedAt: new Date(next.playedAt).getTime(),
5✔
234
          });
5✔
235
        } else {
5!
236
          this.broadcast('advance', null);
×
237
        }
×
238
      },
92✔
239
      /**
92✔
240
       * Broadcast a skip notification.
92✔
241
       */
92✔
242
      'booth:skip': ({ moderatorID, userID, reason }) => {
92✔
243
        this.broadcast('skip', { moderatorID, userID, reason });
×
244
      },
92✔
245
      /**
92✔
246
       * Broadcast a chat message.
92✔
247
       */
92✔
248
      'chat:message': (message) => {
92✔
249
        this.broadcast('chatMessage', message);
×
250
      },
92✔
251
      /**
92✔
252
       * Delete chat messages. The delete filter can have an _id property to
92✔
253
       * delete a specific message, a userID property to delete messages by a
92✔
254
       * user, or be empty to delete all messages.
92✔
255
       */
92✔
256
      'chat:delete': ({ moderatorID, filter }) => {
92✔
257
        if ('id' in filter) {
×
258
          this.broadcast('chatDeleteByID', {
×
259
            moderatorID,
×
260
            _id: filter.id,
×
261
          });
×
262
        } else if ('userID' in filter) {
×
263
          this.broadcast('chatDeleteByUser', {
×
264
            moderatorID,
×
265
            userID: filter.userID,
×
266
          });
×
267
        } else if (isEmpty(filter)) {
×
268
          this.broadcast('chatDelete', { moderatorID });
×
269
        }
×
270
      },
92✔
271
      /**
92✔
272
       * Broadcast that a user was muted in chat.
92✔
273
       */
92✔
274
      'chat:mute': ({ moderatorID, userID, duration }) => {
92✔
275
        this.broadcast('chatMute', {
×
276
          userID,
×
277
          moderatorID,
×
278
          expiresAt: Date.now() + duration,
×
279
        });
×
280
      },
92✔
281
      /**
92✔
282
       * Broadcast that a user was unmuted in chat.
92✔
283
       */
92✔
284
      'chat:unmute': ({ moderatorID, userID }) => {
92✔
285
        this.broadcast('chatUnmute', { userID, moderatorID });
×
286
      },
92✔
287
      /**
92✔
288
       * Broadcast a vote for the current track.
92✔
289
       */
92✔
290
      'booth:vote': ({ userID, direction }) => {
92✔
291
        this.broadcast('vote', {
2✔
292
          _id: userID,
2✔
293
          value: direction,
2✔
294
        });
2✔
295
      },
92✔
296
      /**
92✔
297
       * Broadcast a favorite for the current track.
92✔
298
       */
92✔
299
      'booth:favorite': ({ userID }) => {
92✔
300
        this.broadcast('favorite', { userID });
×
301
      },
92✔
302
      /**
92✔
303
       * Cycle a single user's playlist.
92✔
304
       */
92✔
305
      'playlist:cycle': ({ userID, playlistID }) => {
92✔
306
        this.sendTo(userID, 'playlistCycle', { playlistID });
5✔
307
      },
92✔
308
      /**
92✔
309
       * Broadcast that a user joined the waitlist.
92✔
310
       */
92✔
311
      'waitlist:join': ({ userID, waitlist }) => {
92✔
312
        this.broadcast('waitlistJoin', { userID, waitlist });
7✔
313
      },
92✔
314
      /**
92✔
315
       * Broadcast that a user left the waitlist.
92✔
316
       */
92✔
317
      'waitlist:leave': ({ userID, waitlist }) => {
92✔
318
        this.broadcast('waitlistLeave', { userID, waitlist });
×
319
      },
92✔
320
      /**
92✔
321
       * Broadcast that a user was added to the waitlist.
92✔
322
       */
92✔
323
      'waitlist:add': ({
92✔
324
        userID, moderatorID, position, waitlist,
1✔
325
      }) => {
1✔
326
        this.broadcast('waitlistAdd', {
1✔
327
          userID, moderatorID, position, waitlist,
1✔
328
        });
1✔
329
      },
92✔
330
      /**
92✔
331
       * Broadcast that a user was removed from the waitlist.
92✔
332
       */
92✔
333
      'waitlist:remove': ({ userID, moderatorID, waitlist }) => {
92✔
334
        this.broadcast('waitlistRemove', { userID, moderatorID, waitlist });
×
335
      },
92✔
336
      /**
92✔
337
       * Broadcast that a user was moved in the waitlist.
92✔
338
       */
92✔
339
      'waitlist:move': ({
92✔
340
        userID, moderatorID, position, waitlist,
×
341
      }) => {
×
342
        this.broadcast('waitlistMove', {
×
343
          userID, moderatorID, position, waitlist,
×
344
        });
×
345
      },
92✔
346
      /**
92✔
347
       * Broadcast a waitlist update.
92✔
348
       */
92✔
349
      'waitlist:update': (waitlist) => {
92✔
350
        this.broadcast('waitlistUpdate', waitlist);
5✔
351
      },
92✔
352
      /**
92✔
353
       * Broadcast that the waitlist was cleared.
92✔
354
       */
92✔
355
      'waitlist:clear': ({ moderatorID }) => {
92✔
356
        this.broadcast('waitlistClear', { moderatorID });
×
357
      },
92✔
358
      /**
92✔
359
       * Broadcast that the waitlist was locked.
92✔
360
       */
92✔
361
      'waitlist:lock': ({ moderatorID, locked }) => {
92✔
362
        this.broadcast('waitlistLock', { moderatorID, locked });
×
363
      },
92✔
364

92✔
365
      'acl:allow': ({ userID, roles }) => {
92✔
366
        this.broadcast('acl:allow', { userID, roles });
17✔
367
      },
92✔
368
      'acl:disallow': ({ userID, roles }) => {
92✔
369
        this.broadcast('acl:disallow', { userID, roles });
1✔
370
      },
92✔
371

92✔
372
      'user:update': ({ userID, moderatorID, new: update }) => {
92✔
373
        // TODO Remove this remnant of the old roles system
×
374
        if ('role' in update) {
×
375
          this.broadcast('roleChange', {
×
376
            moderatorID,
×
377
            userID,
×
378
            role: update.role,
×
379
          });
×
380
        }
×
381
        if ('username' in update) {
×
382
          this.broadcast('nameChange', {
×
383
            moderatorID,
×
384
            userID,
×
385
            username: update.username,
×
386
          });
×
387
        }
×
388
      },
92✔
389
      'user:join': async ({ userID }) => {
92✔
390
        const { users, redis } = this.#uw;
5✔
391
        const user = await users.getUser(userID);
5✔
392
        if (user) {
5✔
393
          // TODO this should not be the socket server code's responsibility
5✔
394
          await redis.rpush('users', user.id);
5✔
395
          this.broadcast('join', serializeUser(user));
5✔
396
        }
5✔
397
      },
92✔
398
      /**
92✔
399
       * Broadcast that a user left the server.
92✔
400
       */
92✔
401
      'user:leave': ({ userID }) => {
92✔
402
        this.broadcast('leave', userID);
×
403
      },
92✔
404
      /**
92✔
405
       * Broadcast a ban event.
92✔
406
       */
92✔
407
      'user:ban': ({
92✔
408
        moderatorID, userID, permanent = false, duration, expiresAt,
2✔
409
      }) => {
2✔
410
        this.broadcast('ban', {
2✔
411
          moderatorID, userID, permanent, duration, expiresAt,
2✔
412
        });
2✔
413

2✔
414
        this.#connections.forEach((connection) => {
2✔
415
          if (connection instanceof AuthedConnection && connection.user.id === userID) {
×
416
            connection.ban();
×
417
          } else if (connection instanceof LostConnection && connection.user.id === userID) {
×
418
            connection.close();
×
419
          }
×
420
        });
2✔
421
      },
92✔
422
      /**
92✔
423
       * Broadcast an unban event.
92✔
424
       */
92✔
425
      'user:unban': ({ moderatorID, userID }) => {
92✔
426
        this.broadcast('unban', { moderatorID, userID });
1✔
427
      },
92✔
428
      /**
92✔
429
       * Force-close a connection.
92✔
430
       */
92✔
431
      'http-api:socket:close': (userID) => {
92✔
432
        this.#connections.forEach((connection) => {
×
433
          if ('user' in connection && connection.user.id === userID) {
×
434
            connection.close();
×
435
          }
×
436
        });
×
437
      },
92✔
438

92✔
439
      'emotes:reload': () => {
92✔
440
        this.broadcast('reloadEmotes', null);
×
441
      },
92✔
442
    };
92✔
443
  }
92✔
444

92✔
445
  /**
92✔
446
   * Create `LostConnection`s for every user that's known to be online, but that
92✔
447
   * is not currently connected to the socket server.
92✔
448
   * @private
92✔
449
   */
92✔
450
  async initLostConnections() {
92✔
451
    const { User } = this.#uw.models;
92✔
452
    const userIDs = await this.#uw.redis.lrange('users', 0, -1);
92✔
453
    const disconnectedIDs = userIDs
92✔
454
      .filter((userID) => !this.connection(userID))
92✔
455
      .map((userID) => new ObjectId(userID));
92✔
456

92✔
457
    /** @type {User[]} */
92✔
458
    const disconnectedUsers = await User.find({
92✔
459
      _id: { $in: disconnectedIDs },
92✔
460
    }).exec();
92✔
461
    disconnectedUsers.forEach((user) => {
92✔
462
      this.add(this.createLostConnection(user));
×
463
    });
92✔
464
  }
92✔
465

92✔
466
  /**
92✔
467
   * @param {import('ws').WebSocket} socket
92✔
468
   * @param {import('http').IncomingMessage} request
92✔
469
   * @private
92✔
470
   */
92✔
471
  onSocketConnected(socket, request) {
92✔
472
    this.#logger.info({ req: request }, 'new connection');
5✔
473

5✔
474
    socket.on('error', (error) => {
5✔
475
      this.onSocketError(socket, error);
×
476
    });
5✔
477
    this.add(this.createGuestConnection(socket));
5✔
478
  }
5✔
479

92✔
480
  /**
92✔
481
   * @param {import('ws').WebSocket} socket
92✔
482
   * @param {Error} error
92✔
483
   * @private
92✔
484
   */
92✔
485
  onSocketError(socket, error) {
92✔
486
    this.#logger.warn({ err: error }, 'socket error');
×
487

×
488
    this.options.onError(socket, error);
×
489
  }
×
490

92✔
491
  /**
92✔
492
   * @param {Error} error
92✔
493
   * @private
92✔
494
   */
92✔
495
  onError(error) {
92✔
496
    this.#logger.error({ err: error }, 'server error');
×
497

×
498
    this.options.onError(undefined, error);
×
499
  }
×
500

92✔
501
  /**
92✔
502
   * Get a LostConnection for a user, if one exists.
92✔
503
   *
92✔
504
   * @param {User} user
92✔
505
   * @private
92✔
506
   */
92✔
507
  getLostConnection(user) {
92✔
508
    return this.#connections.find((connection) => (
×
509
      connection instanceof LostConnection && connection.user.id === user.id
×
510
    ));
×
511
  }
×
512

92✔
513
  /**
92✔
514
   * Create a connection instance for an unauthenticated user.
92✔
515
   *
92✔
516
   * @param {import('ws').WebSocket} socket
92✔
517
   * @private
92✔
518
   */
92✔
519
  createGuestConnection(socket) {
92✔
520
    const connection = new GuestConnection(this.#uw, socket, {
5✔
521
      authRegistry: this.authRegistry,
5✔
522
    });
5✔
523
    connection.on('close', () => {
5✔
524
      this.remove(connection);
×
525
    });
5✔
526
    connection.on('authenticate', async (user) => {
5✔
527
      const isReconnect = await connection.isReconnect(user);
5✔
528
      this.#logger.info({ userId: user.id, isReconnect }, 'authenticated socket');
5✔
529
      if (isReconnect) {
5!
530
        const previousConnection = this.getLostConnection(user);
×
531
        if (previousConnection) this.remove(previousConnection);
×
532
      }
×
533

5✔
534
      this.replace(connection, this.createAuthedConnection(socket, user));
5✔
535

5✔
536
      if (!isReconnect) {
5✔
537
        this.#uw.publish('user:join', { userID: user.id });
5✔
538
      }
5✔
539
    });
5✔
540
    return connection;
5✔
541
  }
5✔
542

92✔
543
  /**
92✔
544
   * Create a connection instance for an authenticated user.
92✔
545
   *
92✔
546
   * @param {import('ws').WebSocket} socket
92✔
547
   * @param {User} user
92✔
548
   * @returns {AuthedConnection}
92✔
549
   * @private
92✔
550
   */
92✔
551
  createAuthedConnection(socket, user) {
92✔
552
    const connection = new AuthedConnection(this.#uw, socket, user);
5✔
553
    connection.on('close', ({ banned }) => {
5✔
554
      if (banned) {
5!
555
        this.#logger.info({ userId: user.id }, 'removing connection after ban');
×
556
        disconnectUser(this.#uw, user._id);
×
557
      } else if (!this.#closing) {
5!
558
        this.#logger.info({ userId: user.id }, 'lost connection');
×
559
        this.add(this.createLostConnection(user));
×
560
      }
×
561
      this.remove(connection);
5✔
562
    });
5✔
563
    connection.on(
5✔
564
      'command',
5✔
565
      /**
5✔
566
       * @param {string} command
5✔
567
       * @param {import('type-fest').JsonValue} data
5✔
568
       */
5✔
569
      (command, data) => {
5✔
570
        this.#logger.trace({ userId: user.id, command, data }, 'command');
×
571
        if (has(this.#clientActions, command)) {
×
572
          // Ignore incorrect input
×
573
          const validate = this.#clientActionSchemas[command];
×
574
          if (validate && !validate(data)) {
×
575
            return;
×
576
          }
×
577

×
578
          const action = this.#clientActions[command];
×
579
          // @ts-expect-error TS2345 `data` is validated
×
580
          action(user, data, connection);
×
581
        }
×
582
      },
5✔
583
    );
5✔
584
    return connection;
5✔
585
  }
5✔
586

92✔
587
  /**
92✔
588
   * Create a connection instance for a user who disconnected.
92✔
589
   *
92✔
590
   * @param {User} user
92✔
591
   * @returns {LostConnection}
92✔
592
   * @private
92✔
593
   */
92✔
594
  createLostConnection(user) {
92✔
595
    const connection = new LostConnection(this.#uw, user, this.options.timeout);
×
596
    connection.on('close', () => {
×
597
      this.#logger.info({ userId: user.id }, 'user left');
×
598
      this.remove(connection);
×
599
      // Only register that the user left if they didn't have another connection
×
600
      // still open.
×
601
      if (!this.connection(user)) {
×
602
        disconnectUser(this.#uw, user._id);
×
603
      }
×
604
    });
×
605
    return connection;
×
606
  }
×
607

92✔
608
  /**
92✔
609
   * Add a connection.
92✔
610
   *
92✔
611
   * @param {Connection} connection
92✔
612
   * @private
92✔
613
   */
92✔
614
  add(connection) {
92✔
615
    const userId = 'user' in connection ? connection.user.id : null;
10✔
616
    this.#logger.trace({ type: connection.constructor.name, userId }, 'add connection');
10✔
617

10✔
618
    this.#connections.push(connection);
10✔
619
    this.#recountGuests();
10✔
620
  }
10✔
621

92✔
622
  /**
92✔
623
   * Remove a connection.
92✔
624
   *
92✔
625
   * @param {Connection} connection
92✔
626
   * @private
92✔
627
   */
92✔
628
  remove(connection) {
92✔
629
    const userId = 'user' in connection ? connection.user.id : null;
10✔
630
    this.#logger.trace({ type: connection.constructor.name, userId }, 'remove connection');
10✔
631

10✔
632
    const i = this.#connections.indexOf(connection);
10✔
633
    this.#connections.splice(i, 1);
10✔
634

10✔
635
    connection.removed();
10✔
636
    this.#recountGuests();
10✔
637
  }
10✔
638

92✔
639
  /**
92✔
640
   * Replace a connection instance with another connection instance. Useful when
92✔
641
   * a connection changes "type", like GuestConnection → AuthedConnection.
92✔
642
   *
92✔
643
   * @param {Connection} oldConnection
92✔
644
   * @param {Connection} newConnection
92✔
645
   * @private
92✔
646
   */
92✔
647
  replace(oldConnection, newConnection) {
92✔
648
    this.remove(oldConnection);
5✔
649
    this.add(newConnection);
5✔
650
  }
5✔
651

92✔
652
  /**
92✔
653
   * Handle command messages coming in from Redis.
92✔
654
   * Some commands are intended to broadcast immediately to all connected
92✔
655
   * clients, but others require special action.
92✔
656
   *
92✔
657
   * @param {string} channel
92✔
658
   * @param {string} rawCommand
92✔
659
   * @return {Promise<void>}
92✔
660
   * @private
92✔
661
   */
92✔
662
  async onServerMessage(channel, rawCommand) {
92✔
663
    /**
56✔
664
     * @type {{ command: string, data: import('type-fest').JsonValue }|undefined}
56✔
665
     */
56✔
666
    const json = sjson.safeParse(rawCommand);
56✔
667
    if (!json) {
56!
668
      return;
×
669
    }
×
670
    const { command, data } = json;
56✔
671

56✔
672
    this.#logger.trace({ channel, command, data }, 'server message');
56✔
673

56✔
674
    if (has(this.#serverActions, command)) {
56✔
675
      const action = this.#serverActions[command];
51✔
676
      if (action !== undefined) { // the types for `ServerActions` allow undefined, so...
51✔
677
        // @ts-expect-error TS2345 `data` is validated
51✔
678
        action(data);
51✔
679
      }
51✔
680
    }
51✔
681
  }
56✔
682

92✔
683
  /**
92✔
684
   * Stop the socket server.
92✔
685
   *
92✔
686
   * @return {Promise<void>}
92✔
687
   */
92✔
688
  async destroy() {
92✔
689
    clearInterval(this.#pinger);
92✔
690

92✔
691
    this.#closing = true;
92✔
692
    for (const connection of this.#wss.clients) {
92✔
693
      connection.close();
5✔
694
    }
5✔
695

92✔
696
    const closeWsServer = promisify(this.#wss.close.bind(this.#wss));
92✔
697
    await closeWsServer();
92✔
698
    await this.#redisSubscription.quit();
92✔
699

92✔
700
    this.#recountGuests.cancel();
92✔
701
  }
92✔
702

92✔
703
  /**
92✔
704
   * Get the connection instance for a specific user.
92✔
705
   *
92✔
706
   * @param {User|string} user The user.
92✔
707
   * @return {Connection|undefined}
92✔
708
   */
92✔
709
  connection(user) {
92✔
710
    const userID = typeof user === 'object' ? user.id : user;
×
711
    return this.#connections.find((connection) => 'user' in connection && connection.user.id === userID);
×
712
  }
×
713

92✔
714
  ping() {
92✔
715
    this.#connections.forEach((connection) => {
×
716
      if ('socket' in connection) {
×
717
        connection.ping();
×
718
      }
×
719
    });
×
720
  }
×
721

92✔
722
  /**
92✔
723
   * Broadcast a command to all connected clients.
92✔
724
   *
92✔
725
   * @param {string} command Command name.
92✔
726
   * @param {import('type-fest').JsonValue} data Command data.
92✔
727
   */
92✔
728
  broadcast(command, data) {
92✔
729
    this.#logger.trace({
46✔
730
      command,
46✔
731
      data,
46✔
732
      to: this.#connections.map((connection) => (
46✔
733
        'user' in connection ? connection.user.id : null
20!
734
      )),
46✔
735
    }, 'broadcast');
46✔
736

46✔
737
    this.#connections.forEach((connection) => {
46✔
738
      connection.send(command, data);
20✔
739
    });
46✔
740
  }
46✔
741

92✔
742
  /**
92✔
743
   * Send a command to a single user.
92✔
744
   *
92✔
745
   * @param {User|string} user User or user ID to send the command to.
92✔
746
   * @param {string} command Command name.
92✔
747
   * @param {import('type-fest').JsonValue} data Command data.
92✔
748
   */
92✔
749
  sendTo(user, command, data) {
92✔
750
    const userID = typeof user === 'object' ? user.id : user;
5!
751

5✔
752
    this.#connections.forEach((connection) => {
5✔
753
      if ('user' in connection && connection.user.id === userID) {
4✔
754
        connection.send(command, data);
3✔
755
      }
3✔
756
    });
5✔
757
  }
5✔
758

92✔
759
  async getGuestCount() {
92✔
760
    const { redis } = this.#uw;
×
761
    const rawCount = await redis.get('http-api:guests');
×
762
    if (typeof rawCount !== 'string' || !/^\d+$/.test(rawCount)) {
×
763
      return 0;
×
764
    }
×
765
    return parseInt(rawCount, 10);
×
766
  }
×
767

92✔
768
  async #recountGuestsInternal() {
92✔
769
    const { redis } = this.#uw;
×
770
    const guests = this.#connections
×
771
      .filter((connection) => connection instanceof GuestConnection)
×
772
      .length;
×
773

×
774
    const lastGuestCount = await this.getGuestCount();
×
775
    if (guests !== lastGuestCount) {
×
776
      await redis.set('http-api:guests', guests);
×
777
      this.broadcast('guests', guests);
×
778
    }
×
779
  }
×
780
}
92✔
781

1✔
782
export default SocketServer;
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc