• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hardbyte / python-can / 16362801995

18 Jul 2025 05:17AM UTC coverage: 70.862% (+0.1%) from 70.763%
16362801995

Pull #1920

github

web-flow
Merge f9e8a3c29 into 958fc64ed
Pull Request #1920: add FD support to slcan according to CANable 2.0 implementation

6 of 45 new or added lines in 1 file covered. (13.33%)

838 existing lines in 35 files now uncovered.

7770 of 10965 relevant lines covered (70.86%)

13.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.08
/can/bus.py
1
"""
21✔
2
Contains the ABC bus implementation and its documentation.
3
"""
4

5
import contextlib
21✔
6
import logging
21✔
7
import threading
21✔
8
from abc import ABC, ABCMeta, abstractmethod
21✔
9
from collections.abc import Iterator, Sequence
21✔
10
from enum import Enum, auto
21✔
11
from time import time
21✔
12
from types import TracebackType
21✔
13
from typing import (
21✔
14
    Any,
15
    Callable,
16
    Optional,
17
    Union,
18
    cast,
19
)
20

21
from typing_extensions import Self
21✔
22

23
import can
21✔
24
import can.typechecking
21✔
25
from can.broadcastmanager import CyclicSendTaskABC, ThreadBasedCyclicSendTask
21✔
26
from can.message import Message
21✔
27

28
LOG = logging.getLogger(__name__)
21✔
29

30

31
class BusState(Enum):
    """Enumerates the states a :class:`can.BusABC` can be in."""

    # Member values are generated by ``auto()`` in declaration order.
    ACTIVE = auto()
    PASSIVE = auto()
    ERROR = auto()
37

38

39
class CanProtocol(Enum):
    """Enumerates the CAN protocol types a :class:`can.BusABC` instance can support."""

    # Member values are generated by ``auto()`` in declaration order.
    CAN_20 = auto()
    CAN_FD = auto()  # ISO Mode
    CAN_FD_NON_ISO = auto()
    CAN_XL = auto()
46

47

48
class BusABC(metaclass=ABCMeta):
21✔
49
    """The CAN Bus Abstract Base Class that serves as the basis
21✔
50
    for all concrete interfaces.
51

52
    This class may be used as an iterator over the received messages
53
    and as a context manager for auto-closing the bus when done using it.
54

55
    Please refer to :ref:`errors` for possible exceptions that may be
56
    thrown by certain operations on this bus.
57
    """
58

59
    #: Human-readable description of the underlying bus and/or channel
    channel_info = "unknown"

    #: Numeric :mod:`logging` level used when logging received messages
    RECV_LOGGING_LEVEL = 9

    #: Class-level default: no cleanup is needed until ``__init__`` has run
    _is_shutdown: bool = True
    #: Default value returned by the :attr:`protocol` property
    _can_protocol: CanProtocol = CanProtocol.CAN_20
68

69
    @abstractmethod
    def __init__(
        self,
        channel: Any,
        can_filters: Optional[can.typechecking.CanFilters] = None,
        **kwargs: object,
    ):
        """Perform the interface-independent part of opening a CAN bus.

        Concrete interfaces should forward all of their parameters to this
        constructor; it prepares the periodic-task bookkeeping and applies
        the receive filters.

        :param channel:
            The can interface identifier. Expected type is backend dependent.

        :param can_filters:
            Forwarded to :meth:`~can.BusABC.set_filters`; see there for details.

        :param dict kwargs:
            Any backend dependent configurations are passed in this dictionary

        :raises ValueError: If parameters are out of range
        :raises ~can.exceptions.CanInterfaceNotImplementedError:
            If the driver cannot be accessed
        :raises ~can.exceptions.CanInitializationError:
            If the bus cannot be initialized
        """
        # Tasks created through send_periodic(store_task=True) are kept here.
        self._periodic_tasks: list[_SelfRemovingCyclicTask] = []
        self.set_filters(can_filters)
        # Override the class-level default (True) only as the very last step.
        # Derived constructors call this one last, so reaching this line
        # usually means their own setup succeeded as well.
        self._is_shutdown: bool = False
102

103
    def __str__(self) -> str:
21✔
UNCOV
104
        return self.channel_info
×
105

106
    def recv(self, timeout: Optional[float] = None) -> Optional[Message]:
        """Wait for the next message that passes the configured filters.

        :param timeout:
            seconds to wait for a message or None to wait indefinitely

        :return:
            :obj:`None` on timeout or a :class:`~can.Message` object.

        :raises ~can.exceptions.CanOperationError:
            If an error occurred while reading
        """
        started_at = time()
        remaining = timeout

        while True:
            # Ask the backend for the next message (it may return nothing).
            candidate, prefiltered = self._recv_internal(timeout=remaining)

            # Software filtering is only needed if the backend did not filter.
            accepted = candidate and (
                prefiltered or self._matches_filters(candidate)
            )
            if accepted:
                LOG.log(self.RECV_LOGGING_LEVEL, "Received: %s", candidate)
                return candidate

            # Without a timeout we simply keep trying.
            if timeout is None:
                continue

            # Otherwise retry with whatever part of the budget is left.
            remaining = timeout - (time() - started_at)
            if not remaining > 0:
                return None
143

144
    def _recv_internal(
        self, timeout: Optional[float]
    ) -> tuple[Optional[Message], bool]:
        """
        Read one message from the bus and report whether it was already filtered.

        :meth:`~can.BusABC.recv` may call this method several times within a
        single call, namely whenever a received message does not match the
        filters set by :meth:`~can.BusABC.set_filters` and the call has not
        yet timed out.

        New implementations should always override this method instead of
        :meth:`~can.BusABC.recv`, so that they can rely on the software
        based filtering of :meth:`~can.BusABC.recv` as a fallback.
        This method should never be called directly.

        .. note::

            This method is not an `@abstractmethod` (for now) to allow older
            external implementations to continue using their existing
            :meth:`~can.BusABC.recv` implementation.

        .. note::

            The second return value (whether filtering was already done) may
            change over time for some interfaces, like for example in the
            Kvaser interface. Thus it cannot be simplified to a constant value.

        :param float timeout: seconds to wait for a message,
                              see :meth:`~can.BusABC.send`

        :return:
            1.  a message that was read or None on timeout
            2.  a bool that is True if message filtering has already
                been done and else False

        :raises ~can.exceptions.CanOperationError:
            If an error occurred while reading
        :raises NotImplementedError:
            if the bus provides its own :meth:`~can.BusABC.recv`
            implementation (legacy implementation)

        """
        # The base class has no receive path of its own.
        raise NotImplementedError("Trying to read from a write only bus?")
187

188
    @abstractmethod
    def send(self, msg: Message, timeout: Optional[float] = None) -> None:
        """Transmit a message to the CAN bus.

        Concrete interfaces override this method to provide the transmit path.

        :param Message msg: A message object.

        :param timeout:
            If > 0, wait up to this many seconds for message to be ACK'ed or
            for transmit queue to be ready depending on driver implementation.
            If timeout is exceeded, an exception will be raised.
            Might not be supported by all interfaces.
            None blocks indefinitely.

        :raises ~can.exceptions.CanOperationError:
            If an error occurred while sending
        """
        # The base class has no transmit path of its own.
        raise NotImplementedError("Trying to write to a readonly bus?")
207

208
    def send_periodic(
        self,
        msgs: Union[Message, Sequence[Message]],
        period: float,
        duration: Optional[float] = None,
        store_task: bool = True,
        autostart: bool = True,
        modifier_callback: Optional[Callable[[Message], None]] = None,
    ) -> can.broadcastmanager.CyclicSendTaskABC:
        """Start sending messages at a given period on this bus.

        The task will be active until one of the following conditions are met:

        - the (optional) duration expires
        - the Bus instance goes out of scope
        - the Bus instance is shutdown
        - :meth:`stop_all_periodic_tasks` is called
        - the task's :meth:`~can.broadcastmanager.CyclicTask.stop` method is called.

        :param msgs:
            Message(s) to transmit
        :param period:
            Period in seconds between each message
        :param duration:
            Approximate duration in seconds to continue sending messages. If
            no duration is provided, the task will continue indefinitely.
        :param store_task:
            If True (the default) the task will be attached to this Bus instance.
            Disable to instead manage tasks manually.
        :param autostart:
            If True (the default) the sending task will immediately start after creation.
            Otherwise, the task has to be started by calling the
            tasks :meth:`~can.RestartableCyclicTaskABC.start` method on it.
        :param modifier_callback:
            Function which should be used to modify each message's data before
            sending. The callback modifies the :attr:`~can.Message.data` of the
            message and returns ``None``.
        :return:
            A started task instance. Note the task can be stopped (and depending on
            the backend modified) by calling the task's
            :meth:`~can.broadcastmanager.CyclicTask.stop` method.
        :raises ValueError:
            If ``msgs`` is neither a message nor a sequence of messages, or
            if it is an empty sequence.

        .. note::

            Note the duration before the messages stop being sent may not
            be exactly the same as the duration specified by the user. In
            general the message will be sent at the given rate until at
            least **duration** seconds.

        .. note::

            For extremely long-running Bus instances with many short-lived
            tasks the default api with ``store_task==True`` may not be
            appropriate as the stopped tasks are still taking up memory as they
            are associated with the Bus instance.
        """
        if isinstance(msgs, Message):
            msgs = [msgs]
        elif not isinstance(msgs, Sequence):
            raise ValueError("Must be either a message or a sequence of messages")
        elif len(msgs) == 0:
            # A Sequence need not define __bool__, hence the explicit len().
            raise ValueError("Must be a sequence at least of length 1")

        # The backend creates the task; patching ``stop`` below is what makes
        # it behave like a _SelfRemovingCyclicTask.
        backend_task = self._send_periodic_internal(
            msgs, period, duration, autostart, modifier_callback
        )
        task = cast("_SelfRemovingCyclicTask", backend_task)

        tracked_tasks = self._periodic_tasks
        stop_backend_task = task.stop

        def wrapped_stop_method(remove_task: bool = True) -> None:
            # Stop the task and, unless told otherwise, stop tracking it too.
            if remove_task:
                # The task may already have been removed (or never stored).
                with contextlib.suppress(ValueError):
                    tracked_tasks.remove(task)
            stop_backend_task()

        task.stop = wrapped_stop_method  # type: ignore[method-assign]

        if store_task:
            self._periodic_tasks.append(task)

        return task
299

300
    def _send_periodic_internal(
        self,
        msgs: Union[Sequence[Message], Message],
        period: float,
        duration: Optional[float] = None,
        autostart: bool = True,
        modifier_callback: Optional[Callable[[Message], None]] = None,
    ) -> can.broadcastmanager.CyclicSendTaskABC:
        """Create a thread based periodic send task (the default implementation).

        Override this method to enable a more efficient backend specific approach.

        :param msgs:
            Messages to transmit
        :param period:
            Period in seconds between each message
        :param duration:
            Approximate duration in seconds to continue sending messages. If
            no duration is provided, the task will continue indefinitely.
        :param autostart:
            If True (the default) the sending task will immediately start after creation.
            Otherwise, the task has to be started by calling the
            tasks :meth:`~can.RestartableCyclicTaskABC.start` method on it.
        :param modifier_callback:
            Optional callable handed through unchanged to the created task.
        :return:
            A started task instance. Note the task can be stopped (and
            depending on the backend modified) by calling the
            :meth:`~can.broadcastmanager.CyclicTask.stop` method.
        """
        try:
            send_lock = self._lock_send_periodic
        except AttributeError:
            # Created lazily on first use, so buses which override this
            # method never get a send lock.
            send_lock = threading.Lock()
            self._lock_send_periodic = (  # pylint: disable=attribute-defined-outside-init
                send_lock
            )

        return ThreadBasedCyclicSendTask(
            bus=self,
            lock=send_lock,
            messages=msgs,
            period=period,
            duration=duration,
            autostart=autostart,
            modifier_callback=modifier_callback,
        )
343

344
    def stop_all_periodic_tasks(self, remove_tasks: bool = True) -> None:
        """Stop sending any messages that were started using :meth:`send_periodic`.

        .. note::
            The result is undefined if a single task throws an exception while being stopped.

        :param remove_tasks:
            Stop tracking the stopped tasks.
        """
        try:
            tracked_tasks = self._periodic_tasks
        except AttributeError:
            # A partially initialized BusABC instance has no task list yet.
            return

        # ``remove_task=False`` keeps ``task.stop()`` from modifying the list
        # while it is being iterated (#634).
        for running_task in tracked_tasks:
            running_task.stop(remove_task=False)

        if remove_tasks:
            tracked_tasks.clear()
364

365
    def __iter__(self) -> Iterator[Message]:
        """Iterate endlessly over the messages received on this bus.

        .. code-block:: python

            for msg in bus:
                print(msg)


        :yields:
            :class:`Message` msg objects.
        """
        while True:
            # Poll with a one second timeout; timeouts are skipped silently.
            received = self.recv(timeout=1.0)
            if received is None:
                continue
            yield received
381

382
    @property
    def filters(self) -> Optional[can.typechecking.CanFilters]:
        """
        The filters currently stored on this bus. Assigning to this property
        calls :meth:`~can.BusABC.set_filters`; see there for details.
        """
        current_filters = self._filters
        return current_filters

    @filters.setter
    def filters(self, filters: Optional[can.typechecking.CanFilters]) -> None:
        # Route assignments through set_filters() so both paths behave alike.
        self.set_filters(filters)
393

394
    def set_filters(
        self, filters: Optional[can.typechecking.CanFilters] = None
    ) -> None:
        """Apply filtering to all messages received by this Bus.

        All messages that match at least one filter are returned.
        If `filters` is `None` or a zero length sequence, all
        messages are matched.

        Calling without passing any filters will reset the applied
        filters to ``None``.

        :param filters:
            A iterable of dictionaries each containing a "can_id",
            a "can_mask", and an optional "extended" key::

                [{"can_id": 0x11, "can_mask": 0x21, "extended": False}]

            A filter matches, when
            ``<received_can_id> & can_mask == can_id & can_mask``.
            If ``extended`` is set as well, it only matches messages where
            ``<received_is_extended> == extended``. Else it matches every
            messages based only on the arbitration ID and mask.
        """
        # Any falsy value (``None``, empty sequence) is stored as ``None``.
        self._filters = filters if filters else None
        try:
            self._apply_filters(self._filters)
        except NotImplementedError:
            # The interface offers no filter hook; nothing more to do here.
            pass
421

422
    def _apply_filters(self, filters: Optional[can.typechecking.CanFilters]) -> None:
        """
        Hook that interfaces may override to apply the filters in the
        underlying kernel or hardware, if supported.

        This base implementation always raises :class:`NotImplementedError`,
        which :meth:`~can.BusABC.set_filters` suppresses.

        :param filters:
            See :meth:`~can.BusABC.set_filters` for details.
        """
        raise NotImplementedError
431

432
    def _matches_filters(self, msg: Message) -> bool:
        """Tell whether the given message passes at least one of the
        current filters. See :meth:`~can.BusABC.set_filters` for details
        on how the filters work.

        This method should not be overridden.

        :param msg:
            the message to check if matching
        :return: whether the given message matches at least one filter
        """
        active_filters = self._filters

        # Without any filters every message is accepted.
        if active_filters is None:
            return True

        for candidate in active_filters:
            # A filter carrying "extended" only applies to messages of the
            # same frame format.
            if "extended" in candidate:
                typed_candidate = cast("can.typechecking.CanFilterExtended", candidate)
                if typed_candidate["extended"] != msg.is_extended_id:
                    continue

            wanted_id = candidate["can_id"]
            mask = candidate["can_mask"]

            # Shorter, but equivalent form of
            # `msg.arbitration_id & mask == wanted_id & mask`.
            differing_bits = (wanted_id ^ msg.arbitration_id) & mask
            if differing_bits == 0:
                return True

        # No filter accepted the message.
        return False
467

468
    def flush_tx_buffer(self) -> None:
        """Discard every message that may be queued in the output buffer(s).

        :raises NotImplementedError:
            Always, in this base implementation.
        """
        raise NotImplementedError
471

472
    def shutdown(self) -> None:
        """
        Carry out the cleanup required when shutting down a bus.

        This base implementation marks the bus as shut down and stops all
        periodic tasks. Calling it again afterwards only logs a debug message,
        so it can be safely called multiple times.
        """
        if not self._is_shutdown:
            # Flip the flag first, then stop whatever is still running.
            self._is_shutdown = True
            self.stop_all_periodic_tasks()
            return

        LOG.debug("%s is already shut down", self.__class__)
484

485
    def __enter__(self) -> Self:
21✔
486
        return self
21✔
487

488
    def __exit__(
21✔
489
        self,
490
        exc_type: Optional[type[BaseException]],
491
        exc_value: Optional[BaseException],
492
        traceback: Optional[TracebackType],
493
    ) -> None:
494
        self.shutdown()
21✔
495

496
    def __del__(self) -> None:
21✔
497
        if not self._is_shutdown:
21✔
498
            LOG.warning("%s was not properly shut down", self.__class__.__name__)
21✔
499
            # We do some best-effort cleanup if the user
500
            # forgot to properly close the bus instance
501
            with contextlib.suppress(AttributeError):
21✔
502
                self.shutdown()
21✔
503

504
    @property
    def state(self) -> BusState:
        """
        The current state of the hardware.

        This base implementation always reports :attr:`BusState.ACTIVE`, and
        assigning a new state raises :class:`NotImplementedError`.
        """
        return BusState.ACTIVE

    @state.setter
    def state(self, new_state: BusState) -> None:
        # Changing the hardware state is not available in the base class.
        raise NotImplementedError("Property is not implemented.")
517

518
    @property
    def protocol(self) -> CanProtocol:
        """The CAN protocol used by this bus instance.

        This value is set at initialization time and does not change
        during the lifetime of a bus instance.
        """
        active_protocol = self._can_protocol
        return active_protocol
526

527
    @staticmethod
    def _detect_available_configs() -> list[can.typechecking.AutoDetectedConfig]:
        """Detect all configurations/channels that this interface could
        currently connect with.

        This might be quite time consuming.

        May not be implemented by every interface on every platform; this
        base implementation always raises :class:`NotImplementedError`.

        :return: an iterable of dicts, each being a configuration suitable
                 for usage in the interface's bus constructor.
        """
        raise NotImplementedError()
540

541
    def fileno(self) -> int:
        """Always raises :class:`NotImplementedError` in this base implementation."""
        raise NotImplementedError("fileno is not implemented using current CAN bus")
543

544

545
class _SelfRemovingCyclicTask(CyclicSendTaskABC, ABC):
    """Cyclic task type whose ``stop`` can also remove the task from a bus.

    Only needed for typing :meth:`Bus._periodic_tasks`. Do not instantiate.
    """

    def stop(self, remove_task: bool = True) -> None:
        # Placeholder that only fixes the signature used for typing.
        raise NotImplementedError()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc