• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hardbyte / python-can / 16362801995

18 Jul 2025 05:17AM UTC coverage: 70.862% (+0.1%) from 70.763%
16362801995

Pull #1920

github

web-flow
Merge f9e8a3c29 into 958fc64ed
Pull Request #1920: add FD support to slcan according to CANable 2.0 impementation

6 of 45 new or added lines in 1 file covered. (13.33%)

838 existing lines in 35 files now uncovered.

7770 of 10965 relevant lines covered (70.86%)

13.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.97
/can/io/trc.py
1
"""
21✔
2
Reader and writer for can logging files in peak trc format
3

4
See https://www.peak-system.com/produktcd/Pdf/English/PEAK_CAN_TRC_File_Format.pdf
5
for file format description
6

7
Version 1.1 will be implemented as it is most commonly used
8
"""
9

10
import logging
21✔
11
import os
21✔
12
from collections.abc import Generator
21✔
13
from datetime import datetime, timedelta, timezone
21✔
14
from enum import Enum
21✔
15
from typing import Any, Callable, Optional, TextIO, Union
21✔
16

17
from ..message import Message
21✔
18
from ..typechecking import StringPathLike
21✔
19
from ..util import channel2int, len2dlc
21✔
20
from .generic import TextIOMessageReader, TextIOMessageWriter
21✔
21

22
logger = logging.getLogger("can.io.trc")
21✔
23

24

25
class TRCFileVersion(Enum):
21✔
26
    UNKNOWN = 0
21✔
27
    V1_0 = 100
21✔
28
    V1_1 = 101
21✔
29
    V1_2 = 102
21✔
30
    V1_3 = 103
21✔
31
    V2_0 = 200
21✔
32
    V2_1 = 201
21✔
33

34
    def __ge__(self, other):
21✔
35
        if self.__class__ is other.__class__:
21✔
36
            return self.value >= other.value
21✔
UNCOV
37
        return NotImplemented
×
38

39

40
class TRCReader(TextIOMessageReader):
21✔
41
    """
21✔
42
    Iterator of CAN messages from a TRC logging file.
43
    """
44

45
    file: TextIO
21✔
46

47
    def __init__(
21✔
48
        self,
49
        file: Union[StringPathLike, TextIO],
50
        **kwargs: Any,
51
    ) -> None:
52
        """
53
        :param file: a path-like object or as file-like object to read from
54
                     If this is a file-like object, is has to opened in text
55
                     read mode, not binary read mode.
56
        """
57
        super().__init__(file, mode="r")
21✔
58
        self.file_version = TRCFileVersion.UNKNOWN
21✔
59
        self._start_time: float = 0
21✔
60
        self.columns: dict[str, int] = {}
21✔
61
        self._num_columns = -1
21✔
62

63
        if not self.file:
21✔
UNCOV
64
            raise ValueError("The given file cannot be None")
×
65

66
        self._parse_cols: Callable[[tuple[str, ...]], Optional[Message]] = (
21✔
67
            lambda x: None
68
        )
69

70
    @property
21✔
71
    def start_time(self) -> Optional[datetime]:
21✔
72
        if self._start_time:
×
73
            return datetime.fromtimestamp(self._start_time, timezone.utc)
×
UNCOV
74
        return None
×
75

76
    def _extract_header(self):
21✔
77
        line = ""
21✔
78
        for _line in self.file:
21✔
79
            line = _line.strip()
21✔
80
            if line.startswith(";$FILEVERSION"):
21✔
81
                logger.debug("TRCReader: Found file version '%s'", line)
21✔
82
                try:
21✔
83
                    file_version = line.split("=")[1]
21✔
84
                    if file_version == "1.1":
21✔
85
                        self.file_version = TRCFileVersion.V1_1
21✔
86
                    elif file_version == "1.3":
21✔
87
                        self.file_version = TRCFileVersion.V1_3
21✔
88
                    elif file_version == "2.0":
21✔
UNCOV
89
                        self.file_version = TRCFileVersion.V2_0
×
90
                    elif file_version == "2.1":
21✔
91
                        self.file_version = TRCFileVersion.V2_1
21✔
92
                    else:
93
                        self.file_version = TRCFileVersion.UNKNOWN
×
94
                except IndexError:
×
UNCOV
95
                    logger.debug("TRCReader: Failed to parse version")
×
96
            elif line.startswith(";$STARTTIME"):
21✔
97
                logger.debug("TRCReader: Found start time '%s'", line)
21✔
98
                try:
21✔
99
                    self._start_time = (
21✔
100
                        datetime(1899, 12, 30, tzinfo=timezone.utc)
101
                        + timedelta(days=float(line.split("=")[1]))
102
                    ).timestamp()
103
                except IndexError:
×
UNCOV
104
                    logger.debug("TRCReader: Failed to parse start time")
×
105
            elif line.startswith(";$COLUMNS"):
21✔
106
                logger.debug("TRCReader: Found columns '%s'", line)
21✔
107
                try:
21✔
108
                    columns = line.split("=")[1].split(",")
21✔
109
                    self.columns = {column: columns.index(column) for column in columns}
21✔
110
                    self._num_columns = len(columns) - 1
21✔
111
                except IndexError:
×
UNCOV
112
                    logger.debug("TRCReader: Failed to parse columns")
×
113
            elif line.startswith(";"):
21✔
114
                continue
21✔
115
            else:
116
                break
21✔
117

118
        if self.file_version >= TRCFileVersion.V1_1:
21✔
119
            if self._start_time is None:
21✔
UNCOV
120
                raise ValueError("File has no start time information")
×
121

122
        if self.file_version >= TRCFileVersion.V2_0:
21✔
123
            if not self.columns:
21✔
UNCOV
124
                raise ValueError("File has no column information")
×
125

126
        if self.file_version == TRCFileVersion.UNKNOWN:
21✔
127
            logger.info(
21✔
128
                "TRCReader: No file version was found, so version 1.0 is assumed"
129
            )
130
            self._parse_cols = self._parse_msg_v1_0
21✔
131
        elif self.file_version == TRCFileVersion.V1_0:
21✔
UNCOV
132
            self._parse_cols = self._parse_msg_v1_0
×
133
        elif self.file_version == TRCFileVersion.V1_1:
21✔
134
            self._parse_cols = self._parse_cols_v1_1
21✔
135
        elif self.file_version == TRCFileVersion.V1_3:
21✔
136
            self._parse_cols = self._parse_cols_v1_3
21✔
137
        elif self.file_version in [TRCFileVersion.V2_0, TRCFileVersion.V2_1]:
21✔
138
            self._parse_cols = self._parse_cols_v2_x
21✔
139
        else:
140
            raise NotImplementedError("File version not fully implemented for reading")
141

142
        return line
21✔
143

144
    def _parse_msg_v1_0(self, cols: tuple[str, ...]) -> Optional[Message]:
21✔
145
        arbit_id = cols[2]
21✔
146
        if arbit_id == "FFFFFFFF":
21✔
147
            logger.info("TRCReader: Dropping bus info line")
21✔
148
            return None
21✔
149

150
        msg = Message()
21✔
151
        msg.timestamp = float(cols[1]) / 1000
21✔
152
        msg.arbitration_id = int(arbit_id, 16)
21✔
153
        msg.is_extended_id = len(arbit_id) > 4
21✔
154
        msg.channel = 1
21✔
155
        msg.dlc = int(cols[3])
21✔
156
        msg.data = bytearray([int(cols[i + 4], 16) for i in range(msg.dlc)])
21✔
157
        return msg
21✔
158

159
    def _parse_msg_v1_1(self, cols: tuple[str, ...]) -> Optional[Message]:
21✔
160
        arbit_id = cols[3]
21✔
161

162
        msg = Message()
21✔
163
        msg.timestamp = float(cols[1]) / 1000 + self._start_time
21✔
164
        msg.arbitration_id = int(arbit_id, 16)
21✔
165
        msg.is_extended_id = len(arbit_id) > 4
21✔
166
        msg.channel = 1
21✔
167
        msg.dlc = int(cols[4])
21✔
168
        msg.data = bytearray([int(cols[i + 5], 16) for i in range(msg.dlc)])
21✔
169
        msg.is_rx = cols[2] == "Rx"
21✔
170
        return msg
21✔
171

172
    def _parse_msg_v1_3(self, cols: tuple[str, ...]) -> Optional[Message]:
21✔
173
        arbit_id = cols[4]
21✔
174

175
        msg = Message()
21✔
176
        msg.timestamp = float(cols[1]) / 1000 + self._start_time
21✔
177
        msg.arbitration_id = int(arbit_id, 16)
21✔
178
        msg.is_extended_id = len(arbit_id) > 4
21✔
179
        msg.channel = int(cols[2])
21✔
180
        msg.dlc = int(cols[6])
21✔
181
        msg.data = bytearray([int(cols[i + 7], 16) for i in range(msg.dlc)])
21✔
182
        msg.is_rx = cols[3] == "Rx"
21✔
183
        return msg
21✔
184

185
    def _parse_msg_v2_x(self, cols: tuple[str, ...]) -> Optional[Message]:
21✔
186
        type_ = cols[self.columns["T"]]
21✔
187
        bus = self.columns.get("B", None)
21✔
188

189
        if "l" in self.columns:
21✔
190
            length = int(cols[self.columns["l"]])
×
UNCOV
191
            dlc = len2dlc(length)
×
192
        elif "L" in self.columns:
21✔
193
            dlc = int(cols[self.columns["L"]])
21✔
194
        else:
UNCOV
195
            raise ValueError("No length/dlc columns present.")
×
196

197
        msg = Message()
21✔
198
        msg.timestamp = float(cols[self.columns["O"]]) / 1000 + self._start_time
21✔
199
        msg.arbitration_id = int(cols[self.columns["I"]], 16)
21✔
200
        msg.is_extended_id = len(cols[self.columns["I"]]) > 4
21✔
201
        msg.channel = int(cols[bus]) if bus is not None else 1
21✔
202
        msg.dlc = dlc
21✔
203
        if dlc:
21✔
204
            msg.data = bytearray.fromhex(cols[self.columns["D"]])
21✔
205
        msg.is_rx = cols[self.columns["d"]] == "Rx"
21✔
206
        msg.is_fd = type_ in {"FD", "FB", "FE", "BI"}
21✔
207
        msg.bitrate_switch = type_ in {"FB", "FE"}
21✔
208
        msg.error_state_indicator = type_ in {"FE", "BI"}
21✔
209

210
        return msg
21✔
211

212
    def _parse_cols_v1_1(self, cols: tuple[str, ...]) -> Optional[Message]:
21✔
213
        dtype = cols[2]
21✔
214
        if dtype in ("Tx", "Rx"):
21✔
215
            return self._parse_msg_v1_1(cols)
21✔
216
        else:
217
            logger.info("TRCReader: Unsupported type '%s'", dtype)
21✔
218
            return None
21✔
219

220
    def _parse_cols_v1_3(self, cols: tuple[str, ...]) -> Optional[Message]:
21✔
221
        dtype = cols[3]
21✔
222
        if dtype in ("Tx", "Rx"):
21✔
223
            return self._parse_msg_v1_3(cols)
21✔
224
        else:
225
            logger.info("TRCReader: Unsupported type '%s'", dtype)
×
UNCOV
226
            return None
×
227

228
    def _parse_cols_v2_x(self, cols: tuple[str, ...]) -> Optional[Message]:
21✔
229
        dtype = cols[self.columns["T"]]
21✔
230
        if dtype in {"DT", "FD", "FB", "FE", "BI"}:
21✔
231
            return self._parse_msg_v2_x(cols)
21✔
232
        else:
233
            logger.info("TRCReader: Unsupported type '%s'", dtype)
21✔
234
            return None
21✔
235

236
    def _parse_line(self, line: str) -> Optional[Message]:
21✔
237
        logger.debug("TRCReader: Parse '%s'", line)
21✔
238
        try:
21✔
239
            cols = tuple(line.split(maxsplit=self._num_columns))
21✔
240
            return self._parse_cols(cols)
21✔
241
        except IndexError:
×
242
            logger.warning("TRCReader: Failed to parse message '%s'", line)
×
UNCOV
243
            return None
×
244

245
    def __iter__(self) -> Generator[Message, None, None]:
21✔
246
        first_line = self._extract_header()
21✔
247

248
        if first_line is not None:
21✔
249
            msg = self._parse_line(first_line)
21✔
250
            if msg is not None:
21✔
251
                yield msg
21✔
252

253
        for line in self.file:
21✔
254
            temp = line.strip()
21✔
255
            if temp.startswith(";"):
21✔
256
                # Comment line
257
                continue
21✔
258

259
            if len(temp) == 0:
21✔
260
                # Empty line
UNCOV
261
                continue
×
262

263
            msg = self._parse_line(temp)
21✔
264
            if msg is not None:
21✔
265
                yield msg
21✔
266

267
        self.stop()
21✔
268

269

270
class TRCWriter(TextIOMessageWriter):
21✔
271
    """Logs CAN data to text file (.trc).
21✔
272

273
    The measurement starts with the timestamp of the first registered message.
274
    If a message has a timestamp smaller than the previous one or None,
275
    it gets assigned the timestamp that was written for the last message.
276
    If the first message does not have a timestamp, it is set to zero.
277
    """
278

279
    file: TextIO
21✔
280
    first_timestamp: Optional[float]
21✔
281

282
    FORMAT_MESSAGE = (
21✔
283
        "{msgnr:>7} {time:13.3f} DT {channel:>2} {id:>8} {dir:>2} -  {dlc:<4} {data}"
284
    )
285
    FORMAT_MESSAGE_V1_0 = "{msgnr:>6}) {time:7.0f} {id:>8} {dlc:<1} {data}"
21✔
286

287
    def __init__(
21✔
288
        self,
289
        file: Union[StringPathLike, TextIO],
290
        channel: int = 1,
291
        **kwargs: Any,
292
    ) -> None:
293
        """
294
        :param file: a path-like object or as file-like object to write to
295
                     If this is a file-like object, is has to opened in text
296
                     write mode, not binary write mode.
297
        :param channel: a default channel to use when the message does not
298
                        have a channel set
299
        """
300
        super().__init__(file, mode="w")
21✔
301
        self.channel = channel
21✔
302

303
        if hasattr(self.file, "reconfigure"):
21✔
304
            self.file.reconfigure(newline="\r\n")
21✔
305
        else:
UNCOV
306
            raise TypeError("File must be opened in text mode.")
×
307

308
        self.filepath = os.path.abspath(self.file.name)
21✔
309
        self.header_written = False
21✔
310
        self.msgnr = 0
21✔
311
        self.first_timestamp = None
21✔
312
        self.file_version = TRCFileVersion.V2_1
21✔
313
        self._msg_fmt_string = self.FORMAT_MESSAGE_V1_0
21✔
314
        self._format_message = self._format_message_init
21✔
315

316
    def _write_header_v1_0(self, start_time: datetime) -> None:
21✔
317
        lines = [
21✔
318
            ";##########################################################################",
319
            f";   {self.filepath}",
320
            ";",
321
            ";    Generated by python-can TRCWriter",
322
            f";    Start time: {start_time}",
323
            ";    PCAN-Net: N/A",
324
            ";",
325
            ";    Columns description:",
326
            ";    ~~~~~~~~~~~~~~~~~~~~~",
327
            ";    +-current number in actual sample",
328
            ";    |     +time offset of message (ms",
329
            ";    |     |        +ID of message (hex",
330
            ";    |     |        |    +data length code",
331
            ";    |     |        |    |  +data bytes (hex ...",
332
            ";    |     |        |    |  |",
333
            ";----+- ---+--- ----+--- + -+ -- -- ...",
334
        ]
335
        self.file.writelines(line + "\n" for line in lines)
21✔
336

337
    def _write_header_v2_1(self, start_time: datetime) -> None:
21✔
338
        header_time = start_time - datetime(
21✔
339
            year=1899, month=12, day=30, tzinfo=timezone.utc
340
        )
341
        lines = [
21✔
342
            ";$FILEVERSION=2.1",
343
            f";$STARTTIME={header_time/timedelta(days=1)}",
344
            ";$COLUMNS=N,O,T,B,I,d,R,L,D",
345
            ";",
346
            f";   {self.filepath}",
347
            ";",
348
            f";   Start time: {start_time}",
349
            ";   Generated by python-can TRCWriter",
350
            ";-------------------------------------------------------------------------------",
351
            ";   Bus   Name            Connection               Protocol",
352
            ";   N/A   N/A             N/A                      N/A",
353
            ";-------------------------------------------------------------------------------",
354
            ";   Message   Time    Type    ID     Rx/Tx",
355
            ";   Number    Offset  |  Bus  [hex]  |  Reserved",
356
            ";   |         [ms]    |  |    |      |  |  Data Length Code",
357
            ";   |         |       |  |    |      |  |  |    Data [hex] ...",
358
            ";   |         |       |  |    |      |  |  |    |",
359
            ";---+-- ------+------ +- +- --+----- +- +- +--- +- -- -- -- -- -- -- --",
360
        ]
361
        self.file.writelines(line + "\n" for line in lines)
21✔
362

363
    def _format_message_by_format(self, msg, channel):
21✔
364
        if msg.is_extended_id:
21✔
365
            arb_id = f"{msg.arbitration_id:07X}"
21✔
366
        else:
367
            arb_id = f"{msg.arbitration_id:04X}"
21✔
368

369
        data = [f"{byte:02X}" for byte in msg.data]
21✔
370

371
        serialized = self._msg_fmt_string.format(
21✔
372
            msgnr=self.msgnr,
373
            time=(msg.timestamp - self.first_timestamp) * 1000,
374
            channel=channel,
375
            id=arb_id,
376
            dir="Rx" if msg.is_rx else "Tx",
377
            dlc=msg.dlc,
378
            data=" ".join(data),
379
        )
380
        return serialized
21✔
381

382
    def _format_message_init(self, msg, channel):
21✔
383
        if self.file_version == TRCFileVersion.V1_0:
21✔
384
            self._format_message = self._format_message_by_format
21✔
385
            self._msg_fmt_string = self.FORMAT_MESSAGE_V1_0
21✔
386
        elif self.file_version == TRCFileVersion.V2_1:
21✔
387
            self._format_message = self._format_message_by_format
21✔
388
            self._msg_fmt_string = self.FORMAT_MESSAGE
21✔
389
        else:
390
            raise NotImplementedError("File format is not supported")
21✔
391

392
        return self._format_message_by_format(msg, channel)
21✔
393

394
    def write_header(self, timestamp: float) -> None:
21✔
395
        # write start of file header
396
        start_time = datetime.fromtimestamp(timestamp, timezone.utc)
21✔
397

398
        if self.file_version == TRCFileVersion.V1_0:
21✔
399
            self._write_header_v1_0(start_time)
21✔
400
        elif self.file_version == TRCFileVersion.V2_1:
21✔
401
            self._write_header_v2_1(start_time)
21✔
402
        else:
403
            raise NotImplementedError("File format is not supported")
404
        self.header_written = True
21✔
405

406
    def log_event(self, message: str, timestamp: float) -> None:
21✔
407
        if not self.header_written:
21✔
408
            self.write_header(timestamp)
21✔
409

410
        self.file.write(message + "\n")
21✔
411

412
    def on_message_received(self, msg: Message) -> None:
21✔
413
        if self.first_timestamp is None:
21✔
414
            self.first_timestamp = msg.timestamp
21✔
415

416
        if msg.is_error_frame:
21✔
417
            logger.warning("TRCWriter: Logging error frames is not implemented")
×
UNCOV
418
            return
×
419

420
        if msg.is_remote_frame:
21✔
421
            logger.warning("TRCWriter: Logging remote frames is not implemented")
×
UNCOV
422
            return
×
423

424
        channel = channel2int(msg.channel)
21✔
425
        if channel is None:
21✔
426
            channel = self.channel
21✔
427
        else:
428
            # Many interfaces start channel numbering at 0 which is invalid
429
            channel += 1
21✔
430

431
        if msg.is_fd:
21✔
432
            logger.warning("TRCWriter: Logging CAN FD is not implemented")
×
UNCOV
433
            return
×
434

435
        serialized = self._format_message(msg, channel)
21✔
436
        self.msgnr += 1
21✔
437
        self.log_event(serialized, msg.timestamp)
21✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc