• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

u-wave / core / 7050123893

30 Nov 2023 05:31PM UTC coverage: 80.455% (-0.02%) from 80.477%
7050123893

push

github

goto-bus-stop
Clear active users on startup if lost connection handlers were not set up

640 of 776 branches covered (0.0%)

Branch coverage included in aggregate %.

0 of 4 new or added lines in 1 file covered. (0.0%)

1 existing line in 1 file now uncovered.

8305 of 10342 relevant lines covered (80.3%)

44.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.95
/src/SocketServer.js
1
import { promisify } from 'node:util';
1✔
2
import mongoose from 'mongoose';
1✔
3
import lodash from 'lodash';
1✔
4
import sjson from 'secure-json-parse';
1✔
5
import { WebSocketServer } from 'ws';
1✔
6
import Ajv from 'ajv';
1✔
7
import ms from 'ms';
1✔
8
import { stdSerializers } from 'pino';
1✔
9
import { socketVote } from './controllers/booth.js';
1✔
10
import { disconnectUser } from './controllers/users.js';
1✔
11
import AuthRegistry from './AuthRegistry.js';
1✔
12
import GuestConnection from './sockets/GuestConnection.js';
1✔
13
import AuthedConnection from './sockets/AuthedConnection.js';
1✔
14
import LostConnection from './sockets/LostConnection.js';
1✔
15
import { serializeUser } from './utils/serialize.js';
1✔
16

1✔
17
const { debounce, isEmpty } = lodash;
1✔
18
const { ObjectId } = mongoose.mongo;
1✔
19

1✔
20
/**
1✔
21
 * @typedef {import('./models/index.js').User} User
1✔
22
 */
1✔
23

1✔
24
/**
1✔
25
 * @typedef {GuestConnection | AuthedConnection | LostConnection} Connection
1✔
26
 */
1✔
27

1✔
28
/**
1✔
29
 * @typedef {object} ClientActionParameters
1✔
30
 * @prop {string} sendChat
1✔
31
 * @prop {-1 | 1} vote
1✔
32
 * @prop {undefined} logout
1✔
33
 */
1✔
34

1✔
35
/**
1✔
36
 * @typedef {{
1✔
37
 *   [Name in keyof ClientActionParameters]: (
1✔
38
 *     user: User,
1✔
39
 *     parameter: ClientActionParameters[Name],
1✔
40
 *     connection: AuthedConnection
1✔
41
 *   ) => void
1✔
42
 * }} ClientActions
1✔
43
 */
1✔
44

1✔
45
const ajv = new Ajv({
1✔
46
  coerceTypes: false,
1✔
47
  ownProperties: true,
1✔
48
  removeAdditional: true,
1✔
49
  useDefaults: false,
1✔
50
});
1✔
51

1✔
52
/**
1✔
53
 * @template {object} T
1✔
54
 * @param {T} object
1✔
55
 * @param {PropertyKey} property
1✔
56
 * @returns {property is keyof T}
1✔
57
 */
1✔
58
function has(object, property) {
56✔
59
  return Object.prototype.hasOwnProperty.call(object, property);
56✔
60
}
56✔
61

1✔
62
class SocketServer {
92✔
63
  /**
92✔
64
   * @param {import('./Uwave.js').Boot} uw
92✔
65
   * @param {{ secret: Buffer|string }} options
92✔
66
   */
92✔
67
  static async plugin(uw, options) {
92✔
68
    uw.socketServer = new SocketServer(uw, {
92✔
69
      secret: options.secret,
92✔
70
      server: uw.server,
92✔
71
    });
92✔
72

92✔
73
    uw.after(async () => {
92✔
74
      try {
92✔
75
        await uw.socketServer.initLostConnections();
92✔
76
      } catch (err) {
92!
NEW
77
        // No need to prevent startup for this.
×
NEW
78
        // We do need to clear the `users` list because the lost connection handlers
×
NEW
79
        // will not do so.
×
80
        uw.socketServer.#logger.warn({ err }, 'could not initialise lost connections');
×
NEW
81
        await uw.redis.del('users');
×
UNCOV
82
      }
×
83
    });
92✔
84

92✔
85
    uw.onClose(async () => {
92✔
86
      await uw.socketServer.destroy();
92✔
87
    });
92✔
88
  }
92✔
89

92✔
90
  #uw;
92✔
91

92✔
92
  #logger;
92✔
93

92✔
94
  #redisSubscription;
92✔
95

92✔
96
  #wss;
92✔
97

92✔
98
  #closing = false;
92✔
99

92✔
100
  /** @type {Connection[]} */
92✔
101
  #connections = [];
92✔
102

92✔
103
  #pinger;
92✔
104

92✔
105
  /**
92✔
106
   * Update online guests count and broadcast an update if necessary.
92✔
107
   */
92✔
108
  #recountGuests;
92✔
109

92✔
110
  /**
92✔
111
   * Handlers for commands that come in from clients.
92✔
112
   * @type {ClientActions}
92✔
113
   */
92✔
114
  #clientActions;
92✔
115

92✔
116
  /**
92✔
117
   * @type {{
92✔
118
   *   [K in keyof ClientActionParameters]:
92✔
119
   *     import('ajv').ValidateFunction<ClientActionParameters[K]>
92✔
120
   * }}
92✔
121
   */
92✔
122
  #clientActionSchemas;
92✔
123

92✔
124
  /**
92✔
125
   * Handlers for commands that come in from the server side.
92✔
126
   *
92✔
127
   * @type {import('./redisMessages.js').ServerActions}
92✔
128
   */
92✔
129
  #serverActions;
92✔
130

92✔
131
  /**
92✔
132
   * Create a socket server.
92✔
133
   *
92✔
134
   * @param {import('./Uwave.js').default} uw üWave Core instance.
92✔
135
   * @param {object} options Socket server options.
92✔
136
   * @param {number} [options.timeout] Time in seconds to wait for disconnected
92✔
137
   *     users to reconnect before removing them.
92✔
138
   * @param {Buffer|string} options.secret
92✔
139
   * @param {import('http').Server | import('https').Server} [options.server]
92✔
140
   * @param {number} [options.port]
92✔
141
   */
92✔
142
  constructor(uw, options) {
92✔
143
    if (!uw) {
92!
144
      throw new TypeError(`The "uw" argument must be of type UwaveServer. Received ${typeof uw}`);
×
145
    }
×
146

92✔
147
    this.#uw = uw;
92✔
148
    this.#logger = uw.logger.child({ ns: 'uwave:sockets' }, {
92✔
149
      serializers: {
92✔
150
        req: stdSerializers.req,
92✔
151
      },
92✔
152
    });
92✔
153
    this.#redisSubscription = uw.redis.duplicate();
92✔
154

92✔
155
    this.options = {
92✔
156
      /** @type {(_socket: import('ws').WebSocket | undefined, err: Error) => void} */
92✔
157
      onError: (_socket, err) => {
92✔
158
        throw err;
×
159
      },
92✔
160
      timeout: 30,
92✔
161
      ...options,
92✔
162
    };
92✔
163

92✔
164
    // TODO put this behind a symbol, it's just public for tests
92✔
165
    this.authRegistry = new AuthRegistry(uw.redis);
92✔
166

92✔
167
    this.#wss = new WebSocketServer({
92✔
168
      server: options.server,
92✔
169
      port: options.server ? undefined : options.port,
92!
170
    });
92✔
171

92✔
172
    this.#redisSubscription.subscribe('uwave').catch((error) => {
92✔
173
      this.#logger.error(error);
×
174
    });
92✔
175
    this.#redisSubscription.on('message', (channel, command) => {
92✔
176
      // this returns a promise, but we don't handle the error case:
56✔
177
      // there is not much we can do, so just let node.js crash w/ an unhandled rejection
56✔
178
      this.onServerMessage(channel, command);
56✔
179
    });
92✔
180

92✔
181
    this.#wss.on('error', (error) => {
92✔
182
      this.onError(error);
×
183
    });
92✔
184
    this.#wss.on('connection', (socket, request) => {
92✔
185
      this.onSocketConnected(socket, request);
5✔
186
    });
92✔
187

92✔
188
    this.#pinger = setInterval(() => {
92✔
189
      this.ping();
×
190
    }, ms('10 seconds'));
92✔
191

92✔
192
    this.#recountGuests = debounce(() => {
92✔
193
      this.#recountGuestsInternal().catch((error) => {
×
194
        this.#logger.error({ err: error }, 'counting guests failed');
×
195
      });
×
196
    }, ms('2 seconds'));
92✔
197

92✔
198
    this.#clientActions = {
92✔
199
      sendChat: (user, message) => {
92✔
200
        this.#logger.trace({ user, message }, 'sendChat');
×
201
        this.#uw.chat.send(user, message);
×
202
      },
92✔
203
      vote: (user, direction) => {
92✔
204
        socketVote(this.#uw, user.id, direction);
×
205
      },
92✔
206
      logout: (user, _, connection) => {
92✔
207
        this.replace(connection, this.createGuestConnection(connection.socket));
×
208
        if (!this.connection(user)) {
×
209
          disconnectUser(this.#uw, user._id);
×
210
        }
×
211
      },
92✔
212
    };
92✔
213

92✔
214
    this.#clientActionSchemas = {
92✔
215
      sendChat: ajv.compile({
92✔
216
        type: 'string',
92✔
217
      }),
92✔
218
      vote: ajv.compile({
92✔
219
        type: 'integer',
92✔
220
        enum: [-1, 1],
92✔
221
      }),
92✔
222
      logout: ajv.compile(true),
92✔
223
    };
92✔
224

92✔
225
    this.#serverActions = {
92✔
226
      /**
92✔
227
       * Broadcast the next track.
92✔
228
       */
92✔
229
      'advance:complete': (next) => {
92✔
230
        if (next) {
5✔
231
          this.broadcast('advance', {
5✔
232
            historyID: next.historyID,
5✔
233
            userID: next.userID,
5✔
234
            itemID: next.itemID,
5✔
235
            media: next.media,
5✔
236
            playedAt: new Date(next.playedAt).getTime(),
5✔
237
          });
5✔
238
        } else {
5!
239
          this.broadcast('advance', null);
×
240
        }
×
241
      },
92✔
242
      /**
92✔
243
       * Broadcast a skip notification.
92✔
244
       */
92✔
245
      'booth:skip': ({ moderatorID, userID, reason }) => {
92✔
246
        this.broadcast('skip', { moderatorID, userID, reason });
×
247
      },
92✔
248
      /**
92✔
249
       * Broadcast a chat message.
92✔
250
       */
92✔
251
      'chat:message': (message) => {
92✔
252
        this.broadcast('chatMessage', message);
×
253
      },
92✔
254
      /**
92✔
255
       * Delete chat messages. The delete filter can have an _id property to
92✔
256
       * delete a specific message, a userID property to delete messages by a
92✔
257
       * user, or be empty to delete all messages.
92✔
258
       */
92✔
259
      'chat:delete': ({ moderatorID, filter }) => {
92✔
260
        if ('id' in filter) {
×
261
          this.broadcast('chatDeleteByID', {
×
262
            moderatorID,
×
263
            _id: filter.id,
×
264
          });
×
265
        } else if ('userID' in filter) {
×
266
          this.broadcast('chatDeleteByUser', {
×
267
            moderatorID,
×
268
            userID: filter.userID,
×
269
          });
×
270
        } else if (isEmpty(filter)) {
×
271
          this.broadcast('chatDelete', { moderatorID });
×
272
        }
×
273
      },
92✔
274
      /**
92✔
275
       * Broadcast that a user was muted in chat.
92✔
276
       */
92✔
277
      'chat:mute': ({ moderatorID, userID, duration }) => {
92✔
278
        this.broadcast('chatMute', {
×
279
          userID,
×
280
          moderatorID,
×
281
          expiresAt: Date.now() + duration,
×
282
        });
×
283
      },
92✔
284
      /**
92✔
285
       * Broadcast that a user was unmuted in chat.
92✔
286
       */
92✔
287
      'chat:unmute': ({ moderatorID, userID }) => {
92✔
288
        this.broadcast('chatUnmute', { userID, moderatorID });
×
289
      },
92✔
290
      /**
92✔
291
       * Broadcast a vote for the current track.
92✔
292
       */
92✔
293
      'booth:vote': ({ userID, direction }) => {
92✔
294
        this.broadcast('vote', {
2✔
295
          _id: userID,
2✔
296
          value: direction,
2✔
297
        });
2✔
298
      },
92✔
299
      /**
92✔
300
       * Broadcast a favorite for the current track.
92✔
301
       */
92✔
302
      'booth:favorite': ({ userID }) => {
92✔
303
        this.broadcast('favorite', { userID });
×
304
      },
92✔
305
      /**
92✔
306
       * Cycle a single user's playlist.
92✔
307
       */
92✔
308
      'playlist:cycle': ({ userID, playlistID }) => {
92✔
309
        this.sendTo(userID, 'playlistCycle', { playlistID });
5✔
310
      },
92✔
311
      /**
92✔
312
       * Broadcast that a user joined the waitlist.
92✔
313
       */
92✔
314
      'waitlist:join': ({ userID, waitlist }) => {
92✔
315
        this.broadcast('waitlistJoin', { userID, waitlist });
7✔
316
      },
92✔
317
      /**
92✔
318
       * Broadcast that a user left the waitlist.
92✔
319
       */
92✔
320
      'waitlist:leave': ({ userID, waitlist }) => {
92✔
321
        this.broadcast('waitlistLeave', { userID, waitlist });
×
322
      },
92✔
323
      /**
92✔
324
       * Broadcast that a user was added to the waitlist.
92✔
325
       */
92✔
326
      'waitlist:add': ({
92✔
327
        userID, moderatorID, position, waitlist,
1✔
328
      }) => {
1✔
329
        this.broadcast('waitlistAdd', {
1✔
330
          userID, moderatorID, position, waitlist,
1✔
331
        });
1✔
332
      },
92✔
333
      /**
92✔
334
       * Broadcast that a user was removed from the waitlist.
92✔
335
       */
92✔
336
      'waitlist:remove': ({ userID, moderatorID, waitlist }) => {
92✔
337
        this.broadcast('waitlistRemove', { userID, moderatorID, waitlist });
×
338
      },
92✔
339
      /**
92✔
340
       * Broadcast that a user was moved in the waitlist.
92✔
341
       */
92✔
342
      'waitlist:move': ({
92✔
343
        userID, moderatorID, position, waitlist,
×
344
      }) => {
×
345
        this.broadcast('waitlistMove', {
×
346
          userID, moderatorID, position, waitlist,
×
347
        });
×
348
      },
92✔
349
      /**
92✔
350
       * Broadcast a waitlist update.
92✔
351
       */
92✔
352
      'waitlist:update': (waitlist) => {
92✔
353
        this.broadcast('waitlistUpdate', waitlist);
5✔
354
      },
92✔
355
      /**
92✔
356
       * Broadcast that the waitlist was cleared.
92✔
357
       */
92✔
358
      'waitlist:clear': ({ moderatorID }) => {
92✔
359
        this.broadcast('waitlistClear', { moderatorID });
×
360
      },
92✔
361
      /**
92✔
362
       * Broadcast that the waitlist was locked.
92✔
363
       */
92✔
364
      'waitlist:lock': ({ moderatorID, locked }) => {
92✔
365
        this.broadcast('waitlistLock', { moderatorID, locked });
×
366
      },
92✔
367

92✔
368
      'acl:allow': ({ userID, roles }) => {
92✔
369
        this.broadcast('acl:allow', { userID, roles });
17✔
370
      },
92✔
371
      'acl:disallow': ({ userID, roles }) => {
92✔
372
        this.broadcast('acl:disallow', { userID, roles });
1✔
373
      },
92✔
374

92✔
375
      'user:update': ({ userID, moderatorID, new: update }) => {
92✔
376
        // TODO Remove this remnant of the old roles system
×
377
        if ('role' in update) {
×
378
          this.broadcast('roleChange', {
×
379
            moderatorID,
×
380
            userID,
×
381
            role: update.role,
×
382
          });
×
383
        }
×
384
        if ('username' in update) {
×
385
          this.broadcast('nameChange', {
×
386
            moderatorID,
×
387
            userID,
×
388
            username: update.username,
×
389
          });
×
390
        }
×
391
      },
92✔
392
      'user:join': async ({ userID }) => {
92✔
393
        const { users, redis } = this.#uw;
5✔
394
        const user = await users.getUser(userID);
5✔
395
        if (user) {
5✔
396
          // TODO this should not be the socket server code's responsibility
5✔
397
          await redis.rpush('users', user.id);
5✔
398
          this.broadcast('join', serializeUser(user));
5✔
399
        }
5✔
400
      },
92✔
401
      /**
92✔
402
       * Broadcast that a user left the server.
92✔
403
       */
92✔
404
      'user:leave': ({ userID }) => {
92✔
405
        this.broadcast('leave', userID);
×
406
      },
92✔
407
      /**
92✔
408
       * Broadcast a ban event.
92✔
409
       */
92✔
410
      'user:ban': ({
92✔
411
        moderatorID, userID, permanent = false, duration, expiresAt,
2✔
412
      }) => {
2✔
413
        this.broadcast('ban', {
2✔
414
          moderatorID, userID, permanent, duration, expiresAt,
2✔
415
        });
2✔
416

2✔
417
        this.#connections.forEach((connection) => {
2✔
418
          if (connection instanceof AuthedConnection && connection.user.id === userID) {
×
419
            connection.ban();
×
420
          } else if (connection instanceof LostConnection && connection.user.id === userID) {
×
421
            connection.close();
×
422
          }
×
423
        });
2✔
424
      },
92✔
425
      /**
92✔
426
       * Broadcast an unban event.
92✔
427
       */
92✔
428
      'user:unban': ({ moderatorID, userID }) => {
92✔
429
        this.broadcast('unban', { moderatorID, userID });
1✔
430
      },
92✔
431
      /**
92✔
432
       * Force-close a connection.
92✔
433
       */
92✔
434
      'http-api:socket:close': (userID) => {
92✔
435
        this.#connections.forEach((connection) => {
×
436
          if ('user' in connection && connection.user.id === userID) {
×
437
            connection.close();
×
438
          }
×
439
        });
×
440
      },
92✔
441

92✔
442
      'emotes:reload': () => {
92✔
443
        this.broadcast('reloadEmotes', null);
×
444
      },
92✔
445
    };
92✔
446
  }
92✔
447

92✔
448
  /**
92✔
449
   * Create `LostConnection`s for every user that's known to be online, but that
92✔
450
   * is not currently connected to the socket server.
92✔
451
   * @private
92✔
452
   */
92✔
453
  async initLostConnections() {
92✔
454
    const { User } = this.#uw.models;
92✔
455
    const userIDs = await this.#uw.redis.lrange('users', 0, -1);
92✔
456
    const disconnectedIDs = userIDs
92✔
457
      .filter((userID) => !this.connection(userID))
92✔
458
      .map((userID) => new ObjectId(userID));
92✔
459

92✔
460
    /** @type {User[]} */
92✔
461
    const disconnectedUsers = await User.find({
92✔
462
      _id: { $in: disconnectedIDs },
92✔
463
    }).exec();
92✔
464
    disconnectedUsers.forEach((user) => {
92✔
465
      this.add(this.createLostConnection(user));
×
466
    });
92✔
467
  }
92✔
468

92✔
469
  /**
92✔
470
   * @param {import('ws').WebSocket} socket
92✔
471
   * @param {import('http').IncomingMessage} request
92✔
472
   * @private
92✔
473
   */
92✔
474
  onSocketConnected(socket, request) {
92✔
475
    this.#logger.info({ req: request }, 'new connection');
5✔
476

5✔
477
    socket.on('error', (error) => {
5✔
478
      this.onSocketError(socket, error);
×
479
    });
5✔
480
    this.add(this.createGuestConnection(socket));
5✔
481
  }
5✔
482

92✔
483
  /**
92✔
484
   * @param {import('ws').WebSocket} socket
92✔
485
   * @param {Error} error
92✔
486
   * @private
92✔
487
   */
92✔
488
  onSocketError(socket, error) {
92✔
489
    this.#logger.warn({ err: error }, 'socket error');
×
490

×
491
    this.options.onError(socket, error);
×
492
  }
×
493

92✔
494
  /**
92✔
495
   * @param {Error} error
92✔
496
   * @private
92✔
497
   */
92✔
498
  onError(error) {
92✔
499
    this.#logger.error({ err: error }, 'server error');
×
500

×
501
    this.options.onError(undefined, error);
×
502
  }
×
503

92✔
504
  /**
92✔
505
   * Get a LostConnection for a user, if one exists.
92✔
506
   *
92✔
507
   * @param {User} user
92✔
508
   * @private
92✔
509
   */
92✔
510
  getLostConnection(user) {
92✔
511
    return this.#connections.find((connection) => (
×
512
      connection instanceof LostConnection && connection.user.id === user.id
×
513
    ));
×
514
  }
×
515

92✔
516
  /**
92✔
517
   * Create a connection instance for an unauthenticated user.
92✔
518
   *
92✔
519
   * @param {import('ws').WebSocket} socket
92✔
520
   * @private
92✔
521
   */
92✔
522
  createGuestConnection(socket) {
92✔
523
    const connection = new GuestConnection(this.#uw, socket, {
5✔
524
      authRegistry: this.authRegistry,
5✔
525
    });
5✔
526
    connection.on('close', () => {
5✔
527
      this.remove(connection);
×
528
    });
5✔
529
    connection.on('authenticate', async (user) => {
5✔
530
      const isReconnect = await connection.isReconnect(user);
5✔
531
      this.#logger.info({ userId: user.id, isReconnect }, 'authenticated socket');
5✔
532
      if (isReconnect) {
5!
533
        const previousConnection = this.getLostConnection(user);
×
534
        if (previousConnection) this.remove(previousConnection);
×
535
      }
×
536

5✔
537
      this.replace(connection, this.createAuthedConnection(socket, user));
5✔
538

5✔
539
      if (!isReconnect) {
5✔
540
        this.#uw.publish('user:join', { userID: user.id });
5✔
541
      }
5✔
542
    });
5✔
543
    return connection;
5✔
544
  }
5✔
545

92✔
546
  /**
92✔
547
   * Create a connection instance for an authenticated user.
92✔
548
   *
92✔
549
   * @param {import('ws').WebSocket} socket
92✔
550
   * @param {User} user
92✔
551
   * @returns {AuthedConnection}
92✔
552
   * @private
92✔
553
   */
92✔
554
  createAuthedConnection(socket, user) {
92✔
555
    const connection = new AuthedConnection(this.#uw, socket, user);
5✔
556
    connection.on('close', ({ banned }) => {
5✔
557
      if (banned) {
5!
558
        this.#logger.info({ userId: user.id }, 'removing connection after ban');
×
559
        disconnectUser(this.#uw, user._id);
×
560
      } else if (!this.#closing) {
5!
561
        this.#logger.info({ userId: user.id }, 'lost connection');
×
562
        this.add(this.createLostConnection(user));
×
563
      }
×
564
      this.remove(connection);
5✔
565
    });
5✔
566
    connection.on(
5✔
567
      'command',
5✔
568
      /**
5✔
569
       * @param {string} command
5✔
570
       * @param {import('type-fest').JsonValue} data
5✔
571
       */
5✔
572
      (command, data) => {
5✔
573
        this.#logger.trace({ userId: user.id, command, data }, 'command');
×
574
        if (has(this.#clientActions, command)) {
×
575
          // Ignore incorrect input
×
576
          const validate = this.#clientActionSchemas[command];
×
577
          if (validate && !validate(data)) {
×
578
            return;
×
579
          }
×
580

×
581
          const action = this.#clientActions[command];
×
582
          // @ts-expect-error TS2345 `data` is validated
×
583
          action(user, data, connection);
×
584
        }
×
585
      },
5✔
586
    );
5✔
587
    return connection;
5✔
588
  }
5✔
589

92✔
590
  /**
92✔
591
   * Create a connection instance for a user who disconnected.
92✔
592
   *
92✔
593
   * @param {User} user
92✔
594
   * @returns {LostConnection}
92✔
595
   * @private
92✔
596
   */
92✔
597
  createLostConnection(user) {
92✔
598
    const connection = new LostConnection(this.#uw, user, this.options.timeout);
×
599
    connection.on('close', () => {
×
600
      this.#logger.info({ userId: user.id }, 'user left');
×
601
      this.remove(connection);
×
602
      // Only register that the user left if they didn't have another connection
×
603
      // still open.
×
604
      if (!this.connection(user)) {
×
605
        disconnectUser(this.#uw, user._id);
×
606
      }
×
607
    });
×
608
    return connection;
×
609
  }
×
610

92✔
611
  /**
92✔
612
   * Add a connection.
92✔
613
   *
92✔
614
   * @param {Connection} connection
92✔
615
   * @private
92✔
616
   */
92✔
617
  add(connection) {
92✔
618
    const userId = 'user' in connection ? connection.user.id : null;
10✔
619
    this.#logger.trace({ type: connection.constructor.name, userId }, 'add connection');
10✔
620

10✔
621
    this.#connections.push(connection);
10✔
622
    this.#recountGuests();
10✔
623
  }
10✔
624

92✔
625
  /**
92✔
626
   * Remove a connection.
92✔
627
   *
92✔
628
   * @param {Connection} connection
92✔
629
   * @private
92✔
630
   */
92✔
631
  remove(connection) {
92✔
632
    const userId = 'user' in connection ? connection.user.id : null;
10✔
633
    this.#logger.trace({ type: connection.constructor.name, userId }, 'remove connection');
10✔
634

10✔
635
    const i = this.#connections.indexOf(connection);
10✔
636
    this.#connections.splice(i, 1);
10✔
637

10✔
638
    connection.removed();
10✔
639
    this.#recountGuests();
10✔
640
  }
10✔
641

92✔
642
  /**
92✔
643
   * Replace a connection instance with another connection instance. Useful when
92✔
644
   * a connection changes "type", like GuestConnection → AuthedConnection.
92✔
645
   *
92✔
646
   * @param {Connection} oldConnection
92✔
647
   * @param {Connection} newConnection
92✔
648
   * @private
92✔
649
   */
92✔
650
  replace(oldConnection, newConnection) {
92✔
651
    this.remove(oldConnection);
5✔
652
    this.add(newConnection);
5✔
653
  }
5✔
654

92✔
655
  /**
92✔
656
   * Handle command messages coming in from Redis.
92✔
657
   * Some commands are intended to broadcast immediately to all connected
92✔
658
   * clients, but others require special action.
92✔
659
   *
92✔
660
   * @param {string} channel
92✔
661
   * @param {string} rawCommand
92✔
662
   * @return {Promise<void>}
92✔
663
   * @private
92✔
664
   */
92✔
665
  async onServerMessage(channel, rawCommand) {
92✔
666
    /**
56✔
667
     * @type {{ command: string, data: import('type-fest').JsonValue }|undefined}
56✔
668
     */
56✔
669
    const json = sjson.safeParse(rawCommand);
56✔
670
    if (!json) {
56!
671
      return;
×
672
    }
×
673
    const { command, data } = json;
56✔
674

56✔
675
    this.#logger.trace({ channel, command, data }, 'server message');
56✔
676

56✔
677
    if (has(this.#serverActions, command)) {
56✔
678
      const action = this.#serverActions[command];
51✔
679
      if (action !== undefined) { // the types for `ServerActions` allow undefined, so...
51✔
680
        // @ts-expect-error TS2345 `data` is validated
51✔
681
        action(data);
51✔
682
      }
51✔
683
    }
51✔
684
  }
56✔
685

92✔
686
  /**
92✔
687
   * Stop the socket server.
92✔
688
   *
92✔
689
   * @return {Promise<void>}
92✔
690
   */
92✔
691
  async destroy() {
92✔
692
    clearInterval(this.#pinger);
92✔
693

92✔
694
    this.#closing = true;
92✔
695
    for (const connection of this.#wss.clients) {
92✔
696
      connection.close();
5✔
697
    }
5✔
698

92✔
699
    const closeWsServer = promisify(this.#wss.close.bind(this.#wss));
92✔
700
    await closeWsServer();
92✔
701
    await this.#redisSubscription.quit();
92✔
702

92✔
703
    this.#recountGuests.cancel();
92✔
704
  }
92✔
705

92✔
706
  /**
92✔
707
   * Get the connection instance for a specific user.
92✔
708
   *
92✔
709
   * @param {User|string} user The user.
92✔
710
   * @return {Connection|undefined}
92✔
711
   */
92✔
712
  connection(user) {
92✔
713
    const userID = typeof user === 'object' ? user.id : user;
×
714
    return this.#connections.find((connection) => 'user' in connection && connection.user.id === userID);
×
715
  }
×
716

92✔
717
  ping() {
92✔
718
    this.#connections.forEach((connection) => {
×
719
      if ('socket' in connection) {
×
720
        connection.ping();
×
721
      }
×
722
    });
×
723
  }
×
724

92✔
725
  /**
92✔
726
   * Broadcast a command to all connected clients.
92✔
727
   *
92✔
728
   * @param {string} command Command name.
92✔
729
   * @param {import('type-fest').JsonValue} data Command data.
92✔
730
   */
92✔
731
  broadcast(command, data) {
92✔
732
    this.#logger.trace({
46✔
733
      command,
46✔
734
      data,
46✔
735
      to: this.#connections.map((connection) => (
46✔
736
        'user' in connection ? connection.user.id : null
20!
737
      )),
46✔
738
    }, 'broadcast');
46✔
739

46✔
740
    this.#connections.forEach((connection) => {
46✔
741
      connection.send(command, data);
20✔
742
    });
46✔
743
  }
46✔
744

92✔
745
  /**
92✔
746
   * Send a command to a single user.
92✔
747
   *
92✔
748
   * @param {User|string} user User or user ID to send the command to.
92✔
749
   * @param {string} command Command name.
92✔
750
   * @param {import('type-fest').JsonValue} data Command data.
92✔
751
   */
92✔
752
  sendTo(user, command, data) {
92✔
753
    const userID = typeof user === 'object' ? user.id : user;
5!
754

5✔
755
    this.#connections.forEach((connection) => {
5✔
756
      if ('user' in connection && connection.user.id === userID) {
4✔
757
        connection.send(command, data);
3✔
758
      }
3✔
759
    });
5✔
760
  }
5✔
761

92✔
762
  async getGuestCount() {
92✔
763
    const { redis } = this.#uw;
×
764
    const rawCount = await redis.get('http-api:guests');
×
765
    if (typeof rawCount !== 'string' || !/^\d+$/.test(rawCount)) {
×
766
      return 0;
×
767
    }
×
768
    return parseInt(rawCount, 10);
×
769
  }
×
770

92✔
771
  async #recountGuestsInternal() {
92✔
772
    const { redis } = this.#uw;
×
773
    const guests = this.#connections
×
774
      .filter((connection) => connection instanceof GuestConnection)
×
775
      .length;
×
776

×
777
    const lastGuestCount = await this.getGuestCount();
×
778
    if (guests !== lastGuestCount) {
×
779
      await redis.set('http-api:guests', guests);
×
780
      this.broadcast('guests', guests);
×
781
    }
×
782
  }
×
783
}
92✔
784

1✔
785
export default SocketServer;
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc