• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hardbyte / python-can / 16362801995

18 Jul 2025 05:17AM UTC coverage: 70.862% (+0.1%) from 70.763%
16362801995

Pull #1920

github

web-flow
Merge f9e8a3c29 into 958fc64ed
Pull Request #1920: add FD support to slcan according to CANable 2.0 impementation

6 of 45 new or added lines in 1 file covered. (13.33%)

838 existing lines in 35 files now uncovered.

7770 of 10965 relevant lines covered (70.86%)

13.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.88
/can/io/canutils.py
1
"""
21✔
2
This module works with CAN data in ASCII log files (*.log).
3
It is is compatible with "candump -L" from the canutils program
4
(https://github.com/linux-can/can-utils).
5
"""
6

7
import logging
21✔
8
from collections.abc import Generator
21✔
9
from typing import Any, TextIO, Union
21✔
10

11
from can.message import Message
21✔
12

13
from ..typechecking import StringPathLike
21✔
14
from .generic import TextIOMessageReader, TextIOMessageWriter
21✔
15

16
log = logging.getLogger("can.io.canutils")
21✔
17

18
CAN_MSG_EXT = 0x80000000
21✔
19
CAN_ERR_FLAG = 0x20000000
21✔
20
CAN_ERR_BUSERROR = 0x00000080
21✔
21
CAN_ERR_DLC = 8
21✔
22

23
CANFD_BRS = 0x01
21✔
24
CANFD_ESI = 0x02
21✔
25

26

27
class CanutilsLogReader(TextIOMessageReader):
21✔
28
    """
21✔
29
    Iterator over CAN messages from a .log Logging File (candump -L).
30

31
    .. note::
32
        .log-format looks for example like this:
33

34
        ``(0.0) vcan0 001#8d00100100820100``
35
    """
36

37
    file: TextIO
21✔
38

39
    def __init__(
21✔
40
        self,
41
        file: Union[StringPathLike, TextIO],
42
        **kwargs: Any,
43
    ) -> None:
44
        """
45
        :param file: a path-like object or as file-like object to read from
46
                     If this is a file-like object, is has to opened in text
47
                     read mode, not binary read mode.
48
        """
49
        super().__init__(file, mode="r")
21✔
50

51
    def __iter__(self) -> Generator[Message, None, None]:
21✔
52
        for line in self.file:
21✔
53
            # skip empty lines
54
            temp = line.strip()
21✔
55
            if not temp:
21✔
UNCOV
56
                continue
×
57

58
            channel_string: str
59
            if temp[-2:].lower() in (" r", " t"):
21✔
60
                timestamp_string, channel_string, frame, is_rx_string = temp.split()
21✔
61
                is_rx = is_rx_string.strip().lower() == "r"
21✔
62
            else:
63
                timestamp_string, channel_string, frame = temp.split()
21✔
64
                is_rx = True
21✔
65
            timestamp = float(timestamp_string[1:-1])
21✔
66
            can_id_string, data = frame.split("#", maxsplit=1)
21✔
67

68
            channel: Union[int, str]
69
            if channel_string.isdigit():
21✔
UNCOV
70
                channel = int(channel_string)
×
71
            else:
72
                channel = channel_string
21✔
73

74
            is_extended = len(can_id_string) > 3
21✔
75
            can_id = int(can_id_string, 16)
21✔
76

77
            is_fd = False
21✔
78
            brs = False
21✔
79
            esi = False
21✔
80

81
            if data and data[0] == "#":
21✔
82
                is_fd = True
21✔
83
                fd_flags = int(data[1])
21✔
84
                brs = bool(fd_flags & CANFD_BRS)
21✔
85
                esi = bool(fd_flags & CANFD_ESI)
21✔
86
                data = data[2:]
21✔
87

88
            if data and data[0].lower() == "r":
21✔
89
                is_remote_frame = True
21✔
90

91
                if len(data) > 1:
21✔
UNCOV
92
                    dlc = int(data[1:])
×
93
                else:
94
                    dlc = 0
21✔
95

96
                data_bin = None
21✔
97
            else:
98
                is_remote_frame = False
21✔
99

100
                dlc = len(data) // 2
21✔
101
                data_bin = bytearray()
21✔
102
                for i in range(0, len(data), 2):
21✔
103
                    data_bin.append(int(data[i : (i + 2)], 16))
21✔
104

105
            if can_id & CAN_ERR_FLAG and can_id & CAN_ERR_BUSERROR:
21✔
106
                msg = Message(timestamp=timestamp, is_error_frame=True)
21✔
107
            else:
108
                msg = Message(
21✔
109
                    timestamp=timestamp,
110
                    arbitration_id=can_id & 0x1FFFFFFF,
111
                    is_extended_id=is_extended,
112
                    is_remote_frame=is_remote_frame,
113
                    is_fd=is_fd,
114
                    is_rx=is_rx,
115
                    bitrate_switch=brs,
116
                    error_state_indicator=esi,
117
                    dlc=dlc,
118
                    data=data_bin,
119
                    channel=channel,
120
                )
121
            yield msg
21✔
122

123
        self.stop()
21✔
124

125

126
class CanutilsLogWriter(TextIOMessageWriter):
21✔
127
    """Logs CAN data to an ASCII log file (.log).
21✔
128
    This class is is compatible with "candump -L".
129

130
    If a message has a timestamp smaller than the previous one (or 0 or None),
131
    it gets assigned the timestamp that was written for the last message.
132
    It the first message does not have a timestamp, it is set to zero.
133
    """
134

135
    def __init__(
21✔
136
        self,
137
        file: Union[StringPathLike, TextIO],
138
        channel: str = "vcan0",
139
        append: bool = False,
140
        **kwargs: Any,
141
    ):
142
        """
143
        :param file: a path-like object or as file-like object to write to
144
                     If this is a file-like object, is has to opened in text
145
                     write mode, not binary write mode.
146
        :param channel: a default channel to use when the message does not
147
                        have a channel set
148
        :param bool append: if set to `True` messages are appended to
149
                            the file, else the file is truncated
150
        """
151
        mode = "a" if append else "w"
21✔
152
        super().__init__(file, mode=mode)
21✔
153

154
        self.channel = channel
21✔
155
        self.last_timestamp = None
21✔
156

157
    def on_message_received(self, msg):
21✔
158
        # this is the case for the very first message:
159
        if self.last_timestamp is None:
21✔
160
            self.last_timestamp = msg.timestamp or 0.0
21✔
161

162
        # figure out the correct timestamp
163
        if msg.timestamp is None or msg.timestamp < self.last_timestamp:
21✔
UNCOV
164
            timestamp = self.last_timestamp
×
165
        else:
166
            timestamp = msg.timestamp
21✔
167

168
        channel = msg.channel if msg.channel is not None else self.channel
21✔
169
        if isinstance(channel, int) or (isinstance(channel, str) and channel.isdigit()):
21✔
170
            channel = f"can{channel}"
21✔
171

172
        framestr = f"({timestamp:f}) {channel}"
21✔
173

174
        if msg.is_error_frame:
21✔
175
            framestr += f" {CAN_ERR_FLAG | CAN_ERR_BUSERROR:08X}#"
21✔
176
        elif msg.is_extended_id:
21✔
177
            framestr += f" {msg.arbitration_id:08X}#"
21✔
178
        else:
179
            framestr += f" {msg.arbitration_id:03X}#"
21✔
180

181
        if msg.is_error_frame:
21✔
182
            eol = "\n"
21✔
183
        else:
184
            eol = " R\n" if msg.is_rx else " T\n"
21✔
185

186
        if msg.is_remote_frame:
21✔
187
            framestr += f"R{eol}"
21✔
188
        else:
189
            if msg.is_fd:
21✔
190
                fd_flags = 0
21✔
191
                if msg.bitrate_switch:
21✔
192
                    fd_flags |= CANFD_BRS
21✔
193
                if msg.error_state_indicator:
21✔
194
                    fd_flags |= CANFD_ESI
21✔
195
                framestr += f"#{fd_flags:X}"
21✔
196
            framestr += f"{msg.data.hex().upper()}{eol}"
21✔
197

198
        self.file.write(framestr)
21✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc