• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hardbyte / python-can / 16362801995

18 Jul 2025 05:17AM UTC coverage: 70.862% (+0.1%) from 70.763%
16362801995

Pull #1920

github

web-flow
Merge f9e8a3c29 into 958fc64ed
Pull Request #1920: add FD support to slcan according to CANable 2.0 impementation

6 of 45 new or added lines in 1 file covered. (13.33%)

838 existing lines in 35 files now uncovered.

7770 of 10965 relevant lines covered (70.86%)

13.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.23
/can/interfaces/udp_multicast/bus.py
1
import errno
21✔
2
import logging
21✔
3
import platform
21✔
4
import select
21✔
5
import socket
21✔
6
import struct
21✔
7
import time
21✔
8
import warnings
21✔
9
from typing import Optional, Union
21✔
10

11
import can
21✔
12
from can import BusABC, CanProtocol
21✔
13
from can.typechecking import AutoDetectedConfig
21✔
14

15
from .utils import is_msgpack_installed, pack_message, unpack_message
21✔
16

17
is_linux = platform.system() == "Linux"
21✔
18
if is_linux:
21✔
19
    from fcntl import ioctl
7✔
20

21
log = logging.getLogger(__name__)
21✔
22

23

24
# see socket.getaddrinfo()
25
IPv4_ADDRESS_INFO = tuple[str, int]  # address, port
21✔
26
IPv6_ADDRESS_INFO = tuple[str, int, int, int]  # address, port, flowinfo, scope_id
21✔
27
IP_ADDRESS_INFO = Union[IPv4_ADDRESS_INFO, IPv6_ADDRESS_INFO]
21✔
28

29
# Additional constants for the interaction with Unix kernels
30
SO_TIMESTAMPNS = 35
21✔
31
SIOCGSTAMP = 0x8906
21✔
32

33
# Additional constants for the interaction with the Winsock API
34
WSAEINVAL = 10022
21✔
35

36

37
class UdpMulticastBus(BusABC):
21✔
38
    """A virtual interface for CAN communications between multiple processes using UDP over Multicast IP.
21✔
39

40
    It supports IPv4 and IPv6, specified via the channel (which really is just a multicast IP address as a
41
    string). You can also specify the port and the IPv6 *hop limit*/the IPv4 *time to live* (TTL).
42

43
    This bus does not support filtering based on message IDs on the kernel level but instead provides it in
44
    user space (in Python) as a fallback.
45

46
    Both default addresses should allow for multi-host CAN networks in a normal local area network (LAN) where
47
    multicast is enabled.
48

49
    .. note::
50
        The auto-detection of available interfaces (see) is implemented using heuristic that checks if the
51
        required socket operations are available. It then returns two configurations, one based on
52
        the :attr:`~UdpMulticastBus.DEFAULT_GROUP_IPv6` address and another one based on
53
        the :attr:`~UdpMulticastBus.DEFAULT_GROUP_IPv4` address.
54

55
    .. warning::
56
        The parameter `receive_own_messages` is currently unsupported and setting it to `True` will raise an
57
        exception.
58

59
    .. warning::
60
        This interface does not make guarantees on reliable delivery and message ordering, and also does not
61
        implement rate limiting or ID arbitration/prioritization under high loads. Please refer to the section
62
        :ref:`virtual_interfaces_doc` for more information on this and a comparison to alternatives.
63

64
    :param channel: A multicast IPv4 address (in `224.0.0.0/4`) or an IPv6 address (in `ff00::/8`).
65
                    This defines which version of IP is used. See
66
                    `Wikipedia ("Multicast address") <https://en.wikipedia.org/wiki/Multicast_address>`__
67
                    for more details on the addressing schemes.
68
                    Defaults to :attr:`~UdpMulticastBus.DEFAULT_GROUP_IPv6`.
69
    :param port: The IP port to read from and write to.
70
    :param hop_limit: The hop limit in IPv6 or in IPv4 the time to live (TTL).
71
    :param receive_own_messages: If transmitted messages should also be received by this bus.
72
                                 CURRENTLY UNSUPPORTED.
73
    :param fd:
74
        If CAN-FD frames should be supported. If set to false, an error will be raised upon sending such a
75
        frame and such received frames will be ignored.
76
    :param can_filters: See :meth:`~can.BusABC.set_filters`.
77

78
    :raises RuntimeError: If the *msgpack*-dependency is not available. It should be installed on all
79
                          non Windows platforms via the `setup.py` requirements.
80
    :raises NotImplementedError: If the `receive_own_messages` is passed as `True`.
81
    """
82

83
    #: An arbitrary IPv6 multicast address with "site-local" scope, i.e. only to be routed within the local
84
    #: physical network and not beyond it. It should allow for multi-host CAN networks in a normal IPv6 LAN.
85
    #: This is the default channel and should work with most modern routers if multicast is allowed.
86
    DEFAULT_GROUP_IPv6 = "ff15:7079:7468:6f6e:6465:6d6f:6d63:6173"
21✔
87

88
    #: An arbitrary IPv4 multicast address with "administrative" scope, i.e. only to be routed within
89
    #: administrative organizational boundaries and not beyond it.
90
    #: It should allow for multi-host CAN networks in a normal IPv4 LAN.
91
    #: This is provided as a default fallback channel if IPv6 is (still) not supported.
92
    DEFAULT_GROUP_IPv4 = "239.74.163.2"
21✔
93

94
    def __init__(
21✔
95
        self,
96
        channel: str = DEFAULT_GROUP_IPv6,
97
        port: int = 43113,
98
        hop_limit: int = 1,
99
        receive_own_messages: bool = False,
100
        fd: bool = True,
101
        **kwargs,
102
    ) -> None:
103
        is_msgpack_installed()
14✔
104

105
        if receive_own_messages:
14✔
106
            raise can.CanInterfaceNotImplementedError(
14✔
107
                "receiving own messages is not yet implemented"
108
            )
109

110
        super().__init__(
14✔
111
            channel,
112
            **kwargs,
113
        )
114

115
        self._multicast = GeneralPurposeUdpMulticastBus(channel, port, hop_limit)
14✔
116
        self._can_protocol = CanProtocol.CAN_FD if fd else CanProtocol.CAN_20
14✔
117

118
    @property
21✔
119
    def is_fd(self) -> bool:
21✔
UNCOV
120
        class_name = self.__class__.__name__
×
UNCOV
121
        warnings.warn(
×
122
            f"The {class_name}.is_fd property is deprecated and superseded by "
123
            f"{class_name}.protocol. It is scheduled for removal in python-can version 5.0.",
124
            DeprecationWarning,
125
            stacklevel=2,
126
        )
UNCOV
127
        return self._can_protocol is CanProtocol.CAN_FD
×
128

129
    def _recv_internal(self, timeout: Optional[float]):
21✔
130
        result = self._multicast.recv(timeout)
14✔
131
        if not result:
14✔
132
            return None, False
14✔
133

134
        data, _, timestamp = result
14✔
135
        try:
14✔
136
            can_message = unpack_message(
14✔
137
                data, replace={"timestamp": timestamp}, check=True
138
            )
UNCOV
139
        except Exception as exception:
×
UNCOV
140
            raise can.CanOperationError(
×
141
                "could not unpack received message"
142
            ) from exception
143

144
        if self._can_protocol is not CanProtocol.CAN_FD and can_message.is_fd:
14✔
UNCOV
145
            return None, False
×
146

147
        return can_message, False
14✔
148

149
    def send(self, msg: can.Message, timeout: Optional[float] = None) -> None:
21✔
150
        if self._can_protocol is not CanProtocol.CAN_FD and msg.is_fd:
14✔
UNCOV
151
            raise can.CanOperationError(
×
152
                "cannot send FD message over bus with CAN FD disabled"
153
            )
154

155
        data = pack_message(msg)
14✔
156
        self._multicast.send(data, timeout)
14✔
157

158
    def fileno(self) -> int:
21✔
159
        """Provides the internally used file descriptor of the socket or `-1` if not available."""
160
        return self._multicast.fileno()
14✔
161

162
    def shutdown(self) -> None:
21✔
163
        """Close all sockets and free up any resources.
164

165
        Never throws errors and only logs them.
166
        """
167
        super().shutdown()
21✔
168
        self._multicast.shutdown()
21✔
169

170
    @staticmethod
21✔
171
    def _detect_available_configs() -> list[AutoDetectedConfig]:
21✔
172
        if hasattr(socket, "CMSG_SPACE"):
21✔
173
            return [
14✔
174
                {
175
                    "interface": "udp_multicast",
176
                    "channel": UdpMulticastBus.DEFAULT_GROUP_IPv6,
177
                },
178
                {
179
                    "interface": "udp_multicast",
180
                    "channel": UdpMulticastBus.DEFAULT_GROUP_IPv4,
181
                },
182
            ]
183

184
        # else, this interface cannot be used
185
        return []
7✔
186

187

188
class GeneralPurposeUdpMulticastBus:
21✔
189
    """A general purpose send and receive handler for multicast over IP/UDP.
21✔
190

191
    However, it raises CAN-specific exceptions for convenience.
192
    """
193

194
    def __init__(
21✔
195
        self, group: str, port: int, hop_limit: int, max_buffer: int = 4096
196
    ) -> None:
197
        self.group = group
14✔
198
        self.port = port
14✔
199
        self.hop_limit = hop_limit
14✔
200
        self.max_buffer = max_buffer
14✔
201

202
        # `False` will always work, no matter the setup. This might be changed by _create_socket().
203
        self.timestamp_nanosecond = False
14✔
204

205
        # Look up multicast group address in name server and find out IP version of the first suitable target
206
        # and then get the address family of it (socket.AF_INET or socket.AF_INET6)
207
        connection_candidates = socket.getaddrinfo(  # type: ignore
14✔
208
            group, self.port, type=socket.SOCK_DGRAM
209
        )
210
        sock = None
14✔
211
        for connection_candidate in connection_candidates:
14✔
212
            address_family: socket.AddressFamily = connection_candidate[0]
14✔
213
            self.ip_version = 4 if address_family == socket.AF_INET else 6
14✔
214
            try:
14✔
215
                sock = self._create_socket(address_family)
14✔
UNCOV
216
            except OSError as error:
×
UNCOV
217
                log.info(
×
218
                    "could not connect to the multicast IP network of candidate %s; reason: %s",
219
                    connection_candidates,
220
                    error,
221
                )
222
        if sock is not None:
14✔
223
            self._socket = sock
14✔
224
        else:
UNCOV
225
            raise can.CanInitializationError(
×
226
                "could not connect to a multicast IP network"
227
            )
228

229
        # used in recv()
230
        self.received_timestamp_struct = "@ll"
14✔
231
        self.received_timestamp_struct_size = struct.calcsize(
14✔
232
            self.received_timestamp_struct
233
        )
234
        if self.timestamp_nanosecond:
14✔
235
            self.received_ancillary_buffer_size = socket.CMSG_SPACE(
7✔
236
                self.received_timestamp_struct_size
237
            )
238
        else:
239
            self.received_ancillary_buffer_size = 0
7✔
240

241
        # used by send()
242
        self._send_destination = (self.group, self.port)
14✔
243
        self._last_send_timeout: Optional[float] = None
14✔
244

245
    def _create_socket(self, address_family: socket.AddressFamily) -> socket.socket:
21✔
246
        """Creates a new socket. This might fail and raise an exception!
247

248
        :param address_family: whether this is of type `socket.AF_INET` or `socket.AF_INET6`
249

250
        :raises can.CanInitializationError:
251
            if the socket could not be opened or configured correctly; in this case, it is
252
            guaranteed to be closed/cleaned up
253
        """
254
        # create the UDP socket
255
        # this might already fail but then there is nothing to clean up
256
        sock = socket.socket(address_family, socket.SOCK_DGRAM)
14✔
257

258
        # configure the socket
259
        try:
14✔
260
            # set hop limit / TTL
261
            ttl_as_binary = struct.pack("@I", self.hop_limit)
14✔
262
            if self.ip_version == 4:
14✔
263
                sock.setsockopt(
14✔
264
                    socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl_as_binary
265
                )
266
            else:
267
                sock.setsockopt(
14✔
268
                    socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, ttl_as_binary
269
                )
270

271
            # Allow multiple programs to access that address + port
272
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
14✔
273

274
            # Option not supported on Windows.
275
            if hasattr(socket, "SO_REUSEPORT"):
14✔
276
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
7✔
277

278
            # set how to receive timestamps
279
            try:
14✔
280
                sock.setsockopt(socket.SOL_SOCKET, SO_TIMESTAMPNS, 1)
14✔
281
            except OSError as error:
7✔
282
                if (
7✔
283
                    error.errno == errno.ENOPROTOOPT
284
                    or error.errno == errno.EINVAL
285
                    or error.errno == WSAEINVAL
286
                ):  # It is unavailable on macOS (ENOPROTOOPT) or windows(EINVAL/WSAEINVAL)
287
                    self.timestamp_nanosecond = False
7✔
288
                else:
289
                    raise error
×
290
            else:
291
                self.timestamp_nanosecond = True
7✔
292

293
            # Bind it to the port (on any interface)
294
            sock.bind(("", self.port))
14✔
295

296
            # Join the multicast group
297
            group_as_binary = socket.inet_pton(address_family, self.group)
14✔
298
            if self.ip_version == 4:
14✔
299
                request = group_as_binary + struct.pack("@I", socket.INADDR_ANY)
14✔
300
                sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, request)
14✔
301
            else:
302
                request = group_as_binary + struct.pack("@I", 0)
14✔
303
                sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, request)
14✔
304

305
            return sock
14✔
306

307
        except OSError as error:
×
308
            # clean up the incompletely configured but opened socket
309
            try:
×
310
                sock.close()
×
311
            except OSError as close_error:
×
312
                # ignore but log any failures in here
313
                log.warning("Could not close partly configured socket: %s", close_error)
×
314

315
            # still raise the error
316
            raise can.CanInitializationError(
×
317
                "could not create or configure socket"
318
            ) from error
319

320
    def send(self, data: bytes, timeout: Optional[float] = None) -> None:
21✔
321
        """Send data to all group members. This call blocks.
322

323
        :param timeout: the timeout in seconds after which an Exception is raised is sending has failed
324
        :param data: the data to be sent
325
        :raises can.CanOperationError: if an error occurred while writing to the underlying socket
326
        :raises can.CanTimeoutError: if the timeout ran out before sending was completed
327
        """
328
        if timeout != self._last_send_timeout:
14✔
329
            self._last_send_timeout = timeout
×
330
            # this applies to all blocking calls on the socket, but sending is the only one that is blocking
331
            self._socket.settimeout(timeout)
×
332

333
        try:
14✔
334
            bytes_sent = self._socket.sendto(data, self._send_destination)
14✔
335
            if bytes_sent < len(data):
14✔
336
                raise TimeoutError()
×
337
        except TimeoutError:
×
338
            raise can.CanTimeoutError() from None
×
339
        except OSError as error:
×
340
            raise can.CanOperationError("failed to send via socket") from error
×
341

342
    def recv(
21✔
343
        self, timeout: Optional[float] = None
344
    ) -> Optional[tuple[bytes, IP_ADDRESS_INFO, float]]:
345
        """
346
        Receive up to **max_buffer** bytes.
347

348
        :param timeout: the timeout in seconds after which `None` is returned if no data arrived
349
        :returns: `None` on timeout, or a 3-tuple comprised of:
350
            - received data,
351
            - the sender of the data, and
352
            - a timestamp in seconds
353
        """
354
        # get all sockets that are ready (can be a list with a single value
355
        # being self.socket or an empty list if self.socket is not ready)
356
        try:
14✔
357
            # get all sockets that are ready (can be a list with a single value
358
            # being self.socket or an empty list if self.socket is not ready)
359
            ready_receive_sockets, _, _ = select.select([self._socket], [], [], timeout)
14✔
360
        except OSError as exc:
×
361
            # something bad (not a timeout) happened (e.g. the interface went down)
362
            raise can.CanOperationError(
×
363
                f"Failed to wait for IP/UDP socket: {exc}"
364
            ) from exc
365

366
        if ready_receive_sockets:  # not empty
14✔
367
            # fetch timestamp; this is configured in _create_socket()
368
            if self.timestamp_nanosecond:
14✔
369
                # fetch data, timestamp & source address
370
                (
7✔
371
                    raw_message_data,
372
                    ancillary_data,
373
                    _,  # flags
374
                    sender_address,
375
                ) = self._socket.recvmsg(
376
                    self.max_buffer, self.received_ancillary_buffer_size
377
                )
378

379
                # Very similar to timestamp handling in can/interfaces/socketcan/socketcan.py -> capture_message()
380
                if len(ancillary_data) != 1:
7✔
381
                    raise can.CanOperationError(
×
382
                        "Only requested a single extra field but got a different amount"
383
                    )
384
                cmsg_level, cmsg_type, cmsg_data = ancillary_data[0]
7✔
385
                if cmsg_level != socket.SOL_SOCKET or cmsg_type != SO_TIMESTAMPNS:
7✔
386
                    raise can.CanOperationError(
×
387
                        "received control message type that was not requested"
388
                    )
389
                # see https://man7.org/linux/man-pages/man3/timespec.3.html -> struct timespec for details
390
                seconds, nanoseconds = struct.unpack(
7✔
391
                    self.received_timestamp_struct, cmsg_data
392
                )
393
                if nanoseconds >= 1e9:
7✔
394
                    raise can.CanOperationError(
×
395
                        f"Timestamp nanoseconds field was out of range: {nanoseconds} not less than 1e9"
396
                    )
397
                timestamp = seconds + nanoseconds * 1.0e-9
7✔
398
            else:
399
                # fetch data & source address
400
                (raw_message_data, sender_address) = self._socket.recvfrom(
7✔
401
                    self.max_buffer
402
                )
403

404
                if is_linux:
7✔
405
                    # This ioctl isn't supported on Darwin & Windows.
UNCOV
406
                    result_buffer = ioctl(
×
407
                        self._socket.fileno(),
408
                        SIOCGSTAMP,
409
                        bytes(self.received_timestamp_struct_size),
410
                    )
UNCOV
411
                    seconds, microseconds = struct.unpack(
×
412
                        self.received_timestamp_struct, result_buffer
413
                    )
414
                else:
415
                    # fallback to time.time_ns
416
                    now = time.time()
7✔
417

418
                    # Extract seconds and microseconds
419
                    seconds = int(now)
7✔
420
                    microseconds = int((now - seconds) * 1000000)
7✔
421

422
                if microseconds >= 1e6:
7✔
UNCOV
423
                    raise can.CanOperationError(
×
424
                        f"Timestamp microseconds field was out of range: {microseconds} not less than 1e6"
425
                    )
426
                timestamp = seconds + microseconds * 1e-6
7✔
427

428
            return raw_message_data, sender_address, timestamp
14✔
429

430
        # socket wasn't readable or timeout occurred
431
        return None
14✔
432

433
    def fileno(self) -> int:
21✔
434
        """Provides the internally used file descriptor of the socket or `-1` if not available."""
435
        return self._socket.fileno()
14✔
436

437
    def shutdown(self) -> None:
21✔
438
        """Close all sockets and free up any resources.
439

440
        Never throws errors and only logs them.
441
        """
442
        try:
14✔
443
            self._socket.close()
14✔
444
        except OSError as exception:
×
UNCOV
445
            log.error("could not close IP socket: %s", exception)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc