
devsnd / tinytag / 9284673027

29 May 2024 10:42AM UTC coverage: 98.712% (-0.2%) from 98.863%

Pull Request #209: Allow reading multiple extra fields of same type
Merge 48391643b into 5b966007c (github / web-flow)

171 of 175 new or added lines in 4 files covered. (97.71%)

1 existing line in 1 file now uncovered.

1456 of 1475 relevant lines covered (98.71%)

0.99 hits per line
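
A minimal usage sketch of the behaviour this pull request exercises (multiple values kept per extra field). The file path 'song.mp3' is purely illustrative; TinyTag.get() and the extra dict of lists are taken from the source listing below.

    from tinytag import TinyTag

    # Hypothetical file path; any supported audio format works here.
    tag = TinyTag.get('song.mp3')

    print(tag.artist)           # the first value lands in the regular field
    for name, values in tag.extra.items():
        # repeated fields of the same type are collected as lists
        print(name, values)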

Source File

/tinytag/tinytag.py: 98.42% covered
1
# tinytag - an audio file metadata reader
2
# Copyright (c) 2014-2023 Tom Wallroth
3
# Copyright (c) 2021-2024 Mat (mathiascode)
4
#
5
# Sources on GitHub:
6
# http://github.com/devsnd/tinytag/
7

8
# MIT License
9

10
# Copyright (c) 2014-2024 Tom Wallroth, Mat (mathiascode)
11

12
# Permission is hereby granted, free of charge, to any person obtaining a copy
13
# of this software and associated documentation files (the "Software"), to deal
14
# in the Software without restriction, including without limitation the rights
15
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
16
# copies of the Software, and to permit persons to whom the Software is
17
# furnished to do so, subject to the following conditions:
18

19
# The above copyright notice and this permission notice shall be included in all
20
# copies or substantial portions of the Software.
21

22
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
23
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
24
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
25
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
26
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
27
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
28
# SOFTWARE.
29

30
"""Audio file metadata reader"""
1✔
31

32
# pylint: disable=invalid-name,protected-access
33
# pylint: disable=too-many-lines,too-many-arguments,too-many-boolean-expressions
34
# pylint: disable=too-many-branches,too-many-instance-attributes,too-many-locals
35
# pylint: disable=too-many-nested-blocks,too-many-statements,too-few-public-methods
36

37

38
from __future__ import annotations
1✔
39
from collections.abc import Callable, Iterator
1✔
40
from functools import reduce
1✔
41
from os import PathLike
1✔
42
from sys import stderr
1✔
43
from typing import Any, BinaryIO
1✔
44
from warnings import warn
1✔
45

46
import base64
1✔
47
import io
1✔
48
import os
1✔
49
import re
1✔
50
import struct
1✔
51

52

53
DEBUG = bool(os.environ.get('TINYTAG_DEBUG'))  # some of the parsers can print debug info
1✔
54

55

56
class TinyTagException(Exception):
1✔
57
    """Base class for exceptions."""
1✔
58

59

60
class ParseError(TinyTagException):
1✔
61
    """Parsing an audio file failed."""
1✔
62

63

64
class UnsupportedFormatError(TinyTagException):
1✔
65
    """File format is not supported."""
1✔
66

67

68
class TinyTag:
1✔
69
    """A class containing audio file metadata."""
1✔
70

71
    SUPPORTED_FILE_EXTENSIONS = (
1✔
72
        '.mp1', '.mp2', '.mp3',
73
        '.oga', '.ogg', '.opus', '.spx',
74
        '.wav', '.flac', '.wma',
75
        '.m4b', '.m4a', '.m4r', '.m4v', '.mp4', '.aax', '.aaxc',
76
        '.aiff', '.aifc', '.aif', '.afc'
77
    )
78
    _EXTRA_PREFIX = 'extra.'
1✔
79
    _file_extension_mapping: dict[tuple[str, ...], type[TinyTag]] | None = None
1✔
80
    _magic_bytes_mapping: dict[bytes, type[TinyTag]] | None = None
1✔
81

82
    def __init__(self) -> None:
1✔
83
        self.filename: bytes | str | PathLike[Any] | None = None
1✔
84
        self.filesize = 0
1✔
85
        self.duration: float | None = None
1✔
86
        self.channels: int | None = None
1✔
87
        self.bitrate: float | None = None
1✔
88
        self.bitdepth: int | None = None
1✔
89
        self.samplerate: int | None = None
1✔
90
        self.artist: str | None = None
1✔
91
        self.albumartist: str | None = None
1✔
92
        self.composer: str | None = None
1✔
93
        self.album: str | None = None
1✔
94
        self.disc: int | None = None
1✔
95
        self.disc_total: int | None = None
1✔
96
        self.title: str | None = None
1✔
97
        self.track: int | None = None
1✔
98
        self.track_total: int | None = None
1✔
99
        self.genre: str | None = None
1✔
100
        self.year: str | None = None
1✔
101
        self.comment: str | None = None
1✔
102
        self.extra: dict[str, list[str]] = {}
1✔
103
        self.images = TagImages()
1✔
104
        self._filehandler: BinaryIO | None = None
1✔
105
        self._default_encoding: str | None = None  # allow override for some file formats
1✔
106
        self._parse_duration = True
1✔
107
        self._parse_tags = True
1✔
108
        self._load_image = False
1✔
109
        self._tags_parsed = False
1✔
110

111
    def __repr__(self) -> str:
1✔
112
        return str(self.as_dict(flatten=False))
1✔
113

114
    @classmethod
1✔
115
    def get(cls,
1✔
116
            filename: bytes | str | PathLike[Any] | None = None,
117
            tags: bool = True,
118
            duration: bool = True,
119
            image: bool = False,
120
            encoding: str | None = None,
121
            file_obj: BinaryIO | None = None,
122
            **kwargs: Any) -> TinyTag:
123
        """Return a tag object for an audio file."""
124
        should_close_file = file_obj is None
1✔
125
        if filename and should_close_file:
1✔
126
            file_obj = open(filename, 'rb')  # pylint: disable=consider-using-with
1✔
127
        if file_obj is None:
1✔
128
            raise ValueError('Either filename or file_obj argument is required')
1✔
129
        if 'ignore_errors' in kwargs:
1✔
130
            warn('ignore_errors argument is obsolete, and will be removed in a future '
1✔
131
                 '2.x release', DeprecationWarning, stacklevel=2)
132
        try:
1✔
133
            file_obj.seek(0, os.SEEK_END)
1✔
134
            filesize = file_obj.tell()
1✔
135
            file_obj.seek(0)
1✔
136
            parser_class = cls._get_parser_class(filename, file_obj)
1✔
137
            tag = parser_class()
1✔
138
            tag._filehandler = file_obj
1✔
139
            tag._default_encoding = encoding
1✔
140
            tag.filename = filename
1✔
141
            tag.filesize = filesize
1✔
142
            if filesize > 0:
1✔
143
                try:
1✔
144
                    tag._load(tags=tags, duration=duration, image=image)
1✔
145
                except Exception as exc:
1✔
146
                    raise ParseError(exc) from exc
1✔
147
            return tag
1✔
148
        finally:
149
            if should_close_file:
1✔
150
                file_obj.close()
1✔
151

152
    @classmethod
1✔
153
    def is_supported(cls, filename: bytes | str | PathLike[Any]) -> bool:
1✔
154
        """Check if a specific file is supported based on its file extension."""
155
        return cls._get_parser_for_filename(filename) is not None
1✔
156

157
    def as_dict(self, flatten: bool = True) -> dict[
1✔
158
        str,
159
        str | int | float | list[str | TagImage] | dict[str, list[str | TagImage]]
160
    ]:
161
        """Return a dictionary representation of the tag."""
162
        fields: dict[
1✔
163
            str,
164
            str | int | float | list[str | TagImage] | dict[str, list[str | TagImage]]
165
        ] = {}
166
        for key, value in self.__dict__.items():
1✔
167
            if key.startswith('_'):
1✔
168
                continue
1✔
169
            if flatten and key == 'extra':
1✔
170
                for extra_key, extra_values in value.items():
1✔
171
                    if extra_key in fields:
1✔
NEW
172
                        fields[extra_key] += extra_values
×
173
                    else:
174
                        fields[extra_key] = extra_values
1✔
175
                continue
1✔
176
            if key == 'images':
1✔
177
                value = value.as_dict(flatten)
1✔
178
            if value is None:
1✔
179
                continue
1✔
180
            if flatten and key != 'filename' and isinstance(value, str):
1✔
181
                fields[key] = [value]
1✔
182
            else:
183
                fields[key] = value
1✔
184
        return fields
1✔
185

186
    @classmethod
1✔
187
    def _get_parser_for_filename(
1✔
188
            cls, filename: bytes | str | PathLike[Any]) -> type[TinyTag] | None:
189
        if cls._file_extension_mapping is None:
1✔
190
            cls._file_extension_mapping = {
1✔
191
                ('.mp1', '.mp2', '.mp3'): _ID3,
192
                ('.oga', '.ogg', '.opus', '.spx'): _Ogg,
193
                ('.wav',): _Wave,
194
                ('.flac',): _Flac,
195
                ('.wma',): _Wma,
196
                ('.m4b', '.m4a', '.m4r', '.m4v', '.mp4', '.aax', '.aaxc'): _MP4,
197
                ('.aiff', '.aifc', '.aif', '.afc'): _Aiff,
198
            }
199
        filename = os.fsdecode(filename).lower()
1✔
200
        for ext, tagclass in cls._file_extension_mapping.items():
1✔
201
            if filename.endswith(ext):
1✔
202
                return tagclass
1✔
203
        return None
1✔
204

205
    @classmethod
1✔
206
    def _get_parser_for_file_handle(cls, fh: BinaryIO) -> type[TinyTag] | None:
1✔
207
        # https://en.wikipedia.org/wiki/List_of_file_signatures
208
        if cls._magic_bytes_mapping is None:
1✔
209
            cls._magic_bytes_mapping = {
1✔
210
                b'^ID3': _ID3,
211
                b'^\xff\xfb': _ID3,
212
                b'^OggS.........................FLAC': _Ogg,
213
                b'^OggS........................Opus': _Ogg,
214
                b'^OggS........................Speex': _Ogg,
215
                b'^OggS.........................vorbis': _Ogg,
216
                b'^RIFF....WAVE': _Wave,
217
                b'^fLaC': _Flac,
218
                b'^\x30\x26\xB2\x75\x8E\x66\xCF\x11\xA6\xD9\x00\xAA\x00\x62\xCE\x6C': _Wma,
219
                b'....ftypM4A': _MP4,  # https://www.file-recovery.com/m4a-signature-format.htm
220
                b'....ftypaax': _MP4,  # Audible proprietary M4A container
221
                b'....ftypaaxc': _MP4,  # Audible proprietary M4A container
222
                b'\xff\xf1': _MP4,  # https://www.garykessler.net/library/file_sigs.html
223
                b'^FORM....AIFF': _Aiff,
224
                b'^FORM....AIFC': _Aiff,
225
            }
226
        header = fh.read(max(len(sig) for sig in cls._magic_bytes_mapping))
1✔
227
        fh.seek(0)
1✔
228
        for magic, parser in cls._magic_bytes_mapping.items():
1✔
229
            if re.match(magic, header):
1✔
230
                return parser
1✔
231
        return None
1✔
232

233
    @classmethod
1✔
234
    def _get_parser_class(cls, filename: bytes | str | PathLike[Any] | None = None,
1✔
235
                          filehandle: BinaryIO | None = None) -> type[TinyTag]:
236
        if cls != TinyTag:  # if `get` is invoked on TinyTag, find parser by ext
1✔
237
            return cls  # otherwise use the class on which `get` was invoked
1✔
238
        if filename:
1✔
239
            parser_class = cls._get_parser_for_filename(filename)
1✔
240
            if parser_class is not None:
1✔
241
                return parser_class
1✔
242
        # try determining the file type by magic byte header
243
        if filehandle:
1✔
244
            parser_class = cls._get_parser_for_file_handle(filehandle)
1✔
245
            if parser_class is not None:
1✔
246
                return parser_class
1✔
247
        raise UnsupportedFormatError('No tag reader found to support file type')
1✔
248

249
    def _load(self, tags: bool, duration: bool, image: bool = False) -> None:
1✔
250
        self._parse_tags = tags
1✔
251
        self._parse_duration = duration
1✔
252
        self._load_image = image
1✔
253
        if self._filehandler is None:
1✔
254
            return
1✔
255
        if tags:
1✔
256
            self._parse_tag(self._filehandler)
1✔
257
        if duration:
1✔
258
            if tags:  # rewind file if the tags were already parsed
1✔
259
                self._filehandler.seek(0)
1✔
260
            self._determine_duration(self._filehandler)
1✔
261

262
    def _set_field(self, fieldname: str, value: str | int | float) -> None:
1✔
263
        if fieldname.startswith(self._EXTRA_PREFIX):
1✔
264
            fieldname = fieldname[len(self._EXTRA_PREFIX):]
1✔
265
            extra_values = self.extra.get(fieldname, [])
1✔
266
            if not isinstance(value, str) or value in extra_values:
1✔
267
                return
1✔
268
            extra_values.append(value)
1✔
269
            if DEBUG:
1✔
270
                print(f'Setting extra field "{fieldname}" to "{extra_values!r}"')
1✔
271
            self.extra[fieldname] = extra_values
1✔
272
            return
1✔
273
        old_value = self.__dict__.get(fieldname)
1✔
274
        new_value = value
1✔
275
        if isinstance(new_value, str):
1✔
276
            # First value goes in tag, others in tag.extra
277
            values = new_value.split('\x00')
1✔
278
            new_value = values[0]
1✔
279
            start_pos = 0 if old_value else 1
1✔
280
            if len(values) > 1:
1✔
281
                for i_value in values[start_pos:]:
1✔
282
                    self._set_field(self._EXTRA_PREFIX + fieldname, i_value)
1✔
283
            elif old_value and new_value != old_value:
1✔
284
                self._set_field(self._EXTRA_PREFIX + fieldname, new_value)
1✔
285
                return
1✔
286
            if old_value:
1✔
287
                return
1✔
288
        elif not new_value and old_value:
1✔
289
            # Prioritize non-zero integer values
290
            return
1✔
291
        if DEBUG:
1✔
292
            print(f'Setting field "{fieldname}" to "{new_value!r}"')
1✔
293
        self.__dict__[fieldname] = new_value
1✔
294

295
    def _determine_duration(self, fh: BinaryIO) -> None:
1✔
296
        raise NotImplementedError
1✔
297

298
    def _parse_tag(self, fh: BinaryIO) -> None:
1✔
299
        raise NotImplementedError
1✔
300

301
    def _update(self, other: TinyTag) -> None:
1✔
302
        # update the values of this tag with the values from another tag
303
        for key, value in other.as_dict(flatten=False).items():
1✔
304
            if isinstance(value, dict):
1✔
305
                if key != 'extra':
1✔
306
                    continue
1✔
307
                for extra_key, extra_values in value.items():
1✔
308
                    for extra_value in extra_values:
1✔
309
                        if isinstance(extra_value, str):
1✔
310
                            self._set_field(self._EXTRA_PREFIX + extra_key, extra_value)
1✔
311
                continue
1✔
312
            if value is not None and not isinstance(value, list):
1✔
313
                self._set_field(key, value)
1✔
314
        self.images._update(other.images)
1✔
315

316
    @staticmethod
1✔
317
    def _bytes_to_int_le(b: bytes) -> int:
1✔
318
        fmt = {1: '<B', 2: '<H', 4: '<I', 8: '<Q'}.get(len(b))
1✔
319
        result: int = struct.unpack(fmt, b)[0] if fmt is not None else 0
1✔
320
        return result
1✔
321

322
    @staticmethod
1✔
323
    def _bytes_to_int(b: tuple[int, ...]) -> int:
1✔
324
        return reduce(lambda accu, elem: (accu << 8) + elem, b, 0)
1✔
325

326
    @staticmethod
1✔
327
    def _unpad(s: str) -> str:
1✔
328
        # strings in mp3 and asf *may* be terminated with a zero byte at the end
329
        return s.strip('\x00')
1✔
330

331
    def get_image(self) -> bytes | None:
1✔
332
        """Deprecated, use images.any instead."""
333
        warn('get_image() is deprecated, and will be removed in a future 2.x release. '
1✔
334
             'Use images.any instead.', DeprecationWarning, stacklevel=2)
335
        image = self.images.any
1✔
336
        return image.data if image is not None else None
1✔
337

338
    @property
1✔
339
    def audio_offset(self) -> None:
1✔
340
        """Obsolete."""
341
        warn('audio_offset attribute is obsolete, and will be '
1✔
342
             'removed in a future 2.x release', DeprecationWarning, stacklevel=2)
343

344

345
class TagImages:
1✔
346
    """A class containing images embedded in an audio file."""
1✔
347
    _EXTRA_PREFIX = 'extra.'
1✔
348

349
    def __init__(self) -> None:
1✔
350
        self.front_cover: list[TagImage] = []
1✔
351
        self.back_cover: list[TagImage] = []
1✔
352
        self.leaflet: list[TagImage] = []
1✔
353
        self.media: list[TagImage] = []
1✔
354
        self.other: list[TagImage] = []
1✔
355
        self.extra: dict[str, list[TagImage]] = {}
1✔
356

357
    def __repr__(self) -> str:
1✔
NEW
358
        return str(self.as_dict(flatten=False))
×
359

360
    @property
1✔
361
    def any(self) -> TagImage | None:
1✔
362
        """Return a cover image.
363
        If not present, fall back to any other available image.
364
        """
365
        for image_list in self.as_dict(flatten=True).values():
1✔
366
            for image in image_list:
1✔
367
                return image
1✔
368
        return None
1✔
369

370
    def as_dict(self, flatten: bool = True) -> dict[str, list[TagImage]]:
1✔
371
        """Return a dictionary representation of the tag images."""
372
        images: dict[str, list[TagImage]] = {}
1✔
373
        for key, value in self.__dict__.items():
1✔
374
            if key.startswith('_'):
1✔
NEW
375
                continue
×
376
            if flatten and key == 'extra':
1✔
377
                for extra_key, extra_values in value.items():
1✔
378
                    if extra_key in images:
1✔
NEW
379
                        images[extra_key] += extra_values
×
380
                    else:
381
                        images[extra_key] = extra_values
1✔
382
                continue
1✔
383
            if value or key == 'extra':
1✔
384
                images[key] = value
1✔
385
        return images
1✔
386

387
    def _set_field(self, fieldname: str, value: TagImage) -> None:
1✔
388
        write_dest = self.__dict__
1✔
389
        if fieldname.startswith(self._EXTRA_PREFIX):
1✔
390
            fieldname = fieldname[len(self._EXTRA_PREFIX):]
1✔
391
            write_dest = self.extra
1✔
392
        old_values = write_dest.get(fieldname)
1✔
393
        values = [value]
1✔
394
        if old_values is not None:
1✔
395
            values = old_values + values
1✔
396
        if DEBUG:
1✔
397
            print(f'Setting image field "{fieldname}"')
1✔
398
        write_dest[fieldname] = values
1✔
399

400
    def _update(self, other: TagImages) -> None:
1✔
401
        for key, value in other.as_dict(flatten=False).items():
1✔
402
            if isinstance(value, dict):
1✔
403
                for extra_key, extra_values in value.items():
1✔
404
                    for image_extra in extra_values:
1✔
405
                        self._set_field(self._EXTRA_PREFIX + extra_key, image_extra)
1✔
406
                continue
1✔
407
            for image in value:
1✔
408
                self._set_field(key, image)
1✔
409

410

411
class TagImage:
1✔
412
    """A class representing an image embedded in an audio file."""
1✔
413
    def __init__(self, name: str, data: bytes, mime_type: str | None = None) -> None:
1✔
414
        self.name = name
1✔
415
        self.data = data
1✔
416
        self.mime_type = mime_type
1✔
417
        self.description: str | None = None
1✔
418

419
    def __repr__(self) -> str:
1✔
420
        variables = vars(self).copy()
1✔
421
        data = variables.get("data")
1✔
422
        if data is not None:
1✔
423
            variables["data"] = (data[:45] + b'..') if len(data) > 45 else data
1✔
424
        return str(variables)
1✔
425

426

427
class _MP4(TinyTag):
1✔
428
    # https://developer.apple.com/library/mac/documentation/QuickTime/QTFF/Metadata/Metadata.html
429
    # https://developer.apple.com/library/mac/documentation/QuickTime/QTFF/QTFFChap2/qtff2.html
430

431
    class _Parser:
1✔
432
        atom_decoder_by_type: dict[
1✔
433
            int, Callable[[bytes], int | str | bytes | TagImage]] | None = None
434
        _CUSTOM_FIELD_NAME_MAPPING = {
1✔
435
            'artists': 'artist',
436
            'conductor': 'extra.conductor',
437
            'discsubtitle': 'extra.set_subtitle',
438
            'initialkey': 'extra.initial_key',
439
            'isrc': 'extra.isrc',
440
            'language': 'extra.language',
441
            'lyricist': 'extra.lyricist',
442
            'media': 'extra.media',
443
            'website': 'extra.url',
444
            'originaldate': 'extra.original_date',
445
            'originalyear': 'extra.original_year',
446
            'license': 'extra.license',
447
            'barcode': 'extra.barcode',
448
            'catalognumber': 'extra.catalog_number',
449
        }
450

451
        @classmethod
1✔
452
        def _unpack_integer(cls, value: bytes, signed: bool = True) -> str:
1✔
453
            value_length = len(value)
1✔
454
            result = -1
1✔
455
            if value_length == 1:
1✔
456
                result = struct.unpack('>b' if signed else '>B', value)[0]
×
457
            elif value_length == 2:
1✔
458
                result = struct.unpack('>h' if signed else '>H', value)[0]
1✔
459
            elif value_length == 4:
1✔
460
                result = struct.unpack('>i' if signed else '>I', value)[0]
1✔
461
            elif value_length == 8:
1✔
462
                result = struct.unpack('>q' if signed else '>Q', value)[0]
1✔
463
            return str(result)
1✔
464

465
        @classmethod
1✔
466
        def _unpack_integer_unsigned(cls, value: bytes) -> str:
1✔
467
            return cls._unpack_integer(value, signed=False)
×
468

469
        @classmethod
1✔
470
        def _make_data_atom_parser(
1✔
471
                cls, fieldname: str) -> Callable[[bytes], dict[str, int | str | bytes | TagImage]]:
472
            def _parse_data_atom(data_atom: bytes) -> dict[str, int | str | bytes | TagImage]:
1✔
473
                data_type = struct.unpack('>I', data_atom[:4])[0]
1✔
474
                if cls.atom_decoder_by_type is None:
1✔
475
                    # https://developer.apple.com/library/mac/documentation/QuickTime/QTFF/Metadata/Metadata.html#//apple_ref/doc/uid/TP40000939-CH1-SW34
476
                    cls.atom_decoder_by_type = {
1✔
477
                        # 0: 'reserved'
478
                        1: lambda x: x.decode('utf-8', 'replace'),   # UTF-8
479
                        2: lambda x: x.decode('utf-16', 'replace'),  # UTF-16
480
                        3: lambda x: x.decode('s/jis', 'replace'),   # S/JIS
481
                        # 16: duration in millis
482
                        13: lambda x: TagImage('front_cover', x, 'image/jpeg'),  # JPEG
483
                        14: lambda x: TagImage('front_cover', x, 'image/png'),   # PNG
484
                        21: cls._unpack_integer,                    # BE Signed int
485
                        22: cls._unpack_integer_unsigned,           # BE Unsigned int
486
                        # 23: lambda x: struct.unpack('>f', x)[0],  # BE Float32
487
                        # 24: lambda x: struct.unpack('>d', x)[0],  # BE Float64
488
                        # 27: lambda x: x,                          # BMP
489
                        # 28: lambda x: x,                          # QuickTime Metadata atom
490
                        65: cls._unpack_integer,                    # 8-bit Signed int
491
                        66: cls._unpack_integer,                    # BE 16-bit Signed int
492
                        67: cls._unpack_integer,                    # BE 32-bit Signed int
493
                        74: cls._unpack_integer,                    # BE 64-bit Signed int
494
                        75: cls._unpack_integer_unsigned,           # 8-bit Unsigned int
495
                        76: cls._unpack_integer_unsigned,           # BE 16-bit Unsigned int
496
                        77: cls._unpack_integer_unsigned,           # BE 32-bit Unsigned int
497
                        78: cls._unpack_integer_unsigned,           # BE 64-bit Unsigned int
498
                    }
499
                conversion = cls.atom_decoder_by_type.get(data_type)
1✔
500
                if conversion is None:
1✔
501
                    if DEBUG:
1✔
502
                        print(f'Cannot convert data type: {data_type}', file=stderr)
1✔
503
                    return {}  # don't know how to convert data atom
1✔
504
                # skip header & null-bytes, convert rest
505
                return {fieldname: conversion(data_atom[8:])}
1✔
506
            return _parse_data_atom
1✔
507

508
        @classmethod
1✔
509
        def _make_number_parser(
1✔
510
                cls, fieldname1: str, fieldname2: str) -> Callable[[bytes], dict[str, int]]:
511
            def _(data_atom: bytes) -> dict[str, int]:
1✔
512
                number_data = data_atom[8:14]
1✔
513
                numbers = struct.unpack('>HHH', number_data)
1✔
514
                # for some reason the first number is always irrelevant.
515
                return {fieldname1: numbers[1], fieldname2: numbers[2]}
1✔
516
            return _
1✔
517

518
        @classmethod
1✔
519
        def _parse_id3v1_genre(cls, data_atom: bytes) -> dict[str, str]:
1✔
520
            # dunno why the genre is offset by -1 but that's how mutagen does it
521
            idx = struct.unpack('>H', data_atom[8:])[0] - 1
1✔
522
            result = {}
1✔
523
            if idx < len(_ID3._ID3V1_GENRES):
1✔
524
                result['genre'] = _ID3._ID3V1_GENRES[idx]
1✔
525
            return result
1✔
526

527
        @classmethod
1✔
528
        def _read_extended_descriptor(cls, esds_atom: BinaryIO) -> None:
1✔
529
            for _i in range(4):
1✔
530
                if esds_atom.read(1) != b'\x80':
1✔
531
                    break
1✔
532

533
        @classmethod
1✔
534
        def _parse_custom_field(cls, data: bytes) -> dict[str, int | str | bytes | TagImage]:
1✔
535
            fh = io.BytesIO(data)
1✔
536
            header_size = 8
1✔
537
            field_name = None
1✔
538
            data_atom = b''
1✔
539
            atom_header = fh.read(header_size)
1✔
540
            while len(atom_header) == header_size:
1✔
541
                atom_size = struct.unpack('>I', atom_header[:4])[0] - header_size
1✔
542
                atom_type = atom_header[4:]
1✔
543
                if atom_type == b'name':
1✔
544
                    atom_value = fh.read(atom_size)[4:].lower()
1✔
545
                    field_name = atom_value.decode('utf-8', 'replace')
1✔
546
                    field_name = cls._CUSTOM_FIELD_NAME_MAPPING.get(
1✔
547
                        field_name, TinyTag._EXTRA_PREFIX + field_name)
548
                elif atom_type == b'data':
1✔
549
                    data_atom = fh.read(atom_size)
1✔
550
                else:
551
                    fh.seek(atom_size, os.SEEK_CUR)
1✔
552
                atom_header = fh.read(header_size)  # read next atom
1✔
553
            if len(data_atom) < 8 or field_name is None:
1✔
554
                return {}
1✔
555
            parser = cls._make_data_atom_parser(field_name)
1✔
556
            return parser(data_atom)
1✔
557

558
        @classmethod
1✔
559
        def _parse_audio_sample_entry_mp4a(cls, data: bytes) -> dict[str, int]:
1✔
560
            # this atom also contains the esds atom:
561
            # https://ffmpeg.org/doxygen/0.6/mov_8c-source.html
562
            # http://xhelmboyx.tripod.com/formats/mp4-layout.txt
563
            # http://sasperger.tistory.com/103
564
            datafh = io.BytesIO(data)
1✔
565
            datafh.seek(16, os.SEEK_CUR)  # jump over version and flags
1✔
566
            channels = struct.unpack('>H', datafh.read(2))[0]
1✔
567
            datafh.seek(2, os.SEEK_CUR)   # jump over bit_depth
1✔
568
            datafh.seek(2, os.SEEK_CUR)   # jump over QT compr id & pkt size
1✔
569
            sr = struct.unpack('>I', datafh.read(4))[0]
1✔
570

571
            # ES Description Atom
572
            esds_atom_size = struct.unpack('>I', data[28:32])[0]
1✔
573
            esds_atom = io.BytesIO(data[36:36 + esds_atom_size])
1✔
574
            esds_atom.seek(5, os.SEEK_CUR)   # jump over version, flags and tag
1✔
575

576
            # ES Descriptor
577
            cls._read_extended_descriptor(esds_atom)
1✔
578
            esds_atom.seek(4, os.SEEK_CUR)   # jump over ES id, flags and tag
1✔
579

580
            # Decoder Config Descriptor
581
            cls._read_extended_descriptor(esds_atom)
1✔
582
            esds_atom.seek(9, os.SEEK_CUR)
1✔
583
            avg_br = struct.unpack('>I', esds_atom.read(4))[0] / 1000  # kbit/s
1✔
584
            return {'channels': channels, 'samplerate': sr, 'bitrate': avg_br}
1✔
585

586
        @classmethod
1✔
587
        def _parse_audio_sample_entry_alac(cls, data: bytes) -> dict[str, int]:
1✔
588
            # https://github.com/macosforge/alac/blob/master/ALACMagicCookieDescription.txt
589
            alac_atom_size = struct.unpack('>I', data[28:32])[0]
1✔
590
            alac_atom = io.BytesIO(data[36:36 + alac_atom_size])
1✔
591
            alac_atom.seek(9, os.SEEK_CUR)
1✔
592
            bitdepth = struct.unpack('b', alac_atom.read(1))[0]
1✔
593
            alac_atom.seek(3, os.SEEK_CUR)
1✔
594
            channels = struct.unpack('b', alac_atom.read(1))[0]
1✔
595
            alac_atom.seek(6, os.SEEK_CUR)
1✔
596
            avg_br = struct.unpack('>I', alac_atom.read(4))[0] / 1000  # kbit/s
1✔
597
            sr = struct.unpack('>I', alac_atom.read(4))[0]
1✔
598
            return {'channels': channels, 'samplerate': sr, 'bitrate': avg_br, 'bitdepth': bitdepth}
1✔
599

600
        @classmethod
1✔
601
        def _parse_mvhd(cls, data: bytes) -> dict[str, float]:
1✔
602
            # http://stackoverflow.com/a/3639993/1191373
603
            walker = io.BytesIO(data)
1✔
604
            version = struct.unpack('b', walker.read(1))[0]
1✔
605
            walker.seek(3, os.SEEK_CUR)  # jump over flags
1✔
606
            if version == 0:  # uses 32 bit integers for timestamps
1✔
607
                walker.seek(8, os.SEEK_CUR)  # jump over create & mod times
1✔
608
                time_scale = struct.unpack('>I', walker.read(4))[0]
1✔
609
                duration = struct.unpack('>I', walker.read(4))[0]
1✔
610
            else:  # version == 1:  # uses 64 bit integers for timestamps
611
                walker.seek(16, os.SEEK_CUR)  # jump over create & mod times
×
612
                time_scale = struct.unpack('>I', walker.read(4))[0]
×
613
                duration = struct.unpack('>q', walker.read(8))[0]
×
614
            return {'duration': duration / time_scale}
1✔
615

616
    # The parser tree: Each key is an atom name which is traversed if existing.
617
    # Leaves of the parser tree are callables which receive the atom data.
618
    # callables return {fieldname: value} which is used to update the TinyTag.
619
    _META_DATA_TREE = {b'moov': {b'udta': {b'meta': {b'ilst': {
1✔
620
        # see: http://atomicparsley.sourceforge.net/mpeg-4files.html
621
        # and: https://metacpan.org/dist/Image-ExifTool/source/lib/Image/ExifTool/QuickTime.pm#L3093
622
        b'\xa9ART': {b'data': _Parser._make_data_atom_parser('artist')},
623
        b'\xa9alb': {b'data': _Parser._make_data_atom_parser('album')},
624
        b'\xa9cmt': {b'data': _Parser._make_data_atom_parser('comment')},
625
        b'\xa9con': {b'data': _Parser._make_data_atom_parser('extra.conductor')},
626
        # need test-data for this
627
        # b'cpil':   {b'data': _Parser._make_data_atom_parser('extra.compilation')},
628
        b'\xa9day': {b'data': _Parser._make_data_atom_parser('year')},
629
        b'\xa9des': {b'data': _Parser._make_data_atom_parser('extra.description')},
630
        b'\xa9dir': {b'data': _Parser._make_data_atom_parser('extra.director')},
631
        b'\xa9gen': {b'data': _Parser._make_data_atom_parser('genre')},
632
        b'\xa9lyr': {b'data': _Parser._make_data_atom_parser('extra.lyrics')},
633
        b'\xa9mvn': {b'data': _Parser._make_data_atom_parser('movement')},
634
        b'\xa9nam': {b'data': _Parser._make_data_atom_parser('title')},
635
        b'\xa9pub': {b'data': _Parser._make_data_atom_parser('extra.publisher')},
636
        b'\xa9too': {b'data': _Parser._make_data_atom_parser('extra.encoded_by')},
637
        b'\xa9wrt': {b'data': _Parser._make_data_atom_parser('composer')},
638
        b'aART': {b'data': _Parser._make_data_atom_parser('albumartist')},
639
        b'cprt': {b'data': _Parser._make_data_atom_parser('extra.copyright')},
640
        b'desc': {b'data': _Parser._make_data_atom_parser('extra.description')},
641
        b'disk': {b'data': _Parser._make_number_parser('disc', 'disc_total')},
642
        b'gnre': {b'data': _Parser._parse_id3v1_genre},
643
        b'trkn': {b'data': _Parser._make_number_parser('track', 'track_total')},
644
        b'tmpo': {b'data': _Parser._make_data_atom_parser('extra.bpm')},
645
        b'covr': {b'data': _Parser._make_data_atom_parser('images.front_cover')},
646
        b'----': _Parser._parse_custom_field,
647
    }}}}}
648

649
    # see: https://developer.apple.com/library/mac/documentation/QuickTime/QTFF/QTFFChap3/qtff3.html
650
    _AUDIO_DATA_TREE = {
1✔
651
        b'moov': {
652
            b'mvhd': _Parser._parse_mvhd,
653
            b'trak': {b'mdia': {b"minf": {b"stbl": {b"stsd": {
654
                b'mp4a': _Parser._parse_audio_sample_entry_mp4a,
655
                b'alac': _Parser._parse_audio_sample_entry_alac
656
            }}}}}
657
        }
658
    }
659

660
    _VERSIONED_ATOMS = {b'meta', b'stsd'}  # those have an extra 4 byte header
1✔
661
    _FLAGGED_ATOMS = {b'stsd'}  # these also have an extra 4 byte header
1✔
662

663
    def _determine_duration(self, fh: BinaryIO) -> None:
1✔
664
        self._traverse_atoms(fh, path=self._AUDIO_DATA_TREE)
1✔
665

666
    def _parse_tag(self, fh: BinaryIO) -> None:
1✔
667
        self._traverse_atoms(fh, path=self._META_DATA_TREE)
1✔
668

669
    def _traverse_atoms(self, fh: BinaryIO, path: dict[bytes, Any],
1✔
670
                        stop_pos: int | None = None,
671
                        curr_path: list[bytes] | None = None) -> None:
672
        header_size = 8
1✔
673
        atom_header = fh.read(header_size)
1✔
674
        while len(atom_header) == header_size:
1✔
675
            atom_size = struct.unpack('>I', atom_header[:4])[0] - header_size
1✔
676
            atom_type = atom_header[4:]
1✔
677
            if curr_path is None:  # keep track how we traversed in the tree
1✔
678
                curr_path = [atom_type]
1✔
679
            if atom_size <= 0:  # empty atom, jump to next one
1✔
680
                atom_header = fh.read(header_size)
1✔
681
                continue
1✔
682
            if DEBUG:
1✔
683
                print(f'{" " * 4 * len(curr_path)} pos: {fh.tell() - header_size} '
1✔
684
                      f'atom: {atom_type!r} len: {atom_size + header_size}')
685
            if atom_type in self._VERSIONED_ATOMS:  # jump atom version for now
1✔
686
                fh.seek(4, os.SEEK_CUR)
1✔
687
            if atom_type in self._FLAGGED_ATOMS:  # jump atom flags for now
1✔
688
                fh.seek(4, os.SEEK_CUR)
1✔
689
            sub_path = path.get(atom_type, None)
1✔
690
            # if the path leaf is a dict, traverse deeper into the tree:
691
            if isinstance(sub_path, dict):
1✔
692
                atom_end_pos = fh.tell() + atom_size
1✔
693
                self._traverse_atoms(fh, path=sub_path, stop_pos=atom_end_pos,
1✔
694
                                     curr_path=curr_path + [atom_type])
695
            # if the path-leaf is a callable, call it on the atom data
696
            elif callable(sub_path):
1✔
697
                for fieldname, value in sub_path(fh.read(atom_size)).items():
1✔
698
                    if DEBUG:
1✔
699
                        print(' ' * 4 * len(curr_path), 'FIELD: ', fieldname)
1✔
700
                    if fieldname.startswith('images.'):
1✔
701
                        if self._load_image:
1✔
702
                            self.images._set_field(fieldname[len('images.'):], value)
1✔
703
                    elif fieldname:
1✔
704
                        self._set_field(fieldname, value)
1✔
705
            # if no action was specified using dict or callable, jump over atom
706
            else:
707
                fh.seek(atom_size, os.SEEK_CUR)
1✔
708
            # check if we have reached the end of this branch:
709
            if stop_pos and fh.tell() >= stop_pos:
1✔
710
                return  # return to parent (next parent node in tree)
1✔
711
            atom_header = fh.read(header_size)  # read next atom
1✔
712

713

714
class _ID3(TinyTag):
1✔
715
    _ID3_MAPPING = {
1✔
716
        # Mapping from Frame ID to a field of the TinyTag
717
        # https://exiftool.org/TagNames/ID3.html
718
        'COMM': 'comment', 'COM': 'comment',
719
        'TRCK': 'track', 'TRK': 'track',
720
        'TYER': 'year', 'TYE': 'year', 'TDRC': 'year',
721
        'TALB': 'album', 'TAL': 'album',
722
        'TPE1': 'artist', 'TP1': 'artist',
723
        'TIT2': 'title', 'TT2': 'title',
724
        'TCON': 'genre', 'TCO': 'genre',
725
        'TPOS': 'disc', 'TPA': 'disc',
726
        'TPE2': 'albumartist', 'TP2': 'albumartist',
727
        'TCOM': 'composer', 'TCM': 'composer',
728
        'WOAR': 'extra.url', 'WAR': 'extra.url',
729
        'TSRC': 'extra.isrc', 'TRC': 'extra.isrc',
730
        'TCOP': 'extra.copyright', 'TCR': 'extra.copyright',
731
        'TBPM': 'extra.bpm', 'TBP': 'extra.bpm',
732
        'TKEY': 'extra.initial_key', 'TKE': 'extra.initial_key',
733
        'TLAN': 'extra.language', 'TLA': 'extra.language',
734
        'TPUB': 'extra.publisher', 'TPB': 'extra.publisher',
735
        'USLT': 'extra.lyrics', 'ULT': 'extra.lyrics',
736
        'TPE3': 'extra.conductor', 'TP3': 'extra.conductor',
737
        'TEXT': 'extra.lyricist', 'TXT': 'extra.lyricist',
738
        'TSST': 'extra.set_subtitle',
739
        'TENC': 'extra.encoded_by', 'TEN': 'extra.encoded_by',
740
        'TSSE': 'extra.encoder_settings', 'TSS': 'extra.encoder_settings',
741
        'TMED': 'extra.media', 'TMT': 'extra.media',
742
        'TDOR': 'extra.original_date',
743
        'TORY': 'extra.original_year', 'TOR': 'extra.original_year',
744
        'WCOP': 'extra.license',
745
    }
746
    _ID3_MAPPING_CUSTOM = {
1✔
747
        'artists': 'artist',
748
        'director': 'extra.director',
749
        'license': 'extra.license',
750
        'originalyear': 'extra.original_year',
751
        'barcode': 'extra.barcode',
752
        'catalognumber': 'extra.catalog_number',
753
    }
754
    _IMAGE_FRAME_IDS = {'APIC', 'PIC'}
1✔
755
    _CUSTOM_FRAME_IDS = {'TXXX', 'TXX'}
1✔
756
    _DISALLOWED_FRAME_IDS = {'PRIV', 'RGAD', 'GEOB', 'GEO', 'ÿû°d'}
1✔
757
    _MAX_ESTIMATION_SEC = 30.0
1✔
758
    _CBR_DETECTION_FRAME_COUNT = 5
1✔
759
    _USE_XING_HEADER = True  # much faster, but can be deactivated for testing
1✔
760

761
    _ID3V1_GENRES = (
1✔
762
        'Blues', 'Classic Rock', 'Country', 'Dance', 'Disco',
763
        'Funk', 'Grunge', 'Hip-Hop', 'Jazz', 'Metal', 'New Age', 'Oldies',
764
        'Other', 'Pop', 'R&B', 'Rap', 'Reggae', 'Rock', 'Techno', 'Industrial',
765
        'Alternative', 'Ska', 'Death Metal', 'Pranks', 'Soundtrack',
766
        'Euro-Techno', 'Ambient', 'Trip-Hop', 'Vocal', 'Jazz+Funk', 'Fusion',
767
        'Trance', 'Classical', 'Instrumental', 'Acid', 'House', 'Game',
768
        'Sound Clip', 'Gospel', 'Noise', 'AlternRock', 'Bass', 'Soul', 'Punk',
769
        'Space', 'Meditative', 'Instrumental Pop', 'Instrumental Rock',
770
        'Ethnic', 'Gothic', 'Darkwave', 'Techno-Industrial', 'Electronic',
771
        'Pop-Folk', 'Eurodance', 'Dream', 'Southern Rock', 'Comedy', 'Cult',
772
        'Gangsta', 'Top 40', 'Christian Rap', 'Pop/Funk', 'Jungle',
773
        'Native American', 'Cabaret', 'New Wave', 'Psychadelic', 'Rave',
774
        'Showtunes', 'Trailer', 'Lo-Fi', 'Tribal', 'Acid Punk', 'Acid Jazz',
775
        'Polka', 'Retro', 'Musical', 'Rock & Roll', 'Hard Rock',
776

777
        # Winamp Extended Genres
778
        'Folk', 'Folk-Rock', 'National Folk', 'Swing', 'Fast Fusion', 'Bebob',
779
        'Latin', 'Revival', 'Celtic', 'Bluegrass', 'Avantgarde', 'Gothic Rock',
780
        'Progressive Rock', 'Psychedelic Rock', 'Symphonic Rock', 'Slow Rock',
781
        'Big Band', 'Chorus', 'Easy listening', 'Acoustic', 'Humour', 'Speech',
782
        'Chanson', 'Opera', 'Chamber Music', 'Sonata', 'Symphony', 'Booty Bass',
783
        'Primus', 'Porn Groove', 'Satire', 'Slow Jam', 'Club', 'Tango', 'Samba',
784
        'Folklore', 'Ballad', 'Power Ballad', 'Rhythmic Soul', 'Freestyle',
785
        'Duet', 'Punk Rock', 'Drum Solo', 'A capella', 'Euro-House',
786
        'Dance Hall', 'Goa', 'Drum & Bass',
787

788
        # according to https://de.wikipedia.org/wiki/Liste_der_ID3v1-Genres:
789
        'Club-House', 'Hardcore Techno', 'Terror', 'Indie', 'BritPop',
790
        '',  # don't use ethnic slur ("Negerpunk", WTF!)
791
        'Polsk Punk', 'Beat', 'Christian Gangsta Rap', 'Heavy Metal',
792
        'Black Metal', 'Contemporary Christian', 'Christian Rock',
793
        # WinAmp 1.91
794
        'Merengue', 'Salsa', 'Thrash Metal', 'Anime', 'Jpop', 'Synthpop',
795
        # WinAmp 5.6
796
        'Abstract', 'Art Rock', 'Baroque', 'Bhangra', 'Big Beat', 'Breakbeat',
797
        'Chillout', 'Downtempo', 'Dub', 'EBM', 'Eclectic', 'Electro',
798
        'Electroclash', 'Emo', 'Experimental', 'Garage', 'Illbient',
799
        'Industro-Goth', 'Jam Band', 'Krautrock', 'Leftfield', 'Lounge',
800
        'Math Rock', 'New Romantic', 'Nu-Breakz', 'Post-Punk', 'Post-Rock',
801
        'Psytrance', 'Shoegaze', 'Space Rock', 'Trop Rock', 'World Music',
802
        'Neoclassical', 'Audiobook', 'Audio Theatre', 'Neue Deutsche Welle',
803
        'Podcast', 'Indie Rock', 'G-Funk', 'Dubstep', 'Garage Rock', 'Psybient',
804
    )
805
    _ID3V2_2_IMAGE_FORMATS = {
1✔
806
        'bmp': 'image/bmp',
807
        'jpg': 'image/jpeg',
808
        'png': 'image/png',
809
    }
810
    _IMAGE_TYPES = (
1✔
811
        'other',
812
        'extra.icon',
813
        'extra.other_icon',
814
        'front_cover',
815
        'back_cover',
816
        'leaflet',
817
        'media',
818
        'extra.lead_artist',
819
        'extra.artist',
820
        'extra.conductor',
821
        'extra.band',
822
        'extra.composer',
823
        'extra.lyricist',
824
        'extra.recording_location',
825
        'extra.during_recording',
826
        'extra.during_performance',
827
        'extra.video',
828
        'extra.bright_colored_fish',
829
        'extra.illustration',
830
        'extra.band_logo',
831
        'extra.publisher_logo',
832
    )
833
    _UNKNOWN_IMAGE_TYPE = 'extra.unknown'
1✔
834

835
    # see this page for the magic values used in mp3:
836
    # http://www.mpgedit.org/mpgedit/mpeg_format/mpeghdr.htm
837
    _SAMPLE_RATES = (
1✔
838
        (11025, 12000, 8000),   # MPEG 2.5
839
        (0, 0, 0),              # reserved
840
        (22050, 24000, 16000),  # MPEG 2
841
        (44100, 48000, 32000),  # MPEG 1
842
    )
843
    _V1L1 = (0, 32, 64, 96, 128, 160, 192, 224, 256, 288, 320, 352, 384, 416, 448, 0)
1✔
844
    _V1L2 = (0, 32, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320, 384, 0)
1✔
845
    _V1L3 = (0, 32, 40, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320, 0)
1✔
846
    _V2L1 = (0, 32, 48, 56, 64, 80, 96, 112, 128, 144, 160, 176, 192, 224, 256, 0)
1✔
847
    _V2L2 = (0, 8, 16, 24, 32, 40, 48, 56, 64, 80, 96, 112, 128, 144, 160, 0)
1✔
848
    _V2L3 = _V2L2
1✔
849
    _NONE = (0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
1✔
850
    _BITRATE_BY_VERSION_BY_LAYER = (
1✔
851
        (_NONE, _V2L3, _V2L2, _V2L1),  # MPEG Version 2.5  # note that the layers go
852
        (_NONE, _NONE, _NONE, _NONE),  # reserved          # from 3 to 1 by design.
853
        (_NONE, _V2L3, _V2L2, _V2L1),  # MPEG Version 2    # the first layer id is
854
        (_NONE, _V1L3, _V1L2, _V1L1),  # MPEG Version 1    # reserved
855
    )
856
    _SAMPLES_PER_FRAME = 1152  # the default frame size for mp3
1✔
857
    _CHANNELS_PER_CHANNEL_MODE = (
1✔
858
        2,  # 00 Stereo
859
        2,  # 01 Joint stereo (Stereo)
860
        2,  # 10 Dual channel (2 mono channels)
861
        1,  # 11 Single channel (Mono)
862
    )
863

864
    def __init__(self) -> None:
1✔
865
        super().__init__()
1✔
866
        # save position after the ID3 tag for duration measurement speedup
867
        self._bytepos_after_id3v2 = -1
1✔
868

869
    @staticmethod
1✔
870
    def _parse_xing_header(fh: BinaryIO) -> tuple[int, int]:
1✔
871
        # see: http://www.mp3-tech.org/programmer/sources/vbrheadersdk.zip
872
        fh.seek(4, os.SEEK_CUR)  # read over Xing header
1✔
873
        header_flags = struct.unpack('>i', fh.read(4))[0]
1✔
874
        frames = byte_count = 0
1✔
875
        if header_flags & 1:  # FRAMES FLAG
1✔
876
            frames = struct.unpack('>i', fh.read(4))[0]
1✔
877
        if header_flags & 2:  # BYTES FLAG
1✔
878
            byte_count = struct.unpack('>i', fh.read(4))[0]
1✔
879
        if header_flags & 4:  # TOC FLAG
1✔
880
            fh.seek(100, os.SEEK_CUR)
1✔
881
        if header_flags & 8:  # VBR SCALE FLAG
1✔
882
            fh.seek(4, os.SEEK_CUR)
1✔
883
        return frames, byte_count
1✔
884

885
    def _determine_duration(self, fh: BinaryIO) -> None:
1✔
886
        # if tag reading was disabled, find start position of audio data
887
        if self._bytepos_after_id3v2 == -1:
1✔
888
            self._parse_id3v2_header(fh)
1✔
889

890
        max_estimation_frames = (_ID3._MAX_ESTIMATION_SEC * 44100) // _ID3._SAMPLES_PER_FRAME
1✔
891
        frame_size_accu = 0
1✔
892
        audio_offset = 0
1✔
893
        header_bytes = 4
1✔
894
        frames = 0  # count frames for determining mp3 duration
1✔
895
        bitrate_accu = 0    # add up bitrates to find average bitrate to detect
1✔
896
        last_bitrates = []  # CBR mp3s (multiple frames with same bitrates)
1✔
897
        # seek to first position after id3 tag (speedup for large header)
898
        fh.seek(self._bytepos_after_id3v2)
1✔
899
        file_offset = fh.tell()
1✔
900
        walker = io.BytesIO(fh.read())
1✔
901
        while True:
1✔
902
            # reading through garbage until 11 '1' sync-bits are found
903
            b = walker.read()
1✔
904
            walker.seek(-len(b), os.SEEK_CUR)
1✔
905
            if len(b) < 4:
1✔
906
                if frames:
1✔
907
                    self.bitrate = bitrate_accu / frames
1✔
908
                break  # EOF
1✔
909
            _sync, conf, bitrate_freq, rest = struct.unpack('BBBB', b[0:4])
1✔
910
            br_id = (bitrate_freq >> 4) & 0x0F  # bitrate id
1✔
911
            sr_id = (bitrate_freq >> 2) & 0x03  # sample rate id
1✔
912
            padding = 1 if bitrate_freq & 0x02 > 0 else 0
1✔
913
            mpeg_id = (conf >> 3) & 0x03
1✔
914
            layer_id = (conf >> 1) & 0x03
1✔
915
            channel_mode = (rest >> 6) & 0x03
1✔
916
            # check for eleven 1s, validate bitrate and sample rate
917
            if (not b[:2] > b'\xFF\xE0' or br_id > 14 or br_id == 0 or sr_id == 3
1✔
918
                    or layer_id == 0 or mpeg_id == 1):  # noqa
919
                idx = b.find(b'\xFF', 1)  # invalid frame, find next sync header
1✔
920
                if idx == -1:
1✔
921
                    idx = len(b)  # not found: jump over the current peek buffer
1✔
922
                walker.seek(max(idx, 1), os.SEEK_CUR)
1✔
923
                continue
1✔
924
            self.channels = self._CHANNELS_PER_CHANNEL_MODE[channel_mode]
1✔
925
            frame_bitrate = self._BITRATE_BY_VERSION_BY_LAYER[mpeg_id][layer_id][br_id]
1✔
926
            self.samplerate = samplerate = self._SAMPLE_RATES[mpeg_id][sr_id]
1✔
927
            # There might be a xing header in the first frame that contains
928
            # all the info we need, otherwise parse multiple frames to find the
929
            # accurate average bitrate
930
            if frames == 0 and self._USE_XING_HEADER:
1✔
931
                xing_header_offset = b.find(b'Xing')
1✔
932
                if xing_header_offset != -1:
1✔
933
                    walker.seek(xing_header_offset, os.SEEK_CUR)
1✔
934
                    xframes, byte_count = self._parse_xing_header(walker)
1✔
935
                    if xframes > 0 and byte_count > 0:
1✔
936
                        # MPEG-2 Audio Layer III uses 576 samples per frame
937
                        samples_per_frame = 576 if mpeg_id <= 2 else self._SAMPLES_PER_FRAME
1✔
938
                        self.duration = duration = xframes * samples_per_frame / samplerate
1✔
939
                        # self.duration = (xframes * self._SAMPLES_PER_FRAME / samplerate
940
                        #                  / self.channels)  # noqa
941
                        self.bitrate = byte_count * 8 / duration / 1000
1✔
942
                        return
1✔
943
                    continue
×
944

945
            frames += 1  # it's most probably an mp3 frame
1✔
946
            bitrate_accu += frame_bitrate
1✔
947
            if frames == 1:
1✔
948
                audio_offset = file_offset + walker.tell()
1✔
949
            if frames <= self._CBR_DETECTION_FRAME_COUNT:
1✔
950
                last_bitrates.append(frame_bitrate)
1✔
951
            walker.seek(4, os.SEEK_CUR)  # jump over peeked bytes
1✔
952

953
            frame_length = (144000 * frame_bitrate) // samplerate + padding
1✔
954
            frame_size_accu += frame_length
1✔
955
            # if bitrate does not change over time it's probably CBR
956
            is_cbr = (frames == self._CBR_DETECTION_FRAME_COUNT and len(set(last_bitrates)) == 1)
1✔
957
            if frames == max_estimation_frames or is_cbr:
1✔
958
                # try to estimate duration
959
                fh.seek(-128, 2)  # jump to last byte (leaving out id3v1 tag)
1✔
960
                audio_stream_size = fh.tell() - audio_offset
1✔
961
                est_frame_count = audio_stream_size / (frame_size_accu / frames)
1✔
962
                samples = est_frame_count * self._SAMPLES_PER_FRAME
1✔
963
                self.duration = samples / samplerate
1✔
964
                self.bitrate = bitrate_accu / frames
1✔
965
                return
1✔
966

967
            if frame_length > 1:  # jump over current frame body
1✔
968
                walker.seek(frame_length - header_bytes, os.SEEK_CUR)
1✔
969
        if self.samplerate:
1✔
970
            self.duration = frames * self._SAMPLES_PER_FRAME / self.samplerate
1✔
971

972
    def _parse_tag(self, fh: BinaryIO) -> None:
1✔
973
        self._parse_id3v2(fh)
1✔
974
        if self.filesize > 128:
1✔
975
            fh.seek(-128, os.SEEK_END)  # try parsing id3v1 in last 128 bytes
1✔
976
            self._parse_id3v1(fh)
1✔
977

978
    def _parse_id3v2_header(self, fh: BinaryIO) -> tuple[int, bool, int]:
1✔
979
        size = major = 0
1✔
980
        extended = False
1✔
981
        # for info on the specs, see: http://id3.org/Developer%20Information
982
        header = struct.unpack('3sBBB4B', fh.read(10))
1✔
983
        tag = header[0].decode('ISO-8859-1', 'replace')
1✔
984
        # check if there is an ID3v2 tag at the beginning of the file
985
        if tag == 'ID3':
1✔
986
            major, _rev = header[1:3]
1✔
987
            if DEBUG:
1✔
988
                print(f'Found id3 v2.{major}')
1✔
989
            # unsync = (header[3] & 0x80) > 0
990
            extended = (header[3] & 0x40) > 0
1✔
991
            # experimental = (header[3] & 0x20) > 0
992
            # footer = (header[3] & 0x10) > 0
993
            size = self._calc_size(header[4:8], 7)
1✔
994
        self._bytepos_after_id3v2 = size
1✔
995
        return size, extended, major
1✔
996

997
    def _parse_id3v2(self, fh: BinaryIO) -> None:
1✔
998
        size, extended, major = self._parse_id3v2_header(fh)
1✔
999
        if size:
1✔
1000
            end_pos = fh.tell() + size
1✔
1001
            parsed_size = 0
1✔
1002
            if extended:  # just read over the extended header.
1✔
1003
                size_bytes = struct.unpack('4B', fh.read(6)[0:4])
1✔
1004
                extd_size = self._calc_size(size_bytes, 7)
1✔
1005
                fh.seek(extd_size - 6, os.SEEK_CUR)  # jump over extended_header
1✔
1006
            while parsed_size < size:
1✔
1007
                frame_size = self._parse_frame(fh, id3version=major)
1✔
1008
                if frame_size == 0:
1✔
1009
                    break
1✔
1010
                parsed_size += frame_size
1✔
1011
            fh.seek(end_pos, os.SEEK_SET)
1✔
1012

1013
    def _parse_id3v1(self, fh: BinaryIO) -> None:
1✔
1014
        if fh.read(3) != b'TAG':  # check if this is an ID3 v1 tag
1✔
1015
            return
1✔
1016

1017
        def asciidecode(x: bytes) -> str:
1✔
1018
            return self._unpad(x.decode(self._default_encoding or 'latin1', 'replace'))
1✔
1019
        # Only set fields that were not set by ID3v2 tags, as ID3v1
1020
        # tags are more likely to be outdated or have encoding issues
1021
        fields = fh.read(30 + 30 + 30 + 4 + 30 + 1)
1✔
1022
        if not self.title:
1✔
1023
            value = asciidecode(fields[:30])
1✔
1024
            if value:
1✔
1025
                self._set_field('title', value)
1✔
1026
        if not self.artist:
1✔
1027
            value = asciidecode(fields[30:60])
1✔
1028
            if value:
1✔
1029
                self._set_field('artist', value)
1✔
1030
        if not self.album:
1✔
1031
            value = asciidecode(fields[60:90])
1✔
1032
            if value:
1✔
1033
                self._set_field('album', value)
1✔
1034
        if not self.year:
1✔
1035
            value = asciidecode(fields[90:94])
1✔
1036
            if value:
1✔
1037
                self._set_field('year', value)
1✔
1038
        comment = fields[94:124]
1✔
1039
        if b'\x00\x00' < comment[-2:] < b'\x01\x00':
1✔
1040
            if self.track is None:
1✔
1041
                self._set_field('track', ord(comment[-1:]))
1✔
1042
            comment = comment[:-2]
1✔
1043
        if not self.comment:
1✔
1044
            value = asciidecode(comment)
1✔
1045
            if value:
1✔
1046
                self._set_field('comment', value)
1✔
1047
        if not self.genre:
1✔
1048
            genre_id = ord(fields[124:125])
1✔
1049
            if genre_id < len(self._ID3V1_GENRES):
1✔
1050
                self._set_field('genre', self._ID3V1_GENRES[genre_id])
1✔
1051

1052
    def __parse_custom_field(self, content: str) -> bool:
1✔
1053
        custom_field_name, separator, value = content.partition('\x00')
1✔
1054
        custom_field_name_lower = custom_field_name.lower()
1✔
1055
        value = value.lstrip('\ufeff')
1✔
1056
        if custom_field_name_lower and separator and value:
1✔
1057
            field_name = self._ID3_MAPPING_CUSTOM.get(
1✔
1058
                custom_field_name_lower, self._EXTRA_PREFIX + custom_field_name_lower)
1059
            self._set_field(field_name, value)
1✔
1060
            return True
1✔
1061
        return False
1✔
1062

1063
    @classmethod
1✔
1064
    def _create_tag_image(cls, data: bytes, pic_type: int, mime_type: str | None = None,
1✔
1065
                          description: str | None = None) -> tuple[str, TagImage]:
1066
        field_name = cls._UNKNOWN_IMAGE_TYPE
1✔
1067
        if 0 <= pic_type < len(cls._IMAGE_TYPES):
1✔
1068
            field_name = cls._IMAGE_TYPES[pic_type]
1✔
1069
        image = TagImage(field_name, data)
1✔
1070
        if mime_type:
1✔
1071
            image.mime_type = mime_type
1✔
1072
        if description:
1✔
1073
            image.description = description
1✔
1074
        return field_name, image
1✔
1075

1076
    @staticmethod
1✔
1077
    def _index_utf16(s: bytes, search: bytes) -> int:
1✔
1078
        for i in range(0, len(s), len(search)):
1✔
1079
            if s[i:i + len(search)] == search:
1✔
1080
                return i
1✔
1081
        return -1
×
1082

1083
    def _parse_frame(self, fh: BinaryIO, id3version: int | None = None) -> int:
1✔
1084
        # ID3 v2.2 is especially ugly; see: http://id3.org/id3v2-00
1085
        frame_header_size = 6 if id3version == 2 else 10
1✔
1086
        frame_size_bytes = 3 if id3version == 2 else 4
1✔
1087
        binformat = '3s3B' if id3version == 2 else '4s4B2B'
1✔
1088
        bits_per_byte = 7 if id3version == 4 else 8  # only id3v2.4 is synchsafe
1✔
1089
        frame_header_data = fh.read(frame_header_size)
1✔
1090
        if len(frame_header_data) != frame_header_size:
1✔
1091
            return 0
1✔
1092
        frame = struct.unpack(binformat, frame_header_data)
1✔
1093
        frame_id = self._decode_string(frame[0])
1✔
1094
        frame_size = self._calc_size(frame[1:1 + frame_size_bytes], bits_per_byte)
1✔
1095
        if DEBUG:
1✔
1096
            print(f'Found id3 Frame {frame_id} at {fh.tell()}-{fh.tell() + frame_size} '
1✔
1097
                  f'of {self.filesize}')
1098
        if frame_size > 0:
1✔
1099
            # flags = frame[1+frame_size_bytes:] # don't care about flags.
1100
            content = fh.read(frame_size)
1✔
1101
            fieldname = self._ID3_MAPPING.get(frame_id)
1✔
1102
            should_set_field = True
1✔
1103
            if fieldname:
1✔
1104
                if not self._parse_tags:
1✔
1105
                    return frame_size
1✔
1106
                language = fieldname in {'comment', 'extra.lyrics'}
1✔
1107
                value = self._decode_string(content, language)
1✔
1108
                if not value:
1✔
1109
                    return frame_size
1✔
1110
                if fieldname == "comment":
1✔
1111
                    # check if comment is a key-value pair (used by iTunes)
1112
                    should_set_field = not self.__parse_custom_field(value)
1✔
1113
                elif fieldname in {'track', 'disc'}:
1✔
1114
                    if '/' in value:
1✔
1115
                        value, total = value.split('/')[:2]
1✔
1116
                        if total.isdecimal():
1✔
1117
                            self._set_field(f'{fieldname}_total', int(total))
1✔
1118
                    if value.isdecimal():
1✔
1119
                        self._set_field(fieldname, int(value))
1✔
1120
                    should_set_field = False
1✔
1121
                elif fieldname == 'genre':
1✔
1122
                    genre_id = 255
1✔
1123
                    # funky: id3v1 genre hidden in an id3v2 field
1124
                    if value.isdecimal():
1✔
1125
                        genre_id = int(value)
1✔
1126
                    # funkier: the TCO may contain genres in parens, e.g. '(13)'
1127
                    elif value[:1] == '(':
1✔
1128
                        end_pos = value.find(')')
1✔
1129
                        parens_text = value[1:end_pos]
1✔
1130
                        if end_pos > 0 and parens_text.isdecimal():
1✔
1131
                            genre_id = int(parens_text)
1✔
1132
                    if 0 <= genre_id < len(_ID3._ID3V1_GENRES):
1✔
1133
                        value = _ID3._ID3V1_GENRES[genre_id]
1✔
1134
                if should_set_field:
1✔
1135
                    self._set_field(fieldname, value)
1✔
1136
            elif frame_id in self._CUSTOM_FRAME_IDS:
1✔
1137
                # custom fields
1138
                if self._parse_tags:
1✔
1139
                    value = self._decode_string(content)
1✔
1140
                    if value:
1✔
1141
                        self.__parse_custom_field(value)
1✔
1142
            elif frame_id in self._IMAGE_FRAME_IDS:
1✔
1143
                if self._load_image:
1✔
1144
                    # See section 4.14: http://id3.org/id3v2.4.0-frames
1145
                    encoding = content[0:1]
1✔
1146
                    if frame_id == 'PIC':  # ID3 v2.2:
1✔
1147
                        imgformat = self._decode_string(content[1:4]).lower()
1✔
1148
                        mime_type = self._ID3V2_2_IMAGE_FORMATS.get(imgformat)
1✔
1149
                        desc_start_pos = 1 + 3 + 1  # skip encoding (1), imgformat (3), pictype(1)
1✔
1150
                    else:  # ID3 v2.3+
1151
                        mime_type_end_pos = content.index(b'\x00', 1)
1✔
1152
                        mime_type = self._decode_string(content[1:mime_type_end_pos]).lower()
1✔
1153
                        if mime_type in self._ID3V2_2_IMAGE_FORMATS:  # ID3 v2.2 format in v2.3...
1✔
1154
                            mime_type = self._ID3V2_2_IMAGE_FORMATS[mime_type]
1✔
1155
                        desc_start_pos = mime_type_end_pos + 1 + 1  # skip mtype, pictype(1)
1✔
1156
                    pic_type = content[desc_start_pos - 1]
1✔
1157
                    # latin1 and utf-8 use a single null byte as terminator
1158
                    termination = b'\x00' if encoding in {b'\x00', b'\x03'} else b'\x00\x00'
1✔
1159
                    desc_length = self._index_utf16(content[desc_start_pos:], termination)
1✔
1160
                    desc_end_pos = desc_start_pos + desc_length + len(termination)
1✔
1161
                    description = self._decode_string(content[desc_start_pos:desc_end_pos])
1✔
1162
                    field_name, image = self._create_tag_image(
1✔
1163
                        content[desc_end_pos:], pic_type, mime_type, description)
1164
                    self.images._set_field(field_name, image)
1✔
1165
            elif frame_id not in self._DISALLOWED_FRAME_IDS:
1✔
1166
                # unknown, try to add to extra dict
1167
                if self._parse_tags:
1✔
1168
                    value = self._decode_string(content)
1✔
1169
                    if value:
1✔
1170
                        self._set_field(self._EXTRA_PREFIX + frame_id.lower(), value)
1✔
1171
            return frame_size
1✔
1172
        return 0
1✔
1173

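The frame headers read above differ by ID3 version: v2.2 uses a 6-byte header (3-byte id, 3-byte size), v2.3/v2.4 use a 10-byte header (4-byte id, 4-byte size, two flag bytes), and only v2.4 stores the size as a synchsafe integer with 7 bits per byte. A rough standalone illustration of the two layouts, with a hypothetical frame_header helper rather than the library's API:

import struct

def frame_header(data: bytes, id3version: int) -> tuple[str, int]:
    if id3version == 2:
        frame_id, *size_bytes = struct.unpack('3s3B', data[:6])
        bits_per_byte = 8
    else:
        frame_id, *rest = struct.unpack('4s4B2B', data[:10])
        size_bytes = rest[:4]              # the last two values are flag bytes
        bits_per_byte = 7 if id3version == 4 else 8
    size = 0
    for byte in size_bytes:
        size = (size << bits_per_byte) + byte
    return frame_id.decode('latin1'), size
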
1174
    def _decode_string(self, bytestr: bytes, language: bool = False) -> str:
1✔
1175
        default_encoding = 'ISO-8859-1'
1✔
1176
        if self._default_encoding:
1✔
1177
            default_encoding = self._default_encoding
1✔
1178
        # it's not my fault, this is the spec.
1179
        first_byte = bytestr[:1]
1✔
1180
        if first_byte == b'\x00':  # ISO-8859-1
1✔
1181
            bytestr = bytestr[1:]
1✔
1182
            encoding = default_encoding
1✔
1183
        elif first_byte == b'\x01':  # UTF-16 with BOM
1✔
1184
            bytestr = bytestr[1:]
1✔
1185
            # remove language (but leave BOM)
1186
            if language:
1✔
1187
                if bytestr[3:5] in {b'\xfe\xff', b'\xff\xfe'}:
1✔
1188
                    bytestr = bytestr[3:]
1✔
1189
                if bytestr[:3].isalpha():
1✔
1190
                    bytestr = bytestr[3:]  # remove language
1✔
1191
                bytestr = bytestr.lstrip(b'\x00')  # strip optional additional null bytes
1✔
1192
            # read byte order mark to determine endianness
1193
            encoding = 'UTF-16be' if bytestr[0:2] == b'\xfe\xff' else 'UTF-16le'
1✔
1194
            # strip the bom if it exists
1195
            if bytestr[:2] in {b'\xfe\xff', b'\xff\xfe'}:
1✔
1196
                bytestr = bytestr[2:] if len(bytestr) % 2 == 0 else bytestr[2:-1]
1✔
1197
            # remove ADDITIONAL EXTRA BOM :facepalm:
1198
            if bytestr[:4] == b'\x00\x00\xff\xfe':
1✔
1199
                bytestr = bytestr[4:]
1✔
1200
        elif first_byte == b'\x02':  # UTF-16LE
1✔
1201
            # strip optional trailing null byte if the byte count is uneven
1202
            bytestr = bytestr[1:-1] if len(bytestr) % 2 == 0 else bytestr[1:]
×
1203
            encoding = 'UTF-16le'
×
1204
        elif first_byte == b'\x03':  # UTF-8
1✔
1205
            bytestr = bytestr[1:]
1✔
1206
            encoding = 'UTF-8'
1✔
1207
        else:
1208
            encoding = default_encoding  # wild guess
1✔
1209
        if language and bytestr[:3].isalpha():
1✔
1210
            bytestr = bytestr[3:]  # remove language
1✔
1211
        return self._unpad(bytestr.decode(encoding, 'replace'))
1✔
1212

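The first byte of an ID3v2 text payload selects the encoding: 0 is ISO-8859-1, 1 is UTF-16 with a byte order mark, 2 is UTF-16 without a BOM and 3 is UTF-8; the rest of _decode_string deals with stripping language codes and stray BOMs. A bare-bones sketch of just that dispatch, ignoring those corner cases (illustrative only, not the library's method):

def decode_id3_text(payload: bytes) -> str:
    marker, body = payload[:1], payload[1:]
    if marker == b'\x00':               # ISO-8859-1
        return body.decode('latin1', 'replace')
    if marker == b'\x01':               # UTF-16 with byte order mark
        return body.decode('utf-16', 'replace')
    if marker == b'\x02':               # UTF-16 without BOM (decoded as LE above)
        return body.decode('utf-16-le', 'replace')
    if marker == b'\x03':               # UTF-8
        return body.decode('utf-8', 'replace')
    return payload.decode('latin1', 'replace')  # no marker at all: wild guess
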
1213
    @staticmethod
1✔
1214
    def _calc_size(bytestr: tuple[int, ...], bits_per_byte: int) -> int:
1✔
1215
        # the length of some mp3 header fields is described by 7- or 8-bit bytes
1216
        return reduce(lambda accu, elem: (accu << bits_per_byte) + elem, bytestr, 0)
1✔
1217

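ID3v2.4 "synchsafe" sizes keep the high bit of every byte clear so tag data can never be mistaken for an MPEG frame sync, which is why _calc_size shifts by 7 bits there and by 8 bits otherwise. For example, the same four bytes decode differently under the two interpretations:

from functools import reduce

def calc_size(size_bytes: tuple[int, ...], bits_per_byte: int) -> int:
    return reduce(lambda acc, byte: (acc << bits_per_byte) + byte, size_bytes, 0)

assert calc_size((0x00, 0x00, 0x02, 0x01), 7) == 257  # synchsafe, ID3v2.4
assert calc_size((0x00, 0x00, 0x02, 0x01), 8) == 513  # plain bytes, ID3v2.2/2.3
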
1218

1219
class _Ogg(TinyTag):
1✔
1220
    _VORBIS_MAPPING = {
1✔
1221
        'album': 'album',
1222
        'albumartist': 'albumartist',
1223
        'title': 'title',
1224
        'artist': 'artist',
1225
        'artists': 'artist',
1226
        'author': 'artist',
1227
        'date': 'year',
1228
        'tracknumber': 'track',
1229
        'tracktotal': 'track_total',
1230
        'totaltracks': 'track_total',
1231
        'discnumber': 'disc',
1232
        'disctotal': 'disc_total',
1233
        'totaldiscs': 'disc_total',
1234
        'genre': 'genre',
1235
        'description': 'comment',
1236
        'comment': 'comment',
1237
        'comments': 'comment',
1238
        'composer': 'composer',
1239
        'bpm': 'extra.bpm',
1240
        'copyright': 'extra.copyright',
1241
        'isrc': 'extra.isrc',
1242
        'lyrics': 'extra.lyrics',
1243
        'publisher': 'extra.publisher',
1244
        'language': 'extra.language',
1245
        'director': 'extra.director',
1246
        'website': 'extra.url',
1247
        'conductor': 'extra.conductor',
1248
        'lyricist': 'extra.lyricist',
1249
        'discsubtitle': 'extra.set_subtitle',
1250
        'setsubtitle': 'extra.set_subtitle',
1251
        'initialkey': 'extra.initial_key',
1252
        'key': 'extra.initial_key',
1253
        'encodedby': 'extra.encoded_by',
1254
        'encodersettings': 'extra.encoder_settings',
1255
        'media': 'extra.media',
1256
        'originaldate': 'extra.original_date',
1257
        'originalyear': 'extra.original_year',
1258
        'license': 'extra.license',
1259
        'barcode': 'extra.barcode',
1260
        'catalognumber': 'extra.catalog_number',
1261
    }
1262

1263
    def __init__(self) -> None:
1✔
1264
        super().__init__()
1✔
1265
        self._max_samplenum = 0  # maximum sample position ever read
1✔
1266

1267
    def _determine_duration(self, fh: BinaryIO) -> None:
1✔
1268
        max_page_size = 65536  # https://xiph.org/ogg/doc/libogg/ogg_page.html
1✔
1269
        if not self._tags_parsed:
1✔
1270
            self._parse_tag(fh)  # determine sample rate
1✔
1271
            fh.seek(0)           # and rewind to start
1✔
1272
        if self.duration is not None or not self.samplerate:
1✔
1273
            return  # either ogg flac or invalid file
1✔
1274
        if self.filesize > max_page_size:
1✔
1275
            fh.seek(-max_page_size, 2)  # go to last possible page position
1✔
1276
        while True:
1✔
1277
            file_offset = fh.tell()
1✔
1278
            b = fh.read()
1✔
1279
            if len(b) < 4:
1✔
1280
                return  # EOF
×
1281
            if b[:4] == b'OggS':  # look for an ogg header
1✔
1282
                fh.seek(file_offset)
1✔
1283
                for _ in self._parse_pages(fh):
1✔
1284
                    pass  # parse all remaining pages
1✔
1285
                self.duration = self._max_samplenum / self.samplerate
1✔
1286
                break
1✔
1287
            idx = b.find(b'OggS')  # try to find header in peeked data
1✔
1288
            if idx != -1:
1✔
1289
                fh.seek(file_offset + idx)
1✔
1290

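The duration logic above never decodes audio: it seeks into the last 64 KiB, locates the final 'OggS' capture pattern and keeps the largest granule position, which for Vorbis and Opus streams counts PCM samples. Duration is then just samples over sample rate; purely illustrative numbers:

last_granule_position = 13_230_000  # samples, read from the last Ogg page header
samplerate = 44_100                 # Hz, read from the identification header

duration_seconds = last_granule_position / samplerate  # 300.0, i.e. five minutes
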
1291
    def _parse_tag(self, fh: BinaryIO) -> None:
1✔
1292
        check_flac_second_packet = False
1✔
1293
        check_speex_second_packet = False
1✔
1294
        for packet in self._parse_pages(fh):
1✔
1295
            walker = io.BytesIO(packet)
1✔
1296
            if packet[0:7] == b"\x01vorbis":
1✔
1297
                if self._parse_duration:
1✔
1298
                    (self.channels, self.samplerate, _max_bitrate, bitrate,
1✔
1299
                     _min_bitrate) = struct.unpack("<B4i", packet[11:28])
1300
                    self.bitrate = bitrate / 1000
1✔
1301
            elif packet[0:7] == b"\x03vorbis":
1✔
1302
                if self._parse_tags:
1✔
1303
                    walker.seek(7, os.SEEK_CUR)  # jump over header name
1✔
1304
                    self._parse_vorbis_comment(walker)
1✔
1305
            elif packet[0:8] == b'OpusHead':
1✔
1306
                if self._parse_duration:  # parse opus header
1✔
1307
                    # https://www.videolan.org/developers/vlc/modules/codec/opus_header.c
1308
                    # https://mf4.xiph.org/jenkins/view/opus/job/opusfile-unix/ws/doc/html/structOpusHead.html
1309
                    walker.seek(8, os.SEEK_CUR)  # jump over header name
1✔
1310
                    (version, ch, _, _sr, _, _) = struct.unpack("<BBHIHB", walker.read(11))
1✔
1311
                    if (version & 0xF0) == 0:  # only major version 0 supported
1✔
1312
                        self.channels = ch
1✔
1313
                        self.samplerate = 48000  # internally opus always uses 48khz
1✔
1314
            elif packet[0:8] == b'OpusTags':
1✔
1315
                if self._parse_tags:  # parse opus metadata:
1✔
1316
                    walker.seek(8, os.SEEK_CUR)  # jump over header name
1✔
1317
                    self._parse_vorbis_comment(walker)
1✔
1318
            elif packet[0:5] == b'\x7fFLAC':
1✔
1319
                # https://xiph.org/flac/ogg_mapping.html
1320
                walker.seek(9, os.SEEK_CUR)  # jump over header name, version and number of headers
1✔
1321
                flactag = _Flac()
1✔
1322
                flactag._filehandler = walker
1✔
1323
                flactag.filesize = self.filesize
1✔
1324
                flactag._load(tags=self._parse_tags, duration=self._parse_duration,
1✔
1325
                              image=self._load_image)
1326
                self._update(flactag)
1✔
1327
                check_flac_second_packet = True
1✔
1328
            elif check_flac_second_packet:
1✔
1329
                # second packet contains FLAC metadata block
1330
                if self._parse_tags:
1✔
1331
                    meta_header = struct.unpack('B3B', walker.read(4))
1✔
1332
                    block_type = meta_header[0] & 0x7f
1✔
1333
                    if block_type == _Flac.METADATA_VORBIS_COMMENT:
1✔
1334
                        self._parse_vorbis_comment(walker)
1✔
1335
                check_flac_second_packet = False
1✔
1336
            elif packet[0:8] == b'Speex   ':
1✔
1337
                # https://speex.org/docs/manual/speex-manual/node8.html
1338
                if self._parse_duration:
1✔
1339
                    walker.seek(36, os.SEEK_CUR)  # jump over header name and irrelevant fields
1✔
1340
                    (self.samplerate, _, _, self.channels,
1✔
1341
                     self.bitrate) = struct.unpack("<5i", walker.read(20))
1342
                check_speex_second_packet = True
1✔
1343
            elif check_speex_second_packet:
1✔
1344
                if self._parse_tags:
1✔
1345
                    length = struct.unpack('I', walker.read(4))[0]  # starts with a comment string
1✔
1346
                    comment = walker.read(length).decode('utf-8', 'replace')
1✔
1347
                    self._set_field('comment', comment)
1✔
1348
                    self._parse_vorbis_comment(walker, contains_vendor=False)  # other tags
1✔
1349
                check_speex_second_packet = False
1✔
1350
            else:
1351
                if DEBUG:
1✔
1352
                    print('Unsupported Ogg page type: ', packet[:16], file=stderr)
1✔
1353
                break
1✔
1354
        self._tags_parsed = True
1✔
1355

1356
    def _parse_vorbis_comment(self, fh: BinaryIO, contains_vendor: bool = True) -> None:
1✔
1357
        # for the spec, see: http://xiph.org/vorbis/doc/v-comment.html
1358
        # discnumber tag based on: https://en.wikipedia.org/wiki/Vorbis_comment
1359
        # https://sno.phy.queensu.ca/~phil/exiftool/TagNames/Vorbis.html
1360
        if contains_vendor:
1✔
1361
            vendor_length = struct.unpack('I', fh.read(4))[0]
1✔
1362
            fh.seek(vendor_length, os.SEEK_CUR)  # jump over vendor
1✔
1363
        elements = struct.unpack('I', fh.read(4))[0]
1✔
1364
        for _i in range(elements):
1✔
1365
            length = struct.unpack('I', fh.read(4))[0]
1✔
1366
            keyvalpair = fh.read(length).decode('utf-8', 'replace')
1✔
1367
            if '=' in keyvalpair:
1✔
1368
                key, value = keyvalpair.split('=', 1)
1✔
1369
                key_lowercase = key.lower()
1✔
1370

1371
                if key_lowercase == "metadata_block_picture" and self._load_image:
1✔
1372
                    if DEBUG:
1✔
1373
                        print('Found Vorbis TagImage', key, value[:64])
1✔
1374
                    fieldname, fieldvalue = _Flac._parse_image(io.BytesIO(base64.b64decode(value)))
1✔
1375
                    self.images._set_field(fieldname, fieldvalue)
1✔
1376
                else:
1377
                    if DEBUG:
1✔
1378
                        print('Found Vorbis Comment', key, value[:64])
1✔
1379
                    fieldname = self._VORBIS_MAPPING.get(
1✔
1380
                        key_lowercase, self._EXTRA_PREFIX + key_lowercase)  # custom field
1381
                    if fieldname in {'track', 'disc', 'track_total', 'disc_total'}:
1✔
1382
                        if fieldname in {'track', 'disc'} and '/' in value:
1✔
1383
                            value, total = value.split('/')[:2]
1✔
1384
                            if total.isdecimal():
1✔
1385
                                self._set_field(f'{fieldname}_total', int(total))
1✔
1386
                        if value.isdecimal():
1✔
1387
                            self._set_field(fieldname, int(value))
1✔
1388
                    elif value:
1✔
1389
                        self._set_field(fieldname, value)
1✔
1390

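A Vorbis comment block is a series of little-endian length-prefixed strings: a vendor string, a 32-bit entry count, then one 'KEY=value' string per entry, each preceded by its own 32-bit length. A minimal standalone reader of that framing, assuming the whole block is already in memory (a sketch, not the library's method):

import io
import struct

def read_vorbis_comments(block: bytes) -> dict[str, str]:
    fh = io.BytesIO(block)
    vendor_length = struct.unpack('<I', fh.read(4))[0]
    fh.seek(vendor_length, io.SEEK_CUR)             # skip the vendor string
    comments = {}
    for _ in range(struct.unpack('<I', fh.read(4))[0]):
        length = struct.unpack('<I', fh.read(4))[0]
        key, _sep, value = fh.read(length).decode('utf-8', 'replace').partition('=')
        comments[key.lower()] = value
    return comments
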
1391
    def _parse_pages(self, fh: BinaryIO) -> Iterator[bytes]:
1✔
1392
        # for the spec, see: https://wiki.xiph.org/Ogg
1393
        previous_page = b''  # contains data from previous (continuing) pages
1✔
1394
        header_data = fh.read(27)  # read ogg page header
1✔
1395
        while len(header_data) == 27:
1✔
1396
            header = struct.unpack('<4sBBqIIiB', header_data)
1✔
1397
            # https://xiph.org/ogg/doc/framing.html
1398
            oggs, version, _flags, pos, _serial, _pageseq, _crc, segments = header
1✔
1399
            self._max_samplenum = max(self._max_samplenum, pos)
1✔
1400
            if oggs != b'OggS' or version != 0:
1✔
1401
                raise ParseError('Invalid OGG header')
1✔
1402
            segsizes = struct.unpack('B' * segments, fh.read(segments))
1✔
1403
            total = 0
1✔
1404
            for segsize in segsizes:  # read all segments
1✔
1405
                total += segsize
1✔
1406
                if total < 255:  # less than 255 bytes means end of page
1✔
1407
                    yield previous_page + fh.read(total)
1✔
1408
                    previous_page = b''
1✔
1409
                    total = 0
1✔
1410
            if total != 0:
1✔
1411
                if total % 255 == 0:
1✔
1412
                    previous_page += fh.read(total)
×
1413
                else:
1414
                    yield previous_page + fh.read(total)
1✔
1415
                    previous_page = b''
1✔
1416
            header_data = fh.read(27)
1✔
1417

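Each Ogg page starts with a 27-byte header ('OggS', version, flags, a 64-bit granule position, serial and sequence numbers, CRC and a segment count) followed by one lacing value per segment; a packet ends at the first segment shorter than 255 bytes, which is what the running total above tracks. A sketch of just the header unpacking (illustrative, not tinytag's API):

import struct

def read_ogg_page_header(header: bytes) -> tuple[int, int]:
    # returns (granule_position, segment_count) for one 27-byte page header
    (capture_pattern, version, _flags, granule_position, _serial,
     _sequence, _crc, segment_count) = struct.unpack('<4sBBqIIiB', header)
    if capture_pattern != b'OggS' or version != 0:
        raise ValueError('not an Ogg page header')
    return granule_position, segment_count
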
1418

1419
class _Wave(TinyTag):
1✔
1420
    # https://sno.phy.queensu.ca/~phil/exiftool/TagNames/RIFF.html
1421
    _RIFF_MAPPING = {
1✔
1422
        b'INAM': 'title',
1423
        b'TITL': 'title',
1424
        b'IPRD': 'album',
1425
        b'IART': 'artist',
1426
        b'IBPM': 'extra.bpm',
1427
        b'ICMT': 'comment',
1428
        b'IMUS': 'composer',
1429
        b'ICOP': 'extra.copyright',
1430
        b'ICRD': 'year',
1431
        b'IGNR': 'genre',
1432
        b'ILNG': 'extra.language',
1433
        b'ISRC': 'extra.isrc',
1434
        b'IPUB': 'extra.publisher',
1435
        b'IPRT': 'track',
1436
        b'ITRK': 'track',
1437
        b'TRCK': 'track',
1438
        b'IBSU': 'extra.url',
1439
        b'YEAR': 'year',
1440
        b'IWRI': 'extra.lyricist',
1441
        b'IENC': 'extra.encoded_by',
1442
        b'IMED': 'extra.media',
1443
    }
1444

1445
    def _determine_duration(self, fh: BinaryIO) -> None:
1✔
1446
        if not self._tags_parsed:
1✔
1447
            self._parse_tag(fh)
1✔
1448

1449
    def _parse_tag(self, fh: BinaryIO) -> None:
1✔
1450
        # see: http://www-mmsp.ece.mcgill.ca/Documents/AudioFormats/WAVE/WAVE.html
1451
        # and: https://en.wikipedia.org/wiki/WAV
1452
        riff, _size, fformat = struct.unpack('4sI4s', fh.read(12))
1✔
1453
        if riff != b'RIFF' or fformat != b'WAVE':
1✔
1454
            raise ParseError('Invalid WAV header')
1✔
1455
        if self._parse_duration:
1✔
1456
            self.bitdepth = 16  # assume 16bit depth (CD quality)
1✔
1457
        chunk_header = fh.read(8)
1✔
1458
        while len(chunk_header) == 8:
1✔
1459
            subchunkid, subchunksize = struct.unpack('4sI', chunk_header)
1✔
1460
            subchunksize += subchunksize % 2  # IFF chunks are padded to an even number of bytes
1✔
1461
            if subchunkid == b'fmt ' and self._parse_duration:
1✔
1462
                _, channels, samplerate = struct.unpack('HHI', fh.read(8))
1✔
1463
                _, _, bitdepth = struct.unpack('<IHH', fh.read(8))
1✔
1464
                if bitdepth == 0:
1✔
1465
                    # Certain codecs (e.g. GSM 6.10) give us a bit depth of zero.
1466
                    # Avoid division by zero when calculating duration.
1467
                    bitdepth = 1
1✔
1468
                self.bitrate = samplerate * channels * bitdepth / 1000
1✔
1469
                self.channels, self.samplerate, self.bitdepth = channels, samplerate, bitdepth
1✔
1470
                remaining_size = subchunksize - 16
1✔
1471
                if remaining_size > 0:
1✔
1472
                    fh.seek(remaining_size, 1)  # skip remaining data in chunk
1✔
1473
            elif subchunkid == b'data' and self._parse_duration:
1✔
1474
                if (self.channels is not None and self.samplerate is not None
1✔
1475
                        and self.bitdepth is not None):
1476
                    self.duration = (
1✔
1477
                        subchunksize / self.channels / self.samplerate / (self.bitdepth / 8))
1478
                fh.seek(subchunksize, 1)
1✔
1479
            elif subchunkid == b'LIST' and self._parse_tags:
1✔
1480
                is_info = fh.read(4)  # check INFO header
1✔
1481
                if is_info != b'INFO':  # jump over non-INFO sections
1✔
1482
                    fh.seek(subchunksize - 4, os.SEEK_CUR)
×
1483
                else:
1484
                    sub_fh = io.BytesIO(fh.read(subchunksize - 4))
1✔
1485
                    field = sub_fh.read(4)
1✔
1486
                    while len(field) == 4:
1✔
1487
                        data_length = struct.unpack('I', sub_fh.read(4))[0]
1✔
1488
                        data_length += data_length % 2  # IFF chunks are padded to an even size
1✔
1489
                        data = sub_fh.read(data_length).split(b'\x00', 1)[0]  # strip zero-byte
1✔
1490
                        fieldname = self._RIFF_MAPPING.get(field)
1✔
1491
                        if fieldname:
1✔
1492
                            value = data.decode('utf-8', 'replace')
1✔
1493
                            if fieldname == 'track':
1✔
1494
                                if value.isdecimal():
1✔
1495
                                    self._set_field(fieldname, int(value))
1✔
1496
                            else:
1497
                                self._set_field(fieldname, value)
1✔
1498
                        field = sub_fh.read(4)
1✔
1499
            elif subchunkid in {b'id3 ', b'ID3 '} and self._parse_tags:
1✔
1500
                id3 = _ID3()
1✔
1501
                id3._filehandler = fh
1✔
1502
                id3._load(tags=True, duration=False, image=self._load_image)
1✔
1503
                self._update(id3)
1✔
1504
            else:  # some other chunk, just skip the data
1505
                fh.seek(subchunksize, 1)
1✔
1506
            chunk_header = fh.read(8)
1✔
1507
        self._tags_parsed = True
1✔
1508

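The 'fmt ' chunk read above packs, little-endian, the format tag, channel count, sample rate, byte rate, block align and bits per sample into its first 16 bytes; the bitrate in kbit/s then follows as samplerate * channels * bitdepth / 1000. A minimal standalone unpack of a hypothetical in-memory chunk (not tinytag's API):

import struct

def read_fmt_chunk(chunk: bytes) -> dict:
    (_format_tag, channels, samplerate,
     _byte_rate, _block_align, bitdepth) = struct.unpack('<HHIIHH', chunk[:16])
    return {
        'channels': channels,
        'samplerate': samplerate,
        'bitdepth': bitdepth,
        'bitrate_kbps': samplerate * channels * bitdepth / 1000,
    }
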
1509

1510
class _Flac(TinyTag):
1✔
1511
    METADATA_STREAMINFO = 0
1✔
1512
    METADATA_PADDING = 1
1✔
1513
    METADATA_APPLICATION = 2
1✔
1514
    METADATA_SEEKTABLE = 3
1✔
1515
    METADATA_VORBIS_COMMENT = 4
1✔
1516
    METADATA_CUESHEET = 5
1✔
1517
    METADATA_PICTURE = 6
1✔
1518

1519
    def _determine_duration(self, fh: BinaryIO) -> None:
1✔
1520
        if not self._tags_parsed:
1✔
1521
            self._parse_tag(fh)
1✔
1522

1523
    def _parse_tag(self, fh: BinaryIO) -> None:
1✔
1524
        id3 = None
1✔
1525
        header = fh.read(4)
1✔
1526
        if header[:3] == b'ID3':  # parse ID3 header if it exists
1✔
1527
            fh.seek(-4, os.SEEK_CUR)
1✔
1528
            id3 = _ID3()
1✔
1529
            id3._filehandler = fh
1✔
1530
            id3._parse_tags = self._parse_tags
1✔
1531
            id3._load_image = self._load_image
1✔
1532
            id3._parse_id3v2(fh)
1✔
1533
            header = fh.read(4)  # after ID3 should be fLaC
1✔
1534
        if header[:4] != b'fLaC':
1✔
1535
            raise ParseError('Invalid FLAC header')
1✔
1536
        # for spec, see https://xiph.org/flac/ogg_mapping.html
1537
        header_data = fh.read(4)
1✔
1538
        while len(header_data) == 4:
1✔
1539
            meta_header = struct.unpack('B3B', header_data)
1✔
1540
            block_type = meta_header[0] & 0x7f
1✔
1541
            is_last_block = meta_header[0] & 0x80
1✔
1542
            size = self._bytes_to_int(meta_header[1:4])
1✔
1543
            # http://xiph.org/flac/format.html#metadata_block_streaminfo
1544
            if block_type == self.METADATA_STREAMINFO and self._parse_duration:
1✔
1545
                stream_info_header = fh.read(size)
1✔
1546
                if len(stream_info_header) < 34:  # invalid streaminfo
1✔
1547
                    break
1✔
1548
                header_values = struct.unpack('HH3s3s8B16s', stream_info_header)
1✔
1549
                # From the xiph documentation:
1550
                # py | <bits>
1551
                # ----------------------------------------------
1552
                # H  | <16>  The minimum block size (in samples)
1553
                # H  | <16>  The maximum block size (in samples)
1554
                # 3s | <24>  The minimum frame size (in bytes)
1555
                # 3s | <24>  The maximum frame size (in bytes)
1556
                # 8B | <20>  Sample rate in Hz.
1557
                #    | <3>   (number of channels)-1.
1558
                #    | <5>   (bits per sample)-1.
1559
                #    | <36>  Total samples in stream.
1560
                # 16s| <128> MD5 signature
1561
                # min_blk, max_blk, min_frm, max_frm = header[0:4]
1562
                # min_frm = self._bytes_to_int(struct.unpack('3B', min_frm))
1563
                # max_frm = self._bytes_to_int(struct.unpack('3B', max_frm))
1564
                #                 channels--.  bits      total samples
1565
                # |----- samplerate -----| |-||----| |---------~   ~----|
1566
                # 0000 0000 0000 0000 0000 0000 0000 0000 0000      0000
1567
                # #---4---# #---5---# #---6---# #---7---# #--8-~   ~-12-#
1568
                self.samplerate = self._bytes_to_int(header_values[4:7]) >> 4
1✔
1569
                self.channels = ((header_values[6] >> 1) & 0x07) + 1
1✔
1570
                self.bitdepth = (
1✔
1571
                    ((header_values[6] & 1) << 4) + ((header_values[7] & 0xF0) >> 4) + 1)
1572
                total_sample_bytes = ((header_values[7] & 0x0F),) + header_values[8:12]
1✔
1573
                total_samples = self._bytes_to_int(total_sample_bytes)
1✔
1574
                self.duration = total_samples / self.samplerate
1✔
1575
                if self.duration > 0:
1✔
1576
                    self.bitrate = self.filesize / self.duration * 8 / 1000
1✔
1577
            elif block_type == self.METADATA_VORBIS_COMMENT and self._parse_tags:
1✔
1578
                oggtag = _Ogg()
1✔
1579
                oggtag._filehandler = fh
1✔
1580
                oggtag._parse_vorbis_comment(fh)
1✔
1581
                self._update(oggtag)
1✔
1582
            elif block_type == self.METADATA_PICTURE and self._load_image:
1✔
1583
                fieldname, value = self._parse_image(fh)
1✔
1584
                self.images._set_field(fieldname, value)
1✔
1585
            elif block_type >= 127:
1✔
1586
                break  # invalid block type
×
1587
            else:
1588
                if DEBUG:
1✔
1589
                    print('Unknown FLAC block type', block_type)
1✔
1590
                fh.seek(size, 1)  # seek over this block
1✔
1591

1592
            if is_last_block:
1✔
1593
                break
1✔
1594
            header_data = fh.read(4)
1✔
1595
        if id3 is not None:  # apply ID3 tags after vorbis
1✔
1596
            self._update(id3)
1✔
1597
        self._tags_parsed = True
1✔
1598

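The STREAMINFO arithmetic above extracts bit fields that straddle byte boundaries: after the block and frame size fields, 20 bits of sample rate, 3 bits of (channels - 1), 5 bits of (bits per sample - 1) and 36 bits of total samples are packed into the next 8 bytes. The same arithmetic is easier to follow when those 8 bytes are treated as one integer; a sketch under that assumption (not the library code):

def flac_streaminfo_summary(streaminfo: bytes) -> tuple[int, int, int, float]:
    # bytes 10..17 hold samplerate(20) | channels-1(3) | bps-1(5) | total_samples(36)
    packed = int.from_bytes(streaminfo[10:18], 'big')
    total_samples = packed & ((1 << 36) - 1)
    bitdepth = ((packed >> 36) & 0x1F) + 1
    channels = ((packed >> 41) & 0x07) + 1
    samplerate = packed >> 44
    duration = total_samples / samplerate if samplerate else 0.0
    return samplerate, channels, bitdepth, duration
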
1599
    @classmethod
1✔
1600
    def _parse_image(cls, fh: BinaryIO) -> tuple[str, TagImage]:
1✔
1601
        # https://xiph.org/flac/format.html#metadata_block_picture
1602
        pic_type, mime_type_len = struct.unpack('>2I', fh.read(8))
1✔
1603
        mime_type = fh.read(mime_type_len).decode('utf-8', 'replace')
1✔
1604
        description_len = struct.unpack('>I', fh.read(4))[0]
1✔
1605
        description = fh.read(description_len).decode('utf-8', 'replace')
1✔
1606
        _width, _height, _depth, _colors, pic_len = struct.unpack('>5I', fh.read(20))
1✔
1607
        return _ID3._create_tag_image(fh.read(pic_len), pic_type, mime_type, description)
1✔
1608

1609

1610
class _Wma(TinyTag):
1✔
1611
    # see:
1612
    # http://web.archive.org/web/20131203084402/http://msdn.microsoft.com/en-us/library/bb643323.aspx
1613
    # and (Japanese, but nonetheless helpful)
1614
    # http://uguisu.skr.jp/Windows/format_asf.html
1615
    _ASF_MAPPING = {
1✔
1616
        'WM/ARTISTS': 'artist',
1617
        'WM/TrackNumber': 'track',
1618
        'WM/PartOfSet': 'disc',
1619
        'WM/Year': 'year',
1620
        'WM/AlbumArtist': 'albumartist',
1621
        'WM/Genre': 'genre',
1622
        'WM/AlbumTitle': 'album',
1623
        'WM/Composer': 'composer',
1624
        'WM/Publisher': 'extra.publisher',
1625
        'WM/BeatsPerMinute': 'extra.bpm',
1626
        'WM/InitialKey': 'extra.initial_key',
1627
        'WM/Lyrics': 'extra.lyrics',
1628
        'WM/Language': 'extra.language',
1629
        'WM/Director': 'extra.director',
1630
        'WM/AuthorURL': 'extra.url',
1631
        'WM/ISRC': 'extra.isrc',
1632
        'WM/Conductor': 'extra.conductor',
1633
        'WM/Writer': 'extra.lyricist',
1634
        'WM/SetSubTitle': 'extra.set_subtitle',
1635
        'WM/EncodedBy': 'extra.encoded_by',
1636
        'WM/EncodingSettings': 'extra.encoder_settings',
1637
        'WM/Media': 'extra.media',
1638
        'WM/OriginalReleaseTime': 'extra.original_date',
1639
        'WM/OriginalReleaseYear': 'extra.original_year',
1640
        'WM/Barcode': 'extra.barcode',
1641
        'WM/CatalogNo': 'extra.catalog_number',
1642
    }
1643
    _ASF_CONTENT_DESCRIPTION_OBJECT = b'3&\xb2u\x8ef\xcf\x11\xa6\xd9\x00\xaa\x00b\xcel'
1✔
1644
    _ASF_EXTENDED_CONTENT_DESCRIPTION_OBJECT = (b'@\xa4\xd0\xd2\x07\xe3\xd2\x11\x97\xf0\x00'
1✔
1645
                                                b'\xa0\xc9^\xa8P')
1646
    _STREAM_BITRATE_PROPERTIES_OBJECT = b'\xceu\xf8{\x8dF\xd1\x11\x8d\x82\x00`\x97\xc9\xa2\xb2'
1✔
1647
    _ASF_FILE_PROPERTY_OBJECT = b'\xa1\xdc\xab\x8cG\xa9\xcf\x11\x8e\xe4\x00\xc0\x0c Se'
1✔
1648
    _ASF_STREAM_PROPERTIES_OBJECT = b'\x91\x07\xdc\xb7\xb7\xa9\xcf\x11\x8e\xe6\x00\xc0\x0c Se'
1✔
1649
    _STREAM_TYPE_ASF_AUDIO_MEDIA = b'@\x9ei\xf8M[\xcf\x11\xa8\xfd\x00\x80_\\D+'
1✔
1650

1651
    def _determine_duration(self, fh: BinaryIO) -> None:
1✔
1652
        if not self._tags_parsed:
1✔
1653
            self._parse_tag(fh)
1✔
1654

1655
    def _decode_string(self, bytestring: bytes) -> str:
1✔
1656
        return self._unpad(bytestring.decode('utf-16', 'replace'))
1✔
1657

1658
    def _decode_ext_desc(self, value_type: int, value: bytes) -> str | None:
1✔
1659
        """ decode _ASF_EXTENDED_CONTENT_DESCRIPTION_OBJECT values"""
1660
        if value_type == 0:  # Unicode string
1✔
1661
            return self._decode_string(value)
1✔
1662
        if 1 < value_type < 6:  # DWORD / QWORD / WORD
1✔
1663
            return str(self._bytes_to_int_le(value))
1✔
1664
        return None
×
1665

1666
    def _parse_tag(self, fh: BinaryIO) -> None:
1✔
1667
        header = fh.read(30)
1✔
1668
        # http://www.garykessler.net/library/file_sigs.html
1669
        # http://web.archive.org/web/20131203084402/http://msdn.microsoft.com/en-us/library/bb643323.aspx#_Toc521913958
1670
        if (header[:16] != b'0&\xb2u\x8ef\xcf\x11\xa6\xd9\x00\xaa\x00b\xcel'  # 128 bit GUID
1✔
1671
                or header[-1:] != b'\x02'):
1672
            raise ParseError('Invalid WMA header')
1✔
1673
        while True:
1✔
1674
            object_id = fh.read(16)
1✔
1675
            object_size = self._bytes_to_int_le(fh.read(8))
1✔
1676
            if object_size == 0 or object_size > self.filesize:
1✔
1677
                break  # invalid object, stop parsing.
1✔
1678
            if object_id == self._ASF_CONTENT_DESCRIPTION_OBJECT and self._parse_tags:
1✔
1679
                title_length = self._bytes_to_int_le(fh.read(2))
1✔
1680
                author_length = self._bytes_to_int_le(fh.read(2))
1✔
1681
                copyright_length = self._bytes_to_int_le(fh.read(2))
1✔
1682
                description_length = self._bytes_to_int_le(fh.read(2))
1✔
1683
                rating_length = self._bytes_to_int_le(fh.read(2))
1✔
1684
                data_blocks = {
1✔
1685
                    'title': title_length,
1686
                    'artist': author_length,
1687
                    'extra.copyright': copyright_length,
1688
                    'comment': description_length,
1689
                    '_rating': rating_length,
1690
                }
1691
                for i_field_name, length in data_blocks.items():
1✔
1692
                    bytestring = fh.read(length)
1✔
1693
                    value = self._decode_string(bytestring)
1✔
1694
                    if not i_field_name.startswith('_') and value:
1✔
1695
                        self._set_field(i_field_name, value)
1✔
1696
            elif object_id == self._ASF_EXTENDED_CONTENT_DESCRIPTION_OBJECT and self._parse_tags:
1✔
1697
                # http://web.archive.org/web/20131203084402/http://msdn.microsoft.com/en-us/library/bb643323.aspx#_Toc509555195
1698
                descriptor_count = self._bytes_to_int_le(fh.read(2))
1✔
1699
                for _ in range(descriptor_count):
1✔
1700
                    name_len = self._bytes_to_int_le(fh.read(2))
1✔
1701
                    name = self._decode_string(fh.read(name_len))
1✔
1702
                    value_type = self._bytes_to_int_le(fh.read(2))
1✔
1703
                    value_len = self._bytes_to_int_le(fh.read(2))
1✔
1704
                    if value_type == 1:
1✔
1705
                        fh.seek(value_len, os.SEEK_CUR)  # skip byte values
1✔
1706
                        continue
1✔
1707
                    field_name = self._ASF_MAPPING.get(name)  # try to get normalized field name
1✔
1708
                    if field_name is None:  # custom field
1✔
1709
                        if name.startswith('WM/'):
1✔
1710
                            name = name[3:]
1✔
1711
                        field_name = self._EXTRA_PREFIX + name.lower()
1✔
1712
                    field_value = self._decode_ext_desc(value_type, fh.read(value_len))
1✔
1713
                    if field_value is not None:
1✔
1714
                        if field_name in {'track', 'disc'}:
1✔
1715
                            if isinstance(field_value, int) or field_value.isdecimal():
1✔
1716
                                self._set_field(field_name, int(field_value))
1✔
1717
                        elif field_value:
1✔
1718
                            self._set_field(field_name, field_value)
1✔
1719
            elif object_id == self._ASF_FILE_PROPERTY_OBJECT and self._parse_duration:
1✔
1720
                fh.seek(40, os.SEEK_CUR)
1✔
1721
                play_duration = self._bytes_to_int_le(fh.read(8)) / 10000000
1✔
1722
                fh.seek(8, os.SEEK_CUR)
1✔
1723
                preroll = self._bytes_to_int_le(fh.read(8)) / 1000
1✔
1724
                fh.seek(16, os.SEEK_CUR)
1✔
1725
                # According to the specification, we need to subtract the preroll from play_duration
1726
                # to get the actual duration of the file
1727
                self.duration = max(play_duration - preroll, 0.0)
1✔
1728
            elif object_id == self._ASF_STREAM_PROPERTIES_OBJECT and self._parse_duration:
1✔
1729
                stream_type = fh.read(16)
1✔
1730
                fh.seek(24, os.SEEK_CUR)  # skip irrelevant fields
1✔
1731
                type_specific_data_length = self._bytes_to_int_le(fh.read(4))
1✔
1732
                error_correction_data_length = self._bytes_to_int_le(fh.read(4))
1✔
1733
                fh.seek(6, os.SEEK_CUR)   # skip irrelevant fields
1✔
1734
                already_read = 0
1✔
1735
                if stream_type == self._STREAM_TYPE_ASF_AUDIO_MEDIA:
1✔
1736
                    codec_id_format_tag = self._bytes_to_int_le(fh.read(2))
1✔
1737
                    self.channels = self._bytes_to_int_le(fh.read(2))
1✔
1738
                    self.samplerate = self._bytes_to_int_le(fh.read(4))
1✔
1739
                    avg_bytes_per_second = self._bytes_to_int_le(fh.read(4))
1✔
1740
                    self.bitrate = avg_bytes_per_second * 8 / 1000
1✔
1741
                    fh.seek(2, os.SEEK_CUR)  # skip irrelevant field
1✔
1742
                    bits_per_sample = self._bytes_to_int_le(fh.read(2))
1✔
1743
                    if codec_id_format_tag == 355:  # lossless
1✔
1744
                        self.bitdepth = bits_per_sample
1✔
1745
                    already_read = 16
1✔
1746
                fh.seek(type_specific_data_length - already_read, os.SEEK_CUR)
1✔
1747
                fh.seek(error_correction_data_length, os.SEEK_CUR)
1✔
1748
            else:
1749
                fh.seek(object_size - 24, os.SEEK_CUR)  # skip over unknown object ids
1✔
1750
        self._tags_parsed = True
1✔
1751

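In the ASF file properties object, the play duration is stored in 100-nanosecond units and includes the preroll, a buffering delay given in milliseconds, so the audible length is their difference, exactly as computed above. Worked numbers, purely for illustration:

play_duration_raw = 2_425_000_000  # 100 ns units, i.e. 242.5 seconds
preroll_ms = 3_000                 # 3 seconds of preroll

duration = play_duration_raw / 10_000_000 - preroll_ms / 1_000
assert duration == 239.5           # seconds of actual audio
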
1752

1753
class _Aiff(TinyTag):
1✔
1754
    #
1755
    # AIFF is part of the IFF family of file formats.
1756
    #
1757
    # https://en.wikipedia.org/wiki/Audio_Interchange_File_Format#Data_format
1758
    # https://web.archive.org/web/20171118222232/http://www-mmsp.ece.mcgill.ca/documents/audioformats/aiff/aiff.html
1759
    # https://web.archive.org/web/20071219035740/http://www.cnpbagwell.com/aiff-c.txt
1760
    #
1761
    # A few things about the spec:
1762
    #
1763
    # * IFF strings are not supposed to be null terminated.  They sometimes are.
1764
    # * Some tools might throw more metadata into the ANNO chunk but it is
1765
    #   wildly unreliable to count on it. In fact, the official spec recommends against
1766
    #   using it. That said... this code throws the ANNO field into comment and hopes
1767
    #   for the best.
1768
    #
1769
    # The key thing here is that AIFF metadata is usually in a handful of fields
1770
    # and the rest is an ID3 or XMP field.  XMP is too complicated and only Adobe-related
1771
    # products support it. The vast majority use ID3. As such, this code inherits from
1772
    # ID3 rather than TinyTag since it does everything that needs to be done here.
1773
    #
1774

1775
    _AIFF_MAPPING = {
1✔
1776
        #
1777
        # "Name Chunk text contains the name of the sampled sound."
1778
        #
1779
        # "Author Chunk text contains one or more author names.  An author in
1780
        # this case is the creator of a sampled sound."
1781
        #
1782
        # "Annotation Chunk text contains a comment.  Use of this chunk is
1783
        # discouraged within FORM AIFC." Some tools: "hold my beer"
1784
        #
1785
        # "The Copyright Chunk contains a copyright notice for the sound.  text
1786
        #  contains a date followed by the copyright owner.  The chunk ID '(c) '
1787
        # serves as the copyright character. " Some tools: "hold my beer"
1788
        #
1789
        b'NAME': 'title',
1790
        b'AUTH': 'artist',
1791
        b'ANNO': 'comment',
1792
        b'(c) ': 'extra.copyright',
1793
    }
1794

1795
    def _parse_tag(self, fh: BinaryIO) -> None:
1✔
1796
        chunk_id, _size, form = struct.unpack('>4sI4s', fh.read(12))
1✔
1797
        if chunk_id != b'FORM' or form not in (b'AIFC', b'AIFF'):
1✔
1798
            raise ParseError('Invalid AIFF header')
1✔
1799
        chunk_header = fh.read(8)
1✔
1800
        while len(chunk_header) == 8:
1✔
1801
            sub_chunk_id, sub_chunk_size = struct.unpack('>4sI', chunk_header)
1✔
1802
            sub_chunk_size += sub_chunk_size % 2  # IFF chunks are padded to an even number of bytes
1✔
1803
            if sub_chunk_id in self._AIFF_MAPPING and self._parse_tags:
1✔
1804
                value = self._unpad(fh.read(sub_chunk_size).decode('utf-8', 'replace'))
1✔
1805
                self._set_field(self._AIFF_MAPPING[sub_chunk_id], value)
1✔
1806
            elif sub_chunk_id == b'COMM' and self._parse_duration:
1✔
1807
                channels, num_frames, bitdepth = struct.unpack('>hLh', fh.read(8))
1✔
1808
                self.channels, self.bitdepth = channels, bitdepth
1✔
1809
                try:
1✔
1810
                    exponent, mantissa = struct.unpack('>HQ', fh.read(10))   # Extended precision
1✔
1811
                    samplerate = int(mantissa * (2 ** (exponent - 0x3FFF - 63)))
1✔
1812
                    duration = num_frames / samplerate
1✔
1813
                    bitrate = samplerate * channels * bitdepth / 1000
1✔
1814
                    self.samplerate, self.duration, self.bitrate = samplerate, duration, bitrate
1✔
1815
                except OverflowError:
1✔
1816
                    pass
1✔
1817
                fh.seek(sub_chunk_size - 18, 1)  # skip remaining data in chunk
1✔
1818
            elif sub_chunk_id in {b'id3 ', b'ID3 '} and self._parse_tags:
1✔
1819
                id3 = _ID3()
1✔
1820
                id3._filehandler = fh
1✔
1821
                id3._load(tags=True, duration=False, image=self._load_image)
1✔
1822
                self._update(id3)
1✔
1823
            else:  # some other chunk, just skip the data
1824
                fh.seek(sub_chunk_size, 1)
1✔
1825
            chunk_header = fh.read(8)
1✔
1826
        self._tags_parsed = True
1✔
1827

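The COMM chunk stores the sample rate as an 80-bit IEEE extended float: a 16-bit sign/exponent word biased by 0x3FFF, then a 64-bit mantissa with an explicit integer bit, which is why the code above computes mantissa * 2 ** (exponent - 0x3FFF - 63). A standalone sketch of the same conversion; the example bytes encode 44100 Hz:

import struct

def read_extended_samplerate(raw: bytes) -> int:
    # raw is the 10-byte extended-precision sample rate field of the COMM chunk
    exponent, mantissa = struct.unpack('>HQ', raw)
    return int(mantissa * 2 ** (exponent - 0x3FFF - 63))

assert read_extended_samplerate(bytes.fromhex('400eac44000000000000')) == 44100
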
1828
    def _determine_duration(self, fh: BinaryIO) -> None:
1✔
1829
        if not self._tags_parsed:
1✔
1830
            self._parse_tag(fh)
1✔