• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pybricks / pybricksdev / 13093259242

01 Feb 2025 11:02PM UTC coverage: 42.786% (-1.6%) from 44.367%
13093259242

push

github

dlech
Add support for USB connections

Adds a new subclass of PybricksHub that manages USB connections.

Co-developed-by: David Lechner <david@pybricks.com>
Signed-off-by: Nate Karstens <nate.karstens@gmail.com>

78 of 347 branches covered (22.48%)

Branch coverage included in aggregate %.

1639 of 3666 relevant lines covered (44.71%)

0.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
pybricksdev/cli/__init__.py
1
# SPDX-License-Identifier: MIT
2
# Copyright (c) 2019-2024 The Pybricks Authors
3

4
"""Command line wrapper around pybricksdev library."""
5

6
import argparse
×
7
import asyncio
×
8
import contextlib
×
9
import logging
×
10
import os
×
11
import socket
×
12
import sys
×
13
from abc import ABC, abstractmethod
×
14
from os import PathLike, path
×
15
from tempfile import NamedTemporaryFile
×
16
from typing import ContextManager, TextIO
×
17

18
import argcomplete
×
19
from argcomplete.completers import FilesCompleter
×
20

21
from pybricksdev import __name__ as MODULE_NAME
×
22
from pybricksdev import __version__ as MODULE_VERSION
×
23

24
# Program name shown in usage and help text. When the package is started as
# ``python -m <module>``, argv[0] is the path of ``__main__.py``, so the name is
# rebuilt from the interpreter executable and the module name instead.
if sys.argv[0].endswith("__main__.py"):
    PROG_NAME = f"{path.basename(sys.executable)} -m {MODULE_NAME}"
else:
    PROG_NAME = path.basename(sys.argv[0])
29

30

31
class Tool(ABC):
    """Abstract interface implemented by every command line tool."""

    @abstractmethod
    def add_parser(self, subparsers: argparse._SubParsersAction):
        """
        Registers this tool with the given sub-parsers action.

        Implementations must at least do the following::

            parser = subparsers.add_parser('tool', ...)
            parser.tool = self

        Any tool-specific arguments can then be added to ``parser``.
        """

    @abstractmethod
    async def run(self, args: argparse.Namespace):
        """
        Runs the tool with the parsed command line arguments.

        Implementations provide the actual behavior of the tool here.
        """
×
52

53

54
def _get_script_path(file: TextIO) -> ContextManager[PathLike]:
    """
    Returns a context manager that provides the file system path of a script.

    For any file other than ``sys.stdin``, the file is closed right away and
    the context manager simply provides ``file.name``.

    For ``sys.stdin``, entering the context copies the bytes read from
    ``file.buffer`` into a temporary ``.py`` file and provides the name of
    that file. Leaving the context removes the temporary file again.
    """
    if file is not sys.stdin:
        file.close()
        return contextlib.nullcontext(file.name)

    # The temp file has to be closed before mpy-cross can read it, so it is
    # created with delete=False and removed by our own context manager once
    # the caller is done with it.
    @contextlib.contextmanager
    def temp_context():
        # Stays None if NamedTemporaryFile() itself fails, in which case
        # there is nothing to clean up.
        temp_name = None
        try:
            with NamedTemporaryFile(suffix=".py", delete=False) as temp:
                temp_name = temp.name
                temp.write(file.buffer.read())

            yield temp_name
        finally:
            if temp_name is not None:
                # The file may already be gone (or other strangeness);
                # cleanup is best effort.
                with contextlib.suppress(OSError):
                    os.remove(temp_name)

    return temp_context()
×
90

91

92
class Compile(Tool):
    """Tool that compiles a Pybricks program without running it."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``compile`` sub-command and its arguments."""
        compile_parser = subparsers.add_parser(
            "compile", help="compile a Pybricks program without running it"
        )
        compile_parser.tool = self

        compile_parser.add_argument(
            "file",
            type=argparse.FileType(),
            metavar="<file>",
            help="path to a MicroPython script or `-` for stdin",
        )
        compile_parser.add_argument(
            "--abi",
            type=int,
            choices=[5, 6],
            default=6,
            metavar="<n>",
            help="the MPY ABI version, one of %(choices)s (default: %(default)s)",
        )

    async def run(self, args: argparse.Namespace):
        """
        Compiles ``args.file`` for the ABI given by ``args.abi`` and passes
        the result to ``print_mpy``.
        """
        # Imported lazily so the compiler is only loaded when this tool runs.
        from pybricksdev.compile import compile_multi_file, print_mpy

        with _get_script_path(args.file) as source_path:
            compiled = await compile_multi_file(source_path, args.abi)

        print_mpy(compiled)
×
120

121

122
class Run(Tool):
    """Tool that connects to a device and runs a Pybricks program on it."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``run`` sub-command and its arguments."""
        parser = subparsers.add_parser(
            "run",
            help="run a Pybricks program",
        )
        parser.tool = self
        parser.add_argument(
            "conntype",
            metavar="<connection type>",
            help="connection type: %(choices)s",
            choices=["ble", "usb", "ssh"],
        )
        parser.add_argument(
            "file",
            metavar="<file>",
            help="path to a MicroPython script or `-` for stdin",
            type=argparse.FileType(),
        )
        parser.add_argument(
            "-n",
            "--name",
            metavar="<name>",
            required=False,
            help="hostname or IP address for SSH connection; "
            "Bluetooth device name or Bluetooth address for BLE connection; "
            "serial port name for USB connection",
        )

        if hasattr(argparse, "BooleanOptionalAction"):
            # BooleanOptionalAction requires Python 3.9
            parser.add_argument(
                "--wait",
                help="wait for the program to complete before disconnecting",
                action=argparse.BooleanOptionalAction,
                default=True,
            )
        else:
            # Emulate --wait/--no-wait with a pair of store_true/store_false
            # options that share the same destination.
            parser.add_argument(
                "--wait",
                help="wait for the program to complete before disconnecting (default)",
                action="store_true",
                default=True,
            )
            parser.add_argument(
                "--no-wait",
                help="disconnect as soon as program is done downloading",
                action="store_false",
                dest="wait",
            )

    async def run(self, args: argparse.Namespace):
        """
        Connects to a hub using ``args.conntype`` and runs ``args.file`` on it.

        The hub is always disconnected again after a successful connect, even
        if running the script fails.

        Raises:
            SystemExit: With status 1 if ``--name`` is missing for an SSH
                connection.
            ValueError: If ``args.conntype`` is not a known connection type.
        """

        # Pick the right connection
        if args.conntype == "ssh":
            from pybricksdev.connections.ev3dev import EV3Connection

            # So it's an ev3dev
            if args.name is None:
                print("--name is required for SSH connections", file=sys.stderr)
                # Use sys.exit() instead of the exit() helper: the latter is
                # injected by the site module for interactive use and is not
                # guaranteed to exist (e.g. `python -S` or frozen apps).
                sys.exit(1)

            device_or_address = socket.gethostbyname(args.name)
            hub = EV3Connection(device_or_address)
        elif args.conntype == "ble":
            from pybricksdev.ble import find_device as find_ble
            from pybricksdev.connections.pybricks import PybricksHubBLE

            # It is a Pybricks Hub with BLE. Device name or address is given.
            print(f"Searching for {args.name or 'any hub with Pybricks service'}...")
            device_or_address = await find_ble(args.name)
            hub = PybricksHubBLE(device_or_address)
        elif args.conntype == "usb":
            from usb.core import find as find_usb

            from pybricksdev.connections.pybricks import PybricksHubUSB
            from pybricksdev.usb import (
                LEGO_USB_VID,
                MINDSTORMS_INVENTOR_USB_PID,
                SPIKE_ESSENTIAL_USB_PID,
                SPIKE_PRIME_USB_PID,
            )

            # NOTE(review): the --name help text mentions a serial port name
            # for USB connections, but this branch never reads args.name.

            def is_pybricks_usb(dev):
                """Matches supported LEGO hubs whose product string ends with "Pybricks"."""
                if dev.idVendor != LEGO_USB_VID:
                    return False

                if dev.idProduct not in (
                    SPIKE_PRIME_USB_PID,
                    SPIKE_ESSENTIAL_USB_PID,
                    MINDSTORMS_INVENTOR_USB_PID,
                ):
                    return False

                # The product string may be unavailable (None) for a device;
                # treat that as "not Pybricks" instead of raising
                # AttributeError in the middle of enumeration.
                product = dev.product
                return product is not None and product.endswith("Pybricks")

            device_or_address = find_usb(custom_match=is_pybricks_usb)

            if device_or_address is not None:
                hub = PybricksHubUSB(device_or_address)
            else:
                # No matching Pybricks USB device was found, so fall back to
                # the REPL hub connection.
                from pybricksdev.connections.lego import REPLHub

                hub = REPLHub()
        else:
            raise ValueError(f"Unknown connection type: {args.conntype}")

        # Connect to the address and run the script
        await hub.connect()
        try:
            with _get_script_path(args.file) as script_path:
                await hub.run(script_path, args.wait)
        finally:
            await hub.disconnect()
×
237

238

239
class Flash(Tool):
    """Tool that flashes firmware on a LEGO Powered Up device."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``flash`` sub-command and its arguments."""
        flash_parser = subparsers.add_parser(
            "flash", help="flash firmware on a LEGO Powered Up device"
        )
        flash_parser.tool = self

        firmware_arg = flash_parser.add_argument(
            "firmware",
            type=argparse.FileType(mode="rb"),
            metavar="<firmware-file>",
            help="the firmware .zip file",
        )
        # Restrict shell completion to .zip files.
        firmware_arg.completer = FilesCompleter(allowednames=(".zip",))

        flash_parser.add_argument(
            "-n", "--name", type=str, metavar="<name>", help="a custom name for the hub"
        )

    def run(self, args: argparse.Namespace):
        """
        Calls ``flash_firmware`` with the firmware file and the optional hub
        name and returns its result without awaiting it.
        """
        from pybricksdev.cli.flash import flash_firmware

        firmware, hub_name = args.firmware, args.name
        return flash_firmware(firmware, hub_name)
×
261

262

263
class DFUBackup(Tool):
    """Tool that backs up firmware using DFU."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``backup`` action and its arguments."""
        backup_parser = subparsers.add_parser(
            "backup", help="backup firmware using DFU"
        )
        backup_parser.tool = self

        firmware_arg = backup_parser.add_argument(
            "firmware",
            type=argparse.FileType(mode="wb"),
            metavar="<firmware-file>",
            help="the firmware .bin file",
        )
        # Restrict shell completion to .bin files.
        firmware_arg.completer = FilesCompleter(allowednames=(".bin",))

    async def run(self, args: argparse.Namespace):
        """Passes the writable firmware file to ``backup_dfu``."""
        from pybricksdev.dfu import backup_dfu

        destination = args.firmware
        backup_dfu(destination)
×
278

279

280
class DFURestore(Tool):
    """Tool that restores firmware using DFU."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``restore`` action and its arguments."""
        restore_parser = subparsers.add_parser(
            "restore", help="restore firmware using DFU"
        )
        restore_parser.tool = self

        firmware_arg = restore_parser.add_argument(
            "firmware",
            type=argparse.FileType(mode="rb"),
            metavar="<firmware-file>",
            help="the firmware .bin file",
        )
        # Restrict shell completion to .bin files.
        firmware_arg.completer = FilesCompleter(allowednames=(".bin",))

    async def run(self, args: argparse.Namespace):
        """Passes the readable firmware file to ``restore_dfu``."""
        from pybricksdev.dfu import restore_dfu

        source = args.firmware
        restore_dfu(source)
×
298

299

300
class DFU(Tool):
    """Tool that groups the DFU backup and restore actions."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``dfu`` sub-command and its nested actions."""
        self.parser = subparsers.add_parser(
            "dfu", help="use DFU to backup or restore firmware"
        )
        self.parser.tool = self
        self.subparsers = self.parser.add_subparsers(
            dest="action", metavar="<action>", help="the action to perform"
        )

        for action_tool in (DFUBackup(), DFURestore()):
            action_tool.add_parser(self.subparsers)

    def run(self, args: argparse.Namespace):
        """
        Dispatches to the tool registered for ``args.action`` and returns the
        result of its ``run()``.

        Reports a parser error if ``args.action`` is not one of the
        registered actions.
        """
        actions = self.subparsers.choices

        if args.action not in actions:
            self.parser.error(f'Missing name of action: {"|".join(actions)}')

        selected = actions[args.action].tool
        return selected.run(args)
×
321

322

323
class OADFlash(Tool):
    """Tool that updates firmware on a LEGO Powered Up device using TI OAD."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``flash`` action and its arguments."""
        flash_parser = subparsers.add_parser(
            "flash", help="update firmware on a LEGO Powered Up device using TI OAD"
        )
        flash_parser.tool = self

        firmware_arg = flash_parser.add_argument(
            "firmware",
            type=argparse.FileType(mode="rb"),
            metavar="<firmware-file>",
            help="the firmware .oda file",
        )
        # Restrict shell completion to .oda files.
        firmware_arg.completer = FilesCompleter(allowednames=(".oda",))

    async def run(self, args: argparse.Namespace):
        """Awaits ``flash_oad_image`` with the readable firmware file."""
        from pybricksdev.cli.oad import flash_oad_image

        image = args.firmware
        await flash_oad_image(image)
×
341

342

343
class OADInfo(Tool):
    """Tool that gets firmware information from a device using TI OAD."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``info`` action, which takes no arguments."""
        info_parser = subparsers.add_parser(
            "info",
            help="get information about firmware on a LEGO Powered Up device using TI OAD",
        )
        info_parser.tool = self

    async def run(self, args: argparse.Namespace):
        """Awaits ``dump_oad_info``; ``args`` is not used."""
        from pybricksdev.cli.oad import dump_oad_info

        await dump_oad_info()
×
355

356

357
class OAD(Tool):
    """Tool that groups the TI OAD flash and info actions."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``oad`` sub-command and its nested actions."""
        self.parser = subparsers.add_parser(
            "oad", help="update firmware on a LEGO Powered Up device using TI OAD"
        )
        self.parser.tool = self
        self.subparsers = self.parser.add_subparsers(
            dest="action", metavar="<action>", help="the action to perform"
        )

        for action_tool in (OADFlash(), OADInfo()):
            action_tool.add_parser(self.subparsers)

    def run(self, args: argparse.Namespace):
        """
        Dispatches to the tool registered for ``args.action`` and returns the
        result of its ``run()``.

        Reports a parser error if ``args.action`` is not one of the
        registered actions.
        """
        actions = self.subparsers.choices

        if args.action not in actions:
            self.parser.error(f'Missing name of action: {"|".join(actions)}')

        selected = actions[args.action].tool
        return selected.run(args)
×
378

379

380
class LWP3Repl(Tool):
    """Tool that starts an interactive REPL for LWP3 messages."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``repl`` tool, which takes no arguments."""
        repl_parser = subparsers.add_parser(
            "repl", help="interactive REPL for sending and receiving LWP3 messages"
        )
        repl_parser.tool = self

    def run(self, args: argparse.Namespace):
        """
        Sets up REPL logging, then calls ``repl()`` and returns its result
        without awaiting it. ``args`` is not used.
        """
        from pybricksdev.cli.lwp3.repl import repl, setup_repl_logging

        setup_repl_logging()
        session = repl()
        return session
×
393

394

395
class LWP3(Tool):
    """Tool that groups the LWP3 sub-tools."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``lwp3`` sub-command and its nested tools."""
        self.parser = subparsers.add_parser(
            "lwp3", help="interact with devices using LWP3"
        )
        self.parser.tool = self
        self.subparsers = self.parser.add_subparsers(
            dest="lwp3_tool", metavar="<lwp3-tool>", help="the tool to run"
        )

        for sub_tool in (LWP3Repl(),):
            sub_tool.add_parser(self.subparsers)

    def run(self, args: argparse.Namespace):
        """
        Dispatches to the tool registered for ``args.lwp3_tool`` and returns
        the result of its ``run()``.

        Reports a parser error if ``args.lwp3_tool`` is not one of the
        registered tools.
        """
        tools = self.subparsers.choices

        if args.lwp3_tool not in tools:
            self.parser.error(f'Missing name of tool: {"|".join(tools)}')

        selected = tools[args.lwp3_tool].tool
        return selected.run(args)
×
415

416

417
class Udev(Tool):
    """Tool that prints udev rules to stdout."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``udev`` sub-command, which takes no arguments."""
        udev_parser = subparsers.add_parser(
            "udev", help="print udev rules to stdout"
        )
        udev_parser.tool = self

    async def run(self, args: argparse.Namespace):
        """
        Reads the resource named by ``resources.UDEV_RULES`` as text and
        prints it. ``args`` is not used.
        """
        from importlib.resources import read_text

        from pybricksdev import resources

        rules = read_text(resources, resources.UDEV_RULES)
        print(rules)
×
428

429

430
def main():
    """Runs ``pybricksdev`` command line interface."""

    if sys.platform == "win32":
        # Hack around bad side-effects of pythoncom on Windows
        try:
            from bleak_winrt._winrt import MTA, init_apartment
        except ImportError:
            # Fall back to the winrt package when bleak_winrt is not installed.
            from winrt._winrt import MTA, init_apartment

        init_apartment(MTA)

    # Top-level parser with the main description and help.
    cli = argparse.ArgumentParser(
        prog=PROG_NAME,
        description="Utilities for Pybricks developers.",
        epilog="Run `%(prog)s <tool> --help` for tool-specific arguments.",
    )
    cli.add_argument(
        "-v", "--version", action="version", version=f"{MODULE_NAME} v{MODULE_VERSION}"
    )
    cli.add_argument("-d", "--debug", action="store_true", help="enable debug logging")

    tool_parsers = cli.add_subparsers(
        dest="tool",
        metavar="<tool>",
        help="the tool to use",
    )

    # Each tool registers its own sub-command and attaches itself to the
    # sub-parser as ``tool`` so it can be looked up after parsing.
    available_tools = (Compile(), Run(), Flash(), DFU(), OAD(), LWP3(), Udev())
    for tool in available_tools:
        tool.add_parser(tool_parsers)

    argcomplete.autocomplete(cli)
    args = cli.parse_args()

    log_level = logging.DEBUG if args.debug else logging.WARNING
    logging.basicConfig(
        format="%(asctime)s: %(levelname)s: %(name)s: %(message)s",
        level=log_level,
    )

    if not args.tool:
        cli.error(f'Missing name of tool: {"|".join(tool_parsers.choices.keys())}')

    selected_tool = tool_parsers.choices[args.tool].tool
    asyncio.run(selected_tool.run(args))
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc