• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hardbyte / python-can / 16438269446

22 Jul 2025 07:57AM UTC coverage: 71.171%. First build
16438269446

Pull #1957

github

web-flow
Merge cb8fa8f76 into c4808b744
Pull Request #1957: Convert BusState to enum when read with configparser

3 of 5 new or added lines in 1 file covered. (60.0%)

7784 of 10937 relevant lines covered (71.17%)

13.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.82
/can/util.py
1
"""
21✔
2
Utilities and configuration file parsing.
3
"""
4

5
import contextlib
21✔
6
import copy
21✔
7
import functools
21✔
8
import json
21✔
9
import logging
21✔
10
import os
21✔
11
import os.path
21✔
12
import platform
21✔
13
import re
21✔
14
import warnings
21✔
15
from collections.abc import Iterable
21✔
16
from configparser import ConfigParser
21✔
17
from time import get_clock_info, perf_counter, time
21✔
18
from typing import (
21✔
19
    Any,
20
    Callable,
21
    Optional,
22
    TypeVar,
23
    Union,
24
    cast,
25
)
26

27
from typing_extensions import ParamSpec
21✔
28

29
import can
21✔
30

31
from . import typechecking
21✔
32
from .bit_timing import BitTiming, BitTimingFd
21✔
33
from .exceptions import CanInitializationError, CanInterfaceNotImplementedError
21✔
34
from .interfaces import VALID_INTERFACES
21✔
35

36
# Logger used by the helpers in this module
log = logging.getLogger("can.util")

# Payload sizes (in bytes) allowed for a CAN FD message; the list index is the DLC
CAN_FD_DLC = [0, 1, 2, 3, 4, 5, 6, 7, 8, 12, 16, 20, 24, 32, 48, 64]

# Keys that are always present in a bus configuration (filled with None if unset)
REQUIRED_KEYS = ["interface", "channel"]

# Config file candidates; platform specific locations are appended below
CONFIG_FILES = ["~/can.conf"]

if platform.system() in {"Linux", "Darwin"}:
    CONFIG_FILES += ["/etc/can.conf", "~/.can", "~/.canrc"]
elif platform.system() == "Windows" or platform.python_implementation() == "IronPython":
    CONFIG_FILES += ["can.ini", os.path.join(os.getenv("APPDATA", ""), "can.ini")]
50

51

52
def load_file_config(
    path: Optional[typechecking.AcceptedIOType] = None, section: str = "default"
) -> dict[str, str]:
    """
    Read one section of an INI-style configuration file such as::

        [default]
        interface = socketcan
        channel = can0

    :param path:
        Location of the config file. When omitted, every entry of
        ``CONFIG_FILES`` is tried after expanding ``~``.
    :param section:
        Name of the section whose options are returned.
    :return:
        The options of *section*; an empty dict if the section does not exist.
    """
    parser = ConfigParser()

    # keep option names exactly as written (no lower-casing by the parser)
    parser.optionxform = lambda optionstr: optionstr  # type: ignore[method-assign]

    if path is not None:
        parser.read(path)
    else:
        parser.read([os.path.expanduser(candidate) for candidate in CONFIG_FILES])

    if not parser.has_section(section):
        return {}
    return dict(parser.items(section))
84

85

86
def load_environment_config(context: Optional[str] = None) -> dict[str, str]:
    """
    Build a config dict from environment variables (when they are set):

    * CAN_INTERFACE
    * CAN_CHANNEL
    * CAN_BITRATE
    * CAN_CONFIG

    When *context* is given, "_{context}" is appended to each variable
    name. For example, context="ABC" reads:

    * CAN_INTERFACE_ABC
    * CAN_CHANNEL_ABC
    * CAN_BITRATE_ABC
    * CAN_CONFIG_ABC

    ``CAN_CONFIG`` is parsed as JSON and forms the base dict; non-empty values
    of the three dedicated variables replace the matching keys.
    """
    suffix = f"_{context}" if context else ""

    settings: dict[str, str] = json.loads(os.environ.get(f"CAN_CONFIG{suffix}", "{}"))

    for option, env_name in (
        ("interface", "CAN_INTERFACE"),
        ("channel", "CAN_CHANNEL"),
        ("bitrate", "CAN_BITRATE"),
    ):
        # unset and empty variables are both ignored
        if env_value := os.environ.get(env_name + suffix):
            settings[option] = env_value

    return settings
120

121

122
def load_config(
    path: Optional[typechecking.AcceptedIOType] = None,
    config: Optional[dict[str, Any]] = None,
    context: Optional[str] = None,
) -> typechecking.BusConfig:
    """
    Returns a dict with configuration details which is loaded from (in this order):

    - config
    - can.rc
    - Environment variables CAN_INTERFACE, CAN_CHANNEL, CAN_BITRATE
    - Config files ``/etc/can.conf`` or ``~/.can`` or ``~/.canrc``
      where the latter may add or replace values of the former.

    Interface can be any of the strings from ``can.VALID_INTERFACES`` for example:
    kvaser, socketcan, pcan, usb2can, ixxat, nican, virtual.

    .. note::

            The key ``bustype`` is copied to ``interface`` if that one is missing
            and never appears in the result.

    :param path:
        Optional path to config file.

    :param config:
        A dict which may set the 'interface', and/or the 'channel', or neither.
        It may set other values that are passed through. The dict itself is
        not modified.

    :param context:
        Extra 'context' passed to the config sources. This can be used to select
        a section other than 'default' in the configuration file.

    :return:
        A config dictionary that should contain 'interface' & 'channel'::

            {
                'interface': 'python-can backend interface to use',
                'channel': 'default channel to use',
                # possibly more
            }

        Note ``None`` will be used if all the options are exhausted without
        finding a value.

        All unused values are passed from ``config`` over to this.

    :raises:
        CanInterfaceNotImplementedError if the ``interface`` name isn't recognized
    """

    # Start with an empty dict to apply filtering to all sources
    given_config = config or {}
    config = {}

    # Sources in descending priority; callables are evaluated lazily in the loop
    config_sources = cast(
        "Iterable[Union[dict[str, Any], Callable[[Any], dict[str, Any]]]]",
        [
            given_config,
            can.rc,
            lambda _context: load_environment_config(  # pylint: disable=unnecessary-lambda
                _context
            ),
            lambda _context: load_environment_config(),
            lambda _context: load_file_config(path, _context),
            lambda _context: load_file_config(path),
        ],
    )

    for _cfg in config_sources:
        cfg = _cfg(context) if callable(_cfg) else _cfg

        # remove legacy operator (and copy to interface if not already present)
        if "bustype" in cfg:
            # normalise a shallow copy so that neither the caller's dict nor
            # ``can.rc`` is altered as a side effect of loading the config
            cfg = dict(cfg)
            legacy_interface = cfg.pop("bustype")
            if not cfg.get("interface"):
                cfg["interface"] = legacy_interface

        # copy all new parameters; earlier sources win over later ones
        for key, val in cfg.items():
            if key not in config:
                config[key] = cast_from_string(val) if isinstance(val, str) else val

    bus_config = _create_bus_config(config)
    can.log.debug("can config: %s", bus_config)
    return bus_config
211

212

213
def _create_bus_config(config: dict[str, Any]) -> typechecking.BusConfig:
    """Validates some config values, performs compatibility mappings and creates specific
    structures (e.g. for bit timings).

    The passed dict is updated in place and returned.

    :param config: The raw config as specified by the user
    :return: A config that can be used by a :class:`~can.BusABC`
    :raises CanInterfaceNotImplementedError: if the ``interface`` is unknown
    :raises ValueError:
        if ``port`` is not numeric or outside the range 1-65535, or if
        ``state`` is not the name of a :class:`~can.BusState` member
    :raises TypeError: if ``port`` is neither a string nor an integer
    """
    # substitute None for all values not found
    for key in REQUIRED_KEYS:
        config.setdefault(key, None)

    if config["interface"] not in VALID_INTERFACES:
        raise CanInterfaceNotImplementedError(
            f'Unknown interface type "{config["interface"]}"'
        )

    if "port" in config:
        port = config["port"]
        # convert port to integer if necessary
        if isinstance(port, str):
            if not port.isnumeric():
                raise ValueError("Port config must be a number!")
            config["port"] = port = int(port)
        elif not isinstance(port, int):
            raise TypeError("Port config must be string or integer!")

        # 65535 is the highest valid port number and must be accepted
        if not 0 < port <= 65535:
            raise ValueError("Port config must be inside 0-65535 range!")

    if "timing" not in config:
        # build a timing object when the individual timing keys are all present
        if timing := _dict2timing(config):
            config["timing"] = timing

    if "fd" in config:
        config["fd"] = config["fd"] not in (0, False)

    if "state" in config and not isinstance(config["state"], can.BusState):
        # values read from a config file arrive as strings; look up the member by name
        try:
            config["state"] = can.BusState[config["state"]]
        except KeyError as e:
            raise ValueError("State config not valid!") from e

    return cast("typechecking.BusConfig", config)
259

260

261
def _dict2timing(data: dict[str, Any]) -> Union[BitTiming, BitTimingFd, None]:
    """Build a :class:`~can.BitTimingFd` or :class:`~can.BitTiming` from the
    entries of *data*.

    The FD variant is preferred when all of its keys are present. `None` is
    returned if the keys are incomplete or the values are rejected."""
    candidates = (
        (typechecking.BitTimingFdDict, BitTimingFd),
        (typechecking.BitTimingDict, BitTiming),
    )

    # a ValueError/TypeError from int() or the constructor ends the attempt
    with contextlib.suppress(ValueError, TypeError):
        for schema, timing_cls in candidates:
            field_names = schema.__annotations__
            if set(field_names).issubset(data):
                return timing_cls(
                    **{key: int(data[key]) for key in field_names},
                    strict=False,
                )

    return None
284

285

286
def set_logging_level(level_name: str) -> None:
    """Apply a logging level to the `"can"` logger.

    :param level_name:
        One of: `'critical'`, `'error'`, `'warning'`, `'info'`,
        `'debug'`, `'subdebug'`, or the value :obj:`None` (=default).
        Anything that does not resolve to an attribute of :mod:`logging`
        results in `'debug'`.
    """
    try:
        level = getattr(logging, level_name.upper())
    except AttributeError:
        # unknown name, or no ``upper`` method at all (e.g. ``None``)
        level = logging.DEBUG

    logging.getLogger("can").setLevel(level)
    log.debug("Logging set to %s", level_name)
301

302

303
def len2dlc(length: int) -> int:
    """Map a payload length to its DLC.

    :param length: Length in number of bytes (0-64)

    :returns: DLC (0-15)
    """
    # up to 8 bytes the DLC equals the length
    if length <= 8:
        return length

    # otherwise pick the first DLC whose payload size can hold *length*
    return next(
        (dlc for dlc, nof_bytes in enumerate(CAN_FD_DLC) if nof_bytes >= length),
        15,
    )
316

317

318
def dlc2len(dlc: int) -> int:
    """Map a DLC to the payload length it stands for.

    :param dlc: DLC (0-15)

    :returns: Data length in number of bytes (0-64)
    """
    # anything above the highest DLC is treated as the maximum payload
    if dlc > 15:
        return 64
    return CAN_FD_DLC[dlc]
326

327

328
def channel2int(channel: Optional[typechecking.Channel]) -> Optional[int]:
    """Derive an integer channel number from *channel*.

    :param channel:
        Channel string (e.g. `"can0"`, `"CAN1"`) or an integer

    :returns: Channel integer or ``None`` if unsuccessful
    """
    if isinstance(channel, int):
        return channel
    if not isinstance(channel, str):
        return None

    # use the digits at the very end of the name
    trailing_digits = re.match(r".*?(\d+)$", channel)
    return int(trailing_digits.group(1)) if trailing_digits else None
343

344

345
# Parameter specification of the callable wrapped by ``deprecated_args_alias``
P1 = ParamSpec("P1")
21✔
346
# Return type of the callable wrapped by ``deprecated_args_alias``
T1 = TypeVar("T1")
21✔
347

348

349
def deprecated_args_alias(
    deprecation_start: str,
    deprecation_end: Optional[str] = None,
    **aliases: Optional[str],
) -> Callable[[Callable[P1, T1]], Callable[P1, T1]]:
    """Decorator factory for renaming or retiring keyword arguments while
    still accepting the old names for a transition period.

    Example::

        @deprecated_args_alias("1.2.0", oldArg="new_arg", anotherOldArg="another_new_arg")
        def library_function(new_arg, another_new_arg):
            pass

        @deprecated_args_alias(
            deprecation_start="1.2.0",
            deprecation_end="3.0.0",
            oldArg="new_arg",
            obsoleteOldArg=None,
        )
        def library_function(new_arg):
            pass

    :param deprecation_start:
        The *python-can* version, that introduced the :class:`DeprecationWarning`.
    :param deprecation_end:
        The *python-can* version, that marks the end of the deprecation period.
    :param aliases:
        keyword arguments, that map the deprecated argument names
        to the new argument names or ``None``.

    """

    def decorator(func: Callable[P1, T1]) -> Callable[P1, T1]:
        @functools.wraps(func)
        def call_with_renamed_kwargs(*args: P1.args, **kwargs: P1.kwargs) -> T1:
            # translate deprecated names in place before forwarding the call
            _rename_kwargs(
                func_name=func.__name__,
                start=deprecation_start,
                end=deprecation_end,
                kwargs=kwargs,
                aliases=aliases,
            )
            return func(*args, **kwargs)

        return call_with_renamed_kwargs

    return decorator
397

398

399
def _rename_kwargs(
    func_name: str,
    start: str,
    end: Optional[str],
    kwargs: dict[str, Any],
    aliases: dict[str, Optional[str]],
) -> None:
    """Helper function for `deprecated_args_alias`.

    Every deprecated name found in *kwargs* is removed from it (in place), stored
    under its replacement name if one exists, and reported with a
    :class:`DeprecationWarning`.

    :raises TypeError: if a deprecated name and its replacement are both given
    """
    for alias, new in aliases.items():
        if alias not in kwargs:
            continue

        notice_parts = [f"The '{alias}' argument is deprecated since python-can v{start}"]
        if end:
            notice_parts.append(f", and scheduled for removal in python-can v{end}")
        notice_parts.append(".")

        value = kwargs.pop(alias)
        if new is not None:
            notice_parts.append(f" Use '{new}' instead.")

            if new in kwargs:
                raise TypeError(
                    f"{func_name} received both '{alias}' (deprecated) and '{new}'."
                )
            kwargs[new] = value

        # stacklevel=3 attributes the warning to the caller of the decorated function
        warnings.warn("".join(notice_parts), DeprecationWarning, stacklevel=3)
429

430

431
# Constrained to the two timing classes; used in the signature of
# ``check_or_adjust_timing_clock`` for both its argument and its return value
T2 = TypeVar("T2", BitTiming, BitTimingFd)
21✔
432

433

434
def check_or_adjust_timing_clock(timing: T2, valid_clocks: Iterable[int]) -> T2:
    """Adjusts the given timing instance to have an *f_clock* value that is within the
    allowed values specified by *valid_clocks*. If the *f_clock* value of timing is
    already within *valid_clocks*, then a copy of *timing* is returned.

    :param timing:
        The :class:`~can.BitTiming` or :class:`~can.BitTimingFd` instance to adjust.
    :param valid_clocks:
        An iterable of integers representing the valid *f_clock* values that the timing instance
        can be changed to. The order of the values in *valid_clocks* determines the priority in
        which they are tried, with earlier values being tried before later ones.
        One-shot iterables (e.g. generators) are supported.
    :return:
        A new :class:`~can.BitTiming` or :class:`~can.BitTimingFd` instance with an
        *f_clock* value within *valid_clocks*.
    :raises ~can.exceptions.CanInitializationError:
        If no compatible *f_clock* value can be found within *valid_clocks*.
    """
    # The values are traversed up to three times (membership test, adjustment
    # loop, error message), so materialise them once. Otherwise an iterator
    # would be exhausted by the membership test and no clock would be tried.
    clocks = tuple(valid_clocks)

    if timing.f_clock in clocks:
        # create a copy so this function always returns a new instance
        return copy.deepcopy(timing)

    for clock in clocks:
        try:
            # Try to use a different f_clock
            adjusted_timing = timing.recreate_with_f_clock(clock)
        except ValueError:
            # this clock cannot reproduce the timing; try the next candidate
            continue

        warnings.warn(
            f"Adjusted f_clock in {timing.__class__.__name__} from "
            f"{timing.f_clock} to {adjusted_timing.f_clock}",
            stacklevel=2,
        )
        return adjusted_timing

    raise CanInitializationError(
        f"The specified timing.f_clock value {timing.f_clock} "
        f"doesn't match any of the allowed device f_clock values: "
        f"{', '.join([str(f) for f in clocks])}"
    ) from None
473

474

475
def time_perfcounter_correlation() -> tuple[float, float]:
    """Pair a `time.time()` value with the `perf_counter` value taken as close as
    possible to the moment `time.time()` ticks over.

    The tick is only awaited when the clock behind `time.time` on this platform is
    coarser than 10μs; otherwise the current time and perf_counter are returned directly.
    This threshold was chosen as typical timer resolution on Linux/macOS is ~1μs, and
    the Windows platform can vary from ~500μs to 10ms.

    Note this value is based on when `time.time()` is observed to update from Python,
    it is not directly returned by the operating system.

    :returns:
        (t, performance_counter) time.time value and time.perf_counter value when the time.time
        is updated

    """
    coarse_clock = get_clock_info("time").resolution > 1e-5
    if not coarse_clock:
        return time(), perf_counter()

    # spin until time() reports a new value and keep the sample taken at that tick
    previous = time()
    while True:
        current, performance_counter = time(), perf_counter()
        if current != previous:
            return current, performance_counter
502

503

504
def cast_from_string(string_val: str) -> Union[str, int, float, bool]:
    """Convert a :class:`str` to :class:`int`, :class:`float` or :class:`bool`
    if it looks like one; otherwise hand it back unchanged.

    :param string_val:
        the string, that shall be converted
    """
    # (pattern, regex flags, converter) - checked in this order, first match wins
    conversions = (
        (r"^[-+]?\d+$", 0, int),
        (r"^[-+]?\d*\.\d+(?:e[-+]?\d+)?$", 0, float),
        (r"^(?:True|False)$", re.IGNORECASE, lambda text: text.lower() == "true"),
    )

    for pattern, flags, convert in conversions:
        if re.match(pattern, string_val, flags):
            return convert(string_val)

    # value is string
    return string_val
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc