• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hardbyte / python-can / 16362801995

18 Jul 2025 05:17AM UTC coverage: 70.862% (+0.1%) from 70.763%
16362801995

Pull #1920

github

web-flow
Merge f9e8a3c29 into 958fc64ed
Pull Request #1920: add FD support to slcan according to CANable 2.0 impementation

6 of 45 new or added lines in 1 file covered. (13.33%)

838 existing lines in 35 files now uncovered.

7770 of 10965 relevant lines covered (70.86%)

13.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.68
/can/interfaces/slcan.py
1
"""
21✔
2
Interface for slcan compatible interfaces (win32/linux).
3
"""
4

5
import io
21✔
6
import logging
21✔
7
import time
21✔
8
import warnings
21✔
9
from queue import SimpleQueue
21✔
10
from typing import Any, Optional, Union, cast
21✔
11

12
from can import BitTiming, BitTimingFd, BusABC, CanProtocol, Message, typechecking
21✔
13
from can.exceptions import (
21✔
14
    CanInitializationError,
15
    CanInterfaceNotImplementedError,
16
    CanOperationError,
17
    error_check,
18
)
19
from can.util import CAN_FD_DLC, check_or_adjust_timing_clock, deprecated_args_alias
21✔
20

21
logger = logging.getLogger(__name__)
21✔
22

23
try:
21✔
24
    import serial
21✔
25
except ImportError:
×
26
    logger.warning(
×
27
        "You won't be able to use the slcan can backend without "
28
        "the serial module installed!"
29
    )
30
    serial = None
×
31

32

33
class slcanBus(BusABC):
21✔
34
    """
21✔
35
    slcan interface
36
    """
37

38
    # the supported bitrates and their commands
39
    _BITRATES = {
21✔
40
        10000: "S0",
41
        20000: "S1",
42
        50000: "S2",
43
        100000: "S3",
44
        125000: "S4",
45
        250000: "S5",
46
        500000: "S6",
47
        750000: "S7",
48
        1000000: "S8",
49
        83300: "S9",
50
    }
51
    _DATA_BITRATES = {
21✔
52
        0: "",
53
        2000000: "Y2",
54
        5000000: "Y5",
55
    }
56

57
    _SLEEP_AFTER_SERIAL_OPEN = 2  # in seconds
21✔
58

59
    _OK = b"\r"
21✔
60
    _ERROR = b"\a"
21✔
61

62
    LINE_TERMINATOR = b"\r"
21✔
63

64
    @deprecated_args_alias(
21✔
65
        deprecation_start="4.5.0",
66
        deprecation_end="5.0.0",
67
        ttyBaudrate="tty_baudrate",
68
    )
69
    def __init__(
21✔
70
        self,
71
        channel: typechecking.ChannelStr,
72
        tty_baudrate: int = 115200,
73
        bitrate: Optional[int] = None,
74
        timing: Optional[Union[BitTiming, BitTimingFd]] = None,
75
        sleep_after_open: float = _SLEEP_AFTER_SERIAL_OPEN,
76
        rtscts: bool = False,
77
        listen_only: bool = False,
78
        timeout: float = 0.001,
79
        **kwargs: Any,
80
    ) -> None:
81
        """
82
        :param str channel:
83
            port of underlying serial or usb device (e.g. ``/dev/ttyUSB0``, ``COM8``, ...)
84
            Must not be empty. Can also end with ``@115200`` (or similarly) to specify the baudrate.
85
        :param int tty_baudrate:
86
            baudrate of underlying serial or usb device (Ignored if set via the ``channel`` parameter)
87
        :param bitrate:
88
            Bitrate in bit/s
89
        :param timing:
90
            Optional :class:`~can.BitTiming` instance to use for custom bit timing setting.
91
            If this argument is set then it overrides the bitrate and btr arguments. The
92
            `f_clock` value of the timing instance must be set to 8_000_000 (8MHz)
93
            for standard CAN.
94
            CAN FD and the :class:`~can.BitTimingFd` class have partial support according to the non-standard
95
            slcan protocol implementation in the CANABLE 2.0 firmware: currently only data rates of 2M and 5M.
96
        :param poll_interval:
97
            Poll interval in seconds when reading messages
98
        :param sleep_after_open:
99
            Time to wait in seconds after opening serial connection
100
        :param rtscts:
101
            turn hardware handshake (RTS/CTS) on and off
102
        :param listen_only:
103
            If True, open interface/channel in listen mode with ``L`` command.
104
            Otherwise, the (default) ``O`` command is still used. See ``open`` method.
105
        :param timeout:
106
            Timeout for the serial or usb device in seconds (default 0.001)
107

108
        :raise ValueError: if both ``bitrate`` and ``btr`` are set or the channel is invalid
109
        :raise CanInterfaceNotImplementedError: if the serial module is missing
110
        :raise CanInitializationError: if the underlying serial connection could not be established
111
        """
112
        self._listen_only = listen_only
21✔
113

114
        if serial is None:
21✔
115
            raise CanInterfaceNotImplementedError("The serial module is not installed")
×
116

117
        btr: Optional[str] = kwargs.get("btr", None)
21✔
118
        if btr is not None:
21✔
119
            warnings.warn(
×
120
                "The 'btr' argument is deprecated since python-can v4.5.0 "
121
                "and scheduled for removal in v5.0.0. "
122
                "Use the 'timing' argument instead.",
123
                DeprecationWarning,
124
                stacklevel=1,
125
            )
126

127
        if not channel:  # if None or empty
21✔
128
            raise ValueError("Must specify a serial port.")
×
129
        if "@" in channel:
21✔
130
            (channel, baudrate) = channel.split("@")
×
131
            tty_baudrate = int(baudrate)
×
132

133
        with error_check(exception_type=CanInitializationError):
21✔
134
            self.serialPortOrig = serial.serial_for_url(
21✔
135
                channel,
136
                baudrate=tty_baudrate,
137
                rtscts=rtscts,
138
                timeout=timeout,
139
            )
140

141
        self._queue: SimpleQueue[str] = SimpleQueue()
21✔
142
        self._buffer = bytearray()
21✔
143
        self._can_protocol = CanProtocol.CAN_20
21✔
144

145
        time.sleep(sleep_after_open)
21✔
146

147
        with error_check(exception_type=CanInitializationError):
21✔
148
            if isinstance(timing, BitTiming):
21✔
149
                timing = check_or_adjust_timing_clock(timing, valid_clocks=[8_000_000])
×
150
                self.set_bitrate_reg(f"{timing.btr0:02X}{timing.btr1:02X}")
×
151
            elif isinstance(timing, BitTimingFd):
21✔
NEW
152
                self.set_bitrate(timing.nom_bitrate, timing.data_bitrate)
×
153
            else:
154
                if bitrate is not None and btr is not None:
21✔
155
                    raise ValueError("Bitrate and btr mutually exclusive.")
×
156
                if bitrate is not None:
21✔
157
                    self.set_bitrate(bitrate)
×
158
                if btr is not None:
21✔
159
                    self.set_bitrate_reg(btr)
×
160
            self.open()
21✔
161

162
        super().__init__(channel, **kwargs)
21✔
163

164
    def set_bitrate(self, bitrate: int, data_bitrate: int = None) -> None:
21✔
165
        """
166
        :param bitrate:
167
            Bitrate in bit/s
168
        :param data_bitrate:
169
            Data Bitrate in bit/s for FD frames
170

171
        :raise ValueError: if ``bitrate`` is not among the possible values
172
        """
173
        if bitrate in self._BITRATES:
×
174
            bitrate_code = self._BITRATES[bitrate]
×
175
        else:
176
            bitrates = ", ".join(str(k) for k in self._BITRATES.keys())
×
177
            raise ValueError(f"Invalid bitrate, choose one of {bitrates}.")
×
NEW
178
        if data_bitrate in self._DATA_BITRATES:
×
NEW
179
            dbitrate_code = self._DATA_BITRATES[data_bitrate]
×
180
        else:
NEW
181
            dbitrates = ", ".join(str(k) for k in self._DATA_BITRATES.keys())
×
NEW
182
            raise ValueError(f"Invalid data bitrate, choose one of {dbitrates}.")
×
183

184
        self.close()
×
185
        self._write(bitrate_code)
×
NEW
186
        self._write(dbitrate_code)
×
UNCOV
187
        self.open()
×
188

189
    def set_bitrate_reg(self, btr: str) -> None:
21✔
190
        """
191
        :param btr:
192
            BTR register value to set custom can speed as a string `xxyy` where
193
            xx is the BTR0 value in hex and yy is the BTR1 value in hex.
194
        """
195
        self.close()
×
196
        self._write("s" + btr)
×
197
        self.open()
×
198

199
    def _write(self, string: str) -> None:
21✔
200
        with error_check("Could not write to serial device"):
21✔
201
            self.serialPortOrig.write(string.encode() + self.LINE_TERMINATOR)
21✔
202
            self.serialPortOrig.flush()
21✔
203

204
    def _read(self, timeout: Optional[float]) -> Optional[str]:
21✔
205
        _timeout = serial.Timeout(timeout)
21✔
206

207
        with error_check("Could not read from serial device"):
21✔
208
            while True:
15✔
209
                # Due to accessing `serialPortOrig.in_waiting` too often will reduce the performance.
210
                # We read the `serialPortOrig.in_waiting` only once here.
211
                in_waiting = self.serialPortOrig.in_waiting
21✔
212
                for _ in range(max(1, in_waiting)):
21✔
213
                    new_byte = self.serialPortOrig.read(1)
21✔
214
                    if new_byte:
21✔
215
                        self._buffer.extend(new_byte)
21✔
216
                    else:
217
                        break
21✔
218

219
                    if new_byte in (self._ERROR, self._OK):
21✔
220
                        string = self._buffer.decode()
21✔
221
                        self._buffer.clear()
21✔
222
                        return string
21✔
223

224
                if _timeout.expired():
21✔
225
                    break
21✔
226

227
            return None
21✔
228

229
    def flush(self) -> None:
21✔
230
        self._buffer.clear()
×
231
        with error_check("Could not flush"):
×
232
            self.serialPortOrig.reset_input_buffer()
×
233

234
    def open(self) -> None:
21✔
235
        if self._listen_only:
21✔
236
            self._write("L")
×
237
        else:
238
            self._write("O")
21✔
239

240
    def close(self) -> None:
21✔
241
        self._write("C")
21✔
242

243
    def _recv_internal(
21✔
244
        self, timeout: Optional[float]
245
    ) -> tuple[Optional[Message], bool]:
246
        canId = None
21✔
247
        remote = False
21✔
248
        extended = False
21✔
249
        data = None
21✔
250
        isFd = False
21✔
251
        fdBrs = False
21✔
252

253
        if self._queue.qsize():
21✔
254
            string: Optional[str] = self._queue.get_nowait()
×
255
        else:
256
            string = self._read(timeout)
21✔
257

258
        if not string:
21✔
259
            pass
21✔
260
        elif string[0] in (
21✔
261
            "T",
262
            "x",  # x is an alternative extended message identifier for CANDapter
263
        ):
264
            # extended frame
265
            canId = int(string[1:9], 16)
21✔
266
            dlc = int(string[9])
21✔
267
            extended = True
21✔
268
            data = bytearray.fromhex(string[10 : 10 + dlc * 2])
21✔
269
        elif string[0] == "t":
21✔
270
            # normal frame
271
            canId = int(string[1:4], 16)
21✔
272
            dlc = int(string[4])
21✔
273
            data = bytearray.fromhex(string[5 : 5 + dlc * 2])
21✔
274
        elif string[0] == "r":
21✔
275
            # remote frame
276
            canId = int(string[1:4], 16)
21✔
277
            dlc = int(string[4])
21✔
278
            remote = True
21✔
279
        elif string[0] == "R":
21✔
280
            # remote extended frame
281
            canId = int(string[1:9], 16)
21✔
282
            dlc = int(string[9])
21✔
283
            extended = True
21✔
284
            remote = True
21✔
NEW
285
        elif string[0] == "d":
×
286
            # FD standard frame
NEW
287
            canId = int(string[1:4], 16)
×
NEW
288
            dlc = int(string[4], 16)
×
NEW
289
            isFd = True
×
NEW
290
            data = bytearray.fromhex(string[5 : 5 + CAN_FD_DLC[dlc] * 2])
×
NEW
291
        elif string[0] == "D":
×
292
            # FD extended frame
NEW
293
            canId = int(string[1:9], 16)
×
NEW
294
            dlc = int(string[9], 16)
×
NEW
295
            extended = True
×
NEW
296
            isFd = True
×
NEW
297
            data = bytearray.fromhex(string[10 : 10 + CAN_FD_DLC[dlc] * 2])
×
NEW
298
        elif string[0] == "b":
×
299
            # FD with bitrate switch
NEW
300
            canId = int(string[1:4], 16)
×
NEW
301
            dlc = int(string[4], 16)
×
NEW
302
            isFd = True
×
NEW
303
            fdBrs = True
×
NEW
304
            data = bytearray.fromhex(string[5 : 5 + CAN_FD_DLC[dlc] * 2])
×
NEW
305
        elif string[0] == "B":
×
306
            # FD extended with bitrate switch
NEW
307
            canId = int(string[1:9], 16)
×
NEW
308
            dlc = int(string[9], 16)
×
NEW
309
            extended = True
×
NEW
310
            isFd = True
×
NEW
311
            fdBrs = True
×
NEW
312
            data = bytearray.fromhex(string[10 : 10 + CAN_FD_DLC[dlc] * 2])
×
313

314
        if canId is not None:
21✔
315
            msg = Message(
21✔
316
                arbitration_id=canId,
317
                is_extended_id=extended,
318
                timestamp=time.time(),  # Better than nothing...
319
                is_remote_frame=remote,
320
                is_fd=isFd,
321
                bitrate_switch=fdBrs,
322
                dlc=dlc,
323
                data=data,
324
            )
325
            return msg, False
21✔
326
        return None, False
21✔
327

328
    def send(self, msg: Message, timeout: Optional[float] = None) -> None:
21✔
329
        if timeout != self.serialPortOrig.write_timeout:
21✔
330
            self.serialPortOrig.write_timeout = timeout
×
331
        if msg.is_remote_frame:
21✔
332
            if msg.is_extended_id:
21✔
333
                sendStr = f"R{msg.arbitration_id:08X}{msg.dlc:d}"
21✔
334
            else:
335
                sendStr = f"r{msg.arbitration_id:03X}{msg.dlc:d}"
21✔
336
        elif msg.is_fd:
21✔
NEW
337
            if msg.bitrate_switch:
×
NEW
338
                if msg.is_extended_id:
×
NEW
339
                    sendStr = f"B{msg.arbitration_id:08X}{msg.dlc:d}"
×
340
                else:
NEW
341
                    sendStr = f"b{msg.arbitration_id:03X}{msg.dlc:d}"
×
NEW
342
                sendStr += msg.data.hex().upper()
×
343
            else:
NEW
344
                if msg.is_extended_id:
×
NEW
345
                    sendStr = f"D{msg.arbitration_id:08X}{msg.dlc:d}"
×
346
                else:
NEW
347
                    sendStr = f"d{msg.arbitration_id:03X}{msg.dlc:d}"
×
NEW
348
                sendStr += msg.data.hex().upper()
×
349
        else:
350
            if msg.is_extended_id:
21✔
351
                sendStr = f"T{msg.arbitration_id:08X}{msg.dlc:d}"
21✔
352
            else:
353
                sendStr = f"t{msg.arbitration_id:03X}{msg.dlc:d}"
21✔
354
            sendStr += msg.data.hex().upper()
21✔
355
        self._write(sendStr)
21✔
356

357
    def shutdown(self) -> None:
21✔
358
        super().shutdown()
21✔
359
        self.close()
21✔
360
        with error_check("Could not close serial socket"):
21✔
361
            self.serialPortOrig.close()
21✔
362

363
    def fileno(self) -> int:
21✔
364
        try:
×
365
            return cast("int", self.serialPortOrig.fileno())
×
366
        except io.UnsupportedOperation:
×
367
            raise NotImplementedError(
368
                "fileno is not implemented using current CAN bus on this platform"
369
            ) from None
370
        except Exception as exception:
×
371
            raise CanOperationError("Cannot fetch fileno") from exception
×
372

373
    def get_version(
21✔
374
        self, timeout: Optional[float]
375
    ) -> tuple[Optional[int], Optional[int]]:
376
        """Get HW and SW version of the slcan interface.
377

378
        :param timeout:
379
            seconds to wait for version or None to wait indefinitely
380

381
        :returns: tuple (hw_version, sw_version)
382
            WHERE
383
            int hw_version is the hardware version or None on timeout
384
            int sw_version is the software version or None on timeout
385
        """
386
        _timeout = serial.Timeout(timeout)
21✔
387
        cmd = "V"
21✔
388
        self._write(cmd)
21✔
389

390
        while True:
15✔
391
            if string := self._read(_timeout.time_left()):
21✔
392
                if string[0] == cmd:
21✔
393
                    # convert ASCII coded version
394
                    hw_version = int(string[1:3])
21✔
395
                    sw_version = int(string[3:5])
21✔
396
                    return hw_version, sw_version
21✔
397
                else:
398
                    self._queue.put_nowait(string)
×
399
            if _timeout.expired():
×
400
                break
×
401
        return None, None
×
402

403
    def get_serial_number(self, timeout: Optional[float]) -> Optional[str]:
21✔
404
        """Get serial number of the slcan interface.
405

406
        :param timeout:
407
            seconds to wait for serial number or :obj:`None` to wait indefinitely
408

409
        :return:
410
            :obj:`None` on timeout or a :class:`str` object.
411
        """
412
        _timeout = serial.Timeout(timeout)
21✔
413
        cmd = "N"
21✔
414
        self._write(cmd)
21✔
415

416
        while True:
15✔
417
            if string := self._read(_timeout.time_left()):
21✔
418
                if string[0] == cmd:
21✔
419
                    serial_number = string[1:-1]
21✔
420
                    return serial_number
21✔
421
                else:
422
                    self._queue.put_nowait(string)
×
423
            if _timeout.expired():
×
424
                break
×
425
        return None
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc