
devsnd / tinytag, build 9167051894 (push via github)
21 May 2024 12:51AM UTC, coverage: 98.86% (-0.004%) from 98.864%
Commit by mathiascode: "Fix linting error"

1 of 1 new or added line in 1 file covered (100.0%)
15 existing lines in 1 file now uncovered
1388 of 1404 relevant lines covered (98.86%), 0.99 hits per line

Source file: /tinytag/tinytag.py (98.52% covered)
# tinytag - an audio file metadata reader
# Copyright (c) 2014-2023 Tom Wallroth
# Copyright (c) 2021-2024 Mat (mathiascode)
#
# Sources on GitHub:
# http://github.com/devsnd/tinytag/

# MIT License

# Copyright (c) 2014-2024 Tom Wallroth, Mat (mathiascode)

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""Audio file metadata reader"""

# pylint: disable=invalid-name,protected-access
# pylint: disable=too-many-lines,too-many-arguments,too-many-boolean-expressions
# pylint: disable=too-many-branches,too-many-instance-attributes,too-many-locals
# pylint: disable=too-many-nested-blocks,too-many-statements,too-few-public-methods


from __future__ import annotations
from collections.abc import Callable, Iterator
from functools import reduce
from os import PathLike
from sys import stderr
from typing import Any, BinaryIO
from warnings import warn

import base64
import io
import os
import re
import struct


DEBUG = bool(os.environ.get('TINYTAG_DEBUG'))  # some of the parsers can print debug info


class TinyTagException(Exception):
    """Base class for exceptions."""


class ParseError(TinyTagException):
    """Parsing an audio file failed."""


class UnsupportedFormatError(TinyTagException):
    """File format is not supported."""


class TinyTag:
    """A class containing audio file metadata."""

    SUPPORTED_FILE_EXTENSIONS = (
        '.mp1', '.mp2', '.mp3',
        '.oga', '.ogg', '.opus', '.spx',
        '.wav', '.flac', '.wma',
        '.m4b', '.m4a', '.m4r', '.m4v', '.mp4', '.aax', '.aaxc',
        '.aiff', '.aifc', '.aif', '.afc'
    )
    _EXTRA_PREFIX = 'extra.'
    _file_extension_mapping: dict[tuple[str, ...], type[TinyTag]] | None = None
    _magic_bytes_mapping: dict[bytes, type[TinyTag]] | None = None

    def __init__(self) -> None:
        self.filename: bytes | str | PathLike[Any] | None = None
        self.filesize = 0
        self.duration: float | None = None
        self.channels: int | None = None
        self.bitrate: float | None = None
        self.bitdepth: int | None = None
        self.samplerate: int | None = None
        self.artist: str | None = None
        self.albumartist: str | None = None
        self.composer: str | None = None
        self.album: str | None = None
        self.disc: int | None = None
        self.disc_total: int | None = None
        self.title: str | None = None
        self.track: int | None = None
        self.track_total: int | None = None
        self.genre: str | None = None
        self.year: str | None = None
        self.comment: str | None = None
        self.extra: dict[str, str | float | int] = {}
        self.images = TagImages()
        self._filehandler: BinaryIO | None = None
        self._default_encoding: str | None = None  # allow override for some file formats
        self._parse_duration = True
        self._parse_tags = True
        self._load_image = False
        self._tags_parsed = False

    @classmethod
    def get(cls,
            filename: bytes | str | PathLike[Any] | None = None,
            tags: bool = True,
            duration: bool = True,
            image: bool = False,
            encoding: str | None = None,
            file_obj: BinaryIO | None = None,
            **kwargs: Any) -> TinyTag:
        """Return a tag object for an audio file."""
        should_close_file = file_obj is None
        if filename and should_close_file:
            file_obj = open(filename, 'rb')  # pylint: disable=consider-using-with
        if file_obj is None:
            raise ValueError('Either filename or file_obj argument is required')
        if 'ignore_errors' in kwargs:
            warn('ignore_errors argument is obsolete, and will be removed in a future '
                 '2.x release', DeprecationWarning, stacklevel=2)
        try:
            file_obj.seek(0, os.SEEK_END)
            filesize = file_obj.tell()
            file_obj.seek(0)
            parser_class = cls._get_parser_class(filename, file_obj)
            tag = parser_class()
            tag._filehandler = file_obj
            tag._default_encoding = encoding
            tag.filename = filename
            tag.filesize = filesize
            if filesize > 0:
                try:
                    tag._load(tags=tags, duration=duration, image=image)
                except Exception as exc:
                    raise ParseError(exc) from exc
            return tag
        finally:
            if should_close_file:
                file_obj.close()
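    # Illustrative usage sketch (editor's note, not part of the original file);
    # 'song.mp3' is a hypothetical path:
    #
    #     tag = TinyTag.get('song.mp3', image=True)
    #     print(tag.title, tag.artist, tag.duration)
    #     cover = tag.images.any  # first available embedded image, if any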

    @classmethod
    def is_supported(cls, filename: bytes | str | PathLike[Any]) -> bool:
        """Check if a specific file is supported based on its file extension."""
        return cls._get_parser_for_filename(filename) is not None

    def __repr__(self) -> str:
        return str(self._as_dict())

    def _as_dict(self) -> dict[str, Any]:
        return {k: v for k, v in self.__dict__.items() if not k.startswith('_')}

    @classmethod
    def _get_parser_for_filename(
            cls, filename: bytes | str | PathLike[Any]) -> type[TinyTag] | None:
        if cls._file_extension_mapping is None:
            cls._file_extension_mapping = {
                ('.mp1', '.mp2', '.mp3'): _ID3,
                ('.oga', '.ogg', '.opus', '.spx'): _Ogg,
                ('.wav',): _Wave,
                ('.flac',): _Flac,
                ('.wma',): _Wma,
                ('.m4b', '.m4a', '.m4r', '.m4v', '.mp4', '.aax', '.aaxc'): _MP4,
                ('.aiff', '.aifc', '.aif', '.afc'): _Aiff,
            }
        filename = os.fsdecode(filename).lower()
        for ext, tagclass in cls._file_extension_mapping.items():
            if filename.endswith(ext):
                return tagclass
        return None

    @classmethod
    def _get_parser_for_file_handle(cls, fh: BinaryIO) -> type[TinyTag] | None:
        # https://en.wikipedia.org/wiki/List_of_file_signatures
        if cls._magic_bytes_mapping is None:
            cls._magic_bytes_mapping = {
                b'^ID3': _ID3,
                b'^\xff\xfb': _ID3,
                b'^OggS.........................FLAC': _Ogg,
                b'^OggS........................Opus': _Ogg,
                b'^OggS........................Speex': _Ogg,
                b'^OggS.........................vorbis': _Ogg,
                b'^RIFF....WAVE': _Wave,
                b'^fLaC': _Flac,
                b'^\x30\x26\xB2\x75\x8E\x66\xCF\x11\xA6\xD9\x00\xAA\x00\x62\xCE\x6C': _Wma,
                b'....ftypM4A': _MP4,  # https://www.file-recovery.com/m4a-signature-format.htm
                b'....ftypaax': _MP4,  # Audible proprietary M4A container
                b'....ftypaaxc': _MP4,  # Audible proprietary M4A container
                b'\xff\xf1': _MP4,  # https://www.garykessler.net/library/file_sigs.html
                b'^FORM....AIFF': _Aiff,
                b'^FORM....AIFC': _Aiff,
            }
        header = fh.read(max(len(sig) for sig in cls._magic_bytes_mapping))
        fh.seek(0)
        for magic, parser in cls._magic_bytes_mapping.items():
            if re.match(magic, header):
                return parser
        return None

    @classmethod
    def _get_parser_class(cls, filename: bytes | str | PathLike[Any] | None = None,
                          filehandle: BinaryIO | None = None) -> type[TinyTag]:
        if cls != TinyTag:  # if `get` is invoked on TinyTag, find parser by ext
            return cls  # otherwise use the class on which `get` was invoked
        if filename:
            parser_class = cls._get_parser_for_filename(filename)
            if parser_class is not None:
                return parser_class
        # try determining the file type by magic byte header
        if filehandle:
            parser_class = cls._get_parser_for_file_handle(filehandle)
            if parser_class is not None:
                return parser_class
        raise UnsupportedFormatError('No tag reader found to support file type')

    def _load(self, tags: bool, duration: bool, image: bool = False) -> None:
        self._parse_tags = tags
        self._parse_duration = duration
        self._load_image = image
        if self._filehandler is None:
            return
        if tags:
            self._parse_tag(self._filehandler)
        if duration:
            if tags:  # rewind file if the tags were already parsed
                self._filehandler.seek(0)
            self._determine_duration(self._filehandler)

    def _parse_string_field(self, fieldname: str, old_value: Any | None, value: str) -> str | None:
        if fieldname in {'artist', 'genre'}:
            # First artist/genre goes in tag.artist/genre, others in tag.extra.other_artists/genres
            values = value.split('\x00')
            value = values[0]
            start_pos = 0 if old_value else 1
            if len(values) > 1:
                self._set_field(self._EXTRA_PREFIX + f'other_{fieldname}s', values[start_pos:])
            elif old_value and value != old_value:
                self._set_field(self._EXTRA_PREFIX + f'other_{fieldname}s', [value])
                return None
        if old_value or not value:
            return None
        return value

    def _set_field(self, fieldname: str, value: str | int | float | list[str] | None) -> None:
        write_dest = self.__dict__
        original_fieldname = fieldname
        if fieldname.startswith(self._EXTRA_PREFIX):
            write_dest = self.extra
            fieldname = fieldname[len(self._EXTRA_PREFIX):]
        old_value = write_dest.get(fieldname)
        if isinstance(value, str):
            value = self._parse_string_field(original_fieldname, old_value, value)
            if not value:
                return
        elif isinstance(value, list):
            if not isinstance(old_value, list):
                old_value = []
            value = old_value + [i for i in value if i and i not in old_value]
        elif not value and old_value:
            return
        if DEBUG:
            print(f'Setting field "{original_fieldname}" to "{value!r}"')
        write_dest[fieldname] = value
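    # Illustrative sketch (editor's note, not part of the original file): field
    # names with the 'extra.' prefix are routed into the extra dict, everything
    # else into the instance attribute of the same name; values are hypothetical.
    #
    #     tag._set_field('artist', 'Foo')        # -> tag.artist == 'Foo'
    #     tag._set_field('extra.isrc', 'XX123')  # -> tag.extra['isrc'] == 'XX123'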

    def _set_image_field(self, fieldname: str, value: TagImage) -> None:
        write_dest = self.images.__dict__
        if fieldname.startswith(self._EXTRA_PREFIX):
            fieldname = fieldname[len(self._EXTRA_PREFIX):]
            write_dest = self.images.extra
        old_values = write_dest.get(fieldname)
        values = [value]
        if old_values is not None:
            values = old_values + values
        if DEBUG:
            print(f'Setting image field "{fieldname}"')
        write_dest[fieldname] = values

    def _determine_duration(self, fh: BinaryIO) -> None:
        raise NotImplementedError

    def _parse_tag(self, fh: BinaryIO) -> None:
        raise NotImplementedError

    def _update(self, other: TinyTag) -> None:
        # update the values of this tag with the values from another tag
        excluded_attrs = {'filesize', 'extra', 'images'}
        for standard_key, standard_value in other.__dict__.items():
            if (not standard_key.startswith('_')
                    and standard_key not in excluded_attrs
                    and standard_value is not None):
                self._set_field(standard_key, standard_value)
        for extra_key, extra_value in other.extra.items():
            self._set_field(self._EXTRA_PREFIX + extra_key, extra_value)
        for image_key, images in other.images._as_dict().items():
            for image in images:
                self._set_image_field(image_key, image)
        for image_extra_key, images_extra in other.images.extra.items():
            for image_extra in images_extra:  # uncovered
                self._set_image_field(self._EXTRA_PREFIX + image_extra_key, image_extra)  # uncovered

    @staticmethod
    def _bytes_to_int_le(b: bytes) -> int:
        fmt = {1: '<B', 2: '<H', 4: '<I', 8: '<Q'}.get(len(b))
        result: int = struct.unpack(fmt, b)[0] if fmt is not None else 0
        return result

    @staticmethod
    def _bytes_to_int(b: tuple[int, ...]) -> int:
        return reduce(lambda accu, elem: (accu << 8) + elem, b, 0)
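    # Illustrative sketch (editor's note, not part of the original file): the
    # helper above reads a little-endian byte string, while _bytes_to_int folds
    # a big-endian byte tuple, e.g.
    #
    #     TinyTag._bytes_to_int_le(b'\x01\x00')  # == 1
    #     TinyTag._bytes_to_int((0x01, 0x00))    # == 256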

    @staticmethod
    def _unpad(s: str) -> str:
        # strings in mp3 and asf *may* be terminated with a zero byte at the end
        return s.strip('\x00')

    def get_image(self) -> bytes | None:
        """Deprecated, use images.any instead."""
        warn('get_image() is deprecated, and will be removed in a future 2.x release. '
             'Use images.any instead.', DeprecationWarning, stacklevel=2)
        image = self.images.any
        return image.data if image is not None else None

    @property
    def audio_offset(self) -> None:
        """Obsolete."""
        warn('audio_offset attribute is obsolete, and will be '
             'removed in a future 2.x release', DeprecationWarning, stacklevel=2)


class TagImages:
    """A class containing images embedded in an audio file."""
    def __init__(self) -> None:
        self.front_cover: list[TagImage] = []
        self.back_cover: list[TagImage] = []
        self.leaflet: list[TagImage] = []
        self.media: list[TagImage] = []
        self.other: list[TagImage] = []
        self.extra: dict[str, list[TagImage]] = {}

    @property
    def any(self) -> TagImage | None:
        """Return a cover image.
        If not present, fall back to any other available image.
        """
        for image_list in self._as_dict().values():
            for image in image_list:
                return image
        for extra_image_list in self.extra.values():
            for extra_image in extra_image_list:
                return extra_image
        return None

    def __repr__(self) -> str:
        return str(vars(self))

    def _as_dict(self) -> dict[str, list[TagImage]]:
        return {
            k: v for k, v in self.__dict__.items()
            if not k.startswith('_') and k != 'extra'
        }


class TagImage:
    """A class representing an image embedded in an audio file."""
    def __init__(self, name: str, data: bytes, mime_type: str | None = None) -> None:
        self.name = name
        self.data = data
        self.mime_type = mime_type
        self.description: str | None = None

    def __repr__(self) -> str:
        variables = vars(self).copy()
        data = variables.get("data")
        if data is not None:
            variables["data"] = (data[:45] + b'..') if len(data) > 45 else data
        return str(variables)
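# Illustrative sketch (editor's note, not part of the original file): accessing
# embedded images on a parsed tag; 'album.m4a' is a hypothetical path.
#
#     tag = TinyTag.get('album.m4a', image=True)
#     if tag.images.front_cover:
#         img = tag.images.front_cover[0]
#         print(img.mime_type, len(img.data))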

class _MP4(TinyTag):
    # https://developer.apple.com/library/mac/documentation/QuickTime/QTFF/Metadata/Metadata.html
    # https://developer.apple.com/library/mac/documentation/QuickTime/QTFF/QTFFChap2/qtff2.html

    class _Parser:
        atom_decoder_by_type: dict[
            int, Callable[[bytes], int | str | bytes | TagImage]] | None = None
        _CUSTOM_FIELD_NAME_MAPPING = {
            'conductor': 'extra.conductor',
            'discsubtitle': 'extra.set_subtitle',
            'initialkey': 'extra.initial_key',
            'isrc': 'extra.isrc',
            'language': 'extra.language',
            'lyricist': 'extra.lyricist',
            'media': 'extra.media',
        }

        @classmethod
        def _unpack_integer(cls, value: bytes, signed: bool = True) -> int:
            value_length = len(value)
            result = -1
            if value_length == 1:
                result = struct.unpack('>b' if signed else '>B', value)[0]  # uncovered
            elif value_length == 2:
                result = struct.unpack('>h' if signed else '>H', value)[0]
            elif value_length == 4:
                result = struct.unpack('>i' if signed else '>I', value)[0]
            elif value_length == 8:
                result = struct.unpack('>q' if signed else '>Q', value)[0]
            return result

        @classmethod
        def _unpack_integer_unsigned(cls, value: bytes) -> int:
            return cls._unpack_integer(value, signed=False)  # uncovered

        @classmethod
        def _make_data_atom_parser(
                cls, fieldname: str) -> Callable[[bytes], dict[str, int | str | bytes | TagImage]]:
            def _parse_data_atom(data_atom: bytes) -> dict[str, int | str | bytes | TagImage]:
                data_type = struct.unpack('>I', data_atom[:4])[0]
                if cls.atom_decoder_by_type is None:
                    # https://developer.apple.com/library/mac/documentation/QuickTime/QTFF/Metadata/Metadata.html#//apple_ref/doc/uid/TP40000939-CH1-SW34
                    cls.atom_decoder_by_type = {
                        # 0: 'reserved'
                        1: lambda x: x.decode('utf-8', 'replace'),   # UTF-8
                        2: lambda x: x.decode('utf-16', 'replace'),  # UTF-16
                        3: lambda x: x.decode('s/jis', 'replace'),   # S/JIS
                        # 16: duration in millis
                        13: lambda x: TagImage('front_cover', x, 'image/jpeg'),  # JPEG
                        14: lambda x: TagImage('front_cover', x, 'image/png'),   # PNG
                        21: cls._unpack_integer,                    # BE Signed int
                        22: cls._unpack_integer_unsigned,           # BE Unsigned int
                        # 23: lambda x: struct.unpack('>f', x)[0],  # BE Float32
                        # 24: lambda x: struct.unpack('>d', x)[0],  # BE Float64
                        # 27: lambda x: x,                          # BMP
                        # 28: lambda x: x,                          # QuickTime Metadata atom
                        65: cls._unpack_integer,                    # 8-bit Signed int
                        66: cls._unpack_integer,                    # BE 16-bit Signed int
                        67: cls._unpack_integer,                    # BE 32-bit Signed int
                        74: cls._unpack_integer,                    # BE 64-bit Signed int
                        75: cls._unpack_integer_unsigned,           # 8-bit Unsigned int
                        76: cls._unpack_integer_unsigned,           # BE 16-bit Unsigned int
                        77: cls._unpack_integer_unsigned,           # BE 32-bit Unsigned int
                        78: cls._unpack_integer_unsigned,           # BE 64-bit Unsigned int
                    }
                conversion = cls.atom_decoder_by_type.get(data_type)
                if conversion is None:
                    if DEBUG:
                        print(f'Cannot convert data type: {data_type}', file=stderr)
                    return {}  # don't know how to convert data atom
                # skip header & null-bytes, convert rest
                return {fieldname: conversion(data_atom[8:])}
            return _parse_data_atom

        @classmethod
        def _make_number_parser(
                cls, fieldname1: str, fieldname2: str) -> Callable[[bytes], dict[str, int]]:
            def _(data_atom: bytes) -> dict[str, int]:
                number_data = data_atom[8:14]
                numbers = struct.unpack('>HHH', number_data)
                # for some reason the first number is always irrelevant.
                return {fieldname1: numbers[1], fieldname2: numbers[2]}
            return _

        @classmethod
        def _parse_id3v1_genre(cls, data_atom: bytes) -> dict[str, str]:
            # dunno why the genre is offset by -1 but that's how mutagen does it
            idx = struct.unpack('>H', data_atom[8:])[0] - 1
            result = {}
            if idx < len(_ID3._ID3V1_GENRES):
                result['genre'] = _ID3._ID3V1_GENRES[idx]
            return result

        @classmethod
        def _read_extended_descriptor(cls, esds_atom: BinaryIO) -> None:
            for _i in range(4):
                if esds_atom.read(1) != b'\x80':
                    break

        @classmethod
        def _parse_custom_field(cls, data: bytes) -> dict[str, int | str | bytes | TagImage]:
            fh = io.BytesIO(data)
            header_size = 8
            field_name = None
            data_atom = b''
            atom_header = fh.read(header_size)
            while len(atom_header) == header_size:
                atom_size = struct.unpack('>I', atom_header[:4])[0] - header_size
                atom_type = atom_header[4:]
                if atom_type == b'name':
                    atom_value = fh.read(atom_size)[4:].lower()
                    field_name = atom_value.decode('utf-8', 'replace')
                    field_name = cls._CUSTOM_FIELD_NAME_MAPPING.get(
                        field_name, TinyTag._EXTRA_PREFIX + field_name)
                elif atom_type == b'data':
                    data_atom = fh.read(atom_size)
                else:
                    fh.seek(atom_size, os.SEEK_CUR)
                atom_header = fh.read(header_size)  # read next atom
            if len(data_atom) < 8 or field_name is None:
                return {}
            parser = cls._make_data_atom_parser(field_name)
            return parser(data_atom)

        @classmethod
        def _parse_audio_sample_entry_mp4a(cls, data: bytes) -> dict[str, int]:
            # this atom also contains the esds atom:
            # https://ffmpeg.org/doxygen/0.6/mov_8c-source.html
            # http://xhelmboyx.tripod.com/formats/mp4-layout.txt
            # http://sasperger.tistory.com/103
            datafh = io.BytesIO(data)
            datafh.seek(16, os.SEEK_CUR)  # jump over version and flags
            channels = struct.unpack('>H', datafh.read(2))[0]
            datafh.seek(2, os.SEEK_CUR)   # jump over bit_depth
            datafh.seek(2, os.SEEK_CUR)   # jump over QT compr id & pkt size
            sr = struct.unpack('>I', datafh.read(4))[0]

            # ES Description Atom
            esds_atom_size = struct.unpack('>I', data[28:32])[0]
            esds_atom = io.BytesIO(data[36:36 + esds_atom_size])
            esds_atom.seek(5, os.SEEK_CUR)   # jump over version, flags and tag

            # ES Descriptor
            cls._read_extended_descriptor(esds_atom)
            esds_atom.seek(4, os.SEEK_CUR)   # jump over ES id, flags and tag

            # Decoder Config Descriptor
            cls._read_extended_descriptor(esds_atom)
            esds_atom.seek(9, os.SEEK_CUR)
            avg_br = struct.unpack('>I', esds_atom.read(4))[0] / 1000  # kbit/s
            return {'channels': channels, 'samplerate': sr, 'bitrate': avg_br}

        @classmethod
        def _parse_audio_sample_entry_alac(cls, data: bytes) -> dict[str, int]:
            # https://github.com/macosforge/alac/blob/master/ALACMagicCookieDescription.txt
            alac_atom_size = struct.unpack('>I', data[28:32])[0]
            alac_atom = io.BytesIO(data[36:36 + alac_atom_size])
            alac_atom.seek(9, os.SEEK_CUR)
            bitdepth = struct.unpack('b', alac_atom.read(1))[0]
            alac_atom.seek(3, os.SEEK_CUR)
            channels = struct.unpack('b', alac_atom.read(1))[0]
            alac_atom.seek(6, os.SEEK_CUR)
            avg_br = struct.unpack('>I', alac_atom.read(4))[0] / 1000  # kbit/s
            sr = struct.unpack('>I', alac_atom.read(4))[0]
            return {'channels': channels, 'samplerate': sr, 'bitrate': avg_br, 'bitdepth': bitdepth}

        @classmethod
        def _parse_mvhd(cls, data: bytes) -> dict[str, float]:
            # http://stackoverflow.com/a/3639993/1191373
            walker = io.BytesIO(data)
            version = struct.unpack('b', walker.read(1))[0]
            walker.seek(3, os.SEEK_CUR)  # jump over flags
            if version == 0:  # uses 32 bit integers for timestamps
                walker.seek(8, os.SEEK_CUR)  # jump over create & mod times
                time_scale = struct.unpack('>I', walker.read(4))[0]
                duration = struct.unpack('>I', walker.read(4))[0]
            else:  # version == 1:  # uses 64 bit integers for timestamps
                walker.seek(16, os.SEEK_CUR)  # jump over create & mod times  # uncovered
                time_scale = struct.unpack('>I', walker.read(4))[0]  # uncovered
                duration = struct.unpack('>q', walker.read(8))[0]  # uncovered
            return {'duration': duration / time_scale}

    # The parser tree: Each key is an atom name which is traversed if existing.
    # Leaves of the parser tree are callables which receive the atom data.
    # Callables return {fieldname: value}, which is used to update the TinyTag.
    _META_DATA_TREE = {b'moov': {b'udta': {b'meta': {b'ilst': {
        # see: http://atomicparsley.sourceforge.net/mpeg-4files.html
        # and: https://metacpan.org/dist/Image-ExifTool/source/lib/Image/ExifTool/QuickTime.pm#L3093
        b'\xa9ART': {b'data': _Parser._make_data_atom_parser('artist')},
        b'\xa9alb': {b'data': _Parser._make_data_atom_parser('album')},
        b'\xa9cmt': {b'data': _Parser._make_data_atom_parser('comment')},
        b'\xa9con': {b'data': _Parser._make_data_atom_parser('extra.conductor')},
        # need test-data for this
        # b'cpil':   {b'data': _Parser._make_data_atom_parser('extra.compilation')},
        b'\xa9day': {b'data': _Parser._make_data_atom_parser('year')},
        b'\xa9des': {b'data': _Parser._make_data_atom_parser('extra.description')},
        b'\xa9dir': {b'data': _Parser._make_data_atom_parser('extra.director')},
        b'\xa9gen': {b'data': _Parser._make_data_atom_parser('genre')},
        b'\xa9lyr': {b'data': _Parser._make_data_atom_parser('extra.lyrics')},
        b'\xa9mvn': {b'data': _Parser._make_data_atom_parser('movement')},
        b'\xa9nam': {b'data': _Parser._make_data_atom_parser('title')},
        b'\xa9pub': {b'data': _Parser._make_data_atom_parser('extra.publisher')},
        b'\xa9too': {b'data': _Parser._make_data_atom_parser('extra.encoded_by')},
        b'\xa9wrt': {b'data': _Parser._make_data_atom_parser('composer')},
        b'aART': {b'data': _Parser._make_data_atom_parser('albumartist')},
        b'cprt': {b'data': _Parser._make_data_atom_parser('extra.copyright')},
        b'desc': {b'data': _Parser._make_data_atom_parser('extra.description')},
        b'disk': {b'data': _Parser._make_number_parser('disc', 'disc_total')},
        b'gnre': {b'data': _Parser._parse_id3v1_genre},
        b'trkn': {b'data': _Parser._make_number_parser('track', 'track_total')},
        b'tmpo': {b'data': _Parser._make_data_atom_parser('extra.bpm')},
        b'covr': {b'data': _Parser._make_data_atom_parser('images.front_cover')},
        b'----': _Parser._parse_custom_field,
    }}}}}
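    # Illustrative sketch (editor's note, not part of the original file): for a
    # tag atom path such as moov.udta.meta.ilst.\xa9nam.data, the traversal
    # reaches the leaf produced by _make_data_atom_parser('title'), which
    # returns e.g. {'title': 'Some Song'} (hypothetical value) and is applied
    # to the tag via _set_field.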

    # see: https://developer.apple.com/library/mac/documentation/QuickTime/QTFF/QTFFChap3/qtff3.html
    _AUDIO_DATA_TREE = {
        b'moov': {
            b'mvhd': _Parser._parse_mvhd,
            b'trak': {b'mdia': {b"minf": {b"stbl": {b"stsd": {
                b'mp4a': _Parser._parse_audio_sample_entry_mp4a,
                b'alac': _Parser._parse_audio_sample_entry_alac
            }}}}}
        }
    }

    _VERSIONED_ATOMS = {b'meta', b'stsd'}  # those have an extra 4 byte header
    _FLAGGED_ATOMS = {b'stsd'}  # these also have an extra 4 byte header

    def _determine_duration(self, fh: BinaryIO) -> None:
        self._traverse_atoms(fh, path=self._AUDIO_DATA_TREE)

    def _parse_tag(self, fh: BinaryIO) -> None:
        self._traverse_atoms(fh, path=self._META_DATA_TREE)

    def _traverse_atoms(self, fh: BinaryIO, path: dict[bytes, Any],
                        stop_pos: int | None = None,
                        curr_path: list[bytes] | None = None) -> None:
        header_size = 8
        atom_header = fh.read(header_size)
        while len(atom_header) == header_size:
            atom_size = struct.unpack('>I', atom_header[:4])[0] - header_size
            atom_type = atom_header[4:]
            if curr_path is None:  # keep track how we traversed in the tree
                curr_path = [atom_type]
            if atom_size <= 0:  # empty atom, jump to next one
                atom_header = fh.read(header_size)
                continue
            if DEBUG:
                print(f'{" " * 4 * len(curr_path)} pos: {fh.tell() - header_size} '
                      f'atom: {atom_type!r} len: {atom_size + header_size}')
            if atom_type in self._VERSIONED_ATOMS:  # jump atom version for now
                fh.seek(4, os.SEEK_CUR)
            if atom_type in self._FLAGGED_ATOMS:  # jump atom flags for now
                fh.seek(4, os.SEEK_CUR)
            sub_path = path.get(atom_type, None)
            # if the path leaf is a dict, traverse deeper into the tree:
            if isinstance(sub_path, dict):
                atom_end_pos = fh.tell() + atom_size
                self._traverse_atoms(fh, path=sub_path, stop_pos=atom_end_pos,
                                     curr_path=curr_path + [atom_type])
            # if the path-leaf is a callable, call it on the atom data
            elif callable(sub_path):
                for fieldname, value in sub_path(fh.read(atom_size)).items():
                    if DEBUG:
                        print(' ' * 4 * len(curr_path), 'FIELD: ', fieldname)
                    if fieldname.startswith('images.'):
                        if self._load_image:
                            self._set_image_field(fieldname[len('images.'):], value)
                    elif fieldname:
                        self._set_field(fieldname, value)
            # if no action was specified using dict or callable, jump over atom
            else:
                fh.seek(atom_size, os.SEEK_CUR)
            # check if we have reached the end of this branch:
            if stop_pos and fh.tell() >= stop_pos:
                return  # return to parent (next parent node in tree)
            atom_header = fh.read(header_size)  # read next atom


class _ID3(TinyTag):
    _ID3_MAPPING = {
        # Mapping from Frame ID to a field of the TinyTag
        # https://exiftool.org/TagNames/ID3.html
        'COMM': 'comment', 'COM': 'comment',
        'TRCK': 'track', 'TRK': 'track',
        'TYER': 'year', 'TYE': 'year', 'TDRC': 'year',
        'TALB': 'album', 'TAL': 'album',
        'TPE1': 'artist', 'TP1': 'artist',
        'TIT2': 'title', 'TT2': 'title',
        'TCON': 'genre', 'TCO': 'genre',
        'TPOS': 'disc', 'TPA': 'disc',
        'TPE2': 'albumartist', 'TP2': 'albumartist',
        'TCOM': 'composer', 'TCM': 'composer',
        'WOAR': 'extra.url', 'WAR': 'extra.url',
        'TSRC': 'extra.isrc', 'TRC': 'extra.isrc',
        'TCOP': 'extra.copyright', 'TCR': 'extra.copyright',
        'TBPM': 'extra.bpm', 'TBP': 'extra.bpm',
        'TKEY': 'extra.initial_key', 'TKE': 'extra.initial_key',
        'TLAN': 'extra.language', 'TLA': 'extra.language',
        'TPUB': 'extra.publisher', 'TPB': 'extra.publisher',
        'USLT': 'extra.lyrics', 'ULT': 'extra.lyrics',
        'TPE3': 'extra.conductor', 'TP3': 'extra.conductor',
        'TEXT': 'extra.lyricist', 'TXT': 'extra.lyricist',
        'TSST': 'extra.set_subtitle',
        'TENC': 'extra.encoded_by', 'TEN': 'extra.encoded_by',
        'TSSE': 'extra.encoder_settings', 'TSS': 'extra.encoder_settings',
        'TMED': 'extra.media', 'TMT': 'extra.media',
    }
    _IMAGE_FRAME_IDS = {'APIC', 'PIC'}
    _CUSTOM_FRAME_IDS = {'TXXX', 'TXX'}
    _DISALLOWED_FRAME_IDS = {'PRIV', 'RGAD', 'GEOB', 'GEO', 'ÿû°d'}
    _MAX_ESTIMATION_SEC = 30.0
    _CBR_DETECTION_FRAME_COUNT = 5
    _USE_XING_HEADER = True  # much faster, but can be deactivated for testing

    _ID3V1_GENRES = (
        'Blues', 'Classic Rock', 'Country', 'Dance', 'Disco',
        'Funk', 'Grunge', 'Hip-Hop', 'Jazz', 'Metal', 'New Age', 'Oldies',
        'Other', 'Pop', 'R&B', 'Rap', 'Reggae', 'Rock', 'Techno', 'Industrial',
        'Alternative', 'Ska', 'Death Metal', 'Pranks', 'Soundtrack',
        'Euro-Techno', 'Ambient', 'Trip-Hop', 'Vocal', 'Jazz+Funk', 'Fusion',
        'Trance', 'Classical', 'Instrumental', 'Acid', 'House', 'Game',
        'Sound Clip', 'Gospel', 'Noise', 'AlternRock', 'Bass', 'Soul', 'Punk',
        'Space', 'Meditative', 'Instrumental Pop', 'Instrumental Rock',
        'Ethnic', 'Gothic', 'Darkwave', 'Techno-Industrial', 'Electronic',
        'Pop-Folk', 'Eurodance', 'Dream', 'Southern Rock', 'Comedy', 'Cult',
        'Gangsta', 'Top 40', 'Christian Rap', 'Pop/Funk', 'Jungle',
        'Native American', 'Cabaret', 'New Wave', 'Psychadelic', 'Rave',
        'Showtunes', 'Trailer', 'Lo-Fi', 'Tribal', 'Acid Punk', 'Acid Jazz',
        'Polka', 'Retro', 'Musical', 'Rock & Roll', 'Hard Rock',

        # Winamp Extended Genres
        'Folk', 'Folk-Rock', 'National Folk', 'Swing', 'Fast Fusion', 'Bebob',
        'Latin', 'Revival', 'Celtic', 'Bluegrass', 'Avantgarde', 'Gothic Rock',
        'Progressive Rock', 'Psychedelic Rock', 'Symphonic Rock', 'Slow Rock',
        'Big Band', 'Chorus', 'Easy listening', 'Acoustic', 'Humour', 'Speech',
        'Chanson', 'Opera', 'Chamber Music', 'Sonata', 'Symphony', 'Booty Bass',
        'Primus', 'Porn Groove', 'Satire', 'Slow Jam', 'Club', 'Tango', 'Samba',
        'Folklore', 'Ballad', 'Power Ballad', 'Rhythmic Soul', 'Freestyle',
        'Duet', 'Punk Rock', 'Drum Solo', 'A capella', 'Euro-House',
        'Dance Hall', 'Goa', 'Drum & Bass',

        # according to https://de.wikipedia.org/wiki/Liste_der_ID3v1-Genres:
        'Club-House', 'Hardcore Techno', 'Terror', 'Indie', 'BritPop',
        '',  # don't use ethnic slur ("Negerpunk", WTF!)
        'Polsk Punk', 'Beat', 'Christian Gangsta Rap', 'Heavy Metal',
        'Black Metal', 'Contemporary Christian', 'Christian Rock',
        # WinAmp 1.91
        'Merengue', 'Salsa', 'Thrash Metal', 'Anime', 'Jpop', 'Synthpop',
        # WinAmp 5.6
        'Abstract', 'Art Rock', 'Baroque', 'Bhangra', 'Big Beat', 'Breakbeat',
        'Chillout', 'Downtempo', 'Dub', 'EBM', 'Eclectic', 'Electro',
        'Electroclash', 'Emo', 'Experimental', 'Garage', 'Illbient',
        'Industro-Goth', 'Jam Band', 'Krautrock', 'Leftfield', 'Lounge',
        'Math Rock', 'New Romantic', 'Nu-Breakz', 'Post-Punk', 'Post-Rock',
        'Psytrance', 'Shoegaze', 'Space Rock', 'Trop Rock', 'World Music',
        'Neoclassical', 'Audiobook', 'Audio Theatre', 'Neue Deutsche Welle',
        'Podcast', 'Indie Rock', 'G-Funk', 'Dubstep', 'Garage Rock', 'Psybient',
    )
    _ID3V2_2_IMAGE_FORMATS = {
        'bmp': 'image/bmp',
        'jpg': 'image/jpeg',
        'png': 'image/png',
    }
    _IMAGE_TYPES = (
        'other',
        'extra.icon',
        'extra.other_icon',
        'front_cover',
        'back_cover',
        'leaflet',
        'media',
        'extra.lead_artist',
        'extra.artist',
        'extra.conductor',
        'extra.band',
        'extra.composer',
        'extra.lyricist',
        'extra.recording_location',
        'extra.during_recording',
        'extra.during_performance',
        'extra.video',
        'extra.bright_colored_fish',
        'extra.illustration',
        'extra.band_logo',
        'extra.publisher_logo',
    )
    _UNKNOWN_IMAGE_TYPE = 'extra.unknown'

    # see this page for the magic values used in mp3:
    # http://www.mpgedit.org/mpgedit/mpeg_format/mpeghdr.htm
    _SAMPLE_RATES = (
        (11025, 12000, 8000),   # MPEG 2.5
        (0, 0, 0),              # reserved
        (22050, 24000, 16000),  # MPEG 2
        (44100, 48000, 32000),  # MPEG 1
    )
    _V1L1 = (0, 32, 64, 96, 128, 160, 192, 224, 256, 288, 320, 352, 384, 416, 448, 0)
    _V1L2 = (0, 32, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320, 384, 0)
    _V1L3 = (0, 32, 40, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320, 0)
    _V2L1 = (0, 32, 48, 56, 64, 80, 96, 112, 128, 144, 160, 176, 192, 224, 256, 0)
    _V2L2 = (0, 8, 16, 24, 32, 40, 48, 56, 64, 80, 96, 112, 128, 144, 160, 0)
    _V2L3 = _V2L2
    _NONE = (0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
    _BITRATE_BY_VERSION_BY_LAYER = (
        (_NONE, _V2L3, _V2L2, _V2L1),  # MPEG Version 2.5  # note that the layers go
        (_NONE, _NONE, _NONE, _NONE),  # reserved          # from 3 to 1 by design.
        (_NONE, _V2L3, _V2L2, _V2L1),  # MPEG Version 2    # the first layer id is
        (_NONE, _V1L3, _V1L2, _V1L1),  # MPEG Version 1    # reserved
    )
    _SAMPLES_PER_FRAME = 1152  # the default frame size for mp3
    _CHANNELS_PER_CHANNEL_MODE = (
        2,  # 00 Stereo
        2,  # 01 Joint stereo (Stereo)
        2,  # 10 Dual channel (2 mono channels)
        1,  # 11 Single channel (Mono)
    )

    def __init__(self) -> None:
        super().__init__()
        # save position after the ID3 tag for duration measurement speedup
        self._bytepos_after_id3v2 = -1

    @staticmethod
    def _parse_xing_header(fh: BinaryIO) -> tuple[int, int]:
        # see: http://www.mp3-tech.org/programmer/sources/vbrheadersdk.zip
        fh.seek(4, os.SEEK_CUR)  # read over Xing header
        header_flags = struct.unpack('>i', fh.read(4))[0]
        frames = byte_count = 0
        if header_flags & 1:  # FRAMES FLAG
            frames = struct.unpack('>i', fh.read(4))[0]
        if header_flags & 2:  # BYTES FLAG
            byte_count = struct.unpack('>i', fh.read(4))[0]
        if header_flags & 4:  # TOC FLAG
            fh.seek(100, os.SEEK_CUR)
        if header_flags & 8:  # VBR SCALE FLAG
            fh.seek(4, os.SEEK_CUR)
        return frames, byte_count
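    # Illustrative sketch (editor's note, not part of the original file): a Xing
    # header with header_flags == 0x07 carries the frame count, byte count and
    # TOC, so the call above reads frames and byte_count, skips the 100-byte
    # TOC, and returns (frames, byte_count).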

    def _determine_duration(self, fh: BinaryIO) -> None:
        # if tag reading was disabled, find start position of audio data
        if self._bytepos_after_id3v2 == -1:
            self._parse_id3v2_header(fh)

        max_estimation_frames = (_ID3._MAX_ESTIMATION_SEC * 44100) // _ID3._SAMPLES_PER_FRAME
        frame_size_accu = 0
        audio_offset = 0
        header_bytes = 4
        frames = 0  # count frames for determining mp3 duration
        bitrate_accu = 0    # add up bitrates to find average bitrate to detect
        last_bitrates = []  # CBR mp3s (multiple frames with same bitrates)
        # seek to first position after id3 tag (speedup for large header)
        fh.seek(self._bytepos_after_id3v2)
        file_offset = fh.tell()
        walker = io.BytesIO(fh.read())
        while True:
            # reading through garbage until 11 '1' sync-bits are found
            b = walker.read()
            walker.seek(-len(b), os.SEEK_CUR)
            if len(b) < 4:
                if frames:
                    self.bitrate = bitrate_accu / frames
                break  # EOF
            _sync, conf, bitrate_freq, rest = struct.unpack('BBBB', b[0:4])
            br_id = (bitrate_freq >> 4) & 0x0F  # bitrate id
            sr_id = (bitrate_freq >> 2) & 0x03  # sample rate id
            padding = 1 if bitrate_freq & 0x02 > 0 else 0
            mpeg_id = (conf >> 3) & 0x03
            layer_id = (conf >> 1) & 0x03
            channel_mode = (rest >> 6) & 0x03
            # check for eleven 1s, validate bitrate and sample rate
            if (not b[:2] > b'\xFF\xE0' or br_id > 14 or br_id == 0 or sr_id == 3
                    or layer_id == 0 or mpeg_id == 1):  # noqa
                idx = b.find(b'\xFF', 1)  # invalid frame, find next sync header
                if idx == -1:
                    idx = len(b)  # not found: jump over the current peek buffer
                walker.seek(max(idx, 1), os.SEEK_CUR)
                continue
            self.channels = self._CHANNELS_PER_CHANNEL_MODE[channel_mode]
            frame_bitrate = self._BITRATE_BY_VERSION_BY_LAYER[mpeg_id][layer_id][br_id]
            self.samplerate = samplerate = self._SAMPLE_RATES[mpeg_id][sr_id]
            # There might be a Xing header in the first frame that contains
            # all the info we need, otherwise parse multiple frames to find the
            # accurate average bitrate
            if frames == 0 and self._USE_XING_HEADER:
                xing_header_offset = b.find(b'Xing')
                if xing_header_offset != -1:
                    walker.seek(xing_header_offset, os.SEEK_CUR)
                    xframes, byte_count = self._parse_xing_header(walker)
                    if xframes > 0 and byte_count > 0:
                        # MPEG-2 Audio Layer III uses 576 samples per frame
                        samples_per_frame = 576 if mpeg_id <= 2 else self._SAMPLES_PER_FRAME
                        self.duration = duration = xframes * samples_per_frame / samplerate
                        # self.duration = (xframes * self._SAMPLES_PER_FRAME / samplerate
                        #                  / self.channels)  # noqa
                        self.bitrate = byte_count * 8 / duration / 1000
                        return
                    continue  # uncovered

            frames += 1  # it's most probably an mp3 frame
            bitrate_accu += frame_bitrate
            if frames == 1:
                audio_offset = file_offset + walker.tell()
            if frames <= self._CBR_DETECTION_FRAME_COUNT:
                last_bitrates.append(frame_bitrate)
            walker.seek(4, os.SEEK_CUR)  # jump over peeked bytes

            frame_length = (144000 * frame_bitrate) // samplerate + padding
            frame_size_accu += frame_length
            # if the bitrate does not change over time it's probably CBR
            is_cbr = (frames == self._CBR_DETECTION_FRAME_COUNT and len(set(last_bitrates)) == 1)
            if frames == max_estimation_frames or is_cbr:
                # try to estimate duration
                fh.seek(-128, 2)  # jump to last byte (leaving out id3v1 tag)
                audio_stream_size = fh.tell() - audio_offset
                est_frame_count = audio_stream_size / (frame_size_accu / frames)
                samples = est_frame_count * self._SAMPLES_PER_FRAME
                self.duration = samples / samplerate
                self.bitrate = bitrate_accu / frames
                return

            if frame_length > 1:  # jump over current frame body
                walker.seek(frame_length - header_bytes, os.SEEK_CUR)
        if self.samplerate:
            self.duration = frames * self._SAMPLES_PER_FRAME / self.samplerate
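    # Worked example (editor's note, not part of the original file) for the
    # frame-length formula above: a 128 kbit/s MPEG-1 Layer III frame at
    # 44100 Hz without padding is 144000 * 128 // 44100 = 417 bytes long.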

    def _parse_tag(self, fh: BinaryIO) -> None:
        self._parse_id3v2(fh)
        if self.filesize > 128:
            fh.seek(-128, os.SEEK_END)  # try parsing id3v1 in last 128 bytes
            self._parse_id3v1(fh)

    def _parse_id3v2_header(self, fh: BinaryIO) -> tuple[int, bool, int]:
        size = major = 0
        extended = False
        # for info on the specs, see: http://id3.org/Developer%20Information
        header = struct.unpack('3sBBB4B', fh.read(10))
        tag = header[0].decode('ISO-8859-1', 'replace')
        # check if there is an ID3v2 tag at the beginning of the file
        if tag == 'ID3':
            major, _rev = header[1:3]
            if DEBUG:
                print(f'Found id3 v2.{major}')
            # unsync = (header[3] & 0x80) > 0
            extended = (header[3] & 0x40) > 0
            # experimental = (header[3] & 0x20) > 0
            # footer = (header[3] & 0x10) > 0
            size = self._calc_size(header[4:8], 7)
        self._bytepos_after_id3v2 = size
        return size, extended, major
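    # Illustrative sketch (editor's note, not part of the original file): the
    # tag size in the ID3v2 header is stored as four synchsafe (7-bit) bytes,
    # so size bytes 0x00 0x00 0x02 0x01 decode to (2 << 7) | 1 = 257 bytes.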

938
    def _parse_id3v2(self, fh: BinaryIO) -> None:
1✔
939
        size, extended, major = self._parse_id3v2_header(fh)
1✔
940
        if size:
1✔
941
            end_pos = fh.tell() + size
1✔
942
            parsed_size = 0
1✔
943
            if extended:  # just read over the extended header.
1✔
944
                size_bytes = struct.unpack('4B', fh.read(6)[0:4])
1✔
945
                extd_size = self._calc_size(size_bytes, 7)
1✔
946
                fh.seek(extd_size - 6, os.SEEK_CUR)  # jump over extended_header
1✔
947
            while parsed_size < size:
1✔
948
                frame_size = self._parse_frame(fh, id3version=major)
1✔
949
                if frame_size == 0:
1✔
950
                    break
1✔
951
                parsed_size += frame_size
1✔
952
            fh.seek(end_pos, os.SEEK_SET)
1✔
953

954
    def _parse_id3v1(self, fh: BinaryIO) -> None:
1✔
955
        if fh.read(3) != b'TAG':  # check if this is an ID3 v1 tag
1✔
956
            return
1✔
957

958
        def asciidecode(x: bytes) -> str:
1✔
959
            return self._unpad(x.decode(self._default_encoding or 'latin1', 'replace'))
1✔
960
        # Only set fields that were not set by ID3v2 tags, as ID3v1
961
        # tags are more likely to be outdated or have encoding issues
962
        fields = fh.read(30 + 30 + 30 + 4 + 30 + 1)
1✔
963
        if not self.title:
1✔
964
            self._set_field('title', asciidecode(fields[:30]))
1✔
965
        if not self.artist:
1✔
966
            self._set_field('artist', asciidecode(fields[30:60]))
1✔
967
        if not self.album:
1✔
968
            self._set_field('album', asciidecode(fields[60:90]))
1✔
969
        if not self.year:
1✔
970
            self._set_field('year', asciidecode(fields[90:94]))
1✔
971
        comment = fields[94:124]
1✔
972
        if b'\x00\x00' < comment[-2:] < b'\x01\x00':
1✔
973
            if self.track is None:
1✔
974
                self._set_field('track', ord(comment[-1:]))
1✔
975
            comment = comment[:-2]
1✔
976
        if not self.comment:
1✔
977
            self._set_field('comment', asciidecode(comment))
1✔
978
        if not self.genre:
1✔
979
            genre_id = ord(fields[124:125])
1✔
980
            if genre_id < len(self._ID3V1_GENRES):
1✔
981
                self._set_field('genre', self._ID3V1_GENRES[genre_id])
1✔
982

983
    def __parse_custom_field(self, content: str) -> bool:
1✔
984
        custom_field_name, separator, value = content.partition('\x00')
1✔
985
        if custom_field_name and separator:
1✔
986
            self._set_field(self._EXTRA_PREFIX + custom_field_name.lower(), value.lstrip('\ufeff'))
1✔
987
            return True
1✔
988
        return False
1✔
989

990
    @classmethod
1✔
991
    def _create_tag_image(cls, data: bytes, pic_type: int, mime_type: str | None = None,
1✔
992
                          description: str | None = None) -> tuple[str, TagImage]:
993
        field_name = cls._UNKNOWN_IMAGE_TYPE
1✔
994
        if 0 <= pic_type <= len(cls._IMAGE_TYPES):
1✔
995
            field_name = cls._IMAGE_TYPES[pic_type]
1✔
996
        image = TagImage(field_name, data)
1✔
997
        if mime_type:
1✔
998
            image.mime_type = mime_type
1✔
999
        if description:
1✔
1000
            image.description = description
1✔
1001
        return field_name, image
1✔
1002

1003
    @staticmethod
1✔
1004
    def _index_utf16(s: bytes, search: bytes) -> int:
1✔
1005
        for i in range(0, len(s), len(search)):
1✔
1006
            if s[i:i + len(search)] == search:
1✔
1007
                return i
1✔
UNCOV
1008
        return -1
×
1009

1010
    def _parse_frame(self, fh: BinaryIO, id3version: int | None = None) -> int:
1✔
1011
        # ID3v2.2 especially ugly. see: http://id3.org/id3v2-00
1012
        frame_header_size = 6 if id3version == 2 else 10
1✔
1013
        frame_size_bytes = 3 if id3version == 2 else 4
1✔
1014
        binformat = '3s3B' if id3version == 2 else '4s4B2B'
1✔
1015
        bits_per_byte = 7 if id3version == 4 else 8  # only id3v2.4 is synchsafe
1✔
1016
        frame_header_data = fh.read(frame_header_size)
1✔
1017
        if len(frame_header_data) != frame_header_size:
1✔
1018
            return 0
1✔
1019
        frame = struct.unpack(binformat, frame_header_data)
1✔
1020
        frame_id = self._decode_string(frame[0])
1✔
1021
        frame_size = self._calc_size(frame[1:1 + frame_size_bytes], bits_per_byte)
1✔
1022
        if DEBUG:
1✔
1023
            print(f'Found id3 Frame {frame_id} at {fh.tell()}-{fh.tell() + frame_size} '
1✔
1024
                  f'of {self.filesize}')
1025
        if frame_size > 0:
1✔
1026
            # flags = frame[1+frame_size_bytes:] # dont care about flags.
1027
            content = fh.read(frame_size)
1✔
1028
            fieldname = self._ID3_MAPPING.get(frame_id)
1✔
1029
            should_set_field = True
1✔
1030
            if fieldname:
1✔
1031
                if not self._parse_tags:
1✔
1032
                    return frame_size
1✔
1033
                language = fieldname in {'comment', 'extra.lyrics'}
1✔
1034
                value = self._decode_string(content, language)
1✔
1035
                if fieldname == "comment":
1✔
1036
                    # check if comment is a key-value pair (used by iTunes)
1037
                    should_set_field = not self.__parse_custom_field(value)
1✔
1038
                elif fieldname in {'track', 'disc'}:
1✔
1039
                    if '/' in value:
1✔
1040
                        value, total = value.split('/')[:2]
1✔
1041
                        if total.isdecimal():
1✔
1042
                            self._set_field(f'{fieldname}_total', int(total))
1✔
1043
                    if value.isdecimal():
1✔
1044
                        self._set_field(fieldname, int(value))
1✔
1045
                    should_set_field = False
1✔
1046
                elif fieldname == 'genre':
1✔
1047
                    genre_id = 255
1✔
1048
                    # funky: id3v1 genre hidden in a id3v2 field
1049
                    if value.isdecimal():
1✔
1050
                        genre_id = int(value)
1✔
1051
                    # funkier: the TCO may contain genres in parens, e.g. '(13)'
1052
                    elif value[:1] == '(':
1✔
1053
                        end_pos = value.find(')')
1✔
1054
                        parens_text = value[1:end_pos]
1✔
1055
                        if end_pos > 0 and parens_text.isdecimal():
1✔
1056
                            genre_id = int(parens_text)
1✔
1057
                    if 0 <= genre_id < len(_ID3._ID3V1_GENRES):
1✔
1058
                        value = _ID3._ID3V1_GENRES[genre_id]
1✔
1059
                if should_set_field:
1✔
1060
                    self._set_field(fieldname, value)
1✔
1061
            elif frame_id in self._CUSTOM_FRAME_IDS:
1✔
1062
                # custom fields
1063
                if self._parse_tags:
1✔
1064
                    self.__parse_custom_field(self._decode_string(content))
1✔
1065
            elif frame_id in self._IMAGE_FRAME_IDS:
1✔
1066
                if self._load_image:
1✔
1067
                    # See section 4.14: http://id3.org/id3v2.4.0-frames
1068
                    encoding = content[0:1]
1✔
1069
                    if frame_id == 'PIC':  # ID3 v2.2:
1✔
1070
                        imgformat = self._decode_string(content[1:4]).lower()
1✔
1071
                        mime_type = self._ID3V2_2_IMAGE_FORMATS.get(imgformat)
1✔
1072
                        desc_start_pos = 1 + 3 + 1  # skip encoding (1), imgformat (3), pictype(1)
1✔
1073
                    else:  # ID3 v2.3+
1074
                        mime_type_end_pos = content.index(b'\x00', 1)
1✔
1075
                        mime_type = self._decode_string(content[1:mime_type_end_pos]).lower()
1✔
1076
                        if mime_type in self._ID3V2_2_IMAGE_FORMATS:  # ID3 v2.2 format in v2.3...
1✔
1077
                            mime_type = self._ID3V2_2_IMAGE_FORMATS[mime_type]
1✔
1078
                        desc_start_pos = mime_type_end_pos + 1 + 1  # skip mtype, pictype(1)
1✔
1079
                    pic_type = content[desc_start_pos - 1]
1✔
1080
                    # latin-1 and utf-8 use a single null byte as terminator
1081
                    termination = b'\x00' if encoding in {b'\x00', b'\x03'} else b'\x00\x00'
1✔
1082
                    desc_length = self._index_utf16(content[desc_start_pos:], termination)
1✔
1083
                    desc_end_pos = desc_start_pos + desc_length + len(termination)
1✔
1084
                    description = self._decode_string(content[desc_start_pos:desc_end_pos])
1✔
1085
                    field_name, image = self._create_tag_image(
1✔
1086
                        content[desc_end_pos:], pic_type, mime_type, description)
1087
                    self._set_image_field(field_name, image)
1✔
1088
            elif frame_id not in self._DISALLOWED_FRAME_IDS:
1✔
1089
                # unknown, try to add to extra dict
1090
                if self._parse_tags:
1✔
1091
                    self._set_field(
1✔
1092
                        self._EXTRA_PREFIX + frame_id.lower(), self._decode_string(content))
1093
            return frame_size
1✔
1094
        return 0
1✔
1095

1096
    def _decode_string(self, bytestr: bytes, language: bool = False) -> str:
1✔
1097
        default_encoding = 'ISO-8859-1'
1✔
1098
        if self._default_encoding:
1✔
1099
            default_encoding = self._default_encoding
1✔
1100
        # it's not my fault, this is the spec.
1101
        first_byte = bytestr[:1]
1✔
1102
        if first_byte == b'\x00':  # ISO-8859-1
1✔
1103
            bytestr = bytestr[1:]
1✔
1104
            encoding = default_encoding
1✔
1105
        elif first_byte == b'\x01':  # UTF-16 with BOM
1✔
1106
            bytestr = bytestr[1:]
1✔
1107
            # remove language (but leave BOM)
1108
            if language:
1✔
1109
                if bytestr[3:5] in {b'\xfe\xff', b'\xff\xfe'}:
1✔
1110
                    bytestr = bytestr[3:]
1✔
1111
                if bytestr[:3].isalpha():
1✔
1112
                    bytestr = bytestr[3:]  # remove language
1✔
1113
                bytestr = bytestr.lstrip(b'\x00')  # strip optional additional null bytes
1✔
1114
            # read byte order mark to determine endianness
1115
            encoding = 'UTF-16be' if bytestr[0:2] == b'\xfe\xff' else 'UTF-16le'
1✔
1116
            # strip the bom if it exists
1117
            if bytestr[:2] in {b'\xfe\xff', b'\xff\xfe'}:
1✔
1118
                bytestr = bytestr[2:] if len(bytestr) % 2 == 0 else bytestr[2:-1]
1✔
1119
            # remove ADDITIONAL EXTRA BOM :facepalm:
1120
            if bytestr[:4] == b'\x00\x00\xff\xfe':
1✔
1121
                bytestr = bytestr[4:]
1✔
1122
        elif first_byte == b'\x02':  # UTF-16LE
1✔
1123
            # strip the optional trailing null byte if the byte count is uneven
UNCOV
1124
            bytestr = bytestr[1:-1] if len(bytestr) % 2 == 0 else bytestr[1:]
×
UNCOV
1125
            encoding = 'UTF-16le'
×
1126
        elif first_byte == b'\x03':  # UTF-8
1✔
1127
            bytestr = bytestr[1:]
1✔
1128
            encoding = 'UTF-8'
1✔
1129
        else:
1130
            encoding = default_encoding  # wild guess
1✔
1131
        if language and bytestr[:3].isalpha():
1✔
1132
            bytestr = bytestr[3:]  # remove language
1✔
1133
        return self._unpad(bytestr.decode(encoding, 'replace'))
1✔
1134

1135
    @staticmethod
1✔
1136
    def _calc_size(bytestr: tuple[int, ...], bits_per_byte: int) -> int:
1✔
1137
        # the length of some ID3 header fields is encoded with 7-bit (synchsafe) or 8-bit bytes
1138
        return reduce(lambda accu, elem: (accu << bits_per_byte) + elem, bytestr, 0)
1✔
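
# Illustrative example (not part of tinytag): the reduce() above decodes both
# ID3v2.4 "synchsafe" sizes (7 usable bits per byte) and ID3v2.3 sizes (8 bits
# per byte). The byte tuples below are made-up values that both encode 257.
def _demo_calc_size() -> None:
    assert _ID3._calc_size((0x00, 0x00, 0x02, 0x01), bits_per_byte=7) == 257  # synchsafe
    assert _ID3._calc_size((0x00, 0x00, 0x01, 0x01), bits_per_byte=8) == 257  # plain 8-bit
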
1139

1140

1141
class _Ogg(TinyTag):
1✔
1142
    _VORBIS_MAPPING = {
1✔
1143
        'album': 'album',
1144
        'albumartist': 'albumartist',
1145
        'title': 'title',
1146
        'artist': 'artist',
1147
        'author': 'artist',
1148
        'date': 'year',
1149
        'tracknumber': 'track',
1150
        'tracktotal': 'track_total',
1151
        'totaltracks': 'track_total',
1152
        'discnumber': 'disc',
1153
        'disctotal': 'disc_total',
1154
        'totaldiscs': 'disc_total',
1155
        'genre': 'genre',
1156
        'description': 'comment',
1157
        'comment': 'comment',
1158
        'comments': 'comment',
1159
        'composer': 'composer',
1160
        'bpm': 'extra.bpm',
1161
        'copyright': 'extra.copyright',
1162
        'isrc': 'extra.isrc',
1163
        'lyrics': 'extra.lyrics',
1164
        'publisher': 'extra.publisher',
1165
        'language': 'extra.language',
1166
        'director': 'extra.director',
1167
        'website': 'extra.url',
1168
        'conductor': 'extra.conductor',
1169
        'lyricist': 'extra.lyricist',
1170
        'discsubtitle': 'extra.set_subtitle',
1171
        'setsubtitle': 'extra.set_subtitle',
1172
        'initialkey': 'extra.initial_key',
1173
        'key': 'extra.initial_key',
1174
        'encodedby': 'extra.encoded_by',
1175
        'encodersettings': 'extra.encoder_settings',
1176
        'media': 'extra.media',
1177
    }
1178

1179
    def __init__(self) -> None:
1✔
1180
        super().__init__()
1✔
1181
        self._max_samplenum = 0  # maximum sample position ever read
1✔
1182

1183
    def _determine_duration(self, fh: BinaryIO) -> None:
1✔
1184
        max_page_size = 65536  # https://xiph.org/ogg/doc/libogg/ogg_page.html
1✔
1185
        if not self._tags_parsed:
1✔
1186
            self._parse_tag(fh)  # determine sample rate
1✔
1187
            fh.seek(0)           # and rewind to start
1✔
1188
        if self.duration is not None or not self.samplerate:
1✔
1189
            return  # either ogg flac or invalid file
1✔
1190
        if self.filesize > max_page_size:
1✔
1191
            fh.seek(-max_page_size, 2)  # go to last possible page position
1✔
1192
        while True:
1✔
1193
            file_offset = fh.tell()
1✔
1194
            b = fh.read()
1✔
1195
            if len(b) < 4:
1✔
UNCOV
1196
                return  # EOF
×
1197
            if b[:4] == b'OggS':  # look for an ogg header
1✔
1198
                fh.seek(file_offset)
1✔
1199
                for _ in self._parse_pages(fh):
1✔
1200
                    pass  # parse all remaining pages
1✔
1201
                self.duration = self._max_samplenum / self.samplerate
1✔
1202
                break
1✔
1203
            idx = b.find(b'OggS')  # try to find header in peeked data
1✔
1204
            if idx != -1:
1✔
1205
                fh.seek(file_offset + idx)
1✔
1206

1207
    def _parse_tag(self, fh: BinaryIO) -> None:
1✔
1208
        check_flac_second_packet = False
1✔
1209
        check_speex_second_packet = False
1✔
1210
        for packet in self._parse_pages(fh):
1✔
1211
            walker = io.BytesIO(packet)
1✔
1212
            if packet[0:7] == b"\x01vorbis":
1✔
1213
                if self._parse_duration:
1✔
1214
                    (self.channels, self.samplerate, _max_bitrate, bitrate,
1✔
1215
                     _min_bitrate) = struct.unpack("<B4i", packet[11:28])
1216
                    self.bitrate = bitrate / 1000
1✔
1217
            elif packet[0:7] == b"\x03vorbis":
1✔
1218
                if self._parse_tags:
1✔
1219
                    walker.seek(7, os.SEEK_CUR)  # jump over header name
1✔
1220
                    self._parse_vorbis_comment(walker)
1✔
1221
            elif packet[0:8] == b'OpusHead':
1✔
1222
                if self._parse_duration:  # parse opus header
1✔
1223
                    # https://www.videolan.org/developers/vlc/modules/codec/opus_header.c
1224
                    # https://mf4.xiph.org/jenkins/view/opus/job/opusfile-unix/ws/doc/html/structOpusHead.html
1225
                    walker.seek(8, os.SEEK_CUR)  # jump over header name
1✔
1226
                    (version, ch, _, _sr, _, _) = struct.unpack("<BBHIHB", walker.read(11))
1✔
1227
                    if (version & 0xF0) == 0:  # only major version 0 supported
1✔
1228
                        self.channels = ch
1✔
1229
                        self.samplerate = 48000  # internally, Opus always uses 48 kHz
1✔
1230
            elif packet[0:8] == b'OpusTags':
1✔
1231
                if self._parse_tags:  # parse opus metadata:
1✔
1232
                    walker.seek(8, os.SEEK_CUR)  # jump over header name
1✔
1233
                    self._parse_vorbis_comment(walker)
1✔
1234
            elif packet[0:5] == b'\x7fFLAC':
1✔
1235
                # https://xiph.org/flac/ogg_mapping.html
1236
                walker.seek(9, os.SEEK_CUR)  # jump over header name, version and number of headers
1✔
1237
                flactag = _Flac()
1✔
1238
                flactag._filehandler = walker
1✔
1239
                flactag.filesize = self.filesize
1✔
1240
                flactag._load(tags=self._parse_tags, duration=self._parse_duration,
1✔
1241
                              image=self._load_image)
1242
                self._update(flactag)
1✔
1243
                check_flac_second_packet = True
1✔
1244
            elif check_flac_second_packet:
1✔
1245
                # second packet contains FLAC metadata block
1246
                if self._parse_tags:
1✔
1247
                    meta_header = struct.unpack('B3B', walker.read(4))
1✔
1248
                    block_type = meta_header[0] & 0x7f
1✔
1249
                    if block_type == _Flac.METADATA_VORBIS_COMMENT:
1✔
1250
                        self._parse_vorbis_comment(walker)
1✔
1251
                check_flac_second_packet = False
1✔
1252
            elif packet[0:8] == b'Speex   ':
1✔
1253
                # https://speex.org/docs/manual/speex-manual/node8.html
1254
                if self._parse_duration:
1✔
1255
                    walker.seek(36, os.SEEK_CUR)  # jump over header name and irrelevant fields
1✔
1256
                    (self.samplerate, _, _, self.channels,
1✔
1257
                     self.bitrate) = struct.unpack("<5i", walker.read(20))
1258
                check_speex_second_packet = True
1✔
1259
            elif check_speex_second_packet:
1✔
1260
                if self._parse_tags:
1✔
1261
                    length = struct.unpack('I', walker.read(4))[0]  # starts with a comment string
1✔
1262
                    comment = walker.read(length).decode('utf-8', 'replace')
1✔
1263
                    self._set_field('comment', comment)
1✔
1264
                    self._parse_vorbis_comment(walker, contains_vendor=False)  # other tags
1✔
1265
                check_speex_second_packet = False
1✔
1266
            else:
1267
                if DEBUG:
1✔
1268
                    print('Unsupported Ogg page type: ', packet[:16], file=stderr)
1✔
1269
                break
1✔
1270
        self._tags_parsed = True
1✔
1271

1272
    def _parse_vorbis_comment(self, fh: BinaryIO, contains_vendor: bool = True) -> None:
1✔
1273
        # for the spec, see: http://xiph.org/vorbis/doc/v-comment.html
1274
        # discnumber tag based on: https://en.wikipedia.org/wiki/Vorbis_comment
1275
        # https://sno.phy.queensu.ca/~phil/exiftool/TagNames/Vorbis.html
1276
        if contains_vendor:
1✔
1277
            vendor_length = struct.unpack('I', fh.read(4))[0]
1✔
1278
            fh.seek(vendor_length, os.SEEK_CUR)  # jump over vendor
1✔
1279
        elements = struct.unpack('I', fh.read(4))[0]
1✔
1280
        for _i in range(elements):
1✔
1281
            length = struct.unpack('I', fh.read(4))[0]
1✔
1282
            keyvalpair = fh.read(length).decode('utf-8', 'replace')
1✔
1283
            if '=' in keyvalpair:
1✔
1284
                key, value = keyvalpair.split('=', 1)
1✔
1285
                key_lowercase = key.lower()
1✔
1286

1287
                if key_lowercase == "metadata_block_picture" and self._load_image:
1✔
1288
                    if DEBUG:
1✔
1289
                        print('Found Vorbis TagImage', key, value[:64])
1✔
1290
                    fieldname, fieldvalue = _Flac._parse_image(io.BytesIO(base64.b64decode(value)))
1✔
1291
                    self._set_image_field(fieldname, fieldvalue)
1✔
1292
                else:
1293
                    if DEBUG:
1✔
1294
                        print('Found Vorbis Comment', key, value[:64])
1✔
1295
                    fieldname = self._VORBIS_MAPPING.get(
1✔
1296
                        key_lowercase, self._EXTRA_PREFIX + key_lowercase)  # custom field
1297
                    if fieldname in {'track', 'disc', 'track_total', 'disc_total'}:
1✔
1298
                        if fieldname in {'track', 'disc'} and '/' in value:
1✔
1299
                            value, total = value.split('/')[:2]
1✔
1300
                            if total.isdecimal():
1✔
1301
                                self._set_field(f'{fieldname}_total', int(total))
1✔
1302
                        if value.isdecimal():
1✔
1303
                            self._set_field(fieldname, int(value))
1✔
1304
                    else:
1305
                        self._set_field(fieldname, value)
1✔
1306

1307
    def _parse_pages(self, fh: BinaryIO) -> Iterator[bytes]:
1✔
1308
        # for the spec, see: https://wiki.xiph.org/Ogg
1309
        previous_page = b''  # contains data from previous (continuing) pages
1✔
1310
        header_data = fh.read(27)  # read ogg page header
1✔
1311
        while len(header_data) == 27:
1✔
1312
            header = struct.unpack('<4sBBqIIiB', header_data)
1✔
1313
            # https://xiph.org/ogg/doc/framing.html
1314
            oggs, version, _flags, pos, _serial, _pageseq, _crc, segments = header
1✔
1315
            self._max_samplenum = max(self._max_samplenum, pos)
1✔
1316
            if oggs != b'OggS' or version != 0:
1✔
1317
                raise ParseError('Invalid OGG header')
1✔
1318
            segsizes = struct.unpack('B' * segments, fh.read(segments))
1✔
1319
            total = 0
1✔
1320
            for segsize in segsizes:  # read all segments
1✔
1321
                total += segsize
1✔
1322
                if total < 255:  # less than 255 bytes means end of page
1✔
1323
                    yield previous_page + fh.read(total)
1✔
1324
                    previous_page = b''
1✔
1325
                    total = 0
1✔
1326
            if total != 0:
1✔
1327
                if total % 255 == 0:
1✔
UNCOV
1328
                    previous_page += fh.read(total)
×
1329
                else:
1330
                    yield previous_page + fh.read(total)
1✔
1331
                    previous_page = b''
1✔
1332
            header_data = fh.read(27)
1✔
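
# Illustrative example (not part of tinytag): the Vorbis comment block consumed by
# _parse_vorbis_comment() above is a length-prefixed vendor string followed by a
# count of length-prefixed UTF-8 'KEY=value' entries, read with the same struct
# format the parser uses. All bytes below are made up for demonstration.
def _demo_vorbis_comment_layout() -> None:
    vendor = b'demo-vendor'
    entry = b'TITLE=Example'
    block = (struct.pack('I', len(vendor)) + vendor    # vendor length + vendor string
             + struct.pack('I', 1)                     # number of comment entries
             + struct.pack('I', len(entry)) + entry)   # entry length + entry
    walker = io.BytesIO(block)
    vendor_length = struct.unpack('I', walker.read(4))[0]
    walker.seek(vendor_length, os.SEEK_CUR)            # jump over vendor
    assert struct.unpack('I', walker.read(4))[0] == 1  # one key-value pair follows
    length = struct.unpack('I', walker.read(4))[0]
    assert walker.read(length).decode('utf-8') == 'TITLE=Example'
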
1333

1334

1335
class _Wave(TinyTag):
1✔
1336
    # https://sno.phy.queensu.ca/~phil/exiftool/TagNames/RIFF.html
1337
    _RIFF_MAPPING = {
1✔
1338
        b'INAM': 'title',
1339
        b'TITL': 'title',
1340
        b'IPRD': 'album',
1341
        b'IART': 'artist',
1342
        b'IBPM': 'extra.bpm',
1343
        b'ICMT': 'comment',
1344
        b'IMUS': 'composer',
1345
        b'ICOP': 'extra.copyright',
1346
        b'ICRD': 'year',
1347
        b'IGNR': 'genre',
1348
        b'ILNG': 'extra.language',
1349
        b'ISRC': 'extra.isrc',
1350
        b'IPUB': 'extra.publisher',
1351
        b'IPRT': 'track',
1352
        b'ITRK': 'track',
1353
        b'TRCK': 'track',
1354
        b'IBSU': 'extra.url',
1355
        b'YEAR': 'year',
1356
        b'IWRI': 'extra.lyricist',
1357
        b'IENC': 'extra.encoded_by',
1358
        b'IMED': 'extra.media',
1359
    }
1360

1361
    def _determine_duration(self, fh: BinaryIO) -> None:
1✔
1362
        if not self._tags_parsed:
1✔
1363
            self._parse_tag(fh)
1✔
1364

1365
    def _parse_tag(self, fh: BinaryIO) -> None:
1✔
1366
        # see: http://www-mmsp.ece.mcgill.ca/Documents/AudioFormats/WAVE/WAVE.html
1367
        # and: https://en.wikipedia.org/wiki/WAV
1368
        riff, _size, fformat = struct.unpack('4sI4s', fh.read(12))
1✔
1369
        if riff != b'RIFF' or fformat != b'WAVE':
1✔
1370
            raise ParseError('Invalid WAV header')
1✔
1371
        if self._parse_duration:
1✔
1372
            self.bitdepth = 16  # assume 16bit depth (CD quality)
1✔
1373
        chunk_header = fh.read(8)
1✔
1374
        while len(chunk_header) == 8:
1✔
1375
            subchunkid, subchunksize = struct.unpack('4sI', chunk_header)
1✔
1376
            subchunksize += subchunksize % 2  # IFF chunks are padded to an even number of bytes
1✔
1377
            if subchunkid == b'fmt ' and self._parse_duration:
1✔
1378
                _, channels, samplerate = struct.unpack('HHI', fh.read(8))
1✔
1379
                _, _, bitdepth = struct.unpack('<IHH', fh.read(8))
1✔
1380
                if bitdepth == 0:
1✔
1381
                    # Certain codecs (e.g. GSM 6.10) give us a bit depth of zero.
1382
                    # Avoid division by zero when calculating duration.
1383
                    bitdepth = 1
1✔
1384
                self.bitrate = samplerate * channels * bitdepth / 1000
1✔
1385
                self.channels, self.samplerate, self.bitdepth = channels, samplerate, bitdepth
1✔
1386
                remaining_size = subchunksize - 16
1✔
1387
                if remaining_size > 0:
1✔
1388
                    fh.seek(remaining_size, 1)  # skip remaining data in chunk
1✔
1389
            elif subchunkid == b'data' and self._parse_duration:
1✔
1390
                if (self.channels is not None and self.samplerate is not None
1✔
1391
                        and self.bitdepth is not None):
1392
                    self.duration = (
1✔
1393
                        subchunksize / self.channels / self.samplerate / (self.bitdepth / 8))
1394
                fh.seek(subchunksize, 1)
1✔
1395
            elif subchunkid == b'LIST' and self._parse_tags:
1✔
1396
                is_info = fh.read(4)  # check INFO header
1✔
1397
                if is_info != b'INFO':  # jump over non-INFO sections
1✔
UNCOV
1398
                    fh.seek(subchunksize - 4, os.SEEK_CUR)
×
1399
                else:
1400
                    sub_fh = io.BytesIO(fh.read(subchunksize - 4))
1✔
1401
                    field = sub_fh.read(4)
1✔
1402
                    while len(field) == 4:
1✔
1403
                        data_length = struct.unpack('I', sub_fh.read(4))[0]
1✔
1404
                        data_length += data_length % 2  # IFF chunks are padded to an even size
1✔
1405
                        data = sub_fh.read(data_length).split(b'\x00', 1)[0]  # strip zero-byte
1✔
1406
                        fieldname = self._RIFF_MAPPING.get(field)
1✔
1407
                        if fieldname:
1✔
1408
                            value = data.decode('utf-8', 'replace')
1✔
1409
                            if fieldname == 'track':
1✔
1410
                                if value.isdecimal():
1✔
1411
                                    self._set_field(fieldname, int(value))
1✔
1412
                            else:
1413
                                self._set_field(fieldname, value)
1✔
1414
                        field = sub_fh.read(4)
1✔
1415
            elif subchunkid in {b'id3 ', b'ID3 '} and self._parse_tags:
1✔
1416
                id3 = _ID3()
1✔
1417
                id3._filehandler = fh
1✔
1418
                id3._load(tags=True, duration=False, image=self._load_image)
1✔
1419
                self._update(id3)
1✔
1420
            else:  # some other chunk, just skip the data
1421
                fh.seek(subchunksize, 1)
1✔
1422
            chunk_header = fh.read(8)
1✔
1423
        self._tags_parsed = True
1✔
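
# Illustrative example (not part of tinytag): the duration formula applied to the
# 'data' chunk above, with made-up numbers -- a 16-bit stereo 44.1 kHz WAV whose
# data chunk holds 1 764 000 bytes lasts exactly 10 seconds.
def _demo_wave_duration() -> None:
    data_chunk_size, channels, samplerate, bitdepth = 1_764_000, 2, 44_100, 16
    assert data_chunk_size / channels / samplerate / (bitdepth / 8) == 10.0
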
1424

1425

1426
class _Flac(TinyTag):
1✔
1427
    METADATA_STREAMINFO = 0
1✔
1428
    METADATA_PADDING = 1
1✔
1429
    METADATA_APPLICATION = 2
1✔
1430
    METADATA_SEEKTABLE = 3
1✔
1431
    METADATA_VORBIS_COMMENT = 4
1✔
1432
    METADATA_CUESHEET = 5
1✔
1433
    METADATA_PICTURE = 6
1✔
1434

1435
    def _determine_duration(self, fh: BinaryIO) -> None:
1✔
1436
        if not self._tags_parsed:
1✔
1437
            self._parse_tag(fh)
1✔
1438

1439
    def _parse_tag(self, fh: BinaryIO) -> None:
1✔
1440
        id3 = None
1✔
1441
        header = fh.read(4)
1✔
1442
        if header[:3] == b'ID3':  # parse ID3 header if it exists
1✔
1443
            fh.seek(-4, os.SEEK_CUR)
1✔
1444
            id3 = _ID3()
1✔
1445
            id3._filehandler = fh
1✔
1446
            id3._parse_tags = self._parse_tags
1✔
1447
            id3._load_image = self._load_image
1✔
1448
            id3._parse_id3v2(fh)
1✔
1449
            header = fh.read(4)  # after ID3 should be fLaC
1✔
1450
        if header[:4] != b'fLaC':
1✔
1451
            raise ParseError('Invalid FLAC header')
1✔
1452
        # for the spec, see https://xiph.org/flac/ogg_mapping.html
1453
        header_data = fh.read(4)
1✔
1454
        while len(header_data) == 4:
1✔
1455
            meta_header = struct.unpack('B3B', header_data)
1✔
1456
            block_type = meta_header[0] & 0x7f
1✔
1457
            is_last_block = meta_header[0] & 0x80
1✔
1458
            size = self._bytes_to_int(meta_header[1:4])
1✔
1459
            # http://xiph.org/flac/format.html#metadata_block_streaminfo
1460
            if block_type == self.METADATA_STREAMINFO and self._parse_duration:
1✔
1461
                stream_info_header = fh.read(size)
1✔
1462
                if len(stream_info_header) < 34:  # invalid streaminfo
1✔
1463
                    break
1✔
1464
                header_values = struct.unpack('HH3s3s8B16s', stream_info_header)
1✔
1465
                # From the xiph documentation:
1466
                # py | <bits>
1467
                # ----------------------------------------------
1468
                # H  | <16>  The minimum block size (in samples)
1469
                # H  | <16>  The maximum block size (in samples)
1470
                # 3s | <24>  The minimum frame size (in bytes)
1471
                # 3s | <24>  The maximum frame size (in bytes)
1472
                # 8B | <20>  Sample rate in Hz.
1473
                #    | <3>   (number of channels)-1.
1474
                #    | <5>   (bits per sample)-1.
1475
                #    | <36>  Total samples in stream.
1476
                # 16s| <128> MD5 signature
1477
                # min_blk, max_blk, min_frm, max_frm = header[0:4]
1478
                # min_frm = self._bytes_to_int(struct.unpack('3B', min_frm))
1479
                # max_frm = self._bytes_to_int(struct.unpack('3B', max_frm))
1480
                #                 channels--.  bits      total samples
1481
                # |----- samplerate -----| |-||----| |---------~   ~----|
1482
                # 0000 0000 0000 0000 0000 0000 0000 0000 0000      0000
1483
                # #---4---# #---5---# #---6---# #---7---# #--8-~   ~-12-#
1484
                self.samplerate = self._bytes_to_int(header_values[4:7]) >> 4
1✔
1485
                self.channels = ((header_values[6] >> 1) & 0x07) + 1
1✔
1486
                self.bitdepth = (
1✔
1487
                    ((header_values[6] & 1) << 4) + ((header_values[7] & 0xF0) >> 4) + 1)
1488
                total_sample_bytes = ((header_values[7] & 0x0F),) + header_values[8:12]
1✔
1489
                total_samples = self._bytes_to_int(total_sample_bytes)
1✔
1490
                self.duration = total_samples / self.samplerate
1✔
1491
                if self.duration > 0:
1✔
1492
                    self.bitrate = self.filesize / self.duration * 8 / 1000
1✔
1493
            elif block_type == self.METADATA_VORBIS_COMMENT and self._parse_tags:
1✔
1494
                oggtag = _Ogg()
1✔
1495
                oggtag._filehandler = fh
1✔
1496
                oggtag._parse_vorbis_comment(fh)
1✔
1497
                self._update(oggtag)
1✔
1498
            elif block_type == self.METADATA_PICTURE and self._load_image:
1✔
1499
                fieldname, value = self._parse_image(fh)
1✔
1500
                self._set_image_field(fieldname, value)
1✔
1501
            elif block_type >= 127:
1✔
UNCOV
1502
                break  # invalid block type
×
1503
            else:
1504
                if DEBUG:
1✔
1505
                    print('Unknown FLAC block type', block_type)
1✔
1506
                fh.seek(size, 1)  # seek over this block
1✔
1507

1508
            if is_last_block:
1✔
1509
                break
1✔
1510
            header_data = fh.read(4)
1✔
1511
        if id3 is not None:  # apply ID3 tags after vorbis
1✔
1512
            self._update(id3)
1✔
1513
        self._tags_parsed = True
1✔
1514

1515
    @classmethod
1✔
1516
    def _parse_image(cls, fh: BinaryIO) -> tuple[str, TagImage]:
1✔
1517
        # https://xiph.org/flac/format.html#metadata_block_picture
1518
        pic_type, mime_type_len = struct.unpack('>2I', fh.read(8))
1✔
1519
        mime_type = fh.read(mime_type_len).decode('utf-8', 'replace')
1✔
1520
        description_len = struct.unpack('>I', fh.read(4))[0]
1✔
1521
        description = fh.read(description_len).decode('utf-8', 'replace')
1✔
1522
        _width, _height, _depth, _colors, pic_len = struct.unpack('>5I', fh.read(20))
1✔
1523
        return _ID3._create_tag_image(fh.read(pic_len), pic_type, mime_type, description)
1✔
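
# Illustrative example (not part of tinytag): the STREAMINFO bit twiddling done in
# _parse_tag() above, applied to made-up header bytes (header_values[4..7]) that
# describe a 44.1 kHz, 2-channel, 16-bit stream.
def _demo_flac_streaminfo_bits() -> None:
    b4, b5, b6, b7 = 0x0A, 0xC4, 0x42, 0xF0
    samplerate = ((b4 << 16) | (b5 << 8) | b6) >> 4      # top 20 of 24 bits
    channels = ((b6 >> 1) & 0x07) + 1                    # 3 bits, (channels - 1)
    bitdepth = ((b6 & 1) << 4) + ((b7 & 0xF0) >> 4) + 1  # 5 bits, (bits per sample - 1)
    assert (samplerate, channels, bitdepth) == (44100, 2, 16)
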
1524

1525

1526
class _Wma(TinyTag):
1✔
1527
    # see:
1528
    # http://web.archive.org/web/20131203084402/http://msdn.microsoft.com/en-us/library/bb643323.aspx
1529
    # and (Japanese, but nonetheless helpful)
1530
    # http://uguisu.skr.jp/Windows/format_asf.html
1531
    _ASF_MAPPING = {
1✔
1532
        'WM/TrackNumber': 'track',
1533
        'WM/PartOfSet': 'disc',
1534
        'WM/Year': 'year',
1535
        'WM/AlbumArtist': 'albumartist',
1536
        'WM/Genre': 'genre',
1537
        'WM/AlbumTitle': 'album',
1538
        'WM/Composer': 'composer',
1539
        'WM/Publisher': 'extra.publisher',
1540
        'WM/BeatsPerMinute': 'extra.bpm',
1541
        'WM/InitialKey': 'extra.initial_key',
1542
        'WM/Lyrics': 'extra.lyrics',
1543
        'WM/Language': 'extra.language',
1544
        'WM/AuthorURL': 'extra.url',
1545
        'WM/ISRC': 'extra.isrc',
1546
        'WM/Conductor': 'extra.conductor',
1547
        'WM/Writer': 'extra.lyricist',
1548
        'WM/SetSubTitle': 'extra.set_subtitle',
1549
        'WM/EncodedBy': 'extra.encoded_by',
1550
        'WM/EncodingSettings': 'extra.encoder_settings',
1551
        'WM/Media': 'extra.media',
1552
    }
1553
    _ASF_CONTENT_DESCRIPTION_OBJECT = b'3&\xb2u\x8ef\xcf\x11\xa6\xd9\x00\xaa\x00b\xcel'
1✔
1554
    _ASF_EXTENDED_CONTENT_DESCRIPTION_OBJECT = (b'@\xa4\xd0\xd2\x07\xe3\xd2\x11\x97\xf0\x00'
1✔
1555
                                                b'\xa0\xc9^\xa8P')
1556
    _STREAM_BITRATE_PROPERTIES_OBJECT = b'\xceu\xf8{\x8dF\xd1\x11\x8d\x82\x00`\x97\xc9\xa2\xb2'
1✔
1557
    _ASF_FILE_PROPERTY_OBJECT = b'\xa1\xdc\xab\x8cG\xa9\xcf\x11\x8e\xe4\x00\xc0\x0c Se'
1✔
1558
    _ASF_STREAM_PROPERTIES_OBJECT = b'\x91\x07\xdc\xb7\xb7\xa9\xcf\x11\x8e\xe6\x00\xc0\x0c Se'
1✔
1559
    _STREAM_TYPE_ASF_AUDIO_MEDIA = b'@\x9ei\xf8M[\xcf\x11\xa8\xfd\x00\x80_\\D+'
1✔
1560

1561
    def _determine_duration(self, fh: BinaryIO) -> None:
1✔
1562
        if not self._tags_parsed:
1✔
1563
            self._parse_tag(fh)
1✔
1564

1565
    def _decode_string(self, bytestring: bytes) -> str:
1✔
1566
        return self._unpad(bytestring.decode('utf-16', 'replace'))
1✔
1567

1568
    def _decode_ext_desc(self, value_type: int, value: bytes) -> int | str | None:
1✔
1569
        """ decode _ASF_EXTENDED_CONTENT_DESCRIPTION_OBJECT values"""
1570
        if value_type == 0:  # Unicode string
1✔
1571
            return self._decode_string(value)
1✔
1572
        if 1 < value_type < 6:  # DWORD / QWORD / WORD
1✔
1573
            return self._bytes_to_int_le(value)
1✔
UNCOV
1574
        return None
×
1575

1576
    def _parse_tag(self, fh: BinaryIO) -> None:
1✔
1577
        header = fh.read(30)
1✔
1578
        # http://www.garykessler.net/library/file_sigs.html
1579
        # http://web.archive.org/web/20131203084402/http://msdn.microsoft.com/en-us/library/bb643323.aspx#_Toc521913958
1580
        if (header[:16] != b'0&\xb2u\x8ef\xcf\x11\xa6\xd9\x00\xaa\x00b\xcel'  # 128 bit GUID
1✔
1581
                or header[-1:] != b'\x02'):
1582
            raise ParseError('Invalid WMA header')
1✔
1583
        while True:
1✔
1584
            object_id = fh.read(16)
1✔
1585
            object_size = self._bytes_to_int_le(fh.read(8))
1✔
1586
            if object_size == 0 or object_size > self.filesize:
1✔
1587
                break  # invalid object, stop parsing.
1✔
1588
            if object_id == self._ASF_CONTENT_DESCRIPTION_OBJECT and self._parse_tags:
1✔
1589
                title_length = self._bytes_to_int_le(fh.read(2))
1✔
1590
                author_length = self._bytes_to_int_le(fh.read(2))
1✔
1591
                copyright_length = self._bytes_to_int_le(fh.read(2))
1✔
1592
                description_length = self._bytes_to_int_le(fh.read(2))
1✔
1593
                rating_length = self._bytes_to_int_le(fh.read(2))
1✔
1594
                data_blocks = {
1✔
1595
                    'title': title_length,
1596
                    'artist': author_length,
1597
                    'extra.copyright': copyright_length,
1598
                    'comment': description_length,
1599
                    '_rating': rating_length,
1600
                }
1601
                for i_field_name, length in data_blocks.items():
1✔
1602
                    bytestring = fh.read(length)
1✔
1603
                    if not i_field_name.startswith('_'):
1✔
1604
                        self._set_field(i_field_name, self._decode_string(bytestring))
1✔
1605
            elif object_id == self._ASF_EXTENDED_CONTENT_DESCRIPTION_OBJECT and self._parse_tags:
1✔
1606
                # http://web.archive.org/web/20131203084402/http://msdn.microsoft.com/en-us/library/bb643323.aspx#_Toc509555195
1607
                descriptor_count = self._bytes_to_int_le(fh.read(2))
1✔
1608
                for _ in range(descriptor_count):
1✔
1609
                    name_len = self._bytes_to_int_le(fh.read(2))
1✔
1610
                    name = self._decode_string(fh.read(name_len))
1✔
1611
                    value_type = self._bytes_to_int_le(fh.read(2))
1✔
1612
                    value_len = self._bytes_to_int_le(fh.read(2))
1✔
1613
                    if value_type == 1:
1✔
1614
                        fh.seek(value_len, os.SEEK_CUR)  # skip byte values
1✔
1615
                        continue
1✔
1616
                    field_name = self._ASF_MAPPING.get(name)  # try to get normalized field name
1✔
1617
                    if field_name is None:  # custom field
1✔
1618
                        if name.startswith('WM/'):
1✔
1619
                            name = name[3:]
1✔
1620
                        field_name = self._EXTRA_PREFIX + name.lower()
1✔
1621
                    field_value = self._decode_ext_desc(value_type, fh.read(value_len))
1✔
1622
                    if field_value is not None:
1✔
1623
                        if field_name in {'track', 'disc'}:
1✔
1624
                            if isinstance(field_value, int) or field_value.isdecimal():
1✔
1625
                                self._set_field(field_name, int(field_value))
1✔
1626
                        else:
1627
                            self._set_field(field_name, field_value)
1✔
1628
            elif object_id == self._ASF_FILE_PROPERTY_OBJECT and self._parse_duration:
1✔
1629
                fh.seek(40, os.SEEK_CUR)
1✔
1630
                play_duration = self._bytes_to_int_le(fh.read(8)) / 10000000
1✔
1631
                fh.seek(8, os.SEEK_CUR)
1✔
1632
                preroll = self._bytes_to_int_le(fh.read(8)) / 1000
1✔
1633
                fh.seek(16, os.SEEK_CUR)
1✔
1634
                # According to the specification, we need to subtract the preroll from play_duration
1635
                # to get the actual duration of the file
1636
                self.duration = max(play_duration - preroll, 0.0)
1✔
1637
            elif object_id == self._ASF_STREAM_PROPERTIES_OBJECT and self._parse_duration:
1✔
1638
                stream_type = fh.read(16)
1✔
1639
                fh.seek(24, os.SEEK_CUR)  # skip irrelevant fields
1✔
1640
                type_specific_data_length = self._bytes_to_int_le(fh.read(4))
1✔
1641
                error_correction_data_length = self._bytes_to_int_le(fh.read(4))
1✔
1642
                fh.seek(6, os.SEEK_CUR)   # skip irrelevant fields
1✔
1643
                already_read = 0
1✔
1644
                if stream_type == self._STREAM_TYPE_ASF_AUDIO_MEDIA:
1✔
1645
                    codec_id_format_tag = self._bytes_to_int_le(fh.read(2))
1✔
1646
                    self.channels = self._bytes_to_int_le(fh.read(2))
1✔
1647
                    self.samplerate = self._bytes_to_int_le(fh.read(4))
1✔
1648
                    avg_bytes_per_second = self._bytes_to_int_le(fh.read(4))
1✔
1649
                    self.bitrate = avg_bytes_per_second * 8 / 1000
1✔
1650
                    fh.seek(2, os.SEEK_CUR)  # skip irrelevant field
1✔
1651
                    bits_per_sample = self._bytes_to_int_le(fh.read(2))
1✔
1652
                    if codec_id_format_tag == 355:  # lossless
1✔
1653
                        self.bitdepth = bits_per_sample
1✔
1654
                    already_read = 16
1✔
1655
                fh.seek(type_specific_data_length - already_read, os.SEEK_CUR)
1✔
1656
                fh.seek(error_correction_data_length, os.SEEK_CUR)
1✔
1657
            else:
1658
                fh.seek(object_size - 24, os.SEEK_CUR)  # skip over unknown object ids
1✔
1659
        self._tags_parsed = True
1✔
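
# Illustrative example (not part of tinytag): the ASF duration arithmetic used above,
# with made-up numbers. The file property object stores the play duration in 100 ns
# units and the preroll in milliseconds; the preroll is subtracted to get the real
# duration.
def _demo_wma_duration() -> None:
    play_duration = 1_925_000_000 / 10000000  # 100-ns units -> 192.5 seconds
    preroll = 3000 / 1000                     # milliseconds  -> 3.0 seconds
    assert max(play_duration - preroll, 0.0) == 189.5
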
1660

1661

1662
class _Aiff(TinyTag):
1✔
1663
    #
1664
    # AIFF is part of the IFF family of file formats.
1665
    #
1666
    # https://en.wikipedia.org/wiki/Audio_Interchange_File_Format#Data_format
1667
    # https://web.archive.org/web/20171118222232/http://www-mmsp.ece.mcgill.ca/documents/audioformats/aiff/aiff.html
1668
    # https://web.archive.org/web/20071219035740/http://www.cnpbagwell.com/aiff-c.txt
1669
    #
1670
    # A few things about the spec:
1671
    #
1672
    # * IFF strings are not supposed to be null terminated.  They sometimes are.
1673
    # * Some tools might throw more metadata into the ANNO chunk but it is
1674
    #   wildly unreliable to count on it. In fact, the official spec recommends against
1675
    #   using it. That said... this code throws the ANNO field into comment and hopes
1676
    #   for the best.
1677
    #
1678
    # The key thing here is that AIFF metadata is usually in a handful of fields
1679
    # and the rest is an ID3 or XMP field.  XMP is too complicated and only Adobe-related
1680
    # products support it. The vast majority use ID3. As such, this code hands any
1681
    # embedded ID3 chunk to the _ID3 parser, which does everything needed there.
1682
    #
1683

1684
    _AIFF_MAPPING = {
1✔
1685
        #
1686
        # "Name Chunk text contains the name of the sampled sound."
1687
        #
1688
        # "Author Chunk text contains one or more author names.  An author in
1689
        # this case is the creator of a sampled sound."
1690
        #
1691
        # "Annotation Chunk text contains a comment.  Use of this chunk is
1692
        # discouraged within FORM AIFC." Some tools: "hold my beer"
1693
        #
1694
        # "The Copyright Chunk contains a copyright notice for the sound.  text
1695
        #  contains a date followed by the copyright owner.  The chunk ID '[c] '
1696
        # serves as the copyright character. " Some tools: "hold my beer"
1697
        #
1698
        b'NAME': 'title',
1699
        b'AUTH': 'artist',
1700
        b'ANNO': 'comment',
1701
        b'(c) ': 'extra.copyright',
1702
    }
1703

1704
    def _parse_tag(self, fh: BinaryIO) -> None:
1✔
1705
        chunk_id, _size, form = struct.unpack('>4sI4s', fh.read(12))
1✔
1706
        if chunk_id != b'FORM' or form not in (b'AIFC', b'AIFF'):
1✔
1707
            raise ParseError('Invalid AIFF header')
1✔
1708
        chunk_header = fh.read(8)
1✔
1709
        while len(chunk_header) == 8:
1✔
1710
            sub_chunk_id, sub_chunk_size = struct.unpack('>4sI', chunk_header)
1✔
1711
            sub_chunk_size += sub_chunk_size % 2  # IFF chunks are padded to an even number of bytes
1✔
1712
            if sub_chunk_id in self._AIFF_MAPPING and self._parse_tags:
1✔
1713
                value = self._unpad(fh.read(sub_chunk_size).decode('utf-8', 'replace'))
1✔
1714
                self._set_field(self._AIFF_MAPPING[sub_chunk_id], value)
1✔
1715
            elif sub_chunk_id == b'COMM' and self._parse_duration:
1✔
1716
                channels, num_frames, bitdepth = struct.unpack('>hLh', fh.read(8))
1✔
1717
                self.channels, self.bitdepth = channels, bitdepth
1✔
1718
                try:
1✔
1719
                    exponent, mantissa = struct.unpack('>HQ', fh.read(10))   # Extended precision
1✔
1720
                    samplerate = int(mantissa * (2 ** (exponent - 0x3FFF - 63)))
1✔
1721
                    duration = num_frames / samplerate
1✔
1722
                    bitrate = samplerate * channels * bitdepth / 1000
1✔
1723
                    self.samplerate, self.duration, self.bitrate = samplerate, duration, bitrate
1✔
1724
                except OverflowError:
1✔
1725
                    pass
1✔
1726
                fh.seek(sub_chunk_size - 18, 1)  # skip remaining data in chunk
1✔
1727
            elif sub_chunk_id in {b'id3 ', b'ID3 '} and self._parse_tags:
1✔
1728
                id3 = _ID3()
1✔
1729
                id3._filehandler = fh
1✔
1730
                id3._load(tags=True, duration=False, image=self._load_image)
1✔
1731
                self._update(id3)
1✔
1732
            else:  # some other chunk, just skip the data
1733
                fh.seek(sub_chunk_size, 1)
1✔
1734
            chunk_header = fh.read(8)
1✔
1735
        self._tags_parsed = True
1✔
1736

1737
    def _determine_duration(self, fh: BinaryIO) -> None:
1✔
1738
        if not self._tags_parsed:
1✔
1739
            self._parse_tag(fh)
1✔
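
# Illustrative example (not part of tinytag): decoding the 80-bit extended-precision
# sample rate read from the COMM chunk in _parse_tag() above. 44 100 Hz is stored as
# exponent 0x400E with mantissa 0xAC44_0000_0000_0000.
def _demo_aiff_extended_samplerate() -> None:
    exponent, mantissa = 0x400E, 0xAC44_0000_0000_0000
    assert int(mantissa * (2 ** (exponent - 0x3FFF - 63))) == 44100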