• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pybricks / pybricksdev / 7096510692

05 Dec 2023 05:16AM UTC coverage: 48.987% (-1.5%) from 50.515%
7096510692

Pull #69

github

web-flow
Merge 335903eeb into 11667cb05
Pull Request #69: Add USB support

75 of 303 branches covered (0.0%)

Branch coverage included in aggregate %.

1641 of 3200 relevant lines covered (51.28%)

0.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
pybricksdev/cli/__init__.py
1
# SPDX-License-Identifier: MIT
2
# Copyright (c) 2019-2022 The Pybricks Authors
3

4
"""Command line wrapper around pybricksdev library."""
×
5

6
import argparse
×
7
import asyncio
×
8
import contextlib
×
9
import logging
×
10
import os
×
11
import socket
×
12
import sys
×
13
from abc import ABC, abstractmethod
×
14
from os import PathLike, path
×
15
from tempfile import NamedTemporaryFile
×
16
from typing import ContextManager, TextIO
×
17

18
import argcomplete
×
19
from argcomplete.completers import FilesCompleter
×
20

21
from .. import __name__ as MODULE_NAME
×
22
from .. import __version__ as MODULE_VERSION
×
23

24
PROG_NAME = (
×
25
    f"{path.basename(sys.executable)} -m {MODULE_NAME}"
26
    if sys.argv[0].endswith("__main__.py")
27
    else path.basename(sys.argv[0])
28
)
29

30

31
class Tool(ABC):
×
32
    """Common base class for tool implementations."""
33

34
    @abstractmethod
×
35
    def add_parser(self, subparsers: argparse._SubParsersAction):
×
36
        """
37
        Overriding methods must at least do the following::
38

39
            parser = subparsers.add_parser('tool', ...)
40
            parser.tool = self
41

42
        Then additional arguments can be added using the ``parser`` object.
43
        """
44
        pass
×
45

46
    @abstractmethod
×
47
    async def run(self, args: argparse.Namespace):
×
48
        """
49
        Overriding methods should provide an implementation to run the tool.
50
        """
51
        pass
×
52

53

54
def _get_script_path(file: TextIO) -> ContextManager[PathLike]:
×
55
    """
56
    Gets the path to a script on the file system.
57

58
    If the file is ``sys.stdin``, the contents are copied to a temporary file
59
    and the path to the temporary file is returned. Otherwise, the file is closed
60
    and the path is returned.
61

62
    The context manager will delete the temporary file, if applicable.
63
    """
64
    if file is sys.stdin:
×
65
        # Have to close the temp file so that mpy-cross can read it, so we
66
        # create our own context manager to delete the file when we are done
67
        # using it.
68

69
        @contextlib.contextmanager
×
70
        def temp_context():
×
71
            try:
×
72
                with NamedTemporaryFile(suffix=".py", delete=False) as temp:
×
73
                    temp.write(file.buffer.read())
×
74

75
                yield temp.name
×
76
            finally:
77
                try:
×
78
                    os.remove(temp.name)
×
79
                except NameError:
×
80
                    # if NamedTemporaryFile() throws, temp is not defined
81
                    pass
×
82
                except OSError:
×
83
                    # file was already deleted or other strangeness
84
                    pass
×
85

86
        return temp_context()
×
87

88
    file.close()
×
89
    return contextlib.nullcontext(file.name)
×
90

91

92
class Compile(Tool):
×
93
    def add_parser(self, subparsers: argparse._SubParsersAction):
×
94
        parser = subparsers.add_parser(
×
95
            "compile",
96
            help="compile a Pybricks program without running it",
97
        )
98
        parser.add_argument(
×
99
            "file",
100
            metavar="<file>",
101
            help="path to a MicroPython script or `-` for stdin",
102
            type=argparse.FileType(),
103
        )
104
        parser.add_argument(
×
105
            "--abi",
106
            metavar="<n>",
107
            help="the MPY ABI version, one of %(choices)s (default: %(default)s)",
108
            default=6,
109
            choices=[5, 6],
110
            type=int,
111
        )
112
        parser.tool = self
×
113

114
    async def run(self, args: argparse.Namespace):
×
115
        from ..compile import compile_file, print_mpy
×
116

117
        with _get_script_path(args.file) as script_path:
×
118
            mpy = await compile_file(script_path, args.abi)
×
119
        print_mpy(mpy)
×
120

121

122
class Run(Tool):
×
123
    def add_parser(self, subparsers: argparse._SubParsersAction):
×
124
        parser = subparsers.add_parser(
×
125
            "run",
126
            help="run a Pybricks program",
127
        )
128
        parser.tool = self
×
129
        parser.add_argument(
×
130
            "conntype",
131
            metavar="<connection type>",
132
            help="connection type: %(choices)s",
133
            choices=["ble", "usb", "ssh"],
134
        )
135
        parser.add_argument(
×
136
            "file",
137
            metavar="<file>",
138
            help="path to a MicroPython script or `-` for stdin",
139
            type=argparse.FileType(),
140
        )
141
        parser.add_argument(
×
142
            "-n",
143
            "--name",
144
            metavar="<name>",
145
            required=False,
146
            help="hostname or IP address for SSH connection; "
147
            "Bluetooth device name or Bluetooth address for BLE connection; "
148
            "serial port name for USB connection",
149
        )
150

151
        if hasattr(argparse, "BooleanOptionalAction"):
×
152
            # BooleanOptionalAction requires Python 3.9
153
            parser.add_argument(
×
154
                "--wait",
155
                help="wait for the program to complete before disconnecting",
156
                action=argparse.BooleanOptionalAction,
157
                default=True,
158
            )
159
        else:
160
            parser.add_argument(
×
161
                "--wait",
162
                help="wait for the program to complete before disconnecting (default)",
163
                action="store_true",
164
                default=True,
165
            )
166
            parser.add_argument(
×
167
                "--no-wait",
168
                help="disconnect as soon as program is done downloading",
169
                action="store_false",
170
                dest="wait",
171
            )
172

173
    async def run(self, args: argparse.Namespace):
×
174
        from usb.core import find as find_usb
×
175

176
        from ..ble import find_device as find_ble
×
177
        from ..connections.ev3dev import EV3Connection
×
178
        from ..connections.lego import REPLHub
×
179
        from ..connections.pybricks import PybricksHubBLE, PybricksHubUSB
×
180

181
        # Pick the right connection
182
        if args.conntype == "ssh":
×
183
            # So it's an ev3dev
184
            if args.name is None:
×
185
                print("--name is required for SSH connections", file=sys.stderr)
×
186
                exit(1)
×
187

188
            device_or_address = socket.gethostbyname(args.name)
×
189
            hub = EV3Connection(device_or_address)
×
190

191
        elif args.conntype == "ble":
×
192
            # It is a Pybricks Hub with BLE. Device name or address is given.
193
            print(f"Searching for {args.name or 'any hub with Pybricks service'}...")
×
194
            device_or_address = await find_ble(args.name)
×
195
            hub = PybricksHubBLE(device_or_address)
×
196

197
        elif args.conntype == "usb":
×
198
            device_or_address = find_usb(idVendor=0x0483, idProduct=0x5740)
×
199

200
            if (
×
201
                device_or_address is not None
202
                and device_or_address.product == "Pybricks Hub"
203
            ):
204
                hub = PybricksHubUSB(device_or_address)
×
205
            else:
206
                hub = REPLHub()
×
207
        else:
208
            raise ValueError(f"Unknown connection type: {args.conntype}")
×
209

210
        # Connect to the address and run the script
211
        await hub.connect()
×
212
        try:
×
213
            with _get_script_path(args.file) as script_path:
×
214
                await hub.run(script_path, args.wait)
×
215
        finally:
216
            await hub.disconnect()
×
217

218

219
class Flash(Tool):
×
220
    def add_parser(self, subparsers: argparse._SubParsersAction):
×
221
        parser = subparsers.add_parser(
×
222
            "flash", help="flash firmware on a LEGO Powered Up device"
223
        )
224
        parser.tool = self
×
225

226
        parser.add_argument(
×
227
            "firmware",
228
            metavar="<firmware-file>",
229
            type=argparse.FileType(mode="rb"),
230
            help="the firmware .zip file",
231
        ).completer = FilesCompleter(allowednames=(".zip",))
232

233
        parser.add_argument(
×
234
            "-n", "--name", metavar="<name>", type=str, help="a custom name for the hub"
235
        )
236

237
    def run(self, args: argparse.Namespace):
×
238
        from .flash import flash_firmware
×
239

240
        return flash_firmware(args.firmware, args.name)
×
241

242

243
class DFUBackup(Tool):
×
244
    def add_parser(self, subparsers: argparse._SubParsersAction):
×
245
        parser = subparsers.add_parser("backup", help="backup firmware using DFU")
×
246
        parser.tool = self
×
247
        parser.add_argument(
×
248
            "firmware",
249
            metavar="<firmware-file>",
250
            type=argparse.FileType(mode="wb"),
251
            help="the firmware .bin file",
252
        ).completer = FilesCompleter(allowednames=(".bin",))
253

254
    async def run(self, args: argparse.Namespace):
×
255
        from ..dfu import backup_dfu
×
256

257
        backup_dfu(args.firmware)
×
258

259

260
class DFURestore(Tool):
×
261
    def add_parser(self, subparsers: argparse._SubParsersAction):
×
262
        parser = subparsers.add_parser(
×
263
            "restore",
264
            help="restore firmware using DFU",
265
        )
266
        parser.tool = self
×
267
        parser.add_argument(
×
268
            "firmware",
269
            metavar="<firmware-file>",
270
            type=argparse.FileType(mode="rb"),
271
            help="the firmware .bin file",
272
        ).completer = FilesCompleter(allowednames=(".bin",))
273

274
    async def run(self, args: argparse.Namespace):
×
275
        from ..dfu import restore_dfu
×
276

277
        restore_dfu(args.firmware)
×
278

279

280
class DFU(Tool):
×
281
    def add_parser(self, subparsers: argparse._SubParsersAction):
×
282
        self.parser = subparsers.add_parser(
×
283
            "dfu",
284
            help="use DFU to backup or restore firmware",
285
        )
286
        self.parser.tool = self
×
287
        self.subparsers = self.parser.add_subparsers(
×
288
            metavar="<action>", dest="action", help="the action to perform"
289
        )
290

291
        for tool in DFUBackup(), DFURestore():
×
292
            tool.add_parser(self.subparsers)
×
293

294
    def run(self, args: argparse.Namespace):
×
295
        if args.action not in self.subparsers.choices:
×
296
            self.parser.error(
×
297
                f'Missing name of action: {"|".join(self.subparsers.choices.keys())}'
298
            )
299

300
        return self.subparsers.choices[args.action].tool.run(args)
×
301

302

303
class LWP3Repl(Tool):
×
304
    def add_parser(self, subparsers: argparse._SubParsersAction):
×
305
        parser = subparsers.add_parser(
×
306
            "repl",
307
            help="interactive REPL for sending and receiving LWP3 messages",
308
        )
309
        parser.tool = self
×
310

311
    def run(self, args: argparse.Namespace):
×
312
        from .lwp3.repl import repl, setup_repl_logging
×
313

314
        setup_repl_logging()
×
315
        return repl()
×
316

317

318
class LWP3(Tool):
×
319
    def add_parser(self, subparsers: argparse._SubParsersAction):
×
320
        self.parser = subparsers.add_parser(
×
321
            "lwp3", help="interact with devices using LWP3"
322
        )
323
        self.parser.tool = self
×
324
        self.subparsers = self.parser.add_subparsers(
×
325
            metavar="<lwp3-tool>", dest="lwp3_tool", help="the tool to run"
326
        )
327

328
        for tool in (LWP3Repl(),):
×
329
            tool.add_parser(self.subparsers)
×
330

331
    def run(self, args: argparse.Namespace):
×
332
        if args.lwp3_tool not in self.subparsers.choices:
×
333
            self.parser.error(
×
334
                f'Missing name of tool: {"|".join(self.subparsers.choices.keys())}'
335
            )
336

337
        return self.subparsers.choices[args.lwp3_tool].tool.run(args)
×
338

339

340
class Udev(Tool):
×
341
    def add_parser(self, subparsers: argparse._SubParsersAction):
×
342
        parser = subparsers.add_parser("udev", help="print udev rules to stdout")
×
343
        parser.tool = self
×
344

345
    async def run(self, args: argparse.Namespace):
×
346
        from importlib.resources import read_text
×
347

348
        from .. import resources
×
349

350
        print(read_text(resources, resources.UDEV_RULES))
×
351

352

353
def main():
×
354
    """Runs ``pybricksdev`` command line interface."""
355

356
    # Provide main description and help.
357
    parser = argparse.ArgumentParser(
×
358
        prog=PROG_NAME,
359
        description="Utilities for Pybricks developers.",
360
        epilog="Run `%(prog)s <tool> --help` for tool-specific arguments.",
361
    )
362

363
    parser.add_argument(
×
364
        "-v", "--version", action="version", version=f"{MODULE_NAME} v{MODULE_VERSION}"
365
    )
366
    parser.add_argument(
×
367
        "-d", "--debug", action="store_true", help="enable debug logging"
368
    )
369

370
    subparsers = parser.add_subparsers(
×
371
        metavar="<tool>",
372
        dest="tool",
373
        help="the tool to use",
374
    )
375

376
    for tool in Compile(), Run(), Flash(), DFU(), LWP3(), Udev():
×
377
        tool.add_parser(subparsers)
×
378

379
    argcomplete.autocomplete(parser)
×
380
    args = parser.parse_args()
×
381

382
    logging.basicConfig(
×
383
        format="%(asctime)s: %(levelname)s: %(name)s: %(message)s",
384
        level=logging.DEBUG if args.debug else logging.WARNING,
385
    )
386

387
    if not args.tool:
×
388
        parser.error(f'Missing name of tool: {"|".join(subparsers.choices.keys())}')
×
389

390
    asyncio.run(subparsers.choices[args.tool].tool.run(args))
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc