• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snejus / beetcamp / 11114941159

30 Sep 2024 01:32PM UTC coverage: 90.77% (-0.4%) from 91.123%
11114941159

push

github

snejus
build: use standard poetry workflow

528 of 587 branches covered (89.95%)

Branch coverage included in aggregate %.

1016 of 1114 relevant lines covered (91.2%)

13.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.08
/beetsplug/bandcamp/metaguru.py
1
"""Module for parsing bandcamp metadata."""
2

3
import itertools as it
15✔
4
import json
15✔
5
import operator as op
15✔
6
import re
15✔
7
from collections import Counter
15✔
8
from datetime import date, datetime
15✔
9
from functools import cached_property, partial
15✔
10
from typing import Any, Dict, Iterable, List, Optional, Set
15✔
11
from unicodedata import normalize
15✔
12

13
from beets import __version__ as beets_version
15✔
14
from beets import config as beets_config
15✔
15
from beets.autotag.hooks import AlbumInfo, TrackInfo
15✔
16
from packaging import version
15✔
17
from pycountry import countries, subdivisions
15✔
18

19
from .album import AlbumName
15✔
20
from .helpers import PATTERNS, Helpers, MediaInfo
15✔
21
from .track import Track
15✔
22
from .tracks import Tracks
15✔
23

24
BEETS_VERSION = version.parse(beets_version)
# feature switches derived from the installed beets version
EXTENDED_FIELDS_SUPPORT = BEETS_VERSION >= version.Version("1.5.0")
ALBUMTYPES_LIST_SUPPORT = BEETS_VERSION > version.Version("1.6.0")

JSONDict = Dict[str, Any]

# location names mapped straight to a country code; these are consulted
# before any pycountry lookup
COUNTRY_OVERRIDES = {
    "Russia": "RU",  # pycountry: Russian Federation
    "The Netherlands": "NL",  # pycountry: Netherlands
    "UK": "GB",  # pycountry: United Kingdom
    "D.C.": "US",
    "South Korea": "KR",  # pycountry: Korea, Republic of
    "Turkey": "TR",  # pycountry: only handles Türkiye
}
DATA_SOURCE = "bandcamp"
WORLDWIDE = "XW"
DIGI_MEDIA = "Digital Media"
VA = "Various Artists"
42

43

44
class Metaguru(Helpers):
    """Parse Bandcamp release metadata into beets album and track data."""

    # set to True once the release is requested as a singleton
    _singleton = False
    va_name = VA
    # empty placeholder, replaced by an actual media format when one is found
    media = MediaInfo("", "", "", "")

    meta: JSONDict
    config: JSONDict
    media_formats: List[MediaInfo]
    _tracks: Tracks
    _album_name: AlbumName
15✔
54

55
    def __init__(self, meta: JSONDict, config: Optional[JSONDict] = None) -> None:
        """Store the release JSON and derive media formats, tracks and album name.

        ``meta`` is the decoded release JSON and ``config`` the configuration
        mapping, which is stored as an empty dict when it is not given.
        """
        self.meta = meta
        # prefer the nested "inAlbum" object when the JSON has one
        release = meta.get("inAlbum") or meta
        self.media_formats = self.get_media_formats(release.get("albumRelease") or [])
        if self.media_formats:
            # the first format stays active until another one is selected
            self.media = self.media_formats[0]
        self.config = config or {}
        self.va_name = beets_config["va_name"].as_str() or self.va_name
        self._tracks = Tracks.from_json(meta)
        raw_name = meta.get("name") or ""
        self._album_name = AlbumName(
            raw_name, self.all_media_comments, self._tracks.album
        )
68

69
    @classmethod
    def from_html(cls, html: str, config: Optional[JSONDict] = None) -> "Metaguru":
        """Create an instance from the HTML of a release page.

        Zero-width spaces are removed before the metadata JSON is searched for.
        Raise AttributeError when the JSON cannot be located.
        """
        try:
            found = re.search(PATTERNS["meta"], html.replace("\u200b", ""))
            raw_json = found.group()  # type: ignore[union-attr]
        except AttributeError as exc:
            raise AttributeError("Could not find release metadata JSON") from exc
        return cls(json.loads(raw_json), config)
15✔
77

78
    @cached_property
15✔
79
    def excluded_fields(self) -> Set[str]:
15✔
80
        return set(self.config.get("excluded_fields") or [])
15✔
81

82
    @property
15✔
83
    def comments(self) -> Optional[str]:
15✔
84
        """Return release, media descriptions and credits separated by
85
        the configured separator string.
86
        """
87
        parts: List[str] = [self.meta.get("description") or ""]
15✔
88
        media_desc = self.media.description
15✔
89
        if media_desc and not media_desc.startswith("Includes high-quality"):
15✔
90
            parts.append(media_desc)
15✔
91

92
        parts.append(self.meta.get("creditText") or "")
15✔
93
        sep: str = self.config["comments_separator"]
15✔
94
        return sep.join(filter(None, parts)).replace("\r", "") or None
15✔
95

96
    @cached_property
15✔
97
    def all_media_comments(self) -> str:
15✔
98
        return "\n".join([
15✔
99
            *[m.description for m in self.media_formats],
100
            self.comments or "",
101
        ])
102

103
    @cached_property
15✔
104
    def label(self) -> str:
15✔
105
        m = re.search(r"Label:([^/,\n]+)", self.all_media_comments)
15✔
106
        if m:
15✔
107
            return m.expand(r"\1").strip(" '\"")
15✔
108

109
        return self.get_label(self.meta)
15✔
110

111
    @cached_property
15✔
112
    def album_id(self) -> str:
15✔
113
        return self.meta.get("@id") or ""
15✔
114

115
    @cached_property
15✔
116
    def artist_id(self) -> str:
15✔
117
        try:
15✔
118
            return self.meta["byArtist"]["@id"]  # type: ignore [no-any-return]
15✔
119
        except KeyError:
15✔
120
            return self.meta["publisher"]["@id"]  # type: ignore [no-any-return]
15✔
121

122
    @cached_property
15✔
123
    def original_albumartist(self) -> str:
15✔
124
        m = re.search(r"Artists?:([^\n]+)", self.all_media_comments)
15✔
125
        aartist = m.group(1).strip() if m else self.meta["byArtist"]["name"]
15✔
126
        return re.sub(r" +// +", ", ", aartist)
15✔
127

128
    @cached_property
15✔
129
    def original_album(self) -> str:
15✔
130
        return self._album_name.original
15✔
131

132
    @cached_property
    def bandcamp_albumartist(self) -> str:
        """Return the official release albumartist.
        It is correct in half of the cases. In others, we usually find the label name.
        """
        candidate = self.original_albumartist
        if self.label == candidate:
            # the label is given as the artist: take the part of the cleaned
            # album name that comes before the first " - ", if there is one
            cleaned = AlbumName.clean(self.original_album, [self.catalognum])
            head, separator, _ = cleaned.partition(" - ")
            if separator:
                candidate = head

        names = Helpers.split_artists([candidate])
        if len(names) == 1:
            return candidate

        remixers_str = " ".join(self._tracks.other_artists).lower()

        def is_remixer(name: str) -> bool:
            # check the full name and each of its " & " separated parts
            variants = {name, *name.split(" & ")}
            return any(variant.lower() in remixers_str for variant in variants)

        kept = [name for name in names if not is_remixer(name)]
        if len(kept) == len(names) and len(self._tracks.artists) <= 4:
            return candidate
        return ", ".join(kept)
15✔
157

158
    @cached_property
15✔
159
    def image(self) -> str:
15✔
160
        image = self.meta.get("image") or ""
×
161
        if isinstance(image, list) and isinstance(image[0], str):
×
162
            return image[0]
×
163
        return image
×
164

165
    @cached_property
15✔
166
    def release_date(self) -> Optional[date]:
15✔
167
        """Parse the datestring that takes the format like below and return date object.
168
        {"datePublished": "17 Jul 2020 00:00:00 GMT"}
169

170
        If the field is not found, return None.
171
        """
172
        rel = self.meta.get("datePublished") or self.meta.get("dateModified")
15✔
173
        if rel:
15✔
174
            return datetime.strptime(re.sub(r" \d{2}:.+", "", rel), "%d %b %Y").date()
15✔
175
        return rel
15✔
176

177
    @cached_property
15✔
178
    def albumstatus(self) -> str:
15✔
179
        reldate = self.release_date
15✔
180
        return "Official" if reldate and reldate <= date.today() else "Promotional"
15✔
181

182
    @property
    def disctitle(self) -> str:
        """Return the title of the active media, or an empty string for
        digital media.
        """
        if self.media.name == DIGI_MEDIA:
            return ""
        return self.media.title
15✔
186

187
    @property
15✔
188
    def mediums(self) -> int:
15✔
189
        return self.get_vinyl_count(self.disctitle) if self.media.name == "Vinyl" else 1
15✔
190

191
    @cached_property
15✔
192
    def general_catalognum(self) -> str:
15✔
193
        """Find catalog number in the media-agnostic release metadata and cache it."""
194
        return self._tracks.catalognum or self.parse_catalognum(
15✔
195
            album=self.meta["name"],
196
            description=self.comments or "",
197
            label=self.label if not self._singleton else "",
198
            artistitles=self._tracks.artistitles,
199
        )
200

201
    @property
    def catalognum(self) -> str:
        """Find catalog number in the media-specific release metadata or return
        the cached media-agnostic one.
        """
        media_specific = self.parse_catalognum(
            disctitle=self.disctitle,
            description=self.media.description,
            label="" if self._singleton else self.label,
            artistitles=self._tracks.artistitles,
        )
        return media_specific or self.general_catalognum
215

216
    @cached_property
    def country(self) -> str:
        """Return the country code of the publisher's founding location.

        The text after the last ", " of the location name is reduced to ASCII
        and resolved through the overrides, then pycountry countries, then
        pycountry subdivisions. The worldwide code is returned when the
        location is missing or cannot be resolved.
        """
        try:
            location = self.meta["publisher"]["foundingLocation"]["name"]
            last_part = location.rpartition(", ")[-1]
            name = normalize("NFKD", last_part).encode("ascii", "ignore").decode()

            override = COUNTRY_OVERRIDES.get(name)
            if override:
                return override

            # ``object`` has no ``alpha_2``, so a failed lookup falls through
            found = countries.get(name=name, default=object)
            code = getattr(found, "alpha_2", None)
            if code:
                return code

            return subdivisions.lookup(name).country_code
        except (ValueError, LookupError):
            return WORLDWIDE
15✔
230

231
    @cached_property
15✔
232
    def tracks(self) -> Tracks:
15✔
233
        self._tracks.adjust_artists(self.bandcamp_albumartist)
15✔
234
        return self._tracks
15✔
235

236
    @cached_property
15✔
237
    def unique_artists(self) -> List[str]:
15✔
238
        return self.split_artists(self._tracks.artists)
15✔
239

240
    @cached_property
15✔
241
    def albumartist(self) -> str:
15✔
242
        """Take into account the release contents and return the actual albumartist.
243
        * 'Various Artists' (or `va_name` configuration option) for a compilation release
244
        """
245
        if self.va:
15✔
246
            return self.va_name
15✔
247

248
        if len(self._tracks) == 1:
15✔
249
            return self.tracks.first.artist
15✔
250

251
        aartist = self.original_albumartist
15✔
252
        if self.unique_artists:
15!
253
            aartist = ", ".join(sorted(self.unique_artists))
15✔
254

255
        return aartist
15✔
256

257
    @cached_property
15✔
258
    def vinyl_disctitles(self) -> str:
15✔
259
        return " ".join([m.title for m in self.media_formats if m.name == "Vinyl"])
15✔
260

261
    @cached_property
    def album_name(self) -> str:
        """Return the album name obtained from the album name object, given
        the catalog number, track artists and label.
        """
        catalognum = self.catalognum
        tracks = self.tracks
        return self._album_name.get(
            catalognum, tracks.original_artists, tracks.artists, self.label
        )
269

270
    def _search_albumtype(self, word: str) -> bool:
        """Return whether the given word (ep or lp) matches the release albumtype.
        True when one of the following conditions is met:
        * if {word}[0-9] is found in the catalognum
        * if it's found in the original album name or any vinyl disctitle
        * if it's found in the same sentence as 'this' or '{album_name}', where
        sentences are read from release and media descriptions.
        """
        descriptions = self.all_media_comments
        word_pat = re.compile(rf"\b{word}\b", re.I)
        catnum_pat = re.compile(rf"{word}\d", re.I)
        name_pat = re.compile(rf"\b(this|{re.escape(self.album_name)})\b", re.I)

        if catnum_pat.search(self.catalognum):
            return True
        if word_pat.search(self.original_album + " " + self.vinyl_disctitles):
            return True

        # sentences end with a full stop followed by whitespace, or a newline
        sentences = re.split(r"[.]\s+|\n", descriptions)
        return any(
            word_pat.search(sentence) and name_pat.search(sentence)
            for sentence in sentences
        )
287

288
    @cached_property
15✔
289
    def is_single_album(self) -> bool:
15✔
290
        return (
15✔
291
            self._singleton
292
            or len({t.title_without_remix for t in self.tracks}) == 1
293
            or len(self._tracks.raw_names) == 1
294
        )
295

296
    @cached_property
15✔
297
    def is_lp(self) -> bool:
15✔
298
        """Return whether the release is an LP."""
299
        return self._search_albumtype("lp")
15✔
300

301
    @cached_property
15✔
302
    def is_ep(self) -> bool:
15✔
303
        """Return whether the release is an EP."""
304
        return self._search_albumtype("ep") or (
15✔
305
            " / " in self.album_name and len(self.tracks.artists) == 2
306
        )
307

308
    def check_albumtype_in_descriptions(self) -> str:
15✔
309
        """Count 'lp', 'album' and 'ep' words in the release and media descriptions
310
        and return the albumtype that represents the word matching the most times.
311
        """
312
        matches = re.findall(r"\b(album|ep|lp)\b", self.all_media_comments.lower())
15✔
313
        if matches:
15✔
314
            counts = Counter(x.replace("lp", "album") for x in matches)
15✔
315
            # if equal, we assume it's an EP since it's more likely that an EP is
316
            # referred to as an "album" rather than the other way around
317
            if counts["ep"] >= counts["album"]:
15✔
318
                return "ep"
15✔
319
        return "album"
15✔
320

321
    @cached_property
15✔
322
    def is_comp(self) -> bool:
15✔
323
        """Return whether the release is a compilation."""
324

325
        def first_one(artist: str) -> str:
15✔
326
            return PATTERNS["split_artists"].split(artist.replace(" & ", ", "))[0]
15✔
327

328
        truly_unique = set(map(first_one, self.tracks.artists))
15✔
329
        return (
15✔
330
            self._album_name.mentions_compilation
331
            or self._search_albumtype("compilation")
332
            or (len(truly_unique) > 3 and len(self.tracks) > 4)
333
        )
334

335
    @cached_property
15✔
336
    def albumtype(self) -> str:
15✔
337
        if self._singleton:
15✔
338
            return "single"
15✔
339
        if self.is_ep:
15✔
340
            return "ep"
15✔
341
        if self.is_lp:
15✔
342
            return "album"
15✔
343

344
        atype = self.check_albumtype_in_descriptions()
15✔
345
        if atype == "ep":
15✔
346
            return "ep"
15✔
347
        # otherwise, it's an album, but we firstly need to check if it's a compilation
348
        if self.is_comp:
15✔
349
            return "compilation"
15✔
350

351
        return "album"
15✔
352

353
    @cached_property
15✔
354
    def albumtypes(self) -> List[str]:
15✔
355
        albumtypes = {self.albumtype}
15✔
356
        if self.is_comp:
15✔
357
            if self.albumtype == "ep":
15!
358
                albumtypes.add("compilation")
×
359
            else:
360
                albumtypes.add("album")
15✔
361
        if self.is_lp:
15✔
362
            albumtypes.add("lp")
15✔
363
        if self.is_single_album:
15✔
364
            albumtypes.add("single")
15✔
365
        for word in ["remix", "rmx", "edits", "live", "soundtrack"]:
15✔
366
            if word in self.original_album.lower():
15✔
367
                albumtypes.add(word.replace("rmx", "remix").replace("edits", "remix"))
15✔
368
        if len(self.tracks.remixers) == len(self.tracks):
15✔
369
            albumtypes.add("remix")
15✔
370

371
        return sorted(albumtypes)
15✔
372

373
    @cached_property
15✔
374
    def va(self) -> bool:
15✔
375
        return len(self.unique_artists) > 3
15✔
376

377
    @cached_property
15✔
378
    def style(self) -> Optional[str]:
15✔
379
        """Extract bandcamp genre tag from the metadata."""
380
        # expecting the following form: https://bandcamp.com/tag/folk
381
        tag_url = self.meta.get("publisher", {}).get("genre") or ""
15✔
382
        style = None
15✔
383
        if tag_url:
15✔
384
            style = tag_url.split("/")[-1]
15✔
385
            if self.config["genre"]["capitalize"]:
15✔
386
                style = style.capitalize()
15✔
387
        return style
15✔
388

389
    @cached_property
15✔
390
    def genre(self) -> Optional[str]:
15✔
391
        kws: Iterable[str] = map(str.lower, self.meta.get("keywords", []))
15✔
392
        if self.style:
15✔
393
            exclude_style = partial(op.ne, self.style.lower())
15✔
394
            kws = filter(exclude_style, kws)
15✔
395

396
        genre_cfg = self.config["genre"]
15✔
397
        genres = self.get_genre(kws, genre_cfg, self.label)
15✔
398
        if genre_cfg["capitalize"]:
15✔
399
            genres = map(str.capitalize, genres)
15✔
400
        if genre_cfg["maximum"]:
15✔
401
            genres = it.islice(genres, genre_cfg["maximum"])
15✔
402

403
        return ", ".join(sorted(genres)).strip() or None
15✔
404

405
    @property
    def _common(self) -> JSONDict:
        """Return the fields that are set on both albums and tracks."""
        return dict(
            data_source=DATA_SOURCE,
            media=self.media.name,
            data_url=self.album_id,
            artist_id=self.artist_id,
        )
413

414
    def get_fields(self, fields: Iterable[str], src: object = None) -> JSONDict:
15✔
415
        """Return a mapping between unexcluded fields and their values."""
416
        fields = list(set(fields) - self.excluded_fields)
15✔
417
        if len(fields) == 1:
15✔
418
            field = fields.pop()
15✔
419
            return {field: getattr(self, field)}
15✔
420
        return dict(zip(fields, iter(op.attrgetter(*fields)(src or self))))
15✔
421

422
    @property
    def _common_album(self) -> JSONDict:
        """Return the album-level fields shared by albums and singletons.

        The extended fields are included only when the installed beets
        supports them. Release year, month and day are added when the release
        date is known.
        """
        common_data: JSONDict = {"album": self.album_name}
        fields = ["label", "catalognum", "albumtype", "country"]
        if EXTENDED_FIELDS_SUPPORT:
            fields.extend(["genre", "style", "comments", "albumtypes"])
        common_data.update(self.get_fields(fields))

        # beets without list support expects a "; " delimited string; the key
        # is absent when "albumtypes" is among the excluded fields
        if (
            EXTENDED_FIELDS_SUPPORT
            and not ALBUMTYPES_LIST_SUPPORT
            and "albumtypes" in common_data
        ):
            common_data["albumtypes"] = "; ".join(common_data["albumtypes"])

        reldate = self.release_date
        if reldate:
            common_data.update(self.get_fields(["year", "month", "day"], reldate))

        return common_data
15✔
436

437
    def _trackinfo(self, track: Track, **kwargs: Any) -> TrackInfo:
        """Build track info from the track data, the common fields and any
        extra keyword values, leaving out empty, unsupported and excluded
        fields.
        """
        info = track.info
        info.update(**self._common, **kwargs)

        # if track-level catalognum is not found or if it is the same as album's, then
        # remove it. Otherwise, keep it attached to the track
        track_catalognum = info["catalognum"]
        if not track_catalognum or track_catalognum == self.catalognum:
            info.pop("catalognum", None)
        if not info["lyrics"]:
            info.pop("lyrics", None)

        if not EXTENDED_FIELDS_SUPPORT:
            for key in ("catalognum", "lyrics"):
                info.pop(key, None)

        for key in self.excluded_fields.intersection(info):
            info.pop(key)

        return TrackInfo(**info)
15✔
453

454
    @cached_property
    def singleton(self) -> TrackInfo:
        """Return the first track as a singleton.

        The instance is switched to singleton handling and the first media
        format becomes active. When the installed beets supports the extended
        fields, the album-level fields except "album" are added to the track.
        """
        self._singleton = True
        # same guard as in __init__: keep the current media when no media
        # formats are available instead of failing with IndexError
        if self.media_formats:
            self.media = self.media_formats[0]

        track = self._trackinfo(self.tracks.first)
        if EXTENDED_FIELDS_SUPPORT:
            track.update(self._common_album)
            track.pop("album", None)
        track.track_id = track.data_url
        return track
15✔
464

465
    def get_media_album(self, media: MediaInfo) -> AlbumInfo:
        """Return album for the appropriate release format.

        The given format becomes the active media. Tracks flagged as
        ``digi_only`` are left out unless the format is digital media or the
        ``include_digital_only_tracks`` option is set.
        """
        self.media = media
        keep_all = (
            self.config.get("include_digital_only_tracks")
            or self.media.name == DIGI_MEDIA
        )
        selected = [track for track in self.tracks if keep_all or not track.digi_only]

        # NOTE(review): medium_total counts every track, including any that
        # were filtered out above - confirm that this is intended
        to_trackinfo = partial(
            self._trackinfo,
            medium=1,
            disctitle=self.disctitle or None,
            medium_total=len(self.tracks),
        )
        album_info = AlbumInfo(
            **self._common,
            **self._common_album,
            artist=self.albumartist,
            album_id=self.album_id,
            mediums=self.mediums,
            albumstatus=self.albumstatus,
            tracks=[to_trackinfo(track) for track in selected],
        )
        # "va" is set afterwards so that it can be excluded like other fields
        for name, value in self.get_fields(["va"]).items():
            setattr(album_info, name, value)
        album_info.album_id = self.media.album_id

        if self.media.name == "Vinyl":
            album_info = self.add_track_alts(album_info, self.comments or "")
        return album_info
15✔
495

496
    @cached_property
15✔
497
    def albums(self) -> List[AlbumInfo]:
15✔
498
        """Return album for the appropriate release format."""
499
        return list(map(self.get_media_album, self.media_formats))
15✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc