• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pybricks / pybricksdev / 16793078126

07 Aug 2025 01:50AM UTC coverage: 53.891% (+0.3%) from 53.606%
16793078126

push

github

dlech
cli.flash: finish removal of REPLHub

Remove REPLHub from the flash command. REPLHub was removed in commit
8b44f69 ("pybricksdev/connections: Drop REPLHub.") so it was causing
an import error.

Fixes: https://github.com/pybricks/pybricksdev/issues/117

63 of 241 branches covered (26.14%)

Branch coverage included in aggregate %.

1959 of 3511 relevant lines covered (55.8%)

0.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
pybricksdev/cli/flash.py
1
# SPDX-License-Identifier: MIT
2
# Copyright (c) 2019-2023 The Pybricks Authors
3

4
import asyncio
×
5
import logging
×
6
import os
×
7
import struct
×
8
import sys
×
9
import zlib
×
10
from tempfile import NamedTemporaryFile
×
11
from typing import BinaryIO, Dict, Optional
×
12

13
from bleak import BleakClient, BleakScanner
×
14
from bleak.backends.device import BLEDevice
×
15
from bleak.backends.scanner import AdvertisementData
×
16
from packaging.version import Version
×
17
from tqdm.auto import tqdm
×
18
from tqdm.contrib.logging import logging_redirect_tqdm
×
19

20
from pybricksdev.ble.lwp3 import (
×
21
    LEGO_CID,
22
    LWP3_BOOTLOADER_SERVICE_UUID,
23
    LWP3_HUB_CHARACTERISTIC_UUID,
24
    LWP3_HUB_SERVICE_UUID,
25
)
26
from pybricksdev.ble.lwp3 import AdvertisementData as HubAdvertisementData
×
27
from pybricksdev.ble.lwp3.bootloader import BootloaderAdvertisementData
×
28
from pybricksdev.ble.lwp3.bytecodes import HubKind, HubProperty
×
29
from pybricksdev.ble.lwp3.messages import (
×
30
    FirmwareUpdateMessage,
31
    HubPropertyRequestUpdate,
32
    HubPropertyUpdate,
33
    parse_message,
34
)
35
from pybricksdev.ble.nus import NUS_RX_UUID, NUS_TX_UUID
×
36
from pybricksdev.ble.pybricks import (
×
37
    FW_REV_UUID,
38
    PNP_ID_UUID,
39
    PYBRICKS_COMMAND_EVENT_UUID,
40
    PYBRICKS_SERVICE_UUID,
41
    SW_REV_UUID,
42
    Command,
43
    unpack_pnp_id,
44
)
45
from pybricksdev.compile import compile_file
×
46
from pybricksdev.dfu import flash_dfu
×
47
from pybricksdev.firmware import create_firmware_blob
×
48
from pybricksdev.flash import BootloaderConnection
×
49
from pybricksdev.tools import chunk
×
50
from pybricksdev.tools.checksum import xor_bytes
×
51

52
logger = logging.getLogger(__name__)


# Source of a small Pybricks program. reboot_pybricks_to_bootloader() passes
# it to download_and_run() for hubs whose Pybricks profile version is older
# than 1.2.0 and therefore lack the reboot-to-update-mode command.
# NOTE(review): presumably hub.system.reset(2) selects firmware update mode;
# confirm against the Pybricks firmware API.
REBOOT_SCRIPT = """
from pybricks.hubs import ThisHub
from pybricks.tools import wait

hub = ThisHub()

# without delay, hub will reboot before we receive last checksum
wait(500)
hub.system.reset(2)
"""
65

66

67
def match_hub(hub_kind: HubKind, adv: AdvertisementData) -> bool:
    """
    Decides whether a BLE advertisement comes from a hub of the wanted kind.

    Three kinds of advertisement are recognized: the LEGO bootloader and the
    official LEGO firmware (both identified via LEGO manufacturer-specific
    data) and the Pybricks firmware (identified via the PnP ID service data).

    Args:
        hub_kind: The hub type ID to match.
        adv: The advertisement data to check.

    Returns:
        ``True`` if *adv* matches the criteria, otherwise ``False``.
    """
    # LEGO firmware uses manufacturer-specific data
    lego_payload = adv.manufacturer_data.get(LEGO_CID)

    if lego_payload:
        if LWP3_BOOTLOADER_SERVICE_UUID in adv.service_uuids:
            return BootloaderAdvertisementData(lego_payload).hub_kind == hub_kind

        if LWP3_HUB_SERVICE_UUID in adv.service_uuids:
            return HubAdvertisementData(lego_payload).hub_kind == hub_kind

    # Pybricks firmware uses Device Information service data
    pnp_payload = adv.service_data.get(PNP_ID_UUID)

    # without PnP ID data *and* the Pybricks service there is nothing to match
    if not (pnp_payload and PYBRICKS_SERVICE_UUID in adv.service_uuids):
        return False

    # only the product ID field of the PnP ID is relevant here
    _, _, product_id, _ = unpack_pnp_id(pnp_payload)
    return product_id == hub_kind
100

101

102
async def download_and_run(client: BleakClient, script: str, abi: int) -> None:
    """
    Downloads and runs a script on a hub running Pybricks firmware.

    The script is written to a temporary ``.py`` file, compiled with
    ``compile_file()`` and then transferred over the Nordic UART service:
    first the size of the compiled file as a 4-byte little-endian integer,
    then the file itself in 100-byte pieces. Every piece must be answered by
    the hub with a matching XOR checksum byte.

    Args:
        client: The Bluetooth connection to the hub.
        script: The script to be compiled and run.
        abi: The MPY ABI version.

    Raises:
        RuntimeError: If the hub replied with an unexpected checksum.
        asyncio.TimeoutError: If the hub did not reply within one second.
    """
    with NamedTemporaryFile("w", suffix=".py") as source_file:
        source_file.write(script)

        # file has to be closed so mpy-cross can open it
        source_file.file.close()

        directory, file_name = os.path.split(source_file.name)
        mpy = await compile_file(directory, file_name, abi)

    replies = asyncio.Queue()

    def on_notify(_h, data: bytes):
        # every notification is queued as a reply for write_chunk()
        replies.put_nowait(data)

    # BOOST Move hub has hardware limit of MTU == 23 so it has to have data
    # split into smaller chunks
    if client.mtu_size < 100:
        write_size = 20
    else:
        write_size = 100

    async def write_chunk(data: bytes):
        """
        Writes a chunk of data and waits for a checksum reply.

        Args:
            data: The data.

        Raises:
            RuntimeError: If the returned checksum did not match.
            asyncio.TimeoutError: If no reply was received.
        """
        expected = xor_bytes(data, 0)

        for piece in chunk(data, write_size):
            await client.write_gatt_char(NUS_RX_UUID, piece)

        reply: bytes = await asyncio.wait_for(replies.get(), 1)
        received = reply[0]

        if received == expected:
            return

        raise RuntimeError(
            f"bad checksum, expecting {expected:02X} but received {received:02X}"
        )

    await client.start_notify(NUS_TX_UUID, on_notify)

    # communication protocol is write file size, then send file in 100 byte chunks
    try:
        size_header = len(mpy).to_bytes(4, "little")
        await write_chunk(size_header)

        for block in chunk(mpy, 100):
            await write_chunk(block)
    finally:
        # always unsubscribe, even if the transfer failed
        await client.stop_notify(NUS_TX_UUID)
164

165

166
async def reboot_official_to_bootloader(hub_kind: HubKind, device: BLEDevice) -> None:
    """
    Connects to a hub running official LEGO firmware and sends a message to
    reboot in firmware update mode.

    The firmware version is requested and printed first; the reboot message
    is only sent after the hub has answered that request.

    Args:
        hub_kind: The hub type ID. Only used to select a write-with-response
            workaround for the city hub.
        device: The Bluetooth device to connect to.

    Raises:
        asyncio.TimeoutError: If the hub did not report its firmware version
            within 5 seconds.
    """
    async with BleakClient(device) as client:
        # give bluetooth stack time to settle
        await asyncio.sleep(1)

        fw_ver_future = asyncio.get_running_loop().create_future()

        def on_notify(_h, data: bytes):
            msg = parse_message(data)

            logger.debug("%s", msg)

            if (
                isinstance(msg, HubPropertyUpdate)
                and msg.prop == HubProperty.FW_VERSION
                # The future may already be resolved (repeated update) or
                # cancelled (wait_for() below timed out). Setting a result
                # then would raise InvalidStateError inside the callback.
                and not fw_ver_future.done()
            ):
                fw_ver_future.set_result(msg.value)

        await client.start_notify(LWP3_HUB_CHARACTERISTIC_UUID, on_notify)
        await client.write_gatt_char(
            LWP3_HUB_CHARACTERISTIC_UUID,
            HubPropertyRequestUpdate(HubProperty.FW_VERSION),
            # work around city hub bluetooth bug on linux
            response=hub_kind == HubKind.CITY,
        )

        fw_ver = await asyncio.wait_for(fw_ver_future, 5)
        print(f"Hub is running firmware v{fw_ver}.")

        print("Rebooting in update mode...")

        await client.write_gatt_char(
            LWP3_HUB_CHARACTERISTIC_UUID, FirmwareUpdateMessage()
        )
204

205

206
async def reboot_pybricks_to_bootloader(hub_kind: HubKind, device: BLEDevice) -> None:
    """
    Connects to a hub running Pybricks firmware and sends a message to
    reboot in firmware update mode.

    Hubs with Pybricks profile version 1.2.0 or newer are rebooted with a
    dedicated command. Older hubs get ``REBOOT_SCRIPT`` downloaded and run
    instead.

    Args:
        hub_kind: The hub type ID. Only used to pick the MTU size for the
            BlueZ workaround.
        device: The Bluetooth device to connect to.

    Raises:
        RuntimeError: If the reboot command was acknowledged, i.e. the hub
            stayed connected instead of resetting.
    """
    async with BleakClient(device) as client:
        # Work around BlueZ limitation.
        if client.__class__.__name__ == "BleakClientBlueZDBus":
            if hub_kind == HubKind.BOOST:
                client._mtu_size = 23
            else:
                client._mtu_size = 158

        # give bluetooth stack time to settle
        await asyncio.sleep(1)

        fw_ver = (await client.read_gatt_char(FW_REV_UUID)).decode()
        print(f"Hub is running firmware v{fw_ver}.")

        print("Rebooting in update mode...")

        profile_ver = (await client.read_gatt_char(SW_REV_UUID)).decode()

        if Version(profile_ver) < Version("1.2.0"):
            # older protocol doesn't support the reboot command, so we have
            # to download and run a program

            # HACK: there isn't a proper way to get the MPY ABI version from hub
            # so we use heuristics on the firmware version
            if Version(fw_ver) >= Version("3.2.0b2"):
                abi = 6
            else:
                abi = 5

            await download_and_run(client, REBOOT_SCRIPT, abi)
            return

        reboot_command = struct.pack(
            "<B", Command.PBIO_PYBRICKS_COMMAND_REBOOT_TO_UPDATE_MODE
        )

        try:
            # This causes the hub to become disconnected before completing
            # the write request, so we expect an exception here.
            await client.write_gatt_char(
                PYBRICKS_COMMAND_EVENT_UUID,
                reboot_command,
                response=True,
            )
        except Exception:
            # REVISIT: Should probably check for more specific exception.
            # However, OK for now since code will just timeout later while
            # scanning for bootloader.
            pass
        else:
            raise RuntimeError("hub did not reset")
255

256

257
async def flash_ble(hub_kind: HubKind, firmware: bytes, metadata: dict):
    """
    Flashes firmware to the hub using Bluetooth Low Energy.

    The hub has to be advertising and can be running official LEGO firmware,
    Pybricks firmware or be in bootloader mode. A hub that is not yet in
    bootloader mode is rebooted into it and then discovered a second time.
    If either scan finds nothing, ``timed out`` is printed to stderr and the
    function returns without flashing.

    Args:
        hub_kind: The hub type ID. Only hubs matching this ID will be discovered.
        firmware: The raw firmware binary blob.
        metadata: The firmware metadata from the firmware.zip file.
    """

    print(f"Searching for {hub_kind.name} hub...")

    # TODO: add upstream feature to Bleak to allow getting device, adv tuple
    # as return value from find_device_by_filter()
    # https://github.com/hbldh/bleak/issues/1277

    seen_advertisements: Dict[str, AdvertisementData] = {}

    def map_and_match(device: BLEDevice, adv: AdvertisementData):
        # remember the advertisement so it can be looked up by address later
        seen_advertisements[device.address] = adv
        return match_hub(hub_kind, adv)

    def match_only(_device: BLEDevice, adv: AdvertisementData):
        # filter for the second scan, where the advertisement is not needed
        return match_hub(hub_kind, adv)

    # scan for hubs in bootloader mode, running official LEGO firmware or
    # running Pybricks firmware
    any_mode_services = [
        LWP3_BOOTLOADER_SERVICE_UUID,
        LWP3_HUB_SERVICE_UUID,
        PYBRICKS_SERVICE_UUID,
    ]

    device = await BleakScanner.find_device_by_filter(
        map_and_match, service_uuids=any_mode_services
    )

    if device is None:
        print("timed out", file=sys.stderr)
        return

    found_services = seen_advertisements[device.address].service_uuids

    # if not already in bootloader mode, we need to reboot into bootloader mode
    if LWP3_HUB_SERVICE_UUID in found_services:
        print("Found hub running official LEGO firmware.")
        await reboot_official_to_bootloader(hub_kind, device)
    elif PYBRICKS_SERVICE_UUID in found_services:
        print("Found hub running Pybricks firmware.")
        await reboot_pybricks_to_bootloader(hub_kind, device)

    # if not previously in bootloader mode, scan again, this time only for
    # the bootloader
    if LWP3_BOOTLOADER_SERVICE_UUID not in found_services:
        device = await BleakScanner.find_device_by_filter(
            match_only, service_uuids=[LWP3_BOOTLOADER_SERVICE_UUID]
        )

        if device is None:
            print("timed out", file=sys.stderr)
            return

    print("Found:", device)

    updater = BootloaderConnection()
    await updater.connect(device)

    print("Erasing flash and starting update")
    await updater.flash(firmware, metadata)
327

328

329
async def flash_nxt(firmware: bytes) -> None:
    """
    Flashes firmware to NXT using the Samba bootloader.

    Args:
        firmware:
            A firmware blob with the NxOS header appended to the end.

    Raises:
        ValueError: If the parsed firmware header has its ``samba`` flag set.
        SystemExit: With exit code 1 if no NXT in SAM-BA mode could be opened.
    """
    from pybricksdev._vendored.pynxt.firmware import Firmware
    from pybricksdev._vendored.pynxt.flash import FlashController
    from pybricksdev._vendored.pynxt.samba import SambaBrick, SambaOpenError

    # parse the header
    info = Firmware(firmware)

    if info.samba:
        raise ValueError("Firmware is not suitable for flashing.")

    s = SambaBrick()

    try:
        print("Looking for the NXT in SAM-BA mode...")
        s.open(timeout=5)
        print("Brick found!")
    except SambaOpenError as e:
        print(e)
        sys.exit(1)

    # From here on the brick is open, so make sure it is closed again even
    # if flashing or starting the firmware fails.
    try:
        print("Flashing firmware...")
        f = FlashController(s)
        f.flash(firmware)

        print("Flashing complete, jumping to 0x100000...")
        f._wait_for_flash()
        s.jump(0x100000)

        print("Firmware started.")
    finally:
        s.close()
367

368

369
async def flash_ev3(firmware: bytes) -> None:
    """
    Flashes firmware to EV3.

    The firmware is downloaded through ``EV3Bootloader``, verified by
    comparing the checksum reported by the bootloader against the CRC32 of
    *firmware*, and finally the application is started.

    Args:
        firmware:
            A firmware blob.

    Raises:
        SystemExit: With exit code 1 if the checksum reported by the
            bootloader does not match the CRC32 of *firmware*.
    """
    from pybricksdev.connections.ev3 import EV3Bootloader

    # TODO: nice error message and exit(1) if EV3 is not found
    with EV3Bootloader() as bootloader:
        fw, hw = await bootloader.get_version()
        print(f"hwid: {hw}")

        # Erasing doesn't have any feedback so we just use time for the progress
        # bar. The operation runs on the EV3, so the time is the same for everyone.
        async def tick(callback):
            CHUNK = 8000
            SPEED = 256000
            for _ in range(len(firmware) // CHUNK):
                await asyncio.sleep(CHUNK / SPEED)
                callback(CHUNK)

        print("Erasing memory and preparing firmware download...")
        with logging_redirect_tqdm(), tqdm(
            total=len(firmware), unit="B", unit_scale=True
        ) as pbar:
            # run the erase and the simulated progress concurrently
            await asyncio.gather(
                bootloader.erase_and_begin_download(len(firmware)), tick(pbar.update)
            )

        print("Downloading firmware...")
        with logging_redirect_tqdm(), tqdm(
            total=len(firmware), unit="B", unit_scale=True
        ) as pbar:
            await bootloader.download(firmware, pbar.update)

        print("Verifying...", end="", flush=True)
        checksum = await bootloader.get_checksum(0, len(firmware))
        expected_checksum = zlib.crc32(firmware)

        if checksum != expected_checksum:
            print("Bad checksum!")
            # sys.exit() rather than the exit() builtin: the latter is only
            # injected by the site module and is not guaranteed to exist.
            sys.exit(1)

        print("OK.")

        print("Restarting EV3...", end="", flush=True)
        await bootloader.start_app()
        print("Done.")
420

421

422
async def flash_firmware(firmware_zip: BinaryIO, new_name: Optional[str]) -> None:
    """
    Command line tool for flashing firmware.

    Builds the firmware blob from the archive and hands it to the flashing
    routine that matches the ``device-id`` found in the firmware metadata.

    Args:
        firmware_zip: The ``firmware.zip`` file; passed on unchanged to
            ``create_firmware_blob()``.
        new_name: Optional custom hub name to be applied to the firmware image.

    Raises:
        ValueError: If the hub kind in the metadata has no flashing routine.
    """

    print("Creating firmware...")

    # REVISIT: require accepting license agreement either interactively or by command line option
    firmware, metadata, _license = await create_firmware_blob(firmware_zip, new_name)
    hub_kind = HubKind(metadata["device-id"])

    # hubs flashed via flash_dfu() (a plain, non-async call)
    if hub_kind in (HubKind.TECHNIC_SMALL, HubKind.TECHNIC_LARGE):
        flash_dfu(firmware, metadata)
        return

    # hubs flashed over Bluetooth Low Energy
    if hub_kind in (HubKind.BOOST, HubKind.CITY, HubKind.TECHNIC):
        await flash_ble(hub_kind, firmware, metadata)
        return

    if hub_kind == HubKind.NXT:
        await flash_nxt(firmware)
        return

    if hub_kind == HubKind.EV3:
        await flash_ev3(firmware)
        return

    raise ValueError(f"unsupported hub kind: {hub_kind}")
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc