• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5987722331947008

30 Apr 2025 11:52AM UTC coverage: 60.212% (-0.03%) from 60.243%
5987722331947008

push

CirrusCI

web-flow
Merge pull request #9769 from f321x/fix_plugins_dialog_bad_state

fix: PluginsDialog bugs causing exceptions

1 of 3 new or added lines in 1 file covered. (33.33%)

28 existing lines in 7 files now uncovered.

21615 of 35898 relevant lines covered (60.21%)

3.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.5
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import json
5✔
27
import os
5✔
28
import pkgutil
5✔
29
import importlib.util
5✔
30
import time
5✔
31
import threading
5✔
32
import sys
5✔
33
import aiohttp
5✔
34
import zipfile as zipfile_lib
5✔
35
from urllib.parse import urlparse
5✔
36

37
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
5✔
38
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
39
import concurrent
5✔
40
import zipimport
5✔
41
from functools import wraps, partial
5✔
42
from itertools import chain
5✔
43

44
from electrum_ecc import ECPrivkey, ECPubkey
5✔
45

46
from ._vendor.distutils.version import StrictVersion
5✔
47
from .version import ELECTRUM_VERSION
5✔
48
from .i18n import _
5✔
49
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException)
5✔
50
from . import bip32
5✔
51
from . import plugins
5✔
52
from .simple_config import SimpleConfig
5✔
53
from .logging import get_logger, Logger
5✔
54
from .crypto import sha256
5✔
55

56
if TYPE_CHECKING:
5✔
57
    from .hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
58
    from .keystore import Hardware_KeyStore, KeyStore
×
59
    from .wallet import Abstract_Wallet
×
60

61

62
_logger = get_logger(__name__)
# wallet_type -> zero-argument callable that loads the owning plugin and registers
# its wallet constructor (filled in by Plugins.register_wallet_type)
plugin_loaders = {}
# names of all functions that were decorated with @hook
hook_names = set()
# hook name -> list of (plugin instance, bound method); maintained by BasePlugin
hooks = {}
# module name -> exception raised the first time executing that module failed;
# consulted by Plugins.exec_module_from_spec so that a failed import is never retried
_exec_module_failure = {}  # type: Dict[str, Exception]

# version byte that prefixes the serialized plugin-authorization key
PLUGIN_PASSWORD_VERSION = 1
5✔
69

70

71
class Plugins(DaemonThread):
5✔
72

73
    LOGGING_SHORTCUT = 'p'
    # directory of the built-in ("internal") plugins package
    pkgpath = os.path.dirname(plugins.__file__)
    # file holding the plugin-authorization key on linux-like platforms
    keyfile_linux = '/etc/electrum/plugins_key'
    # location reported to the user on Windows (the key itself is read via winreg)
    keyfile_windows = 'C:\\HKEY_LOCAL_MACHINE\\SOFTWARE\\Electrum\\PluginsKey'
5✔
77

78
    @profiler
    def __init__(self, config: SimpleConfig, gui_name: Optional[str] = None, cmd_only: bool = False):
        """Discover the available plugins and load the enabled ones.

        :param config: configuration used to look up the per-plugin settings.
        :param gui_name: name of the running GUI; only plugins whose manifest lists
            it under 'available_for' are considered (not used when cmd_only is set).
        :param cmd_only: if True, only the __init__ modules of enabled plugins are
            imported; neither the device manager nor the daemon thread is created.
        """
        self.config = config
        self.cmd_only = cmd_only  # type: bool
        self.internal_plugin_metadata = {}
        self.external_plugin_metadata = {}
        if cmd_only:
            # only import the command modules of plugins
            Logger.__init__(self)
            self.find_plugins()
            self.load_plugins()
            return
        DaemonThread.__init__(self)
        self.device_manager = DeviceMgr(config)
        self.name = 'Plugins'  # set name of thread
        self.hw_wallets = {}
        self.plugins = {}  # type: Dict[str, BasePlugin]
        self.gui_name = gui_name
        self.find_plugins()
        self.load_plugins()
        self.add_jobs(self.device_manager.thread_jobs())
        self.start()
5✔
100

101
    @property
    def descriptions(self):
        """Merged metadata of all known plugins, as a new dict keyed by plugin name.

        If a name were present in both dicts, the external entry would win.
        """
        return {**self.internal_plugin_metadata, **self.external_plugin_metadata}
×
104

105
    def find_directory_plugins(self, pkg_path: str, external: bool):
        """Finds plugins in directory form from the given pkg_path and populates the metadata dicts.

        :param pkg_path: directory whose sub-packages are candidate plugins.
        :param external: selects which metadata dict receives the manifests found.

        A candidate is skipped if it has no manifest.json, if the manifest lacks
        'fullname', if (in GUI mode) the running GUI is not listed in 'available_for',
        or if (in cmd_only mode) the plugin is not enabled in the config.
        """
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
        for loader, name, _ispkg in iter_modules:
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
            #       once as data and once as code. To honor the "no duplicates" rule below,
            #       we exclude the ones packaged as *code*, here:
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
                continue
            module_path = os.path.join(pkg_path, name)
            if self.cmd_only and self.config.get(f'plugins.{name}.enabled') is not True:
                continue
            try:
                # JSON is UTF-8; do not depend on the locale's default encoding
                with open(os.path.join(module_path, 'manifest.json'), 'r', encoding='utf-8') as f:
                    d = json.load(f)
            except FileNotFoundError:
                self.logger.info(f"could not find manifest.json of plugin {name}, skipping...")
                continue
            if 'fullname' not in d:
                continue
            d['path'] = module_path
            if not self.cmd_only:
                gui_good = self.gui_name in d.get('available_for', [])
                if not gui_good:
                    continue
                details = d.get('registers_wallet_type')
                if details:
                    self.register_wallet_type(name, gui_good, details)
                details = d.get('registers_keystore')
                if details:
                    self.register_keystore(name, gui_good, details)
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
                _logger.info(f"duplicate plugins? for {name=}")
                continue
            if not external:
                self.internal_plugin_metadata[name] = d
            else:
                self.external_plugin_metadata[name] = d
×
144

145
    @staticmethod
    def exec_module_from_spec(spec, path: str):
        """Create and execute the module described by `spec`, registering it in sys.modules.

        :param spec: module spec to load.
        :param path: dotted module name under which the module is put into sys.modules.
        :return: the executed module.
        :raises Exception: if execution fails, or if it already failed once before
            for the same `path` (a failed module is never retried).
        """
        if prev_fail := _exec_module_failure.get(path):
            raise Exception(f"exec_module already failed once before, with: {prev_fail!r}")
        try:
            module = importlib.util.module_from_spec(spec)
            # sys.modules needs to be modified for relative imports to work
            # see https://stackoverflow.com/a/50395128
            sys.modules[path] = module
            spec.loader.exec_module(module)
        except Exception as e:
            # We can't undo all side-effects, but we at least rm the module from sys.modules,
            # so the import system knows it failed. If called again for the same plugin, we do not
            # retry due to potential interactions with not-undone side-effects (e.g. plugin
            # might have defined commands).
            _exec_module_failure[path] = e
            sys.modules.pop(path, None)
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
        return module
5✔
165

166
    def find_plugins(self):
        """Populate the metadata dicts.

        Internal plugins are searched in directory form under `pkgpath`;
        external plugins are searched in zip form in the external plugin directory.
        """
        internal_plugins_path = (self.pkgpath, False)
        external_plugins_path = (self.get_external_plugin_dir(), True)
        for pkg_path, external in (internal_plugins_path, external_plugins_path):
            if not pkg_path or not os.path.exists(pkg_path):
                continue
            if external:
                self.find_zip_plugins(pkg_path=pkg_path, external=external)
            else:
                self.find_directory_plugins(pkg_path=pkg_path, external=external)
5✔
175

176
    def load_plugins(self):
        """Load every discovered plugin that is enabled in the config.

        Plugins whose manifest sets 'requires_wallet_type' are not loaded here.
        A plugin that fails to load is logged and skipped, so that one broken
        plugin cannot prevent the others from loading.
        """
        for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
            if d.get('requires_wallet_type') or not self.config.get(f'plugins.{name}.enabled'):
                continue
            try:
                if self.cmd_only:  # only load init method to register commands
                    self.maybe_load_plugin_init_method(name)
                else:
                    self.load_plugin_by_name(name)
            except BaseException as e:
                self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
186

187
    def _has_root_permissions(self, path):
        """Return True if `path` is owned by uid 0 and is not writable by the current process."""
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
189

190
    def get_keyfile_path(self) -> Tuple[str, str]:
        """Return (location of the plugin key, help text about it) for this platform.

        :raises Exception: on Android, which is not supported.
        """
        if sys.platform in ['windows', 'win32']:
            keyfile_path = self.keyfile_windows
            keyfile_help = _('This file can be edited with Regedit')
        elif 'ANDROID_DATA' in os.environ:
            raise Exception('platform not supported')
        else:
            # treat unknown platforms as linux-like
            keyfile_path = self.keyfile_linux
            keyfile_help = _('The file must have root permissions')
        return keyfile_path, keyfile_help
×
201

202
    def create_new_key(self, password: str) -> str:
        """Derive a new plugin-authorization key from `password`.

        :return: hex encoding of: version byte + 32-byte random salt + public key bytes.
        """
        salt = os.urandom(32)
        privkey = self.derive_privkey(password, salt)
        pubkey = privkey.get_public_key_bytes()
        key = bytes([PLUGIN_PASSWORD_VERSION]) + salt + pubkey
        return key.hex()
×
208

209
    def get_pubkey_bytes(self) -> Tuple[Optional[bytes], Optional[bytes]]:
        """
        returns pubkey, salt
        returns None, None if the pubkey has not been set, cannot be trusted
        (key file without root permissions) or cannot be parsed
        """
        if sys.platform in ['windows', 'win32']:
            import winreg
            with winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE) as hkey:
                try:
                    with winreg.OpenKey(hkey, r"SOFTWARE\\Electrum") as key:
                        key_hex = winreg.QueryValue(key, "PluginsKey")
                except Exception as e:
                    self.logger.info(f'winreg error: {e}')
                    return None, None
        elif 'ANDROID_DATA' in os.environ:
            return None, None
        else:
            # treat unknown platforms as linux-like
            if not os.path.exists(self.keyfile_linux):
                return None, None
            if not self._has_root_permissions(self.keyfile_linux):
                # callers unpack the result as a 2-tuple, so a bare `return` would
                # make them fail with a TypeError instead of "not authorized"
                self.logger.info(f'ignoring plugins key without root permissions: {self.keyfile_linux}')
                return None, None
            with open(self.keyfile_linux) as f:
                key_hex = f.read()
        try:
            key = bytes.fromhex(key_hex)
            version = key[0]
        except (ValueError, IndexError) as e:
            # empty or non-hex content: treat it like an unset key (fails closed)
            self.logger.info(f'could not parse plugins key: {e!r}')
            return None, None
        if version != PLUGIN_PASSWORD_VERSION:
            self.logger.info(f'unknown plugin password version: {version}')
            return None, None
        # all good
        salt = key[1:1+32]
        pubkey = key[1+32:]
        return pubkey, salt
×
242

243
    def get_external_plugin_dir(self) -> str:
        """Return the directory holding external plugins, creating it if needed."""
        pkg_path = os.path.join(self.config.electrum_path(), 'plugins')
        # exist_ok avoids the race between an existence check and the creation
        os.makedirs(pkg_path, exist_ok=True)
        return pkg_path
5✔
248

249
    async def download_external_plugin(self, url: str) -> str:
        """Download the file at `url` into the external plugin directory.

        :return: path of the downloaded file.
        :raises FileExistsError: if a file with the same name is already present.
        :raises Exception: if the server does not answer with HTTP status 200.
        """
        filename = os.path.basename(urlparse(url).path)
        pkg_path = self.get_external_plugin_dir()
        path = os.path.join(pkg_path, filename)
        if os.path.exists(path):
            raise FileExistsError(f"Plugin {filename} already exists at {path}")
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                if resp.status != 200:
                    # do not hand back a path to a file that was never written
                    raise Exception(f"could not download plugin from {url}: HTTP status {resp.status}")
                try:
                    with open(path, 'wb') as fd:
                        async for chunk in resp.content.iter_chunked(64 * 1024):
                            fd.write(chunk)
                except BaseException:
                    # do not leave a truncated archive behind (also on cancellation)
                    try:
                        os.unlink(path)
                    except OSError:
                        pass
                    raise
        return path
×
262

263
    def read_manifest(self, path) -> dict:
        """Return the manifest of the zip plugin at `path` as a dict.

        The parsed manifest.json is extended with the keys 'path' (the zip file),
        'dirname' (directory of the manifest inside the archive), 'is_zip' and
        'zip_hash_sha256' (hex digest of the whole zip file).

        :raises Exception: if the archive contains no manifest.json.
        """
        with zipfile_lib.ZipFile(path) as file:
            # NOTE(review): the first member whose name merely *ends with*
            # 'manifest.json' is used; confirm that looser match is intended.
            for filename in file.namelist():
                if filename.endswith('manifest.json'):
                    break
            else:
                raise Exception('could not find manifest.json in zip archive')
            with file.open(filename, 'r') as f:
                manifest = json.load(f)
                manifest['path'] = path  # external, path of the zipfile
                manifest['dirname'] = os.path.dirname(filename)  # internal
                manifest['is_zip'] = True
                manifest['zip_hash_sha256'] = get_file_hash256(path).hex()
                return manifest
×
278

279
    def zip_plugin_path(self, name) -> str:
        """Return the path of the plugin's file, rebuilt from its base name and the
        directory matching its kind (internal package dir or external plugin dir)."""
        path = self.get_metadata(name)['path']
        filename = os.path.basename(path)
        if name in self.internal_plugin_metadata:
            pkg_path = self.pkgpath
        else:
            pkg_path = self.get_external_plugin_dir()
        return os.path.join(pkg_path, filename)
×
287

288
    def find_zip_plugins(self, pkg_path: str, external: bool):
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts.

        :param pkg_path: directory that is scanned for '*.zip' files.
        :param external: selects which metadata dict receives the manifests found.

        An archive is skipped (and the reason logged) if its manifest cannot be
        read, if its name is already known, or if its version bounds exclude the
        running version or cannot be parsed.
        """
        if pkg_path is None:
            return
        for filename in os.listdir(pkg_path):
            path = os.path.join(pkg_path, filename)
            if not filename.endswith('.zip'):
                continue
            try:
                d = self.read_manifest(path)
                name = d['name']
            except Exception:
                self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
                continue
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
                self.logger.info(f"duplicate plugins for {name=}")
                continue
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled'):
                continue
            min_version = d.get('min_electrum_version')
            max_version = d.get('max_electrum_version')
            try:
                too_old = min_version and StrictVersion(min_version) > StrictVersion(ELECTRUM_VERSION)
                too_new = max_version and StrictVersion(max_version) < StrictVersion(ELECTRUM_VERSION)
            except (ValueError, TypeError):
                # a malformed bound in one manifest must not abort the whole scan
                self.logger.info(f"invalid version bound in manifest of zip plugin {filename}", exc_info=True)
                continue
            if too_old or too_new:
                self.logger.info(f"version mismatch for zip plugin {filename}")
                continue

            if not self.cmd_only:
                gui_good = self.gui_name in d.get('available_for', [])
                if not gui_good:
                    continue
                if 'fullname' not in d:
                    continue
                details = d.get('registers_keystore')
                if details:
                    self.register_keystore(name, gui_good, details)
            if external:
                self.external_plugin_metadata[name] = d
            else:
                self.internal_plugin_metadata[name] = d
×
329

330
    def get(self, name):
        """Return the loaded plugin instance called `name`, or None if it is not loaded."""
        return self.plugins.get(name)
×
332

333
    def count(self):
        """Return the number of currently loaded plugins."""
        return len(self.plugins)
×
335

336
    def load_plugin(self, name) -> 'BasePlugin':
        """Imports the code of the given plugin.
        note: can be called from any thread.

        :raises Exception: if no metadata is known for `name`.
        """
        if not self.get_metadata(name):
            raise Exception(f"could not find plugin {name!r}")
        return self.load_plugin_by_name(name)
×
344

345
    def maybe_load_plugin_init_method(self, name: str) -> None:
        """Loads the __init__.py module of the plugin if it is not already loaded."""
        is_external = name in self.external_plugin_metadata
        base_name = ('electrum_external_plugins.' if is_external else 'electrum.plugins.') + name
        if base_name in sys.modules:
            return
        # if the plugin was not enabled on startup the init module hasn't been loaded yet
        metadata = self.get_metadata(name)
        if metadata.get('is_zip', False):
            importer = zipimport.zipimporter(metadata['path'])
            init_spec = importer.find_spec(metadata['dirname'])
        elif is_external:
            # this branch is deprecated: external plugins are always zip files
            path = os.path.join(metadata['path'], '__init__.py')
            init_spec = importlib.util.spec_from_file_location(base_name, path)
        else:
            init_spec = importlib.util.find_spec(base_name)

        self.exec_module_from_spec(init_spec, base_name)
5✔
366

367
    def load_plugin_by_name(self, name: str) -> Optional['BasePlugin']:
        """Import the GUI-specific module of the plugin and instantiate its `Plugin` class.

        :return: the (possibly already loaded) plugin instance, or None if the
            plugin is external and not authorized.
        :raises RuntimeError: if the plugin has no module for the running GUI.
        :raises Exception: if importing or instantiating the plugin fails.
        """
        if name in self.plugins:
            return self.plugins[name]
        # if the plugin was not enabled on startup the init module hasn't been loaded yet
        self.maybe_load_plugin_init_method(name)
        is_external = name in self.external_plugin_metadata
        if is_external and not self.is_authorized(name):
            self.logger.info(f'plugin not authorized {name}')
            return None
        if not is_external:
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
        else:
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'

        spec = importlib.util.find_spec(full_name)
        if spec is None:
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
        try:
            module = self.exec_module_from_spec(spec, full_name)
            plugin = module.Plugin(self, self.config, name)
        except Exception as e:
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
        self.add_jobs(plugin.thread_jobs())
        self.plugins[name] = plugin
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
        return plugin
5✔
393

394
    def close_plugin(self, plugin):
        """Stop running the thread jobs that `plugin` contributed."""
        self.remove_jobs(plugin.thread_jobs())
×
396

397
    def derive_privkey(self, pw: str, salt: bytes) -> ECPrivkey:
        """Derive the plugin-authorization private key from a password and a salt.

        The secret is PBKDF2-HMAC-SHA256 of the UTF-8 encoded password, 10**5 iterations.
        """
        from hashlib import pbkdf2_hmac
        secret = pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations=10**5)
        return ECPrivkey(secret)
×
401

402
    def install_internal_plugin(self, name):
        """Mark an internal plugin as installed by creating its config entry.

        NOTE(review): 'enabled' is set to an empty list, which is falsy, so the
        plugin presumably counts as installed but not enabled -- confirm intent.
        """
        self.config.set_key(f'plugins.{name}.enabled', [])
×
404

405
    def install_external_plugin(self, name, path, privkey, manifest):
        """Register the manifest of an external plugin and authorize the file at `path`
        by signing its hash with `privkey`."""
        self.external_plugin_metadata[name] = manifest
        self.authorize_plugin(name, path, privkey)
×
408

409
    def uninstall(self, name: str):
        """Remove the plugin's config entry; for an external plugin, also delete
        its zip file and forget its metadata."""
        self.config.set_key(f'plugins.{name}', None)
        if name in self.external_plugin_metadata:
            zipfile = self.zip_plugin_path(name)
            os.unlink(zipfile)
            self.external_plugin_metadata.pop(name)
×
415

416
    def is_internal(self, name) -> bool:
        """Return True if `name` is a known internal (built-in) plugin."""
        return name in self.internal_plugin_metadata
×
418

419
    def is_auto_loaded(self, name) -> bool:
        """Return True if the plugin's manifest registers a keystore or a wallet type."""
        metadata = self.external_plugin_metadata.get(name) or self.internal_plugin_metadata.get(name)
        if not metadata:
            return False
        return bool(metadata.get('registers_keystore') or metadata.get('registers_wallet_type'))
×
422

423
    def is_installed(self, name) -> bool:
        """an external plugin may be installed but not authorized

        An internal plugin is installed if it has a non-empty config entry;
        an external plugin is installed as soon as its metadata is known.
        """
        if name in self.external_plugin_metadata:
            return True
        return name in self.internal_plugin_metadata and bool(self.config.get(f'plugins.{name}'))
427

428
    def is_authorized(self, name) -> bool:
        """Return True if the plugin may be loaded.

        Internal plugins are always authorized. An external plugin is authorized
        if it is a zip file and the config holds a signature of the file's current
        sha256 hash that verifies against the stored plugin public key.
        """
        if name in self.internal_plugin_metadata:
            return True
        if name not in self.external_plugin_metadata:
            return False
        pubkey_bytes, _salt = self.get_pubkey_bytes()
        if not pubkey_bytes:
            return False
        if not self.is_plugin_zip(name):
            return False
        filename = self.zip_plugin_path(name)
        plugin_hash = get_file_hash256(filename)
        sig = self.config.get(f'plugins.{name}.authorized')
        if not sig:
            return False
        pubkey = ECPubkey(pubkey_bytes)
        return pubkey.ecdsa_verify(bytes.fromhex(sig), plugin_hash)
×
445

446
    def authorize_plugin(self, name: str, filename, privkey: ECPrivkey):
        """Sign the sha256 hash of `filename` with `privkey` and save the signature
        (hex) in the config under 'plugins.<name>.authorized'.

        `privkey` must correspond to the stored plugin public key.
        """
        pubkey_bytes, _salt = self.get_pubkey_bytes()
        assert pubkey_bytes == privkey.get_public_key_bytes()
        plugin_hash = get_file_hash256(filename)
        sig = privkey.ecdsa_sign(plugin_hash)
        value = sig.hex()
        self.config.set_key(f'plugins.{name}.authorized', value, save=True)
×
453

454
    def enable(self, name: str) -> 'BasePlugin':
        """Enable the plugin in the config and return its instance, loading it if needed."""
        self.config.enable_plugin(name)
        p = self.get(name)
        if p:
            return p
        return self.load_plugin(name)
×
460

461
    def disable(self, name: str) -> None:
        """Disable the plugin in the config and, if it is loaded, unload and close it."""
        self.config.disable_plugin(name)
        p = self.get(name)
        if not p:
            return
        self.plugins.pop(name)
        p.close()
        self.logger.info(f"closed {name}")
×
469

470
    @classmethod
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
        """Return True if `key` lies in the 'plugins.' config namespace."""
        return key.startswith('plugins.')
×
473

474
    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
        """Return True if the plugin can be used with `wallet`.

        That requires the plugin to be known, all modules listed under 'requires'
        in its manifest to be importable, and the wallet's type to be listed under
        'requires_wallet_type' (if that list is non-empty).
        """
        d = self.descriptions.get(name)
        if not d:
            return False
        deps = d.get('requires', [])
        # each entry is a pair; only its first item (the module name) is used here
        for dep, _ in deps:
            try:
                __import__(dep)
            except ImportError as e:
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
                return False
        requires = d.get('requires_wallet_type', [])
        return not requires or wallet.wallet_type in requires
×
487

488
    def get_hardware_support(self):
        """Return a list of HardwarePluginToScan, one per usable hardware wallet plugin.

        A plugin that fails to load is still listed, with `plugin=None` and the
        exception attached; a plugin that loads but reports itself as not
        available is omitted.
        """
        out = []
        for name, (gui_good, details) in self.hw_wallets.items():
            if not gui_good:
                continue
            try:
                p = self.get_plugin(name)
                if p.is_available():
                    out.append(HardwarePluginToScan(name=name,
                                                    description=details[2],
                                                    plugin=p,
                                                    exception=None))
            except Exception as e:
                self.logger.exception(f"cannot load plugin for: {name}")
                out.append(HardwarePluginToScan(name=name,
                                                description=details[2],
                                                plugin=None,
                                                exception=e))
        return out
×
506

507
    def register_wallet_type(self, name, gui_good, wallet_type):
        """Register `wallet_type` and a lazy loader for it in `plugin_loaders`.

        The loader loads plugin `name` on demand and registers its `wallet_class`
        as the constructor for `wallet_type`. `gui_good` is not used here.
        """
        from .wallet import register_wallet_type, register_constructor
        self.logger.info(f"registering wallet type {(wallet_type, name)}")

        def loader():
            plugin = self.get_plugin(name)
            register_constructor(wallet_type, plugin.wallet_class)
        register_wallet_type(wallet_type)
        plugin_loaders[wallet_type] = loader
5✔
516

517
    def register_keystore(self, name, gui_good, details):
        """Register the keystore announced in a plugin manifest.

        Only details[0] == 'hardware' is handled: the plugin is recorded in
        `hw_wallets` and a constructor that loads the plugin on first use is
        registered under details[1].
        """
        from .keystore import register_keystore

        def dynamic_constructor(d):
            return self.get_plugin(name).keystore_class(d)
        if details[0] == 'hardware':
            self.hw_wallets[name] = (gui_good, details)
            self.logger.info(f"registering hardware {name}: {details}")
            register_keystore(details[1], dynamic_constructor)
5✔
526

527
    def get_plugin(self, name: str) -> 'BasePlugin':
        """Return the plugin instance called `name`, loading it first if necessary.

        :raises KeyError: if loading did not yield an instance
            (load_plugin_by_name returns without one for an unauthorized plugin).
        """
        if name not in self.plugins:
            self.load_plugin(name)
        return self.plugins[name]
5✔
531

532
    def is_plugin_zip(self, name: str) -> bool:
        """Returns True if the plugin is a zip file"""
        metadata = self.get_metadata(name)
        if metadata is None:
            return False
        return metadata.get('is_zip', False)
×
537

538
    def get_metadata(self, name: str) -> Optional[dict]:
        """Returns the metadata of the plugin, or None if there is none.

        Internal metadata takes precedence over external; an empty dict counts as missing.
        """
        return self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name) or None
5✔
544

545
    def run(self):
        """Thread main loop: run the registered jobs until the thread is stopped.

        Each iteration waits up to 0.1 s, or until `wake_up_event` is set.
        """
        while self.is_running():
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
            self.run_jobs()
        self.on_stop()
5✔
550

551
    def read_file(self, name: str, filename: str) -> bytes:
        """Return the content of a file shipped with the plugin.

        :param name: plugin name.
        :param filename: path of the file, relative to the plugin's directory.

        For a zip plugin the file is read from inside the archive; otherwise the
        plugin must be internal and the file is read from its package directory.
        """
        if self.is_plugin_zip(name):
            # names inside a zip archive always use '/', so the member path must not
            # be built with os.path.join (which would use '\\' on Windows)
            import posixpath
            plugin_filename = self.zip_plugin_path(name)
            metadata = self.external_plugin_metadata[name]
            dirname = metadata['dirname']
            with zipfile_lib.ZipFile(plugin_filename) as myzip:
                with myzip.open(posixpath.join(dirname, filename)) as myfile:
                    return myfile.read()
        else:
            assert name in self.internal_plugin_metadata
            path = os.path.join(os.path.dirname(__file__), 'plugins', name, filename)
            with open(path, 'rb') as myfile:
                return myfile.read()
×
564

565
def get_file_hash256(path: str) -> bytes:
    """Get the sha256 hash of a file, similar to `sha256sum`.

    The whole file is read into memory and the raw digest bytes are returned.
    """
    with open(path, 'rb') as f:
        return sha256(f.read())
×
569

570

571
def hook(func):
    """Decorator that records the function's name as a hook name.

    The function itself is returned unchanged.
    """
    hook_names.add(func.__name__)
    return func
5✔
574

575

576
def run_hook(name, *args):
    """Call the hook `name` on every enabled plugin that implements it.

    An exception raised by a plugin is logged and treated as a falsy result.

    :return: the single truthy result, or None if no plugin returned one.
        At most one plugin may return a truthy value (enforced by an assert).
    """
    results = []
    for plugin, func in hooks.get(name, []):
        if not plugin.is_enabled():
            continue
        try:
            r = func(*args)
        except Exception:
            _logger.exception(f"Plugin error. plugin: {plugin}, hook: {name}")
            r = False
        if r:
            results.append(r)

    if results:
        assert len(results) == 1, results
        return results[0]
×
592

593

594
class BasePlugin(Logger):
    """Base class of plugin implementations.

    On construction, every attribute whose name was recorded by the `hook`
    decorator is added to the module-level `hooks` table; `close` removes
    those entries again.
    """

    def __init__(self, parent, config: 'SimpleConfig', name):
        self.parent = parent  # type: Plugins  # The plugins object
        self.name = name
        self.config = config
        Logger.__init__(self)
        # add self to hooks
        for k in dir(self):
            if k in hook_names:
                hook_list = hooks.get(k, [])
                hook_list.append((self, getattr(self, k)))
                hooks[k] = hook_list

    def __str__(self):
        return self.name

    def close(self):
        """Unregister this plugin's hooks and thread jobs, then call `on_close`."""
        # remove self from hooks
        for attr_name in dir(self):
            if attr_name in hook_names:
                # found attribute in self that is also the name of a hook
                hook_list = hooks.get(attr_name, [])
                try:
                    hook_list.remove((self, getattr(self, attr_name)))
                except ValueError:
                    # maybe attr name just collided with hook name and was not hook
                    continue
                hooks[attr_name] = hook_list
        self.parent.close_plugin(self)
        self.on_close()

    def on_close(self):
        """Called at the end of `close`; does nothing by default."""
        pass

    def requires_settings(self) -> bool:
        return False

    def thread_jobs(self):
        """Jobs to be run by the parent's thread; none by default."""
        return []

    def is_enabled(self):
        """Return True if the plugin is available and enabled in the config."""
        if not self.is_available():
            return False
        return self.config.is_plugin_enabled(self.name)

    def is_available(self):
        return True

    def can_user_disable(self):
        return True

    def settings_widget(self, window):
        raise NotImplementedError()

    def settings_dialog(self, window):
        raise NotImplementedError()

    def read_file(self, filename: str) -> bytes:
        """Return the content of a file shipped with this plugin (delegates to the parent)."""
        return self.parent.read_file(self.name, filename)
×
654

655

656
class DeviceUnpairableError(UserFacingException):
    """User-facing error; presumably raised when a hardware device cannot be
    paired (raise sites are not visible in this part of the file)."""
5✔
657
class HardwarePluginLibraryUnavailable(Exception):
    """Presumably raised when the library needed by a hardware plugin is missing
    (raise sites are not visible in this part of the file)."""
5✔
658
class CannotAutoSelectDevice(Exception):
    """Presumably raised when a hardware device cannot be selected automatically
    (raise sites are not visible in this part of the file)."""
5✔
659

660

661
class Device(NamedTuple):
    """Immutable record describing one enumerated hardware device."""
    path: Union[str, bytes]
    interface_number: int
    id_: str
    product_key: Any   # when using hid, often Tuple[int, int]
    usage_page: int
    transport_ui_string: str
5✔
668

669

670
class DeviceInfo(NamedTuple):
    """A Device together with optional details about it; only `device` is required."""
    device: Device
    label: Optional[str] = None
    initialized: Optional[bool] = None
    exception: Optional[Exception] = None
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
5✔
678

679

680
class HardwarePluginToScan(NamedTuple):
    """Record describing a hardware plugin that is a candidate for device scanning.

    NOTE(review): presumably ``plugin`` is None and ``exception`` is set when
    the plugin could not be loaded -- confirm against the code that builds it.
    """

    name: str
    description: str
    plugin: Optional['HW_PluginBase']
    exception: Optional[Exception]
5✔
685

686

687
# Label values treated as "no useful label": device selection never matches on them.
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
5✔
688

689

690
# hidapi is not thread-safe
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
#     https://github.com/libusb/hidapi/issues/45
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
# It is not entirely clear exactly what is safe and what isn't when using
# multiple threads. Hence, a single thread is used for all device
# communications, including enumeration. Everything that uses hidapi,
# libusb, etc, MUST run on the following thread:
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1, thread_name_prefix='hwd_comms_thread')

# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
# To keep it simple, import it right away, as this module is most likely being
# imported from the main thread.
if threading.current_thread() is not threading.main_thread():
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
try:
    import hid
except ImportError:
    # hidapi is optional at import time
    pass


T = TypeVar('T')
5✔
717

718

719
def run_in_hwd_thread(func: Callable[[], T]) -> T:
    """Call *func* on the hardware-communication thread and return its result.

    If the caller is already on that thread (recognised by its name prefix),
    *func* is invoked directly. Otherwise it is submitted to
    ``_hwd_comms_executor`` and this call blocks until the result is ready.
    """
    already_on_hwd_thread = threading.current_thread().name.startswith("hwd_comms_thread")
    if already_on_hwd_thread:
        return func()
    return _hwd_comms_executor.submit(func).result()
726

727

728
def runs_in_hwd_thread(func):
    """Decorator that routes every call of *func* through ``run_in_hwd_thread``."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        # freeze the arguments so the call can be handed over as a zero-arg callable
        bound_call = partial(func, *args, **kwargs)
        return run_in_hwd_thread(bound_call)
    return wrapper
5✔
733

734

735
def assert_runs_in_hwd_thread():
    """Raise unless the calling thread's name marks it as the hardware-comms thread.

    :raises Exception: if the current thread's name does not start with
        ``"hwd_comms_thread"``.
    """
    thread_name = threading.current_thread().name
    if thread_name.startswith("hwd_comms_thread"):
        return
    raise Exception("must only be called from HWD communication thread")
×
738

739

740
class DeviceMgr(ThreadJob):
    """Manages hardware clients.  A client communicates over a hardware
    channel with the device.

    In addition to tracking device HID IDs, the device manager tracks
    hardware wallets and manages wallet pairing.  A HID ID may be
    paired with a wallet when it is confirmed that the hardware device
    matches the wallet, i.e. they have the same master public key.  A
    HID ID can be unpaired if e.g. it is wiped.

    Because of hotplugging, a wallet must request its client
    dynamically each time it is required, rather than caching it
    itself.

    The device manager is shared across plugins, so just one place
    does hardware scans when needed.  By tracking HID IDs, if a device
    is plugged into a different port the wallet is automatically
    re-paired.

    Wallets are informed on connect / disconnect events.  It must
    implement connected(), disconnected() callbacks.  Being connected
    implies a pairing.  Callbacks can happen in any thread context,
    and we do them without holding the lock.

    Confusingly, the HID ID (serial number) reported by the HID system
    doesn't match the device ID reported by the device itself.  We use
    the HID IDs.

    This plugin is thread-safe.  Currently only devices supported by
    hidapi are implemented."""
770

771
    def __init__(self, config: SimpleConfig):
        """Create an empty device manager.

        :param config: stored as ``self.config``; ``run`` reads the session
            timeout from it.
        """
        ThreadJob.__init__(self)
        self.config = config
        self.lock = threading.RLock()

        # A pairing_code->id_ map. Item only present if we have active pairing. Needs self.lock.
        self.pairing_code_to_id: Dict[str, str] = {}
        # A client->id_ map. Needs self.lock.
        self.clients: Dict['HardwareClientBase', str] = {}
        # What we recognise.  (vendor_id, product_id) -> Plugin
        self._recognised_hardware: Dict[Tuple[int, int], 'HW_PluginBase'] = {}
        # vendor_id -> Plugin
        self._recognised_vendor: Dict[int, 'HW_PluginBase'] = {}
        # Custom enumerate functions for devices we don't know about. Needs self.lock.
        self._enumerate_func = set()
5✔
786

787
    def thread_jobs(self):
        """Return the manager itself as the only job; its ``run`` handles device timeouts."""
        jobs = [self]
        return jobs
5✔
790

791
    def run(self):
        """Handle device timeouts.  Runs in the context of the Plugins thread.

        Every registered client gets ``timeout(cutoff)`` called, where
        ``cutoff`` is the current time minus the configured session timeout.
        """
        # copy under the lock; the clients are notified without holding it
        with self.lock:
            snapshot = list(self.clients)
        cutoff = time.time() - self.config.get_session_timeout()
        for hw_client in snapshot:
            hw_client.timeout(cutoff)
×
799

800
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
        """Map every entry of *device_pairs* to *plugin* in the recognised-hardware table.

        The table is keyed by (vendor_id, product_id).
        """
        self._recognised_hardware.update((pair, plugin) for pair in device_pairs)
×
803

804
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
        """Map every vendor id in *vendor_ids* to *plugin* in the recognised-vendor table."""
        self._recognised_vendor.update((vendor_id, plugin) for vendor_id in vendor_ids)
×
807

808
    def register_enumerate_func(self, func):
        """Add *func* to the set of custom enumeration callables used by ``scan_devices``."""
        registry = self._enumerate_func
        with self.lock:
            registry.add(func)
×
811

812
    @runs_in_hwd_thread
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
        """Return the client for *device*, creating and registering one if needed.

        A client already registered under ``device.id_`` is returned as-is.
        Otherwise *plugin* is asked to create one; a truthy result is stored
        in ``self.clients`` under the device id. A falsy result from the
        plugin is returned without being registered.
        """
        cached = self._client_by_id(device.id_)
        if cached:
            return cached
        client = plugin.create_client(device, handler)
        if not client:
            return client
        self.logger.info(f"Registering {client}")
        with self.lock:
            self.clients[client] = device.id_
        return client
×
825

826
    def id_by_pairing_code(self, pairing_code):
        """Return the device id paired with *pairing_code*, or None if there is no pairing."""
        with self.lock:
            device_id = self.pairing_code_to_id.get(pairing_code)
        return device_id
×
829

830
    def pairing_code_by_id(self, id_):
        """Reverse lookup: return the first pairing code mapped to device *id_*, else None."""
        with self.lock:
            matches = (code for code, paired_id in self.pairing_code_to_id.items()
                       if paired_id == id_)
            return next(matches, None)
×
836

837
    def unpair_pairing_code(self, pairing_code):
        """Drop the pairing for *pairing_code* and close the client of the paired device.

        Does nothing when the pairing code is unknown.
        """
        not_paired = object()
        with self.lock:
            device_id = self.pairing_code_to_id.pop(pairing_code, not_paired)
        if device_id is not_paired:
            return
        # closing happens outside the lock
        self._close_client(device_id)
×
843

844
    def unpair_id(self, id_):
        """Unpair device *id_* if it is paired; otherwise just close its client."""
        pairing_code = self.pairing_code_by_id(id_)
        if not pairing_code:
            self._close_client(id_)
            return
        self.unpair_pairing_code(pairing_code)
×
850

851
    def _close_client(self, id_):
5✔
852
        with self.lock:
×
853
            client = self._client_by_id(id_)
×
854
            self.clients.pop(client, None)
×
855
        if client:
×
856
            client.close()
×
857

858
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
5✔
859
        with self.lock:
×
860
            for client, client_id in self.clients.items():
×
861
                if client_id == id_:
×
862
                    return client
×
863
        return None
×
864

865
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
        """Returns a client for the device ID if one is registered.  If
        a device is wiped or in bootloader mode pairing is impossible;
        in such cases we communicate by device ID and not wallet.

        :param scan_now: when true (the default), run ``scan_devices()``
            before the lookup.
        """
        if scan_now:
            self.scan_devices()
        found = self._client_by_id(id_)
        return found
×
872

873
    @runs_in_hwd_thread
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
                            keystore: 'Hardware_KeyStore',
                            force_pair: bool, *,
                            devices: Optional[Sequence['Device']] = None,
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
        """Find (or pair) the client that belongs to *keystore*.

        Lookup order:
        1. already registered clients, by the keystore's pairing code (no scan);
        2. the given *devices*, or a fresh scan when *devices* is None;
        3. if *force_pair* is set, select a device and pair with it.

        :param handler: must not be None.
        :param devices: devices to consider; None means "scan when needed".
        :param allow_user_interaction: forwarded to ``select_device``.
        :returns: the client, or None if none was found or paired.
        :raises Exception: if *handler* is None.
        """
        self.logger.info("getting client for keystore")
        if handler is None:
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
        handler.update_status(False)
        pcode = keystore.pairing_code()

        def lookup(candidates):
            return self.client_by_pairing_code(
                plugin=plugin, pairing_code=pcode, handler=handler, devices=candidates)

        # search existing clients first (fast-path)
        client = lookup([]) if not devices else None
        # search clients again, now allowing a (slow) scan
        if client is None:
            if devices is None:
                devices = self.scan_devices()
            client = lookup(devices)
        if client is None and force_pair:
            try:
                info = self.select_device(plugin, handler, keystore, devices,
                                          allow_user_interaction=allow_user_interaction)
            except CannotAutoSelectDevice:
                # nothing could be selected without the user: leave client as None
                pass
            else:
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
        if client:
            handler.update_status(True)
            # note: if select_device was called, we might also update label etc here:
            keystore.opportunistically_fill_in_missing_info_from_device(client)
        self.logger.info("end client for keystore")
        return client
×
907

908
    def client_by_pairing_code(
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
        devices: Sequence['Device'],
    ) -> Optional['HardwareClientBase']:
        """Return the client of the device paired with *pairing_code*, if any.

        A registered client is returned only when its plugin has exactly the
        same type as *plugin*; its handler is then replaced by *handler*.
        When no client is registered, one is created for the first entry of
        *devices* whose id equals the paired id. Returns None otherwise.
        """
        paired_id = self.id_by_pairing_code(pairing_code)
        client = self._client_by_id(paired_id)
        if client:
            if type(client.plugin) != type(plugin):
                return None
            # An unpaired client might have another wallet's handler
            # from a prior scan.  Replace to fix dialog parenting.
            client.handler = handler
            return client

        for candidate in devices:
            if candidate.id_ == paired_id:
                return self.create_client(candidate, handler, plugin)
        return None
×
925

926
    def force_pair_keystore(
        self,
        *,
        plugin: 'HW_PluginBase',
        handler: 'HardwareHandlerBase',
        info: 'DeviceInfo',
        keystore: 'Hardware_KeyStore',
    ) -> 'HardwareClientBase':
        """Pair *keystore* with the device described by *info*.

        The registered client for ``info.device`` is asked for the xpub at the
        keystore's derivation prefix. Pairing is recorded only if that xpub
        equals the keystore's own xpub.

        :returns: the paired client.
        :raises DeviceUnpairableError: if no suitable client is registered, or
            the xpub request was cancelled, failed, or returned a different xpub.
        """
        expected_xpub = keystore.xpub
        derivation = keystore.get_derivation_prefix()
        assert derivation is not None
        xtype = bip32.xpub_type(expected_xpub)
        device_id = info.device.id_
        client = self._client_by_id(device_id)
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
            # An unpaired client might have another wallet's handler
            # from a prior scan.  Replace to fix dialog parenting.
            client.handler = handler
            # This will trigger a PIN/passphrase entry request
            try:
                device_xpub = client.get_xpub(derivation, xtype)
            except (UserCancelled, RuntimeError):
                # Bad / cancelled PIN / passphrase
                device_xpub = None
            if device_xpub == expected_xpub:
                keystore.opportunistically_fill_in_missing_info_from_device(client)
                with self.lock:
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
                return client

        # The user input has wrong PIN or passphrase, or cancelled input,
        # or it is not pairable
        raise DeviceUnpairableError(
            _('Electrum cannot pair with your {}.\n\n'
              'Before you request bitcoins to be sent to addresses in this '
              'wallet, ensure you can pair with your device, or that you have '
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
              'receive will be unspendable.').format(plugin.device))
962

963
    def list_pairable_device_infos(
        self,
        *,
        handler: Optional['HardwareHandlerBase'],
        plugin: 'HW_PluginBase',
        devices: Optional[Sequence['Device']] = None,
        include_failing_clients: bool = False,
    ) -> List['DeviceInfo']:
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
        Already paired devices are also included, as it is okay to reuse them.

        :param devices: devices to consider; when None, a scan is performed.
        :param include_failing_clients: if true, a device whose client could
            not be created or queried is still listed, with ``exception`` set.
        :raises HardwarePluginLibraryUnavailable: if the plugin reports that
            its libraries are not available.
        """
        if not plugin.libraries_available:
            message = plugin.get_library_not_available_message()
            raise HardwarePluginLibraryUnavailable(message)
        if devices is None:
            devices = self.scan_devices()
        infos = []
        for device in devices:
            if not plugin.can_recognize_device(device):
                continue
            try:
                client = self.create_client(device, handler, plugin)
                if not client:
                    continue
                details = dict(
                    label=client.label(),
                    initialized=client.is_initialized(),
                    soft_device_id=client.get_soft_device_id(),
                    model_name=client.device_model_name(),
                )
            except Exception as e:
                # best effort: one misbehaving device must not hide the others
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
                if include_failing_clients:
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
                continue
            infos.append(DeviceInfo(device=device, plugin_name=plugin.name, **details))

        return infos
×
1004

1005
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
                      keystore: 'Hardware_KeyStore', devices: Optional[Sequence['Device']] = None,
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
        """Select the device to use for keystore.

        While no pairable device is found, the user is asked (if interaction
        is allowed) whether to retry. Once candidates exist, automatic
        selection is tried by soft device id, then by unique label, then by
        "single device and nothing stored to compare against". If that fails,
        the user is asked to choose.

        :param devices: devices to consider; None means "scan".
        :raises CannotAutoSelectDevice: if user interaction would be needed
            but *allow_user_interaction* is false.
        :raises UserCancelled: if the user declines to retry or cancels the choice.
        """
        # ideally this should not be called from the GUI thread...
        infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
        while not infos:
            if not allow_user_interaction:
                raise CannotAutoSelectDevice()
            parts = [_('Please insert your {}').format(plugin.device), " ("]
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
                parts.append(f"label: {keystore.label}, ")
            parts.append(f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}")
            parts.append(').\n\n{}\n\n{}'.format(
                _('Verify the cable is connected and that '
                  'no other application is using it.'),
                _('Try to connect again?')
            ))
            msg = "".join(parts)
            if not handler.yes_no_question(msg):
                raise UserCancelled()
            # devices=None: the retry rescans instead of reusing the stale list
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=None)

        # select device automatically. (but only if we have reasonable expectation it is the correct one)
        # method 1: select device by id
        if keystore.soft_device_id:
            for info in infos:
                if info.soft_device_id == keystore.soft_device_id:
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
                    return info
        # method 2: select device by label
        #           but only if not a placeholder label and only if there is no collision
        same_label = [info for info in infos if info.label == keystore.label]
        if keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS and len(same_label) == 1:
            self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
            return same_label[0]
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
        #           saved for keystore anyway, select it
        if (len(infos) == 1
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
                and keystore.soft_device_id is None):
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
            return infos[0]

        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
        if not allow_user_interaction:
            raise CannotAutoSelectDevice()
        # ask user to select device manually
        msg = (
                _("Could not automatically pair with device for given keystore.") + "\n"
                + f"(keystore label: {keystore.label!r}, "
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
        msg += _("Please select which {} device to use:").format(plugin.device)
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
        descriptions = ["{label} ({maybe_model}{init}, {transport})"
                        .format(label=info.label or _("An unnamed {}").format(info.plugin_name),
                                init=(_("initialized") if info.initialized else _("wiped")),
                                transport=info.device.transport_ui_string,
                                maybe_model=f"{info.model_name}, " if info.model_name else "")
                        for info in infos]
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
                          f"num options: {len(infos)}. options: {infos}")
        choice = handler.query_choice(msg, descriptions)
        if choice is None:
            raise UserCancelled()
        info = infos[choice]
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
        # note: updated label/soft_device_id will be saved after pairing succeeds
        return info
×
1080

1081
    @runs_in_hwd_thread
    def _scan_devices_with_hid(self) -> List['Device']:
        """Enumerate HID devices and return those a registered plugin accepts.

        A plugin is looked up first by (vendor_id, product_id), then by
        vendor_id alone; the plugin builds the Device from the enumeration
        entry. Returns an empty list when the ``hid`` module cannot be imported.
        """
        try:
            import hid  # noqa: F811
        except ImportError:
            return []

        found = []
        for entry in hid.enumerate(0, 0):
            vendor_id = entry['vendor_id']
            product_key = (vendor_id, entry['product_id'])
            if product_key in self._recognised_hardware:
                plugin = self._recognised_hardware[product_key]
            elif vendor_id in self._recognised_vendor:
                plugin = self._recognised_vendor[vendor_id]
            else:
                continue
            if not plugin:
                continue
            device = plugin.create_device_from_hid_enumeration(entry, product_key=product_key)
            if device:
                found.append(device)
        return found
×
1102

1103
    @runs_in_hwd_thread
    @profiler
    def scan_devices(self) -> Sequence['Device']:
        """Scan for connected devices and drop clients whose device went away.

        Devices come from the HID scan plus every registered custom
        enumeration function. A registered client is kept only if its device
        id is among the scanned devices and it reports a usable connection;
        all others are unpaired and their handlers are told the device is
        no longer connected.

        :returns: the devices found by this scan.
        """
        self.logger.info("scanning devices...")

        # First see what's connected that we know about
        devices = self._scan_devices_with_hid()

        # Let plugin handlers enumerate devices we don't know about
        with self.lock:
            enumerators = list(self._enumerate_func)
        for enum_func in enumerators:
            try:
                extra_devices = enum_func()
            except BaseException as e:
                # broad catch: one failing enumerator does not abort the scan
                self.logger.error(f'custom device enum failed. func {str(enum_func)}, error {e!r}')
            else:
                devices.extend(extra_devices)

        # find out what was disconnected
        scanned_ids = [dev.id_ for dev in devices]
        disconnected_clients = []
        with self.lock:
            still_connected = {}
            for client, id_ in self.clients.items():
                if id_ not in scanned_ids or not client.has_usable_connection_with_device():
                    disconnected_clients.append((client, id_))
                else:
                    still_connected[client] = id_
            self.clients = still_connected

        # Unpair disconnected devices
        # NOTE(review): these clients were already removed from self.clients above,
        # so unpair_id() no longer finds them and does not call close() on them -- confirm intended.
        for client, id_ in disconnected_clients:
            self.unpair_id(id_)
            if client.handler:
                client.handler.update_status(False)

        return devices
×
1141

1142
    @classmethod
5✔
1143
    def version_info(cls) -> Mapping[str, Optional[str]]:
5✔
1144
        ret = {}
×
1145
        # add libusb
1146
        try:
×
1147
            import usb1
×
1148
        except Exception as e:
×
1149
            ret["libusb.version"] = None
×
1150
        else:
1151
            ret["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
×
1152
            try:
×
1153
                ret["libusb.path"] = usb1.libusb1.libusb._name
×
1154
            except AttributeError:
×
1155
                ret["libusb.path"] = None
×
1156
        # add hidapi
1157
        try:
×
1158
            import hid  # noqa: F811
×
1159
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
×
1160
        except Exception as e:
×
1161
            from importlib.metadata import version
×
1162
            try:
×
1163
                ret["hidapi.version"] = version("hidapi")
×
1164
            except ImportError:
×
1165
                ret["hidapi.version"] = None
×
1166
        return ret
×
1167

1168
    def trigger_pairings(
            self,
            keystores: Sequence['KeyStore'],
            *,
            allow_user_interaction: bool = True,
            devices: Optional[Sequence['Device']] = None,
    ) -> None:
        """Given a list of keystores, try to pair each with a connected hardware device.

        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
        try to pair each keystore individually. Consider the following scenario:
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
        - assume none of the devices are paired yet
        1. if we tried to individually pair keystores, we might try with ks1 first
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
             which is confusing and error-prone. It's especially problematic if the hw device does
             not support labels (such as Ledger), as then the user cannot easily distinguish
             same-type devices. (see #4199)
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
           and then tell the user ks1 could not be paired (and there are no devices left to try)

        Keystores that are not hardware keystores are ignored. When *devices*
        is None, a scan is performed once and reused for every keystore.
        """
        from .keystore import Hardware_KeyStore
        keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
        if not keystores:
            return
        if devices is None:
            devices = self.scan_devices()
        # first pass: pair with all devices that can be auto-selected;
        # second pass (only if allowed): do manual selections
        passes = [False, True] if allow_user_interaction else [False]
        for interactive in passes:
            for ks in keystores:
                try:
                    ks.get_client(
                        force_pair=True,
                        allow_user_interaction=interactive,
                        devices=devices,
                    )
                except UserCancelled:
                    # skipping one keystore must not stop the others
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc