• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hardbyte / python-can / 11256605622

09 Oct 2024 02:02PM UTC coverage: 70.371% (-0.1%) from 70.515%
11256605622

Pull #1870

github

web-flow
Merge 907b1d795 into 7a3d23fa3
Pull Request #1870: Add tox environment for doctest

7472 of 10618 relevant lines covered (70.37%)

15.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.95
/can/interfaces/socketcan/socketcan.py
1
"""
24✔
2
The main module of the socketcan interface containing most user-facing classes and methods
3
along some internal methods.
4

5
At the end of the file the usage of the internal methods is shown.
6
"""
7

8
import ctypes
24✔
9
import ctypes.util
24✔
10
import errno
24✔
11
import logging
24✔
12
import select
24✔
13
import socket
24✔
14
import struct
24✔
15
import threading
24✔
16
import time
24✔
17
import warnings
24✔
18
from typing import Callable, Dict, List, Optional, Sequence, Tuple, Type, Union
24✔
19

20
import can
24✔
21
from can import BusABC, CanProtocol, Message
24✔
22
from can.broadcastmanager import (
24✔
23
    LimitedDurationCyclicSendTaskABC,
24
    ModifiableCyclicTaskABC,
25
    RestartableCyclicTaskABC,
26
)
27
from can.interfaces.socketcan import constants
24✔
28
from can.interfaces.socketcan.utils import find_available_interfaces, pack_filters
24✔
29
from can.typechecking import CanFilters
24✔
30

31
log = logging.getLogger(__name__)
24✔
32
log_tx = log.getChild("tx")
24✔
33
log_rx = log.getChild("rx")
24✔
34

35
try:
24✔
36
    from socket import CMSG_SPACE
24✔
37

38
    CMSG_SPACE_available = True
16✔
39
except ImportError:
8✔
40
    CMSG_SPACE_available = False
8✔
41
    log.error("socket.CMSG_SPACE not available on this platform")
8✔
42

43

44
# Constants needed for precise handling of timestamps
45
RECEIVED_TIMESTAMP_STRUCT = struct.Struct("@ll")
24✔
46
RECEIVED_ANCILLARY_BUFFER_SIZE = (
24✔
47
    CMSG_SPACE(RECEIVED_TIMESTAMP_STRUCT.size) if CMSG_SPACE_available else 0
48
)
49

50

51
# Setup BCM struct
52
def bcm_header_factory(
24✔
53
    fields: List[Tuple[str, Union[Type[ctypes.c_uint32], Type[ctypes.c_long]]]],
54
    alignment: int = 8,
55
):
56
    curr_stride = 0
24✔
57
    results: List[
24✔
58
        Tuple[
59
            str, Union[Type[ctypes.c_uint8], Type[ctypes.c_uint32], Type[ctypes.c_long]]
60
        ]
61
    ] = []
62
    pad_index = 0
24✔
63
    for field in fields:
24✔
64
        field_alignment = ctypes.alignment(field[1])
24✔
65
        field_size = ctypes.sizeof(field[1])
24✔
66

67
        # If the current stride index isn't a multiple of the alignment
68
        # requirements of this field, then we must add padding bytes until we
69
        # are aligned
70
        while curr_stride % field_alignment != 0:
24✔
71
            results.append((f"pad_{pad_index}", ctypes.c_uint8))
24✔
72
            pad_index += 1
24✔
73
            curr_stride += 1
24✔
74

75
        # Now can it fit?
76
        # Example: If this is 8 bytes and the type requires 4 bytes alignment
77
        # then we can only fit when we're starting at 0. Otherwise, we will
78
        # split across 2 strides.
79
        #
80
        # | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
81
        results.append(field)
24✔
82
        curr_stride += field_size
24✔
83

84
    # Add trailing padding to align to a multiple of the largest scalar member
85
    # in the structure
86
    while curr_stride % alignment != 0:
24✔
87
        results.append((f"pad_{pad_index}", ctypes.c_uint8))
24✔
88
        pad_index += 1
24✔
89
        curr_stride += 1
24✔
90

91
    return type("BcmMsgHead", (ctypes.Structure,), {"_fields_": results})
24✔
92

93

94
# The fields definition is taken from the C struct definitions in
95
# <linux/can/bcm.h>
96
#
97
#     struct bcm_timeval {
98
#             long tv_sec;
99
#             long tv_usec;
100
#     };
101
#
102
#     /**
103
#      * struct bcm_msg_head - head of messages to/from the broadcast manager
104
#      * @opcode:    opcode, see enum below.
105
#      * @flags:     special flags, see below.
106
#      * @count:     number of frames to send before changing interval.
107
#      * @ival1:     interval for the first @count frames.
108
#      * @ival2:     interval for the following frames.
109
#      * @can_id:    CAN ID of frames to be sent or received.
110
#      * @nframes:   number of frames appended to the message head.
111
#      * @frames:    array of CAN frames.
112
#      */
113
#     struct bcm_msg_head {
114
#             __u32 opcode;
115
#             __u32 flags;
116
#             __u32 count;
117
#             struct bcm_timeval ival1, ival2;
118
#             canid_t can_id;
119
#             __u32 nframes;
120
#             struct can_frame frames[0];
121
#     };
122
BcmMsgHead = bcm_header_factory(
24✔
123
    fields=[
124
        ("opcode", ctypes.c_uint32),
125
        ("flags", ctypes.c_uint32),
126
        ("count", ctypes.c_uint32),
127
        ("ival1_tv_sec", ctypes.c_long),
128
        ("ival1_tv_usec", ctypes.c_long),
129
        ("ival2_tv_sec", ctypes.c_long),
130
        ("ival2_tv_usec", ctypes.c_long),
131
        ("can_id", ctypes.c_uint32),
132
        ("nframes", ctypes.c_uint32),
133
    ]
134
)
135

136

137
# struct module defines a binary packing format:
138
# https://docs.python.org/3/library/struct.html#struct-format-strings
139
# The 32bit can id is directly followed by the 8bit data link count
140
# The data field is aligned on an 8 byte boundary, hence we add padding
141
# which aligns the data field to an 8 byte boundary.
142
CAN_FRAME_HEADER_STRUCT = struct.Struct("=IBB2x")
24✔
143

144

145
def build_can_frame(msg: Message) -> bytes:
24✔
146
    """CAN frame packing/unpacking (see 'struct can_frame' in <linux/can.h>)
147
    /**
148
     * struct can_frame - basic CAN frame structure
149
     * @can_id:  the CAN ID of the frame and CAN_*_FLAG flags, see above.
150
     * @can_dlc: the data length field of the CAN frame
151
     * @data:    the CAN frame payload.
152
     */
153
    struct can_frame {
154
        canid_t can_id;  /* 32 bit CAN_ID + EFF/RTR/ERR flags */
155
        __u8    can_dlc; /* data length code: 0 .. 8 */
156
        __u8    data[8] __attribute__((aligned(8)));
157
    };
158

159
    /**
160
    * struct canfd_frame - CAN flexible data rate frame structure
161
    * @can_id: CAN ID of the frame and CAN_*_FLAG flags, see canid_t definition
162
    * @len:    frame payload length in byte (0 .. CANFD_MAX_DLEN)
163
    * @flags:  additional flags for CAN FD
164
    * @__res0: reserved / padding
165
    * @__res1: reserved / padding
166
    * @data:   CAN FD frame payload (up to CANFD_MAX_DLEN byte)
167
    */
168
    struct canfd_frame {
169
        canid_t can_id;  /* 32 bit CAN_ID + EFF/RTR/ERR flags */
170
        __u8    len;     /* frame payload length in byte */
171
        __u8    flags;   /* additional flags for CAN FD */
172
        __u8    __res0;  /* reserved / padding */
173
        __u8    __res1;  /* reserved / padding */
174
        __u8    data[CANFD_MAX_DLEN] __attribute__((aligned(8)));
175
    };
176
    """
177
    can_id = _compose_arbitration_id(msg)
6✔
178
    flags = 0
6✔
179
    if msg.bitrate_switch:
6✔
180
        flags |= constants.CANFD_BRS
6✔
181
    if msg.error_state_indicator:
6✔
182
        flags |= constants.CANFD_ESI
×
183
    max_len = 64 if msg.is_fd else 8
6✔
184
    data = bytes(msg.data).ljust(max_len, b"\x00")
6✔
185
    return CAN_FRAME_HEADER_STRUCT.pack(can_id, msg.dlc, flags) + data
6✔
186

187

188
def build_bcm_header(
24✔
189
    opcode: int,
190
    flags: int,
191
    count: int,
192
    ival1_seconds: int,
193
    ival1_usec: int,
194
    ival2_seconds: int,
195
    ival2_usec: int,
196
    can_id: int,
197
    nframes: int,
198
) -> bytes:
199
    result = BcmMsgHead(
24✔
200
        opcode=opcode,
201
        flags=flags,
202
        count=count,
203
        ival1_tv_sec=ival1_seconds,
204
        ival1_tv_usec=ival1_usec,
205
        ival2_tv_sec=ival2_seconds,
206
        ival2_tv_usec=ival2_usec,
207
        can_id=can_id,
208
        nframes=nframes,
209
    )
210
    return ctypes.string_at(ctypes.addressof(result), ctypes.sizeof(result))
24✔
211

212

213
def build_bcm_tx_delete_header(can_id: int, flags: int) -> bytes:
24✔
214
    opcode = constants.CAN_BCM_TX_DELETE
24✔
215
    return build_bcm_header(opcode, flags, 0, 0, 0, 0, 0, can_id, 1)
24✔
216

217

218
def build_bcm_transmit_header(
24✔
219
    can_id: int,
220
    count: int,
221
    initial_period: float,
222
    subsequent_period: float,
223
    msg_flags: int,
224
    nframes: int = 1,
225
) -> bytes:
226
    opcode = constants.CAN_BCM_TX_SETUP
24✔
227

228
    flags = msg_flags | constants.SETTIMER | constants.STARTTIMER
24✔
229

230
    if initial_period > 0:
24✔
231
        # Note `TX_COUNTEVT` creates the message TX_EXPIRED when count expires
232
        flags |= constants.TX_COUNTEVT
24✔
233

234
    def split_time(value: float) -> Tuple[int, int]:
24✔
235
        """Given seconds as a float, return whole seconds and microseconds"""
236
        seconds = int(value)
24✔
237
        microseconds = int(1e6 * (value - seconds))
24✔
238
        return seconds, microseconds
24✔
239

240
    ival1_seconds, ival1_usec = split_time(initial_period)
24✔
241
    ival2_seconds, ival2_usec = split_time(subsequent_period)
24✔
242

243
    return build_bcm_header(
24✔
244
        opcode,
245
        flags,
246
        count,
247
        ival1_seconds,
248
        ival1_usec,
249
        ival2_seconds,
250
        ival2_usec,
251
        can_id,
252
        nframes,
253
    )
254

255

256
def build_bcm_update_header(can_id: int, msg_flags: int, nframes: int = 1) -> bytes:
24✔
257
    return build_bcm_header(
24✔
258
        constants.CAN_BCM_TX_SETUP, msg_flags, 0, 0, 0, 0, 0, can_id, nframes
259
    )
260

261

262
def dissect_can_frame(frame: bytes) -> Tuple[int, int, int, bytes]:
24✔
263
    can_id, can_dlc, flags = CAN_FRAME_HEADER_STRUCT.unpack_from(frame)
6✔
264
    if len(frame) != constants.CANFD_MTU:
6✔
265
        # Flags not valid in non-FD frames
266
        flags = 0
6✔
267
    return can_id, can_dlc, flags, frame[8 : 8 + can_dlc]
6✔
268

269

270
def create_bcm_socket(channel: str) -> socket.socket:
24✔
271
    """create a broadcast manager socket and connect to the given interface"""
272
    s = socket.socket(constants.PF_CAN, socket.SOCK_DGRAM, constants.CAN_BCM)
6✔
273
    s.connect((channel,))
6✔
274
    return s
6✔
275

276

277
def send_bcm(bcm_socket: socket.socket, data: bytes) -> int:
24✔
278
    """
279
    Send raw frame to a BCM socket and handle errors.
280
    """
281
    try:
6✔
282
        return bcm_socket.send(data)
6✔
283
    except OSError as error:
×
284
        base = f"Couldn't send CAN BCM frame due to OS Error: {error.strerror}"
×
285

286
        if error.errno == errno.EINVAL:
×
287
            specific_message = " You are probably referring to a non-existing frame."
×
288
        elif error.errno == errno.ENETDOWN:
×
289
            specific_message = " The CAN interface appears to be down."
×
290
        elif error.errno == errno.EBADF:
×
291
            specific_message = " The CAN socket appears to be closed."
×
292
        else:
293
            specific_message = ""
×
294

295
        raise can.CanOperationError(base + specific_message, error.errno) from error
×
296

297

298
def _compose_arbitration_id(message: Message) -> int:
24✔
299
    can_id = message.arbitration_id
6✔
300
    if message.is_extended_id:
6✔
301
        log.debug("sending an extended id type message")
6✔
302
        can_id |= constants.CAN_EFF_FLAG
6✔
303
    if message.is_remote_frame:
6✔
304
        log.debug("requesting a remote frame")
6✔
305
        can_id |= constants.CAN_RTR_FLAG
6✔
306
    if message.is_error_frame:
6✔
307
        log.debug("sending error frame")
×
308
        can_id |= constants.CAN_ERR_FLAG
×
309
    return can_id
6✔
310

311

312
class CyclicSendTask(
24✔
313
    LimitedDurationCyclicSendTaskABC, ModifiableCyclicTaskABC, RestartableCyclicTaskABC
314
):
315
    """
24✔
316
    A SocketCAN cyclic send task supports:
317

318
        - setting of a task duration
319
        - modifying the data
320
        - stopping then subsequent restarting of the task
321
    """
322

323
    def __init__(
24✔
324
        self,
325
        bcm_socket: socket.socket,
326
        task_id: int,
327
        messages: Union[Sequence[Message], Message],
328
        period: float,
329
        duration: Optional[float] = None,
330
        autostart: bool = True,
331
    ) -> None:
332
        """Construct and :meth:`~start` a task.
333

334
        :param bcm_socket: An open BCM socket on the desired CAN channel.
335
        :param task_id:
336
            The identifier used to uniquely reference particular cyclic send task
337
            within Linux BCM.
338
        :param messages:
339
            The messages to be sent periodically.
340
        :param period:
341
            The rate in seconds at which to send the messages.
342
        :param duration:
343
            Approximate duration in seconds to send the messages for.
344
        """
345
        # The following are assigned by LimitedDurationCyclicSendTaskABC:
346
        #   - self.messages
347
        #   - self.period
348
        #   - self.duration
349
        super().__init__(messages, period, duration)
6✔
350

351
        self.bcm_socket = bcm_socket
6✔
352
        self.task_id = task_id
6✔
353
        if autostart:
6✔
354
            self._tx_setup(self.messages)
6✔
355

356
    def _tx_setup(
24✔
357
        self,
358
        messages: Sequence[Message],
359
        raise_if_task_exists: bool = True,
360
    ) -> None:
361
        # Create a low level packed frame to pass to the kernel
362
        body = bytearray()
6✔
363
        self.flags = constants.CAN_FD_FRAME if messages[0].is_fd else 0
6✔
364

365
        if self.duration:
6✔
366
            count = int(self.duration / self.period)
×
367
            ival1 = self.period
×
368
            ival2 = 0.0
×
369
        else:
370
            count = 0
6✔
371
            ival1 = 0.0
6✔
372
            ival2 = self.period
6✔
373

374
        if raise_if_task_exists:
6✔
375
            self._check_bcm_task()
6✔
376

377
        header = build_bcm_transmit_header(
6✔
378
            self.task_id, count, ival1, ival2, self.flags, nframes=len(messages)
379
        )
380
        for message in messages:
6✔
381
            body += build_can_frame(message)
6✔
382
        log.debug("Sending BCM command")
6✔
383
        send_bcm(self.bcm_socket, header + body)
6✔
384

385
    def _check_bcm_task(self) -> None:
24✔
386
        # Do a TX_READ on a task ID, and check if we get EINVAL. If so,
387
        # then we are referring to a CAN message with an existing ID
388
        check_header = build_bcm_header(
6✔
389
            opcode=constants.CAN_BCM_TX_READ,
390
            flags=0,
391
            count=0,
392
            ival1_seconds=0,
393
            ival1_usec=0,
394
            ival2_seconds=0,
395
            ival2_usec=0,
396
            can_id=self.task_id,
397
            nframes=0,
398
        )
399
        log.debug(
6✔
400
            "Reading properties of (cyclic) transmission task id=%d", self.task_id
401
        )
402
        try:
6✔
403
            self.bcm_socket.send(check_header)
6✔
404
        except OSError as error:
6✔
405
            if error.errno != errno.EINVAL:
6✔
406
                raise can.CanOperationError("failed to check", error.errno) from error
×
407
            else:
408
                log.debug("Invalid argument - transmission task not known to kernel")
6✔
409
        else:
410
            # No exception raised - transmission task with this ID exists in kernel.
411
            # Existence of an existing transmission task might not be a problem!
412
            raise can.CanOperationError(
×
413
                f"A periodic task for task ID {self.task_id} is already in progress "
414
                "by the SocketCAN Linux layer"
415
            )
416

417
    def stop(self) -> None:
24✔
418
        """Stop a task by sending TX_DELETE message to Linux kernel.
419

420
        This will delete the entry for the transmission of the CAN-message
421
        with the specified ``task_id`` identifier. The message length
422
        for the command TX_DELETE is {[bcm_msg_head]} (only the header).
423
        """
424
        log.debug("Stopping periodic task")
6✔
425

426
        stopframe = build_bcm_tx_delete_header(self.task_id, self.flags)
6✔
427
        send_bcm(self.bcm_socket, stopframe)
6✔
428

429
    def modify_data(self, messages: Union[Sequence[Message], Message]) -> None:
24✔
430
        """Update the contents of the periodically sent CAN messages by
431
        sending TX_SETUP message to Linux kernel.
432

433
        The number of new cyclic messages to be sent must be equal to the
434
        original number of messages originally specified for this task.
435

436
        .. note:: The messages must all have the same
437
                  :attr:`~can.Message.arbitration_id` like the first message.
438

439
        :param messages:
440
            The messages with the new :attr:`can.Message.data`.
441
        """
442
        messages = self._check_and_convert_messages(messages)
6✔
443
        self._check_modified_messages(messages)
6✔
444

445
        self.messages = messages
6✔
446

447
        body = bytearray()
6✔
448
        header = build_bcm_update_header(
6✔
449
            can_id=self.task_id, msg_flags=self.flags, nframes=len(messages)
450
        )
451
        for message in messages:
6✔
452
            body += build_can_frame(message)
6✔
453
        log.debug("Sending BCM command")
6✔
454
        send_bcm(self.bcm_socket, header + body)
6✔
455

456
    def start(self) -> None:
24✔
457
        """Restart a periodic task by sending TX_SETUP message to Linux kernel.
458

459
        It verifies presence of the particular BCM task through sending TX_READ
460
        message to Linux kernel prior to scheduling.
461

462
        :raises ValueError:
463
            If the task referenced by ``task_id`` is already running.
464
        """
465
        self._tx_setup(self.messages, raise_if_task_exists=False)
6✔
466

467

468
class MultiRateCyclicSendTask(CyclicSendTask):
24✔
469
    """Exposes more of the full power of the TX_SETUP opcode."""
24✔
470

471
    def __init__(
24✔
472
        self,
473
        channel: socket.socket,
474
        task_id: int,
475
        messages: Sequence[Message],
476
        count: int,
477
        initial_period: float,
478
        subsequent_period: float,
479
    ):
480
        super().__init__(channel, task_id, messages, subsequent_period)
×
481

482
        # Create a low level packed frame to pass to the kernel
483
        header = build_bcm_transmit_header(
×
484
            self.task_id,
485
            count,
486
            initial_period,
487
            subsequent_period,
488
            self.flags,
489
            nframes=len(messages),
490
        )
491

492
        body = bytearray()
×
493
        for message in messages:
×
494
            body += build_can_frame(message)
×
495

496
        log.info("Sending BCM TX_SETUP command")
×
497
        send_bcm(self.bcm_socket, header + body)
×
498

499

500
def create_socket() -> socket.socket:
24✔
501
    """Creates a raw CAN socket. The socket will
502
    be returned unbound to any interface.
503
    """
504
    sock = socket.socket(constants.PF_CAN, socket.SOCK_RAW, constants.CAN_RAW)
6✔
505

506
    log.info("Created a socket")
6✔
507

508
    return sock
6✔
509

510

511
def bind_socket(sock: socket.socket, channel: str = "can0") -> None:
24✔
512
    """
513
    Binds the given socket to the given interface.
514

515
    :param sock:
516
        The socket to be bound
517
    :param channel:
518
        The channel / interface to bind to
519
    :raises OSError:
520
        If the specified interface isn't found.
521
    """
522
    log.debug("Binding socket to channel=%s", channel)
6✔
523
    sock.bind((channel,))
6✔
524
    log.debug("Bound socket.")
6✔
525

526

527
def capture_message(
24✔
528
    sock: socket.socket, get_channel: bool = False
529
) -> Optional[Message]:
530
    """
531
    Captures a message from given socket.
532

533
    :param sock:
534
        The socket to read a message from.
535
    :param get_channel:
536
        Find out which channel the message comes from.
537

538
    :return: The received message, or None on failure.
539
    """
540
    # Fetching the Arb ID, DLC and Data
541
    try:
6✔
542
        cf, ancillary_data, msg_flags, addr = sock.recvmsg(
6✔
543
            constants.CANFD_MTU, RECEIVED_ANCILLARY_BUFFER_SIZE
544
        )
545
        if get_channel:
6✔
546
            channel = addr[0] if isinstance(addr, tuple) else addr
6✔
547
        else:
548
            channel = None
6✔
549
    except OSError as error:
×
550
        raise can.CanOperationError(
×
551
            f"Error receiving: {error.strerror}", error.errno
552
        ) from error
553

554
    can_id, can_dlc, flags, data = dissect_can_frame(cf)
6✔
555

556
    # Fetching the timestamp
557
    assert len(ancillary_data) == 1, "only requested a single extra field"
6✔
558
    cmsg_level, cmsg_type, cmsg_data = ancillary_data[0]
6✔
559
    assert (
6✔
560
        cmsg_level == socket.SOL_SOCKET and cmsg_type == constants.SO_TIMESTAMPNS
561
    ), "received control message type that was not requested"
562
    # see https://man7.org/linux/man-pages/man3/timespec.3.html -> struct timespec for details
563
    seconds, nanoseconds = RECEIVED_TIMESTAMP_STRUCT.unpack_from(cmsg_data)
6✔
564
    if nanoseconds >= 1e9:
6✔
565
        raise can.CanOperationError(
×
566
            f"Timestamp nanoseconds field was out of range: {nanoseconds} not less than 1e9"
567
        )
568
    timestamp = seconds + nanoseconds * 1e-9
6✔
569

570
    # EXT, RTR, ERR flags -> boolean attributes
571
    #   /* special address description flags for the CAN_ID */
572
    #   #define CAN_EFF_FLAG 0x80000000U /* EFF/SFF is set in the MSB */
573
    #   #define CAN_RTR_FLAG 0x40000000U /* remote transmission request */
574
    #   #define CAN_ERR_FLAG 0x20000000U /* error frame */
575
    is_extended_frame_format = bool(can_id & constants.CAN_EFF_FLAG)
6✔
576
    is_remote_transmission_request = bool(can_id & constants.CAN_RTR_FLAG)
6✔
577
    is_error_frame = bool(can_id & constants.CAN_ERR_FLAG)
6✔
578
    is_fd = len(cf) == constants.CANFD_MTU
6✔
579
    bitrate_switch = bool(flags & constants.CANFD_BRS)
6✔
580
    error_state_indicator = bool(flags & constants.CANFD_ESI)
6✔
581

582
    # Section 4.7.1: MSG_DONTROUTE: set when the received frame was created on the local host.
583
    is_rx = not bool(msg_flags & socket.MSG_DONTROUTE)
6✔
584

585
    if is_extended_frame_format:
6✔
586
        # log.debug("CAN: Extended")
587
        # TODO does this depend on SFF or EFF?
588
        arbitration_id = can_id & 0x1FFFFFFF
6✔
589
    else:
590
        # log.debug("CAN: Standard")
591
        arbitration_id = can_id & 0x000007FF
6✔
592

593
    msg = Message(
6✔
594
        timestamp=timestamp,
595
        channel=channel,
596
        arbitration_id=arbitration_id,
597
        is_extended_id=is_extended_frame_format,
598
        is_remote_frame=is_remote_transmission_request,
599
        is_error_frame=is_error_frame,
600
        is_fd=is_fd,
601
        is_rx=is_rx,
602
        bitrate_switch=bitrate_switch,
603
        error_state_indicator=error_state_indicator,
604
        dlc=can_dlc,
605
        data=data,
606
    )
607

608
    return msg
6✔
609

610

611
class SocketcanBus(BusABC):  # pylint: disable=abstract-method
24✔
612
    """A SocketCAN interface to CAN.
24✔
613

614
    It implements :meth:`can.BusABC._detect_available_configs` to search for
615
    available interfaces.
616
    """
617

618
    def __init__(
24✔
619
        self,
620
        channel: str = "",
621
        receive_own_messages: bool = False,
622
        local_loopback: bool = True,
623
        fd: bool = False,
624
        can_filters: Optional[CanFilters] = None,
625
        ignore_rx_error_frames=False,
626
        **kwargs,
627
    ) -> None:
628
        """Creates a new socketcan bus.
629

630
        If setting some socket options fails, an error will be printed
631
        but no exception will be thrown. This includes enabling:
632

633
            - that own messages should be received,
634
            - CAN-FD frames and
635
            - error frames.
636

637
        :param channel:
638
            The can interface name with which to create this bus.
639
            An example channel would be 'vcan0' or 'can0'.
640
            An empty string '' will receive messages from all channels.
641
            In that case any sent messages must be explicitly addressed to a
642
            channel using :attr:`can.Message.channel`.
643
        :param receive_own_messages:
644
            If transmitted messages should also be received by this bus.
645
        :param local_loopback:
646
            If local loopback should be enabled on this bus.
647
            Please note that local loopback does not mean that messages sent
648
            on a socket will be readable on the same socket, they will only
649
            be readable on other open sockets on the same machine. More info
650
            can be read on the socketcan documentation:
651
            See https://www.kernel.org/doc/html/latest/networking/can.html#socketcan-local-loopback1
652
        :param fd:
653
            If CAN-FD frames should be supported.
654
        :param can_filters:
655
            See :meth:`can.BusABC.set_filters`.
656
        :param ignore_rx_error_frames:
657
            If incoming error frames should be discarded.
658
        """
659
        self.socket = create_socket()
6✔
660
        self.channel = channel
6✔
661
        self.channel_info = f"socketcan channel '{channel}'"
6✔
662
        self._bcm_sockets: Dict[str, socket.socket] = {}
6✔
663
        self._is_filtered = False
6✔
664
        self._task_id = 0
6✔
665
        self._task_id_guard = threading.Lock()
6✔
666
        self._can_protocol = CanProtocol.CAN_FD if fd else CanProtocol.CAN_20
6✔
667

668
        # set the local_loopback parameter
669
        try:
6✔
670
            self.socket.setsockopt(
6✔
671
                constants.SOL_CAN_RAW,
672
                constants.CAN_RAW_LOOPBACK,
673
                1 if local_loopback else 0,
674
            )
675
        except OSError as error:
×
676
            log.error("Could not set local loopback flag(%s)", error)
×
677

678
        # set the receive_own_messages parameter
679
        try:
6✔
680
            self.socket.setsockopt(
6✔
681
                constants.SOL_CAN_RAW,
682
                constants.CAN_RAW_RECV_OWN_MSGS,
683
                1 if receive_own_messages else 0,
684
            )
685
        except OSError as error:
×
686
            log.error("Could not receive own messages (%s)", error)
×
687

688
        # enable CAN-FD frames if desired
689
        if fd:
6✔
690
            try:
6✔
691
                self.socket.setsockopt(
6✔
692
                    constants.SOL_CAN_RAW, constants.CAN_RAW_FD_FRAMES, 1
693
                )
694
            except OSError as error:
×
695
                log.error("Could not enable CAN-FD frames (%s)", error)
×
696

697
        if not ignore_rx_error_frames:
6✔
698
            # enable error frames
699
            try:
6✔
700
                self.socket.setsockopt(
6✔
701
                    constants.SOL_CAN_RAW, constants.CAN_RAW_ERR_FILTER, 0x1FFFFFFF
702
                )
703
            except OSError as error:
×
704
                log.error("Could not enable error frames (%s)", error)
×
705

706
        # enable nanosecond resolution timestamping
707
        # we can always do this since
708
        #  1) it is guaranteed to be at least as precise as without
709
        #  2) it is available since Linux 2.6.22, and CAN support was only added afterward
710
        #     so this is always supported by the kernel
711
        self.socket.setsockopt(socket.SOL_SOCKET, constants.SO_TIMESTAMPNS, 1)
6✔
712

713
        try:
6✔
714
            bind_socket(self.socket, channel)
6✔
715
            kwargs.update(
6✔
716
                {
717
                    "receive_own_messages": receive_own_messages,
718
                    "fd": fd,
719
                    "local_loopback": local_loopback,
720
                }
721
            )
722
        except OSError as error:
×
723
            log.error("Could not access SocketCAN device %s (%s)", channel, error)
×
724
            raise
×
725
        super().__init__(
6✔
726
            channel=channel,
727
            can_filters=can_filters,
728
            **kwargs,
729
        )
730

731
    def shutdown(self) -> None:
24✔
732
        """Stops all active periodic tasks and closes the socket."""
733
        super().shutdown()
24✔
734
        for channel, bcm_socket in self._bcm_sockets.items():
24✔
735
            log.debug("Closing bcm socket for channel %s", channel)
6✔
736
            bcm_socket.close()
6✔
737
        log.debug("Closing raw can socket")
6✔
738
        self.socket.close()
6✔
739

740
    def _recv_internal(
24✔
741
        self, timeout: Optional[float]
742
    ) -> Tuple[Optional[Message], bool]:
743
        try:
6✔
744
            # get all sockets that are ready (can be a list with a single value
745
            # being self.socket or an empty list if self.socket is not ready)
746
            ready_receive_sockets, _, _ = select.select([self.socket], [], [], timeout)
6✔
747
        except OSError as error:
×
748
            # something bad happened (e.g. the interface went down)
749
            raise can.CanOperationError(
×
750
                f"Failed to receive: {error.strerror}", error.errno
751
            ) from error
752

753
        if ready_receive_sockets:  # not empty
6✔
754
            get_channel = self.channel == ""
6✔
755
            msg = capture_message(self.socket, get_channel)
6✔
756
            if msg and not msg.channel and self.channel:
6✔
757
                # Default to our own channel
758
                msg.channel = self.channel
6✔
759
            return msg, self._is_filtered
6✔
760

761
        # socket wasn't readable or timeout occurred
762
        return None, self._is_filtered
6✔
763

764
    def send(self, msg: Message, timeout: Optional[float] = None) -> None:
24✔
765
        """Transmit a message to the CAN bus.
766

767
        :param msg: A message object.
768
        :param timeout:
769
            Wait up to this many seconds for the transmit queue to be ready.
770
            If not given, the call may fail immediately.
771

772
        :raises ~can.exceptions.CanError:
773
            if the message could not be written.
774
        """
775
        log.debug("We've been asked to write a message to the bus")
6✔
776
        logger_tx = log.getChild("tx")
6✔
777
        logger_tx.debug("sending: %s", msg)
6✔
778

779
        started = time.time()
6✔
780
        # If no timeout is given, poll for availability
781
        if timeout is None:
6✔
782
            timeout = 0
6✔
783
        time_left = timeout
6✔
784
        data = build_can_frame(msg)
6✔
785

786
        while time_left >= 0:
6✔
787
            # Wait for write availability
788
            ready = select.select([], [self.socket], [], time_left)[1]
6✔
789
            if not ready:
6✔
790
                # Timeout
791
                break
×
792
            channel = str(msg.channel) if msg.channel else None
6✔
793
            sent = self._send_once(data, channel)
6✔
794
            if sent == len(data):
6✔
795
                return
6✔
796
            # Not all data were sent, try again with remaining data
797
            data = data[sent:]
×
798
            time_left = timeout - (time.time() - started)
×
799

800
        raise can.CanOperationError("Transmit buffer full")
×
801

802
    def _send_once(self, data: bytes, channel: Optional[str] = None) -> int:
24✔
803
        try:
6✔
804
            if self.channel == "" and channel:
6✔
805
                # Message must be addressed to a specific channel
806
                sent = self.socket.sendto(data, (channel,))
6✔
807
            else:
808
                sent = self.socket.send(data)
6✔
809
        except OSError as error:
×
810
            raise can.CanOperationError(
×
811
                f"Failed to transmit: {error.strerror}", error.errno
812
            ) from error
813
        return sent
6✔
814

815
    def _send_periodic_internal(
24✔
816
        self,
817
        msgs: Union[Sequence[Message], Message],
818
        period: float,
819
        duration: Optional[float] = None,
820
        autostart: bool = True,
821
        modifier_callback: Optional[Callable[[Message], None]] = None,
822
    ) -> can.broadcastmanager.CyclicSendTaskABC:
823
        """Start sending messages at a given period on this bus.
824

825
        The Linux kernel's Broadcast Manager SocketCAN API is used to schedule
826
        periodic sending of CAN messages. The wrapping 32-bit counter (see
827
        :meth:`~_get_next_task_id()`) designated to distinguish different
828
        :class:`CyclicSendTask` within BCM provides flexibility to schedule
829
        CAN messages sending with the same CAN ID, but different CAN data.
830

831
        :param msgs:
832
            The message(s) to be sent periodically.
833
        :param period:
834
            The rate in seconds at which to send the messages.
835
        :param duration:
836
            Approximate duration in seconds to continue sending messages. If
837
            no duration is provided, the task will continue indefinitely.
838
        :param autostart:
839
            If True (the default) the sending task will immediately start after creation.
840
            Otherwise, the task has to be started by calling the
841
            tasks :meth:`~can.RestartableCyclicTaskABC.start` method on it.
842

843
        :raises ValueError:
844
            If task identifier passed to :class:`CyclicSendTask` can't be used
845
            to schedule new task in Linux BCM.
846

847
        :return:
848
            A :class:`CyclicSendTask` task instance. This can be used to modify the data,
849
            pause/resume the transmission and to stop the transmission.
850

851
        .. note::
852

853
            Note the duration before the messages stop being sent may not
854
            be exactly the same as the duration specified by the user. In
855
            general the message will be sent at the given rate until at
856
            least *duration* seconds.
857
        """
858
        if modifier_callback is None:
6✔
859
            msgs = LimitedDurationCyclicSendTaskABC._check_and_convert_messages(  # pylint: disable=protected-access
6✔
860
                msgs
861
            )
862

863
            msgs_channel = str(msgs[0].channel) if msgs[0].channel else None
6✔
864
            bcm_socket = self._get_bcm_socket(msgs_channel or self.channel)
6✔
865
            task_id = self._get_next_task_id()
6✔
866
            task = CyclicSendTask(
6✔
867
                bcm_socket, task_id, msgs, period, duration, autostart=autostart
868
            )
869
            return task
6✔
870

871
        # fallback to thread based cyclic task
872
        warnings.warn(
×
873
            f"{self.__class__.__name__} falls back to a thread-based cyclic task, "
874
            "when the `modifier_callback` argument is given.",
875
            stacklevel=3,
876
        )
877
        return BusABC._send_periodic_internal(
×
878
            self,
879
            msgs=msgs,
880
            period=period,
881
            duration=duration,
882
            autostart=autostart,
883
            modifier_callback=modifier_callback,
884
        )
885

886
    def _get_next_task_id(self) -> int:
24✔
887
        with self._task_id_guard:
6✔
888
            self._task_id = (self._task_id + 1) % (2**32 - 1)
6✔
889
            return self._task_id
6✔
890

891
    def _get_bcm_socket(self, channel: str) -> socket.socket:
24✔
892
        if channel not in self._bcm_sockets:
6✔
893
            self._bcm_sockets[channel] = create_bcm_socket(self.channel)
6✔
894
        return self._bcm_sockets[channel]
6✔
895

896
    def _apply_filters(self, filters: Optional[can.typechecking.CanFilters]) -> None:
24✔
897
        try:
6✔
898
            self.socket.setsockopt(
6✔
899
                constants.SOL_CAN_RAW, constants.CAN_RAW_FILTER, pack_filters(filters)
900
            )
901
        except OSError as error:
×
902
            # fall back to "software filtering" (= not in kernel)
903
            self._is_filtered = False
×
904
            log.error(
×
905
                "Setting filters failed; falling back to software filtering (not in kernel): %s",
906
                error,
907
            )
908
        else:
909
            self._is_filtered = True
6✔
910

911
    def fileno(self) -> int:
24✔
912
        return self.socket.fileno()
6✔
913

914
    @staticmethod
24✔
915
    def _detect_available_configs() -> List[can.typechecking.AutoDetectedConfig]:
24✔
916
        return [
24✔
917
            {"interface": "socketcan", "channel": channel}
918
            for channel in find_available_interfaces()
919
        ]
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc