• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pybricks / pybricksdev / 18807153421

25 Oct 2025 06:53PM UTC coverage: 52.955% (-0.1%) from 53.08%
18807153421

push

github

web-flow
cli: Add option to the --stay-connected menu to re-run the stored program.

Add "Run Stored Program" menu item to run a program already on the hub without downloading it again.

65 of 256 branches covered (25.39%)

Branch coverage included in aggregate %.

1987 of 3619 relevant lines covered (54.9%)

0.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

40.34
pybricksdev/cli/__init__.py
1
# SPDX-License-Identifier: MIT
2
# Copyright (c) 2019-2024 The Pybricks Authors
3

4
"""Command line wrapper around pybricksdev library."""
5

6
import argparse
1✔
7
import asyncio
1✔
8
import contextlib
1✔
9
import logging
1✔
10
import os
1✔
11
import sys
1✔
12
from abc import ABC, abstractmethod
1✔
13
from enum import IntEnum
1✔
14
from os import PathLike, path
1✔
15
from tempfile import NamedTemporaryFile
1✔
16
from typing import ContextManager, TextIO
1✔
17

18
import argcomplete
1✔
19
import questionary
1✔
20
from argcomplete.completers import FilesCompleter
1✔
21
from packaging.version import Version
1✔
22

23
from pybricksdev import __name__ as MODULE_NAME
1✔
24
from pybricksdev import __version__ as MODULE_VERSION
1✔
25
from pybricksdev.connections.pybricks import (
1✔
26
    HubDisconnectError,
27
    HubPowerButtonPressedError,
28
)
29

30
# Program name for the argument parser's ``prog``. If argv[0] is a
# ``__main__.py`` (as it is when launched with ``python -m``), show
# "<interpreter> -m <module>" instead of the bare file name.
if sys.argv[0].endswith("__main__.py"):
    PROG_NAME = f"{path.basename(sys.executable)} -m {MODULE_NAME}"
else:
    PROG_NAME = path.basename(sys.argv[0])
35

36

37
class Tool(ABC):
    """Abstract interface shared by all command line tool implementations."""

    @abstractmethod
    def add_parser(self, subparsers: argparse._SubParsersAction):
        """
        Registers the sub-command for this tool on ``subparsers``.

        Overriding methods must at least do the following::

            parser = subparsers.add_parser('tool', ...)
            parser.tool = self

        Then additional arguments can be added using the ``parser`` object.
        """

    @abstractmethod
    async def run(self, args: argparse.Namespace):
        """
        Runs the tool with the parsed command line arguments.

        Overriding methods should provide an implementation to run the tool.
        """
×
58

59

60
def _get_script_path(file: TextIO) -> ContextManager[PathLike]:
    """
    Provides a file system path for a script given as an open text file.

    For any file other than ``sys.stdin``, the file object is closed and a
    no-op context manager wrapping ``file.name`` is returned.

    For ``sys.stdin``, entering the returned context manager copies the
    stream contents to a ``.py`` temporary file and yields that file's name.
    Leaving the context deletes the temporary file.
    """
    if file is not sys.stdin:
        file.close()
        return contextlib.nullcontext(file.name)

    # The temp file has to be closed so that mpy-cross can read it, so it
    # cannot delete itself on close. This context manager removes it once
    # the caller is done with it.
    @contextlib.contextmanager
    def stdin_copy():
        try:
            with NamedTemporaryFile("wb", suffix=".py", delete=False) as tmp:
                tmp.write(file.buffer.read())

            yield tmp.name
        finally:
            # NameError: NamedTemporaryFile() threw, so tmp is not defined.
            # OSError: file was already deleted or other strangeness.
            with contextlib.suppress(NameError, OSError):
                os.remove(tmp.name)

    return stdin_copy()
1✔
96

97

98
class Compile(Tool):
    """Tool that compiles a script and hands the result to ``print_mpy``."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``compile`` sub-command with its arguments."""
        compile_parser = subparsers.add_parser(
            "compile", help="compile a Pybricks program without running it"
        )
        compile_parser.tool = self

        compile_parser.add_argument(
            "file",
            metavar="<file>",
            type=argparse.FileType(encoding="utf-8"),
            help="path to a MicroPython script or `-` for stdin",
        )
        compile_parser.add_argument(
            "--abi",
            metavar="<n>",
            type=int,
            choices=[5, 6],
            default=6,
            help="the MPY ABI version, one of %(choices)s (default: %(default)s)",
        )

    async def run(self, args: argparse.Namespace):
        """Compiles ``args.file`` for MPY ABI ``args.abi`` and prints it via ``print_mpy``."""
        # Imported here rather than at module level, like the other tools do.
        from pybricksdev.compile import compile_multi_file, print_mpy

        with _get_script_path(args.file) as script_path:
            compiled = await compile_multi_file(script_path, args.abi)

        print_mpy(compiled)
1✔
126

127

128
class Run(Tool):
1✔
129
    def add_parser(self, subparsers: argparse._SubParsersAction):
1✔
130
        parser = subparsers.add_parser(
1✔
131
            "run",
132
            help="run a Pybricks program",
133
        )
134
        parser.tool = self
1✔
135
        parser.add_argument(
1✔
136
            "conntype",
137
            metavar="<connection type>",
138
            help="connection type: %(choices)s",
139
            choices=["ble", "usb"],
140
        )
141
        parser.add_argument(
1✔
142
            "file",
143
            metavar="<file>",
144
            help="path to a MicroPython script or `-` for stdin",
145
            type=argparse.FileType(encoding="utf-8"),
146
        )
147
        parser.add_argument(
1✔
148
            "-n",
149
            "--name",
150
            metavar="<name>",
151
            required=False,
152
            help="Bluetooth device name or Bluetooth address for BLE connection; "
153
            "serial port name for USB connection",
154
        )
155

156
        parser.add_argument(
1✔
157
            "--start",
158
            help="Start the program immediately after downloading it.",
159
            action=argparse.BooleanOptionalAction,
160
            default=True,
161
        )
162

163
        parser.add_argument(
1✔
164
            "--wait",
165
            help="Wait for the program to complete before disconnecting. Only applies when starting program right away.",
166
            action=argparse.BooleanOptionalAction,
167
            default=True,
168
        )
169

170
        parser.add_argument(
1✔
171
            "--stay-connected",
172
            help="Add a menu option to resend the code with bluetooth instead of disconnecting from the robot after the program ends.",
173
            action=argparse.BooleanOptionalAction,
174
            default=False,
175
        )
176

177
    async def run(self, args: argparse.Namespace):
1✔
178

179
        # Pick the right connection
180
        if args.conntype == "ble":
1✔
181
            from pybricksdev.ble import find_device as find_ble
1✔
182
            from pybricksdev.connections.pybricks import PybricksHubBLE
1✔
183

184
            # It is a Pybricks Hub with BLE. Device name or address is given.
185
            print(f"Searching for {args.name or 'any hub with Pybricks service'}...")
1✔
186
            device_or_address = await find_ble(args.name)
1✔
187
            hub = PybricksHubBLE(device_or_address)
1✔
188
        elif args.conntype == "usb":
1✔
189
            from usb.core import find as find_usb
1✔
190

191
            from pybricksdev.connections.pybricks import PybricksHubUSB
1✔
192
            from pybricksdev.usb import (
1✔
193
                EV3_USB_PID,
194
                LEGO_USB_VID,
195
                MINDSTORMS_INVENTOR_USB_PID,
196
                NXT_USB_PID,
197
                SPIKE_ESSENTIAL_USB_PID,
198
                SPIKE_PRIME_USB_PID,
199
            )
200

201
            def is_pybricks_usb(dev):
1✔
202
                return (
×
203
                    (dev.idVendor == LEGO_USB_VID)
204
                    and (
205
                        dev.idProduct
206
                        in [
207
                            NXT_USB_PID,
208
                            EV3_USB_PID,
209
                            SPIKE_PRIME_USB_PID,
210
                            SPIKE_ESSENTIAL_USB_PID,
211
                            MINDSTORMS_INVENTOR_USB_PID,
212
                        ]
213
                    )
214
                    and dev.product.endswith("Pybricks")
215
                )
216

217
            device_or_address = find_usb(custom_match=is_pybricks_usb)
1✔
218

219
            if device_or_address is None:
1✔
220
                print("Pybricks Hub not found.", file=sys.stderr)
×
221
                exit(1)
×
222

223
            hub = PybricksHubUSB(device_or_address)
1✔
224
        else:
225
            raise ValueError(f"Unknown connection type: {args.conntype}")
×
226

227
        # Connect to the address and run the script
228
        await hub.connect()
1✔
229
        try:
1✔
230
            with _get_script_path(args.file) as script_path:
1✔
231
                if args.start:
1✔
232
                    await hub.run(script_path, args.wait or args.stay_connected)
1✔
233
                else:
234
                    if args.stay_connected:
1✔
235
                        # if the user later starts the program by pressing the button on the hub,
236
                        # we still want the hub stdout to print to Python's stdout
237
                        hub.print_output = True
×
238
                        hub._enable_line_handler = True
×
239
                    await hub.download(script_path)
1✔
240

241
            if not args.stay_connected:
1✔
242
                return
1✔
243

244
            class ResponseOptions(IntEnum):
×
245
                RECOMPILE_RUN = 0
×
246
                RECOMPILE_DOWNLOAD = 1
×
247
                RUN_STORED = 2
×
248
                CHANGE_TARGET_FILE = 3
×
249
                EXIT = 4
×
250

251
            async def reconnect_hub():
×
252
                if not await questionary.confirm(
×
253
                    "\nThe hub has been disconnected. Would you like to re-connect?"
254
                ).ask_async():
255
                    exit()
×
256

257
                if args.conntype == "ble":
×
258
                    print(
×
259
                        f"Searching for {args.name or 'any hub with Pybricks service'}..."
260
                    )
261
                    device_or_address = await find_ble(args.name)
×
262
                    hub = PybricksHubBLE(device_or_address)
×
263
                elif args.conntype == "usb":
×
264
                    device_or_address = find_usb(custom_match=is_pybricks_usb)
×
265
                    hub = PybricksHubUSB(device_or_address)
×
266

267
                await hub.connect()
×
268
                # re-enable echoing of the hub's stdout
269
                hub._enable_line_handler = True
×
270
                hub.print_output = True
×
271
                return hub
×
272

273
            response_options = [
×
274
                "Recompile and Run",
275
                "Recompile and Download",
276
                "Run Stored Program",
277
                "Change Target File",
278
                "Exit",
279
            ]
280
            # the entry that is selected by default when the menu opens
281
            # this is overridden after the user picks an option
282
            # so that the default option is always the one that was last chosen
283
            default_response_option = (
×
284
                ResponseOptions.RECOMPILE_RUN
285
                if args.start
286
                else ResponseOptions.RECOMPILE_DOWNLOAD
287
            )
288

289
            while True:
×
290
                try:
×
291
                    if args.file is sys.stdin:
×
292
                        await hub.race_disconnect(
×
293
                            hub.race_power_button_press(
294
                                questionary.press_any_key_to_continue(
295
                                    "The hub will stay connected and echo its output to the terminal. Press any key to exit."
296
                                ).ask_async()
297
                            )
298
                        )
299
                        return
×
300
                    response = await hub.race_disconnect(
×
301
                        hub.race_power_button_press(
302
                            questionary.select(
303
                                f"Would you like to re-compile {os.path.basename(args.file.name)}?",
304
                                response_options,
305
                                default=(response_options[default_response_option]),
306
                            ).ask_async()
307
                        )
308
                    )
309

310
                    default_response_option = response_options.index(response)
×
311

312
                    match response_options.index(response):
×
313

314
                        case ResponseOptions.RECOMPILE_RUN:
×
315
                            with _get_script_path(args.file) as script_path:
×
316
                                await hub.run(script_path, wait=True)
×
317

318
                        case ResponseOptions.RECOMPILE_DOWNLOAD:
×
319
                            with _get_script_path(args.file) as script_path:
×
320
                                await hub.download(script_path)
×
321

322
                        case ResponseOptions.RUN_STORED:
×
323
                            if hub.fw_version < Version("3.2.0-beta.4"):
×
324
                                print(
×
325
                                    "Running a stored program remotely is only supported in the hub firmware version >= v3.2.0."
326
                                )
327
                            else:
328
                                await hub.start_user_program()
×
329
                                await hub._wait_for_user_program_stop()
×
330

331
                        case ResponseOptions.CHANGE_TARGET_FILE:
×
332
                            args.file.close()
×
333
                            while True:
×
334
                                try:
×
335
                                    args.file = open(
×
336
                                        await hub.race_disconnect(
337
                                            hub.race_power_button_press(
338
                                                questionary.path(
339
                                                    "What file would you like to use?"
340
                                                ).ask_async()
341
                                            )
342
                                        )
343
                                    )
344
                                    break
×
345
                                except FileNotFoundError:
×
346
                                    print("The file was not found. Please try again.")
×
347
                            # send the new target file to the hub
348
                            with _get_script_path(args.file) as script_path:
×
349
                                await hub.download(script_path)
×
350

351
                        case _:
×
352
                            return
×
353

354
                except HubPowerButtonPressedError:
×
355
                    # This means the user pressed the button on the hub to re-start the
356
                    # current program, so the menu was canceled and we are now printing
357
                    # the hub stdout until the user program ends on the hub.
358
                    try:
×
359
                        await hub._wait_for_power_button_release()
×
360
                        await hub._wait_for_user_program_stop()
×
361

362
                    except HubDisconnectError:
×
363
                        hub = await reconnect_hub()
×
364

365
                except HubDisconnectError:
×
366
                    # let terminal cool off before making a new prompt
367
                    await asyncio.sleep(0.3)
×
368
                    hub = await reconnect_hub()
×
369

370
        finally:
371
            await hub.disconnect()
1✔
372

373

374
class Flash(Tool):
    """Tool that passes a firmware archive and optional hub name to ``flash_firmware``."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``flash`` sub-command with its arguments."""
        flash_parser = subparsers.add_parser(
            "flash", help="flash firmware on a LEGO Powered Up device"
        )
        flash_parser.tool = self

        firmware_arg = flash_parser.add_argument(
            "firmware",
            metavar="<firmware-file>",
            type=argparse.FileType(mode="rb"),
            help="the firmware .zip file",
        )
        # argcomplete completer limited to .zip names
        firmware_arg.completer = FilesCompleter(allowednames=(".zip",))

        flash_parser.add_argument(
            "-n", "--name", metavar="<name>", type=str, help="a custom name for the hub"
        )

    def run(self, args: argparse.Namespace):
        """
        Returns the result of ``flash_firmware(args.firmware, args.name)``.

        Not declared ``async``; the return value is handed back unchanged
        (presumably an awaitable, since ``main`` passes it to ``asyncio.run``).
        """
        from pybricksdev.cli.flash import flash_firmware

        firmware, hub_name = args.firmware, args.name
        return flash_firmware(firmware, hub_name)
×
397

398

399
class DFUBackup(Tool):
    """``dfu backup`` action: passes the output file to ``backup_dfu``."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``backup`` action with its firmware file argument."""
        backup_parser = subparsers.add_parser("backup", help="backup firmware using DFU")
        backup_parser.tool = self

        firmware_arg = backup_parser.add_argument(
            "firmware",
            metavar="<firmware-file>",
            type=argparse.FileType(mode="wb"),
            help="the firmware .bin file",
        )
        # argcomplete completer limited to .bin names
        firmware_arg.completer = FilesCompleter(allowednames=(".bin",))

    async def run(self, args: argparse.Namespace):
        """Calls ``backup_dfu`` (a plain, non-awaited call) with the opened file."""
        from pybricksdev.dfu import backup_dfu

        output_file = args.firmware
        backup_dfu(output_file)
×
414

415

416
class DFURestore(Tool):
    """``dfu restore`` action: passes the firmware file to ``restore_dfu``."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``restore`` action with its firmware file argument."""
        restore_parser = subparsers.add_parser(
            "restore", help="restore firmware using DFU"
        )
        restore_parser.tool = self

        firmware_arg = restore_parser.add_argument(
            "firmware",
            metavar="<firmware-file>",
            type=argparse.FileType(mode="rb"),
            help="the firmware .bin file",
        )
        # argcomplete completer limited to .bin names
        firmware_arg.completer = FilesCompleter(allowednames=(".bin",))

    async def run(self, args: argparse.Namespace):
        """Calls ``restore_dfu`` (a plain, non-awaited call) with the opened file."""
        from pybricksdev.dfu import restore_dfu

        input_file = args.firmware
        restore_dfu(input_file)
×
434

435

436
class DFU(Tool):
    """``dfu`` tool: dispatches to the ``backup`` and ``restore`` actions."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers ``dfu`` and, beneath it, one sub-parser per action."""
        dfu_parser = subparsers.add_parser(
            "dfu", help="use DFU to backup or restore firmware"
        )
        dfu_parser.tool = self
        # Both objects are kept on the instance because run() needs them.
        self.parser = dfu_parser
        self.subparsers = dfu_parser.add_subparsers(
            metavar="<action>", dest="action", help="the action to perform"
        )

        for action_tool in (DFUBackup(), DFURestore()):
            action_tool.add_parser(self.subparsers)

    def run(self, args: argparse.Namespace):
        """
        Delegates to the tool registered for ``args.action``.

        Reports a parser error when ``args.action`` is not a registered
        action. Otherwise returns whatever that tool's ``run`` returns.
        """
        actions = self.subparsers.choices

        if args.action not in actions:
            names = "|".join(actions.keys())
            self.parser.error(f"Missing name of action: {names}")

        selected = actions[args.action].tool
        return selected.run(args)
×
457

458

459
class OADFlash(Tool):
    """``oad flash`` action: passes the firmware file to ``flash_oad_image``."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``flash`` action with its firmware file argument."""
        flash_parser = subparsers.add_parser(
            "flash", help="update firmware on a LEGO Powered Up device using TI OAD"
        )
        flash_parser.tool = self

        firmware_arg = flash_parser.add_argument(
            "firmware",
            metavar="<firmware-file>",
            type=argparse.FileType(mode="rb"),
            help="the firmware .oda file",
        )
        # argcomplete completer limited to .oda names
        firmware_arg.completer = FilesCompleter(allowednames=(".oda",))

    async def run(self, args: argparse.Namespace):
        """Awaits ``flash_oad_image`` with the opened firmware file."""
        from pybricksdev.cli.oad import flash_oad_image

        image_file = args.firmware
        await flash_oad_image(image_file)
×
477

478

479
class OADInfo(Tool):
    """``oad info`` action: awaits ``dump_oad_info``."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``info`` action; it takes no arguments of its own."""
        info_parser = subparsers.add_parser(
            "info",
            help="get information about firmware on a LEGO Powered Up device using TI OAD",
        )
        info_parser.tool = self

    async def run(self, args: argparse.Namespace):
        """Awaits ``dump_oad_info``; ``args`` is not used."""
        from pybricksdev.cli import oad

        await oad.dump_oad_info()
×
491

492

493
class OAD(Tool):
    """``oad`` tool: dispatches to the ``flash`` and ``info`` actions."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers ``oad`` and, beneath it, one sub-parser per action."""
        oad_parser = subparsers.add_parser(
            "oad", help="update firmware on a LEGO Powered Up device using TI OAD"
        )
        oad_parser.tool = self
        # Both objects are kept on the instance because run() needs them.
        self.parser = oad_parser
        self.subparsers = oad_parser.add_subparsers(
            metavar="<action>", dest="action", help="the action to perform"
        )

        for action_tool in (OADFlash(), OADInfo()):
            action_tool.add_parser(self.subparsers)

    def run(self, args: argparse.Namespace):
        """
        Delegates to the tool registered for ``args.action``.

        Reports a parser error when ``args.action`` is not a registered
        action. Otherwise returns whatever that tool's ``run`` returns.
        """
        actions = self.subparsers.choices

        if args.action not in actions:
            names = "|".join(actions.keys())
            self.parser.error(f"Missing name of action: {names}")

        selected = actions[args.action].tool
        return selected.run(args)
×
514

515

516
class LWP3Repl(Tool):
    """``lwp3 repl`` tool: sets up REPL logging and starts the LWP3 REPL."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``repl`` sub-command; it takes no arguments of its own."""
        repl_parser = subparsers.add_parser(
            "repl", help="interactive REPL for sending and receiving LWP3 messages"
        )
        repl_parser.tool = self

    def run(self, args: argparse.Namespace):
        """
        Calls ``setup_repl_logging()`` and returns the result of ``repl()``.

        Not declared ``async``; the value from ``repl()`` is returned as-is
        (presumably an awaitable for the caller to run). ``args`` is not used.
        """
        from pybricksdev.cli.lwp3 import repl as repl_module

        repl_module.setup_repl_logging()
        return repl_module.repl()
×
529

530

531
class LWP3(Tool):
    """``lwp3`` tool: dispatches to the LWP3 sub-tools (currently ``repl``)."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers ``lwp3`` and, beneath it, one sub-parser per sub-tool."""
        lwp3_parser = subparsers.add_parser(
            "lwp3", help="interact with devices using LWP3"
        )
        lwp3_parser.tool = self
        # Both objects are kept on the instance because run() needs them.
        self.parser = lwp3_parser
        self.subparsers = lwp3_parser.add_subparsers(
            metavar="<lwp3-tool>", dest="lwp3_tool", help="the tool to run"
        )

        for sub_tool in (LWP3Repl(),):
            sub_tool.add_parser(self.subparsers)

    def run(self, args: argparse.Namespace):
        """
        Delegates to the tool registered for ``args.lwp3_tool``.

        Reports a parser error when ``args.lwp3_tool`` is not a registered
        tool. Otherwise returns whatever that tool's ``run`` returns.
        """
        tools = self.subparsers.choices

        if args.lwp3_tool not in tools:
            names = "|".join(tools.keys())
            self.parser.error(f"Missing name of tool: {names}")

        selected = tools[args.lwp3_tool].tool
        return selected.run(args)
×
551

552

553
class Udev(Tool):
    """``udev`` tool: prints the packaged udev rules resource."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``udev`` sub-command; it takes no arguments of its own."""
        udev_parser = subparsers.add_parser("udev", help="print udev rules to stdout")
        udev_parser.tool = self

    async def run(self, args: argparse.Namespace):
        """Reads the ``resources.UDEV_RULES`` text resource and prints it."""
        from importlib.resources import read_text

        from pybricksdev import resources

        rules = read_text(resources, resources.UDEV_RULES)
        print(rules)
1✔
564

565

566
def main():
    """Runs ``pybricksdev`` command line interface."""

    if sys.platform == "win32":
        # Hack around bad side-effects of pythoncom on Windows
        try:
            from bleak_winrt._winrt import MTA, init_apartment
        except ImportError:
            from winrt._winrt import MTA, init_apartment

        init_apartment(MTA)

    # Top-level parser: main description, help and global options.
    parser = argparse.ArgumentParser(
        prog=PROG_NAME,
        description="Utilities for Pybricks developers.",
        epilog="Run `%(prog)s <tool> --help` for tool-specific arguments.",
    )
    parser.add_argument(
        "-v", "--version", action="version", version=f"{MODULE_NAME} v{MODULE_VERSION}"
    )
    parser.add_argument(
        "-d", "--debug", action="store_true", help="enable debug logging"
    )

    # Each tool registers its own sub-command (and stores itself on it).
    subparsers = parser.add_subparsers(
        metavar="<tool>", dest="tool", help="the tool to use"
    )
    available_tools = (Compile(), Run(), Flash(), DFU(), OAD(), LWP3(), Udev())
    for tool in available_tools:
        tool.add_parser(subparsers)

    argcomplete.autocomplete(parser)
    args = parser.parse_args()

    log_level = logging.DEBUG if args.debug else logging.WARNING
    logging.basicConfig(
        format="%(asctime)s: %(levelname)s: %(name)s: %(message)s",
        level=log_level,
    )

    if not args.tool:
        tool_names = "|".join(subparsers.choices.keys())
        parser.error(f"Missing name of tool: {tool_names}")

    # Look up the tool object stored on the chosen sub-parser and run it.
    selected_tool = subparsers.choices[args.tool].tool
    asyncio.run(selected_tool.run(args))
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc