• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

u-wave / core / 11083910793

28 Sep 2024 12:29PM UTC coverage: 79.804% (-0.3%) from 80.124%
11083910793

Pull #637

github

goto-bus-stop
slight jank but its ok
Pull Request #637: Switch to a relational database, closes #549

749 of 914 branches covered (81.95%)

Branch coverage included in aggregate %.

1890 of 2529 new or added lines in 50 files covered. (74.73%)

138 existing lines in 10 files now uncovered.

9106 of 11435 relevant lines covered (79.63%)

68.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.12
/src/plugins/booth.js
1
import assert from 'node:assert';
1✔
2
import RedLock from 'redlock';
1✔
3
import { EmptyPlaylistError, PlaylistItemNotFoundError } from '../errors/index.js';
1✔
4
import routes from '../routes/booth.js';
1✔
5
import { randomUUID } from 'node:crypto';
1✔
6
import { jsonb } from '../utils/sqlite.js';
1✔
7

1✔
8
/**
1✔
9
 * @typedef {import('../schema.js').UserID} UserID
1✔
10
 * @typedef {import('../schema.js').HistoryEntryID} HistoryEntryID
1✔
11
 * @typedef {import('type-fest').JsonObject} JsonObject
1✔
12
 * @typedef {import('../schema.js').User} User
1✔
13
 * @typedef {import('../schema.js').Playlist} Playlist
1✔
14
 * @typedef {import('../schema.js').PlaylistItem} PlaylistItem
1✔
15
 * @typedef {import('../schema.js').HistoryEntry} HistoryEntry
1✔
16
 * @typedef {Omit<import('../schema.js').Media, 'createdAt' | 'updatedAt'>} Media
1✔
17
 */
1✔
18

1✔
19
class Booth {
92✔
20
  #uw;
92✔
21

92✔
22
  #logger;
92✔
23

92✔
24
  /** @type {ReturnType<typeof setTimeout>|null} */
92✔
25
  #timeout = null;
92✔
26

92✔
27
  #locker;
92✔
28

92✔
29
  /** @type {Promise<unknown>|null} */
92✔
30
  #awaitAdvance = null;
92✔
31

92✔
32
  /**
92✔
33
   * @param {import('../Uwave.js').Boot} uw
92✔
34
   */
92✔
35
  constructor(uw) {
92✔
36
    this.#uw = uw;
92✔
37
    this.#locker = new RedLock([this.#uw.redis]);
92✔
38
    this.#logger = uw.logger.child({ ns: 'uwave:booth' });
92✔
39
  }
92✔
40

92✔
41
  /** @internal */
92✔
42
  async onStart() {
92✔
43
    const current = await this.getCurrentEntry();
92✔
44
    if (current && this.#timeout === null) {
92!
UNCOV
45
      // Restart the advance timer after a server restart, if a track was
×
UNCOV
46
      // playing before the server restarted.
×
NEW
47
      const duration = (current.historyEntry.end - current.historyEntry.start) * 1000;
×
NEW
48
      const endTime = current.historyEntry.createdAt.getTime() + duration;
×
UNCOV
49
      if (endTime > Date.now()) {
×
UNCOV
50
        this.#timeout = setTimeout(
×
UNCOV
51
          () => this.#advanceAutomatically(),
×
UNCOV
52
          endTime - Date.now(),
×
UNCOV
53
        );
×
UNCOV
54
      } else {
×
UNCOV
55
        this.#advanceAutomatically();
×
UNCOV
56
      }
×
UNCOV
57
    }
×
58

92✔
59
    this.#uw.onClose(async () => {
92✔
60
      this.#onStop();
92✔
61
      await this.#awaitAdvance;
92✔
62
    });
92✔
63
  }
92✔
64

92✔
65
  async #advanceAutomatically() {
92✔
UNCOV
66
    try {
×
UNCOV
67
      await this.advance();
×
UNCOV
68
    } catch (error) {
×
UNCOV
69
      this.#logger.error({ err: error }, 'advance failed');
×
UNCOV
70
    }
×
UNCOV
71
  }
×
72

92✔
73
  #onStop() {
92✔
74
    this.#maybeStop();
92✔
75
  }
92✔
76

92✔
77
  async getCurrentEntry() {
92✔
78
    const { db } = this.#uw;
101✔
79

101✔
80
    const historyID = /** @type {HistoryEntryID} */ (await this.#uw.redis.get('booth:historyID'));
101✔
81
    if (!historyID) {
101✔
82
      return null;
100✔
83
    }
100✔
84

1✔
85
    const entry = await db.selectFrom('historyEntries')
1✔
86
      .innerJoin('media', 'historyEntries.mediaID', 'media.id')
1✔
87
      .innerJoin('users', 'historyEntries.userID', 'users.id')
1✔
88
      .select([
1✔
89
        'historyEntries.id as id',
1✔
90
        'media.id as media.id',
1✔
91
        'media.sourceID as media.sourceID',
1✔
92
        'media.sourceType as media.sourceType',
1✔
93
        'media.sourceData as media.sourceData',
1✔
94
        'media.artist as media.artist',
1✔
95
        'media.title as media.title',
1✔
96
        'media.duration as media.duration',
1✔
97
        'media.thumbnail as media.thumbnail',
1✔
98
        'users.id as users.id',
1✔
99
        'users.username as users.username',
1✔
100
        'users.avatar as users.avatar',
1✔
101
        'users.createdAt as users.createdAt',
1✔
102
        'historyEntries.artist',
1✔
103
        'historyEntries.title',
1✔
104
        'historyEntries.start',
1✔
105
        'historyEntries.end',
1✔
106
        'historyEntries.createdAt',
1✔
107
      ])
1✔
108
      .where('historyEntries.id', '=', historyID)
1✔
109
      .executeTakeFirst();
1✔
110

1✔
111
    return entry ? {
1✔
112
      media: {
1✔
113
        id: entry['media.id'],
1✔
114
        artist: entry['media.artist'],
1✔
115
        title: entry['media.title'],
1✔
116
        duration: entry['media.duration'],
1✔
117
        thumbnail: entry['media.thumbnail'],
1✔
118
        sourceID: entry['media.sourceID'],
1✔
119
        sourceType: entry['media.sourceType'],
1✔
120
        sourceData: entry['media.sourceData'] ?? {},
1✔
121
      },
1✔
122
      user: {
1✔
123
        id: entry['users.id'],
1✔
124
        username: entry['users.username'],
1✔
125
        avatar: entry['users.avatar'],
1✔
126
        createdAt: entry['users.createdAt'],
1✔
127
      },
1✔
128
      historyEntry: {
1✔
129
        id: entry.id,
1✔
130
        userID: entry['users.id'],
1✔
131
        mediaID: entry['media.id'],
1✔
132
        artist: entry.artist,
1✔
133
        title: entry.title,
1✔
134
        start: entry.start,
1✔
135
        end: entry.end,
1✔
136
        createdAt: entry.createdAt,
1✔
137
      },
1✔
138
      // TODO
1✔
139
      upvotes: [],
1✔
140
      downvotes: [],
1✔
141
      favorites: [],
1✔
142
    } : null;
101!
143
  }
101✔
144

92✔
145
  /**
92✔
146
   * Get vote counts for the currently playing media.
92✔
147
   *
92✔
148
   * @returns {Promise<{ upvotes: UserID[], downvotes: UserID[], favorites: UserID[] }>}
92✔
149
   */
92✔
150
  async getCurrentVoteStats() {
92✔
151
    const { redis } = this.#uw;
1✔
152

1✔
153
    const results = await redis.pipeline()
1✔
154
      .smembers('booth:upvotes')
1✔
155
      .smembers('booth:downvotes')
1✔
156
      .smembers('booth:favorites')
1✔
157
      .exec();
1✔
158
    assert(results);
1✔
159

1✔
160
    const voteStats = {
1✔
161
      upvotes: /** @type {UserID[]} */ (results[0][1]),
1✔
162
      downvotes: /** @type {UserID[]} */ (results[1][1]),
1✔
163
      favorites: /** @type {UserID[]} */ (results[2][1]),
1✔
164
    };
1✔
165

1✔
166
    return voteStats;
1✔
167
  }
1✔
168

92✔
169
  /** @param {{ remove?: boolean }} options */
92✔
170
  async #getNextDJ(options) {
92✔
171
    let userID = /** @type {UserID|null} */ (await this.#uw.redis.lindex('waitlist', 0));
5✔
172
    if (!userID && !options.remove) {
5!
UNCOV
173
      // If the waitlist is empty, the current DJ will play again immediately.
×
NEW
174
      userID = /** @type {UserID|null} */ (await this.#uw.redis.get('booth:currentDJ'));
×
UNCOV
175
    }
×
176
    if (!userID) {
5!
UNCOV
177
      return null;
×
UNCOV
178
    }
×
179

5✔
180
    return this.#uw.users.getUser(userID);
5✔
181
  }
5✔
182

92✔
183
  /**
92✔
184
   * @param {{ remove?: boolean }} options
92✔
185
   */
92✔
186
  async #getNextEntry(options) {
92✔
187
    const { playlists } = this.#uw;
5✔
188

5✔
189
    const user = await this.#getNextDJ(options);
5✔
190
    if (!user || !user.activePlaylistID) {
5!
191
      return null;
×
192
    }
×
193
    const playlist = await playlists.getUserPlaylist(user, user.activePlaylistID);
5✔
194
    if (playlist.size === 0) {
5!
195
      throw new EmptyPlaylistError();
×
UNCOV
196
    }
×
197

5✔
198
    const { playlistItem, media } = await playlists.getPlaylistItemAt(playlist, 0);
5✔
199
    if (!playlistItem) {
5!
NEW
200
      throw new PlaylistItemNotFoundError();
×
UNCOV
201
    }
×
202

5✔
203
    return {
5✔
204
      user,
5✔
205
      playlist,
5✔
206
      playlistItem,
5✔
207
      media,
5✔
208
      historyEntry: {
5✔
209
        id: /** @type {HistoryEntryID} */ (randomUUID()),
5✔
210
        userID: user.id,
5✔
211
        mediaID: media.id,
5✔
212
        artist: playlistItem.artist,
5✔
213
        title: playlistItem.title,
5✔
214
        start: playlistItem.start,
5✔
215
        end: playlistItem.end,
5✔
216
        /** @type {null | JsonObject} */
5✔
217
        sourceData: null,
5✔
218
      },
5✔
219
    };
5✔
220
  }
5✔
221

92✔
222
  /**
92✔
223
   * @param {UserID|null} previous
92✔
224
   * @param {{ remove?: boolean }} options
92✔
225
   */
92✔
226
  async #cycleWaitlist(previous, options) {
92✔
227
    const waitlistLen = await this.#uw.redis.llen('waitlist');
5✔
228
    if (waitlistLen > 0) {
5✔
229
      await this.#uw.redis.lpop('waitlist');
5✔
230
      if (previous && !options.remove) {
5!
UNCOV
231
        // The previous DJ should only be added to the waitlist again if it was
×
UNCOV
232
        // not empty. If it was empty, the previous DJ is already in the booth.
×
NEW
233
        await this.#uw.redis.rpush('waitlist', previous);
×
234
      }
×
235
    }
5✔
236
  }
5✔
237

92✔
238
  clear() {
92✔
UNCOV
239
    return this.#uw.redis.del(
×
UNCOV
240
      'booth:historyID',
×
UNCOV
241
      'booth:currentDJ',
×
UNCOV
242
      'booth:upvotes',
×
UNCOV
243
      'booth:downvotes',
×
UNCOV
244
      'booth:favorites',
×
UNCOV
245
    );
×
UNCOV
246
  }
×
247

92✔
248
  /**
92✔
249
   * @param {{ historyEntry: { id: HistoryEntryID }, user: { id: UserID } }} next
92✔
250
   */
92✔
251
  #update(next) {
92✔
252
    return this.#uw.redis.multi()
5✔
253
      .del('booth:upvotes', 'booth:downvotes', 'booth:favorites')
5✔
254
      .set('booth:historyID', next.historyEntry.id)
5✔
255
      .set('booth:currentDJ', next.user.id)
5✔
256
      .exec();
5✔
257
  }
5✔
258

92✔
259
  #maybeStop() {
92✔
260
    if (this.#timeout) {
97✔
261
      clearTimeout(this.#timeout);
5✔
262
      this.#timeout = null;
5✔
263
    }
5✔
264
  }
97✔
265

92✔
266
  /**
92✔
267
   * @param {Pick<HistoryEntry, 'start' | 'end'>} entry
92✔
268
   */
92✔
269
  #play(entry) {
92✔
270
    this.#maybeStop();
5✔
271
    this.#timeout = setTimeout(
5✔
272
      () => this.#advanceAutomatically(),
5✔
273
      (entry.end - entry.start) * 1000,
5✔
274
    );
5✔
275
  }
5✔
276

92✔
277
  /**
92✔
278
   * This method creates a `media` object that clients can understand from a
92✔
279
   * history entry object.
92✔
280
   *
92✔
281
   * We present the playback-specific `sourceData` as if it is
92✔
282
   * a property of the media model for backwards compatibility.
92✔
283
   * Old clients don't expect `sourceData` directly on a history entry object.
92✔
284
   *
92✔
285
   * @param {{ user: User, media: Media, historyEntry: HistoryEntry }} next
92✔
286
   */
92✔
287
  getMediaForPlayback(next) {
92✔
288
    return {
6✔
289
      artist: next.historyEntry.artist,
6✔
290
      title: next.historyEntry.title,
6✔
291
      start: next.historyEntry.start,
6✔
292
      end: next.historyEntry.end,
6✔
293
      media: {
6✔
294
        sourceType: next.media.sourceType,
6✔
295
        sourceID: next.media.sourceID,
6✔
296
        artist: next.media.artist,
6✔
297
        title: next.media.title,
6✔
298
        duration: next.media.duration,
6✔
299
        sourceData: {
6✔
300
          ...next.media.sourceData,
6✔
301
          ...next.historyEntry.sourceData,
6✔
302
        },
6✔
303
      },
6✔
304
    };
6✔
305
  }
6✔
306

92✔
307
  /**
92✔
308
   * @param {{
92✔
309
   *   user: User,
92✔
310
   *   playlist: Playlist,
92✔
311
   *   media: Media,
92✔
312
   *   historyEntry: HistoryEntry
92✔
313
   * } | null} next
92✔
314
   */
92✔
315
  async #publishAdvanceComplete(next) {
92✔
316
    const { waitlist } = this.#uw;
5✔
317

5✔
318
    if (next != null) {
5✔
319
      this.#uw.publish('advance:complete', {
5✔
320
        historyID: next.historyEntry.id,
5✔
321
        userID: next.user.id,
5✔
322
        playlistID: next.playlist.id,
5✔
323
        media: this.getMediaForPlayback(next),
5✔
324
        playedAt: next.historyEntry.createdAt.getTime(),
5✔
325
      });
5✔
326
      this.#uw.publish('playlist:cycle', {
5✔
327
        userID: next.user.id,
5✔
328
        playlistID: next.playlist.id,
5✔
329
      });
5✔
330
    } else {
5!
UNCOV
331
      this.#uw.publish('advance:complete', null);
×
UNCOV
332
    }
×
333
    this.#uw.publish('waitlist:update', await waitlist.getUserIDs());
5✔
334
  }
5✔
335

92✔
336
  /**
92✔
337
   * @param {{ user: User, media: { sourceID: string, sourceType: string } }} entry
92✔
338
   */
92✔
339
  async #getSourceDataForPlayback(entry) {
92✔
340
    const { sourceID, sourceType } = entry.media;
5✔
341
    const source = this.#uw.source(sourceType);
5✔
342
    if (source) {
5✔
343
      this.#logger.trace({ sourceType: source.type, sourceID }, 'running pre-play hook');
5✔
344
      /** @type {JsonObject | undefined} */
5✔
345
      let sourceData;
5✔
346
      try {
5✔
347
        sourceData = await source.play(entry.user, entry.media);
5✔
348
        this.#logger.trace({ sourceType: source.type, sourceID, sourceData }, 'pre-play hook result');
5✔
349
      } catch (error) {
5!
UNCOV
350
        this.#logger.error({ sourceType: source.type, sourceID, err: error }, 'pre-play hook failed');
×
UNCOV
351
      }
×
352
      return sourceData;
5✔
353
    }
5✔
UNCOV
354

×
UNCOV
355
    return undefined;
×
356
  }
5✔
357

92✔
358
  /**
92✔
359
   * @typedef {object} AdvanceOptions
92✔
360
   * @prop {boolean} [remove]
92✔
361
   * @prop {boolean} [publish]
92✔
362
   * @prop {import('redlock').RedlockAbortSignal} [signal]
92✔
363
   * @param {AdvanceOptions} [opts]
92✔
364
   * @returns {Promise<{
92✔
365
   *   historyEntry: HistoryEntry,
92✔
366
   *   user: User,
92✔
367
   *   media: Media,
92✔
368
   *   playlist: Playlist,
92✔
369
   * }|null>}
92✔
370
   */
92✔
371
  async #advanceLocked(opts = {}, tx = this.#uw.db) {
92✔
372
    const { playlists } = this.#uw;
5✔
373

5✔
374
    const publish = opts.publish ?? true;
5✔
375
    const remove = opts.remove || (
5✔
376
      !await this.#uw.waitlist.isCycleEnabled()
5✔
377
    );
5✔
378

5✔
379
    const previous = await this.getCurrentEntry();
5✔
380
    let next;
5✔
381
    try {
5✔
382
      next = await this.#getNextEntry({ remove });
5✔
383
    } catch (err) {
5!
UNCOV
384
      // If the next user's playlist was empty, remove them from the waitlist
×
UNCOV
385
      // and try advancing again.
×
UNCOV
386
      if (err instanceof EmptyPlaylistError) {
×
UNCOV
387
        this.#logger.info('user has empty playlist, skipping on to the next');
×
NEW
388
        await this.#cycleWaitlist(previous != null ? previous.historyEntry.userID : null, { remove });
×
NEW
389
        return this.#advanceLocked({ publish, remove: true }, tx);
×
UNCOV
390
      }
×
UNCOV
391
      throw err;
×
392
    }
×
393

5✔
394
    if (opts.signal?.aborted) {
5!
UNCOV
395
      throw opts.signal.error;
×
396
    }
×
397

5✔
398
    if (previous) {
5!
UNCOV
399
      this.#logger.info({
×
NEW
400
        id: previous.historyEntry.id,
×
UNCOV
401
        artist: previous.media.artist,
×
UNCOV
402
        title: previous.media.title,
×
UNCOV
403
        upvotes: previous.upvotes.length,
×
UNCOV
404
        favorites: previous.favorites.length,
×
UNCOV
405
        downvotes: previous.downvotes.length,
×
UNCOV
406
      }, 'previous track stats');
×
UNCOV
407
    }
×
408

5✔
409
    let result = null;
5✔
410
    if (next != null) {
5✔
411
      this.#logger.info({
5✔
412
        id: next.playlistItem.id,
5✔
413
        artist: next.playlistItem.artist,
5✔
414
        title: next.playlistItem.title,
5✔
415
      }, 'next track');
5✔
416
      const sourceData = await this.#getSourceDataForPlayback(next);
5✔
417
      if (sourceData) {
5!
NEW
418
        next.historyEntry.sourceData = sourceData;
×
UNCOV
419
      }
×
420
      const historyEntry = await tx.insertInto('historyEntries')
5✔
421
        .returningAll()
5✔
422
        .values({
5✔
423
          id: next.historyEntry.id,
5✔
424
          userID: next.user.id,
5✔
425
          mediaID: next.media.id,
5✔
426
          artist: next.historyEntry.artist,
5✔
427
          title: next.historyEntry.title,
5✔
428
          start: next.historyEntry.start,
5✔
429
          end: next.historyEntry.end,
5✔
430
          sourceData: sourceData != null ? jsonb(sourceData) : null,
5!
431
        })
5✔
432
        .executeTakeFirstOrThrow();
5✔
433

5✔
434
      result = {
5✔
435
        historyEntry,
5✔
436
        playlist: next.playlist,
5✔
437
        user: next.user,
5✔
438
        media: next.media,
5✔
439
      };
5✔
440
    } else {
5!
441
      this.#maybeStop();
×
442
    }
×
443

5✔
444
    await this.#cycleWaitlist(previous != null ? previous.historyEntry.userID : null, { remove });
5!
445

5✔
446
    if (next) {
5✔
447
      await this.#update(next);
5✔
448
      await playlists.cyclePlaylist(next.playlist, tx);
5✔
449
      this.#play(next.historyEntry);
5✔
450
    } else {
5!
451
      await this.clear();
×
UNCOV
452
    }
×
453

5✔
454
    if (publish !== false) {
5✔
455
      await this.#publishAdvanceComplete(result);
5✔
456
    }
5✔
457

5✔
458
    return result;
5✔
459
  }
5✔
460

92✔
461
  /**
92✔
462
   * @param {AdvanceOptions} [opts]
92✔
463
   */
92✔
464
  advance(opts = {}) {
92✔
465
    const result = this.#locker.using(
5✔
466
      ['booth:advancing'],
5✔
467
      10_000,
5✔
468
      (signal) => this.#advanceLocked({ ...opts, signal }),
5✔
469
    );
5✔
470
    this.#awaitAdvance = result;
5✔
471
    return result;
5✔
472
  }
5✔
473
}
92✔
474

1✔
475
/**
1✔
476
 * @param {import('../Uwave.js').Boot} uw
1✔
477
 */
1✔
478
async function boothPlugin(uw) {
92✔
479
  uw.booth = new Booth(uw);
92✔
480
  uw.httpApi.use('/booth', routes());
92✔
481

92✔
482
  uw.after(async (err) => {
92✔
483
    if (!err) {
92✔
484
      await uw.booth.onStart();
92✔
485
    }
92✔
486
  });
92✔
487
}
92✔
488

1✔
489
export default boothPlugin;
1✔
490
export { Booth };
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc