• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hardbyte / python-can / 16362801995

18 Jul 2025 05:17AM UTC coverage: 70.862% (+0.1%) from 70.763%
16362801995

Pull #1920

github

web-flow
Merge f9e8a3c29 into 958fc64ed
Pull Request #1920: add FD support to slcan according to CANable 2.0 implementation

6 of 45 new or added lines in 1 file covered. (13.33%)

838 existing lines in 35 files now uncovered.

7770 of 10965 relevant lines covered (70.86%)

13.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.8
/can/util.py
1
"""
21✔
2
Utilities and configuration file parsing.
3
"""
4

5
import contextlib
21✔
6
import copy
21✔
7
import functools
21✔
8
import json
21✔
9
import logging
21✔
10
import os
21✔
11
import os.path
21✔
12
import platform
21✔
13
import re
21✔
14
import warnings
21✔
15
from collections.abc import Iterable
21✔
16
from configparser import ConfigParser
21✔
17
from time import get_clock_info, perf_counter, time
21✔
18
from typing import (
21✔
19
    Any,
20
    Callable,
21
    Optional,
22
    TypeVar,
23
    Union,
24
    cast,
25
)
26

27
from typing_extensions import ParamSpec
21✔
28

29
import can
21✔
30

31
from . import typechecking
21✔
32
from .bit_timing import BitTiming, BitTimingFd
21✔
33
from .exceptions import CanInitializationError, CanInterfaceNotImplementedError
21✔
34
from .interfaces import VALID_INTERFACES
21✔
35

36
# Logger used by the helpers in this module
log = logging.getLogger("can.util")

# Valid CAN FD payload sizes in bytes; the list index is the matching DLC
CAN_FD_DLC = [0, 1, 2, 3, 4, 5, 6, 7, 8, 12, 16, 20, 24, 32, 48, 64]

# Keys that are set to ``None`` in a bus config when no source provides them
REQUIRED_KEYS = ["interface", "channel"]

# Config file candidates; platform specific locations are appended below
CONFIG_FILES = ["~/can.conf"]

if platform.system() in ("Linux", "Darwin"):
    CONFIG_FILES += ["/etc/can.conf", "~/.can", "~/.canrc"]
elif platform.system() == "Windows" or platform.python_implementation() == "IronPython":
    CONFIG_FILES += ["can.ini", os.path.join(os.getenv("APPDATA", ""), "can.ini")]
7✔
50

51

52
def load_file_config(
    path: Optional[typechecking.AcceptedIOType] = None, section: str = "default"
) -> dict[str, str]:
    """
    Read one section of a configuration file that looks like this::

        [default]
        interface = socketcan
        channel = can0

    :param path:
        path to config file. If omitted, every entry of ``CONFIG_FILES``
        is tried after expanding a leading ``~``.
    :param section:
        name of the section to read configuration from.
    :return:
        the options of *section*; an empty dict if no file that was read
        contains that section.
    """
    parser = ConfigParser()

    # keep option names exactly as written (the parser would lower-case them)
    parser.optionxform = str  # type: ignore[method-assign]

    parser.read(
        [os.path.expanduser(candidate) for candidate in CONFIG_FILES]
        if path is None
        else path
    )

    if not parser.has_section(section):
        return {}
    return dict(parser.items(section))
21✔
84

85

86
def load_environment_config(context: Optional[str] = None) -> dict[str, str]:
    """
    Build a config dict from environment variables (if set):

    * CAN_INTERFACE
    * CAN_CHANNEL
    * CAN_BITRATE
    * CAN_CONFIG

    If *context* is supplied, "_{context}" is appended to each environment
    variable name that is looked up. For example with context="ABC":

    * CAN_INTERFACE_ABC
    * CAN_CHANNEL_ABC
    * CAN_BITRATE_ABC
    * CAN_CONFIG_ABC

    ``CAN_CONFIG`` is parsed as JSON and forms the base of the result; the
    three dedicated variables override entries of the same name.
    """
    suffix = "" if not context else f"_{context}"

    env_config: dict[str, str] = json.loads(
        os.environ.get(f"CAN_CONFIG{suffix}", "{}")
    )

    dedicated_variables = (
        ("interface", "CAN_INTERFACE"),
        ("channel", "CAN_CHANNEL"),
        ("bitrate", "CAN_BITRATE"),
    )
    for option, variable in dedicated_variables:
        value = os.environ.get(variable + suffix)
        # unset and empty variables are both ignored
        if value:
            env_config[option] = value

    return env_config
21✔
120

121

122
def load_config(
    path: Optional[typechecking.AcceptedIOType] = None,
    config: Optional[dict[str, Any]] = None,
    context: Optional[str] = None,
) -> typechecking.BusConfig:
    """
    Returns a dict with configuration details which is loaded from (in this order):

    - config
    - can.rc
    - Environment variables CAN_INTERFACE, CAN_CHANNEL, CAN_BITRATE
    - Config files ``/etc/can.conf`` or ``~/.can`` or ``~/.canrc``
      where the latter may add or replace values of the former.

    A key found in an earlier source is never overwritten by a later source.

    Interface can be any of the strings from ``can.VALID_INTERFACES`` for example:
    kvaser, socketcan, pcan, usb2can, ixxat, nican, virtual.

    .. note::

            The key ``bustype`` is copied to ``interface`` if that one is missing
            and never appears in the result. The passed ``config`` dict and
            ``can.rc`` are left untouched by this translation.

    :param path:
        Optional path to config file.

    :param config:
        A dict which may set the 'interface', and/or the 'channel', or neither.
        It may set other values that are passed through.

    :param context:
        Extra 'context' passed to the config sources. It is used as the
        environment variable suffix and as the name of the config file
        section that is consulted before the 'default' section.

    :return:
        A config dictionary that should contain 'interface' & 'channel'::

            {
                'interface': 'python-can backend interface to use',
                'channel': 'default channel to use',
                # possibly more
            }

        Note ``None`` will be used if all the options are exhausted without
        finding a value.

        All unused values are passed from ``config`` over to this.

    :raises:
        CanInterfaceNotImplementedError if the ``interface`` name isn't recognized
    """

    # Start with an empty dict to apply filtering to all sources
    given_config = config or {}
    merged: dict[str, Any] = {}

    # Sources in priority order; callables receive the context when invoked
    config_sources = cast(
        "Iterable[Union[dict[str, Any], Callable[[Any], dict[str, Any]]]]",
        [
            given_config,
            can.rc,
            load_environment_config,
            lambda _context: load_environment_config(),
            lambda _context: load_file_config(path, _context),
            lambda _context: load_file_config(path),
        ],
    )

    for source in config_sources:
        # Shallow copy: removing the legacy key below must not modify the
        # caller's dict or the module-wide ``can.rc``.
        cfg = dict(source(context) if callable(source) else source)

        # remove legacy key (and copy to interface if not already present)
        if "bustype" in cfg:
            bustype = cfg.pop("bustype")
            if not cfg.get("interface"):
                cfg["interface"] = bustype

        # copy all parameters that no earlier source has provided
        for key, val in cfg.items():
            if key not in merged:
                merged[key] = cast_from_string(val) if isinstance(val, str) else val

    bus_config = _create_bus_config(merged)
    can.log.debug("can config: %s", bus_config)
    return bus_config
21✔
211

212

213
def _create_bus_config(config: dict[str, Any]) -> typechecking.BusConfig:
    """Validates some config values, performs compatibility mappings and creates specific
    structures (e.g. for bit timings).

    The passed dict is modified in place and returned.

    :param config: The raw config as specified by the user
    :return: A config that can be used by a :class:`~can.BusABC`
    :raises ~can.exceptions.CanInterfaceNotImplementedError:
        if the ``interface`` is unknown
    :raises ValueError:
        if ``port`` is a non-numeric string or outside of the valid port range
    :raises TypeError:
        if ``port`` is neither a string nor an integer
    """
    # substitute None for all values not found
    for key in REQUIRED_KEYS:
        config.setdefault(key, None)

    if config["interface"] not in VALID_INTERFACES:
        raise CanInterfaceNotImplementedError(
            f'Unknown interface type "{config["interface"]}"'
        )

    if "port" in config:
        # convert port to integer if necessary
        if isinstance(config["port"], int):
            port = config["port"]
        elif isinstance(config["port"], str):
            if config["port"].isnumeric():
                config["port"] = port = int(config["port"])
            else:
                raise ValueError("Port config must be a number!")
        else:
            raise TypeError("Port config must be string or integer!")

        # 65535 is the highest valid port number and must be accepted
        if not 0 < port <= 65535:
            raise ValueError("Port config must be inside 0-65535 range!")

    # build a timing object from loose timing parameters unless one was given
    if "timing" not in config:
        if timing := _dict2timing(config):
            config["timing"] = timing

    # normalise the flag to a real bool; only 0 and False disable FD
    if "fd" in config:
        config["fd"] = config["fd"] not in (0, False)

    return cast("typechecking.BusConfig", config)
21✔
253

254

255
def _dict2timing(data: dict[str, Any]) -> Union[BitTiming, BitTimingFd, None]:
    """Build a :class:`~can.BitTiming` or :class:`~can.BitTimingFd` from the
    entries of *data*.

    The FD variant is preferred: the first timing type whose annotated keys
    are all present in *data* is instantiated with the values converted to
    ``int``. Return `None` if no type matches or the construction fails with
    a :class:`ValueError` or :class:`TypeError`."""
    candidates = (
        (typechecking.BitTimingFdDict, BitTimingFd),
        (typechecking.BitTimingDict, BitTiming),
    )

    try:
        for spec, timing_cls in candidates:
            field_names = spec.__annotations__
            if set(field_names).issubset(data):
                # only the first matching type is attempted; a failure here
                # does not fall through to the next candidate
                return timing_cls(
                    **{name: int(data[name]) for name in field_names},
                    strict=False,
                )
    except (ValueError, TypeError):
        pass

    return None
21✔
278

279

280
def set_logging_level(level_name: str) -> None:
    """Set the logging level for the `"can"` logger.

    :param level_name:
        One of: `'critical'`, `'error'`, `'warning'`, `'info'`,
        `'debug'`, `'subdebug'`, or the value :obj:`None` (=default).
        Anything that cannot be resolved to an attribute of the
        :mod:`logging` module falls back to `'debug'`.
    """
    try:
        level = getattr(logging, level_name.upper())
    except AttributeError:
        # unknown name (or no ``upper`` method at all): use the default
        level = logging.DEBUG

    logging.getLogger("can").setLevel(level)
    log.debug("Logging set to %s", level_name)
21✔
295

296

297
def len2dlc(length: int) -> int:
    """Calculate the DLC from data length.

    Lengths up to 8 map to themselves; larger lengths map to the index of the
    first ``CAN_FD_DLC`` entry that can hold them.

    :param length: Length in number of bytes (0-64)

    :returns: DLC (0-15)
    """
    if length <= 8:
        return length

    # fall back to the highest DLC when no entry is large enough
    return next(
        (dlc for dlc, capacity in enumerate(CAN_FD_DLC) if capacity >= length),
        15,
    )
×
310

311

312
def dlc2len(dlc: int) -> int:
    """Calculate the data length from DLC.

    :param dlc: DLC (0-15)

    :returns: Data length in number of bytes (0-64)
    """
    if dlc <= 15:
        return CAN_FD_DLC[dlc]
    # any DLC above the table is treated as the maximum payload size
    return 64
21✔
320

321

322
def channel2int(channel: Optional[typechecking.Channel]) -> Optional[int]:
    """Try to convert the channel to an integer.

    Integers are returned as they are; for strings the run of digits at the
    very end is used.

    :param channel:
        Channel string (e.g. `"can0"`, `"CAN1"`) or an integer

    :returns: Channel integer or ``None`` if unsuccessful
    """
    if isinstance(channel, int):
        return channel
    if isinstance(channel, str) and (found := re.match(r".*?(\d+)$", channel)):
        return int(found.group(1))
    return None
21✔
337

338

339
P1 = ParamSpec("P1")
T1 = TypeVar("T1")


def deprecated_args_alias(
    deprecation_start: str,
    deprecation_end: Optional[str] = None,
    **aliases: Optional[str],
) -> Callable[[Callable[P1, T1]], Callable[P1, T1]]:
    """Decorator factory that renames or retires keyword arguments of a
    function while still accepting the deprecated spelling(s).

    Example::

        @deprecated_args_alias("1.2.0", oldArg="new_arg", anotherOldArg="another_new_arg")
        def library_function(new_arg, another_new_arg):
            pass

        @deprecated_args_alias(
            deprecation_start="1.2.0",
            deprecation_end="3.0.0",
            oldArg="new_arg",
            obsoleteOldArg=None,
        )
        def library_function(new_arg):
            pass

    :param deprecation_start:
        The *python-can* version, that introduced the :class:`DeprecationWarning`.
    :param deprecation_end:
        The *python-can* version, that marks the end of the deprecation period.
    :param aliases:
        keyword arguments, that map the deprecated argument names
        to the new argument names or ``None``.

    """

    def decorator(func: Callable[P1, T1]) -> Callable[P1, T1]:
        @functools.wraps(func)
        def translating_call(*args: P1.args, **kwargs: P1.kwargs) -> T1:
            # Must be called directly from here: the helper emits its warning
            # with a stacklevel that counts this frame.
            _rename_kwargs(
                func_name=func.__name__,
                start=deprecation_start,
                end=deprecation_end,
                kwargs=kwargs,
                aliases=aliases,
            )
            return func(*args, **kwargs)

        return translating_call

    return decorator
21✔
391

392

393
def _rename_kwargs(
    func_name: str,
    start: str,
    end: Optional[str],
    kwargs: dict[str, Any],
    aliases: dict[str, Optional[str]],
) -> None:
    """Helper function for `deprecated_args_alias`

    Removes every deprecated key of *aliases* from *kwargs* (in place), stores
    its value under the replacement name if there is one and emits a
    :class:`DeprecationWarning` for each deprecated key that was used.

    :raises TypeError:
        if a deprecated key and its replacement are both present
    """
    for old_name, new_name in aliases.items():
        if old_name not in kwargs:
            continue

        notice_parts = [
            f"The '{old_name}' argument is deprecated since python-can v{start}"
        ]
        if end:
            notice_parts.append(f", and scheduled for removal in python-can v{end}")
        notice_parts.append(".")

        # the deprecated key is dropped even if the conflict check below fails
        old_value = kwargs.pop(old_name)
        if new_name is not None:
            notice_parts.append(f" Use '{new_name}' instead.")

            if new_name in kwargs:
                raise TypeError(
                    f"{func_name} received both '{old_name}' (deprecated) and '{new_name}'."
                )
            kwargs[new_name] = old_value

        warnings.warn("".join(notice_parts), DeprecationWarning, stacklevel=3)
21✔
423

424

425
T2 = TypeVar("T2", BitTiming, BitTimingFd)


def check_or_adjust_timing_clock(timing: T2, valid_clocks: Iterable[int]) -> T2:
    """Adjusts the given timing instance to have an *f_clock* value that is within the
    allowed values specified by *valid_clocks*. If the *f_clock* value of timing is
    already within *valid_clocks*, then a copy of *timing* is returned.

    :param timing:
        The :class:`~can.BitTiming` or :class:`~can.BitTimingFd` instance to adjust.
    :param valid_clocks:
        An iterable of integers representing the valid *f_clock* values that the timing instance
        can be changed to. The order of the values in *valid_clocks* determines the priority in
        which they are tried, with earlier values being tried before later ones.
        One-shot iterators are supported; the iterable is consumed exactly once.
    :return:
        A new :class:`~can.BitTiming` or :class:`~can.BitTimingFd` instance with an
        *f_clock* value within *valid_clocks*.
    :raises ~can.exceptions.CanInitializationError:
        If no compatible *f_clock* value can be found within *valid_clocks*.
    """
    # Materialise once: the values are scanned up to three times below
    # (membership test, adjustment loop, error message) and a one-shot
    # iterator would already be exhausted after the first scan.
    clocks = tuple(valid_clocks)

    if timing.f_clock in clocks:
        # create a copy so this function always returns a new instance
        return copy.deepcopy(timing)

    for clock in clocks:
        try:
            # Try to use a different f_clock
            adjusted_timing = timing.recreate_with_f_clock(clock)
        except ValueError:
            # this clock cannot reproduce the timing, try the next one
            continue

        warnings.warn(
            f"Adjusted f_clock in {timing.__class__.__name__} from "
            f"{timing.f_clock} to {adjusted_timing.f_clock}",
            stacklevel=2,
        )
        return adjusted_timing

    allowed = ", ".join(str(f) for f in clocks)
    raise CanInitializationError(
        f"The specified timing.f_clock value {timing.f_clock} "
        f"doesn't match any of the allowed device f_clock values: "
        f"{allowed}"
    ) from None
467

468

469
def time_perfcounter_correlation() -> tuple[float, float]:
    """Get the `perf_counter` value nearest to when time.time() is updated

    Computed if the clock behind `time.time` on this platform reports a
    resolution coarser than 10μs, otherwise the current time and perf_counter
    are returned directly. This threshold was chosen as typical timer
    resolution on Linux/macOS is ~1μs, and the Windows platform can vary from
    ~500μs to 10ms.

    Note this value is based on when `time.time()` is observed to update from Python,
    it is not directly returned by the operating system.

    :returns:
        (t, performance_counter) time.time value and time.perf_counter value when the time.time
        is updated

    """
    needs_edge_detection = get_clock_info("time").resolution > 1e-5
    if not needs_edge_detection:
        return time(), perf_counter()

    # spin until time() is seen to tick over and keep the matching counter
    previous = time()
    current, performance_counter = time(), perf_counter()
    while current == previous:
        current, performance_counter = time(), perf_counter()
    return current, performance_counter
6✔
496

497

498
def cast_from_string(string_val: str) -> Union[str, int, float, bool]:
    """Perform trivial type conversion from :class:`str` values.

    The text is checked against an integer, a float and a boolean pattern
    (in that order) and converted by the first one that matches; any other
    text is returned unchanged.

    :param string_val:
        the string, that shall be converted
    """
    conversion_rules = (
        # value is integer
        (r"^[-+]?\d+$", 0, int),
        # value is float
        (r"^[-+]?\d*\.\d+(?:e[-+]?\d+)?$", 0, float),
        # value is bool
        (r"^(?:True|False)$", re.IGNORECASE, lambda text: text.lower() == "true"),
    )

    for pattern, flags, convert in conversion_rules:
        if re.match(pattern, string_val, flags):
            return convert(string_val)

    # value is string
    return string_val
21✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc