• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hardbyte / python-can / 15474433755

05 Jun 2025 06:21PM UTC coverage: 70.972% (+0.07%) from 70.901%
15474433755

Pull #1949

github

web-flow
Merge e6599c908 into f43bedbc6
Pull Request #1949: Add public functions for creating bus command lines options

137 of 152 new or added lines in 4 files covered. (90.13%)

1 existing line in 1 file now uncovered.

7743 of 10910 relevant lines covered (70.97%)

13.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.68
/can/cli.py
1
import argparse
21✔
2
import re
21✔
3
from collections.abc import Sequence
21✔
4
from typing import Any, Optional, Union
21✔
5

6
import can
21✔
7
from can.typechecking import CanFilter, TAdditionalCliArgs
21✔
8
from can.util import _dict2timing, cast_from_string
21✔
9

10

11
def add_bus_arguments(
    parser: argparse.ArgumentParser,
    *,
    filter_arg: bool = False,
    prefix: Optional[str] = None,
    group_title: Optional[str] = None,
) -> None:
    """Adds CAN bus configuration options to an argument parser.

    All options are registered with ``default=argparse.SUPPRESS``, so an option
    that is not given on the command line leaves no attribute on the namespace.

    :param parser:
        The argument parser to which the options will be added.
    :param filter_arg:
        Whether to include the filter argument.
    :param prefix:
        An optional prefix for the argument names, allowing configuration of multiple buses.
        With a prefix, only long flags of the form ``--<prefix>-<name>`` are registered
        (no short flags) and the destinations become ``<prefix>_<name>``.
    :param group_title:
        The title of the argument group. If not provided, a default title will be generated
        based on the prefix. For example, "bus arguments (prefix)" if a prefix is specified,
        or "bus arguments" otherwise.
    """
    if group_title is None:
        group_title = f"bus arguments ({prefix})" if prefix else "bus arguments"

    group = parser.add_argument_group(group_title)

    def add_option(
        name: str,
        *short_flags: str,
        dest: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        # Single place that derives the flag names and the namespace
        # destination, so the prefixed and unprefixed variants cannot drift.
        base_dest = name.replace("-", "_") if dest is None else dest
        if prefix:
            flags = [f"--{prefix}-{name}"]
            full_dest = f"{prefix}_{base_dest}"
        else:
            flags = [*short_flags, f"--{name}"]
            full_dest = base_dest
        group.add_argument(
            *flags, dest=full_dest, default=argparse.SUPPRESS, **kwargs
        )

    add_option(
        "channel",
        "-c",
        metavar="CHANNEL",
        help=r"Most backend interfaces require some sort of channel. For "
        r"example with the serial interface the channel might be a rfcomm"
        r' device: "/dev/rfcomm0". With the socketcan interface valid '
        r'channel examples include: "can0", "vcan0".',
    )

    add_option(
        "interface",
        "-i",
        choices=sorted(can.VALID_INTERFACES),
        help="""Specify the backend CAN interface to use. If left blank,
                        fall back to reading from configuration files.""",
    )

    add_option(
        "bitrate",
        "-b",
        type=int,
        metavar="BITRATE",
        help="Bitrate to use for the CAN bus.",
    )

    add_option(
        "fd",
        action="store_true",
        help="Activate CAN-FD support",
    )

    add_option(
        "data-bitrate",
        type=int,
        metavar="DATA_BITRATE",
        help="Bitrate to use for the data phase in case of CAN-FD.",
    )

    add_option(
        "timing",
        action=_BitTimingAction,
        nargs=argparse.ONE_OR_MORE,
        metavar="TIMING_ARG",
        help="Configure bit rate and bit timing. For example, use "
        "`--timing f_clock=8_000_000 tseg1=5 tseg2=2 sjw=2 brp=2 nof_samples=1` for classical CAN "
        "or `--timing f_clock=80_000_000 nom_tseg1=119 nom_tseg2=40 nom_sjw=40 nom_brp=1 "
        "data_tseg1=29 data_tseg2=10 data_sjw=10 data_brp=1` for CAN FD. "
        "Check the python-can documentation to verify whether your "
        "CAN interface supports the `timing` argument.",
    )

    if filter_arg:
        # The flag is called ``--filter`` but the value is stored under
        # ``can_filters`` (the keyword create_bus_from_namespace looks up).
        add_option(
            "filter",
            dest="can_filters",
            nargs=argparse.ONE_OR_MORE,
            action=_CanFilterAction,
            metavar="{<can_id>:<can_mask>,<can_id>~<can_mask>}",
            help="R|Space separated CAN filters for the given CAN interface:"
            "\n      <can_id>:<can_mask> (matches when <received_can_id> & mask =="
            " can_id & mask)"
            "\n      <can_id>~<can_mask> (matches when <received_can_id> & mask !="
            " can_id & mask)"
            "\nFx to show only frames with ID 0x100 to 0x103 and 0x200 to 0x20F:"
            "\n      python -m can.viewer --filter 100:7FC 200:7F0"
            "\nNote that the ID and mask are always interpreted as hex values",
        )

    add_option(
        "bus-kwargs",
        action=_BusKwargsAction,
        nargs=argparse.ONE_OR_MORE,
        metavar="BUS_KWARG",
        help="Pass keyword arguments down to the instantiation of the bus class. "
        "For example, `-i vector -c 1 --bus-kwargs app_name=MyCanApp serial=1234` is equivalent "
        "to opening the bus with `can.Bus('vector', channel=1, app_name='MyCanApp', serial=1234)`.",
    )
142

143

144
def create_bus_from_namespace(
    namespace: argparse.Namespace,
    *,
    prefix: Optional[str] = None,
    **kwargs: Any,
) -> can.BusABC:
    """Build a CAN bus from parsed command line arguments.

    The bus configuration starts as ``single_handle=True`` overlaid with
    ``kwargs``. Every recognised bus option that is present on ``namespace``
    then overrides it; the entries of the ``bus_kwargs`` option are merged into
    the configuration as individual keyword arguments.

    :param namespace:
        The namespace containing parsed arguments.
    :param prefix:
        Optional prefix of the argument names. When given, the options are
        looked up as ``<prefix>_<name>`` on the namespace.
    :param kwargs:
        Additional keyword arguments to configure the bus.
    :return:
        The bus instance returned by ``can.Bus``.
    :raises argparse.ArgumentError:
        If ``can.Bus`` raises any exception; the original exception is chained
        as the cause.
    """
    option_names = (
        "channel",
        "interface",
        "bitrate",
        "fd",
        "data_bitrate",
        "can_filters",
        "timing",
        "bus_kwargs",
    )

    bus_config: dict[str, Any] = {"single_handle": True}
    bus_config.update(kwargs)

    attr_prefix = f"{prefix}_" if prefix else ""
    for name in option_names:
        attr_name = attr_prefix + name
        # Options that were not supplied are simply absent from the namespace.
        if attr_name not in namespace:
            continue

        parsed_value = getattr(namespace, attr_name)
        if name != "bus_kwargs":
            bus_config[name] = parsed_value
        else:
            # Free-form key/value pairs are flattened into the configuration.
            bus_config.update(parsed_value)

    try:
        return can.Bus(**bus_config)
    except Exception as exc:
        err_msg = f"Unable to instantiate bus from arguments {vars(namespace)}."
        raise argparse.ArgumentError(None, err_msg) from exc
×
188

189

190
class _CanFilterAction(argparse.Action):
21✔
191
    def __call__(
21✔
192
        self,
193
        parser: argparse.ArgumentParser,
194
        namespace: argparse.Namespace,
195
        values: Union[str, Sequence[Any], None],
196
        option_string: Optional[str] = None,
197
    ) -> None:
198
        if not isinstance(values, list):
21✔
NEW
199
            raise argparse.ArgumentError(self, "Invalid filter argument")
×
200

201
        print(f"Adding filter(s): {values}")
21✔
202
        can_filters: list[CanFilter] = []
21✔
203

204
        for filt in values:
21✔
205
            if ":" in filt:
21✔
206
                parts = filt.split(":")
21✔
207
                can_id = int(parts[0], base=16)
21✔
208
                can_mask = int(parts[1], base=16)
21✔
209
            elif "~" in filt:
21✔
210
                parts = filt.split("~")
21✔
211
                can_id = int(parts[0], base=16) | 0x20000000  # CAN_INV_FILTER
21✔
212
                can_mask = int(parts[1], base=16) & 0x20000000  # socket.CAN_ERR_FLAG
21✔
213
            else:
214
                raise argparse.ArgumentError(self, "Invalid filter argument")
14✔
215
            can_filters.append({"can_id": can_id, "can_mask": can_mask})
21✔
216

217
        setattr(namespace, self.dest, can_filters)
21✔
218

219

220
class _BitTimingAction(argparse.Action):
21✔
221
    def __call__(
21✔
222
        self,
223
        parser: argparse.ArgumentParser,
224
        namespace: argparse.Namespace,
225
        values: Union[str, Sequence[Any], None],
226
        option_string: Optional[str] = None,
227
    ) -> None:
228
        if not isinstance(values, list):
21✔
NEW
229
            raise argparse.ArgumentError(self, "Invalid --timing argument")
×
230

231
        timing_dict: dict[str, int] = {}
21✔
232
        for arg in values:
21✔
233
            try:
21✔
234
                key, value_string = arg.split("=")
21✔
235
                value = int(value_string)
21✔
236
                timing_dict[key] = value
21✔
NEW
237
            except ValueError:
×
NEW
238
                raise argparse.ArgumentError(
×
239
                    self, f"Invalid timing argument: {arg}"
240
                ) from None
241

242
        if not (timing := _dict2timing(timing_dict)):
21✔
243
            err_msg = "Invalid --timing argument. Incomplete parameters."
21✔
244
            raise argparse.ArgumentError(self, err_msg)
21✔
245

246
        setattr(namespace, self.dest, timing)
21✔
247
        print(timing)
21✔
248

249

250
class _BusKwargsAction(argparse.Action):
21✔
251
    def __call__(
21✔
252
        self,
253
        parser: argparse.ArgumentParser,
254
        namespace: argparse.Namespace,
255
        values: Union[str, Sequence[Any], None],
256
        option_string: Optional[str] = None,
257
    ) -> None:
258
        if not isinstance(values, list):
21✔
NEW
259
            raise argparse.ArgumentError(self, "Invalid --bus-kwargs argument")
×
260

261
        bus_kwargs: dict[str, Union[str, int, float, bool]] = {}
21✔
262

263
        for arg in values:
21✔
264
            try:
21✔
265
                match = re.match(
21✔
266
                    r"^(?P<name>[_a-zA-Z][_a-zA-Z0-9]*)=(?P<value>\S*?)$",
267
                    arg,
268
                )
269
                if not match:
21✔
NEW
270
                    raise ValueError
×
271
                key = match["name"].replace("-", "_")
21✔
272
                string_val = match["value"]
21✔
273
                bus_kwargs[key] = cast_from_string(string_val)
21✔
NEW
274
            except ValueError:
×
NEW
275
                raise argparse.ArgumentError(
×
276
                    self,
277
                    f"Unable to parse bus keyword argument '{arg}'",
278
                ) from None
279

280
        setattr(namespace, self.dest, bus_kwargs)
21✔
281

282

283
def _add_extra_args(
21✔
284
    parser: Union[argparse.ArgumentParser, argparse._ArgumentGroup],
285
) -> None:
286
    parser.add_argument(
21✔
287
        "extra_args",
288
        nargs=argparse.REMAINDER,
289
        help="The remaining arguments will be used for logger/player initialisation. "
290
        "For example, `can_logger -i virtual -c test -f logfile.blf --compression-level=9` "
291
        "passes the keyword argument `compression_level=9` to the BlfWriter.",
292
    )
293

294

295
def _parse_additional_config(unknown_args: Sequence[str]) -> TAdditionalCliArgs:
21✔
296
    for arg in unknown_args:
21✔
297
        if not re.match(r"^--[a-zA-Z][a-zA-Z0-9\-]*=\S*?$", arg):
21✔
298
            raise ValueError(f"Parsing argument {arg} failed")
21✔
299

300
    def _split_arg(_arg: str) -> tuple[str, str]:
21✔
301
        left, right = _arg.split("=", 1)
21✔
302
        return left.lstrip("-").replace("-", "_"), right
21✔
303

304
    args: dict[str, Union[str, int, float, bool]] = {}
21✔
305
    for key, string_val in map(_split_arg, unknown_args):
21✔
306
        args[key] = cast_from_string(string_val)
21✔
307
    return args
21✔
308

309

310
def _set_logging_level_from_namespace(namespace: argparse.Namespace) -> None:
21✔
311
    if "verbosity" in namespace:
21✔
312
        logging_level_names = [
21✔
313
            "critical",
314
            "error",
315
            "warning",
316
            "info",
317
            "debug",
318
            "subdebug",
319
        ]
320
        can.set_logging_level(logging_level_names[min(5, namespace.verbosity)])
21✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc