• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

u-wave / core / 19327295348

13 Nov 2025 09:46AM UTC coverage: 85.017%. First build
19327295348

Pull #657

github

web-flow
Merge 5513d20bf into cc369fe03
Pull Request #657: SQLite Key-value store to replace Redis

949 of 1126 branches covered (84.28%)

Branch coverage included in aggregate %.

156 of 206 new or added lines in 7 files covered. (75.73%)

9985 of 11735 relevant lines covered (85.09%)

92.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.55
/src/plugins/booth.js
1
import RedLock from 'redlock';
1✔
2
import { sql } from 'kysely';
1✔
3
import { EmptyPlaylistError, PlaylistItemNotFoundError } from '../errors/index.js';
1✔
4
import routes from '../routes/booth.js';
1✔
5
import { randomUUID } from 'node:crypto';
1✔
6
import { fromJson, jsonb, jsonGroupArray } from '../utils/sqlite.js';
1✔
7

1✔
8
/**
1✔
9
 * @typedef {import('../schema.js').UserID} UserID
1✔
10
 * @typedef {import('../schema.js').HistoryEntryID} HistoryEntryID
1✔
11
 * @typedef {import('type-fest').JsonObject} JsonObject
1✔
12
 * @typedef {import('../schema.js').User} User
1✔
13
 * @typedef {import('../schema.js').Playlist} Playlist
1✔
14
 * @typedef {import('../schema.js').PlaylistItem} PlaylistItem
1✔
15
 * @typedef {import('../schema.js').HistoryEntry} HistoryEntry
1✔
16
 * @typedef {Omit<import('../schema.js').Media, 'createdAt' | 'updatedAt'>} Media
1✔
17
 */
1✔
18

1✔
19
// Resource name passed to RedLock in `Booth#advance()` so only one advance runs at a time.
const REDIS_ADVANCING = 'booth:advancing';
1✔
20
// Key-value store key holding the ID of the history entry that is currently playing.
const KEY_HISTORY_ID = 'booth:historyID';
1✔
21
// Key-value store key holding the user ID of the current DJ.
const KEY_CURRENT_DJ_ID = 'booth:currentDJ';
1✔
22
// Key-value store key set to `true` when the current DJ asked to leave the booth
// once their current play ends; removed again on every advance.
const KEY_REMOVE_AFTER_CURRENT_PLAY = 'booth:removeAfterCurrentPlay';
1✔
23

1✔
24
/**
 * Manages the booth: which history entry is currently playing, who the
 * current DJ is, and advancing to the next play.
 *
 * The current play is tracked in the key-value store (`uw.keyv`) under the
 * `KEY_*` keys, and advancing is serialised through a RedLock lock.
 */
class Booth {
  #uw;

  #logger;

  /**
   * Timer that triggers the automatic advance when the current play ends.
   *
   * @type {ReturnType<typeof setTimeout>|null}
   */
  #timeout = null;

  #locker;

  /**
   * Promise for the most recently started advance, awaited on shutdown.
   *
   * @type {Promise<unknown>|null}
   */
  #awaitAdvance = null;

  /**
   * @param {import('../Uwave.js').Boot} uw
   */
  constructor(uw) {
    this.#uw = uw;
    this.#locker = new RedLock([this.#uw.redis]);
    this.#logger = uw.logger.child({ ns: 'uwave:booth' });
  }

  /**
   * Resume the advance timer for a play that was already stored as current,
   * and register a close hook that stops the timer and waits for the most
   * recent advance.
   *
   * @internal
   */
  async onStart() {
    const current = await this.getCurrentEntry();
    if (current && this.#timeout === null) {
      // Restart the advance timer after a server restart, if a track was
      // playing before the server restarted.
      const duration = (current.historyEntry.end - current.historyEntry.start) * 1000;
      const endTime = current.historyEntry.createdAt.getTime() + duration;
      if (endTime > Date.now()) {
        this.#timeout = setTimeout(
          () => this.#advanceAutomatically(),
          endTime - Date.now(),
        );
      } else {
        // Not awaited: failures are caught and logged inside the helper.
        this.#advanceAutomatically();
      }
    }

    this.#uw.onClose(async () => {
      this.#onStop();
      await this.#awaitAdvance;
    });
  }

  /**
   * Advance from a timer: errors are logged instead of propagated because
   * there is no caller to report them to.
   */
  async #advanceAutomatically() {
    try {
      await this.advance();
    } catch (error) {
      this.#logger.error({ err: error }, 'advance failed');
    }
  }

  /** Cancel the pending advance timer when the server is closing. */
  #onStop() {
    this.#maybeStop();
  }

  /**
   * Load the currently playing history entry, with its media, its DJ and the
   * IDs of the users that upvoted, downvoted or favorited it.
   *
   * @returns the current play, or `null` when the stored history ID does not
   *   match any history entry.
   */
  async getCurrentEntry(tx = this.#uw.db) {
    const entry = await tx.selectFrom('keyval')
      .where('key', '=', KEY_HISTORY_ID)
      .innerJoin('historyEntries', (join) => join.on(
        // The stored value is JSON; `->>'$'` extracts it for comparison with the ID column.
        (eb) => sql`${eb.ref('value')}->>'$'`,
        '=',
        (eb) => eb.ref('historyEntries.id'),
      ))
      .innerJoin('media', 'historyEntries.mediaID', 'media.id')
      .innerJoin('users', 'historyEntries.userID', 'users.id')
      .select([
        'historyEntries.id as id',
        'media.id as media.id',
        'media.sourceID as media.sourceID',
        'media.sourceType as media.sourceType',
        'media.sourceData as media.sourceData',
        'media.artist as media.artist',
        'media.title as media.title',
        'media.duration as media.duration',
        'media.thumbnail as media.thumbnail',
        'users.id as users.id',
        'users.username as users.username',
        'users.avatar as users.avatar',
        'users.createdAt as users.createdAt',
        'historyEntries.artist',
        'historyEntries.title',
        'historyEntries.start',
        'historyEntries.end',
        'historyEntries.createdAt',
        (eb) => eb.selectFrom('feedback')
          .where('historyEntryID', '=', eb.ref('historyEntries.id'))
          .where('vote', '=', 1)
          .select((eb) => jsonGroupArray(eb.ref('userID')).as('userIDs'))
          .as('upvotes'),
        (eb) => eb.selectFrom('feedback')
          .where('historyEntryID', '=', eb.ref('historyEntries.id'))
          .where('vote', '=', -1)
          .select((eb) => jsonGroupArray(eb.ref('userID')).as('userIDs'))
          .as('downvotes'),
        (eb) => eb.selectFrom('feedback')
          .where('historyEntryID', '=', eb.ref('historyEntries.id'))
          .where('favorite', '=', 1)
          .select((eb) => jsonGroupArray(eb.ref('userID')).as('userIDs'))
          .as('favorites'),
      ])
      .executeTakeFirst();

    if (!entry) {
      return null;
    }

    return {
      media: {
        id: entry['media.id'],
        artist: entry['media.artist'],
        title: entry['media.title'],
        duration: entry['media.duration'],
        thumbnail: entry['media.thumbnail'],
        sourceID: entry['media.sourceID'],
        sourceType: entry['media.sourceType'],
        sourceData: entry['media.sourceData'] ?? {},
      },
      user: {
        id: entry['users.id'],
        username: entry['users.username'],
        avatar: entry['users.avatar'],
        createdAt: entry['users.createdAt'],
      },
      historyEntry: {
        id: entry.id,
        userID: entry['users.id'],
        mediaID: entry['media.id'],
        artist: entry.artist,
        title: entry.title,
        start: entry.start,
        end: entry.end,
        createdAt: entry.createdAt,
      },
      upvotes: entry.upvotes != null ? fromJson(entry.upvotes) : [],
      downvotes: entry.downvotes != null ? fromJson(entry.downvotes) : [],
      favorites: entry.favorites != null ? fromJson(entry.favorites) : [],
    };
  }

  /**
   * Pick the user that should play next: the first user in the waitlist, or
   * the current DJ when the waitlist is empty and they are not being removed.
   *
   * @param {{ remove?: boolean }} options
   */
  async #getNextDJ(options, tx = this.#uw.db) {
    const waitlist = await this.#uw.waitlist.getUserIDs();
    let userID = waitlist.at(0) ?? null;
    if (!userID && !options.remove) {
      // If the waitlist is empty, the current DJ will play again immediately.
      userID = /** @type {UserID|null} */ (await this.#uw.keyv.get(KEY_CURRENT_DJ_ID, tx));
    }
    if (!userID) {
      return null;
    }

    return this.#uw.users.getUser(userID, tx);
  }

  /**
   * Build the next play from the first item of the next DJ's active playlist.
   *
   * @param {{ remove?: boolean }} options
   * @returns the next play, or `null` when there is no next DJ or they have
   *   no active playlist.
   * @throws {EmptyPlaylistError} when the next DJ's active playlist is empty.
   * @throws {PlaylistItemNotFoundError} when the first playlist item is missing.
   */
  async #getNextEntry(options, tx = this.#uw.db) {
    const { playlists } = this.#uw;

    // Look up the next DJ on the same database handle as the rest of the advance.
    const user = await this.#getNextDJ(options, tx);
    if (!user || !user.activePlaylistID) {
      return null;
    }
    const playlist = await playlists.getUserPlaylist(user, user.activePlaylistID);
    if (playlist.size === 0) {
      throw new EmptyPlaylistError();
    }

    const { playlistItem, media } = await playlists.getPlaylistItemAt(playlist, 0);
    if (!playlistItem) {
      throw new PlaylistItemNotFoundError();
    }

    return {
      user,
      playlist,
      playlistItem,
      media,
      historyEntry: {
        id: /** @type {HistoryEntryID} */ (randomUUID()),
        userID: user.id,
        mediaID: media.id,
        artist: playlistItem.artist,
        title: playlistItem.title,
        start: playlistItem.start,
        end: playlistItem.end,
        /** @type {null | JsonObject} */
        sourceData: null,
        createdAt: new Date(),
      },
    };
  }

  /**
   * @param {UserID|null} previous
   * @param {{ remove?: boolean }} options
   */
  async #cycleWaitlist(previous, options) {
    await this.#uw.waitlist.cycle(previous, options);
  }

  /**
   * Forget the current play: removes all booth keys from the key-value store.
   */
  async clear(tx = this.#uw.db) {
    await this.#uw.keyv.delete(KEY_REMOVE_AFTER_CURRENT_PLAY, tx);
    await this.#uw.keyv.delete(KEY_HISTORY_ID, tx);
    await this.#uw.keyv.delete(KEY_CURRENT_DJ_ID, tx);
  }

  /**
   * Store `next` as the current play and drop any pending
   * "remove after current play" request.
   *
   * @param {{ historyEntry: { id: HistoryEntryID }, user: { id: UserID } }} next
   */
  async #update(next, tx = this.#uw.db) {
    await this.#uw.keyv.delete(KEY_REMOVE_AFTER_CURRENT_PLAY, tx);
    await this.#uw.keyv.set(KEY_HISTORY_ID, next.historyEntry.id, tx);
    await this.#uw.keyv.set(KEY_CURRENT_DJ_ID, next.user.id, tx);
  }

  /** Cancel the advance timer, if one is pending. */
  #maybeStop() {
    if (this.#timeout) {
      clearTimeout(this.#timeout);
      this.#timeout = null;
    }
  }

  /**
   * (Re)start the advance timer so it fires when `entry` has finished playing.
   *
   * @param {Pick<HistoryEntry, 'start' | 'end'>} entry
   */
  #play(entry) {
    this.#maybeStop();
    this.#timeout = setTimeout(
      () => this.#advanceAutomatically(),
      // `start`/`end` are scaled by 1000 to get the timer delay in milliseconds.
      (entry.end - entry.start) * 1000,
    );
  }

  /**
   * This method creates a `media` object that clients can understand from a
   * history entry object.
   *
   * We present the playback-specific `sourceData` as if it is
   * a property of the media model for backwards compatibility.
   * Old clients don't expect `sourceData` directly on a history entry object.
   *
   * @param {{ user: User, media: Media, historyEntry: HistoryEntry }} next
   */
  getMediaForPlayback(next) {
    return {
      artist: next.historyEntry.artist,
      title: next.historyEntry.title,
      start: next.historyEntry.start,
      end: next.historyEntry.end,
      media: {
        sourceType: next.media.sourceType,
        sourceID: next.media.sourceID,
        artist: next.media.artist,
        title: next.media.title,
        duration: next.media.duration,
        sourceData: {
          // Playback-specific values win over the media's own source data.
          ...next.media.sourceData,
          ...next.historyEntry.sourceData,
        },
      },
    };
  }

  /**
   * Publish the events that describe a finished advance. `advance:complete`
   * carries `null` when nothing is playing anymore.
   *
   * @param {{
   *   user: User,
   *   playlist: Playlist,
   *   media: Media,
   *   historyEntry: HistoryEntry
   * } | null} next
   */
  async #publishAdvanceComplete(next) {
    const { waitlist } = this.#uw;

    if (next != null) {
      this.#uw.publish('advance:complete', {
        historyID: next.historyEntry.id,
        userID: next.user.id,
        playlistID: next.playlist.id,
        media: this.getMediaForPlayback(next),
        playedAt: next.historyEntry.createdAt.getTime(),
      });
      this.#uw.publish('playlist:cycle', {
        userID: next.user.id,
        playlistID: next.playlist.id,
      });
    } else {
      this.#uw.publish('advance:complete', null);
    }
    this.#uw.publish('waitlist:update', await waitlist.getUserIDs());
  }

  /**
   * Run the media source's pre-play hook, if the source is registered.
   *
   * A failing hook is logged and treated as "no source data" so that a broken
   * source does not prevent the advance.
   *
   * @param {{ user: User, media: { sourceID: string, sourceType: string } }} entry
   * @returns {Promise<JsonObject | undefined>}
   */
  async #getSourceDataForPlayback(entry) {
    const { sourceID, sourceType } = entry.media;
    const source = this.#uw.source(sourceType);
    if (source) {
      this.#logger.trace({ sourceType: source.type, sourceID }, 'running pre-play hook');
      /** @type {JsonObject | undefined} */
      let sourceData;
      try {
        sourceData = await source.play(entry.user, entry.media);
        this.#logger.trace({ sourceType: source.type, sourceID, sourceData }, 'pre-play hook result');
      } catch (error) {
        this.#logger.error({ sourceType: source.type, sourceID, err: error }, 'pre-play hook failed');
      }
      return sourceData;
    }

    return undefined;
  }

  /**
   * Advance to the next play. Must only run while the advance lock is held.
   *
   * @typedef {object} AdvanceOptions
   * @prop {boolean} [remove]
   * @prop {boolean} [publish]
   * @prop {import('redlock').RedlockAbortSignal} [signal]
   * @param {AdvanceOptions} [opts]
   * @returns {Promise<{
   *   historyEntry: HistoryEntry,
   *   user: User,
   *   media: Media,
   *   playlist: Playlist,
   * }|null>}
   */
  async #advanceLocked(opts = {}, tx = this.#uw.db) {
    const { playlists } = this.#uw;

    const publish = opts.publish ?? true;
    // Consume the "remove after current play" request; it only counts when the
    // delete call resolves to `true`.
    const removeAfterCurrent = (
      await this.#uw.keyv.delete(KEY_REMOVE_AFTER_CURRENT_PLAY, tx)
    ) === true;
    const remove = opts.remove || removeAfterCurrent || (
      !await this.#uw.waitlist.isCycleEnabled()
    );

    const previous = await this.getCurrentEntry(tx);
    const previousDJ = previous != null ? previous.historyEntry.userID : null;
    let next;
    try {
      next = await this.#getNextEntry({ remove }, tx);
    } catch (err) {
      // If the next user's playlist was empty, remove them from the waitlist
      // and try advancing again.
      if (err instanceof EmptyPlaylistError) {
        this.#logger.info('user has empty playlist, skipping on to the next');
        await this.#cycleWaitlist(previousDJ, { remove });
        // Keep the lock's abort signal so the retry still notices a lost lock.
        return this.#advanceLocked({ publish, remove: true, signal: opts.signal }, tx);
      }
      throw err;
    }

    // Bail out before writing anything if the lock was lost in the meantime.
    if (opts.signal?.aborted) {
      throw opts.signal.error;
    }

    if (previous) {
      this.#logger.info({
        id: previous.historyEntry.id,
        artist: previous.media.artist,
        title: previous.media.title,
        upvotes: previous.upvotes.length,
        favorites: previous.favorites.length,
        downvotes: previous.downvotes.length,
      }, 'previous track stats');
    }

    let result = null;
    if (next != null) {
      this.#logger.info({
        id: next.playlistItem.id,
        artist: next.playlistItem.artist,
        title: next.playlistItem.title,
      }, 'next track');
      const sourceData = await this.#getSourceDataForPlayback(next);

      // Conservatively, we should take *all* the data from the inserted values.
      // But then we need to reparse the source data... It's easier to only take
      // the actually generated value from there :')
      const { createdAt } = await tx.insertInto('historyEntries')
        .returning('createdAt')
        .values({
          id: next.historyEntry.id,
          userID: next.user.id,
          mediaID: next.media.id,
          artist: next.historyEntry.artist,
          title: next.historyEntry.title,
          start: next.historyEntry.start,
          end: next.historyEntry.end,
          sourceData: sourceData != null ? jsonb(sourceData) : null,
        })
        .executeTakeFirstOrThrow();

      if (sourceData != null) {
        next.historyEntry.sourceData = sourceData;
      }
      next.historyEntry.createdAt = createdAt;

      result = {
        historyEntry: next.historyEntry,
        playlist: next.playlist,
        user: next.user,
        media: next.media,
      };
    } else {
      this.#maybeStop();
    }

    await this.#cycleWaitlist(previousDJ, { remove });

    if (next) {
      await this.#update(next, tx);
      await playlists.cyclePlaylist(next.playlist, tx);
      this.#play(next.historyEntry);
    } else {
      await this.clear(tx);
    }

    if (publish !== false) {
      await this.#publishAdvanceComplete(result);
    }

    return result;
  }

  /**
   * Advance to the next play while holding the advance lock.
   *
   * The returned promise is also remembered so the close hook can wait for it.
   *
   * @param {AdvanceOptions} [opts]
   */
  advance(opts = {}) {
    const result = this.#locker.using(
      [REDIS_ADVANCING],
      10_000,
      (signal) => this.#advanceLocked({ ...opts, signal }),
    );
    this.#awaitAdvance = result;
    return result;
  }

  /**
   * Set or clear the request to remove `user` from the booth once their
   * current play ends.
   *
   * @param {User} user
   * @param {boolean} remove
   * @returns {Promise<boolean>} the new value of the flag.
   * @throws {Error} when `user` is not the current DJ.
   */
  async setRemoveAfterCurrentPlay(user, remove) {
    const newValue = await this.#uw.db.transaction().execute(async (tx) => {
      const currentDJ = /** @type {UserID|undefined} */ (
        await this.#uw.keyv.get(KEY_CURRENT_DJ_ID, tx)
      );
      if (currentDJ !== user.id) {
        throw new Error('You are not currently playing');
      }

      if (remove) {
        await this.#uw.keyv.set(KEY_REMOVE_AFTER_CURRENT_PLAY, true, tx);
        return true;
      }
      await this.#uw.keyv.delete(KEY_REMOVE_AFTER_CURRENT_PLAY, tx);
      return false;
    });
    return newValue;
  }

  /**
   * Check whether `user` asked to be removed after their current play.
   *
   * @param {User} user
   * @returns {Promise<boolean|null>} `null` when `user` is not the current DJ.
   */
  async getRemoveAfterCurrentPlay(user, tx = this.#uw.db) {
    const currentDJ = /** @type {UserID|undefined} */ (
      await this.#uw.keyv.get(KEY_CURRENT_DJ_ID, tx)
    );
    const removeAfterCurrentPlay = /** @type {boolean|undefined} */ (
      await this.#uw.keyv.get(KEY_REMOVE_AFTER_CURRENT_PLAY, tx)
    );

    if (currentDJ === user.id) {
      return removeAfterCurrentPlay != null;
    }
    return null;
  }
}
144✔
505

1✔
506
/**
 * Plugin entry point: attaches a `Booth` instance to the app as `uw.booth`,
 * mounts the booth HTTP routes under `/booth`, and starts the booth from the
 * `uw.after` callback unless that callback receives an error.
 *
 * @param {import('../Uwave.js').Boot} uw
 */
async function boothPlugin(uw) {
  const booth = new Booth(uw);
  uw.booth = booth;
  uw.httpApi.use('/booth', routes());

  uw.after(async (err) => {
    if (err) {
      // The callback got an error, so the booth is not started.
      return;
    }
    await uw.booth.onStart();
  });
}
144✔
519

1✔
520
export default boothPlugin;
1✔
521
export { Booth };
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc