• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pybricks / pybricksdev / 7346541855

28 Dec 2023 10:04AM UTC coverage: 48.959% (-1.6%) from 50.515%
7346541855

Pull #69

github

web-flow
Merge 3456029ad into 11667cb05
Pull Request #69: Add USB support

75 of 303 branches covered (0.0%)

Branch coverage included in aggregate %.

1641 of 3202 relevant lines covered (51.25%)

0.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
pybricksdev/cli/__init__.py
1
# SPDX-License-Identifier: MIT
2
# Copyright (c) 2019-2022 The Pybricks Authors
3

4
"""Command line wrapper around pybricksdev library."""
×
5

6
import argparse
×
7
import asyncio
×
8
import contextlib
×
9
import logging
×
10
import os
×
11
import socket
×
12
import sys
×
13
from abc import ABC, abstractmethod
×
14
from os import PathLike, path
×
15
from tempfile import NamedTemporaryFile
×
16
from typing import ContextManager, TextIO
×
17

18
import argcomplete
×
19
from argcomplete.completers import FilesCompleter
×
20

21
from .. import __name__ as MODULE_NAME
×
22
from .. import __version__ as MODULE_VERSION
×
23

24
# Program name shown in usage/help text. When argv[0] is a ``__main__.py``
# file (as it is when started with ``python -m <package>``), show the
# interpreter name plus ``-m <package>``; otherwise show the script name.
PROG_NAME = (
    f"{path.basename(sys.executable)} -m {MODULE_NAME}"
    if sys.argv[0].endswith("__main__.py")
    else path.basename(sys.argv[0])
)
29

30

31
class Tool(ABC):
    """Common base class for tool implementations."""

    @abstractmethod
    def add_parser(self, subparsers: argparse._SubParsersAction) -> None:
        """
        Registers this tool with the given subparsers action.

        Overriding methods must at least do the following::

            parser = subparsers.add_parser('tool', ...)
            parser.tool = self

        Then additional arguments can be added using the ``parser`` object.

        Args:
            subparsers: The object returned by
                ``ArgumentParser.add_subparsers()``.
        """
        pass

    @abstractmethod
    async def run(self, args: argparse.Namespace) -> None:
        """
        Runs the tool.

        Overriding methods should provide an implementation to run the tool.

        Args:
            args: The parsed command line arguments.
        """
        pass
×
52

53

54
def _get_script_path(file: TextIO) -> ContextManager[str]:
    """
    Gets the path to a script on the file system.

    If ``file`` is ``sys.stdin``, its contents are copied to a temporary
    ``.py`` file; the context manager yields the path of that file and
    deletes it on exit. Otherwise, ``file`` is closed right away and the
    context manager yields ``file.name`` unchanged.

    Args:
        file: An open text file, or ``sys.stdin``.

    Returns:
        A context manager whose ``as`` target is the path of the script.
    """
    if file is not sys.stdin:
        file.close()
        return contextlib.nullcontext(file.name)

    @contextlib.contextmanager
    def temp_context():
        # Have to close the temp file so that mpy-cross can read it, so it
        # is created with delete=False and removed here when we are done
        # using it. If NamedTemporaryFile() itself throws there is nothing
        # to clean up, so it is called outside of the try block.
        temp = NamedTemporaryFile(suffix=".py", delete=False)
        try:
            with temp:
                temp.write(file.buffer.read())

            yield temp.name
        finally:
            # Best effort: file was already deleted or other strangeness.
            with contextlib.suppress(OSError):
                os.remove(temp.name)

    return temp_context()
×
90

91

92
class Compile(Tool):
    """Tool behind the ``compile`` subcommand."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``compile`` subcommand with its ``file`` and ``--abi`` arguments."""
        parser = subparsers.add_parser(
            "compile",
            help="compile a Pybricks program without running it",
        )
        parser.add_argument(
            "file",
            metavar="<file>",
            help="path to a MicroPython script or `-` for stdin",
            type=argparse.FileType(),
        )
        parser.add_argument(
            "--abi",
            metavar="<n>",
            help="the MPY ABI version, one of %(choices)s (default: %(default)s)",
            default=6,
            choices=[5, 6],
            type=int,
        )
        parser.tool = self

    async def run(self, args: argparse.Namespace):
        """Compiles ``args.file`` for ``args.abi`` and hands the result to ``print_mpy``."""
        # Imported lazily so that only the selected tool loads its dependencies.
        from ..compile import compile_file, print_mpy

        with _get_script_path(args.file) as script_path:
            mpy = await compile_file(script_path, args.abi)

        print_mpy(mpy)
×
120

121

122
class Run(Tool):
    """Tool behind the ``run`` subcommand."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``run`` subcommand and its arguments."""
        parser = subparsers.add_parser(
            "run",
            help="run a Pybricks program",
        )
        parser.tool = self
        parser.add_argument(
            "conntype",
            metavar="<connection type>",
            help="connection type: %(choices)s",
            choices=["ble", "usb", "ssh"],
        )
        parser.add_argument(
            "file",
            metavar="<file>",
            help="path to a MicroPython script or `-` for stdin",
            type=argparse.FileType(),
        )
        parser.add_argument(
            "-n",
            "--name",
            metavar="<name>",
            required=False,
            help="hostname or IP address for SSH connection; "
            "Bluetooth device name or Bluetooth address for BLE connection; "
            "serial port name for USB connection",
        )

        if hasattr(argparse, "BooleanOptionalAction"):
            # BooleanOptionalAction requires Python 3.9
            parser.add_argument(
                "--wait",
                help="wait for the program to complete before disconnecting",
                action=argparse.BooleanOptionalAction,
                default=True,
            )
        else:
            # Older Python: emulate --wait/--no-wait with two flags that
            # share the same destination.
            parser.add_argument(
                "--wait",
                help="wait for the program to complete before disconnecting (default)",
                action="store_true",
                default=True,
            )
            parser.add_argument(
                "--no-wait",
                help="disconnect as soon as program is done downloading",
                action="store_false",
                dest="wait",
            )

    async def run(self, args: argparse.Namespace):
        """
        Connects to a hub using ``args.conntype`` and runs ``args.file`` on it.

        Exits with status 1 if ``ssh`` is selected without ``--name``.

        Raises:
            ValueError: If ``args.conntype`` is not ``ssh``, ``ble`` or ``usb``.
        """
        from usb.core import find as find_usb

        from ..ble import find_device as find_ble
        from ..connections.ev3dev import EV3Connection
        from ..connections.lego import REPLHub
        from ..connections.pybricks import PybricksHubBLE, PybricksHubUSB

        # Pick the right connection
        if args.conntype == "ssh":
            # So it's an ev3dev
            if args.name is None:
                print("--name is required for SSH connections", file=sys.stderr)
                # sys.exit() rather than the builtin exit(), which is only
                # installed by the site module.
                sys.exit(1)

            device_or_address = socket.gethostbyname(args.name)
            hub = EV3Connection(device_or_address)

        elif args.conntype == "ble":
            # It is a Pybricks Hub with BLE. Device name or address is given.
            print(f"Searching for {args.name or 'any hub with Pybricks service'}...")
            device_or_address = await find_ble(args.name)
            hub = PybricksHubBLE(device_or_address)

        elif args.conntype == "usb":

            def is_pybricks_usb(dev):
                # Vendor/product IDs are checked first so the product string
                # is only read from candidate devices. The string can be
                # None when the device has no product descriptor.
                return (
                    (dev.idVendor == 0x0694)
                    and ((dev.idProduct == 0x0009) or (dev.idProduct == 0x0011))
                    and (dev.product or "").endswith("Pybricks")
                )

            device_or_address = find_usb(custom_match=is_pybricks_usb)

            if device_or_address is not None:
                hub = PybricksHubUSB(device_or_address)
            else:
                # No matching USB device was found.
                hub = REPLHub()
        else:
            raise ValueError(f"Unknown connection type: {args.conntype}")

        # Connect to the address and run the script
        await hub.connect()
        try:
            with _get_script_path(args.file) as script_path:
                await hub.run(script_path, args.wait)
        finally:
            await hub.disconnect()
×
222

223

224
class Flash(Tool):
    """Tool behind the ``flash`` subcommand."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``flash`` subcommand and its arguments."""
        parser = subparsers.add_parser(
            "flash", help="flash firmware on a LEGO Powered Up device"
        )
        parser.tool = self

        parser.add_argument(
            "firmware",
            metavar="<firmware-file>",
            type=argparse.FileType(mode="rb"),
            help="the firmware .zip file",
        ).completer = FilesCompleter(allowednames=(".zip",))

        parser.add_argument(
            "-n", "--name", metavar="<name>", type=str, help="a custom name for the hub"
        )

    def run(self, args: argparse.Namespace):
        """Returns the result of ``flash_firmware(args.firmware, args.name)`` for the caller to run."""
        from .flash import flash_firmware

        return flash_firmware(args.firmware, args.name)
×
246

247

248
class DFUBackup(Tool):
    """Tool behind the ``backup`` action of the ``dfu`` subcommand."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``backup`` action; the firmware file is opened for binary writing."""
        parser = subparsers.add_parser("backup", help="backup firmware using DFU")
        parser.tool = self
        parser.add_argument(
            "firmware",
            metavar="<firmware-file>",
            type=argparse.FileType(mode="wb"),
            help="the firmware .bin file",
        ).completer = FilesCompleter(allowednames=(".bin",))

    async def run(self, args: argparse.Namespace):
        """Passes the opened ``args.firmware`` file to ``backup_dfu``."""
        from ..dfu import backup_dfu

        backup_dfu(args.firmware)
×
263

264

265
class DFURestore(Tool):
    """Tool behind the ``restore`` action of the ``dfu`` subcommand."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``restore`` action; the firmware file is opened for binary reading."""
        parser = subparsers.add_parser(
            "restore",
            help="restore firmware using DFU",
        )
        parser.tool = self
        parser.add_argument(
            "firmware",
            metavar="<firmware-file>",
            type=argparse.FileType(mode="rb"),
            help="the firmware .bin file",
        ).completer = FilesCompleter(allowednames=(".bin",))

    async def run(self, args: argparse.Namespace):
        """Passes the opened ``args.firmware`` file to ``restore_dfu``."""
        from ..dfu import restore_dfu

        restore_dfu(args.firmware)
×
283

284

285
class DFU(Tool):
    """Tool behind the ``dfu`` subcommand; dispatches to the backup/restore actions."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``dfu`` subcommand and its nested ``<action>`` parsers."""
        self.parser = subparsers.add_parser(
            "dfu",
            help="use DFU to backup or restore firmware",
        )
        self.parser.tool = self
        self.subparsers = self.parser.add_subparsers(
            metavar="<action>", dest="action", help="the action to perform"
        )

        for tool in DFUBackup(), DFURestore():
            tool.add_parser(self.subparsers)

    def run(self, args: argparse.Namespace):
        """
        Delegates to the tool registered for ``args.action``.

        Reports a parser error if no known action was given.
        """
        if args.action not in self.subparsers.choices:
            self.parser.error(
                f'Missing name of action: {"|".join(self.subparsers.choices)}'
            )

        return self.subparsers.choices[args.action].tool.run(args)
×
306

307

308
class LWP3Repl(Tool):
    """Tool behind the ``repl`` entry of the ``lwp3`` subcommand."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``repl`` subcommand; it takes no extra arguments."""
        parser = subparsers.add_parser(
            "repl",
            help="interactive REPL for sending and receiving LWP3 messages",
        )
        parser.tool = self

    def run(self, args: argparse.Namespace):
        """Calls ``setup_repl_logging()`` and returns the result of ``repl()``."""
        from .lwp3.repl import repl, setup_repl_logging

        setup_repl_logging()
        return repl()
×
321

322

323
class LWP3(Tool):
    """Tool behind the ``lwp3`` subcommand; dispatches to the nested LWP3 tools."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``lwp3`` subcommand and its nested ``<lwp3-tool>`` parsers."""
        self.parser = subparsers.add_parser(
            "lwp3", help="interact with devices using LWP3"
        )
        self.parser.tool = self
        self.subparsers = self.parser.add_subparsers(
            metavar="<lwp3-tool>", dest="lwp3_tool", help="the tool to run"
        )

        for tool in (LWP3Repl(),):
            tool.add_parser(self.subparsers)

    def run(self, args: argparse.Namespace):
        """
        Delegates to the tool registered for ``args.lwp3_tool``.

        Reports a parser error if no known tool was given.
        """
        if args.lwp3_tool not in self.subparsers.choices:
            self.parser.error(
                f'Missing name of tool: {"|".join(self.subparsers.choices)}'
            )

        return self.subparsers.choices[args.lwp3_tool].tool.run(args)
×
343

344

345
class Udev(Tool):
    """Tool behind the ``udev`` subcommand."""

    def add_parser(self, subparsers: argparse._SubParsersAction):
        """Registers the ``udev`` subcommand; it takes no extra arguments."""
        parser = subparsers.add_parser("udev", help="print udev rules to stdout")
        parser.tool = self

    async def run(self, args: argparse.Namespace):
        """Prints the packaged resource named by ``resources.UDEV_RULES``."""
        from importlib.resources import read_text

        from .. import resources

        print(read_text(resources, resources.UDEV_RULES))
×
356

357

358
def main():
    """
    Runs ``pybricksdev`` command line interface.

    Builds the argument parser, registers every tool as a subcommand, parses
    the command line, configures logging and then runs the selected tool with
    ``asyncio.run()``.
    """

    # Provide main description and help.
    parser = argparse.ArgumentParser(
        prog=PROG_NAME,
        description="Utilities for Pybricks developers.",
        epilog="Run `%(prog)s <tool> --help` for tool-specific arguments.",
    )

    parser.add_argument(
        "-v", "--version", action="version", version=f"{MODULE_NAME} v{MODULE_VERSION}"
    )
    parser.add_argument(
        "-d", "--debug", action="store_true", help="enable debug logging"
    )

    subparsers = parser.add_subparsers(
        metavar="<tool>",
        dest="tool",
        help="the tool to use",
    )

    # Each tool attaches itself to its own parser as ``parser.tool`` so it
    # can be looked up again by name after parsing.
    for tool in Compile(), Run(), Flash(), DFU(), LWP3(), Udev():
        tool.add_parser(subparsers)

    argcomplete.autocomplete(parser)
    args = parser.parse_args()

    logging.basicConfig(
        format="%(asctime)s: %(levelname)s: %(name)s: %(message)s",
        level=logging.DEBUG if args.debug else logging.WARNING,
    )

    if not args.tool:
        parser.error(f'Missing name of tool: {"|".join(subparsers.choices)}')

    asyncio.run(subparsers.choices[args.tool].tool.run(args))
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc