• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pybricks / pybricksdev / 16793037302

07 Aug 2025 01:47AM UTC coverage: 53.848% (+0.2%) from 53.606%
16793037302

Pull #118

github

web-flow
Merge d331dc9dd into 9b84ee4f1
Pull Request #118: cli.flash: finish removal of REPLHub

63 of 241 branches covered (26.14%)

Branch coverage included in aggregate %.

1959 of 3514 relevant lines covered (55.75%)

0.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
pybricksdev/cli/flash.py
1
# SPDX-License-Identifier: MIT
2
# Copyright (c) 2019-2023 The Pybricks Authors
3

4
import asyncio
×
5
import hashlib
×
6
import json
×
7
import logging
×
8
import os
×
9
import struct
×
10
import sys
×
11
import zipfile
×
12
import zlib
×
13
from tempfile import NamedTemporaryFile
×
14
from typing import BinaryIO, Dict, Optional
×
15

16
from bleak import BleakClient, BleakScanner
×
17
from bleak.backends.device import BLEDevice
×
18
from bleak.backends.scanner import AdvertisementData
×
19
from packaging.version import Version
×
20
from tqdm.auto import tqdm
×
21
from tqdm.contrib.logging import logging_redirect_tqdm
×
22

23
from pybricksdev.ble.lwp3 import (
×
24
    LEGO_CID,
25
    LWP3_BOOTLOADER_SERVICE_UUID,
26
    LWP3_HUB_CHARACTERISTIC_UUID,
27
    LWP3_HUB_SERVICE_UUID,
28
)
29
from pybricksdev.ble.lwp3 import AdvertisementData as HubAdvertisementData
×
30
from pybricksdev.ble.lwp3.bootloader import BootloaderAdvertisementData
×
31
from pybricksdev.ble.lwp3.bytecodes import HubKind, HubProperty
×
32
from pybricksdev.ble.lwp3.messages import (
×
33
    FirmwareUpdateMessage,
34
    HubPropertyRequestUpdate,
35
    HubPropertyUpdate,
36
    parse_message,
37
)
38
from pybricksdev.ble.nus import NUS_RX_UUID, NUS_TX_UUID
×
39
from pybricksdev.ble.pybricks import (
×
40
    FW_REV_UUID,
41
    PNP_ID_UUID,
42
    PYBRICKS_COMMAND_EVENT_UUID,
43
    PYBRICKS_SERVICE_UUID,
44
    SW_REV_UUID,
45
    Command,
46
    unpack_pnp_id,
47
)
48
from pybricksdev.compile import compile_file
×
49
from pybricksdev.dfu import flash_dfu
×
50
from pybricksdev.firmware import create_firmware_blob
×
51
from pybricksdev.flash import BootloaderConnection
×
52
from pybricksdev.tools import chunk
×
53
from pybricksdev.tools.checksum import xor_bytes
×
54

55
logger = logging.getLogger(__name__)
×
56

57

58
REBOOT_SCRIPT = """
×
59
from pybricks.hubs import ThisHub
60
from pybricks.tools import wait
61

62
hub = ThisHub()
63

64
# without delay, hub will reboot before we receive last checksum
65
wait(500)
66
hub.system.reset(2)
67
"""
68

69

70
def match_hub(hub_kind: HubKind, adv: AdvertisementData) -> bool:
×
71
    """
72
    Advertisement data matching function for filtering supported hubs.
73

74
    Args:
75
        hub_kind: The hub type ID to match.
76
        adv: The advertisement data to check.
77

78
    Returns:
79
        ``True`` if *adv* matches the criteria, otherwise ``False``.
80
    """
81
    # LEGO firmware uses manufacturer-specific data
82

83
    lego_data = adv.manufacturer_data.get(LEGO_CID)
×
84

85
    if lego_data:
×
86
        if LWP3_BOOTLOADER_SERVICE_UUID in adv.service_uuids:
×
87
            bl_data = BootloaderAdvertisementData(lego_data)
×
88
            return bl_data.hub_kind == hub_kind
×
89

90
        if LWP3_HUB_SERVICE_UUID in adv.service_uuids:
×
91
            hub_data = HubAdvertisementData(lego_data)
×
92
            return hub_data.hub_kind == hub_kind
×
93

94
    # Pybricks firmware uses Device Information service data
95

96
    pnp_id_data = adv.service_data.get(PNP_ID_UUID)
×
97

98
    if pnp_id_data and PYBRICKS_SERVICE_UUID in adv.service_uuids:
×
99
        _, _, pid, _ = unpack_pnp_id(pnp_id_data)
×
100
        return pid == hub_kind
×
101

102
    return False
×
103

104

105
async def download_and_run(client: BleakClient, script: str, abi: int) -> None:
×
106
    """
107
    Downloads and runs a script on a hub running Pybricks firmware.
108

109
    Args:
110
        client: The Bluetooth connection to the hub.
111
        script: The script to be compiled and run.
112
        abi: The MPY ABI version.
113
    """
114
    with NamedTemporaryFile("w", suffix=".py") as temp:
×
115
        temp.write(script)
×
116

117
        # file has to be closed so mpy-cross can open it
118
        temp.file.close()
×
119

120
        mpy = await compile_file(
×
121
            os.path.dirname(temp.name), os.path.basename(temp.name), abi
122
        )
123

124
    recv_queue = asyncio.Queue()
×
125

126
    def on_notify(_h, data: bytes):
×
127
        recv_queue.put_nowait(data)
×
128

129
    # BOOST Move hub has hardware limit of MTU == 23 so it has to have data
130
    # split into smaller chunks
131
    write_size = 20 if client.mtu_size < 100 else 100
×
132

133
    async def write_chunk(data: bytes):
×
134
        """
135
        Writes a chunk of data and waits for a checksum reply.
136

137
        Args:
138
            data: The data.
139

140
        Raises:
141
            RuntimeError: If the returned checksum did not match.
142
            asyncio.TimeoutError: If no reply was received.
143
        """
144
        checksum = xor_bytes(data, 0)
×
145

146
        for c in chunk(data, write_size):
×
147
            await client.write_gatt_char(NUS_RX_UUID, c)
×
148

149
        reply: bytes = await asyncio.wait_for(recv_queue.get(), 1)
×
150

151
        if reply[0] != checksum:
×
152
            raise RuntimeError(
×
153
                f"bad checksum, expecting {checksum:02X} but received {reply[0]:02X}"
154
            )
155

156
    await client.start_notify(NUS_TX_UUID, on_notify)
×
157

158
    # communication protocol is write file size, then send file in 100 byte chunks
159
    try:
×
160
        await write_chunk(len(mpy).to_bytes(4, "little"))
×
161

162
        for c in chunk(mpy, 100):
×
163
            await write_chunk(c)
×
164

165
    finally:
166
        await client.stop_notify(NUS_TX_UUID)
×
167

168

169
async def reboot_official_to_bootloader(hub_kind: HubKind, device: BLEDevice) -> None:
×
170
    """
171
    Connects to a hub running official LEGO firmware and sends a message to
172
    reboot in firmware update mode.
173
    """
174
    async with BleakClient(device) as client:
×
175
        # give bluetooth stack time to settle
176
        await asyncio.sleep(1)
×
177

178
        fw_ver_future = asyncio.get_running_loop().create_future()
×
179

180
        def on_notify(_h, data: bytes):
×
181
            msg = parse_message(data)
×
182

183
            logger.debug("%s", str(msg))
×
184

185
            if (
×
186
                isinstance(msg, HubPropertyUpdate)
187
                and msg.prop == HubProperty.FW_VERSION
188
            ):
189
                fw_ver_future.set_result(msg.value)
×
190

191
        await client.start_notify(LWP3_HUB_CHARACTERISTIC_UUID, on_notify)
×
192
        await client.write_gatt_char(
×
193
            LWP3_HUB_CHARACTERISTIC_UUID,
194
            HubPropertyRequestUpdate(HubProperty.FW_VERSION),
195
            # work around city hub bluetooth bug on linux
196
            response=hub_kind == HubKind.CITY,
197
        )
198

199
        fw_ver = await asyncio.wait_for(fw_ver_future, 5)
×
200
        print(f"Hub is running firmware v{fw_ver}.")
×
201

202
        print("Rebooting in update mode...")
×
203

204
        await client.write_gatt_char(
×
205
            LWP3_HUB_CHARACTERISTIC_UUID, FirmwareUpdateMessage()
206
        )
207

208

209
async def reboot_pybricks_to_bootloader(hub_kind: HubKind, device: BLEDevice) -> None:
×
210
    """
211
    Connects to a hub running Pybricks firmware and sends a message to
212
    reboot in firmware update mode.
213
    """
214
    async with BleakClient(device) as client:
×
215
        # Work around BlueZ limitation.
216
        if client.__class__.__name__ == "BleakClientBlueZDBus":
×
217
            client._mtu_size = 23 if hub_kind == HubKind.BOOST else 158
×
218

219
        # give bluetooth stack time to settle
220
        await asyncio.sleep(1)
×
221

222
        fw_ver = await client.read_gatt_char(FW_REV_UUID)
×
223
        fw_ver = fw_ver.decode()
×
224
        print(f"Hub is running firmware v{fw_ver}.")
×
225

226
        print("Rebooting in update mode...")
×
227

228
        profile_ver = await client.read_gatt_char(SW_REV_UUID)
×
229

230
        if Version(profile_ver.decode()) >= Version("1.2.0"):
×
231
            try:
×
232
                await client.write_gatt_char(
×
233
                    PYBRICKS_COMMAND_EVENT_UUID,
234
                    struct.pack(
235
                        "<B", Command.PBIO_PYBRICKS_COMMAND_REBOOT_TO_UPDATE_MODE
236
                    ),
237
                    response=True,
238
                )
239
                # This causes the hub to become disconnected before completing
240
                # the write request, so we expect an exception here.
241
            except Exception:
×
242
                # REVISIT: Should probably check for more specific exception.
243
                # However, OK for now since code will just timeout later while
244
                # scanning for bootloader.
245
                pass
×
246
            else:
247
                raise RuntimeError("hub did not reset")
×
248

249
        else:
250
            # older protocol doesn't support this command, so we have to
251
            # download and run a program
252

253
            # HACK: there isn't a proper way to get the MPY ABI version from hub
254
            # so we use heuristics on the firmware version
255
            abi = 6 if Version(fw_ver) >= Version("3.2.0b2") else 5
×
256

257
            await download_and_run(client, REBOOT_SCRIPT, abi)
×
258

259

260
async def flash_ble(hub_kind: HubKind, firmware: bytes, metadata: dict):
×
261
    """
262
    Flashes firmware to the hub using Bluetooth Low Energy.
263

264
    The hub has to be advertising and can be running official LEGO firmware,
265
    Pybricks firmware or be in bootloader mode.
266

267
    Args:
268
        hub_kind: The hub type ID. Only hubs matching this ID will be discovered.
269
        firmware: The raw firmware binary blob.
270
        metadata: The firmware metadata from the firmware.zip file.
271
    """
272

273
    print(f"Searching for {hub_kind.name} hub...")
×
274

275
    # TODO: add upstream feature to Bleak to allow getting device, adv tuple
276
    # as return value from find_device_by_filter()
277
    # https://github.com/hbldh/bleak/issues/1277
278

279
    device_adv_map: Dict[str, AdvertisementData] = {}
×
280

281
    def map_and_match(device: BLEDevice, adv: AdvertisementData):
×
282
        # capture the adv data for later use
283
        device_adv_map[device.address] = adv
×
284
        return match_hub(hub_kind, adv)
×
285

286
    # scan for hubs in bootloader mode, running official LEGO firmware or
287
    # running Pybricks firmware
288

289
    device = await BleakScanner.find_device_by_filter(
×
290
        map_and_match,
291
        service_uuids=[
292
            LWP3_BOOTLOADER_SERVICE_UUID,
293
            LWP3_HUB_SERVICE_UUID,
294
            PYBRICKS_SERVICE_UUID,
295
        ],
296
    )
297

298
    if device is None:
×
299
        print("timed out", file=sys.stderr)
×
300
        return
×
301

302
    adv_data = device_adv_map[device.address]
×
303

304
    # if not already in bootlaoder mode, we need to reboot into bootloader mode
305
    if LWP3_HUB_SERVICE_UUID in adv_data.service_uuids:
×
306
        print("Found hub running official LEGO firmware.")
×
307
        await reboot_official_to_bootloader(hub_kind, device)
×
308
    elif PYBRICKS_SERVICE_UUID in adv_data.service_uuids:
×
309
        print("Found hub running Pybricks firmware.")
×
310
        await reboot_pybricks_to_bootloader(hub_kind, device)
×
311

312
    # if not previously in bootlaoder mode, scan again, this time only for bootloader
313
    if LWP3_BOOTLOADER_SERVICE_UUID not in adv_data.service_uuids:
×
314
        device = await BleakScanner.find_device_by_filter(
×
315
            lambda _d, a: match_hub(hub_kind, a),
316
            service_uuids=[
317
                LWP3_BOOTLOADER_SERVICE_UUID,
318
            ],
319
        )
320

321
        if device is None:
×
322
            print("timed out", file=sys.stderr)
×
323
            return
×
324

325
    print("Found:", device)
×
326
    updater = BootloaderConnection()
×
327
    await updater.connect(device)
×
328
    print("Erasing flash and starting update")
×
329
    await updater.flash(firmware, metadata)
×
330

331

332
async def flash_nxt(firmware: bytes) -> None:
×
333
    """
334
    Flashes firmware to NXT using the Samba bootloader.
335

336
    Args:
337
        firmware:
338
            A firmware blob with the NxOS header appended to the end.
339
    """
340
    from pybricksdev._vendored.pynxt.firmware import Firmware
×
341
    from pybricksdev._vendored.pynxt.flash import FlashController
×
342
    from pybricksdev._vendored.pynxt.samba import SambaBrick, SambaOpenError
×
343

344
    # parse the header
345
    info = Firmware(firmware)
×
346

347
    if info.samba:
×
348
        raise ValueError("Firmware is not suitable for flashing.")
×
349

350
    s = SambaBrick()
×
351

352
    try:
×
353
        print("Looking for the NXT in SAM-BA mode...")
×
354
        s.open(timeout=5)
×
355
        print("Brick found!")
×
356
    except SambaOpenError as e:
×
357
        print(e)
×
358
        sys.exit(1)
×
359

360
    print("Flashing firmware...")
×
361
    f = FlashController(s)
×
362
    f.flash(firmware)
×
363

364
    print("Flashing complete, jumping to 0x100000...")
×
365
    f._wait_for_flash()
×
366
    s.jump(0x100000)
×
367

368
    print("Firmware started.")
×
369
    s.close()
×
370

371

372
async def flash_ev3(firmware: bytes) -> None:
×
373
    """
374
    Flashes firmware to EV3.
375

376
    Args:
377
        firmware:
378
            A firmware blob.
379
    """
380
    from pybricksdev.connections.ev3 import EV3Bootloader
×
381

382
    # TODO: nice error message and exit(1) if EV3 is not found
383
    with EV3Bootloader() as bootloader:
×
384
        fw, hw = await bootloader.get_version()
×
385
        print(f"hwid: {hw}")
×
386

387
        # Erasing doesn't have any feedback so we just use time for the progress
388
        # bar. The operation runs on the EV3, so the time is the same for everyone.
389
        async def tick(callback):
×
390
            CHUNK = 8000
×
391
            SPEED = 256000
×
392
            for _ in range(len(firmware) // CHUNK):
×
393
                await asyncio.sleep(CHUNK / SPEED)
×
394
                callback(CHUNK)
×
395

396
        print("Erasing memory and preparing firmware download...")
×
397
        with logging_redirect_tqdm(), tqdm(
×
398
            total=len(firmware), unit="B", unit_scale=True
399
        ) as pbar:
400
            await asyncio.gather(
×
401
                bootloader.erase_and_begin_download(len(firmware)), tick(pbar.update)
402
            )
403

404
        print("Downloading firmware...")
×
405
        with logging_redirect_tqdm(), tqdm(
×
406
            total=len(firmware), unit="B", unit_scale=True
407
        ) as pbar:
408
            await bootloader.download(firmware, pbar.update)
×
409

410
        print("Verifying...", end="", flush=True)
×
411
        checksum = await bootloader.get_checksum(0, len(firmware))
×
412
        expected_checksum = zlib.crc32(firmware)
×
413

414
        if checksum != expected_checksum:
×
415
            print("Bad checksum!")
×
416
            exit(1)
×
417

418
        print("OK.")
×
419

420
        print("Restarting EV3...", end="", flush=True)
×
421
        await bootloader.start_app()
×
422
        print("Done.")
×
423

424

425
async def flash_firmware(firmware_zip: BinaryIO, new_name: Optional[str]) -> None:
×
426
    """
427
    Command line tool for flashing firmware.
428

429
    Args:
430
        firmware_zip: The path to the ``firmware.zip`` file.
431
        new_name: Optional custom hub name to be applied to the firmware image.
432
    """
433

434
    print("Creating firmware...")
×
435

436
    # REVISIT: require accepting license agreement either interactively or by command line option
437
    firmware, metadata, license = await create_firmware_blob(firmware_zip, new_name)
×
438
    hub_kind = HubKind(metadata["device-id"])
×
439

440
    if hub_kind in (HubKind.TECHNIC_SMALL, HubKind.TECHNIC_LARGE):
×
441
        flash_dfu(firmware, metadata)
×
442
    elif hub_kind in [HubKind.BOOST, HubKind.CITY, HubKind.TECHNIC]:
×
443
        await flash_ble(hub_kind, firmware, metadata)
×
444
    elif hub_kind == HubKind.NXT:
×
445
        await flash_nxt(firmware)
×
446
    elif hub_kind == HubKind.EV3:
×
447
        await flash_ev3(firmware)
×
448
    else:
449
        raise ValueError(f"unsupported hub kind: {hub_kind}")
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc