• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hardbyte / python-can / 16362801995

18 Jul 2025 05:17AM UTC coverage: 70.862% (+0.1%) from 70.763%
16362801995

Pull #1920

github

web-flow
Merge f9e8a3c29 into 958fc64ed
Pull Request #1920: add FD support to slcan according to CANable 2.0 implementation

6 of 45 new or added lines in 1 file covered. (13.33%)

838 existing lines in 35 files now uncovered.

7770 of 10965 relevant lines covered (70.86%)

13.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.5
/can/broadcastmanager.py
1
"""
21✔
2
Exposes several methods for transmitting cyclic messages.
3

4
The main entry point to these classes should be through
5
:meth:`can.BusABC.send_periodic`.
6
"""
7

8
import abc
21✔
9
import logging
21✔
10
import platform
21✔
11
import sys
21✔
12
import threading
21✔
13
import time
21✔
14
import warnings
21✔
15
from collections.abc import Sequence
21✔
16
from typing import (
21✔
17
    TYPE_CHECKING,
18
    Callable,
19
    Final,
20
    Optional,
21
    Union,
22
    cast,
23
)
24

25
from can import typechecking
21✔
26
from can.message import Message
21✔
27

28
if TYPE_CHECKING:
21✔
UNCOV
29
    from can.bus import BusABC
×
30

31

32
# Module-level logger for the cyclic send tasks, registered as "can.bcm".
log = logging.getLogger("can.bcm")
# Number of nanoseconds in one second; used to convert the nanosecond delay
# computed in the send loop into the float seconds passed to time.sleep().
NANOSECONDS_IN_SECOND: Final[int] = 1_000_000_000
21✔
34

35

36
class _Pywin32Event:
    """Typing stand-in for the timer object handed out by ``win32event``.

    The visible code never instantiates this class; ``_Pywin32.create_timer``
    casts the pywin32 return value to it so that ``handle`` is known to type
    checkers.
    """

    # Value passed as the first argument to the win32event timer/wait calls.
    handle: int
21✔
38

39

40
class _Pywin32:
    """Small facade around the pywin32 waitable-timer functions.

    The pywin32 modules are imported inside the constructor, so building an
    instance raises :class:`ImportError` when they are not available.
    """

    def __init__(self) -> None:
        """Import the pywin32 modules and keep references on the instance.

        :raises ImportError: If ``pywintypes`` or ``win32event`` is missing.
        """
        import pywintypes  # pylint: disable=import-outside-toplevel,import-error
        import win32event  # pylint: disable=import-outside-toplevel,import-error

        self.pywintypes = pywintypes
        self.win32event = win32event

    def create_timer(self) -> _Pywin32Event:
        """Create a waitable timer, preferring the high-resolution variant.

        Falls back to ``CreateWaitableTimer`` when the extended call or its
        constants are unavailable or the call itself fails.

        :return: The timer object returned by pywin32.
        """
        api = self.win32event
        try:
            # The attribute lookups stay inside the ``try`` on purpose: a
            # missing function or constant raises AttributeError, which
            # selects the fallback below.
            timer = api.CreateWaitableTimerEx(
                None,
                None,
                api.CREATE_WAITABLE_TIMER_HIGH_RESOLUTION,
                api.TIMER_ALL_ACCESS,
            )
        except (
            AttributeError,
            OSError,
            self.pywintypes.error,  # pylint: disable=no-member
        ):
            timer = api.CreateWaitableTimer(None, False, None)

        return cast("_Pywin32Event", timer)

    def set_timer(self, event: _Pywin32Event, period_ms: int) -> None:
        """Arm ``event`` as a periodic timer.

        :param event: Timer previously obtained from :meth:`create_timer`.
        :param period_ms: Timer period in milliseconds.
        """
        api = self.win32event
        api.SetWaitableTimer(event.handle, 0, period_ms, None, None, False)

    def stop_timer(self, event: _Pywin32Event) -> None:
        """Re-arm ``event`` with a period of 0, i.e. without repetition.

        :param event: Timer previously obtained from :meth:`create_timer`.
        """
        api = self.win32event
        api.SetWaitableTimer(event.handle, 0, 0, None, None, False)

    def wait_0(self, event: _Pywin32Event) -> None:
        """Wait on ``event`` with a timeout of 0 (does not block).

        :param event: Timer previously obtained from :meth:`create_timer`.
        """
        api = self.win32event
        api.WaitForSingleObject(event.handle, 0)

    def wait_inf(self, event: _Pywin32Event) -> None:
        """Block until ``event`` is signaled, without a timeout.

        :param event: Timer previously obtained from :meth:`create_timer`.
        """
        api = self.win32event
        api.WaitForSingleObject(event.handle, api.INFINITE)
79

80

81
# Shared pywin32 wrapper; stays None whenever the waitable-timer path is not
# available, in which case the send loop paces itself with time.sleep().
PYWIN32: Optional[_Pywin32] = None
# Only attempted on Windows with a Python older than 3.11.
# NOTE(review): presumably newer interpreters sleep precisely enough on
# Windows that the pywin32 timer is unnecessary — confirm.
if sys.platform == "win32" and sys.version_info < (3, 11):
    try:
        PYWIN32 = _Pywin32()
    except ImportError:
        # pywin32 is an optional dependency; silently keep PYWIN32 as None.
        pass
2✔
87

88

89
class CyclicTask(abc.ABC):
    """
    Abstract Base for all cyclic tasks.

    Concrete subclasses must implement :meth:`stop`.
    """

    @abc.abstractmethod
    def stop(self) -> None:
        """Cancel this periodic task.

        :raises ~can.exceptions.CanError:
            If stop is called on an already stopped task. (The thread based
            implementation in this module does not raise in that case.)
        """
101

102

103
class CyclicSendTaskABC(CyclicTask, abc.ABC):
    """
    Cyclic task that sends one or more messages with a fixed period.
    """

    def __init__(
        self, messages: Union[Sequence[Message], Message], period: float
    ) -> None:
        """
        :param messages:
            A single message, or a list/tuple of messages, to be sent
            periodically.
        :param period: The rate in seconds at which to send the messages.

        :raises ValueError: If the given messages are invalid
        """
        validated = self._check_and_convert_messages(messages)

        # Validation guarantees that every message carries the same
        # arbitration ID, so the first one is representative.
        self.arbitration_id = validated[0].arbitration_id
        self.period = period
        # Same period as an integer number of nanoseconds.
        self.period_ns = round(period * 1e9)
        self.messages = validated

    @staticmethod
    def _check_and_convert_messages(
        messages: Union[Sequence[Message], Message],
    ) -> tuple[Message, ...]:
        """Normalize a Message, list or tuple of messages into a tuple.

        The result is guaranteed to be non-empty and all of its messages
        share the same arbitration ID and the same channel.

        Intended to be called when the cyclic task is initialized.

        :param messages: A single message or a list/tuple of messages.
        :return: The messages as a tuple.
        :raises ValueError: If the given messages are invalid
        """
        if isinstance(messages, (list, tuple)):
            candidates = messages
        elif isinstance(messages, Message):
            candidates = [messages]
        else:
            raise ValueError("Must be either a list, tuple, or a Message")

        if not candidates:
            raise ValueError("Must be at least a list or tuple of length 1")

        frames = tuple(candidates)
        reference = frames[0]

        ids_match = all(
            frame.arbitration_id == reference.arbitration_id for frame in frames
        )
        if not ids_match:
            raise ValueError("All Arbitration IDs should be the same")

        channels_match = all(frame.channel == reference.channel for frame in frames)
        if not channels_match:
            raise ValueError("All Channel IDs should be the same")

        return frames
21✔
162

163

164
class LimitedDurationCyclicSendTaskABC(CyclicSendTaskABC, abc.ABC):
    """Cyclic send task that can be limited to an approximate duration."""

    def __init__(
        self,
        messages: Union[Sequence[Message], Message],
        period: float,
        duration: Optional[float],
    ) -> None:
        """Message send task with a defined duration and period.

        :param messages:
            The messages to be sent periodically.
        :param period: The rate in seconds at which to send the messages.
        :param duration:
            Approximate duration in seconds to continue sending messages. If
            no duration is provided, the task will continue indefinitely.

        :raises ValueError: If the given messages are invalid
        """
        super().__init__(messages, period)
        self.duration = duration
        # No deadline yet; the thread based task in this module replaces this
        # with a time.perf_counter() based value when it is started.
        self.end_time: Optional[float] = None
21✔
185

186

187
class RestartableCyclicTaskABC(CyclicSendTaskABC, abc.ABC):
    """Adds support for restarting a stopped cyclic task.

    Concrete subclasses must implement :meth:`start`.
    """

    @abc.abstractmethod
    def start(self) -> None:
        """Restart a stopped periodic task."""
193

194

195
class ModifiableCyclicTaskABC(CyclicSendTaskABC, abc.ABC):
    """Adds support for replacing the messages of a cyclic task."""

    def _check_modified_messages(self, messages: tuple[Message, ...]) -> None:
        """Validate replacement messages against the task's original ones.

        The replacement must contain as many messages as the task currently
        holds, and its first message must carry the task's arbitration ID.

        Intended to be called from ``modify_data``.

        :param messages: The already normalized replacement messages.
        :raises ValueError: If the given messages are invalid
        """
        count_changed = len(self.messages) != len(messages)
        if count_changed:
            raise ValueError(
                "The number of new cyclic messages to be sent must be equal to "
                "the number of messages originally specified for this task"
            )

        # Only evaluated once the count check has passed.
        id_changed = self.arbitration_id != messages[0].arbitration_id
        if id_changed:
            raise ValueError(
                "The arbitration ID of new cyclic messages cannot be changed "
                "from when the task was created"
            )

    def modify_data(self, messages: Union[Sequence[Message], Message]) -> None:
        """Replace the periodically sent messages without touching the timing.

        :param messages:
            The messages with the new :attr:`Message.data`.

            Note: The arbitration ID cannot be changed.

            Note: The number of new cyclic messages to be sent must be equal
            to the number of messages originally specified for this task.

        :raises ValueError: If the given messages are invalid
        """
        replacement = self._check_and_convert_messages(messages)
        self._check_modified_messages(replacement)

        self.messages = replacement
×
237

238

239
class MultiRateCyclicSendTaskABC(CyclicSendTaskABC, abc.ABC):
    """A cyclic send task that supports switching send frequency after a set time."""

    def __init__(
        self,
        channel: typechecking.Channel,
        messages: Union[Sequence[Message], Message],
        count: int,  # pylint: disable=unused-argument
        initial_period: float,  # pylint: disable=unused-argument
        subsequent_period: float,
    ) -> None:
        """
        Transmits a message `count` times at `initial_period` then continues to
        transmit messages at `subsequent_period`.

        This base class only stores `channel` and forwards `messages` and
        `subsequent_period` to the parent constructor; `count` and
        `initial_period` are accepted but not used here.

        :param channel: See interface specific documentation.
        :param messages: The messages to be sent periodically.
        :param count: Number of transmissions at `initial_period`.
        :param initial_period: Period in seconds for the first `count` transmissions.
        :param subsequent_period:
            Period in seconds used afterwards; stored as the task's period.

        :raises ValueError: If the given messages are invalid
        """
        super().__init__(messages, subsequent_period)
        self._channel = channel
×
264

265

266
class ThreadBasedCyclicSendTask(
    LimitedDurationCyclicSendTaskABC, ModifiableCyclicTaskABC, RestartableCyclicTaskABC
):
    """Fallback cyclic send task using daemon thread."""

    def __init__(
        self,
        bus: "BusABC",
        lock: threading.Lock,
        messages: Union[Sequence[Message], Message],
        period: float,
        duration: Optional[float] = None,
        on_error: Optional[Callable[[Exception], bool]] = None,
        autostart: bool = True,
        modifier_callback: Optional[Callable[[Message], None]] = None,
    ) -> None:
        """Transmits `messages` with a `period` seconds for `duration` seconds on a `bus`.

        The `on_error` is called if any error happens on `bus` while sending `messages`.
        If `on_error` present, and returns ``False`` when invoked, thread is
        stopped immediately, otherwise, thread continuously tries to send `messages`
        ignoring errors on a `bus`. Absence of `on_error` means that thread exits immediately
        on error.

        :param bus: The bus whose ``send`` method is used to transmit.
        :param lock: Lock held around every ``bus.send`` call of this task.
        :param messages: The messages to be sent periodically, one per period.
        :param period: The rate in seconds at which to send the messages.
        :param duration:
            Approximate duration in seconds to continue sending messages.
            ``None`` (and also ``0``, see :meth:`start`) means no limit.
        :param on_error: The callable that accepts an exception if any
                         error happened on a `bus` while sending `messages`,
                         it shall return either ``True`` or ``False`` depending
                         on desired behaviour of `ThreadBasedCyclicSendTask`.
        :param autostart: If ``True``, :meth:`start` is called by the constructor.
        :param modifier_callback:
            Optional callable invoked with the message that is about to be
            sent, right before each transmission.

        :raises ValueError: If the given messages are invalid, or if the
            pywin32 timer is in use and `period` rounds to 0 ms.
        """
        super().__init__(messages, period, duration)
        self.bus = bus
        self.send_lock = lock
        # The task counts as stopped until start() is called.
        self.stopped = True
        self.thread: Optional[threading.Thread] = None
        self.on_error = on_error
        self.modifier_callback = modifier_callback

        # Period rounded to whole milliseconds; only used for the pywin32 timer.
        self.period_ms = int(round(period * 1000, 0))

        # Waitable timer; stays None unless the pywin32 wrapper is available.
        self.event: Optional[_Pywin32Event] = None
        if PYWIN32:
            if self.period_ms == 0:
                # A period of 0 would mean that the timer is signaled only once
                raise ValueError("The period cannot be smaller than 0.001 (1 ms)")
            self.event = PYWIN32.create_timer()
        elif (
            sys.platform == "win32"
            and sys.version_info < (3, 11)
            and platform.python_implementation() == "CPython"
        ):
            # Same platform/version condition under which the module tries to
            # load pywin32, so reaching this branch means that attempt failed.
            warnings.warn(
                f"{self.__class__.__name__} may achieve better timing accuracy "
                f"if the 'pywin32' package is installed.",
                RuntimeWarning,
                stacklevel=1,
            )

        if autostart:
            self.start()

    def stop(self) -> None:
        """Ask the send thread to finish; does not wait for the thread to exit."""
        self.stopped = True
        if self.event and PYWIN32:
            # Reset and signal any pending wait by setting the timer to 0
            PYWIN32.stop_timer(self.event)

    def start(self) -> None:
        """Start (or restart) sending.

        A new daemon thread is created only when no thread exists or the
        previous one has finished. If the previous thread is still alive,
        only the ``stopped`` flag is cleared.
        """
        self.stopped = False
        if self.thread is None or not self.thread.is_alive():
            name = f"Cyclic send task for 0x{self.messages[0].arbitration_id:X}"
            self.thread = threading.Thread(target=self._run, name=name)
            self.thread.daemon = True

            # Deadline on the time.perf_counter() clock. Because this is a
            # truthiness test, a duration of 0 is treated like None (no limit).
            self.end_time: Optional[float] = (
                time.perf_counter() + self.duration if self.duration else None
            )

            if self.event and PYWIN32:
                PYWIN32.set_timer(self.event, self.period_ms)

            self.thread.start()

    def _run(self) -> None:
        """Body of the send thread: send one message per period until stopped.

        The messages are sent round-robin. Pacing uses the pywin32 waitable
        timer when available, otherwise ``time.sleep`` against an absolute
        due time so that the time spent sending does not accumulate as drift.
        """
        msg_index = 0
        msg_due_time_ns = time.perf_counter_ns()

        if self.event and PYWIN32:
            # Make sure the timer is non-signaled before entering the loop
            PYWIN32.wait_0(self.event)

        while not self.stopped:
            # Stop once the optional deadline set in start() has passed.
            if self.end_time is not None and time.perf_counter() >= self.end_time:
                self.stop()
                break

            try:
                if self.modifier_callback is not None:
                    self.modifier_callback(self.messages[msg_index])
                with self.send_lock:
                    # Prevent calling bus.send from multiple threads
                    self.bus.send(self.messages[msg_index])
            except Exception as exc:  # pylint: disable=broad-except
                log.exception(exc)

                # stop if `on_error` callback was not given
                if self.on_error is None:
                    self.stop()
                    # Re-raised inside the send thread, which ends the thread.
                    raise exc

                # stop if `on_error` returns False
                if not self.on_error(exc):
                    self.stop()
                    break

            if not self.event:
                # Sleep-based pacing: advance the absolute due time by one period.
                msg_due_time_ns += self.period_ns

            # Round-robin over the messages of this task.
            msg_index = (msg_index + 1) % len(self.messages)

            if self.event and PYWIN32:
                PYWIN32.wait_inf(self.event)
            else:
                # Compensate for the time it takes to send the message
                delay_ns = msg_due_time_ns - time.perf_counter_ns()
                if delay_ns > 0:
                    time.sleep(delay_ns / NANOSECONDS_IN_SECOND)
19✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc