• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 4938565303402496

30 Apr 2025 02:34PM UTC coverage: 60.216% (-0.02%) from 60.236%
4938565303402496

Pull #9775

CirrusCI

f321x
fix: blocked Exception_Window by setting modality

If some dialogs are open while an exception was raised the input of
Exception_Window is blocked and the user cannot click the button to
submit the issue report. Only if the other dialog is closed
Exception_Window starts responding. By setting the modality of
`Exception_Windows` to `ApplicationModal` the `Exception_Window` will
get the 'highest priority' and accepts input even if other dialogs are
open.
Pull Request #9775: fix: blocked Exception_Window by setting modality

21615 of 35896 relevant lines covered (60.22%)

2.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.59
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import json
4✔
27
import os
4✔
28
import pkgutil
4✔
29
import importlib.util
4✔
30
import time
4✔
31
import threading
4✔
32
import sys
4✔
33
import aiohttp
4✔
34
import zipfile as zipfile_lib
4✔
35
from urllib.parse import urlparse
4✔
36

37
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
4✔
38
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
39
import concurrent
4✔
40
import zipimport
4✔
41
from functools import wraps, partial
4✔
42
from itertools import chain
4✔
43

44
from electrum_ecc import ECPrivkey, ECPubkey
4✔
45

46
from ._vendor.distutils.version import StrictVersion
4✔
47
from .version import ELECTRUM_VERSION
4✔
48
from .i18n import _
4✔
49
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException)
4✔
50
from . import bip32
4✔
51
from . import plugins
4✔
52
from .simple_config import SimpleConfig
4✔
53
from .logging import get_logger, Logger
4✔
54
from .crypto import sha256
4✔
55

56
if TYPE_CHECKING:
4✔
57
    from .hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
58
    from .keystore import Hardware_KeyStore, KeyStore
×
59
    from .wallet import Abstract_Wallet
×
60

61

62
_logger = get_logger(__name__)
4✔
63
plugin_loaders = {}
4✔
64
hook_names = set()
4✔
65
hooks = {}
4✔
66
_exec_module_failure = {}  # type: Dict[str, Exception]
4✔
67

68
PLUGIN_PASSWORD_VERSION = 1
4✔
69

70

71
class Plugins(DaemonThread):
4✔
72

73
    LOGGING_SHORTCUT = 'p'
4✔
74
    pkgpath = os.path.dirname(plugins.__file__)
4✔
75
    keyfile_linux = '/etc/electrum/plugins_key'
4✔
76
    keyfile_windows = 'C:\\HKEY_LOCAL_MACHINE\\SOFTWARE\\Electrum\\PluginsKey'
4✔
77

78
    @profiler
4✔
79
    def __init__(self, config: SimpleConfig, gui_name: str = None, cmd_only: bool = False):
4✔
80
        self.config = config
4✔
81
        self.cmd_only = cmd_only  # type: bool
4✔
82
        self.internal_plugin_metadata = {}
4✔
83
        self.external_plugin_metadata = {}
4✔
84
        if cmd_only:
4✔
85
            # only import the command modules of plugins
86
            Logger.__init__(self)
×
87
            self.find_plugins()
×
88
            self.load_plugins()
×
89
            return
×
90
        DaemonThread.__init__(self)
4✔
91
        self.device_manager = DeviceMgr(config)
4✔
92
        self.name = 'Plugins'  # set name of thread
4✔
93
        self.hw_wallets = {}
4✔
94
        self.plugins = {}  # type: Dict[str, BasePlugin]
4✔
95
        self.gui_name = gui_name
4✔
96
        self.find_plugins()
4✔
97
        self.load_plugins()
4✔
98
        self.add_jobs(self.device_manager.thread_jobs())
4✔
99
        self.start()
4✔
100

101
    @property
4✔
102
    def descriptions(self):
4✔
103
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
104

105
    def find_directory_plugins(self, pkg_path: str, external: bool):
4✔
106
        """Finds plugins in directory form from the given pkg_path and populates the metadata dicts"""
107
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
4✔
108
        for loader, name, ispkg in iter_modules:
4✔
109
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
110
            #       once as data and once as code. To honor the "no duplicates" rule below,
111
            #       we exclude the ones packaged as *code*, here:
112
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
4✔
113
                continue
×
114
            module_path = os.path.join(pkg_path, name)
4✔
115
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled') is True:
4✔
116
                continue
×
117
            try:
4✔
118
                with open(os.path.join(module_path, 'manifest.json'), 'r') as f:
4✔
119
                    d = json.load(f)
4✔
120
            except FileNotFoundError:
×
121
                self.logger.info(f"could not find manifest.json of plugin {name}, skipping...")
×
122
                continue
×
123
            if 'fullname' not in d:
4✔
124
                continue
×
125
            d['path'] = module_path
4✔
126
            if not self.cmd_only:
4✔
127
                gui_good = self.gui_name in d.get('available_for', [])
4✔
128
                if not gui_good:
4✔
129
                    continue
4✔
130
                details = d.get('registers_wallet_type')
4✔
131
                if details:
4✔
132
                    self.register_wallet_type(name, gui_good, details)
4✔
133
                details = d.get('registers_keystore')
4✔
134
                if details:
4✔
135
                    self.register_keystore(name, gui_good, details)
4✔
136
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
4✔
137
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
×
138
                _logger.info(f"duplicate plugins? for {name=}")
×
139
                continue
×
140
            if not external:
4✔
141
                self.internal_plugin_metadata[name] = d
4✔
142
            else:
143
                self.external_plugin_metadata[name] = d
×
144

145
    @staticmethod
4✔
146
    def exec_module_from_spec(spec, path: str):
4✔
147
        if prev_fail := _exec_module_failure.get(path):
4✔
148
            raise Exception(f"exec_module already failed once before, with: {prev_fail!r}")
×
149
        try:
4✔
150
            module = importlib.util.module_from_spec(spec)
4✔
151
            # sys.modules needs to be modified for relative imports to work
152
            # see https://stackoverflow.com/a/50395128
153
            sys.modules[path] = module
4✔
154
            spec.loader.exec_module(module)
4✔
155
        except Exception as e:
×
156
            # We can't undo all side-effects, but we at least rm the module from sys.modules,
157
            # so the import system knows it failed. If called again for the same plugin, we do not
158
            # retry due to potential interactions with not-undone side-effects (e.g. plugin
159
            # might have defined commands).
160
            _exec_module_failure[path] = e
×
161
            if path in sys.modules:
×
162
                sys.modules.pop(path, None)
×
163
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
×
164
        return module
4✔
165

166
    def find_plugins(self):
4✔
167
        internal_plugins_path = (self.pkgpath, False)
4✔
168
        external_plugins_path = (self.get_external_plugin_dir(), True)
4✔
169
        for pkg_path, external in (internal_plugins_path, external_plugins_path):
4✔
170
            if pkg_path and os.path.exists(pkg_path):
4✔
171
                if not external:
4✔
172
                    self.find_directory_plugins(pkg_path=pkg_path, external=external)
4✔
173
                else:
174
                    self.find_zip_plugins(pkg_path=pkg_path, external=external)
4✔
175

176
    def load_plugins(self):
4✔
177
        for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
4✔
178
            if not d.get('requires_wallet_type') and self.config.get(f'plugins.{name}.enabled'):
4✔
179
                try:
×
180
                    if self.cmd_only:  # only load init method to register commands
×
181
                        self.maybe_load_plugin_init_method(name)
×
182
                    else:
183
                        self.load_plugin_by_name(name)
×
184
                except BaseException as e:
×
185
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
186

187
    def _has_root_permissions(self, path):
4✔
188
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
189

190
    def get_keyfile_path(self) -> Tuple[str, str]:
4✔
191
        if sys.platform in ['windows', 'win32']:
×
192
            keyfile_path = self.keyfile_windows
×
193
            keyfile_help = _('This file can be edited with Regdit')
×
194
        elif 'ANDROID_DATA' in os.environ:
×
195
            raise Exception('platform not supported')
×
196
        else:
197
            # treat unknown platforms as linux-like
198
            keyfile_path = self.keyfile_linux
×
199
            keyfile_help = _('The file must have root permissions')
×
200
        return keyfile_path, keyfile_help
×
201

202
    def create_new_key(self, password:str) -> str:
4✔
203
        salt = os.urandom(32)
×
204
        privkey = self.derive_privkey(password, salt)
×
205
        pubkey = privkey.get_public_key_bytes()
×
206
        key = bytes([PLUGIN_PASSWORD_VERSION]) + salt + pubkey
×
207
        return key.hex()
×
208

209
    def get_pubkey_bytes(self) -> Tuple[Optional[bytes], bytes]:
4✔
210
        """
211
        returns pubkey, salt
212
        returns None, None if the pubkey has not been set
213
        """
214
        if sys.platform in ['windows', 'win32']:
×
215
            import winreg
×
216
            with winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE) as hkey:
×
217
                try:
×
218
                    with winreg.OpenKey(hkey, r"SOFTWARE\\Electrum") as key:
×
219
                        key_hex = winreg.QueryValue(key, "PluginsKey")
×
220
                except Exception as e:
×
221
                    self.logger.info(f'winreg error: {e}')
×
222
                    return None, None
×
223
        elif 'ANDROID_DATA' in os.environ:
×
224
            return None, None
×
225
        else:
226
            # treat unknown platforms as linux-like
227
            if not os.path.exists(self.keyfile_linux):
×
228
                return None, None
×
229
            if not self._has_root_permissions(self.keyfile_linux):
×
230
                return
×
231
            with open(self.keyfile_linux) as f:
×
232
                key_hex = f.read()
×
233
        key = bytes.fromhex(key_hex)
×
234
        version = key[0]
×
235
        if version != PLUGIN_PASSWORD_VERSION:
×
236
            self.logger.info(f'unknown plugin password version: {version}')
×
237
            return None, None
×
238
        # all good
239
        salt = key[1:1+32]
×
240
        pubkey = key[1+32:]
×
241
        return pubkey, salt
×
242

243
    def get_external_plugin_dir(self) -> str:
4✔
244
        pkg_path = os.path.join(self.config.electrum_path(), 'plugins')
4✔
245
        if not os.path.exists(pkg_path):
4✔
246
            os.mkdir(pkg_path)
4✔
247
        return pkg_path
4✔
248

249
    async def download_external_plugin(self, url):
4✔
250
        filename = os.path.basename(urlparse(url).path)
×
251
        pkg_path = self.get_external_plugin_dir()
×
252
        path = os.path.join(pkg_path, filename)
×
253
        async with aiohttp.ClientSession() as session:
×
254
            async with session.get(url) as resp:
×
255
                if resp.status == 200:
×
256
                    with open(path, 'wb') as fd:
×
257
                        async for chunk in resp.content.iter_chunked(10):
×
258
                            fd.write(chunk)
×
259
        return path
×
260

261
    def read_manifest(self, path) -> dict:
4✔
262
        """ return json dict """
263
        with zipfile_lib.ZipFile(path) as file:
×
264
            for filename in file.namelist():
×
265
                if filename.endswith('manifest.json'):
×
266
                    break
×
267
            else:
268
                raise Exception('could not find manifest.json in zip archive')
×
269
            with file.open(filename, 'r') as f:
×
270
                manifest = json.load(f)
×
271
                manifest['path'] = path  # external, path of the zipfile
×
272
                manifest['dirname'] = os.path.dirname(filename)  # internal
×
273
                manifest['is_zip'] = True
×
274
                manifest['zip_hash_sha256'] = get_file_hash256(path).hex()
×
275
                return manifest
×
276

277
    def zip_plugin_path(self, name) -> str:
4✔
278
        path = self.get_metadata(name)['path']
×
279
        filename = os.path.basename(path)
×
280
        if name in self.internal_plugin_metadata:
×
281
            pkg_path = self.pkgpath
×
282
        else:
283
            pkg_path = self.get_external_plugin_dir()
×
284
        return os.path.join(pkg_path, filename)
×
285

286
    def find_zip_plugins(self, pkg_path: str, external: bool):
4✔
287
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts"""
288
        if pkg_path is None:
4✔
289
            return
×
290
        for filename in os.listdir(pkg_path):
4✔
291
            path = os.path.join(pkg_path, filename)
×
292
            if not filename.endswith('.zip'):
×
293
                continue
×
294
            try:
×
295
                d = self.read_manifest(path)
×
296
                name = d['name']
×
297
            except Exception:
×
298
                self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
×
299
                continue
×
300
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
×
301
                self.logger.info(f"duplicate plugins for {name=}")
×
302
                continue
×
303
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled'):
×
304
                continue
×
305
            min_version = d.get('min_electrum_version')
×
306
            if min_version and StrictVersion(min_version) > StrictVersion(ELECTRUM_VERSION):
×
307
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
308
                continue
×
309
            max_version = d.get('max_electrum_version')
×
310
            if max_version and StrictVersion(max_version) < StrictVersion(ELECTRUM_VERSION):
×
311
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
312
                continue
×
313

314
            if not self.cmd_only:
×
315
                gui_good = self.gui_name in d.get('available_for', [])
×
316
                if not gui_good:
×
317
                    continue
×
318
                if 'fullname' not in d:
×
319
                    continue
×
320
                details = d.get('registers_keystore')
×
321
                if details:
×
322
                    self.register_keystore(name, gui_good, details)
×
323
            if external:
×
324
                self.external_plugin_metadata[name] = d
×
325
            else:
326
                self.internal_plugin_metadata[name] = d
×
327

328
    def get(self, name):
4✔
329
        return self.plugins.get(name)
×
330

331
    def count(self):
4✔
332
        return len(self.plugins)
×
333

334
    def load_plugin(self, name) -> 'BasePlugin':
4✔
335
        """Imports the code of the given plugin.
336
        note: can be called from any thread.
337
        """
338
        if self.get_metadata(name):
4✔
339
            return self.load_plugin_by_name(name)
4✔
340
        else:
341
            raise Exception(f"could not find plugin {name!r}")
×
342

343
    def maybe_load_plugin_init_method(self, name: str) -> None:
4✔
344
        """Loads the __init__.py module of the plugin if it is not already loaded."""
345
        is_external = name in self.external_plugin_metadata
4✔
346
        base_name = ('electrum_external_plugins.' if is_external else 'electrum.plugins.') + name
4✔
347
        if base_name not in sys.modules:
4✔
348
            metadata = self.get_metadata(name)
4✔
349
            is_zip = metadata.get('is_zip', False)
4✔
350
            # if the plugin was not enabled on startup the init module hasn't been loaded yet
351
            if not is_zip:
4✔
352
                if is_external:
4✔
353
                    # this branch is deprecated: external plugins are always zip files
354
                    path = os.path.join(metadata['path'], '__init__.py')
×
355
                    init_spec = importlib.util.spec_from_file_location(base_name, path)
×
356
                else:
357
                    init_spec = importlib.util.find_spec(base_name)
4✔
358
            else:
359
                zipfile = zipimport.zipimporter(metadata['path'])
×
360
                dirname = metadata['dirname']
×
361
                init_spec = zipfile.find_spec(dirname)
×
362

363
            self.exec_module_from_spec(init_spec, base_name)
4✔
364

365
    def load_plugin_by_name(self, name: str) -> 'BasePlugin':
4✔
366
        if name in self.plugins:
4✔
367
            return self.plugins[name]
4✔
368
        # if the plugin was not enabled on startup the init module hasn't been loaded yet
369
        self.maybe_load_plugin_init_method(name)
4✔
370
        is_external = name in self.external_plugin_metadata
4✔
371
        if is_external and not self.is_authorized(name):
4✔
372
            self.logger.info(f'plugin not authorized {name}')
×
373
            return
×
374
        if not is_external:
4✔
375
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
4✔
376
        else:
377
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
×
378

379
        spec = importlib.util.find_spec(full_name)
4✔
380
        if spec is None:
4✔
381
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
382
        try:
4✔
383
            module = self.exec_module_from_spec(spec, full_name)
4✔
384
            plugin = module.Plugin(self, self.config, name)
4✔
385
        except Exception as e:
×
386
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
×
387
        self.add_jobs(plugin.thread_jobs())
4✔
388
        self.plugins[name] = plugin
4✔
389
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
4✔
390
        return plugin
4✔
391

392
    def close_plugin(self, plugin):
4✔
393
        self.remove_jobs(plugin.thread_jobs())
×
394

395
    def derive_privkey(self, pw: str, salt:bytes) -> ECPrivkey:
4✔
396
        from hashlib import pbkdf2_hmac
×
397
        secret = pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations=10**5)
×
398
        return ECPrivkey(secret)
×
399

400
    def install_internal_plugin(self, name):
4✔
401
        self.config.set_key(f'plugins.{name}.enabled', [])
×
402

403
    def install_external_plugin(self, name, path, privkey, manifest):
4✔
404
        self.external_plugin_metadata[name] = manifest
×
405
        self.authorize_plugin(name, path, privkey)
×
406

407
    def uninstall(self, name: str):
4✔
408
        self.config.set_key(f'plugins.{name}', None)
×
409
        if name in self.external_plugin_metadata:
×
410
            zipfile = self.zip_plugin_path(name)
×
411
            os.unlink(zipfile)
×
412
            self.external_plugin_metadata.pop(name)
×
413

414
    def is_internal(self, name) -> bool:
4✔
415
        return name in self.internal_plugin_metadata
×
416

417
    def is_auto_loaded(self, name):
4✔
418
        metadata = self.external_plugin_metadata.get(name) or self.internal_plugin_metadata.get(name)
×
419
        return metadata and (metadata.get('registers_keystore') or metadata.get('registers_wallet_type'))
×
420

421
    def is_installed(self, name) -> bool:
4✔
422
        """an external plugin may be installed but not authorized """
423
        return (name in self.internal_plugin_metadata and self.config.get(f'plugins.{name}'))\
×
424
            or name in self.external_plugin_metadata
425

426
    def is_authorized(self, name) -> bool:
4✔
427
        if name in self.internal_plugin_metadata:
×
428
            return True
×
429
        if name not in self.external_plugin_metadata:
×
430
            return False
×
431
        pubkey_bytes, salt = self.get_pubkey_bytes()
×
432
        if not pubkey_bytes:
×
433
            return False
×
434
        if not self.is_plugin_zip(name):
×
435
            return False
×
436
        filename = self.zip_plugin_path(name)
×
437
        plugin_hash = get_file_hash256(filename)
×
438
        sig = self.config.get(f'plugins.{name}.authorized')
×
439
        if not sig:
×
440
            return False
×
441
        pubkey = ECPubkey(pubkey_bytes)
×
442
        return pubkey.ecdsa_verify(bytes.fromhex(sig), plugin_hash)
×
443

444
    def authorize_plugin(self, name: str, filename, privkey: ECPrivkey):
4✔
445
        pubkey_bytes, salt = self.get_pubkey_bytes()
×
446
        assert pubkey_bytes == privkey.get_public_key_bytes()
×
447
        plugin_hash = get_file_hash256(filename)
×
448
        sig = privkey.ecdsa_sign(plugin_hash)
×
449
        value = sig.hex()
×
450
        self.config.set_key(f'plugins.{name}.authorized', value, save=True)
×
451

452
    def enable(self, name: str) -> 'BasePlugin':
4✔
453
        self.config.enable_plugin(name)
×
454
        p = self.get(name)
×
455
        if p:
×
456
            return p
×
457
        return self.load_plugin(name)
×
458

459
    def disable(self, name: str) -> None:
4✔
460
        self.config.disable_plugin(name)
×
461
        p = self.get(name)
×
462
        if not p:
×
463
            return
×
464
        self.plugins.pop(name)
×
465
        p.close()
×
466
        self.logger.info(f"closed {name}")
×
467

468
    @classmethod
4✔
469
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
4✔
470
        return key.startswith('plugins.')
×
471

472
    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
4✔
473
        d = self.descriptions.get(name)
×
474
        if not d:
×
475
            return False
×
476
        deps = d.get('requires', [])
×
477
        for dep, s in deps:
×
478
            try:
×
479
                __import__(dep)
×
480
            except ImportError as e:
×
481
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
×
482
                return False
×
483
        requires = d.get('requires_wallet_type', [])
×
484
        return not requires or wallet.wallet_type in requires
×
485

486
    def get_hardware_support(self):
4✔
487
        out = []
×
488
        for name, (gui_good, details) in self.hw_wallets.items():
×
489
            if gui_good:
×
490
                try:
×
491
                    p = self.get_plugin(name)
×
492
                    if p.is_available():
×
493
                        out.append(HardwarePluginToScan(name=name,
×
494
                                                        description=details[2],
495
                                                        plugin=p,
496
                                                        exception=None))
497
                except Exception as e:
×
498
                    self.logger.exception(f"cannot load plugin for: {name}")
×
499
                    out.append(HardwarePluginToScan(name=name,
×
500
                                                    description=details[2],
501
                                                    plugin=None,
502
                                                    exception=e))
503
        return out
×
504

505
    def register_wallet_type(self, name, gui_good, wallet_type):
4✔
506
        from .wallet import register_wallet_type, register_constructor
4✔
507
        self.logger.info(f"registering wallet type {(wallet_type, name)}")
4✔
508

509
        def loader():
4✔
510
            plugin = self.get_plugin(name)
4✔
511
            register_constructor(wallet_type, plugin.wallet_class)
4✔
512
        register_wallet_type(wallet_type)
4✔
513
        plugin_loaders[wallet_type] = loader
4✔
514

515
    def register_keystore(self, name, gui_good, details):
4✔
516
        from .keystore import register_keystore
4✔
517

518
        def dynamic_constructor(d):
4✔
519
            return self.get_plugin(name).keystore_class(d)
4✔
520
        if details[0] == 'hardware':
4✔
521
            self.hw_wallets[name] = (gui_good, details)
4✔
522
            self.logger.info(f"registering hardware {name}: {details}")
4✔
523
            register_keystore(details[1], dynamic_constructor)
4✔
524

525
    def get_plugin(self, name: str) -> 'BasePlugin':
4✔
526
        if name not in self.plugins:
4✔
527
            self.load_plugin(name)
4✔
528
        return self.plugins[name]
4✔
529

530
    def is_plugin_zip(self, name: str) -> bool:
4✔
531
        """Returns True if the plugin is a zip file"""
532
        if (metadata := self.get_metadata(name)) is None:
×
533
            return False
×
534
        return metadata.get('is_zip', False)
×
535

536
    def get_metadata(self, name: str) -> Optional[dict]:
4✔
537
        """Returns the metadata of the plugin"""
538
        metadata = self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name)
4✔
539
        if not metadata:
4✔
540
            return None
×
541
        return metadata
4✔
542

543
    def run(self):
4✔
544
        while self.is_running():
4✔
545
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
4✔
546
            self.run_jobs()
4✔
547
        self.on_stop()
4✔
548

549
    def read_file(self, name: str, filename: str) -> bytes:
4✔
550
        if self.is_plugin_zip(name):
×
551
            plugin_filename = self.zip_plugin_path(name)
×
552
            metadata = self.external_plugin_metadata[name]
×
553
            dirname = metadata['dirname']
×
554
            with zipfile_lib.ZipFile(plugin_filename) as myzip:
×
555
                with myzip.open(os.path.join(dirname, filename)) as myfile:
×
556
                    return myfile.read()
×
557
        else:
558
            assert name in self.internal_plugin_metadata
×
559
            path = os.path.join(os.path.dirname(__file__), 'plugins', name, filename)
×
560
            with open(path, 'rb') as myfile:
×
561
                return myfile.read()
×
562

563
def get_file_hash256(path: str) -> bytes:
4✔
564
    '''Get the sha256 hash of a file, similar to `sha256sum`.'''
565
    with open(path, 'rb') as f:
×
566
        return sha256(f.read())
×
567

568

569
def hook(func):
4✔
570
    hook_names.add(func.__name__)
4✔
571
    return func
4✔
572

573

574
def run_hook(name, *args):
4✔
575
    results = []
4✔
576
    f_list = hooks.get(name, [])
4✔
577
    for p, f in f_list:
4✔
578
        if p.is_enabled():
4✔
579
            try:
4✔
580
                r = f(*args)
4✔
581
            except Exception:
×
582
                _logger.exception(f"Plugin error. plugin: {p}, hook: {name}")
×
583
                r = False
×
584
            if r:
4✔
585
                results.append(r)
×
586

587
    if results:
4✔
588
        assert len(results) == 1, results
×
589
        return results[0]
×
590

591

592
class BasePlugin(Logger):
4✔
593

594
    def __init__(self, parent, config: 'SimpleConfig', name):
4✔
595
        self.parent = parent  # type: Plugins  # The plugins object
4✔
596
        self.name = name
4✔
597
        self.config = config
4✔
598
        Logger.__init__(self)
4✔
599
        # add self to hooks
600
        for k in dir(self):
4✔
601
            if k in hook_names:
4✔
602
                l = hooks.get(k, [])
4✔
603
                l.append((self, getattr(self, k)))
4✔
604
                hooks[k] = l
4✔
605

606
    def __str__(self):
4✔
607
        return self.name
×
608

609
    def close(self):
4✔
610
        # remove self from hooks
611
        for attr_name in dir(self):
×
612
            if attr_name in hook_names:
×
613
                # found attribute in self that is also the name of a hook
614
                l = hooks.get(attr_name, [])
×
615
                try:
×
616
                    l.remove((self, getattr(self, attr_name)))
×
617
                except ValueError:
×
618
                    # maybe attr name just collided with hook name and was not hook
619
                    continue
×
620
                hooks[attr_name] = l
×
621
        self.parent.close_plugin(self)
×
622
        self.on_close()
×
623

624
    def on_close(self):
4✔
625
        pass
×
626

627
    def requires_settings(self) -> bool:
4✔
628
        return False
×
629

630
    def thread_jobs(self):
4✔
631
        return []
4✔
632

633
    def is_enabled(self):
4✔
634
        if not self.is_available():
×
635
            return False
×
636
        return self.config.is_plugin_enabled(self.name)
×
637

638
    def is_available(self):
4✔
639
        return True
×
640

641
    def can_user_disable(self):
4✔
642
        return True
×
643

644
    def settings_widget(self, window):
4✔
645
        raise NotImplementedError()
×
646

647
    def settings_dialog(self, window):
4✔
648
        raise NotImplementedError()
×
649

650
    def read_file(self, filename: str) -> bytes:
4✔
651
        return self.parent.read_file(self.name, filename)
×
652

653

654
class DeviceUnpairableError(UserFacingException): pass
4✔
655
class HardwarePluginLibraryUnavailable(Exception): pass
4✔
656
class CannotAutoSelectDevice(Exception): pass
4✔
657

658

659
class Device(NamedTuple):
4✔
660
    path: Union[str, bytes]
4✔
661
    interface_number: int
4✔
662
    id_: str
4✔
663
    product_key: Any   # when using hid, often Tuple[int, int]
4✔
664
    usage_page: int
4✔
665
    transport_ui_string: str
4✔
666

667

668
class DeviceInfo(NamedTuple):
4✔
669
    device: Device
4✔
670
    label: Optional[str] = None
4✔
671
    initialized: Optional[bool] = None
4✔
672
    exception: Optional[Exception] = None
4✔
673
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
4✔
674
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
4✔
675
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
4✔
676

677

678
class HardwarePluginToScan(NamedTuple):
4✔
679
    name: str
4✔
680
    description: str
4✔
681
    plugin: Optional['HW_PluginBase']
4✔
682
    exception: Optional[Exception]
4✔
683

684

685
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
4✔
686

687

688
# hidapi is not thread-safe
689
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
690
#     https://github.com/libusb/hidapi/issues/45
691
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
692
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
693
# It is not entirely clear to me, exactly what is safe and what isn't, when
694
# using multiple threads...
695
# Hence, we use a single thread for all device communications, including
696
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
697
# the following thread:
698
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
4✔
699
    max_workers=1,
700
    thread_name_prefix='hwd_comms_thread'
701
)
702

703
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
704
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
705
# To keep it simple, let's just import it now, as we are likely in the main thread here.
706
if threading.current_thread() is not threading.main_thread():
4✔
707
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
×
708
try:
4✔
709
    import hid
4✔
710
except ImportError:
4✔
711
    pass
4✔
712

713

714
T = TypeVar('T')
4✔
715

716

717
def run_in_hwd_thread(func: Callable[[], T]) -> T:
4✔
718
    if threading.current_thread().name.startswith("hwd_comms_thread"):
×
719
        return func()
×
720
    else:
721
        fut = _hwd_comms_executor.submit(func)
×
722
        return fut.result()
×
723
        #except (concurrent.futures.CancelledError, concurrent.futures.TimeoutError) as e:
724

725

726
def runs_in_hwd_thread(func):
4✔
727
    @wraps(func)
4✔
728
    def wrapper(*args, **kwargs):
4✔
729
        return run_in_hwd_thread(partial(func, *args, **kwargs))
×
730
    return wrapper
4✔
731

732

733
def assert_runs_in_hwd_thread():
4✔
734
    if not threading.current_thread().name.startswith("hwd_comms_thread"):
×
735
        raise Exception("must only be called from HWD communication thread")
×
736

737

738
class DeviceMgr(ThreadJob):
4✔
739
    """Manages hardware clients.  A client communicates over a hardware
740
    channel with the device.
741

742
    In addition to tracking device HID IDs, the device manager tracks
743
    hardware wallets and manages wallet pairing.  A HID ID may be
744
    paired with a wallet when it is confirmed that the hardware device
745
    matches the wallet, i.e. they have the same master public key.  A
746
    HID ID can be unpaired if e.g. it is wiped.
747

748
    Because of hotplugging, a wallet must request its client
749
    dynamically each time it is required, rather than caching it
750
    itself.
751

752
    The device manager is shared across plugins, so just one place
753
    does hardware scans when needed.  By tracking HID IDs, if a device
754
    is plugged into a different port the wallet is automatically
755
    re-paired.
756

757
    Wallets are informed on connect / disconnect events.  It must
758
    implement connected(), disconnected() callbacks.  Being connected
759
    implies a pairing.  Callbacks can happen in any thread context,
760
    and we do them without holding the lock.
761

762
    Confusingly, the HID ID (serial number) reported by the HID system
763
    doesn't match the device ID reported by the device itself.  We use
764
    the HID IDs.
765

766
    This plugin is thread-safe.  Currently only devices supported by
767
    hidapi are implemented."""
768

769
    def __init__(self, config: SimpleConfig):
4✔
770
        ThreadJob.__init__(self)
4✔
771
        # A pairing_code->id_ map. Item only present if we have active pairing. Needs self.lock.
772
        self.pairing_code_to_id = {}  # type: Dict[str, str]
4✔
773
        # A client->id_ map. Needs self.lock.
774
        self.clients = {}  # type: Dict[HardwareClientBase, str]
4✔
775
        # What we recognise.  (vendor_id, product_id) -> Plugin
776
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
4✔
777
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]  # vendor_id -> Plugin
4✔
778
        # Custom enumerate functions for devices we don't know about.
779
        self._enumerate_func = set()  # Needs self.lock.
4✔
780

781
        self.lock = threading.RLock()
4✔
782

783
        self.config = config
4✔
784

785
    def thread_jobs(self):
4✔
786
        # Thread job to handle device timeouts
787
        return [self]
4✔
788

789
    def run(self):
4✔
790
        '''Handle device timeouts.  Runs in the context of the Plugins
791
        thread.'''
792
        with self.lock:
4✔
793
            clients = list(self.clients.keys())
4✔
794
        cutoff = time.time() - self.config.get_session_timeout()
4✔
795
        for client in clients:
4✔
796
            client.timeout(cutoff)
×
797

798
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
4✔
799
        for pair in device_pairs:
×
800
            self._recognised_hardware[pair] = plugin
×
801

802
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
4✔
803
        for vendor_id in vendor_ids:
×
804
            self._recognised_vendor[vendor_id] = plugin
×
805

806
    def register_enumerate_func(self, func):
4✔
807
        with self.lock:
×
808
            self._enumerate_func.add(func)
×
809

810
    @runs_in_hwd_thread
4✔
811
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
4✔
812
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
813
        # Get from cache first
814
        client = self._client_by_id(device.id_)
×
815
        if client:
×
816
            return client
×
817
        client = plugin.create_client(device, handler)
×
818
        if client:
×
819
            self.logger.info(f"Registering {client}")
×
820
            with self.lock:
×
821
                self.clients[client] = device.id_
×
822
        return client
×
823

824
    def id_by_pairing_code(self, pairing_code):
4✔
825
        with self.lock:
×
826
            return self.pairing_code_to_id.get(pairing_code)
×
827

828
    def pairing_code_by_id(self, id_):
4✔
829
        with self.lock:
×
830
            for pairing_code, id2 in self.pairing_code_to_id.items():
×
831
                if id2 == id_:
×
832
                    return pairing_code
×
833
            return None
×
834

835
    def unpair_pairing_code(self, pairing_code):
4✔
836
        with self.lock:
×
837
            if pairing_code not in self.pairing_code_to_id:
×
838
                return
×
839
            _id = self.pairing_code_to_id.pop(pairing_code)
×
840
        self._close_client(_id)
×
841

842
    def unpair_id(self, id_):
4✔
843
        pairing_code = self.pairing_code_by_id(id_)
×
844
        if pairing_code:
×
845
            self.unpair_pairing_code(pairing_code)
×
846
        else:
847
            self._close_client(id_)
×
848

849
    def _close_client(self, id_):
4✔
850
        with self.lock:
×
851
            client = self._client_by_id(id_)
×
852
            self.clients.pop(client, None)
×
853
        if client:
×
854
            client.close()
×
855

856
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
4✔
857
        with self.lock:
×
858
            for client, client_id in self.clients.items():
×
859
                if client_id == id_:
×
860
                    return client
×
861
        return None
×
862

863
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
4✔
864
        '''Returns a client for the device ID if one is registered.  If
865
        a device is wiped or in bootloader mode pairing is impossible;
866
        in such cases we communicate by device ID and not wallet.'''
867
        if scan_now:
×
868
            self.scan_devices()
×
869
        return self._client_by_id(id_)
×
870

871
    @runs_in_hwd_thread
4✔
872
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
4✔
873
                            keystore: 'Hardware_KeyStore',
874
                            force_pair: bool, *,
875
                            devices: Sequence['Device'] = None,
876
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
877
        self.logger.info("getting client for keystore")
×
878
        if handler is None:
×
879
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
×
880
        handler.update_status(False)
×
881
        pcode = keystore.pairing_code()
×
882
        client = None
×
883
        # search existing clients first (fast-path)
884
        if not devices:
×
885
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=[])
×
886
        # search clients again, now allowing a (slow) scan
887
        if client is None:
×
888
            if devices is None:
×
889
                devices = self.scan_devices()
×
890
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=devices)
×
891
        if client is None and force_pair:
×
892
            try:
×
893
                info = self.select_device(plugin, handler, keystore, devices,
×
894
                                          allow_user_interaction=allow_user_interaction)
895
            except CannotAutoSelectDevice:
×
896
                pass
×
897
            else:
898
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
×
899
        if client:
×
900
            handler.update_status(True)
×
901
            # note: if select_device was called, we might also update label etc here:
902
            keystore.opportunistically_fill_in_missing_info_from_device(client)
×
903
        self.logger.info("end client for keystore")
×
904
        return client
×
905

906
    def client_by_pairing_code(
4✔
907
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
908
        devices: Sequence['Device'],
909
    ) -> Optional['HardwareClientBase']:
910
        _id = self.id_by_pairing_code(pairing_code)
×
911
        client = self._client_by_id(_id)
×
912
        if client:
×
913
            if type(client.plugin) != type(plugin):
×
914
                return
×
915
            # An unpaired client might have another wallet's handler
916
            # from a prior scan.  Replace to fix dialog parenting.
917
            client.handler = handler
×
918
            return client
×
919

920
        for device in devices:
×
921
            if device.id_ == _id:
×
922
                return self.create_client(device, handler, plugin)
×
923

924
    def force_pair_keystore(
4✔
925
        self,
926
        *,
927
        plugin: 'HW_PluginBase',
928
        handler: 'HardwareHandlerBase',
929
        info: 'DeviceInfo',
930
        keystore: 'Hardware_KeyStore',
931
    ) -> 'HardwareClientBase':
932
        xpub = keystore.xpub
×
933
        derivation = keystore.get_derivation_prefix()
×
934
        assert derivation is not None
×
935
        xtype = bip32.xpub_type(xpub)
×
936
        client = self._client_by_id(info.device.id_)
×
937
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
×
938
            # See comment above for same code
939
            client.handler = handler
×
940
            # This will trigger a PIN/passphrase entry request
941
            try:
×
942
                client_xpub = client.get_xpub(derivation, xtype)
×
943
            except (UserCancelled, RuntimeError):
×
944
                # Bad / cancelled PIN / passphrase
945
                client_xpub = None
×
946
            if client_xpub == xpub:
×
947
                keystore.opportunistically_fill_in_missing_info_from_device(client)
×
948
                with self.lock:
×
949
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
×
950
                return client
×
951

952
        # The user input has wrong PIN or passphrase, or cancelled input,
953
        # or it is not pairable
954
        raise DeviceUnpairableError(
×
955
            _('Electrum cannot pair with your {}.\n\n'
956
              'Before you request bitcoins to be sent to addresses in this '
957
              'wallet, ensure you can pair with your device, or that you have '
958
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
959
              'receive will be unspendable.').format(plugin.device))
960

961
    def list_pairable_device_infos(
4✔
962
        self,
963
        *,
964
        handler: Optional['HardwareHandlerBase'],
965
        plugin: 'HW_PluginBase',
966
        devices: Sequence['Device'] = None,
967
        include_failing_clients: bool = False,
968
    ) -> List['DeviceInfo']:
969
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
970
        Already paired devices are also included, as it is okay to reuse them.
971
        """
972
        if not plugin.libraries_available:
×
973
            message = plugin.get_library_not_available_message()
×
974
            raise HardwarePluginLibraryUnavailable(message)
×
975
        if devices is None:
×
976
            devices = self.scan_devices()
×
977
        infos = []
×
978
        for device in devices:
×
979
            if not plugin.can_recognize_device(device):
×
980
                continue
×
981
            try:
×
982
                client = self.create_client(device, handler, plugin)
×
983
                if not client:
×
984
                    continue
×
985
                label = client.label()
×
986
                is_initialized = client.is_initialized()
×
987
                soft_device_id = client.get_soft_device_id()
×
988
                model_name = client.device_model_name()
×
989
            except Exception as e:
×
990
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
×
991
                if include_failing_clients:
×
992
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
×
993
                continue
×
994
            infos.append(DeviceInfo(device=device,
×
995
                                    label=label,
996
                                    initialized=is_initialized,
997
                                    plugin_name=plugin.name,
998
                                    soft_device_id=soft_device_id,
999
                                    model_name=model_name))
1000

1001
        return infos
×
1002

1003
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
4✔
1004
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
1005
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
1006
        """Select the device to use for keystore."""
1007
        # ideally this should not be called from the GUI thread...
1008
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
1009
        while True:
×
1010
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
×
1011
            if infos:
×
1012
                break
×
1013
            if not allow_user_interaction:
×
1014
                raise CannotAutoSelectDevice()
×
1015
            msg = _('Please insert your {}').format(plugin.device)
×
1016
            msg += " ("
×
1017
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
×
1018
                msg += f"label: {keystore.label}, "
×
1019
            msg += f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}"
×
1020
            msg += ').\n\n{}\n\n{}'.format(
×
1021
                _('Verify the cable is connected and that '
1022
                  'no other application is using it.'),
1023
                _('Try to connect again?')
1024
            )
1025
            if not handler.yes_no_question(msg):
×
1026
                raise UserCancelled()
×
1027
            devices = None
×
1028

1029
        # select device automatically. (but only if we have reasonable expectation it is the correct one)
1030
        # method 1: select device by id
1031
        if keystore.soft_device_id:
×
1032
            for info in infos:
×
1033
                if info.soft_device_id == keystore.soft_device_id:
×
1034
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
×
1035
                    return info
×
1036
        # method 2: select device by label
1037
        #           but only if not a placeholder label and only if there is no collision
1038
        device_labels = [info.label for info in infos]
×
1039
        if (keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
×
1040
                and device_labels.count(keystore.label) == 1):
1041
            for info in infos:
×
1042
                if info.label == keystore.label:
×
1043
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
×
1044
                    return info
×
1045
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
1046
        #           saved for keystore anyway, select it
1047
        if (len(infos) == 1
×
1048
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
1049
                and keystore.soft_device_id is None):
1050
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
×
1051
            return infos[0]
×
1052

1053
        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
×
1054
        if not allow_user_interaction:
×
1055
            raise CannotAutoSelectDevice()
×
1056
        # ask user to select device manually
1057
        msg = (
×
1058
                _("Could not automatically pair with device for given keystore.") + "\n"
1059
                + f"(keystore label: {keystore.label!r}, "
1060
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
1061
        msg += _("Please select which {} device to use:").format(plugin.device)
×
1062
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
×
1063
        descriptions = ["{label} ({maybe_model}{init}, {transport})"
×
1064
                        .format(label=info.label or _("An unnamed {}").format(info.plugin_name),
1065
                                init=(_("initialized") if info.initialized else _("wiped")),
1066
                                transport=info.device.transport_ui_string,
1067
                                maybe_model=f"{info.model_name}, " if info.model_name else "")
1068
                        for info in infos]
1069
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
×
1070
                          f"num options: {len(infos)}. options: {infos}")
1071
        c = handler.query_choice(msg, descriptions)
×
1072
        if c is None:
×
1073
            raise UserCancelled()
×
1074
        info = infos[c]
×
1075
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
×
1076
        # note: updated label/soft_device_id will be saved after pairing succeeds
1077
        return info
×
1078

1079
    @runs_in_hwd_thread
4✔
1080
    def _scan_devices_with_hid(self) -> List['Device']:
4✔
1081
        try:
×
1082
            import hid  # noqa: F811
×
1083
        except ImportError:
×
1084
            return []
×
1085

1086
        devices = []
×
1087
        for d in hid.enumerate(0, 0):
×
1088
            vendor_id = d['vendor_id']
×
1089
            product_key = (vendor_id, d['product_id'])
×
1090
            plugin = None
×
1091
            if product_key in self._recognised_hardware:
×
1092
                plugin = self._recognised_hardware[product_key]
×
1093
            elif vendor_id in self._recognised_vendor:
×
1094
                plugin = self._recognised_vendor[vendor_id]
×
1095
            if plugin:
×
1096
                device = plugin.create_device_from_hid_enumeration(d, product_key=product_key)
×
1097
                if device:
×
1098
                    devices.append(device)
×
1099
        return devices
×
1100

1101
    @runs_in_hwd_thread
4✔
1102
    @profiler
4✔
1103
    def scan_devices(self) -> Sequence['Device']:
4✔
1104
        self.logger.info("scanning devices...")
×
1105

1106
        # First see what's connected that we know about
1107
        devices = self._scan_devices_with_hid()
×
1108

1109
        # Let plugin handlers enumerate devices we don't know about
1110
        with self.lock:
×
1111
            enumerate_funcs = list(self._enumerate_func)
×
1112
        for f in enumerate_funcs:
×
1113
            try:
×
1114
                new_devices = f()
×
1115
            except BaseException as e:
×
1116
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
×
1117
            else:
1118
                devices.extend(new_devices)
×
1119

1120
        # find out what was disconnected
1121
        client_ids = [dev.id_ for dev in devices]
×
1122
        disconnected_clients = []
×
1123
        with self.lock:
×
1124
            connected = {}
×
1125
            for client, id_ in self.clients.items():
×
1126
                if id_ in client_ids and client.has_usable_connection_with_device():
×
1127
                    connected[client] = id_
×
1128
                else:
1129
                    disconnected_clients.append((client, id_))
×
1130
            self.clients = connected
×
1131

1132
        # Unpair disconnected devices
1133
        for client, id_ in disconnected_clients:
×
1134
            self.unpair_id(id_)
×
1135
            if client.handler:
×
1136
                client.handler.update_status(False)
×
1137

1138
        return devices
×
1139

1140
    @classmethod
4✔
1141
    def version_info(cls) -> Mapping[str, Optional[str]]:
4✔
1142
        ret = {}
×
1143
        # add libusb
1144
        try:
×
1145
            import usb1
×
1146
        except Exception as e:
×
1147
            ret["libusb.version"] = None
×
1148
        else:
1149
            ret["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
×
1150
            try:
×
1151
                ret["libusb.path"] = usb1.libusb1.libusb._name
×
1152
            except AttributeError:
×
1153
                ret["libusb.path"] = None
×
1154
        # add hidapi
1155
        try:
×
1156
            import hid  # noqa: F811
×
1157
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
×
1158
        except Exception as e:
×
1159
            from importlib.metadata import version
×
1160
            try:
×
1161
                ret["hidapi.version"] = version("hidapi")
×
1162
            except ImportError:
×
1163
                ret["hidapi.version"] = None
×
1164
        return ret
×
1165

1166
    def trigger_pairings(
4✔
1167
            self,
1168
            keystores: Sequence['KeyStore'],
1169
            *,
1170
            allow_user_interaction: bool = True,
1171
            devices: Sequence['Device'] = None,
1172
    ) -> None:
1173
        """Given a list of keystores, try to pair each with a connected hardware device.
1174

1175
        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
1176
        try to pair each keystore individually. Consider the following scenario:
1177
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
1178
        - assume none of the devices are paired yet
1179
        1. if we tried to individually pair keystores, we might try with ks1 first
1180
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
1181
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
1182
             which is confusing and error-prone. It's especially problematic if the hw device does
1183
             not support labels (such as Ledger), as then the user cannot easily distinguish
1184
             same-type devices. (see #4199)
1185
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
1186
           and then tell the user ks1 could not be paired (and there are no devices left to try)
1187
        """
1188
        from .keystore import Hardware_KeyStore
×
1189
        keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
×
1190
        if not keystores:
×
1191
            return
×
1192
        if devices is None:
×
1193
            devices = self.scan_devices()
×
1194
        # first pair with all devices that can be auto-selected
1195
        for ks in keystores:
×
1196
            try:
×
1197
                ks.get_client(
×
1198
                    force_pair=True,
1199
                    allow_user_interaction=False,
1200
                    devices=devices,
1201
                )
1202
            except UserCancelled:
×
1203
                pass
×
1204
        if allow_user_interaction:
×
1205
            # now do manual selections
1206
            for ks in keystores:
×
1207
                try:
×
1208
                    ks.get_client(
×
1209
                        force_pair=True,
1210
                        allow_user_interaction=True,
1211
                        devices=devices,
1212
                    )
1213
                except UserCancelled:
×
1214
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc