• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hardbyte / python-can / 16362801995

18 Jul 2025 05:17AM UTC coverage: 70.862% (+0.1%) from 70.763%
16362801995

Pull #1920

github

web-flow
Merge f9e8a3c29 into 958fc64ed
Pull Request #1920: add FD support to slcan according to CANable 2.0 implementation

6 of 45 new or added lines in 1 file covered. (13.33%)

838 existing lines in 35 files now uncovered.

7770 of 10965 relevant lines covered (70.86%)

13.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.77
/can/listener.py
1
"""
21✔
2
This module contains the implementation of `can.Listener` and some readers.
3
"""
4

5
import asyncio
21✔
6
import sys
21✔
7
import warnings
21✔
8
from abc import ABCMeta, abstractmethod
21✔
9
from collections.abc import AsyncIterator
21✔
10
from queue import Empty, SimpleQueue
21✔
11
from typing import Any, Optional
21✔
12

13
from can.bus import BusABC
21✔
14
from can.message import Message
21✔
15

16

17
class Listener(metaclass=ABCMeta):
    """Abstract base class for objects that consume CAN messages.

    Instances are callable, so a message can be handed over in either of
    two equivalent ways::

        listener = SomeListener()
        msg = my_bus.recv()

        # call the listener directly ...
        listener(msg)
        # ... or use the explicit method
        listener.on_message_received(msg)

        # Important to ensure all outputs are flushed
        listener.stop()
    """

    @abstractmethod
    def on_message_received(self, msg: Message) -> None:
        """Handle one delivered message.

        Abstract: every concrete listener has to provide this method.

        :param msg: the delivered message
        """

    def __call__(self, msg: Message) -> None:
        # Calling the listener is shorthand for on_message_received().
        self.on_message_received(msg)

    def on_error(self, exc: Exception) -> None:
        """Handle an exception raised in the receive thread.

        This base implementation does not handle anything and always
        raises :class:`NotImplementedError`.

        :param exc: The exception causing the thread to stop
        :raises NotImplementedError: always, unless overridden
        """
        raise NotImplementedError

    def stop(self) -> None:  # noqa: B027
        """
        Stop handling new messages, finish any outstanding work so that
        data is persisted, and release any open resources.

        The base implementation does nothing; concrete implementations
        override it as needed.
        """
57

58

59
class RedirectReader(Listener):  # pylint: disable=abstract-method
    """
    Listener that forwards every message it receives to another bus.
    """

    def __init__(self, bus: BusABC, *args: Any, **kwargs: Any) -> None:
        """
        :param bus: the bus on which received messages are re-sent
        :param args: extra positional arguments, passed on to the parent class
        :param kwargs: extra keyword arguments, passed on to the parent class
        """
        super().__init__(*args, **kwargs)
        # Target bus used by on_message_received().
        self.bus = bus

    def on_message_received(self, msg: Message) -> None:
        """Send *msg* on the target bus.

        :param msg: the delivered message
        """
        self.bus.send(msg)
×
70

71

72
class BufferedReader(Listener):  # pylint: disable=abstract-method
    """
    A BufferedReader is a subclass of :class:`~can.Listener` which implements a
    **message buffer**: that is, when the :class:`can.BufferedReader` instance is
    notified of a new message it pushes it into a queue of messages waiting to
    be serviced. The messages can then be fetched with
    :meth:`~can.BufferedReader.get_message`.

    Putting in messages after :meth:`~can.BufferedReader.stop` has been called will raise
    an exception, see :meth:`~can.BufferedReader.on_message_received`.

    :attr is_stopped: ``True`` if the reader has been stopped
    """

    def __init__(self) -> None:
        # SimpleQueue has no size limit, so the buffer is effectively "infinite".
        self.buffer: SimpleQueue[Message] = SimpleQueue()
        self.is_stopped: bool = False

    def on_message_received(self, msg: Message) -> None:
        """Append a message to the buffer.

        :param msg: the delivered message
        :raises RuntimeError:
            if the reader has already been stopped
        """
        if self.is_stopped:
            raise RuntimeError("reader has already been stopped")
        self.buffer.put(msg)

    def get_message(self, timeout: float = 0.5) -> Optional[Message]:
        """
        Attempts to retrieve the message that has been in the queue for the longest amount
        of time (FIFO). If no message is available, it blocks for given timeout or until a
        message is received (whichever is shorter), or else returns None. This method does
        not block after :meth:`can.BufferedReader.stop` has been called.

        :param timeout: The number of seconds to wait for a new message.
        :return: the received :class:`can.Message` or `None`, if the queue is empty.
        """
        try:
            if self.is_stopped:
                # No new messages can arrive any more: only drain what is
                # already buffered and never wait.
                return self.buffer.get_nowait()
            return self.buffer.get(block=True, timeout=timeout)
        except Empty:
            return None

    def stop(self) -> None:
        """Prohibits any more additions to this reader."""
        self.is_stopped = True
21✔
123

124

125
class AsyncBufferedReader(
    Listener, AsyncIterator[Message]
):  # pylint: disable=abstract-method
    """Message buffer backed by an :class:`asyncio.Queue`.

    See :ref:`asyncio` for how to use with :class:`can.Notifier`.

    The reader is itself an asynchronous iterator::

        async for msg in reader:
            print(msg)
    """

    def __init__(self, **kwargs: Any) -> None:
        """
        :param kwargs: only the deprecated ``loop`` key is inspected; it
            triggers a :class:`DeprecationWarning` and is passed on to the
            queue on Python versions older than 3.10 only.
        """
        self._is_stopped: bool = False

        # Keyword arguments for the queue; stays empty unless a legacy
        # event loop has to be passed through.
        queue_options = {}
        if "loop" in kwargs:
            warnings.warn(
                "The 'loop' argument is deprecated since python-can 4.0.0 "
                "and has no effect starting with Python 3.10",
                DeprecationWarning,
                stacklevel=2,
            )
            if sys.version_info < (3, 10):
                queue_options["loop"] = kwargs["loop"]

        self.buffer: asyncio.Queue[Message] = asyncio.Queue(**queue_options)

    def on_message_received(self, msg: Message) -> None:
        """Put a message into the buffer; ignored once the reader is stopped.

        Must only be called inside an event loop!

        :param msg: the delivered message
        """
        if self._is_stopped:
            return
        self.buffer.put_nowait(msg)

    async def get_message(self) -> Message:
        """
        Wait for and return the next message from the buffer::

            msg = await reader.get_message()

        :return: The CAN message.
        """
        message = await self.buffer.get()
        return message

    def __aiter__(self) -> AsyncIterator[Message]:
        # The reader is its own asynchronous iterator.
        return self

    async def __anext__(self) -> Message:
        # Each iteration step waits for the next buffered message.
        message = await self.buffer.get()
        return message

    def stop(self) -> None:
        """Mark the reader as stopped so that further messages are dropped."""
        self._is_stopped = True
21✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc