• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hardbyte / python-can / 16362801995

18 Jul 2025 05:17AM UTC coverage: 70.862% (+0.1%) from 70.763%
16362801995

Pull #1920

github

web-flow
Merge f9e8a3c29 into 958fc64ed
Pull Request #1920: add FD support to slcan according to CANable 2.0 impementation

6 of 45 new or added lines in 1 file covered. (13.33%)

838 existing lines in 35 files now uncovered.

7770 of 10965 relevant lines covered (70.86%)

13.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.78
/can/interfaces/canalystii.py
1
import logging
21✔
2
import time
21✔
3
from collections import deque
21✔
4
from collections.abc import Sequence
21✔
5
from ctypes import c_ubyte
21✔
6
from typing import Any, Optional, Union
21✔
7

8
import canalystii as driver
21✔
9

10
from can import BitTiming, BitTimingFd, BusABC, CanProtocol, Message
21✔
11
from can.exceptions import CanTimeoutError
21✔
12
from can.typechecking import CanFilters
21✔
13
from can.util import check_or_adjust_timing_clock, deprecated_args_alias
21✔
14

15
logger = logging.getLogger(__name__)
21✔
16

17

18
class CANalystIIBus(BusABC):
21✔
19
    @deprecated_args_alias(
21✔
20
        deprecation_start="4.2.0", deprecation_end="5.0.0", bit_timing="timing"
21
    )
22
    def __init__(
21✔
23
        self,
24
        channel: Union[int, Sequence[int], str] = (0, 1),
25
        device: int = 0,
26
        bitrate: Optional[int] = None,
27
        timing: Optional[Union[BitTiming, BitTimingFd]] = None,
28
        can_filters: Optional[CanFilters] = None,
29
        rx_queue_size: Optional[int] = None,
30
        **kwargs: dict[str, Any],
31
    ):
32
        """
33

34
        :param channel:
35
            Optional channel number, list/tuple of multiple channels, or comma
36
            separated string of channels. Default is to configure both
37
            channels.
38
        :param device:
39
            Optional USB device number. Default is 0 (first device found).
40
        :param bitrate:
41
            CAN bitrate in bits/second. Required unless the bit_timing argument is set.
42
        :param timing:
43
            Optional :class:`~can.BitTiming` instance to use for custom bit timing setting.
44
            If this argument is set then it overrides the bitrate argument. The
45
            `f_clock` value of the timing instance must be set to 8_000_000 (8MHz)
46
            for standard CAN.
47
            CAN FD and the :class:`~can.BitTimingFd` class are not supported.
48
        :param can_filters:
49
            Optional filters for received CAN messages.
50
        :param rx_queue_size:
51
            If set, software received message queue can only grow to this many
52
            messages (for all channels) before older messages are dropped
53
        """
54
        if not (bitrate or timing):
21✔
55
            raise ValueError("Either bitrate or timing argument is required")
21✔
56

57
        # Do this after the error handling
58
        super().__init__(
21✔
59
            channel=channel,
60
            can_filters=can_filters,
61
            **kwargs,
62
        )
63
        if isinstance(channel, str):
21✔
64
            # Assume comma separated string of channels
UNCOV
65
            self.channels = [int(ch.strip()) for ch in channel.split(",")]
×
66
        elif isinstance(channel, int):
21✔
67
            self.channels = [channel]
21✔
68
        else:  # Sequence[int]
69
            self.channels = list(channel)
21✔
70

71
        self.channel_info = f"CANalyst-II: device {device}, channels {self.channels}"
21✔
72
        self.rx_queue: deque[tuple[int, driver.Message]] = deque(maxlen=rx_queue_size)
21✔
73
        self.device = driver.CanalystDevice(device_index=device)
21✔
74
        self._can_protocol = CanProtocol.CAN_20
21✔
75

76
        for single_channel in self.channels:
21✔
77
            if isinstance(timing, BitTiming):
21✔
78
                timing = check_or_adjust_timing_clock(timing, valid_clocks=[8_000_000])
21✔
79
                self.device.init(
21✔
80
                    single_channel, timing0=timing.btr0, timing1=timing.btr1
81
                )
82
            elif isinstance(timing, BitTimingFd):
21✔
83
                raise NotImplementedError(
84
                    f"CAN FD is not supported by {self.__class__.__name__}."
85
                )
86
            else:
87
                self.device.init(single_channel, bitrate=bitrate)
21✔
88

89
    # Delay to use between each poll for new messages
90
    #
91
    # The timeout is deliberately kept low to avoid the possibility of
92
    # a hardware buffer overflow. This value was determined
93
    # experimentally, but the ideal value will depend on the specific
94
    # system.
95
    RX_POLL_DELAY = 0.020
21✔
96

97
    def send(self, msg: Message, timeout: Optional[float] = None) -> None:
21✔
98
        """Send a CAN message to the bus
99

100
        :param msg: message to send
101
        :param timeout: timeout (in seconds) to wait for the TX queue to clear.
102
        If set to ``None`` (default) the function returns immediately.
103

104
        Note: Due to limitations in the device firmware and protocol, the
105
        timeout will not trigger if there are problems with CAN arbitration,
106
        but only if the device is overloaded with a backlog of too many
107
        messages to send.
108
        """
109
        raw_message = driver.Message(
21✔
110
            msg.arbitration_id,
111
            0,  # timestamp
112
            1,  # time_flag
113
            0,  # send_type
114
            msg.is_remote_frame,
115
            msg.is_extended_id,
116
            msg.dlc,
117
            (c_ubyte * 8)(*msg.data),
118
        )
119

120
        if msg.channel is not None:
21✔
UNCOV
121
            channel = msg.channel
×
122
        elif len(self.channels) == 1:
21✔
123
            channel = self.channels[0]
21✔
124
        else:
UNCOV
125
            raise ValueError(
×
126
                "Message channel must be set when using multiple channels."
127
            )
128

129
        send_result = self.device.send(channel, [raw_message], timeout)
21✔
130
        if timeout is not None and not send_result:
21✔
UNCOV
131
            raise CanTimeoutError(f"Send timed out after {timeout} seconds")
×
132

133
    def _recv_from_queue(self) -> tuple[Message, bool]:
21✔
134
        """Return a message from the internal receive queue"""
135
        channel, raw_msg = self.rx_queue.popleft()
21✔
136

137
        # Protocol timestamps are in units of 100us, convert to seconds as
138
        # float
139
        timestamp = raw_msg.timestamp * 100e-6
21✔
140

141
        return (
21✔
142
            Message(
143
                channel=channel,
144
                timestamp=timestamp,
145
                arbitration_id=raw_msg.can_id,
146
                is_extended_id=raw_msg.extended,
147
                is_remote_frame=raw_msg.remote,
148
                dlc=raw_msg.data_len,
149
                data=bytes(raw_msg.data),
150
            ),
151
            False,
152
        )
153

154
    def poll_received_messages(self) -> None:
21✔
155
        """Poll new messages from the device into the rx queue but don't
156
        return any message to the caller
157

158
        Calling this function isn't necessary as polling the device is done
159
        automatically when calling recv(). This function is for the situation
160
        where an application needs to empty the hardware receive buffer without
161
        consuming any message.
162
        """
163
        for channel in self.channels:
21✔
164
            self.rx_queue.extend(
21✔
165
                (channel, raw_msg) for raw_msg in self.device.receive(channel)
166
            )
167

168
    def _recv_internal(
21✔
169
        self, timeout: Optional[float] = None
170
    ) -> tuple[Optional[Message], bool]:
171
        """
172

173
        :param timeout: float in seconds
174
        :return:
175
        """
176

177
        if self.rx_queue:
21✔
UNCOV
178
            return self._recv_from_queue()
×
179

180
        deadline = None
21✔
181
        while deadline is None or time.time() < deadline:
21✔
182
            if deadline is None and timeout is not None:
21✔
183
                deadline = time.time() + timeout
21✔
184

185
            self.poll_received_messages()
21✔
186

187
            if self.rx_queue:
21✔
188
                return self._recv_from_queue()
21✔
189

190
            # If blocking on a timeout, add a sleep before we loop again
191
            # to reduce CPU usage.
192
            if deadline is None or deadline - time.time() > 0.050:
×
UNCOV
193
                time.sleep(self.RX_POLL_DELAY)
×
194

UNCOV
195
        return (None, False)
×
196

197
    def flush_tx_buffer(self, channel: Optional[int] = None) -> None:
21✔
198
        """Flush the TX buffer of the device.
199

200
        :param channel:
201
            Optional channel number to flush. If set to None, all initialized
202
            channels are flushed.
203

204
        Note that because of protocol limitations this function returning
205
        doesn't mean that messages have been sent, it may also mean they
206
        failed to send.
207
        """
208
        if channel:
×
UNCOV
209
            self.device.flush_tx_buffer(channel, float("infinity"))
×
210
        else:
211
            for ch in self.channels:
×
UNCOV
212
                self.device.flush_tx_buffer(ch, float("infinity"))
×
213

214
    def shutdown(self) -> None:
21✔
215
        super().shutdown()
21✔
216
        for channel in self.channels:
21✔
217
            self.device.stop(channel)
19✔
218
        self.device = None
19✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc