• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

u-wave / core / 4368272791

pending completion
4368272791

Pull #554

github

GitHub
Merge 5103a85ff into d7b30c5ce
Pull Request #554: Add twitch emotes integration

639 of 774 branches covered (82.56%)

Branch coverage included in aggregate %.

7 of 7 new or added lines in 2 files covered. (100.0%)

8465 of 10454 relevant lines covered (80.97%)

35.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.44
/src/SocketServer.js
1
'use strict';
1✔
2

1✔
3
const { promisify } = require('util');
1✔
4
const { debounce, isEmpty } = require('lodash');
1✔
5
const sjson = require('secure-json-parse');
1✔
6
const WebSocket = require('ws');
1✔
7
const Ajv = require('ajv').default;
1✔
8
const ms = require('ms');
1✔
9
const { stdSerializers } = require('pino');
1✔
10
const { socketVote } = require('./controllers/booth');
1✔
11
const { disconnectUser } = require('./controllers/users');
1✔
12
const AuthRegistry = require('./AuthRegistry');
1✔
13
const GuestConnection = require('./sockets/GuestConnection');
1✔
14
const AuthedConnection = require('./sockets/AuthedConnection');
1✔
15
const LostConnection = require('./sockets/LostConnection');
1✔
16
const { serializeUser } = require('./utils/serialize');
1✔
17

1✔
18
/**
1✔
19
 * @typedef {import('./models').User} User
1✔
20
 */
1✔
21

1✔
22
/**
1✔
23
 * @typedef {GuestConnection | AuthedConnection | LostConnection} Connection
1✔
24
 */
1✔
25

1✔
26
/**
1✔
27
 * @typedef {object} ClientActionParameters
1✔
28
 * @prop {string} sendChat
1✔
29
 * @prop {-1 | 1} vote
1✔
30
 * @prop {undefined} logout
1✔
31
 */
1✔
32

1✔
33
/**
1✔
34
 * @typedef {{
1✔
35
 *   [Name in keyof ClientActionParameters]: (
1✔
36
 *     user: User,
1✔
37
 *     parameter: ClientActionParameters[Name],
1✔
38
 *     connection: AuthedConnection
1✔
39
 *   ) => void
1✔
40
 * }} ClientActions
1✔
41
 */
1✔
42

1✔
43
const ajv = new Ajv({
1✔
44
  coerceTypes: false,
1✔
45
  ownProperties: true,
1✔
46
  removeAdditional: true,
1✔
47
  useDefaults: false,
1✔
48
});
1✔
49

1✔
50
/**
1✔
51
 * @template {object} T
1✔
52
 * @param {T} object
1✔
53
 * @param {PropertyKey} property
1✔
54
 * @returns {property is keyof T}
1✔
55
 */
1✔
56
function has(object, property) {
56✔
57
  return Object.prototype.hasOwnProperty.call(object, property);
56✔
58
}
56✔
59

1✔
60
class SocketServer {
1✔
61
  /**
1✔
62
   * @param {import('./Uwave').Boot} uw
1✔
63
   * @param {{ secret: Buffer|string }} options
1✔
64
   */
1✔
65
  static async plugin(uw, options) {
1✔
66
    uw.socketServer = new SocketServer(uw, {
92✔
67
      secret: options.secret,
92✔
68
      server: uw.server,
92✔
69
    });
92✔
70

92✔
71
    uw.after(async () => {
92✔
72
      await uw.socketServer.initLostConnections();
92✔
73
    });
92✔
74

92✔
75
    uw.onClose(async () => {
92✔
76
      await uw.socketServer.destroy();
92✔
77
    });
92✔
78
  }
92✔
79

1✔
80
  #uw;
1✔
81

92✔
82
  #logger;
92✔
83

92✔
84
  #redisSubscription;
92✔
85

92✔
86
  #wss;
92✔
87

92✔
88
  #closing = false;
92✔
89

92✔
90
  /** @type {Connection[]} */
92✔
91
  #connections = [];
92✔
92

92✔
93
  #pinger;
92✔
94

92✔
95
  /**
92✔
96
   * Update online guests count and broadcast an update if necessary.
92✔
97
   */
92✔
98
  #recountGuests;
92✔
99

92✔
100
  /**
92✔
101
   * Handlers for commands that come in from clients.
92✔
102
   * @type {ClientActions}
92✔
103
   */
92✔
104
  #clientActions;
92✔
105

92✔
106
  /**
92✔
107
   * @type {{
92✔
108
   *   [K in keyof ClientActionParameters]:
92✔
109
   *     import('ajv').ValidateFunction<ClientActionParameters[K]>
92✔
110
   * }}
92✔
111
   */
92✔
112
  #clientActionSchemas;
92✔
113

92✔
114
  /**
92✔
115
   * Handlers for commands that come in from the server side.
92✔
116
   *
92✔
117
   * @type {import('./redisMessages').ServerActions}
92✔
118
   */
92✔
119
  #serverActions;
1✔
120

1✔
121
  /**
1✔
122
   * Create a socket server.
1✔
123
   *
1✔
124
   * @param {import('./Uwave')} uw üWave Core instance.
1✔
125
   * @param {object} options Socket server options.
1✔
126
   * @param {number} [options.timeout] Time in seconds to wait for disconnected
1✔
127
   *     users to reconnect before removing them.
1✔
128
   * @param {Buffer|string} options.secret
1✔
129
   * @param {import('http').Server | import('https').Server} [options.server]
1✔
130
   * @param {number} [options.port]
1✔
131
   */
1✔
132
  constructor(uw, options) {
1✔
133
    if (!uw) {
92!
134
      throw new TypeError(`The "uw" argument must be of type UwaveServer. Received ${typeof uw}`);
×
135
    }
×
136

92✔
137
    this.#uw = uw;
92✔
138
    this.#logger = uw.logger.child({ ns: 'uwave:sockets' }, {
92✔
139
      serializers: {
92✔
140
        req: stdSerializers.req,
92✔
141
      },
92✔
142
    });
92✔
143
    this.#redisSubscription = uw.redis.duplicate();
92✔
144

92✔
145
    this.options = {
92✔
146
      /** @type {(_socket: import('ws') | undefined, err: Error) => void} */
92✔
147
      onError: (_socket, err) => {
92✔
148
        throw err;
×
149
      },
92✔
150
      timeout: 30,
92✔
151
      ...options,
92✔
152
    };
92✔
153

92✔
154
    // TODO put this behind a symbol, it's just public for tests
92✔
155
    this.authRegistry = new AuthRegistry(uw.redis);
92✔
156

92✔
157
    this.#wss = new WebSocket.Server({
92✔
158
      server: options.server,
92✔
159
      port: options.server ? undefined : options.port,
92!
160
    });
92✔
161

92✔
162
    this.#redisSubscription.subscribe('uwave').catch((error) => {
92✔
163
      this.#logger.error(error);
×
164
    });
92✔
165
    this.#redisSubscription.on('message', (channel, command) => {
92✔
166
      // this returns a promise, but we don't handle the error case:
56✔
167
      // there is not much we can do, so just let node.js crash w/ an unhandled rejection
56✔
168
      this.onServerMessage(channel, command);
56✔
169
    });
92✔
170

92✔
171
    this.#wss.on('error', (error) => {
92✔
172
      this.onError(error);
×
173
    });
92✔
174
    this.#wss.on('connection', (socket, request) => {
92✔
175
      this.onSocketConnected(socket, request);
5✔
176
    });
92✔
177

92✔
178
    this.#pinger = setInterval(() => {
92✔
179
      this.ping();
×
180
    }, ms('10 seconds'));
92✔
181

92✔
182
    this.#recountGuests = debounce(() => {
92✔
183
      this.#recountGuestsInternal().catch((error) => {
×
184
        this.#logger.error({ err: error }, 'counting guests failed');
×
185
      });
×
186
    }, ms('2 seconds'));
92✔
187

92✔
188
    this.#clientActions = {
92✔
189
      sendChat: (user, message) => {
92✔
190
        this.#logger.trace({ user, message }, 'sendChat');
×
191
        this.#uw.chat.send(user, message);
×
192
      },
92✔
193
      vote: (user, direction) => {
92✔
194
        socketVote(this.#uw, user.id, direction);
×
195
      },
92✔
196
      logout: (user, _, connection) => {
92✔
197
        this.replace(connection, this.createGuestConnection(connection.socket));
×
198
        if (!this.connection(user)) {
×
199
          disconnectUser(this.#uw, user._id);
×
200
        }
×
201
      },
92✔
202
    };
92✔
203

92✔
204
    this.#clientActionSchemas = {
92✔
205
      sendChat: ajv.compile({
92✔
206
        type: 'string',
92✔
207
      }),
92✔
208
      vote: ajv.compile({
92✔
209
        type: 'integer',
92✔
210
        enum: [-1, 1],
92✔
211
      }),
92✔
212
      logout: ajv.compile(true),
92✔
213
    };
92✔
214

92✔
215
    this.#serverActions = {
92✔
216
      /**
92✔
217
       * Broadcast the next track.
92✔
218
       */
92✔
219
      'advance:complete': (next) => {
92✔
220
        if (next) {
5✔
221
          this.broadcast('advance', {
5✔
222
            historyID: next.historyID,
5✔
223
            userID: next.userID,
5✔
224
            itemID: next.itemID,
5✔
225
            media: next.media,
5✔
226
            playedAt: new Date(next.playedAt).getTime(),
5✔
227
          });
5✔
228
        } else {
5!
229
          this.broadcast('advance', null);
×
230
        }
×
231
      },
92✔
232
      /**
92✔
233
       * Broadcast a skip notification.
92✔
234
       */
92✔
235
      'booth:skip': ({ moderatorID, userID, reason }) => {
92✔
236
        this.broadcast('skip', { moderatorID, userID, reason });
×
237
      },
92✔
238
      /**
92✔
239
       * Broadcast a chat message.
92✔
240
       */
92✔
241
      'chat:message': (message) => {
92✔
242
        this.broadcast('chatMessage', message);
×
243
      },
92✔
244
      /**
92✔
245
       * Delete chat messages. The delete filter can have an _id property to
92✔
246
       * delete a specific message, a userID property to delete messages by a
92✔
247
       * user, or be empty to delete all messages.
92✔
248
       */
92✔
249
      'chat:delete': ({ moderatorID, filter }) => {
92✔
250
        if ('id' in filter) {
×
251
          this.broadcast('chatDeleteByID', {
×
252
            moderatorID,
×
253
            _id: filter.id,
×
254
          });
×
255
        } else if ('userID' in filter) {
×
256
          this.broadcast('chatDeleteByUser', {
×
257
            moderatorID,
×
258
            userID: filter.userID,
×
259
          });
×
260
        } else if (isEmpty(filter)) {
×
261
          this.broadcast('chatDelete', { moderatorID });
×
262
        }
×
263
      },
92✔
264
      /**
92✔
265
       * Broadcast that a user was muted in chat.
92✔
266
       */
92✔
267
      'chat:mute': ({ moderatorID, userID, duration }) => {
92✔
268
        this.broadcast('chatMute', {
×
269
          userID,
×
270
          moderatorID,
×
271
          expiresAt: Date.now() + duration,
×
272
        });
×
273
      },
92✔
274
      /**
92✔
275
       * Broadcast that a user was unmuted in chat.
92✔
276
       */
92✔
277
      'chat:unmute': ({ moderatorID, userID }) => {
92✔
278
        this.broadcast('chatUnmute', { userID, moderatorID });
×
279
      },
92✔
280
      /**
92✔
281
       * Broadcast a vote for the current track.
92✔
282
       */
92✔
283
      'booth:vote': ({ userID, direction }) => {
92✔
284
        this.broadcast('vote', {
2✔
285
          _id: userID,
2✔
286
          value: direction,
2✔
287
        });
2✔
288
      },
92✔
289
      /**
92✔
290
       * Broadcast a favorite for the current track.
92✔
291
       */
92✔
292
      'booth:favorite': ({ userID }) => {
92✔
293
        this.broadcast('favorite', { userID });
×
294
      },
92✔
295
      /**
92✔
296
       * Cycle a single user's playlist.
92✔
297
       */
92✔
298
      'playlist:cycle': ({ userID, playlistID }) => {
92✔
299
        this.sendTo(userID, 'playlistCycle', { playlistID });
5✔
300
      },
92✔
301
      /**
92✔
302
       * Broadcast that a user joined the waitlist.
92✔
303
       */
92✔
304
      'waitlist:join': ({ userID, waitlist }) => {
92✔
305
        this.broadcast('waitlistJoin', { userID, waitlist });
7✔
306
      },
92✔
307
      /**
92✔
308
       * Broadcast that a user left the waitlist.
92✔
309
       */
92✔
310
      'waitlist:leave': ({ userID, waitlist }) => {
92✔
311
        this.broadcast('waitlistLeave', { userID, waitlist });
×
312
      },
92✔
313
      /**
92✔
314
       * Broadcast that a user was added to the waitlist.
92✔
315
       */
92✔
316
      'waitlist:add': ({
92✔
317
        userID, moderatorID, position, waitlist,
1✔
318
      }) => {
1✔
319
        this.broadcast('waitlistAdd', {
1✔
320
          userID, moderatorID, position, waitlist,
1✔
321
        });
1✔
322
      },
92✔
323
      /**
92✔
324
       * Broadcast that a user was removed from the waitlist.
92✔
325
       */
92✔
326
      'waitlist:remove': ({ userID, moderatorID, waitlist }) => {
92✔
327
        this.broadcast('waitlistRemove', { userID, moderatorID, waitlist });
×
328
      },
92✔
329
      /**
92✔
330
       * Broadcast that a user was moved in the waitlist.
92✔
331
       */
92✔
332
      'waitlist:move': ({
92✔
333
        userID, moderatorID, position, waitlist,
×
334
      }) => {
×
335
        this.broadcast('waitlistMove', {
×
336
          userID, moderatorID, position, waitlist,
×
337
        });
×
338
      },
92✔
339
      /**
92✔
340
       * Broadcast a waitlist update.
92✔
341
       */
92✔
342
      'waitlist:update': (waitlist) => {
92✔
343
        this.broadcast('waitlistUpdate', waitlist);
5✔
344
      },
92✔
345
      /**
92✔
346
       * Broadcast that the waitlist was cleared.
92✔
347
       */
92✔
348
      'waitlist:clear': ({ moderatorID }) => {
92✔
349
        this.broadcast('waitlistClear', { moderatorID });
×
350
      },
92✔
351
      /**
92✔
352
       * Broadcast that the waitlist was locked.
92✔
353
       */
92✔
354
      'waitlist:lock': ({ moderatorID, locked }) => {
92✔
355
        this.broadcast('waitlistLock', { moderatorID, locked });
×
356
      },
92✔
357

92✔
358
      'acl:allow': ({ userID, roles }) => {
92✔
359
        this.broadcast('acl:allow', { userID, roles });
17✔
360
      },
92✔
361
      'acl:disallow': ({ userID, roles }) => {
92✔
362
        this.broadcast('acl:disallow', { userID, roles });
1✔
363
      },
92✔
364

92✔
365
      'user:update': ({ userID, moderatorID, new: update }) => {
92✔
366
        // TODO Remove this remnant of the old roles system
×
367
        if ('role' in update) {
×
368
          this.broadcast('roleChange', {
×
369
            moderatorID,
×
370
            userID,
×
371
            role: update.role,
×
372
          });
×
373
        }
×
374
        if ('username' in update) {
×
375
          this.broadcast('nameChange', {
×
376
            moderatorID,
×
377
            userID,
×
378
            username: update.username,
×
379
          });
×
380
        }
×
381
      },
92✔
382
      'user:join': async ({ userID }) => {
92✔
383
        const { users, redis } = this.#uw;
5✔
384
        const user = await users.getUser(userID);
5✔
385
        if (user) {
5✔
386
          // TODO this should not be the socket server code's responsibility
5✔
387
          await redis.rpush('users', user.id);
5✔
388
          this.broadcast('join', serializeUser(user));
5✔
389
        }
5✔
390
      },
92✔
391
      /**
92✔
392
       * Broadcast that a user left the server.
92✔
393
       */
92✔
394
      'user:leave': ({ userID }) => {
92✔
395
        this.broadcast('leave', userID);
×
396
      },
92✔
397
      /**
92✔
398
       * Broadcast a ban event.
92✔
399
       */
92✔
400
      'user:ban': ({
92✔
401
        moderatorID, userID, permanent = false, duration, expiresAt,
2✔
402
      }) => {
2✔
403
        this.broadcast('ban', {
2✔
404
          moderatorID, userID, permanent, duration, expiresAt,
2✔
405
        });
2✔
406

2✔
407
        this.#connections.forEach((connection) => {
2✔
408
          if (connection instanceof AuthedConnection && connection.user.id === userID) {
×
409
            connection.ban();
×
410
          } else if (connection instanceof LostConnection && connection.user.id === userID) {
×
411
            connection.close();
×
412
          }
×
413
        });
2✔
414
      },
92✔
415
      /**
92✔
416
       * Broadcast an unban event.
92✔
417
       */
92✔
418
      'user:unban': ({ moderatorID, userID }) => {
92✔
419
        this.broadcast('unban', { moderatorID, userID });
1✔
420
      },
92✔
421
      /**
92✔
422
       * Force-close a connection.
92✔
423
       */
92✔
424
      'http-api:socket:close': (userID) => {
92✔
425
        this.#connections.forEach((connection) => {
×
426
          if ('user' in connection && connection.user.id === userID) {
×
427
            connection.close();
×
428
          }
×
429
        });
×
430
      },
92✔
431

92✔
432
      'emotes:reload': () => {
92✔
433
        this.broadcast('reloadEmotes', null);
×
434
      },
92✔
435
    };
92✔
436
  }
92✔
437

1✔
438
  /**
1✔
439
   * Create `LostConnection`s for every user that's known to be online, but that
1✔
440
   * is not currently connected to the socket server.
1✔
441
   * @private
1✔
442
   */
1✔
443
  async initLostConnections() {
1✔
444
    const { User } = this.#uw.models;
92✔
445
    const userIDs = await this.#uw.redis.lrange('users', 0, -1);
92✔
446
    const disconnectedIDs = userIDs.filter((userID) => !this.connection(userID));
92✔
447

92✔
448
    /** @type {User[]} */
92✔
449
    const disconnectedUsers = await User.where('_id').in(disconnectedIDs);
92✔
450
    disconnectedUsers.forEach((user) => {
92✔
451
      this.add(this.createLostConnection(user));
×
452
    });
92✔
453
  }
92✔
454

1✔
455
  /**
1✔
456
   * @param {import('ws')} socket
1✔
457
   * @param {import('http').IncomingMessage} request
1✔
458
   * @private
1✔
459
   */
1✔
460
  onSocketConnected(socket, request) {
1✔
461
    this.#logger.info({ req: request }, 'new connection');
5✔
462

5✔
463
    socket.on('error', (error) => {
5✔
464
      this.onSocketError(socket, error);
×
465
    });
5✔
466
    this.add(this.createGuestConnection(socket));
5✔
467
  }
5✔
468

1✔
469
  /**
1✔
470
   * @param {import('ws')} socket
1✔
471
   * @param {Error} error
1✔
472
   * @private
1✔
473
   */
1✔
474
  onSocketError(socket, error) {
1✔
475
    this.#logger.warn({ err: error }, 'socket error');
×
476

×
477
    this.options.onError(socket, error);
×
478
  }
×
479

1✔
480
  /**
1✔
481
   * @param {Error} error
1✔
482
   * @private
1✔
483
   */
1✔
484
  onError(error) {
1✔
485
    this.#logger.error({ err: error }, 'server error');
×
486

×
487
    this.options.onError(undefined, error);
×
488
  }
×
489

1✔
490
  /**
1✔
491
   * Get a LostConnection for a user, if one exists.
1✔
492
   *
1✔
493
   * @param {User} user
1✔
494
   * @private
1✔
495
   */
1✔
496
  getLostConnection(user) {
1✔
497
    return this.#connections.find((connection) => (
×
498
      connection instanceof LostConnection && connection.user.id === user.id
×
499
    ));
×
500
  }
×
501

1✔
502
  /**
1✔
503
   * Create a connection instance for an unauthenticated user.
1✔
504
   *
1✔
505
   * @param {import('ws')} socket
1✔
506
   * @private
1✔
507
   */
1✔
508
  createGuestConnection(socket) {
1✔
509
    const connection = new GuestConnection(this.#uw, socket, {
5✔
510
      authRegistry: this.authRegistry,
5✔
511
    });
5✔
512
    connection.on('close', () => {
5✔
513
      this.remove(connection);
×
514
    });
5✔
515
    connection.on('authenticate', async (user) => {
5✔
516
      const isReconnect = await connection.isReconnect(user);
5✔
517
      this.#logger.info({ userId: user.id, isReconnect }, 'authenticated socket');
5✔
518
      if (isReconnect) {
5!
519
        const previousConnection = this.getLostConnection(user);
×
520
        if (previousConnection) this.remove(previousConnection);
×
521
      }
×
522

5✔
523
      this.replace(connection, this.createAuthedConnection(socket, user));
5✔
524

5✔
525
      if (!isReconnect) {
5✔
526
        this.#uw.publish('user:join', { userID: user.id });
5✔
527
      }
5✔
528
    });
5✔
529
    return connection;
5✔
530
  }
5✔
531

1✔
532
  /**
1✔
533
   * Create a connection instance for an authenticated user.
1✔
534
   *
1✔
535
   * @param {WebSocket} socket
1✔
536
   * @param {User} user
1✔
537
   * @returns {AuthedConnection}
1✔
538
   * @private
1✔
539
   */
1✔
540
  createAuthedConnection(socket, user) {
1✔
541
    const connection = new AuthedConnection(this.#uw, socket, user);
5✔
542
    connection.on('close', ({ banned }) => {
5✔
543
      if (banned) {
5!
544
        this.#logger.info({ userId: user.id }, 'removing connection after ban');
×
545
        disconnectUser(this.#uw, user._id);
×
546
      } else if (!this.#closing) {
5!
547
        this.#logger.info({ userId: user.id }, 'lost connection');
×
548
        this.add(this.createLostConnection(user));
×
549
      }
×
550
      this.remove(connection);
5✔
551
    });
5✔
552
    connection.on(
5✔
553
      'command',
5✔
554
      /**
5✔
555
       * @param {string} command
5✔
556
       * @param {import('type-fest').JsonValue} data
5✔
557
       */
5✔
558
      (command, data) => {
5✔
559
        this.#logger.trace({ userId: user.id, command, data }, 'command');
×
560
        if (has(this.#clientActions, command)) {
×
561
          // Ignore incorrect input
×
562
          const validate = this.#clientActionSchemas[command];
×
563
          if (validate && !validate(data)) {
×
564
            return;
×
565
          }
×
566

×
567
          const action = this.#clientActions[command];
×
568
          // @ts-expect-error TS2345 `data` is validated
×
569
          action(user, data, connection);
×
570
        }
×
571
      },
5✔
572
    );
5✔
573
    return connection;
5✔
574
  }
5✔
575

1✔
576
  /**
1✔
577
   * Create a connection instance for a user who disconnected.
1✔
578
   *
1✔
579
   * @param {User} user
1✔
580
   * @returns {LostConnection}
1✔
581
   * @private
1✔
582
   */
1✔
583
  createLostConnection(user) {
1✔
584
    const connection = new LostConnection(this.#uw, user, this.options.timeout);
×
585
    connection.on('close', () => {
×
586
      this.#logger.info({ userId: user.id }, 'user left');
×
587
      this.remove(connection);
×
588
      // Only register that the user left if they didn't have another connection
×
589
      // still open.
×
590
      if (!this.connection(user)) {
×
591
        disconnectUser(this.#uw, user._id);
×
592
      }
×
593
    });
×
594
    return connection;
×
595
  }
×
596

1✔
597
  /**
1✔
598
   * Add a connection.
1✔
599
   *
1✔
600
   * @param {Connection} connection
1✔
601
   * @private
1✔
602
   */
1✔
603
  add(connection) {
1✔
604
    const userId = 'user' in connection ? connection.user.id : null;
10✔
605
    this.#logger.trace({ type: connection.constructor.name, userId }, 'add connection');
10✔
606

10✔
607
    this.#connections.push(connection);
10✔
608
    this.#recountGuests();
10✔
609
  }
10✔
610

1✔
611
  /**
1✔
612
   * Remove a connection.
1✔
613
   *
1✔
614
   * @param {Connection} connection
1✔
615
   * @private
1✔
616
   */
1✔
617
  remove(connection) {
1✔
618
    const userId = 'user' in connection ? connection.user.id : null;
10✔
619
    this.#logger.trace({ type: connection.constructor.name, userId }, 'remove connection');
10✔
620

10✔
621
    const i = this.#connections.indexOf(connection);
10✔
622
    this.#connections.splice(i, 1);
10✔
623

10✔
624
    connection.removed();
10✔
625
    this.#recountGuests();
10✔
626
  }
10✔
627

1✔
628
  /**
1✔
629
   * Replace a connection instance with another connection instance. Useful when
1✔
630
   * a connection changes "type", like GuestConnection → AuthedConnection.
1✔
631
   *
1✔
632
   * @param {Connection} oldConnection
1✔
633
   * @param {Connection} newConnection
1✔
634
   * @private
1✔
635
   */
1✔
636
  replace(oldConnection, newConnection) {
1✔
637
    this.remove(oldConnection);
5✔
638
    this.add(newConnection);
5✔
639
  }
5✔
640

1✔
641
  /**
1✔
642
   * Handle command messages coming in from Redis.
1✔
643
   * Some commands are intended to broadcast immediately to all connected
1✔
644
   * clients, but others require special action.
1✔
645
   *
1✔
646
   * @param {string} channel
1✔
647
   * @param {string} rawCommand
1✔
648
   * @return {Promise<void>}
1✔
649
   * @private
1✔
650
   */
1✔
651
  async onServerMessage(channel, rawCommand) {
1✔
652
    /**
56✔
653
     * @type {{ command: string, data: import('type-fest').JsonValue }|undefined}
56✔
654
     */
56✔
655
    const json = sjson.safeParse(rawCommand);
56✔
656
    if (!json) {
56!
657
      return;
×
658
    }
×
659
    const { command, data } = json;
56✔
660

56✔
661
    this.#logger.trace({ channel, command, data }, 'server message');
56✔
662

56✔
663
    if (has(this.#serverActions, command)) {
56✔
664
      const action = this.#serverActions[command];
51✔
665
      if (action !== undefined) { // the types for `ServerActions` allow undefined, so...
51✔
666
        // @ts-expect-error TS2345 `data` is validated
51✔
667
        action(data);
51✔
668
      }
51✔
669
    }
51✔
670
  }
56✔
671

1✔
672
  /**
1✔
673
   * Stop the socket server.
1✔
674
   *
1✔
675
   * @return {Promise<void>}
1✔
676
   */
1✔
677
  async destroy() {
1✔
678
    clearInterval(this.#pinger);
92✔
679

92✔
680
    this.#closing = true;
92✔
681
    for (const connection of this.#wss.clients) {
92✔
682
      connection.close();
5✔
683
    }
5✔
684

92✔
685
    const closeWsServer = promisify(this.#wss.close.bind(this.#wss));
92✔
686
    await closeWsServer();
92✔
687
    await this.#redisSubscription.quit();
92✔
688

92✔
689
    this.#recountGuests.cancel();
92✔
690
  }
92✔
691

1✔
692
  /**
1✔
693
   * Get the connection instance for a specific user.
1✔
694
   *
1✔
695
   * @param {User|string} user The user.
1✔
696
   * @return {Connection|undefined}
1✔
697
   */
1✔
698
  connection(user) {
1✔
699
    const userID = typeof user === 'object' ? user.id : user;
×
700
    return this.#connections.find((connection) => 'user' in connection && connection.user.id === userID);
×
701
  }
×
702

1✔
703
  ping() {
1✔
704
    this.#connections.forEach((connection) => {
×
705
      if ('socket' in connection) {
×
706
        connection.ping();
×
707
      }
×
708
    });
×
709
  }
×
710

1✔
711
  /**
1✔
712
   * Broadcast a command to all connected clients.
1✔
713
   *
1✔
714
   * @param {string} command Command name.
1✔
715
   * @param {import('type-fest').JsonValue} data Command data.
1✔
716
   */
1✔
717
  broadcast(command, data) {
1✔
718
    this.#logger.trace({
46✔
719
      command,
46✔
720
      data,
46✔
721
      to: this.#connections.map((connection) => (
46✔
722
        'user' in connection ? connection.user.id : null
20!
723
      )),
46✔
724
    }, 'broadcast');
46✔
725

46✔
726
    this.#connections.forEach((connection) => {
46✔
727
      connection.send(command, data);
20✔
728
    });
46✔
729
  }
46✔
730

1✔
731
  /**
1✔
732
   * Send a command to a single user.
1✔
733
   *
1✔
734
   * @param {User|string} user User or user ID to send the command to.
1✔
735
   * @param {string} command Command name.
1✔
736
   * @param {import('type-fest').JsonValue} data Command data.
1✔
737
   */
1✔
738
  sendTo(user, command, data) {
1✔
739
    const userID = typeof user === 'object' ? user.id : user;
5!
740

5✔
741
    this.#connections.forEach((connection) => {
5✔
742
      if ('user' in connection && connection.user.id === userID) {
4✔
743
        connection.send(command, data);
3✔
744
      }
3✔
745
    });
5✔
746
  }
5✔
747

1✔
748
  async getGuestCount() {
1✔
749
    const { redis } = this.#uw;
×
750
    const rawCount = await redis.get('http-api:guests');
×
751
    if (typeof rawCount !== 'string' || !/^\d+$/.test(rawCount)) {
×
752
      return 0;
×
753
    }
×
754
    return parseInt(rawCount, 10);
×
755
  }
×
756

1✔
757
  async #recountGuestsInternal() {
1✔
758
    const { redis } = this.#uw;
×
759
    const guests = this.#connections
×
760
      .filter((connection) => connection instanceof GuestConnection)
×
761
      .length;
×
762

×
763
    const lastGuestCount = await this.getGuestCount();
×
764
    if (guests !== lastGuestCount) {
×
765
      await redis.set('http-api:guests', guests);
×
766
      this.broadcast('guests', guests);
×
767
    }
×
768
  }
×
769
}
1✔
770

1✔
771
module.exports = SocketServer;
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc