devsnd / tinytag / build 8125580323 (push, via github)

02 Mar 2024 09:34PM UTC coverage: 98.484% (-0.3%) from 98.833%

Commit by mathiascode: Support multiple images of the same type

49 of 51 new or added lines in 3 files covered (96.08%).
4 existing lines in 1 file are now uncovered.
1364 of 1385 relevant lines covered (98.48%).
0.98 hits per line.
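
The commit under test extends the TagImages container in this file so that each image type holds a list of images rather than a single one. As a quick orientation, here is a minimal usage sketch of the public API shown in the listing below; the path 'song.mp3' is a placeholder, and it assumes the tinytag package from this build is installed:

from tinytag import TinyTag

# Read tags, duration and embedded images; 'song.mp3' is a placeholder path.
tag = TinyTag.get('song.mp3', image=True)
print(tag.artist, tag.title, tag.duration)

# Every image type is a list, so several images of the same kind
# (e.g. more than one front cover) are all kept.
for image in tag.images.front_cover:
    print(image.name, image.mime_type, len(image.data))

# images.any returns a cover image, falling back to any other embedded image.
cover = tag.images.any
if cover is not None:
    print(cover.mime_type, len(cover.data))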

Source File

/tinytag/tinytag.py (98.05% of lines covered)
# tinytag - an audio file metadata reader
# Copyright (c) 2014-2023 Tom Wallroth
# Copyright (c) 2021-2024 Mat (mathiascode)
#
# Sources on GitHub:
# http://github.com/devsnd/tinytag/

# MIT License

# Copyright (c) 2014-2024 Tom Wallroth, Mat (mathiascode)

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""Audio file metadata reader"""

# pylint: disable=invalid-name,protected-access
# pylint: disable=too-many-lines,too-many-arguments,too-many-boolean-expressions
# pylint: disable=too-many-branches,too-many-instance-attributes,too-many-locals
# pylint: disable=too-many-nested-blocks,too-many-statements,too-few-public-methods


from __future__ import annotations
from collections.abc import Callable, Iterator
from functools import reduce
from os import PathLike
from sys import stderr
from typing import Any, BinaryIO

import base64
import io
import os
import re
import struct


DEBUG = bool(os.environ.get('DEBUG'))  # some of the parsers can print debug info


class TinyTagException(Exception):
    """Base class for exceptions."""


class ParseError(TinyTagException):
    """Parsing an audio file failed."""


class UnsupportedFormatError(TinyTagException):
    """File format is not supported."""


class TinyTag:
    """A class containing audio file metadata."""

    SUPPORTED_FILE_EXTENSIONS = (
        '.mp1', '.mp2', '.mp3',
        '.oga', '.ogg', '.opus', '.spx',
        '.wav', '.flac', '.wma',
        '.m4b', '.m4a', '.m4r', '.m4v', '.mp4', '.aax', '.aaxc',
        '.aiff', '.aifc', '.aif', '.afc'
    )
    _EXTRA_PREFIX = 'extra.'
    _file_extension_mapping: dict[tuple[bytes, ...], type[TinyTag]] | None = None
    _magic_bytes_mapping: dict[bytes, type[TinyTag]] | None = None

    def __init__(self) -> None:
        self.filename: bytes | str | PathLike[Any] | None = None
        self.filesize = 0
        self.duration: float | None = None
        self.channels: int | None = None
        self.bitrate: float | None = None
        self.bitdepth: int | None = None
        self.samplerate: int | None = None
        self.artist: str | None = None
        self.albumartist: str | None = None
        self.album: str | None = None
        self.disc: int | None = None
        self.disc_total: int | None = None
        self.title: str | None = None
        self.track: int | None = None
        self.track_total: int | None = None
        self.genre: str | None = None
        self.year: str | None = None
        self.comment: str | None = None
        self.extra: dict[str, str | float | int] = {}
        self.images = TagImages()
        self._filehandler: BinaryIO | None = None
        self._default_encoding: str | None = None  # allow override for some file formats
        self._parse_duration = True
        self._parse_tags = True
        self._load_image = False
        self._tags_parsed = False

    @classmethod
    def get(cls,
            filename: bytes | str | PathLike[Any] | None = None,
            tags: bool = True,
            duration: bool = True,
            image: bool = False,
            encoding: str | None = None,
            file_obj: BinaryIO | None = None) -> TinyTag:
        """Return a tag object for an audio file."""
        should_close_file = file_obj is None
        if filename and should_close_file:
            file_obj = open(filename, 'rb')  # pylint: disable=consider-using-with
        if file_obj is None:
            raise ValueError('Either filename or file_obj argument is required')
        try:
            file_obj.seek(0, os.SEEK_END)
            filesize = file_obj.tell()
            file_obj.seek(0)
            parser_class = cls._get_parser_class(filename, file_obj)
            tag = parser_class()
            tag._filehandler = file_obj
            tag._default_encoding = encoding
            tag.filename = filename
            tag.filesize = filesize
            if filesize > 0:
                try:
                    tag._load(tags=tags, duration=duration, image=image)
                except Exception as exc:
                    raise ParseError(exc) from exc
            return tag
        finally:
            if should_close_file:
                file_obj.close()

    @classmethod
    def is_supported(cls, filename: bytes | str | PathLike[Any]) -> bool:
        """Check if a specific file is supported based on its file extension."""
        return cls._get_parser_for_filename(filename) is not None

    def __repr__(self) -> str:
        return str(self._as_dict())

    def _as_dict(self) -> dict[str, Any]:
        return {k: v for k, v in self.__dict__.items() if not k.startswith('_')}

    @classmethod
    def _get_parser_for_filename(
            cls, filename: bytes | str | PathLike[Any]) -> type[TinyTag] | None:
        if cls._file_extension_mapping is None:
            cls._file_extension_mapping = {
                (b'.mp1', b'.mp2', b'.mp3'): _ID3,
                (b'.oga', b'.ogg', b'.opus', b'.spx'): _Ogg,
                (b'.wav',): _Wave,
                (b'.flac',): _Flac,
                (b'.wma',): _Wma,
                (b'.m4b', b'.m4a', b'.m4r', b'.m4v', b'.mp4', b'.aax', b'.aaxc'): _MP4,
                (b'.aiff', b'.aifc', b'.aif', b'.afc'): _Aiff,
            }
        filename = os.fspath(filename).lower()
        if isinstance(filename, str):
            filename_bytes = filename.encode('ascii')
        else:
            filename_bytes = filename
        for ext, tagclass in cls._file_extension_mapping.items():
            if filename_bytes.endswith(ext):
                return tagclass
        return None

    @classmethod
    def _get_parser_for_file_handle(cls, fh: BinaryIO) -> type[TinyTag] | None:
        # https://en.wikipedia.org/wiki/List_of_file_signatures
        if cls._magic_bytes_mapping is None:
            cls._magic_bytes_mapping = {
                b'^ID3': _ID3,
                b'^\xff\xfb': _ID3,
                b'^OggS.........................FLAC': _Ogg,
                b'^OggS........................Opus': _Ogg,
                b'^OggS........................Speex': _Ogg,
                b'^OggS.........................vorbis': _Ogg,
                b'^RIFF....WAVE': _Wave,
                b'^fLaC': _Flac,
                b'^\x30\x26\xB2\x75\x8E\x66\xCF\x11\xA6\xD9\x00\xAA\x00\x62\xCE\x6C': _Wma,
                b'....ftypM4A': _MP4,  # https://www.file-recovery.com/m4a-signature-format.htm
                b'....ftypaax': _MP4,  # Audible proprietary M4A container
                b'....ftypaaxc': _MP4,  # Audible proprietary M4A container
                b'\xff\xf1': _MP4,  # https://www.garykessler.net/library/file_sigs.html
                b'^FORM....AIFF': _Aiff,
                b'^FORM....AIFC': _Aiff,
            }
        header = fh.read(max(len(sig) for sig in cls._magic_bytes_mapping))
        fh.seek(0)
        for magic, parser in cls._magic_bytes_mapping.items():
            if re.match(magic, header):
                return parser
        return None

    @classmethod
    def _get_parser_class(cls, filename: bytes | str | PathLike[Any] | None = None,
                          filehandle: BinaryIO | None = None) -> type[TinyTag]:
        if cls != TinyTag:  # if `get` is invoked on TinyTag, find parser by ext
            return cls  # otherwise use the class on which `get` was invoked
        if filename:
            parser_class = cls._get_parser_for_filename(filename)
            if parser_class is not None:
                return parser_class
        # try determining the file type by magic byte header
        if filehandle:
            parser_class = cls._get_parser_for_file_handle(filehandle)
            if parser_class is not None:
                return parser_class
        raise UnsupportedFormatError('No tag reader found to support file type')

    def _load(self, tags: bool, duration: bool, image: bool = False) -> None:
        self._parse_tags = tags
        self._parse_duration = duration
        self._load_image = image
        if self._filehandler is None:
            return
        if tags:
            self._parse_tag(self._filehandler)
        if duration:
            if tags:  # rewind file if the tags were already parsed
                self._filehandler.seek(0)
            self._determine_duration(self._filehandler)

    def _parse_string_field(self, fieldname: str, old_value: Any | None, value: str) -> str | None:
        if fieldname in {'artist', 'genre'}:
            # First artist/genre goes in tag.artist/genre, others in tag.extra.other_artists/genres
            values = value.split('\x00')
            value = values[0]
            start_pos = 0 if old_value else 1
            if len(values) > 1:
                self._set_field(self._EXTRA_PREFIX + f'other_{fieldname}s', values[start_pos:])
            elif old_value and value != old_value:
                self._set_field(self._EXTRA_PREFIX + f'other_{fieldname}s', [value])
                return None
        if old_value or not value:
            return None
        return value

    def _set_field(self, fieldname: str, value: str | int | float | list[str] | None) -> None:
        write_dest = self.__dict__
        original_fieldname = fieldname
        if fieldname.startswith(self._EXTRA_PREFIX):
            write_dest = self.extra
            fieldname = fieldname[len(self._EXTRA_PREFIX):]
        old_value = write_dest.get(fieldname)
        if isinstance(value, str):
            value = self._parse_string_field(original_fieldname, old_value, value)
            if not value:
                return
        elif isinstance(value, list):
            if not isinstance(old_value, list):
                old_value = []
            value = old_value + [i for i in value if i and i not in old_value]
        elif not value and old_value:
            return
        if DEBUG:
            print(f'Setting field "{original_fieldname}" to "{value!r}"')
        write_dest[fieldname] = value

    def _set_image_field(self, fieldname: str, value: TagImage) -> None:
        write_dest = self.images.__dict__
        if fieldname.startswith(self._EXTRA_PREFIX):
            fieldname = fieldname[len(self._EXTRA_PREFIX):]
            write_dest = self.images.extra
        old_values = write_dest.get(fieldname)
        values = [value]
        if old_values is not None:
            values = old_values + values
        if DEBUG:
            print(f'Setting image field "{fieldname}"')
        write_dest[fieldname] = values

    def _determine_duration(self, fh: BinaryIO) -> None:
        raise NotImplementedError

    def _parse_tag(self, fh: BinaryIO) -> None:
        raise NotImplementedError

    def _update(self, other: TinyTag) -> None:
        # update the values of this tag with the values from another tag
        excluded_attrs = {'filesize', 'extra', 'images'}
        for standard_key, standard_value in other.__dict__.items():
            if (not standard_key.startswith('_')
                    and standard_key not in excluded_attrs
                    and standard_value is not None):
                self._set_field(standard_key, standard_value)
        for extra_key, extra_value in other.extra.items():
            self._set_field(self._EXTRA_PREFIX + extra_key, extra_value)
        for image_key, images in other.images._as_dict().items():
            for image in images:
                self._set_image_field(image_key, image)
        for image_extra_key, images_extra in other.images.extra.items():
            for image_extra in images_extra:
                self._set_image_field(self._EXTRA_PREFIX + image_extra_key, image_extra)

    @staticmethod
    def _bytes_to_int_le(b: bytes) -> int:
        fmt = {1: '<B', 2: '<H', 4: '<I', 8: '<Q'}.get(len(b))
        result: int = struct.unpack(fmt, b)[0] if fmt is not None else 0
        return result

    @staticmethod
    def _bytes_to_int(b: tuple[int, ...]) -> int:
        return reduce(lambda accu, elem: (accu << 8) + elem, b, 0)

    @staticmethod
    def _unpad(s: str) -> str:
        # strings in mp3 and asf *may* be terminated with a zero byte at the end
        return s.strip('\x00')


class TagImages:
    """A class containing images embedded in an audio file."""
    def __init__(self) -> None:
        self.front_cover: list[TagImage] = []
        self.back_cover: list[TagImage] = []
        self.leaflet: list[TagImage] = []
        self.media: list[TagImage] = []
        self.other: list[TagImage] = []
        self.extra: dict[str, list[TagImage]] = {}

    @property
    def any(self) -> TagImage | None:
        """Return a cover image.
        If not present, fall back to any other available image.
        """
        for image_list in self._as_dict().values():
            for image in image_list:
                return image
        for extra_image_list in self.extra.values():
            for extra_image in extra_image_list:
                return extra_image
        return None

    def __repr__(self) -> str:
        return str(vars(self))

    def _as_dict(self) -> dict[str, list[TagImage]]:
        return {
            k: v for k, v in self.__dict__.items()
            if not k.startswith('_') and k != 'extra'
        }


class TagImage:
    """A class representing an image embedded in an audio file."""
    def __init__(self, name: str, data: bytes, mime_type: str | None = None) -> None:
        self.name = name
        self.data = data
        self.mime_type = mime_type
        self.description: str | None = None

    def __repr__(self) -> str:
        variables = vars(self).copy()
        data = variables.get("data")
        if data is not None:
            variables["data"] = (data[:45] + b'..') if len(data) > 45 else data
        return str(variables)


class _MP4(TinyTag):
    # https://developer.apple.com/library/mac/documentation/QuickTime/QTFF/Metadata/Metadata.html
    # https://developer.apple.com/library/mac/documentation/QuickTime/QTFF/QTFFChap2/qtff2.html

    class _Parser:
        atom_decoder_by_type: dict[
            int, Callable[[bytes], int | str | bytes | TagImage]] | None = None
        _CUSTOM_FIELD_NAME_MAPPING = {
            'conductor': 'extra.conductor',
            'discsubtitle': 'extra.set_subtitle',
            'initialkey': 'extra.initial_key',
            'isrc': 'extra.isrc',
            'language': 'extra.language',
            'lyricist': 'extra.lyricist',
            'media': 'extra.media',
        }

        @classmethod
        def _unpack_integer(cls, value: bytes, signed: bool = True) -> int:
            value_length = len(value)
            result = -1
            if value_length == 1:
                result = struct.unpack('>b' if signed else '>B', value)[0]
            elif value_length == 2:
                result = struct.unpack('>h' if signed else '>H', value)[0]
            elif value_length == 4:
                result = struct.unpack('>i' if signed else '>I', value)[0]
            elif value_length == 8:
                result = struct.unpack('>q' if signed else '>Q', value)[0]
            return result

        @classmethod
        def _unpack_integer_unsigned(cls, value: bytes) -> int:
            return cls._unpack_integer(value, signed=False)

        @classmethod
        def _make_data_atom_parser(
                cls, fieldname: str) -> Callable[[bytes], dict[str, int | str | bytes | TagImage]]:
            def _parse_data_atom(data_atom: bytes) -> dict[str, int | str | bytes | TagImage]:
                data_type = struct.unpack('>I', data_atom[:4])[0]
                if cls.atom_decoder_by_type is None:
                    # https://developer.apple.com/library/mac/documentation/QuickTime/QTFF/Metadata/Metadata.html#//apple_ref/doc/uid/TP40000939-CH1-SW34
                    cls.atom_decoder_by_type = {
                        # 0: 'reserved'
                        1: lambda x: x.decode('utf-8', 'replace'),   # UTF-8
                        2: lambda x: x.decode('utf-16', 'replace'),  # UTF-16
                        3: lambda x: x.decode('s/jis', 'replace'),   # S/JIS
                        # 16: duration in millis
                        13: lambda x: TagImage('front_cover', x, 'image/jpeg'),  # JPEG
                        14: lambda x: TagImage('front_cover', x, 'image/png'),   # PNG
                        21: cls._unpack_integer,                    # BE Signed int
                        22: cls._unpack_integer_unsigned,           # BE Unsigned int
                        # 23: lambda x: struct.unpack('>f', x)[0],  # BE Float32
                        # 24: lambda x: struct.unpack('>d', x)[0],  # BE Float64
                        # 27: lambda x: x,                          # BMP
                        # 28: lambda x: x,                          # QuickTime Metadata atom
                        65: cls._unpack_integer,                    # 8-bit Signed int
                        66: cls._unpack_integer,                    # BE 16-bit Signed int
                        67: cls._unpack_integer,                    # BE 32-bit Signed int
                        74: cls._unpack_integer,                    # BE 64-bit Signed int
                        75: cls._unpack_integer_unsigned,           # 8-bit Unsigned int
                        76: cls._unpack_integer_unsigned,           # BE 16-bit Unsigned int
                        77: cls._unpack_integer_unsigned,           # BE 32-bit Unsigned int
                        78: cls._unpack_integer_unsigned,           # BE 64-bit Unsigned int
                    }
                conversion = cls.atom_decoder_by_type.get(data_type)
                if conversion is None:
                    if DEBUG:
                        print(f'Cannot convert data type: {data_type}', file=stderr)
                    return {}  # don't know how to convert data atom
                # skip header & null-bytes, convert rest
                return {fieldname: conversion(data_atom[8:])}
            return _parse_data_atom

        @classmethod
        def _make_number_parser(
                cls, fieldname1: str, fieldname2: str) -> Callable[[bytes], dict[str, int]]:
            def _(data_atom: bytes) -> dict[str, int]:
                number_data = data_atom[8:14]
                numbers = struct.unpack('>HHH', number_data)
                # for some reason the first number is always irrelevant.
                return {fieldname1: numbers[1], fieldname2: numbers[2]}
            return _

        @classmethod
        def _parse_id3v1_genre(cls, data_atom: bytes) -> dict[str, str]:
            # dunno why the genre is offset by -1 but that's how mutagen does it
            idx = struct.unpack('>H', data_atom[8:])[0] - 1
            result = {}
            if idx < len(_ID3._ID3V1_GENRES):
                result['genre'] = _ID3._ID3V1_GENRES[idx]
            return result

        @classmethod
        def _read_extended_descriptor(cls, esds_atom: BinaryIO) -> None:
            for _i in range(4):
                if esds_atom.read(1) != b'\x80':
                    break

        @classmethod
        def _parse_custom_field(cls, data: bytes) -> dict[str, int | str | bytes | TagImage]:
            fh = io.BytesIO(data)
            header_size = 8
            field_name = None
            data_atom = b''
            atom_header = fh.read(header_size)
            while len(atom_header) == header_size:
                atom_size = struct.unpack('>I', atom_header[:4])[0] - header_size
                atom_type = atom_header[4:]
                if atom_type == b'name':
                    atom_value = fh.read(atom_size)[4:].lower()
                    field_name = atom_value.decode('utf-8', 'replace')
                    field_name = cls._CUSTOM_FIELD_NAME_MAPPING.get(
                        field_name, TinyTag._EXTRA_PREFIX + field_name)
                elif atom_type == b'data':
                    data_atom = fh.read(atom_size)
                else:
                    fh.seek(atom_size, os.SEEK_CUR)
                atom_header = fh.read(header_size)  # read next atom
            if len(data_atom) < 8 or field_name is None:
                return {}
            parser = cls._make_data_atom_parser(field_name)
            return parser(data_atom)

        @classmethod
        def _parse_audio_sample_entry_mp4a(cls, data: bytes) -> dict[str, int]:
            # this atom also contains the esds atom:
            # https://ffmpeg.org/doxygen/0.6/mov_8c-source.html
            # http://xhelmboyx.tripod.com/formats/mp4-layout.txt
            # http://sasperger.tistory.com/103
            datafh = io.BytesIO(data)
            datafh.seek(16, os.SEEK_CUR)  # jump over version and flags
            channels = struct.unpack('>H', datafh.read(2))[0]
            datafh.seek(2, os.SEEK_CUR)   # jump over bit_depth
            datafh.seek(2, os.SEEK_CUR)   # jump over QT compr id & pkt size
            sr = struct.unpack('>I', datafh.read(4))[0]

            # ES Description Atom
            esds_atom_size = struct.unpack('>I', data[28:32])[0]
            esds_atom = io.BytesIO(data[36:36 + esds_atom_size])
            esds_atom.seek(5, os.SEEK_CUR)   # jump over version, flags and tag

            # ES Descriptor
            cls._read_extended_descriptor(esds_atom)
            esds_atom.seek(4, os.SEEK_CUR)   # jump over ES id, flags and tag

            # Decoder Config Descriptor
            cls._read_extended_descriptor(esds_atom)
            esds_atom.seek(9, os.SEEK_CUR)
            avg_br = struct.unpack('>I', esds_atom.read(4))[0] / 1000  # kbit/s
            return {'channels': channels, 'samplerate': sr, 'bitrate': avg_br}

        @classmethod
        def _parse_audio_sample_entry_alac(cls, data: bytes) -> dict[str, int]:
            # https://github.com/macosforge/alac/blob/master/ALACMagicCookieDescription.txt
            alac_atom_size = struct.unpack('>I', data[28:32])[0]
            alac_atom = io.BytesIO(data[36:36 + alac_atom_size])
            alac_atom.seek(9, os.SEEK_CUR)
            bitdepth = struct.unpack('b', alac_atom.read(1))[0]
            alac_atom.seek(3, os.SEEK_CUR)
            channels = struct.unpack('b', alac_atom.read(1))[0]
            alac_atom.seek(6, os.SEEK_CUR)
            avg_br = struct.unpack('>I', alac_atom.read(4))[0] / 1000  # kbit/s
            sr = struct.unpack('>I', alac_atom.read(4))[0]
            return {'channels': channels, 'samplerate': sr, 'bitrate': avg_br, 'bitdepth': bitdepth}

        @classmethod
        def _parse_mvhd(cls, data: bytes) -> dict[str, float]:
            # http://stackoverflow.com/a/3639993/1191373
            walker = io.BytesIO(data)
            version = struct.unpack('b', walker.read(1))[0]
            walker.seek(3, os.SEEK_CUR)  # jump over flags
            if version == 0:  # uses 32 bit integers for timestamps
                walker.seek(8, os.SEEK_CUR)  # jump over create & mod times
                time_scale = struct.unpack('>I', walker.read(4))[0]
                duration = struct.unpack('>I', walker.read(4))[0]
            else:  # version == 1:  # uses 64 bit integers for timestamps
                walker.seek(16, os.SEEK_CUR)  # jump over create & mod times
                time_scale = struct.unpack('>I', walker.read(4))[0]
                duration = struct.unpack('>q', walker.read(8))[0]
            return {'duration': duration / time_scale}

    # The parser tree: Each key is an atom name which is traversed if existing.
    # Leaves of the parser tree are callables which receive the atom data.
    # callables return {fieldname: value} which updates the TinyTag.
    _META_DATA_TREE = {b'moov': {b'udta': {b'meta': {b'ilst': {
        # see: http://atomicparsley.sourceforge.net/mpeg-4files.html
        # and: https://metacpan.org/dist/Image-ExifTool/source/lib/Image/ExifTool/QuickTime.pm#L3093
        b'\xa9ART': {b'data': _Parser._make_data_atom_parser('artist')},
        b'\xa9alb': {b'data': _Parser._make_data_atom_parser('album')},
        b'\xa9cmt': {b'data': _Parser._make_data_atom_parser('comment')},
        b'\xa9con': {b'data': _Parser._make_data_atom_parser('extra.conductor')},
        # need test-data for this
        # b'cpil':   {b'data': _Parser._make_data_atom_parser('extra.compilation')},
        b'\xa9day': {b'data': _Parser._make_data_atom_parser('year')},
        b'\xa9des': {b'data': _Parser._make_data_atom_parser('extra.description')},
        b'\xa9dir': {b'data': _Parser._make_data_atom_parser('extra.director')},
        b'\xa9gen': {b'data': _Parser._make_data_atom_parser('genre')},
        b'\xa9lyr': {b'data': _Parser._make_data_atom_parser('extra.lyrics')},
        b'\xa9mvn': {b'data': _Parser._make_data_atom_parser('movement')},
        b'\xa9nam': {b'data': _Parser._make_data_atom_parser('title')},
        b'\xa9pub': {b'data': _Parser._make_data_atom_parser('extra.publisher')},
        b'\xa9too': {b'data': _Parser._make_data_atom_parser('extra.encoded_by')},
        b'\xa9wrt': {b'data': _Parser._make_data_atom_parser('extra.composer')},
        b'aART': {b'data': _Parser._make_data_atom_parser('albumartist')},
        b'cprt': {b'data': _Parser._make_data_atom_parser('extra.copyright')},
        b'desc': {b'data': _Parser._make_data_atom_parser('extra.description')},
        b'disk': {b'data': _Parser._make_number_parser('disc', 'disc_total')},
        b'gnre': {b'data': _Parser._parse_id3v1_genre},
        b'trkn': {b'data': _Parser._make_number_parser('track', 'track_total')},
        b'tmpo': {b'data': _Parser._make_data_atom_parser('extra.bpm')},
        b'covr': {b'data': _Parser._make_data_atom_parser('images.front_cover')},
        b'----': _Parser._parse_custom_field,
    }}}}}

    # see: https://developer.apple.com/library/mac/documentation/QuickTime/QTFF/QTFFChap3/qtff3.html
    _AUDIO_DATA_TREE = {
        b'moov': {
            b'mvhd': _Parser._parse_mvhd,
            b'trak': {b'mdia': {b"minf": {b"stbl": {b"stsd": {
                b'mp4a': _Parser._parse_audio_sample_entry_mp4a,
                b'alac': _Parser._parse_audio_sample_entry_alac
            }}}}}
        }
    }

    _VERSIONED_ATOMS = {b'meta', b'stsd'}  # those have an extra 4 byte header
    _FLAGGED_ATOMS = {b'stsd'}  # these also have an extra 4 byte header

    def _determine_duration(self, fh: BinaryIO) -> None:
        self._traverse_atoms(fh, path=self._AUDIO_DATA_TREE)

    def _parse_tag(self, fh: BinaryIO) -> None:
        self._traverse_atoms(fh, path=self._META_DATA_TREE)

    def _traverse_atoms(self, fh: BinaryIO, path: dict[bytes, Any],
                        stop_pos: int | None = None,
                        curr_path: list[bytes] | None = None) -> None:
        header_size = 8
        atom_header = fh.read(header_size)
        while len(atom_header) == header_size:
            atom_size = struct.unpack('>I', atom_header[:4])[0] - header_size
            atom_type = atom_header[4:]
            if curr_path is None:  # keep track of how we traversed the tree
                curr_path = [atom_type]
            if atom_size <= 0:  # empty atom, jump to next one
                atom_header = fh.read(header_size)
                continue
            if DEBUG:
                print(f'{" " * 4 * len(curr_path)} pos: {fh.tell() - header_size} '
                      f'atom: {atom_type!r} len: {atom_size + header_size}')
            if atom_type in self._VERSIONED_ATOMS:  # jump atom version for now
                fh.seek(4, os.SEEK_CUR)
            if atom_type in self._FLAGGED_ATOMS:  # jump atom flags for now
                fh.seek(4, os.SEEK_CUR)
            sub_path = path.get(atom_type, None)
            # if the path leaf is a dict, traverse deeper into the tree:
            if isinstance(sub_path, dict):
                atom_end_pos = fh.tell() + atom_size
                self._traverse_atoms(fh, path=sub_path, stop_pos=atom_end_pos,
                                     curr_path=curr_path + [atom_type])
            # if the path-leaf is a callable, call it on the atom data
            elif callable(sub_path):
                for fieldname, value in sub_path(fh.read(atom_size)).items():
                    if DEBUG:
                        print(' ' * 4 * len(curr_path), 'FIELD: ', fieldname)
                    if fieldname.startswith('images.'):
                        if self._load_image:
                            self._set_image_field(fieldname[len('images.'):], value)
                    elif fieldname:
                        self._set_field(fieldname, value)
            # if no action was specified using dict or callable, jump over atom
            else:
                fh.seek(atom_size, os.SEEK_CUR)
            # check if we have reached the end of this branch:
            if stop_pos and fh.tell() >= stop_pos:
                return  # return to parent (next parent node in tree)
            atom_header = fh.read(header_size)  # read next atom


class _ID3(TinyTag):
    _ID3_MAPPING = {
        # Mapping from Frame ID to a field of the TinyTag
        # https://exiftool.org/TagNames/ID3.html
        'COMM': 'comment', 'COM': 'comment',
        'TRCK': 'track', 'TRK': 'track',
        'TYER': 'year', 'TYE': 'year', 'TDRC': 'year',
        'TALB': 'album', 'TAL': 'album',
        'TPE1': 'artist', 'TP1': 'artist',
        'TIT2': 'title', 'TT2': 'title',
        'TCON': 'genre', 'TCO': 'genre',
        'TPOS': 'disc', 'TPA': 'disc',
        'TPE2': 'albumartist', 'TP2': 'albumartist',
        'TCOM': 'extra.composer', 'TCM': 'extra.composer',
        'WOAR': 'extra.url', 'WAR': 'extra.url',
        'TSRC': 'extra.isrc', 'TRC': 'extra.isrc',
        'TCOP': 'extra.copyright', 'TCR': 'extra.copyright',
        'TBPM': 'extra.bpm', 'TBP': 'extra.bpm',
        'TKEY': 'extra.initial_key', 'TKE': 'extra.initial_key',
        'TLAN': 'extra.language', 'TLA': 'extra.language',
        'TPUB': 'extra.publisher', 'TPB': 'extra.publisher',
        'USLT': 'extra.lyrics', 'ULT': 'extra.lyrics',
        'TPE3': 'extra.conductor', 'TP3': 'extra.conductor',
        'TEXT': 'extra.lyricist', 'TXT': 'extra.lyricist',
        'TSST': 'extra.set_subtitle',
        'TENC': 'extra.encoded_by', 'TEN': 'extra.encoded_by',
        'TSSE': 'extra.encoder_settings', 'TSS': 'extra.encoder_settings',
        'TMED': 'extra.media', 'TMT': 'extra.media',
    }
    _IMAGE_FRAME_IDS = {'APIC', 'PIC'}
    _CUSTOM_FRAME_IDS = {'TXXX', 'TXX'}
    _DISALLOWED_FRAME_IDS = {'PRIV', 'RGAD', 'GEOB', 'GEO', 'ÿû°d'}
    _MAX_ESTIMATION_SEC = 30.0
    _CBR_DETECTION_FRAME_COUNT = 5
    _USE_XING_HEADER = True  # much faster, but can be deactivated for testing

    _ID3V1_GENRES = (
        'Blues', 'Classic Rock', 'Country', 'Dance', 'Disco',
        'Funk', 'Grunge', 'Hip-Hop', 'Jazz', 'Metal', 'New Age', 'Oldies',
        'Other', 'Pop', 'R&B', 'Rap', 'Reggae', 'Rock', 'Techno', 'Industrial',
        'Alternative', 'Ska', 'Death Metal', 'Pranks', 'Soundtrack',
        'Euro-Techno', 'Ambient', 'Trip-Hop', 'Vocal', 'Jazz+Funk', 'Fusion',
        'Trance', 'Classical', 'Instrumental', 'Acid', 'House', 'Game',
        'Sound Clip', 'Gospel', 'Noise', 'AlternRock', 'Bass', 'Soul', 'Punk',
        'Space', 'Meditative', 'Instrumental Pop', 'Instrumental Rock',
        'Ethnic', 'Gothic', 'Darkwave', 'Techno-Industrial', 'Electronic',
        'Pop-Folk', 'Eurodance', 'Dream', 'Southern Rock', 'Comedy', 'Cult',
        'Gangsta', 'Top 40', 'Christian Rap', 'Pop/Funk', 'Jungle',
        'Native American', 'Cabaret', 'New Wave', 'Psychadelic', 'Rave',
        'Showtunes', 'Trailer', 'Lo-Fi', 'Tribal', 'Acid Punk', 'Acid Jazz',
        'Polka', 'Retro', 'Musical', 'Rock & Roll', 'Hard Rock',

        # Winamp Extended Genres
        'Folk', 'Folk-Rock', 'National Folk', 'Swing', 'Fast Fusion', 'Bebob',
        'Latin', 'Revival', 'Celtic', 'Bluegrass', 'Avantgarde', 'Gothic Rock',
        'Progressive Rock', 'Psychedelic Rock', 'Symphonic Rock', 'Slow Rock',
        'Big Band', 'Chorus', 'Easy listening', 'Acoustic', 'Humour', 'Speech',
        'Chanson', 'Opera', 'Chamber Music', 'Sonata', 'Symphony', 'Booty Bass',
        'Primus', 'Porn Groove', 'Satire', 'Slow Jam', 'Club', 'Tango', 'Samba',
        'Folklore', 'Ballad', 'Power Ballad', 'Rhythmic Soul', 'Freestyle',
        'Duet', 'Punk Rock', 'Drum Solo', 'A capella', 'Euro-House',
        'Dance Hall', 'Goa', 'Drum & Bass',

        # according to https://de.wikipedia.org/wiki/Liste_der_ID3v1-Genres:
        'Club-House', 'Hardcore Techno', 'Terror', 'Indie', 'BritPop',
        '',  # don't use ethnic slur ("Negerpunk", WTF!)
        'Polsk Punk', 'Beat', 'Christian Gangsta Rap', 'Heavy Metal',
        'Black Metal', 'Contemporary Christian', 'Christian Rock',
        # WinAmp 1.91
        'Merengue', 'Salsa', 'Thrash Metal', 'Anime', 'Jpop', 'Synthpop',
        # WinAmp 5.6
        'Abstract', 'Art Rock', 'Baroque', 'Bhangra', 'Big Beat', 'Breakbeat',
        'Chillout', 'Downtempo', 'Dub', 'EBM', 'Eclectic', 'Electro',
        'Electroclash', 'Emo', 'Experimental', 'Garage', 'Illbient',
        'Industro-Goth', 'Jam Band', 'Krautrock', 'Leftfield', 'Lounge',
        'Math Rock', 'New Romantic', 'Nu-Breakz', 'Post-Punk', 'Post-Rock',
        'Psytrance', 'Shoegaze', 'Space Rock', 'Trop Rock', 'World Music',
        'Neoclassical', 'Audiobook', 'Audio Theatre', 'Neue Deutsche Welle',
        'Podcast', 'Indie Rock', 'G-Funk', 'Dubstep', 'Garage Rock', 'Psybient',
    )
    _ID3V2_2_IMAGE_FORMATS = {
        'bmp': 'image/bmp',
        'jpg': 'image/jpeg',
        'png': 'image/png',
    }
    _IMAGE_TYPES = (
        'other',
        'extra.icon',
        'extra.other_icon',
        'front_cover',
        'back_cover',
        'leaflet',
        'media',
        'extra.lead_artist',
        'extra.artist',
        'extra.conductor',
        'extra.band',
        'extra.composer',
        'extra.lyricist',
        'extra.recording_location',
        'extra.during_recording',
        'extra.during_performance',
        'extra.video',
        'extra.bright_colored_fish',
        'extra.illustration',
        'extra.band_logo',
        'extra.publisher_logo',
    )
    _UNKNOWN_IMAGE_TYPE = 'extra.unknown'

    # see this page for the magic values used in mp3:
    # http://www.mpgedit.org/mpgedit/mpeg_format/mpeghdr.htm
    _SAMPLE_RATES = (
        (11025, 12000, 8000),   # MPEG 2.5
        (0, 0, 0),              # reserved
        (22050, 24000, 16000),  # MPEG 2
        (44100, 48000, 32000),  # MPEG 1
    )
    _V1L1 = (0, 32, 64, 96, 128, 160, 192, 224, 256, 288, 320, 352, 384, 416, 448, 0)
    _V1L2 = (0, 32, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320, 384, 0)
    _V1L3 = (0, 32, 40, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320, 0)
    _V2L1 = (0, 32, 48, 56, 64, 80, 96, 112, 128, 144, 160, 176, 192, 224, 256, 0)
    _V2L2 = (0, 8, 16, 24, 32, 40, 48, 56, 64, 80, 96, 112, 128, 144, 160, 0)
    _V2L3 = _V2L2
    _NONE = (0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
    _BITRATE_BY_VERSION_BY_LAYER = (
        (_NONE, _V2L3, _V2L2, _V2L1),  # MPEG Version 2.5  # note that the layers go
        (_NONE, _NONE, _NONE, _NONE),  # reserved          # from 3 to 1 by design.
        (_NONE, _V2L3, _V2L2, _V2L1),  # MPEG Version 2    # the first layer id is
        (_NONE, _V1L3, _V1L2, _V1L1),  # MPEG Version 1    # reserved
    )
    _SAMPLES_PER_FRAME = 1152  # the default frame size for mp3
    _CHANNELS_PER_CHANNEL_MODE = (
        2,  # 00 Stereo
        2,  # 01 Joint stereo (Stereo)
        2,  # 10 Dual channel (2 mono channels)
        1,  # 11 Single channel (Mono)
    )

    def __init__(self) -> None:
        super().__init__()
        # save position after the ID3 tag for duration measurement speedup
        self._bytepos_after_id3v2 = -1

    @staticmethod
    def _parse_xing_header(fh: BinaryIO) -> tuple[int, int]:
        # see: http://www.mp3-tech.org/programmer/sources/vbrheadersdk.zip
        fh.seek(4, os.SEEK_CUR)  # read over Xing header
        header_flags = struct.unpack('>i', fh.read(4))[0]
        frames = byte_count = 0
        if header_flags & 1:  # FRAMES FLAG
            frames = struct.unpack('>i', fh.read(4))[0]
        if header_flags & 2:  # BYTES FLAG
            byte_count = struct.unpack('>i', fh.read(4))[0]
        if header_flags & 4:  # TOC FLAG
            fh.seek(100, os.SEEK_CUR)
        if header_flags & 8:  # VBR SCALE FLAG
            fh.seek(4, os.SEEK_CUR)
        return frames, byte_count

    def _determine_duration(self, fh: BinaryIO) -> None:
        # if tag reading was disabled, find start position of audio data
        if self._bytepos_after_id3v2 == -1:
            self._parse_id3v2_header(fh)

        max_estimation_frames = (_ID3._MAX_ESTIMATION_SEC * 44100) // _ID3._SAMPLES_PER_FRAME
        frame_size_accu = 0
        header_bytes = 4
        frames = 0  # count frames for determining mp3 duration
        bitrate_accu = 0    # add up bitrates to find average bitrate to detect
        last_bitrates = []  # CBR mp3s (multiple frames with same bitrates)
        # seek to first position after id3 tag (speedup for large header)
        fh.seek(self._bytepos_after_id3v2)
        file_offset = fh.tell()
        walker = io.BytesIO(fh.read())
        while True:
            # reading through garbage until 11 '1' sync-bits are found
            b = walker.read()
            walker.seek(-len(b), os.SEEK_CUR)
            if len(b) < 4:
                if frames:
                    self.bitrate = bitrate_accu / frames
                break  # EOF
            _sync, conf, bitrate_freq, rest = struct.unpack('BBBB', b[0:4])
            br_id = (bitrate_freq >> 4) & 0x0F  # bitrate id
            sr_id = (bitrate_freq >> 2) & 0x03  # sample rate id
            padding = 1 if bitrate_freq & 0x02 > 0 else 0
            mpeg_id = (conf >> 3) & 0x03
            layer_id = (conf >> 1) & 0x03
            channel_mode = (rest >> 6) & 0x03
            # check for eleven 1s, validate bitrate and sample rate
            if (not b[:2] > b'\xFF\xE0' or br_id > 14 or br_id == 0 or sr_id == 3
                    or layer_id == 0 or mpeg_id == 1):  # noqa
                idx = b.find(b'\xFF', 1)  # invalid frame, find next sync header
                if idx == -1:
                    idx = len(b)  # not found: jump over the current peek buffer
                walker.seek(max(idx, 1), os.SEEK_CUR)
                continue
            self.channels = self._CHANNELS_PER_CHANNEL_MODE[channel_mode]
            frame_bitrate = self._BITRATE_BY_VERSION_BY_LAYER[mpeg_id][layer_id][br_id]
            self.samplerate = samplerate = self._SAMPLE_RATES[mpeg_id][sr_id]
            # There might be a xing header in the first frame that contains
            # all the info we need, otherwise parse multiple frames to find the
            # accurate average bitrate
            if frames == 0 and self._USE_XING_HEADER:
                xing_header_offset = b.find(b'Xing')
                if xing_header_offset != -1:
                    walker.seek(xing_header_offset, os.SEEK_CUR)
                    xframes, byte_count = self._parse_xing_header(walker)
                    if xframes > 0 and byte_count > 0:
                        # MPEG-2 Audio Layer III uses 576 samples per frame
                        samples_per_frame = 576 if mpeg_id <= 2 else self._SAMPLES_PER_FRAME
                        self.duration = duration = xframes * samples_per_frame / samplerate
                        # self.duration = (xframes * self._SAMPLES_PER_FRAME / samplerate
                        #                  / self.channels)  # noqa
                        self.bitrate = byte_count * 8 / duration / 1000
                        return
                    continue

            frames += 1  # it's most probably an mp3 frame
            bitrate_accu += frame_bitrate
            if frames == 1:
                audio_offset = file_offset + walker.tell()
            if frames <= self._CBR_DETECTION_FRAME_COUNT:
                last_bitrates.append(frame_bitrate)
            walker.seek(4, os.SEEK_CUR)  # jump over peeked bytes

            frame_length = (144000 * frame_bitrate) // samplerate + padding
            frame_size_accu += frame_length
            # if bitrate does not change over time it's probably CBR
            is_cbr = (frames == self._CBR_DETECTION_FRAME_COUNT and len(set(last_bitrates)) == 1)
            if frames == max_estimation_frames or is_cbr:
                # try to estimate duration
                fh.seek(-128, 2)  # jump to last byte (leaving out id3v1 tag)
                audio_stream_size = fh.tell() - audio_offset
                est_frame_count = audio_stream_size / (frame_size_accu / frames)
                samples = est_frame_count * self._SAMPLES_PER_FRAME
                self.duration = samples / samplerate
                self.bitrate = bitrate_accu / frames
                return

            if frame_length > 1:  # jump over current frame body
                walker.seek(frame_length - header_bytes, os.SEEK_CUR)
        if self.samplerate:
            self.duration = frames * self._SAMPLES_PER_FRAME / self.samplerate

    def _parse_tag(self, fh: BinaryIO) -> None:
        self._parse_id3v2(fh)
        if self.filesize > 128:
            fh.seek(-128, os.SEEK_END)  # try parsing id3v1 in last 128 bytes
            self._parse_id3v1(fh)

    def _parse_id3v2_header(self, fh: BinaryIO) -> tuple[int, bool, int]:
        size = major = 0
        extended = False
        # for info on the specs, see: http://id3.org/Developer%20Information
        header = struct.unpack('3sBBB4B', fh.read(10))
        tag = header[0].decode('ISO-8859-1', 'replace')
        # check if there is an ID3v2 tag at the beginning of the file
        if tag == 'ID3':
            major, _rev = header[1:3]
            if DEBUG:
                print(f'Found id3 v2.{major}')
            # unsync = (header[3] & 0x80) > 0
            extended = (header[3] & 0x40) > 0
            # experimental = (header[3] & 0x20) > 0
            # footer = (header[3] & 0x10) > 0
            size = self._calc_size(header[4:8], 7)
        self._bytepos_after_id3v2 = size
        return size, extended, major

    def _parse_id3v2(self, fh: BinaryIO) -> None:
        size, extended, major = self._parse_id3v2_header(fh)
        if size:
            end_pos = fh.tell() + size
            parsed_size = 0
            if extended:  # just read over the extended header.
                size_bytes = struct.unpack('4B', fh.read(6)[0:4])
                extd_size = self._calc_size(size_bytes, 7)
                fh.seek(extd_size - 6, os.SEEK_CUR)  # jump over extended_header
            while parsed_size < size:
                frame_size = self._parse_frame(fh, id3version=major)
                if frame_size == 0:
                    break
                parsed_size += frame_size
            fh.seek(end_pos, os.SEEK_SET)

    def _parse_id3v1(self, fh: BinaryIO) -> None:
        if fh.read(3) != b'TAG':  # check if this is an ID3 v1 tag
            return

        def asciidecode(x: bytes) -> str:
            return self._unpad(x.decode(self._default_encoding or 'latin1', 'replace'))
        # Only set fields that were not set by ID3v2 tags, as ID3v1
        # tags are more likely to be outdated or have encoding issues
        fields = fh.read(30 + 30 + 30 + 4 + 30 + 1)
        if not self.title:
            self._set_field('title', asciidecode(fields[:30]))
        if not self.artist:
            self._set_field('artist', asciidecode(fields[30:60]))
        if not self.album:
            self._set_field('album', asciidecode(fields[60:90]))
        if not self.year:
            self._set_field('year', asciidecode(fields[90:94]))
        comment = fields[94:124]
        if b'\x00\x00' < comment[-2:] < b'\x01\x00':
            if self.track is None:
                self._set_field('track', ord(comment[-1:]))
            comment = comment[:-2]
        if not self.comment:
            self._set_field('comment', asciidecode(comment))
        if not self.genre:
            genre_id = ord(fields[124:125])
            if genre_id < len(self._ID3V1_GENRES):
                self._set_field('genre', self._ID3V1_GENRES[genre_id])

    def __parse_custom_field(self, content: str) -> bool:
        custom_field_name, separator, value = content.partition('\x00')
        if custom_field_name and separator:
            self._set_field(self._EXTRA_PREFIX + custom_field_name.lower(), value.lstrip('\ufeff'))
            return True
        return False

    @classmethod
    def _create_tag_image(cls, data: bytes, pic_type: int, mime_type: str | None = None,
                          description: str | None = None) -> tuple[str, TagImage]:
        field_name = cls._UNKNOWN_IMAGE_TYPE
        if 0 <= pic_type <= len(cls._IMAGE_TYPES):
            field_name = cls._IMAGE_TYPES[pic_type]
        image = TagImage(field_name, data)
        if mime_type:
            image.mime_type = mime_type
        if description:
            image.description = description
        return field_name, image

    @staticmethod
    def _index_utf16(s: bytes, search: bytes) -> int:
        for i in range(0, len(s), len(search)):
            if s[i:i + len(search)] == search:
                return i
        return -1

    def _parse_frame(self, fh: BinaryIO, id3version: int | None = None) -> int:
1✔
995
        # ID3v2.2 especially ugly. see: http://id3.org/id3v2-00
996
        frame_header_size = 6 if id3version == 2 else 10
1✔
997
        frame_size_bytes = 3 if id3version == 2 else 4
1✔
998
        binformat = '3s3B' if id3version == 2 else '4s4B2B'
1✔
999
        bits_per_byte = 7 if id3version == 4 else 8  # only id3v2.4 is synchsafe
1✔
1000
        frame_header_data = fh.read(frame_header_size)
1✔
1001
        if len(frame_header_data) != frame_header_size:
1✔
1002
            return 0
1✔
1003
        frame = struct.unpack(binformat, frame_header_data)
1✔
1004
        frame_id = self._decode_string(frame[0])
1✔
1005
        frame_size = self._calc_size(frame[1:1 + frame_size_bytes], bits_per_byte)
1✔
1006
        if DEBUG:
1✔
1007
            print(f'Found id3 Frame {frame_id} at {fh.tell()}-{fh.tell() + frame_size} '
1✔
1008
                  f'of {self.filesize}')
1009
        if frame_size > 0:
1✔
1010
            # flags = frame[1+frame_size_bytes:] # dont care about flags.
1011
            content = fh.read(frame_size)
1✔
1012
            fieldname = self._ID3_MAPPING.get(frame_id)
1✔
1013
            should_set_field = True
1✔
1014
            if fieldname:
1✔
1015
                if not self._parse_tags:
1✔
1016
                    return frame_size
1✔
1017
                language = fieldname in {'comment', 'extra.lyrics'}
1✔
1018
                value = self._decode_string(content, language)
1✔
1019
                if fieldname == "comment":
1✔
1020
                    # check if comment is a key-value pair (used by iTunes)
1021
                    should_set_field = not self.__parse_custom_field(value)
1✔
1022
                elif fieldname in {'track', 'disc'}:
1✔
1023
                    if '/' in value:
1✔
1024
                        value, total = value.split('/')[:2]
1✔
1025
                        if total.isdecimal():
1✔
1026
                            self._set_field(f'{fieldname}_total', int(total))
1✔
1027
                    if value.isdecimal():
1✔
1028
                        self._set_field(fieldname, int(value))
1✔
1029
                    should_set_field = False
1✔
1030
                elif fieldname == 'genre':
1✔
1031
                    genre_id = 255
1✔
1032
                    # funky: id3v1 genre hidden in a id3v2 field
1033
                    if value.isdecimal():
1✔
1034
                        genre_id = int(value)
1✔
1035
                    # funkier: the TCO may contain genres in parens, e.g. '(13)'
1036
                    elif value[:1] == '(':
1✔
1037
                        end_pos = value.find(')')
1✔
1038
                        parens_text = value[1:end_pos]
1✔
1039
                        if end_pos > 0 and parens_text.isdecimal():
1✔
1040
                            genre_id = int(parens_text)
1✔
1041
                    if 0 <= genre_id < len(_ID3._ID3V1_GENRES):
1✔
1042
                        value = _ID3._ID3V1_GENRES[genre_id]
1✔
1043
                if should_set_field:
1✔
1044
                    self._set_field(fieldname, value)
1✔
1045
            elif frame_id in self._CUSTOM_FRAME_IDS:
1✔
1046
                # custom fields
1047
                if self._parse_tags:
1✔
1048
                    self.__parse_custom_field(self._decode_string(content))
1✔
1049
            elif frame_id in self._IMAGE_FRAME_IDS:
1✔
1050
                if self._load_image:
1✔
1051
                    # See section 4.14: http://id3.org/id3v2.4.0-frames
1052
                    encoding = content[0:1]
1✔
1053
                    if frame_id == 'PIC':  # ID3 v2.2:
1✔
1054
                        imgformat = self._decode_string(content[1:4]).lower()
1✔
1055
                        mime_type = self._ID3V2_2_IMAGE_FORMATS.get(imgformat)
1✔
1056
                        desc_start_pos = 1 + 3 + 1  # skip encoding (1), imgformat (3), pictype(1)
1✔
1057
                    else:  # ID3 v2.3+
1058
                        mime_type_end_pos = content.index(b'\x00', 1)
1✔
1059
                        mime_type = self._decode_string(content[1:mime_type_end_pos]).lower()
1✔
1060
                        if mime_type in self._ID3V2_2_IMAGE_FORMATS:  # ID3 v2.2 format in v2.3...
1✔
1061
                            mime_type = self._ID3V2_2_IMAGE_FORMATS[mime_type]
1✔
1062
                        desc_start_pos = mime_type_end_pos + 1 + 1  # skip mtype, pictype(1)
1✔
1063
                    pic_type = content[desc_start_pos - 1]
1✔
1064
                    # latin-1 and utf-8 use a single-byte terminator
1065
                    termination = b'\x00' if encoding in {b'\x00', b'\x03'} else b'\x00\x00'
1✔
1066
                    desc_length = self._index_utf16(content[desc_start_pos:], termination)
1✔
1067
                    desc_end_pos = desc_start_pos + desc_length + len(termination)
1✔
1068
                    description = self._decode_string(content[desc_start_pos:desc_end_pos])
1✔
1069
                    field_name, image = self._create_tag_image(
1✔
1070
                        content[desc_end_pos:], pic_type, mime_type, description)
1071
                    self._set_image_field(field_name, image)
1✔
1072
            elif frame_id not in self._DISALLOWED_FRAME_IDS:
1✔
1073
                # unknown, try to add to extra dict
1074
                if self._parse_tags:
1✔
1075
                    self._set_field(
1✔
1076
                        self._EXTRA_PREFIX + frame_id.lower(), self._decode_string(content))
1077
            return frame_size
1✔
1078
        return 0
1✔
1079

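    # --- Illustrative sketch, not part of tinytag: rough layout of an ID3v2.3
    # --- "APIC" frame body as handled above. All values are made up, and a
    # --- single-byte description terminator is assumed (encodings 0 and 3).
    @staticmethod
    def _demo_split_apic_body() -> tuple[str, int, str, bytes]:
        body = (b'\x03'              # text encoding: 3 = UTF-8
                + b'image/jpeg\x00'  # MIME type, null-terminated
                + b'\x03'            # picture type: 3 = front cover
                + b'cover\x00'       # description, null-terminated
                + b'\xff\xd8...')    # image data (truncated here)
        mime_end = body.index(b'\x00', 1)
        mime_type = body[1:mime_end].decode('latin-1')
        pic_type = body[mime_end + 1]
        desc_end = body.index(b'\x00', mime_end + 2)
        description = body[mime_end + 2:desc_end].decode('utf-8')
        # returns ('image/jpeg', 3, 'cover', b'\xff\xd8...')
        return mime_type, pic_type, description, body[desc_end + 1:]
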
1080
    def _decode_string(self, bytestr: bytes, language: bool = False) -> str:
1✔
1081
        default_encoding = 'ISO-8859-1'
1✔
1082
        if self._default_encoding:
1✔
1083
            default_encoding = self._default_encoding
1✔
1084
        # it's not my fault, this is the spec.
1085
        first_byte = bytestr[:1]
1✔
1086
        if first_byte == b'\x00':  # ISO-8859-1
1✔
1087
            bytestr = bytestr[1:]
1✔
1088
            encoding = default_encoding
1✔
1089
        elif first_byte == b'\x01':  # UTF-16 with BOM
1✔
1090
            bytestr = bytestr[1:]
1✔
1091
            # remove language (but leave BOM)
1092
            if language:
1✔
1093
                if bytestr[3:5] in {b'\xfe\xff', b'\xff\xfe'}:
1✔
1094
                    bytestr = bytestr[3:]
1✔
1095
                if bytestr[:3].isalpha():
1✔
1096
                    bytestr = bytestr[3:]  # remove language
1✔
1097
                bytestr = bytestr.lstrip(b'\x00')  # strip optional additional null bytes
1✔
1098
            # read byte order mark to determine endianness
1099
            encoding = 'UTF-16be' if bytestr[0:2] == b'\xfe\xff' else 'UTF-16le'
1✔
1100
            # strip the bom if it exists
1101
            if bytestr[:2] in {b'\xfe\xff', b'\xff\xfe'}:
1✔
1102
                bytestr = bytestr[2:] if len(bytestr) % 2 == 0 else bytestr[2:-1]
1✔
1103
            # remove ADDITIONAL EXTRA BOM :facepalm:
1104
            if bytestr[:4] == b'\x00\x00\xff\xfe':
1✔
1105
                bytestr = bytestr[4:]
1✔
1106
        elif first_byte == b'\x02':  # UTF-16LE
1✔
1107
            # strip optional trailing null byte if the byte count is uneven
1108
            bytestr = bytestr[1:-1] if len(bytestr) % 2 == 0 else bytestr[1:]
×
1109
            encoding = 'UTF-16le'
×
1110
        elif first_byte == b'\x03':  # UTF-8
1✔
1111
            bytestr = bytestr[1:]
1✔
1112
            encoding = 'UTF-8'
1✔
1113
        else:
1114
            encoding = default_encoding  # wild guess
1✔
1115
        if language and bytestr[:3].isalpha():
1✔
1116
            bytestr = bytestr[3:]  # remove language
1✔
1117
        return self._unpad(bytestr.decode(encoding, 'replace'))
1✔
1118

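    # --- Illustrative sketch, not part of tinytag: the leading byte of an ID3v2
    # --- text frame selects the text codec, which is what _decode_string() above
    # --- implements (plus BOM, language and padding handling). For example,
    # --- b'\x00Motorhead' -> 'Motorhead' and b'\x03Mot\xc3\xb6rhead' -> 'Motörhead'.
    @staticmethod
    def _demo_decode_id3_text(payload: bytes) -> str:
        codec_by_marker = {
            b'\x00': 'ISO-8859-1',  # latin-1
            b'\x01': 'UTF-16',      # UTF-16 with byte order mark
            b'\x03': 'UTF-8',
        }
        codec = codec_by_marker.get(payload[:1], 'ISO-8859-1')
        return payload[1:].decode(codec, 'replace').rstrip('\x00')
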
1119
    @staticmethod
1✔
1120
    def _calc_size(bytestr: tuple[int, ...], bits_per_byte: int) -> int:
1✔
1121
        # the length of some header fields is encoded as 7-bit (synchsafe) or 8-bit bytes
1122
        return reduce(lambda accu, elem: (accu << bits_per_byte) + elem, bytestr, 0)
1✔
1123

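# --- Illustrative sketch, not part of tinytag: ID3v2 tag sizes (and v2.4 frame
# --- sizes) are "synchsafe", i.e. only 7 bits of every byte are used, which is
# --- why _ID3._calc_size() above takes a bits_per_byte argument. Values are made up.
def _demo_synchsafe_to_int(raw: bytes) -> int:
    size = 0
    for byte in raw:
        size = (size << 7) | (byte & 0x7f)
    return size


assert _demo_synchsafe_to_int(b'\x00\x00\x02\x01') == 257    # 2 * 128 + 1
assert _demo_synchsafe_to_int(b'\x00\x00\x7f\x7f') == 16383  # largest 14-bit value
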
1124

1125
class _Ogg(TinyTag):
1✔
1126
    _VORBIS_MAPPING = {
1✔
1127
        'album': 'album',
1128
        'albumartist': 'albumartist',
1129
        'title': 'title',
1130
        'artist': 'artist',
1131
        'author': 'artist',
1132
        'date': 'year',
1133
        'tracknumber': 'track',
1134
        'tracktotal': 'track_total',
1135
        'totaltracks': 'track_total',
1136
        'discnumber': 'disc',
1137
        'disctotal': 'disc_total',
1138
        'totaldiscs': 'disc_total',
1139
        'genre': 'genre',
1140
        'description': 'comment',
1141
        'comment': 'comment',
1142
        'comments': 'comment',
1143
        'composer': 'extra.composer',
1144
        'bpm': 'extra.bpm',
1145
        'copyright': 'extra.copyright',
1146
        'isrc': 'extra.isrc',
1147
        'lyrics': 'extra.lyrics',
1148
        'publisher': 'extra.publisher',
1149
        'language': 'extra.language',
1150
        'director': 'extra.director',
1151
        'website': 'extra.url',
1152
        'conductor': 'extra.conductor',
1153
        'lyricist': 'extra.lyricist',
1154
        'discsubtitle': 'extra.set_subtitle',
1155
        'setsubtitle': 'extra.set_subtitle',
1156
        'initialkey': 'extra.initial_key',
1157
        'key': 'extra.initial_key',
1158
        'encodedby': 'extra.encoded_by',
1159
        'encodersettings': 'extra.encoder_settings',
1160
        'media': 'extra.media',
1161
    }
1162

1163
    def __init__(self) -> None:
1✔
1164
        super().__init__()
1✔
1165
        self._max_samplenum = 0  # maximum sample position ever read
1✔
1166

1167
    def _determine_duration(self, fh: BinaryIO) -> None:
1✔
1168
        max_page_size = 65536  # https://xiph.org/ogg/doc/libogg/ogg_page.html
1✔
1169
        if not self._tags_parsed:
1✔
1170
            self._parse_tag(fh)  # determine sample rate
1✔
1171
            fh.seek(0)           # and rewind to start
1✔
1172
        if self.duration is not None or not self.samplerate:
1✔
1173
            return  # either ogg flac or invalid file
1✔
1174
        if self.filesize > max_page_size:
1✔
1175
            fh.seek(-max_page_size, 2)  # go to last possible page position
1✔
1176
        while True:
1✔
1177
            file_offset = fh.tell()
1✔
1178
            b = fh.read()
1✔
1179
            if len(b) < 4:
1✔
1180
                return  # EOF
×
1181
            if b[:4] == b'OggS':  # look for an ogg header
1✔
1182
                fh.seek(file_offset)
1✔
1183
                for _ in self._parse_pages(fh):
1✔
1184
                    pass  # parse all remaining pages
1✔
1185
                self.duration = self._max_samplenum / self.samplerate
1✔
1186
                break
1✔
1187
            idx = b.find(b'OggS')  # try to find header in peeked data
1✔
1188
            if idx != -1:
1✔
1189
                fh.seek(file_offset + idx)
1✔
1190

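    # --- Illustrative sketch, not part of tinytag: the duration computed above is
    # --- just the highest granule position (the last sample number) found in any
    # --- page, divided by the sample rate. Example numbers are made up.
    @staticmethod
    def _demo_duration_from_granule(last_granule_position: int,
                                    samplerate: int) -> float:
        # e.g. _demo_duration_from_granule(441_000, 44_100) -> 10.0 seconds
        return last_granule_position / samplerate
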
1191
    def _parse_tag(self, fh: BinaryIO) -> None:
1✔
1192
        check_flac_second_packet = False
1✔
1193
        check_speex_second_packet = False
1✔
1194
        for packet in self._parse_pages(fh):
1✔
1195
            walker = io.BytesIO(packet)
1✔
1196
            if packet[0:7] == b"\x01vorbis":
1✔
1197
                if self._parse_duration:
1✔
1198
                    (self.channels, self.samplerate, _max_bitrate, bitrate,
1✔
1199
                     _min_bitrate) = struct.unpack("<B4i", packet[11:28])
1200
                    self.bitrate = bitrate / 1000
1✔
1201
            elif packet[0:7] == b"\x03vorbis":
1✔
1202
                if self._parse_tags:
1✔
1203
                    walker.seek(7, os.SEEK_CUR)  # jump over header name
1✔
1204
                    self._parse_vorbis_comment(walker)
1✔
1205
            elif packet[0:8] == b'OpusHead':
1✔
1206
                if self._parse_duration:  # parse opus header
1✔
1207
                    # https://www.videolan.org/developers/vlc/modules/codec/opus_header.c
1208
                    # https://mf4.xiph.org/jenkins/view/opus/job/opusfile-unix/ws/doc/html/structOpusHead.html
1209
                    walker.seek(8, os.SEEK_CUR)  # jump over header name
1✔
1210
                    (version, ch, _, _sr, _, _) = struct.unpack("<BBHIHB", walker.read(11))
1✔
1211
                    if (version & 0xF0) == 0:  # only major version 0 supported
1✔
1212
                        self.channels = ch
1✔
1213
                        self.samplerate = 48000  # internally opus always uses 48khz
1✔
1214
            elif packet[0:8] == b'OpusTags':
1✔
1215
                if self._parse_tags:  # parse opus metadata:
1✔
1216
                    walker.seek(8, os.SEEK_CUR)  # jump over header name
1✔
1217
                    self._parse_vorbis_comment(walker)
1✔
1218
            elif packet[0:5] == b'\x7fFLAC':
1✔
1219
                # https://xiph.org/flac/ogg_mapping.html
1220
                walker.seek(9, os.SEEK_CUR)  # jump over header name, version and number of headers
1✔
1221
                flactag = _Flac()
1✔
1222
                flactag._filehandler = walker
1✔
1223
                flactag.filesize = self.filesize
1✔
1224
                flactag._load(tags=self._parse_tags, duration=self._parse_duration,
1✔
1225
                              image=self._load_image)
1226
                self._update(flactag)
1✔
1227
                check_flac_second_packet = True
1✔
1228
            elif check_flac_second_packet:
1✔
1229
                # second packet contains FLAC metadata block
1230
                if self._parse_tags:
1✔
1231
                    meta_header = struct.unpack('B3B', walker.read(4))
1✔
1232
                    block_type = meta_header[0] & 0x7f
1✔
1233
                    if block_type == _Flac.METADATA_VORBIS_COMMENT:
1✔
1234
                        self._parse_vorbis_comment(walker)
1✔
1235
                check_flac_second_packet = False
1✔
1236
            elif packet[0:8] == b'Speex   ':
1✔
1237
                # https://speex.org/docs/manual/speex-manual/node8.html
1238
                if self._parse_duration:
1✔
1239
                    walker.seek(36, os.SEEK_CUR)  # jump over header name and irrelevant fields
1✔
1240
                    (self.samplerate, _, _, self.channels,
1✔
1241
                     self.bitrate) = struct.unpack("<5i", walker.read(20))
1242
                check_speex_second_packet = True
1✔
1243
            elif check_speex_second_packet:
1✔
1244
                if self._parse_tags:
1✔
1245
                    length = struct.unpack('I', walker.read(4))[0]  # starts with a comment string
1✔
1246
                    comment = walker.read(length).decode('utf-8', 'replace')
1✔
1247
                    self._set_field('comment', comment)
1✔
1248
                    self._parse_vorbis_comment(walker, contains_vendor=False)  # other tags
1✔
1249
                check_speex_second_packet = False
1✔
1250
            else:
1251
                if DEBUG:
1✔
1252
                    print('Unsupported Ogg page type: ', packet[:16], file=stderr)
1✔
1253
                break
1✔
1254
        self._tags_parsed = True
1✔
1255

1256
    def _parse_vorbis_comment(self, fh: BinaryIO, contains_vendor: bool = True) -> None:
1✔
1257
        # for the spec, see: http://xiph.org/vorbis/doc/v-comment.html
1258
        # discnumber tag based on: https://en.wikipedia.org/wiki/Vorbis_comment
1259
        # https://sno.phy.queensu.ca/~phil/exiftool/TagNames/Vorbis.html
1260
        if contains_vendor:
1✔
1261
            vendor_length = struct.unpack('I', fh.read(4))[0]
1✔
1262
            fh.seek(vendor_length, os.SEEK_CUR)  # jump over vendor
1✔
1263
        elements = struct.unpack('I', fh.read(4))[0]
1✔
1264
        for _i in range(elements):
1✔
1265
            length = struct.unpack('I', fh.read(4))[0]
1✔
1266
            keyvalpair = fh.read(length).decode('utf-8', 'replace')
1✔
1267
            if '=' in keyvalpair:
1✔
1268
                key, value = keyvalpair.split('=', 1)
1✔
1269
                key_lowercase = key.lower()
1✔
1270

1271
                if key_lowercase == "metadata_block_picture" and self._load_image:
1✔
1272
                    if DEBUG:
1✔
1273
                        print('Found Vorbis TagImage', key, value[:64])
1✔
1274
                    fieldname, fieldvalue = _Flac._parse_image(io.BytesIO(base64.b64decode(value)))
1✔
1275
                    self._set_image_field(fieldname, fieldvalue)
1✔
1276
                else:
1277
                    if DEBUG:
1✔
1278
                        print('Found Vorbis Comment', key, value[:64])
1✔
1279
                    fieldname = self._VORBIS_MAPPING.get(
1✔
1280
                        key_lowercase, self._EXTRA_PREFIX + key_lowercase)  # custom field
1281
                    if fieldname in {'track', 'disc', 'track_total', 'disc_total'}:
1✔
1282
                        if fieldname in {'track', 'disc'} and '/' in value:
1✔
1283
                            value, total = value.split('/')[:2]
1✔
1284
                            if total.isdecimal():
1✔
1285
                                self._set_field(f'{fieldname}_total', int(total))
1✔
1286
                        if value.isdecimal():
1✔
1287
                            self._set_field(fieldname, int(value))
1✔
1288
                    else:
1289
                        self._set_field(fieldname, value)
1✔
1290

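    # --- Illustrative sketch, not part of tinytag: the byte layout consumed by
    # --- _parse_vorbis_comment() above. Every length is a little-endian 32-bit
    # --- integer and every entry is a UTF-8 "KEY=value" string. Values are made up.
    @staticmethod
    def _demo_build_vorbis_comment() -> bytes:
        vendor = b'demo vendor'
        comments = [b'TITLE=An Example Song', b'TRACKNUMBER=3/12']
        blob = struct.pack('<I', len(vendor)) + vendor  # vendor string
        blob += struct.pack('<I', len(comments))        # number of comments
        for comment in comments:
            blob += struct.pack('<I', len(comment)) + comment
        return blob
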
1291
    def _parse_pages(self, fh: BinaryIO) -> Iterator[bytes]:
1✔
1292
        # for the spec, see: https://wiki.xiph.org/Ogg
1293
        previous_page = b''  # contains data from previous (continuing) pages
1✔
1294
        header_data = fh.read(27)  # read ogg page header
1✔
1295
        while len(header_data) == 27:
1✔
1296
            header = struct.unpack('<4sBBqIIiB', header_data)
1✔
1297
            # https://xiph.org/ogg/doc/framing.html
1298
            oggs, version, _flags, pos, _serial, _pageseq, _crc, segments = header
1✔
1299
            self._max_samplenum = max(self._max_samplenum, pos)
1✔
1300
            if oggs != b'OggS' or version != 0:
1✔
1301
                raise ParseError('Invalid OGG header')
1✔
1302
            segsizes = struct.unpack('B' * segments, fh.read(segments))
1✔
1303
            total = 0
1✔
1304
            for segsize in segsizes:  # read all segments
1✔
1305
                total += segsize
1✔
1306
                if total < 255:  # less than 255 bytes means end of page
1✔
1307
                    yield previous_page + fh.read(total)
1✔
1308
                    previous_page = b''
1✔
1309
                    total = 0
1✔
1310
            if total != 0:
1✔
1311
                if total % 255 == 0:
1✔
1312
                    previous_page += fh.read(total)
×
1313
                else:
1314
                    yield previous_page + fh.read(total)
1✔
1315
                    previous_page = b''
1✔
1316
            header_data = fh.read(27)
1✔
1317

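# --- Illustrative sketch, not part of tinytag: the fixed 27-byte Ogg page header
# --- read by _Ogg._parse_pages() above, followed by one lacing value per segment;
# --- a lacing value below 255 marks the end of a packet. Example data is made up.
def _demo_parse_ogg_page_header(header: bytes) -> tuple[int, int]:
    (capture_pattern, _version, _flags, granule_position, _serial, _page_seq,
     _crc, segment_count) = struct.unpack('<4sBBqIIiB', header)
    if capture_pattern != b'OggS':
        raise ValueError('not an Ogg page')
    return granule_position, segment_count


_demo_page_header = struct.pack('<4sBBqIIiB', b'OggS', 0, 4, 441_000, 1, 2, 0, 1)
assert _demo_parse_ogg_page_header(_demo_page_header) == (441_000, 1)
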
1318

1319
class _Wave(TinyTag):
1✔
1320
    # https://sno.phy.queensu.ca/~phil/exiftool/TagNames/RIFF.html
1321
    _RIFF_MAPPING = {
1✔
1322
        b'INAM': 'title',
1323
        b'TITL': 'title',
1324
        b'IPRD': 'album',
1325
        b'IART': 'artist',
1326
        b'IBPM': 'extra.bpm',
1327
        b'ICMT': 'comment',
1328
        b'IMUS': 'extra.composer',
1329
        b'ICOP': 'extra.copyright',
1330
        b'ICRD': 'year',
1331
        b'IGNR': 'genre',
1332
        b'ILNG': 'extra.language',
1333
        b'ISRC': 'extra.isrc',
1334
        b'IPUB': 'extra.publisher',
1335
        b'IPRT': 'track',
1336
        b'ITRK': 'track',
1337
        b'TRCK': 'track',
1338
        b'IBSU': 'extra.url',
1339
        b'YEAR': 'year',
1340
        b'IWRI': 'extra.lyricist',
1341
        b'IENC': 'extra.encoded_by',
1342
        b'IMED': 'extra.media',
1343
    }
1344

1345
    def _determine_duration(self, fh: BinaryIO) -> None:
1✔
1346
        if not self._tags_parsed:
1✔
1347
            self._parse_tag(fh)
1✔
1348

1349
    def _parse_tag(self, fh: BinaryIO) -> None:
1✔
1350
        # see: http://www-mmsp.ece.mcgill.ca/Documents/AudioFormats/WAVE/WAVE.html
1351
        # and: https://en.wikipedia.org/wiki/WAV
1352
        riff, _size, fformat = struct.unpack('4sI4s', fh.read(12))
1✔
1353
        if riff != b'RIFF' or fformat != b'WAVE':
1✔
1354
            raise ParseError('Invalid WAV header')
1✔
1355
        if self._parse_duration:
1✔
1356
            self.bitdepth = 16  # assume 16bit depth (CD quality)
1✔
1357
        chunk_header = fh.read(8)
1✔
1358
        while len(chunk_header) == 8:
1✔
1359
            subchunkid, subchunksize = struct.unpack('4sI', chunk_header)
1✔
1360
            subchunksize += subchunksize % 2  # IFF chunks are padded to an even number of bytes
1✔
1361
            if subchunkid == b'fmt ' and self._parse_duration:
1✔
1362
                _, channels, samplerate = struct.unpack('HHI', fh.read(8))
1✔
1363
                _, _, bitdepth = struct.unpack('<IHH', fh.read(8))
1✔
1364
                if bitdepth == 0:
1✔
1365
                    # Certain codecs (e.g. GSM 6.10) give us a bit depth of zero.
1366
                    # Avoid division by zero when calculating duration.
1367
                    bitdepth = 1
1✔
1368
                self.bitrate = samplerate * channels * bitdepth / 1000
1✔
1369
                self.channels, self.samplerate, self.bitdepth = channels, samplerate, bitdepth
1✔
1370
                remaining_size = subchunksize - 16
1✔
1371
                if remaining_size > 0:
1✔
1372
                    fh.seek(remaining_size, 1)  # skip remaining data in chunk
1✔
1373
            elif subchunkid == b'data' and self._parse_duration:
1✔
1374
                if (self.channels is not None and self.samplerate is not None
1✔
1375
                        and self.bitdepth is not None):
1376
                    self.duration = (
1✔
1377
                        subchunksize / self.channels / self.samplerate / (self.bitdepth / 8))
1378
                fh.seek(subchunksize, 1)
1✔
1379
            elif subchunkid == b'LIST' and self._parse_tags:
1✔
1380
                is_info = fh.read(4)  # check INFO header
1✔
1381
                if is_info != b'INFO':  # jump over non-INFO sections
1✔
1382
                    fh.seek(subchunksize - 4, os.SEEK_CUR)
×
1383
                else:
1384
                    sub_fh = io.BytesIO(fh.read(subchunksize - 4))
1✔
1385
                    field = sub_fh.read(4)
1✔
1386
                    while len(field) == 4:
1✔
1387
                        data_length = struct.unpack('I', sub_fh.read(4))[0]
1✔
1388
                        data_length += data_length % 2  # IFF chunks are padded to an even size
1✔
1389
                        data = sub_fh.read(data_length).split(b'\x00', 1)[0]  # strip zero-byte
1✔
1390
                        fieldname = self._RIFF_MAPPING.get(field)
1✔
1391
                        if fieldname:
1✔
1392
                            value = data.decode('utf-8', 'replace')
1✔
1393
                            if fieldname == 'track':
1✔
1394
                                if value.isdecimal():
1✔
1395
                                    self._set_field(fieldname, int(value))
1✔
1396
                            else:
1397
                                self._set_field(fieldname, value)
1✔
1398
                        field = sub_fh.read(4)
1✔
1399
            elif subchunkid in {b'id3 ', b'ID3 '} and self._parse_tags:
1✔
1400
                id3 = _ID3()
1✔
1401
                id3._filehandler = fh
1✔
1402
                id3._load(tags=True, duration=False, image=self._load_image)
1✔
1403
                self._update(id3)
1✔
1404
            else:  # some other chunk, just skip the data
1405
                fh.seek(subchunksize, 1)
1✔
1406
            chunk_header = fh.read(8)
1✔
1407
        self._tags_parsed = True
1✔
1408

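# --- Illustrative sketch, not part of tinytag: the duration derived from the WAV
# --- "data" chunk above is simply its size divided by the byte rate. The numbers
# --- below are made up (ten seconds of CD-quality PCM).
def _demo_wav_duration(data_size: int, channels: int,
                       samplerate: int, bitdepth: int) -> float:
    bytes_per_second = channels * samplerate * bitdepth / 8
    return data_size / bytes_per_second


assert _demo_wav_duration(1_764_000, channels=2, samplerate=44_100, bitdepth=16) == 10.0
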
1409

1410
class _Flac(TinyTag):
1✔
1411
    METADATA_STREAMINFO = 0
1✔
1412
    METADATA_PADDING = 1
1✔
1413
    METADATA_APPLICATION = 2
1✔
1414
    METADATA_SEEKTABLE = 3
1✔
1415
    METADATA_VORBIS_COMMENT = 4
1✔
1416
    METADATA_CUESHEET = 5
1✔
1417
    METADATA_PICTURE = 6
1✔
1418

1419
    def _determine_duration(self, fh: BinaryIO) -> None:
1✔
1420
        if not self._tags_parsed:
1✔
1421
            self._parse_tag(fh)
1✔
1422

1423
    def _parse_tag(self, fh: BinaryIO) -> None:
1✔
1424
        id3 = None
1✔
1425
        header = fh.read(4)
1✔
1426
        if header[:3] == b'ID3':  # parse ID3 header if it exists
1✔
1427
            fh.seek(-4, os.SEEK_CUR)
1✔
1428
            id3 = _ID3()
1✔
1429
            id3._filehandler = fh
1✔
1430
            id3._parse_tags = self._parse_tags
1✔
1431
            id3._load_image = self._load_image
1✔
1432
            id3._parse_id3v2(fh)
1✔
1433
            header = fh.read(4)  # after ID3 should be fLaC
1✔
1434
        if header[:4] != b'fLaC':
1✔
1435
            raise ParseError('Invalid FLAC header')
1✔
1436
        # for spec, see https://xiph.org/flac/ogg_mapping.html
1437
        header_data = fh.read(4)
1✔
1438
        while len(header_data) == 4:
1✔
1439
            meta_header = struct.unpack('B3B', header_data)
1✔
1440
            block_type = meta_header[0] & 0x7f
1✔
1441
            is_last_block = meta_header[0] & 0x80
1✔
1442
            size = self._bytes_to_int(meta_header[1:4])
1✔
1443
            # http://xiph.org/flac/format.html#metadata_block_streaminfo
1444
            if block_type == self.METADATA_STREAMINFO and self._parse_duration:
1✔
1445
                stream_info_header = fh.read(size)
1✔
1446
                if len(stream_info_header) < 34:  # invalid streaminfo
1✔
1447
                    break
1✔
1448
                header_values = struct.unpack('HH3s3s8B16s', stream_info_header)
1✔
1449
                # From the xiph documentation:
1450
                # py | <bits>
1451
                # ----------------------------------------------
1452
                # H  | <16>  The minimum block size (in samples)
1453
                # H  | <16>  The maximum block size (in samples)
1454
                # 3s | <24>  The minimum frame size (in bytes)
1455
                # 3s | <24>  The maximum frame size (in bytes)
1456
                # 8B | <20>  Sample rate in Hz.
1457
                #    | <3>   (number of channels)-1.
1458
                #    | <5>   (bits per sample)-1.
1459
                #    | <36>  Total samples in stream.
1460
                # 16s| <128> MD5 signature
1461
                # min_blk, max_blk, min_frm, max_frm = header[0:4]
1462
                # min_frm = self._bytes_to_int(struct.unpack('3B', min_frm))
1463
                # max_frm = self._bytes_to_int(struct.unpack('3B', max_frm))
1464
                #                 channels--.  bits      total samples
1465
                # |----- samplerate -----| |-||----| |---------~   ~----|
1466
                # 0000 0000 0000 0000 0000 0000 0000 0000 0000      0000
1467
                # #---4---# #---5---# #---6---# #---7---# #--8-~   ~-12-#
1468
                self.samplerate = self._bytes_to_int(header_values[4:7]) >> 4
1✔
1469
                self.channels = ((header_values[6] >> 1) & 0x07) + 1
1✔
1470
                self.bitdepth = (
1✔
1471
                    ((header_values[6] & 1) << 4) + ((header_values[7] & 0xF0) >> 4) + 1)
1472
                total_sample_bytes = ((header_values[7] & 0x0F),) + header_values[8:12]
1✔
1473
                total_samples = self._bytes_to_int(total_sample_bytes)
1✔
1474
                self.duration = total_samples / self.samplerate
1✔
1475
                if self.duration > 0:
1✔
1476
                    self.bitrate = self.filesize / self.duration * 8 / 1000
1✔
1477
            elif block_type == self.METADATA_VORBIS_COMMENT and self._parse_tags:
1✔
1478
                oggtag = _Ogg()
1✔
1479
                oggtag._filehandler = fh
1✔
1480
                oggtag._parse_vorbis_comment(fh)
1✔
1481
                self._update(oggtag)
1✔
1482
            elif block_type == self.METADATA_PICTURE and self._load_image:
1✔
1483
                fieldname, value = self._parse_image(fh)
1✔
1484
                self._set_image_field(fieldname, value)
1✔
1485
            elif block_type >= 127:
1✔
1486
                break  # invalid block type
×
1487
            else:
1488
                if DEBUG:
1✔
1489
                    print('Unknown FLAC block type', block_type)
1✔
1490
                fh.seek(size, 1)  # seek over this block
1✔
1491

1492
            if is_last_block:
1✔
1493
                break
1✔
1494
            header_data = fh.read(4)
1✔
1495
        if id3 is not None:  # apply ID3 tags after vorbis
1✔
1496
            self._update(id3)
1✔
1497
        self._tags_parsed = True
1✔
1498

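    # --- Illustrative sketch, not part of tinytag: the 64 packed bits of a FLAC
    # --- STREAMINFO block that follow the block and frame sizes, decoded with plain
    # --- shifts instead of the byte juggling above. Made-up example: 44.1 kHz,
    # --- stereo, 16 bit, 441000 total samples unpacks as (44100, 2, 16, 441000).
    @staticmethod
    def _demo_unpack_streaminfo_bits(packed: int) -> tuple[int, int, int, int]:
        total_samples = packed & ((1 << 36) - 1)  # lowest 36 bits
        bitdepth = ((packed >> 36) & 0x1f) + 1    # 5 bits, stored minus one
        channels = ((packed >> 41) & 0x07) + 1    # 3 bits, stored minus one
        samplerate = packed >> 44                 # highest 20 bits, in Hz
        return samplerate, channels, bitdepth, total_samples
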
1499
    @classmethod
1✔
1500
    def _parse_image(cls, fh: BinaryIO) -> tuple[str, TagImage]:
1✔
1501
        # https://xiph.org/flac/format.html#metadata_block_picture
1502
        pic_type, mime_type_len = struct.unpack('>2I', fh.read(8))
1✔
1503
        mime_type = fh.read(mime_type_len).decode('utf-8', 'replace')
1✔
1504
        description_len = struct.unpack('>I', fh.read(4))[0]
1✔
1505
        description = fh.read(description_len).decode('utf-8', 'replace')
1✔
1506
        _width, _height, _depth, _colors, pic_len = struct.unpack('>5I', fh.read(20))
1✔
1507
        return _ID3._create_tag_image(fh.read(pic_len), pic_type, mime_type, description)
1✔
1508

1509

1510
class _Wma(TinyTag):
1✔
1511
    # see:
1512
    # http://web.archive.org/web/20131203084402/http://msdn.microsoft.com/en-us/library/bb643323.aspx
1513
    # and (Japanese, but nonetheless helpful)
1514
    # http://uguisu.skr.jp/Windows/format_asf.html
1515
    _ASF_MAPPING = {
1✔
1516
        'WM/TrackNumber': 'track',
1517
        'WM/PartOfSet': 'disc',
1518
        'WM/Year': 'year',
1519
        'WM/AlbumArtist': 'albumartist',
1520
        'WM/Genre': 'genre',
1521
        'WM/AlbumTitle': 'album',
1522
        'WM/Composer': 'extra.composer',
1523
        'WM/Publisher': 'extra.publisher',
1524
        'WM/BeatsPerMinute': 'extra.bpm',
1525
        'WM/InitialKey': 'extra.initial_key',
1526
        'WM/Lyrics': 'extra.lyrics',
1527
        'WM/Language': 'extra.language',
1528
        'WM/AuthorURL': 'extra.url',
1529
        'WM/ISRC': 'extra.isrc',
1530
        'WM/Conductor': 'extra.conductor',
1531
        'WM/Writer': 'extra.lyricist',
1532
        'WM/SetSubTitle': 'extra.set_subtitle',
1533
        'WM/EncodedBy': 'extra.encoded_by',
1534
        'WM/EncodingSettings': 'extra.encoder_settings',
1535
        'WM/Media': 'extra.media',
1536
    }
1537
    _ASF_CONTENT_DESCRIPTION_OBJECT = b'3&\xb2u\x8ef\xcf\x11\xa6\xd9\x00\xaa\x00b\xcel'
1✔
1538
    _ASF_EXTENDED_CONTENT_DESCRIPTION_OBJECT = (b'@\xa4\xd0\xd2\x07\xe3\xd2\x11\x97\xf0\x00'
1✔
1539
                                                b'\xa0\xc9^\xa8P')
1540
    _STREAM_BITRATE_PROPERTIES_OBJECT = b'\xceu\xf8{\x8dF\xd1\x11\x8d\x82\x00`\x97\xc9\xa2\xb2'
1✔
1541
    _ASF_FILE_PROPERTY_OBJECT = b'\xa1\xdc\xab\x8cG\xa9\xcf\x11\x8e\xe4\x00\xc0\x0c Se'
1✔
1542
    _ASF_STREAM_PROPERTIES_OBJECT = b'\x91\x07\xdc\xb7\xb7\xa9\xcf\x11\x8e\xe6\x00\xc0\x0c Se'
1✔
1543
    _STREAM_TYPE_ASF_AUDIO_MEDIA = b'@\x9ei\xf8M[\xcf\x11\xa8\xfd\x00\x80_\\D+'
1✔
1544

1545
    def _determine_duration(self, fh: BinaryIO) -> None:
1✔
1546
        if not self._tags_parsed:
1✔
1547
            self._parse_tag(fh)
1✔
1548

1549
    def _decode_string(self, bytestring: bytes) -> str:
1✔
1550
        return self._unpad(bytestring.decode('utf-16', 'replace'))
1✔
1551

1552
    def _decode_ext_desc(self, value_type: int, value: bytes) -> int | str | None:
1✔
1553
        """ decode _ASF_EXTENDED_CONTENT_DESCRIPTION_OBJECT values"""
1554
        if value_type == 0:  # Unicode string
1✔
1555
            return self._decode_string(value)
1✔
1556
        if 1 < value_type < 6:  # DWORD / QWORD / WORD
1✔
1557
            return self._bytes_to_int_le(value)
1✔
1558
        return None
×
1559

1560
    def _parse_tag(self, fh: BinaryIO) -> None:
1✔
1561
        header = fh.read(30)
1✔
1562
        # http://www.garykessler.net/library/file_sigs.html
1563
        # http://web.archive.org/web/20131203084402/http://msdn.microsoft.com/en-us/library/bb643323.aspx#_Toc521913958
1564
        if (header[:16] != b'0&\xb2u\x8ef\xcf\x11\xa6\xd9\x00\xaa\x00b\xcel'  # 128 bit GUID
1✔
1565
                or header[-1:] != b'\x02'):
1566
            raise ParseError('Invalid WMA header')
1✔
1567
        while True:
1✔
1568
            object_id = fh.read(16)
1✔
1569
            object_size = self._bytes_to_int_le(fh.read(8))
1✔
1570
            if object_size == 0 or object_size > self.filesize:
1✔
1571
                break  # invalid object, stop parsing.
1✔
1572
            if object_id == self._ASF_CONTENT_DESCRIPTION_OBJECT and self._parse_tags:
1✔
1573
                title_length = self._bytes_to_int_le(fh.read(2))
1✔
1574
                author_length = self._bytes_to_int_le(fh.read(2))
1✔
1575
                copyright_length = self._bytes_to_int_le(fh.read(2))
1✔
1576
                description_length = self._bytes_to_int_le(fh.read(2))
1✔
1577
                rating_length = self._bytes_to_int_le(fh.read(2))
1✔
1578
                data_blocks = {
1✔
1579
                    'title': title_length,
1580
                    'artist': author_length,
1581
                    'extra.copyright': copyright_length,
1582
                    'comment': description_length,
1583
                    '_rating': rating_length,
1584
                }
1585
                for i_field_name, length in data_blocks.items():
1✔
1586
                    bytestring = fh.read(length)
1✔
1587
                    if not i_field_name.startswith('_'):
1✔
1588
                        self._set_field(i_field_name, self._decode_string(bytestring))
1✔
1589
            elif object_id == self._ASF_EXTENDED_CONTENT_DESCRIPTION_OBJECT and self._parse_tags:
1✔
1590
                # http://web.archive.org/web/20131203084402/http://msdn.microsoft.com/en-us/library/bb643323.aspx#_Toc509555195
1591
                descriptor_count = self._bytes_to_int_le(fh.read(2))
1✔
1592
                for _ in range(descriptor_count):
1✔
1593
                    name_len = self._bytes_to_int_le(fh.read(2))
1✔
1594
                    name = self._decode_string(fh.read(name_len))
1✔
1595
                    value_type = self._bytes_to_int_le(fh.read(2))
1✔
1596
                    value_len = self._bytes_to_int_le(fh.read(2))
1✔
1597
                    if value_type == 1:
1✔
1598
                        fh.seek(value_len, os.SEEK_CUR)  # skip byte values
1✔
1599
                        continue
1✔
1600
                    field_name = self._ASF_MAPPING.get(name)  # try to get normalized field name
1✔
1601
                    if field_name is None:  # custom field
1✔
1602
                        if name.startswith('WM/'):
1✔
1603
                            name = name[3:]
1✔
1604
                        field_name = self._EXTRA_PREFIX + name.lower()
1✔
1605
                    field_value = self._decode_ext_desc(value_type, fh.read(value_len))
1✔
1606
                    if field_value is not None:
1✔
1607
                        if field_name in {'track', 'disc'}:
1✔
1608
                            if isinstance(field_value, int) or field_value.isdecimal():
1✔
1609
                                self._set_field(field_name, int(field_value))
1✔
1610
                        else:
1611
                            self._set_field(field_name, field_value)
1✔
1612
            elif object_id == self._ASF_FILE_PROPERTY_OBJECT and self._parse_duration:
1✔
1613
                fh.seek(40, os.SEEK_CUR)
1✔
1614
                play_duration = self._bytes_to_int_le(fh.read(8)) / 10000000
1✔
1615
                fh.seek(8, os.SEEK_CUR)
1✔
1616
                preroll = self._bytes_to_int_le(fh.read(8)) / 1000
1✔
1617
                fh.seek(16, os.SEEK_CUR)
1✔
1618
                # According to the specification, we need to subtract the preroll from play_duration
1619
                # to get the actual duration of the file
1620
                self.duration = max(play_duration - preroll, 0.0)
1✔
1621
            elif object_id == self._ASF_STREAM_PROPERTIES_OBJECT and self._parse_duration:
1✔
1622
                stream_type = fh.read(16)
1✔
1623
                fh.seek(24, os.SEEK_CUR)  # skip irrelevant fields
1✔
1624
                type_specific_data_length = self._bytes_to_int_le(fh.read(4))
1✔
1625
                error_correction_data_length = self._bytes_to_int_le(fh.read(4))
1✔
1626
                fh.seek(6, os.SEEK_CUR)   # skip irrelevant fields
1✔
1627
                already_read = 0
1✔
1628
                if stream_type == self._STREAM_TYPE_ASF_AUDIO_MEDIA:
1✔
1629
                    codec_id_format_tag = self._bytes_to_int_le(fh.read(2))
1✔
1630
                    self.channels = self._bytes_to_int_le(fh.read(2))
1✔
1631
                    self.samplerate = self._bytes_to_int_le(fh.read(4))
1✔
1632
                    avg_bytes_per_second = self._bytes_to_int_le(fh.read(4))
1✔
1633
                    self.bitrate = avg_bytes_per_second * 8 / 1000
1✔
1634
                    fh.seek(2, os.SEEK_CUR)  # skip irrelevant field
1✔
1635
                    bits_per_sample = self._bytes_to_int_le(fh.read(2))
1✔
1636
                    if codec_id_format_tag == 355:  # lossless
1✔
1637
                        self.bitdepth = bits_per_sample
1✔
1638
                    already_read = 16
1✔
1639
                fh.seek(type_specific_data_length - already_read, os.SEEK_CUR)
1✔
1640
                fh.seek(error_correction_data_length, os.SEEK_CUR)
1✔
1641
            else:
1642
                fh.seek(object_size - 24, os.SEEK_CUR)  # skip over unknown object ids
1✔
1643
        self._tags_parsed = True
1✔
1644

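# --- Illustrative sketch, not part of tinytag: the ASF File Properties object read
# --- above stores the play duration in 100-nanosecond units and the preroll in
# --- milliseconds, and the preroll has to be subtracted. The numbers are made up.
def _demo_asf_duration(play_duration_100ns: int, preroll_ms: int) -> float:
    return max(play_duration_100ns / 10_000_000 - preroll_ms / 1000, 0.0)


assert _demo_asf_duration(1_230_000_000, preroll_ms=3000) == 120.0  # 123 s - 3 s
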
1645

1646
class _Aiff(TinyTag):
1✔
1647
    #
1648
    # AIFF is part of the IFF family of file formats.
1649
    #
1650
    # https://en.wikipedia.org/wiki/Audio_Interchange_File_Format#Data_format
1651
    # https://web.archive.org/web/20171118222232/http://www-mmsp.ece.mcgill.ca/documents/audioformats/aiff/aiff.html
1652
    # https://web.archive.org/web/20071219035740/http://www.cnpbagwell.com/aiff-c.txt
1653
    #
1654
    # A few things about the spec:
1655
    #
1656
    # * IFF strings are not supposed to be null terminated.  They sometimes are.
1657
    # * Some tools might throw more metadata into the ANNO chunk but it is
1658
    #   wildly unreliable to count on it. In fact, the official spec recommends against
1659
    #   using it. That said... this code throws the ANNO field into comment and hopes
1660
    #   for the best.
1661
    #
1662
    # The key thing here is that AIFF metadata is usually in a handful of fields
1663
    # and the rest lives in an ID3 or XMP chunk.  XMP is too complicated and only Adobe-related
1664
    # products support it. The vast majority use ID3. As such, this code hands ID3
1665
    # chunks over to the _ID3 parser, which does everything that needs to be done here.
1666
    #
1667

1668
    _AIFF_MAPPING = {
1✔
1669
        #
1670
        # "Name Chunk text contains the name of the sampled sound."
1671
        #
1672
        # "Author Chunk text contains one or more author names.  An author in
1673
        # this case is the creator of a sampled sound."
1674
        #
1675
        # "Annotation Chunk text contains a comment.  Use of this chunk is
1676
        # discouraged within FORM AIFC." Some tools: "hold my beer"
1677
        #
1678
        # "The Copyright Chunk contains a copyright notice for the sound.  text
1679
        #  contains a date followed by the copyright owner.  The chunk ID '[c] '
1680
        # serves as the copyright character. " Some tools: "hold my beer"
1681
        #
1682
        b'NAME': 'title',
1683
        b'AUTH': 'artist',
1684
        b'ANNO': 'comment',
1685
        b'(c) ': 'extra.copyright',
1686
    }
1687

1688
    def _parse_tag(self, fh: BinaryIO) -> None:
1✔
1689
        chunk_id, _size, form = struct.unpack('>4sI4s', fh.read(12))
1✔
1690
        if chunk_id != b'FORM' or form not in (b'AIFC', b'AIFF'):
1✔
1691
            raise ParseError('Invalid AIFF header')
1✔
1692
        chunk_header = fh.read(8)
1✔
1693
        while len(chunk_header) == 8:
1✔
1694
            sub_chunk_id, sub_chunk_size = struct.unpack('>4sI', chunk_header)
1✔
1695
            sub_chunk_size += sub_chunk_size % 2  # IFF chunks are padded to an even number of bytes
1✔
1696
            if sub_chunk_id in self._AIFF_MAPPING and self._parse_tags:
1✔
1697
                value = self._unpad(fh.read(sub_chunk_size).decode('utf-8', 'replace'))
1✔
1698
                self._set_field(self._AIFF_MAPPING[sub_chunk_id], value)
1✔
1699
            elif sub_chunk_id == b'COMM' and self._parse_duration:
1✔
1700
                channels, num_frames, bitdepth = struct.unpack('>hLh', fh.read(8))
1✔
1701
                self.channels, self.bitdepth = channels, bitdepth
1✔
1702
                try:
1✔
1703
                    exponent, mantissa = struct.unpack('>HQ', fh.read(10))   # Extended precision
1✔
1704
                    samplerate = int(mantissa * (2 ** (exponent - 0x3FFF - 63)))
1✔
1705
                    duration = num_frames / samplerate
1✔
1706
                    bitrate = samplerate * channels * bitdepth / 1000
1✔
1707
                    self.samplerate, self.duration, self.bitrate = samplerate, duration, bitrate
1✔
1708
                except OverflowError:
1✔
1709
                    pass
1✔
1710
                fh.seek(sub_chunk_size - 18, 1)  # skip remaining data in chunk
1✔
1711
            elif sub_chunk_id in {b'id3 ', b'ID3 '} and self._parse_tags:
1✔
1712
                id3 = _ID3()
1✔
1713
                id3._filehandler = fh
1✔
1714
                id3._load(tags=True, duration=False, image=self._load_image)
1✔
1715
                self._update(id3)
1✔
1716
            else:  # some other chunk, just skip the data
1717
                fh.seek(sub_chunk_size, 1)
1✔
1718
            chunk_header = fh.read(8)
1✔
1719
        self._tags_parsed = True
1✔
1720

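    # --- Illustrative sketch, not part of tinytag: the COMM chunk stores the sample
    # --- rate as an 80-bit extended-precision float (a 15-bit biased exponent plus
    # --- a 64-bit mantissa with an explicit leading one bit), which the code above
    # --- converts with mantissa * 2 ** (exponent - 0x3FFF - 63). For example,
    # --- 44100 Hz is stored as exponent 0x3FFF + 15 with mantissa 44_100 << 48,
    # --- and _demo_extended_samplerate(0x3FFF + 15, 44_100 << 48) returns 44100.
    @staticmethod
    def _demo_extended_samplerate(exponent: int, mantissa: int) -> int:
        return int(mantissa * 2 ** (exponent - 0x3FFF - 63))
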
1721
    def _determine_duration(self, fh: BinaryIO) -> None:
1✔
1722
        if not self._tags_parsed:
1✔
1723
            self._parse_tag(fh)
1✔