• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

u-wave / core / 12038920397

26 Nov 2024 08:59PM UTC coverage: 82.71% (-0.06%) from 82.772%
12038920397

Pull #675

github

web-flow
Merge f3558602c into 5099496ff
Pull Request #675: Make the migration script more robust

850 of 1025 branches covered (82.93%)

Branch coverage included in aggregate %.

0 of 34 new or added lines in 1 file covered. (0.0%)

1 existing line in 1 file now uncovered.

9588 of 11595 relevant lines covered (82.69%)

84.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.92
/src/SocketServer.js
1
import { promisify } from 'node:util';
1✔
2
import lodash from 'lodash';
1✔
3
import sjson from 'secure-json-parse';
1✔
4
import { WebSocketServer } from 'ws';
1✔
5
import Ajv from 'ajv';
1✔
6
import ms from 'ms';
1✔
7
import { stdSerializers } from 'pino';
1✔
8
import { socketVote } from './controllers/booth.js';
1✔
9
import { disconnectUser } from './controllers/users.js';
1✔
10
import AuthRegistry from './AuthRegistry.js';
1✔
11
import GuestConnection from './sockets/GuestConnection.js';
1✔
12
import AuthedConnection from './sockets/AuthedConnection.js';
1✔
13
import LostConnection from './sockets/LostConnection.js';
1✔
14
import { serializeUser } from './utils/serialize.js';
1✔
15

1✔
16
const { debounce, isEmpty } = lodash;
1✔
17

1✔
18
/**
1✔
19
 * @typedef {import('./schema.js').User} User
1✔
20
 */
1✔
21

1✔
22
/**
1✔
23
 * @typedef {GuestConnection | AuthedConnection | LostConnection} Connection
1✔
24
 */
1✔
25

1✔
26
/**
1✔
27
 * @typedef {object} ClientActionParameters
1✔
28
 * @prop {string} sendChat
1✔
29
 * @prop {-1 | 1} vote
1✔
30
 * @prop {undefined} logout
1✔
31
 */
1✔
32

1✔
33
/**
1✔
34
 * @typedef {{
1✔
35
 *   [Name in keyof ClientActionParameters]: (
1✔
36
 *     user: User,
1✔
37
 *     parameter: ClientActionParameters[Name],
1✔
38
 *     connection: AuthedConnection
1✔
39
 *   ) => void
1✔
40
 * }} ClientActions
1✔
41
 */
1✔
42

1✔
43
const ajv = new Ajv({
1✔
44
  coerceTypes: false,
1✔
45
  ownProperties: true,
1✔
46
  removeAdditional: true,
1✔
47
  useDefaults: false,
1✔
48
});
1✔
49

1✔
50
/**
1✔
51
 * @template {object} T
1✔
52
 * @param {T} object
1✔
53
 * @param {PropertyKey} property
1✔
54
 * @returns {property is keyof T}
1✔
55
 */
1✔
56
function has(object, property) {
160✔
57
  return Object.prototype.hasOwnProperty.call(object, property);
160✔
58
}
160✔
59

1✔
60
class SocketServer {
132✔
61
  /**
132✔
62
   * @param {import('./Uwave.js').Boot} uw
132✔
63
   * @param {{ secret: Buffer|string }} options
132✔
64
   */
132✔
65
  static async plugin(uw, options) {
132✔
66
    uw.socketServer = new SocketServer(uw, {
132✔
67
      secret: options.secret,
132✔
68
      server: uw.server,
132✔
69
    });
132✔
70

132✔
71
    uw.after(async () => {
132✔
72
      try {
132✔
73
        await uw.socketServer.initLostConnections();
132✔
74
      } catch (err) {
132!
75
        // No need to prevent startup for this.
×
76
        // We do need to clear the `users` list because the lost connection handlers
×
77
        // will not do so.
×
78
        uw.socketServer.#logger.warn({ err }, 'could not initialise lost connections');
×
79
        await uw.redis.del('users');
×
80
      }
×
81
    });
132✔
82

132✔
83
    uw.onClose(async () => {
132✔
84
      await uw.socketServer.destroy();
132✔
85
    });
132✔
86
  }
132✔
87

132✔
88
  #uw;
132✔
89

132✔
90
  #logger;
132✔
91

132✔
92
  #redisSubscription;
132✔
93

132✔
94
  #wss;
132✔
95

132✔
96
  #closing = false;
132✔
97

132✔
98
  /** @type {Connection[]} */
132✔
99
  #connections = [];
132✔
100

132✔
101
  #pinger;
132✔
102

132✔
103
  /**
132✔
104
   * Update online guests count and broadcast an update if necessary.
132✔
105
   */
132✔
106
  #recountGuests;
132✔
107

132✔
108
  /**
132✔
109
   * Handlers for commands that come in from clients.
132✔
110
   *
132✔
111
   * @type {ClientActions}
132✔
112
   */
132✔
113
  #clientActions;
132✔
114

132✔
115
  /**
132✔
116
   * @type {{
132✔
117
   *   [K in keyof ClientActionParameters]:
132✔
118
   *     import('ajv').ValidateFunction<ClientActionParameters[K]>
132✔
119
   * }}
132✔
120
   */
132✔
121
  #clientActionSchemas;
132✔
122

132✔
123
  /**
132✔
124
   * Handlers for commands that come in from the server side.
132✔
125
   *
132✔
126
   * @type {import('./redisMessages.js').ServerActions}
132✔
127
   */
132✔
128
  #serverActions;
132✔
129

132✔
130
  /**
132✔
131
   * Create a socket server.
132✔
132
   *
132✔
133
   * @param {import('./Uwave.js').default} uw üWave Core instance.
132✔
134
   * @param {object} options Socket server options.
132✔
135
   * @param {number} [options.timeout] Time in seconds to wait for disconnected
132✔
136
   *     users to reconnect before removing them.
132✔
137
   * @param {Buffer|string} options.secret
132✔
138
   * @param {import('http').Server | import('https').Server} [options.server]
132✔
139
   * @param {number} [options.port]
132✔
140
   */
132✔
141
  constructor(uw, options) {
132✔
142
    if (!uw) {
132!
143
      throw new TypeError(`The "uw" argument must be of type UwaveServer. Received ${typeof uw}`);
×
144
    }
×
145

132✔
146
    this.#uw = uw;
132✔
147
    this.#logger = uw.logger.child({ ns: 'uwave:sockets' }, {
132✔
148
      serializers: {
132✔
149
        req: stdSerializers.req,
132✔
150
      },
132✔
151
    });
132✔
152
    this.#redisSubscription = uw.redis.duplicate();
132✔
153

132✔
154
    this.options = {
132✔
155
      /** @type {(_socket: import('ws').WebSocket | undefined, err: Error) => void} */
132✔
156
      onError: (_socket, err) => {
132✔
157
        throw err;
×
158
      },
132✔
159
      timeout: 30,
132✔
160
      ...options,
132✔
161
    };
132✔
162

132✔
163
    // TODO put this behind a symbol, it's just public for tests
132✔
164
    this.authRegistry = new AuthRegistry(uw.redis);
132✔
165

132✔
166
    this.#wss = new WebSocketServer({
132✔
167
      server: options.server,
132✔
168
      port: options.server ? undefined : options.port,
132!
169
    });
132✔
170

132✔
171
    this.#redisSubscription.subscribe('uwave').catch((error) => {
132✔
UNCOV
172
      this.#logger.error(error);
×
173
    });
132✔
174
    this.#redisSubscription.on('message', (channel, command) => {
132✔
175
      // this returns a promise, but we don't handle the error case:
157✔
176
      // there is not much we can do, so just let node.js crash w/ an unhandled rejection
157✔
177
      this.onServerMessage(channel, command);
157✔
178
    });
132✔
179

132✔
180
    this.#wss.on('error', (error) => {
132✔
181
      this.onError(error);
×
182
    });
132✔
183
    this.#wss.on('connection', (socket, request) => {
132✔
184
      this.onSocketConnected(socket, request);
17✔
185
    });
132✔
186

132✔
187
    this.#pinger = setInterval(() => {
132✔
188
      this.ping();
×
189
    }, ms('10 seconds'));
132✔
190

132✔
191
    this.#recountGuests = debounce(() => {
132✔
192
      this.#recountGuestsInternal().catch((error) => {
×
193
        this.#logger.error({ err: error }, 'counting guests failed');
×
194
      });
×
195
    }, ms('2 seconds'));
132✔
196

132✔
197
    this.#clientActions = {
132✔
198
      sendChat: (user, message) => {
132✔
199
        this.#logger.trace({ user, message }, 'sendChat');
3✔
200
        this.#uw.chat.send(user, message);
3✔
201
      },
132✔
202
      vote: (user, direction) => {
132✔
203
        socketVote(this.#uw, user.id, direction);
×
204
      },
132✔
205
      logout: (user, _, connection) => {
132✔
206
        this.replace(connection, this.createGuestConnection(connection.socket));
×
207
        if (!this.connection(user)) {
×
208
          disconnectUser(this.#uw, user.id);
×
209
        }
×
210
      },
132✔
211
    };
132✔
212

132✔
213
    this.#clientActionSchemas = {
132✔
214
      sendChat: ajv.compile({
132✔
215
        type: 'string',
132✔
216
      }),
132✔
217
      vote: ajv.compile({
132✔
218
        type: 'integer',
132✔
219
        enum: [-1, 1],
132✔
220
      }),
132✔
221
      logout: ajv.compile(true),
132✔
222
    };
132✔
223

132✔
224
    this.#serverActions = {
132✔
225
      /**
132✔
226
       * Broadcast the next track.
132✔
227
       */
132✔
228
      'advance:complete': (next) => {
132✔
229
        if (next) {
11✔
230
          this.broadcast('advance', {
11✔
231
            historyID: next.historyID,
11✔
232
            userID: next.userID,
11✔
233
            media: next.media,
11✔
234
            playedAt: new Date(next.playedAt).getTime(),
11✔
235
          });
11✔
236
        } else {
11!
237
          this.broadcast('advance', null);
×
238
        }
×
239
      },
132✔
240
      /**
132✔
241
       * Broadcast a skip notification.
132✔
242
       */
132✔
243
      'booth:skip': ({ moderatorID, userID, reason }) => {
132✔
244
        this.broadcast('skip', { moderatorID, userID, reason });
×
245
      },
132✔
246
      /**
132✔
247
       * Broadcast a chat message.
132✔
248
       */
132✔
249
      'chat:message': (message) => {
132✔
250
        this.broadcast('chatMessage', message);
2✔
251
      },
132✔
252
      /**
132✔
253
       * Delete chat messages. The delete filter can have an _id property to
132✔
254
       * delete a specific message, a userID property to delete messages by a
132✔
255
       * user, or be empty to delete all messages.
132✔
256
       */
132✔
257
      'chat:delete': ({ moderatorID, filter }) => {
132✔
258
        if ('id' in filter) {
6✔
259
          this.broadcast('chatDeleteByID', {
2✔
260
            moderatorID,
2✔
261
            _id: filter.id,
2✔
262
          });
2✔
263
        } else if ('userID' in filter) {
6✔
264
          this.broadcast('chatDeleteByUser', {
2✔
265
            moderatorID,
2✔
266
            userID: filter.userID,
2✔
267
          });
2✔
268
        } else if (isEmpty(filter)) {
2✔
269
          this.broadcast('chatDelete', { moderatorID });
2✔
270
        }
2✔
271
      },
132✔
272
      /**
132✔
273
       * Broadcast that a user was muted in chat.
132✔
274
       */
132✔
275
      'chat:mute': ({ moderatorID, userID, duration }) => {
132✔
276
        this.broadcast('chatMute', {
1✔
277
          userID,
1✔
278
          moderatorID,
1✔
279
          expiresAt: Date.now() + duration,
1✔
280
        });
1✔
281
      },
132✔
282
      /**
132✔
283
       * Broadcast that a user was unmuted in chat.
132✔
284
       */
132✔
285
      'chat:unmute': ({ moderatorID, userID }) => {
132✔
286
        this.broadcast('chatUnmute', { userID, moderatorID });
×
287
      },
132✔
288
      /**
132✔
289
       * Broadcast a vote for the current track.
132✔
290
       */
132✔
291
      'booth:vote': ({ userID, direction }) => {
132✔
292
        this.broadcast('vote', {
2✔
293
          _id: userID,
2✔
294
          value: direction,
2✔
295
        });
2✔
296
      },
132✔
297
      /**
132✔
298
       * Broadcast a favorite for the current track.
132✔
299
       */
132✔
300
      'booth:favorite': ({ userID }) => {
132✔
301
        this.broadcast('favorite', { userID });
×
302
      },
132✔
303
      /**
132✔
304
       * Cycle a single user's playlist.
132✔
305
       */
132✔
306
      'playlist:cycle': ({ userID, playlistID }) => {
132✔
307
        this.sendTo(userID, 'playlistCycle', { playlistID });
11✔
308
      },
132✔
309
      /**
132✔
310
       * Broadcast that a user joined the waitlist.
132✔
311
       */
132✔
312
      'waitlist:join': ({ userID, waitlist }) => {
132✔
313
        this.broadcast('waitlistJoin', { userID, waitlist });
13✔
314
      },
132✔
315
      /**
132✔
316
       * Broadcast that a user left the waitlist.
132✔
317
       */
132✔
318
      'waitlist:leave': ({ userID, waitlist }) => {
132✔
319
        this.broadcast('waitlistLeave', { userID, waitlist });
×
320
      },
132✔
321
      /**
132✔
322
       * Broadcast that a user was added to the waitlist.
132✔
323
       */
132✔
324
      'waitlist:add': ({
132✔
325
        userID, moderatorID, position, waitlist,
1✔
326
      }) => {
1✔
327
        this.broadcast('waitlistAdd', {
1✔
328
          userID, moderatorID, position, waitlist,
1✔
329
        });
1✔
330
      },
132✔
331
      /**
132✔
332
       * Broadcast that a user was removed from the waitlist.
132✔
333
       */
132✔
334
      'waitlist:remove': ({ userID, moderatorID, waitlist }) => {
132✔
335
        this.broadcast('waitlistRemove', { userID, moderatorID, waitlist });
×
336
      },
132✔
337
      /**
132✔
338
       * Broadcast that a user was moved in the waitlist.
132✔
339
       */
132✔
340
      'waitlist:move': ({
132✔
341
        userID, moderatorID, position, waitlist,
×
342
      }) => {
×
343
        this.broadcast('waitlistMove', {
×
344
          userID, moderatorID, position, waitlist,
×
345
        });
×
346
      },
132✔
347
      /**
132✔
348
       * Broadcast a waitlist update.
132✔
349
       */
132✔
350
      'waitlist:update': (waitlist) => {
132✔
351
        this.broadcast('waitlistUpdate', waitlist);
11✔
352
      },
132✔
353
      /**
132✔
354
       * Broadcast that the waitlist was cleared.
132✔
355
       */
132✔
356
      'waitlist:clear': ({ moderatorID }) => {
132✔
357
        this.broadcast('waitlistClear', { moderatorID });
×
358
      },
132✔
359
      /**
132✔
360
       * Broadcast that the waitlist was locked.
132✔
361
       */
132✔
362
      'waitlist:lock': ({ moderatorID, locked }) => {
132✔
363
        this.broadcast('waitlistLock', { moderatorID, locked });
6✔
364
      },
132✔
365

132✔
366
      'acl:allow': ({ userID, roles }) => {
132✔
367
        this.broadcast('acl:allow', { userID, roles });
50✔
368
      },
132✔
369
      'acl:disallow': ({ userID, roles }) => {
132✔
370
        this.broadcast('acl:disallow', { userID, roles });
2✔
371
      },
132✔
372

132✔
373
      'user:update': ({ userID, moderatorID, new: update }) => {
132✔
374
        // TODO Remove this remnant of the old roles system
3✔
375
        if ('role' in update) {
3!
376
          this.broadcast('roleChange', {
×
377
            moderatorID,
×
378
            userID,
×
379
            role: update.role,
×
380
          });
×
381
        }
×
382
        if ('username' in update) {
3✔
383
          this.broadcast('nameChange', {
3✔
384
            moderatorID,
3✔
385
            userID,
3✔
386
            username: update.username,
3✔
387
          });
3✔
388
        }
3✔
389
      },
132✔
390
      'user:join': async ({ userID }) => {
132✔
391
        const { users, redis } = this.#uw;
17✔
392
        const user = await users.getUser(userID);
17✔
393
        if (user) {
17✔
394
          // TODO this should not be the socket server code's responsibility
17✔
395
          await redis.rpush('users', user.id);
17✔
396
          this.broadcast('join', serializeUser(user));
17✔
397
        }
17✔
398
      },
132✔
399
      /**
132✔
400
       * Broadcast that a user left the server.
132✔
401
       */
132✔
402
      'user:leave': ({ userID }) => {
132✔
403
        this.broadcast('leave', userID);
×
404
      },
132✔
405
      /**
132✔
406
       * Broadcast a ban event.
132✔
407
       */
132✔
408
      'user:ban': ({
132✔
409
        moderatorID, userID, permanent = false, duration, expiresAt,
3✔
410
      }) => {
3✔
411
        this.broadcast('ban', {
3✔
412
          moderatorID, userID, permanent, duration, expiresAt,
3✔
413
        });
3✔
414

3✔
415
        this.#connections.forEach((connection) => {
3✔
416
          if (connection instanceof AuthedConnection && connection.user.id === userID) {
×
417
            connection.ban();
×
418
          } else if (connection instanceof LostConnection && connection.user.id === userID) {
×
419
            connection.close();
×
420
          }
×
421
        });
3✔
422
      },
132✔
423
      /**
132✔
424
       * Broadcast an unban event.
132✔
425
       */
132✔
426
      'user:unban': ({ moderatorID, userID }) => {
132✔
427
        this.broadcast('unban', { moderatorID, userID });
1✔
428
      },
132✔
429
      /**
132✔
430
       * Force-close a connection.
132✔
431
       */
132✔
432
      'http-api:socket:close': (userID) => {
132✔
433
        this.#connections.forEach((connection) => {
×
434
          if ('user' in connection && connection.user.id === userID) {
×
435
            connection.close();
×
436
          }
×
437
        });
×
438
      },
132✔
439

132✔
440
      'emotes:reload': () => {
132✔
441
        this.broadcast('reloadEmotes', null);
×
442
      },
132✔
443
    };
132✔
444
  }
132✔
445

132✔
446
  /**
132✔
447
   * Create `LostConnection`s for every user that's known to be online, but that
132✔
448
   * is not currently connected to the socket server.
132✔
449
   *
132✔
450
   * @private
132✔
451
   */
132✔
452
  async initLostConnections() {
132✔
453
    const { db, redis } = this.#uw;
132✔
454
    const userIDs = /** @type {import('./schema').UserID[]} */ (await redis.lrange('users', 0, -1));
132✔
455
    const disconnectedIDs = userIDs.filter((userID) => !this.connection(userID));
132✔
456

132✔
457
    if (disconnectedIDs.length === 0) {
132✔
458
      return;
132✔
459
    }
132✔
460

×
461
    const disconnectedUsers = await db.selectFrom('users')
×
462
      .where('id', 'in', disconnectedIDs)
×
463
      .selectAll()
×
464
      .execute();
×
465
    disconnectedUsers.forEach((user) => {
×
466
      this.add(this.createLostConnection(user));
×
467
    });
×
468
  }
132✔
469

132✔
470
  /**
132✔
471
   * @param {import('ws').WebSocket} socket
132✔
472
   * @param {import('http').IncomingMessage} request
132✔
473
   * @private
132✔
474
   */
132✔
475
  onSocketConnected(socket, request) {
132✔
476
    this.#logger.info({ req: request }, 'new connection');
17✔
477

17✔
478
    socket.on('error', (error) => {
17✔
479
      this.onSocketError(socket, error);
×
480
    });
17✔
481
    this.add(this.createGuestConnection(socket));
17✔
482
  }
17✔
483

132✔
484
  /**
132✔
485
   * @param {import('ws').WebSocket} socket
132✔
486
   * @param {Error} error
132✔
487
   * @private
132✔
488
   */
132✔
489
  onSocketError(socket, error) {
132✔
490
    this.#logger.warn({ err: error }, 'socket error');
×
491

×
492
    this.options.onError(socket, error);
×
493
  }
×
494

132✔
495
  /**
132✔
496
   * @param {Error} error
132✔
497
   * @private
132✔
498
   */
132✔
499
  onError(error) {
132✔
500
    this.#logger.error({ err: error }, 'server error');
×
501

×
502
    this.options.onError(undefined, error);
×
503
  }
×
504

132✔
505
  /**
132✔
506
   * Get a LostConnection for a user, if one exists.
132✔
507
   *
132✔
508
   * @param {User} user
132✔
509
   * @private
132✔
510
   */
132✔
511
  getLostConnection(user) {
132✔
512
    return this.#connections.find((connection) => (
×
513
      connection instanceof LostConnection && connection.user.id === user.id
×
514
    ));
×
515
  }
×
516

132✔
517
  /**
132✔
518
   * Create a connection instance for an unauthenticated user.
132✔
519
   *
132✔
520
   * @param {import('ws').WebSocket} socket
132✔
521
   * @private
132✔
522
   */
132✔
523
  createGuestConnection(socket) {
132✔
524
    const connection = new GuestConnection(this.#uw, socket, {
17✔
525
      authRegistry: this.authRegistry,
17✔
526
    });
17✔
527
    connection.on('close', () => {
17✔
528
      this.remove(connection);
×
529
    });
17✔
530
    connection.on('authenticate', async (user) => {
17✔
531
      const isReconnect = await connection.isReconnect(user);
17✔
532
      this.#logger.info({ userId: user.id, isReconnect }, 'authenticated socket');
17✔
533
      if (isReconnect) {
17!
534
        const previousConnection = this.getLostConnection(user);
×
535
        if (previousConnection) this.remove(previousConnection);
×
536
      }
×
537

17✔
538
      this.replace(connection, this.createAuthedConnection(socket, user));
17✔
539

17✔
540
      if (!isReconnect) {
17✔
541
        this.#uw.publish('user:join', { userID: user.id });
17✔
542
      }
17✔
543
    });
17✔
544
    return connection;
17✔
545
  }
17✔
546

132✔
547
  /**
132✔
548
   * Create a connection instance for an authenticated user.
132✔
549
   *
132✔
550
   * @param {import('ws').WebSocket} socket
132✔
551
   * @param {User} user
132✔
552
   * @returns {AuthedConnection}
132✔
553
   * @private
132✔
554
   */
132✔
555
  createAuthedConnection(socket, user) {
132✔
556
    const connection = new AuthedConnection(this.#uw, socket, user);
17✔
557
    connection.on('close', ({ banned }) => {
17✔
558
      if (banned) {
17!
559
        this.#logger.info({ userId: user.id }, 'removing connection after ban');
×
560
        disconnectUser(this.#uw, user.id);
×
561
      } else if (!this.#closing) {
17!
562
        this.#logger.info({ userId: user.id }, 'lost connection');
×
563
        this.add(this.createLostConnection(user));
×
564
      }
×
565
      this.remove(connection);
17✔
566
    });
17✔
567
    connection.on(
17✔
568
      'command',
17✔
569
      /**
17✔
570
       * @param {string} command
17✔
571
       * @param {import('type-fest').JsonValue} data
17✔
572
       */
17✔
573
      (command, data) => {
17✔
574
        this.#logger.trace({ userId: user.id, command, data }, 'command');
3✔
575
        if (has(this.#clientActions, command)) {
3✔
576
          // Ignore incorrect input
3✔
577
          const validate = this.#clientActionSchemas[command];
3✔
578
          if (validate && !validate(data)) {
3!
579
            return;
×
580
          }
×
581

3✔
582
          const action = this.#clientActions[command];
3✔
583
          // @ts-expect-error TS2345 `data` is validated
3✔
584
          action(user, data, connection);
3✔
585
        }
3✔
586
      },
17✔
587
    );
17✔
588
    return connection;
17✔
589
  }
17✔
590

132✔
591
  /**
132✔
592
   * Create a connection instance for a user who disconnected.
132✔
593
   *
132✔
594
   * @param {User} user
132✔
595
   * @returns {LostConnection}
132✔
596
   * @private
132✔
597
   */
132✔
598
  createLostConnection(user) {
132✔
599
    const connection = new LostConnection(this.#uw, user, this.options.timeout);
×
600
    connection.on('close', () => {
×
601
      this.#logger.info({ userId: user.id }, 'user left');
×
602
      this.remove(connection);
×
603
      // Only register that the user left if they didn't have another connection
×
604
      // still open.
×
605
      if (!this.connection(user)) {
×
606
        disconnectUser(this.#uw, user.id);
×
607
      }
×
608
    });
×
609
    return connection;
×
610
  }
×
611

132✔
612
  /**
132✔
613
   * Add a connection.
132✔
614
   *
132✔
615
   * @param {Connection} connection
132✔
616
   * @private
132✔
617
   */
132✔
618
  add(connection) {
132✔
619
    const userId = 'user' in connection ? connection.user.id : null;
34✔
620
    this.#logger.trace({ type: connection.constructor.name, userId }, 'add connection');
34✔
621

34✔
622
    this.#connections.push(connection);
34✔
623
    this.#recountGuests();
34✔
624
  }
34✔
625

132✔
626
  /**
132✔
627
   * Remove a connection.
132✔
628
   *
132✔
629
   * @param {Connection} connection
132✔
630
   * @private
132✔
631
   */
132✔
632
  remove(connection) {
132✔
633
    const userId = 'user' in connection ? connection.user.id : null;
34✔
634
    this.#logger.trace({ type: connection.constructor.name, userId }, 'remove connection');
34✔
635

34✔
636
    const i = this.#connections.indexOf(connection);
34✔
637
    this.#connections.splice(i, 1);
34✔
638

34✔
639
    connection.removed();
34✔
640
    this.#recountGuests();
34✔
641
  }
34✔
642

132✔
643
  /**
132✔
644
   * Replace a connection instance with another connection instance. Useful when
132✔
645
   * a connection changes "type", like GuestConnection → AuthedConnection.
132✔
646
   *
132✔
647
   * @param {Connection} oldConnection
132✔
648
   * @param {Connection} newConnection
132✔
649
   * @private
132✔
650
   */
132✔
651
  replace(oldConnection, newConnection) {
132✔
652
    this.remove(oldConnection);
17✔
653
    this.add(newConnection);
17✔
654
  }
17✔
655

132✔
656
  /**
132✔
657
   * Handle command messages coming in from Redis.
132✔
658
   * Some commands are intended to broadcast immediately to all connected
132✔
659
   * clients, but others require special action.
132✔
660
   *
132✔
661
   * @param {string} channel
132✔
662
   * @param {string} rawCommand
132✔
663
   * @returns {Promise<void>}
132✔
664
   * @private
132✔
665
   */
132✔
666
  async onServerMessage(channel, rawCommand) {
132✔
667
    /**
157✔
668
     * @type {{ command: string, data: import('type-fest').JsonValue }|undefined}
157✔
669
     */
157✔
670
    const json = sjson.safeParse(rawCommand);
157✔
671
    if (!json) {
157!
672
      return;
×
673
    }
×
674
    const { command, data } = json;
157✔
675

157✔
676
    this.#logger.trace({ channel, command, data }, 'server message');
157✔
677

157✔
678
    if (has(this.#serverActions, command)) {
157✔
679
      const action = this.#serverActions[command];
140✔
680
      if (action !== undefined) { // the types for `ServerActions` allow undefined, so...
140✔
681
        // @ts-expect-error TS2345 `data` is validated
140✔
682
        action(data);
140✔
683
      }
140✔
684
    }
140✔
685
  }
157✔
686

132✔
687
  /**
132✔
688
   * Stop the socket server.
132✔
689
   *
132✔
690
   * @returns {Promise<void>}
132✔
691
   */
132✔
692
  async destroy() {
132✔
693
    clearInterval(this.#pinger);
132✔
694

132✔
695
    this.#closing = true;
132✔
696
    for (const connection of this.#wss.clients) {
132✔
697
      connection.close();
17✔
698
    }
17✔
699

132✔
700
    const closeWsServer = promisify(this.#wss.close.bind(this.#wss));
132✔
701
    await closeWsServer();
132✔
702
    await this.#redisSubscription.quit();
132✔
703

132✔
704
    this.#recountGuests.cancel();
132✔
705
  }
132✔
706

132✔
707
  /**
132✔
708
   * Get the connection instance for a specific user.
132✔
709
   *
132✔
710
   * @param {User|string} user The user.
132✔
711
   * @returns {Connection|undefined}
132✔
712
   */
132✔
713
  connection(user) {
132✔
714
    const userID = typeof user === 'object' ? user.id : user;
×
715
    return this.#connections.find((connection) => 'user' in connection && connection.user.id === userID);
×
716
  }
×
717

132✔
718
  ping() {
132✔
719
    this.#connections.forEach((connection) => {
×
720
      if ('socket' in connection) {
×
721
        connection.ping();
×
722
      }
×
723
    });
×
724
  }
×
725

132✔
726
  /**
132✔
727
   * Broadcast a command to all connected clients.
132✔
728
   *
132✔
729
   * @param {string} command Command name.
132✔
730
   * @param {import('type-fest').JsonValue} data Command data.
132✔
731
   */
132✔
732
  broadcast(command, data) {
132✔
733
    this.#logger.trace({
129✔
734
      command,
129✔
735
      data,
129✔
736
      to: this.#connections.map((connection) => (
129✔
737
        'user' in connection ? connection.user.id : null
68!
738
      )),
129✔
739
    }, 'broadcast');
129✔
740

129✔
741
    this.#connections.forEach((connection) => {
129✔
742
      connection.send(command, data);
68✔
743
    });
129✔
744
  }
129✔
745

132✔
746
  /**
132✔
747
   * Send a command to a single user.
132✔
748
   *
132✔
749
   * @param {User|string} user User or user ID to send the command to.
132✔
750
   * @param {string} command Command name.
132✔
751
   * @param {import('type-fest').JsonValue} data Command data.
132✔
752
   */
132✔
753
  sendTo(user, command, data) {
132✔
754
    const userID = typeof user === 'object' ? user.id : user;
11!
755

11✔
756
    this.#connections.forEach((connection) => {
11✔
757
      if ('user' in connection && connection.user.id === userID) {
11✔
758
        connection.send(command, data);
9✔
759
      }
9✔
760
    });
11✔
761
  }
11✔
762

132✔
763
  async getGuestCount() {
132✔
764
    const { redis } = this.#uw;
×
765
    const rawCount = await redis.get('http-api:guests');
×
766
    if (typeof rawCount !== 'string' || !/^\d+$/.test(rawCount)) {
×
767
      return 0;
×
768
    }
×
769
    return parseInt(rawCount, 10);
×
770
  }
×
771

132✔
772
  async #recountGuestsInternal() {
132✔
773
    const { redis } = this.#uw;
×
774
    const guests = this.#connections
×
775
      .filter((connection) => connection instanceof GuestConnection)
×
776
      .length;
×
777

×
778
    const lastGuestCount = await this.getGuestCount();
×
779
    if (guests !== lastGuestCount) {
×
780
      await redis.set('http-api:guests', guests);
×
781
      this.broadcast('guests', guests);
×
782
    }
×
783
  }
×
784
}
132✔
785

1✔
786
export default SocketServer;
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc