• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

u-wave / core / 11085094286

28 Sep 2024 03:39PM UTC coverage: 79.715% (-0.4%) from 80.131%
11085094286

Pull #637

github

web-flow
Merge 11ccf3b06 into 14c162f19
Pull Request #637: Switch to a relational database, closes #549

751 of 918 branches covered (81.81%)

Branch coverage included in aggregate %.

1891 of 2530 new or added lines in 50 files covered. (74.74%)

13 existing lines in 7 files now uncovered.

9191 of 11554 relevant lines covered (79.55%)

68.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.19
/src/SocketServer.js
1
import { promisify } from 'node:util';
1✔
2
import lodash from 'lodash';
1✔
3
import sjson from 'secure-json-parse';
1✔
4
import { WebSocketServer } from 'ws';
1✔
5
import Ajv from 'ajv';
1✔
6
import ms from 'ms';
1✔
7
import { stdSerializers } from 'pino';
1✔
8
import { socketVote } from './controllers/booth.js';
1✔
9
import { disconnectUser } from './controllers/users.js';
1✔
10
import AuthRegistry from './AuthRegistry.js';
1✔
11
import GuestConnection from './sockets/GuestConnection.js';
1✔
12
import AuthedConnection from './sockets/AuthedConnection.js';
1✔
13
import LostConnection from './sockets/LostConnection.js';
1✔
14
import { serializeUser } from './utils/serialize.js';
1✔
15

1✔
16
const { debounce, isEmpty } = lodash;
1✔
17

1✔
18
/**
1✔
19
 * @typedef {import('./schema.js').User} User
1✔
20
 */
1✔
21

1✔
22
/**
1✔
23
 * @typedef {GuestConnection | AuthedConnection | LostConnection} Connection
1✔
24
 */
1✔
25

1✔
26
/**
1✔
27
 * @typedef {object} ClientActionParameters
1✔
28
 * @prop {string} sendChat
1✔
29
 * @prop {-1 | 1} vote
1✔
30
 * @prop {undefined} logout
1✔
31
 */
1✔
32

1✔
33
/**
1✔
34
 * @typedef {{
1✔
35
 *   [Name in keyof ClientActionParameters]: (
1✔
36
 *     user: User,
1✔
37
 *     parameter: ClientActionParameters[Name],
1✔
38
 *     connection: AuthedConnection
1✔
39
 *   ) => void
1✔
40
 * }} ClientActions
1✔
41
 */
1✔
42

1✔
43
const ajv = new Ajv({
1✔
44
  coerceTypes: false,
1✔
45
  ownProperties: true,
1✔
46
  removeAdditional: true,
1✔
47
  useDefaults: false,
1✔
48
});
1✔
49

1✔
50
/**
1✔
51
 * @template {object} T
1✔
52
 * @param {T} object
1✔
53
 * @param {PropertyKey} property
1✔
54
 * @returns {property is keyof T}
1✔
55
 */
1✔
56
function has(object, property) {
61✔
57
  return Object.prototype.hasOwnProperty.call(object, property);
61✔
58
}
61✔
59

1✔
60
class SocketServer {
92✔
61
  /**
92✔
62
   * @param {import('./Uwave.js').Boot} uw
92✔
63
   * @param {{ secret: Buffer|string }} options
92✔
64
   */
92✔
65
  static async plugin(uw, options) {
92✔
66
    uw.socketServer = new SocketServer(uw, {
92✔
67
      secret: options.secret,
92✔
68
      server: uw.server,
92✔
69
    });
92✔
70

92✔
71
    uw.after(async () => {
92✔
72
      try {
92✔
73
        await uw.socketServer.initLostConnections();
92✔
74
      } catch (err) {
92!
75
        // No need to prevent startup for this.
×
76
        // We do need to clear the `users` list because the lost connection handlers
×
77
        // will not do so.
×
78
        uw.socketServer.#logger.warn({ err }, 'could not initialise lost connections');
×
79
        await uw.redis.del('users');
×
80
      }
×
81
    });
92✔
82

92✔
83
    uw.onClose(async () => {
92✔
84
      await uw.socketServer.destroy();
92✔
85
    });
92✔
86
  }
92✔
87

92✔
88
  #uw;
92✔
89

92✔
90
  #logger;
92✔
91

92✔
92
  #redisSubscription;
92✔
93

92✔
94
  #wss;
92✔
95

92✔
96
  #closing = false;
92✔
97

92✔
98
  /** @type {Connection[]} */
92✔
99
  #connections = [];
92✔
100

92✔
101
  #pinger;
92✔
102

92✔
103
  /**
92✔
104
   * Update online guests count and broadcast an update if necessary.
92✔
105
   */
92✔
106
  #recountGuests;
92✔
107

92✔
108
  /**
92✔
109
   * Handlers for commands that come in from clients.
92✔
110
   *
92✔
111
   * @type {ClientActions}
92✔
112
   */
92✔
113
  #clientActions;
92✔
114

92✔
115
  /**
92✔
116
   * @type {{
92✔
117
   *   [K in keyof ClientActionParameters]:
92✔
118
   *     import('ajv').ValidateFunction<ClientActionParameters[K]>
92✔
119
   * }}
92✔
120
   */
92✔
121
  #clientActionSchemas;
92✔
122

92✔
123
  /**
92✔
124
   * Handlers for commands that come in from the server side.
92✔
125
   *
92✔
126
   * @type {import('./redisMessages.js').ServerActions}
92✔
127
   */
92✔
128
  #serverActions;
92✔
129

92✔
130
  /**
92✔
131
   * Create a socket server.
92✔
132
   *
92✔
133
   * @param {import('./Uwave.js').default} uw üWave Core instance.
92✔
134
   * @param {object} options Socket server options.
92✔
135
   * @param {number} [options.timeout] Time in seconds to wait for disconnected
92✔
136
   *     users to reconnect before removing them.
92✔
137
   * @param {Buffer|string} options.secret
92✔
138
   * @param {import('http').Server | import('https').Server} [options.server]
92✔
139
   * @param {number} [options.port]
92✔
140
   */
92✔
141
  constructor(uw, options) {
92✔
142
    if (!uw) {
92!
143
      throw new TypeError(`The "uw" argument must be of type UwaveServer. Received ${typeof uw}`);
×
144
    }
×
145

92✔
146
    this.#uw = uw;
92✔
147
    this.#logger = uw.logger.child({ ns: 'uwave:sockets' }, {
92✔
148
      serializers: {
92✔
149
        req: stdSerializers.req,
92✔
150
      },
92✔
151
    });
92✔
152
    this.#redisSubscription = uw.redis.duplicate();
92✔
153

92✔
154
    this.options = {
92✔
155
      /** @type {(_socket: import('ws').WebSocket | undefined, err: Error) => void} */
92✔
156
      onError: (_socket, err) => {
92✔
157
        throw err;
×
158
      },
92✔
159
      timeout: 30,
92✔
160
      ...options,
92✔
161
    };
92✔
162

92✔
163
    // TODO put this behind a symbol, it's just public for tests
92✔
164
    this.authRegistry = new AuthRegistry(uw.redis);
92✔
165

92✔
166
    this.#wss = new WebSocketServer({
92✔
167
      server: options.server,
92✔
168
      port: options.server ? undefined : options.port,
92!
169
    });
92✔
170

92✔
171
    this.#redisSubscription.subscribe('uwave').catch((error) => {
92✔
172
      this.#logger.error(error);
1✔
173
    });
92✔
174
    this.#redisSubscription.on('message', (channel, command) => {
92✔
175
      // this returns a promise, but we don't handle the error case:
61✔
176
      // there is not much we can do, so just let node.js crash w/ an unhandled rejection
61✔
177
      this.onServerMessage(channel, command);
61✔
178
    });
92✔
179

92✔
180
    this.#wss.on('error', (error) => {
92✔
181
      this.onError(error);
×
182
    });
92✔
183
    this.#wss.on('connection', (socket, request) => {
92✔
184
      this.onSocketConnected(socket, request);
5✔
185
    });
92✔
186

92✔
187
    this.#pinger = setInterval(() => {
92✔
188
      this.ping();
×
189
    }, ms('10 seconds'));
92✔
190

92✔
191
    this.#recountGuests = debounce(() => {
92✔
192
      this.#recountGuestsInternal().catch((error) => {
×
193
        this.#logger.error({ err: error }, 'counting guests failed');
×
194
      });
×
195
    }, ms('2 seconds'));
92✔
196

92✔
197
    this.#clientActions = {
92✔
198
      sendChat: (user, message) => {
92✔
199
        this.#logger.trace({ user, message }, 'sendChat');
×
200
        this.#uw.chat.send(user, message);
×
201
      },
92✔
202
      vote: (user, direction) => {
92✔
203
        socketVote(this.#uw, user.id, direction);
×
204
      },
92✔
205
      logout: (user, _, connection) => {
92✔
206
        this.replace(connection, this.createGuestConnection(connection.socket));
×
207
        if (!this.connection(user)) {
×
NEW
208
          disconnectUser(this.#uw, user.id);
×
209
        }
×
210
      },
92✔
211
    };
92✔
212

92✔
213
    this.#clientActionSchemas = {
92✔
214
      sendChat: ajv.compile({
92✔
215
        type: 'string',
92✔
216
      }),
92✔
217
      vote: ajv.compile({
92✔
218
        type: 'integer',
92✔
219
        enum: [-1, 1],
92✔
220
      }),
92✔
221
      logout: ajv.compile(true),
92✔
222
    };
92✔
223

92✔
224
    this.#serverActions = {
92✔
225
      /**
92✔
226
       * Broadcast the next track.
92✔
227
       */
92✔
228
      'advance:complete': (next) => {
92✔
229
        if (next) {
5✔
230
          this.broadcast('advance', {
5✔
231
            historyID: next.historyID,
5✔
232
            userID: next.userID,
5✔
233
            media: next.media,
5✔
234
            playedAt: new Date(next.playedAt).getTime(),
5✔
235
          });
5✔
236
        } else {
5!
237
          this.broadcast('advance', null);
×
238
        }
×
239
      },
92✔
240
      /**
92✔
241
       * Broadcast a skip notification.
92✔
242
       */
92✔
243
      'booth:skip': ({ moderatorID, userID, reason }) => {
92✔
244
        this.broadcast('skip', { moderatorID, userID, reason });
×
245
      },
92✔
246
      /**
92✔
247
       * Broadcast a chat message.
92✔
248
       */
92✔
249
      'chat:message': (message) => {
92✔
250
        this.broadcast('chatMessage', message);
×
251
      },
92✔
252
      /**
92✔
253
       * Delete chat messages. The delete filter can have an _id property to
92✔
254
       * delete a specific message, a userID property to delete messages by a
92✔
255
       * user, or be empty to delete all messages.
92✔
256
       */
92✔
257
      'chat:delete': ({ moderatorID, filter }) => {
92✔
258
        if ('id' in filter) {
×
259
          this.broadcast('chatDeleteByID', {
×
260
            moderatorID,
×
261
            _id: filter.id,
×
262
          });
×
263
        } else if ('userID' in filter) {
×
264
          this.broadcast('chatDeleteByUser', {
×
265
            moderatorID,
×
266
            userID: filter.userID,
×
267
          });
×
268
        } else if (isEmpty(filter)) {
×
269
          this.broadcast('chatDelete', { moderatorID });
×
270
        }
×
271
      },
92✔
272
      /**
92✔
273
       * Broadcast that a user was muted in chat.
92✔
274
       */
92✔
275
      'chat:mute': ({ moderatorID, userID, duration }) => {
92✔
276
        this.broadcast('chatMute', {
×
277
          userID,
×
278
          moderatorID,
×
279
          expiresAt: Date.now() + duration,
×
280
        });
×
281
      },
92✔
282
      /**
92✔
283
       * Broadcast that a user was unmuted in chat.
92✔
284
       */
92✔
285
      'chat:unmute': ({ moderatorID, userID }) => {
92✔
286
        this.broadcast('chatUnmute', { userID, moderatorID });
×
287
      },
92✔
288
      /**
92✔
289
       * Broadcast a vote for the current track.
92✔
290
       */
92✔
291
      'booth:vote': ({ userID, direction }) => {
92✔
292
        this.broadcast('vote', {
2✔
293
          _id: userID,
2✔
294
          value: direction,
2✔
295
        });
2✔
296
      },
92✔
297
      /**
92✔
298
       * Broadcast a favorite for the current track.
92✔
299
       */
92✔
300
      'booth:favorite': ({ userID }) => {
92✔
301
        this.broadcast('favorite', { userID });
×
302
      },
92✔
303
      /**
92✔
304
       * Cycle a single user's playlist.
92✔
305
       */
92✔
306
      'playlist:cycle': ({ userID, playlistID }) => {
92✔
307
        this.sendTo(userID, 'playlistCycle', { playlistID });
5✔
308
      },
92✔
309
      /**
92✔
310
       * Broadcast that a user joined the waitlist.
92✔
311
       */
92✔
312
      'waitlist:join': ({ userID, waitlist }) => {
92✔
313
        this.broadcast('waitlistJoin', { userID, waitlist });
7✔
314
      },
92✔
315
      /**
92✔
316
       * Broadcast that a user left the waitlist.
92✔
317
       */
92✔
318
      'waitlist:leave': ({ userID, waitlist }) => {
92✔
319
        this.broadcast('waitlistLeave', { userID, waitlist });
×
320
      },
92✔
321
      /**
92✔
322
       * Broadcast that a user was added to the waitlist.
92✔
323
       */
92✔
324
      'waitlist:add': ({
92✔
325
        userID, moderatorID, position, waitlist,
1✔
326
      }) => {
1✔
327
        this.broadcast('waitlistAdd', {
1✔
328
          userID, moderatorID, position, waitlist,
1✔
329
        });
1✔
330
      },
92✔
331
      /**
92✔
332
       * Broadcast that a user was removed from the waitlist.
92✔
333
       */
92✔
334
      'waitlist:remove': ({ userID, moderatorID, waitlist }) => {
92✔
335
        this.broadcast('waitlistRemove', { userID, moderatorID, waitlist });
×
336
      },
92✔
337
      /**
92✔
338
       * Broadcast that a user was moved in the waitlist.
92✔
339
       */
92✔
340
      'waitlist:move': ({
92✔
341
        userID, moderatorID, position, waitlist,
×
342
      }) => {
×
343
        this.broadcast('waitlistMove', {
×
344
          userID, moderatorID, position, waitlist,
×
345
        });
×
346
      },
92✔
347
      /**
92✔
348
       * Broadcast a waitlist update.
92✔
349
       */
92✔
350
      'waitlist:update': (waitlist) => {
92✔
351
        this.broadcast('waitlistUpdate', waitlist);
5✔
352
      },
92✔
353
      /**
92✔
354
       * Broadcast that the waitlist was cleared.
92✔
355
       */
92✔
356
      'waitlist:clear': ({ moderatorID }) => {
92✔
357
        this.broadcast('waitlistClear', { moderatorID });
×
358
      },
92✔
359
      /**
92✔
360
       * Broadcast that the waitlist was locked.
92✔
361
       */
92✔
362
      'waitlist:lock': ({ moderatorID, locked }) => {
92✔
363
        this.broadcast('waitlistLock', { moderatorID, locked });
×
364
      },
92✔
365

92✔
366
      'acl:allow': ({ userID, roles }) => {
92✔
367
        this.broadcast('acl:allow', { userID, roles });
21✔
368
      },
92✔
369
      'acl:disallow': ({ userID, roles }) => {
92✔
370
        this.broadcast('acl:disallow', { userID, roles });
1✔
371
      },
92✔
372

92✔
373
      'user:update': ({ userID, moderatorID, new: update }) => {
92✔
374
        // TODO Remove this remnant of the old roles system
×
375
        if ('role' in update) {
×
376
          this.broadcast('roleChange', {
×
377
            moderatorID,
×
378
            userID,
×
379
            role: update.role,
×
380
          });
×
381
        }
×
382
        if ('username' in update) {
×
383
          this.broadcast('nameChange', {
×
384
            moderatorID,
×
385
            userID,
×
386
            username: update.username,
×
387
          });
×
388
        }
×
389
      },
92✔
390
      'user:join': async ({ userID }) => {
92✔
391
        const { users, redis } = this.#uw;
5✔
392
        const user = await users.getUser(userID);
5✔
393
        if (user) {
5✔
394
          // TODO this should not be the socket server code's responsibility
5✔
395
          await redis.rpush('users', user.id);
5✔
396
          this.broadcast('join', serializeUser(user));
5✔
397
        }
5✔
398
      },
92✔
399
      /**
92✔
400
       * Broadcast that a user left the server.
92✔
401
       */
92✔
402
      'user:leave': ({ userID }) => {
92✔
403
        this.broadcast('leave', userID);
×
404
      },
92✔
405
      /**
92✔
406
       * Broadcast a ban event.
92✔
407
       */
92✔
408
      'user:ban': ({
92✔
409
        moderatorID, userID, permanent = false, duration, expiresAt,
3✔
410
      }) => {
3✔
411
        this.broadcast('ban', {
3✔
412
          moderatorID, userID, permanent, duration, expiresAt,
3✔
413
        });
3✔
414

3✔
415
        this.#connections.forEach((connection) => {
3✔
416
          if (connection instanceof AuthedConnection && connection.user.id === userID) {
×
417
            connection.ban();
×
418
          } else if (connection instanceof LostConnection && connection.user.id === userID) {
×
419
            connection.close();
×
420
          }
×
421
        });
3✔
422
      },
92✔
423
      /**
92✔
424
       * Broadcast an unban event.
92✔
425
       */
92✔
426
      'user:unban': ({ moderatorID, userID }) => {
92✔
427
        this.broadcast('unban', { moderatorID, userID });
1✔
428
      },
92✔
429
      /**
92✔
430
       * Force-close a connection.
92✔
431
       */
92✔
432
      'http-api:socket:close': (userID) => {
92✔
433
        this.#connections.forEach((connection) => {
×
434
          if ('user' in connection && connection.user.id === userID) {
×
435
            connection.close();
×
436
          }
×
437
        });
×
438
      },
92✔
439

92✔
440
      'emotes:reload': () => {
92✔
441
        this.broadcast('reloadEmotes', null);
×
442
      },
92✔
443
    };
92✔
444
  }
92✔
445

92✔
446
  /**
92✔
447
   * Create `LostConnection`s for every user that's known to be online, but that
92✔
448
   * is not currently connected to the socket server.
92✔
449
   *
92✔
450
   * @private
92✔
451
   */
92✔
452
  async initLostConnections() {
92✔
453
    const { db, redis } = this.#uw;
92✔
454
    const userIDs = /** @type {import('./schema').UserID[]} */ (await redis.lrange('users', 0, -1));
92✔
455
    const disconnectedIDs = userIDs.filter((userID) => !this.connection(userID));
92✔
456

92✔
457
    if (disconnectedIDs.length === 0) {
92✔
458
      return;
92✔
459
    }
92✔
NEW
460

×
NEW
461
    const disconnectedUsers = await db.selectFrom('users')
×
NEW
462
      .where('id', 'in', disconnectedIDs)
×
NEW
463
      .selectAll()
×
NEW
464
      .execute();
×
465
    disconnectedUsers.forEach((user) => {
×
466
      this.add(this.createLostConnection(user));
×
UNCOV
467
    });
×
468
  }
92✔
469

92✔
470
  /**
92✔
471
   * @param {import('ws').WebSocket} socket
92✔
472
   * @param {import('http').IncomingMessage} request
92✔
473
   * @private
92✔
474
   */
92✔
475
  onSocketConnected(socket, request) {
92✔
476
    this.#logger.info({ req: request }, 'new connection');
5✔
477

5✔
478
    socket.on('error', (error) => {
5✔
479
      this.onSocketError(socket, error);
×
480
    });
5✔
481
    this.add(this.createGuestConnection(socket));
5✔
482
  }
5✔
483

92✔
484
  /**
92✔
485
   * @param {import('ws').WebSocket} socket
92✔
486
   * @param {Error} error
92✔
487
   * @private
92✔
488
   */
92✔
489
  onSocketError(socket, error) {
92✔
490
    this.#logger.warn({ err: error }, 'socket error');
×
491

×
492
    this.options.onError(socket, error);
×
493
  }
×
494

92✔
495
  /**
92✔
496
   * @param {Error} error
92✔
497
   * @private
92✔
498
   */
92✔
499
  onError(error) {
92✔
500
    this.#logger.error({ err: error }, 'server error');
×
501

×
502
    this.options.onError(undefined, error);
×
503
  }
×
504

92✔
505
  /**
92✔
506
   * Get a LostConnection for a user, if one exists.
92✔
507
   *
92✔
508
   * @param {User} user
92✔
509
   * @private
92✔
510
   */
92✔
511
  getLostConnection(user) {
92✔
512
    return this.#connections.find((connection) => (
×
513
      connection instanceof LostConnection && connection.user.id === user.id
×
514
    ));
×
515
  }
×
516

92✔
517
  /**
92✔
518
   * Create a connection instance for an unauthenticated user.
92✔
519
   *
92✔
520
   * @param {import('ws').WebSocket} socket
92✔
521
   * @private
92✔
522
   */
92✔
523
  createGuestConnection(socket) {
92✔
524
    const connection = new GuestConnection(this.#uw, socket, {
5✔
525
      authRegistry: this.authRegistry,
5✔
526
    });
5✔
527
    connection.on('close', () => {
5✔
528
      this.remove(connection);
×
529
    });
5✔
530
    connection.on('authenticate', async (user) => {
5✔
531
      const isReconnect = await connection.isReconnect(user);
5✔
532
      this.#logger.info({ userId: user.id, isReconnect }, 'authenticated socket');
5✔
533
      if (isReconnect) {
5!
534
        const previousConnection = this.getLostConnection(user);
×
535
        if (previousConnection) this.remove(previousConnection);
×
536
      }
×
537

5✔
538
      this.replace(connection, this.createAuthedConnection(socket, user));
5✔
539

5✔
540
      if (!isReconnect) {
5✔
541
        this.#uw.publish('user:join', { userID: user.id });
5✔
542
      }
5✔
543
    });
5✔
544
    return connection;
5✔
545
  }
5✔
546

92✔
547
  /**
92✔
548
   * Create a connection instance for an authenticated user.
92✔
549
   *
92✔
550
   * @param {import('ws').WebSocket} socket
92✔
551
   * @param {User} user
92✔
552
   * @returns {AuthedConnection}
92✔
553
   * @private
92✔
554
   */
92✔
555
  createAuthedConnection(socket, user) {
92✔
556
    const connection = new AuthedConnection(this.#uw, socket, user);
5✔
557
    connection.on('close', ({ banned }) => {
5✔
558
      if (banned) {
5!
559
        this.#logger.info({ userId: user.id }, 'removing connection after ban');
×
NEW
560
        disconnectUser(this.#uw, user.id);
×
561
      } else if (!this.#closing) {
5!
562
        this.#logger.info({ userId: user.id }, 'lost connection');
×
563
        this.add(this.createLostConnection(user));
×
564
      }
×
565
      this.remove(connection);
5✔
566
    });
5✔
567
    connection.on(
5✔
568
      'command',
5✔
569
      /**
5✔
570
       * @param {string} command
5✔
571
       * @param {import('type-fest').JsonValue} data
5✔
572
       */
5✔
573
      (command, data) => {
5✔
574
        this.#logger.trace({ userId: user.id, command, data }, 'command');
×
575
        if (has(this.#clientActions, command)) {
×
576
          // Ignore incorrect input
×
577
          const validate = this.#clientActionSchemas[command];
×
578
          if (validate && !validate(data)) {
×
579
            return;
×
580
          }
×
581

×
582
          const action = this.#clientActions[command];
×
583
          // @ts-expect-error TS2345 `data` is validated
×
584
          action(user, data, connection);
×
585
        }
×
586
      },
5✔
587
    );
5✔
588
    return connection;
5✔
589
  }
5✔
590

92✔
591
  /**
92✔
592
   * Create a connection instance for a user who disconnected.
92✔
593
   *
92✔
594
   * @param {User} user
92✔
595
   * @returns {LostConnection}
92✔
596
   * @private
92✔
597
   */
92✔
598
  createLostConnection(user) {
92✔
599
    const connection = new LostConnection(this.#uw, user, this.options.timeout);
×
600
    connection.on('close', () => {
×
601
      this.#logger.info({ userId: user.id }, 'user left');
×
602
      this.remove(connection);
×
603
      // Only register that the user left if they didn't have another connection
×
604
      // still open.
×
605
      if (!this.connection(user)) {
×
NEW
606
        disconnectUser(this.#uw, user.id);
×
607
      }
×
608
    });
×
609
    return connection;
×
610
  }
×
611

92✔
612
  /**
92✔
613
   * Add a connection.
92✔
614
   *
92✔
615
   * @param {Connection} connection
92✔
616
   * @private
92✔
617
   */
92✔
618
  add(connection) {
92✔
619
    const userId = 'user' in connection ? connection.user.id : null;
10✔
620
    this.#logger.trace({ type: connection.constructor.name, userId }, 'add connection');
10✔
621

10✔
622
    this.#connections.push(connection);
10✔
623
    this.#recountGuests();
10✔
624
  }
10✔
625

92✔
626
  /**
92✔
627
   * Remove a connection.
92✔
628
   *
92✔
629
   * @param {Connection} connection
92✔
630
   * @private
92✔
631
   */
92✔
632
  remove(connection) {
92✔
633
    const userId = 'user' in connection ? connection.user.id : null;
10✔
634
    this.#logger.trace({ type: connection.constructor.name, userId }, 'remove connection');
10✔
635

10✔
636
    const i = this.#connections.indexOf(connection);
10✔
637
    this.#connections.splice(i, 1);
10✔
638

10✔
639
    connection.removed();
10✔
640
    this.#recountGuests();
10✔
641
  }
10✔
642

92✔
643
  /**
92✔
644
   * Replace a connection instance with another connection instance. Useful when
92✔
645
   * a connection changes "type", like GuestConnection → AuthedConnection.
92✔
646
   *
92✔
647
   * @param {Connection} oldConnection
92✔
648
   * @param {Connection} newConnection
92✔
649
   * @private
92✔
650
   */
92✔
651
  replace(oldConnection, newConnection) {
92✔
652
    this.remove(oldConnection);
5✔
653
    this.add(newConnection);
5✔
654
  }
5✔
655

92✔
656
  /**
92✔
657
   * Handle command messages coming in from Redis.
92✔
658
   * Some commands are intended to broadcast immediately to all connected
92✔
659
   * clients, but others require special action.
92✔
660
   *
92✔
661
   * @param {string} channel
92✔
662
   * @param {string} rawCommand
92✔
663
   * @returns {Promise<void>}
92✔
664
   * @private
92✔
665
   */
92✔
666
  async onServerMessage(channel, rawCommand) {
92✔
667
    /**
61✔
668
     * @type {{ command: string, data: import('type-fest').JsonValue }|undefined}
61✔
669
     */
61✔
670
    const json = sjson.safeParse(rawCommand);
61✔
671
    if (!json) {
61!
672
      return;
×
673
    }
×
674
    const { command, data } = json;
61✔
675

61✔
676
    this.#logger.trace({ channel, command, data }, 'server message');
61✔
677

61✔
678
    if (has(this.#serverActions, command)) {
61✔
679
      const action = this.#serverActions[command];
56✔
680
      if (action !== undefined) { // the types for `ServerActions` allow undefined, so...
56✔
681
        // @ts-expect-error TS2345 `data` is validated
56✔
682
        action(data);
56✔
683
      }
56✔
684
    }
56✔
685
  }
61✔
686

92✔
687
  /**
92✔
688
   * Stop the socket server.
92✔
689
   *
92✔
690
   * @returns {Promise<void>}
92✔
691
   */
92✔
692
  async destroy() {
92✔
693
    clearInterval(this.#pinger);
92✔
694

92✔
695
    this.#closing = true;
92✔
696
    for (const connection of this.#wss.clients) {
92✔
697
      connection.close();
5✔
698
    }
5✔
699

92✔
700
    const closeWsServer = promisify(this.#wss.close.bind(this.#wss));
92✔
701
    await closeWsServer();
92✔
702
    await this.#redisSubscription.quit();
92✔
703

92✔
704
    this.#recountGuests.cancel();
92✔
705
  }
92✔
706

92✔
707
  /**
92✔
708
   * Get the connection instance for a specific user.
92✔
709
   *
92✔
710
   * @param {User|string} user The user.
92✔
711
   * @returns {Connection|undefined}
92✔
712
   */
92✔
713
  connection(user) {
92✔
714
    const userID = typeof user === 'object' ? user.id : user;
×
715
    return this.#connections.find((connection) => 'user' in connection && connection.user.id === userID);
×
716
  }
×
717

92✔
718
  ping() {
92✔
719
    this.#connections.forEach((connection) => {
×
720
      if ('socket' in connection) {
×
721
        connection.ping();
×
722
      }
×
723
    });
×
724
  }
×
725

92✔
726
  /**
92✔
727
   * Broadcast a command to all connected clients.
92✔
728
   *
92✔
729
   * @param {string} command Command name.
92✔
730
   * @param {import('type-fest').JsonValue} data Command data.
92✔
731
   */
92✔
732
  broadcast(command, data) {
92✔
733
    this.#logger.trace({
51✔
734
      command,
51✔
735
      data,
51✔
736
      to: this.#connections.map((connection) => (
51✔
737
        'user' in connection ? connection.user.id : null
20!
738
      )),
51✔
739
    }, 'broadcast');
51✔
740

51✔
741
    this.#connections.forEach((connection) => {
51✔
742
      connection.send(command, data);
20✔
743
    });
51✔
744
  }
51✔
745

92✔
746
  /**
92✔
747
   * Send a command to a single user.
92✔
748
   *
92✔
749
   * @param {User|string} user User or user ID to send the command to.
92✔
750
   * @param {string} command Command name.
92✔
751
   * @param {import('type-fest').JsonValue} data Command data.
92✔
752
   */
92✔
753
  sendTo(user, command, data) {
92✔
754
    const userID = typeof user === 'object' ? user.id : user;
5!
755

5✔
756
    this.#connections.forEach((connection) => {
5✔
757
      if ('user' in connection && connection.user.id === userID) {
4✔
758
        connection.send(command, data);
3✔
759
      }
3✔
760
    });
5✔
761
  }
5✔
762

92✔
763
  async getGuestCount() {
92✔
764
    const { redis } = this.#uw;
×
765
    const rawCount = await redis.get('http-api:guests');
×
766
    if (typeof rawCount !== 'string' || !/^\d+$/.test(rawCount)) {
×
767
      return 0;
×
768
    }
×
769
    return parseInt(rawCount, 10);
×
770
  }
×
771

92✔
772
  async #recountGuestsInternal() {
92✔
773
    const { redis } = this.#uw;
×
774
    const guests = this.#connections
×
775
      .filter((connection) => connection instanceof GuestConnection)
×
776
      .length;
×
777

×
778
    const lastGuestCount = await this.getGuestCount();
×
779
    if (guests !== lastGuestCount) {
×
780
      await redis.set('http-api:guests', guests);
×
781
      this.broadcast('guests', guests);
×
782
    }
×
783
  }
×
784
}
92✔
785

1✔
786
export default SocketServer;
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc