• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hardbyte / python-can / 11256605622

09 Oct 2024 02:02PM UTC coverage: 70.371% (-0.1%) from 70.515%
11256605622

Pull #1870

github

web-flow
Merge 907b1d795 into 7a3d23fa3
Pull Request #1870: Add tox environment for doctest

7472 of 10618 relevant lines covered (70.37%)

15.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

40.74
/can/interfaces/iscan.py
1
"""
24✔
2
Interface for isCAN from *Thorsis Technologies GmbH*, former *ifak system GmbH*.
3
"""
4

5
import ctypes
24✔
6
import logging
24✔
7
import time
24✔
8
from typing import Optional, Tuple, Union
24✔
9

10
from can import (
24✔
11
    BusABC,
12
    CanError,
13
    CanInitializationError,
14
    CanInterfaceNotImplementedError,
15
    CanOperationError,
16
    CanProtocol,
17
    Message,
18
)
19

20
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Fixed-size array type holding the eight payload bytes of one message record.
CanData = ctypes.c_ubyte * 8


class MessageExStruct(ctypes.Structure):
    """Message record handed by reference to the driver's ``*MessageEx`` calls.

    The same layout is used for both directions: ``isCAN_ReceiveMessageEx``
    fills a record in, ``isCAN_TransmitMessageEx`` reads one.
    """

    _fields_ = [
        # Arbitration ID of the frame.
        ("message_id", ctypes.c_ulong),
        # Non-zero when the frame uses an extended ID.
        ("is_extended", ctypes.c_ubyte),
        # Non-zero when the frame is a remote frame.
        ("remote_req", ctypes.c_ubyte),
        # Number of bytes of ``data`` that are in use (the DLC).
        ("data_len", ctypes.c_ubyte),
        # Payload buffer; always eight bytes long.
        ("data", CanData),
    ]
33

34

35
def check_status_initialization(result: int, function, arguments) -> int:
    """``errcheck`` hook for the driver call that initializes a device.

    :param result:
        Status code returned by the driver function
    :param function:
        The driver function that produced ``result``
    :param arguments:
        The arguments the driver function was called with
    :return:
        ``result`` unchanged when it is not greater than zero
    :raises IscanInitializationError:
        If ``result`` is greater than zero
    """
    if result <= 0:
        return result
    raise IscanInitializationError(function, result, arguments)
×
39

40

41
def check_status(result: int, function, arguments) -> int:
    """``errcheck`` hook for driver calls made on an already opened device.

    :param result:
        Status code returned by the driver function
    :param function:
        The driver function that produced ``result``
    :param arguments:
        The arguments the driver function was called with
    :return:
        ``result`` unchanged when it is not greater than zero
    :raises IscanOperationError:
        If ``result`` is greater than zero
    """
    if result <= 0:
        return result
    raise IscanOperationError(function, result, arguments)
×
45

46

47
try:
    iscan = ctypes.cdll.LoadLibrary("iscandrv")
except OSError as exc:
    # The driver library is unavailable. Keep the module importable; IscanBus
    # checks for ``iscan is None`` and raises when a bus is requested.
    iscan = None
    logger.warning("Failed to load IS-CAN driver: %s", exc)
else:
    # Each wrapped driver function returns an unsigned byte status code; the
    # errcheck hook turns a positive code into an exception.

    # Device initialization: failures are reported as initialization errors.
    iscan.isCAN_DeviceInitEx.restype = ctypes.c_ubyte
    iscan.isCAN_DeviceInitEx.argtypes = [ctypes.c_ubyte, ctypes.c_ubyte]
    iscan.isCAN_DeviceInitEx.errcheck = check_status_initialization

    # Receiving a message.
    iscan.isCAN_ReceiveMessageEx.restype = ctypes.c_ubyte
    iscan.isCAN_ReceiveMessageEx.errcheck = check_status

    # Transmitting a message.
    iscan.isCAN_TransmitMessageEx.restype = ctypes.c_ubyte
    iscan.isCAN_TransmitMessageEx.errcheck = check_status

    # Closing the device.
    iscan.isCAN_CloseDevice.restype = ctypes.c_ubyte
    iscan.isCAN_CloseDevice.errcheck = check_status
×
65

66

67
class IscanBus(BusABC):
    """isCAN interface"""

    #: Maps every supported bitrate in bits/s to the code that is passed to
    #: ``isCAN_DeviceInitEx``.
    BAUDRATES = {
        5000: 0,
        10000: 1,
        20000: 2,
        50000: 3,
        100000: 4,
        125000: 5,
        250000: 6,
        500000: 7,
        800000: 8,
        1000000: 9,
    }

    def __init__(
        self,
        channel: Union[str, int],
        bitrate: int = 500000,
        poll_interval: float = 0.01,
        **kwargs,
    ) -> None:
        """
        :param channel:
            Device number
        :param bitrate:
            Bitrate in bits/s
        :param poll_interval:
            Poll interval in seconds when reading messages
        :raises CanInterfaceNotImplementedError:
            If the isCAN driver library could not be loaded
        :raises ValueError:
            If ``bitrate`` is not one of the keys of :attr:`BAUDRATES`
        :raises IscanInitializationError:
            If the driver reports an error while initializing the device
        """
        if iscan is None:
            raise CanInterfaceNotImplementedError("Could not load isCAN driver")

        self.channel = ctypes.c_ubyte(int(channel))
        # Use ``.value`` so the description shows the device number rather
        # than the repr of the ctypes wrapper (e.g. "c_ubyte(0)").
        self.channel_info = f"IS-CAN: {self.channel.value}"
        self._can_protocol = CanProtocol.CAN_20

        if bitrate not in self.BAUDRATES:
            raise ValueError(f"Invalid bitrate, choose one of {set(self.BAUDRATES)}")

        self.poll_interval = poll_interval
        iscan.isCAN_DeviceInitEx(self.channel, self.BAUDRATES[bitrate])

        super().__init__(
            channel=channel,
            bitrate=bitrate,
            poll_interval=poll_interval,
            **kwargs,
        )

    def _recv_internal(
        self, timeout: Optional[float]
    ) -> Tuple[Optional[Message], bool]:
        """Poll the driver until a message arrives or ``timeout`` expires.

        :param timeout:
            Seconds to wait for a message, or ``None`` to wait indefinitely
        :return:
            ``(message, False)`` when a message was read, ``(None, False)``
            when the timeout expired first
        :raises IscanOperationError:
            If the driver reports anything other than "No message received"
        """
        raw_msg = MessageExStruct()
        # The deadline is measured on the monotonic clock so that wall-clock
        # adjustments cannot shorten or extend the timeout.
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            try:
                iscan.isCAN_ReceiveMessageEx(self.channel, ctypes.byref(raw_msg))
            except IscanError as e:
                if e.error_code != 8:  # "No message received"
                    # An error occurred
                    raise
                if deadline is not None and time.monotonic() > deadline:
                    # No message within timeout
                    return None, False
                # Sleep a short time to avoid hammering
                time.sleep(self.poll_interval)
            else:
                # A message was received
                break

        msg = Message(
            arbitration_id=raw_msg.message_id,
            is_extended_id=bool(raw_msg.is_extended),
            timestamp=time.time(),  # Better than nothing...
            is_remote_frame=bool(raw_msg.remote_req),
            dlc=raw_msg.data_len,
            data=raw_msg.data[: raw_msg.data_len],
            channel=self.channel.value,
        )
        return msg, False

    def send(self, msg: Message, timeout: Optional[float] = None) -> None:
        """Hand ``msg`` to the driver for transmission.

        :param msg:
            Message to transmit
        :param timeout:
            Accepted for interface compatibility; this implementation does
            not use it
        :raises IscanOperationError:
            If the driver reports an error
        """
        raw_msg = MessageExStruct(
            msg.arbitration_id,
            bool(msg.is_extended_id),
            bool(msg.is_remote_frame),
            msg.dlc,
            CanData(*msg.data),
        )
        iscan.isCAN_TransmitMessageEx(self.channel, ctypes.byref(raw_msg))

    def shutdown(self) -> None:
        """Run the base-class shutdown, then close the device.

        :raises IscanOperationError:
            If the driver reports an error while closing the device
        """
        super().shutdown()
        # Without a loaded driver there is no device to close.
        if iscan is not None:
            iscan.isCAN_CloseDevice(self.channel)
24✔
163

164

165
class IscanError(CanError):
    """Error raised when an isCAN driver function returns a failure code."""

    #: Human-readable descriptions of the status codes, keyed by code.
    ERROR_CODES = {
        0: "Success",
        1: "No access to device",
        2: "Device with ID not found",
        3: "Driver operation failed",
        4: "Invalid parameter",
        5: "Operation allowed only in online state",
        6: "Device timeout",
        7: "Device is transmitting a message",
        8: "No message received",
        9: "Thread not started",
        10: "Thread already started",
        11: "Buffer overrun",
        12: "Device not initialized",
        15: "Found the device, but it is being used by another process",
        16: "Bus error",
        17: "Bus off",
        18: "Error passive",
        19: "Data overrun",
        20: "Error warning",
        30: "Send error",
        31: "Transmission not acknowledged on bus",
        32: "Error critical bus",
        35: "Callbackthread is blocked, stopping thread failed",
        40: "Need a licence number under NT4",
    }

    def __init__(self, function, error_code: int, arguments) -> None:
        """
        :param function:
            The driver function that failed
        :param error_code:
            Status code returned by ``function``
        :param arguments:
            The arguments ``function`` was called with
        """
        # Codes missing from the table produce a message without description.
        reason = self.ERROR_CODES.get(error_code)
        suffix = "" if reason is None else ": " + reason

        super().__init__(
            f"Function {function.__name__} failed{suffix}",
            error_code=error_code,
        )

        #: Function that failed
        self.function = function
        #: Arguments passed to function
        self.arguments = arguments
        #: Status code
        self.error_code = error_code
×
210

211

212
class IscanOperationError(IscanError, CanOperationError):
    """isCAN driver error raised by ``check_status`` for a failed driver call."""
24✔
214

215

216
class IscanInitializationError(IscanError, CanInitializationError):
    """isCAN driver error raised by ``check_status_initialization``."""
24✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc