• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ARMmbed / mbed-os-tools / #457

24 Aug 2024 09:15PM CUT coverage: 0.0% (-59.9%) from 59.947%
#457

push

coveralls-python

web-flow
Merge 7c6dbce13 into c467d6f14

0 of 4902 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/mbed_os_tools/detect/windows.py
1
# Copyright (c) 2018, Arm Limited and affiliates.
2
# SPDX-License-Identifier: Apache-2.0
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15

16
import re
×
17
import sys
×
18
from collections import defaultdict
×
19
from copy import copy
×
20

21
from .lstools_base import MbedLsToolsBase
×
22

23
import logging
×
24

25
# Re-export the DEBUG level so it stays reachable after the ``logging`` name is
# removed from this module's namespace below.
DEBUG = logging.DEBUG
# Module logger with a NullHandler attached; no other handler is configured here.
logger = logging.getLogger("mbedls.lstools_win7")
logger.addHandler(logging.NullHandler())
# Drop the module reference so ``logging`` is not left as a name in this module.
del logging
29

30
if sys.version_info[0] < 3:
×
31
    import _winreg as winreg
×
32
else:
33
    import winreg
×
34

35

36
# Number of composite-device interfaces probed per device: interface indexes
# 0 .. MAX_COMPOSITE_DEVICE_SUBDEVICES - 1 are formatted as "MI_0<n>" keys.
MAX_COMPOSITE_DEVICE_SUBDEVICES = 8

# Vendor markers searched for (case-insensitively) in volume and disk strings;
# a string containing any of them is treated as an Mbed storage device.
MBED_STORAGE_DEVICE_VENDOR_STRINGS = [
    "ven_mbed",
    "ven_segger",
    "ven_arm_v2m",
    "ven_nxp",
    "ven_atmel",
]
44

45

46
def _get_values_with_numeric_keys(reg_key):
    """! Collect the data of every registry value whose name is an integer
    @param reg_key Open registry key whose values are enumerated
    @return List with the second element of every enumerated value tuple whose
      first element can be converted with int()
    @details An OSError raised while enumerating is logged at debug level and
    the values collected so far are returned.
    """
    numeric_values = []
    try:
        for entry in _iter_vals(reg_key):
            # The only values we care about are ones that have an integer key.
            # The other values are metadata for the registry
            try:
                int(entry[0])
            except ValueError:
                continue
            numeric_values.append(entry[1])
    except OSError:
        logger.debug("Failed to iterate over all keys")

    return numeric_values
61

62

63
def _is_mbed_volume(volume_string):
    """! Check whether a string mentions one of the known Mbed storage vendors
    @param volume_string Volume or disk string to search
    @return True if any entry of MBED_STORAGE_DEVICE_VENDOR_STRINGS occurs in
      volume_string (compared case-insensitively), False otherwise
    """
    return any(
        vendor.lower() in volume_string.lower()
        for vendor in MBED_STORAGE_DEVICE_VENDOR_STRINGS
    )
69

70

71
def _get_cached_mounted_points():
    """! Get the Mbed volumes that have a drive letter recorded in the registry
    @return List of mount points and their associated volume string
      Ex. [{ 'mount_point': 'D:', 'volume_string': 'xxxx'}, ...]
    @details Only "DosDevices" values of the "MountedDevices" key whose data,
    decoded as UTF-16LE, passes _is_mbed_volume() are reported. An OSError is
    logged and whatever was collected up to that point is returned.
    """
    mount_points = []
    try:
        # Open the registry key for mounted devices
        devices_key = winreg.OpenKey(
            winreg.HKEY_LOCAL_MACHINE, "SYSTEM\\MountedDevices"
        )
        for entry in _iter_vals(devices_key):
            value_name = entry[0]
            # Valid entries have the following format: \\DosDevices\\D:
            if "DosDevices" not in value_name:
                continue

            volume = entry[1].decode("utf-16le", "ignore")
            if not _is_mbed_volume(volume):
                continue

            # The drive letter is whatever follows the last backslash, e.g. "D:"
            drive_match = re.match(".*\\\\(.:)$", value_name)
            if drive_match is None:
                logger.debug("Invalid disk pattern for entry %s, skipping", value_name)
                continue

            drive = drive_match.group(1)
            logger.debug("Mount point %s found for volume %s", drive, volume)
            mount_points.append({"mount_point": drive, "volume_string": volume})
    except OSError:
        logger.error('Failed to open "MountedDevices" in registry')

    return mount_points
107

108

109
def _get_disks():
    """! List the Mbed disks enumerated by the Windows disk service
    @return List of the numerically-named values of the disk service "Enum"
      key that pass _is_mbed_volume(); an empty list if the key cannot be opened
    """
    logger.debug("Fetching mounted devices from disk service registry entry")
    try:
        enum_key = winreg.OpenKey(
            winreg.HKEY_LOCAL_MACHINE, "SYSTEM\\CurrentControlSet\\Services\\Disk\\Enum"
        )
        enumerated_disks = _get_values_with_numeric_keys(enum_key)
        return list(filter(_is_mbed_volume, enumerated_disks))
    except OSError:
        logger.debug("No disk service found, no device can be detected")
        return []
120

121

122
def _get_usb_storage_devices():
    """! List the devices enumerated by the Windows USBSTOR service
    @return List of the numerically-named values of the USBSTOR service "Enum"
      key; an empty list if the key cannot be opened
    """
    logger.debug("Fetching usb storage devices from USBSTOR service registry entry")
    enum_key_path = "SYSTEM\\CurrentControlSet\\Services\\USBSTOR\\Enum"
    try:
        enum_key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, enum_key_path)
        return _get_values_with_numeric_keys(enum_key)
    except OSError:
        logger.debug("No USBSTOR service found, no device can be detected")
        return []
133

134

135
def _determine_valid_non_composite_devices(devices, target_id_usb_id_mount_point_map):
×
136
    # Some Mbed devices do not expose a composite USB device. This is typical for
137
    # DAPLink devices in bootloader mode. Since we only have to check one endpoint
138
    # (specifically, the mass storage device), we handle this case separately
139
    candidates = {}
×
140
    for device in devices:
×
141
        device_key_string = "SYSTEM\\CurrentControlSet\\Enum\\" + device["full_path"]
×
142
        try:
×
143
            device_key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, device_key_string)
×
144
        except OSError:
×
145
            logger.debug('Key "%s" not found', device_key_string)
×
146
            continue
×
147

148
        try:
×
149
            capability = _determine_subdevice_capability(device_key)
×
150
        except CompatibleIDsNotFoundException:
×
151
            logger.debug(
×
152
                'Expected %s to have subkey "CompatibleIDs". Skipping.',
153
                device_key_string,
154
            )
155
            continue
×
156

157
        if capability != "msd":
×
158
            logger.debug(
×
159
                "Expected msd device but got %s, skipping %s",
160
                capability,
161
                device["full_path"],
162
            )
163
            continue
×
164

165
        target_id_usb_id = device["entry_key_string"]
×
166
        try:
×
167
            candidates[target_id_usb_id] = {
×
168
                "target_id_usb_id": target_id_usb_id,
169
                "mount_point": target_id_usb_id_mount_point_map[target_id_usb_id],
170
            }
171

172
            candidates[target_id_usb_id].update(
×
173
                _vid_pid_path_to_usb_info(device["vid_pid_path"])
174
            )
175
        except KeyError:
×
176
            pass
×
177

178
    return candidates
×
179

180

181
def _determine_subdevice_capability(key):
    """! Classify a USB (sub)device from its "CompatibleIDs" registry value
    @param key Open registry key of the device entry
    @return 'composite', 'msd' or 'serial'; None if none of the known class
      IDs is listed
    @exception CompatibleIDsNotFoundException The "CompatibleIDs" value could
      not be queried
    """
    try:
        raw_ids = winreg.QueryValueEx(key, "CompatibleIDs")[0]
        compatible_ids = [compatible_id.lower() for compatible_id in raw_ids]
    except OSError:
        raise CompatibleIDsNotFoundException()

    # Checked in order: the first capability with a matching class ID wins
    capability_by_class_ids = (
        (("usb\\class_00", "usb\\devclass_00"), "composite"),
        (("usb\\class_08",), "msd"),
        (("usb\\class_02",), "serial"),
    )
    for class_ids, capability in capability_by_class_ids:
        if any(class_id in compatible_ids for class_id in class_ids):
            return capability

    logger.debug("Unknown capabilities from the following ids: %s", compatible_ids)
    return None
197

198

199
def _vid_pid_path_to_usb_info(vid_pid_path):
×
200
    """! Provide the vendor ID and product ID of a device based on its entry in the registry
201
    @return Returns {'vendor_id': '<vendor ID>', 'product': '<product ID>'}
202
    @details If the vendor ID or product ID can't be determined, they will be returned
203
    as None.
204
    """
205
    result = {"vendor_id": None, "product_id": None}
×
206

207
    for component in vid_pid_path.split("&"):
×
208
        component_part = component.lower().split("_")
×
209

210
        if len(component_part) != 2:
×
211
            logger.debug("Unexpected VID/PID string structure %s", component)
×
212
            break
×
213

214
        if component_part[0] == "vid":
×
215
            result["vendor_id"] = component_part[1]
×
216
        elif component_part[0] == "pid":
×
217
            result["product_id"] = component_part[1]
×
218

219
    return result
×
220

221

222
def _iter_keys_as_str(key):
    """! Iterate over subkeys of a key returning subkey as string
    @param key Open registry key
    @return Generator yielding the result of winreg.EnumKey for every subkey index
    """
    subkey_count = winreg.QueryInfoKey(key)[0]
    for index in range(subkey_count):
        yield winreg.EnumKey(key, index)
227

228

229
def _iter_keys(key):
    """! Iterate over subkeys of a key
    @param key Open registry key
    @return Generator yielding every subkey opened with winreg.OpenKey
    """
    subkey_count = winreg.QueryInfoKey(key)[0]
    for index in range(subkey_count):
        subkey_name = winreg.EnumKey(key, index)
        yield winreg.OpenKey(key, subkey_name)
234

235

236
def _iter_vals(key):
    """! Iterate over values of a key
    @param key Open registry key
    @return Generator yielding the result of winreg.EnumValue for every value index
    """
    logger.debug("_iter_vals %r", key)
    value_count = winreg.QueryInfoKey(key)[1]
    for index in range(value_count):
        yield winreg.EnumValue(key, index)
242

243

244
class CompatibleIDsNotFoundException(Exception):
    """Raised when the "CompatibleIDs" value of a device key cannot be queried."""
246

247

248
class MbedLsToolsWin7(MbedLsToolsBase):
    """ mbed-enabled platform detection for Windows
    """

    def __init__(self, **kwargs):
        MbedLsToolsBase.__init__(self, **kwargs)
        self.os_supported.append("Windows7")

    def find_candidates(self):
        """! Find candidate Mbed devices by inspecting the Windows registry
        @return List of dicts, one per device. Every dict has the keys
          'target_id_usb_id', 'mount_point' and 'serial_port' (the latter two
          are None when not found) plus the 'vendor_id' and 'product_id' parsed
          from the device's VID/PID path
        @details The steps are:
          1. map target IDs to mount points using the cached mounted volumes
             and the disks known to the disk service,
          2. split the USBSTOR devices into composite and non-composite ones,
          3. resolve the non-composite devices directly,
          4. for composite devices, probe the "MI_0<n>" interface keys for the
             mass storage and serial interfaces.
        """
        cached_mount_points = _get_cached_mounted_points()
        disks = _get_disks()
        usb_storage_devices = _get_usb_storage_devices()

        target_id_usb_id_mount_point_map = {}
        for cached_mount_point_info in cached_mount_points:
            # Iterate over a copy because a matched disk is removed from `disks`
            for index, disk in enumerate(copy(disks)):
                match_string = disk.split("\\")[-1]
                if match_string in cached_mount_point_info["volume_string"]:
                    # TargetID is a hex string with 10-48 chars
                    target_id_usb_id_match = re.search(
                        "[&#]([0-9A-Za-z]{10,48})[&#]",
                        cached_mount_point_info["volume_string"],
                    )
                    if not target_id_usb_id_match:
                        logger.debug(
                            "Entry %s has invalid target id pattern %s, skipping",
                            cached_mount_point_info["mount_point"],
                            cached_mount_point_info["volume_string"],
                        )
                        continue

                    target_id_usb_id_mount_point_map[
                        target_id_usb_id_match.group(1)
                    ] = cached_mount_point_info["mount_point"]
                    # Each disk is matched with at most one mount point
                    disks.pop(index)
                    break

        logger.debug(
            "target_id_usb_id -> mount_point mapping: %s ",
            target_id_usb_id_mount_point_map,
        )
        non_composite_devices = []
        composite_devices = []
        for vid_pid_path in usb_storage_devices:
            # Split paths like "USB\VID_0483&PID_374B&MI_01\7&25b4dc8e&0&0001" by "\"
            vid_pid_path_components = vid_pid_path.split("\\")

            # Both the VID/PID part ([1]) and the entry key ([2]) are required;
            # skip shorter paths instead of failing with an IndexError.
            if len(vid_pid_path_components) < 3:
                logger.debug(
                    "Skipping USBSTOR device with unusual VID/PID string format '%s'",
                    vid_pid_path,
                )
                continue

            vid_pid_components = vid_pid_path_components[1].split("&")

            if len(vid_pid_components) != 2 and len(vid_pid_components) != 3:
                logger.debug(
                    "Skipping USBSTOR device with unusual VID/PID string format '%s'",
                    vid_pid_path,
                )
                continue

            device = {
                "full_path": vid_pid_path,
                "vid_pid_path": "&".join(vid_pid_components[:2]),
                "entry_key_string": vid_pid_path_components[2],
            }

            # A composite device's vid/pid path always has a third component
            if len(vid_pid_components) == 3:
                composite_devices.append(device)
            else:
                non_composite_devices.append(device)

        candidates = defaultdict(dict)
        candidates.update(
            _determine_valid_non_composite_devices(
                non_composite_devices, target_id_usb_id_mount_point_map
            )
        )
        # Now we'll find all valid VID/PID and target ID combinations
        target_id_usb_ids = set(target_id_usb_id_mount_point_map.keys()) - set(
            candidates.keys()
        )
        vid_pid_entry_key_string_map = defaultdict(set)

        for device in composite_devices:
            vid_pid_entry_key_string_map[device["vid_pid_path"]].add(
                device["entry_key_string"]
            )

        vid_pid_target_id_usb_id_map = defaultdict(dict)
        usb_key_string = "SYSTEM\\CurrentControlSet\\Enum\\USB"
        for vid_pid_path, entry_key_strings in vid_pid_entry_key_string_map.items():
            vid_pid_key_string = "%s\\%s" % (usb_key_string, vid_pid_path)
            try:
                vid_pid_key = winreg.OpenKey(
                    winreg.HKEY_LOCAL_MACHINE, vid_pid_key_string
                )
                target_id_usb_id_sub_keys = set(_iter_keys_as_str(vid_pid_key))
            except OSError:
                logger.debug('VID/PID "%s" not found', vid_pid_key_string)
                continue

            overlapping_target_id_usb_ids = target_id_usb_id_sub_keys.intersection(
                target_id_usb_ids
            )
            for target_id_usb_id in overlapping_target_id_usb_ids:
                composite_device_key_string = "%s\\%s" % (
                    vid_pid_key_string,
                    target_id_usb_id,
                )
                try:
                    composite_device_key = winreg.OpenKey(
                        vid_pid_key, target_id_usb_id
                    )
                except OSError:
                    # Skip this device rather than aborting the whole detection
                    logger.debug('Key "%s" not found', composite_device_key_string)
                    continue

                entry_key_string = target_id_usb_id
                is_prefix = False

                try:
                    new_entry_key_string, _ = winreg.QueryValueEx(
                        composite_device_key, "ParentIdPrefix"
                    )

                    if any(
                        e.startswith(new_entry_key_string) for e in entry_key_strings
                    ):
                        logger.debug(
                            "Assigning new entry key string of %s to device %s, "
                            "as found in ParentIdPrefix",
                            new_entry_key_string,
                            target_id_usb_id,
                        )
                        entry_key_string = new_entry_key_string
                        is_prefix = True
                except OSError:
                    logger.debug(
                        'Device %s did not have a "ParentIdPrefix" key, '
                        "sticking with %s as entry key string",
                        composite_device_key_string,
                        target_id_usb_id,
                    )

                vid_pid_target_id_usb_id_map[vid_pid_path][entry_key_string] = {
                    "target_id_usb_id": target_id_usb_id,
                    "is_prefix": is_prefix,
                }

        for (
            vid_pid_path,
            entry_key_string_target_id_usb_id_map,
        ) in vid_pid_target_id_usb_id_map.items():
            for composite_device_subdevice_number in range(
                MAX_COMPOSITE_DEVICE_SUBDEVICES
            ):
                subdevice_type_key_string = "%s\\%s&MI_0%d" % (
                    usb_key_string,
                    vid_pid_path,
                    composite_device_subdevice_number,
                )
                try:
                    subdevice_type_key = winreg.OpenKey(
                        winreg.HKEY_LOCAL_MACHINE, subdevice_type_key_string
                    )
                except OSError:
                    logger.debug(
                        "Composite device subdevice key %s was not found, skipping",
                        subdevice_type_key_string,
                    )
                    continue

                for (
                    entry_key_string,
                    entry_data,
                ) in entry_key_string_target_id_usb_id_map.items():
                    if entry_data["is_prefix"]:
                        # A ParentIdPrefix is completed with the interface number
                        prepared_entry_key_string = "%s&000%d" % (
                            entry_key_string,
                            composite_device_subdevice_number,
                        )
                    else:
                        prepared_entry_key_string = entry_key_string
                    subdevice_key_string = "%s\\%s" % (
                        subdevice_type_key_string,
                        prepared_entry_key_string,
                    )
                    try:
                        subdevice_key = winreg.OpenKey(
                            subdevice_type_key, prepared_entry_key_string
                        )
                    except OSError:
                        logger.debug(
                            "Sub-device %s not found, skipping", subdevice_key_string
                        )
                        continue

                    try:
                        capability = _determine_subdevice_capability(subdevice_key)
                    except CompatibleIDsNotFoundException:
                        logger.debug(
                            'Expected %s to have subkey "CompatibleIDs". Skipping.',
                            subdevice_key_string,
                        )
                        continue

                    target_id_usb_id = entry_data["target_id_usb_id"]
                    if capability == "msd":
                        candidates[target_id_usb_id][
                            "mount_point"
                        ] = target_id_usb_id_mount_point_map[target_id_usb_id]
                        candidates[target_id_usb_id].update(
                            _vid_pid_path_to_usb_info(vid_pid_path)
                        )
                    elif capability == "serial":
                        try:
                            device_parameters_key = winreg.OpenKey(
                                subdevice_key, "Device Parameters"
                            )
                        except OSError:
                            logger.debug(
                                'Key "Device Parameters" not under serial device entry'
                            )
                            continue

                        try:
                            # Query first so that no candidate entry is created
                            # when the port name is unavailable.
                            port_name, _ = winreg.QueryValueEx(
                                device_parameters_key, "PortName"
                            )
                            candidates[target_id_usb_id]["serial_port"] = port_name
                            candidates[target_id_usb_id].update(
                                _vid_pid_path_to_usb_info(vid_pid_path)
                            )
                        except OSError:
                            logger.debug(
                                '"PortName" value not found under serial device entry'
                            )
                            continue

        final_candidates = []
        for target_id_usb_id, candidate in candidates.items():
            candidate["target_id_usb_id"] = target_id_usb_id
            # Make sure both keys exist even if the interface was not found
            candidate.setdefault("serial_port", None)
            candidate.setdefault("mount_point", None)
            final_candidates.append(candidate)

        return final_candidates

    def mount_point_ready(self, path):
        """! Check if a mount point is ready for file operations
        @param path Mount point to check
        @return Returns True if the `dir` command succeeds for the given path,
          False otherwise
        @details Calling the Windows command `dir` instead of using the python
        `os.path.exists`. The latter causes a Python error box to appear claiming
        there is "No Disk" for some devices that are in the ejected state. Calling
        `dir` prevents this since it uses the Windows API to determine if the
        device is ready before accessing the file system.
        """
        _, stderr, retcode = self._run_cli_process("dir %s" % path)
        result = retcode == 0

        if result:
            logger.debug("Mount point %s is ready", path)
        else:
            logger.debug(
                "Mount point %s reported not ready with error '%s'",
                path,
                stderr.strip(),
            )

        return result
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc