• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hardbyte / python-can / 16362801995

18 Jul 2025 05:17AM UTC coverage: 70.862% (+0.1%) from 70.763%
16362801995

Pull #1920

github

web-flow
Merge f9e8a3c29 into 958fc64ed
Pull Request #1920: add FD support to slcan according to CANable 2.0 impementation

6 of 45 new or added lines in 1 file covered. (13.33%)

838 existing lines in 35 files now uncovered.

7770 of 10965 relevant lines covered (70.86%)

13.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

33.17
/can/interfaces/socketcand/socketcand.py
1
"""
21✔
2
Interface to socketcand
3
see https://github.com/linux-can/socketcand
4

5
Authors: Marvin Seiler, Gerrit Telkamp
6

7
Copyright (C) 2021  DOMOLOGIC GmbH
8
http://www.domologic.de
9
"""
10

11
import logging
21✔
12
import os
21✔
13
import select
21✔
14
import socket
21✔
15
import time
21✔
16
import traceback
21✔
17
import urllib.parse as urlparselib
21✔
18
import xml.etree.ElementTree as ET
21✔
19
from collections import deque
21✔
20

21
import can
21✔
22

23
log = logging.getLogger(__name__)
21✔
24

25
DEFAULT_SOCKETCAND_DISCOVERY_ADDRESS = ""
21✔
26
DEFAULT_SOCKETCAND_DISCOVERY_PORT = 42000
21✔
27

28

29
def detect_beacon(timeout_ms: int = 3100) -> list[can.typechecking.AutoDetectedConfig]:
21✔
30
    """
31
    Detects socketcand servers
32

33
    This is what :meth:`can.detect_available_configs` ends up calling to search
34
    for available socketcand servers with a default timeout of 3100ms
35
    (socketcand sends a beacon packet every 3000ms).
36

37
    Using this method directly allows for adjusting the timeout. Extending
38
    the timeout beyond the default time period could be useful if UDP
39
    packet loss is a concern.
40

41
    :param timeout_ms:
42
        Timeout in milliseconds to wait for socketcand beacon packets
43

44
    :return:
45
        See :meth:`~can.detect_available_configs`
46
    """
47
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
21✔
48
        sock.bind(
21✔
49
            (DEFAULT_SOCKETCAND_DISCOVERY_ADDRESS, DEFAULT_SOCKETCAND_DISCOVERY_PORT)
50
        )
51
        log.info(
21✔
52
            "Listening on for socketcand UDP advertisement on %s:%s",
53
            DEFAULT_SOCKETCAND_DISCOVERY_ADDRESS,
54
            DEFAULT_SOCKETCAND_DISCOVERY_PORT,
55
        )
56

57
        now = time.time() * 1000
21✔
58
        end_time = now + timeout_ms
21✔
59
        while (time.time() * 1000) < end_time:
21✔
60
            try:
21✔
61
                # get all sockets that are ready (can be a list with a single value
62
                # being self.socket or an empty list if self.socket is not ready)
63
                ready_receive_sockets, _, _ = select.select([sock], [], [], 1)
21✔
64

65
                if not ready_receive_sockets:
21✔
66
                    log.debug("No advertisement received")
21✔
67
                    continue
21✔
68

UNCOV
69
                msg = sock.recv(1024).decode("utf-8")
×
70
                root = ET.fromstring(msg)
×
71
                if root.tag != "CANBeacon":
×
72
                    log.debug("Unexpected message received over UDP")
×
73
                    continue
×
74

UNCOV
75
                det_devs = []
×
76
                det_host = None
×
77
                det_port = None
×
78
                for child in root:
×
79
                    if child.tag == "Bus":
×
80
                        bus_name = child.attrib["name"]
×
81
                        det_devs.append(bus_name)
×
82
                    elif child.tag == "URL":
×
83
                        url = urlparselib.urlparse(child.text)
×
84
                        det_host = url.hostname
×
85
                        det_port = url.port
×
86

UNCOV
87
                if not det_devs:
×
88
                    log.debug(
×
89
                        "Got advertisement, but no SocketCAN devices advertised by socketcand"
90
                    )
UNCOV
91
                    continue
×
92

UNCOV
93
                if (det_host is None) or (det_port is None):
×
94
                    det_host = None
×
95
                    det_port = None
×
96
                    log.debug(
×
97
                        "Got advertisement, but no SocketCAN URL advertised by socketcand"
98
                    )
UNCOV
99
                    continue
×
100

UNCOV
101
                log.info(f"Found SocketCAN devices: {det_devs}")
×
102
                return [
×
103
                    {
104
                        "interface": "socketcand",
105
                        "host": det_host,
106
                        "port": det_port,
107
                        "channel": channel,
108
                    }
109
                    for channel in det_devs
110
                ]
111

UNCOV
112
            except ET.ParseError:
×
113
                log.debug("Unexpected message received over UDP")
×
114
                continue
×
115

UNCOV
116
            except Exception as exc:
×
117
                # something bad happened (e.g. the interface went down)
UNCOV
118
                log.error(f"Failed to detect beacon: {exc}  {traceback.format_exc()}")
×
119
                raise OSError(
×
120
                    f"Failed to detect beacon: {exc} {traceback.format_exc()}"
121
                ) from exc
122

123
        return []
21✔
124

125

126
def convert_ascii_message_to_can_message(ascii_msg: str) -> can.Message:
21✔
127
    if not ascii_msg.endswith(" >"):
21✔
128
        log.warning(f"Missing ending character in ascii message: {ascii_msg}")
21✔
129
        return None
21✔
130

131
    if ascii_msg.startswith("< frame "):
21✔
132
        # frame_string = ascii_msg.removeprefix("< frame ").removesuffix(" >")
133
        frame_string = ascii_msg[8:-2]
21✔
134
        parts = frame_string.split(" ", 3)
21✔
135
        can_id, timestamp = int(parts[0], 16), float(parts[1])
21✔
136
        is_ext = len(parts[0]) != 3
21✔
137

138
        data = bytearray.fromhex(parts[2])
21✔
139
        can_dlc = len(data)
21✔
140
        can_message = can.Message(
21✔
141
            timestamp=timestamp,
142
            arbitration_id=can_id,
143
            data=data,
144
            dlc=can_dlc,
145
            is_extended_id=is_ext,
146
            is_rx=True,
147
        )
148
        return can_message
21✔
149

150
    if ascii_msg.startswith("< error "):
21✔
151
        frame_string = ascii_msg[8:-2]
21✔
152
        parts = frame_string.split(" ", 3)
21✔
153
        can_id, timestamp = int(parts[0], 16), float(parts[1])
21✔
154
        is_ext = len(parts[0]) != 3
21✔
155

156
        # socketcand sends no data in the error message so we don't have information
157
        # about the error details, therefore the can frame is created with one
158
        # data byte set to zero
159
        data = bytearray([0])
21✔
160
        can_dlc = len(data)
21✔
161
        can_message = can.Message(
21✔
162
            timestamp=timestamp,
163
            arbitration_id=can_id & 0x1FFFFFFF,
164
            is_error_frame=True,
165
            data=data,
166
            dlc=can_dlc,
167
            is_extended_id=True,
168
            is_rx=True,
169
        )
170
        return can_message
21✔
171

172
    log.warning(f"Could not parse ascii message: {ascii_msg}")
21✔
173
    return None
21✔
174

175

176
def convert_can_message_to_ascii_message(can_message: can.Message) -> str:
21✔
177
    # Note: socketcan bus adds extended flag, remote_frame_flag & error_flag to id
178
    # not sure if that is necessary here
UNCOV
179
    can_id = can_message.arbitration_id
×
UNCOV
180
    if can_message.is_extended_id:
×
UNCOV
181
        can_id_string = f"{(can_id&0x1FFFFFFF):08X}"
×
182
    else:
UNCOV
183
        can_id_string = f"{(can_id&0x7FF):03X}"
×
184
    # Note: seems like we cannot add CANFD_BRS (bitrate_switch) and CANFD_ESI (error_state_indicator) flags
UNCOV
185
    data = can_message.data
×
UNCOV
186
    length = can_message.dlc
×
UNCOV
187
    bytes_string = " ".join(f"{x:x}" for x in data[0:length])
×
UNCOV
188
    return f"< send {can_id_string} {length:X} {bytes_string} >"
×
189

190

191
def connect_to_server(s, host, port):
21✔
UNCOV
192
    timeout_ms = 10000
×
UNCOV
193
    now = time.time() * 1000
×
UNCOV
194
    end_time = now + timeout_ms
×
UNCOV
195
    while now < end_time:
×
UNCOV
196
        try:
×
UNCOV
197
            s.connect((host, port))
×
UNCOV
198
            return
×
UNCOV
199
        except Exception as e:
×
UNCOV
200
            log.warning(f"Failed to connect to server: {type(e)} Message: {e}")
×
UNCOV
201
            now = time.time() * 1000
×
UNCOV
202
    raise TimeoutError(
×
203
        f"connect_to_server: Failed to connect server for {timeout_ms} ms"
204
    )
205

206

207
class SocketCanDaemonBus(can.BusABC):
21✔
208
    def __init__(self, channel, host, port, tcp_tune=False, can_filters=None, **kwargs):
21✔
209
        """Connects to a CAN bus served by socketcand.
210

211
        It implements :meth:`can.BusABC._detect_available_configs` to search for
212
        available interfaces.
213

214
        It will attempt to connect to the server for up to 10s, after which a
215
        TimeoutError exception will be thrown.
216

217
        If the handshake with the socketcand server fails, a CanError exception
218
        is thrown.
219

220
        :param channel:
221
            The can interface name served by socketcand.
222
            An example channel would be 'vcan0' or 'can0'.
223
        :param host:
224
            The host address of the socketcand server.
225
        :param port:
226
            The port of the socketcand server.
227
        :param tcp_tune:
228
            This tunes the TCP socket for low latency (TCP_NODELAY, and
229
            TCP_QUICKACK).
230
            This option is not available under windows.
231
        :param can_filters:
232
            See :meth:`can.BusABC.set_filters`.
233
        """
234
        self.__host = host
×
235
        self.__port = port
×
236

UNCOV
237
        self.__tcp_tune = tcp_tune
×
UNCOV
238
        self.__socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
×
239

240
        if self.__tcp_tune:
×
241
            if os.name == "nt":
×
UNCOV
242
                self.__tcp_tune = False
×
243
                log.warning("'tcp_tune' not available in Windows. Setting to False")
×
244
            else:
UNCOV
245
                self.__socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
×
246

UNCOV
247
        self.__message_buffer = deque()
×
UNCOV
248
        self.__receive_buffer = ""  # i know string is not the most efficient here
×
249
        self.channel = channel
×
UNCOV
250
        self.channel_info = f"socketcand on {channel}@{host}:{port}"
×
251
        connect_to_server(self.__socket, self.__host, self.__port)
×
252
        self._expect_msg("< hi >")
×
253

254
        log.info(
×
255
            f"SocketCanDaemonBus: connected with address {self.__socket.getsockname()}"
256
        )
257
        self._tcp_send(f"< open {channel} >")
×
258
        self._expect_msg("< ok >")
×
UNCOV
259
        self._tcp_send("< rawmode >")
×
260
        self._expect_msg("< ok >")
×
UNCOV
261
        super().__init__(channel=channel, can_filters=can_filters, **kwargs)
×
262

263
    def _recv_internal(self, timeout):
21✔
264
        if len(self.__message_buffer) != 0:
×
265
            can_message = self.__message_buffer.popleft()
×
266
            return can_message, False
×
267

268
        try:
×
269
            # get all sockets that are ready (can be a list with a single value
270
            # being self.socket or an empty list if self.socket is not ready)
271
            ready_receive_sockets, _, _ = select.select(
×
272
                [self.__socket], [], [], timeout
273
            )
274
        except OSError as exc:
×
275
            # something bad happened (e.g. the interface went down)
UNCOV
276
            log.error(f"Failed to receive: {exc}")
×
UNCOV
277
            raise can.CanError(f"Failed to receive: {exc}") from exc
×
278

279
        try:
×
280
            if not ready_receive_sockets:
×
281
                # socket wasn't readable or timeout occurred
282
                log.debug("Socket not ready")
×
283
                return None, False
×
284

UNCOV
285
            ascii_msg = self.__socket.recv(1024).decode(
×
286
                "ascii"
287
            )  # may contain multiple messages
288
            if self.__tcp_tune:
×
289
                self.__socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_QUICKACK, 1)
×
290
            self.__receive_buffer += ascii_msg
×
291
            log.debug(f"Received Ascii Message: {ascii_msg}")
×
UNCOV
292
            buffer_view = self.__receive_buffer
×
UNCOV
293
            chars_processed_successfully = 0
×
294
            while True:
295
                if len(buffer_view) == 0:
×
UNCOV
296
                    break
×
297

298
                start = buffer_view.find("<")
×
299
                if start == -1:
×
UNCOV
300
                    log.warning(
×
301
                        f"Bad data: No opening < found => discarding entire buffer '{buffer_view}'"
302
                    )
UNCOV
303
                    chars_processed_successfully = len(self.__receive_buffer)
×
UNCOV
304
                    break
×
UNCOV
305
                end = buffer_view.find(">")
×
UNCOV
306
                if end == -1:
×
307
                    log.warning("Got incomplete message => waiting for more data")
×
UNCOV
308
                    if len(buffer_view) > 200:
×
309
                        log.warning(
×
310
                            "Incomplete message exceeds 200 chars => Discarding"
311
                        )
UNCOV
312
                        chars_processed_successfully = len(self.__receive_buffer)
×
UNCOV
313
                    break
×
UNCOV
314
                chars_processed_successfully += end + 1
×
UNCOV
315
                single_message = buffer_view[start : end + 1]
×
316
                parsed_can_message = convert_ascii_message_to_can_message(
×
317
                    single_message
318
                )
319
                if parsed_can_message is None:
×
UNCOV
320
                    log.warning(f"Invalid Frame: {single_message}")
×
321
                else:
322
                    parsed_can_message.channel = self.channel
×
323
                    self.__message_buffer.append(parsed_can_message)
×
324
                buffer_view = buffer_view[end + 1 :]
×
325

326
            self.__receive_buffer = self.__receive_buffer[chars_processed_successfully:]
×
UNCOV
327
            can_message = (
×
328
                None
329
                if len(self.__message_buffer) == 0
330
                else self.__message_buffer.popleft()
331
            )
UNCOV
332
            return can_message, False
×
333

334
        except Exception as exc:
×
335
            log.error(f"Failed to receive: {exc}  {traceback.format_exc()}")
×
UNCOV
336
            raise can.CanError(
×
337
                f"Failed to receive: {exc}  {traceback.format_exc()}"
338
            ) from exc
339

340
    def _tcp_send(self, msg: str):
21✔
UNCOV
341
        log.debug(f"Sending TCP Message: '{msg}'")
×
UNCOV
342
        self.__socket.sendall(msg.encode("ascii"))
×
UNCOV
343
        if self.__tcp_tune:
×
UNCOV
344
            self.__socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_QUICKACK, 1)
×
345

346
    def _expect_msg(self, msg):
21✔
347
        ascii_msg = self.__socket.recv(256).decode("ascii")
×
348
        if self.__tcp_tune:
×
UNCOV
349
            self.__socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_QUICKACK, 1)
×
UNCOV
350
        if not ascii_msg == msg:
×
UNCOV
351
            raise can.CanError(f"Expected '{msg}' got: '{ascii_msg}'")
×
352

353
    def send(self, msg, timeout=None):
21✔
354
        """Transmit a message to the CAN bus.
355

356
        :param msg: A message object.
357
        :param timeout: Ignored
358
        """
UNCOV
359
        ascii_msg = convert_can_message_to_ascii_message(msg)
×
UNCOV
360
        self._tcp_send(ascii_msg)
×
361

362
    def shutdown(self):
21✔
363
        """Stops all active periodic tasks and closes the socket."""
364
        super().shutdown()
21✔
365
        self.__socket.close()
21✔
366

367
    @staticmethod
21✔
368
    def _detect_available_configs() -> list[can.typechecking.AutoDetectedConfig]:
21✔
369
        try:
21✔
370
            return detect_beacon()
21✔
UNCOV
371
        except Exception as e:
×
UNCOV
372
            log.warning(f"Could not detect socketcand beacon: {e}")
×
UNCOV
373
            return []
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc