• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

u-wave / core / 20159741168

12 Dec 2025 07:35AM UTC coverage: 86.181% (-0.001%) from 86.182%
20159741168

Pull #727

github

web-flow
Merge 97ffcb5a1 into 78131da9c
Pull Request #727: Store maybe-missed socket messages in SQLite

984 of 1174 branches covered (83.82%)

Branch coverage included in aggregate %.

123 of 143 new or added lines in 5 files covered. (86.01%)

24 existing lines in 3 files now uncovered.

10273 of 11888 relevant lines covered (86.41%)

98.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.22
/src/SocketServer.js
1
import { promisify } from 'node:util';
1✔
2
import lodash from 'lodash';
1✔
3
import sjson from 'secure-json-parse';
1✔
4
import { WebSocketServer } from 'ws';
1✔
5
import Ajv from 'ajv';
1✔
6
import { stdSerializers } from 'pino';
1✔
7
import { socketVote } from './controllers/booth.js';
1✔
8
import { disconnectUser } from './controllers/users.js';
1✔
9
import AuthRegistry from './AuthRegistry.js';
1✔
10
import GuestConnection from './sockets/GuestConnection.js';
1✔
11
import AuthedConnection from './sockets/AuthedConnection.js';
1✔
12
import LostConnection from './sockets/LostConnection.js';
1✔
13
import { serializeUser } from './utils/serialize.js';
1✔
14
import { ulid, encodeTime } from 'ulid';
1✔
15
import { jsonb } from './utils/sqlite.js';
1✔
16
import { subMinutes } from 'date-fns';
1✔
17

1✔
18
const { isEmpty } = lodash;
1✔
19

1✔
20
export const REDIS_ACTIVE_SESSIONS = 'users';
1✔
21

1✔
22
const PING_INTERVAL = 10_000;
1✔
23
const GUEST_COUNT_INTERVAL = 2_000;
1✔
24

1✔
25
/**
1✔
26
 * @typedef {import('./schema.js').User} User
1✔
27
 */
1✔
28

1✔
29
/**
1✔
30
 * @typedef {GuestConnection | AuthedConnection | LostConnection} Connection
1✔
31
 */
1✔
32

1✔
33
/**
1✔
34
 * @typedef {object} ClientActionParameters
1✔
35
 * @prop {string} sendChat
1✔
36
 * @prop {-1 | 1} vote
1✔
37
 * @prop {undefined} logout
1✔
38
 */
1✔
39

1✔
40
/**
1✔
41
 * @typedef {{
1✔
42
 *   [Name in keyof ClientActionParameters]: (
1✔
43
 *     user: User,
1✔
44
 *     parameter: ClientActionParameters[Name],
1✔
45
 *     connection: AuthedConnection
1✔
46
 *   ) => void
1✔
47
 * }} ClientActions
1✔
48
 */
1✔
49

1✔
50
const ajv = new Ajv({
1✔
51
  coerceTypes: false,
1✔
52
  ownProperties: true,
1✔
53
  removeAdditional: true,
1✔
54
  useDefaults: false,
1✔
55
});
1✔
56

1✔
57
/**
 * Check whether `property` is an own (non-inherited) key of `object`.
 *
 * Goes through `Object.prototype.hasOwnProperty` so that objects which shadow
 * or lack `hasOwnProperty` are handled correctly.
 *
 * @template {object} T
 * @param {T} object
 * @param {PropertyKey} property
 * @returns {property is keyof T}
 */
function has(object, property) {
  return Reflect.apply(Object.prototype.hasOwnProperty, object, [property]);
}
237✔
66

1✔
67
class SocketServer {
1✔
68
  /**
154✔
69
   * @param {import('./Uwave.js').Boot} uw
154✔
70
   * @param {{ secret: Buffer|string }} options
154✔
71
   */
154✔
72
  static async plugin(uw, options) {
154✔
73
    uw.socketServer = new SocketServer(uw, {
154✔
74
      secret: options.secret,
154✔
75
      server: uw.server,
154✔
76
    });
154✔
77

154✔
78
    uw.after(async () => {
154✔
79
      try {
154✔
80
        await uw.socketServer.initLostConnections();
154✔
81
      } catch (err) {
154!
82
        // No need to prevent startup for this.
×
83
        // We do need to clear the `users` list because the lost connection handlers
×
84
        // will not do so.
×
85
        uw.socketServer.#logger.warn({ err }, 'could not initialise lost connections');
×
86
        await uw.redis.del(REDIS_ACTIVE_SESSIONS);
×
87
      }
×
88
    });
154✔
89

154✔
90
    uw.onClose(async () => {
154✔
91
      await uw.socketServer.destroy();
154✔
92
    });
154✔
93
  }
154✔
94

154✔
95
  #uw;
154✔
96

154✔
97
  #logger;
154✔
98

154✔
99
  #redisSubscription;
154✔
100

154✔
101
  #wss;
154✔
102

154✔
103
  #closing = false;
154✔
104

154✔
105
  /** @type {Connection[]} */
154✔
106
  #connections = [];
154✔
107

154✔
108
  #pinger;
154✔
109

154✔
110
  /** Update online guests count and broadcast an update if necessary. */
154✔
111
  #guestCountInterval;
154✔
112

154✔
113
  #guestCountDirty = true;
154✔
114

154✔
115
  /**
154✔
116
   * Handlers for commands that come in from clients.
154✔
117
   *
154✔
118
   * @type {ClientActions}
154✔
119
   */
154✔
120
  #clientActions;
154✔
121

154✔
122
  /**
154✔
123
   * @type {{
154✔
124
   *   [K in keyof ClientActionParameters]:
154✔
125
   *     import('ajv').ValidateFunction<ClientActionParameters[K]>
154✔
126
   * }}
154✔
127
   */
154✔
128
  #clientActionSchemas;
154✔
129

154✔
130
  /**
154✔
131
   * Handlers for commands that come in from the server side.
154✔
132
   *
154✔
133
   * @type {import('./redisMessages.js').ServerActions}
154✔
134
   */
154✔
135
  #serverActions;
154✔
136

154✔
137
  /**
154✔
138
   * Create a socket server.
154✔
139
   *
154✔
140
   * @param {import('./Uwave.js').Boot} uw üWave Core instance.
154✔
141
   * @param {object} options Socket server options.
154✔
142
   * @param {number} [options.timeout] Time in seconds to wait for disconnected
154✔
143
   *     users to reconnect before removing them.
154✔
144
   * @param {Buffer|string} options.secret
154✔
145
   * @param {import('http').Server | import('https').Server} [options.server]
154✔
146
   * @param {number} [options.port]
154✔
147
   * @private
154✔
148
   */
154✔
149
  constructor(uw, options) {
154✔
150
    if (!uw) {
154!
151
      throw new TypeError(`The "uw" argument must be of type UwaveServer. Received ${typeof uw}`);
×
152
    }
×
153

154✔
154
    this.#uw = uw;
154✔
155
    this.#logger = uw.logger.child({ ns: 'uwave:sockets' }, {
154✔
156
      serializers: {
154✔
157
        req: stdSerializers.req,
154✔
158
      },
154✔
159
    });
154✔
160
    this.#redisSubscription = uw.redis.duplicate();
154✔
161

154✔
162
    this.options = {
154✔
163
      /** @type {(_socket: import('ws').WebSocket | undefined, err: Error) => void} */
154✔
164
      onError: (_socket, err) => {
154✔
165
        throw err;
×
166
      },
154✔
167
      timeout: 30,
154✔
168
      ...options,
154✔
169
    };
154✔
170

154✔
171
    // TODO put this behind a symbol, it's just public for tests
154✔
172
    this.authRegistry = new AuthRegistry(uw.redis);
154✔
173

154✔
174
    this.#wss = new WebSocketServer({
154✔
175
      server: options.server,
154✔
176
      port: options.server ? undefined : options.port,
154!
177
    });
154✔
178

154✔
179
    uw.use(() => this.#redisSubscription.subscribe('uwave'));
154✔
180
    this.#redisSubscription.on('message', (channel, command) => {
154✔
181
      // this returns a promise, but we don't handle the error case:
230✔
182
      // there is not much we can do, so just let node.js crash w/ an unhandled rejection
230✔
183
      this.onServerMessage(channel, command);
230✔
184
    });
154✔
185

154✔
186
    this.#wss.on('error', (error) => {
154✔
187
      this.onError(error);
×
188
    });
154✔
189
    this.#wss.on('connection', (socket, request) => {
154✔
190
      this.onSocketConnected(socket, request);
25✔
191
    });
154✔
192

154✔
193
    this.#pinger = setInterval(() => {
154✔
194
      this.ping();
×
195
    }, PING_INTERVAL);
154✔
196

154✔
197
    this.#guestCountInterval = setInterval(() => {
154✔
198
      if (!this.#guestCountDirty) {
×
199
        return;
×
200
      }
×
201

×
202
      this.#recountGuests().catch((error) => {
×
203
        this.#logger.error({ err: error }, 'counting guests failed');
×
204
      });
×
205
    }, GUEST_COUNT_INTERVAL);
154✔
206

154✔
207
    this.#clientActions = {
154✔
208
      sendChat: (user, message) => {
154✔
209
        this.#logger.trace({ user, message }, 'sendChat');
7✔
210
        this.#uw.chat.send(user, message);
7✔
211
      },
154✔
212
      vote: (user, direction) => {
154✔
213
        socketVote(this.#uw, user.id, direction);
×
214
      },
154✔
215
      logout: (user, _, connection) => {
154✔
216
        this.replace(connection, this.createGuestConnection(connection.socket));
×
217
        if (!this.connection(user)) {
×
218
          disconnectUser(this.#uw, user.id);
×
219
        }
×
220
      },
154✔
221
    };
154✔
222

154✔
223
    this.#clientActionSchemas = {
154✔
224
      sendChat: ajv.compile({
154✔
225
        type: 'string',
154✔
226
      }),
154✔
227
      vote: ajv.compile({
154✔
228
        type: 'integer',
154✔
229
        enum: [-1, 1],
154✔
230
      }),
154✔
231
      logout: ajv.compile(true),
154✔
232
    };
154✔
233

154✔
234
    this.#serverActions = {
154✔
235
      /**
154✔
236
       * Broadcast the next track.
154✔
237
       */
154✔
238
      'advance:complete': (next) => {
154✔
239
        if (next) {
18✔
240
          this.broadcast('advance', {
18✔
241
            historyID: next.historyID,
18✔
242
            userID: next.userID,
18✔
243
            media: next.media,
18✔
244
            playedAt: new Date(next.playedAt).getTime(),
18✔
245
          });
18✔
246
        } else {
18!
247
          this.broadcast('advance', null);
×
248
        }
×
249
      },
154✔
250
      /**
154✔
251
       * Broadcast a skip notification.
154✔
252
       */
154✔
253
      'booth:skip': ({ moderatorID, userID, reason }) => {
154✔
254
        this.broadcast('skip', { moderatorID, userID, reason });
×
255
      },
154✔
256
      /**
154✔
257
       * Broadcast a chat message.
154✔
258
       */
154✔
259
      'chat:message': (message) => {
154✔
260
        this.broadcast('chatMessage', message);
6✔
261
      },
154✔
262
      /**
154✔
263
       * Delete chat messages. The delete filter can have an _id property to
154✔
264
       * delete a specific message, a userID property to delete messages by a
154✔
265
       * user, or be empty to delete all messages.
154✔
266
       */
154✔
267
      'chat:delete': ({ moderatorID, filter }) => {
154✔
268
        if ('id' in filter) {
6✔
269
          this.broadcast('chatDeleteByID', {
2✔
270
            moderatorID,
2✔
271
            _id: filter.id,
2✔
272
          });
2✔
273
        } else if ('userID' in filter) {
6✔
274
          this.broadcast('chatDeleteByUser', {
2✔
275
            moderatorID,
2✔
276
            userID: filter.userID,
2✔
277
          });
2✔
278
        } else if (isEmpty(filter)) {
2✔
279
          this.broadcast('chatDelete', { moderatorID });
2✔
280
        }
2✔
281
      },
154✔
282
      /**
154✔
283
       * Broadcast that a user was muted in chat.
154✔
284
       */
154✔
285
      'chat:mute': ({ moderatorID, userID, duration }) => {
154✔
286
        this.broadcast('chatMute', {
1✔
287
          userID,
1✔
288
          moderatorID,
1✔
289
          expiresAt: Date.now() + duration,
1✔
290
        });
1✔
291
      },
154✔
292
      /**
154✔
293
       * Broadcast that a user was unmuted in chat.
154✔
294
       */
154✔
295
      'chat:unmute': ({ moderatorID, userID }) => {
154✔
296
        this.broadcast('chatUnmute', { userID, moderatorID });
×
297
      },
154✔
298
      /**
154✔
299
       * Broadcast a vote for the current track.
154✔
300
       */
154✔
301
      'booth:vote': ({ userID, direction }) => {
154✔
302
        this.broadcast('vote', {
2✔
303
          _id: userID,
2✔
304
          value: direction,
2✔
305
        });
2✔
306
      },
154✔
307
      /**
154✔
308
       * Broadcast a favorite for the current track.
154✔
309
       */
154✔
310
      'booth:favorite': ({ userID }) => {
154✔
311
        this.broadcast('favorite', { userID });
1✔
312
      },
154✔
313
      /**
154✔
314
       * Cycle a single user's playlist.
154✔
315
       */
154✔
316
      'playlist:cycle': ({ userID, playlistID }) => {
154✔
317
        this.sendTo(userID, 'playlistCycle', { playlistID });
18✔
318
      },
154✔
319
      /**
154✔
320
       * Broadcast that a user joined the waitlist.
154✔
321
       */
154✔
322
      'waitlist:join': ({ userID, waitlist }) => {
154✔
323
        this.broadcast('waitlistJoin', { userID, waitlist });
31✔
324
      },
154✔
325
      /**
154✔
326
       * Broadcast that a user left the waitlist.
154✔
327
       */
154✔
328
      'waitlist:leave': ({ userID, waitlist }) => {
154✔
329
        this.broadcast('waitlistLeave', { userID, waitlist });
1✔
330
      },
154✔
331
      /**
154✔
332
       * Broadcast that a user was added to the waitlist.
154✔
333
       */
154✔
334
      'waitlist:add': ({
154✔
335
        userID, moderatorID, position, waitlist,
1✔
336
      }) => {
1✔
337
        this.broadcast('waitlistAdd', {
1✔
338
          userID, moderatorID, position, waitlist,
1✔
339
        });
1✔
340
      },
154✔
341
      /**
154✔
342
       * Broadcast that a user was removed from the waitlist.
154✔
343
       */
154✔
344
      'waitlist:remove': ({ userID, moderatorID, waitlist }) => {
154✔
345
        this.broadcast('waitlistRemove', { userID, moderatorID, waitlist });
×
346
      },
154✔
347
      /**
154✔
348
       * Broadcast that a user was moved in the waitlist.
154✔
349
       */
154✔
350
      'waitlist:move': ({
154✔
351
        userID, moderatorID, position, waitlist,
3✔
352
      }) => {
3✔
353
        this.broadcast('waitlistMove', {
3✔
354
          userID, moderatorID, position, waitlist,
3✔
355
        });
3✔
356
      },
154✔
357
      /**
154✔
358
       * Broadcast a waitlist update.
154✔
359
       */
154✔
360
      'waitlist:update': (waitlist) => {
154✔
361
        this.broadcast('waitlistUpdate', waitlist);
18✔
362
      },
154✔
363
      /**
154✔
364
       * Broadcast that the waitlist was cleared.
154✔
365
       */
154✔
366
      'waitlist:clear': ({ moderatorID }) => {
154✔
367
        this.broadcast('waitlistClear', { moderatorID });
×
368
      },
154✔
369
      /**
154✔
370
       * Broadcast that the waitlist was locked.
154✔
371
       */
154✔
372
      'waitlist:lock': ({ moderatorID, locked }) => {
154✔
373
        this.broadcast('waitlistLock', { moderatorID, locked });
6✔
374
      },
154✔
375

154✔
376
      'acl:allow': ({ userID, roles }) => {
154✔
377
        this.broadcast('acl:allow', { userID, roles });
66✔
378
      },
154✔
379
      'acl:disallow': ({ userID, roles }) => {
154✔
380
        this.broadcast('acl:disallow', { userID, roles });
2✔
381
      },
154✔
382

154✔
383
      'user:update': ({ userID, moderatorID, new: update }) => {
154✔
384
        // TODO Remove this remnant of the old roles system
3✔
385
        if ('role' in update) {
3!
386
          this.broadcast('roleChange', {
×
387
            moderatorID,
×
388
            userID,
×
389
            role: update.role,
×
390
          });
×
391
        }
×
392
        if ('username' in update) {
3✔
393
          this.broadcast('nameChange', {
3✔
394
            moderatorID,
3✔
395
            userID,
3✔
396
            username: update.username,
3✔
397
          });
3✔
398
        }
3✔
399
      },
154✔
400
      'user:join': async ({ userID }) => {
154✔
401
        const { users, redis } = this.#uw;
24✔
402
        const user = await users.getUser(userID);
24✔
403
        if (user) {
24✔
404
          // TODO this should not be the socket server code's responsibility
24✔
405
          await redis.rpush(REDIS_ACTIVE_SESSIONS, user.id);
24✔
406
          this.broadcast('join', serializeUser(user));
24✔
407
        }
24✔
408
      },
154✔
409
      /**
154✔
410
       * Broadcast that a user left the server.
154✔
411
       */
154✔
412
      'user:leave': ({ userID }) => {
154✔
413
        this.broadcast('leave', userID);
×
414
      },
154✔
415
      /**
154✔
416
       * Broadcast a ban event.
154✔
417
       */
154✔
418
      'user:ban': ({
154✔
419
        moderatorID, userID, permanent = false, duration, expiresAt,
3✔
420
      }) => {
3✔
421
        this.broadcast('ban', {
3✔
422
          moderatorID, userID, permanent, duration, expiresAt,
3✔
423
        });
3✔
424

3✔
425
        this.#connections.forEach((connection) => {
3✔
426
          if (connection instanceof AuthedConnection && connection.user.id === userID) {
×
427
            connection.ban();
×
428
          } else if (connection instanceof LostConnection && connection.user.id === userID) {
×
429
            connection.close();
×
430
          }
×
431
        });
3✔
432
      },
154✔
433
      /**
154✔
434
       * Broadcast an unban event.
154✔
435
       */
154✔
436
      'user:unban': ({ moderatorID, userID }) => {
154✔
437
        this.broadcast('unban', { moderatorID, userID });
1✔
438
      },
154✔
439
      /**
154✔
440
       * Force-close a connection.
154✔
441
       */
154✔
442
      'http-api:socket:close': (userID) => {
154✔
443
        this.#connections.forEach((connection) => {
×
444
          if ('user' in connection && connection.user.id === userID) {
×
445
            connection.close();
×
446
          }
×
447
        });
×
448
      },
154✔
449

154✔
450
      'emotes:reload': () => {
154✔
451
        this.broadcast('reloadEmotes', null);
×
452
      },
154✔
453
    };
154✔
454
  }
154✔
455

154✔
456
  /**
154✔
457
   * Create `LostConnection`s for every user that's known to be online, but that
154✔
458
   * is not currently connected to the socket server.
154✔
459
   *
154✔
460
   * @private
154✔
461
   */
154✔
462
  async initLostConnections() {
154✔
463
    const { db, redis } = this.#uw;
154✔
464
    const userIDs = /** @type {import('./schema').UserID[]} */ (
154✔
465
      await redis.lrange(REDIS_ACTIVE_SESSIONS, 0, -1)
154✔
466
    );
154✔
467
    const disconnectedIDs = userIDs.filter((userID) => !this.connection(userID));
154✔
468

154✔
469
    if (disconnectedIDs.length === 0) {
154✔
470
      return;
154✔
471
    }
154✔
472

×
473
    const disconnectedUsers = await db.selectFrom('users')
×
474
      .where('id', 'in', disconnectedIDs)
×
475
      .selectAll()
×
476
      .execute();
×
477
    disconnectedUsers.forEach((user) => {
×
NEW
478
      this.add(this.createLostConnection(user, 'TODO: Actual session ID!!', null));
×
479
    });
×
480
  }
154✔
481

154✔
482
  /**
154✔
483
   * @param {import('ws').WebSocket} socket
154✔
484
   * @param {import('http').IncomingMessage} request
154✔
485
   * @private
154✔
486
   */
154✔
487
  onSocketConnected(socket, request) {
154✔
488
    this.#logger.info({ req: request }, 'new connection');
25✔
489

25✔
490
    socket.on('error', (error) => {
25✔
491
      this.onSocketError(socket, error);
×
492
    });
25✔
493
    this.add(this.createGuestConnection(socket));
25✔
494
  }
25✔
495

154✔
496
  /**
154✔
497
   * @param {import('ws').WebSocket} socket
154✔
498
   * @param {Error} error
154✔
499
   * @private
154✔
500
   */
154✔
501
  onSocketError(socket, error) {
154✔
502
    this.#logger.warn({ err: error }, 'socket error');
×
503

×
504
    this.options.onError(socket, error);
×
505
  }
×
506

154✔
507
  /**
154✔
508
   * @param {Error} error
154✔
509
   * @private
154✔
510
   */
154✔
511
  onError(error) {
154✔
512
    this.#logger.error({ err: error }, 'server error');
×
513

×
514
    this.options.onError(undefined, error);
×
515
  }
×
516

154✔
517
  /**
154✔
518
   * Get a LostConnection for a user, if one exists.
154✔
519
   *
154✔
520
   * @param {User} user
154✔
521
   * @private
154✔
522
   */
154✔
523
  getLostConnection(user) {
154✔
524
    return this.#connections.find((connection) => (
1✔
525
      connection instanceof LostConnection && connection.user.id === user.id
3✔
526
    ));
1✔
527
  }
1✔
528

154✔
529
  /**
154✔
530
   * Create a connection instance for an unauthenticated user.
154✔
531
   *
154✔
532
   * @param {import('ws').WebSocket} socket
154✔
533
   * @private
154✔
534
   */
154✔
535
  createGuestConnection(socket) {
154✔
536
    const connection = new GuestConnection(this.#uw, socket, {
25✔
537
      authRegistry: this.authRegistry,
25✔
538
    });
25✔
539
    connection.on('close', () => {
25✔
540
      this.remove(connection);
×
541
    });
25✔
542
    connection.on('authenticate', async (user, sessionID) => {
25✔
543
      const isReconnect = await connection.isReconnect(sessionID);
25✔
544
      this.#logger.info({ userId: user.id, isReconnect }, 'authenticated socket');
25✔
545
      if (isReconnect) {
25✔
546
        const previousConnection = this.getLostConnection(sessionID);
1✔
547
        if (previousConnection) this.remove(previousConnection);
1!
548
      }
1✔
549

25✔
550
      this.replace(connection, this.createAuthedConnection(socket, user, sessionID));
25✔
551

25✔
552
      if (!isReconnect) {
25✔
553
        this.#uw.publish('user:join', { userID: user.id });
24✔
554
      }
24✔
555
    });
25✔
556
    return connection;
25✔
557
  }
25✔
558

154✔
559
  /**
154✔
560
   * Create a connection instance for an authenticated user.
154✔
561
   *
154✔
562
   * @param {import('ws').WebSocket} socket
154✔
563
   * @param {User} user
154✔
564
   * @param {string} sessionID
154✔
565
   * @returns {AuthedConnection}
154✔
566
   * @private
154✔
567
   */
154✔
568
  createAuthedConnection(socket, user, sessionID) {
154✔
569
    const connection = new AuthedConnection(this.#uw, socket, user, sessionID);
25✔
570
    connection.on('close', ({ banned, lastEventID }) => {
25✔
571
      if (banned) {
25!
572
        this.#logger.info({ userId: user.id }, 'removing connection after ban');
×
573
        disconnectUser(this.#uw, user.id);
×
574
      } else if (!this.#closing) {
25✔
575
        this.#logger.info({ userId: user.id }, 'lost connection');
2✔
576
        this.add(this.createLostConnection(user, sessionID, lastEventID));
2✔
577
      }
2✔
578
      this.remove(connection);
25✔
579
    });
25✔
580
    connection.on(
25✔
581
      'command',
25✔
582
      /**
25✔
583
       * @param {string} command
25✔
584
       * @param {import('type-fest').JsonValue} data
25✔
585
       */
25✔
586
      (command, data) => {
25✔
587
        this.#logger.trace({ userId: user.id, command, data }, 'command');
7✔
588
        if (has(this.#clientActions, command)) {
7✔
589
          // Ignore incorrect input
7✔
590
          const validate = this.#clientActionSchemas[command];
7✔
591
          if (validate && !validate(data)) {
7!
UNCOV
592
            return;
×
593
          }
×
594

7✔
595
          const action = this.#clientActions[command];
7✔
596
          // @ts-expect-error TS2345 `data` is validated
7✔
597
          action(user, data, connection);
7✔
598
        }
7✔
599
      },
25✔
600
    );
25✔
601
    return connection;
25✔
602
  }
25✔
603

154✔
604
  /**
154✔
605
   * Create a connection instance for a user who disconnected.
154✔
606
   *
154✔
607
   * @param {User} user
154✔
608
   * @param {string} sessionID
154✔
609
   * @param {string|null} lastEventID
154✔
610
   * @returns {LostConnection}
154✔
611
   * @private
154✔
612
   */
154✔
613
  createLostConnection(user, sessionID, lastEventID) {
154✔
614
    const connection = new LostConnection(
2✔
615
      this.#uw,
2✔
616
      user,
2✔
617
      sessionID,
2✔
618
      lastEventID,
2✔
619
      this.options.timeout,
2✔
620
    );
2✔
621
    connection.on('close', () => {
2✔
622
      this.#logger.info({ userId: user.id }, 'user left');
1✔
623
      this.remove(connection);
1✔
624
      // Only register that the user left if they didn't have another connection
1✔
625
      // still open.
1✔
626
      if (!this.connection(user)) {
1!
UNCOV
627
        disconnectUser(this.#uw, user.id);
×
628
      }
×
629
    });
2✔
630
    return connection;
2✔
631
  }
2✔
632

154✔
633
  /**
154✔
634
   * Add a connection.
154✔
635
   *
154✔
636
   * @param {Connection} connection
154✔
637
   * @private
154✔
638
   */
154✔
639
  add(connection) {
154✔
640
    const userID = 'user' in connection ? connection.user.id : null;
52✔
641
    const sessionID = 'sessionID' in connection ? connection.sessionID : null;
52✔
642
    this.#logger.trace({ type: connection.constructor.name, userID, sessionID }, 'add connection');
52✔
643

52✔
644
    this.#connections.push(connection);
52✔
645
    this.#guestCountDirty = true;
52✔
646
  }
52✔
647

154✔
648
  /**
154✔
649
   * Remove a connection.
154✔
650
   *
154✔
651
   * @param {Connection} connection
154✔
652
   * @private
154✔
653
   */
154✔
654
  remove(connection) {
154✔
655
    const userID = 'user' in connection ? connection.user.id : null;
51✔
656
    const sessionID = 'sessionID' in connection ? connection.sessionID : null;
51✔
657
    this.#logger.trace({ type: connection.constructor.name, userID, sessionID }, 'remove connection');
51✔
658

51✔
659
    const i = this.#connections.indexOf(connection);
51✔
660
    this.#connections.splice(i, 1);
51✔
661

51✔
662
    connection.removed();
51✔
663
    this.#guestCountDirty = true;
51✔
664
  }
51✔
665

154✔
666
  /**
154✔
667
   * Replace a connection instance with another connection instance. Useful when
154✔
668
   * a connection changes "type", like GuestConnection → AuthedConnection.
154✔
669
   *
154✔
670
   * @param {Connection} oldConnection
154✔
671
   * @param {Connection} newConnection
154✔
672
   * @private
154✔
673
   */
154✔
674
  replace(oldConnection, newConnection) {
154✔
675
    this.remove(oldConnection);
25✔
676
    this.add(newConnection);
25✔
677
  }
25✔
678

154✔
679
  /**
154✔
680
   * Handle command messages coming in from Redis.
154✔
681
   * Some commands are intended to broadcast immediately to all connected
154✔
682
   * clients, but others require special action.
154✔
683
   *
154✔
684
   * @param {string} channel
154✔
685
   * @param {string} rawCommand
154✔
686
   * @returns {Promise<void>}
154✔
687
   * @private
154✔
688
   */
154✔
689
  async onServerMessage(channel, rawCommand) {
154✔
690
    /**
230✔
691
     * @type {{ command: string, data: import('type-fest').JsonValue }|undefined}
230✔
692
     */
230✔
693
    const json = sjson.safeParse(rawCommand);
230✔
694
    if (!json) {
230!
UNCOV
695
      return;
×
696
    }
×
697
    const { command, data } = json;
230✔
698

230✔
699
    this.#logger.trace({ channel, command, data }, 'server message');
230✔
700

230✔
701
    if (has(this.#serverActions, command)) {
230✔
702
      const action = this.#serverActions[command];
211✔
703
      if (action !== undefined) { // the types for `ServerActions` allow undefined, so...
211✔
704
        // @ts-expect-error TS2345 `data` is validated
211✔
705
        action(data);
211✔
706
      }
211✔
707
    }
211✔
708
  }
230✔
709

154✔
710
  /**
154✔
711
   * Stop the socket server.
154✔
712
   *
154✔
713
   * @returns {Promise<void>}
154✔
714
   */
154✔
715
  async destroy() {
154✔
716
    clearInterval(this.#pinger);
154✔
717

154✔
718
    this.#closing = true;
154✔
719
    clearInterval(this.#guestCountInterval);
154✔
720

154✔
721
    for (const connection of this.#connections) {
154✔
722
      connection.close();
24✔
723
    }
24✔
724

154✔
725
    const closeWsServer = promisify(this.#wss.close.bind(this.#wss));
154✔
726
    await closeWsServer();
154✔
727
    await this.#redisSubscription.quit();
154✔
728
  }
154✔
729

154✔
730
  /**
154✔
731
   * Get the connection instance for a specific user.
154✔
732
   *
154✔
733
   * @param {User|import('./schema.js').UserID} user The user.
154✔
734
   * @returns {Connection|undefined}
154✔
735
   */
154✔
736
  connection(user) {
154✔
737
    const userID = typeof user === 'object' ? user.id : user;
1!
738
    return this.#connections.find((connection) => 'user' in connection && connection.user.id === userID);
1✔
739
  }
1✔
740

154✔
741
  ping() {
154✔
UNCOV
742
    this.#connections.forEach((connection) => {
×
743
      connection.ping();
×
744
    });
×
NEW
745

×
NEW
746
    this.#cleanupMessageQueue().catch((err) => {
×
NEW
747
      this.#logger.error({ err }, 'failed to clean up socket message queue');
×
NEW
748
    });
×
NEW
749
  }
×
750

154✔
751
  async #cleanupMessageQueue() {
154✔
NEW
752
    const oldestID = encodeTime(subMinutes(new Date(), 10).getTime());
×
NEW
753

×
NEW
754
    await this.#uw.db.deleteFrom('socketMessageQueue')
×
NEW
755
      .where('id', '<', oldestID)
×
NEW
756
      .execute();
×
NEW
757
  }
×
758

154✔
759
  /**
154✔
760
   * Broadcast a command to all connected clients.
154✔
761
   *
154✔
762
   * @param {string} command Command name.
154✔
763
   * @param {import('type-fest').JsonValue} data Command data.
154✔
764
   * @param {import('./schema.js').UserID | null} targetUserID
154✔
765
   */
154✔
766
  #recordMessage(command, data, targetUserID = null) {
154✔
767
    const id = ulid();
211✔
768

211✔
769
    this.#uw.db.insertInto('socketMessageQueue')
211✔
770
      .values({
211✔
771
        id,
211✔
772
        command,
211✔
773
        data: jsonb(data),
211✔
774
        targetUserID,
211✔
775
      })
211✔
776
      .execute();
211✔
777

211✔
778
    return id;
211✔
779
  }
211✔
780

154✔
781
  /**
154✔
782
   * Broadcast a command to all connected clients.
154✔
783
   *
154✔
784
   * @param {string} command Command name.
154✔
785
   * @param {import('type-fest').JsonValue} data Command data.
154✔
786
   */
154✔
787
  broadcast(command, data) {
154✔
788
    const id = this.#recordMessage(command, data);
193✔
789

193✔
790
    this.#logger.trace({
193✔
791
      id,
193✔
792
      command,
193✔
793
      data,
193✔
794
      to: this.#connections.map((connection) => (
193✔
795
        'user' in connection ? connection.user.id : null
109!
796
      )),
193✔
797
    }, 'broadcast');
193✔
798

193✔
799
    this.#connections.forEach((connection) => {
193✔
800
      connection.send(id, command, data);
109✔
801
    });
193✔
802
  }
193✔
803

154✔
804
  /**
154✔
805
   * Send a command to a single user.
154✔
806
   *
154✔
807
   * @param {User|import('./schema.js').UserID} user User or user ID to send the command to.
154✔
808
   * @param {string} command Command name.
154✔
809
   * @param {import('type-fest').JsonValue} data Command data.
154✔
810
   */
154✔
811
  sendTo(user, command, data) {
154✔
812
    const userID = typeof user === 'object' ? user.id : user;
18!
813
    const id = this.#recordMessage(command, data, userID);
18✔
814

18✔
815
    this.#connections.forEach((connection) => {
18✔
816
      if ('user' in connection && connection.user.id === userID) {
16✔
817
        connection.send(id, command, data);
13✔
818
      }
13✔
819
    });
18✔
820
  }
18✔
821

154✔
822
  async getGuestCount() {
154✔
UNCOV
823
    const { redis } = this.#uw;
×
824
    const rawCount = await redis.get('http-api:guests');
×
825
    if (typeof rawCount !== 'string' || !/^\d+$/.test(rawCount)) {
×
826
      return 0;
×
827
    }
×
828
    return parseInt(rawCount, 10);
×
829
  }
×
830

154✔
831
  async #recountGuests() {
154✔
UNCOV
832
    const { redis } = this.#uw;
×
833
    const guests = this.#connections
×
834
      .filter((connection) => connection instanceof GuestConnection)
×
835
      .length;
×
836

×
837
    const lastGuestCount = await this.getGuestCount();
×
838
    if (guests !== lastGuestCount) {
×
839
      await redis.set('http-api:guests', guests);
×
840
      this.broadcast('guests', guests);
×
841
    }
×
842
  }
×
843
}
154✔
844

1✔
845
export default SocketServer;
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc