hardbyte / python-can · build 16362801995
18 Jul 2025 05:17AM UTC · coverage: 70.862% (+0.1%) from 70.763%

Pull Request #1920 (github / web-flow): add FD support to slcan according to CANable 2.0 implementation
Merge f9e8a3c29 into 958fc64ed

6 of 45 new or added lines in 1 file covered (13.33%)
838 existing lines in 35 files now uncovered
7770 of 10965 relevant lines covered (70.86%)
13.53 hits per line

Source file: /can/io/blf.py (87.04% covered)
"""
Implements support for BLF (Binary Logging Format), a proprietary CAN log
format from Vector Informatik GmbH (Germany).

No official specification of the binary logging format is available.
This implementation is based on Toby Lorenz' C++ library "Vector BLF", which
is licensed under GPLv3: https://bitbucket.org/tobylorenz/vector_blf.

The file starts with a header. The rest is one or more "log containers", each
consisting of a header and some zlib-compressed data, usually up to 128 kB of
uncompressed data per container. This data contains the actual CAN messages
and other object types.
"""

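# A minimal usage sketch (illustrative only, not part of this module): these
# classes are usually reached through the public python-can API as can.BLFReader
# and can.BLFWriter; the file names below are placeholders.
#
#     import can
#
#     with can.BLFReader("drive.blf") as reader:
#         for msg in reader:
#             print(msg)
#
#     msg = can.Message(arbitration_id=0x123, data=[1, 2, 3])
#     with can.BLFWriter("copy.blf", channel=1) as writer:
#         writer.on_message_received(msg)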
import datetime
import logging
import struct
import time
import zlib
from collections.abc import Generator
from decimal import Decimal
from typing import Any, BinaryIO, Optional, Union, cast

from ..message import Message
from ..typechecking import StringPathLike
from ..util import channel2int, dlc2len, len2dlc
from .generic import BinaryIOMessageReader, FileIOMessageWriter

TSystemTime = tuple[int, int, int, int, int, int, int, int]


class BLFParseError(Exception):
    """BLF file could not be parsed correctly."""


LOG = logging.getLogger(__name__)

# signature ("LOGG"), header size,
# application ID, application major, application minor, application build,
# bin log major, bin log minor, bin log build, bin log patch,
# file size, uncompressed size, count of objects, count of objects read,
# time start (SYSTEMTIME), time stop (SYSTEMTIME)
FILE_HEADER_STRUCT = struct.Struct("<4sLBBBBBBBBQQLL8H8H")

# Pad file header to this size
FILE_HEADER_SIZE = 144

# signature ("LOBJ"), header size, header version, object size, object type
OBJ_HEADER_BASE_STRUCT = struct.Struct("<4sHHLL")

# flags, client index, object version, timestamp
OBJ_HEADER_V1_STRUCT = struct.Struct("<LHHQ")

# flags, timestamp status, object version, timestamp, (original timestamp)
OBJ_HEADER_V2_STRUCT = struct.Struct("<LBxHQ8x")

# compression method, size uncompressed
LOG_CONTAINER_STRUCT = struct.Struct("<H6xL4x")

# channel, flags, dlc, arbitration id, data
CAN_MSG_STRUCT = struct.Struct("<HBBL8s")

# channel, flags, dlc, arbitration id, frame length, bit count, FD flags,
# valid data bytes, data
CAN_FD_MSG_STRUCT = struct.Struct("<HBBLLBBB5x64s")

# channel, dlc, valid payload length of data, tx count, arbitration id,
# frame length, flags, bit rate used in arbitration phase,
# bit rate used in data phase, time offset of brs field,
# time offset of crc delimiter field, bit count, direction,
# offset if extDataOffset is used, crc
CAN_FD_MSG_64_STRUCT = struct.Struct("<BBBBLLLLLLLHBBL")

# channel, length, flags, ecc, position, dlc, frame length, id, flags ext, data
CAN_ERROR_EXT_STRUCT = struct.Struct("<HHLBBBxLLH2x8s")

# commented event type, foreground color, background color, relocatable,
# group name length, marker name length, description length
GLOBAL_MARKER_STRUCT = struct.Struct("<LLL3xBLLL12x")
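
# How these structs fit together (a summary of the reader/writer code below, kept
# here for orientation): every object starts with the "LOBJ" base header, followed
# by a version 1 or version 2 object header carrying flags and timestamp, then the
# type-specific payload and obj_size % 4 padding bytes. CAN traffic is normally
# wrapped in LOG_CONTAINER objects whose payload is the (optionally zlib-deflated)
# concatenation of further objects.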


CAN_MESSAGE = 1
LOG_CONTAINER = 10
CAN_ERROR_EXT = 73
CAN_MESSAGE2 = 86
GLOBAL_MARKER = 96
CAN_FD_MESSAGE = 100
CAN_FD_MESSAGE_64 = 101

NO_COMPRESSION = 0
ZLIB_DEFLATE = 2

CAN_MSG_EXT = 0x80000000
REMOTE_FLAG = 0x80
EDL = 0x1
BRS = 0x2
ESI = 0x4
DIR = 0x1

TIME_TEN_MICS = 0x00000001
TIME_ONE_NANS = 0x00000002

TIME_TEN_MICS_FACTOR = Decimal("1e-5")
TIME_ONE_NANS_FACTOR = Decimal("1e-9")


def timestamp_to_systemtime(timestamp: float) -> TSystemTime:
    if timestamp is None or timestamp < 631152000:
        # Probably not a Unix timestamp
        return 0, 0, 0, 0, 0, 0, 0, 0
    t = datetime.datetime.fromtimestamp(round(timestamp, 3), tz=datetime.timezone.utc)
    return (
        t.year,
        t.month,
        t.isoweekday() % 7,
        t.day,
        t.hour,
        t.minute,
        t.second,
        t.microsecond // 1000,
    )


def systemtime_to_timestamp(systemtime: TSystemTime) -> float:
    try:
        t = datetime.datetime(
            systemtime[0],
            systemtime[1],
            systemtime[3],
            systemtime[4],
            systemtime[5],
            systemtime[6],
            systemtime[7] * 1000,
            tzinfo=datetime.timezone.utc,
        )
        return t.timestamp()
    except ValueError:
        return 0
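
# Note on the tuple layout above: it mirrors the Windows SYSTEMTIME field order
# (year, month, day of week, day, hour, minute, second, milliseconds), which is
# why index 2 (day of week) is skipped when rebuilding the datetime. Illustrative
# check:
#
#     >>> timestamp_to_systemtime(0.0)  # values before 1990 are treated as unset
#     (0, 0, 0, 0, 0, 0, 0, 0)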


class BLFReader(BinaryIOMessageReader):
    """
    Iterator of CAN messages from a Binary Logging File.

    Only CAN messages and error frames are supported. Other object types are
    silently ignored.
    """

    file: BinaryIO

    def __init__(
        self,
        file: Union[StringPathLike, BinaryIO],
        **kwargs: Any,
    ) -> None:
        """
        :param file: a path-like object or a file-like object to read from.
                     If this is a file-like object, it has to be opened in
                     binary read mode, not text read mode.
        """
        super().__init__(file, mode="rb")
        data = self.file.read(FILE_HEADER_STRUCT.size)
        header = FILE_HEADER_STRUCT.unpack(data)
        if header[0] != b"LOGG":
            raise BLFParseError("Unexpected file format")
        self.file_size = header[10]
        self.uncompressed_size = header[11]
        self.object_count = header[12]
        self.start_timestamp = systemtime_to_timestamp(
            cast("TSystemTime", header[14:22])
        )
        self.stop_timestamp = systemtime_to_timestamp(
            cast("TSystemTime", header[22:30])
        )
        # Read rest of header
        self.file.read(header[1] - FILE_HEADER_STRUCT.size)
        self._tail = b""
        self._pos = 0

    def __iter__(self) -> Generator[Message, None, None]:
        while True:
            data = self.file.read(OBJ_HEADER_BASE_STRUCT.size)
            if not data:
                # EOF
                break

            signature, _, _, obj_size, obj_type = OBJ_HEADER_BASE_STRUCT.unpack(data)
            if signature != b"LOBJ":
                raise BLFParseError()
            obj_data = self.file.read(obj_size - OBJ_HEADER_BASE_STRUCT.size)
            # Read padding bytes
            self.file.read(obj_size % 4)

            if obj_type == LOG_CONTAINER:
                method, _ = LOG_CONTAINER_STRUCT.unpack_from(obj_data)
                container_data = obj_data[LOG_CONTAINER_STRUCT.size :]
                if method == NO_COMPRESSION:
                    data = container_data
                elif method == ZLIB_DEFLATE:
                    zobj = zlib.decompressobj()
                    data = zobj.decompress(container_data)
                else:
                    # Unknown compression method
                    LOG.warning("Unknown compression method (%d)", method)
                    continue
                yield from self._parse_container(data)
        self.stop()

    def _parse_container(self, data):
        if self._tail:
            data = b"".join((self._tail, data))
        try:
            yield from self._parse_data(data)
        except struct.error:
            # There was not enough data in the container to unpack a struct
            pass
        # Save the remaining data that could not be processed
        self._tail = data[self._pos :]

    def _parse_data(self, data):
        """Optimized inner loop by making local copies of global variables
        and class members and hardcoding some values."""
        unpack_obj_header_base = OBJ_HEADER_BASE_STRUCT.unpack_from
        obj_header_base_size = OBJ_HEADER_BASE_STRUCT.size
        unpack_obj_header_v1 = OBJ_HEADER_V1_STRUCT.unpack_from
        obj_header_v1_size = OBJ_HEADER_V1_STRUCT.size
        unpack_obj_header_v2 = OBJ_HEADER_V2_STRUCT.unpack_from
        obj_header_v2_size = OBJ_HEADER_V2_STRUCT.size
        unpack_can_msg = CAN_MSG_STRUCT.unpack_from
        unpack_can_fd_msg = CAN_FD_MSG_STRUCT.unpack_from
        unpack_can_fd_64_msg = CAN_FD_MSG_64_STRUCT.unpack_from
        can_fd_64_msg_size = CAN_FD_MSG_64_STRUCT.size
        unpack_can_error_ext = CAN_ERROR_EXT_STRUCT.unpack_from

        start_timestamp = self.start_timestamp
        max_pos = len(data)
        pos = 0

        # Loop until a struct unpack raises an exception
        while True:
            self._pos = pos
            # Find next object after padding (depends on object type)
            try:
                pos = data.index(b"LOBJ", pos, pos + 8)
            except ValueError:
                if pos + 8 > max_pos:
                    # Not enough data in container
                    return
                raise BLFParseError("Could not find next object") from None
            header = unpack_obj_header_base(data, pos)
            # print(header)
            signature, header_size, header_version, obj_size, obj_type = header
            if signature != b"LOBJ":
                raise BLFParseError()

            # Calculate position of next object
            next_pos = pos + obj_size
            if next_pos > max_pos:
                # This object continues in the next container
                return
            pos += obj_header_base_size

            # Read rest of header
            if header_version == 1:
                flags, _, _, timestamp = unpack_obj_header_v1(data, pos)
                pos += obj_header_v1_size
            elif header_version == 2:
                flags, _, _, timestamp = unpack_obj_header_v2(data, pos)
                pos += obj_header_v2_size
            else:
                LOG.warning("Unknown object header version (%d)", header_version)
                pos = next_pos
                continue

            # Calculate absolute timestamp in seconds
            factor = TIME_TEN_MICS_FACTOR if flags == 1 else TIME_ONE_NANS_FACTOR
            timestamp = float(Decimal(timestamp) * factor) + start_timestamp

            if obj_type in (CAN_MESSAGE, CAN_MESSAGE2):
                channel, flags, dlc, can_id, can_data = unpack_can_msg(data, pos)
                yield Message(
                    timestamp=timestamp,
                    arbitration_id=can_id & 0x1FFFFFFF,
                    is_extended_id=bool(can_id & CAN_MSG_EXT),
                    is_remote_frame=bool(flags & REMOTE_FLAG),
                    is_rx=not bool(flags & DIR),
                    dlc=dlc,
                    data=can_data[:dlc],
                    channel=channel - 1,
                )
            elif obj_type == CAN_ERROR_EXT:
                members = unpack_can_error_ext(data, pos)
                channel = members[0]
                dlc = members[5]
                can_id = members[7]
                can_data = members[9]
                yield Message(
                    timestamp=timestamp,
                    is_error_frame=True,
                    is_extended_id=bool(can_id & CAN_MSG_EXT),
                    arbitration_id=can_id & 0x1FFFFFFF,
                    dlc=dlc,
                    data=can_data[:dlc],
                    channel=channel - 1,
                )
            elif obj_type == CAN_FD_MESSAGE:
                members = unpack_can_fd_msg(data, pos)
                (
                    channel,
                    flags,
                    dlc,
                    can_id,
                    _,
                    _,
                    fd_flags,
                    valid_bytes,
                    can_data,
                ) = members
                yield Message(
                    timestamp=timestamp,
                    arbitration_id=can_id & 0x1FFFFFFF,
                    is_extended_id=bool(can_id & CAN_MSG_EXT),
                    is_remote_frame=bool(flags & REMOTE_FLAG),
                    is_fd=bool(fd_flags & 0x1),
                    is_rx=not bool(flags & DIR),
                    bitrate_switch=bool(fd_flags & 0x2),
                    error_state_indicator=bool(fd_flags & 0x4),
                    dlc=dlc2len(dlc),
                    data=can_data[:valid_bytes],
                    channel=channel - 1,
                )
            elif obj_type == CAN_FD_MESSAGE_64:
                (
                    channel,
                    dlc,
                    valid_bytes,
                    _,
                    can_id,
                    _,
                    fd_flags,
                    _,
                    _,
                    _,
                    _,
                    _,
                    direction,
                    ext_data_offset,
                    _,
                ) = unpack_can_fd_64_msg(data, pos)

                # :issue:`1905`: `valid_bytes` can be higher than the actually available data.
                # Add zero-byte padding to mimic behavior of CANoe and binlog.dll.
                data_field_length = min(
                    valid_bytes,
                    (ext_data_offset or obj_size) - header_size - can_fd_64_msg_size,
                )
                msg_data_offset = pos + can_fd_64_msg_size
                msg_data = data[msg_data_offset : msg_data_offset + data_field_length]
                msg_data = msg_data.ljust(valid_bytes, b"\x00")

                yield Message(
                    timestamp=timestamp,
                    arbitration_id=can_id & 0x1FFFFFFF,
                    is_extended_id=bool(can_id & CAN_MSG_EXT),
                    is_remote_frame=bool(fd_flags & 0x0010),
                    is_fd=bool(fd_flags & 0x1000),
                    is_rx=not direction,
                    bitrate_switch=bool(fd_flags & 0x2000),
                    error_state_indicator=bool(fd_flags & 0x4000),
                    dlc=dlc2len(dlc),
                    data=msg_data,
                    channel=channel - 1,
                )

            pos = next_pos


class BLFWriter(FileIOMessageWriter):
    """
    Logs CAN data to a Binary Logging File compatible with Vector's tools.
    """

    file: BinaryIO

    #: Max log container size of uncompressed data
    max_container_size = 128 * 1024

    #: Application identifier for the log writer
    application_id = 5

    def __init__(
        self,
        file: Union[StringPathLike, BinaryIO],
        append: bool = False,
        channel: int = 1,
        compression_level: int = -1,
        **kwargs: Any,
    ) -> None:
        """
        :param file: a path-like object or a file-like object to write to.
                     If this is a file-like object, it has to be opened in
                     mode "wb+".
        :param channel:
            Default channel to log as, if not specified by the interface.
        :param append:
            Append messages to an existing log file.
        :param compression_level:
            An integer from 0 to 9 or -1 controlling the level of compression.
            1 (Z_BEST_SPEED) is fastest and produces the least compression.
            9 (Z_BEST_COMPRESSION) is slowest and produces the most.
            0 means that data will be stored without processing.
            The default value is -1 (Z_DEFAULT_COMPRESSION).
            Z_DEFAULT_COMPRESSION represents a default compromise between
            speed and compression (currently equivalent to level 6).
        """
        mode = "rb+" if append else "wb"
        try:
            super().__init__(file, mode=mode)
        except FileNotFoundError:
            # Trying to append to a non-existing file, create a new one
            append = False
            mode = "wb"
            super().__init__(file, mode=mode)
        assert self.file is not None
        self.channel = channel
        self.compression_level = compression_level
        self._buffer: list[bytes] = []
        self._buffer_size = 0
        # If max_container_size is passed in kwargs, update the instance attribute
        if kwargs.get("max_container_size", False):
            self.max_container_size = kwargs["max_container_size"]
        if append:
            # Parse file header
            data = self.file.read(FILE_HEADER_STRUCT.size)
            header = FILE_HEADER_STRUCT.unpack(data)
            if header[0] != b"LOGG":
                raise BLFParseError("Unexpected file format")
            self.uncompressed_size = header[11]
            self.object_count = header[12]
            self.start_timestamp: Optional[float] = systemtime_to_timestamp(
                cast("TSystemTime", header[14:22])
            )
            self.stop_timestamp: Optional[float] = systemtime_to_timestamp(
                cast("TSystemTime", header[22:30])
            )
            # Jump to the end of the file
            self.file.seek(0, 2)
        else:
            self.object_count = 0
            self.uncompressed_size = FILE_HEADER_SIZE
            self.start_timestamp = None
            self.stop_timestamp = None
            # Write a default header which will be updated when stopped
            self._write_header(FILE_HEADER_SIZE)
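
    # For illustration only (the file name is a placeholder): appending to an
    # existing log while storing containers uncompressed would look roughly like
    #
    #     writer = BLFWriter("drive.blf", append=True, compression_level=0)
    #
    # since a compression_level of 0 makes _flush() use NO_COMPRESSION instead of
    # zlib deflate.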

    def _write_header(self, filesize):
        header = [b"LOGG", FILE_HEADER_SIZE, self.application_id, 0, 0, 0, 2, 6, 8, 1]
        # The meaning of "count of objects read" is unknown
        header.extend([filesize, self.uncompressed_size, self.object_count, 0])
        header.extend(timestamp_to_systemtime(self.start_timestamp))
        header.extend(timestamp_to_systemtime(self.stop_timestamp))
        self.file.write(FILE_HEADER_STRUCT.pack(*header))
        # Pad to header size
        self.file.write(b"\x00" * (FILE_HEADER_SIZE - FILE_HEADER_STRUCT.size))

    def on_message_received(self, msg):
        channel = channel2int(msg.channel)
        if channel is None:
            channel = self.channel
        else:
            # Many interfaces start channel numbering at 0 which is invalid
            channel += 1

        arb_id = msg.arbitration_id
        if msg.is_extended_id:
            arb_id |= CAN_MSG_EXT
        flags = REMOTE_FLAG if msg.is_remote_frame else 0
        if not msg.is_rx:
            flags |= DIR
        can_data = bytes(msg.data)

        if msg.is_error_frame:
            data = CAN_ERROR_EXT_STRUCT.pack(
                channel,
                0,  # length
                0,  # flags
                0,  # ecc
                0,  # position
                len2dlc(msg.dlc),
                0,  # frame length
                arb_id,
                0,  # ext flags
                can_data,
            )
            self._add_object(CAN_ERROR_EXT, data, msg.timestamp)
        elif msg.is_fd:
            fd_flags = EDL
            if msg.bitrate_switch:
                fd_flags |= BRS
            if msg.error_state_indicator:
                fd_flags |= ESI
            data = CAN_FD_MSG_STRUCT.pack(
                channel,
                flags,
                len2dlc(msg.dlc),
                arb_id,
                0,
                0,
                fd_flags,
                len(can_data),
                can_data,
            )
            self._add_object(CAN_FD_MESSAGE, data, msg.timestamp)
        else:
            data = CAN_MSG_STRUCT.pack(channel, flags, msg.dlc, arb_id, can_data)
            self._add_object(CAN_MESSAGE, data, msg.timestamp)

    def log_event(self, text, timestamp=None):
        """Add an arbitrary message to the log file as a global marker.

        :param str text:
            The group name of the marker.
        :param float timestamp:
            Absolute timestamp in Unix timestamp format. If not given, the
            marker will be placed alongside the last message.
        """
        try:
            # Only works on Windows
            text = text.encode("mbcs")
        except LookupError:
            text = text.encode("ascii")
        comment = b"Added by python-can"
        marker = b"python-can"
        data = GLOBAL_MARKER_STRUCT.pack(
            0, 0xFFFFFF, 0xFF3300, 0, len(text), len(marker), len(comment)
        )
        self._add_object(GLOBAL_MARKER, data + text + marker + comment, timestamp)

    def _add_object(self, obj_type, data, timestamp=None):
        if timestamp is None:
            timestamp = self.stop_timestamp or time.time()
        if self.start_timestamp is None:
            # Save start timestamp using the same precision as the BLF format
            # Truncating to milliseconds to avoid rounding errors when calculating
            # the timestamp difference
            self.start_timestamp = int(timestamp * 1000) / 1000
        self.stop_timestamp = timestamp
        timestamp = int((timestamp - self.start_timestamp) * 1e9)
        header_size = OBJ_HEADER_BASE_STRUCT.size + OBJ_HEADER_V1_STRUCT.size
        obj_size = header_size + len(data)
        base_header = OBJ_HEADER_BASE_STRUCT.pack(
            b"LOBJ", header_size, 1, obj_size, obj_type
        )
        obj_header = OBJ_HEADER_V1_STRUCT.pack(TIME_ONE_NANS, 0, 0, max(timestamp, 0))

        self._buffer.append(base_header)
        self._buffer.append(obj_header)
        self._buffer.append(data)
        padding_size = len(data) % 4
        if padding_size:
            self._buffer.append(b"\x00" * padding_size)

        self._buffer_size += obj_size + padding_size
        self.object_count += 1
        if self._buffer_size >= self.max_container_size:
            self._flush()
567
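    # Byte layout of each flushed log container, as written by _flush() below:
    #
    #     OBJ_HEADER_BASE_STRUCT  "LOBJ", header size, version 1, obj size, LOG_CONTAINER
    #     LOG_CONTAINER_STRUCT    compression method, uncompressed size
    #     payload                 buffered objects, zlib-deflated unless level is 0
    #     padding                 obj_size % 4 zero bytes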
    def _flush(self):
21✔
568
        """Compresses and writes data in the buffer to file."""
569
        if self.file.closed:
21✔
UNCOV
570
            return
×
571
        buffer = b"".join(self._buffer)
21✔
572
        if not buffer:
21✔
573
            # Nothing to write
574
            return
21✔
575
        uncompressed_data = memoryview(buffer)[: self.max_container_size]
21✔
576
        # Save data that comes after max size to next container
577
        tail = buffer[self.max_container_size :]
21✔
578
        self._buffer = [tail]
21✔
579
        self._buffer_size = len(tail)
21✔
580
        if not self.compression_level:
21✔
UNCOV
581
            data = uncompressed_data
×
UNCOV
582
            method = NO_COMPRESSION
×
583
        else:
584
            data = zlib.compress(uncompressed_data, self.compression_level)
21✔
585
            method = ZLIB_DEFLATE
21✔
586
        obj_size = OBJ_HEADER_BASE_STRUCT.size + LOG_CONTAINER_STRUCT.size + len(data)
21✔
587
        base_header = OBJ_HEADER_BASE_STRUCT.pack(
21✔
588
            b"LOBJ", OBJ_HEADER_BASE_STRUCT.size, 1, obj_size, LOG_CONTAINER
589
        )
590
        container_header = LOG_CONTAINER_STRUCT.pack(method, len(uncompressed_data))
21✔
591
        self.file.write(base_header)
21✔
592
        self.file.write(container_header)
21✔
593
        self.file.write(data)
21✔
594
        # Write padding bytes
595
        self.file.write(b"\x00" * (obj_size % 4))
21✔
596
        self.uncompressed_size += OBJ_HEADER_BASE_STRUCT.size
21✔
597
        self.uncompressed_size += LOG_CONTAINER_STRUCT.size
21✔
598
        self.uncompressed_size += len(uncompressed_data)
21✔
599

600
    def file_size(self) -> int:
21✔
601
        """Return an estimate of the current file size in bytes."""
UNCOV
602
        return self.file.tell() + self._buffer_size
×
603

604
    def stop(self):
21✔
605
        """Stops logging and closes the file."""
606
        self._flush()
21✔
607
        if self.file.seekable():
21✔
608
            filesize = self.file.tell()
21✔
609
            # Write header in the beginning of the file
610
            self.file.seek(0)
21✔
611
            self._write_header(filesize)
21✔
612
        else:
UNCOV
613
            LOG.error("Could not write BLF header since file is not seekable")
×
614
        super().stop()
21✔