• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

u-wave / core / 11980840475

22 Nov 2024 10:04PM UTC coverage: 78.492% (-1.7%) from 80.158%
11980840475

Pull #637

github

goto-bus-stop
ci: add node 22
Pull Request #637: Switch to a relational database

757 of 912 branches covered (83.0%)

Branch coverage included in aggregate %.

2001 of 2791 new or added lines in 52 files covered. (71.69%)

9 existing lines in 7 files now uncovered.

8666 of 11093 relevant lines covered (78.12%)

70.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.8
/src/plugins/booth.js
1
import RedLock from 'redlock';
1✔
2
import { EmptyPlaylistError, PlaylistItemNotFoundError } from '../errors/index.js';
1✔
3
import routes from '../routes/booth.js';
1✔
4
import { randomUUID } from 'node:crypto';
1✔
5
import { jsonb } from '../utils/sqlite.js';
1✔
6

1✔
7
/**
1✔
8
 * @typedef {import('../schema.js').UserID} UserID
1✔
9
 * @typedef {import('../schema.js').HistoryEntryID} HistoryEntryID
1✔
10
 * @typedef {import('type-fest').JsonObject} JsonObject
1✔
11
 * @typedef {import('../schema.js').User} User
1✔
12
 * @typedef {import('../schema.js').Playlist} Playlist
1✔
13
 * @typedef {import('../schema.js').PlaylistItem} PlaylistItem
1✔
14
 * @typedef {import('../schema.js').HistoryEntry} HistoryEntry
1✔
15
 * @typedef {Omit<import('../schema.js').Media, 'createdAt' | 'updatedAt'>} Media
1✔
16
 */
1✔
17

1✔
18
/** Redlock resource name held while the booth is advancing. */
const REDIS_ADVANCING = 'booth:advancing';
/** Redis key holding the ID of the history entry that is currently playing. */
const REDIS_HISTORY_ID = 'booth:historyID';
/** Redis key holding the user ID of the current DJ. */
const REDIS_CURRENT_DJ_ID = 'booth:currentDJ';
/** Redis key that is set when the current DJ should leave the booth after this play. */
const REDIS_REMOVE_AFTER_CURRENT_PLAY = 'booth:removeAfterCurrentPlay';

/**
 * Lua script that sets or clears the "remove after current play" flag, but
 * only if the requesting user is the current DJ.
 *
 * KEYS: the current DJ key, then the flag key.
 * ARGV: the requesting user ID, then the desired value (`'true'` sets the
 * flag, anything else clears it).
 *
 * Replies `1` when the flag was set, `0` when it was cleared, and an error
 * reply when the requesting user is not the current DJ.
 *
 * The descriptor is frozen so the key list and script text can't be modified
 * by accident after the command has been defined.
 */
const REMOVE_AFTER_CURRENT_PLAY_SCRIPT = Object.freeze({
  keys: Object.freeze([REDIS_CURRENT_DJ_ID, REDIS_REMOVE_AFTER_CURRENT_PLAY]),
  lua: `
    local k_dj = KEYS[1]
    local k_remove = KEYS[2]
    local user_id = ARGV[1]
    local value = ARGV[2]
    local current_dj_id = redis.call('GET', k_dj)
    if current_dj_id == user_id then
      if value == 'true' then
        redis.call('SET', k_remove, 'true')
        return 1
      else
        redis.call('DEL', k_remove)
        return 0
      end
    else
      return redis.error_reply('You are not currently playing')
    end
  `,
});
1✔
44

1✔
45
/**
 * Manages the "booth": which history entry is currently playing, who the
 * current DJ is, and when to advance to the next DJ in the waitlist.
 *
 * The current history entry ID, the current DJ ID and the
 * "remove after current play" flag are kept in Redis. History entries
 * themselves are read from and written to the database.
 */
class Booth {
  /** The server instance this booth belongs to. */
  #uw;

  /** Child logger tagged with `ns: 'uwave:booth'`. */
  #logger;

  /**
   * Timer that triggers the automatic advance when the current play ends.
   *
   * @type {ReturnType<typeof setTimeout>|null}
   */
  #timeout = null;

  /** RedLock instance used by `advance()` to hold the `booth:advancing` lock. */
  #locker;

  /**
   * Promise for the most recently started advance. Awaited when the server
   * closes, so an in-flight advance can finish first.
   *
   * @type {Promise<unknown>|null}
   */
  #awaitAdvance = null;

  /**
   * @param {import('../Uwave.js').Boot} uw
   */
  constructor(uw) {
    this.#uw = uw;
    this.#locker = new RedLock([this.#uw.redis]);
    this.#logger = uw.logger.child({ ns: 'uwave:booth' });

    uw.redis.defineCommand('uw:removeAfterCurrentPlay', {
      numberOfKeys: REMOVE_AFTER_CURRENT_PLAY_SCRIPT.keys.length,
      lua: REMOVE_AFTER_CURRENT_PLAY_SCRIPT.lua,
    });
  }

  /**
   * Restore the advance timer for a play that was still active when the
   * server last stopped, and register the shutdown hook.
   *
   * @internal
   */
  async onStart() {
    const current = await this.getCurrentEntry();
    if (current && this.#timeout === null) {
      // Restart the advance timer after a server restart, if a track was
      // playing before the server restarted.
      const duration = (current.historyEntry.end - current.historyEntry.start) * 1000;
      const endTime = current.historyEntry.createdAt.getTime() + duration;
      // Read the clock once so the check and the delay agree with each other.
      const remaining = endTime - Date.now();
      if (remaining > 0) {
        this.#timeout = setTimeout(
          () => this.#advanceAutomatically(),
          remaining,
        );
      } else {
        // The play should already have ended. Not awaited, so startup does not
        // wait for the advance; failures are logged by `#advanceAutomatically`.
        this.#advanceAutomatically();
      }
    }

    this.#uw.onClose(async () => {
      this.#onStop();
      await this.#awaitAdvance;
    });
  }

  /**
   * Advance from a timer. Errors are logged rather than thrown, because
   * there is no caller to handle them.
   */
  async #advanceAutomatically() {
    try {
      await this.advance();
    } catch (error) {
      this.#logger.error({ err: error }, 'advance failed');
    }
  }

  /** Cancel the pending advance timer when the server is closing. */
  #onStop() {
    this.#maybeStop();
  }

  /**
   * Load the history entry that is currently playing, together with its
   * media, its DJ and the IDs of the users who voted on or favourited it.
   *
   * `tx` is the database handle to query with; it defaults to the main
   * database connection.
   *
   * Resolves to `null` when Redis holds no current history entry ID, or when
   * no matching row exists in the database.
   */
  async getCurrentEntry(tx = this.#uw.db) {
    const historyID = /** @type {HistoryEntryID} */ (await this.#uw.redis.get(REDIS_HISTORY_ID));
    if (!historyID) {
      return null;
    }

    const entry = await tx.selectFrom('historyEntries')
      .innerJoin('media', 'historyEntries.mediaID', 'media.id')
      .innerJoin('users', 'historyEntries.userID', 'users.id')
      .select([
        'historyEntries.id as id',
        'media.id as media.id',
        'media.sourceID as media.sourceID',
        'media.sourceType as media.sourceType',
        'media.sourceData as media.sourceData',
        'media.artist as media.artist',
        'media.title as media.title',
        'media.duration as media.duration',
        'media.thumbnail as media.thumbnail',
        'users.id as users.id',
        'users.username as users.username',
        'users.avatar as users.avatar',
        'users.createdAt as users.createdAt',
        'historyEntries.artist',
        'historyEntries.title',
        'historyEntries.start',
        'historyEntries.end',
        'historyEntries.createdAt',
        // Each subquery collects the matching user IDs into a JSON array
        // string, which is parsed below.
        (eb) => eb.selectFrom('feedback')
          .where('historyEntryID', '=', eb.ref('historyEntries.id'))
          .where('vote', '=', 1)
          .select((eb) => eb.fn.agg('json_group_array', ['userID']).as('userIDs'))
          .as('upvotes'),
        (eb) => eb.selectFrom('feedback')
          .where('historyEntryID', '=', eb.ref('historyEntries.id'))
          .where('vote', '=', -1)
          .select((eb) => eb.fn.agg('json_group_array', ['userID']).as('userIDs'))
          .as('downvotes'),
        (eb) => eb.selectFrom('feedback')
          .where('historyEntryID', '=', eb.ref('historyEntries.id'))
          .where('favorite', '=', 1)
          .select((eb) => eb.fn.agg('json_group_array', ['userID']).as('userIDs'))
          .as('favorites'),
      ])
      .where('historyEntries.id', '=', historyID)
      .executeTakeFirst();

    // Regroup the flat, dotted column aliases into nested objects.
    return entry ? {
      media: {
        id: entry['media.id'],
        artist: entry['media.artist'],
        title: entry['media.title'],
        duration: entry['media.duration'],
        thumbnail: entry['media.thumbnail'],
        sourceID: entry['media.sourceID'],
        sourceType: entry['media.sourceType'],
        sourceData: entry['media.sourceData'] ?? {},
      },
      user: {
        id: entry['users.id'],
        username: entry['users.username'],
        avatar: entry['users.avatar'],
        createdAt: entry['users.createdAt'],
      },
      historyEntry: {
        id: entry.id,
        userID: entry['users.id'],
        mediaID: entry['media.id'],
        artist: entry.artist,
        title: entry.title,
        start: entry.start,
        end: entry.end,
        createdAt: entry.createdAt,
      },
      upvotes: /** @type {UserID[]} */ (JSON.parse(entry.upvotes)),
      downvotes: /** @type {UserID[]} */ (JSON.parse(entry.downvotes)),
      favorites: /** @type {UserID[]} */ (JSON.parse(entry.favorites)),
    } : null;
  }

  /**
   * Find the user who should play next: the first user in the waitlist, or
   * the current DJ if the waitlist is empty and they are not being removed.
   *
   * @param {{ remove?: boolean }} options
   */
  async #getNextDJ(options, tx = this.#uw.db) {
    let userID = /** @type {UserID|null} */ (await this.#uw.redis.lindex('waitlist', 0));
    if (!userID && !options.remove) {
      // If the waitlist is empty, the current DJ will play again immediately.
      userID = /** @type {UserID|null} */ (await this.#uw.redis.get(REDIS_CURRENT_DJ_ID));
    }
    if (!userID) {
      return null;
    }

    return this.#uw.users.getUser(userID, tx);
  }

  /**
   * Build the (not yet persisted) history entry for the next play from the
   * first item of the next DJ's active playlist.
   *
   * Resolves to `null` when there is no next DJ or they have no active
   * playlist. Throws `EmptyPlaylistError` when the active playlist is empty,
   * and `PlaylistItemNotFoundError` when its first item cannot be loaded.
   *
   * @param {{ remove?: boolean }} options
   */
  async #getNextEntry(options, tx = this.#uw.db) {
    const { playlists } = this.#uw;

    // Look up the DJ on the same database handle as the rest of the advance.
    const user = await this.#getNextDJ(options, tx);
    if (!user || !user.activePlaylistID) {
      return null;
    }
    const playlist = await playlists.getUserPlaylist(user, user.activePlaylistID);
    if (playlist.size === 0) {
      throw new EmptyPlaylistError();
    }

    const { playlistItem, media } = await playlists.getPlaylistItemAt(playlist, 0);
    if (!playlistItem) {
      throw new PlaylistItemNotFoundError();
    }

    return {
      user,
      playlist,
      playlistItem,
      media,
      historyEntry: {
        id: /** @type {HistoryEntryID} */ (randomUUID()),
        userID: user.id,
        mediaID: media.id,
        artist: playlistItem.artist,
        title: playlistItem.title,
        start: playlistItem.start,
        end: playlistItem.end,
        /** @type {null | JsonObject} */
        sourceData: null,
      },
    };
  }

  /**
   * Remove the first user from the waitlist and, unless they are being
   * removed, put the previous DJ at the end of it.
   *
   * @param {UserID|null} previous
   * @param {{ remove?: boolean }} options
   */
  async #cycleWaitlist(previous, options) {
    const waitlistLen = await this.#uw.redis.llen('waitlist');
    if (waitlistLen > 0) {
      await this.#uw.redis.lpop('waitlist');
      if (previous && !options.remove) {
        // The previous DJ should only be added to the waitlist again if it was
        // not empty. If it was empty, the previous DJ is already in the booth.
        await this.#uw.redis.rpush('waitlist', previous);
      }
    }
  }

  /** Delete all booth state from Redis, leaving the booth empty. */
  async clear() {
    await this.#uw.redis.del(
      REDIS_HISTORY_ID,
      REDIS_CURRENT_DJ_ID,
      REDIS_REMOVE_AFTER_CURRENT_PLAY,
    );
  }

  /**
   * Store the new current history entry and DJ in Redis, and reset the
   * "remove after current play" flag, in a single MULTI.
   *
   * @param {{ historyEntry: { id: HistoryEntryID }, user: { id: UserID } }} next
   */
  async #update(next) {
    await this.#uw.redis.multi()
      .del(REDIS_REMOVE_AFTER_CURRENT_PLAY)
      .set(REDIS_HISTORY_ID, next.historyEntry.id)
      .set(REDIS_CURRENT_DJ_ID, next.user.id)
      .exec();
  }

  /** Cancel the pending advance timer, if there is one. */
  #maybeStop() {
    if (this.#timeout) {
      clearTimeout(this.#timeout);
      this.#timeout = null;
    }
  }

  /**
   * Schedule the automatic advance for when the given play ends.
   * `start` and `end` are treated as seconds.
   *
   * @param {Pick<HistoryEntry, 'start' | 'end'>} entry
   */
  #play(entry) {
    this.#maybeStop();
    this.#timeout = setTimeout(
      () => this.#advanceAutomatically(),
      (entry.end - entry.start) * 1000,
    );
  }

  /**
   * This method creates a `media` object that clients can understand from a
   * history entry object.
   *
   * We present the playback-specific `sourceData` as if it is
   * a property of the media model for backwards compatibility.
   * Old clients don't expect `sourceData` directly on a history entry object.
   *
   * @param {{ user: User, media: Media, historyEntry: HistoryEntry }} next
   */
  getMediaForPlayback(next) {
    return {
      artist: next.historyEntry.artist,
      title: next.historyEntry.title,
      start: next.historyEntry.start,
      end: next.historyEntry.end,
      media: {
        sourceType: next.media.sourceType,
        sourceID: next.media.sourceID,
        artist: next.media.artist,
        title: next.media.title,
        duration: next.media.duration,
        sourceData: {
          // Playback-specific data wins over the media's own data.
          ...next.media.sourceData,
          ...next.historyEntry.sourceData,
        },
      },
    };
  }

  /**
   * Publish the result of an advance: `advance:complete` (with `null` when
   * the booth is now empty), `playlist:cycle` for the new DJ's playlist, and
   * the updated waitlist.
   *
   * @param {{
   *   user: User,
   *   playlist: Playlist,
   *   media: Media,
   *   historyEntry: HistoryEntry
   * } | null} next
   */
  async #publishAdvanceComplete(next) {
    const { waitlist } = this.#uw;

    if (next != null) {
      this.#uw.publish('advance:complete', {
        historyID: next.historyEntry.id,
        userID: next.user.id,
        playlistID: next.playlist.id,
        media: this.getMediaForPlayback(next),
        playedAt: next.historyEntry.createdAt.getTime(),
      });
      this.#uw.publish('playlist:cycle', {
        userID: next.user.id,
        playlistID: next.playlist.id,
      });
    } else {
      this.#uw.publish('advance:complete', null);
    }
    this.#uw.publish('waitlist:update', await waitlist.getUserIDs());
  }

  /**
   * Run the media source's pre-play hook and return the playback-specific
   * source data it produced.
   *
   * Resolves to `undefined` when the source is not registered or when the
   * hook throws. A failing hook is logged and does not prevent playback.
   *
   * @param {{ user: User, media: { sourceID: string, sourceType: string } }} entry
   */
  async #getSourceDataForPlayback(entry) {
    const { sourceID, sourceType } = entry.media;
    const source = this.#uw.source(sourceType);
    if (source) {
      this.#logger.trace({ sourceType: source.type, sourceID }, 'running pre-play hook');
      /** @type {JsonObject | undefined} */
      let sourceData;
      try {
        sourceData = await source.play(entry.user, entry.media);
        this.#logger.trace({ sourceType: source.type, sourceID, sourceData }, 'pre-play hook result');
      } catch (error) {
        this.#logger.error({ sourceType: source.type, sourceID, err: error }, 'pre-play hook failed');
      }
      return sourceData;
    }

    return undefined;
  }

  /**
   * Advance to the next DJ. Callers must hold the `booth:advancing` lock;
   * use `advance()` instead of calling this directly.
   *
   * When the next DJ's playlist is empty, they are dropped from the waitlist
   * and the advance is retried with the same lock signal.
   *
   * @typedef {object} AdvanceOptions
   * @prop {boolean} [remove]
   * @prop {boolean} [publish]
   * @prop {import('redlock').RedlockAbortSignal} [signal]
   * @param {AdvanceOptions} [opts]
   * @returns {Promise<{
   *   historyEntry: HistoryEntry,
   *   user: User,
   *   media: Media,
   *   playlist: Playlist,
   * }|null>}
   */
  async #advanceLocked(opts = {}, tx = this.#uw.db) {
    const { playlists } = this.#uw;

    const publish = opts.publish ?? true;
    // DEL replies with the number of keys it removed, so this reads the flag
    // and resets it in one step.
    const removeAfterCurrent = (await this.#uw.redis.del(REDIS_REMOVE_AFTER_CURRENT_PLAY)) === 1;
    const remove = opts.remove || removeAfterCurrent || (
      !await this.#uw.waitlist.isCycleEnabled()
    );

    const previous = await this.getCurrentEntry(tx);
    const previousDJ = previous != null ? previous.historyEntry.userID : null;
    let next;
    try {
      next = await this.#getNextEntry({ remove }, tx);
    } catch (err) {
      // If the next user's playlist was empty, remove them from the waitlist
      // and try advancing again.
      if (err instanceof EmptyPlaylistError) {
        this.#logger.info('user has empty playlist, skipping on to the next');
        await this.#cycleWaitlist(previousDJ, { remove });
        // Keep passing the lock's abort signal, so the retry also stops when
        // the lock has been lost.
        return this.#advanceLocked({ publish, remove: true, signal: opts.signal }, tx);
      }
      throw err;
    }

    // Stop before changing any state if the lock is no longer held.
    if (opts.signal?.aborted) {
      throw opts.signal.error;
    }

    if (previous) {
      this.#logger.info({
        id: previous.historyEntry.id,
        artist: previous.media.artist,
        title: previous.media.title,
        upvotes: previous.upvotes.length,
        favorites: previous.favorites.length,
        downvotes: previous.downvotes.length,
      }, 'previous track stats');
    }

    let result = null;
    if (next != null) {
      this.#logger.info({
        id: next.playlistItem.id,
        artist: next.playlistItem.artist,
        title: next.playlistItem.title,
      }, 'next track');
      const sourceData = await this.#getSourceDataForPlayback(next);
      if (sourceData) {
        next.historyEntry.sourceData = sourceData;
      }
      const historyEntry = await tx.insertInto('historyEntries')
        .returningAll()
        .values({
          id: next.historyEntry.id,
          userID: next.user.id,
          mediaID: next.media.id,
          artist: next.historyEntry.artist,
          title: next.historyEntry.title,
          start: next.historyEntry.start,
          end: next.historyEntry.end,
          sourceData: sourceData != null ? jsonb(sourceData) : null,
        })
        .executeTakeFirstOrThrow();

      result = {
        historyEntry,
        playlist: next.playlist,
        user: next.user,
        media: next.media,
      };
    } else {
      this.#maybeStop();
    }

    await this.#cycleWaitlist(previousDJ, { remove });

    if (next) {
      await this.#update(next);
      await playlists.cyclePlaylist(next.playlist, tx);
      this.#play(next.historyEntry);
    } else {
      await this.clear();
    }

    if (publish !== false) {
      await this.#publishAdvanceComplete(result);
    }

    return result;
  }

  /**
   * Advance to the next DJ while holding the `booth:advancing` lock.
   *
   * The returned promise is also remembered, so the server's close handler
   * can wait for an in-flight advance.
   *
   * @param {AdvanceOptions} [opts]
   */
  advance(opts = {}) {
    const result = this.#locker.using(
      [REDIS_ADVANCING],
      10_000,
      (signal) => this.#advanceLocked({ ...opts, signal }),
    );
    this.#awaitAdvance = result;
    return result;
  }

  /**
   * Set or clear the flag that removes the current DJ from the booth once
   * their current play ends. The Redis script rejects the call with an error
   * if `user` is not the current DJ.
   *
   * @param {User} user
   * @param {boolean} remove
   * @returns {Promise<boolean>} Whether the flag is set after the call.
   */
  async setRemoveAfterCurrentPlay(user, remove) {
    const newValue = await this.#uw.redis['uw:removeAfterCurrentPlay'](
      ...REMOVE_AFTER_CURRENT_PLAY_SCRIPT.keys,
      user.id,
      remove,
    );
    return newValue === 1;
  }

  /**
   * Check whether the "remove after current play" flag is set for `user`.
   *
   * @param {User} user
   * @returns {Promise<boolean|null>} `null` if `user` is not the current DJ,
   *   otherwise whether the flag is set.
   */
  async getRemoveAfterCurrentPlay(user) {
    const [currentDJ, removeAfterCurrentPlay] = await this.#uw.redis.mget(
      REDIS_CURRENT_DJ_ID,
      REDIS_REMOVE_AFTER_CURRENT_PLAY,
    );
    if (currentDJ === user.id) {
      return removeAfterCurrentPlay != null;
    }
    return null;
  }
}
92✔
522

1✔
523
/**
1✔
524
 * @param {import('../Uwave.js').Boot} uw
1✔
525
 */
1✔
526
/**
 * Install the booth on the server: expose it as `uw.booth`, mount the booth
 * HTTP routes under `/booth`, and start the booth once boot has completed
 * without an error.
 */
async function boothPlugin(uw) {
  uw.booth = new Booth(uw);

  const router = routes();
  uw.httpApi.use('/booth', router);

  uw.after(async (err) => {
    // Boot failed; leave the booth unstarted.
    if (err) {
      return;
    }
    await uw.booth.onStart();
  });
}
92✔
536

1✔
537
export default boothPlugin;
1✔
538
export { Booth };
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc