• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hardbyte / python-can / 16362801995

18 Jul 2025 05:17AM UTC coverage: 70.862% (+0.1%) from 70.763%
16362801995

Pull #1920

github

web-flow
Merge f9e8a3c29 into 958fc64ed
Pull Request #1920: add FD support to slcan according to CANable 2.0 impementation

6 of 45 new or added lines in 1 file covered. (13.33%)

838 existing lines in 35 files now uncovered.

7770 of 10965 relevant lines covered (70.86%)

13.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.85
/can/notifier.py
1
"""
21✔
2
This module contains the implementation of :class:`~can.Notifier`.
3
"""
4

5
import asyncio
21✔
6
import functools
21✔
7
import logging
21✔
8
import threading
21✔
9
import time
21✔
10
from collections.abc import Awaitable, Iterable
21✔
11
from contextlib import AbstractContextManager
21✔
12
from types import TracebackType
21✔
13
from typing import (
21✔
14
    Any,
15
    Callable,
16
    Final,
17
    NamedTuple,
18
    Optional,
19
    Union,
20
)
21

22
from can.bus import BusABC
21✔
23
from can.listener import Listener
21✔
24
from can.message import Message
21✔
25

26
logger = logging.getLogger("can.Notifier")
21✔
27

28
MessageRecipient = Union[Listener, Callable[[Message], Union[Awaitable[None], None]]]
21✔
29

30

31
class _BusNotifierPair(NamedTuple):
21✔
32
    bus: "BusABC"
21✔
33
    notifier: "Notifier"
21✔
34

35

36
class _NotifierRegistry:
21✔
37
    """A registry to manage the association between CAN buses and Notifiers.
21✔
38

39
    This class ensures that a bus is not added to multiple active Notifiers.
40
    """
41

42
    def __init__(self) -> None:
21✔
43
        """Initialize the registry with an empty list of bus-notifier pairs and a threading lock."""
44
        self.pairs: list[_BusNotifierPair] = []
21✔
45
        self.lock = threading.Lock()
21✔
46

47
    def register(self, bus: BusABC, notifier: "Notifier") -> None:
21✔
48
        """Register a bus and its associated notifier.
49

50
        Ensures that a bus is not added to multiple active :class:`~can.Notifier` instances.
51

52
        :param bus:
53
            The CAN bus to register.
54
        :param notifier:
55
            The :class:`~can.Notifier` instance associated with the bus.
56
        :raises ValueError:
57
            If the bus is already assigned to an active Notifier.
58
        """
59
        with self.lock:
21✔
60
            for pair in self.pairs:
21✔
61
                if bus is pair.bus and not pair.notifier.stopped:
21✔
62
                    raise ValueError(
21✔
63
                        "A bus can not be added to multiple active Notifier instances."
64
                    )
65
            self.pairs.append(_BusNotifierPair(bus, notifier))
21✔
66

67
    def unregister(self, bus: BusABC, notifier: "Notifier") -> None:
21✔
68
        """Unregister a bus and its associated notifier.
69

70
        Removes the bus-notifier pair from the registry.
71

72
        :param bus:
73
            The CAN bus to unregister.
74
        :param notifier:
75
            The :class:`~can.Notifier` instance associated with the bus.
76
        """
77
        with self.lock:
21✔
78
            registered_pairs_to_remove: list[_BusNotifierPair] = []
21✔
79
            for pair in self.pairs:
21✔
80
                if pair.bus is bus and pair.notifier is notifier:
21✔
81
                    registered_pairs_to_remove.append(pair)
21✔
82
            for pair in registered_pairs_to_remove:
21✔
83
                self.pairs.remove(pair)
21✔
84

85
    def find_instances(self, bus: BusABC) -> tuple["Notifier", ...]:
21✔
86
        """Find the :class:`~can.Notifier` instances associated with a given CAN bus.
87

88
        This method searches the registry for the :class:`~can.Notifier`
89
        that is linked to the specified bus. If the bus is found, the
90
        corresponding :class:`~can.Notifier` instances are returned. If the bus is not
91
        found in the registry, an empty tuple is returned.
92

93
        :param bus:
94
            The CAN bus for which to find the associated :class:`~can.Notifier` .
95
        :return:
96
            A tuple of :class:`~can.Notifier` instances associated with the given bus.
97
        """
98
        instance_list = []
21✔
99
        with self.lock:
21✔
100
            for pair in self.pairs:
21✔
101
                if bus is pair.bus:
21✔
102
                    instance_list.append(pair.notifier)
21✔
103
        return tuple(instance_list)
21✔
104

105

106
class Notifier(AbstractContextManager):
21✔
107

108
    _registry: Final = _NotifierRegistry()
21✔
109

110
    def __init__(
21✔
111
        self,
112
        bus: Union[BusABC, list[BusABC]],
113
        listeners: Iterable[MessageRecipient],
114
        timeout: float = 1.0,
115
        loop: Optional[asyncio.AbstractEventLoop] = None,
116
    ) -> None:
117
        """Manages the distribution of :class:`~can.Message` instances to listeners.
118

119
        Supports multiple buses and listeners.
120

121
        .. Note::
122

123
            Remember to call :meth:`~can.Notifier.stop` after all messages are received as
124
            many listeners carry out flush operations to persist data.
125

126

127
        :param bus:
128
            A :ref:`bus` or a list of buses to consume messages from.
129
        :param listeners:
130
            An iterable of :class:`~can.Listener` or callables that receive a :class:`~can.Message`
131
            and return nothing.
132
        :param timeout:
133
            An optional maximum number of seconds to wait for any :class:`~can.Message`.
134
        :param loop:
135
            An :mod:`asyncio` event loop to schedule the ``listeners`` in.
136
        :raises ValueError:
137
            If a passed in *bus* is already assigned to an active :class:`~can.Notifier`.
138
        """
139
        self.listeners: list[MessageRecipient] = list(listeners)
21✔
140
        self._bus_list: list[BusABC] = []
21✔
141
        self.timeout = timeout
21✔
142
        self._loop = loop
21✔
143

144
        #: Exception raised in thread
145
        self.exception: Optional[Exception] = None
21✔
146

147
        self._stopped = False
21✔
148
        self._lock = threading.Lock()
21✔
149

150
        self._readers: list[Union[int, threading.Thread]] = []
21✔
151
        _bus_list: list[BusABC] = bus if isinstance(bus, list) else [bus]
21✔
152
        for each_bus in _bus_list:
21✔
153
            self.add_bus(each_bus)
21✔
154

155
    @property
21✔
156
    def bus(self) -> Union[BusABC, tuple["BusABC", ...]]:
21✔
157
        """Return the associated bus or a tuple of buses."""
158
        if len(self._bus_list) == 1:
×
159
            return self._bus_list[0]
×
160
        return tuple(self._bus_list)
×
161

162
    def add_bus(self, bus: BusABC) -> None:
21✔
163
        """Add a bus for notification.
164

165
        :param bus:
166
            CAN bus instance.
167
        :raises ValueError:
168
            If the *bus* is already assigned to an active :class:`~can.Notifier`.
169
        """
170
        # add bus to notifier registry
171
        Notifier._registry.register(bus, self)
21✔
172

173
        # add bus to internal bus list
174
        self._bus_list.append(bus)
21✔
175

176
        file_descriptor: int = -1
21✔
177
        try:
21✔
178
            file_descriptor = bus.fileno()
21✔
179
        except NotImplementedError:
21✔
180
            # Bus doesn't support fileno, we fall back to thread based reader
181
            pass
21✔
182

183
        if self._loop is not None and file_descriptor >= 0:
21✔
184
            # Use bus file descriptor to watch for messages
UNCOV
185
            self._loop.add_reader(file_descriptor, self._on_message_available, bus)
×
UNCOV
186
            self._readers.append(file_descriptor)
×
187
        else:
188
            reader_thread = threading.Thread(
21✔
189
                target=self._rx_thread,
190
                args=(bus,),
191
                name=f'{self.__class__.__qualname__} for bus "{bus.channel_info}"',
192
            )
193
            reader_thread.daemon = True
21✔
194
            reader_thread.start()
21✔
195
            self._readers.append(reader_thread)
21✔
196

197
    def stop(self, timeout: float = 5.0) -> None:
21✔
198
        """Stop notifying Listeners when new :class:`~can.Message` objects arrive
199
        and call :meth:`~can.Listener.stop` on each Listener.
200

201
        :param timeout:
202
            Max time in seconds to wait for receive threads to finish.
203
            Should be longer than timeout given at instantiation.
204
        """
205
        self._stopped = True
21✔
206
        end_time = time.time() + timeout
21✔
207
        for reader in self._readers:
21✔
208
            if isinstance(reader, threading.Thread):
21✔
209
                now = time.time()
21✔
210
                if now < end_time:
21✔
211
                    reader.join(end_time - now)
21✔
UNCOV
212
            elif self._loop:
×
213
                # reader is a file descriptor
UNCOV
214
                self._loop.remove_reader(reader)
×
215
        for listener in self.listeners:
21✔
216
            if hasattr(listener, "stop"):
21✔
217
                listener.stop()
21✔
218

219
        # remove bus from registry
220
        for bus in self._bus_list:
21✔
221
            Notifier._registry.unregister(bus, self)
21✔
222

223
    def _rx_thread(self, bus: BusABC) -> None:
21✔
224
        # determine message handling callable early, not inside while loop
225
        if self._loop:
21✔
226
            handle_message: Callable[[Message], Any] = functools.partial(
21✔
227
                self._loop.call_soon_threadsafe,
228
                self._on_message_received,  # type: ignore[arg-type]
229
            )
230
        else:
231
            handle_message = self._on_message_received
21✔
232

233
        while not self._stopped:
21✔
234
            try:
21✔
235
                if msg := bus.recv(self.timeout):
21✔
236
                    with self._lock:
21✔
237
                        handle_message(msg)
21✔
UNCOV
238
            except Exception as exc:  # pylint: disable=broad-except
×
UNCOV
239
                self.exception = exc
×
UNCOV
240
                if self._loop is not None:
×
UNCOV
241
                    self._loop.call_soon_threadsafe(self._on_error, exc)
×
242
                    # Raise anyway
UNCOV
243
                    raise
×
UNCOV
244
                elif not self._on_error(exc):
×
245
                    # If it was not handled, raise the exception here
UNCOV
246
                    raise
×
247
                else:
248
                    # It was handled, so only log it
UNCOV
249
                    logger.debug("suppressed exception: %s", exc)
×
250

251
    def _on_message_available(self, bus: BusABC) -> None:
21✔
UNCOV
252
        if msg := bus.recv(0):
×
UNCOV
253
            self._on_message_received(msg)
×
254

255
    def _on_message_received(self, msg: Message) -> None:
21✔
256
        for callback in self.listeners:
21✔
257
            res = callback(msg)
21✔
258
            if res and self._loop and asyncio.iscoroutine(res):
21✔
259
                # Schedule coroutine
UNCOV
260
                self._loop.create_task(res)
×
261

262
    def _on_error(self, exc: Exception) -> bool:
21✔
263
        """Calls ``on_error()`` for all listeners if they implement it.
264

265
        :returns: ``True`` if at least one error handler was called.
266
        """
UNCOV
267
        was_handled = False
×
268

UNCOV
269
        for listener in self.listeners:
×
UNCOV
270
            if hasattr(listener, "on_error"):
×
UNCOV
271
                try:
×
UNCOV
272
                    listener.on_error(exc)
×
UNCOV
273
                except NotImplementedError:
×
UNCOV
274
                    pass
×
275
                else:
UNCOV
276
                    was_handled = True
×
277

UNCOV
278
        return was_handled
×
279

280
    def add_listener(self, listener: MessageRecipient) -> None:
21✔
281
        """Add new Listener to the notification list.
282
        If it is already present, it will be called two times
283
        each time a message arrives.
284

285
        :param listener: Listener to be added to the list to be notified
286
        """
287
        self.listeners.append(listener)
21✔
288

289
    def remove_listener(self, listener: MessageRecipient) -> None:
21✔
290
        """Remove a listener from the notification list. This method
291
        throws an exception if the given listener is not part of the
292
        stored listeners.
293

294
        :param listener: Listener to be removed from the list to be notified
295
        :raises ValueError: if `listener` was never added to this notifier
296
        """
297
        self.listeners.remove(listener)
21✔
298

299
    @property
21✔
300
    def stopped(self) -> bool:
21✔
301
        """Return ``True``, if Notifier was properly shut down with :meth:`~can.Notifier.stop`."""
302
        return self._stopped
21✔
303

304
    @staticmethod
21✔
305
    def find_instances(bus: BusABC) -> tuple["Notifier", ...]:
21✔
306
        """Find :class:`~can.Notifier` instances associated with a given CAN bus.
307

308
        This method searches the registry for the :class:`~can.Notifier`
309
        that is linked to the specified bus. If the bus is found, the
310
        corresponding :class:`~can.Notifier` instances are returned. If the bus is not
311
        found in the registry, an empty tuple is returned.
312

313
        :param bus:
314
            The CAN bus for which to find the associated :class:`~can.Notifier` .
315
        :return:
316
            A tuple of :class:`~can.Notifier` instances associated with the given bus.
317
        """
318
        return Notifier._registry.find_instances(bus)
21✔
319

320
    def __exit__(
21✔
321
        self,
322
        exc_type: Optional[type[BaseException]],
323
        exc_value: Optional[BaseException],
324
        traceback: Optional[TracebackType],
325
    ) -> None:
326
        if not self._stopped:
21✔
327
            self.stop()
21✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc