• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hardbyte / python-can / 16362801995

18 Jul 2025 05:17AM UTC coverage: 70.862% (+0.1%) from 70.763%
16362801995

Pull #1920

github

web-flow
Merge f9e8a3c29 into 958fc64ed
Pull Request #1920: add FD support to slcan according to CANable 2.0 implementation

6 of 45 new or added lines in 1 file covered. (13.33%)

838 existing lines in 35 files now uncovered.

7770 of 10965 relevant lines covered (70.86%)

13.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.44
/can/interface.py
1
"""
21✔
2
This module contains the base implementation of :class:`can.BusABC` as well
3
as a list of all available backends and some implemented
4
CyclicSendTasks.
5
"""
6

7
import concurrent.futures.thread
21✔
8
import importlib
21✔
9
import logging
21✔
10
from collections.abc import Callable, Iterable
21✔
11
from typing import Any, Optional, Union, cast
21✔
12

13
from . import util
21✔
14
from .bus import BusABC
21✔
15
from .exceptions import CanInterfaceNotImplementedError
21✔
16
from .interfaces import BACKENDS
21✔
17
from .typechecking import AutoDetectedConfig, Channel
21✔
18

19
log = logging.getLogger("can.interface")
21✔
20
log_autodetect = log.getChild("detect_available_configs")
21✔
21

22

23
def _get_class_for_interface(interface: str) -> type[BusABC]:
    """
    Returns the main bus class for the given interface.

    The interface name is looked up in ``BACKENDS`` to obtain a module name
    and a class name; the module is then imported and the class is fetched
    from it.

    :param interface:
        Name of the interface, i.e. one of the keys of ``BACKENDS``.

    :return:
        The attribute named by the ``BACKENDS`` entry. The ``cast`` is only a
        hint for the type checker; no runtime type check is performed.

    :raises NotImplementedError:
        if the interface is not known
    :raises CanInterfaceNotImplementedError:
        if there was a problem while importing the interface module or while
        retrieving the bus class from that module. The underlying exception
        is attached as ``__cause__``.
    """
    # Find the correct backend
    try:
        module_name, class_name = BACKENDS[interface]
    except KeyError:
        # The KeyError carries no extra information, so it is deliberately
        # dropped from the exception chain.
        raise NotImplementedError(
            f"CAN interface '{interface}' not supported"
        ) from None

    # Import the correct interface module
    try:
        module = importlib.import_module(module_name)
    except Exception as e:
        # Chain the original error: its traceback shows *where* inside the
        # backend (or one of its dependencies) the import actually failed.
        raise CanInterfaceNotImplementedError(
            f"Cannot import module {module_name} for CAN interface '{interface}': {e}"
        ) from e

    # Get the correct class
    try:
        bus_class = getattr(module, class_name)
    except Exception as e:
        raise CanInterfaceNotImplementedError(
            f"Cannot import class {class_name} from module {module_name} for CAN interface "
            f"'{interface}': {e}"
        ) from e

    return cast("type[BusABC]", bus_class)
58

59

60
@util.deprecated_args_alias(
    deprecation_start="4.2.0",
    deprecation_end="5.0.0",
    bustype="interface",
    context="config_context",
)
def Bus(  # noqa: N802
    channel: Optional[Channel] = None,
    interface: Optional[str] = None,
    config_context: Optional[str] = None,
    ignore_config: bool = False,
    **kwargs: Any,
) -> BusABC:
    """Create a new bus instance, optionally completing the configuration.

    The explicitly given ``channel`` and ``interface`` are merged into
    ``kwargs``. Unless ``ignore_config`` is set, the result is then passed
    through :func:`can.util.load_config`. Finally the bus class belonging to
    the resolved interface is looked up and instantiated.

    .. note::
        Arguments provided to this function take precedence over values from
        configuration sources, but other parameters found in the
        configuration may still be added to the bus instantiation.
        This could potentially have unintended consequences. To prevent this,
        pass ``ignore_config=True`` so that existing configurations are ignored.

    :param channel:
        Channel identification. Expected type is backend dependent.
        Set to ``None`` to let it be resolved automatically from the default
        :ref:`configuration`.

    :param interface:
        See :ref:`interface names` for a list of supported interfaces.
        Set to ``None`` to let it be resolved automatically from the default
        :ref:`configuration`.

    :param config_context:
        Extra 'context', that is passed to config sources.
        This can be used to select a section other than 'default' in the configuration file.

    :param ignore_config:
        If ``True``, only the given arguments will be used for the bus instantiation. Existing
        configuration sources will be ignored.

    :param kwargs:
        ``interface`` specific keyword arguments.

    :raises ~can.exceptions.CanInterfaceNotImplementedError:
        if the ``interface`` isn't recognized or cannot be loaded

    :raises ~can.exceptions.CanInitializationError:
        if the bus cannot be instantiated

    :raises ValueError:
        if the ``channel`` could not be determined

    :raises KeyError:
        if no ``interface`` entry is present once the configuration has been
        resolved (e.g. ``interface`` omitted together with ``ignore_config=True``)
    """

    # Explicit arguments are folded into the config dict first
    if interface is not None:
        kwargs["interface"] = interface
    if channel is not None:
        kwargs["channel"] = channel

    # Fill in the rest of the configuration; this might raise an error
    if not ignore_config:
        kwargs = util.load_config(config=kwargs, context=config_context)

    # Take "interface" out of the config (it must not reach the backend)
    # and resolve the bus class registered for it
    cls = _get_class_for_interface(kwargs.pop("interface"))

    # The channel is handed over positionally; without one the backend
    # falls back to its own default channel
    channel = kwargs.pop("channel", channel)
    return cls(**kwargs) if channel is None else cls(channel, **kwargs)
140

141

142
def detect_available_configs(
    interfaces: Union[None, str, Iterable[str]] = None,
    timeout: float = 5.0,
) -> list[AutoDetectedConfig]:
    """Detect all configurations/channels that the interfaces could
    currently connect with.

    This might be quite time-consuming.

    Automated configuration detection may not be implemented by
    every interface on every platform. This method will not raise
    an error in that case, but will rather return an empty list
    for that interface.

    The detection routines of all interfaces are run concurrently, one
    worker thread per interface, so that a slow interface cannot keep
    another one from starting before the timeout expires.

    :param interfaces: either
        - the name of an interface to be searched in as a string,
        - an iterable of interface names to search in, or
        - `None` to search in all known interfaces.
    :param timeout: maximum number of seconds to wait for all interface
        detection tasks to complete. If exceeded, any pending tasks
        will be cancelled, a warning will be logged, and the method
        will return results gathered so far. Tasks that are already
        running cannot be interrupted; their results are discarded.
    :rtype: list[dict]
    :return: a list of dicts, each suitable for usage in
             the constructor of :class:`can.BusABC`. Interfaces that
             timed out will be logged as warnings and excluded.
    :raises NotImplementedError:
        if one of the given interface names is not known
    """

    # Determine which interfaces to search
    if interfaces is None:
        interfaces = BACKENDS
    elif isinstance(interfaces, str):
        interfaces = (interfaces,)
    # otherwise assume iterable of strings

    # Collect detection callbacks
    callbacks: dict[str, Callable[[], list[AutoDetectedConfig]]] = {}
    for interface_keyword in interfaces:
        try:
            bus_class = _get_class_for_interface(interface_keyword)
            callbacks[interface_keyword] = (
                bus_class._detect_available_configs  # pylint: disable=protected-access
            )
        except CanInterfaceNotImplementedError:
            log_autodetect.debug(
                'interface "%s" cannot be loaded for detection of available configurations',
                interface_keyword,
            )

    result: list[AutoDetectedConfig] = []
    if not callbacks:
        # Nothing to run; also avoids creating an executor with zero workers
        return result

    # Use manual executor to allow shutdown without waiting.
    # The pool is sized to the number of callbacks: with the default worker
    # count (which depends on the CPU count) some detections could be left
    # waiting in the queue and be reported as timed out without ever running.
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=len(callbacks))
    try:
        futures_to_keyword = {
            executor.submit(func): kw for kw, func in callbacks.items()
        }
        done, not_done = concurrent.futures.wait(
            futures_to_keyword,
            timeout=timeout,
            return_when=concurrent.futures.ALL_COMPLETED,
        )
        # Log timed-out tasks
        if not_done:
            log_autodetect.warning(
                "Timeout (%.2fs) reached for interfaces: %s",
                timeout,
                ", ".join(sorted(futures_to_keyword[fut] for fut in not_done)),
            )
        # Process completed futures
        for future in done:
            keyword = futures_to_keyword[future]
            try:
                available = future.result()
            except NotImplementedError:
                log_autodetect.debug(
                    'interface "%s" does not support detection of available configurations',
                    keyword,
                )
            else:
                log_autodetect.debug(
                    'interface "%s" detected %i available configurations',
                    keyword,
                    len(available),
                )
                # Make sure every config names the interface it belongs to
                for config in available:
                    config.setdefault("interface", keyword)
                result.extend(available)
    finally:
        # shutdown immediately, do not wait for pending threads
        executor.shutdown(wait=False, cancel_futures=True)
    return result
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc