• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hardbyte / python-can / 14351131961

09 Apr 2025 07:08AM UTC coverage: 70.939% (+0.1%) from 70.799%
14351131961

Pull #1936

github

web-flow
Merge 095c205ea into 5d6239400
Pull Request #1936: support pcapng file format

187 of 195 new or added lines in 7 files covered. (95.9%)

5 existing lines in 1 file now uncovered.

7682 of 10829 relevant lines covered (70.94%)

15.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.08
/can/interfaces/socketcan/socketcan.py
1
"""
24✔
2
The main module of the socketcan interface containing most user-facing classes and methods
3
along some internal methods.
4

5
At the end of the file the usage of the internal methods is shown.
6
"""
7

8
import ctypes
24✔
9
import ctypes.util
24✔
10
import errno
24✔
11
import logging
24✔
12
import select
24✔
13
import socket
24✔
14
import struct
24✔
15
import threading
24✔
16
import time
24✔
17
import warnings
24✔
18
from typing import Callable, Dict, List, Optional, Sequence, Tuple, Type, Union
24✔
19

20
import can
24✔
21
from can import BusABC, CanProtocol, Message
24✔
22
from can.broadcastmanager import (
24✔
23
    LimitedDurationCyclicSendTaskABC,
24
    ModifiableCyclicTaskABC,
25
    RestartableCyclicTaskABC,
26
)
27
from can.interfaces.socketcan.utils import find_available_interfaces, pack_filters
24✔
28
from can.typechecking import CanFilters
24✔
29
import can.socketcan_common as common
24✔
30

31
log = logging.getLogger(__name__)
24✔
32
log_tx = log.getChild("tx")
24✔
33
log_rx = log.getChild("rx")
24✔
34

35
try:
24✔
36
    from socket import CMSG_SPACE
24✔
37

38
    CMSG_SPACE_available = True
16✔
39
except ImportError:
8✔
40
    CMSG_SPACE_available = False
8✔
41
    log.error("socket.CMSG_SPACE not available on this platform")
8✔
42

43

44
# Constants needed for precise handling of timestamps
45
RECEIVED_TIMESTAMP_STRUCT = struct.Struct("@ll")
24✔
46
RECEIVED_ANCILLARY_BUFFER_SIZE = (
24✔
47
    CMSG_SPACE(RECEIVED_TIMESTAMP_STRUCT.size) if CMSG_SPACE_available else 0
48
)
49

50

51
# Setup BCM struct
52
def bcm_header_factory(
24✔
53
    fields: List[Tuple[str, Union[Type[ctypes.c_uint32], Type[ctypes.c_long]]]],
54
    alignment: int = 8,
55
):
56
    curr_stride = 0
24✔
57
    results: List[
24✔
58
        Tuple[
59
            str, Union[Type[ctypes.c_uint8], Type[ctypes.c_uint32], Type[ctypes.c_long]]
60
        ]
61
    ] = []
62
    pad_index = 0
24✔
63
    for field in fields:
24✔
64
        field_alignment = ctypes.alignment(field[1])
24✔
65
        field_size = ctypes.sizeof(field[1])
24✔
66

67
        # If the current stride index isn't a multiple of the alignment
68
        # requirements of this field, then we must add padding bytes until we
69
        # are aligned
70
        while curr_stride % field_alignment != 0:
24✔
71
            results.append((f"pad_{pad_index}", ctypes.c_uint8))
24✔
72
            pad_index += 1
24✔
73
            curr_stride += 1
24✔
74

75
        # Now can it fit?
76
        # Example: If this is 8 bytes and the type requires 4 bytes alignment
77
        # then we can only fit when we're starting at 0. Otherwise, we will
78
        # split across 2 strides.
79
        #
80
        # | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
81
        results.append(field)
24✔
82
        curr_stride += field_size
24✔
83

84
    # Add trailing padding to align to a multiple of the largest scalar member
85
    # in the structure
86
    while curr_stride % alignment != 0:
24✔
87
        results.append((f"pad_{pad_index}", ctypes.c_uint8))
24✔
88
        pad_index += 1
24✔
89
        curr_stride += 1
24✔
90

91
    return type("BcmMsgHead", (ctypes.Structure,), {"_fields_": results})
24✔
92

93

94
# The fields definition is taken from the C struct definitions in
95
# <linux/can/bcm.h>
96
#
97
#     struct bcm_timeval {
98
#             long tv_sec;
99
#             long tv_usec;
100
#     };
101
#
102
#     /**
103
#      * struct bcm_msg_head - head of messages to/from the broadcast manager
104
#      * @opcode:    opcode, see enum below.
105
#      * @flags:     special flags, see below.
106
#      * @count:     number of frames to send before changing interval.
107
#      * @ival1:     interval for the first @count frames.
108
#      * @ival2:     interval for the following frames.
109
#      * @can_id:    CAN ID of frames to be sent or received.
110
#      * @nframes:   number of frames appended to the message head.
111
#      * @frames:    array of CAN frames.
112
#      */
113
#     struct bcm_msg_head {
114
#             __u32 opcode;
115
#             __u32 flags;
116
#             __u32 count;
117
#             struct bcm_timeval ival1, ival2;
118
#             canid_t can_id;
119
#             __u32 nframes;
120
#             struct can_frame frames[0];
121
#     };
122
BcmMsgHead = bcm_header_factory(
24✔
123
    fields=[
124
        ("opcode", ctypes.c_uint32),
125
        ("flags", ctypes.c_uint32),
126
        ("count", ctypes.c_uint32),
127
        ("ival1_tv_sec", ctypes.c_long),
128
        ("ival1_tv_usec", ctypes.c_long),
129
        ("ival2_tv_sec", ctypes.c_long),
130
        ("ival2_tv_usec", ctypes.c_long),
131
        ("can_id", ctypes.c_uint32),
132
        ("nframes", ctypes.c_uint32),
133
    ]
134
)
135

136

137
def build_bcm_header(
24✔
138
    opcode: int,
139
    flags: int,
140
    count: int,
141
    ival1_seconds: int,
142
    ival1_usec: int,
143
    ival2_seconds: int,
144
    ival2_usec: int,
145
    can_id: int,
146
    nframes: int,
147
) -> bytes:
148
    result = BcmMsgHead(
24✔
149
        opcode=opcode,
150
        flags=flags,
151
        count=count,
152
        ival1_tv_sec=ival1_seconds,
153
        ival1_tv_usec=ival1_usec,
154
        ival2_tv_sec=ival2_seconds,
155
        ival2_tv_usec=ival2_usec,
156
        can_id=can_id,
157
        nframes=nframes,
158
    )
159
    return ctypes.string_at(ctypes.addressof(result), ctypes.sizeof(result))
24✔
160

161

162
def build_bcm_tx_delete_header(can_id: int, flags: int) -> bytes:
24✔
163
    opcode = common.CAN_BCM_TX_DELETE
24✔
164
    return build_bcm_header(opcode, flags, 0, 0, 0, 0, 0, can_id, 1)
24✔
165

166

167
def build_bcm_transmit_header(
24✔
168
    can_id: int,
169
    count: int,
170
    initial_period: float,
171
    subsequent_period: float,
172
    msg_flags: int,
173
    nframes: int = 1,
174
) -> bytes:
175
    opcode = common.CAN_BCM_TX_SETUP
24✔
176

177
    flags = msg_flags | common.SETTIMER | common.STARTTIMER
24✔
178

179
    if initial_period > 0:
24✔
180
        # Note `TX_COUNTEVT` creates the message TX_EXPIRED when count expires
181
        flags |= common.TX_COUNTEVT
24✔
182

183
    def split_time(value: float) -> Tuple[int, int]:
24✔
184
        """Given seconds as a float, return whole seconds and microseconds"""
185
        seconds = int(value)
24✔
186
        microseconds = int(1e6 * (value - seconds))
24✔
187
        return seconds, microseconds
24✔
188

189
    ival1_seconds, ival1_usec = split_time(initial_period)
24✔
190
    ival2_seconds, ival2_usec = split_time(subsequent_period)
24✔
191

192
    return build_bcm_header(
24✔
193
        opcode,
194
        flags,
195
        count,
196
        ival1_seconds,
197
        ival1_usec,
198
        ival2_seconds,
199
        ival2_usec,
200
        can_id,
201
        nframes,
202
    )
203

204

205
def build_bcm_update_header(can_id: int, msg_flags: int, nframes: int = 1) -> bytes:
24✔
206
    return build_bcm_header(
24✔
207
        common.CAN_BCM_TX_SETUP, msg_flags, 0, 0, 0, 0, 0, can_id, nframes
208
    )
209

210

211

212
def create_bcm_socket(channel: str) -> socket.socket:
24✔
213
    """create a broadcast manager socket and connect to the given interface"""
214
    s = socket.socket(common.PF_CAN, socket.SOCK_DGRAM, common.CAN_BCM)
6✔
215
    s.connect((channel,))
6✔
216
    return s
6✔
217

218

219
def send_bcm(bcm_socket: socket.socket, data: bytes) -> int:
24✔
220
    """
221
    Send raw frame to a BCM socket and handle errors.
222
    """
223
    try:
6✔
224
        return bcm_socket.send(data)
6✔
225
    except OSError as error:
×
226
        base = f"Couldn't send CAN BCM frame due to OS Error: {error.strerror}"
×
227

228
        if error.errno == errno.EINVAL:
×
229
            specific_message = " You are probably referring to a non-existing frame."
×
230
        elif error.errno == errno.ENETDOWN:
×
231
            specific_message = " The CAN interface appears to be down."
×
232
        elif error.errno == errno.EBADF:
×
233
            specific_message = " The CAN socket appears to be closed."
×
234
        else:
235
            specific_message = ""
×
236

237
        raise can.CanOperationError(base + specific_message, error.errno) from error
×
238

239

240
class CyclicSendTask(
24✔
241
    LimitedDurationCyclicSendTaskABC, ModifiableCyclicTaskABC, RestartableCyclicTaskABC
242
):
243
    """
24✔
244
    A SocketCAN cyclic send task supports:
245

246
        - setting of a task duration
247
        - modifying the data
248
        - stopping then subsequent restarting of the task
249
    """
250

251
    def __init__(
24✔
252
        self,
253
        bcm_socket: socket.socket,
254
        task_id: int,
255
        messages: Union[Sequence[Message], Message],
256
        period: float,
257
        duration: Optional[float] = None,
258
        autostart: bool = True,
259
    ) -> None:
260
        """Construct and :meth:`~start` a task.
261

262
        :param bcm_socket: An open BCM socket on the desired CAN channel.
263
        :param task_id:
264
            The identifier used to uniquely reference particular cyclic send task
265
            within Linux BCM.
266
        :param messages:
267
            The messages to be sent periodically.
268
        :param period:
269
            The rate in seconds at which to send the messages.
270
        :param duration:
271
            Approximate duration in seconds to send the messages for.
272
        """
273
        # The following are assigned by LimitedDurationCyclicSendTaskABC:
274
        #   - self.messages
275
        #   - self.period
276
        #   - self.duration
277
        super().__init__(messages, period, duration)
6✔
278

279
        self.bcm_socket = bcm_socket
6✔
280
        self.task_id = task_id
6✔
281
        if autostart:
6✔
282
            self._tx_setup(self.messages)
6✔
283

284
    def _tx_setup(
24✔
285
        self,
286
        messages: Sequence[Message],
287
        raise_if_task_exists: bool = True,
288
    ) -> None:
289
        # Create a low level packed frame to pass to the kernel
290
        body = bytearray()
6✔
291
        self.flags = common.CAN_FD_FRAME if messages[0].is_fd else 0
6✔
292

293
        if self.duration:
6✔
294
            count = int(self.duration / self.period)
×
295
            ival1 = self.period
×
296
            ival2 = 0.0
×
297
        else:
298
            count = 0
6✔
299
            ival1 = 0.0
6✔
300
            ival2 = self.period
6✔
301

302
        if raise_if_task_exists:
6✔
303
            self._check_bcm_task()
6✔
304

305
        header = build_bcm_transmit_header(
6✔
306
            self.task_id, count, ival1, ival2, self.flags, nframes=len(messages)
307
        )
308
        for message in messages:
6✔
309
            body += common.build_can_frame(message)
6✔
310
        log.debug("Sending BCM command")
6✔
311
        send_bcm(self.bcm_socket, header + body)
6✔
312

313
    def _check_bcm_task(self) -> None:
24✔
314
        # Do a TX_READ on a task ID, and check if we get EINVAL. If so,
315
        # then we are referring to a CAN message with an existing ID
316
        check_header = build_bcm_header(
6✔
317
            opcode=common.CAN_BCM_TX_READ,
318
            flags=0,
319
            count=0,
320
            ival1_seconds=0,
321
            ival1_usec=0,
322
            ival2_seconds=0,
323
            ival2_usec=0,
324
            can_id=self.task_id,
325
            nframes=0,
326
        )
327
        log.debug(
6✔
328
            "Reading properties of (cyclic) transmission task id=%d", self.task_id
329
        )
330
        try:
6✔
331
            self.bcm_socket.send(check_header)
6✔
332
        except OSError as error:
6✔
333
            if error.errno != errno.EINVAL:
6✔
334
                raise can.CanOperationError("failed to check", error.errno) from error
×
335
            else:
336
                log.debug("Invalid argument - transmission task not known to kernel")
6✔
337
        else:
338
            # No exception raised - transmission task with this ID exists in kernel.
339
            # Existence of an existing transmission task might not be a problem!
340
            raise can.CanOperationError(
×
341
                f"A periodic task for task ID {self.task_id} is already in progress "
342
                "by the SocketCAN Linux layer"
343
            )
344

345
    def stop(self) -> None:
24✔
346
        """Stop a task by sending TX_DELETE message to Linux kernel.
347

348
        This will delete the entry for the transmission of the CAN-message
349
        with the specified ``task_id`` identifier. The message length
350
        for the command TX_DELETE is {[bcm_msg_head]} (only the header).
351
        """
352
        log.debug("Stopping periodic task")
6✔
353

354
        stopframe = build_bcm_tx_delete_header(self.task_id, self.flags)
6✔
355
        send_bcm(self.bcm_socket, stopframe)
6✔
356

357
    def modify_data(self, messages: Union[Sequence[Message], Message]) -> None:
24✔
358
        """Update the contents of the periodically sent CAN messages by
359
        sending TX_SETUP message to Linux kernel.
360

361
        The number of new cyclic messages to be sent must be equal to the
362
        original number of messages originally specified for this task.
363

364
        .. note:: The messages must all have the same
365
                  :attr:`~can.Message.arbitration_id` like the first message.
366

367
        :param messages:
368
            The messages with the new :attr:`can.Message.data`.
369
        """
370
        messages = self._check_and_convert_messages(messages)
6✔
371
        self._check_modified_messages(messages)
6✔
372

373
        self.messages = messages
6✔
374

375
        body = bytearray()
6✔
376
        header = build_bcm_update_header(
6✔
377
            can_id=self.task_id, msg_flags=self.flags, nframes=len(messages)
378
        )
379
        for message in messages:
6✔
380
            body += common.build_can_frame(message)
6✔
381
        log.debug("Sending BCM command")
6✔
382
        send_bcm(self.bcm_socket, header + body)
6✔
383

384
    def start(self) -> None:
24✔
385
        """Restart a periodic task by sending TX_SETUP message to Linux kernel.
386

387
        It verifies presence of the particular BCM task through sending TX_READ
388
        message to Linux kernel prior to scheduling.
389

390
        :raises ValueError:
391
            If the task referenced by ``task_id`` is already running.
392
        """
393
        self._tx_setup(self.messages, raise_if_task_exists=False)
6✔
394

395

396
class MultiRateCyclicSendTask(CyclicSendTask):
24✔
397
    """Exposes more of the full power of the TX_SETUP opcode."""
24✔
398

399
    def __init__(
24✔
400
        self,
401
        channel: socket.socket,
402
        task_id: int,
403
        messages: Sequence[Message],
404
        count: int,
405
        initial_period: float,
406
        subsequent_period: float,
407
    ):
408
        super().__init__(channel, task_id, messages, subsequent_period)
×
409

410
        # Create a low level packed frame to pass to the kernel
411
        header = build_bcm_transmit_header(
×
412
            self.task_id,
413
            count,
414
            initial_period,
415
            subsequent_period,
416
            self.flags,
417
            nframes=len(messages),
418
        )
419

420
        body = bytearray()
×
421
        for message in messages:
×
NEW
422
            body += common.build_can_frame(message)
×
423

424
        log.info("Sending BCM TX_SETUP command")
×
425
        send_bcm(self.bcm_socket, header + body)
×
426

427

428
def create_socket() -> socket.socket:
24✔
429
    """Creates a raw CAN socket. The socket will
430
    be returned unbound to any interface.
431
    """
432
    sock = socket.socket(common.PF_CAN, socket.SOCK_RAW, common.CAN_RAW)
8✔
433

434
    log.info("Created a socket")
8✔
435

436
    return sock
8✔
437

438

439
def bind_socket(sock: socket.socket, channel: str = "can0") -> None:
24✔
440
    """
441
    Binds the given socket to the given interface.
442

443
    :param sock:
444
        The socket to be bound
445
    :param channel:
446
        The channel / interface to bind to
447
    :raises OSError:
448
        If the specified interface isn't found.
449
    """
450
    log.debug("Binding socket to channel=%s", channel)
8✔
451
    sock.bind((channel,))
8✔
452
    log.debug("Bound socket.")
6✔
453

454

455
def capture_message(
24✔
456
    sock: socket.socket, get_channel: bool = False
457
) -> Optional[Message]:
458
    """
459
    Captures a message from given socket.
460

461
    :param sock:
462
        The socket to read a message from.
463
    :param get_channel:
464
        Find out which channel the message comes from.
465

466
    :return: The received message, or None on failure.
467
    """
468
    # Fetching the Arb ID, DLC and Data
469
    try:
6✔
470
        cf, ancillary_data, msg_flags, addr = sock.recvmsg(
6✔
471
            common.CANFD_MTU, RECEIVED_ANCILLARY_BUFFER_SIZE
472
        )
473
        if get_channel:
6✔
474
            channel = addr[0] if isinstance(addr, tuple) else addr
6✔
475
        else:
476
            channel = None
6✔
477
    except OSError as error:
×
478
        raise can.CanOperationError(
×
479
            f"Error receiving: {error.strerror}", error.errno
480
        ) from error
481

482
    # Fetching the timestamp
483
    assert len(ancillary_data) == 1, "only requested a single extra field"
6✔
484
    cmsg_level, cmsg_type, cmsg_data = ancillary_data[0]
6✔
485
    assert (
6✔
486
        cmsg_level == socket.SOL_SOCKET and cmsg_type == common.SO_TIMESTAMPNS
487
    ), "received control message type that was not requested"
488
    # see https://man7.org/linux/man-pages/man3/timespec.3.html -> struct timespec for details
489
    seconds, nanoseconds = RECEIVED_TIMESTAMP_STRUCT.unpack_from(cmsg_data)
6✔
490
    if nanoseconds >= 1e9:
6✔
491
        raise can.CanOperationError(
×
492
            f"Timestamp nanoseconds field was out of range: {nanoseconds} not less than 1e9"
493
        )
494
    timestamp = seconds + nanoseconds * 1e-9
6✔
495

496
    # Section 4.7.1: MSG_DONTROUTE: set when the received frame was created on the local host.
497
    is_rx = not bool(msg_flags & socket.MSG_DONTROUTE)
6✔
498

499
    msg = common.parse_can_frame(cf)
6✔
500
    msg.timestamp = timestamp
6✔
501
    msg.channel = channel
6✔
502
    msg.is_rx = is_rx
6✔
503

504
    return msg
6✔
505

506

507
class SocketcanBus(BusABC):  # pylint: disable=abstract-method
24✔
508
    """A SocketCAN interface to CAN.
24✔
509

510
    It implements :meth:`can.BusABC._detect_available_configs` to search for
511
    available interfaces.
512
    """
513

514
    def __init__(
24✔
515
        self,
516
        channel: str = "",
517
        receive_own_messages: bool = False,
518
        local_loopback: bool = True,
519
        fd: bool = False,
520
        can_filters: Optional[CanFilters] = None,
521
        ignore_rx_error_frames=False,
522
        **kwargs,
523
    ) -> None:
524
        """Creates a new socketcan bus.
525

526
        If setting some socket options fails, an error will be printed
527
        but no exception will be thrown. This includes enabling:
528

529
            - that own messages should be received,
530
            - CAN-FD frames and
531
            - error frames.
532

533
        :param channel:
534
            The can interface name with which to create this bus.
535
            An example channel would be 'vcan0' or 'can0'.
536
            An empty string '' will receive messages from all channels.
537
            In that case any sent messages must be explicitly addressed to a
538
            channel using :attr:`can.Message.channel`.
539
        :param receive_own_messages:
540
            If transmitted messages should also be received by this bus.
541
        :param local_loopback:
542
            If local loopback should be enabled on this bus.
543
            Please note that local loopback does not mean that messages sent
544
            on a socket will be readable on the same socket, they will only
545
            be readable on other open sockets on the same machine. More info
546
            can be read on the socketcan documentation:
547
            See https://www.kernel.org/doc/html/latest/networking/can.html#socketcan-local-loopback1
548
        :param fd:
549
            If CAN-FD frames should be supported.
550
        :param can_filters:
551
            See :meth:`can.BusABC.set_filters`.
552
        :param ignore_rx_error_frames:
553
            If incoming error frames should be discarded.
554
        """
555
        self.socket = create_socket()
8✔
556
        self.channel = channel
8✔
557
        self.channel_info = f"socketcan channel '{channel}'"
8✔
558
        self._bcm_sockets: Dict[str, socket.socket] = {}
8✔
559
        self._is_filtered = False
8✔
560
        self._task_id = 0
8✔
561
        self._task_id_guard = threading.Lock()
8✔
562
        self._can_protocol = CanProtocol.CAN_FD if fd else CanProtocol.CAN_20
8✔
563

564
        # set the local_loopback parameter
565
        try:
8✔
566
            self.socket.setsockopt(
8✔
567
                common.SOL_CAN_RAW,
568
                common.CAN_RAW_LOOPBACK,
569
                1 if local_loopback else 0,
570
            )
571
        except OSError as error:
×
572
            log.error("Could not set local loopback flag(%s)", error)
×
573

574
        # set the receive_own_messages parameter
575
        try:
8✔
576
            self.socket.setsockopt(
8✔
577
                common.SOL_CAN_RAW,
578
                common.CAN_RAW_RECV_OWN_MSGS,
579
                1 if receive_own_messages else 0,
580
            )
581
        except OSError as error:
×
582
            log.error("Could not receive own messages (%s)", error)
×
583

584
        # enable CAN-FD frames if desired
585
        if fd:
8✔
586
            try:
6✔
587
                self.socket.setsockopt(
6✔
588
                    common.SOL_CAN_RAW, common.CAN_RAW_FD_FRAMES, 1
589
                )
590
            except OSError as error:
×
591
                log.error("Could not enable CAN-FD frames (%s)", error)
×
592

593
        if not ignore_rx_error_frames:
8✔
594
            # enable error frames
595
            try:
8✔
596
                self.socket.setsockopt(
8✔
597
                    common.SOL_CAN_RAW, common.CAN_RAW_ERR_FILTER, 0x1FFFFFFF
598
                )
599
            except OSError as error:
×
600
                log.error("Could not enable error frames (%s)", error)
×
601

602
        # enable nanosecond resolution timestamping
603
        # we can always do this since
604
        #  1) it is guaranteed to be at least as precise as without
605
        #  2) it is available since Linux 2.6.22, and CAN support was only added afterward
606
        #     so this is always supported by the kernel
607
        self.socket.setsockopt(socket.SOL_SOCKET, common.SO_TIMESTAMPNS, 1)
8✔
608

609
        try:
8✔
610
            bind_socket(self.socket, channel)
8✔
611
            kwargs.update(
6✔
612
                {
613
                    "receive_own_messages": receive_own_messages,
614
                    "fd": fd,
615
                    "local_loopback": local_loopback,
616
                }
617
            )
618
        except OSError as error:
2✔
619
            log.error("Could not access SocketCAN device %s (%s)", channel, error)
2✔
620
            raise
2✔
621
        super().__init__(
6✔
622
            channel=channel,
623
            can_filters=can_filters,
624
            **kwargs,
625
        )
626

627
    def shutdown(self) -> None:
24✔
628
        """Stops all active periodic tasks and closes the socket."""
629
        super().shutdown()
24✔
630
        for channel, bcm_socket in self._bcm_sockets.items():
24✔
631
            log.debug("Closing bcm socket for channel %s", channel)
6✔
632
            bcm_socket.close()
6✔
633
        log.debug("Closing raw can socket")
6✔
634
        self.socket.close()
6✔
635

636
    def _recv_internal(
24✔
637
        self, timeout: Optional[float]
638
    ) -> Tuple[Optional[Message], bool]:
639
        try:
6✔
640
            # get all sockets that are ready (can be a list with a single value
641
            # being self.socket or an empty list if self.socket is not ready)
642
            ready_receive_sockets, _, _ = select.select([self.socket], [], [], timeout)
6✔
643
        except OSError as error:
×
644
            # something bad happened (e.g. the interface went down)
645
            raise can.CanOperationError(
×
646
                f"Failed to receive: {error.strerror}", error.errno
647
            ) from error
648

649
        if ready_receive_sockets:  # not empty
6✔
650
            get_channel = self.channel == ""
6✔
651
            msg = capture_message(self.socket, get_channel)
6✔
652
            if msg and not msg.channel and self.channel:
6✔
653
                # Default to our own channel
654
                msg.channel = self.channel
6✔
655
            return msg, self._is_filtered
6✔
656

657
        # socket wasn't readable or timeout occurred
658
        return None, self._is_filtered
6✔
659

660
    def send(self, msg: Message, timeout: Optional[float] = None) -> None:
24✔
661
        """Transmit a message to the CAN bus.
662

663
        :param msg: A message object.
664
        :param timeout:
665
            Wait up to this many seconds for the transmit queue to be ready.
666
            If not given, the call may fail immediately.
667

668
        :raises ~can.exceptions.CanError:
669
            if the message could not be written.
670
        """
671
        log.debug("We've been asked to write a message to the bus")
6✔
672
        logger_tx = log.getChild("tx")
6✔
673
        logger_tx.debug("sending: %s", msg)
6✔
674

675
        started = time.time()
6✔
676
        # If no timeout is given, poll for availability
677
        if timeout is None:
6✔
678
            timeout = 0
6✔
679
        time_left = timeout
6✔
680
        data = common.build_can_frame(msg)
6✔
681

682
        while time_left >= 0:
6✔
683
            # Wait for write availability
684
            ready = select.select([], [self.socket], [], time_left)[1]
6✔
685
            if not ready:
6✔
686
                # Timeout
687
                break
×
688
            channel = str(msg.channel) if msg.channel else None
6✔
689
            sent = self._send_once(data, channel)
6✔
690
            if sent == len(data):
6✔
691
                return
6✔
692
            # Not all data were sent, try again with remaining data
693
            data = data[sent:]
×
694
            time_left = timeout - (time.time() - started)
×
695

696
        raise can.CanOperationError("Transmit buffer full")
×
697

698
    def _send_once(self, data: bytes, channel: Optional[str] = None) -> int:
24✔
699
        try:
6✔
700
            if self.channel == "" and channel:
6✔
701
                # Message must be addressed to a specific channel
702
                sent = self.socket.sendto(data, (channel,))
6✔
703
            else:
704
                sent = self.socket.send(data)
6✔
705
        except OSError as error:
×
706
            raise can.CanOperationError(
×
707
                f"Failed to transmit: {error.strerror}", error.errno
708
            ) from error
709
        return sent
6✔
710

711
    def _send_periodic_internal(
24✔
712
        self,
713
        msgs: Union[Sequence[Message], Message],
714
        period: float,
715
        duration: Optional[float] = None,
716
        autostart: bool = True,
717
        modifier_callback: Optional[Callable[[Message], None]] = None,
718
    ) -> can.broadcastmanager.CyclicSendTaskABC:
719
        """Start sending messages at a given period on this bus.
720

721
        The Linux kernel's Broadcast Manager SocketCAN API is used to schedule
722
        periodic sending of CAN messages. The wrapping 32-bit counter (see
723
        :meth:`~_get_next_task_id()`) designated to distinguish different
724
        :class:`CyclicSendTask` within BCM provides flexibility to schedule
725
        CAN messages sending with the same CAN ID, but different CAN data.
726

727
        :param msgs:
728
            The message(s) to be sent periodically.
729
        :param period:
730
            The rate in seconds at which to send the messages.
731
        :param duration:
732
            Approximate duration in seconds to continue sending messages. If
733
            no duration is provided, the task will continue indefinitely.
734
        :param autostart:
735
            If True (the default) the sending task will immediately start after creation.
736
            Otherwise, the task has to be started by calling the
737
            tasks :meth:`~can.RestartableCyclicTaskABC.start` method on it.
738

739
        :raises ValueError:
740
            If task identifier passed to :class:`CyclicSendTask` can't be used
741
            to schedule new task in Linux BCM.
742

743
        :return:
744
            A :class:`CyclicSendTask` task instance. This can be used to modify the data,
745
            pause/resume the transmission and to stop the transmission.
746

747
        .. note::
748

749
            Note the duration before the messages stop being sent may not
750
            be exactly the same as the duration specified by the user. In
751
            general the message will be sent at the given rate until at
752
            least *duration* seconds.
753
        """
754
        if modifier_callback is None:
6✔
755
            msgs = LimitedDurationCyclicSendTaskABC._check_and_convert_messages(  # pylint: disable=protected-access
6✔
756
                msgs
757
            )
758

759
            msgs_channel = str(msgs[0].channel) if msgs[0].channel else None
6✔
760
            bcm_socket = self._get_bcm_socket(msgs_channel or self.channel)
6✔
761
            task_id = self._get_next_task_id()
6✔
762
            task = CyclicSendTask(
6✔
763
                bcm_socket, task_id, msgs, period, duration, autostart=autostart
764
            )
765
            return task
6✔
766

767
        # fallback to thread based cyclic task
768
        warnings.warn(
×
769
            f"{self.__class__.__name__} falls back to a thread-based cyclic task, "
770
            "when the `modifier_callback` argument is given.",
771
            stacklevel=3,
772
        )
773
        return BusABC._send_periodic_internal(
×
774
            self,
775
            msgs=msgs,
776
            period=period,
777
            duration=duration,
778
            autostart=autostart,
779
            modifier_callback=modifier_callback,
780
        )
781

782
    def _get_next_task_id(self) -> int:
24✔
783
        with self._task_id_guard:
6✔
784
            self._task_id = (self._task_id + 1) % (2**32 - 1)
6✔
785
            return self._task_id
6✔
786

787
    def _get_bcm_socket(self, channel: str) -> socket.socket:
24✔
788
        if channel not in self._bcm_sockets:
6✔
789
            self._bcm_sockets[channel] = create_bcm_socket(self.channel)
6✔
790
        return self._bcm_sockets[channel]
6✔
791

792
    def _apply_filters(self, filters: Optional[can.typechecking.CanFilters]) -> None:
24✔
793
        try:
6✔
794
            self.socket.setsockopt(
6✔
795
                common.SOL_CAN_RAW, common.CAN_RAW_FILTER, pack_filters(filters)
796
            )
797
        except OSError as error:
×
798
            # fall back to "software filtering" (= not in kernel)
799
            self._is_filtered = False
×
800
            log.error(
×
801
                "Setting filters failed; falling back to software filtering (not in kernel): %s",
802
                error,
803
            )
804
        else:
805
            self._is_filtered = True
6✔
806

807
    def fileno(self) -> int:
24✔
808
        return self.socket.fileno()
6✔
809

810
    @staticmethod
24✔
811
    def _detect_available_configs() -> List[can.typechecking.AutoDetectedConfig]:
24✔
812
        return [
24✔
813
            {"interface": "socketcan", "channel": channel}
814
            for channel in find_available_interfaces()
815
        ]
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc