• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pybricks / pybricksdev / 18762979388

23 Oct 2025 09:50PM UTC coverage: 53.33% (-0.02%) from 53.352%
18762979388

push

github

dlech
pybricksdev: replace Union with | for type hints

Replace typing.Union with the | operator for type hints. This is
possible now that we require Python 3.10 or later.

65 of 253 branches covered (25.69%)

Branch coverage included in aggregate %.

1985 of 3591 relevant lines covered (55.28%)

0.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
pybricksdev/cli/oad.py
1
# SPDX-License-Identifier: MIT
2
# Copyright (c) 2024 The Pybricks Authors
3

4
import asyncio
×
5
from typing import BinaryIO
×
6

7
from bleak import BleakClient, BleakScanner
×
8
from bleak.backends.device import BLEDevice
×
9
from bleak.backends.scanner import AdvertisementData
×
10
from tqdm.auto import tqdm
×
11
from tqdm.contrib.logging import logging_redirect_tqdm
×
12

13
from pybricksdev.ble.lwp3 import LEGO_CID, LWP3_HUB_SERVICE_UUID, HubKind
×
14
from pybricksdev.ble.oad import (
×
15
    OADControlPoint,
16
    OADImageBlock,
17
    OADImageIdentify,
18
    OADReturn,
19
)
20
from pybricksdev.ble.oad.control_point import (
×
21
    OAD_LEGO_MARIO_DEVICE_TYPE,
22
    OAD_LEGO_TECHNIC_MOVE_DEVICE_TYPE,
23
)
24
from pybricksdev.ble.oad.firmware import parse_oad_header
×
25

26
__all__ = ["dump_oad_info", "flash_oad_image"]
×
27

28
# hubs known to use TI OAD
29
_OAD_HUBS = [HubKind.MARIO, HubKind.LUIGI, HubKind.PEACH, HubKind.TECHNIC_MOVE]
×
30

31
_KNOWN_DEVICE_TYPES = {
×
32
    OAD_LEGO_MARIO_DEVICE_TYPE: "LEGO Mario",
33
    OAD_LEGO_TECHNIC_MOVE_DEVICE_TYPE: "LEGO Technic Move Hub",
34
}
35

36

37
def _match_oad_hubs(dev: BLEDevice, adv: AdvertisementData):
×
38
    """
39
    Matches BLE advertisement data that has LEGO manufacturer data and
40
    is a known OAD hub.
41
    """
42
    if LEGO_CID not in adv.manufacturer_data:
×
43
        return False
×
44

45
    # maybe not necessary but helps ensure that mfg data is the expected layout
46
    if LWP3_HUB_SERVICE_UUID not in adv.service_uuids:
×
47
        return False
×
48

49
    kind = HubKind(adv.manufacturer_data[LEGO_CID][1])
×
50

51
    return kind in _OAD_HUBS
×
52

53

54
async def flash_oad_image(firmware: BinaryIO) -> None:
×
55
    """
56
    Connects to an OAD hub and flashes a firmware image to it.
57
    """
58

59
    firmware_bytes = firmware.read()
×
60

61
    header = parse_oad_header(firmware_bytes)
×
62

63
    print("Scanning for hubs...")
×
64
    device = await BleakScanner.find_device_by_filter(_match_oad_hubs)
×
65

66
    if device is None:
×
67
        print("No OAD device found")
×
68
        return
×
69

70
    disconnect_event = asyncio.Event()
×
71

72
    def on_disconnect(_):
×
73
        disconnect_event.set()
×
74

75
    # long timeout in case pairing is needed
76
    async with (
×
77
        asyncio.timeout(60),
78
        BleakClient(device, on_disconnect) as client,
79
        OADImageIdentify(client) as image_identify,
80
        OADControlPoint(client) as control_point,
81
    ):
82
        image_block = OADImageBlock(client)
×
83

84
        print(f"Connected to {device.name}")
×
85

86
        dev_type = await control_point.get_device_type()
×
87

88
        # TODO: match this based on firmware image target
89
        if dev_type not in _KNOWN_DEVICE_TYPES:
×
90
            print(f"Unsupported device type: {dev_type:08X}")
×
91
            return
×
92

93
        block_size = await control_point.get_oad_block_size()
×
94

95
        status = await image_identify.validate(
×
96
            header.image_id,
97
            header.bmi_version,
98
            header.header_version,
99
            header.image_info,
100
            header.image_length,
101
            header.software_version,
102
        )
103
        if status != OADReturn.SUCCESS:
×
104
            print(f"Failed to validate image: {status.name}")
×
105
            return
×
106

107
        sent_blocks = set()
×
108

109
        print("Flashing...")
×
110

111
        with (
×
112
            logging_redirect_tqdm(),
113
            tqdm(total=header.image_length, unit="B", unit_scale=True) as pbar,
114
        ):
115
            async with asyncio.TaskGroup() as group:
×
116
                try:
×
117
                    async for (
×
118
                        status,
119
                        block_num,
120
                    ) in control_point.start_oad_process():
121
                        if status == OADReturn.SUCCESS:
×
122
                            data = firmware_bytes[
×
123
                                block_num
124
                                * (block_size - 4) : (block_num + 1)
125
                                * (block_size - 4)
126
                            ]
127

128
                            task = group.create_task(image_block.write(block_num, data))
×
129

130
                            if block_num not in sent_blocks:
×
131
                                task.add_done_callback(lambda _: pbar.update(len(data)))
×
132
                                sent_blocks.add(block_num)
×
133

134
                        elif status == OADReturn.DL_COMPLETE:
×
135
                            break
×
136
                        elif status == OADReturn.CRC_ERR:
×
137
                            raise RuntimeError("Failed CRC check")
×
138
                        else:
139
                            raise RuntimeError(
×
140
                                f"Block {block_num} with unhandled status: {status.name}"
141
                            )
142
                except BaseException:
×
143
                    await control_point.cancel_oad()
×
144
                    raise
×
145

146
        # This causes hub to reset and disconnect
147
        await control_point.enable_oad_image()
×
148
        print("Done.")
×
149

150
        # avoid race condition of requesting disconnect while hub is initiating
151
        # disconnect itself - this can leave BlueZ in a a bad state
152
        await disconnect_event.wait()
×
153

154

155
async def dump_oad_info():
×
156
    """
157
    Connects to an OAD hub and prints some information about it.
158
    """
159
    device = await BleakScanner.find_device_by_filter(_match_oad_hubs)
×
160

161
    if device is None:
×
162
        print("No OAD device found")
×
163
        return
×
164

165
    # long timeout in case pairing is needed
166
    async with (
×
167
        asyncio.timeout(30),
168
        BleakClient(device) as client,
169
        OADControlPoint(client) as control_point,
170
    ):
171
        sw_ver = await control_point.get_software_version()
×
172
        print(
×
173
            f"Software version: app={sw_ver.app.major}.{sw_ver.app.minor}, stack={sw_ver.stack.major}.{sw_ver.stack.minor}"
174
        )
175

176
        profile_ver = await control_point.get_profile_version()
×
177
        print(f"Profile version: {profile_ver}")
×
178

179
        dev_type = await control_point.get_device_type()
×
180
        print(
×
181
            f"Device type: {dev_type:08X} ({_KNOWN_DEVICE_TYPES.get(dev_type, 'Unknown')})"
182
        )
183

184
        block_size = await control_point.get_oad_block_size()
×
185
        print(f"Block size: {block_size}")
×
186

187
        image_status = await control_point.get_oad_image_status()
×
188
        print(f"Image status: {image_status.name}")
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc