• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

u-wave / core / 19337142747

13 Nov 2025 03:43PM UTC coverage: 85.319%. First build
19337142747

Pull #657

github

web-flow
Merge 2ae249b03 into cc369fe03
Pull Request #657: Store waitlist and booth state in SQLite instead of Redis

954 of 1135 branches covered (84.05%)

Branch coverage included in aggregate %.

194 of 230 new or added lines in 7 files covered. (84.35%)

10047 of 11759 relevant lines covered (85.44%)

192.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.55
/src/plugins/booth.js
1
import RedLock from 'redlock';
2✔
2
import { sql } from 'kysely';
2✔
3
import { EmptyPlaylistError, PlaylistItemNotFoundError } from '../errors/index.js';
2✔
4
import routes from '../routes/booth.js';
2✔
5
import { randomUUID } from 'node:crypto';
2✔
6
import { fromJson, jsonb, jsonGroupArray } from '../utils/sqlite.js';
2✔
7

2✔
8
/**
2✔
9
 * @typedef {import('../schema.js').UserID} UserID
2✔
10
 * @typedef {import('../schema.js').HistoryEntryID} HistoryEntryID
2✔
11
 * @typedef {import('type-fest').JsonObject} JsonObject
2✔
12
 * @typedef {import('../schema.js').User} User
2✔
13
 * @typedef {import('../schema.js').Playlist} Playlist
2✔
14
 * @typedef {import('../schema.js').PlaylistItem} PlaylistItem
2✔
15
 * @typedef {import('../schema.js').HistoryEntry} HistoryEntry
2✔
16
 * @typedef {Omit<import('../schema.js').Media, 'createdAt' | 'updatedAt'>} Media
2✔
17
 */
2✔
18

2✔
19
// Resource name locked through RedLock while an advance is in progress.
const REDIS_ADVANCING = 'booth:advancing';
2✔
20
// Key-value store key holding the ID of the history entry that is currently playing.
const KEY_HISTORY_ID = 'booth:historyID';
2✔
21
// Key-value store key holding the user ID of the current DJ.
const KEY_CURRENT_DJ_ID = 'booth:currentDJ';
2✔
22
// Key-value store key for the flag that removes the current DJ from the
// rotation once their current play ends. Stored as `true`, deleted otherwise.
const KEY_REMOVE_AFTER_CURRENT_PLAY = 'booth:removeAfterCurrentPlay';
2✔
23

2✔
24
/**
 * Tracks what is playing in the booth and advances to the next DJ / track.
 *
 * The current history entry ID, the current DJ ID and the "remove after
 * current play" flag live in the key-value store (`uw.keyv`). Advances are
 * serialised through a RedLock lock on `REDIS_ADVANCING`.
 */
class Booth {
  #uw;

  #logger;

  /**
   * Timer that triggers the automatic advance when the current play ends.
   *
   * @type {ReturnType<typeof setTimeout>|null}
   */
  #timeout = null;

  #locker;

  /**
   * Promise for the most recently started advance; awaited on shutdown.
   *
   * @type {Promise<unknown>|null}
   */
  #awaitAdvance = null;

  /**
   * @param {import('../Uwave.js').Boot} uw
   */
  constructor(uw) {
    this.#uw = uw;
    this.#locker = new RedLock([this.#uw.redis]);
    this.#logger = uw.logger.child({ ns: 'uwave:booth' });
  }

  /**
   * Restore the advance timer for a play that was stored before startup and
   * register a close hook that stops the timer and waits for the last advance.
   *
   * @internal
   */
  async onStart() {
    const current = await this.getCurrentEntry();
    if (current && this.#timeout === null) {
      // Restart the advance timer after a server restart, if a track was
      // playing before the server restarted.
      const duration = (current.historyEntry.end - current.historyEntry.start) * 1000;
      const endTime = current.historyEntry.createdAt.getTime() + duration;
      if (endTime > Date.now()) {
        this.#timeout = setTimeout(
          () => this.#advanceAutomatically(),
          endTime - Date.now(),
        );
      } else {
        // The stored play should already have ended: advance right away.
        // `#advanceAutomatically` handles its own errors, so it is not awaited.
        this.#advanceAutomatically();
      }
    }

    this.#uw.onClose(async () => {
      this.#onStop();
      await this.#awaitAdvance;
    });
  }

  /**
   * Advance from a timer callback. Failures are logged instead of thrown,
   * because there is no caller to report them to.
   */
  async #advanceAutomatically() {
    try {
      await this.advance();
    } catch (error) {
      this.#logger.error({ err: error }, 'advance failed');
    }
  }

  /** Cancel the pending automatic advance, if any. */
  #onStop() {
    this.#maybeStop();
  }

  /**
   * Load the currently playing entry: the history entry whose ID is stored
   * under `KEY_HISTORY_ID`, joined with its media, its user, and the user IDs
   * of everyone who upvoted, downvoted or favorited it.
   *
   * @returns The current `{ media, user, historyEntry, upvotes, downvotes,
   *   favorites }`, or `null` when the query yields no row.
   */
  async getCurrentEntry(tx = this.#uw.db) {
    const entry = await tx.selectFrom('keyval')
      .where('key', '=', KEY_HISTORY_ID)
      .innerJoin('historyEntries', (join) => join.on(
        (eb) => sql`${eb.ref('value')}->>'$'`,
        '=',
        (eb) => eb.ref('historyEntries.id'),
      ))
      .innerJoin('media', 'historyEntries.mediaID', 'media.id')
      .innerJoin('users', 'historyEntries.userID', 'users.id')
      .select([
        'historyEntries.id as id',
        'media.id as media.id',
        'media.sourceID as media.sourceID',
        'media.sourceType as media.sourceType',
        'media.sourceData as media.sourceData',
        'media.artist as media.artist',
        'media.title as media.title',
        'media.duration as media.duration',
        'media.thumbnail as media.thumbnail',
        'users.id as users.id',
        'users.username as users.username',
        'users.avatar as users.avatar',
        'users.createdAt as users.createdAt',
        'historyEntries.artist',
        'historyEntries.title',
        'historyEntries.start',
        'historyEntries.end',
        'historyEntries.createdAt',
        (eb) => eb.selectFrom('feedback')
          .where('historyEntryID', '=', eb.ref('historyEntries.id'))
          .where('vote', '=', 1)
          .select((eb) => jsonGroupArray(eb.ref('userID')).as('userIDs'))
          .as('upvotes'),
        (eb) => eb.selectFrom('feedback')
          .where('historyEntryID', '=', eb.ref('historyEntries.id'))
          .where('vote', '=', -1)
          .select((eb) => jsonGroupArray(eb.ref('userID')).as('userIDs'))
          .as('downvotes'),
        (eb) => eb.selectFrom('feedback')
          .where('historyEntryID', '=', eb.ref('historyEntries.id'))
          .where('favorite', '=', 1)
          .select((eb) => jsonGroupArray(eb.ref('userID')).as('userIDs'))
          .as('favorites'),
      ])
      .executeTakeFirst();

    return entry ? {
      media: {
        id: entry['media.id'],
        artist: entry['media.artist'],
        title: entry['media.title'],
        duration: entry['media.duration'],
        thumbnail: entry['media.thumbnail'],
        sourceID: entry['media.sourceID'],
        sourceType: entry['media.sourceType'],
        sourceData: entry['media.sourceData'] ?? {},
      },
      user: {
        id: entry['users.id'],
        username: entry['users.username'],
        avatar: entry['users.avatar'],
        createdAt: entry['users.createdAt'],
      },
      historyEntry: {
        id: entry.id,
        userID: entry['users.id'],
        mediaID: entry['media.id'],
        artist: entry.artist,
        title: entry.title,
        start: entry.start,
        end: entry.end,
        createdAt: entry.createdAt,
      },
      upvotes: entry.upvotes != null ? fromJson(entry.upvotes) : [],
      downvotes: entry.downvotes != null ? fromJson(entry.downvotes) : [],
      favorites: entry.favorites != null ? fromJson(entry.favorites) : [],
    } : null;
  }

  /**
   * Pick the user who should play next: the first user on the waitlist, or,
   * when the waitlist is empty and `options.remove` is not set, the stored
   * current DJ.
   *
   * @param {{ remove?: boolean }} options
   * @returns The result of `users.getUser()` for that user, or `null` when
   *   there is nobody to play.
   */
  async #getNextDJ(options, tx = this.#uw.db) {
    const waitlist = await this.#uw.waitlist.getUserIDs();
    let userID = waitlist.at(0) ?? null;
    if (!userID && !options.remove) {
      // If the waitlist is empty, the current DJ will play again immediately.
      userID = /** @type {UserID|null} */ (await this.#uw.keyv.get(KEY_CURRENT_DJ_ID, tx));
    }
    if (!userID) {
      return null;
    }

    return this.#uw.users.getUser(userID, tx);
  }

  /**
   * Build the next play from the first item of the next DJ's active playlist.
   * Nothing is written here; the history entry is a draft with a fresh ID.
   *
   * @param {{ remove?: boolean }} options
   * @returns `null` when there is no next DJ or they have no active playlist.
   * @throws {EmptyPlaylistError} When the active playlist has size 0.
   * @throws {PlaylistItemNotFoundError} When no item is found at position 0.
   */
  async #getNextEntry(options, tx = this.#uw.db) {
    const { playlists } = this.#uw;

    const user = await this.#getNextDJ(options, tx);
    if (!user || !user.activePlaylistID) {
      return null;
    }
    const playlist = await playlists.getUserPlaylist(user, user.activePlaylistID);
    if (playlist.size === 0) {
      throw new EmptyPlaylistError();
    }

    const { playlistItem, media } = await playlists.getPlaylistItemAt(playlist, 0);
    if (!playlistItem) {
      throw new PlaylistItemNotFoundError();
    }

    return {
      user,
      playlist,
      playlistItem,
      media,
      historyEntry: {
        id: /** @type {HistoryEntryID} */ (randomUUID()),
        userID: user.id,
        mediaID: media.id,
        artist: playlistItem.artist,
        title: playlistItem.title,
        start: playlistItem.start,
        end: playlistItem.end,
        /** @type {null | JsonObject} */
        sourceData: null,
        createdAt: new Date(),
      },
    };
  }

  /**
   * Delegate to `waitlist.cycle()` with the previous DJ and the remove option.
   *
   * @param {UserID|null} previous
   * @param {{ remove?: boolean }} options
   */
  async #cycleWaitlist(previous, options) {
    await this.#uw.waitlist.cycle(previous, options);
  }

  /**
   * Delete all stored booth state: the remove-after-play flag, the current
   * history entry ID and the current DJ ID.
   */
  async clear(tx = this.#uw.db) {
    await this.#uw.keyv.delete(KEY_REMOVE_AFTER_CURRENT_PLAY, tx);
    await this.#uw.keyv.delete(KEY_HISTORY_ID, tx);
    await this.#uw.keyv.delete(KEY_CURRENT_DJ_ID, tx);
  }

  /**
   * Store `next` as the current play: drop the remove-after-play flag and
   * record the new history entry ID and DJ ID.
   *
   * @param {{ historyEntry: { id: HistoryEntryID }, user: { id: UserID } }} next
   */
  async #update(next, tx = this.#uw.db) {
    await this.#uw.keyv.delete(KEY_REMOVE_AFTER_CURRENT_PLAY, tx);
    await this.#uw.keyv.set(KEY_HISTORY_ID, next.historyEntry.id, tx);
    await this.#uw.keyv.set(KEY_CURRENT_DJ_ID, next.user.id, tx);
  }

  /** Clear the advance timer if one is pending. */
  #maybeStop() {
    if (this.#timeout) {
      clearTimeout(this.#timeout);
      this.#timeout = null;
    }
  }

  /**
   * Schedule the automatic advance for when `entry` finishes, replacing any
   * timer that is already pending. `start` and `end` are multiplied by 1000
   * to get the timer delay in milliseconds.
   *
   * @param {Pick<HistoryEntry, 'start' | 'end'>} entry
   */
  #play(entry) {
    this.#maybeStop();
    this.#timeout = setTimeout(
      () => this.#advanceAutomatically(),
      (entry.end - entry.start) * 1000,
    );
  }

  /**
   * This method creates a `media` object that clients can understand from a
   * history entry object.
   *
   * We present the playback-specific `sourceData` as if it is
   * a property of the media model for backwards compatibility.
   * Old clients don't expect `sourceData` directly on a history entry object.
   *
   * Keys in the history entry's `sourceData` override same-named keys from the
   * media's `sourceData`.
   *
   * @param {{ user: User, media: Media, historyEntry: HistoryEntry }} next
   */
  getMediaForPlayback(next) {
    return {
      artist: next.historyEntry.artist,
      title: next.historyEntry.title,
      start: next.historyEntry.start,
      end: next.historyEntry.end,
      media: {
        sourceType: next.media.sourceType,
        sourceID: next.media.sourceID,
        artist: next.media.artist,
        title: next.media.title,
        duration: next.media.duration,
        sourceData: {
          ...next.media.sourceData,
          ...next.historyEntry.sourceData,
        },
      },
    };
  }

  /**
   * Publish the outcome of an advance: `advance:complete` (with `null` when
   * nothing is playing), `playlist:cycle` when there is a next play, and
   * always `waitlist:update` with the current waitlist.
   *
   * @param {{
   *   user: User,
   *   playlist: Playlist,
   *   media: Media,
   *   historyEntry: HistoryEntry
   * } | null} next
   */
  async #publishAdvanceComplete(next) {
    const { waitlist } = this.#uw;

    if (next != null) {
      this.#uw.publish('advance:complete', {
        historyID: next.historyEntry.id,
        userID: next.user.id,
        playlistID: next.playlist.id,
        media: this.getMediaForPlayback(next),
        playedAt: next.historyEntry.createdAt.getTime(),
      });
      this.#uw.publish('playlist:cycle', {
        userID: next.user.id,
        playlistID: next.playlist.id,
      });
    } else {
      this.#uw.publish('advance:complete', null);
    }
    this.#uw.publish('waitlist:update', await waitlist.getUserIDs());
  }

  /**
   * Run the media source's pre-play hook (`source.play()`), if the source is
   * registered. A failing hook is logged and treated as "no source data" so
   * that it does not block the advance.
   *
   * @param {{ user: User, media: { sourceID: string, sourceType: string } }} entry
   * @returns The hook's result, or `undefined` when there is no such source or
   *   the hook threw.
   */
  async #getSourceDataForPlayback(entry) {
    const { sourceID, sourceType } = entry.media;
    const source = this.#uw.source(sourceType);
    if (source) {
      this.#logger.trace({ sourceType: source.type, sourceID }, 'running pre-play hook');
      /** @type {JsonObject | undefined} */
      let sourceData;
      try {
        sourceData = await source.play(entry.user, entry.media);
        this.#logger.trace({ sourceType: source.type, sourceID, sourceData }, 'pre-play hook result');
      } catch (error) {
        this.#logger.error({ sourceType: source.type, sourceID, err: error }, 'pre-play hook failed');
      }
      return sourceData;
    }

    return undefined;
  }

  /**
   * Perform one advance. Expected to run while the advance lock is held.
   *
   * Steps: decide whether the previous DJ leaves the rotation, pick the next
   * play, insert its history entry, cycle the waitlist, store the new booth
   * state (or clear it when nobody is playing) and publish the result.
   *
   * @typedef {object} AdvanceOptions
   * @prop {boolean} [remove]
   * @prop {boolean} [publish]
   * @prop {import('redlock').RedlockAbortSignal} [signal]
   * @param {AdvanceOptions} [opts]
   * @returns {Promise<{
   *   historyEntry: HistoryEntry,
   *   user: User,
   *   media: Media,
   *   playlist: Playlist,
   * }|null>}
   */
  async #advanceLocked(opts = {}, tx = this.#uw.db) {
    const { playlists } = this.#uw;

    const publish = opts.publish ?? true;
    // The flag only applies to the play that is ending, so it is consumed here.
    // It counts only when `delete()` reports `true`.
    const removeAfterCurrent = (
      await this.#uw.keyv.delete(KEY_REMOVE_AFTER_CURRENT_PLAY, tx)
    ) === true;
    const remove = opts.remove || removeAfterCurrent || (
      !await this.#uw.waitlist.isCycleEnabled()
    );

    const previous = await this.getCurrentEntry(tx);
    let next;
    try {
      next = await this.#getNextEntry({ remove }, tx);
    } catch (err) {
      // If the next user's playlist was empty, remove them from the waitlist
      // and try advancing again.
      if (err instanceof EmptyPlaylistError) {
        this.#logger.info('user has empty playlist, skipping on to the next');
        const previousDJ = previous != null ? previous.historyEntry.userID : null;
        await this.#cycleWaitlist(previousDJ, { remove });
        // Keep the lock's abort signal, so that the retry still stops when the
        // lock was lost in the meantime.
        return this.#advanceLocked({ publish, remove: true, signal: opts.signal }, tx);
      }
      throw err;
    }

    // Do not write anything if the lock was lost while picking the next entry.
    if (opts.signal?.aborted) {
      throw opts.signal.error;
    }

    if (previous) {
      this.#logger.info({
        id: previous.historyEntry.id,
        artist: previous.media.artist,
        title: previous.media.title,
        upvotes: previous.upvotes.length,
        favorites: previous.favorites.length,
        downvotes: previous.downvotes.length,
      }, 'previous track stats');
    }

    let result = null;
    if (next != null) {
      this.#logger.info({
        id: next.playlistItem.id,
        artist: next.playlistItem.artist,
        title: next.playlistItem.title,
      }, 'next track');
      const sourceData = await this.#getSourceDataForPlayback(next);

      // Conservatively, we should take *all* the data from the inserted values.
      // But then we need to reparse the source data... It's easier to only take
      // the actually generated value from there :')
      const { createdAt } = await tx.insertInto('historyEntries')
        .returning('createdAt')
        .values({
          id: next.historyEntry.id,
          userID: next.user.id,
          mediaID: next.media.id,
          artist: next.historyEntry.artist,
          title: next.historyEntry.title,
          start: next.historyEntry.start,
          end: next.historyEntry.end,
          sourceData: sourceData != null ? jsonb(sourceData) : null,
        })
        .executeTakeFirstOrThrow();

      if (sourceData != null) {
        next.historyEntry.sourceData = sourceData;
      }
      next.historyEntry.createdAt = createdAt;

      result = {
        historyEntry: next.historyEntry,
        playlist: next.playlist,
        user: next.user,
        media: next.media,
      };
    } else {
      this.#maybeStop();
    }

    await this.#cycleWaitlist(previous != null ? previous.historyEntry.userID : null, { remove });

    if (next) {
      await this.#update(next, tx);
      await playlists.cyclePlaylist(next.playlist, tx);
      this.#play(next.historyEntry);
    } else {
      await this.clear(tx);
    }

    if (publish !== false) {
      await this.#publishAdvanceComplete(result);
    }

    return result;
  }

  /**
   * Advance to the next play while holding the `REDIS_ADVANCING` lock
   * (requested for 10 000 ms). The returned promise is also remembered so the
   * close hook can wait for an in-flight advance.
   *
   * @param {AdvanceOptions} [opts]
   */
  advance(opts = {}) {
    const result = this.#locker.using(
      [REDIS_ADVANCING],
      10_000,
      (signal) => this.#advanceLocked({ ...opts, signal }),
    );
    this.#awaitAdvance = result;
    return result;
  }

  /**
   * Set or clear the "remove me after my current play" flag for the current DJ.
   * The DJ check and the flag update happen in one database transaction.
   *
   * @param {User} user
   * @param {boolean} remove
   * @returns {Promise<boolean>} The new value of the flag.
   * @throws {Error} When `user` is not the current DJ.
   */
  async setRemoveAfterCurrentPlay(user, remove) {
    const newValue = await this.#uw.db.transaction().execute(async (tx) => {
      const currentDJ = /** @type {UserID|undefined} */ (
        await this.#uw.keyv.get(KEY_CURRENT_DJ_ID, tx)
      );
      if (currentDJ === user.id) {
        if (remove) {
          await this.#uw.keyv.set(KEY_REMOVE_AFTER_CURRENT_PLAY, true, tx);
          return true;
        }
        await this.#uw.keyv.delete(KEY_REMOVE_AFTER_CURRENT_PLAY, tx);
        return false;
      } else {
        throw new Error('You are not currently playing');
      }
    });
    return newValue;
  }

  /**
   * Read the "remove after current play" flag for `user`.
   *
   * @param {User} user
   * @returns {Promise<boolean|null>} `null` when `user` is not the current DJ;
   *   otherwise whether a flag value is stored.
   */
  async getRemoveAfterCurrentPlay(user, tx = this.#uw.db) {
    const currentDJ = /** @type {UserID|undefined} */ (
      await this.#uw.keyv.get(KEY_CURRENT_DJ_ID, tx)
    );
    const removeAfterCurrentPlay = /** @type {boolean|undefined} */ (
      await this.#uw.keyv.get(KEY_REMOVE_AFTER_CURRENT_PLAY, tx)
    );

    if (currentDJ === user.id) {
      return removeAfterCurrentPlay != null;
    }
    return null;
  }
}
298✔
505

2✔
506
/**
 * Plugin entry point for the booth. Attaches a `Booth` instance to the
 * server as `uw.booth`, mounts the booth routes under `/booth`, and calls
 * `onStart()` on the booth from the `uw.after()` callback when it receives
 * no error.
 *
 * @param {import('../Uwave.js').Boot} uw
 */
async function boothPlugin(uw) {
  uw.booth = new Booth(uw);

  const router = routes();
  uw.httpApi.use('/booth', router);

  uw.after(async (err) => {
    // The callback received an error: leave the booth untouched.
    if (err) {
      return;
    }
    await uw.booth.onStart();
  });
}
298✔
519

2✔
520
export default boothPlugin;
2✔
521
export { Booth };
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc