• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5097000842035200

13 Apr 2025 03:08PM UTC coverage: 60.269%. First build
5097000842035200

push

CirrusCI

ecdsa
do not show auto-loaded plugins

4 of 9 new or added lines in 2 files covered. (44.44%)

21590 of 35823 relevant lines covered (60.27%)

3.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.05
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import json
5✔
27
import os
5✔
28
import pkgutil
5✔
29
import importlib.util
5✔
30
import time
5✔
31
import threading
5✔
32
import sys
5✔
33
import aiohttp
5✔
34
import zipfile as zipfile_lib
5✔
35
from urllib.parse import urlparse
5✔
36

37
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
5✔
38
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
39
import concurrent
5✔
40
import zipimport
5✔
41
from functools import wraps, partial
5✔
42
from itertools import chain
5✔
43

44
from electrum_ecc import ECPrivkey, ECPubkey
5✔
45

46
from ._vendor.distutils.version import StrictVersion
5✔
47
from .version import ELECTRUM_VERSION
5✔
48
from .i18n import _
5✔
49
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException)
5✔
50
from . import bip32
5✔
51
from . import plugins
5✔
52
from .simple_config import SimpleConfig
5✔
53
from .logging import get_logger, Logger
5✔
54
from .crypto import sha256
5✔
55

56
if TYPE_CHECKING:
5✔
57
    from .hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
58
    from .keystore import Hardware_KeyStore, KeyStore
×
59
    from .wallet import Abstract_Wallet
×
60

61

62
_logger = get_logger(__name__)
5✔
63
plugin_loaders = {}
5✔
64
hook_names = set()
5✔
65
hooks = {}
5✔
66
_exec_module_failure = {}  # type: Dict[str, Exception]
5✔
67

68
PLUGIN_PASSWORD_VERSION = 1
5✔
69

70

71
class Plugins(DaemonThread):
5✔
72

73
    LOGGING_SHORTCUT = 'p'
5✔
74
    pkgpath = os.path.dirname(plugins.__file__)
5✔
75
    keyfile_linux = '/etc/electrum/plugins_key'
5✔
76
    keyfile_windows = 'C:\\HKEY_LOCAL_MACHINE\\SOFTWARE\\Electrum\\PluginsKey'
5✔
77

78
    @profiler
5✔
79
    def __init__(self, config: SimpleConfig, gui_name: str = None, cmd_only: bool = False):
5✔
80
        self.config = config
5✔
81
        self.cmd_only = cmd_only  # type: bool
5✔
82
        self.internal_plugin_metadata = {}
5✔
83
        self.external_plugin_metadata = {}
5✔
84
        if cmd_only:
5✔
85
            # only import the command modules of plugins
86
            Logger.__init__(self)
×
87
            self.find_plugins()
×
88
            self.load_plugins()
×
89
            return
×
90
        DaemonThread.__init__(self)
5✔
91
        self.device_manager = DeviceMgr(config)
5✔
92
        self.name = 'Plugins'  # set name of thread
5✔
93
        self.hw_wallets = {}
5✔
94
        self.plugins = {}  # type: Dict[str, BasePlugin]
5✔
95
        self.gui_name = gui_name
5✔
96
        self.find_plugins()
5✔
97
        self.load_plugins()
5✔
98
        self.add_jobs(self.device_manager.thread_jobs())
5✔
99
        self.start()
5✔
100

101
    @property
5✔
102
    def descriptions(self):
5✔
103
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
104

105
    def find_directory_plugins(self, pkg_path: str, external: bool):
5✔
106
        """Finds plugins in directory form from the given pkg_path and populates the metadata dicts"""
107
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
5✔
108
        for loader, name, ispkg in iter_modules:
5✔
109
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
110
            #       once as data and once as code. To honor the "no duplicates" rule below,
111
            #       we exclude the ones packaged as *code*, here:
112
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
5✔
113
                continue
×
114
            module_path = os.path.join(pkg_path, name)
5✔
115
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled') is True:
5✔
116
                continue
×
117
            try:
5✔
118
                with open(os.path.join(module_path, 'manifest.json'), 'r') as f:
5✔
119
                    d = json.load(f)
5✔
120
            except FileNotFoundError:
×
121
                self.logger.info(f"could not find manifest.json of plugin {name}, skipping...")
×
122
                continue
×
123
            if 'fullname' not in d:
5✔
124
                continue
×
125
            d['path'] = module_path
5✔
126
            if not self.cmd_only:
5✔
127
                gui_good = self.gui_name in d.get('available_for', [])
5✔
128
                if not gui_good:
5✔
129
                    continue
5✔
130
                details = d.get('registers_wallet_type')
5✔
131
                if details:
5✔
132
                    self.register_wallet_type(name, gui_good, details)
5✔
133
                details = d.get('registers_keystore')
5✔
134
                if details:
5✔
135
                    self.register_keystore(name, gui_good, details)
5✔
136
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
5✔
137
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
×
138
                _logger.info(f"duplicate plugins? for {name=}")
×
139
                continue
×
140
            if not external:
5✔
141
                self.internal_plugin_metadata[name] = d
5✔
142
            else:
143
                self.external_plugin_metadata[name] = d
×
144

145
    @staticmethod
5✔
146
    def exec_module_from_spec(spec, path: str):
5✔
147
        if prev_fail := _exec_module_failure.get(path):
5✔
148
            raise Exception(f"exec_module already failed once before, with: {prev_fail!r}")
×
149
        try:
5✔
150
            module = importlib.util.module_from_spec(spec)
5✔
151
            # sys.modules needs to be modified for relative imports to work
152
            # see https://stackoverflow.com/a/50395128
153
            sys.modules[path] = module
5✔
154
            spec.loader.exec_module(module)
5✔
155
        except Exception as e:
×
156
            # We can't undo all side-effects, but we at least rm the module from sys.modules,
157
            # so the import system knows it failed. If called again for the same plugin, we do not
158
            # retry due to potential interactions with not-undone side-effects (e.g. plugin
159
            # might have defined commands).
160
            _exec_module_failure[path] = e
×
161
            if path in sys.modules:
×
162
                sys.modules.pop(path, None)
×
163
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
×
164
        return module
5✔
165

166
    def find_plugins(self):
5✔
167
        internal_plugins_path = (self.pkgpath, False)
5✔
168
        external_plugins_path = (self.get_external_plugin_dir(), True)
5✔
169
        for pkg_path, external in (internal_plugins_path, external_plugins_path):
5✔
170
            if pkg_path and os.path.exists(pkg_path):
5✔
171
                if not external:
5✔
172
                    self.find_directory_plugins(pkg_path=pkg_path, external=external)
5✔
173
                else:
174
                    self.find_zip_plugins(pkg_path=pkg_path, external=external)
5✔
175

176
    def load_plugins(self):
5✔
177
        for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
5✔
178
            if not d.get('requires_wallet_type') and self.config.get(f'plugins.{name}.enabled'):
5✔
179
                try:
×
180
                    if self.cmd_only:  # only load init method to register commands
×
181
                        self.maybe_load_plugin_init_method(name)
×
182
                    else:
183
                        self.load_plugin_by_name(name)
×
184
                except BaseException as e:
×
185
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
186

187
    def _has_root_permissions(self, path):
5✔
188
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
189

190
    def get_keyfile_path(self) -> Tuple[str, str]:
5✔
191
        if sys.platform in ['windows', 'win32']:
×
192
            keyfile_path = self.keyfile_windows
×
193
            keyfile_help = _('This file can be edited with Regdit')
×
194
        elif 'ANDROID_DATA' in os.environ:
×
195
            raise Exception('platform not supported')
×
196
        else:
197
            # treat unknown platforms as linux-like
198
            keyfile_path = self.keyfile_linux
×
199
            keyfile_help = _('The file must have root permissions')
×
200
        return keyfile_path, keyfile_help
×
201

202
    def create_new_key(self, password:str) -> str:
5✔
203
        salt = os.urandom(32)
×
204
        privkey = self.derive_privkey(password, salt)
×
205
        pubkey = privkey.get_public_key_bytes()
×
206
        key = bytes([PLUGIN_PASSWORD_VERSION]) + salt + pubkey
×
207
        return key.hex()
×
208

209
    def get_pubkey_bytes(self) -> Tuple[Optional[bytes], bytes]:
5✔
210
        """
211
        returns pubkey, salt
212
        returns None, None if the pubkey has not been set
213
        """
214
        if sys.platform in ['windows', 'win32']:
×
215
            import winreg
×
216
            with winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE) as hkey:
×
217
                try:
×
218
                    with winreg.OpenKey(hkey, r"SOFTWARE\\Electrum") as key:
×
219
                        key_hex = winreg.QueryValue(key, "PluginsKey")
×
220
                except Exception as e:
×
221
                    self.logger.info(f'winreg error: {e}')
×
222
                    return None, None
×
223
        elif 'ANDROID_DATA' in os.environ:
×
224
            return None, None
×
225
        else:
226
            # treat unknown platforms as linux-like
227
            if not os.path.exists(self.keyfile_linux):
×
228
                return None, None
×
229
            if not self._has_root_permissions(self.keyfile_linux):
×
230
                return
×
231
            with open(self.keyfile_linux) as f:
×
232
                key_hex = f.read()
×
233
        key = bytes.fromhex(key_hex)
×
234
        version = key[0]
×
235
        if version != PLUGIN_PASSWORD_VERSION:
×
236
            self.logger.info(f'unknown plugin password version: {version}')
×
237
            return None, None
×
238
        # all good
239
        salt = key[1:1+32]
×
240
        pubkey = key[1+32:]
×
241
        return pubkey, salt
×
242

243
    def get_external_plugin_dir(self) -> str:
5✔
244
        pkg_path = os.path.join(self.config.electrum_path(), 'plugins')
5✔
245
        if not os.path.exists(pkg_path):
5✔
246
            os.mkdir(pkg_path)
5✔
247
        return pkg_path
5✔
248

249
    async def download_external_plugin(self, url):
5✔
250
        filename = os.path.basename(urlparse(url).path)
×
251
        pkg_path = self.get_external_plugin_dir()
×
252
        path = os.path.join(pkg_path, filename)
×
253
        async with aiohttp.ClientSession() as session:
×
254
            async with session.get(url) as resp:
×
255
                if resp.status == 200:
×
256
                    with open(path, 'wb') as fd:
×
257
                        async for chunk in resp.content.iter_chunked(10):
×
258
                            fd.write(chunk)
×
259
        return path
×
260

261
    def read_manifest(self, path) -> dict:
5✔
262
        """ return json dict """
263
        with zipfile_lib.ZipFile(path) as file:
×
264
            for filename in file.namelist():
×
265
                if filename.endswith('manifest.json'):
×
266
                    break
×
267
            else:
268
                raise Exception('could not find manifest.json in zip archive')
×
269
            with file.open(filename, 'r') as f:
×
270
                manifest = json.load(f)
×
271
                manifest['path'] = path  # external, path of the zipfile
×
272
                manifest['dirname'] = os.path.dirname(filename)  # internal
×
273
                manifest['is_zip'] = True
×
274
                manifest['zip_hash_sha256'] = get_file_hash256(path).hex()
×
275
                return manifest
×
276

277
    def zip_plugin_path(self, name) -> str:
5✔
278
        path = self.get_metadata(name)['path']
×
279
        filename = os.path.basename(path)
×
280
        if name in self.internal_plugin_metadata:
×
281
            pkg_path = self.pkgpath
×
282
        else:
283
            pkg_path = self.get_external_plugin_dir()
×
284
        return os.path.join(pkg_path, filename)
×
285

286
    def find_zip_plugins(self, pkg_path: str, external: bool):
5✔
287
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts"""
288
        if pkg_path is None:
5✔
289
            return
×
290
        for filename in os.listdir(pkg_path):
5✔
291
            path = os.path.join(pkg_path, filename)
×
292
            if not filename.endswith('.zip'):
×
293
                continue
×
294
            try:
×
295
                d = self.read_manifest(path)
×
296
                name = d['name']
×
297
            except Exception:
×
298
                self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
×
299
                continue
×
300
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
×
301
                self.logger.info(f"duplicate plugins for {name=}")
×
302
                continue
×
303
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled'):
×
304
                continue
×
305
            min_version = d.get('min_electrum_version')
×
306
            if min_version and StrictVersion(min_version) > StrictVersion(ELECTRUM_VERSION):
×
307
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
308
                continue
×
309
            max_version = d.get('max_electrum_version')
×
310
            if max_version and StrictVersion(max_version) < StrictVersion(ELECTRUM_VERSION):
×
311
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
312
                continue
×
313

314
            if not self.cmd_only:
×
315
                gui_good = self.gui_name in d.get('available_for', [])
×
316
                if not gui_good:
×
317
                    continue
×
318
                if 'fullname' not in d:
×
319
                    continue
×
320
                details = d.get('registers_keystore')
×
321
                if details:
×
322
                    self.register_keystore(name, gui_good, details)
×
323
            if external:
×
324
                self.external_plugin_metadata[name] = d
×
325
            else:
326
                self.internal_plugin_metadata[name] = d
×
327

328
    def get(self, name):
5✔
329
        return self.plugins.get(name)
×
330

331
    def count(self):
5✔
332
        return len(self.plugins)
×
333

334
    def load_plugin(self, name) -> 'BasePlugin':
5✔
335
        """Imports the code of the given plugin.
336
        note: can be called from any thread.
337
        """
338
        if self.get_metadata(name):
5✔
339
            return self.load_plugin_by_name(name)
5✔
340
        else:
341
            raise Exception(f"could not find plugin {name!r}")
×
342

343
    def maybe_load_plugin_init_method(self, name: str) -> None:
5✔
344
        """Loads the __init__.py module of the plugin if it is not already loaded."""
345
        is_external = name in self.external_plugin_metadata
5✔
346
        base_name = ('electrum_external_plugins.' if is_external else 'electrum.plugins.') + name
5✔
347
        if base_name not in sys.modules:
5✔
348
            metadata = self.get_metadata(name)
5✔
349
            is_zip = metadata.get('is_zip', False)
5✔
350
            # if the plugin was not enabled on startup the init module hasn't been loaded yet
351
            if not is_zip:
5✔
352
                if is_external:
5✔
353
                    # this branch is deprecated: external plugins are always zip files
354
                    path = os.path.join(metadata['path'], '__init__.py')
×
355
                    init_spec = importlib.util.spec_from_file_location(base_name, path)
×
356
                else:
357
                    init_spec = importlib.util.find_spec(base_name)
5✔
358
            else:
359
                zipfile = zipimport.zipimporter(metadata['path'])
×
360
                dirname = metadata['dirname']
×
361
                init_spec = zipfile.find_spec(dirname)
×
362

363
            self.exec_module_from_spec(init_spec, base_name)
5✔
364

365
    def load_plugin_by_name(self, name: str) -> 'BasePlugin':
5✔
366
        if name in self.plugins:
5✔
367
            return self.plugins[name]
5✔
368
        # if the plugin was not enabled on startup the init module hasn't been loaded yet
369
        self.maybe_load_plugin_init_method(name)
5✔
370
        is_external = name in self.external_plugin_metadata
5✔
371
        if is_external and not self.is_authorized(name):
5✔
372
            self.logger.info(f'plugin not authorized {name}')
×
373
            return
×
374
        if not is_external:
5✔
375
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
5✔
376
        else:
377
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
×
378

379
        spec = importlib.util.find_spec(full_name)
5✔
380
        if spec is None:
5✔
381
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
382
        try:
5✔
383
            module = self.exec_module_from_spec(spec, full_name)
5✔
384
            plugin = module.Plugin(self, self.config, name)
5✔
385
        except Exception as e:
×
386
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
×
387
        self.add_jobs(plugin.thread_jobs())
5✔
388
        self.plugins[name] = plugin
5✔
389
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
5✔
390
        return plugin
5✔
391

392
    def close_plugin(self, plugin):
5✔
393
        self.remove_jobs(plugin.thread_jobs())
×
394

395
    def derive_privkey(self, pw: str, salt:bytes) -> ECPrivkey:
5✔
396
        from hashlib import pbkdf2_hmac
×
397
        secret = pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations=10**5)
×
398
        return ECPrivkey(secret)
×
399

400
    def install_internal_plugin(self, name):
5✔
401
        self.config.set_key(f'plugins.{name}.enabled', [])
×
402

403
    def install_external_plugin(self, name, path, privkey, manifest):
5✔
404
        self.external_plugin_metadata[name] = manifest
×
405
        self.authorize_plugin(name, path, privkey)
×
406

407
    def uninstall(self, name: str):
5✔
408
        self.config.set_key(f'plugins.{name}', None)
×
409
        if name in self.external_plugin_metadata:
×
410
            zipfile = self.zip_plugin_path(name)
×
411
            os.unlink(zipfile)
×
412
            self.external_plugin_metadata.pop(name)
×
413

414
    def is_internal(self, name) -> bool:
5✔
NEW
415
        return name in self.internal_plugin_metadata
×
416

417
    def is_auto_loaded(self, name):
5✔
NEW
418
        metadata = self.internal_plugin_metadata.get(name)
×
NEW
419
        return metadata and (metadata.get('registers_keystore') or metadata.get('registers_wallet_type'))
×
420

421
    def is_installed(self, name) -> bool:
5✔
422
        """an external plugin may be installed but not authorized """
423
        return (name in self.internal_plugin_metadata and self.config.get(f'plugins.{name}'))\
×
424
            or name in self.external_plugin_metadata
425

426
    def is_authorized(self, name) -> bool:
5✔
427
        if name in self.internal_plugin_metadata:
×
428
            return True
×
429
        if name not in self.external_plugin_metadata:
×
430
            return False
×
431
        pubkey_bytes, salt = self.get_pubkey_bytes()
×
432
        if not pubkey_bytes:
×
433
            return False
×
434
        if not self.is_plugin_zip(name):
×
435
            return False
×
436
        filename = self.zip_plugin_path(name)
×
437
        plugin_hash = get_file_hash256(filename)
×
438
        sig = self.config.get(f'plugins.{name}.authorized')
×
439
        if not sig:
×
440
            return False
×
441
        pubkey = ECPubkey(pubkey_bytes)
×
442
        return pubkey.ecdsa_verify(bytes.fromhex(sig), plugin_hash)
×
443

444
    def authorize_plugin(self, name: str, filename, privkey: ECPrivkey):
5✔
445
        pubkey_bytes, salt = self.get_pubkey_bytes()
×
446
        assert pubkey_bytes == privkey.get_public_key_bytes()
×
447
        plugin_hash = get_file_hash256(filename)
×
448
        sig = privkey.ecdsa_sign(plugin_hash)
×
449
        value = sig.hex()
×
450
        self.config.set_key(f'plugins.{name}.authorized', value, save=True)
×
451

452
    def _enabled_set(self, name):
5✔
453
        return self.config.get(f'plugins.{name}.enabled') or []
×
454

455
    def enable(self, name: str, wallet: 'Abstract_Wallet') -> 'BasePlugin':
5✔
456
        f = wallet.get_fingerprint()
×
457
        # fixme: obfuscate fingerprint
458
        _set = self._enabled_set(name)
×
459
        if f not in _set:
×
460
            _set.append(f)
×
461
        self.config.set_key(f'plugins.{name}.enabled', _set, save=True)
×
462
        p = self.get(name)
×
463
        if p:
×
464
            return p
×
465
        return self.load_plugin(name)
×
466

467
    def disable(self, name: str, wallet: 'Abstract_Wallet') -> None:
5✔
468
        f = wallet.get_fingerprint()
×
469
        _set = self._enabled_set(name)
×
470
        if f in _set:
×
471
            _set.remove(f)
×
472
        self.config.set_key(f'plugins.{name}.enabled', _set, save=True)
×
473
        p = self.get(name)
×
474
        if not p:
×
475
            return
×
476
        self.plugins.pop(name)
×
477
        p.close()
×
478
        self.logger.info(f"closed {name}")
×
479

480
    @classmethod
5✔
481
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
5✔
482
        return key.startswith('plugins.')
×
483

484
    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
5✔
485
        d = self.descriptions.get(name)
×
486
        if not d:
×
487
            return False
×
488
        deps = d.get('requires', [])
×
489
        for dep, s in deps:
×
490
            try:
×
491
                __import__(dep)
×
492
            except ImportError as e:
×
493
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
×
494
                return False
×
495
        requires = d.get('requires_wallet_type', [])
×
496
        return not requires or wallet.wallet_type in requires
×
497

498
    def get_hardware_support(self):
5✔
499
        out = []
×
500
        for name, (gui_good, details) in self.hw_wallets.items():
×
501
            if gui_good:
×
502
                try:
×
503
                    p = self.get_plugin(name)
×
504
                    if p.is_available():
×
505
                        out.append(HardwarePluginToScan(name=name,
×
506
                                                        description=details[2],
507
                                                        plugin=p,
508
                                                        exception=None))
509
                except Exception as e:
×
510
                    self.logger.exception(f"cannot load plugin for: {name}")
×
511
                    out.append(HardwarePluginToScan(name=name,
×
512
                                                    description=details[2],
513
                                                    plugin=None,
514
                                                    exception=e))
515
        return out
×
516

517
    def register_wallet_type(self, name, gui_good, wallet_type):
5✔
518
        from .wallet import register_wallet_type, register_constructor
5✔
519
        self.logger.info(f"registering wallet type {(wallet_type, name)}")
5✔
520

521
        def loader():
5✔
522
            plugin = self.get_plugin(name)
5✔
523
            register_constructor(wallet_type, plugin.wallet_class)
5✔
524
        register_wallet_type(wallet_type)
5✔
525
        plugin_loaders[wallet_type] = loader
5✔
526

527
    def register_keystore(self, name, gui_good, details):
5✔
528
        from .keystore import register_keystore
5✔
529

530
        def dynamic_constructor(d):
5✔
531
            return self.get_plugin(name).keystore_class(d)
5✔
532
        if details[0] == 'hardware':
5✔
533
            self.hw_wallets[name] = (gui_good, details)
5✔
534
            self.logger.info(f"registering hardware {name}: {details}")
5✔
535
            register_keystore(details[1], dynamic_constructor)
5✔
536

537
    def get_plugin(self, name: str) -> 'BasePlugin':
5✔
538
        if name not in self.plugins:
5✔
539
            self.load_plugin(name)
5✔
540
        return self.plugins[name]
5✔
541

542
    def is_plugin_zip(self, name: str) -> bool:
5✔
543
        """Returns True if the plugin is a zip file"""
544
        if (metadata := self.get_metadata(name)) is None:
×
545
            return False
×
546
        return metadata.get('is_zip', False)
×
547

548
    def get_metadata(self, name: str) -> Optional[dict]:
5✔
549
        """Returns the metadata of the plugin"""
550
        metadata = self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name)
5✔
551
        if not metadata:
5✔
552
            return None
×
553
        return metadata
5✔
554

555
    def run(self):
5✔
556
        while self.is_running():
5✔
557
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
5✔
558
            self.run_jobs()
5✔
559
        self.on_stop()
5✔
560

561

562
def get_file_hash256(path: str) -> bytes:
5✔
563
    '''Get the sha256 hash of a file, similar to `sha256sum`.'''
564
    with open(path, 'rb') as f:
×
565
        return sha256(f.read())
×
566

567

568
def hook(func):
5✔
569
    hook_names.add(func.__name__)
5✔
570
    return func
5✔
571

572

573
def run_hook(name, wallet: Optional['Abstract_Wallet'], *args):
5✔
574
    """ if wallet is None, plugin is considered enabled """
575
    results = []
5✔
576
    f_list = hooks.get(name, [])
5✔
577
    for p, f in f_list:
5✔
578
        if wallet is None or p.is_enabled(wallet):
5✔
579
            try:
5✔
580
                r = f(wallet, *args)
5✔
581
            except Exception:
×
582
                _logger.exception(f"Plugin error. plugin: {p}, hook: {name}")
×
583
                r = False
×
584
            if r:
5✔
585
                results.append(r)
×
586

587
    if results:
5✔
588
        assert len(results) == 1, results
×
589
        return results[0]
×
590

591

592
class BasePlugin(Logger):
5✔
593

594
    def __init__(self, parent, config: 'SimpleConfig', name):
5✔
595
        self.parent = parent  # type: Plugins  # The plugins object
5✔
596
        self.name = name
5✔
597
        self.config = config
5✔
598
        #self.wallet = None  # fixme: this field should not exist
599
        Logger.__init__(self)
5✔
600
        # add self to hooks
601
        for k in dir(self):
5✔
602
            if k in hook_names:
5✔
603
                l = hooks.get(k, [])
5✔
604
                l.append((self, getattr(self, k)))
5✔
605
                hooks[k] = l
5✔
606

607
    def __str__(self):
5✔
608
        return self.name
×
609

610
    def close(self):
5✔
611
        # remove self from hooks
612
        for attr_name in dir(self):
×
613
            if attr_name in hook_names:
×
614
                # found attribute in self that is also the name of a hook
615
                l = hooks.get(attr_name, [])
×
616
                try:
×
617
                    l.remove((self, getattr(self, attr_name)))
×
618
                except ValueError:
×
619
                    # maybe attr name just collided with hook name and was not hook
620
                    continue
×
621
                hooks[attr_name] = l
×
622
        self.parent.close_plugin(self)
×
623
        self.on_close()
×
624

625
    def on_close(self):
5✔
626
        pass
×
627

628
    def requires_settings(self) -> bool:
5✔
629
        return False
×
630

631
    def thread_jobs(self):
5✔
632
        return []
5✔
633

634
    def is_enabled(self, wallet: 'Abstract_Wallet'):
5✔
635
        if wallet is None:
×
636
            return False
×
637
        f = wallet.get_fingerprint()
×
638
        _set = self.config.get(f'plugins.{self.name}.enabled')
×
639
        if type(_set) != list:
×
640
            _set = []
×
641
        return self.is_available() and f in _set
×
642

643
    def is_available(self):
5✔
644
        return True
×
645

646
    def can_user_disable(self):
5✔
647
        return True
×
648

649
    def settings_widget(self, window):
5✔
650
        raise NotImplementedError()
×
651

652
    def settings_dialog(self, window):
5✔
653
        raise NotImplementedError()
×
654

655
    def read_file(self, filename: str) -> bytes:
5✔
656
        if self.parent.is_plugin_zip(self.name):
×
657
            plugin_filename = self.parent.zip_plugin_path(self.name)
×
658
            metadata = self.parent.external_plugin_metadata[self.name]
×
659
            dirname = metadata['dirname']
×
660
            with zipfile_lib.ZipFile(plugin_filename) as myzip:
×
661
                with myzip.open(os.path.join(dirname, filename)) as myfile:
×
662
                    return myfile.read()
×
663
        else:
NEW
664
            assert self.name in self.parent.internal_plugin_metadata
×
NEW
665
            path = os.path.join(os.path.dirname(__file__), 'plugins', self.name, filename)
×
666
            #else:
667
            #    path = os.path.join(self.parent.get_external_plugin_dir(), self.name, filename)
668
            with open(path, 'rb') as myfile:
×
669
                return myfile.read()
×
670

671

672
class DeviceUnpairableError(UserFacingException): pass
5✔
673
class HardwarePluginLibraryUnavailable(Exception): pass
5✔
674
class CannotAutoSelectDevice(Exception): pass
5✔
675

676

677
class Device(NamedTuple):
5✔
678
    path: Union[str, bytes]
5✔
679
    interface_number: int
5✔
680
    id_: str
5✔
681
    product_key: Any   # when using hid, often Tuple[int, int]
5✔
682
    usage_page: int
5✔
683
    transport_ui_string: str
5✔
684

685

686
class DeviceInfo(NamedTuple):
5✔
687
    device: Device
5✔
688
    label: Optional[str] = None
5✔
689
    initialized: Optional[bool] = None
5✔
690
    exception: Optional[Exception] = None
5✔
691
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
5✔
692
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
5✔
693
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
5✔
694

695

696
class HardwarePluginToScan(NamedTuple):
5✔
697
    name: str
5✔
698
    description: str
5✔
699
    plugin: Optional['HW_PluginBase']
5✔
700
    exception: Optional[Exception]
5✔
701

702

703
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
5✔
704

705

706
# hidapi is not thread-safe
707
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
708
#     https://github.com/libusb/hidapi/issues/45
709
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
710
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
711
# It is not entirely clear to me, exactly what is safe and what isn't, when
712
# using multiple threads...
713
# Hence, we use a single thread for all device communications, including
714
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
715
# the following thread:
716
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
5✔
717
    max_workers=1,
718
    thread_name_prefix='hwd_comms_thread'
719
)
720

721
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
722
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
723
# To keep it simple, let's just import it now, as we are likely in the main thread here.
724
if threading.current_thread() is not threading.main_thread():
5✔
725
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
×
726
try:
5✔
727
    import hid
5✔
728
except ImportError:
5✔
729
    pass
5✔
730

731

732
T = TypeVar('T')
5✔
733

734

735
def run_in_hwd_thread(func: Callable[[], T]) -> T:
5✔
736
    if threading.current_thread().name.startswith("hwd_comms_thread"):
×
737
        return func()
×
738
    else:
739
        fut = _hwd_comms_executor.submit(func)
×
740
        return fut.result()
×
741
        #except (concurrent.futures.CancelledError, concurrent.futures.TimeoutError) as e:
742

743

744
def runs_in_hwd_thread(func):
5✔
745
    @wraps(func)
5✔
746
    def wrapper(*args, **kwargs):
5✔
747
        return run_in_hwd_thread(partial(func, *args, **kwargs))
×
748
    return wrapper
5✔
749

750

751
def assert_runs_in_hwd_thread():
5✔
752
    if not threading.current_thread().name.startswith("hwd_comms_thread"):
×
753
        raise Exception("must only be called from HWD communication thread")
×
754

755

756
class DeviceMgr(ThreadJob):
5✔
757
    """Manages hardware clients.  A client communicates over a hardware
758
    channel with the device.
759

760
    In addition to tracking device HID IDs, the device manager tracks
761
    hardware wallets and manages wallet pairing.  A HID ID may be
762
    paired with a wallet when it is confirmed that the hardware device
763
    matches the wallet, i.e. they have the same master public key.  A
764
    HID ID can be unpaired if e.g. it is wiped.
765

766
    Because of hotplugging, a wallet must request its client
767
    dynamically each time it is required, rather than caching it
768
    itself.
769

770
    The device manager is shared across plugins, so just one place
771
    does hardware scans when needed.  By tracking HID IDs, if a device
772
    is plugged into a different port the wallet is automatically
773
    re-paired.
774

775
    Wallets are informed on connect / disconnect events.  It must
776
    implement connected(), disconnected() callbacks.  Being connected
777
    implies a pairing.  Callbacks can happen in any thread context,
778
    and we do them without holding the lock.
779

780
    Confusingly, the HID ID (serial number) reported by the HID system
781
    doesn't match the device ID reported by the device itself.  We use
782
    the HID IDs.
783

784
    This plugin is thread-safe.  Currently only devices supported by
785
    hidapi are implemented."""
786

787
    def __init__(self, config: SimpleConfig):
5✔
788
        ThreadJob.__init__(self)
5✔
789
        # A pairing_code->id_ map. Item only present if we have active pairing. Needs self.lock.
790
        self.pairing_code_to_id = {}  # type: Dict[str, str]
5✔
791
        # A client->id_ map. Needs self.lock.
792
        self.clients = {}  # type: Dict[HardwareClientBase, str]
5✔
793
        # What we recognise.  (vendor_id, product_id) -> Plugin
794
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
5✔
795
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]  # vendor_id -> Plugin
5✔
796
        # Custom enumerate functions for devices we don't know about.
797
        self._enumerate_func = set()  # Needs self.lock.
5✔
798

799
        self.lock = threading.RLock()
5✔
800

801
        self.config = config
5✔
802

803
    def thread_jobs(self):
5✔
804
        # Thread job to handle device timeouts
805
        return [self]
5✔
806

807
    def run(self):
5✔
808
        '''Handle device timeouts.  Runs in the context of the Plugins
809
        thread.'''
810
        with self.lock:
5✔
811
            clients = list(self.clients.keys())
5✔
812
        cutoff = time.time() - self.config.get_session_timeout()
5✔
813
        for client in clients:
5✔
814
            client.timeout(cutoff)
×
815

816
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
5✔
817
        for pair in device_pairs:
×
818
            self._recognised_hardware[pair] = plugin
×
819

820
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
5✔
821
        for vendor_id in vendor_ids:
×
822
            self._recognised_vendor[vendor_id] = plugin
×
823

824
    def register_enumerate_func(self, func):
5✔
825
        with self.lock:
×
826
            self._enumerate_func.add(func)
×
827

828
    @runs_in_hwd_thread
5✔
829
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
5✔
830
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
831
        # Get from cache first
832
        client = self._client_by_id(device.id_)
×
833
        if client:
×
834
            return client
×
835
        client = plugin.create_client(device, handler)
×
836
        if client:
×
837
            self.logger.info(f"Registering {client}")
×
838
            with self.lock:
×
839
                self.clients[client] = device.id_
×
840
        return client
×
841

842
    def id_by_pairing_code(self, pairing_code):
5✔
843
        with self.lock:
×
844
            return self.pairing_code_to_id.get(pairing_code)
×
845

846
    def pairing_code_by_id(self, id_):
5✔
847
        with self.lock:
×
848
            for pairing_code, id2 in self.pairing_code_to_id.items():
×
849
                if id2 == id_:
×
850
                    return pairing_code
×
851
            return None
×
852

853
    def unpair_pairing_code(self, pairing_code):
5✔
854
        with self.lock:
×
855
            if pairing_code not in self.pairing_code_to_id:
×
856
                return
×
857
            _id = self.pairing_code_to_id.pop(pairing_code)
×
858
        self._close_client(_id)
×
859

860
    def unpair_id(self, id_):
5✔
861
        pairing_code = self.pairing_code_by_id(id_)
×
862
        if pairing_code:
×
863
            self.unpair_pairing_code(pairing_code)
×
864
        else:
865
            self._close_client(id_)
×
866

867
    def _close_client(self, id_):
5✔
868
        with self.lock:
×
869
            client = self._client_by_id(id_)
×
870
            self.clients.pop(client, None)
×
871
        if client:
×
872
            client.close()
×
873

874
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
5✔
875
        with self.lock:
×
876
            for client, client_id in self.clients.items():
×
877
                if client_id == id_:
×
878
                    return client
×
879
        return None
×
880

881
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
5✔
882
        '''Returns a client for the device ID if one is registered.  If
883
        a device is wiped or in bootloader mode pairing is impossible;
884
        in such cases we communicate by device ID and not wallet.'''
885
        if scan_now:
×
886
            self.scan_devices()
×
887
        return self._client_by_id(id_)
×
888

889
    @runs_in_hwd_thread
5✔
890
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
5✔
891
                            keystore: 'Hardware_KeyStore',
892
                            force_pair: bool, *,
893
                            devices: Sequence['Device'] = None,
894
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
895
        self.logger.info("getting client for keystore")
×
896
        if handler is None:
×
897
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
×
898
        handler.update_status(False)
×
899
        pcode = keystore.pairing_code()
×
900
        client = None
×
901
        # search existing clients first (fast-path)
902
        if not devices:
×
903
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=[])
×
904
        # search clients again, now allowing a (slow) scan
905
        if client is None:
×
906
            if devices is None:
×
907
                devices = self.scan_devices()
×
908
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=devices)
×
909
        if client is None and force_pair:
×
910
            try:
×
911
                info = self.select_device(plugin, handler, keystore, devices,
×
912
                                          allow_user_interaction=allow_user_interaction)
913
            except CannotAutoSelectDevice:
×
914
                pass
×
915
            else:
916
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
×
917
        if client:
×
918
            handler.update_status(True)
×
919
            # note: if select_device was called, we might also update label etc here:
920
            keystore.opportunistically_fill_in_missing_info_from_device(client)
×
921
        self.logger.info("end client for keystore")
×
922
        return client
×
923

924
    def client_by_pairing_code(
5✔
925
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
926
        devices: Sequence['Device'],
927
    ) -> Optional['HardwareClientBase']:
928
        _id = self.id_by_pairing_code(pairing_code)
×
929
        client = self._client_by_id(_id)
×
930
        if client:
×
931
            if type(client.plugin) != type(plugin):
×
932
                return
×
933
            # An unpaired client might have another wallet's handler
934
            # from a prior scan.  Replace to fix dialog parenting.
935
            client.handler = handler
×
936
            return client
×
937

938
        for device in devices:
×
939
            if device.id_ == _id:
×
940
                return self.create_client(device, handler, plugin)
×
941

942
    def force_pair_keystore(
5✔
943
        self,
944
        *,
945
        plugin: 'HW_PluginBase',
946
        handler: 'HardwareHandlerBase',
947
        info: 'DeviceInfo',
948
        keystore: 'Hardware_KeyStore',
949
    ) -> 'HardwareClientBase':
950
        xpub = keystore.xpub
×
951
        derivation = keystore.get_derivation_prefix()
×
952
        assert derivation is not None
×
953
        xtype = bip32.xpub_type(xpub)
×
954
        client = self._client_by_id(info.device.id_)
×
955
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
×
956
            # See comment above for same code
957
            client.handler = handler
×
958
            # This will trigger a PIN/passphrase entry request
959
            try:
×
960
                client_xpub = client.get_xpub(derivation, xtype)
×
961
            except (UserCancelled, RuntimeError):
×
962
                # Bad / cancelled PIN / passphrase
963
                client_xpub = None
×
964
            if client_xpub == xpub:
×
965
                keystore.opportunistically_fill_in_missing_info_from_device(client)
×
966
                with self.lock:
×
967
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
×
968
                return client
×
969

970
        # The user input has wrong PIN or passphrase, or cancelled input,
971
        # or it is not pairable
972
        raise DeviceUnpairableError(
×
973
            _('Electrum cannot pair with your {}.\n\n'
974
              'Before you request bitcoins to be sent to addresses in this '
975
              'wallet, ensure you can pair with your device, or that you have '
976
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
977
              'receive will be unspendable.').format(plugin.device))
978

979
    def list_pairable_device_infos(
5✔
980
        self,
981
        *,
982
        handler: Optional['HardwareHandlerBase'],
983
        plugin: 'HW_PluginBase',
984
        devices: Sequence['Device'] = None,
985
        include_failing_clients: bool = False,
986
    ) -> List['DeviceInfo']:
987
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
988
        Already paired devices are also included, as it is okay to reuse them.
989
        """
990
        if not plugin.libraries_available:
×
991
            message = plugin.get_library_not_available_message()
×
992
            raise HardwarePluginLibraryUnavailable(message)
×
993
        if devices is None:
×
994
            devices = self.scan_devices()
×
995
        infos = []
×
996
        for device in devices:
×
997
            if not plugin.can_recognize_device(device):
×
998
                continue
×
999
            try:
×
1000
                client = self.create_client(device, handler, plugin)
×
1001
                if not client:
×
1002
                    continue
×
1003
                label = client.label()
×
1004
                is_initialized = client.is_initialized()
×
1005
                soft_device_id = client.get_soft_device_id()
×
1006
                model_name = client.device_model_name()
×
1007
            except Exception as e:
×
1008
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
×
1009
                if include_failing_clients:
×
1010
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
×
1011
                continue
×
1012
            infos.append(DeviceInfo(device=device,
×
1013
                                    label=label,
1014
                                    initialized=is_initialized,
1015
                                    plugin_name=plugin.name,
1016
                                    soft_device_id=soft_device_id,
1017
                                    model_name=model_name))
1018

1019
        return infos
×
1020

1021
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
5✔
1022
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
1023
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
1024
        """Select the device to use for keystore."""
1025
        # ideally this should not be called from the GUI thread...
1026
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
1027
        while True:
×
1028
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
×
1029
            if infos:
×
1030
                break
×
1031
            if not allow_user_interaction:
×
1032
                raise CannotAutoSelectDevice()
×
1033
            msg = _('Please insert your {}').format(plugin.device)
×
1034
            msg += " ("
×
1035
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
×
1036
                msg += f"label: {keystore.label}, "
×
1037
            msg += f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}"
×
1038
            msg += ').\n\n{}\n\n{}'.format(
×
1039
                _('Verify the cable is connected and that '
1040
                  'no other application is using it.'),
1041
                _('Try to connect again?')
1042
            )
1043
            if not handler.yes_no_question(msg):
×
1044
                raise UserCancelled()
×
1045
            devices = None
×
1046

1047
        # select device automatically. (but only if we have reasonable expectation it is the correct one)
1048
        # method 1: select device by id
1049
        if keystore.soft_device_id:
×
1050
            for info in infos:
×
1051
                if info.soft_device_id == keystore.soft_device_id:
×
1052
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
×
1053
                    return info
×
1054
        # method 2: select device by label
1055
        #           but only if not a placeholder label and only if there is no collision
1056
        device_labels = [info.label for info in infos]
×
1057
        if (keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
×
1058
                and device_labels.count(keystore.label) == 1):
1059
            for info in infos:
×
1060
                if info.label == keystore.label:
×
1061
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
×
1062
                    return info
×
1063
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
1064
        #           saved for keystore anyway, select it
1065
        if (len(infos) == 1
×
1066
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
1067
                and keystore.soft_device_id is None):
1068
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
×
1069
            return infos[0]
×
1070

1071
        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
×
1072
        if not allow_user_interaction:
×
1073
            raise CannotAutoSelectDevice()
×
1074
        # ask user to select device manually
1075
        msg = (
×
1076
                _("Could not automatically pair with device for given keystore.") + "\n"
1077
                + f"(keystore label: {keystore.label!r}, "
1078
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
1079
        msg += _("Please select which {} device to use:").format(plugin.device)
×
1080
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
×
1081
        descriptions = ["{label} ({maybe_model}{init}, {transport})"
×
1082
                        .format(label=info.label or _("An unnamed {}").format(info.plugin_name),
1083
                                init=(_("initialized") if info.initialized else _("wiped")),
1084
                                transport=info.device.transport_ui_string,
1085
                                maybe_model=f"{info.model_name}, " if info.model_name else "")
1086
                        for info in infos]
1087
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
×
1088
                          f"num options: {len(infos)}. options: {infos}")
1089
        c = handler.query_choice(msg, descriptions)
×
1090
        if c is None:
×
1091
            raise UserCancelled()
×
1092
        info = infos[c]
×
1093
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
×
1094
        # note: updated label/soft_device_id will be saved after pairing succeeds
1095
        return info
×
1096

1097
    @runs_in_hwd_thread
5✔
1098
    def _scan_devices_with_hid(self) -> List['Device']:
5✔
1099
        try:
×
1100
            import hid  # noqa: F811
×
1101
        except ImportError:
×
1102
            return []
×
1103

1104
        devices = []
×
1105
        for d in hid.enumerate(0, 0):
×
1106
            vendor_id = d['vendor_id']
×
1107
            product_key = (vendor_id, d['product_id'])
×
1108
            plugin = None
×
1109
            if product_key in self._recognised_hardware:
×
1110
                plugin = self._recognised_hardware[product_key]
×
1111
            elif vendor_id in self._recognised_vendor:
×
1112
                plugin = self._recognised_vendor[vendor_id]
×
1113
            if plugin:
×
1114
                device = plugin.create_device_from_hid_enumeration(d, product_key=product_key)
×
1115
                if device:
×
1116
                    devices.append(device)
×
1117
        return devices
×
1118

1119
    @runs_in_hwd_thread
5✔
1120
    @profiler
5✔
1121
    def scan_devices(self) -> Sequence['Device']:
5✔
1122
        self.logger.info("scanning devices...")
×
1123

1124
        # First see what's connected that we know about
1125
        devices = self._scan_devices_with_hid()
×
1126

1127
        # Let plugin handlers enumerate devices we don't know about
1128
        with self.lock:
×
1129
            enumerate_funcs = list(self._enumerate_func)
×
1130
        for f in enumerate_funcs:
×
1131
            try:
×
1132
                new_devices = f()
×
1133
            except BaseException as e:
×
1134
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
×
1135
            else:
1136
                devices.extend(new_devices)
×
1137

1138
        # find out what was disconnected
1139
        client_ids = [dev.id_ for dev in devices]
×
1140
        disconnected_clients = []
×
1141
        with self.lock:
×
1142
            connected = {}
×
1143
            for client, id_ in self.clients.items():
×
1144
                if id_ in client_ids and client.has_usable_connection_with_device():
×
1145
                    connected[client] = id_
×
1146
                else:
1147
                    disconnected_clients.append((client, id_))
×
1148
            self.clients = connected
×
1149

1150
        # Unpair disconnected devices
1151
        for client, id_ in disconnected_clients:
×
1152
            self.unpair_id(id_)
×
1153
            if client.handler:
×
1154
                client.handler.update_status(False)
×
1155

1156
        return devices
×
1157

1158
    @classmethod
5✔
1159
    def version_info(cls) -> Mapping[str, Optional[str]]:
5✔
1160
        ret = {}
×
1161
        # add libusb
1162
        try:
×
1163
            import usb1
×
1164
        except Exception as e:
×
1165
            ret["libusb.version"] = None
×
1166
        else:
1167
            ret["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
×
1168
            try:
×
1169
                ret["libusb.path"] = usb1.libusb1.libusb._name
×
1170
            except AttributeError:
×
1171
                ret["libusb.path"] = None
×
1172
        # add hidapi
1173
        try:
×
1174
            import hid  # noqa: F811
×
1175
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
×
1176
        except Exception as e:
×
1177
            from importlib.metadata import version
×
1178
            try:
×
1179
                ret["hidapi.version"] = version("hidapi")
×
1180
            except ImportError:
×
1181
                ret["hidapi.version"] = None
×
1182
        return ret
×
1183

1184
    def trigger_pairings(
5✔
1185
            self,
1186
            keystores: Sequence['KeyStore'],
1187
            *,
1188
            allow_user_interaction: bool = True,
1189
            devices: Sequence['Device'] = None,
1190
    ) -> None:
1191
        """Given a list of keystores, try to pair each with a connected hardware device.
1192

1193
        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
1194
        try to pair each keystore individually. Consider the following scenario:
1195
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
1196
        - assume none of the devices are paired yet
1197
        1. if we tried to individually pair keystores, we might try with ks1 first
1198
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
1199
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
1200
             which is confusing and error-prone. It's especially problematic if the hw device does
1201
             not support labels (such as Ledger), as then the user cannot easily distinguish
1202
             same-type devices. (see #4199)
1203
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
1204
           and then tell the user ks1 could not be paired (and there are no devices left to try)
1205
        """
1206
        from .keystore import Hardware_KeyStore
×
1207
        keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
×
1208
        if not keystores:
×
1209
            return
×
1210
        if devices is None:
×
1211
            devices = self.scan_devices()
×
1212
        # first pair with all devices that can be auto-selected
1213
        for ks in keystores:
×
1214
            try:
×
1215
                ks.get_client(
×
1216
                    force_pair=True,
1217
                    allow_user_interaction=False,
1218
                    devices=devices,
1219
                )
1220
            except UserCancelled:
×
1221
                pass
×
1222
        if allow_user_interaction:
×
1223
            # now do manual selections
1224
            for ks in keystores:
×
1225
                try:
×
1226
                    ks.get_client(
×
1227
                        force_pair=True,
1228
                        allow_user_interaction=True,
1229
                        devices=devices,
1230
                    )
1231
                except UserCancelled:
×
1232
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc