• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pybricks / pybricksdev / 18768842123

24 Oct 2025 03:38AM UTC coverage: 53.08% (-0.3%) from 53.33%
18768842123

push

github

web-flow
cli: Allow changing the current file within the --stay-connected menu.

Add a "Change Target File" menu item that prompts the user to select a new file. This way the user can change projects without having to disconnect and reconnect. Once the file is selected, the program is compiled and downloaded, but it doesn't start.

65 of 254 branches covered (25.59%)

Branch coverage included in aggregate %.

1986 of 3610 relevant lines covered (55.01%)

0.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

41.58
pybricksdev/cli/__init__.py
1
# SPDX-License-Identifier: MIT
2
# Copyright (c) 2019-2024 The Pybricks Authors
3

4
"""Command line wrapper around pybricksdev library."""
5

6
import argparse
1✔
7
import asyncio
1✔
8
import contextlib
1✔
9
import logging
1✔
10
import os
1✔
11
import sys
1✔
12
from abc import ABC, abstractmethod
1✔
13
from enum import IntEnum
1✔
14
from os import PathLike, path
1✔
15
from tempfile import NamedTemporaryFile
1✔
16
from typing import ContextManager, TextIO
1✔
17

18
import argcomplete
1✔
19
import questionary
1✔
20
from argcomplete.completers import FilesCompleter
1✔
21

22
from pybricksdev import __name__ as MODULE_NAME
1✔
23
from pybricksdev import __version__ as MODULE_VERSION
1✔
24
from pybricksdev.connections.pybricks import (
1✔
25
    HubDisconnectError,
26
    HubPowerButtonPressedError,
27
)
28

29
PROG_NAME = (
1✔
30
    f"{path.basename(sys.executable)} -m {MODULE_NAME}"
31
    if sys.argv[0].endswith("__main__.py")
32
    else path.basename(sys.argv[0])
33
)
34

35

36
class Tool(ABC):
1✔
37
    """Common base class for tool implementations."""
38

39
    @abstractmethod
1✔
40
    def add_parser(self, subparsers: argparse._SubParsersAction):
1✔
41
        """
42
        Overriding methods must at least do the following::
43

44
            parser = subparsers.add_parser('tool', ...)
45
            parser.tool = self
46

47
        Then additional arguments can be added using the ``parser`` object.
48
        """
49
        pass
×
50

51
    @abstractmethod
1✔
52
    async def run(self, args: argparse.Namespace):
1✔
53
        """
54
        Overriding methods should provide an implementation to run the tool.
55
        """
56
        pass
×
57

58

59
def _get_script_path(file: TextIO) -> ContextManager[PathLike]:
1✔
60
    """
61
    Gets the path to a script on the file system.
62

63
    If the file is ``sys.stdin``, the contents are copied to a temporary file
64
    and the path to the temporary file is returned. Otherwise, the file is closed
65
    and the path is returned.
66

67
    The context manager will delete the temporary file, if applicable.
68
    """
69
    if file is sys.stdin:
1✔
70
        # Have to close the temp file so that mpy-cross can read it, so we
71
        # create our own context manager to delete the file when we are done
72
        # using it.
73

74
        @contextlib.contextmanager
×
75
        def temp_context():
×
76
            try:
×
77
                with NamedTemporaryFile("wb", suffix=".py", delete=False) as temp:
×
78
                    temp.write(file.buffer.read())
×
79

80
                yield temp.name
×
81
            finally:
82
                try:
×
83
                    os.remove(temp.name)
×
84
                except NameError:
×
85
                    # if NamedTemporaryFile() throws, temp is not defined
86
                    pass
×
87
                except OSError:
×
88
                    # file was already deleted or other strangeness
89
                    pass
×
90

91
        return temp_context()
×
92

93
    file.close()
1✔
94
    return contextlib.nullcontext(file.name)
1✔
95

96

97
class Compile(Tool):
1✔
98
    def add_parser(self, subparsers: argparse._SubParsersAction):
1✔
99
        parser = subparsers.add_parser(
1✔
100
            "compile",
101
            help="compile a Pybricks program without running it",
102
        )
103
        parser.add_argument(
1✔
104
            "file",
105
            metavar="<file>",
106
            help="path to a MicroPython script or `-` for stdin",
107
            type=argparse.FileType(encoding="utf-8"),
108
        )
109
        parser.add_argument(
1✔
110
            "--abi",
111
            metavar="<n>",
112
            help="the MPY ABI version, one of %(choices)s (default: %(default)s)",
113
            default=6,
114
            choices=[5, 6],
115
            type=int,
116
        )
117
        parser.tool = self
1✔
118

119
    async def run(self, args: argparse.Namespace):
1✔
120
        from pybricksdev.compile import compile_multi_file, print_mpy
1✔
121

122
        with _get_script_path(args.file) as script_path:
1✔
123
            mpy = await compile_multi_file(script_path, args.abi)
1✔
124
        print_mpy(mpy)
1✔
125

126

127
class Run(Tool):
1✔
128
    def add_parser(self, subparsers: argparse._SubParsersAction):
1✔
129
        parser = subparsers.add_parser(
1✔
130
            "run",
131
            help="run a Pybricks program",
132
        )
133
        parser.tool = self
1✔
134
        parser.add_argument(
1✔
135
            "conntype",
136
            metavar="<connection type>",
137
            help="connection type: %(choices)s",
138
            choices=["ble", "usb"],
139
        )
140
        parser.add_argument(
1✔
141
            "file",
142
            metavar="<file>",
143
            help="path to a MicroPython script or `-` for stdin",
144
            type=argparse.FileType(encoding="utf-8"),
145
        )
146
        parser.add_argument(
1✔
147
            "-n",
148
            "--name",
149
            metavar="<name>",
150
            required=False,
151
            help="Bluetooth device name or Bluetooth address for BLE connection; "
152
            "serial port name for USB connection",
153
        )
154

155
        parser.add_argument(
1✔
156
            "--start",
157
            help="Start the program immediately after downloading it.",
158
            action=argparse.BooleanOptionalAction,
159
            default=True,
160
        )
161

162
        parser.add_argument(
1✔
163
            "--wait",
164
            help="Wait for the program to complete before disconnecting. Only applies when starting program right away.",
165
            action=argparse.BooleanOptionalAction,
166
            default=True,
167
        )
168

169
        parser.add_argument(
1✔
170
            "--stay-connected",
171
            help="Add a menu option to resend the code with bluetooth instead of disconnecting from the robot after the program ends.",
172
            action=argparse.BooleanOptionalAction,
173
            default=False,
174
        )
175

176
    async def run(self, args: argparse.Namespace):
1✔
177

178
        # Pick the right connection
179
        if args.conntype == "ble":
1✔
180
            from pybricksdev.ble import find_device as find_ble
1✔
181
            from pybricksdev.connections.pybricks import PybricksHubBLE
1✔
182

183
            # It is a Pybricks Hub with BLE. Device name or address is given.
184
            print(f"Searching for {args.name or 'any hub with Pybricks service'}...")
1✔
185
            device_or_address = await find_ble(args.name)
1✔
186
            hub = PybricksHubBLE(device_or_address)
1✔
187
        elif args.conntype == "usb":
1✔
188
            from usb.core import find as find_usb
1✔
189

190
            from pybricksdev.connections.pybricks import PybricksHubUSB
1✔
191
            from pybricksdev.usb import (
1✔
192
                EV3_USB_PID,
193
                LEGO_USB_VID,
194
                MINDSTORMS_INVENTOR_USB_PID,
195
                NXT_USB_PID,
196
                SPIKE_ESSENTIAL_USB_PID,
197
                SPIKE_PRIME_USB_PID,
198
            )
199

200
            def is_pybricks_usb(dev):
1✔
201
                return (
×
202
                    (dev.idVendor == LEGO_USB_VID)
203
                    and (
204
                        dev.idProduct
205
                        in [
206
                            NXT_USB_PID,
207
                            EV3_USB_PID,
208
                            SPIKE_PRIME_USB_PID,
209
                            SPIKE_ESSENTIAL_USB_PID,
210
                            MINDSTORMS_INVENTOR_USB_PID,
211
                        ]
212
                    )
213
                    and dev.product.endswith("Pybricks")
214
                )
215

216
            device_or_address = find_usb(custom_match=is_pybricks_usb)
1✔
217

218
            if device_or_address is None:
1✔
219
                print("Pybricks Hub not found.", file=sys.stderr)
×
220
                exit(1)
×
221

222
            hub = PybricksHubUSB(device_or_address)
1✔
223
        else:
224
            raise ValueError(f"Unknown connection type: {args.conntype}")
×
225

226
        # Connect to the address and run the script
227
        await hub.connect()
1✔
228
        try:
1✔
229
            with _get_script_path(args.file) as script_path:
1✔
230
                if args.start:
1✔
231
                    await hub.run(script_path, args.wait or args.stay_connected)
1✔
232
                else:
233
                    if args.stay_connected:
1✔
234
                        # if the user later starts the program by pressing the button on the hub,
235
                        # we still want the hub stdout to print to Python's stdout
236
                        hub.print_output = True
×
237
                        hub._enable_line_handler = True
×
238
                    await hub.download(script_path)
1✔
239

240
            if not args.stay_connected:
1✔
241
                return
1✔
242

243
            class ResponseOptions(IntEnum):
×
244
                RECOMPILE_RUN = 0
×
245
                RECOMPILE_DOWNLOAD = 1
×
246
                CHANGE_TARGET_FILE = 2
×
247
                EXIT = 3
×
248

249
            async def reconnect_hub():
×
250
                if not await questionary.confirm(
×
251
                    "\nThe hub has been disconnected. Would you like to re-connect?"
252
                ).ask_async():
253
                    exit()
×
254

255
                if args.conntype == "ble":
×
256
                    print(
×
257
                        f"Searching for {args.name or 'any hub with Pybricks service'}..."
258
                    )
259
                    device_or_address = await find_ble(args.name)
×
260
                    hub = PybricksHubBLE(device_or_address)
×
261
                elif args.conntype == "usb":
×
262
                    device_or_address = find_usb(custom_match=is_pybricks_usb)
×
263
                    hub = PybricksHubUSB(device_or_address)
×
264

265
                await hub.connect()
×
266
                # re-enable echoing of the hub's stdout
267
                hub._enable_line_handler = True
×
268
                hub.print_output = True
×
269
                return hub
×
270

271
            response_options = [
×
272
                "Recompile and Run",
273
                "Recompile and Download",
274
                "Change Target File",
275
                "Exit",
276
            ]
277
            while True:
×
278
                try:
×
279
                    if args.file is sys.stdin:
×
280
                        await hub.race_disconnect(
×
281
                            hub.race_power_button_press(
282
                                questionary.press_any_key_to_continue(
283
                                    "The hub will stay connected and echo its output to the terminal. Press any key to exit."
284
                                ).ask_async()
285
                            )
286
                        )
287
                        return
×
288
                    response = await hub.race_disconnect(
×
289
                        hub.race_power_button_press(
290
                            questionary.select(
291
                                f"Would you like to re-compile {os.path.basename(args.file.name)}?",
292
                                response_options,
293
                                default=(
294
                                    response_options[ResponseOptions.RECOMPILE_RUN]
295
                                    if args.start
296
                                    else response_options[
297
                                        ResponseOptions.RECOMPILE_DOWNLOAD
298
                                    ]
299
                                ),
300
                            ).ask_async()
301
                        )
302
                    )
303

304
                    match response_options.index(response):
×
305

306
                        case ResponseOptions.RECOMPILE_RUN:
×
307
                            with _get_script_path(args.file) as script_path:
×
308
                                await hub.run(script_path, wait=True)
×
309

310
                        case ResponseOptions.RECOMPILE_DOWNLOAD:
×
311
                            with _get_script_path(args.file) as script_path:
×
312
                                await hub.download(script_path)
×
313

314
                        case ResponseOptions.CHANGE_TARGET_FILE:
×
315
                            args.file.close()
×
316
                            while True:
×
317
                                try:
×
318
                                    args.file = open(
×
319
                                        await hub.race_disconnect(
320
                                            hub.race_power_button_press(
321
                                                questionary.path(
322
                                                    "What file would you like to use?"
323
                                                ).ask_async()
324
                                            )
325
                                        )
326
                                    )
327
                                    break
×
328
                                except FileNotFoundError:
×
329
                                    print("The file was not found. Please try again.")
×
330
                            # send the new target file to the hub
331
                            with _get_script_path(args.file) as script_path:
×
332
                                await hub.download(script_path)
×
333

334
                        case _:
×
335
                            return
×
336

337
                except HubPowerButtonPressedError:
×
338
                    # This means the user pressed the button on the hub to re-start the
339
                    # current program, so the menu was canceled and we are now printing
340
                    # the hub stdout until the user program ends on the hub.
341
                    try:
×
342
                        await hub._wait_for_power_button_release()
×
343
                        await hub._wait_for_user_program_stop()
×
344

345
                    except HubDisconnectError:
×
346
                        hub = await reconnect_hub()
×
347

348
                except HubDisconnectError:
×
349
                    # let terminal cool off before making a new prompt
350
                    await asyncio.sleep(0.3)
×
351
                    hub = await reconnect_hub()
×
352

353
        finally:
354
            await hub.disconnect()
1✔
355

356

357
class Flash(Tool):
1✔
358

359
    def add_parser(self, subparsers: argparse._SubParsersAction):
1✔
360
        parser = subparsers.add_parser(
×
361
            "flash", help="flash firmware on a LEGO Powered Up device"
362
        )
363
        parser.tool = self
×
364

365
        parser.add_argument(
×
366
            "firmware",
367
            metavar="<firmware-file>",
368
            type=argparse.FileType(mode="rb"),
369
            help="the firmware .zip file",
370
        ).completer = FilesCompleter(allowednames=(".zip",))
371

372
        parser.add_argument(
×
373
            "-n", "--name", metavar="<name>", type=str, help="a custom name for the hub"
374
        )
375

376
    def run(self, args: argparse.Namespace):
1✔
377
        from pybricksdev.cli.flash import flash_firmware
×
378

379
        return flash_firmware(args.firmware, args.name)
×
380

381

382
class DFUBackup(Tool):
1✔
383
    def add_parser(self, subparsers: argparse._SubParsersAction):
1✔
384
        parser = subparsers.add_parser("backup", help="backup firmware using DFU")
×
385
        parser.tool = self
×
386
        parser.add_argument(
×
387
            "firmware",
388
            metavar="<firmware-file>",
389
            type=argparse.FileType(mode="wb"),
390
            help="the firmware .bin file",
391
        ).completer = FilesCompleter(allowednames=(".bin",))
392

393
    async def run(self, args: argparse.Namespace):
1✔
394
        from pybricksdev.dfu import backup_dfu
×
395

396
        backup_dfu(args.firmware)
×
397

398

399
class DFURestore(Tool):
1✔
400
    def add_parser(self, subparsers: argparse._SubParsersAction):
1✔
401
        parser = subparsers.add_parser(
×
402
            "restore",
403
            help="restore firmware using DFU",
404
        )
405
        parser.tool = self
×
406
        parser.add_argument(
×
407
            "firmware",
408
            metavar="<firmware-file>",
409
            type=argparse.FileType(mode="rb"),
410
            help="the firmware .bin file",
411
        ).completer = FilesCompleter(allowednames=(".bin",))
412

413
    async def run(self, args: argparse.Namespace):
1✔
414
        from pybricksdev.dfu import restore_dfu
×
415

416
        restore_dfu(args.firmware)
×
417

418

419
class DFU(Tool):
1✔
420
    def add_parser(self, subparsers: argparse._SubParsersAction):
1✔
421
        self.parser = subparsers.add_parser(
×
422
            "dfu",
423
            help="use DFU to backup or restore firmware",
424
        )
425
        self.parser.tool = self
×
426
        self.subparsers = self.parser.add_subparsers(
×
427
            metavar="<action>", dest="action", help="the action to perform"
428
        )
429

430
        for tool in DFUBackup(), DFURestore():
×
431
            tool.add_parser(self.subparsers)
×
432

433
    def run(self, args: argparse.Namespace):
1✔
434
        if args.action not in self.subparsers.choices:
×
435
            self.parser.error(
×
436
                f'Missing name of action: {"|".join(self.subparsers.choices.keys())}'
437
            )
438

439
        return self.subparsers.choices[args.action].tool.run(args)
×
440

441

442
class OADFlash(Tool):
1✔
443
    def add_parser(self, subparsers: argparse._SubParsersAction):
1✔
444
        parser = subparsers.add_parser(
×
445
            "flash",
446
            help="update firmware on a LEGO Powered Up device using TI OAD",
447
        )
448
        parser.tool = self
×
449
        parser.add_argument(
×
450
            "firmware",
451
            metavar="<firmware-file>",
452
            type=argparse.FileType(mode="rb"),
453
            help="the firmware .oda file",
454
        ).completer = FilesCompleter(allowednames=(".oda",))
455

456
    async def run(self, args: argparse.Namespace):
1✔
457
        from pybricksdev.cli.oad import flash_oad_image
×
458

459
        await flash_oad_image(args.firmware)
×
460

461

462
class OADInfo(Tool):
1✔
463
    def add_parser(self, subparsers: argparse._SubParsersAction):
1✔
464
        parser = subparsers.add_parser(
×
465
            "info",
466
            help="get information about firmware on a LEGO Powered Up device using TI OAD",
467
        )
468
        parser.tool = self
×
469

470
    async def run(self, args: argparse.Namespace):
1✔
471
        from pybricksdev.cli.oad import dump_oad_info
×
472

473
        await dump_oad_info()
×
474

475

476
class OAD(Tool):
1✔
477
    def add_parser(self, subparsers: argparse._SubParsersAction):
1✔
478
        self.parser = subparsers.add_parser(
×
479
            "oad",
480
            help="update firmware on a LEGO Powered Up device using TI OAD",
481
        )
482
        self.parser.tool = self
×
483
        self.subparsers = self.parser.add_subparsers(
×
484
            metavar="<action>", dest="action", help="the action to perform"
485
        )
486

487
        for tool in OADFlash(), OADInfo():
×
488
            tool.add_parser(self.subparsers)
×
489

490
    def run(self, args: argparse.Namespace):
1✔
491
        if args.action not in self.subparsers.choices:
×
492
            self.parser.error(
×
493
                f'Missing name of action: {"|".join(self.subparsers.choices.keys())}'
494
            )
495

496
        return self.subparsers.choices[args.action].tool.run(args)
×
497

498

499
class LWP3Repl(Tool):
1✔
500
    def add_parser(self, subparsers: argparse._SubParsersAction):
1✔
501
        parser = subparsers.add_parser(
×
502
            "repl",
503
            help="interactive REPL for sending and receiving LWP3 messages",
504
        )
505
        parser.tool = self
×
506

507
    def run(self, args: argparse.Namespace):
1✔
508
        from pybricksdev.cli.lwp3.repl import repl, setup_repl_logging
×
509

510
        setup_repl_logging()
×
511
        return repl()
×
512

513

514
class LWP3(Tool):
1✔
515
    def add_parser(self, subparsers: argparse._SubParsersAction):
1✔
516
        self.parser = subparsers.add_parser(
×
517
            "lwp3", help="interact with devices using LWP3"
518
        )
519
        self.parser.tool = self
×
520
        self.subparsers = self.parser.add_subparsers(
×
521
            metavar="<lwp3-tool>", dest="lwp3_tool", help="the tool to run"
522
        )
523

524
        for tool in (LWP3Repl(),):
×
525
            tool.add_parser(self.subparsers)
×
526

527
    def run(self, args: argparse.Namespace):
1✔
528
        if args.lwp3_tool not in self.subparsers.choices:
×
529
            self.parser.error(
×
530
                f'Missing name of tool: {"|".join(self.subparsers.choices.keys())}'
531
            )
532

533
        return self.subparsers.choices[args.lwp3_tool].tool.run(args)
×
534

535

536
class Udev(Tool):
1✔
537
    def add_parser(self, subparsers: argparse._SubParsersAction):
1✔
538
        parser = subparsers.add_parser("udev", help="print udev rules to stdout")
1✔
539
        parser.tool = self
1✔
540

541
    async def run(self, args: argparse.Namespace):
1✔
542
        from importlib.resources import read_text
1✔
543

544
        from pybricksdev import resources
1✔
545

546
        print(read_text(resources, resources.UDEV_RULES))
1✔
547

548

549
def main():
1✔
550
    """Runs ``pybricksdev`` command line interface."""
551

552
    if sys.platform == "win32":
×
553
        # Hack around bad side-effects of pythoncom on Windows
554
        try:
×
555
            from bleak_winrt._winrt import MTA, init_apartment
×
556
        except ImportError:
×
557
            from winrt._winrt import MTA, init_apartment
×
558

559
        init_apartment(MTA)
×
560

561
    # Provide main description and help.
562
    parser = argparse.ArgumentParser(
×
563
        prog=PROG_NAME,
564
        description="Utilities for Pybricks developers.",
565
        epilog="Run `%(prog)s <tool> --help` for tool-specific arguments.",
566
    )
567

568
    parser.add_argument(
×
569
        "-v", "--version", action="version", version=f"{MODULE_NAME} v{MODULE_VERSION}"
570
    )
571
    parser.add_argument(
×
572
        "-d", "--debug", action="store_true", help="enable debug logging"
573
    )
574

575
    subparsers = parser.add_subparsers(
×
576
        metavar="<tool>",
577
        dest="tool",
578
        help="the tool to use",
579
    )
580

581
    for tool in Compile(), Run(), Flash(), DFU(), OAD(), LWP3(), Udev():
×
582
        tool.add_parser(subparsers)
×
583

584
    argcomplete.autocomplete(parser)
×
585
    args = parser.parse_args()
×
586

587
    logging.basicConfig(
×
588
        format="%(asctime)s: %(levelname)s: %(name)s: %(message)s",
589
        level=logging.DEBUG if args.debug else logging.WARNING,
590
    )
591

592
    if not args.tool:
×
593
        parser.error(f'Missing name of tool: {"|".join(subparsers.choices.keys())}')
×
594

595
    asyncio.run(subparsers.choices[args.tool].tool.run(args))
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc