• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

u-wave / core / 10530495771

23 Aug 2024 06:22PM UTC coverage: 80.129% (-0.1%) from 80.244%
10530495771

push

github

web-flow
Add booth auto-leave after current play  (#600)

* Add booth auto-leave after current play

The current DJ can enable auto-leave to leave the waitlist *after* their
current play is over.

* Allow disabling auto-leave

643 of 782 branches covered (82.23%)

Branch coverage included in aggregate %.

98 of 138 new or added lines in 4 files covered. (71.01%)

2 existing lines in 1 file now uncovered.

8394 of 10496 relevant lines covered (79.97%)

44.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.89
/src/plugins/booth.js
1
import assert from 'node:assert';
1✔
2
import RedLock from 'redlock';
1✔
3
import lodash from 'lodash';
1✔
4
import { EmptyPlaylistError, PlaylistItemNotFoundError } from '../errors/index.js';
1✔
5
import routes from '../routes/booth.js';
1✔
6

1✔
7
const { omit } = lodash;
1✔
8

1✔
9
/**
1✔
10
 * @typedef {import('type-fest').JsonObject} JsonObject
1✔
11
 * @typedef {import('../models/index.js').User} User
1✔
12
 * @typedef {import('../models/index.js').Playlist} Playlist
1✔
13
 * @typedef {import('../models/index.js').PlaylistItem} PlaylistItem
1✔
14
 * @typedef {import('../models/index.js').HistoryEntry} HistoryEntry
1✔
15
 * @typedef {import('../models/History.js').HistoryMedia} HistoryMedia
1✔
16
 * @typedef {import('../models/index.js').Media} Media
1✔
17
 * @typedef {{ user: User }} PopulateUser
1✔
18
 * @typedef {{ playlist: Playlist }} PopulatePlaylist
1✔
19
 * @typedef {{ media: Omit<HistoryMedia, 'media'> & { media: Media } }} PopulateMedia
1✔
20
 * @typedef {Omit<HistoryEntry, 'user' | 'playlist' | 'media'>
1✔
21
 *     & PopulateUser & PopulatePlaylist & PopulateMedia} PopulatedHistoryEntry
1✔
22
 */
1✔
23

1✔
24
const REDIS_ADVANCING = 'booth:advancing';
1✔
25
const REDIS_HISTORY_ID = 'booth:historyID';
1✔
26
const REDIS_CURRENT_DJ_ID = 'booth:currentDJ';
1✔
27
const REDIS_REMOVE_AFTER_CURRENT_PLAY = 'booth:removeAfterCurrentPlay';
1✔
28
const REDIS_UPVOTES = 'booth:upvotes';
1✔
29
const REDIS_DOWNVOTES = 'booth:downvotes';
1✔
30
const REDIS_FAVORITES = 'booth:favorites';
1✔
31

1✔
32
const REMOVE_AFTER_CURRENT_PLAY_SCRIPT = {
1✔
33
  keys: [REDIS_CURRENT_DJ_ID, REDIS_REMOVE_AFTER_CURRENT_PLAY],
1✔
34
  lua: `
1✔
35
    local k_dj = KEYS[1]
1✔
36
    local k_remove = KEYS[2]
1✔
37
    local user_id = ARGV[1]
1✔
38
    local value = ARGV[2]
1✔
39
    local current_dj_id = redis.call('GET', k_dj)
1✔
40
    if current_dj_id == user_id then
1✔
41
      if value == 'true' then
1✔
42
        redis.call('SET', k_remove, 'true')
1✔
43
        return 1
1✔
44
      else
1✔
45
        redis.call('DEL', k_remove)
1✔
46
        return 0
1✔
47
      end
1✔
48
    else
1✔
49
      return redis.error_reply('You are not currently playing')
1✔
50
    end
1✔
51
  `,
1✔
52
};
1✔
53

1✔
54
/**
1✔
55
 * @param {Playlist} playlist
1✔
56
 * @returns {Promise<void>}
1✔
57
 */
1✔
58
async function cyclePlaylist(playlist) {
5✔
59
  const item = playlist.media.shift();
5✔
60
  if (item !== undefined) {
5✔
61
    playlist.media.push(item);
5✔
62
  }
5✔
63
  await playlist.save();
5✔
64
}
5✔
65

1✔
66
class Booth {
92✔
67
  #uw;
92✔
68

92✔
69
  #logger;
92✔
70

92✔
71
  /** @type {ReturnType<typeof setTimeout>|null} */
92✔
72
  #timeout = null;
92✔
73

92✔
74
  #locker;
92✔
75

92✔
76
  /** @type {Promise<unknown>|null} */
92✔
77
  #awaitAdvance = null;
92✔
78

92✔
79
  /**
92✔
80
   * @param {import('../Uwave.js').Boot} uw
92✔
81
   */
92✔
82
  constructor(uw) {
92✔
83
    this.#uw = uw;
92✔
84
    this.#locker = new RedLock([this.#uw.redis]);
92✔
85
    this.#logger = uw.logger.child({ ns: 'uwave:booth' });
92✔
86

92✔
87
    uw.redis.defineCommand('uw:removeAfterCurrentPlay', {
92✔
88
      numberOfKeys: REMOVE_AFTER_CURRENT_PLAY_SCRIPT.keys.length,
92✔
89
      lua: REMOVE_AFTER_CURRENT_PLAY_SCRIPT.lua,
92✔
90
    });
92✔
91
  }
92✔
92

92✔
93
  /** @internal */
92✔
94
  async onStart() {
92✔
95
    const current = await this.getCurrentEntry();
92✔
96
    if (current && this.#timeout === null) {
92!
97
      // Restart the advance timer after a server restart, if a track was
×
98
      // playing before the server restarted.
×
99
      const duration = (current.media.end - current.media.start) * 1000;
×
100
      const endTime = Number(current.playedAt) + duration;
×
101
      if (endTime > Date.now()) {
×
102
        this.#timeout = setTimeout(
×
103
          () => this.#advanceAutomatically(),
×
104
          endTime - Date.now(),
×
105
        );
×
106
      } else {
×
107
        this.#advanceAutomatically();
×
108
      }
×
109
    }
×
110

92✔
111
    this.#uw.onClose(async () => {
92✔
112
      this.#onStop();
92✔
113
      await this.#awaitAdvance;
92✔
114
    });
92✔
115
  }
92✔
116

92✔
117
  async #advanceAutomatically() {
92✔
118
    try {
×
119
      await this.advance();
×
120
    } catch (error) {
×
121
      this.#logger.error({ err: error }, 'advance failed');
×
122
    }
×
123
  }
×
124

92✔
125
  #onStop() {
92✔
126
    this.#maybeStop();
92✔
127
  }
92✔
128

92✔
129
  /**
92✔
130
   * @returns {Promise<HistoryEntry | null>}
92✔
131
   */
92✔
132
  async getCurrentEntry() {
92✔
133
    const { HistoryEntry } = this.#uw.models;
101✔
134
    const historyID = await this.#uw.redis.get(REDIS_HISTORY_ID);
101✔
135
    if (!historyID) {
101✔
136
      return null;
100✔
137
    }
100✔
138

1✔
139
    return HistoryEntry.findById(historyID, '+media.sourceData');
1✔
140
  }
101✔
141

92✔
142
  /**
92✔
143
   * Get vote counts for the currently playing media.
92✔
144
   *
92✔
145
   * @returns {Promise<{ upvotes: string[], downvotes: string[], favorites: string[] }>}
92✔
146
   */
92✔
147
  async getCurrentVoteStats() {
92✔
148
    const { redis } = this.#uw;
1✔
149

1✔
150
    const results = await redis.pipeline()
1✔
151
      .smembers(REDIS_UPVOTES)
1✔
152
      .smembers(REDIS_DOWNVOTES)
1✔
153
      .smembers(REDIS_FAVORITES)
1✔
154
      .exec();
1✔
155
    assert(results);
1✔
156

1✔
157
    const voteStats = {
1✔
158
      upvotes: /** @type {string[]} */ (results[0][1]),
1✔
159
      downvotes: /** @type {string[]} */ (results[1][1]),
1✔
160
      favorites: /** @type {string[]} */ (results[2][1]),
1✔
161
    };
1✔
162

1✔
163
    return voteStats;
1✔
164
  }
1✔
165

92✔
166
  /**
92✔
167
   * @param {HistoryEntry} entry
92✔
168
   */
92✔
169
  async #saveStats(entry) {
92✔
170
    const stats = await this.getCurrentVoteStats();
×
171

×
172
    Object.assign(entry, stats);
×
173
    return entry.save();
×
174
  }
×
175

92✔
176
  /**
92✔
177
   * @param {{ remove?: boolean }} options
92✔
178
   * @returns {Promise<User|null>}
92✔
179
   */
92✔
180
  async #getNextDJ(options) {
92✔
181
    const { User } = this.#uw.models;
5✔
182
    /** @type {string|null} */
5✔
183
    let userID = await this.#uw.redis.lindex('waitlist', 0);
5✔
184
    if (!userID && !options.remove) {
5!
185
      // If the waitlist is empty, the current DJ will play again immediately.
×
NEW
186
      userID = await this.#uw.redis.get(REDIS_CURRENT_DJ_ID);
×
187
    }
×
188
    if (!userID) {
5!
189
      return null;
×
190
    }
×
191

5✔
192
    return User.findById(userID);
5✔
193
  }
5✔
194

92✔
195
  /**
92✔
196
   * @param {{ remove?: boolean }} options
92✔
197
   * @returns {Promise<PopulatedHistoryEntry | null>}
92✔
198
   */
92✔
199
  async #getNextEntry(options) {
92✔
200
    const { HistoryEntry, PlaylistItem } = this.#uw.models;
5✔
201
    const { playlists } = this.#uw;
5✔
202

5✔
203
    const user = await this.#getNextDJ(options);
5✔
204
    if (!user || !user.activePlaylist) {
5!
205
      return null;
×
206
    }
×
207
    const playlist = await playlists.getUserPlaylist(user, user.activePlaylist);
5✔
208
    if (playlist.size === 0) {
5!
209
      throw new EmptyPlaylistError();
×
210
    }
×
211

5✔
212
    const playlistItem = await PlaylistItem.findById(playlist.media[0]);
5✔
213
    if (!playlistItem) {
5!
214
      throw new PlaylistItemNotFoundError({ id: playlist.media[0] });
×
215
    }
×
216

5✔
217
    /** @type {PopulatedHistoryEntry} */
5✔
218
    // @ts-expect-error TS2322: `user` and `playlist` are already populated,
5✔
219
    // and `media.media` is populated immediately below.
5✔
220
    const entry = new HistoryEntry({
5✔
221
      user,
5✔
222
      playlist,
5✔
223
      item: playlistItem._id,
5✔
224
      media: {
5✔
225
        media: playlistItem.media,
5✔
226
        artist: playlistItem.artist,
5✔
227
        title: playlistItem.title,
5✔
228
        start: playlistItem.start,
5✔
229
        end: playlistItem.end,
5✔
230
      },
5✔
231
    });
5✔
232
    await entry.populate('media.media');
5✔
233

5✔
234
    return entry;
5✔
235
  }
5✔
236

92✔
237
  /**
92✔
238
   * @param {HistoryEntry|null} previous
92✔
239
   * @param {{ remove?: boolean }} options
92✔
240
   */
92✔
241
  async #cycleWaitlist(previous, options) {
92✔
242
    const waitlistLen = await this.#uw.redis.llen('waitlist');
5✔
243
    if (waitlistLen > 0) {
5✔
244
      await this.#uw.redis.lpop('waitlist');
5✔
245
      if (previous && !options.remove) {
5!
246
        // The previous DJ should only be added to the waitlist again if it was
×
247
        // not empty. If it was empty, the previous DJ is already in the booth.
×
248
        await this.#uw.redis.rpush('waitlist', previous.user.toString());
×
249
      }
×
250
    }
5✔
251
  }
5✔
252

92✔
253
  async clear() {
92✔
NEW
254
    await this.#uw.redis.del(
×
NEW
255
      REDIS_HISTORY_ID,
×
NEW
256
      REDIS_CURRENT_DJ_ID,
×
NEW
257
      REDIS_REMOVE_AFTER_CURRENT_PLAY,
×
NEW
258
      REDIS_UPVOTES,
×
NEW
259
      REDIS_DOWNVOTES,
×
NEW
260
      REDIS_FAVORITES,
×
261
    );
×
262
  }
×
263

92✔
264
  /**
92✔
265
   * @param {PopulatedHistoryEntry} next
92✔
266
   */
92✔
267
  async #update(next) {
92✔
268
    await this.#uw.redis.multi()
5✔
269
      .del(REDIS_UPVOTES, REDIS_DOWNVOTES, REDIS_FAVORITES, REDIS_REMOVE_AFTER_CURRENT_PLAY)
5✔
270
      .set(REDIS_HISTORY_ID, next.id)
5✔
271
      .set(REDIS_CURRENT_DJ_ID, next.user.id)
5✔
272
      .exec();
5✔
273
  }
5✔
274

92✔
275
  #maybeStop() {
92✔
276
    if (this.#timeout) {
97✔
277
      clearTimeout(this.#timeout);
5✔
278
      this.#timeout = null;
5✔
279
    }
5✔
280
  }
97✔
281

92✔
282
  /**
92✔
283
   * @param {PopulatedHistoryEntry} entry
92✔
284
   */
92✔
285
  #play(entry) {
92✔
286
    this.#maybeStop();
5✔
287
    this.#timeout = setTimeout(
5✔
288
      () => this.#advanceAutomatically(),
5✔
289
      (entry.media.end - entry.media.start) * 1000,
5✔
290
    );
5✔
291
  }
5✔
292

92✔
293
  /**
92✔
294
   * This method creates a `media` object that clients can understand from a
92✔
295
   * history entry object.
92✔
296
   *
92✔
297
   * We present the playback-specific `sourceData` as if it is
92✔
298
   * a property of the media model for backwards compatibility.
92✔
299
   * Old clients don't expect `sourceData` directly on a history entry object.
92✔
300
   *
92✔
301
   * @param {PopulateMedia} historyEntry
92✔
302
   */
92✔
303
  // eslint-disable-next-line class-methods-use-this
92✔
304
  getMediaForPlayback(historyEntry) {
92✔
305
    return Object.assign(omit(historyEntry.media, 'sourceData'), {
6✔
306
      media: {
6✔
307
        ...historyEntry.media.media.toJSON(),
6✔
308
        sourceData: {
6✔
309
          ...historyEntry.media.media.sourceData,
6✔
310
          ...historyEntry.media.sourceData,
6✔
311
        },
6✔
312
      },
6✔
313
    });
6✔
314
  }
6✔
315

92✔
316
  /**
92✔
317
   * @param {PopulatedHistoryEntry|null} next
92✔
318
   */
92✔
319
  async #publishAdvanceComplete(next) {
92✔
320
    const { waitlist } = this.#uw;
5✔
321

5✔
322
    if (next) {
5✔
323
      this.#uw.publish('advance:complete', {
5✔
324
        historyID: next.id,
5✔
325
        userID: next.user.id,
5✔
326
        playlistID: next.playlist.id,
5✔
327
        itemID: next.item.toString(),
5✔
328
        media: this.getMediaForPlayback(next),
5✔
329
        playedAt: next.playedAt.getTime(),
5✔
330
      });
5✔
331
      this.#uw.publish('playlist:cycle', {
5✔
332
        userID: next.user.id,
5✔
333
        playlistID: next.playlist.id,
5✔
334
      });
5✔
335
    } else {
5!
336
      this.#uw.publish('advance:complete', null);
×
337
    }
×
338
    this.#uw.publish('waitlist:update', await waitlist.getUserIDs());
5✔
339
  }
5✔
340

92✔
341
  /**
92✔
342
   * @param {PopulatedHistoryEntry} entry
92✔
343
   */
92✔
344
  async #getSourceDataForPlayback(entry) {
92✔
345
    const { sourceID, sourceType } = entry.media.media;
5✔
346
    const source = this.#uw.source(sourceType);
5✔
347
    if (source) {
5✔
348
      this.#logger.trace({ sourceType: source.type, sourceID }, 'running pre-play hook');
5✔
349
      /** @type {JsonObject | undefined} */
5✔
350
      let sourceData;
5✔
351
      try {
5✔
352
        sourceData = await source.play(entry.user, entry.media.media);
5✔
353
        this.#logger.trace({ sourceType: source.type, sourceID, sourceData }, 'pre-play hook result');
5✔
354
      } catch (error) {
5!
355
        this.#logger.error({ sourceType: source.type, sourceID, err: error }, 'pre-play hook failed');
×
356
      }
×
357
      return sourceData;
5✔
358
    }
5✔
359

×
360
    return undefined;
×
361
  }
5✔
362

92✔
363
  /**
92✔
364
   * @typedef {object} AdvanceOptions
92✔
365
   * @prop {boolean} [remove]
92✔
366
   * @prop {boolean} [publish]
92✔
367
   * @prop {import('redlock').RedlockAbortSignal} [signal]
92✔
368
   *
92✔
369
   * @param {AdvanceOptions} [opts]
92✔
370
   * @returns {Promise<PopulatedHistoryEntry|null>}
92✔
371
   */
92✔
372
  async #advanceLocked(opts = {}) {
92✔
373
    const publish = opts.publish ?? true;
5✔
374
    const removeAfterCurrent = (await this.#uw.redis.del(REDIS_REMOVE_AFTER_CURRENT_PLAY)) === 1;
5✔
375
    const remove = opts.remove || removeAfterCurrent || (
5✔
376
      !await this.#uw.waitlist.isCycleEnabled()
5✔
377
    );
5✔
378

5✔
379
    const previous = await this.getCurrentEntry();
5✔
380
    let next;
5✔
381
    try {
5✔
382
      next = await this.#getNextEntry({ remove });
5✔
383
    } catch (err) {
5!
384
      // If the next user's playlist was empty, remove them from the waitlist
×
385
      // and try advancing again.
×
386
      if (err instanceof EmptyPlaylistError) {
×
387
        this.#logger.info('user has empty playlist, skipping on to the next');
×
388
        await this.#cycleWaitlist(previous, { remove });
×
389
        return this.#advanceLocked({ publish, remove: true });
×
390
      }
×
391
      throw err;
×
392
    }
×
393

5✔
394
    if (opts.signal?.aborted) {
5!
395
      throw opts.signal.error;
×
396
    }
×
397

5✔
398
    if (previous) {
5!
399
      await this.#saveStats(previous);
×
400

×
401
      this.#logger.info({
×
402
        id: previous._id,
×
403
        artist: previous.media.artist,
×
404
        title: previous.media.title,
×
405
        upvotes: previous.upvotes.length,
×
406
        favorites: previous.favorites.length,
×
407
        downvotes: previous.downvotes.length,
×
408
      }, 'previous track stats');
×
409
    }
×
410

5✔
411
    if (next) {
5✔
412
      this.#logger.info({
5✔
413
        id: next._id,
5✔
414
        artist: next.media.artist,
5✔
415
        title: next.media.title,
5✔
416
      }, 'next track');
5✔
417
      const sourceData = await this.#getSourceDataForPlayback(next);
5✔
418
      if (sourceData) {
5!
419
        next.media.sourceData = sourceData;
×
420
      }
×
421
      await next.save();
5✔
422
    } else {
5!
423
      this.#maybeStop();
×
424
    }
×
425

5✔
426
    await this.#cycleWaitlist(previous, { remove });
5✔
427

5✔
428
    if (next) {
5✔
429
      await this.#update(next);
5✔
430
      await cyclePlaylist(next.playlist);
5✔
431
      this.#play(next);
5✔
432
    } else {
5!
433
      await this.clear();
×
434
    }
×
435

5✔
436
    if (publish !== false) {
5✔
437
      await this.#publishAdvanceComplete(next);
5✔
438
    }
5✔
439

5✔
440
    return next;
5✔
441
  }
5✔
442

92✔
443
  /**
92✔
444
   * @param {AdvanceOptions} [opts]
92✔
445
   * @returns {Promise<PopulatedHistoryEntry|null>}
92✔
446
   */
92✔
447
  advance(opts = {}) {
92✔
448
    const result = this.#locker.using(
5✔
449
      [REDIS_ADVANCING],
5✔
450
      10_000,
5✔
451
      (signal) => this.#advanceLocked({ ...opts, signal }),
5✔
452
    );
5✔
453
    this.#awaitAdvance = result;
5✔
454
    return result;
5✔
455
  }
5✔
456

92✔
457
  /**
92✔
458
   * @param {User} user
92✔
459
   * @param {boolean} remove
92✔
460
   */
92✔
461
  async setRemoveAfterCurrentPlay(user, remove) {
92✔
NEW
462
    const newValue = await this.#uw.redis['uw:removeAfterCurrentPlay'](
×
NEW
463
      ...REMOVE_AFTER_CURRENT_PLAY_SCRIPT.keys,
×
NEW
464
      user._id.toString(),
×
NEW
465
      remove,
×
NEW
466
    );
×
NEW
467
    return newValue === 1;
×
NEW
468
  }
×
469

92✔
470
  /**
92✔
471
   * @param {User} user
92✔
472
   */
92✔
473
  async getRemoveAfterCurrentPlay(user) {
92✔
474
    const [currentDJ, removeAfterCurrentPlay] = await this.#uw.redis.mget(
1✔
475
      REDIS_CURRENT_DJ_ID,
1✔
476
      REDIS_REMOVE_AFTER_CURRENT_PLAY,
1✔
477
    );
1✔
478
    if (currentDJ === user.id) {
1✔
479
      return removeAfterCurrentPlay != null;
1✔
480
    }
1✔
NEW
481
    return null;
×
482
  }
1✔
483
}
92✔
484

1✔
485
/**
1✔
486
 * @param {import('../Uwave.js').Boot} uw
1✔
487
 */
1✔
488
async function boothPlugin(uw) {
92✔
489
  uw.booth = new Booth(uw);
92✔
490
  uw.httpApi.use('/booth', routes());
92✔
491

92✔
492
  uw.after(async (err) => {
92✔
493
    if (!err) {
92✔
494
      await uw.booth.onStart();
92✔
495
    }
92✔
496
  });
92✔
497
}
92✔
498

1✔
499
export default boothPlugin;
1✔
500
export { Booth };
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc