• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

u-wave / core / 19337281714

13 Nov 2025 03:48PM UTC coverage: 85.319% (+0.2%) from 85.077%
19337281714

push

github

web-flow
Store waitlist and booth state in SQLite instead of Redis (#657)

954 of 1135 branches covered (84.05%)

Branch coverage included in aggregate %.

194 of 230 new or added lines in 7 files covered. (84.35%)

3 existing lines in 2 files now uncovered.

10047 of 11759 relevant lines covered (85.44%)

96.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.55
/src/plugins/booth.js
1
import RedLock from 'redlock';
1✔
2
import { sql } from 'kysely';
1✔
3
import { EmptyPlaylistError, PlaylistItemNotFoundError } from '../errors/index.js';
1✔
4
import routes from '../routes/booth.js';
1✔
5
import { randomUUID } from 'node:crypto';
1✔
6
import { fromJson, jsonb, jsonGroupArray } from '../utils/sqlite.js';
1✔
7

1✔
8
/**
1✔
9
 * @typedef {import('../schema.js').UserID} UserID
1✔
10
 * @typedef {import('../schema.js').HistoryEntryID} HistoryEntryID
1✔
11
 * @typedef {import('type-fest').JsonObject} JsonObject
1✔
12
 * @typedef {import('../schema.js').User} User
1✔
13
 * @typedef {import('../schema.js').Playlist} Playlist
1✔
14
 * @typedef {import('../schema.js').PlaylistItem} PlaylistItem
1✔
15
 * @typedef {import('../schema.js').HistoryEntry} HistoryEntry
1✔
16
 * @typedef {Omit<import('../schema.js').Media, 'createdAt' | 'updatedAt'>} Media
1✔
17
 */
1✔
18

1✔
19
/** Resource name locked through Redlock while an advance is in progress. */
const REDIS_ADVANCING = 'booth:advancing';

/** Key-value store key holding the ID of the history entry that is currently playing. */
const KEY_HISTORY_ID = 'booth:historyID';

/** Key-value store key holding the user ID of the current DJ. */
const KEY_CURRENT_DJ_ID = 'booth:currentDJ';

/** Key-value store key set to `true` when the current DJ asked to leave after their play. */
const KEY_REMOVE_AFTER_CURRENT_PLAY = 'booth:removeAfterCurrentPlay';
1✔
23

1✔
24
class Booth {
1✔
25
  #uw;
149✔
26

149✔
27
  #logger;
149✔
28

149✔
29
  /** @type {ReturnType<typeof setTimeout>|null} */
149✔
30
  #timeout = null;
149✔
31

149✔
32
  #locker;
149✔
33

149✔
34
  /** @type {Promise<unknown>|null} */
149✔
35
  #awaitAdvance = null;
149✔
36

149✔
37
  /**
149✔
38
   * @param {import('../Uwave.js').Boot} uw
149✔
39
   */
149✔
40
  constructor(uw) {
149✔
41
    this.#uw = uw;
149✔
42
    this.#locker = new RedLock([this.#uw.redis]);
149✔
43
    this.#logger = uw.logger.child({ ns: 'uwave:booth' });
149✔
44
  }
149✔
45

149✔
46
  /** @internal */
149✔
47
  async onStart() {
149✔
48
    const current = await this.getCurrentEntry();
149✔
49
    if (current && this.#timeout === null) {
149!
50
      // Restart the advance timer after a server restart, if a track was
×
51
      // playing before the server restarted.
×
52
      const duration = (current.historyEntry.end - current.historyEntry.start) * 1000;
×
53
      const endTime = current.historyEntry.createdAt.getTime() + duration;
×
54
      if (endTime > Date.now()) {
×
55
        this.#timeout = setTimeout(
×
56
          () => this.#advanceAutomatically(),
×
57
          endTime - Date.now(),
×
58
        );
×
59
      } else {
×
60
        this.#advanceAutomatically();
×
61
      }
×
62
    }
×
63

149✔
64
    this.#uw.onClose(async () => {
149✔
65
      this.#onStop();
149✔
66
      await this.#awaitAdvance;
149✔
67
    });
149✔
68
  }
149✔
69

149✔
70
  async #advanceAutomatically() {
149✔
71
    try {
×
72
      await this.advance();
×
73
    } catch (error) {
×
74
      this.#logger.error({ err: error }, 'advance failed');
×
75
    }
×
76
  }
×
77

149✔
78
  #onStop() {
149✔
79
    this.#maybeStop();
149✔
80
  }
149✔
81

149✔
82
  async getCurrentEntry(tx = this.#uw.db) {
149✔
83
    const entry = await tx.selectFrom('keyval')
179✔
84
      .where('key', '=', KEY_HISTORY_ID)
179✔
85
      .innerJoin('historyEntries', (join) => join.on(
179✔
86
        (eb) => sql`${eb.ref('value')}->>'$'`,
179✔
87
        '=',
179✔
88
        (eb) => eb.ref('historyEntries.id'),
179✔
89
      ))
179✔
90
      .innerJoin('media', 'historyEntries.mediaID', 'media.id')
179✔
91
      .innerJoin('users', 'historyEntries.userID', 'users.id')
179✔
92
      .select([
179✔
93
        'historyEntries.id as id',
179✔
94
        'media.id as media.id',
179✔
95
        'media.sourceID as media.sourceID',
179✔
96
        'media.sourceType as media.sourceType',
179✔
97
        'media.sourceData as media.sourceData',
179✔
98
        'media.artist as media.artist',
179✔
99
        'media.title as media.title',
179✔
100
        'media.duration as media.duration',
179✔
101
        'media.thumbnail as media.thumbnail',
179✔
102
        'users.id as users.id',
179✔
103
        'users.username as users.username',
179✔
104
        'users.avatar as users.avatar',
179✔
105
        'users.createdAt as users.createdAt',
179✔
106
        'historyEntries.artist',
179✔
107
        'historyEntries.title',
179✔
108
        'historyEntries.start',
179✔
109
        'historyEntries.end',
179✔
110
        'historyEntries.createdAt',
179✔
111
        (eb) => eb.selectFrom('feedback')
179✔
112
          .where('historyEntryID', '=', eb.ref('historyEntries.id'))
179✔
113
          .where('vote', '=', 1)
179✔
114
          .select((eb) => jsonGroupArray(eb.ref('userID')).as('userIDs'))
179✔
115
          .as('upvotes'),
179✔
116
        (eb) => eb.selectFrom('feedback')
179✔
117
          .where('historyEntryID', '=', eb.ref('historyEntries.id'))
179✔
118
          .where('vote', '=', -1)
179✔
119
          .select((eb) => jsonGroupArray(eb.ref('userID')).as('userIDs'))
179✔
120
          .as('downvotes'),
179✔
121
        (eb) => eb.selectFrom('feedback')
179✔
122
          .where('historyEntryID', '=', eb.ref('historyEntries.id'))
179✔
123
          .where('favorite', '=', 1)
179✔
124
          .select((eb) => jsonGroupArray(eb.ref('userID')).as('userIDs'))
179✔
125
          .as('favorites'),
179✔
126
      ])
179✔
127
      .executeTakeFirst();
179✔
128

179✔
129
    return entry ? {
179✔
130
      media: {
7✔
131
        id: entry['media.id'],
7✔
132
        artist: entry['media.artist'],
7✔
133
        title: entry['media.title'],
7✔
134
        duration: entry['media.duration'],
7✔
135
        thumbnail: entry['media.thumbnail'],
7✔
136
        sourceID: entry['media.sourceID'],
7✔
137
        sourceType: entry['media.sourceType'],
7✔
138
        sourceData: entry['media.sourceData'] ?? {},
7✔
139
      },
7✔
140
      user: {
7✔
141
        id: entry['users.id'],
7✔
142
        username: entry['users.username'],
7✔
143
        avatar: entry['users.avatar'],
7✔
144
        createdAt: entry['users.createdAt'],
7✔
145
      },
7✔
146
      historyEntry: {
7✔
147
        id: entry.id,
7✔
148
        userID: entry['users.id'],
7✔
149
        mediaID: entry['media.id'],
7✔
150
        artist: entry.artist,
7✔
151
        title: entry.title,
7✔
152
        start: entry.start,
7✔
153
        end: entry.end,
7✔
154
        createdAt: entry.createdAt,
7✔
155
      },
7✔
156
      upvotes: entry.upvotes != null ? fromJson(entry.upvotes) : [],
7!
157
      downvotes: entry.downvotes != null ? fromJson(entry.downvotes) : [],
7!
158
      favorites: entry.favorites != null ? fromJson(entry.favorites) : [],
7!
159
    } : null;
179✔
160
  }
179✔
161

149✔
162
  /**
149✔
163
   * @param {{ remove?: boolean }} options
149✔
164
   */
149✔
165
  async #getNextDJ(options, tx = this.#uw.db) {
149✔
166
    const waitlist = await this.#uw.waitlist.getUserIDs();
18✔
167
    let userID = waitlist.at(0) ?? null;
18!
168
    if (!userID && !options.remove) {
18!
169
      // If the waitlist is empty, the current DJ will play again immediately.
×
NEW
170
      userID = /** @type {UserID|null} */ (await this.#uw.keyv.get(KEY_CURRENT_DJ_ID, tx));
×
171
    }
×
172
    if (!userID) {
18!
173
      return null;
×
174
    }
×
175

18✔
176
    return this.#uw.users.getUser(userID, tx);
18✔
177
  }
18✔
178

149✔
179
  /**
149✔
180
   * @param {{ remove?: boolean }} options
149✔
181
   */
149✔
182
  async #getNextEntry(options) {
149✔
183
    const { playlists } = this.#uw;
18✔
184

18✔
185
    const user = await this.#getNextDJ(options);
18✔
186
    if (!user || !user.activePlaylistID) {
18!
187
      return null;
×
188
    }
×
189
    const playlist = await playlists.getUserPlaylist(user, user.activePlaylistID);
18✔
190
    if (playlist.size === 0) {
18!
191
      throw new EmptyPlaylistError();
×
192
    }
×
193

18✔
194
    const { playlistItem, media } = await playlists.getPlaylistItemAt(playlist, 0);
18✔
195
    if (!playlistItem) {
18!
196
      throw new PlaylistItemNotFoundError();
×
197
    }
×
198

18✔
199
    return {
18✔
200
      user,
18✔
201
      playlist,
18✔
202
      playlistItem,
18✔
203
      media,
18✔
204
      historyEntry: {
18✔
205
        id: /** @type {HistoryEntryID} */ (randomUUID()),
18✔
206
        userID: user.id,
18✔
207
        mediaID: media.id,
18✔
208
        artist: playlistItem.artist,
18✔
209
        title: playlistItem.title,
18✔
210
        start: playlistItem.start,
18✔
211
        end: playlistItem.end,
18✔
212
        /** @type {null | JsonObject} */
18✔
213
        sourceData: null,
18✔
214
        createdAt: new Date(),
18✔
215
      },
18✔
216
    };
18✔
217
  }
18✔
218

149✔
219
  /**
149✔
220
   * @param {UserID|null} previous
149✔
221
   * @param {{ remove?: boolean }} options
149✔
222
   */
149✔
223
  async #cycleWaitlist(previous, options) {
149✔
224
    await this.#uw.waitlist.cycle(previous, options);
18✔
225
  }
18✔
226

149✔
227
  async clear(tx = this.#uw.db) {
149✔
NEW
228
    await this.#uw.keyv.delete(KEY_REMOVE_AFTER_CURRENT_PLAY, tx);
×
NEW
229
    await this.#uw.keyv.delete(KEY_HISTORY_ID, tx);
×
NEW
230
    await this.#uw.keyv.delete(KEY_CURRENT_DJ_ID, tx);
×
UNCOV
231
  }
×
232

149✔
233
  /**
149✔
234
   * @param {{ historyEntry: { id: HistoryEntryID }, user: { id: UserID } }} next
149✔
235
   */
149✔
236
  async #update(next, tx = this.#uw.db) {
149✔
237
    await this.#uw.keyv.delete(KEY_REMOVE_AFTER_CURRENT_PLAY, tx);
18✔
238
    await this.#uw.keyv.set(KEY_HISTORY_ID, next.historyEntry.id, tx);
18✔
239
    await this.#uw.keyv.set(KEY_CURRENT_DJ_ID, next.user.id, tx);
18✔
240
  }
18✔
241

149✔
242
  #maybeStop() {
149✔
243
    if (this.#timeout) {
167✔
244
      clearTimeout(this.#timeout);
18✔
245
      this.#timeout = null;
18✔
246
    }
18✔
247
  }
167✔
248

149✔
249
  /**
149✔
250
   * @param {Pick<HistoryEntry, 'start' | 'end'>} entry
149✔
251
   */
149✔
252
  #play(entry) {
149✔
253
    this.#maybeStop();
18✔
254
    this.#timeout = setTimeout(
18✔
255
      () => this.#advanceAutomatically(),
18✔
256
      (entry.end - entry.start) * 1000,
18✔
257
    );
18✔
258
  }
18✔
259

149✔
260
  /**
149✔
261
   * This method creates a `media` object that clients can understand from a
149✔
262
   * history entry object.
149✔
263
   *
149✔
264
   * We present the playback-specific `sourceData` as if it is
149✔
265
   * a property of the media model for backwards compatibility.
149✔
266
   * Old clients don't expect `sourceData` directly on a history entry object.
149✔
267
   *
149✔
268
   * @param {{ user: User, media: Media, historyEntry: HistoryEntry }} next
149✔
269
   */
149✔
270
  getMediaForPlayback(next) {
149✔
271
    return {
25✔
272
      artist: next.historyEntry.artist,
25✔
273
      title: next.historyEntry.title,
25✔
274
      start: next.historyEntry.start,
25✔
275
      end: next.historyEntry.end,
25✔
276
      media: {
25✔
277
        sourceType: next.media.sourceType,
25✔
278
        sourceID: next.media.sourceID,
25✔
279
        artist: next.media.artist,
25✔
280
        title: next.media.title,
25✔
281
        duration: next.media.duration,
25✔
282
        sourceData: {
25✔
283
          ...next.media.sourceData,
25✔
284
          ...next.historyEntry.sourceData,
25✔
285
        },
25✔
286
      },
25✔
287
    };
25✔
288
  }
25✔
289

149✔
290
  /**
149✔
291
   * @param {{
149✔
292
   *   user: User,
149✔
293
   *   playlist: Playlist,
149✔
294
   *   media: Media,
149✔
295
   *   historyEntry: HistoryEntry
149✔
296
   * } | null} next
149✔
297
   */
149✔
298
  async #publishAdvanceComplete(next) {
149✔
299
    const { waitlist } = this.#uw;
18✔
300

18✔
301
    if (next != null) {
18✔
302
      this.#uw.publish('advance:complete', {
18✔
303
        historyID: next.historyEntry.id,
18✔
304
        userID: next.user.id,
18✔
305
        playlistID: next.playlist.id,
18✔
306
        media: this.getMediaForPlayback(next),
18✔
307
        playedAt: next.historyEntry.createdAt.getTime(),
18✔
308
      });
18✔
309
      this.#uw.publish('playlist:cycle', {
18✔
310
        userID: next.user.id,
18✔
311
        playlistID: next.playlist.id,
18✔
312
      });
18✔
313
    } else {
18!
314
      this.#uw.publish('advance:complete', null);
×
315
    }
×
316
    this.#uw.publish('waitlist:update', await waitlist.getUserIDs());
18✔
317
  }
18✔
318

149✔
319
  /**
149✔
320
   * @param {{ user: User, media: { sourceID: string, sourceType: string } }} entry
149✔
321
   */
149✔
322
  async #getSourceDataForPlayback(entry) {
149✔
323
    const { sourceID, sourceType } = entry.media;
18✔
324
    const source = this.#uw.source(sourceType);
18✔
325
    if (source) {
18✔
326
      this.#logger.trace({ sourceType: source.type, sourceID }, 'running pre-play hook');
18✔
327
      /** @type {JsonObject | undefined} */
18✔
328
      let sourceData;
18✔
329
      try {
18✔
330
        sourceData = await source.play(entry.user, entry.media);
18✔
331
        this.#logger.trace({ sourceType: source.type, sourceID, sourceData }, 'pre-play hook result');
18✔
332
      } catch (error) {
18!
333
        this.#logger.error({ sourceType: source.type, sourceID, err: error }, 'pre-play hook failed');
×
334
      }
×
335
      return sourceData;
18✔
336
    }
18✔
337

×
338
    return undefined;
×
339
  }
18✔
340

149✔
341
  /**
149✔
342
   * @typedef {object} AdvanceOptions
149✔
343
   * @prop {boolean} [remove]
149✔
344
   * @prop {boolean} [publish]
149✔
345
   * @prop {import('redlock').RedlockAbortSignal} [signal]
149✔
346
   * @param {AdvanceOptions} [opts]
149✔
347
   * @returns {Promise<{
149✔
348
   *   historyEntry: HistoryEntry,
149✔
349
   *   user: User,
149✔
350
   *   media: Media,
149✔
351
   *   playlist: Playlist,
149✔
352
   * }|null>}
149✔
353
   */
149✔
354
  async #advanceLocked(opts = {}, tx = this.#uw.db) {
149✔
355
    const { playlists } = this.#uw;
18✔
356

18✔
357
    const publish = opts.publish ?? true;
18✔
358
    const removeAfterCurrent = (await this.#uw.keyv.delete(KEY_REMOVE_AFTER_CURRENT_PLAY)) === true;
18✔
359
    const remove = opts.remove || removeAfterCurrent || (
18✔
360
      !await this.#uw.waitlist.isCycleEnabled()
18✔
361
    );
18✔
362

18✔
363
    const previous = await this.getCurrentEntry(tx);
18✔
364
    let next;
18✔
365
    try {
18✔
366
      next = await this.#getNextEntry({ remove });
18✔
367
    } catch (err) {
18!
368
      // If the next user's playlist was empty, remove them from the waitlist
×
369
      // and try advancing again.
×
370
      if (err instanceof EmptyPlaylistError) {
×
371
        this.#logger.info('user has empty playlist, skipping on to the next');
×
372
        const previousDJ = previous != null ? previous.historyEntry.userID : null;
×
373
        await this.#cycleWaitlist(previousDJ, { remove });
×
374
        return this.#advanceLocked({ publish, remove: true }, tx);
×
375
      }
×
376
      throw err;
×
377
    }
×
378

18✔
379
    if (opts.signal?.aborted) {
18!
380
      throw opts.signal.error;
×
381
    }
×
382

18✔
383
    if (previous) {
18!
384
      this.#logger.info({
×
385
        id: previous.historyEntry.id,
×
386
        artist: previous.media.artist,
×
387
        title: previous.media.title,
×
388
        upvotes: previous.upvotes.length,
×
389
        favorites: previous.favorites.length,
×
390
        downvotes: previous.downvotes.length,
×
391
      }, 'previous track stats');
×
392
    }
×
393

18✔
394
    let result = null;
18✔
395
    if (next != null) {
18✔
396
      this.#logger.info({
18✔
397
        id: next.playlistItem.id,
18✔
398
        artist: next.playlistItem.artist,
18✔
399
        title: next.playlistItem.title,
18✔
400
      }, 'next track');
18✔
401
      const sourceData = await this.#getSourceDataForPlayback(next);
18✔
402

18✔
403
      // Conservatively, we should take *all* the data from the inserted values.
18✔
404
      // But then we need to reparse the source data... It's easier to only take
18✔
405
      // the actually generated value from there :')
18✔
406
      const { createdAt } = await tx.insertInto('historyEntries')
18✔
407
        .returning('createdAt')
18✔
408
        .values({
18✔
409
          id: next.historyEntry.id,
18✔
410
          userID: next.user.id,
18✔
411
          mediaID: next.media.id,
18✔
412
          artist: next.historyEntry.artist,
18✔
413
          title: next.historyEntry.title,
18✔
414
          start: next.historyEntry.start,
18✔
415
          end: next.historyEntry.end,
18✔
416
          sourceData: sourceData != null ? jsonb(sourceData) : null,
18!
417
        })
18✔
418
        .executeTakeFirstOrThrow();
18✔
419

18✔
420
      if (sourceData != null) {
18!
421
        next.historyEntry.sourceData = sourceData;
×
422
      }
×
423
      next.historyEntry.createdAt = createdAt;
18✔
424

18✔
425
      result = {
18✔
426
        historyEntry: next.historyEntry,
18✔
427
        playlist: next.playlist,
18✔
428
        user: next.user,
18✔
429
        media: next.media,
18✔
430
      };
18✔
431
    } else {
18!
432
      this.#maybeStop();
×
433
    }
×
434

18✔
435
    await this.#cycleWaitlist(previous != null ? previous.historyEntry.userID : null, { remove });
18!
436

18✔
437
    if (next) {
18✔
438
      await this.#update(next);
18✔
439
      await playlists.cyclePlaylist(next.playlist, tx);
18✔
440
      this.#play(next.historyEntry);
18✔
441
    } else {
18!
442
      await this.clear();
×
443
    }
×
444

18✔
445
    if (publish !== false) {
18✔
446
      await this.#publishAdvanceComplete(result);
18✔
447
    }
18✔
448

18✔
449
    return result;
18✔
450
  }
18✔
451

149✔
452
  /**
149✔
453
   * @param {AdvanceOptions} [opts]
149✔
454
   */
149✔
455
  advance(opts = {}) {
149✔
456
    const result = this.#locker.using(
18✔
457
      [REDIS_ADVANCING],
18✔
458
      10_000,
18✔
459
      (signal) => this.#advanceLocked({ ...opts, signal }),
18✔
460
    );
18✔
461
    this.#awaitAdvance = result;
18✔
462
    return result;
18✔
463
  }
18✔
464

149✔
465
  /**
149✔
466
   * @param {User} user
149✔
467
   * @param {boolean} remove
149✔
468
   */
149✔
469
  async setRemoveAfterCurrentPlay(user, remove) {
149✔
NEW
470
    const newValue = await this.#uw.db.transaction().execute(async (tx) => {
×
NEW
471
      const currentDJ = /** @type {UserID|undefined} */ (
×
NEW
472
        await this.#uw.keyv.get(KEY_CURRENT_DJ_ID, tx)
×
NEW
473
      );
×
NEW
474
      if (currentDJ === user.id) {
×
NEW
475
        if (remove) {
×
NEW
476
          await this.#uw.keyv.set(KEY_REMOVE_AFTER_CURRENT_PLAY, true, tx);
×
NEW
477
          return true;
×
NEW
478
        }
×
NEW
479
        await this.#uw.keyv.delete(KEY_REMOVE_AFTER_CURRENT_PLAY, tx);
×
NEW
480
        return false;
×
NEW
481
      } else {
×
NEW
482
        throw new Error('You are not currently playing');
×
NEW
483
      }
×
NEW
484
    });
×
NEW
485
    return newValue;
×
UNCOV
486
  }
×
487

149✔
488
  /**
149✔
489
   * @param {User} user
149✔
490
   */
149✔
491
  async getRemoveAfterCurrentPlay(user, tx = this.#uw.db) {
149✔
492
    const currentDJ = /** @type {UserID|undefined} */ (
3✔
493
      await this.#uw.keyv.get(KEY_CURRENT_DJ_ID, tx)
3✔
494
    );
3✔
495
    const removeAfterCurrentPlay = /** @type {boolean|undefined} */ (
3✔
496
      await this.#uw.keyv.get(KEY_REMOVE_AFTER_CURRENT_PLAY, tx)
3✔
497
    );
3✔
498

3✔
499
    if (currentDJ === user.id) {
3✔
500
      return removeAfterCurrentPlay != null;
1✔
501
    }
1✔
502
    return null;
2✔
503
  }
3✔
504
}
149✔
505

1✔
506
/**
 * Plugin entry point: creates the `Booth`, exposes it as `uw.booth`, mounts
 * the booth routes under `/booth`, and starts the booth once boot completed
 * without an error.
 *
 * @param {import('../Uwave.js').Boot} uw
 */
async function boothPlugin(uw) {
  const booth = new Booth(uw);
  uw.booth = booth;

  const router = routes();
  uw.httpApi.use('/booth', router);

  uw.after(async (err) => {
    // Only start the booth when boot finished without an error.
    if (err) {
      return;
    }
    await uw.booth.onStart();
  });
}
149✔
519

1✔
520
export default boothPlugin;
1✔
521
export { Booth };
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc