• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pybricks / pybricksdev / 17632086958

11 Sep 2025 02:22AM UTC coverage: 53.38% (-0.9%) from 54.325%
17632086958

push

github

web-flow
cli: add --stay-connected argument to run command

Add a `--stay-connected` argument that causes the `pybricksdev run` command to stay connected after the user program ends (implies `--wait`). After this, an interactive menu is shown to allow compiling and running or downloading the program again. Also cancels the menu if the program is started by pressing the button on the hub so that stdout can print.

65 of 253 branches covered (25.69%)

Branch coverage included in aggregate %.

1988 of 3593 relevant lines covered (55.33%)

0.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

44.4
pybricksdev/cli/__init__.py
1
# SPDX-License-Identifier: MIT
2
# Copyright (c) 2019-2024 The Pybricks Authors
3

4
"""Command line wrapper around pybricksdev library."""
5

6
import argparse
1✔
7
import asyncio
1✔
8
import contextlib
1✔
9
import logging
1✔
10
import os
1✔
11
import sys
1✔
12
from abc import ABC, abstractmethod
1✔
13
from os import PathLike, path
1✔
14
from tempfile import NamedTemporaryFile
1✔
15
from typing import ContextManager, TextIO
1✔
16

17
import argcomplete
1✔
18
import questionary
1✔
19
from argcomplete.completers import FilesCompleter
1✔
20

21
from pybricksdev import __name__ as MODULE_NAME
1✔
22
from pybricksdev import __version__ as MODULE_VERSION
1✔
23
from pybricksdev.connections.pybricks import (
1✔
24
    HubDisconnectError,
25
    HubPowerButtonPressedError,
26
)
27

28
# Program name shown in usage/help output. When started through a
# ``__main__.py`` file, show "<interpreter> -m <module>"; otherwise show the
# base name of the script that was invoked.
if sys.argv[0].endswith("__main__.py"):
    PROG_NAME = f"{path.basename(sys.executable)} -m {MODULE_NAME}"
else:
    PROG_NAME = path.basename(sys.argv[0])
33

34

35
class Tool(ABC):
    """Abstract base class shared by all command line tools."""

    @abstractmethod
    def add_parser(self, subparsers: argparse._SubParsersAction):
        """
        Registers this tool's sub-command on ``subparsers``.

        Implementations must at least do the following::

            parser = subparsers.add_parser('tool', ...)
            parser.tool = self

        The ``parser`` object can then be used to add further arguments.
        """

    @abstractmethod
    async def run(self, args: argparse.Namespace):
        """
        Runs the tool.

        Implementations receive the parsed command line arguments in ``args``.
        """
×
56

57

58
def _get_script_path(file: TextIO) -> ContextManager[PathLike]:
1✔
59
    """
60
    Gets the path to a script on the file system.
61

62
    If the file is ``sys.stdin``, the contents are copied to a temporary file
63
    and the path to the temporary file is returned. Otherwise, the file is closed
64
    and the path is returned.
65

66
    The context manager will delete the temporary file, if applicable.
67
    """
68
    if file is sys.stdin:
1✔
69
        # Have to close the temp file so that mpy-cross can read it, so we
70
        # create our own context manager to delete the file when we are done
71
        # using it.
72

73
        @contextlib.contextmanager
×
74
        def temp_context():
×
75
            try:
×
76
                with NamedTemporaryFile("wb", suffix=".py", delete=False) as temp:
×
77
                    temp.write(file.buffer.read())
×
78

79
                yield temp.name
×
80
            finally:
81
                try:
×
82
                    os.remove(temp.name)
×
83
                except NameError:
×
84
                    # if NamedTemporaryFile() throws, temp is not defined
85
                    pass
×
86
                except OSError:
×
87
                    # file was already deleted or other strangeness
88
                    pass
×
89

90
        return temp_context()
×
91

92
    file.close()
1✔
93
    return contextlib.nullcontext(file.name)
1✔
94

95

96
class Compile(Tool):
    """Tool that compiles a Pybricks program and prints the result."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``compile`` sub-command and its arguments."""
        compile_parser = subparsers.add_parser(
            "compile",
            help="compile a Pybricks program without running it",
        )
        # main() finds the tool to run through this attribute.
        compile_parser.tool = self

        compile_parser.add_argument(
            "file",
            metavar="<file>",
            help="path to a MicroPython script or `-` for stdin",
            type=argparse.FileType(encoding="utf-8"),
        )
        compile_parser.add_argument(
            "--abi",
            metavar="<n>",
            help="the MPY ABI version, one of %(choices)s (default: %(default)s)",
            default=6,
            choices=[5, 6],
            type=int,
        )

    async def run(self, args: argparse.Namespace):
        """Compiles ``args.file`` for ABI ``args.abi`` and prints the result."""
        from pybricksdev.compile import compile_multi_file, print_mpy

        # The script path is only needed while compiling; a temporary copy of
        # stdin (if any) is removed as soon as the context exits.
        with _get_script_path(args.file) as source_path:
            compiled = await compile_multi_file(source_path, args.abi)

        print_mpy(compiled)
1✔
124

125

126
class Run(Tool):
    """Tool that downloads a program to a Pybricks hub and optionally runs it."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``run`` sub-command and its arguments."""
        parser = subparsers.add_parser(
            "run",
            help="run a Pybricks program",
        )
        parser.tool = self
        parser.add_argument(
            "conntype",
            metavar="<connection type>",
            help="connection type: %(choices)s",
            choices=["ble", "usb"],
        )
        parser.add_argument(
            "file",
            metavar="<file>",
            help="path to a MicroPython script or `-` for stdin",
            type=argparse.FileType(encoding="utf-8"),
        )
        parser.add_argument(
            "-n",
            "--name",
            metavar="<name>",
            required=False,
            help="Bluetooth device name or Bluetooth address for BLE connection; "
            "serial port name for USB connection",
        )

        parser.add_argument(
            "--start",
            help="Start the program immediately after downloading it.",
            action=argparse.BooleanOptionalAction,
            default=True,
        )

        parser.add_argument(
            "--wait",
            help="Wait for the program to complete before disconnecting. Only applies when starting program right away.",
            action=argparse.BooleanOptionalAction,
            default=True,
        )

        parser.add_argument(
            "--stay-connected",
            help="Add a menu option to resend the code with bluetooth instead of disconnecting from the robot after the program ends.",
            action=argparse.BooleanOptionalAction,
            default=False,
        )

    @staticmethod
    async def _find_hub(args: argparse.Namespace):
        """
        Finds the hub selected by ``args`` and returns a hub object for it.

        The returned hub is not connected yet. This is used both for the
        first connection and for re-connecting, so both paths apply the same
        lookup and the same "not found" handling.

        Exits the process with status 1 if no matching USB device is found.

        Raises:
            ValueError: if ``args.conntype`` is neither ``ble`` nor ``usb``.
        """
        if args.conntype == "ble":
            from pybricksdev.ble import find_device as find_ble
            from pybricksdev.connections.pybricks import PybricksHubBLE

            # It is a Pybricks Hub with BLE. Device name or address is given.
            print(f"Searching for {args.name or 'any hub with Pybricks service'}...")
            device_or_address = await find_ble(args.name)
            return PybricksHubBLE(device_or_address)

        if args.conntype == "usb":
            from usb.core import find as find_usb

            from pybricksdev.connections.pybricks import PybricksHubUSB
            from pybricksdev.usb import (
                EV3_USB_PID,
                LEGO_USB_VID,
                MINDSTORMS_INVENTOR_USB_PID,
                NXT_USB_PID,
                SPIKE_ESSENTIAL_USB_PID,
                SPIKE_PRIME_USB_PID,
            )

            def is_pybricks_usb(dev):
                # Match on vendor ID, a known product ID and a product
                # string that ends with "Pybricks".
                return (
                    (dev.idVendor == LEGO_USB_VID)
                    and (
                        dev.idProduct
                        in [
                            NXT_USB_PID,
                            EV3_USB_PID,
                            SPIKE_PRIME_USB_PID,
                            SPIKE_ESSENTIAL_USB_PID,
                            MINDSTORMS_INVENTOR_USB_PID,
                        ]
                    )
                    and dev.product.endswith("Pybricks")
                )

            device_or_address = find_usb(custom_match=is_pybricks_usb)

            if device_or_address is None:
                print("Pybricks Hub not found.", file=sys.stderr)
                # sys.exit() rather than the ``exit`` helper added by the
                # ``site`` module, which is not guaranteed to exist.
                sys.exit(1)

            return PybricksHubUSB(device_or_address)

        raise ValueError(f"Unknown connection type: {args.conntype}")

    async def run(self, args: argparse.Namespace):
        """
        Connects to a hub, downloads ``args.file`` and optionally starts it.

        With ``--stay-connected`` the connection is kept open afterwards and
        an interactive menu offers to recompile and run or download the
        program again, until the user chooses to exit. The hub is always
        disconnected when this method returns or raises.
        """
        # Pick the right connection
        hub = await self._find_hub(args)

        # Connect to the address and run the script
        await hub.connect()
        try:
            with _get_script_path(args.file) as script_path:
                if args.start:
                    # Staying connected implies waiting for the program to end.
                    await hub.run(script_path, args.wait or args.stay_connected)
                else:
                    if args.stay_connected:
                        # if the user later starts the program by pressing the button on the hub,
                        # we still want the hub stdout to print to Python's stdout
                        hub.print_output = True
                        hub._enable_line_handler = True
                    await hub.download(script_path)

            if not args.stay_connected:
                return

            async def reconnect_hub():
                # Ask before reconnecting; declining ends the program.
                if not await questionary.confirm(
                    "\nThe hub has been disconnected. Would you like to re-connect?"
                ).ask_async():
                    sys.exit()

                new_hub = await self._find_hub(args)
                await new_hub.connect()
                # re-enable echoing of the hub's stdout
                new_hub._enable_line_handler = True
                new_hub.print_output = True
                return new_hub

            response_options = [
                "Recompile and Run",
                "Recompile and Download",
                "Exit",
            ]
            while True:
                try:
                    if args.file is sys.stdin:
                        # No menu is offered for stdin input; just keep
                        # echoing hub output until a key is pressed.
                        await hub.race_disconnect(
                            hub.race_power_button_press(
                                questionary.press_any_key_to_continue(
                                    "The hub will stay connected and echo its output to the terminal. Press any key to exit."
                                ).ask_async()
                            )
                        )
                        return
                    response = await hub.race_disconnect(
                        hub.race_power_button_press(
                            questionary.select(
                                "Would you like to re-compile your code?",
                                response_options,
                                # Pre-select the action matching --start/--no-start.
                                default=(
                                    response_options[0]
                                    if args.start
                                    else response_options[1]
                                ),
                            ).ask_async()
                        )
                    )
                    with _get_script_path(args.file) as script_path:
                        if response == response_options[0]:
                            await hub.run(script_path, wait=True)
                        elif response == response_options[1]:
                            await hub.download(script_path)
                        else:
                            return

                except HubPowerButtonPressedError:
                    # This means the user pressed the button on the hub to re-start the
                    # current program, so the menu was canceled and we are now printing
                    # the hub stdout until the user program ends on the hub.
                    try:
                        await hub._wait_for_power_button_release()
                        await hub._wait_for_user_program_stop()

                    except HubDisconnectError:
                        hub = await reconnect_hub()

                except HubDisconnectError:
                    # let terminal cool off before making a new prompt
                    await asyncio.sleep(0.3)
                    hub = await reconnect_hub()

        finally:
            await hub.disconnect()
1✔
319

320

321
class Flash(Tool):
    """Tool that flashes firmware on a LEGO Powered Up device."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``flash`` sub-command and its arguments."""
        flash_parser = subparsers.add_parser(
            "flash", help="flash firmware on a LEGO Powered Up device"
        )
        flash_parser.tool = self

        firmware_arg = flash_parser.add_argument(
            "firmware",
            metavar="<firmware-file>",
            type=argparse.FileType(mode="rb"),
            help="the firmware .zip file",
        )
        # Shell completion only offers .zip files for this argument.
        firmware_arg.completer = FilesCompleter(allowednames=(".zip",))

        flash_parser.add_argument(
            "-n", "--name", metavar="<name>", type=str, help="a custom name for the hub"
        )

    def run(self, args: argparse.Namespace):
        """Returns the result of ``flash_firmware`` for the given file and name."""
        from pybricksdev.cli.flash import flash_firmware

        # Not awaited here: the result is handed back to the caller as-is.
        pending = flash_firmware(args.firmware, args.name)
        return pending
×
344

345

346
class DFUBackup(Tool):
    """Tool that backs up firmware using DFU."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``backup`` action and its output file argument."""
        backup_parser = subparsers.add_parser("backup", help="backup firmware using DFU")
        backup_parser.tool = self

        firmware_arg = backup_parser.add_argument(
            "firmware",
            metavar="<firmware-file>",
            type=argparse.FileType(mode="wb"),
            help="the firmware .bin file",
        )
        # Shell completion only offers .bin files for this argument.
        firmware_arg.completer = FilesCompleter(allowednames=(".bin",))

    async def run(self, args: argparse.Namespace):
        """Passes the opened output file to ``backup_dfu``."""
        from pybricksdev.dfu import backup_dfu

        target = args.firmware
        backup_dfu(target)
×
361

362

363
class DFURestore(Tool):
    """Tool that restores firmware using DFU."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``restore`` action and its input file argument."""
        restore_parser = subparsers.add_parser(
            "restore",
            help="restore firmware using DFU",
        )
        restore_parser.tool = self

        firmware_arg = restore_parser.add_argument(
            "firmware",
            metavar="<firmware-file>",
            type=argparse.FileType(mode="rb"),
            help="the firmware .bin file",
        )
        # Shell completion only offers .bin files for this argument.
        firmware_arg.completer = FilesCompleter(allowednames=(".bin",))

    async def run(self, args: argparse.Namespace):
        """Passes the opened firmware file to ``restore_dfu``."""
        from pybricksdev.dfu import restore_dfu

        source = args.firmware
        restore_dfu(source)
×
381

382

383
class DFU(Tool):
    """Parent tool that dispatches to the DFU ``backup`` and ``restore`` actions."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``dfu`` sub-command and its nested actions."""
        self.parser = subparsers.add_parser(
            "dfu",
            help="use DFU to backup or restore firmware",
        )
        self.parser.tool = self

        # The chosen action name is stored in ``args.action``.
        self.subparsers = self.parser.add_subparsers(
            metavar="<action>", dest="action", help="the action to perform"
        )

        for action_tool in (DFUBackup(), DFURestore()):
            action_tool.add_parser(self.subparsers)

    def run(self, args: argparse.Namespace):
        """Runs the selected action, or reports a usage error if none was given."""
        actions = self.subparsers.choices

        if args.action not in actions:
            names = "|".join(actions)
            self.parser.error(f"Missing name of action: {names}")

        selected = actions[args.action].tool
        return selected.run(args)
×
404

405

406
class OADFlash(Tool):
    """Tool that updates firmware on a LEGO Powered Up device using TI OAD."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``flash`` action and its firmware file argument."""
        flash_parser = subparsers.add_parser(
            "flash",
            help="update firmware on a LEGO Powered Up device using TI OAD",
        )
        flash_parser.tool = self

        firmware_arg = flash_parser.add_argument(
            "firmware",
            metavar="<firmware-file>",
            type=argparse.FileType(mode="rb"),
            help="the firmware .oda file",
        )
        # Shell completion only offers .oda files for this argument.
        firmware_arg.completer = FilesCompleter(allowednames=(".oda",))

    async def run(self, args: argparse.Namespace):
        """Awaits ``flash_oad_image`` with the opened firmware file."""
        from pybricksdev.cli.oad import flash_oad_image

        image = args.firmware
        await flash_oad_image(image)
×
424

425

426
class OADInfo(Tool):
    """Tool that gets firmware information from a device using TI OAD."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``info`` action; it takes no extra arguments."""
        info_parser = subparsers.add_parser(
            "info",
            help="get information about firmware on a LEGO Powered Up device using TI OAD",
        )
        info_parser.tool = self

    async def run(self, args: argparse.Namespace):
        """Awaits ``dump_oad_info``; ``args`` is not used."""
        from pybricksdev.cli.oad import dump_oad_info

        await dump_oad_info()
×
438

439

440
class OAD(Tool):
    """Parent tool that dispatches to the TI OAD ``flash`` and ``info`` actions."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``oad`` sub-command and its nested actions."""
        self.parser = subparsers.add_parser(
            "oad",
            help="update firmware on a LEGO Powered Up device using TI OAD",
        )
        self.parser.tool = self

        # The chosen action name is stored in ``args.action``.
        self.subparsers = self.parser.add_subparsers(
            metavar="<action>", dest="action", help="the action to perform"
        )

        for action_tool in (OADFlash(), OADInfo()):
            action_tool.add_parser(self.subparsers)

    def run(self, args: argparse.Namespace):
        """Runs the selected action, or reports a usage error if none was given."""
        actions = self.subparsers.choices

        if args.action not in actions:
            names = "|".join(actions)
            self.parser.error(f"Missing name of action: {names}")

        selected = actions[args.action].tool
        return selected.run(args)
×
461

462

463
class LWP3Repl(Tool):
    """Tool that starts an interactive REPL for LWP3 messages."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``repl`` sub-command; it takes no extra arguments."""
        repl_parser = subparsers.add_parser(
            "repl",
            help="interactive REPL for sending and receiving LWP3 messages",
        )
        repl_parser.tool = self

    def run(self, args: argparse.Namespace):
        """Sets up REPL logging and returns the result of ``repl()``."""
        from pybricksdev.cli.lwp3.repl import repl, setup_repl_logging

        # Logging is configured before the REPL object is created.
        setup_repl_logging()
        session = repl()
        return session
×
476

477

478
class LWP3(Tool):
    """Parent tool that dispatches to the LWP3 sub-tools."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``lwp3`` sub-command and its nested tools."""
        self.parser = subparsers.add_parser(
            "lwp3", help="interact with devices using LWP3"
        )
        self.parser.tool = self

        # The chosen sub-tool name is stored in ``args.lwp3_tool``.
        self.subparsers = self.parser.add_subparsers(
            metavar="<lwp3-tool>", dest="lwp3_tool", help="the tool to run"
        )

        # Only one sub-tool exists at the moment.
        LWP3Repl().add_parser(self.subparsers)

    def run(self, args: argparse.Namespace):
        """Runs the selected sub-tool, or reports a usage error if none was given."""
        sub_tools = self.subparsers.choices

        if args.lwp3_tool not in sub_tools:
            names = "|".join(sub_tools)
            self.parser.error(f"Missing name of tool: {names}")

        selected = sub_tools[args.lwp3_tool].tool
        return selected.run(args)
×
498

499

500
class Udev(Tool):
    """Tool that prints udev rules to stdout."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``udev`` sub-command; it takes no extra arguments."""
        udev_parser = subparsers.add_parser("udev", help="print udev rules to stdout")
        udev_parser.tool = self

    async def run(self, args: argparse.Namespace):
        """Reads the packaged udev rules resource and prints it."""
        from importlib.resources import read_text

        from pybricksdev import resources

        rules = read_text(resources, resources.UDEV_RULES)
        print(rules)
1✔
511

512

513
def main():
    """Runs ``pybricksdev`` command line interface."""

    if sys.platform == "win32":
        # Hack around bad side-effects of pythoncom on Windows
        try:
            from bleak_winrt._winrt import MTA, init_apartment
        except ImportError:
            from winrt._winrt import MTA, init_apartment

        init_apartment(MTA)

    # Top-level parser with the main description and help.
    parser = argparse.ArgumentParser(
        prog=PROG_NAME,
        description="Utilities for Pybricks developers.",
        epilog="Run `%(prog)s <tool> --help` for tool-specific arguments.",
    )
    parser.add_argument(
        "-v", "--version", action="version", version=f"{MODULE_NAME} v{MODULE_VERSION}"
    )
    parser.add_argument(
        "-d", "--debug", action="store_true", help="enable debug logging"
    )

    # The chosen tool name is stored in ``args.tool``.
    subparsers = parser.add_subparsers(
        metavar="<tool>",
        dest="tool",
        help="the tool to use",
    )

    # Each tool registers its own sub-command and attaches itself to it.
    available_tools = (Compile(), Run(), Flash(), DFU(), OAD(), LWP3(), Udev())
    for available in available_tools:
        available.add_parser(subparsers)

    argcomplete.autocomplete(parser)
    args = parser.parse_args()

    log_level = logging.DEBUG if args.debug else logging.WARNING
    logging.basicConfig(
        format="%(asctime)s: %(levelname)s: %(name)s: %(message)s",
        level=log_level,
    )

    if not args.tool:
        tool_names = "|".join(subparsers.choices)
        parser.error(f"Missing name of tool: {tool_names}")

    # Look up the tool attached to the chosen sub-command and run it.
    selected = subparsers.choices[args.tool].tool
    asyncio.run(selected.run(args))
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc