• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

u-wave / core / 7049930908

30 Nov 2023 05:15PM UTC coverage: 80.502% (+0.01%) from 80.492%
7049930908

Pull #596

github

goto-bus-stop
Explicitly pass ObjectId to mongoose instead of relying on auto casting
Pull Request #596: Explicitly pass ObjectId to mongoose instead of relying on auto casting

640 of 775 branches covered (0.0%)

Branch coverage included in aggregate %.

14 of 14 new or added lines in 1 file covered. (100.0%)

100 existing lines in 1 file now uncovered.

8303 of 10334 relevant lines covered (80.35%)

44.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.58
/src/SocketServer.js
1
import { promisify } from 'node:util';
1✔
2
import mongoose from 'mongoose';
1✔
3
import lodash from 'lodash';
1✔
4
import sjson from 'secure-json-parse';
1✔
5
import { WebSocketServer } from 'ws';
1✔
6
import Ajv from 'ajv';
1✔
7
import ms from 'ms';
1✔
8
import { stdSerializers } from 'pino';
1✔
9
import { socketVote } from './controllers/booth.js';
1✔
10
import { disconnectUser } from './controllers/users.js';
1✔
11
import AuthRegistry from './AuthRegistry.js';
1✔
12
import GuestConnection from './sockets/GuestConnection.js';
1✔
13
import AuthedConnection from './sockets/AuthedConnection.js';
1✔
14
import LostConnection from './sockets/LostConnection.js';
1✔
15
import { serializeUser } from './utils/serialize.js';
1✔
16

1✔
17
const { debounce, isEmpty } = lodash;
1✔
18
const { ObjectId } = mongoose.mongo;
1✔
19

1✔
20
/**
1✔
21
 * @typedef {import('./models/index.js').User} User
1✔
22
 */
1✔
23

1✔
24
/**
1✔
25
 * @typedef {GuestConnection | AuthedConnection | LostConnection} Connection
1✔
26
 */
1✔
27

1✔
28
/**
1✔
29
 * @typedef {object} ClientActionParameters
1✔
30
 * @prop {string} sendChat
1✔
31
 * @prop {-1 | 1} vote
1✔
32
 * @prop {undefined} logout
1✔
33
 */
1✔
34

1✔
35
/**
1✔
36
 * @typedef {{
1✔
37
 *   [Name in keyof ClientActionParameters]: (
1✔
38
 *     user: User,
1✔
39
 *     parameter: ClientActionParameters[Name],
1✔
40
 *     connection: AuthedConnection
1✔
41
 *   ) => void
1✔
42
 * }} ClientActions
1✔
43
 */
1✔
44

1✔
45
// Settings for the shared validator that compiles the client action schemas:
// input values are never coerced to another type and schema defaults are
// never filled in.
const ajvOptions = {
  coerceTypes: false,
  ownProperties: true,
  removeAdditional: true,
  useDefaults: false,
};

const ajv = new Ajv(ajvOptions);
1✔
51

1✔
52
/**
 * Check whether `property` is an own (non-inherited) property of `object`.
 *
 * Used to look up untrusted command names in handler tables without
 * matching anything on the prototype chain (such as `toString`).
 *
 * @template {object} T
 * @param {T} object
 * @param {PropertyKey} property
 * @returns {property is keyof T}
 */
function has(object, property) {
  return Object.hasOwn(object, property);
}
56✔
61

1✔
62
class SocketServer {
92✔
63
  /**
92✔
64
   * @param {import('./Uwave.js').Boot} uw
92✔
65
   * @param {{ secret: Buffer|string }} options
92✔
66
   */
92✔
67
  static async plugin(uw, options) {
92✔
68
    uw.socketServer = new SocketServer(uw, {
92✔
69
      secret: options.secret,
92✔
70
      server: uw.server,
92✔
71
    });
92✔
72

92✔
73
    uw.after(async () => {
92✔
74
      await uw.socketServer.initLostConnections();
92✔
75
    });
92✔
76

92✔
77
    uw.onClose(async () => {
92✔
78
      await uw.socketServer.destroy();
92✔
79
    });
92✔
80
  }
92✔
81

92✔
82
  #uw;
92✔
83

92✔
84
  #logger;
92✔
85

92✔
86
  #redisSubscription;
92✔
87

92✔
88
  #wss;
92✔
89

92✔
90
  #closing = false;
92✔
91

92✔
92
  /** @type {Connection[]} */
92✔
93
  #connections = [];
92✔
94

92✔
95
  #pinger;
92✔
96

92✔
97
  /**
92✔
98
   * Update online guests count and broadcast an update if necessary.
92✔
99
   */
92✔
100
  #recountGuests;
92✔
101

92✔
102
  /**
92✔
103
   * Handlers for commands that come in from clients.
92✔
104
   * @type {ClientActions}
92✔
105
   */
92✔
106
  #clientActions;
92✔
107

92✔
108
  /**
92✔
109
   * @type {{
92✔
110
   *   [K in keyof ClientActionParameters]:
92✔
111
   *     import('ajv').ValidateFunction<ClientActionParameters[K]>
92✔
112
   * }}
92✔
113
   */
92✔
114
  #clientActionSchemas;
92✔
115

92✔
116
  /**
92✔
117
   * Handlers for commands that come in from the server side.
92✔
118
   *
92✔
119
   * @type {import('./redisMessages.js').ServerActions}
92✔
120
   */
92✔
121
  #serverActions;
92✔
122

92✔
123
  /**
92✔
124
   * Create a socket server.
92✔
125
   *
92✔
126
   * @param {import('./Uwave.js').default} uw üWave Core instance.
92✔
127
   * @param {object} options Socket server options.
92✔
128
   * @param {number} [options.timeout] Time in seconds to wait for disconnected
92✔
129
   *     users to reconnect before removing them.
92✔
130
   * @param {Buffer|string} options.secret
92✔
131
   * @param {import('http').Server | import('https').Server} [options.server]
92✔
132
   * @param {number} [options.port]
92✔
133
   */
92✔
134
  constructor(uw, options) {
92✔
135
    if (!uw) {
92!
UNCOV
136
      throw new TypeError(`The "uw" argument must be of type UwaveServer. Received ${typeof uw}`);
×
UNCOV
137
    }
×
138

92✔
139
    this.#uw = uw;
92✔
140
    this.#logger = uw.logger.child({ ns: 'uwave:sockets' }, {
92✔
141
      serializers: {
92✔
142
        req: stdSerializers.req,
92✔
143
      },
92✔
144
    });
92✔
145
    this.#redisSubscription = uw.redis.duplicate();
92✔
146

92✔
147
    this.options = {
92✔
148
      /** @type {(_socket: import('ws').WebSocket | undefined, err: Error) => void} */
92✔
149
      onError: (_socket, err) => {
92✔
UNCOV
150
        throw err;
×
151
      },
92✔
152
      timeout: 30,
92✔
153
      ...options,
92✔
154
    };
92✔
155

92✔
156
    // TODO put this behind a symbol, it's just public for tests
92✔
157
    this.authRegistry = new AuthRegistry(uw.redis);
92✔
158

92✔
159
    this.#wss = new WebSocketServer({
92✔
160
      server: options.server,
92✔
161
      port: options.server ? undefined : options.port,
92!
162
    });
92✔
163

92✔
164
    this.#redisSubscription.subscribe('uwave').catch((error) => {
92✔
UNCOV
165
      this.#logger.error(error);
×
166
    });
92✔
167
    this.#redisSubscription.on('message', (channel, command) => {
92✔
168
      // this returns a promise, but we don't handle the error case:
56✔
169
      // there is not much we can do, so just let node.js crash w/ an unhandled rejection
56✔
170
      this.onServerMessage(channel, command);
56✔
171
    });
92✔
172

92✔
173
    this.#wss.on('error', (error) => {
92✔
UNCOV
174
      this.onError(error);
×
175
    });
92✔
176
    this.#wss.on('connection', (socket, request) => {
92✔
177
      this.onSocketConnected(socket, request);
5✔
178
    });
92✔
179

92✔
180
    this.#pinger = setInterval(() => {
92✔
UNCOV
181
      this.ping();
×
182
    }, ms('10 seconds'));
92✔
183

92✔
184
    this.#recountGuests = debounce(() => {
92✔
UNCOV
185
      this.#recountGuestsInternal().catch((error) => {
×
186
        this.#logger.error({ err: error }, 'counting guests failed');
×
UNCOV
187
      });
×
188
    }, ms('2 seconds'));
92✔
189

92✔
190
    this.#clientActions = {
92✔
191
      sendChat: (user, message) => {
92✔
192
        this.#logger.trace({ user, message }, 'sendChat');
×
UNCOV
193
        this.#uw.chat.send(user, message);
×
194
      },
92✔
195
      vote: (user, direction) => {
92✔
UNCOV
196
        socketVote(this.#uw, user.id, direction);
×
197
      },
92✔
198
      logout: (user, _, connection) => {
92✔
UNCOV
199
        this.replace(connection, this.createGuestConnection(connection.socket));
×
UNCOV
200
        if (!this.connection(user)) {
×
201
          disconnectUser(this.#uw, user._id);
×
UNCOV
202
        }
×
203
      },
92✔
204
    };
92✔
205

92✔
206
    this.#clientActionSchemas = {
92✔
207
      sendChat: ajv.compile({
92✔
208
        type: 'string',
92✔
209
      }),
92✔
210
      vote: ajv.compile({
92✔
211
        type: 'integer',
92✔
212
        enum: [-1, 1],
92✔
213
      }),
92✔
214
      logout: ajv.compile(true),
92✔
215
    };
92✔
216

92✔
217
    this.#serverActions = {
92✔
218
      /**
92✔
219
       * Broadcast the next track.
92✔
220
       */
92✔
221
      'advance:complete': (next) => {
92✔
222
        if (next) {
5✔
223
          this.broadcast('advance', {
5✔
224
            historyID: next.historyID,
5✔
225
            userID: next.userID,
5✔
226
            itemID: next.itemID,
5✔
227
            media: next.media,
5✔
228
            playedAt: new Date(next.playedAt).getTime(),
5✔
229
          });
5✔
230
        } else {
5!
UNCOV
231
          this.broadcast('advance', null);
×
UNCOV
232
        }
×
233
      },
92✔
234
      /**
92✔
235
       * Broadcast a skip notification.
92✔
236
       */
92✔
237
      'booth:skip': ({ moderatorID, userID, reason }) => {
92✔
UNCOV
238
        this.broadcast('skip', { moderatorID, userID, reason });
×
239
      },
92✔
240
      /**
92✔
241
       * Broadcast a chat message.
92✔
242
       */
92✔
243
      'chat:message': (message) => {
92✔
UNCOV
244
        this.broadcast('chatMessage', message);
×
245
      },
92✔
246
      /**
92✔
247
       * Delete chat messages. The delete filter can have an _id property to
92✔
248
       * delete a specific message, a userID property to delete messages by a
92✔
249
       * user, or be empty to delete all messages.
92✔
250
       */
92✔
251
      'chat:delete': ({ moderatorID, filter }) => {
92✔
UNCOV
252
        if ('id' in filter) {
×
UNCOV
253
          this.broadcast('chatDeleteByID', {
×
UNCOV
254
            moderatorID,
×
UNCOV
255
            _id: filter.id,
×
UNCOV
256
          });
×
257
        } else if ('userID' in filter) {
×
258
          this.broadcast('chatDeleteByUser', {
×
259
            moderatorID,
×
260
            userID: filter.userID,
×
261
          });
×
262
        } else if (isEmpty(filter)) {
×
263
          this.broadcast('chatDelete', { moderatorID });
×
264
        }
×
265
      },
92✔
266
      /**
92✔
267
       * Broadcast that a user was muted in chat.
92✔
268
       */
92✔
269
      'chat:mute': ({ moderatorID, userID, duration }) => {
92✔
UNCOV
270
        this.broadcast('chatMute', {
×
UNCOV
271
          userID,
×
UNCOV
272
          moderatorID,
×
UNCOV
273
          expiresAt: Date.now() + duration,
×
UNCOV
274
        });
×
275
      },
92✔
276
      /**
92✔
277
       * Broadcast that a user was unmuted in chat.
92✔
278
       */
92✔
279
      'chat:unmute': ({ moderatorID, userID }) => {
92✔
UNCOV
280
        this.broadcast('chatUnmute', { userID, moderatorID });
×
281
      },
92✔
282
      /**
92✔
283
       * Broadcast a vote for the current track.
92✔
284
       */
92✔
285
      'booth:vote': ({ userID, direction }) => {
92✔
286
        this.broadcast('vote', {
2✔
287
          _id: userID,
2✔
288
          value: direction,
2✔
289
        });
2✔
290
      },
92✔
291
      /**
92✔
292
       * Broadcast a favorite for the current track.
92✔
293
       */
92✔
294
      'booth:favorite': ({ userID }) => {
92✔
UNCOV
295
        this.broadcast('favorite', { userID });
×
296
      },
92✔
297
      /**
92✔
298
       * Cycle a single user's playlist.
92✔
299
       */
92✔
300
      'playlist:cycle': ({ userID, playlistID }) => {
92✔
301
        this.sendTo(userID, 'playlistCycle', { playlistID });
5✔
302
      },
92✔
303
      /**
92✔
304
       * Broadcast that a user joined the waitlist.
92✔
305
       */
92✔
306
      'waitlist:join': ({ userID, waitlist }) => {
92✔
307
        this.broadcast('waitlistJoin', { userID, waitlist });
7✔
308
      },
92✔
309
      /**
92✔
310
       * Broadcast that a user left the waitlist.
92✔
311
       */
92✔
312
      'waitlist:leave': ({ userID, waitlist }) => {
92✔
UNCOV
313
        this.broadcast('waitlistLeave', { userID, waitlist });
×
314
      },
92✔
315
      /**
92✔
316
       * Broadcast that a user was added to the waitlist.
92✔
317
       */
92✔
318
      'waitlist:add': ({
92✔
319
        userID, moderatorID, position, waitlist,
1✔
320
      }) => {
1✔
321
        this.broadcast('waitlistAdd', {
1✔
322
          userID, moderatorID, position, waitlist,
1✔
323
        });
1✔
324
      },
92✔
325
      /**
92✔
326
       * Broadcast that a user was removed from the waitlist.
92✔
327
       */
92✔
328
      'waitlist:remove': ({ userID, moderatorID, waitlist }) => {
92✔
UNCOV
329
        this.broadcast('waitlistRemove', { userID, moderatorID, waitlist });
×
330
      },
92✔
331
      /**
92✔
332
       * Broadcast that a user was moved in the waitlist.
92✔
333
       */
92✔
334
      'waitlist:move': ({
92✔
UNCOV
335
        userID, moderatorID, position, waitlist,
×
UNCOV
336
      }) => {
×
UNCOV
337
        this.broadcast('waitlistMove', {
×
UNCOV
338
          userID, moderatorID, position, waitlist,
×
UNCOV
339
        });
×
340
      },
92✔
341
      /**
92✔
342
       * Broadcast a waitlist update.
92✔
343
       */
92✔
344
      'waitlist:update': (waitlist) => {
92✔
345
        this.broadcast('waitlistUpdate', waitlist);
5✔
346
      },
92✔
347
      /**
92✔
348
       * Broadcast that the waitlist was cleared.
92✔
349
       */
92✔
350
      'waitlist:clear': ({ moderatorID }) => {
92✔
UNCOV
351
        this.broadcast('waitlistClear', { moderatorID });
×
352
      },
92✔
353
      /**
92✔
354
       * Broadcast that the waitlist was locked.
92✔
355
       */
92✔
356
      'waitlist:lock': ({ moderatorID, locked }) => {
92✔
UNCOV
357
        this.broadcast('waitlistLock', { moderatorID, locked });
×
358
      },
92✔
359

92✔
360
      'acl:allow': ({ userID, roles }) => {
92✔
361
        this.broadcast('acl:allow', { userID, roles });
17✔
362
      },
92✔
363
      'acl:disallow': ({ userID, roles }) => {
92✔
364
        this.broadcast('acl:disallow', { userID, roles });
1✔
365
      },
92✔
366

92✔
367
      'user:update': ({ userID, moderatorID, new: update }) => {
92✔
UNCOV
368
        // TODO Remove this remnant of the old roles system
×
UNCOV
369
        if ('role' in update) {
×
UNCOV
370
          this.broadcast('roleChange', {
×
UNCOV
371
            moderatorID,
×
UNCOV
372
            userID,
×
373
            role: update.role,
×
374
          });
×
375
        }
×
376
        if ('username' in update) {
×
377
          this.broadcast('nameChange', {
×
378
            moderatorID,
×
379
            userID,
×
380
            username: update.username,
×
381
          });
×
382
        }
×
383
      },
92✔
384
      'user:join': async ({ userID }) => {
92✔
385
        const { users, redis } = this.#uw;
5✔
386
        const user = await users.getUser(userID);
5✔
387
        if (user) {
5✔
388
          // TODO this should not be the socket server code's responsibility
5✔
389
          await redis.rpush('users', user.id);
5✔
390
          this.broadcast('join', serializeUser(user));
5✔
391
        }
5✔
392
      },
92✔
393
      /**
92✔
394
       * Broadcast that a user left the server.
92✔
395
       */
92✔
396
      'user:leave': ({ userID }) => {
92✔
UNCOV
397
        this.broadcast('leave', userID);
×
398
      },
92✔
399
      /**
92✔
400
       * Broadcast a ban event.
92✔
401
       */
92✔
402
      'user:ban': ({
92✔
403
        moderatorID, userID, permanent = false, duration, expiresAt,
2✔
404
      }) => {
2✔
405
        this.broadcast('ban', {
2✔
406
          moderatorID, userID, permanent, duration, expiresAt,
2✔
407
        });
2✔
408

2✔
409
        this.#connections.forEach((connection) => {
2✔
UNCOV
410
          if (connection instanceof AuthedConnection && connection.user.id === userID) {
×
UNCOV
411
            connection.ban();
×
UNCOV
412
          } else if (connection instanceof LostConnection && connection.user.id === userID) {
×
UNCOV
413
            connection.close();
×
UNCOV
414
          }
×
415
        });
2✔
416
      },
92✔
417
      /**
92✔
418
       * Broadcast an unban event.
92✔
419
       */
92✔
420
      'user:unban': ({ moderatorID, userID }) => {
92✔
421
        this.broadcast('unban', { moderatorID, userID });
1✔
422
      },
92✔
423
      /**
92✔
424
       * Force-close a connection.
92✔
425
       */
92✔
426
      'http-api:socket:close': (userID) => {
92✔
UNCOV
427
        this.#connections.forEach((connection) => {
×
UNCOV
428
          if ('user' in connection && connection.user.id === userID) {
×
UNCOV
429
            connection.close();
×
UNCOV
430
          }
×
UNCOV
431
        });
×
432
      },
92✔
433

92✔
434
      'emotes:reload': () => {
92✔
435
        this.broadcast('reloadEmotes', null);
×
436
      },
92✔
437
    };
92✔
438
  }
92✔
439

92✔
440
  /**
92✔
441
   * Create `LostConnection`s for every user that's known to be online, but that
92✔
442
   * is not currently connected to the socket server.
92✔
443
   * @private
92✔
444
   */
92✔
445
  async initLostConnections() {
92✔
446
    const { User } = this.#uw.models;
92✔
447
    const userIDs = await this.#uw.redis.lrange('users', 0, -1);
92✔
448
    const disconnectedIDs = userIDs
92✔
449
      .filter((userID) => !this.connection(userID))
92✔
450
      .map((userID) => new ObjectId(userID));
92✔
451

92✔
452
    /** @type {User[]} */
92✔
453
    const disconnectedUsers = await User.find({
92✔
454
      _id: { $in: disconnectedIDs },
92✔
455
    }).exec();
92✔
456
    disconnectedUsers.forEach((user) => {
92✔
UNCOV
457
      this.add(this.createLostConnection(user));
×
458
    });
92✔
459
  }
92✔
460

92✔
461
  /**
92✔
462
   * @param {import('ws').WebSocket} socket
92✔
463
   * @param {import('http').IncomingMessage} request
92✔
464
   * @private
92✔
465
   */
92✔
466
  onSocketConnected(socket, request) {
92✔
467
    this.#logger.info({ req: request }, 'new connection');
5✔
468

5✔
469
    socket.on('error', (error) => {
5✔
UNCOV
470
      this.onSocketError(socket, error);
×
471
    });
5✔
472
    this.add(this.createGuestConnection(socket));
5✔
473
  }
5✔
474

92✔
475
  /**
92✔
476
   * @param {import('ws').WebSocket} socket
92✔
477
   * @param {Error} error
92✔
478
   * @private
92✔
479
   */
92✔
480
  onSocketError(socket, error) {
92✔
UNCOV
481
    this.#logger.warn({ err: error }, 'socket error');
×
UNCOV
482

×
UNCOV
483
    this.options.onError(socket, error);
×
UNCOV
484
  }
×
485

92✔
486
  /**
92✔
487
   * @param {Error} error
92✔
488
   * @private
92✔
489
   */
92✔
490
  onError(error) {
92✔
UNCOV
491
    this.#logger.error({ err: error }, 'server error');
×
UNCOV
492

×
UNCOV
493
    this.options.onError(undefined, error);
×
UNCOV
494
  }
×
495

92✔
496
  /**
92✔
497
   * Get a LostConnection for a user, if one exists.
92✔
498
   *
92✔
499
   * @param {User} user
92✔
500
   * @private
92✔
501
   */
92✔
502
  getLostConnection(user) {
92✔
UNCOV
503
    return this.#connections.find((connection) => (
×
UNCOV
504
      connection instanceof LostConnection && connection.user.id === user.id
×
UNCOV
505
    ));
×
UNCOV
506
  }
×
507

92✔
508
  /**
92✔
509
   * Create a connection instance for an unauthenticated user.
92✔
510
   *
92✔
511
   * @param {import('ws').WebSocket} socket
92✔
512
   * @private
92✔
513
   */
92✔
514
  createGuestConnection(socket) {
92✔
515
    const connection = new GuestConnection(this.#uw, socket, {
5✔
516
      authRegistry: this.authRegistry,
5✔
517
    });
5✔
518
    connection.on('close', () => {
5✔
UNCOV
519
      this.remove(connection);
×
520
    });
5✔
521
    connection.on('authenticate', async (user) => {
5✔
522
      const isReconnect = await connection.isReconnect(user);
5✔
523
      this.#logger.info({ userId: user.id, isReconnect }, 'authenticated socket');
5✔
524
      if (isReconnect) {
5!
UNCOV
525
        const previousConnection = this.getLostConnection(user);
×
UNCOV
526
        if (previousConnection) this.remove(previousConnection);
×
UNCOV
527
      }
×
528

5✔
529
      this.replace(connection, this.createAuthedConnection(socket, user));
5✔
530

5✔
531
      if (!isReconnect) {
5✔
532
        this.#uw.publish('user:join', { userID: user.id });
5✔
533
      }
5✔
534
    });
5✔
535
    return connection;
5✔
536
  }
5✔
537

92✔
538
  /**
92✔
539
   * Create a connection instance for an authenticated user.
92✔
540
   *
92✔
541
   * @param {import('ws').WebSocket} socket
92✔
542
   * @param {User} user
92✔
543
   * @returns {AuthedConnection}
92✔
544
   * @private
92✔
545
   */
92✔
546
  createAuthedConnection(socket, user) {
92✔
547
    const connection = new AuthedConnection(this.#uw, socket, user);
5✔
548
    connection.on('close', ({ banned }) => {
5✔
549
      if (banned) {
5!
UNCOV
550
        this.#logger.info({ userId: user.id }, 'removing connection after ban');
×
UNCOV
551
        disconnectUser(this.#uw, user._id);
×
552
      } else if (!this.#closing) {
5!
UNCOV
553
        this.#logger.info({ userId: user.id }, 'lost connection');
×
UNCOV
554
        this.add(this.createLostConnection(user));
×
555
      }
×
556
      this.remove(connection);
5✔
557
    });
5✔
558
    connection.on(
5✔
559
      'command',
5✔
560
      /**
5✔
561
       * @param {string} command
5✔
562
       * @param {import('type-fest').JsonValue} data
5✔
563
       */
5✔
564
      (command, data) => {
5✔
UNCOV
565
        this.#logger.trace({ userId: user.id, command, data }, 'command');
×
UNCOV
566
        if (has(this.#clientActions, command)) {
×
UNCOV
567
          // Ignore incorrect input
×
UNCOV
568
          const validate = this.#clientActionSchemas[command];
×
UNCOV
569
          if (validate && !validate(data)) {
×
570
            return;
×
571
          }
×
572

×
573
          const action = this.#clientActions[command];
×
574
          // @ts-expect-error TS2345 `data` is validated
×
575
          action(user, data, connection);
×
576
        }
×
577
      },
5✔
578
    );
5✔
579
    return connection;
5✔
580
  }
5✔
581

92✔
582
  /**
92✔
583
   * Create a connection instance for a user who disconnected.
92✔
584
   *
92✔
585
   * @param {User} user
92✔
586
   * @returns {LostConnection}
92✔
587
   * @private
92✔
588
   */
92✔
589
  createLostConnection(user) {
92✔
UNCOV
590
    const connection = new LostConnection(this.#uw, user, this.options.timeout);
×
UNCOV
591
    connection.on('close', () => {
×
UNCOV
592
      this.#logger.info({ userId: user.id }, 'user left');
×
UNCOV
593
      this.remove(connection);
×
UNCOV
594
      // Only register that the user left if they didn't have another connection
×
595
      // still open.
×
596
      if (!this.connection(user)) {
×
597
        disconnectUser(this.#uw, user._id);
×
598
      }
×
599
    });
×
600
    return connection;
×
601
  }
×
602

92✔
603
  /**
92✔
604
   * Add a connection.
92✔
605
   *
92✔
606
   * @param {Connection} connection
92✔
607
   * @private
92✔
608
   */
92✔
609
  add(connection) {
92✔
610
    const userId = 'user' in connection ? connection.user.id : null;
10✔
611
    this.#logger.trace({ type: connection.constructor.name, userId }, 'add connection');
10✔
612

10✔
613
    this.#connections.push(connection);
10✔
614
    this.#recountGuests();
10✔
615
  }
10✔
616

92✔
617
  /**
92✔
618
   * Remove a connection.
92✔
619
   *
92✔
620
   * @param {Connection} connection
92✔
621
   * @private
92✔
622
   */
92✔
623
  remove(connection) {
92✔
624
    const userId = 'user' in connection ? connection.user.id : null;
10✔
625
    this.#logger.trace({ type: connection.constructor.name, userId }, 'remove connection');
10✔
626

10✔
627
    const i = this.#connections.indexOf(connection);
10✔
628
    this.#connections.splice(i, 1);
10✔
629

10✔
630
    connection.removed();
10✔
631
    this.#recountGuests();
10✔
632
  }
10✔
633

92✔
634
  /**
92✔
635
   * Replace a connection instance with another connection instance. Useful when
92✔
636
   * a connection changes "type", like GuestConnection → AuthedConnection.
92✔
637
   *
92✔
638
   * @param {Connection} oldConnection
92✔
639
   * @param {Connection} newConnection
92✔
640
   * @private
92✔
641
   */
92✔
642
  replace(oldConnection, newConnection) {
92✔
643
    this.remove(oldConnection);
5✔
644
    this.add(newConnection);
5✔
645
  }
5✔
646

92✔
647
  /**
92✔
648
   * Handle command messages coming in from Redis.
92✔
649
   * Some commands are intended to broadcast immediately to all connected
92✔
650
   * clients, but others require special action.
92✔
651
   *
92✔
652
   * @param {string} channel
92✔
653
   * @param {string} rawCommand
92✔
654
   * @return {Promise<void>}
92✔
655
   * @private
92✔
656
   */
92✔
657
  async onServerMessage(channel, rawCommand) {
92✔
658
    /**
56✔
659
     * @type {{ command: string, data: import('type-fest').JsonValue }|undefined}
56✔
660
     */
56✔
661
    const json = sjson.safeParse(rawCommand);
56✔
662
    if (!json) {
56!
UNCOV
663
      return;
×
UNCOV
664
    }
×
665
    const { command, data } = json;
56✔
666

56✔
667
    this.#logger.trace({ channel, command, data }, 'server message');
56✔
668

56✔
669
    if (has(this.#serverActions, command)) {
56✔
670
      const action = this.#serverActions[command];
51✔
671
      if (action !== undefined) { // the types for `ServerActions` allow undefined, so...
51✔
672
        // @ts-expect-error TS2345 `data` is validated
51✔
673
        action(data);
51✔
674
      }
51✔
675
    }
51✔
676
  }
56✔
677

92✔
678
  /**
92✔
679
   * Stop the socket server.
92✔
680
   *
92✔
681
   * @return {Promise<void>}
92✔
682
   */
92✔
683
  async destroy() {
92✔
684
    clearInterval(this.#pinger);
92✔
685

92✔
686
    this.#closing = true;
92✔
687
    for (const connection of this.#wss.clients) {
92✔
688
      connection.close();
5✔
689
    }
5✔
690

92✔
691
    const closeWsServer = promisify(this.#wss.close.bind(this.#wss));
92✔
692
    await closeWsServer();
92✔
693
    await this.#redisSubscription.quit();
92✔
694

92✔
695
    this.#recountGuests.cancel();
92✔
696
  }
92✔
697

92✔
698
  /**
92✔
699
   * Get the connection instance for a specific user.
92✔
700
   *
92✔
701
   * @param {User|string} user The user.
92✔
702
   * @return {Connection|undefined}
92✔
703
   */
92✔
704
  connection(user) {
92✔
UNCOV
705
    const userID = typeof user === 'object' ? user.id : user;
×
UNCOV
706
    return this.#connections.find((connection) => 'user' in connection && connection.user.id === userID);
×
UNCOV
707
  }
×
708

92✔
709
  ping() {
92✔
710
    this.#connections.forEach((connection) => {
×
711
      if ('socket' in connection) {
×
712
        connection.ping();
×
UNCOV
713
      }
×
UNCOV
714
    });
×
715
  }
×
716

92✔
717
  /**
92✔
718
   * Broadcast a command to all connected clients.
92✔
719
   *
92✔
720
   * @param {string} command Command name.
92✔
721
   * @param {import('type-fest').JsonValue} data Command data.
92✔
722
   */
92✔
723
  broadcast(command, data) {
92✔
724
    this.#logger.trace({
46✔
725
      command,
46✔
726
      data,
46✔
727
      to: this.#connections.map((connection) => (
46✔
728
        'user' in connection ? connection.user.id : null
20!
729
      )),
46✔
730
    }, 'broadcast');
46✔
731

46✔
732
    this.#connections.forEach((connection) => {
46✔
733
      connection.send(command, data);
20✔
734
    });
46✔
735
  }
46✔
736

92✔
737
  /**
92✔
738
   * Send a command to a single user.
92✔
739
   *
92✔
740
   * @param {User|string} user User or user ID to send the command to.
92✔
741
   * @param {string} command Command name.
92✔
742
   * @param {import('type-fest').JsonValue} data Command data.
92✔
743
   */
92✔
744
  sendTo(user, command, data) {
92✔
745
    const userID = typeof user === 'object' ? user.id : user;
5!
746

5✔
747
    this.#connections.forEach((connection) => {
5✔
748
      if ('user' in connection && connection.user.id === userID) {
4✔
749
        connection.send(command, data);
3✔
750
      }
3✔
751
    });
5✔
752
  }
5✔
753

92✔
754
  async getGuestCount() {
92✔
UNCOV
755
    const { redis } = this.#uw;
×
UNCOV
756
    const rawCount = await redis.get('http-api:guests');
×
UNCOV
757
    if (typeof rawCount !== 'string' || !/^\d+$/.test(rawCount)) {
×
UNCOV
758
      return 0;
×
UNCOV
759
    }
×
760
    return parseInt(rawCount, 10);
×
761
  }
×
762

92✔
763
  async #recountGuestsInternal() {
92✔
764
    const { redis } = this.#uw;
×
765
    const guests = this.#connections
×
766
      .filter((connection) => connection instanceof GuestConnection)
×
UNCOV
767
      .length;
×
UNCOV
768

×
769
    const lastGuestCount = await this.getGuestCount();
×
770
    if (guests !== lastGuestCount) {
×
771
      await redis.set('http-api:guests', guests);
×
772
      this.broadcast('guests', guests);
×
773
    }
×
774
  }
×
775
}
92✔
776

1✔
777
export default SocketServer;
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc