• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hardbyte / python-can / 16550730660

27 Jul 2025 11:41AM UTC coverage: 71.445% (+0.05%) from 71.398%
16550730660

Pull #1962

github

web-flow
Merge 0fec9c40d into afb1204c0
Pull Request #1962: Minor Typing Improvements

95 of 103 new or added lines in 21 files covered. (92.23%)

2 existing lines in 2 files now uncovered.

7909 of 11070 relevant lines covered (71.45%)

15.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.0
/can/bus.py
1
"""
24✔
2
Contains the ABC bus implementation and its documentation.
3
"""
4

5
import contextlib
24✔
6
import logging
24✔
7
import threading
24✔
8
from abc import ABC, abstractmethod
24✔
9
from collections.abc import Iterator, Sequence
24✔
10
from enum import Enum, auto
24✔
11
from time import time
24✔
12
from types import TracebackType
24✔
13
from typing import (
24✔
14
    Callable,
15
    Optional,
16
    Union,
17
    cast,
18
)
19

20
from typing_extensions import Self
24✔
21

22
import can.typechecking
24✔
23
from can.broadcastmanager import CyclicSendTaskABC, ThreadBasedCyclicSendTask
24✔
24
from can.message import Message
24✔
25

26
LOG = logging.getLogger(__name__)
24✔
27

28

29
class BusState(Enum):
24✔
30
    """The state in which a :class:`can.BusABC` can be."""
24✔
31

32
    ACTIVE = auto()
24✔
33
    PASSIVE = auto()
24✔
34
    ERROR = auto()
24✔
35

36

37
class CanProtocol(Enum):
24✔
38
    """The CAN protocol type supported by a :class:`can.BusABC` instance"""
24✔
39

40
    CAN_20 = auto()
24✔
41
    CAN_FD = auto()  # ISO Mode
24✔
42
    CAN_FD_NON_ISO = auto()
24✔
43
    CAN_XL = auto()
24✔
44

45

46
class BusABC(ABC):
24✔
47
    """The CAN Bus Abstract Base Class that serves as the basis
24✔
48
    for all concrete interfaces.
49

50
    This class may be used as an iterator over the received messages
51
    and as a context manager for auto-closing the bus when done using it.
52

53
    Please refer to :ref:`errors` for possible exceptions that may be
54
    thrown by certain operations on this bus.
55
    """
56

57
    #: a string describing the underlying bus and/or channel
58
    channel_info = "unknown"
24✔
59

60
    #: Log level for received messages
61
    RECV_LOGGING_LEVEL = 9
24✔
62

63
    #: Assume that no cleanup is needed until something was initialized
64
    _is_shutdown: bool = True
24✔
65
    _can_protocol: CanProtocol = CanProtocol.CAN_20
24✔
66

67
    @abstractmethod
24✔
68
    def __init__(
24✔
69
        self,
70
        channel: can.typechecking.Channel,
71
        can_filters: Optional[can.typechecking.CanFilters] = None,
72
        **kwargs: object,
73
    ):
74
        """Construct and open a CAN bus instance of the specified type.
75

76
        Subclasses should call through this method with all given parameters
77
        as it handles generic tasks like applying filters.
78

79
        :param channel:
80
            The can interface identifier. Expected type is backend dependent.
81

82
        :param can_filters:
83
            See :meth:`~can.BusABC.set_filters` for details.
84

85
        :param dict kwargs:
86
            Any backend dependent configurations are passed in this dictionary
87

88
        :raises ValueError: If parameters are out of range
89
        :raises ~can.exceptions.CanInterfaceNotImplementedError:
90
            If the driver cannot be accessed
91
        :raises ~can.exceptions.CanInitializationError:
92
            If the bus cannot be initialized
93
        """
94
        self._periodic_tasks: list[_SelfRemovingCyclicTask] = []
24✔
95
        self.set_filters(can_filters)
24✔
96
        # Flip the class default value when the constructor finishes.  That
97
        # usually means the derived class constructor was also successful,
98
        # since it calls this parent constructor last.
99
        self._is_shutdown: bool = False
24✔
100

101
    def __str__(self) -> str:
24✔
102
        return self.channel_info
×
103

104
    def recv(self, timeout: Optional[float] = None) -> Optional[Message]:
24✔
105
        """Block waiting for a message from the Bus.
106

107
        :param timeout:
108
            seconds to wait for a message or None to wait indefinitely
109

110
        :return:
111
            :obj:`None` on timeout or a :class:`~can.Message` object.
112

113
        :raises ~can.exceptions.CanOperationError:
114
            If an error occurred while reading
115
        """
116
        start = time()
24✔
117
        time_left = timeout
24✔
118

119
        while True:
21✔
120
            # try to get a message
121
            msg, already_filtered = self._recv_internal(timeout=time_left)
24✔
122

123
            # return it, if it matches
124
            if msg and (already_filtered or self._matches_filters(msg)):
24✔
125
                LOG.log(self.RECV_LOGGING_LEVEL, "Received: %s", msg)
24✔
126
                return msg
24✔
127

128
            # if not, and timeout is None, try indefinitely
129
            elif timeout is None:
24✔
130
                continue
24✔
131

132
            # try next one only if there still is time, and with
133
            # reduced timeout
134
            else:
135
                time_left = timeout - (time() - start)
24✔
136

137
                if time_left > 0:
24✔
138
                    continue
24✔
139

140
                return None
24✔
141

142
    def _recv_internal(
24✔
143
        self, timeout: Optional[float]
144
    ) -> tuple[Optional[Message], bool]:
145
        """
146
        Read a message from the bus and tell whether it was filtered.
147
        This method may be called by :meth:`~can.BusABC.recv`
148
        to read a message multiple times if the filters set by
149
        :meth:`~can.BusABC.set_filters` do not match and the call has
150
        not yet timed out.
151

152
        New implementations should always override this method instead of
153
        :meth:`~can.BusABC.recv`, to be able to take advantage of the
154
        software based filtering provided by :meth:`~can.BusABC.recv`
155
        as a fallback. This method should never be called directly.
156

157
        .. note::
158

159
            This method is not an `@abstractmethod` (for now) to allow older
160
            external implementations to continue using their existing
161
            :meth:`~can.BusABC.recv` implementation.
162

163
        .. note::
164

165
            The second return value (whether filtering was already done) may
166
            change over time for some interfaces, like for example in the
167
            Kvaser interface. Thus it cannot be simplified to a constant value.
168

169
        :param float timeout: seconds to wait for a message,
170
                              see :meth:`~can.BusABC.send`
171

172
        :return:
173
            1.  a message that was read or None on timeout
174
            2.  a bool that is True if message filtering has already
175
                been done and else False
176

177
        :raises ~can.exceptions.CanOperationError:
178
            If an error occurred while reading
179
        :raises NotImplementedError:
180
            if the bus provides its own :meth:`~can.BusABC.recv`
181
            implementation (legacy implementation)
182

183
        """
184
        raise NotImplementedError("Trying to read from a write only bus?")
185

186
    @abstractmethod
24✔
187
    def send(self, msg: Message, timeout: Optional[float] = None) -> None:
24✔
188
        """Transmit a message to the CAN bus.
189

190
        Override this method to enable the transmit path.
191

192
        :param Message msg: A message object.
193

194
        :param timeout:
195
            If > 0, wait up to this many seconds for message to be ACK'ed or
196
            for transmit queue to be ready depending on driver implementation.
197
            If timeout is exceeded, an exception will be raised.
198
            Might not be supported by all interfaces.
199
            None blocks indefinitely.
200

201
        :raises ~can.exceptions.CanOperationError:
202
            If an error occurred while sending
203
        """
204
        raise NotImplementedError("Trying to write to a readonly bus?")
205

206
    def send_periodic(
24✔
207
        self,
208
        msgs: Union[Message, Sequence[Message]],
209
        period: float,
210
        duration: Optional[float] = None,
211
        store_task: bool = True,
212
        autostart: bool = True,
213
        modifier_callback: Optional[Callable[[Message], None]] = None,
214
    ) -> can.broadcastmanager.CyclicSendTaskABC:
215
        """Start sending messages at a given period on this bus.
216

217
        The task will be active until one of the following conditions is met:
218

219
        - the (optional) duration expires
220
        - the Bus instance goes out of scope
221
        - the Bus instance is shutdown
222
        - :meth:`stop_all_periodic_tasks` is called
223
        - the task's :meth:`~can.broadcastmanager.CyclicTask.stop` method is called.
224

225
        :param msgs:
226
            Message(s) to transmit
227
        :param period:
228
            Period in seconds between each message
229
        :param duration:
230
            Approximate duration in seconds to continue sending messages. If
231
            no duration is provided, the task will continue indefinitely.
232
        :param store_task:
233
            If True (the default) the task will be attached to this Bus instance.
234
            Disable to instead manage tasks manually.
235
        :param autostart:
236
            If True (the default) the sending task will immediately start after creation.
237
            Otherwise, the task has to be started by calling the
238
            task's :meth:`~can.RestartableCyclicTaskABC.start` method on it.
239
        :param modifier_callback:
240
            Function which should be used to modify each message's data before
241
            sending. The callback modifies the :attr:`~can.Message.data` of the
242
            message and returns ``None``.
243
        :return:
244
            A started task instance. Note the task can be stopped (and depending on
245
            the backend modified) by calling the task's
246
            :meth:`~can.broadcastmanager.CyclicTask.stop` method.
247

248
        .. note::
249

250
            Note the duration before the messages stop being sent may not
251
            be exactly the same as the duration specified by the user. In
252
            general the message will be sent at the given rate until at
253
            least **duration** seconds.
254

255
        .. note::
256

257
            For extremely long-running Bus instances with many short-lived
258
            tasks the default api with ``store_task==True`` may not be
259
            appropriate as the stopped tasks are still taking up memory as they
260
            are associated with the Bus instance.
261
        """
262
        if isinstance(msgs, Message):
24✔
263
            msgs = [msgs]
24✔
264
        elif isinstance(msgs, Sequence):
6✔
265
            # A Sequence does not necessarily provide __bool__, so we need to use len()
266
            if len(msgs) == 0:
6✔
267
                raise ValueError("Must be a sequence at least of length 1")
6✔
268
        else:
269
            raise ValueError("Must be either a message or a sequence of messages")
6✔
270

271
        # Create a backend specific task; will be patched to a _SelfRemovingCyclicTask later
272
        task = cast(
24✔
273
            "_SelfRemovingCyclicTask",
274
            self._send_periodic_internal(
275
                msgs, period, duration, autostart, modifier_callback
276
            ),
277
        )
278
        # we wrap the task's stop method to also remove it from the Bus's list of tasks
279
        periodic_tasks = self._periodic_tasks
24✔
280
        original_stop_method = task.stop
24✔
281

282
        def wrapped_stop_method(remove_task: bool = True) -> None:
24✔
283
            nonlocal task, periodic_tasks, original_stop_method
284
            if remove_task:
24✔
285
                try:
24✔
286
                    periodic_tasks.remove(task)
24✔
287
                except ValueError:
24✔
288
                    pass  # allow the task to be already removed
24✔
289
            original_stop_method()
24✔
290

291
        task.stop = wrapped_stop_method  # type: ignore[method-assign]
24✔
292

293
        if store_task:
24✔
294
            self._periodic_tasks.append(task)
24✔
295

296
        return task
24✔
297

298
    def _send_periodic_internal(
24✔
299
        self,
300
        msgs: Union[Sequence[Message], Message],
301
        period: float,
302
        duration: Optional[float] = None,
303
        autostart: bool = True,
304
        modifier_callback: Optional[Callable[[Message], None]] = None,
305
    ) -> can.broadcastmanager.CyclicSendTaskABC:
306
        """Default implementation of periodic message sending using threading.
307

308
        Override this method to enable a more efficient backend specific approach.
309

310
        :param msgs:
311
            Messages to transmit
312
        :param period:
313
            Period in seconds between each message
314
        :param duration:
315
            Approximate duration in seconds to continue sending messages. If
316
            no duration is provided, the task will continue indefinitely.
317
        :param autostart:
318
            If True (the default) the sending task will immediately start after creation.
319
            Otherwise, the task has to be started by calling the
320
            task's :meth:`~can.RestartableCyclicTaskABC.start` method on it.
321
        :return:
322
            A started task instance. Note the task can be stopped (and
323
            depending on the backend modified) by calling the
324
            :meth:`~can.broadcastmanager.CyclicTask.stop` method.
325
        """
326
        if not hasattr(self, "_lock_send_periodic"):
24✔
327
            # Create a send lock for this bus, but not for buses which override this method
328
            self._lock_send_periodic = (  # pylint: disable=attribute-defined-outside-init
24✔
329
                threading.Lock()
330
            )
331
        task = ThreadBasedCyclicSendTask(
24✔
332
            bus=self,
333
            lock=self._lock_send_periodic,
334
            messages=msgs,
335
            period=period,
336
            duration=duration,
337
            autostart=autostart,
338
            modifier_callback=modifier_callback,
339
        )
340
        return task
24✔
341

342
    def stop_all_periodic_tasks(self, remove_tasks: bool = True) -> None:
24✔
343
        """Stop sending any messages that were started using :meth:`send_periodic`.
344

345
        .. note::
346
            The result is undefined if a single task throws an exception while being stopped.
347

348
        :param remove_tasks:
349
            Stop tracking the stopped tasks.
350
        """
351
        if not hasattr(self, "_periodic_tasks"):
24✔
352
            # avoid AttributeError for partially initialized BusABC instance
353
            return
×
354

355
        for task in self._periodic_tasks:
24✔
356
            # we cannot let `task.stop()` modify `self._periodic_tasks` while we are
357
            # iterating over it (#634)
358
            task.stop(remove_task=False)
24✔
359

360
        if remove_tasks:
24✔
361
            self._periodic_tasks.clear()
24✔
362

363
    def __iter__(self) -> Iterator[Message]:
24✔
364
        """Allow iteration on messages as they are received.
365

366
        .. code-block:: python
367

368
            for msg in bus:
369
                print(msg)
370

371

372
        :yields:
373
            :class:`Message` msg objects.
374
        """
375
        while True:
376
            msg = self.recv(timeout=1.0)
×
377
            if msg is not None:
×
378
                yield msg
×
379

380
    @property
24✔
381
    def filters(self) -> Optional[can.typechecking.CanFilters]:
24✔
382
        """
383
        Modify the filters of this bus. See :meth:`~can.BusABC.set_filters`
384
        for details.
385
        """
386
        return self._filters
×
387

388
    @filters.setter
24✔
389
    def filters(self, filters: Optional[can.typechecking.CanFilters]) -> None:
24✔
390
        self.set_filters(filters)
×
391

392
    def set_filters(
24✔
393
        self, filters: Optional[can.typechecking.CanFilters] = None
394
    ) -> None:
395
        """Apply filtering to all messages received by this Bus.
396

397
        All messages that match at least one filter are returned.
398
        If `filters` is `None` or a zero length sequence, all
399
        messages are matched.
400

401
        Calling without passing any filters will reset the applied
402
        filters to ``None``.
403

404
        :param filters:
405
            An iterable of dictionaries each containing a "can_id",
406
            a "can_mask", and an optional "extended" key::
407

408
                [{"can_id": 0x11, "can_mask": 0x21, "extended": False}]
409

410
            A filter matches, when
411
            ``<received_can_id> & can_mask == can_id & can_mask``.
412
            If ``extended`` is set as well, it only matches messages where
413
            ``<received_is_extended> == extended``. Else it matches every
414
            message based only on the arbitration ID and mask.
415
        """
416
        self._filters = filters or None
24✔
417
        with contextlib.suppress(NotImplementedError):
24✔
418
            self._apply_filters(self._filters)
24✔
419

420
    def _apply_filters(self, filters: Optional[can.typechecking.CanFilters]) -> None:
24✔
421
        """
422
        Hook for applying the filters to the underlying kernel or
423
        hardware if supported/implemented by the interface.
424

425
        :param filters:
426
            See :meth:`~can.BusABC.set_filters` for details.
427
        """
428
        raise NotImplementedError
24✔
429

430
    def _matches_filters(self, msg: Message) -> bool:
24✔
431
        """Checks whether the given message matches at least one of the
432
        current filters. See :meth:`~can.BusABC.set_filters` for details
433
        on how the filters work.
434

435
        This method should not be overridden.
436

437
        :param msg:
438
            the message to check if matching
439
        :return: whether the given message matches at least one filter
440
        """
441

442
        # if no filters are set, all messages are matched
443
        if self._filters is None:
24✔
444
            return True
24✔
445

446
        for _filter in self._filters:
24✔
447
            # check if this filter even applies to the message
448
            if "extended" in _filter:
24✔
449
                if _filter["extended"] != msg.is_extended_id:
24✔
UNCOV
450
                    continue
×
451

452
            # then check for the mask and id
453
            can_id = _filter["can_id"]
24✔
454
            can_mask = _filter["can_mask"]
24✔
455

456
            # basically, we compute
457
            # `msg.arbitration_id & can_mask == can_id & can_mask`
458
            # by using the shorter, but equivalent form below:
459
            if (can_id ^ msg.arbitration_id) & can_mask == 0:
24✔
460
                return True
24✔
461

462
        # nothing matched
463
        return False
24✔
464

465
    def flush_tx_buffer(self) -> None:
24✔
466
        """Discard every message that may be queued in the output buffer(s)."""
467
        raise NotImplementedError
24✔
468

469
    def shutdown(self) -> None:
24✔
470
        """
471
        Called to carry out any interface specific cleanup required in shutting down a bus.
472

473
        This method can be safely called multiple times.
474
        """
475
        if self._is_shutdown:
24✔
476
            LOG.debug("%s is already shut down", self.__class__)
24✔
477
            return
24✔
478

479
        self._is_shutdown = True
24✔
480
        self.stop_all_periodic_tasks()
24✔
481

482
    def __enter__(self) -> Self:
24✔
483
        return self
24✔
484

485
    def __exit__(
24✔
486
        self,
487
        exc_type: Optional[type[BaseException]],
488
        exc_value: Optional[BaseException],
489
        traceback: Optional[TracebackType],
490
    ) -> None:
491
        self.shutdown()
24✔
492

493
    def __del__(self) -> None:
24✔
494
        if not self._is_shutdown:
24✔
495
            LOG.warning("%s was not properly shut down", self.__class__.__name__)
24✔
496
            # We do some best-effort cleanup if the user
497
            # forgot to properly close the bus instance
498
            with contextlib.suppress(AttributeError):
24✔
499
                self.shutdown()
24✔
500

501
    @property
24✔
502
    def state(self) -> BusState:
24✔
503
        """
504
        Return the current state of the hardware
505
        """
506
        return BusState.ACTIVE
×
507

508
    @state.setter
24✔
509
    def state(self, new_state: BusState) -> None:
24✔
510
        """
511
        Set the new state of the hardware
512
        """
513
        raise NotImplementedError("Property is not implemented.")
514

515
    @property
24✔
516
    def protocol(self) -> CanProtocol:
24✔
517
        """Return the CAN protocol used by this bus instance.
518

519
        This value is set at initialization time and does not change
520
        during the lifetime of a bus instance.
521
        """
522
        return self._can_protocol
24✔
523

524
    @staticmethod
24✔
525
    def _detect_available_configs() -> Sequence[can.typechecking.AutoDetectedConfig]:
24✔
526
        """Detect all configurations/channels that this interface could
527
        currently connect with.
528

529
        This might be quite time consuming.
530

531
        May not be implemented by every interface on every platform.
532

533
        :return: an iterable of dicts, each being a configuration suitable
534
                 for usage in the interface's bus constructor.
535
        """
536
        raise NotImplementedError()
24✔
537

538
    def fileno(self) -> int:
24✔
539
        raise NotImplementedError("fileno is not implemented using current CAN bus")
24✔
540

541

542
class _SelfRemovingCyclicTask(CyclicSendTaskABC, ABC):
24✔
543
    """Removes itself from a bus.
24✔
544

545
    Only needed for typing :attr:`Bus._periodic_tasks`. Do not instantiate.
546
    """
547

548
    def stop(self, remove_task: bool = True) -> None:
24✔
549
        raise NotImplementedError()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc