• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

u-wave / core / 12018248150

25 Nov 2024 08:27PM UTC coverage: 81.786% (-0.2%) from 81.949%
12018248150

Pull #667

github

goto-bus-stop
Remove remnants of replace booth

This was probably a nice idea we had back when üWave was first started.
A moderator would be able to instantly put a user in the booth, skipping
the waitlist.

In practice we never implemented it, and it would require client-side
support. I doubt it's all that useful and it is just dead code right
now.
Pull Request #667: Remove remnants of replace booth

826 of 998 branches covered (82.77%)

Branch coverage included in aggregate %.

9057 of 11086 relevant lines covered (81.7%)

85.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.78
/src/SocketServer.js
1
import { promisify } from 'node:util';
1✔
2
import lodash from 'lodash';
1✔
3
import sjson from 'secure-json-parse';
1✔
4
import { WebSocketServer } from 'ws';
1✔
5
import Ajv from 'ajv';
1✔
6
import ms from 'ms';
1✔
7
import { stdSerializers } from 'pino';
1✔
8
import { socketVote } from './controllers/booth.js';
1✔
9
import { disconnectUser } from './controllers/users.js';
1✔
10
import AuthRegistry from './AuthRegistry.js';
1✔
11
import GuestConnection from './sockets/GuestConnection.js';
1✔
12
import AuthedConnection from './sockets/AuthedConnection.js';
1✔
13
import LostConnection from './sockets/LostConnection.js';
1✔
14
import { serializeUser } from './utils/serialize.js';
1✔
15

1✔
16
const { debounce, isEmpty } = lodash;
1✔
17

1✔
18
/**
1✔
19
 * @typedef {import('./schema.js').User} User
1✔
20
 */
1✔
21

1✔
22
/**
1✔
23
 * @typedef {GuestConnection | AuthedConnection | LostConnection} Connection
1✔
24
 */
1✔
25

1✔
26
/**
1✔
27
 * @typedef {object} ClientActionParameters
1✔
28
 * @prop {string} sendChat
1✔
29
 * @prop {-1 | 1} vote
1✔
30
 * @prop {undefined} logout
1✔
31
 */
1✔
32

1✔
33
/**
1✔
34
 * @typedef {{
1✔
35
 *   [Name in keyof ClientActionParameters]: (
1✔
36
 *     user: User,
1✔
37
 *     parameter: ClientActionParameters[Name],
1✔
38
 *     connection: AuthedConnection
1✔
39
 *   ) => void
1✔
40
 * }} ClientActions
1✔
41
 */
1✔
42

1✔
43
const ajv = new Ajv({
1✔
44
  coerceTypes: false,
1✔
45
  ownProperties: true,
1✔
46
  removeAdditional: true,
1✔
47
  useDefaults: false,
1✔
48
});
1✔
49

1✔
50
/**
1✔
51
 * @template {object} T
1✔
52
 * @param {T} object
1✔
53
 * @param {PropertyKey} property
1✔
54
 * @returns {property is keyof T}
1✔
55
 */
1✔
56
function has(object, property) {
125✔
57
  return Object.prototype.hasOwnProperty.call(object, property);
125✔
58
}
125✔
59

1✔
60
class SocketServer {
126✔
61
  /**
126✔
62
   * @param {import('./Uwave.js').Boot} uw
126✔
63
   * @param {{ secret: Buffer|string }} options
126✔
64
   */
126✔
65
  static async plugin(uw, options) {
126✔
66
    uw.socketServer = new SocketServer(uw, {
126✔
67
      secret: options.secret,
126✔
68
      server: uw.server,
126✔
69
    });
126✔
70

126✔
71
    uw.after(async () => {
126✔
72
      try {
126✔
73
        await uw.socketServer.initLostConnections();
126✔
74
      } catch (err) {
126!
75
        // No need to prevent startup for this.
×
76
        // We do need to clear the `users` list because the lost connection handlers
×
77
        // will not do so.
×
78
        uw.socketServer.#logger.warn({ err }, 'could not initialise lost connections');
×
79
        await uw.redis.del('users');
×
80
      }
×
81
    });
126✔
82

126✔
83
    uw.onClose(async () => {
126✔
84
      await uw.socketServer.destroy();
126✔
85
    });
126✔
86
  }
126✔
87

126✔
88
  #uw;
126✔
89

126✔
90
  #logger;
126✔
91

126✔
92
  #redisSubscription;
126✔
93

126✔
94
  #wss;
126✔
95

126✔
96
  #closing = false;
126✔
97

126✔
98
  /** @type {Connection[]} */
126✔
99
  #connections = [];
126✔
100

126✔
101
  #pinger;
126✔
102

126✔
103
  /**
126✔
104
   * Update online guests count and broadcast an update if necessary.
126✔
105
   */
126✔
106
  #recountGuests;
126✔
107

126✔
108
  /**
126✔
109
   * Handlers for commands that come in from clients.
126✔
110
   *
126✔
111
   * @type {ClientActions}
126✔
112
   */
126✔
113
  #clientActions;
126✔
114

126✔
115
  /**
126✔
116
   * @type {{
126✔
117
   *   [K in keyof ClientActionParameters]:
126✔
118
   *     import('ajv').ValidateFunction<ClientActionParameters[K]>
126✔
119
   * }}
126✔
120
   */
126✔
121
  #clientActionSchemas;
126✔
122

126✔
123
  /**
126✔
124
   * Handlers for commands that come in from the server side.
126✔
125
   *
126✔
126
   * @type {import('./redisMessages.js').ServerActions}
126✔
127
   */
126✔
128
  #serverActions;
126✔
129

126✔
130
  /**
126✔
131
   * Create a socket server.
126✔
132
   *
126✔
133
   * @param {import('./Uwave.js').default} uw üWave Core instance.
126✔
134
   * @param {object} options Socket server options.
126✔
135
   * @param {number} [options.timeout] Time in seconds to wait for disconnected
126✔
136
   *     users to reconnect before removing them.
126✔
137
   * @param {Buffer|string} options.secret
126✔
138
   * @param {import('http').Server | import('https').Server} [options.server]
126✔
139
   * @param {number} [options.port]
126✔
140
   */
126✔
141
  constructor(uw, options) {
126✔
142
    if (!uw) {
126!
143
      throw new TypeError(`The "uw" argument must be of type UwaveServer. Received ${typeof uw}`);
×
144
    }
×
145

126✔
146
    this.#uw = uw;
126✔
147
    this.#logger = uw.logger.child({ ns: 'uwave:sockets' }, {
126✔
148
      serializers: {
126✔
149
        req: stdSerializers.req,
126✔
150
      },
126✔
151
    });
126✔
152
    this.#redisSubscription = uw.redis.duplicate();
126✔
153

126✔
154
    this.options = {
126✔
155
      /** @type {(_socket: import('ws').WebSocket | undefined, err: Error) => void} */
126✔
156
      onError: (_socket, err) => {
126✔
157
        throw err;
×
158
      },
126✔
159
      timeout: 30,
126✔
160
      ...options,
126✔
161
    };
126✔
162

126✔
163
    // TODO put this behind a symbol, it's just public for tests
126✔
164
    this.authRegistry = new AuthRegistry(uw.redis);
126✔
165

126✔
166
    this.#wss = new WebSocketServer({
126✔
167
      server: options.server,
126✔
168
      port: options.server ? undefined : options.port,
126!
169
    });
126✔
170

126✔
171
    this.#redisSubscription.subscribe('uwave').catch((error) => {
126✔
172
      this.#logger.error(error);
×
173
    });
126✔
174
    this.#redisSubscription.on('message', (channel, command) => {
126✔
175
      // this returns a promise, but we don't handle the error case:
122✔
176
      // there is not much we can do, so just let node.js crash w/ an unhandled rejection
122✔
177
      this.onServerMessage(channel, command);
122✔
178
    });
126✔
179

126✔
180
    this.#wss.on('error', (error) => {
126✔
181
      this.onError(error);
×
182
    });
126✔
183
    this.#wss.on('connection', (socket, request) => {
126✔
184
      this.onSocketConnected(socket, request);
14✔
185
    });
126✔
186

126✔
187
    this.#pinger = setInterval(() => {
126✔
188
      this.ping();
×
189
    }, ms('10 seconds'));
126✔
190

126✔
191
    this.#recountGuests = debounce(() => {
126✔
192
      this.#recountGuestsInternal().catch((error) => {
×
193
        this.#logger.error({ err: error }, 'counting guests failed');
×
194
      });
×
195
    }, ms('2 seconds'));
126✔
196

126✔
197
    this.#clientActions = {
126✔
198
      sendChat: (user, message) => {
126✔
199
        this.#logger.trace({ user, message }, 'sendChat');
3✔
200
        this.#uw.chat.send(user, message);
3✔
201
      },
126✔
202
      vote: (user, direction) => {
126✔
203
        socketVote(this.#uw, user.id, direction);
×
204
      },
126✔
205
      logout: (user, _, connection) => {
126✔
206
        this.replace(connection, this.createGuestConnection(connection.socket));
×
207
        if (!this.connection(user)) {
×
208
          disconnectUser(this.#uw, user.id);
×
209
        }
×
210
      },
126✔
211
    };
126✔
212

126✔
213
    this.#clientActionSchemas = {
126✔
214
      sendChat: ajv.compile({
126✔
215
        type: 'string',
126✔
216
      }),
126✔
217
      vote: ajv.compile({
126✔
218
        type: 'integer',
126✔
219
        enum: [-1, 1],
126✔
220
      }),
126✔
221
      logout: ajv.compile(true),
126✔
222
    };
126✔
223

126✔
224
    this.#serverActions = {
126✔
225
      /**
126✔
226
       * Broadcast the next track.
126✔
227
       */
126✔
228
      'advance:complete': (next) => {
126✔
229
        if (next) {
8✔
230
          this.broadcast('advance', {
8✔
231
            historyID: next.historyID,
8✔
232
            userID: next.userID,
8✔
233
            media: next.media,
8✔
234
            playedAt: new Date(next.playedAt).getTime(),
8✔
235
          });
8✔
236
        } else {
8!
237
          this.broadcast('advance', null);
×
238
        }
×
239
      },
126✔
240
      /**
126✔
241
       * Broadcast a skip notification.
126✔
242
       */
126✔
243
      'booth:skip': ({ moderatorID, userID, reason }) => {
126✔
244
        this.broadcast('skip', { moderatorID, userID, reason });
×
245
      },
126✔
246
      /**
126✔
247
       * Broadcast a chat message.
126✔
248
       */
126✔
249
      'chat:message': (message) => {
126✔
250
        this.broadcast('chatMessage', message);
2✔
251
      },
126✔
252
      /**
126✔
253
       * Delete chat messages. The delete filter can have an _id property to
126✔
254
       * delete a specific message, a userID property to delete messages by a
126✔
255
       * user, or be empty to delete all messages.
126✔
256
       */
126✔
257
      'chat:delete': ({ moderatorID, filter }) => {
126✔
258
        if ('id' in filter) {
6✔
259
          this.broadcast('chatDeleteByID', {
2✔
260
            moderatorID,
2✔
261
            _id: filter.id,
2✔
262
          });
2✔
263
        } else if ('userID' in filter) {
6✔
264
          this.broadcast('chatDeleteByUser', {
2✔
265
            moderatorID,
2✔
266
            userID: filter.userID,
2✔
267
          });
2✔
268
        } else if (isEmpty(filter)) {
2✔
269
          this.broadcast('chatDelete', { moderatorID });
2✔
270
        }
2✔
271
      },
126✔
272
      /**
126✔
273
       * Broadcast that a user was muted in chat.
126✔
274
       */
126✔
275
      'chat:mute': ({ moderatorID, userID, duration }) => {
126✔
276
        this.broadcast('chatMute', {
1✔
277
          userID,
1✔
278
          moderatorID,
1✔
279
          expiresAt: Date.now() + duration,
1✔
280
        });
1✔
281
      },
126✔
282
      /**
126✔
283
       * Broadcast that a user was unmuted in chat.
126✔
284
       */
126✔
285
      'chat:unmute': ({ moderatorID, userID }) => {
126✔
286
        this.broadcast('chatUnmute', { userID, moderatorID });
×
287
      },
126✔
288
      /**
126✔
289
       * Broadcast a vote for the current track.
126✔
290
       */
126✔
291
      'booth:vote': ({ userID, direction }) => {
126✔
292
        this.broadcast('vote', {
2✔
293
          _id: userID,
2✔
294
          value: direction,
2✔
295
        });
2✔
296
      },
126✔
297
      /**
126✔
298
       * Broadcast a favorite for the current track.
126✔
299
       */
126✔
300
      'booth:favorite': ({ userID }) => {
126✔
301
        this.broadcast('favorite', { userID });
×
302
      },
126✔
303
      /**
126✔
304
       * Cycle a single user's playlist.
126✔
305
       */
126✔
306
      'playlist:cycle': ({ userID, playlistID }) => {
126✔
307
        this.sendTo(userID, 'playlistCycle', { playlistID });
8✔
308
      },
126✔
309
      /**
126✔
310
       * Broadcast that a user joined the waitlist.
126✔
311
       */
126✔
312
      'waitlist:join': ({ userID, waitlist }) => {
126✔
313
        this.broadcast('waitlistJoin', { userID, waitlist });
10✔
314
      },
126✔
315
      /**
126✔
316
       * Broadcast that a user left the waitlist.
126✔
317
       */
126✔
318
      'waitlist:leave': ({ userID, waitlist }) => {
126✔
319
        this.broadcast('waitlistLeave', { userID, waitlist });
×
320
      },
126✔
321
      /**
126✔
322
       * Broadcast that a user was added to the waitlist.
126✔
323
       */
126✔
324
      'waitlist:add': ({
126✔
325
        userID, moderatorID, position, waitlist,
1✔
326
      }) => {
1✔
327
        this.broadcast('waitlistAdd', {
1✔
328
          userID, moderatorID, position, waitlist,
1✔
329
        });
1✔
330
      },
126✔
331
      /**
126✔
332
       * Broadcast that a user was removed from the waitlist.
126✔
333
       */
126✔
334
      'waitlist:remove': ({ userID, moderatorID, waitlist }) => {
126✔
335
        this.broadcast('waitlistRemove', { userID, moderatorID, waitlist });
×
336
      },
126✔
337
      /**
126✔
338
       * Broadcast that a user was moved in the waitlist.
126✔
339
       */
126✔
340
      'waitlist:move': ({
126✔
341
        userID, moderatorID, position, waitlist,
×
342
      }) => {
×
343
        this.broadcast('waitlistMove', {
×
344
          userID, moderatorID, position, waitlist,
×
345
        });
×
346
      },
126✔
347
      /**
126✔
348
       * Broadcast a waitlist update.
126✔
349
       */
126✔
350
      'waitlist:update': (waitlist) => {
126✔
351
        this.broadcast('waitlistUpdate', waitlist);
8✔
352
      },
126✔
353
      /**
126✔
354
       * Broadcast that the waitlist was cleared.
126✔
355
       */
126✔
356
      'waitlist:clear': ({ moderatorID }) => {
126✔
357
        this.broadcast('waitlistClear', { moderatorID });
×
358
      },
126✔
359
      /**
126✔
360
       * Broadcast that the waitlist was locked.
126✔
361
       */
126✔
362
      'waitlist:lock': ({ moderatorID, locked }) => {
126✔
363
        this.broadcast('waitlistLock', { moderatorID, locked });
×
364
      },
126✔
365

126✔
366
      'acl:allow': ({ userID, roles }) => {
126✔
367
        this.broadcast('acl:allow', { userID, roles });
42✔
368
      },
126✔
369
      'acl:disallow': ({ userID, roles }) => {
126✔
370
        this.broadcast('acl:disallow', { userID, roles });
2✔
371
      },
126✔
372

126✔
373
      'user:update': ({ userID, moderatorID, new: update }) => {
126✔
374
        // TODO Remove this remnant of the old roles system
3✔
375
        if ('role' in update) {
3!
376
          this.broadcast('roleChange', {
×
377
            moderatorID,
×
378
            userID,
×
379
            role: update.role,
×
380
          });
×
381
        }
×
382
        if ('username' in update) {
3✔
383
          this.broadcast('nameChange', {
3✔
384
            moderatorID,
3✔
385
            userID,
3✔
386
            username: update.username,
3✔
387
          });
3✔
388
        }
3✔
389
      },
126✔
390
      'user:join': async ({ userID }) => {
126✔
391
        const { users, redis } = this.#uw;
14✔
392
        const user = await users.getUser(userID);
14✔
393
        if (user) {
14✔
394
          // TODO this should not be the socket server code's responsibility
14✔
395
          await redis.rpush('users', user.id);
14✔
396
          this.broadcast('join', serializeUser(user));
14✔
397
        }
14✔
398
      },
126✔
399
      /**
126✔
400
       * Broadcast that a user left the server.
126✔
401
       */
126✔
402
      'user:leave': ({ userID }) => {
126✔
403
        this.broadcast('leave', userID);
×
404
      },
126✔
405
      /**
126✔
406
       * Broadcast a ban event.
126✔
407
       */
126✔
408
      'user:ban': ({
126✔
409
        moderatorID, userID, permanent = false, duration, expiresAt,
3✔
410
      }) => {
3✔
411
        this.broadcast('ban', {
3✔
412
          moderatorID, userID, permanent, duration, expiresAt,
3✔
413
        });
3✔
414

3✔
415
        this.#connections.forEach((connection) => {
3✔
416
          if (connection instanceof AuthedConnection && connection.user.id === userID) {
×
417
            connection.ban();
×
418
          } else if (connection instanceof LostConnection && connection.user.id === userID) {
×
419
            connection.close();
×
420
          }
×
421
        });
3✔
422
      },
126✔
423
      /**
126✔
424
       * Broadcast an unban event.
126✔
425
       */
126✔
426
      'user:unban': ({ moderatorID, userID }) => {
126✔
427
        this.broadcast('unban', { moderatorID, userID });
1✔
428
      },
126✔
429
      /**
126✔
430
       * Force-close a connection.
126✔
431
       */
126✔
432
      'http-api:socket:close': (userID) => {
126✔
433
        this.#connections.forEach((connection) => {
×
434
          if ('user' in connection && connection.user.id === userID) {
×
435
            connection.close();
×
436
          }
×
437
        });
×
438
      },
126✔
439

126✔
440
      'emotes:reload': () => {
126✔
441
        this.broadcast('reloadEmotes', null);
×
442
      },
126✔
443
    };
126✔
444
  }
126✔
445

126✔
446
  /**
126✔
447
   * Create `LostConnection`s for every user that's known to be online, but that
126✔
448
   * is not currently connected to the socket server.
126✔
449
   *
126✔
450
   * @private
126✔
451
   */
126✔
452
  async initLostConnections() {
126✔
453
    const { db, redis } = this.#uw;
126✔
454
    const userIDs = /** @type {import('./schema').UserID[]} */ (await redis.lrange('users', 0, -1));
126✔
455
    const disconnectedIDs = userIDs.filter((userID) => !this.connection(userID));
126✔
456

126✔
457
    if (disconnectedIDs.length === 0) {
126✔
458
      return;
126✔
459
    }
126✔
460

×
461
    const disconnectedUsers = await db.selectFrom('users')
×
462
      .where('id', 'in', disconnectedIDs)
×
463
      .selectAll()
×
464
      .execute();
×
465
    disconnectedUsers.forEach((user) => {
×
466
      this.add(this.createLostConnection(user));
×
467
    });
×
468
  }
126✔
469

126✔
470
  /**
126✔
471
   * @param {import('ws').WebSocket} socket
126✔
472
   * @param {import('http').IncomingMessage} request
126✔
473
   * @private
126✔
474
   */
126✔
475
  onSocketConnected(socket, request) {
126✔
476
    this.#logger.info({ req: request }, 'new connection');
14✔
477

14✔
478
    socket.on('error', (error) => {
14✔
479
      this.onSocketError(socket, error);
×
480
    });
14✔
481
    this.add(this.createGuestConnection(socket));
14✔
482
  }
14✔
483

126✔
484
  /**
126✔
485
   * @param {import('ws').WebSocket} socket
126✔
486
   * @param {Error} error
126✔
487
   * @private
126✔
488
   */
126✔
489
  onSocketError(socket, error) {
126✔
490
    this.#logger.warn({ err: error }, 'socket error');
×
491

×
492
    this.options.onError(socket, error);
×
493
  }
×
494

126✔
495
  /**
126✔
496
   * @param {Error} error
126✔
497
   * @private
126✔
498
   */
126✔
499
  onError(error) {
126✔
500
    this.#logger.error({ err: error }, 'server error');
×
501

×
502
    this.options.onError(undefined, error);
×
503
  }
×
504

126✔
505
  /**
126✔
506
   * Get a LostConnection for a user, if one exists.
126✔
507
   *
126✔
508
   * @param {User} user
126✔
509
   * @private
126✔
510
   */
126✔
511
  getLostConnection(user) {
126✔
512
    return this.#connections.find((connection) => (
×
513
      connection instanceof LostConnection && connection.user.id === user.id
×
514
    ));
×
515
  }
×
516

126✔
517
  /**
126✔
518
   * Create a connection instance for an unauthenticated user.
126✔
519
   *
126✔
520
   * @param {import('ws').WebSocket} socket
126✔
521
   * @private
126✔
522
   */
126✔
523
  createGuestConnection(socket) {
126✔
524
    const connection = new GuestConnection(this.#uw, socket, {
14✔
525
      authRegistry: this.authRegistry,
14✔
526
    });
14✔
527
    connection.on('close', () => {
14✔
528
      this.remove(connection);
×
529
    });
14✔
530
    connection.on('authenticate', async (user) => {
14✔
531
      const isReconnect = await connection.isReconnect(user);
14✔
532
      this.#logger.info({ userId: user.id, isReconnect }, 'authenticated socket');
14✔
533
      if (isReconnect) {
14!
534
        const previousConnection = this.getLostConnection(user);
×
535
        if (previousConnection) this.remove(previousConnection);
×
536
      }
×
537

14✔
538
      this.replace(connection, this.createAuthedConnection(socket, user));
14✔
539

14✔
540
      if (!isReconnect) {
14✔
541
        this.#uw.publish('user:join', { userID: user.id });
14✔
542
      }
14✔
543
    });
14✔
544
    return connection;
14✔
545
  }
14✔
546

126✔
547
  /**
126✔
548
   * Create a connection instance for an authenticated user.
126✔
549
   *
126✔
550
   * @param {import('ws').WebSocket} socket
126✔
551
   * @param {User} user
126✔
552
   * @returns {AuthedConnection}
126✔
553
   * @private
126✔
554
   */
126✔
555
  createAuthedConnection(socket, user) {
126✔
556
    const connection = new AuthedConnection(this.#uw, socket, user);
14✔
557
    connection.on('close', ({ banned }) => {
14✔
558
      if (banned) {
14!
559
        this.#logger.info({ userId: user.id }, 'removing connection after ban');
×
560
        disconnectUser(this.#uw, user.id);
×
561
      } else if (!this.#closing) {
14!
562
        this.#logger.info({ userId: user.id }, 'lost connection');
×
563
        this.add(this.createLostConnection(user));
×
564
      }
×
565
      this.remove(connection);
14✔
566
    });
14✔
567
    connection.on(
14✔
568
      'command',
14✔
569
      /**
14✔
570
       * @param {string} command
14✔
571
       * @param {import('type-fest').JsonValue} data
14✔
572
       */
14✔
573
      (command, data) => {
14✔
574
        this.#logger.trace({ userId: user.id, command, data }, 'command');
3✔
575
        if (has(this.#clientActions, command)) {
3✔
576
          // Ignore incorrect input
3✔
577
          const validate = this.#clientActionSchemas[command];
3✔
578
          if (validate && !validate(data)) {
3!
579
            return;
×
580
          }
×
581

3✔
582
          const action = this.#clientActions[command];
3✔
583
          // @ts-expect-error TS2345 `data` is validated
3✔
584
          action(user, data, connection);
3✔
585
        }
3✔
586
      },
14✔
587
    );
14✔
588
    return connection;
14✔
589
  }
14✔
590

126✔
591
  /**
126✔
592
   * Create a connection instance for a user who disconnected.
126✔
593
   *
126✔
594
   * @param {User} user
126✔
595
   * @returns {LostConnection}
126✔
596
   * @private
126✔
597
   */
126✔
598
  createLostConnection(user) {
126✔
599
    const connection = new LostConnection(this.#uw, user, this.options.timeout);
×
600
    connection.on('close', () => {
×
601
      this.#logger.info({ userId: user.id }, 'user left');
×
602
      this.remove(connection);
×
603
      // Only register that the user left if they didn't have another connection
×
604
      // still open.
×
605
      if (!this.connection(user)) {
×
606
        disconnectUser(this.#uw, user.id);
×
607
      }
×
608
    });
×
609
    return connection;
×
610
  }
×
611

126✔
612
  /**
126✔
613
   * Add a connection.
126✔
614
   *
126✔
615
   * @param {Connection} connection
126✔
616
   * @private
126✔
617
   */
126✔
618
  add(connection) {
126✔
619
    const userId = 'user' in connection ? connection.user.id : null;
28✔
620
    this.#logger.trace({ type: connection.constructor.name, userId }, 'add connection');
28✔
621

28✔
622
    this.#connections.push(connection);
28✔
623
    this.#recountGuests();
28✔
624
  }
28✔
625

126✔
626
  /**
126✔
627
   * Remove a connection.
126✔
628
   *
126✔
629
   * @param {Connection} connection
126✔
630
   * @private
126✔
631
   */
126✔
632
  remove(connection) {
126✔
633
    const userId = 'user' in connection ? connection.user.id : null;
28✔
634
    this.#logger.trace({ type: connection.constructor.name, userId }, 'remove connection');
28✔
635

28✔
636
    const i = this.#connections.indexOf(connection);
28✔
637
    this.#connections.splice(i, 1);
28✔
638

28✔
639
    connection.removed();
28✔
640
    this.#recountGuests();
28✔
641
  }
28✔
642

126✔
643
  /**
126✔
644
   * Replace a connection instance with another connection instance. Useful when
126✔
645
   * a connection changes "type", like GuestConnection → AuthedConnection.
126✔
646
   *
126✔
647
   * @param {Connection} oldConnection
126✔
648
   * @param {Connection} newConnection
126✔
649
   * @private
126✔
650
   */
126✔
651
  replace(oldConnection, newConnection) {
126✔
652
    this.remove(oldConnection);
14✔
653
    this.add(newConnection);
14✔
654
  }
14✔
655

126✔
656
  /**
126✔
657
   * Handle command messages coming in from Redis.
126✔
658
   * Some commands are intended to broadcast immediately to all connected
126✔
659
   * clients, but others require special action.
126✔
660
   *
126✔
661
   * @param {string} channel
126✔
662
   * @param {string} rawCommand
126✔
663
   * @returns {Promise<void>}
126✔
664
   * @private
126✔
665
   */
126✔
666
  async onServerMessage(channel, rawCommand) {
126✔
667
    /**
122✔
668
     * @type {{ command: string, data: import('type-fest').JsonValue }|undefined}
122✔
669
     */
122✔
670
    const json = sjson.safeParse(rawCommand);
122✔
671
    if (!json) {
122!
672
      return;
×
673
    }
×
674
    const { command, data } = json;
122✔
675

122✔
676
    this.#logger.trace({ channel, command, data }, 'server message');
122✔
677

122✔
678
    if (has(this.#serverActions, command)) {
122✔
679
      const action = this.#serverActions[command];
111✔
680
      if (action !== undefined) { // the types for `ServerActions` allow undefined, so...
111✔
681
        // @ts-expect-error TS2345 `data` is validated
111✔
682
        action(data);
111✔
683
      }
111✔
684
    }
111✔
685
  }
122✔
686

126✔
687
  /**
126✔
688
   * Stop the socket server.
126✔
689
   *
126✔
690
   * @returns {Promise<void>}
126✔
691
   */
126✔
692
  async destroy() {
126✔
693
    clearInterval(this.#pinger);
126✔
694

126✔
695
    this.#closing = true;
126✔
696
    for (const connection of this.#wss.clients) {
126✔
697
      connection.close();
14✔
698
    }
14✔
699

126✔
700
    const closeWsServer = promisify(this.#wss.close.bind(this.#wss));
126✔
701
    await closeWsServer();
126✔
702
    await this.#redisSubscription.quit();
126✔
703

126✔
704
    this.#recountGuests.cancel();
126✔
705
  }
126✔
706

126✔
707
  /**
126✔
708
   * Get the connection instance for a specific user.
126✔
709
   *
126✔
710
   * @param {User|string} user The user.
126✔
711
   * @returns {Connection|undefined}
126✔
712
   */
126✔
713
  connection(user) {
126✔
714
    const userID = typeof user === 'object' ? user.id : user;
×
715
    return this.#connections.find((connection) => 'user' in connection && connection.user.id === userID);
×
716
  }
×
717

126✔
718
  ping() {
126✔
719
    this.#connections.forEach((connection) => {
×
720
      if ('socket' in connection) {
×
721
        connection.ping();
×
722
      }
×
723
    });
×
724
  }
×
725

126✔
726
  /**
126✔
727
   * Broadcast a command to all connected clients.
126✔
728
   *
126✔
729
   * @param {string} command Command name.
126✔
730
   * @param {import('type-fest').JsonValue} data Command data.
126✔
731
   */
126✔
732
  broadcast(command, data) {
126✔
733
    this.#logger.trace({
103✔
734
      command,
103✔
735
      data,
103✔
736
      to: this.#connections.map((connection) => (
103✔
737
        'user' in connection ? connection.user.id : null
51!
738
      )),
103✔
739
    }, 'broadcast');
103✔
740

103✔
741
    this.#connections.forEach((connection) => {
103✔
742
      connection.send(command, data);
51✔
743
    });
103✔
744
  }
103✔
745

126✔
746
  /**
126✔
747
   * Send a command to a single user.
126✔
748
   *
126✔
749
   * @param {User|string} user User or user ID to send the command to.
126✔
750
   * @param {string} command Command name.
126✔
751
   * @param {import('type-fest').JsonValue} data Command data.
126✔
752
   */
126✔
753
  sendTo(user, command, data) {
126✔
754
    const userID = typeof user === 'object' ? user.id : user;
8!
755

8✔
756
    this.#connections.forEach((connection) => {
8✔
757
      if ('user' in connection && connection.user.id === userID) {
8✔
758
        connection.send(command, data);
6✔
759
      }
6✔
760
    });
8✔
761
  }
8✔
762

126✔
763
  async getGuestCount() {
126✔
764
    const { redis } = this.#uw;
×
765
    const rawCount = await redis.get('http-api:guests');
×
766
    if (typeof rawCount !== 'string' || !/^\d+$/.test(rawCount)) {
×
767
      return 0;
×
768
    }
×
769
    return parseInt(rawCount, 10);
×
770
  }
×
771

126✔
772
  async #recountGuestsInternal() {
126✔
773
    const { redis } = this.#uw;
×
774
    const guests = this.#connections
×
775
      .filter((connection) => connection instanceof GuestConnection)
×
776
      .length;
×
777

×
778
    const lastGuestCount = await this.getGuestCount();
×
779
    if (guests !== lastGuestCount) {
×
780
      await redis.set('http-api:guests', guests);
×
781
      this.broadcast('guests', guests);
×
782
    }
×
783
  }
×
784
}
126✔
785

1✔
786
export default SocketServer;
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc