devsnd / tinytag, build 9285996632
29 May 2024 12:22PM UTC, coverage: 98.916% (+0.05%) from 98.863%

Pull Request #209 (github / web-flow): Allow reading multiple extra fields of same type
Merge b481a049c into 5b966007c

174 of 176 new or added lines in 4 files covered. (98.86%)

14 existing lines in 1 file now uncovered.

1460 of 1476 relevant lines covered (98.92%)

0.99 hits per line

Source File: /tinytag/tinytag.py (98.59% covered)
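
As quick orientation for the change under test ("Allow reading multiple extra fields of same type"), here is a minimal usage sketch of the API defined in this file; the file path is a placeholder, and which fields appear depends on the tags in the file:

    from tinytag import TinyTag

    tag = TinyTag.get('example.mp3')  # placeholder path; any supported format works
    print(tag.artist, tag.title, tag.duration)

    # Extra fields map to lists of strings, so repeated fields of the same
    # type (for example several URL or ISRC frames) are all kept:
    for name, values in tag.extra.items():
        print(name, values)
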
1
# tinytag - an audio file metadata reader
2
# Copyright (c) 2014-2023 Tom Wallroth
3
# Copyright (c) 2021-2024 Mat (mathiascode)
4
#
5
# Sources on GitHub:
6
# http://github.com/devsnd/tinytag/
7

8
# MIT License
9

10
# Copyright (c) 2014-2024 Tom Wallroth, Mat (mathiascode)
11

12
# Permission is hereby granted, free of charge, to any person obtaining a copy
13
# of this software and associated documentation files (the "Software"), to deal
14
# in the Software without restriction, including without limitation the rights
15
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
16
# copies of the Software, and to permit persons to whom the Software is
17
# furnished to do so, subject to the following conditions:
18

19
# The above copyright notice and this permission notice shall be included in all
20
# copies or substantial portions of the Software.
21

22
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
23
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
24
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
25
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
26
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
27
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
28
# SOFTWARE.
29

30
"""Audio file metadata reader"""
1✔
31

32
# pylint: disable=invalid-name,protected-access
33
# pylint: disable=too-many-lines,too-many-arguments,too-many-boolean-expressions
34
# pylint: disable=too-many-branches,too-many-instance-attributes,too-many-locals
35
# pylint: disable=too-many-nested-blocks,too-many-statements,too-few-public-methods
36

37

38
from __future__ import annotations
1✔
39
from collections.abc import Callable, Iterator
1✔
40
from functools import reduce
1✔
41
from os import PathLike
1✔
42
from sys import stderr
1✔
43
from typing import Any, BinaryIO
1✔
44
from warnings import warn
1✔
45

46
import base64
1✔
47
import io
1✔
48
import os
1✔
49
import re
1✔
50
import struct
1✔
51

52

53
DEBUG = bool(os.environ.get('TINYTAG_DEBUG'))  # some of the parsers can print debug info
1✔
54

55

56
class TinyTagException(Exception):
1✔
57
    """Base class for exceptions."""
1✔
58

59

60
class ParseError(TinyTagException):
1✔
61
    """Parsing an audio file failed."""
1✔
62

63

64
class UnsupportedFormatError(TinyTagException):
1✔
65
    """File format is not supported."""
1✔
66

67

68
class TinyTag:
1✔
69
    """A class containing audio file metadata."""
1✔
70

71
    SUPPORTED_FILE_EXTENSIONS = (
1✔
72
        '.mp1', '.mp2', '.mp3',
73
        '.oga', '.ogg', '.opus', '.spx',
74
        '.wav', '.flac', '.wma',
75
        '.m4b', '.m4a', '.m4r', '.m4v', '.mp4', '.aax', '.aaxc',
76
        '.aiff', '.aifc', '.aif', '.afc'
77
    )
78
    _EXTRA_PREFIX = 'extra.'
1✔
79
    _file_extension_mapping: dict[tuple[str, ...], type[TinyTag]] | None = None
1✔
80
    _magic_bytes_mapping: dict[bytes, type[TinyTag]] | None = None
1✔
81

82
    def __init__(self) -> None:
1✔
83
        self.filename: bytes | str | PathLike[Any] | None = None
1✔
84
        self.filesize = 0
1✔
85
        self.duration: float | None = None
1✔
86
        self.channels: int | None = None
1✔
87
        self.bitrate: float | None = None
1✔
88
        self.bitdepth: int | None = None
1✔
89
        self.samplerate: int | None = None
1✔
90
        self.artist: str | None = None
1✔
91
        self.albumartist: str | None = None
1✔
92
        self.composer: str | None = None
1✔
93
        self.album: str | None = None
1✔
94
        self.disc: int | None = None
1✔
95
        self.disc_total: int | None = None
1✔
96
        self.title: str | None = None
1✔
97
        self.track: int | None = None
1✔
98
        self.track_total: int | None = None
1✔
99
        self.genre: str | None = None
1✔
100
        self.year: str | None = None
1✔
101
        self.comment: str | None = None
1✔
102
        self.extra: dict[str, list[str]] = {}
1✔
103
        self.images = TagImages()
1✔
104
        self._filehandler: BinaryIO | None = None
1✔
105
        self._default_encoding: str | None = None  # allow override for some file formats
1✔
106
        self._parse_duration = True
1✔
107
        self._parse_tags = True
1✔
108
        self._load_image = False
1✔
109
        self._tags_parsed = False
1✔
110

111
    def __repr__(self) -> str:
1✔
112
        return str(self.as_dict(flatten=False))
1✔
113

114
    @classmethod
1✔
115
    def get(cls,
1✔
116
            filename: bytes | str | PathLike[Any] | None = None,
117
            tags: bool = True,
118
            duration: bool = True,
119
            image: bool = False,
120
            encoding: str | None = None,
121
            file_obj: BinaryIO | None = None,
122
            **kwargs: Any) -> TinyTag:
123
        """Return a tag object for an audio file."""
124
        should_close_file = file_obj is None
1✔
125
        if filename and should_close_file:
1✔
126
            file_obj = open(filename, 'rb')  # pylint: disable=consider-using-with
1✔
127
        if file_obj is None:
1✔
128
            raise ValueError('Either filename or file_obj argument is required')
1✔
129
        if 'ignore_errors' in kwargs:
1✔
130
            warn('ignore_errors argument is obsolete, and will be removed in a future '
1✔
131
                 '2.x release', DeprecationWarning, stacklevel=2)
132
        try:
1✔
133
            file_obj.seek(0, os.SEEK_END)
1✔
134
            filesize = file_obj.tell()
1✔
135
            file_obj.seek(0)
1✔
136
            parser_class = cls._get_parser_class(filename, file_obj)
1✔
137
            tag = parser_class()
1✔
138
            tag._filehandler = file_obj
1✔
139
            tag._default_encoding = encoding
1✔
140
            tag.filename = filename
1✔
141
            tag.filesize = filesize
1✔
142
            if filesize > 0:
1✔
143
                try:
1✔
144
                    tag._load(tags=tags, duration=duration, image=image)
1✔
145
                except Exception as exc:
1✔
146
                    raise ParseError(exc) from exc
1✔
147
            return tag
1✔
148
        finally:
149
            if should_close_file:
1✔
150
                file_obj.close()
1✔
151

152
    @classmethod
1✔
153
    def is_supported(cls, filename: bytes | str | PathLike[Any]) -> bool:
1✔
154
        """Check if a specific file is supported based on its file extension."""
155
        return cls._get_parser_for_filename(filename) is not None
1✔
156

157
    def as_dict(self, flatten: bool = True) -> dict[
1✔
158
        str,
159
        str | int | float | list[str | TagImage] | dict[str, list[str | TagImage]]
160
    ]:
161
        """Return a dictionary representation of the tag."""
162
        fields: dict[
1✔
163
            str,
164
            str | int | float | list[str | TagImage] | dict[str, list[str | TagImage]]
165
        ] = {}
166
        for key, value in self.__dict__.items():
1✔
167
            if key.startswith('_'):
1✔
168
                continue
1✔
169
            if flatten and key == 'extra':
1✔
170
                for extra_key, extra_values in value.items():
1✔
171
                    if extra_key in fields:
1✔
172
                        fields[extra_key] += extra_values
1✔
173
                    else:
NEW
174
                        fields[extra_key] = extra_values
×
175
                continue
1✔
176
            if key == 'images':
1✔
177
                value = value.as_dict(flatten)
1✔
178
            if value is None:
1✔
179
                continue
1✔
180
            if flatten and key != 'filename' and isinstance(value, str):
1✔
181
                fields[key] = [value]
1✔
182
            else:
183
                fields[key] = value
1✔
184
        return fields
1✔
185

186
    @classmethod
1✔
187
    def _get_parser_for_filename(
1✔
188
            cls, filename: bytes | str | PathLike[Any]) -> type[TinyTag] | None:
189
        if cls._file_extension_mapping is None:
1✔
190
            cls._file_extension_mapping = {
1✔
191
                ('.mp1', '.mp2', '.mp3'): _ID3,
192
                ('.oga', '.ogg', '.opus', '.spx'): _Ogg,
193
                ('.wav',): _Wave,
194
                ('.flac',): _Flac,
195
                ('.wma',): _Wma,
196
                ('.m4b', '.m4a', '.m4r', '.m4v', '.mp4', '.aax', '.aaxc'): _MP4,
197
                ('.aiff', '.aifc', '.aif', '.afc'): _Aiff,
198
            }
199
        filename = os.fsdecode(filename).lower()
1✔
200
        for ext, tagclass in cls._file_extension_mapping.items():
1✔
201
            if filename.endswith(ext):
1✔
202
                return tagclass
1✔
203
        return None
1✔
204

205
    @classmethod
1✔
206
    def _get_parser_for_file_handle(cls, fh: BinaryIO) -> type[TinyTag] | None:
1✔
207
        # https://en.wikipedia.org/wiki/List_of_file_signatures
208
        if cls._magic_bytes_mapping is None:
1✔
209
            cls._magic_bytes_mapping = {
1✔
210
                b'^ID3': _ID3,
211
                b'^\xff\xfb': _ID3,
212
                b'^OggS.........................FLAC': _Ogg,
213
                b'^OggS........................Opus': _Ogg,
214
                b'^OggS........................Speex': _Ogg,
215
                b'^OggS.........................vorbis': _Ogg,
216
                b'^RIFF....WAVE': _Wave,
217
                b'^fLaC': _Flac,
218
                b'^\x30\x26\xB2\x75\x8E\x66\xCF\x11\xA6\xD9\x00\xAA\x00\x62\xCE\x6C': _Wma,
219
                b'....ftypM4A': _MP4,  # https://www.file-recovery.com/m4a-signature-format.htm
220
                b'....ftypaax': _MP4,  # Audible proprietary M4A container
221
                b'....ftypaaxc': _MP4,  # Audible proprietary M4A container
222
                b'\xff\xf1': _MP4,  # https://www.garykessler.net/library/file_sigs.html
223
                b'^FORM....AIFF': _Aiff,
224
                b'^FORM....AIFC': _Aiff,
225
            }
226
        header = fh.read(max(len(sig) for sig in cls._magic_bytes_mapping))
1✔
227
        fh.seek(0)
1✔
228
        for magic, parser in cls._magic_bytes_mapping.items():
1✔
229
            if re.match(magic, header):
1✔
230
                return parser
1✔
231
        return None
1✔
232

233
    @classmethod
1✔
234
    def _get_parser_class(cls, filename: bytes | str | PathLike[Any] | None = None,
1✔
235
                          filehandle: BinaryIO | None = None) -> type[TinyTag]:
236
        if cls != TinyTag:  # if `get` is invoked on TinyTag, find parser by ext
1✔
237
            return cls  # otherwise use the class on which `get` was invoked
1✔
238
        if filename:
1✔
239
            parser_class = cls._get_parser_for_filename(filename)
1✔
240
            if parser_class is not None:
1✔
241
                return parser_class
1✔
242
        # try determining the file type by magic byte header
243
        if filehandle:
1✔
244
            parser_class = cls._get_parser_for_file_handle(filehandle)
1✔
245
            if parser_class is not None:
1✔
246
                return parser_class
1✔
247
        raise UnsupportedFormatError('No tag reader found to support file type')
1✔
248
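    # Illustrative note (assumed example, not part of the original source):
    # TinyTag.get('song.flac') dispatches to the _Flac parser through the
    # extension mapping above, while calling _Flac.get(...) directly skips
    # detection because _get_parser_class() returns cls when cls != TinyTag.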

249
    def _load(self, tags: bool, duration: bool, image: bool = False) -> None:
1✔
250
        self._parse_tags = tags
1✔
251
        self._parse_duration = duration
1✔
252
        self._load_image = image
1✔
253
        if self._filehandler is None:
1✔
254
            return
1✔
255
        if tags:
1✔
256
            self._parse_tag(self._filehandler)
1✔
257
        if duration:
1✔
258
            if tags:  # rewind file if the tags were already parsed
1✔
259
                self._filehandler.seek(0)
1✔
260
            self._determine_duration(self._filehandler)
1✔
261

262
    def _set_field(self, fieldname: str, value: str | int | float,
1✔
263
                   check_conflict: bool = True) -> None:
264
        if fieldname.startswith(self._EXTRA_PREFIX):
1✔
265
            fieldname = fieldname[len(self._EXTRA_PREFIX):]
1✔
266
            if check_conflict and fieldname in self.__dict__:
1✔
267
                fieldname = '_' + fieldname
1✔
268
            extra_values = self.extra.get(fieldname, [])
1✔
269
            if not isinstance(value, str) or value in extra_values:
1✔
270
                return
1✔
271
            extra_values.append(value)
1✔
272
            if DEBUG:
1✔
273
                print(f'Setting extra field "{fieldname}" to "{extra_values!r}"')
1✔
274
            self.extra[fieldname] = extra_values
1✔
275
            return
1✔
276
        old_value = self.__dict__.get(fieldname)
1✔
277
        new_value = value
1✔
278
        if isinstance(new_value, str):
1✔
279
            # First value goes in tag, others in tag.extra
280
            values = new_value.split('\x00')
1✔
281
            for index, i_value in enumerate(values):
1✔
282
                if index or old_value and i_value != old_value:
1✔
283
                    self._set_field(self._EXTRA_PREFIX + fieldname, i_value, check_conflict=False)
1✔
284
                    continue
1✔
285
                new_value = i_value
1✔
286
            if old_value:
1✔
287
                return
1✔
288
        elif not new_value and old_value:
1✔
289
            # Prioritize non-zero integer values
290
            return
1✔
291
        if DEBUG:
1✔
292
            print(f'Setting field "{fieldname}" to "{new_value!r}"')
1✔
293
        self.__dict__[fieldname] = new_value
1✔
294
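    # Illustrative example (assumed, not from the original source): with the
    # logic above, _set_field('genre', 'Rock\x00Pop') stores 'Rock' in
    # self.genre and appends 'Pop' to self.extra['genre'], so repeated values
    # of the same field type are preserved rather than overwritten.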

295
    def _determine_duration(self, fh: BinaryIO) -> None:
1✔
296
        raise NotImplementedError
1✔
297

298
    def _parse_tag(self, fh: BinaryIO) -> None:
1✔
299
        raise NotImplementedError
1✔
300

301
    def _update(self, other: TinyTag) -> None:
1✔
302
        # update the values of this tag with the values from another tag
303
        for key, value in other.as_dict(flatten=False).items():
1✔
304
            if isinstance(value, dict):
1✔
305
                if key != 'extra':
1✔
306
                    continue
1✔
307
                for extra_key, extra_values in value.items():
1✔
308
                    for extra_value in extra_values:
1✔
309
                        if isinstance(extra_value, str):
1✔
310
                            self._set_field(
1✔
311
                                self._EXTRA_PREFIX + extra_key, extra_value, check_conflict=False)
312
                continue
1✔
313
            if value is not None and not isinstance(value, list):
1✔
314
                self._set_field(key, value)
1✔
315
        self.images._update(other.images)
1✔
316

317
    @staticmethod
1✔
318
    def _bytes_to_int_le(b: bytes) -> int:
1✔
319
        fmt = {1: '<B', 2: '<H', 4: '<I', 8: '<Q'}.get(len(b))
1✔
320
        result: int = struct.unpack(fmt, b)[0] if fmt is not None else 0
1✔
321
        return result
1✔
322

323
    @staticmethod
1✔
324
    def _bytes_to_int(b: tuple[int, ...]) -> int:
1✔
325
        return reduce(lambda accu, elem: (accu << 8) + elem, b, 0)
1✔
326

327
    @staticmethod
1✔
328
    def _unpad(s: str) -> str:
1✔
329
        # strings in mp3 and asf *may* be terminated with a zero byte at the end
330
        return s.strip('\x00')
1✔
331

332
    def get_image(self) -> bytes | None:
1✔
333
        """Deprecated, use images.any instead."""
334
        warn('get_image() is deprecated, and will be removed in a future 2.x release. '
1✔
335
             'Use images.any instead.', DeprecationWarning, stacklevel=2)
336
        image = self.images.any
1✔
337
        return image.data if image is not None else None
1✔
338

339
    @property
1✔
340
    def audio_offset(self) -> None:
1✔
341
        """Obsolete."""
342
        warn('audio_offset attribute is obsolete, and will be '
1✔
343
             'removed in a future 2.x release', DeprecationWarning, stacklevel=2)
344

345

346
class TagImages:
1✔
347
    """A class containing images embedded in an audio file."""
1✔
348
    _EXTRA_PREFIX = 'extra.'
1✔
349

350
    def __init__(self) -> None:
1✔
351
        self.front_cover: list[TagImage] = []
1✔
352
        self.back_cover: list[TagImage] = []
1✔
353
        self.leaflet: list[TagImage] = []
1✔
354
        self.media: list[TagImage] = []
1✔
355
        self.other: list[TagImage] = []
1✔
356
        self.extra: dict[str, list[TagImage]] = {}
1✔
357

358
    def __repr__(self) -> str:
1✔
359
        return str(self.as_dict(flatten=False))
1✔
360

361
    @property
1✔
362
    def any(self) -> TagImage | None:
1✔
363
        """Return a cover image.
364
        If not present, fall back to any other available image.
365
        """
366
        for image_list in self.as_dict(flatten=True).values():
1✔
367
            for image in image_list:
1✔
368
                return image
1✔
369
        return None
1✔
370

371
    def as_dict(self, flatten: bool = True) -> dict[str, list[TagImage]]:
1✔
372
        """Return a dictionary representation of the tag images."""
373
        images: dict[str, list[TagImage]] = {}
1✔
374
        for key, value in self.__dict__.items():
1✔
375
            if flatten and key == 'extra':
1✔
376
                for extra_key, extra_values in value.items():
1✔
377
                    if extra_key in images:
1✔
NEW
378
                        images[extra_key] += extra_values
×
379
                    else:
380
                        images[extra_key] = extra_values
1✔
381
                continue
1✔
382
            if value or key == 'extra':
1✔
383
                images[key] = value
1✔
384
        return images
1✔
385

386
    def _set_field(self, fieldname: str, value: TagImage) -> None:
1✔
387
        write_dest = self.__dict__
1✔
388
        if fieldname.startswith(self._EXTRA_PREFIX):
1✔
389
            fieldname = fieldname[len(self._EXTRA_PREFIX):]
1✔
390
            write_dest = self.extra
1✔
391
        old_values = write_dest.get(fieldname)
1✔
392
        values = [value]
1✔
393
        if old_values is not None:
1✔
394
            values = old_values + values
1✔
395
        if DEBUG:
1✔
396
            print(f'Setting image field "{fieldname}"')
1✔
397
        write_dest[fieldname] = values
1✔
398

399
    def _update(self, other: TagImages) -> None:
1✔
400
        for key, value in other.as_dict(flatten=False).items():
1✔
401
            if isinstance(value, dict):
1✔
402
                for extra_key, extra_values in value.items():
1✔
403
                    for image_extra in extra_values:
1✔
404
                        self._set_field(self._EXTRA_PREFIX + extra_key, image_extra)
1✔
405
                continue
1✔
406
            for image in value:
1✔
407
                self._set_field(key, image)
1✔
408

409

410
class TagImage:
1✔
411
    """A class representing an image embedded in an audio file."""
1✔
412
    def __init__(self, name: str, data: bytes, mime_type: str | None = None) -> None:
1✔
413
        self.name = name
1✔
414
        self.data = data
1✔
415
        self.mime_type = mime_type
1✔
416
        self.description: str | None = None
1✔
417

418
    def __repr__(self) -> str:
1✔
419
        variables = vars(self).copy()
1✔
420
        data = variables.get("data")
1✔
421
        if data is not None:
1✔
422
            variables["data"] = (data[:45] + b'..') if len(data) > 45 else data
1✔
423
        return str(variables)
1✔
424

425

426
class _MP4(TinyTag):
1✔
427
    # https://developer.apple.com/library/mac/documentation/QuickTime/QTFF/Metadata/Metadata.html
428
    # https://developer.apple.com/library/mac/documentation/QuickTime/QTFF/QTFFChap2/qtff2.html
429

430
    class _Parser:
1✔
431
        atom_decoder_by_type: dict[
1✔
432
            int, Callable[[bytes], int | str | bytes | TagImage]] | None = None
433
        _CUSTOM_FIELD_NAME_MAPPING = {
1✔
434
            'artists': 'artist',
435
            'conductor': 'extra.conductor',
436
            'discsubtitle': 'extra.set_subtitle',
437
            'initialkey': 'extra.initial_key',
438
            'isrc': 'extra.isrc',
439
            'language': 'extra.language',
440
            'lyricist': 'extra.lyricist',
441
            'media': 'extra.media',
442
            'website': 'extra.url',
443
            'originaldate': 'extra.original_date',
444
            'originalyear': 'extra.original_year',
445
            'license': 'extra.license',
446
            'barcode': 'extra.barcode',
447
            'catalognumber': 'extra.catalog_number',
448
        }
449

450
        @classmethod
1✔
451
        def _unpack_integer(cls, value: bytes, signed: bool = True) -> str:
1✔
452
            value_length = len(value)
1✔
453
            result = -1
1✔
454
            if value_length == 1:
1✔
UNCOV
455
                result = struct.unpack('>b' if signed else '>B', value)[0]
×
456
            elif value_length == 2:
1✔
457
                result = struct.unpack('>h' if signed else '>H', value)[0]
1✔
458
            elif value_length == 4:
1✔
459
                result = struct.unpack('>i' if signed else '>I', value)[0]
1✔
460
            elif value_length == 8:
1✔
461
                result = struct.unpack('>q' if signed else '>Q', value)[0]
1✔
462
            return str(result)
1✔
463

464
        @classmethod
1✔
465
        def _unpack_integer_unsigned(cls, value: bytes) -> str:
1✔
UNCOV
466
            return cls._unpack_integer(value, signed=False)
×
467

468
        @classmethod
1✔
469
        def _make_data_atom_parser(
1✔
470
                cls, fieldname: str) -> Callable[[bytes], dict[str, int | str | bytes | TagImage]]:
471
            def _parse_data_atom(data_atom: bytes) -> dict[str, int | str | bytes | TagImage]:
1✔
472
                data_type = struct.unpack('>I', data_atom[:4])[0]
1✔
473
                if cls.atom_decoder_by_type is None:
1✔
474
                    # https://developer.apple.com/library/mac/documentation/QuickTime/QTFF/Metadata/Metadata.html#//apple_ref/doc/uid/TP40000939-CH1-SW34
475
                    cls.atom_decoder_by_type = {
1✔
476
                        # 0: 'reserved'
477
                        1: lambda x: x.decode('utf-8', 'replace'),   # UTF-8
478
                        2: lambda x: x.decode('utf-16', 'replace'),  # UTF-16
479
                        3: lambda x: x.decode('shift_jis', 'replace'),  # S/JIS
480
                        # 16: duration in millis
481
                        13: lambda x: TagImage('front_cover', x, 'image/jpeg'),  # JPEG
482
                        14: lambda x: TagImage('front_cover', x, 'image/png'),   # PNG
483
                        21: cls._unpack_integer,                    # BE Signed int
484
                        22: cls._unpack_integer_unsigned,           # BE Unsigned int
485
                        # 23: lambda x: struct.unpack('>f', x)[0],  # BE Float32
486
                        # 24: lambda x: struct.unpack('>d', x)[0],  # BE Float64
487
                        # 27: lambda x: x,                          # BMP
488
                        # 28: lambda x: x,                          # QuickTime Metadata atom
489
                        65: cls._unpack_integer,                    # 8-bit Signed int
490
                        66: cls._unpack_integer,                    # BE 16-bit Signed int
491
                        67: cls._unpack_integer,                    # BE 32-bit Signed int
492
                        74: cls._unpack_integer,                    # BE 64-bit Signed int
493
                        75: cls._unpack_integer_unsigned,           # 8-bit Unsigned int
494
                        76: cls._unpack_integer_unsigned,           # BE 16-bit Unsigned int
495
                        77: cls._unpack_integer_unsigned,           # BE 32-bit Unsigned int
496
                        78: cls._unpack_integer_unsigned,           # BE 64-bit Unsigned int
497
                    }
498
                conversion = cls.atom_decoder_by_type.get(data_type)
1✔
499
                if conversion is None:
1✔
500
                    if DEBUG:
1✔
501
                        print(f'Cannot convert data type: {data_type}', file=stderr)
1✔
502
                    return {}  # don't know how to convert data atom
1✔
503
                # skip header & null-bytes, convert rest
504
                return {fieldname: conversion(data_atom[8:])}
1✔
505
            return _parse_data_atom
1✔
506

507
        @classmethod
1✔
508
        def _make_number_parser(
1✔
509
                cls, fieldname1: str, fieldname2: str) -> Callable[[bytes], dict[str, int]]:
510
            def _(data_atom: bytes) -> dict[str, int]:
1✔
511
                number_data = data_atom[8:14]
1✔
512
                numbers = struct.unpack('>HHH', number_data)
1✔
513
                # for some reason the first number is always irrelevant.
514
                return {fieldname1: numbers[1], fieldname2: numbers[2]}
1✔
515
            return _
1✔
516

517
        @classmethod
1✔
518
        def _parse_id3v1_genre(cls, data_atom: bytes) -> dict[str, str]:
1✔
519
            # dunno why the genre is offset by -1 but that's how mutagen does it
520
            idx = struct.unpack('>H', data_atom[8:])[0] - 1
1✔
521
            result = {}
1✔
522
            if idx < len(_ID3._ID3V1_GENRES):
1✔
523
                result['genre'] = _ID3._ID3V1_GENRES[idx]
1✔
524
            return result
1✔
525

526
        @classmethod
1✔
527
        def _read_extended_descriptor(cls, esds_atom: BinaryIO) -> None:
1✔
528
            for _i in range(4):
1✔
529
                if esds_atom.read(1) != b'\x80':
1✔
530
                    break
1✔
531

532
        @classmethod
1✔
533
        def _parse_custom_field(cls, data: bytes) -> dict[str, int | str | bytes | TagImage]:
1✔
534
            fh = io.BytesIO(data)
1✔
535
            header_size = 8
1✔
536
            field_name = None
1✔
537
            data_atom = b''
1✔
538
            atom_header = fh.read(header_size)
1✔
539
            while len(atom_header) == header_size:
1✔
540
                atom_size = struct.unpack('>I', atom_header[:4])[0] - header_size
1✔
541
                atom_type = atom_header[4:]
1✔
542
                if atom_type == b'name':
1✔
543
                    atom_value = fh.read(atom_size)[4:].lower()
1✔
544
                    field_name = atom_value.decode('utf-8', 'replace')
1✔
545
                    field_name = cls._CUSTOM_FIELD_NAME_MAPPING.get(
1✔
546
                        field_name, TinyTag._EXTRA_PREFIX + field_name)
547
                elif atom_type == b'data':
1✔
548
                    data_atom = fh.read(atom_size)
1✔
549
                else:
550
                    fh.seek(atom_size, os.SEEK_CUR)
1✔
551
                atom_header = fh.read(header_size)  # read next atom
1✔
552
            if len(data_atom) < 8 or field_name is None:
1✔
553
                return {}
1✔
554
            parser = cls._make_data_atom_parser(field_name)
1✔
555
            return parser(data_atom)
1✔
556

557
        @classmethod
1✔
558
        def _parse_audio_sample_entry_mp4a(cls, data: bytes) -> dict[str, int]:
1✔
559
            # this atom also contains the esds atom:
560
            # https://ffmpeg.org/doxygen/0.6/mov_8c-source.html
561
            # http://xhelmboyx.tripod.com/formats/mp4-layout.txt
562
            # http://sasperger.tistory.com/103
563
            datafh = io.BytesIO(data)
1✔
564
            datafh.seek(16, os.SEEK_CUR)  # jump over version and flags
1✔
565
            channels = struct.unpack('>H', datafh.read(2))[0]
1✔
566
            datafh.seek(2, os.SEEK_CUR)   # jump over bit_depth
1✔
567
            datafh.seek(2, os.SEEK_CUR)   # jump over QT compr id & pkt size
1✔
568
            sr = struct.unpack('>I', datafh.read(4))[0]
1✔
569

570
            # ES Description Atom
571
            esds_atom_size = struct.unpack('>I', data[28:32])[0]
1✔
572
            esds_atom = io.BytesIO(data[36:36 + esds_atom_size])
1✔
573
            esds_atom.seek(5, os.SEEK_CUR)   # jump over version, flags and tag
1✔
574

575
            # ES Descriptor
576
            cls._read_extended_descriptor(esds_atom)
1✔
577
            esds_atom.seek(4, os.SEEK_CUR)   # jump over ES id, flags and tag
1✔
578

579
            # Decoder Config Descriptor
580
            cls._read_extended_descriptor(esds_atom)
1✔
581
            esds_atom.seek(9, os.SEEK_CUR)
1✔
582
            avg_br = struct.unpack('>I', esds_atom.read(4))[0] / 1000  # kbit/s
1✔
583
            return {'channels': channels, 'samplerate': sr, 'bitrate': avg_br}
1✔
584

585
        @classmethod
1✔
586
        def _parse_audio_sample_entry_alac(cls, data: bytes) -> dict[str, int]:
1✔
587
            # https://github.com/macosforge/alac/blob/master/ALACMagicCookieDescription.txt
588
            alac_atom_size = struct.unpack('>I', data[28:32])[0]
1✔
589
            alac_atom = io.BytesIO(data[36:36 + alac_atom_size])
1✔
590
            alac_atom.seek(9, os.SEEK_CUR)
1✔
591
            bitdepth = struct.unpack('b', alac_atom.read(1))[0]
1✔
592
            alac_atom.seek(3, os.SEEK_CUR)
1✔
593
            channels = struct.unpack('b', alac_atom.read(1))[0]
1✔
594
            alac_atom.seek(6, os.SEEK_CUR)
1✔
595
            avg_br = struct.unpack('>I', alac_atom.read(4))[0] / 1000  # kbit/s
1✔
596
            sr = struct.unpack('>I', alac_atom.read(4))[0]
1✔
597
            return {'channels': channels, 'samplerate': sr, 'bitrate': avg_br, 'bitdepth': bitdepth}
1✔
598

599
        @classmethod
1✔
600
        def _parse_mvhd(cls, data: bytes) -> dict[str, float]:
1✔
601
            # http://stackoverflow.com/a/3639993/1191373
602
            walker = io.BytesIO(data)
1✔
603
            version = struct.unpack('b', walker.read(1))[0]
1✔
604
            walker.seek(3, os.SEEK_CUR)  # jump over flags
1✔
605
            if version == 0:  # uses 32 bit integers for timestamps
1✔
606
                walker.seek(8, os.SEEK_CUR)  # jump over create & mod times
1✔
607
                time_scale = struct.unpack('>I', walker.read(4))[0]
1✔
608
                duration = struct.unpack('>I', walker.read(4))[0]
1✔
609
            else:  # version == 1:  # uses 64 bit integers for timestamps
UNCOV
610
                walker.seek(16, os.SEEK_CUR)  # jump over create & mod times
×
UNCOV
611
                time_scale = struct.unpack('>I', walker.read(4))[0]
×
UNCOV
612
                duration = struct.unpack('>q', walker.read(8))[0]
×
613
            return {'duration': duration / time_scale}
1✔
614

615
    # The parser tree: Each key is an atom name which is traversed if existing.
616
    # Leaves of the parser tree are callables which receive the atom data.
617
    # Callables return {fieldname: value} dicts, which update the TinyTag.
618
    _META_DATA_TREE = {b'moov': {b'udta': {b'meta': {b'ilst': {
1✔
619
        # see: http://atomicparsley.sourceforge.net/mpeg-4files.html
620
        # and: https://metacpan.org/dist/Image-ExifTool/source/lib/Image/ExifTool/QuickTime.pm#L3093
621
        b'\xa9ART': {b'data': _Parser._make_data_atom_parser('artist')},
622
        b'\xa9alb': {b'data': _Parser._make_data_atom_parser('album')},
623
        b'\xa9cmt': {b'data': _Parser._make_data_atom_parser('comment')},
624
        b'\xa9con': {b'data': _Parser._make_data_atom_parser('extra.conductor')},
625
        # need test-data for this
626
        # b'cpil':   {b'data': _Parser._make_data_atom_parser('extra.compilation')},
627
        b'\xa9day': {b'data': _Parser._make_data_atom_parser('year')},
628
        b'\xa9des': {b'data': _Parser._make_data_atom_parser('extra.description')},
629
        b'\xa9dir': {b'data': _Parser._make_data_atom_parser('extra.director')},
630
        b'\xa9gen': {b'data': _Parser._make_data_atom_parser('genre')},
631
        b'\xa9lyr': {b'data': _Parser._make_data_atom_parser('extra.lyrics')},
632
        b'\xa9mvn': {b'data': _Parser._make_data_atom_parser('movement')},
633
        b'\xa9nam': {b'data': _Parser._make_data_atom_parser('title')},
634
        b'\xa9pub': {b'data': _Parser._make_data_atom_parser('extra.publisher')},
635
        b'\xa9too': {b'data': _Parser._make_data_atom_parser('extra.encoded_by')},
636
        b'\xa9wrt': {b'data': _Parser._make_data_atom_parser('composer')},
637
        b'aART': {b'data': _Parser._make_data_atom_parser('albumartist')},
638
        b'cprt': {b'data': _Parser._make_data_atom_parser('extra.copyright')},
639
        b'desc': {b'data': _Parser._make_data_atom_parser('extra.description')},
640
        b'disk': {b'data': _Parser._make_number_parser('disc', 'disc_total')},
641
        b'gnre': {b'data': _Parser._parse_id3v1_genre},
642
        b'trkn': {b'data': _Parser._make_number_parser('track', 'track_total')},
643
        b'tmpo': {b'data': _Parser._make_data_atom_parser('extra.bpm')},
644
        b'covr': {b'data': _Parser._make_data_atom_parser('images.front_cover')},
645
        b'----': _Parser._parse_custom_field,
646
    }}}}}
647

648
    # see: https://developer.apple.com/library/mac/documentation/QuickTime/QTFF/QTFFChap3/qtff3.html
649
    _AUDIO_DATA_TREE = {
1✔
650
        b'moov': {
651
            b'mvhd': _Parser._parse_mvhd,
652
            b'trak': {b'mdia': {b"minf": {b"stbl": {b"stsd": {
653
                b'mp4a': _Parser._parse_audio_sample_entry_mp4a,
654
                b'alac': _Parser._parse_audio_sample_entry_alac
655
            }}}}}
656
        }
657
    }
658

659
    _VERSIONED_ATOMS = {b'meta', b'stsd'}  # those have an extra 4 byte header
1✔
660
    _FLAGGED_ATOMS = {b'stsd'}  # these also have an extra 4 byte header
1✔
661

662
    def _determine_duration(self, fh: BinaryIO) -> None:
1✔
663
        self._traverse_atoms(fh, path=self._AUDIO_DATA_TREE)
1✔
664

665
    def _parse_tag(self, fh: BinaryIO) -> None:
1✔
666
        self._traverse_atoms(fh, path=self._META_DATA_TREE)
1✔
667

668
    def _traverse_atoms(self, fh: BinaryIO, path: dict[bytes, Any],
1✔
669
                        stop_pos: int | None = None,
670
                        curr_path: list[bytes] | None = None) -> None:
671
        header_size = 8
1✔
672
        atom_header = fh.read(header_size)
1✔
673
        while len(atom_header) == header_size:
1✔
674
            atom_size = struct.unpack('>I', atom_header[:4])[0] - header_size
1✔
675
            atom_type = atom_header[4:]
1✔
676
            if curr_path is None:  # keep track how we traversed in the tree
1✔
677
                curr_path = [atom_type]
1✔
678
            if atom_size <= 0:  # empty atom, jump to next one
1✔
679
                atom_header = fh.read(header_size)
1✔
680
                continue
1✔
681
            if DEBUG:
1✔
682
                print(f'{" " * 4 * len(curr_path)} pos: {fh.tell() - header_size} '
1✔
683
                      f'atom: {atom_type!r} len: {atom_size + header_size}')
684
            if atom_type in self._VERSIONED_ATOMS:  # jump atom version for now
1✔
685
                fh.seek(4, os.SEEK_CUR)
1✔
686
            if atom_type in self._FLAGGED_ATOMS:  # jump atom flags for now
1✔
687
                fh.seek(4, os.SEEK_CUR)
1✔
688
            sub_path = path.get(atom_type, None)
1✔
689
            # if the path leaf is a dict, traverse deeper into the tree:
690
            if isinstance(sub_path, dict):
1✔
691
                atom_end_pos = fh.tell() + atom_size
1✔
692
                self._traverse_atoms(fh, path=sub_path, stop_pos=atom_end_pos,
1✔
693
                                     curr_path=curr_path + [atom_type])
694
            # if the path-leaf is a callable, call it on the atom data
695
            elif callable(sub_path):
1✔
696
                for fieldname, value in sub_path(fh.read(atom_size)).items():
1✔
697
                    if DEBUG:
1✔
698
                        print(' ' * 4 * len(curr_path), 'FIELD: ', fieldname)
1✔
699
                    if fieldname.startswith('images.'):
1✔
700
                        if self._load_image:
1✔
701
                            self.images._set_field(fieldname[len('images.'):], value)
1✔
702
                    elif fieldname:
1✔
703
                        self._set_field(fieldname, value)
1✔
704
            # if no action was specified using dict or callable, jump over atom
705
            else:
706
                fh.seek(atom_size, os.SEEK_CUR)
1✔
707
            # check if we have reached the end of this branch:
708
            if stop_pos and fh.tell() >= stop_pos:
1✔
709
                return  # return to parent (next parent node in tree)
1✔
710
            atom_header = fh.read(header_size)  # read next atom
1✔
711

712

713
class _ID3(TinyTag):
1✔
714
    _ID3_MAPPING = {
1✔
715
        # Mapping from Frame ID to a field of the TinyTag
716
        # https://exiftool.org/TagNames/ID3.html
717
        'COMM': 'comment', 'COM': 'comment',
718
        'TRCK': 'track', 'TRK': 'track',
719
        'TYER': 'year', 'TYE': 'year', 'TDRC': 'year',
720
        'TALB': 'album', 'TAL': 'album',
721
        'TPE1': 'artist', 'TP1': 'artist',
722
        'TIT2': 'title', 'TT2': 'title',
723
        'TCON': 'genre', 'TCO': 'genre',
724
        'TPOS': 'disc', 'TPA': 'disc',
725
        'TPE2': 'albumartist', 'TP2': 'albumartist',
726
        'TCOM': 'composer', 'TCM': 'composer',
727
        'WOAR': 'extra.url', 'WAR': 'extra.url',
728
        'TSRC': 'extra.isrc', 'TRC': 'extra.isrc',
729
        'TCOP': 'extra.copyright', 'TCR': 'extra.copyright',
730
        'TBPM': 'extra.bpm', 'TBP': 'extra.bpm',
731
        'TKEY': 'extra.initial_key', 'TKE': 'extra.initial_key',
732
        'TLAN': 'extra.language', 'TLA': 'extra.language',
733
        'TPUB': 'extra.publisher', 'TPB': 'extra.publisher',
734
        'USLT': 'extra.lyrics', 'ULT': 'extra.lyrics',
735
        'TPE3': 'extra.conductor', 'TP3': 'extra.conductor',
736
        'TEXT': 'extra.lyricist', 'TXT': 'extra.lyricist',
737
        'TSST': 'extra.set_subtitle',
738
        'TENC': 'extra.encoded_by', 'TEN': 'extra.encoded_by',
739
        'TSSE': 'extra.encoder_settings', 'TSS': 'extra.encoder_settings',
740
        'TMED': 'extra.media', 'TMT': 'extra.media',
741
        'TDOR': 'extra.original_date',
742
        'TORY': 'extra.original_year', 'TOR': 'extra.original_year',
743
        'WCOP': 'extra.license',
744
    }
745
    _ID3_MAPPING_CUSTOM = {
1✔
746
        'artists': 'artist',
747
        'director': 'extra.director',
748
        'license': 'extra.license',
749
        'originalyear': 'extra.original_year',
750
        'barcode': 'extra.barcode',
751
        'catalognumber': 'extra.catalog_number',
752
    }
753
    _IMAGE_FRAME_IDS = {'APIC', 'PIC'}
1✔
754
    _CUSTOM_FRAME_IDS = {'TXXX', 'TXX'}
1✔
755
    _DISALLOWED_FRAME_IDS = {'PRIV', 'RGAD', 'GEOB', 'GEO', 'ÿû°d'}
1✔
756
    _MAX_ESTIMATION_SEC = 30.0
1✔
757
    _CBR_DETECTION_FRAME_COUNT = 5
1✔
758
    _USE_XING_HEADER = True  # much faster, but can be deactivated for testing
1✔
759

760
    _ID3V1_GENRES = (
1✔
761
        'Blues', 'Classic Rock', 'Country', 'Dance', 'Disco',
762
        'Funk', 'Grunge', 'Hip-Hop', 'Jazz', 'Metal', 'New Age', 'Oldies',
763
        'Other', 'Pop', 'R&B', 'Rap', 'Reggae', 'Rock', 'Techno', 'Industrial',
764
        'Alternative', 'Ska', 'Death Metal', 'Pranks', 'Soundtrack',
765
        'Euro-Techno', 'Ambient', 'Trip-Hop', 'Vocal', 'Jazz+Funk', 'Fusion',
766
        'Trance', 'Classical', 'Instrumental', 'Acid', 'House', 'Game',
767
        'Sound Clip', 'Gospel', 'Noise', 'AlternRock', 'Bass', 'Soul', 'Punk',
768
        'Space', 'Meditative', 'Instrumental Pop', 'Instrumental Rock',
769
        'Ethnic', 'Gothic', 'Darkwave', 'Techno-Industrial', 'Electronic',
770
        'Pop-Folk', 'Eurodance', 'Dream', 'Southern Rock', 'Comedy', 'Cult',
771
        'Gangsta', 'Top 40', 'Christian Rap', 'Pop/Funk', 'Jungle',
772
        'Native American', 'Cabaret', 'New Wave', 'Psychadelic', 'Rave',
773
        'Showtunes', 'Trailer', 'Lo-Fi', 'Tribal', 'Acid Punk', 'Acid Jazz',
774
        'Polka', 'Retro', 'Musical', 'Rock & Roll', 'Hard Rock',
775

776
        # Winamp Extended Genres
777
        'Folk', 'Folk-Rock', 'National Folk', 'Swing', 'Fast Fusion', 'Bebob',
778
        'Latin', 'Revival', 'Celtic', 'Bluegrass', 'Avantgarde', 'Gothic Rock',
779
        'Progressive Rock', 'Psychedelic Rock', 'Symphonic Rock', 'Slow Rock',
780
        'Big Band', 'Chorus', 'Easy listening', 'Acoustic', 'Humour', 'Speech',
781
        'Chanson', 'Opera', 'Chamber Music', 'Sonata', 'Symphony', 'Booty Bass',
782
        'Primus', 'Porn Groove', 'Satire', 'Slow Jam', 'Club', 'Tango', 'Samba',
783
        'Folklore', 'Ballad', 'Power Ballad', 'Rhythmic Soul', 'Freestyle',
784
        'Duet', 'Punk Rock', 'Drum Solo', 'A capella', 'Euro-House',
785
        'Dance Hall', 'Goa', 'Drum & Bass',
786

787
        # according to https://de.wikipedia.org/wiki/Liste_der_ID3v1-Genres:
788
        'Club-House', 'Hardcore Techno', 'Terror', 'Indie', 'BritPop',
789
        '',  # don't use ethnic slur ("Negerpunk", WTF!)
790
        'Polsk Punk', 'Beat', 'Christian Gangsta Rap', 'Heavy Metal',
791
        'Black Metal', 'Contemporary Christian', 'Christian Rock',
792
        # WinAmp 1.91
793
        'Merengue', 'Salsa', 'Thrash Metal', 'Anime', 'Jpop', 'Synthpop',
794
        # WinAmp 5.6
795
        'Abstract', 'Art Rock', 'Baroque', 'Bhangra', 'Big Beat', 'Breakbeat',
796
        'Chillout', 'Downtempo', 'Dub', 'EBM', 'Eclectic', 'Electro',
797
        'Electroclash', 'Emo', 'Experimental', 'Garage', 'Illbient',
798
        'Industro-Goth', 'Jam Band', 'Krautrock', 'Leftfield', 'Lounge',
799
        'Math Rock', 'New Romantic', 'Nu-Breakz', 'Post-Punk', 'Post-Rock',
800
        'Psytrance', 'Shoegaze', 'Space Rock', 'Trop Rock', 'World Music',
801
        'Neoclassical', 'Audiobook', 'Audio Theatre', 'Neue Deutsche Welle',
802
        'Podcast', 'Indie Rock', 'G-Funk', 'Dubstep', 'Garage Rock', 'Psybient',
803
    )
804
    _ID3V2_2_IMAGE_FORMATS = {
1✔
805
        'bmp': 'image/bmp',
806
        'jpg': 'image/jpeg',
807
        'png': 'image/png',
808
    }
809
    _IMAGE_TYPES = (
1✔
810
        'other',
811
        'extra.icon',
812
        'extra.other_icon',
813
        'front_cover',
814
        'back_cover',
815
        'leaflet',
816
        'media',
817
        'extra.lead_artist',
818
        'extra.artist',
819
        'extra.conductor',
820
        'extra.band',
821
        'extra.composer',
822
        'extra.lyricist',
823
        'extra.recording_location',
824
        'extra.during_recording',
825
        'extra.during_performance',
826
        'extra.video',
827
        'extra.bright_colored_fish',
828
        'extra.illustration',
829
        'extra.band_logo',
830
        'extra.publisher_logo',
831
    )
832
    _UNKNOWN_IMAGE_TYPE = 'extra.unknown'
1✔
833

834
    # see this page for the magic values used in mp3:
835
    # http://www.mpgedit.org/mpgedit/mpeg_format/mpeghdr.htm
836
    _SAMPLE_RATES = (
1✔
837
        (11025, 12000, 8000),   # MPEG 2.5
838
        (0, 0, 0),              # reserved
839
        (22050, 24000, 16000),  # MPEG 2
840
        (44100, 48000, 32000),  # MPEG 1
841
    )
842
    _V1L1 = (0, 32, 64, 96, 128, 160, 192, 224, 256, 288, 320, 352, 384, 416, 448, 0)
1✔
843
    _V1L2 = (0, 32, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320, 384, 0)
1✔
844
    _V1L3 = (0, 32, 40, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320, 0)
1✔
845
    _V2L1 = (0, 32, 48, 56, 64, 80, 96, 112, 128, 144, 160, 176, 192, 224, 256, 0)
1✔
846
    _V2L2 = (0, 8, 16, 24, 32, 40, 48, 56, 64, 80, 96, 112, 128, 144, 160, 0)
1✔
847
    _V2L3 = _V2L2
1✔
848
    _NONE = (0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
1✔
849
    _BITRATE_BY_VERSION_BY_LAYER = (
1✔
850
        (_NONE, _V2L3, _V2L2, _V2L1),  # MPEG Version 2.5  # note that the layers go
851
        (_NONE, _NONE, _NONE, _NONE),  # reserved          # from 3 to 1 by design.
852
        (_NONE, _V2L3, _V2L2, _V2L1),  # MPEG Version 2    # the first layer id is
853
        (_NONE, _V1L3, _V1L2, _V1L1),  # MPEG Version 1    # reserved
854
    )
855
    _SAMPLES_PER_FRAME = 1152  # the default frame size for mp3
1✔
856
    _CHANNELS_PER_CHANNEL_MODE = (
1✔
857
        2,  # 00 Stereo
858
        2,  # 01 Joint stereo (Stereo)
859
        2,  # 10 Dual channel (2 mono channels)
860
        1,  # 11 Single channel (Mono)
861
    )
862

863
    def __init__(self) -> None:
1✔
864
        super().__init__()
1✔
865
        # save position after the ID3 tag for duration measurement speedup
866
        self._bytepos_after_id3v2 = -1
1✔
867

868
    @staticmethod
1✔
869
    def _parse_xing_header(fh: BinaryIO) -> tuple[int, int]:
1✔
870
        # see: http://www.mp3-tech.org/programmer/sources/vbrheadersdk.zip
871
        fh.seek(4, os.SEEK_CUR)  # read over Xing header
1✔
872
        header_flags = struct.unpack('>i', fh.read(4))[0]
1✔
873
        frames = byte_count = 0
1✔
874
        if header_flags & 1:  # FRAMES FLAG
1✔
875
            frames = struct.unpack('>i', fh.read(4))[0]
1✔
876
        if header_flags & 2:  # BYTES FLAG
1✔
877
            byte_count = struct.unpack('>i', fh.read(4))[0]
1✔
878
        if header_flags & 4:  # TOC FLAG
1✔
879
            fh.seek(100, os.SEEK_CUR)
1✔
880
        if header_flags & 8:  # VBR SCALE FLAG
1✔
881
            fh.seek(4, os.SEEK_CUR)
1✔
882
        return frames, byte_count
1✔
883

884
    def _determine_duration(self, fh: BinaryIO) -> None:
1✔
885
        # if tag reading was disabled, find start position of audio data
886
        if self._bytepos_after_id3v2 == -1:
1✔
887
            self._parse_id3v2_header(fh)
1✔
888

889
        max_estimation_frames = (_ID3._MAX_ESTIMATION_SEC * 44100) // _ID3._SAMPLES_PER_FRAME
1✔
890
        frame_size_accu = 0
1✔
891
        audio_offset = 0
1✔
892
        header_bytes = 4
1✔
893
        frames = 0  # count frames for determining mp3 duration
1✔
894
        bitrate_accu = 0    # add up bitrates to find average bitrate to detect
1✔
895
        last_bitrates = []  # CBR mp3s (multiple frames with same bitrates)
1✔
896
        # seek to first position after id3 tag (speedup for large header)
897
        fh.seek(self._bytepos_after_id3v2)
1✔
898
        file_offset = fh.tell()
1✔
899
        walker = io.BytesIO(fh.read())
1✔
900
        while True:
1✔
901
            # reading through garbage until 11 '1' sync-bits are found
902
            b = walker.read()
1✔
903
            walker.seek(-len(b), os.SEEK_CUR)
1✔
904
            if len(b) < 4:
1✔
905
                if frames:
1✔
906
                    self.bitrate = bitrate_accu / frames
1✔
907
                break  # EOF
1✔
908
            _sync, conf, bitrate_freq, rest = struct.unpack('BBBB', b[0:4])
1✔
909
            br_id = (bitrate_freq >> 4) & 0x0F  # bitrate id
1✔
910
            sr_id = (bitrate_freq >> 2) & 0x03  # sample rate id
1✔
911
            padding = 1 if bitrate_freq & 0x02 > 0 else 0
1✔
912
            mpeg_id = (conf >> 3) & 0x03
1✔
913
            layer_id = (conf >> 1) & 0x03
1✔
914
            channel_mode = (rest >> 6) & 0x03
1✔
915
            # check for eleven 1s, validate bitrate and sample rate
916
            if (not b[:2] > b'\xFF\xE0' or br_id > 14 or br_id == 0 or sr_id == 3
1✔
917
                    or layer_id == 0 or mpeg_id == 1):  # noqa
918
                idx = b.find(b'\xFF', 1)  # invalid frame, find next sync header
1✔
919
                if idx == -1:
1✔
920
                    idx = len(b)  # not found: jump over the current peek buffer
1✔
921
                walker.seek(max(idx, 1), os.SEEK_CUR)
1✔
922
                continue
1✔
923
            self.channels = self._CHANNELS_PER_CHANNEL_MODE[channel_mode]
1✔
924
            frame_bitrate = self._BITRATE_BY_VERSION_BY_LAYER[mpeg_id][layer_id][br_id]
1✔
925
            self.samplerate = samplerate = self._SAMPLE_RATES[mpeg_id][sr_id]
1✔
926
            # There might be a xing header in the first frame that contains
927
            # all the info we need, otherwise parse multiple frames to find the
928
            # accurate average bitrate
929
            if frames == 0 and self._USE_XING_HEADER:
1✔
930
                xing_header_offset = b.find(b'Xing')
1✔
931
                if xing_header_offset != -1:
1✔
932
                    walker.seek(xing_header_offset, os.SEEK_CUR)
1✔
933
                    xframes, byte_count = self._parse_xing_header(walker)
1✔
934
                    if xframes > 0 and byte_count > 0:
1✔
935
                        # MPEG-2 Audio Layer III uses 576 samples per frame
936
                        samples_per_frame = 576 if mpeg_id <= 2 else self._SAMPLES_PER_FRAME
1✔
937
                        self.duration = duration = xframes * samples_per_frame / samplerate
1✔
938
                        # self.duration = (xframes * self._SAMPLES_PER_FRAME / samplerate
939
                        #                  / self.channels)  # noqa
940
                        self.bitrate = byte_count * 8 / duration / 1000
1✔
941
                        return
1✔
UNCOV
942
                    continue
×
943

944
            frames += 1  # it's most probably an mp3 frame
1✔
945
            bitrate_accu += frame_bitrate
1✔
946
            if frames == 1:
1✔
947
                audio_offset = file_offset + walker.tell()
1✔
948
            if frames <= self._CBR_DETECTION_FRAME_COUNT:
1✔
949
                last_bitrates.append(frame_bitrate)
1✔
950
            walker.seek(4, os.SEEK_CUR)  # jump over peeked bytes
1✔
951

952
            frame_length = (144000 * frame_bitrate) // samplerate + padding
1✔
953
            frame_size_accu += frame_length
1✔
954
            # if bitrate does not change over time, it's probably CBR
955
            is_cbr = (frames == self._CBR_DETECTION_FRAME_COUNT and len(set(last_bitrates)) == 1)
1✔
956
            if frames == max_estimation_frames or is_cbr:
1✔
957
                # try to estimate duration
958
                fh.seek(-128, 2)  # jump to last byte (leaving out id3v1 tag)
1✔
959
                audio_stream_size = fh.tell() - audio_offset
1✔
960
                est_frame_count = audio_stream_size / (frame_size_accu / frames)
1✔
961
                samples = est_frame_count * self._SAMPLES_PER_FRAME
1✔
962
                self.duration = samples / samplerate
1✔
963
                self.bitrate = bitrate_accu / frames
1✔
964
                return
1✔
965

966
            if frame_length > 1:  # jump over current frame body
1✔
967
                walker.seek(frame_length - header_bytes, os.SEEK_CUR)
1✔
968
        if self.samplerate:
1✔
969
            self.duration = frames * self._SAMPLES_PER_FRAME / self.samplerate
1✔
970

971
    def _parse_tag(self, fh: BinaryIO) -> None:
1✔
972
        self._parse_id3v2(fh)
1✔
973
        if self.filesize > 128:
1✔
974
            fh.seek(-128, os.SEEK_END)  # try parsing id3v1 in last 128 bytes
1✔
975
            self._parse_id3v1(fh)
1✔
976

977
    def _parse_id3v2_header(self, fh: BinaryIO) -> tuple[int, bool, int]:
1✔
978
        size = major = 0
1✔
979
        extended = False
1✔
980
        # for info on the specs, see: http://id3.org/Developer%20Information
981
        header = struct.unpack('3sBBB4B', fh.read(10))
1✔
982
        tag = header[0].decode('ISO-8859-1', 'replace')
1✔
983
        # check if there is an ID3v2 tag at the beginning of the file
984
        if tag == 'ID3':
1✔
985
            major, _rev = header[1:3]
1✔
986
            if DEBUG:
1✔
987
                print(f'Found id3 v2.{major}')
1✔
988
            # unsync = (header[3] & 0x80) > 0
989
            extended = (header[3] & 0x40) > 0
1✔
990
            # experimental = (header[3] & 0x20) > 0
991
            # footer = (header[3] & 0x10) > 0
992
            size = self._calc_size(header[4:8], 7)
1✔
993
        self._bytepos_after_id3v2 = size
1✔
994
        return size, extended, major
1✔
995

996
    def _parse_id3v2(self, fh: BinaryIO) -> None:
1✔
997
        size, extended, major = self._parse_id3v2_header(fh)
1✔
998
        if size:
1✔
999
            end_pos = fh.tell() + size
1✔
1000
            parsed_size = 0
1✔
1001
            if extended:  # just read over the extended header.
1✔
1002
                size_bytes = struct.unpack('4B', fh.read(6)[0:4])
1✔
1003
                extd_size = self._calc_size(size_bytes, 7)
1✔
1004
                fh.seek(extd_size - 6, os.SEEK_CUR)  # jump over extended_header
1✔
1005
            while parsed_size < size:
1✔
1006
                frame_size = self._parse_frame(fh, id3version=major)
1✔
1007
                if frame_size == 0:
1✔
1008
                    break
1✔
1009
                parsed_size += frame_size
1✔
1010
            fh.seek(end_pos, os.SEEK_SET)
1✔
1011

1012
    def _parse_id3v1(self, fh: BinaryIO) -> None:
1✔
1013
        if fh.read(3) != b'TAG':  # check if this is an ID3 v1 tag
1✔
1014
            return
1✔
1015

1016
        def asciidecode(x: bytes) -> str:
1✔
1017
            return self._unpad(x.decode(self._default_encoding or 'latin1', 'replace'))
1✔
1018
        # Only set fields that were not set by ID3v2 tags, as ID3v1
1019
        # tags are more likely to be outdated or have encoding issues
1020
        fields = fh.read(30 + 30 + 30 + 4 + 30 + 1)
1✔
1021
        if not self.title:
1✔
1022
            value = asciidecode(fields[:30])
1✔
1023
            if value:
1✔
1024
                self._set_field('title', value)
1✔
1025
        if not self.artist:
1✔
1026
            value = asciidecode(fields[30:60])
1✔
1027
            if value:
1✔
1028
                self._set_field('artist', value)
1✔
1029
        if not self.album:
1✔
1030
            value = asciidecode(fields[60:90])
1✔
1031
            if value:
1✔
1032
                self._set_field('album', value)
1✔
1033
        if not self.year:
1✔
1034
            value = asciidecode(fields[90:94])
1✔
1035
            if value:
1✔
1036
                self._set_field('year', value)
1✔
1037
        comment = fields[94:124]
1✔
1038
        if b'\x00\x00' < comment[-2:] < b'\x01\x00':
1✔
1039
            if self.track is None:
1✔
1040
                self._set_field('track', ord(comment[-1:]))
1✔
1041
            comment = comment[:-2]
1✔
1042
        if not self.comment:
1✔
1043
            value = asciidecode(comment)
1✔
1044
            if value:
1✔
1045
                self._set_field('comment', value)
1✔
1046
        if not self.genre:
1✔
1047
            genre_id = ord(fields[124:125])
1✔
1048
            if genre_id < len(self._ID3V1_GENRES):
1✔
1049
                self._set_field('genre', self._ID3V1_GENRES[genre_id])
1✔
1050

1051
    def __parse_custom_field(self, content: str) -> bool:
1✔
1052
        custom_field_name, separator, value = content.partition('\x00')
1✔
1053
        custom_field_name_lower = custom_field_name.lower()
1✔
1054
        value = value.lstrip('\ufeff')
1✔
1055
        if custom_field_name_lower and separator and value:
1✔
1056
            field_name = self._ID3_MAPPING_CUSTOM.get(
1✔
1057
                custom_field_name_lower, self._EXTRA_PREFIX + custom_field_name_lower)
1058
            self._set_field(field_name, value)
1✔
1059
            return True
1✔
1060
        return False
1✔
1061

1062
    @classmethod
1✔
1063
    def _create_tag_image(cls, data: bytes, pic_type: int, mime_type: str | None = None,
1✔
1064
                          description: str | None = None) -> tuple[str, TagImage]:
1065
        field_name = cls._UNKNOWN_IMAGE_TYPE
1✔
1066
        if 0 <= pic_type <= len(cls._IMAGE_TYPES):
1✔
1067
            field_name = cls._IMAGE_TYPES[pic_type]
1✔
1068
        image = TagImage(field_name, data)
1✔
1069
        if mime_type:
1✔
1070
            image.mime_type = mime_type
1✔
1071
        if description:
1✔
1072
            image.description = description
1✔
1073
        return field_name, image
1✔
1074

1075
    @staticmethod
1✔
1076
    def _index_utf16(s: bytes, search: bytes) -> int:
1✔
1077
        for i in range(0, len(s), len(search)):
1✔
1078
            if s[i:i + len(search)] == search:
1✔
1079
                return i
1✔
UNCOV
1080
        return -1
×
1081

1082
    def _parse_frame(self, fh: BinaryIO, id3version: int | None = None) -> int:
1✔
1083
        # ID3v2.2 is especially ugly, see: http://id3.org/id3v2-00
1084
        frame_header_size = 6 if id3version == 2 else 10
1✔
1085
        frame_size_bytes = 3 if id3version == 2 else 4
1✔
1086
        binformat = '3s3B' if id3version == 2 else '4s4B2B'
1✔
1087
        bits_per_byte = 7 if id3version == 4 else 8  # only id3v2.4 is synchsafe
1✔
1088
        frame_header_data = fh.read(frame_header_size)
1✔
1089
        if len(frame_header_data) != frame_header_size:
1✔
1090
            return 0
1✔
1091
        frame = struct.unpack(binformat, frame_header_data)
1✔
1092
        frame_id = self._decode_string(frame[0])
1✔
1093
        frame_size = self._calc_size(frame[1:1 + frame_size_bytes], bits_per_byte)
1✔
1094
        if DEBUG:
1✔
1095
            print(f'Found id3 Frame {frame_id} at {fh.tell()}-{fh.tell() + frame_size} '
1✔
1096
                  f'of {self.filesize}')
1097
        if frame_size > 0:
1✔
1098
            # flags = frame[1+frame_size_bytes:]  # don't care about flags
1099
            content = fh.read(frame_size)
1✔
1100
            fieldname = self._ID3_MAPPING.get(frame_id)
1✔
1101
            should_set_field = True
1✔
1102
            if fieldname:
1✔
1103
                if not self._parse_tags:
1✔
1104
                    return frame_size
1✔
1105
                language = fieldname in {'comment', 'extra.lyrics'}
1✔
1106
                value = self._decode_string(content, language)
1✔
1107
                if not value:
1✔
1108
                    return frame_size
1✔
1109
                if fieldname == "comment":
1✔
1110
                    # check if comment is a key-value pair (used by iTunes)
1111
                    should_set_field = not self.__parse_custom_field(value)
1✔
1112
                elif fieldname in {'track', 'disc'}:
1✔
1113
                    if '/' in value:
1✔
1114
                        value, total = value.split('/')[:2]
1✔
1115
                        if total.isdecimal():
1✔
1116
                            self._set_field(f'{fieldname}_total', int(total))
1✔
1117
                    if value.isdecimal():
1✔
1118
                        self._set_field(fieldname, int(value))
1✔
1119
                    should_set_field = False
1✔
1120
                elif fieldname == 'genre':
1✔
1121
                    genre_id = 255
1✔
1122
                    # funky: id3v1 genre hidden in an id3v2 field
1123
                    if value.isdecimal():
1✔
1124
                        genre_id = int(value)
1✔
1125
                    # funkier: the TCO/TCON frame may contain a genre id in parens, e.g. '(13)'
1126
                    elif value[:1] == '(':
1✔
1127
                        end_pos = value.find(')')
1✔
1128
                        parens_text = value[1:end_pos]
1✔
1129
                        if end_pos > 0 and parens_text.isdecimal():
1✔
1130
                            genre_id = int(parens_text)
1✔
1131
                    if 0 <= genre_id < len(_ID3._ID3V1_GENRES):
1✔
1132
                        value = _ID3._ID3V1_GENRES[genre_id]
1✔
1133
                if should_set_field:
1✔
1134
                    self._set_field(fieldname, value)
1✔
1135
            elif frame_id in self._CUSTOM_FRAME_IDS:
1✔
1136
                # custom fields
1137
                if self._parse_tags:
1✔
1138
                    value = self._decode_string(content)
1✔
1139
                    if value:
1✔
1140
                        self.__parse_custom_field(value)
1✔
1141
            elif frame_id in self._IMAGE_FRAME_IDS:
1✔
1142
                if self._load_image:
1✔
1143
                    # See section 4.14: http://id3.org/id3v2.4.0-frames
1144
                    encoding = content[0:1]
1✔
1145
                    if frame_id == 'PIC':  # ID3 v2.2:
1✔
1146
                        imgformat = self._decode_string(content[1:4]).lower()
1✔
1147
                        mime_type = self._ID3V2_2_IMAGE_FORMATS.get(imgformat)
1✔
1148
                        desc_start_pos = 1 + 3 + 1  # skip encoding (1), imgformat (3), pictype(1)
1✔
1149
                    else:  # ID3 v2.3+
1150
                        mime_type_end_pos = content.index(b'\x00', 1)
1✔
1151
                        mime_type = self._decode_string(content[1:mime_type_end_pos]).lower()
1✔
1152
                        if mime_type in self._ID3V2_2_IMAGE_FORMATS:  # ID3 v2.2 format in v2.3...
1✔
1153
                            mime_type = self._ID3V2_2_IMAGE_FORMATS[mime_type]
1✔
1154
                        desc_start_pos = mime_type_end_pos + 1 + 1  # skip mtype, pictype(1)
1✔
1155
                    pic_type = content[desc_start_pos - 1]
1✔
1156
                    # latin-1 and utf-8 use a 1-byte terminator, utf-16 a 2-byte one
1157
                    termination = b'\x00' if encoding in {b'\x00', b'\x03'} else b'\x00\x00'
1✔
1158
                    desc_length = self._index_utf16(content[desc_start_pos:], termination)
1✔
1159
                    desc_end_pos = desc_start_pos + desc_length + len(termination)
1✔
1160
                    description = self._decode_string(content[desc_start_pos:desc_end_pos])
1✔
1161
                    field_name, image = self._create_tag_image(
1✔
1162
                        content[desc_end_pos:], pic_type, mime_type, description)
1163
                    self.images._set_field(field_name, image)
1✔
1164
            elif frame_id not in self._DISALLOWED_FRAME_IDS:
1✔
1165
                # unknown, try to add to extra dict
1166
                if self._parse_tags:
1✔
1167
                    value = self._decode_string(content)
1✔
1168
                    if value:
1✔
1169
                        self._set_field(self._EXTRA_PREFIX + frame_id.lower(), value)
1✔
1170
            return frame_size
1✔
1171
        return 0
1✔
1172

1173
    def _decode_string(self, bytestr: bytes, language: bool = False) -> str:
1✔
1174
        default_encoding = 'ISO-8859-1'
1✔
1175
        if self._default_encoding:
1✔
1176
            default_encoding = self._default_encoding
1✔
1177
        # it's not my fault, this is the spec.
1178
        first_byte = bytestr[:1]
1✔
1179
        if first_byte == b'\x00':  # ISO-8859-1
1✔
1180
            bytestr = bytestr[1:]
1✔
1181
            encoding = default_encoding
1✔
1182
        elif first_byte == b'\x01':  # UTF-16 with BOM
1✔
1183
            bytestr = bytestr[1:]
1✔
1184
            # remove language (but leave BOM)
1185
            if language:
1✔
1186
                if bytestr[3:5] in {b'\xfe\xff', b'\xff\xfe'}:
1✔
1187
                    bytestr = bytestr[3:]
1✔
1188
                if bytestr[:3].isalpha():
1✔
1189
                    bytestr = bytestr[3:]  # remove language
1✔
1190
                bytestr = bytestr.lstrip(b'\x00')  # strip optional additional null bytes
1✔
1191
            # read byte order mark to determine endianness
1192
            encoding = 'UTF-16be' if bytestr[0:2] == b'\xfe\xff' else 'UTF-16le'
1✔
1193
            # strip the bom if it exists
1194
            if bytestr[:2] in {b'\xfe\xff', b'\xff\xfe'}:
1✔
1195
                bytestr = bytestr[2:] if len(bytestr) % 2 == 0 else bytestr[2:-1]
1✔
1196
            # remove ADDITIONAL EXTRA BOM :facepalm:
1197
            if bytestr[:4] == b'\x00\x00\xff\xfe':
1✔
1198
                bytestr = bytestr[4:]
1✔
1199
        elif first_byte == b'\x02':  # UTF-16LE
1✔
1200
            # strip optional null byte, if byte count uneven
UNCOV
1201
            bytestr = bytestr[1:-1] if len(bytestr) % 2 == 0 else bytestr[1:]
×
UNCOV
1202
            encoding = 'UTF-16le'
×
1203
        elif first_byte == b'\x03':  # UTF-8
1✔
1204
            bytestr = bytestr[1:]
1✔
1205
            encoding = 'UTF-8'
1✔
1206
        else:
1207
            encoding = default_encoding  # wild guess
1✔
1208
        if language and bytestr[:3].isalpha():
1✔
1209
            bytestr = bytestr[3:]  # remove language
1✔
1210
        return self._unpad(bytestr.decode(encoding, 'replace'))
1✔
1211

1212
    @staticmethod
1✔
1213
    def _calc_size(bytestr: tuple[int, ...], bits_per_byte: int) -> int:
1✔
1214
        # the length of some mp3 header fields is given in 7- or 8-bit bytes
1215
        return reduce(lambda accu, elem: (accu << bits_per_byte) + elem, bytestr, 0)
1✔
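    # For instance, a synchsafe (7 bits per byte) size field of (0x01, 0x7f) decodes to
    # (1 << 7) + 0x7f == 255, while reading the same bytes as 8-bit bytes gives 0x017f == 383.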
1216

1217

1218
class _Ogg(TinyTag):
1✔
1219
    _VORBIS_MAPPING = {
1✔
1220
        'album': 'album',
1221
        'albumartist': 'albumartist',
1222
        'title': 'title',
1223
        'artist': 'artist',
1224
        'artists': 'artist',
1225
        'author': 'artist',
1226
        'date': 'year',
1227
        'tracknumber': 'track',
1228
        'tracktotal': 'track_total',
1229
        'totaltracks': 'track_total',
1230
        'discnumber': 'disc',
1231
        'disctotal': 'disc_total',
1232
        'totaldiscs': 'disc_total',
1233
        'genre': 'genre',
1234
        'description': 'comment',
1235
        'comment': 'comment',
1236
        'comments': 'comment',
1237
        'composer': 'composer',
1238
        'bpm': 'extra.bpm',
1239
        'copyright': 'extra.copyright',
1240
        'isrc': 'extra.isrc',
1241
        'lyrics': 'extra.lyrics',
1242
        'publisher': 'extra.publisher',
1243
        'language': 'extra.language',
1244
        'director': 'extra.director',
1245
        'website': 'extra.url',
1246
        'conductor': 'extra.conductor',
1247
        'lyricist': 'extra.lyricist',
1248
        'discsubtitle': 'extra.set_subtitle',
1249
        'setsubtitle': 'extra.set_subtitle',
1250
        'initialkey': 'extra.initial_key',
1251
        'key': 'extra.initial_key',
1252
        'encodedby': 'extra.encoded_by',
1253
        'encodersettings': 'extra.encoder_settings',
1254
        'media': 'extra.media',
1255
        'originaldate': 'extra.original_date',
1256
        'originalyear': 'extra.original_year',
1257
        'license': 'extra.license',
1258
        'barcode': 'extra.barcode',
1259
        'catalognumber': 'extra.catalog_number',
1260
    }
1261

1262
    def __init__(self) -> None:
1✔
1263
        super().__init__()
1✔
1264
        self._max_samplenum = 0  # maximum sample position ever read
1✔
1265

1266
    def _determine_duration(self, fh: BinaryIO) -> None:
1✔
1267
        max_page_size = 65536  # https://xiph.org/ogg/doc/libogg/ogg_page.html
1✔
1268
        if not self._tags_parsed:
1✔
1269
            self._parse_tag(fh)  # determine sample rate
1✔
1270
            fh.seek(0)           # and rewind to start
1✔
1271
        if self.duration is not None or not self.samplerate:
1✔
1272
            return  # either ogg flac or invalid file
1✔
1273
        if self.filesize > max_page_size:
1✔
1274
            fh.seek(-max_page_size, 2)  # go to last possible page position
1✔
1275
        while True:
1✔
1276
            file_offset = fh.tell()
1✔
1277
            b = fh.read()
1✔
1278
            if len(b) < 4:
1✔
UNCOV
1279
                return  # EOF
×
1280
            if b[:4] == b'OggS':  # look for an ogg header
1✔
1281
                fh.seek(file_offset)
1✔
1282
                for _ in self._parse_pages(fh):
1✔
1283
                    pass  # parse all remaining pages
1✔
1284
                self.duration = self._max_samplenum / self.samplerate
1✔
1285
                break
1✔
1286
            idx = b.find(b'OggS')  # try to find header in peeked data
1✔
1287
            if idx != -1:
1✔
1288
                fh.seek(file_offset + idx)
1✔
1289

1290
    def _parse_tag(self, fh: BinaryIO) -> None:
1✔
1291
        check_flac_second_packet = False
1✔
1292
        check_speex_second_packet = False
1✔
1293
        for packet in self._parse_pages(fh):
1✔
1294
            walker = io.BytesIO(packet)
1✔
1295
            if packet[0:7] == b"\x01vorbis":
1✔
1296
                if self._parse_duration:
1✔
1297
                    (self.channels, self.samplerate, _max_bitrate, bitrate,
1✔
1298
                     _min_bitrate) = struct.unpack("<B4i", packet[11:28])
1299
                    self.bitrate = bitrate / 1000
1✔
1300
            elif packet[0:7] == b"\x03vorbis":
1✔
1301
                if self._parse_tags:
1✔
1302
                    walker.seek(7, os.SEEK_CUR)  # jump over header name
1✔
1303
                    self._parse_vorbis_comment(walker)
1✔
1304
            elif packet[0:8] == b'OpusHead':
1✔
1305
                if self._parse_duration:  # parse opus header
1✔
1306
                    # https://www.videolan.org/developers/vlc/modules/codec/opus_header.c
1307
                    # https://mf4.xiph.org/jenkins/view/opus/job/opusfile-unix/ws/doc/html/structOpusHead.html
1308
                    walker.seek(8, os.SEEK_CUR)  # jump over header name
1✔
1309
                    (version, ch, _, _sr, _, _) = struct.unpack("<BBHIHB", walker.read(11))
1✔
1310
                    if (version & 0xF0) == 0:  # only major version 0 supported
1✔
1311
                        self.channels = ch
1✔
1312
                        self.samplerate = 48000  # internally opus always uses 48khz
1✔
1313
            elif packet[0:8] == b'OpusTags':
1✔
1314
                if self._parse_tags:  # parse opus metadata:
1✔
1315
                    walker.seek(8, os.SEEK_CUR)  # jump over header name
1✔
1316
                    self._parse_vorbis_comment(walker)
1✔
1317
            elif packet[0:5] == b'\x7fFLAC':
1✔
1318
                # https://xiph.org/flac/ogg_mapping.html
1319
                walker.seek(9, os.SEEK_CUR)  # jump over header name, version and number of headers
1✔
1320
                flactag = _Flac()
1✔
1321
                flactag._filehandler = walker
1✔
1322
                flactag.filesize = self.filesize
1✔
1323
                flactag._load(tags=self._parse_tags, duration=self._parse_duration,
1✔
1324
                              image=self._load_image)
1325
                self._update(flactag)
1✔
1326
                check_flac_second_packet = True
1✔
1327
            elif check_flac_second_packet:
1✔
1328
                # second packet contains FLAC metadata block
1329
                if self._parse_tags:
1✔
1330
                    meta_header = struct.unpack('B3B', walker.read(4))
1✔
1331
                    block_type = meta_header[0] & 0x7f
1✔
1332
                    if block_type == _Flac.METADATA_VORBIS_COMMENT:
1✔
1333
                        self._parse_vorbis_comment(walker)
1✔
1334
                check_flac_second_packet = False
1✔
1335
            elif packet[0:8] == b'Speex   ':
1✔
1336
                # https://speex.org/docs/manual/speex-manual/node8.html
1337
                if self._parse_duration:
1✔
1338
                    walker.seek(36, os.SEEK_CUR)  # jump over header name and irrelevant fields
1✔
1339
                    (self.samplerate, _, _, self.channels,
1✔
1340
                     self.bitrate) = struct.unpack("<5i", walker.read(20))
1341
                check_speex_second_packet = True
1✔
1342
            elif check_speex_second_packet:
1✔
1343
                if self._parse_tags:
1✔
1344
                    length = struct.unpack('I', walker.read(4))[0]  # starts with a comment string
1✔
1345
                    comment = walker.read(length).decode('utf-8', 'replace')
1✔
1346
                    self._set_field('comment', comment)
1✔
1347
                    self._parse_vorbis_comment(walker, contains_vendor=False)  # other tags
1✔
1348
                check_speex_second_packet = False
1✔
1349
            else:
1350
                if DEBUG:
1✔
1351
                    print('Unsupported Ogg page type: ', packet[:16], file=stderr)
1✔
1352
                break
1✔
1353
        self._tags_parsed = True
1✔
1354

1355
    def _parse_vorbis_comment(self, fh: BinaryIO, contains_vendor: bool = True) -> None:
1✔
1356
        # for the spec, see: http://xiph.org/vorbis/doc/v-comment.html
1357
        # discnumber tag based on: https://en.wikipedia.org/wiki/Vorbis_comment
1358
        # https://sno.phy.queensu.ca/~phil/exiftool/TagNames/Vorbis.html
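        # Rough on-disk layout: <vendor_length:u32le><vendor><count:u32le>, then `count`
        # entries of <length:u32le> followed by a UTF-8 'KEY=value' string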
1359
        if contains_vendor:
1✔
1360
            vendor_length = struct.unpack('I', fh.read(4))[0]
1✔
1361
            fh.seek(vendor_length, os.SEEK_CUR)  # jump over vendor
1✔
1362
        elements = struct.unpack('I', fh.read(4))[0]
1✔
1363
        for _i in range(elements):
1✔
1364
            length = struct.unpack('I', fh.read(4))[0]
1✔
1365
            keyvalpair = fh.read(length).decode('utf-8', 'replace')
1✔
1366
            if '=' in keyvalpair:
1✔
1367
                key, value = keyvalpair.split('=', 1)
1✔
1368
                key_lowercase = key.lower()
1✔
1369

1370
                if key_lowercase == "metadata_block_picture" and self._load_image:
1✔
1371
                    if DEBUG:
1✔
1372
                        print('Found Vorbis TagImage', key, value[:64])
1✔
1373
                    fieldname, fieldvalue = _Flac._parse_image(io.BytesIO(base64.b64decode(value)))
1✔
1374
                    self.images._set_field(fieldname, fieldvalue)
1✔
1375
                else:
1376
                    if DEBUG:
1✔
1377
                        print('Found Vorbis Comment', key, value[:64])
1✔
1378
                    fieldname = self._VORBIS_MAPPING.get(
1✔
1379
                        key_lowercase, self._EXTRA_PREFIX + key_lowercase)  # custom field
1380
                    if fieldname in {'track', 'disc', 'track_total', 'disc_total'}:
1✔
1381
                        if fieldname in {'track', 'disc'} and '/' in value:
1✔
1382
                            value, total = value.split('/')[:2]
1✔
1383
                            if total.isdecimal():
1✔
1384
                                self._set_field(f'{fieldname}_total', int(total))
1✔
1385
                        if value.isdecimal():
1✔
1386
                            self._set_field(fieldname, int(value))
1✔
1387
                    elif value:
1✔
1388
                        self._set_field(fieldname, value)
1✔
1389

1390
    def _parse_pages(self, fh: BinaryIO) -> Iterator[bytes]:
1✔
1391
        # for the spec, see: https://wiki.xiph.org/Ogg
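        # Each page header ends with a lacing table of segment sizes (0-255); a packet is
        # the concatenation of its segments, and a segment shorter than 255 bytes ends it.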
1392
        previous_page = b''  # contains data from previous (continuing) pages
1✔
1393
        header_data = fh.read(27)  # read ogg page header
1✔
1394
        while len(header_data) == 27:
1✔
1395
            header = struct.unpack('<4sBBqIIiB', header_data)
1✔
1396
            # https://xiph.org/ogg/doc/framing.html
1397
            oggs, version, _flags, pos, _serial, _pageseq, _crc, segments = header
1✔
1398
            self._max_samplenum = max(self._max_samplenum, pos)
1✔
1399
            if oggs != b'OggS' or version != 0:
1✔
1400
                raise ParseError('Invalid OGG header')
1✔
1401
            segsizes = struct.unpack('B' * segments, fh.read(segments))
1✔
1402
            total = 0
1✔
1403
            for segsize in segsizes:  # read all segments
1✔
1404
                total += segsize
1✔
1405
                if total < 255:  # a short segment (< 255 bytes) ends the packet
1✔
1406
                    yield previous_page + fh.read(total)
1✔
1407
                    previous_page = b''
1✔
1408
                    total = 0
1✔
1409
            if total != 0:
1✔
1410
                if total % 255 == 0:
1✔
UNCOV
1411
                    previous_page += fh.read(total)
×
1412
                else:
1413
                    yield previous_page + fh.read(total)
1✔
1414
                    previous_page = b''
1✔
1415
            header_data = fh.read(27)
1✔
1416

1417

1418
class _Wave(TinyTag):
1✔
1419
    # https://sno.phy.queensu.ca/~phil/exiftool/TagNames/RIFF.html
1420
    _RIFF_MAPPING = {
1✔
1421
        b'INAM': 'title',
1422
        b'TITL': 'title',
1423
        b'IPRD': 'album',
1424
        b'IART': 'artist',
1425
        b'IBPM': 'extra.bpm',
1426
        b'ICMT': 'comment',
1427
        b'IMUS': 'composer',
1428
        b'ICOP': 'extra.copyright',
1429
        b'ICRD': 'year',
1430
        b'IGNR': 'genre',
1431
        b'ILNG': 'extra.language',
1432
        b'ISRC': 'extra.isrc',
1433
        b'IPUB': 'extra.publisher',
1434
        b'IPRT': 'track',
1435
        b'ITRK': 'track',
1436
        b'TRCK': 'track',
1437
        b'IBSU': 'extra.url',
1438
        b'YEAR': 'year',
1439
        b'IWRI': 'extra.lyricist',
1440
        b'IENC': 'extra.encoded_by',
1441
        b'IMED': 'extra.media',
1442
    }
1443

1444
    def _determine_duration(self, fh: BinaryIO) -> None:
1✔
1445
        if not self._tags_parsed:
1✔
1446
            self._parse_tag(fh)
1✔
1447

1448
    def _parse_tag(self, fh: BinaryIO) -> None:
1✔
1449
        # see: http://www-mmsp.ece.mcgill.ca/Documents/AudioFormats/WAVE/WAVE.html
1450
        # and: https://en.wikipedia.org/wiki/WAV
1451
        riff, _size, fformat = struct.unpack('4sI4s', fh.read(12))
1✔
1452
        if riff != b'RIFF' or fformat != b'WAVE':
1✔
1453
            raise ParseError('Invalid WAV header')
1✔
1454
        if self._parse_duration:
1✔
1455
            self.bitdepth = 16  # assume 16bit depth (CD quality)
1✔
1456
        chunk_header = fh.read(8)
1✔
1457
        while len(chunk_header) == 8:
1✔
1458
            subchunkid, subchunksize = struct.unpack('4sI', chunk_header)
1✔
1459
            subchunksize += subchunksize % 2  # IFF chunks are padded to an even number of bytes
1✔
1460
            if subchunkid == b'fmt ' and self._parse_duration:
1✔
1461
                _, channels, samplerate = struct.unpack('HHI', fh.read(8))
1✔
1462
                _, _, bitdepth = struct.unpack('<IHH', fh.read(8))
1✔
1463
                if bitdepth == 0:
1✔
1464
                    # Certain codecs (e.g. GSM 6.10) give us a bit depth of zero.
1465
                    # Avoid division by zero when calculating duration.
1466
                    bitdepth = 1
1✔
1467
                self.bitrate = samplerate * channels * bitdepth / 1000
1✔
1468
                self.channels, self.samplerate, self.bitdepth = channels, samplerate, bitdepth
1✔
1469
                remaining_size = subchunksize - 16
1✔
1470
                if remaining_size > 0:
1✔
1471
                    fh.seek(remaining_size, 1)  # skip remaining data in chunk
1✔
1472
            elif subchunkid == b'data' and self._parse_duration:
1✔
1473
                if (self.channels is not None and self.samplerate is not None
1✔
1474
                        and self.bitdepth is not None):
1475
                    self.duration = (
1✔
1476
                        subchunksize / self.channels / self.samplerate / (self.bitdepth / 8))
1477
                fh.seek(subchunksize, 1)
1✔
1478
            elif subchunkid == b'LIST' and self._parse_tags:
1✔
1479
                is_info = fh.read(4)  # check INFO header
1✔
1480
                if is_info != b'INFO':  # jump over non-INFO sections
1✔
UNCOV
1481
                    fh.seek(subchunksize - 4, os.SEEK_CUR)
×
1482
                else:
1483
                    sub_fh = io.BytesIO(fh.read(subchunksize - 4))
1✔
1484
                    field = sub_fh.read(4)
1✔
1485
                    while len(field) == 4:
1✔
1486
                        data_length = struct.unpack('I', sub_fh.read(4))[0]
1✔
1487
                        data_length += data_length % 2  # IFF chunks are padded to an even size
1✔
1488
                        data = sub_fh.read(data_length).split(b'\x00', 1)[0]  # strip zero-byte
1✔
1489
                        fieldname = self._RIFF_MAPPING.get(field)
1✔
1490
                        if fieldname:
1✔
1491
                            value = data.decode('utf-8', 'replace')
1✔
1492
                            if fieldname == 'track':
1✔
1493
                                if value.isdecimal():
1✔
1494
                                    self._set_field(fieldname, int(value))
1✔
1495
                            else:
1496
                                self._set_field(fieldname, value)
1✔
1497
                        field = sub_fh.read(4)
1✔
1498
            elif subchunkid in {b'id3 ', b'ID3 '} and self._parse_tags:
1✔
1499
                id3 = _ID3()
1✔
1500
                id3._filehandler = fh
1✔
1501
                id3._load(tags=True, duration=False, image=self._load_image)
1✔
1502
                self._update(id3)
1✔
1503
            else:  # some other chunk, just skip the data
1504
                fh.seek(subchunksize, 1)
1✔
1505
            chunk_header = fh.read(8)
1✔
1506
        self._tags_parsed = True
1✔
1507

1508

1509
class _Flac(TinyTag):
1✔
1510
    METADATA_STREAMINFO = 0
1✔
1511
    METADATA_PADDING = 1
1✔
1512
    METADATA_APPLICATION = 2
1✔
1513
    METADATA_SEEKTABLE = 3
1✔
1514
    METADATA_VORBIS_COMMENT = 4
1✔
1515
    METADATA_CUESHEET = 5
1✔
1516
    METADATA_PICTURE = 6
1✔
1517

1518
    def _determine_duration(self, fh: BinaryIO) -> None:
1✔
1519
        if not self._tags_parsed:
1✔
1520
            self._parse_tag(fh)
1✔
1521

1522
    def _parse_tag(self, fh: BinaryIO) -> None:
1✔
1523
        id3 = None
1✔
1524
        header = fh.read(4)
1✔
1525
        if header[:3] == b'ID3':  # parse ID3 header if it exists
1✔
1526
            fh.seek(-4, os.SEEK_CUR)
1✔
1527
            id3 = _ID3()
1✔
1528
            id3._filehandler = fh
1✔
1529
            id3._parse_tags = self._parse_tags
1✔
1530
            id3._load_image = self._load_image
1✔
1531
            id3._parse_id3v2(fh)
1✔
1532
            header = fh.read(4)  # after ID3 should be fLaC
1✔
1533
        if header[:4] != b'fLaC':
1✔
1534
            raise ParseError('Invalid FLAC header')
1✔
1535
        # for spec, see https://xiph.org/flac/ogg_mapping.html
1536
        header_data = fh.read(4)
1✔
1537
        while len(header_data) == 4:
1✔
1538
            meta_header = struct.unpack('B3B', header_data)
1✔
1539
            block_type = meta_header[0] & 0x7f
1✔
1540
            is_last_block = meta_header[0] & 0x80
1✔
1541
            size = self._bytes_to_int(meta_header[1:4])
1✔
1542
            # http://xiph.org/flac/format.html#metadata_block_streaminfo
1543
            if block_type == self.METADATA_STREAMINFO and self._parse_duration:
1✔
1544
                stream_info_header = fh.read(size)
1✔
1545
                if len(stream_info_header) < 34:  # invalid streaminfo
1✔
1546
                    break
1✔
1547
                header_values = struct.unpack('HH3s3s8B16s', stream_info_header)
1✔
1548
                # From the xiph documentation:
1549
                # py | <bits>
1550
                # ----------------------------------------------
1551
                # H  | <16>  The minimum block size (in samples)
1552
                # H  | <16>  The maximum block size (in samples)
1553
                # 3s | <24>  The minimum frame size (in bytes)
1554
                # 3s | <24>  The maximum frame size (in bytes)
1555
                # 8B | <20>  Sample rate in Hz.
1556
                #    | <3>   (number of channels)-1.
1557
                #    | <5>   (bits per sample)-1.
1558
                #    | <36>  Total samples in stream.
1559
                # 16s| <128> MD5 signature
1560
                # min_blk, max_blk, min_frm, max_frm = header[0:4]
1561
                # min_frm = self._bytes_to_int(struct.unpack('3B', min_frm))
1562
                # max_frm = self._bytes_to_int(struct.unpack('3B', max_frm))
1563
                #                 channels--.  bits      total samples
1564
                # |----- samplerate -----| |-||----| |---------~   ~----|
1565
                # 0000 0000 0000 0000 0000 0000 0000 0000 0000      0000
1566
                # #---4---# #---5---# #---6---# #---7---# #--8-~   ~-12-#
1567
                self.samplerate = self._bytes_to_int(header_values[4:7]) >> 4
1✔
1568
                self.channels = ((header_values[6] >> 1) & 0x07) + 1
1✔
1569
                self.bitdepth = (
1✔
1570
                    ((header_values[6] & 1) << 4) + ((header_values[7] & 0xF0) >> 4) + 1)
1571
                total_sample_bytes = ((header_values[7] & 0x0F),) + header_values[8:12]
1✔
1572
                total_samples = self._bytes_to_int(total_sample_bytes)
1✔
1573
                self.duration = total_samples / self.samplerate
1✔
1574
                if self.duration > 0:
1✔
1575
                    self.bitrate = self.filesize / self.duration * 8 / 1000
1✔
1576
            elif block_type == self.METADATA_VORBIS_COMMENT and self._parse_tags:
1✔
1577
                oggtag = _Ogg()
1✔
1578
                oggtag._filehandler = fh
1✔
1579
                oggtag._parse_vorbis_comment(fh)
1✔
1580
                self._update(oggtag)
1✔
1581
            elif block_type == self.METADATA_PICTURE and self._load_image:
1✔
1582
                fieldname, value = self._parse_image(fh)
1✔
1583
                self.images._set_field(fieldname, value)
1✔
1584
            elif block_type >= 127:
1✔
UNCOV
1585
                break  # invalid block type
×
1586
            else:
1587
                if DEBUG:
1✔
1588
                    print('Unknown FLAC block type', block_type)
1✔
1589
                fh.seek(size, 1)  # seek over this block
1✔
1590

1591
            if is_last_block:
1✔
1592
                break
1✔
1593
            header_data = fh.read(4)
1✔
1594
        if id3 is not None:  # apply ID3 tags after vorbis
1✔
1595
            self._update(id3)
1✔
1596
        self._tags_parsed = True
1✔
1597

1598
    @classmethod
1✔
1599
    def _parse_image(cls, fh: BinaryIO) -> tuple[str, TagImage]:
1✔
1600
        # https://xiph.org/flac/format.html#metadata_block_picture
1601
        pic_type, mime_type_len = struct.unpack('>2I', fh.read(8))
1✔
1602
        mime_type = fh.read(mime_type_len).decode('utf-8', 'replace')
1✔
1603
        description_len = struct.unpack('>I', fh.read(4))[0]
1✔
1604
        description = fh.read(description_len).decode('utf-8', 'replace')
1✔
1605
        _width, _height, _depth, _colors, pic_len = struct.unpack('>5I', fh.read(20))
1✔
1606
        return _ID3._create_tag_image(fh.read(pic_len), pic_type, mime_type, description)
1✔
1607

1608

1609
class _Wma(TinyTag):
1✔
1610
    # see:
1611
    # http://web.archive.org/web/20131203084402/http://msdn.microsoft.com/en-us/library/bb643323.aspx
1612
    # and (Japanese, but nonetheless helpful)
1613
    # http://uguisu.skr.jp/Windows/format_asf.html
1614
    _ASF_MAPPING = {
1✔
1615
        'WM/ARTISTS': 'artist',
1616
        'WM/TrackNumber': 'track',
1617
        'WM/PartOfSet': 'disc',
1618
        'WM/Year': 'year',
1619
        'WM/AlbumArtist': 'albumartist',
1620
        'WM/Genre': 'genre',
1621
        'WM/AlbumTitle': 'album',
1622
        'WM/Composer': 'composer',
1623
        'WM/Publisher': 'extra.publisher',
1624
        'WM/BeatsPerMinute': 'extra.bpm',
1625
        'WM/InitialKey': 'extra.initial_key',
1626
        'WM/Lyrics': 'extra.lyrics',
1627
        'WM/Language': 'extra.language',
1628
        'WM/Director': 'extra.director',
1629
        'WM/AuthorURL': 'extra.url',
1630
        'WM/ISRC': 'extra.isrc',
1631
        'WM/Conductor': 'extra.conductor',
1632
        'WM/Writer': 'extra.lyricist',
1633
        'WM/SetSubTitle': 'extra.set_subtitle',
1634
        'WM/EncodedBy': 'extra.encoded_by',
1635
        'WM/EncodingSettings': 'extra.encoder_settings',
1636
        'WM/Media': 'extra.media',
1637
        'WM/OriginalReleaseTime': 'extra.original_date',
1638
        'WM/OriginalReleaseYear': 'extra.original_year',
1639
        'WM/Barcode': 'extra.barcode',
1640
        'WM/CatalogNo': 'extra.catalog_number',
1641
    }
1642
    _ASF_CONTENT_DESCRIPTION_OBJECT = b'3&\xb2u\x8ef\xcf\x11\xa6\xd9\x00\xaa\x00b\xcel'
1✔
1643
    _ASF_EXTENDED_CONTENT_DESCRIPTION_OBJECT = (b'@\xa4\xd0\xd2\x07\xe3\xd2\x11\x97\xf0\x00'
1✔
1644
                                                b'\xa0\xc9^\xa8P')
1645
    _STREAM_BITRATE_PROPERTIES_OBJECT = b'\xceu\xf8{\x8dF\xd1\x11\x8d\x82\x00`\x97\xc9\xa2\xb2'
1✔
1646
    _ASF_FILE_PROPERTY_OBJECT = b'\xa1\xdc\xab\x8cG\xa9\xcf\x11\x8e\xe4\x00\xc0\x0c Se'
1✔
1647
    _ASF_STREAM_PROPERTIES_OBJECT = b'\x91\x07\xdc\xb7\xb7\xa9\xcf\x11\x8e\xe6\x00\xc0\x0c Se'
1✔
1648
    _STREAM_TYPE_ASF_AUDIO_MEDIA = b'@\x9ei\xf8M[\xcf\x11\xa8\xfd\x00\x80_\\D+'
1✔
1649

1650
    def _determine_duration(self, fh: BinaryIO) -> None:
1✔
1651
        if not self._tags_parsed:
1✔
1652
            self._parse_tag(fh)
1✔
1653

1654
    def _decode_string(self, bytestring: bytes) -> str:
1✔
1655
        return self._unpad(bytestring.decode('utf-16', 'replace'))
1✔
1656

1657
    def _decode_ext_desc(self, value_type: int, value: bytes) -> str | None:
1✔
1658
        """ decode _ASF_EXTENDED_CONTENT_DESCRIPTION_OBJECT values"""
1659
        if value_type == 0:  # Unicode string
1✔
1660
            return self._decode_string(value)
1✔
1661
        if 1 < value_type < 6:  # DWORD / QWORD / WORD
1✔
1662
            return str(self._bytes_to_int_le(value))
1✔
UNCOV
1663
        return None
×
1664

1665
    def _parse_tag(self, fh: BinaryIO) -> None:
1✔
1666
        header = fh.read(30)
1✔
1667
        # http://www.garykessler.net/library/file_sigs.html
1668
        # http://web.archive.org/web/20131203084402/http://msdn.microsoft.com/en-us/library/bb643323.aspx#_Toc521913958
1669
        if (header[:16] != b'0&\xb2u\x8ef\xcf\x11\xa6\xd9\x00\xaa\x00b\xcel'  # 128 bit GUID
1✔
1670
                or header[-1:] != b'\x02'):
1671
            raise ParseError('Invalid WMA header')
1✔
1672
        while True:
1✔
1673
            object_id = fh.read(16)
1✔
1674
            object_size = self._bytes_to_int_le(fh.read(8))
1✔
1675
            if object_size == 0 or object_size > self.filesize:
1✔
1676
                break  # invalid object, stop parsing.
1✔
1677
            if object_id == self._ASF_CONTENT_DESCRIPTION_OBJECT and self._parse_tags:
1✔
1678
                title_length = self._bytes_to_int_le(fh.read(2))
1✔
1679
                author_length = self._bytes_to_int_le(fh.read(2))
1✔
1680
                copyright_length = self._bytes_to_int_le(fh.read(2))
1✔
1681
                description_length = self._bytes_to_int_le(fh.read(2))
1✔
1682
                rating_length = self._bytes_to_int_le(fh.read(2))
1✔
1683
                data_blocks = {
1✔
1684
                    'title': title_length,
1685
                    'artist': author_length,
1686
                    'extra.copyright': copyright_length,
1687
                    'comment': description_length,
1688
                    '_rating': rating_length,
1689
                }
1690
                for i_field_name, length in data_blocks.items():
1✔
1691
                    bytestring = fh.read(length)
1✔
1692
                    value = self._decode_string(bytestring)
1✔
1693
                    if not i_field_name.startswith('_') and value:
1✔
1694
                        self._set_field(i_field_name, value)
1✔
1695
            elif object_id == self._ASF_EXTENDED_CONTENT_DESCRIPTION_OBJECT and self._parse_tags:
1✔
1696
                # http://web.archive.org/web/20131203084402/http://msdn.microsoft.com/en-us/library/bb643323.aspx#_Toc509555195
1697
                descriptor_count = self._bytes_to_int_le(fh.read(2))
1✔
1698
                for _ in range(descriptor_count):
1✔
1699
                    name_len = self._bytes_to_int_le(fh.read(2))
1✔
1700
                    name = self._decode_string(fh.read(name_len))
1✔
1701
                    value_type = self._bytes_to_int_le(fh.read(2))
1✔
1702
                    value_len = self._bytes_to_int_le(fh.read(2))
1✔
1703
                    if value_type == 1:
1✔
1704
                        fh.seek(value_len, os.SEEK_CUR)  # skip byte values
1✔
1705
                        continue
1✔
1706
                    field_name = self._ASF_MAPPING.get(name)  # try to get normalized field name
1✔
1707
                    if field_name is None:  # custom field
1✔
1708
                        if name.startswith('WM/'):
1✔
1709
                            name = name[3:]
1✔
1710
                        field_name = self._EXTRA_PREFIX + name.lower()
1✔
1711
                    field_value = self._decode_ext_desc(value_type, fh.read(value_len))
1✔
1712
                    if field_value is not None:
1✔
1713
                        if field_name in {'track', 'disc'}:
1✔
1714
                            if isinstance(field_value, int) or field_value.isdecimal():
1✔
1715
                                self._set_field(field_name, int(field_value))
1✔
1716
                        elif field_value:
1✔
1717
                            self._set_field(field_name, field_value)
1✔
1718
            elif object_id == self._ASF_FILE_PROPERTY_OBJECT and self._parse_duration:
1✔
1719
                fh.seek(40, os.SEEK_CUR)
1✔
1720
                play_duration = self._bytes_to_int_le(fh.read(8)) / 10000000
1✔
1721
                fh.seek(8, os.SEEK_CUR)
1✔
1722
                preroll = self._bytes_to_int_le(fh.read(8)) / 1000
1✔
1723
                fh.seek(16, os.SEEK_CUR)
1✔
1724
                # According to the specification, we need to subtract the preroll from play_duration
1725
                # to get the actual duration of the file
1726
                self.duration = max(play_duration - preroll, 0.0)
1✔
1727
            elif object_id == self._ASF_STREAM_PROPERTIES_OBJECT and self._parse_duration:
1✔
1728
                stream_type = fh.read(16)
1✔
1729
                fh.seek(24, os.SEEK_CUR)  # skip irrelevant fields
1✔
1730
                type_specific_data_length = self._bytes_to_int_le(fh.read(4))
1✔
1731
                error_correction_data_length = self._bytes_to_int_le(fh.read(4))
1✔
1732
                fh.seek(6, os.SEEK_CUR)   # skip irrelevant fields
1✔
1733
                already_read = 0
1✔
1734
                if stream_type == self._STREAM_TYPE_ASF_AUDIO_MEDIA:
1✔
1735
                    codec_id_format_tag = self._bytes_to_int_le(fh.read(2))
1✔
1736
                    self.channels = self._bytes_to_int_le(fh.read(2))
1✔
1737
                    self.samplerate = self._bytes_to_int_le(fh.read(4))
1✔
1738
                    avg_bytes_per_second = self._bytes_to_int_le(fh.read(4))
1✔
1739
                    self.bitrate = avg_bytes_per_second * 8 / 1000
1✔
1740
                    fh.seek(2, os.SEEK_CUR)  # skip irrelevant field
1✔
1741
                    bits_per_sample = self._bytes_to_int_le(fh.read(2))
1✔
1742
                    if codec_id_format_tag == 355:  # lossless
1✔
1743
                        self.bitdepth = bits_per_sample
1✔
1744
                    already_read = 16
1✔
1745
                fh.seek(type_specific_data_length - already_read, os.SEEK_CUR)
1✔
1746
                fh.seek(error_correction_data_length, os.SEEK_CUR)
1✔
1747
            else:
1748
                fh.seek(object_size - 24, os.SEEK_CUR)  # skip over unknown object ids
1✔
1749
        self._tags_parsed = True
1✔
1750

1751

1752
class _Aiff(TinyTag):
1✔
1753
    #
1754
    # AIFF is part of the IFF family of file formats.
1755
    #
1756
    # https://en.wikipedia.org/wiki/Audio_Interchange_File_Format#Data_format
1757
    # https://web.archive.org/web/20171118222232/http://www-mmsp.ece.mcgill.ca/documents/audioformats/aiff/aiff.html
1758
    # https://web.archive.org/web/20071219035740/http://www.cnpbagwell.com/aiff-c.txt
1759
    #
1760
    # A few things about the spec:
1761
    #
1762
    # * IFF strings are not supposed to be null terminated.  They sometimes are.
1763
    # * Some tools might throw more metadata into the ANNO chunk but it is
1764
    #   wildly unreliable to count on it. In fact, the official spec recommends against
1765
    #   using it. That said... this code throws the ANNO field into comment and hopes
1766
    #   for the best.
1767
    #
1768
    # The key thing here is that AIFF metadata is usually in a handful of fields
1769
    # and the rest is an ID3 or XMP field.  XMP is too complicated and only Adobe-related
1770
    # products support it. The vast majority use ID3. As such, this code hands any
1771
    # ID3 chunks off to the _ID3 parser, which does everything that needs to be done here.
1772
    #
1773

1774
    _AIFF_MAPPING = {
1✔
1775
        #
1776
        # "Name Chunk text contains the name of the sampled sound."
1777
        #
1778
        # "Author Chunk text contains one or more author names.  An author in
1779
        # this case is the creator of a sampled sound."
1780
        #
1781
        # "Annotation Chunk text contains a comment.  Use of this chunk is
1782
        # discouraged within FORM AIFC." Some tools: "hold my beer"
1783
        #
1784
        # "The Copyright Chunk contains a copyright notice for the sound.  text
1785
        #  contains a date followed by the copyright owner.  The chunk ID '[c] '
1786
        # serves as the copyright character. " Some tools: "hold my beer"
1787
        #
1788
        b'NAME': 'title',
1789
        b'AUTH': 'artist',
1790
        b'ANNO': 'comment',
1791
        b'(c) ': 'extra.copyright',
1792
    }
1793

1794
    def _parse_tag(self, fh: BinaryIO) -> None:
1✔
1795
        chunk_id, _size, form = struct.unpack('>4sI4s', fh.read(12))
1✔
1796
        if chunk_id != b'FORM' or form not in (b'AIFC', b'AIFF'):
1✔
1797
            raise ParseError('Invalid AIFF header')
1✔
1798
        chunk_header = fh.read(8)
1✔
1799
        while len(chunk_header) == 8:
1✔
1800
            sub_chunk_id, sub_chunk_size = struct.unpack('>4sI', chunk_header)
1✔
1801
            sub_chunk_size += sub_chunk_size % 2  # IFF chunks are padded to an even number of bytes
1✔
1802
            if sub_chunk_id in self._AIFF_MAPPING and self._parse_tags:
1✔
1803
                value = self._unpad(fh.read(sub_chunk_size).decode('utf-8', 'replace'))
1✔
1804
                self._set_field(self._AIFF_MAPPING[sub_chunk_id], value)
1✔
1805
            elif sub_chunk_id == b'COMM' and self._parse_duration:
1✔
1806
                channels, num_frames, bitdepth = struct.unpack('>hLh', fh.read(8))
1✔
1807
                self.channels, self.bitdepth = channels, bitdepth
1✔
1808
                try:
1✔
1809
                    exponent, mantissa = struct.unpack('>HQ', fh.read(10))   # Extended precision
1✔
1810
                    samplerate = int(mantissa * (2 ** (exponent - 0x3FFF - 63)))
1✔
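                    # for example, 44100 Hz is stored as exponent 0x400E with
                    # mantissa 0xAC44 << 48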
1811
                    duration = num_frames / samplerate
1✔
1812
                    bitrate = samplerate * channels * bitdepth / 1000
1✔
1813
                    self.samplerate, self.duration, self.bitrate = samplerate, duration, bitrate
1✔
1814
                except OverflowError:
1✔
1815
                    pass
1✔
1816
                fh.seek(sub_chunk_size - 18, 1)  # skip remaining data in chunk
1✔
1817
            elif sub_chunk_id in {b'id3 ', b'ID3 '} and self._parse_tags:
1✔
1818
                id3 = _ID3()
1✔
1819
                id3._filehandler = fh
1✔
1820
                id3._load(tags=True, duration=False, image=self._load_image)
1✔
1821
                self._update(id3)
1✔
1822
            else:  # some other chunk, just skip the data
1823
                fh.seek(sub_chunk_size, 1)
1✔
1824
            chunk_header = fh.read(8)
1✔
1825
        self._tags_parsed = True
1✔
1826

1827
    def _determine_duration(self, fh: BinaryIO) -> None:
1✔
1828
        if not self._tags_parsed:
1✔
1829
            self._parse_tag(fh)
1✔
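
# A minimal usage sketch, assuming the public TinyTag.get() classmethod defined earlier
# in this module:
#
#     tag = TinyTag.get('song.mp3')
#     print(tag.title, tag.artist, tag.duration)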