• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

u-wave / core / 23241410419

18 Mar 2026 10:59AM UTC coverage: 71.344% (-16.0%) from 87.348%
23241410419

Pull #756

github

web-flow
Merge 1d9eda950 into acce7f2f9
Pull Request #756: Switch tests to use vitest

697 of 1141 branches covered (61.09%)

Branch coverage included in aggregate %.

2064 of 2729 relevant lines covered (75.63%)

180.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.79
/src/plugins/booth.js
1
import Mutex from 'p-mutex';
2
import { sql } from 'kysely';
3
import { EmptyPlaylistError, PlaylistItemNotFoundError, UserNotInWaitlistError } from '../errors/index.js';
4
import routes from '../routes/booth.js';
5
import { randomUUID } from 'node:crypto';
6
import { fromJson, jsonb, jsonGroupArray } from '../utils/sqlite.js';
7

8
/**
9
 * @typedef {import('../schema.js').UserID} UserID
10
 * @typedef {import('../schema.js').HistoryEntryID} HistoryEntryID
11
 * @typedef {import('type-fest').JsonObject} JsonObject
12
 * @typedef {import('../schema.js').User} User
13
 * @typedef {import('../schema.js').Playlist} Playlist
14
 * @typedef {import('../schema.js').PlaylistItem} PlaylistItem
15
 * @typedef {import('../schema.js').HistoryEntry} HistoryEntry
16
 * @typedef {Omit<import('../schema.js').Media, 'createdAt' | 'updatedAt'>} Media
17
 */
18

19
// Keyval key under which the ID of the current history entry is stored.
// `getCurrentEntry()` joins this value against `historyEntries.id`.
const KEY_HISTORY_ID = 'booth:historyID';
14✔
20
// Keyval key under which the user ID of the current DJ is stored.
const KEY_CURRENT_DJ_ID = 'booth:currentDJ';
14✔
21
// Keyval key that is set to `true` when the current DJ asked to be removed
// once their current play ends. It is deleted again on every advance.
const KEY_REMOVE_AFTER_CURRENT_PLAY = 'booth:removeAfterCurrentPlay';
14✔
22

23
/**
 * Tracks what is currently playing and moves on to the next DJ/track.
 *
 * The current state (history entry ID, DJ ID and the "remove after current
 * play" flag) is stored through `uw.keyv`; played tracks are recorded in the
 * `historyEntries` table.
 */
class Booth {
  #uw;

  #logger;

  /**
   * Timer that fires the automatic advance when the current track ends.
   *
   * @type {ReturnType<typeof setTimeout>|null}
   */
  #timeout = null;

  #mutex;

  /**
   * The promise of the most recently started `advance()` call. It is awaited
   * on close so an in-flight advance can settle first.
   *
   * @type {Promise<unknown>|null}
   */
  #awaitAdvance = null;

  /**
   * @param {import('../Uwave.js').Boot} uw
   */
  constructor(uw) {
    this.#uw = uw;
    this.#mutex = new Mutex();
    this.#logger = uw.logger.child({ ns: 'uwave:booth' });
  }

  /**
   * Resume the advance timer for a stored current entry, if any, and register
   * the close handler.
   *
   * @internal
   */
  async onStart() {
    const current = await this.getCurrentEntry();
    if (current && this.#timeout === null) {
      // Restart the advance timer after a server restart, if a track was
      // playing before the server restarted.
      const duration = (current.historyEntry.end - current.historyEntry.start) * 1000;
      const endTime = current.historyEntry.createdAt.getTime() + duration;
      // Read the clock once so the check and the delay agree with each other.
      const remaining = endTime - Date.now();
      if (remaining > 0) {
        this.#timeout = setTimeout(
          () => this.#advanceAutomatically(),
          remaining,
        );
      } else {
        // The track should already have ended. Deliberately not awaited:
        // `#advanceAutomatically` handles its own errors.
        this.#advanceAutomatically();
      }
    }

    this.#uw.onClose(async () => {
      this.#onStop();
      await this.#awaitAdvance;
    });
  }

  /**
   * Advance without a caller to report to: failures are logged, not thrown.
   */
  async #advanceAutomatically() {
    try {
      await this.advance();
    } catch (error) {
      this.#logger.error({ err: error }, 'advance failed');
    }
  }

  /** Cancel the pending advance timer when the server closes. */
  #onStop() {
    this.#maybeStop();
  }

  /**
   * Load the entry referenced by the stored history ID, together with its
   * media, its user and the user IDs of everyone who voted or favourited.
   *
   * @returns The current entry, or `null` when the query matches no row.
   */
  async getCurrentEntry(tx = this.#uw.db) {
    const entry = await tx.selectFrom('keyval')
      .where('key', '=', KEY_HISTORY_ID)
      .innerJoin('historyEntries', (join) => join.on(
        (eb) => sql`${eb.ref('value')}->>'$'`,
        '=',
        (eb) => eb.ref('historyEntries.id'),
      ))
      .innerJoin('media', 'historyEntries.mediaID', 'media.id')
      .innerJoin('users', 'historyEntries.userID', 'users.id')
      .select([
        'historyEntries.id as id',
        'media.id as media.id',
        'media.sourceID as media.sourceID',
        'media.sourceType as media.sourceType',
        'media.sourceData as media.sourceData',
        'media.artist as media.artist',
        'media.title as media.title',
        'media.duration as media.duration',
        'media.thumbnail as media.thumbnail',
        'users.id as users.id',
        'users.username as users.username',
        'users.avatar as users.avatar',
        'users.createdAt as users.createdAt',
        'historyEntries.artist',
        'historyEntries.title',
        'historyEntries.start',
        'historyEntries.end',
        'historyEntries.createdAt',
        (eb) => eb.selectFrom('feedback')
          .where('historyEntryID', '=', eb.ref('historyEntries.id'))
          .where('vote', '=', 1)
          .select((eb) => jsonGroupArray(eb.ref('userID')).as('userIDs'))
          .as('upvotes'),
        (eb) => eb.selectFrom('feedback')
          .where('historyEntryID', '=', eb.ref('historyEntries.id'))
          .where('vote', '=', -1)
          .select((eb) => jsonGroupArray(eb.ref('userID')).as('userIDs'))
          .as('downvotes'),
        (eb) => eb.selectFrom('feedback')
          .where('historyEntryID', '=', eb.ref('historyEntries.id'))
          .where('favorite', '=', 1)
          .select((eb) => jsonGroupArray(eb.ref('userID')).as('userIDs'))
          .as('favorites'),
      ])
      .executeTakeFirst();

    if (!entry) {
      return null;
    }

    return {
      media: {
        id: entry['media.id'],
        artist: entry['media.artist'],
        title: entry['media.title'],
        duration: entry['media.duration'],
        thumbnail: entry['media.thumbnail'],
        sourceID: entry['media.sourceID'],
        sourceType: entry['media.sourceType'],
        sourceData: entry['media.sourceData'] ?? {},
      },
      user: {
        id: entry['users.id'],
        username: entry['users.username'],
        avatar: entry['users.avatar'],
        createdAt: entry['users.createdAt'],
      },
      historyEntry: {
        id: entry.id,
        userID: entry['users.id'],
        mediaID: entry['media.id'],
        artist: entry.artist,
        title: entry.title,
        start: entry.start,
        end: entry.end,
        createdAt: entry.createdAt,
      },
      // The aggregate columns can be null; treat that as "nobody".
      upvotes: entry.upvotes != null ? fromJson(entry.upvotes) : [],
      downvotes: entry.downvotes != null ? fromJson(entry.downvotes) : [],
      favorites: entry.favorites != null ? fromJson(entry.favorites) : [],
    };
  }

  /**
   * Pick the user who plays next: the first user in the waitlist, or the
   * stored current DJ when the waitlist is empty and `remove` is not set.
   *
   * @param {{ remove?: boolean }} options
   * @returns The next DJ's user, or `null` when there is nobody to play.
   */
  async #getNextDJ(options, tx = this.#uw.db) {
    const waitlist = await this.#uw.waitlist.getUserIDs();
    let userID = waitlist.at(0) ?? null;
    if (!userID && !options.remove) {
      // If the waitlist is empty, the current DJ will play again immediately.
      userID = /** @type {UserID|null} */ (await this.#uw.keyv.get(KEY_CURRENT_DJ_ID, tx));
    }
    if (!userID) {
      return null;
    }

    return this.#uw.users.getUser(userID, tx);
  }

  /**
   * Build the entry to play next from the first item of the next DJ's active
   * playlist. Nothing is written to the database here.
   *
   * @param {{ remove?: boolean }} options
   * @returns The next entry, or `null` when there is no next DJ or the DJ has
   *   no active playlist.
   * @throws {EmptyPlaylistError} When the DJ's active playlist has no items.
   * @throws {PlaylistItemNotFoundError} When the first playlist item is missing.
   */
  async #getNextEntry(options, tx = this.#uw.db) {
    const { playlists } = this.#uw;

    const user = await this.#getNextDJ(options, tx);
    if (!user || !user.activePlaylistID) {
      return null;
    }
    const playlist = await playlists.getUserPlaylist(user, user.activePlaylistID);
    if (playlist.size === 0) {
      throw new EmptyPlaylistError();
    }

    const { playlistItem, media } = await playlists.getPlaylistItemAt(playlist, 0);
    if (!playlistItem) {
      throw new PlaylistItemNotFoundError();
    }

    return {
      user,
      playlist,
      playlistItem,
      media,
      historyEntry: {
        id: /** @type {HistoryEntryID} */ (randomUUID()),
        userID: user.id,
        mediaID: media.id,
        artist: playlistItem.artist,
        title: playlistItem.title,
        start: playlistItem.start,
        end: playlistItem.end,
        /** @type {null | JsonObject} */
        sourceData: null,
        createdAt: new Date(),
      },
    };
  }

  /**
   * @param {UserID|null} previous
   * @param {{ remove?: boolean }} options
   */
  async #cycleWaitlist(previous, options) {
    await this.#uw.waitlist.cycle(previous, options);
  }

  /**
   * Delete all stored booth state: the remove-after-play flag, the current
   * history entry ID and the current DJ ID.
   */
  async clear(tx = this.#uw.db) {
    await this.#uw.keyv.delete(KEY_REMOVE_AFTER_CURRENT_PLAY, tx);
    await this.#uw.keyv.delete(KEY_HISTORY_ID, tx);
    await this.#uw.keyv.delete(KEY_CURRENT_DJ_ID, tx);
  }

  /**
   * Store `next` as the current play and reset the remove-after-play flag.
   *
   * @param {{ historyEntry: { id: HistoryEntryID }, user: { id: UserID } }} next
   */
  async #update(next, tx = this.#uw.db) {
    await this.#uw.keyv.delete(KEY_REMOVE_AFTER_CURRENT_PLAY, tx);
    await this.#uw.keyv.set(KEY_HISTORY_ID, next.historyEntry.id, tx);
    await this.#uw.keyv.set(KEY_CURRENT_DJ_ID, next.user.id, tx);
  }

  /** Cancel the pending advance timer, if there is one. */
  #maybeStop() {
    if (this.#timeout) {
      clearTimeout(this.#timeout);
      this.#timeout = null;
    }
  }

  /**
   * Schedule the automatic advance for when `entry` finishes playing.
   * `start` and `end` are multiplied by 1000 to get the timer delay.
   *
   * @param {Pick<HistoryEntry, 'start' | 'end'>} entry
   */
  #play(entry) {
    this.#maybeStop();
    this.#timeout = setTimeout(
      () => this.#advanceAutomatically(),
      (entry.end - entry.start) * 1000,
    );
  }

  /**
   * This method creates a `media` object that clients can understand from a
   * history entry object.
   *
   * We present the playback-specific `sourceData` as if it is
   * a property of the media model for backwards compatibility.
   * Old clients don't expect `sourceData` directly on a history entry object.
   *
   * @param {{ user: User, media: Media, historyEntry: HistoryEntry }} next
   */
  getMediaForPlayback(next) {
    const { historyEntry, media } = next;
    return {
      artist: historyEntry.artist,
      title: historyEntry.title,
      start: historyEntry.start,
      end: historyEntry.end,
      media: {
        sourceType: media.sourceType,
        sourceID: media.sourceID,
        artist: media.artist,
        title: media.title,
        duration: media.duration,
        // Playback-specific keys override the media's own keys.
        sourceData: {
          ...media.sourceData,
          ...historyEntry.sourceData,
        },
      },
    };
  }

  /**
   * Publish the outcome of an advance: `advance:complete` (with `null` when
   * nothing is playing), `playlist:cycle` for the new DJ's playlist, and the
   * current waitlist as `waitlist:update`.
   *
   * @param {{
   *   user: User,
   *   playlist: Playlist,
   *   media: Media,
   *   historyEntry: HistoryEntry
   * } | null} next
   */
  async #publishAdvanceComplete(next) {
    const { waitlist } = this.#uw;

    if (next != null) {
      this.#uw.publish('advance:complete', {
        historyID: next.historyEntry.id,
        userID: next.user.id,
        playlistID: next.playlist.id,
        media: this.getMediaForPlayback(next),
        playedAt: next.historyEntry.createdAt.getTime(),
      });
      this.#uw.publish('playlist:cycle', {
        userID: next.user.id,
        playlistID: next.playlist.id,
      });
    } else {
      this.#uw.publish('advance:complete', null);
    }
    this.#uw.publish('waitlist:update', await waitlist.getUserIDs());
  }

  /**
   * Run the media source's pre-play hook (`source.play`) for this entry.
   * A failing hook is logged and does not prevent playback.
   *
   * @param {{ user: User, media: { sourceID: string, sourceType: string } }} entry
   * @returns The hook's result, or `undefined` when there is no such source
   *   or the hook threw.
   */
  async #getSourceDataForPlayback(entry) {
    const { sourceID, sourceType } = entry.media;
    const source = this.#uw.source(sourceType);
    if (source) {
      this.#logger.trace({ sourceType: source.type, sourceID }, 'running pre-play hook');
      /** @type {JsonObject | undefined} */
      let sourceData;
      try {
        sourceData = await source.play(entry.user, entry.media);
        this.#logger.trace({ sourceType: source.type, sourceID, sourceData }, 'pre-play hook result');
      } catch (error) {
        this.#logger.error({ sourceType: source.type, sourceID, err: error }, 'pre-play hook failed');
      }
      return sourceData;
    }

    return undefined;
  }

  /**
   * Perform one advance. Called from `advance()` inside `#mutex.withLock`.
   *
   * All keyval and history reads/writes in this method go through `tx`.
   *
   * @typedef {object} AdvanceOptions
   * @prop {boolean} [remove]
   * @prop {boolean} [publish]
   * @prop {AbortSignal} [signal]
   * @param {AdvanceOptions} [opts]
   * @returns {Promise<{
   *   historyEntry: HistoryEntry,
   *   user: User,
   *   media: Media,
   *   playlist: Playlist,
   * }|null>}
   */
  async #advanceLocked(opts = {}, tx = this.#uw.db) {
    const { playlists } = this.#uw;

    const publish = opts.publish ?? true;
    // The flag is consumed here: delete it and act on the value it had.
    const removeAfterCurrent = (
      await this.#uw.keyv.delete(KEY_REMOVE_AFTER_CURRENT_PLAY, tx)
    ) === true;
    const remove = opts.remove || removeAfterCurrent || (
      !await this.#uw.waitlist.isCycleEnabled()
    );

    const previous = await this.getCurrentEntry(tx);
    const previousDJ = previous != null ? previous.historyEntry.userID : null;
    let next;
    try {
      next = await this.#getNextEntry({ remove }, tx);
    } catch (err) {
      // If the next user's playlist was empty, remove them from the waitlist
      // and try advancing again.
      if (err instanceof EmptyPlaylistError) {
        this.#logger.info('user has empty playlist, skipping on to the next');
        await this.#cycleWaitlist(previousDJ, { remove });
        // Spread `opts` so the abort signal still applies to the retry.
        return this.#advanceLocked({ ...opts, publish, remove: true }, tx);
      }
      throw err;
    }

    // Give up before anything is written for the new entry.
    if (opts.signal?.aborted) {
      throw opts.signal.reason;
    }

    if (previous) {
      this.#logger.info({
        id: previous.historyEntry.id,
        artist: previous.media.artist,
        title: previous.media.title,
        upvotes: previous.upvotes.length,
        favorites: previous.favorites.length,
        downvotes: previous.downvotes.length,
      }, 'previous track stats');
    }

    let result = null;
    if (next != null) {
      this.#logger.info({
        id: next.playlistItem.id,
        artist: next.playlistItem.artist,
        title: next.playlistItem.title,
      }, 'next track');
      const sourceData = await this.#getSourceDataForPlayback(next);

      // Conservatively, we should take *all* the data from the inserted values.
      // But then we need to reparse the source data... It's easier to only take
      // the actually generated value from there :')
      const { createdAt } = await tx.insertInto('historyEntries')
        .returning('createdAt')
        .values({
          id: next.historyEntry.id,
          userID: next.user.id,
          mediaID: next.media.id,
          artist: next.historyEntry.artist,
          title: next.historyEntry.title,
          start: next.historyEntry.start,
          end: next.historyEntry.end,
          sourceData: sourceData != null ? jsonb(sourceData) : null,
        })
        .executeTakeFirstOrThrow();

      if (sourceData != null) {
        next.historyEntry.sourceData = sourceData;
      }
      next.historyEntry.createdAt = createdAt;

      result = {
        historyEntry: next.historyEntry,
        playlist: next.playlist,
        user: next.user,
        media: next.media,
      };
    } else {
      this.#maybeStop();
    }

    await this.#cycleWaitlist(previousDJ, { remove });

    if (next) {
      await this.#update(next, tx);
      await playlists.cyclePlaylist(next.playlist, tx);
      this.#play(next.historyEntry);
    } else {
      await this.clear(tx);
    }

    if (publish !== false) {
      await this.#publishAdvanceComplete(result);
    }

    return result;
  }

  /**
   * Move on to the next DJ and track.
   *
   * The work runs inside `#mutex.withLock` with a 10 second abort signal.
   * The signal is checked once, after the next entry has been determined.
   *
   * @param {AdvanceOptions} [opts]
   * @returns The new current entry, or `null` when nothing is playing.
   */
  advance(opts = {}) {
    const result = this.#mutex.withLock(() => {
      const signal = AbortSignal.timeout(10_000);
      return this.#advanceLocked({ ...opts, signal });
    });
    this.#awaitAdvance = result;
    return result;
  }

  /**
   * Set or clear the remove-after-play flag for the current DJ.
   *
   * @param {User} user
   * @param {boolean} remove
   * @returns {Promise<boolean>} The new value of the flag.
   * @throws {Error} When `user` is not the current DJ.
   */
  async setRemoveAfterCurrentPlay(user, remove) {
    const newValue = await this.#uw.db.transaction().execute(async (tx) => {
      const currentDJ = /** @type {UserID|undefined} */ (
        await this.#uw.keyv.get(KEY_CURRENT_DJ_ID, tx)
      );
      if (currentDJ !== user.id) {
        throw new Error('You are not currently playing');
      }

      if (remove) {
        await this.#uw.keyv.set(KEY_REMOVE_AFTER_CURRENT_PLAY, true, tx);
        return true;
      }
      await this.#uw.keyv.delete(KEY_REMOVE_AFTER_CURRENT_PLAY, tx);
      return false;
    });
    return newValue;
  }

  /**
   * @param {User} user
   * @returns Whether the remove-after-play flag is set, or `null` when `user`
   *   is not the current DJ.
   */
  async getRemoveAfterCurrentPlay(user, tx = this.#uw.db) {
    const currentDJ = /** @type {UserID|undefined} */ (
      await this.#uw.keyv.get(KEY_CURRENT_DJ_ID, tx)
    );
    const removeAfterCurrentPlay = /** @type {boolean|undefined} */ (
      await this.#uw.keyv.get(KEY_REMOVE_AFTER_CURRENT_PLAY, tx)
    );

    if (currentDJ === user.id) {
      return removeAfterCurrentPlay != null;
    }
    return null;
  }

  /**
   * Remove the given user from the booth. Throw an error if the user is not playing.
   *
   * @param {UserID} userID
   * @throws {UserNotInWaitlistError} When `userID` is not the current DJ.
   */
  async removeUser(userID) {
    const currentDJ = await this.#uw.keyv.get(KEY_CURRENT_DJ_ID);
    if (userID !== currentDJ) {
      throw new UserNotInWaitlistError({ id: userID });
    }
    await this.advance({ remove: true });
  }
}
516

517
/**
 * Plugin entry point: attaches a `Booth` to the server as `uw.booth`, mounts
 * the booth HTTP routes under `/booth`, and starts the booth once boot has
 * finished without an error.
 *
 * @param {import('../Uwave.js').Boot} uw
 */
async function boothPlugin(uw) {
  const booth = new Booth(uw);
  uw.booth = booth;

  const router = routes();
  uw.httpApi.use('/booth', router);

  uw.after(async (bootError) => {
    // Do not start the booth when boot failed.
    if (bootError) {
      return;
    }
    await uw.booth.onStart();
  });
}
530

531
export default boothPlugin;
532
export { Booth };
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc