• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hardbyte / python-can / 16550730660

27 Jul 2025 11:41AM UTC coverage: 71.445% (+0.05%) from 71.398%
16550730660

Pull #1962

github

web-flow
Merge 0fec9c40d into afb1204c0
Pull Request #1962: Minor Typing Improvements

95 of 103 new or added lines in 21 files covered. (92.23%)

2 existing lines in 2 files now uncovered.

7909 of 11070 relevant lines covered (71.45%)

15.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.89
/can/interfaces/cantact.py
1
"""
24✔
2
Interface for CANtact devices from Linklayer Labs
3
"""
4

5
import logging
24✔
6
import time
24✔
7
from collections.abc import Sequence
24✔
8
from typing import Any, Optional, Union
24✔
9
from unittest.mock import Mock
24✔
10

11
from can import BitTiming, BitTimingFd, BusABC, CanProtocol, Message
24✔
12

13
from ..exceptions import (
24✔
14
    CanInitializationError,
15
    CanInterfaceNotImplementedError,
16
    error_check,
17
)
18
from ..typechecking import AutoDetectedConfig
24✔
19
from ..util import check_or_adjust_timing_clock, deprecated_args_alias
24✔
20

21
logger = logging.getLogger(__name__)
24✔
22

23
try:
24✔
24
    import cantact
24✔
25
except ImportError:
24✔
26
    cantact = None
24✔
27
    logger.warning(
24✔
28
        "The CANtact module is not installed. Install it using `pip install cantact`"
29
    )
30

31

32
class CantactBus(BusABC):
    """Bus implementation for CANtact devices, backed by the ``cantact`` module."""

    @staticmethod
    def _detect_available_configs() -> Sequence[AutoDetectedConfig]:
        """
        List one configuration per channel reported by the device.

        :return:
            A list of ``{"interface": "cantact", "channel": "ch:<n>"}`` entries,
            or an empty list when ``cantact.Interface()`` cannot be created.
        """
        try:
            device = cantact.Interface()
        except (NameError, SystemError, AttributeError):
            # Covers the case where the optional import failed and `cantact` is None.
            logger.debug(
                "Could not import or instantiate cantact, so no configurations are available"
            )
            return []

        configs: list[AutoDetectedConfig] = [
            {"interface": "cantact", "channel": f"ch:{index}"}
            for index in range(device.channel_count())
        ]
        return configs

    @deprecated_args_alias(
        deprecation_start="4.2.0", deprecation_end="5.0.0", bit_timing="timing"
    )
    def __init__(
        self,
        channel: int,
        bitrate: int = 500_000,
        poll_interval: float = 0.01,
        monitor: bool = False,
        timing: Optional[Union[BitTiming, BitTimingFd]] = None,
        **kwargs: Any,
    ) -> None:
        """
        :param int channel:
            Channel number (zero indexed, labeled on multi-channel devices)
        :param int bitrate:
            Bitrate in bits/s
        :param float poll_interval:
            Not used by this class itself; forwarded unchanged to the parent
            initializer.
        :param bool monitor:
            If true, operate in listen-only monitoring mode
        :param timing:
            Optional :class:`~can.BitTiming` instance to use for custom bit timing setting.
            If this argument is set then it overrides the bitrate argument. The
            `f_clock` value of the timing instance must be set to 24_000_000 (24MHz)
            for standard CAN.
            CAN FD and the :class:`~can.BitTimingFd` class are not supported.
        :param kwargs:
            Forwarded to the parent initializer. A truthy ``_testing`` entry makes
            the bus use :class:`MockInterface` instead of real hardware.
        """

        # Pick the backend: the mock for tests, otherwise the real device.
        if kwargs.get("_testing", False):
            self.interface = MockInterface()
        elif cantact is None:
            raise CanInterfaceNotImplementedError(
                "The CANtact module is not installed. "
                "Install it using `python -m pip install cantact`"
            )
        else:
            with error_check(
                "Cannot create the cantact.Interface", CanInitializationError
            ):
                self.interface = cantact.Interface()

        channel_index = int(channel)
        self.channel = channel_index
        self.channel_info = f"CANtact: ch:{channel}"
        self._can_protocol = CanProtocol.CAN_20

        # Apply bit timing (or plain bitrate), then enable and start the channel.
        with error_check("Cannot setup the cantact.Interface", CanInitializationError):
            if isinstance(timing, BitTiming):
                # Only a 24 MHz clock is accepted for custom timing.
                timing = check_or_adjust_timing_clock(timing, valid_clocks=[24_000_000])
                self.interface.set_bit_timing(
                    channel_index,
                    int(timing.brp),
                    int(timing.tseg1),
                    int(timing.tseg2),
                    int(timing.sjw),
                )
            elif isinstance(timing, BitTimingFd):
                raise NotImplementedError(
                    f"CAN FD is not supported by {self.__class__.__name__}."
                )
            else:
                # No custom timing supplied: fall back to the bitrate argument.
                self.interface.set_bitrate(channel_index, int(bitrate))

            self.interface.set_enabled(channel_index, True)
            self.interface.set_monitor(channel_index, monitor)
            self.interface.start()

        super().__init__(
            channel=channel,
            bitrate=bitrate,
            poll_interval=poll_interval,
            **kwargs,
        )

    def _recv_internal(
        self, timeout: Optional[float]
    ) -> tuple[Optional[Message], bool]:
        """
        Fetch one frame from the interface and convert it to a :class:`~can.Message`.

        :param timeout:
            Numeric wait time. It is multiplied by 1000 and truncated to an int
            before being passed to ``interface.recv`` (presumably seconds to
            milliseconds -- confirm against the cantact API).
        :return:
            ``(message, False)`` when a frame was read, ``(None, False)`` when
            ``interface.recv`` returned ``None``.
        :raises TypeError:
            If ``timeout`` is ``None``.
        """
        if timeout is None:
            raise TypeError(
                f"{self.__class__.__name__} expects a numeric `timeout` value."
            )

        with error_check("Cannot receive message"):
            raw = self.interface.recv(int(timeout * 1000))

        # No frame came back: report a timeout to the caller.
        if raw is None:
            return None, False

        received = Message(
            arbitration_id=raw["id"],
            is_extended_id=raw["extended"],
            timestamp=raw["timestamp"],
            is_remote_frame=raw["rtr"],
            dlc=raw["dlc"],
            # Keep only the first `dlc` entries of the data field.
            data=raw["data"][: raw["dlc"]],
            channel=raw["channel"],
            # A loopback frame is not counted as received.
            is_rx=(not raw["loopback"]),
        )
        return received, False

    def send(self, msg: Message, timeout: Optional[float] = None) -> None:
        """
        Transmit ``msg`` on this bus's channel.

        :param msg:
            Message whose ID, extended/remote flags, DLC and data are passed to
            ``interface.send``.
        :param timeout:
            Accepted but not used by this implementation.
        """
        with error_check("Cannot send message"):
            self.interface.send(
                self.channel,
                msg.arbitration_id,
                bool(msg.is_extended_id),
                bool(msg.is_remote_frame),
                msg.dlc,
                msg.data,
            )

    def shutdown(self) -> None:
        """Run the parent shutdown, then stop the underlying interface."""
        super().shutdown()
        with error_check("Cannot shutdown interface"):
            self.interface.stop()
167

168

169
def mock_recv(timeout: int) -> Optional[dict[str, Any]]:
    """
    Stand-in for the interface's ``recv`` call, used by :class:`MockInterface`.

    :param timeout:
        Value supplied by the caller; only whether it is greater than zero matters.
    :return:
        A fixed 8-byte, non-extended frame (ID 0x123, channel 0) stamped with the
        current time when ``timeout`` is greater than zero, otherwise ``None``
        to simulate a timeout.
    """
    # Anything that is not strictly positive is treated as "nothing arrived".
    if not timeout > 0:
        return None

    frame: dict[str, Any] = {
        "id": 0x123,
        "extended": False,
        "timestamp": time.time(),
        "loopback": False,
        "rtr": False,
        "dlc": 8,
        "data": [1, 2, 3, 4, 5, 6, 7, 8],
        "channel": 0,
    }
    return frame
184

185

186
class MockInterface:
    """
    Hardware-free replacement for the real interface, used when testing.

    Every method is a class-level ``Mock``, so tests can run without a device
    attached and inspect the recorded calls afterwards.
    """

    # Receiving is the only call with canned behaviour: it delegates to mock_recv.
    recv = Mock(side_effect=mock_recv)

    # Reports a single channel.
    channel_count = Mock(return_value=1)

    # Configuration calls: recorded only.
    set_bitrate = Mock()
    set_bit_timing = Mock()
    set_enabled = Mock()
    set_monitor = Mock()

    # Lifecycle and transmit calls: recorded only.
    start = Mock()
    stop = Mock()
    send = Mock()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc