• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pybricks / pybricksdev / 19867861057

02 Dec 2025 05:35PM UTC coverage: 55.225% (+1.4%) from 53.832%
19867861057

Pull #126

github

web-flow
Merge 293fabadd into a1a9a71de
Pull Request #126: Catch a syntax error when parsing an input file with the --stay-connected flag active

84 of 268 branches covered (31.34%)

Branch coverage included in aggregate %.

2125 of 3732 relevant lines covered (56.94%)

0.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.59
pybricksdev/cli/__init__.py
1
# SPDX-License-Identifier: MIT
2
# Copyright (c) 2019-2024 The Pybricks Authors
3

4
"""Command line wrapper around pybricksdev library."""
5

6
import argparse
1✔
7
import asyncio
1✔
8
import contextlib
1✔
9
import logging
1✔
10
import os
1✔
11
import subprocess
1✔
12
import sys
1✔
13
from abc import ABC, abstractmethod
1✔
14
from enum import IntEnum
1✔
15
from os import PathLike, path
1✔
16
from tempfile import NamedTemporaryFile
1✔
17
from typing import Awaitable, ContextManager, TextIO
1✔
18

19
import argcomplete
1✔
20
import questionary
1✔
21
from argcomplete.completers import FilesCompleter
1✔
22
from packaging.version import Version
1✔
23
from prompt_toolkit import Application
1✔
24
from prompt_toolkit.key_binding import KeyBindings
1✔
25
from prompt_toolkit.output import DummyOutput
1✔
26

27
from pybricksdev import __name__ as MODULE_NAME
1✔
28
from pybricksdev import __version__ as MODULE_VERSION
1✔
29
from pybricksdev.connections.pybricks import (
1✔
30
    HubDisconnectError,
31
    HubPowerButtonPressedError,
32
    PybricksHub,
33
)
34

35
PROG_NAME = (
1✔
36
    f"{path.basename(sys.executable)} -m {MODULE_NAME}"
37
    if sys.argv[0].endswith("__main__.py")
38
    else path.basename(sys.argv[0])
39
)
40

41

42
class CancelProgramError(RuntimeError):
1✔
43
    """Exception raised when a user interrupts the hub's running program from the cli."""
44

45

46
class Tool(ABC):
1✔
47
    """Common base class for tool implementations."""
48

49
    @abstractmethod
1✔
50
    def add_parser(self, subparsers: argparse._SubParsersAction):
1✔
51
        """
52
        Overriding methods must at least do the following::
53

54
            parser = subparsers.add_parser('tool', ...)
55
            parser.tool = self
56

57
        Then additional arguments can be added using the ``parser`` object.
58
        """
59
        pass
×
60

61
    @abstractmethod
1✔
62
    async def run(self, args: argparse.Namespace):
1✔
63
        """
64
        Overriding methods should provide an implementation to run the tool.
65
        """
66
        pass
×
67

68

69
def _get_script_path(file: TextIO) -> ContextManager[PathLike]:
1✔
70
    """
71
    Gets the path to a script on the file system.
72

73
    If the file is ``sys.stdin``, the contents are copied to a temporary file
74
    and the path to the temporary file is returned. Otherwise, the file is closed
75
    and the path is returned.
76

77
    The context manager will delete the temporary file, if applicable.
78
    """
79
    if file is sys.stdin:
1✔
80
        # Have to close the temp file so that mpy-cross can read it, so we
81
        # create our own context manager to delete the file when we are done
82
        # using it.
83

84
        @contextlib.contextmanager
×
85
        def temp_context():
×
86
            try:
×
87
                with NamedTemporaryFile("wb", suffix=".py", delete=False) as temp:
×
88
                    temp.write(file.buffer.read())
×
89

90
                yield temp.name
×
91
            finally:
92
                try:
×
93
                    os.remove(temp.name)
×
94
                except NameError:
×
95
                    # if NamedTemporaryFile() throws, temp is not defined
96
                    pass
×
97
                except OSError:
×
98
                    # file was already deleted or other strangeness
99
                    pass
×
100

101
        return temp_context()
×
102

103
    file.close()
1✔
104
    return contextlib.nullcontext(file.name)
1✔
105

106

107
class Compile(Tool):
1✔
108
    def add_parser(self, subparsers: argparse._SubParsersAction):
1✔
109
        parser = subparsers.add_parser(
1✔
110
            "compile",
111
            help="compile a Pybricks program without running it",
112
        )
113
        parser.add_argument(
1✔
114
            "file",
115
            metavar="<file>",
116
            help="path to a MicroPython script or `-` for stdin",
117
            type=argparse.FileType(encoding="utf-8"),
118
        )
119
        parser.add_argument(
1✔
120
            "--abi",
121
            metavar="<n>",
122
            help="the MPY ABI version, one of %(choices)s (default: %(default)s)",
123
            default=6,
124
            choices=[5, 6],
125
            type=int,
126
        )
127
        parser.add_argument(
1✔
128
            "--bin",
129
            action="store_true",
130
            help="output unformatted binary data only (useful for pipes)",
131
        )
132
        parser.tool = self
1✔
133

134
    async def run(self, args: argparse.Namespace):
1✔
135
        from pybricksdev.compile import compile_multi_file, print_mpy
1✔
136

137
        with _get_script_path(args.file) as script_path:
1✔
138
            mpy = await compile_multi_file(script_path, args.abi)
1✔
139
        if args.bin:
1✔
140
            sys.stdout.buffer.write(mpy)
×
141
            return
×
142
        print_mpy(mpy)
1✔
143

144

145
class Run(Tool):
1✔
146
    def add_parser(self, subparsers: argparse._SubParsersAction):
1✔
147
        parser = subparsers.add_parser(
1✔
148
            "run",
149
            help="run a Pybricks program",
150
        )
151
        parser.tool = self
1✔
152
        parser.add_argument(
1✔
153
            "conntype",
154
            metavar="<connection type>",
155
            help="connection type: %(choices)s",
156
            choices=["ble", "usb"],
157
        )
158
        parser.add_argument(
1✔
159
            "file",
160
            metavar="<file>",
161
            help="path to a MicroPython script or `-` for stdin",
162
            type=argparse.FileType(encoding="utf-8"),
163
        )
164
        parser.add_argument(
1✔
165
            "-n",
166
            "--name",
167
            metavar="<name>",
168
            required=False,
169
            help="Bluetooth device name or Bluetooth address for BLE connection; "
170
            "serial port name for USB connection",
171
        )
172

173
        parser.add_argument(
1✔
174
            "--start",
175
            help="Start the program immediately after downloading it.",
176
            action=argparse.BooleanOptionalAction,
177
            default=True,
178
        )
179

180
        parser.add_argument(
1✔
181
            "--wait",
182
            help="Wait for the program to complete before disconnecting. Only applies when starting program right away.",
183
            action=argparse.BooleanOptionalAction,
184
            default=True,
185
        )
186

187
        parser.add_argument(
1✔
188
            "--stay-connected",
189
            help="Add a menu option to resend the code with bluetooth instead of disconnecting from the robot after the program ends.",
190
            action=argparse.BooleanOptionalAction,
191
            default=False,
192
        )
193

194
    async def race_keypress(self, awaitable: Awaitable) -> None:
1✔
195
        """
196
        Races an awaitable against a keypress.
197
        The awaitable is cancelled and a CancelProgramError is raised if the key
198
        is pressed before the awaitable completes.
199
        The output of the awaitable is not passed to the caller.
200
        """
201

202
        async def stdin_monitor():
1✔
203
            kb = KeyBindings()
1✔
204

205
            @kb.add("q")
1✔
206
            def _(event):
1✔
207
                event.app.exit()
×
208

209
            app = Application(
1✔
210
                key_bindings=kb,
211
                full_screen=False,
212
                mouse_support=False,
213
                output=DummyOutput(),
214
            )
215

216
            await app.run_async()
1✔
217

218
        stop_task = asyncio.ensure_future(stdin_monitor())
1✔
219
        awaitable_task = asyncio.ensure_future(awaitable)
1✔
220

221
        print("------press q to cancel the program------")
1✔
222

223
        try:
1✔
224
            done, pending = await asyncio.wait(
1✔
225
                {awaitable_task, stop_task},
226
                return_when=asyncio.FIRST_COMPLETED,
227
            )
228
        except BaseException:
×
229
            awaitable_task.cancel()
×
230
            stop_task.cancel()
×
231
            raise
×
232

233
        for t in pending:
1✔
234
            t.cancel()
1✔
235

236
        # allow prompt-toolkit to unbind from the terminal
237
        await asyncio.sleep(0.1)
1✔
238

239
        if stop_task in done:
1✔
240
            print("------aborting program------")
×
241
            raise CancelProgramError
×
242

243
    async def stay_connected_menu(self, hub: PybricksHub, args: argparse.Namespace):
1✔
244

245
        if args.conntype == "ble":
1✔
246
            from pybricksdev.ble import find_device as find_ble
1✔
247
            from pybricksdev.connections.pybricks import PybricksHubBLE
1✔
248
        else:
249
            from usb.core import find as find_usb
×
250

251
            from pybricksdev.connections.pybricks import PybricksHubUSB
×
252
            from pybricksdev.usb import (
×
253
                EV3_USB_PID,
254
                LEGO_USB_VID,
255
                MINDSTORMS_INVENTOR_USB_PID,
256
                NXT_USB_PID,
257
                SPIKE_ESSENTIAL_USB_PID,
258
                SPIKE_PRIME_USB_PID,
259
            )
260

261
            def is_pybricks_usb(dev):
×
262
                return (
×
263
                    (dev.idVendor == LEGO_USB_VID)
264
                    and (
265
                        dev.idProduct
266
                        in [
267
                            NXT_USB_PID,
268
                            EV3_USB_PID,
269
                            SPIKE_PRIME_USB_PID,
270
                            SPIKE_ESSENTIAL_USB_PID,
271
                            MINDSTORMS_INVENTOR_USB_PID,
272
                        ]
273
                    )
274
                    and dev.product.endswith("Pybricks")
275
                )
276

277
        class ResponseOptions(IntEnum):
1✔
278
            RECOMPILE_RUN = 0
1✔
279
            RECOMPILE_DOWNLOAD = 1
1✔
280
            RUN_STORED = 2
1✔
281
            CHANGE_TARGET_FILE = 3
1✔
282
            EXIT = 4
1✔
283

284
        async def reconnect_hub():
1✔
285
            if not await questionary.confirm(
1✔
286
                "\nThe hub has been disconnected. Would you like to re-connect?"
287
            ).ask_async():
288
                exit()
×
289

290
            if args.conntype == "ble":
1✔
291
                print(
1✔
292
                    f"Searching for {args.name or 'any hub with Pybricks service'}..."
293
                )
294
                device_or_address = await find_ble(args.name)
1✔
295
                hub = PybricksHubBLE(device_or_address)
1✔
296
            elif args.conntype == "usb":
×
297
                device_or_address = find_usb(custom_match=is_pybricks_usb)
×
298
                hub = PybricksHubUSB(device_or_address)
×
299

300
            await hub.connect()
1✔
301
            # re-enable echoing of the hub's stdout
302
            hub._enable_line_handler = True
1✔
303
            hub.print_output = True
1✔
304
            return hub
1✔
305

306
        response_options = [
1✔
307
            "Recompile and Run",
308
            "Recompile and Download",
309
            "Run Stored Program",
310
            "Change Target File",
311
            "Exit",
312
        ]
313
        # the entry that is selected by default when the menu opens
314
        # this is overridden after the user picks an option
315
        # so that the default option is always the one that was last chosen
316
        default_response_option = (
1✔
317
            ResponseOptions.RECOMPILE_RUN
318
            if args.start
319
            else ResponseOptions.RECOMPILE_DOWNLOAD
320
        )
321

322
        while True:
1✔
323
            try:
1✔
324
                if args.file is sys.stdin:
1✔
325
                    await hub.race_disconnect(
×
326
                        hub.race_power_button_press(
327
                            questionary.press_any_key_to_continue(
328
                                "The hub will stay connected and echo its output to the terminal. Press any key to exit."
329
                            ).ask_async()
330
                        )
331
                    )
332
                    return
×
333
                response = await hub.race_disconnect(
1✔
334
                    hub.race_power_button_press(
335
                        questionary.select(
336
                            f"Would you like to re-compile {path.basename(args.file.name)}?",
337
                            response_options,
338
                            default=(response_options[default_response_option]),
339
                        ).ask_async()
340
                    )
341
                )
342

343
                default_response_option = response_options.index(response)
1✔
344

345
                match response_options.index(response):
1✔
346

347
                    case ResponseOptions.RECOMPILE_RUN:
1✔
348
                        with _get_script_path(args.file) as script_path:
1✔
349
                            await self.race_keypress(hub.run(script_path, wait=True))
1✔
350

351
                    case ResponseOptions.RECOMPILE_DOWNLOAD:
1✔
352
                        with _get_script_path(args.file) as script_path:
1✔
353
                            await hub.download(script_path)
1✔
354

355
                    case ResponseOptions.RUN_STORED:
1✔
356
                        if hub.fw_version < Version("3.2.0-beta.4"):
1✔
357
                            print(
×
358
                                "Running a stored program remotely is only supported in the hub firmware version >= v3.2.0."
359
                            )
360
                        else:
361
                            await hub.start_user_program()
1✔
362
                            await self.race_keypress(hub._wait_for_user_program_stop())
1✔
363

364
                    case ResponseOptions.CHANGE_TARGET_FILE:
1✔
365
                        args.file.close()
1✔
366
                        while True:
1✔
367
                            try:
1✔
368
                                args.file = open(
1✔
369
                                    await hub.race_disconnect(
370
                                        hub.race_power_button_press(
371
                                            questionary.path(
372
                                                "What file would you like to use?"
373
                                            ).ask_async()
374
                                        )
375
                                    ),
376
                                    encoding="utf-8",
377
                                )
378
                                break
1✔
379
                            except FileNotFoundError:
×
380
                                print("The file was not found. Please try again.")
×
381
                        # send the new target file to the hub
382
                        with _get_script_path(args.file) as script_path:
1✔
383
                            await hub.download(script_path)
1✔
384

385
                    case _:
1✔
386
                        return
1✔
387

388
            except subprocess.CalledProcessError as e:
1✔
389
                print()
×
390
                print("A syntax error occurred while parsing your program:")
×
391
                print(e.stderr.decode())
×
392

393
            except HubPowerButtonPressedError:
1✔
394
                # This means the user pressed the button on the hub to re-start the
395
                # current program, so the menu was canceled and we are now printing
396
                # the hub stdout until the user program ends on the hub.
397
                try:
1✔
398
                    await hub._wait_for_power_button_release()
1✔
399
                    await self.race_keypress(hub._wait_for_user_program_stop())
1✔
400

401
                except HubDisconnectError:
×
402
                    hub = await reconnect_hub()
×
403

404
                except CancelProgramError:
×
405
                    await hub.stop_user_program()
×
406

407
            except HubDisconnectError:
1✔
408
                # let terminal cool off before making a new prompt
409
                await asyncio.sleep(0.3)
1✔
410
                hub = await reconnect_hub()
1✔
411

412
            except CancelProgramError:
×
413
                await hub.stop_user_program()
×
414

415
    async def run(self, args: argparse.Namespace):
1✔
416

417
        # Pick the right connection
418
        if args.conntype == "ble":
1✔
419
            from pybricksdev.ble import find_device as find_ble
1✔
420
            from pybricksdev.connections.pybricks import PybricksHubBLE
1✔
421

422
            # It is a Pybricks Hub with BLE. Device name or address is given.
423
            print(f"Searching for {args.name or 'any hub with Pybricks service'}...")
1✔
424
            device_or_address = await find_ble(args.name)
1✔
425
            hub = PybricksHubBLE(device_or_address)
1✔
426
        elif args.conntype == "usb":
1✔
427
            from usb.core import find as find_usb
1✔
428

429
            from pybricksdev.connections.pybricks import PybricksHubUSB
1✔
430
            from pybricksdev.usb import (
1✔
431
                EV3_USB_PID,
432
                LEGO_USB_VID,
433
                MINDSTORMS_INVENTOR_USB_PID,
434
                NXT_USB_PID,
435
                SPIKE_ESSENTIAL_USB_PID,
436
                SPIKE_PRIME_USB_PID,
437
            )
438

439
            def is_pybricks_usb(dev):
1✔
440
                return (
×
441
                    (dev.idVendor == LEGO_USB_VID)
442
                    and (
443
                        dev.idProduct
444
                        in [
445
                            NXT_USB_PID,
446
                            EV3_USB_PID,
447
                            SPIKE_PRIME_USB_PID,
448
                            SPIKE_ESSENTIAL_USB_PID,
449
                            MINDSTORMS_INVENTOR_USB_PID,
450
                        ]
451
                    )
452
                    and dev.product.endswith("Pybricks")
453
                )
454

455
            device_or_address = find_usb(custom_match=is_pybricks_usb)
1✔
456

457
            if device_or_address is None:
1✔
458
                print("Pybricks Hub not found.", file=sys.stderr)
×
459
                exit(1)
×
460

461
            hub = PybricksHubUSB(device_or_address)
1✔
462
        else:
463
            raise ValueError(f"Unknown connection type: {args.conntype}")
×
464

465
        # Connect to the address and run the script
466
        await hub.connect()
1✔
467
        try:
1✔
468
            with _get_script_path(args.file) as script_path:
1✔
469
                if args.start:
1✔
470
                    await self.race_keypress(
1✔
471
                        hub.run(script_path, args.wait or args.stay_connected)
472
                    )
473
                else:
474
                    if args.stay_connected:
1✔
475
                        # if the user later starts the program by pressing the button on the hub,
476
                        # we still want the hub stdout to print to Python's stdout
477
                        hub.print_output = True
×
478
                        hub._enable_line_handler = True
×
479
                    await hub.download(script_path)
1✔
480

481
            if args.stay_connected:
1✔
482
                await self.stay_connected_menu(hub, args)
1✔
483

484
        except subprocess.CalledProcessError as e:
×
485
            print()
×
486
            print("A syntax error occurred while parsing your program:")
×
487
            print(e.stderr.decode())
×
488
            if args.stay_connected:
×
489
                await self.stay_connected_menu(hub, args)
×
490

491
        except CancelProgramError:
×
492
            await hub.stop_user_program()
×
493
            if args.stay_connected:
×
494
                await self.stay_connected_menu(hub, args)
×
495

496
        finally:
497
            await hub.disconnect()
1✔
498

499

500
class Flash(Tool):
1✔
501

502
    def add_parser(self, subparsers: argparse._SubParsersAction):
1✔
503
        parser = subparsers.add_parser(
×
504
            "flash", help="flash firmware on a LEGO Powered Up device"
505
        )
506
        parser.tool = self
×
507

508
        parser.add_argument(
×
509
            "firmware",
510
            metavar="<firmware-file>",
511
            type=argparse.FileType(mode="rb"),
512
            help="the firmware .zip file",
513
        ).completer = FilesCompleter(allowednames=(".zip",))
514

515
        parser.add_argument(
×
516
            "-n", "--name", metavar="<name>", type=str, help="a custom name for the hub"
517
        )
518

519
    def run(self, args: argparse.Namespace):
1✔
520
        from pybricksdev.cli.flash import flash_firmware
×
521

522
        return flash_firmware(args.firmware, args.name)
×
523

524

525
class DFUBackup(Tool):
1✔
526
    def add_parser(self, subparsers: argparse._SubParsersAction):
1✔
527
        parser = subparsers.add_parser("backup", help="backup firmware using DFU")
×
528
        parser.tool = self
×
529
        parser.add_argument(
×
530
            "firmware",
531
            metavar="<firmware-file>",
532
            type=argparse.FileType(mode="wb"),
533
            help="the firmware .bin file",
534
        ).completer = FilesCompleter(allowednames=(".bin",))
535

536
    async def run(self, args: argparse.Namespace):
1✔
537
        from pybricksdev.dfu import backup_dfu
×
538

539
        backup_dfu(args.firmware)
×
540

541

542
class DFURestore(Tool):
1✔
543
    def add_parser(self, subparsers: argparse._SubParsersAction):
1✔
544
        parser = subparsers.add_parser(
×
545
            "restore",
546
            help="restore firmware using DFU",
547
        )
548
        parser.tool = self
×
549
        parser.add_argument(
×
550
            "firmware",
551
            metavar="<firmware-file>",
552
            type=argparse.FileType(mode="rb"),
553
            help="the firmware .bin file",
554
        ).completer = FilesCompleter(allowednames=(".bin",))
555

556
    async def run(self, args: argparse.Namespace):
1✔
557
        from pybricksdev.dfu import restore_dfu
×
558

559
        restore_dfu(args.firmware)
×
560

561

562
class DFU(Tool):
1✔
563
    def add_parser(self, subparsers: argparse._SubParsersAction):
1✔
564
        self.parser = subparsers.add_parser(
×
565
            "dfu",
566
            help="use DFU to backup or restore firmware",
567
        )
568
        self.parser.tool = self
×
569
        self.subparsers = self.parser.add_subparsers(
×
570
            metavar="<action>", dest="action", help="the action to perform"
571
        )
572

573
        for tool in DFUBackup(), DFURestore():
×
574
            tool.add_parser(self.subparsers)
×
575

576
    def run(self, args: argparse.Namespace):
1✔
577
        if args.action not in self.subparsers.choices:
×
578
            self.parser.error(
×
579
                f'Missing name of action: {"|".join(self.subparsers.choices.keys())}'
580
            )
581

582
        return self.subparsers.choices[args.action].tool.run(args)
×
583

584

585
class OADFlash(Tool):
1✔
586
    def add_parser(self, subparsers: argparse._SubParsersAction):
1✔
587
        parser = subparsers.add_parser(
×
588
            "flash",
589
            help="update firmware on a LEGO Powered Up device using TI OAD",
590
        )
591
        parser.tool = self
×
592
        parser.add_argument(
×
593
            "firmware",
594
            metavar="<firmware-file>",
595
            type=argparse.FileType(mode="rb"),
596
            help="the firmware .oda file",
597
        ).completer = FilesCompleter(allowednames=(".oda",))
598

599
    async def run(self, args: argparse.Namespace):
1✔
600
        from pybricksdev.cli.oad import flash_oad_image
×
601

602
        await flash_oad_image(args.firmware)
×
603

604

605
class OADInfo(Tool):
1✔
606
    def add_parser(self, subparsers: argparse._SubParsersAction):
1✔
607
        parser = subparsers.add_parser(
×
608
            "info",
609
            help="get information about firmware on a LEGO Powered Up device using TI OAD",
610
        )
611
        parser.tool = self
×
612

613
    async def run(self, args: argparse.Namespace):
1✔
614
        from pybricksdev.cli.oad import dump_oad_info
×
615

616
        await dump_oad_info()
×
617

618

619
class OAD(Tool):
1✔
620
    def add_parser(self, subparsers: argparse._SubParsersAction):
1✔
621
        self.parser = subparsers.add_parser(
×
622
            "oad",
623
            help="update firmware on a LEGO Powered Up device using TI OAD",
624
        )
625
        self.parser.tool = self
×
626
        self.subparsers = self.parser.add_subparsers(
×
627
            metavar="<action>", dest="action", help="the action to perform"
628
        )
629

630
        for tool in OADFlash(), OADInfo():
×
631
            tool.add_parser(self.subparsers)
×
632

633
    def run(self, args: argparse.Namespace):
1✔
634
        if args.action not in self.subparsers.choices:
×
635
            self.parser.error(
×
636
                f'Missing name of action: {"|".join(self.subparsers.choices.keys())}'
637
            )
638

639
        return self.subparsers.choices[args.action].tool.run(args)
×
640

641

642
class LWP3Repl(Tool):
1✔
643
    def add_parser(self, subparsers: argparse._SubParsersAction):
1✔
644
        parser = subparsers.add_parser(
×
645
            "repl",
646
            help="interactive REPL for sending and receiving LWP3 messages",
647
        )
648
        parser.tool = self
×
649

650
    def run(self, args: argparse.Namespace):
1✔
651
        from pybricksdev.cli.lwp3.repl import repl, setup_repl_logging
×
652

653
        setup_repl_logging()
×
654
        return repl()
×
655

656

657
class LWP3(Tool):
1✔
658
    def add_parser(self, subparsers: argparse._SubParsersAction):
1✔
659
        self.parser = subparsers.add_parser(
×
660
            "lwp3", help="interact with devices using LWP3"
661
        )
662
        self.parser.tool = self
×
663
        self.subparsers = self.parser.add_subparsers(
×
664
            metavar="<lwp3-tool>", dest="lwp3_tool", help="the tool to run"
665
        )
666

667
        for tool in (LWP3Repl(),):
×
668
            tool.add_parser(self.subparsers)
×
669

670
    def run(self, args: argparse.Namespace):
1✔
671
        if args.lwp3_tool not in self.subparsers.choices:
×
672
            self.parser.error(
×
673
                f'Missing name of tool: {"|".join(self.subparsers.choices.keys())}'
674
            )
675

676
        return self.subparsers.choices[args.lwp3_tool].tool.run(args)
×
677

678

679
class Udev(Tool):
1✔
680
    def add_parser(self, subparsers: argparse._SubParsersAction):
1✔
681
        parser = subparsers.add_parser("udev", help="print udev rules to stdout")
1✔
682
        parser.tool = self
1✔
683

684
    async def run(self, args: argparse.Namespace):
1✔
685
        from importlib.resources import read_text
1✔
686

687
        from pybricksdev import resources
1✔
688

689
        print(read_text(resources, resources.UDEV_RULES))
1✔
690

691

692
def main():
1✔
693
    """Runs ``pybricksdev`` command line interface."""
694

695
    if sys.platform == "win32":
×
696
        # Hack around bad side-effects of pythoncom on Windows
697
        try:
×
698
            from bleak_winrt._winrt import MTA, init_apartment
×
699
        except ImportError:
×
700
            from winrt._winrt import MTA, init_apartment
×
701

702
        init_apartment(MTA)
×
703

704
    # Provide main description and help.
705
    parser = argparse.ArgumentParser(
×
706
        prog=PROG_NAME,
707
        description="Utilities for Pybricks developers.",
708
        epilog="Run `%(prog)s <tool> --help` for tool-specific arguments.",
709
    )
710

711
    parser.add_argument(
×
712
        "-v", "--version", action="version", version=f"{MODULE_NAME} v{MODULE_VERSION}"
713
    )
714
    parser.add_argument(
×
715
        "-d", "--debug", action="store_true", help="enable debug logging"
716
    )
717

718
    subparsers = parser.add_subparsers(
×
719
        metavar="<tool>",
720
        dest="tool",
721
        help="the tool to use",
722
    )
723

724
    for tool in Compile(), Run(), Flash(), DFU(), OAD(), LWP3(), Udev():
×
725
        tool.add_parser(subparsers)
×
726

727
    argcomplete.autocomplete(parser)
×
728
    args = parser.parse_args()
×
729

730
    logging.basicConfig(
×
731
        format="%(asctime)s: %(levelname)s: %(name)s: %(message)s",
732
        level=logging.DEBUG if args.debug else logging.WARNING,
733
    )
734

735
    if not args.tool:
×
736
        parser.error(f'Missing name of tool: {"|".join(subparsers.choices.keys())}')
×
737

738
    asyncio.run(subparsers.choices[args.tool].tool.run(args))
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc