• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5928096156614656

06 May 2025 02:21PM UTC coverage: 59.729% (-0.02%) from 59.746%
5928096156614656

Pull #9745

CirrusCI

ecdsa
hardware wallets: show address on device also from tx dialog
Pull Request #9745: hardware wallets: show address on device also from tx dialog

0 of 11 new or added lines in 1 file covered. (0.0%)

599 existing lines in 6 files now uncovered.

21496 of 35989 relevant lines covered (59.73%)

2.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.45
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import json
5✔
27
import os
5✔
28
import pkgutil
5✔
29
import importlib.util
5✔
30
import time
5✔
31
import threading
5✔
32
import sys
5✔
33
import aiohttp
5✔
34
import zipfile as zipfile_lib
5✔
35
from urllib.parse import urlparse
5✔
36

37
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
5✔
38
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
39
import concurrent
5✔
40
import zipimport
5✔
41
from functools import wraps, partial
5✔
42
from itertools import chain
5✔
43

44
from electrum_ecc import ECPrivkey, ECPubkey
5✔
45

46
from ._vendor.distutils.version import StrictVersion
5✔
47
from .version import ELECTRUM_VERSION
5✔
48
from .i18n import _
5✔
49
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException)
5✔
50
from . import bip32
5✔
51
from . import plugins
5✔
52
from .simple_config import SimpleConfig
5✔
53
from .logging import get_logger, Logger
5✔
54
from .crypto import sha256
5✔
55

56
if TYPE_CHECKING:
5✔
57
    from .hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
58
    from .keystore import Hardware_KeyStore, KeyStore
×
59
    from .wallet import Abstract_Wallet
×
60

61

62
_logger = get_logger(__name__)
5✔
63
plugin_loaders = {}
5✔
64
hook_names = set()
5✔
65
hooks = {}
5✔
66
_exec_module_failure = {}  # type: Dict[str, Exception]
5✔
67

68
PLUGIN_PASSWORD_VERSION = 1
5✔
69

70

71
class Plugins(DaemonThread):
5✔
72

73
    LOGGING_SHORTCUT = 'p'
5✔
74
    pkgpath = os.path.dirname(plugins.__file__)
5✔
75
    keyfile_linux = '/etc/electrum/plugins_key'
5✔
76
    keyfile_windows = 'C:\\HKEY_LOCAL_MACHINE\\SOFTWARE\\Electrum\\PluginsKey'
5✔
77

78
    @profiler
5✔
79
    def __init__(self, config: SimpleConfig, gui_name: str = None, cmd_only: bool = False):
5✔
80
        self.config = config
5✔
81
        self.cmd_only = cmd_only  # type: bool
5✔
82
        self.internal_plugin_metadata = {}
5✔
83
        self.external_plugin_metadata = {}
5✔
84
        if cmd_only:
5✔
85
            # only import the command modules of plugins
86
            Logger.__init__(self)
×
87
            self.find_plugins()
×
88
            self.load_plugins()
×
89
            return
×
90
        DaemonThread.__init__(self)
5✔
91
        self.device_manager = DeviceMgr(config)
5✔
92
        self.name = 'Plugins'  # set name of thread
5✔
93
        self.hw_wallets = {}
5✔
94
        self.plugins = {}  # type: Dict[str, BasePlugin]
5✔
95
        self.gui_name = gui_name
5✔
96
        self.find_plugins()
5✔
97
        self.load_plugins()
5✔
98
        self.add_jobs(self.device_manager.thread_jobs())
5✔
99
        self.start()
5✔
100

101
    @property
5✔
102
    def descriptions(self):
5✔
103
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
104

105
    def find_directory_plugins(self, pkg_path: str, external: bool):
5✔
106
        """Finds plugins in directory form from the given pkg_path and populates the metadata dicts"""
107
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
5✔
108
        for loader, name, ispkg in iter_modules:
5✔
109
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
110
            #       once as data and once as code. To honor the "no duplicates" rule below,
111
            #       we exclude the ones packaged as *code*, here:
112
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
5✔
113
                continue
×
114
            module_path = os.path.join(pkg_path, name)
5✔
115
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled') is True:
5✔
116
                continue
×
117
            try:
5✔
118
                with open(os.path.join(module_path, 'manifest.json'), 'r') as f:
5✔
119
                    d = json.load(f)
5✔
120
            except FileNotFoundError:
×
121
                self.logger.info(f"could not find manifest.json of plugin {name}, skipping...")
×
122
                continue
×
123
            if 'fullname' not in d:
5✔
124
                continue
×
125
            d['path'] = module_path
5✔
126
            if not self.cmd_only:
5✔
127
                gui_good = self.gui_name in d.get('available_for', [])
5✔
128
                if not gui_good:
5✔
129
                    continue
5✔
130
                details = d.get('registers_wallet_type')
5✔
131
                if details:
5✔
132
                    self.register_wallet_type(name, gui_good, details)
5✔
133
                details = d.get('registers_keystore')
5✔
134
                if details:
5✔
135
                    self.register_keystore(name, gui_good, details)
5✔
136
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
5✔
137
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
×
138
                _logger.info(f"duplicate plugins? for {name=}")
×
139
                continue
×
140
            if not external:
5✔
141
                self.internal_plugin_metadata[name] = d
5✔
142
            else:
143
                self.external_plugin_metadata[name] = d
×
144

145
    @staticmethod
5✔
146
    def exec_module_from_spec(spec, path: str):
5✔
147
        if prev_fail := _exec_module_failure.get(path):
5✔
148
            raise Exception(f"exec_module already failed once before, with: {prev_fail!r}")
×
149
        try:
5✔
150
            module = importlib.util.module_from_spec(spec)
5✔
151
            # sys.modules needs to be modified for relative imports to work
152
            # see https://stackoverflow.com/a/50395128
153
            sys.modules[path] = module
5✔
154
            spec.loader.exec_module(module)
5✔
155
        except Exception as e:
×
156
            # We can't undo all side-effects, but we at least rm the module from sys.modules,
157
            # so the import system knows it failed. If called again for the same plugin, we do not
158
            # retry due to potential interactions with not-undone side-effects (e.g. plugin
159
            # might have defined commands).
160
            _exec_module_failure[path] = e
×
161
            if path in sys.modules:
×
162
                sys.modules.pop(path, None)
×
163
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
×
164
        return module
5✔
165

166
    def find_plugins(self):
5✔
167
        internal_plugins_path = (self.pkgpath, False)
5✔
168
        external_plugins_path = (self.get_external_plugin_dir(), True)
5✔
169
        for pkg_path, external in (internal_plugins_path, external_plugins_path):
5✔
170
            if pkg_path and os.path.exists(pkg_path):
5✔
171
                if not external:
5✔
172
                    self.find_directory_plugins(pkg_path=pkg_path, external=external)
5✔
173
                else:
174
                    self.find_zip_plugins(pkg_path=pkg_path, external=external)
5✔
175

176
    def load_plugins(self):
5✔
177
        for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
5✔
178
            if not d.get('requires_wallet_type') and self.config.get(f'plugins.{name}.enabled'):
5✔
179
                try:
×
180
                    if self.cmd_only:  # only load init method to register commands
×
181
                        self.maybe_load_plugin_init_method(name)
×
182
                    else:
183
                        self.load_plugin_by_name(name)
×
184
                except BaseException as e:
×
185
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
186

187
    def _has_root_permissions(self, path):
5✔
188
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
189

190
    def get_keyfile_path(self) -> Tuple[str, str]:
5✔
191
        if sys.platform in ['windows', 'win32']:
×
192
            keyfile_path = self.keyfile_windows
×
193
            keyfile_help = _('This file can be edited with Regdit')
×
194
        elif 'ANDROID_DATA' in os.environ:
×
195
            raise Exception('platform not supported')
×
196
        else:
197
            # treat unknown platforms as linux-like
198
            keyfile_path = self.keyfile_linux
×
199
            keyfile_help = _('The file must have root permissions')
×
200
        return keyfile_path, keyfile_help
×
201

202
    def create_new_key(self, password:str) -> str:
5✔
203
        salt = os.urandom(32)
×
204
        privkey = self.derive_privkey(password, salt)
×
205
        pubkey = privkey.get_public_key_bytes()
×
206
        key = bytes([PLUGIN_PASSWORD_VERSION]) + salt + pubkey
×
207
        return key.hex()
×
208

209
    def get_pubkey_bytes(self) -> Tuple[Optional[bytes], bytes]:
5✔
210
        """
211
        returns pubkey, salt
212
        returns None, None if the pubkey has not been set
213
        """
214
        if sys.platform in ['windows', 'win32']:
×
215
            import winreg
×
216
            with winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE) as hkey:
×
217
                try:
×
218
                    with winreg.OpenKey(hkey, r"SOFTWARE\\Electrum") as key:
×
219
                        key_hex = winreg.QueryValue(key, "PluginsKey")
×
220
                except Exception as e:
×
221
                    self.logger.info(f'winreg error: {e}')
×
222
                    return None, None
×
223
        elif 'ANDROID_DATA' in os.environ:
×
224
            return None, None
×
225
        else:
226
            # treat unknown platforms as linux-like
227
            if not os.path.exists(self.keyfile_linux):
×
228
                return None, None
×
229
            if not self._has_root_permissions(self.keyfile_linux):
×
230
                return
×
231
            with open(self.keyfile_linux) as f:
×
232
                key_hex = f.read()
×
233
        key = bytes.fromhex(key_hex)
×
234
        version = key[0]
×
235
        if version != PLUGIN_PASSWORD_VERSION:
×
236
            self.logger.info(f'unknown plugin password version: {version}')
×
237
            return None, None
×
238
        # all good
239
        salt = key[1:1+32]
×
240
        pubkey = key[1+32:]
×
241
        return pubkey, salt
×
242

243
    def get_external_plugin_dir(self) -> str:
5✔
244
        pkg_path = os.path.join(self.config.electrum_path(), 'plugins')
5✔
245
        if not os.path.exists(pkg_path):
5✔
246
            os.mkdir(pkg_path)
5✔
247
        return pkg_path
5✔
248

249
    async def download_external_plugin(self, url: str) -> str:
5✔
250
        filename = os.path.basename(urlparse(url).path)
×
251
        pkg_path = self.get_external_plugin_dir()
×
252
        path = os.path.join(pkg_path, filename)
×
253
        if os.path.exists(path):
×
254
            raise FileExistsError(f"Plugin {filename} already exists at {path}")
×
255
        async with aiohttp.ClientSession() as session:
×
256
            async with session.get(url) as resp:
×
257
                if resp.status == 200:
×
258
                    with open(path, 'wb') as fd:
×
259
                        async for chunk in resp.content.iter_chunked(10):
×
260
                            fd.write(chunk)
×
261
        return path
×
262

263
    def read_manifest(self, path) -> dict:
5✔
264
        """ return json dict """
265
        with zipfile_lib.ZipFile(path) as file:
×
266
            for filename in file.namelist():
×
267
                if filename.endswith('manifest.json'):
×
268
                    break
×
269
            else:
270
                raise Exception('could not find manifest.json in zip archive')
×
271
            with file.open(filename, 'r') as f:
×
272
                manifest = json.load(f)
×
273
                manifest['path'] = path  # external, path of the zipfile
×
274
                manifest['dirname'] = os.path.dirname(filename)  # internal
×
275
                manifest['is_zip'] = True
×
276
                manifest['zip_hash_sha256'] = get_file_hash256(path).hex()
×
277
                return manifest
×
278

279
    def zip_plugin_path(self, name) -> str:
5✔
280
        path = self.get_metadata(name)['path']
×
281
        filename = os.path.basename(path)
×
282
        if name in self.internal_plugin_metadata:
×
283
            pkg_path = self.pkgpath
×
284
        else:
285
            pkg_path = self.get_external_plugin_dir()
×
286
        return os.path.join(pkg_path, filename)
×
287

288
    def find_zip_plugins(self, pkg_path: str, external: bool):
5✔
289
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts"""
290
        if pkg_path is None:
5✔
291
            return
×
292
        for filename in os.listdir(pkg_path):
5✔
293
            path = os.path.join(pkg_path, filename)
×
294
            if not filename.endswith('.zip'):
×
295
                continue
×
296
            try:
×
297
                d = self.read_manifest(path)
×
298
                name = d['name']
×
299
            except Exception:
×
300
                self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
×
301
                continue
×
302
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
×
303
                self.logger.info(f"duplicate plugins for {name=}")
×
304
                continue
×
305
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled'):
×
306
                continue
×
307
            min_version = d.get('min_electrum_version')
×
308
            if min_version and StrictVersion(min_version) > StrictVersion(ELECTRUM_VERSION):
×
309
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
310
                continue
×
311
            max_version = d.get('max_electrum_version')
×
312
            if max_version and StrictVersion(max_version) < StrictVersion(ELECTRUM_VERSION):
×
313
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
314
                continue
×
315

316
            if not self.cmd_only:
×
317
                gui_good = self.gui_name in d.get('available_for', [])
×
318
                if not gui_good:
×
319
                    continue
×
320
                if 'fullname' not in d:
×
321
                    continue
×
322
                details = d.get('registers_keystore')
×
323
                if details:
×
324
                    self.register_keystore(name, gui_good, details)
×
325
            if external:
×
326
                self.external_plugin_metadata[name] = d
×
327
            else:
328
                self.internal_plugin_metadata[name] = d
×
329

330
    def get(self, name):
5✔
331
        return self.plugins.get(name)
×
332

333
    def count(self):
5✔
334
        return len(self.plugins)
×
335

336
    def load_plugin(self, name) -> 'BasePlugin':
5✔
337
        """Imports the code of the given plugin.
338
        note: can be called from any thread.
339
        """
340
        if self.get_metadata(name):
5✔
341
            return self.load_plugin_by_name(name)
5✔
342
        else:
343
            raise Exception(f"could not find plugin {name!r}")
×
344

345
    def maybe_load_plugin_init_method(self, name: str) -> None:
5✔
346
        """Loads the __init__.py module of the plugin if it is not already loaded."""
347
        is_external = name in self.external_plugin_metadata
5✔
348
        base_name = ('electrum_external_plugins.' if is_external else 'electrum.plugins.') + name
5✔
349
        if base_name not in sys.modules:
5✔
350
            metadata = self.get_metadata(name)
5✔
351
            is_zip = metadata.get('is_zip', False)
5✔
352
            # if the plugin was not enabled on startup the init module hasn't been loaded yet
353
            if not is_zip:
5✔
354
                if is_external:
5✔
355
                    # this branch is deprecated: external plugins are always zip files
356
                    path = os.path.join(metadata['path'], '__init__.py')
×
357
                    init_spec = importlib.util.spec_from_file_location(base_name, path)
×
358
                else:
359
                    init_spec = importlib.util.find_spec(base_name)
5✔
360
            else:
361
                zipfile = zipimport.zipimporter(metadata['path'])
×
362
                dirname = metadata['dirname']
×
363
                init_spec = zipfile.find_spec(dirname)
×
364

365
            self.exec_module_from_spec(init_spec, base_name)
5✔
366

367
    def load_plugin_by_name(self, name: str) -> 'BasePlugin':
5✔
368
        if name in self.plugins:
5✔
369
            return self.plugins[name]
5✔
370
        # if the plugin was not enabled on startup the init module hasn't been loaded yet
371
        self.maybe_load_plugin_init_method(name)
5✔
372
        is_external = name in self.external_plugin_metadata
5✔
373
        if is_external and not self.is_authorized(name):
5✔
374
            self.logger.info(f'plugin not authorized {name}')
×
375
            return
×
376
        if not is_external:
5✔
377
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
5✔
378
        else:
379
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
×
380

381
        spec = importlib.util.find_spec(full_name)
5✔
382
        if spec is None:
5✔
383
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
384
        try:
5✔
385
            module = self.exec_module_from_spec(spec, full_name)
5✔
386
            plugin = module.Plugin(self, self.config, name)
5✔
387
        except Exception as e:
×
388
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
×
389
        self.add_jobs(plugin.thread_jobs())
5✔
390
        self.plugins[name] = plugin
5✔
391
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
5✔
392
        return plugin
5✔
393

394
    def close_plugin(self, plugin):
5✔
395
        self.remove_jobs(plugin.thread_jobs())
×
396

397
    def derive_privkey(self, pw: str, salt:bytes) -> ECPrivkey:
5✔
398
        from hashlib import pbkdf2_hmac
×
399
        secret = pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations=10**5)
×
400
        return ECPrivkey(secret)
×
401

402
    def install_internal_plugin(self, name):
5✔
403
        self.config.set_key(f'plugins.{name}.enabled', [])
×
404

405
    def install_external_plugin(self, name, path, privkey, manifest):
5✔
406
        # uninstall old version first to get rid of old zip files when updating plugin
407
        self.uninstall(name)
×
408
        self.external_plugin_metadata[name] = manifest
×
409
        self.authorize_plugin(name, path, privkey)
×
410

411
    def uninstall(self, name: str):
5✔
412
        self.config.set_key(f'plugins.{name}', None)
×
413
        if name in self.external_plugin_metadata:
×
414
            zipfile = self.zip_plugin_path(name)
×
415
            os.unlink(zipfile)
×
416
            self.external_plugin_metadata.pop(name)
×
417

418
    def is_internal(self, name) -> bool:
5✔
419
        return name in self.internal_plugin_metadata
×
420

421
    def is_auto_loaded(self, name):
5✔
422
        metadata = self.external_plugin_metadata.get(name) or self.internal_plugin_metadata.get(name)
×
423
        return metadata and (metadata.get('registers_keystore') or metadata.get('registers_wallet_type'))
×
424

425
    def is_installed(self, name) -> bool:
5✔
426
        """an external plugin may be installed but not authorized """
427
        return (name in self.internal_plugin_metadata and self.config.get(f'plugins.{name}'))\
×
428
            or name in self.external_plugin_metadata
429

430
    def is_authorized(self, name) -> bool:
5✔
431
        if name in self.internal_plugin_metadata:
×
432
            return True
×
433
        if name not in self.external_plugin_metadata:
×
434
            return False
×
435
        pubkey_bytes, salt = self.get_pubkey_bytes()
×
436
        if not pubkey_bytes:
×
437
            return False
×
438
        if not self.is_plugin_zip(name):
×
439
            return False
×
440
        filename = self.zip_plugin_path(name)
×
441
        plugin_hash = get_file_hash256(filename)
×
442
        sig = self.config.get(f'plugins.{name}.authorized')
×
443
        if not sig:
×
444
            return False
×
445
        pubkey = ECPubkey(pubkey_bytes)
×
446
        return pubkey.ecdsa_verify(bytes.fromhex(sig), plugin_hash)
×
447

448
    def authorize_plugin(self, name: str, filename, privkey: ECPrivkey):
5✔
449
        pubkey_bytes, salt = self.get_pubkey_bytes()
×
450
        assert pubkey_bytes == privkey.get_public_key_bytes()
×
451
        plugin_hash = get_file_hash256(filename)
×
452
        sig = privkey.ecdsa_sign(plugin_hash)
×
453
        value = sig.hex()
×
454
        self.config.set_key(f'plugins.{name}.authorized', value, save=True)
×
455

456
    def enable(self, name: str) -> 'BasePlugin':
5✔
457
        self.config.enable_plugin(name)
×
458
        p = self.get(name)
×
459
        if p:
×
460
            return p
×
461
        return self.load_plugin(name)
×
462

463
    def disable(self, name: str) -> None:
5✔
464
        self.config.disable_plugin(name)
×
465
        p = self.get(name)
×
466
        if not p:
×
467
            return
×
468
        self.plugins.pop(name)
×
469
        p.close()
×
470
        self.logger.info(f"closed {name}")
×
471

472
    @classmethod
5✔
473
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
5✔
474
        return key.startswith('plugins.')
×
475

476
    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
5✔
477
        d = self.descriptions.get(name)
×
478
        if not d:
×
479
            return False
×
480
        deps = d.get('requires', [])
×
481
        for dep, s in deps:
×
482
            try:
×
483
                __import__(dep)
×
484
            except ImportError as e:
×
485
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
×
486
                return False
×
487
        requires = d.get('requires_wallet_type', [])
×
488
        return not requires or wallet.wallet_type in requires
×
489

490
    def get_hardware_support(self):
5✔
491
        out = []
×
492
        for name, (gui_good, details) in self.hw_wallets.items():
×
493
            if gui_good:
×
494
                try:
×
495
                    p = self.get_plugin(name)
×
496
                    if p.is_available():
×
497
                        out.append(HardwarePluginToScan(name=name,
×
498
                                                        description=details[2],
499
                                                        plugin=p,
500
                                                        exception=None))
501
                except Exception as e:
×
502
                    self.logger.exception(f"cannot load plugin for: {name}")
×
503
                    out.append(HardwarePluginToScan(name=name,
×
504
                                                    description=details[2],
505
                                                    plugin=None,
506
                                                    exception=e))
507
        return out
×
508

509
    def register_wallet_type(self, name, gui_good, wallet_type):
5✔
510
        from .wallet import register_wallet_type, register_constructor
5✔
511
        self.logger.info(f"registering wallet type {(wallet_type, name)}")
5✔
512

513
        def loader():
5✔
514
            plugin = self.get_plugin(name)
5✔
515
            register_constructor(wallet_type, plugin.wallet_class)
5✔
516
        register_wallet_type(wallet_type)
5✔
517
        plugin_loaders[wallet_type] = loader
5✔
518

519
    def register_keystore(self, name, gui_good, details):
5✔
520
        from .keystore import register_keystore
5✔
521

522
        def dynamic_constructor(d):
5✔
523
            return self.get_plugin(name).keystore_class(d)
5✔
524
        if details[0] == 'hardware':
5✔
525
            self.hw_wallets[name] = (gui_good, details)
5✔
526
            self.logger.info(f"registering hardware {name}: {details}")
5✔
527
            register_keystore(details[1], dynamic_constructor)
5✔
528

529
    def get_plugin(self, name: str) -> 'BasePlugin':
5✔
530
        if name not in self.plugins:
5✔
531
            self.load_plugin(name)
5✔
532
        return self.plugins[name]
5✔
533

534
    def is_plugin_zip(self, name: str) -> bool:
5✔
535
        """Returns True if the plugin is a zip file"""
536
        if (metadata := self.get_metadata(name)) is None:
×
537
            return False
×
538
        return metadata.get('is_zip', False)
×
539

540
    def get_metadata(self, name: str) -> Optional[dict]:
5✔
541
        """Returns the metadata of the plugin"""
542
        metadata = self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name)
5✔
543
        if not metadata:
5✔
544
            return None
×
545
        return metadata
5✔
546

547
    def run(self):
5✔
548
        while self.is_running():
5✔
549
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
5✔
550
            self.run_jobs()
5✔
551
        self.on_stop()
5✔
552

553
    def read_file(self, name: str, filename: str) -> bytes:
5✔
554
        if self.is_plugin_zip(name):
×
555
            plugin_filename = self.zip_plugin_path(name)
×
556
            metadata = self.external_plugin_metadata[name]
×
557
            dirname = metadata['dirname']
×
558
            with zipfile_lib.ZipFile(plugin_filename) as myzip:
×
559
                with myzip.open(os.path.join(dirname, filename)) as myfile:
×
560
                    return myfile.read()
×
561
        else:
562
            assert name in self.internal_plugin_metadata
×
563
            path = os.path.join(os.path.dirname(__file__), 'plugins', name, filename)
×
564
            with open(path, 'rb') as myfile:
×
565
                return myfile.read()
×
566

567
def get_file_hash256(path: str) -> bytes:
5✔
568
    '''Get the sha256 hash of a file, similar to `sha256sum`.'''
569
    with open(path, 'rb') as f:
×
570
        return sha256(f.read())
×
571

572

573
def hook(func):
5✔
574
    hook_names.add(func.__name__)
5✔
575
    return func
5✔
576

577

578
def run_hook(name, *args):
5✔
579
    results = []
5✔
580
    f_list = hooks.get(name, [])
5✔
581
    for p, f in f_list:
5✔
582
        if p.is_enabled():
5✔
583
            try:
5✔
584
                r = f(*args)
5✔
585
            except Exception:
×
586
                _logger.exception(f"Plugin error. plugin: {p}, hook: {name}")
×
587
                r = False
×
588
            if r:
5✔
589
                results.append(r)
×
590

591
    if results:
5✔
592
        assert len(results) == 1, results
×
593
        return results[0]
×
594

595

596
class BasePlugin(Logger):
5✔
597

598
    def __init__(self, parent, config: 'SimpleConfig', name):
5✔
599
        self.parent = parent  # type: Plugins  # The plugins object
5✔
600
        self.name = name
5✔
601
        self.config = config
5✔
602
        Logger.__init__(self)
5✔
603
        # add self to hooks
604
        for k in dir(self):
5✔
605
            if k in hook_names:
5✔
606
                l = hooks.get(k, [])
5✔
607
                l.append((self, getattr(self, k)))
5✔
608
                hooks[k] = l
5✔
609

610
    def __str__(self):
5✔
611
        return self.name
×
612

613
    def close(self):
5✔
614
        # remove self from hooks
615
        for attr_name in dir(self):
×
616
            if attr_name in hook_names:
×
617
                # found attribute in self that is also the name of a hook
618
                l = hooks.get(attr_name, [])
×
619
                try:
×
620
                    l.remove((self, getattr(self, attr_name)))
×
621
                except ValueError:
×
622
                    # maybe attr name just collided with hook name and was not hook
623
                    continue
×
624
                hooks[attr_name] = l
×
625
        self.parent.close_plugin(self)
×
626
        self.on_close()
×
627

628
    def on_close(self):
5✔
629
        pass
×
630

631
    def requires_settings(self) -> bool:
5✔
632
        return False
×
633

634
    def thread_jobs(self):
5✔
635
        return []
5✔
636

637
    def is_enabled(self):
5✔
638
        if not self.is_available():
×
639
            return False
×
640
        return self.config.is_plugin_enabled(self.name)
×
641

642
    def is_available(self):
5✔
643
        return True
×
644

645
    def can_user_disable(self):
5✔
646
        return True
×
647

648
    def settings_widget(self, window):
5✔
649
        raise NotImplementedError()
×
650

651
    def settings_dialog(self, window):
5✔
652
        raise NotImplementedError()
×
653

654
    def read_file(self, filename: str) -> bytes:
5✔
655
        return self.parent.read_file(self.name, filename)
×
656

657

658
class DeviceUnpairableError(UserFacingException): pass
5✔
659
class HardwarePluginLibraryUnavailable(Exception): pass
5✔
660
class CannotAutoSelectDevice(Exception): pass
5✔
661

662

663
class Device(NamedTuple):
5✔
664
    path: Union[str, bytes]
5✔
665
    interface_number: int
5✔
666
    id_: str
5✔
667
    product_key: Any   # when using hid, often Tuple[int, int]
5✔
668
    usage_page: int
5✔
669
    transport_ui_string: str
5✔
670

671

672
class DeviceInfo(NamedTuple):
5✔
673
    device: Device
5✔
674
    label: Optional[str] = None
5✔
675
    initialized: Optional[bool] = None
5✔
676
    exception: Optional[Exception] = None
5✔
677
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
5✔
678
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
5✔
679
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
5✔
680

681

682
class HardwarePluginToScan(NamedTuple):
5✔
683
    name: str
5✔
684
    description: str
5✔
685
    plugin: Optional['HW_PluginBase']
5✔
686
    exception: Optional[Exception]
5✔
687

688

689
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
5✔
690

691

692
# hidapi is not thread-safe
693
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
694
#     https://github.com/libusb/hidapi/issues/45
695
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
696
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
697
# It is not entirely clear to me, exactly what is safe and what isn't, when
698
# using multiple threads...
699
# Hence, we use a single thread for all device communications, including
700
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
701
# the following thread:
702
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
5✔
703
    max_workers=1,
704
    thread_name_prefix='hwd_comms_thread'
705
)
706

707
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
708
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
709
# To keep it simple, let's just import it now, as we are likely in the main thread here.
710
if threading.current_thread() is not threading.main_thread():
5✔
UNCOV
711
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
×
712
try:
5✔
713
    import hid
5✔
714
except ImportError:
5✔
715
    pass
5✔
716

717

718
T = TypeVar('T')
5✔
719

720

721
def run_in_hwd_thread(func: Callable[[], T]) -> T:
5✔
UNCOV
722
    if threading.current_thread().name.startswith("hwd_comms_thread"):
×
UNCOV
723
        return func()
×
724
    else:
UNCOV
725
        fut = _hwd_comms_executor.submit(func)
×
726
        return fut.result()
×
727
        #except (concurrent.futures.CancelledError, concurrent.futures.TimeoutError) as e:
728

729

730
def runs_in_hwd_thread(func):
5✔
731
    @wraps(func)
5✔
732
    def wrapper(*args, **kwargs):
5✔
UNCOV
733
        return run_in_hwd_thread(partial(func, *args, **kwargs))
×
734
    return wrapper
5✔
735

736

737
def assert_runs_in_hwd_thread():
5✔
UNCOV
738
    if not threading.current_thread().name.startswith("hwd_comms_thread"):
×
UNCOV
739
        raise Exception("must only be called from HWD communication thread")
×
740

741

742
class DeviceMgr(ThreadJob):
5✔
743
    """Manages hardware clients.  A client communicates over a hardware
744
    channel with the device.
745

746
    In addition to tracking device HID IDs, the device manager tracks
747
    hardware wallets and manages wallet pairing.  A HID ID may be
748
    paired with a wallet when it is confirmed that the hardware device
749
    matches the wallet, i.e. they have the same master public key.  A
750
    HID ID can be unpaired if e.g. it is wiped.
751

752
    Because of hotplugging, a wallet must request its client
753
    dynamically each time it is required, rather than caching it
754
    itself.
755

756
    The device manager is shared across plugins, so just one place
757
    does hardware scans when needed.  By tracking HID IDs, if a device
758
    is plugged into a different port the wallet is automatically
759
    re-paired.
760

761
    Wallets are informed on connect / disconnect events.  It must
762
    implement connected(), disconnected() callbacks.  Being connected
763
    implies a pairing.  Callbacks can happen in any thread context,
764
    and we do them without holding the lock.
765

766
    Confusingly, the HID ID (serial number) reported by the HID system
767
    doesn't match the device ID reported by the device itself.  We use
768
    the HID IDs.
769

770
    This plugin is thread-safe.  Currently only devices supported by
771
    hidapi are implemented."""
772

773
    def __init__(self, config: SimpleConfig):
5✔
774
        ThreadJob.__init__(self)
5✔
775
        # A pairing_code->id_ map. Item only present if we have active pairing. Needs self.lock.
776
        self.pairing_code_to_id = {}  # type: Dict[str, str]
5✔
777
        # A client->id_ map. Needs self.lock.
778
        self.clients = {}  # type: Dict[HardwareClientBase, str]
5✔
779
        # What we recognise.  (vendor_id, product_id) -> Plugin
780
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
5✔
781
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]  # vendor_id -> Plugin
5✔
782
        # Custom enumerate functions for devices we don't know about.
783
        self._enumerate_func = set()  # Needs self.lock.
5✔
784

785
        self.lock = threading.RLock()
5✔
786

787
        self.config = config
5✔
788

789
    def thread_jobs(self):
5✔
790
        # Thread job to handle device timeouts
791
        return [self]
5✔
792

793
    def run(self):
5✔
794
        '''Handle device timeouts.  Runs in the context of the Plugins
795
        thread.'''
796
        with self.lock:
5✔
797
            clients = list(self.clients.keys())
5✔
798
        cutoff = time.time() - self.config.get_session_timeout()
5✔
799
        for client in clients:
5✔
UNCOV
800
            client.timeout(cutoff)
×
801

802
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
5✔
UNCOV
803
        for pair in device_pairs:
×
804
            self._recognised_hardware[pair] = plugin
×
805

806
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
5✔
807
        for vendor_id in vendor_ids:
×
808
            self._recognised_vendor[vendor_id] = plugin
×
809

810
    def register_enumerate_func(self, func):
5✔
811
        with self.lock:
×
812
            self._enumerate_func.add(func)
×
813

814
    @runs_in_hwd_thread
5✔
815
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
5✔
816
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
817
        # Get from cache first
UNCOV
818
        client = self._client_by_id(device.id_)
×
UNCOV
819
        if client:
×
UNCOV
820
            return client
×
UNCOV
821
        client = plugin.create_client(device, handler)
×
822
        if client:
×
823
            self.logger.info(f"Registering {client}")
×
824
            with self.lock:
×
825
                self.clients[client] = device.id_
×
826
        return client
×
827

828
    def id_by_pairing_code(self, pairing_code):
5✔
829
        with self.lock:
×
830
            return self.pairing_code_to_id.get(pairing_code)
×
831

832
    def pairing_code_by_id(self, id_):
5✔
833
        with self.lock:
×
834
            for pairing_code, id2 in self.pairing_code_to_id.items():
×
UNCOV
835
                if id2 == id_:
×
UNCOV
836
                    return pairing_code
×
837
            return None
×
838

839
    def unpair_pairing_code(self, pairing_code):
5✔
840
        with self.lock:
×
841
            if pairing_code not in self.pairing_code_to_id:
×
UNCOV
842
                return
×
UNCOV
843
            _id = self.pairing_code_to_id.pop(pairing_code)
×
844
        self._close_client(_id)
×
845

846
    def unpair_id(self, id_):
5✔
847
        pairing_code = self.pairing_code_by_id(id_)
×
848
        if pairing_code:
×
UNCOV
849
            self.unpair_pairing_code(pairing_code)
×
850
        else:
851
            self._close_client(id_)
×
852

853
    def _close_client(self, id_):
5✔
UNCOV
854
        with self.lock:
×
855
            client = self._client_by_id(id_)
×
UNCOV
856
            self.clients.pop(client, None)
×
UNCOV
857
        if client:
×
858
            client.close()
×
859

860
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
5✔
861
        with self.lock:
×
862
            for client, client_id in self.clients.items():
×
UNCOV
863
                if client_id == id_:
×
UNCOV
864
                    return client
×
865
        return None
×
866

867
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
5✔
868
        '''Returns a client for the device ID if one is registered.  If
869
        a device is wiped or in bootloader mode pairing is impossible;
870
        in such cases we communicate by device ID and not wallet.'''
UNCOV
871
        if scan_now:
×
UNCOV
872
            self.scan_devices()
×
UNCOV
873
        return self._client_by_id(id_)
×
874

875
    @runs_in_hwd_thread
5✔
876
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
5✔
877
                            keystore: 'Hardware_KeyStore',
878
                            force_pair: bool, *,
879
                            devices: Sequence['Device'] = None,
880
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
UNCOV
881
        self.logger.info("getting client for keystore")
×
UNCOV
882
        if handler is None:
×
UNCOV
883
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
×
UNCOV
884
        handler.update_status(False)
×
885
        pcode = keystore.pairing_code()
×
886
        client = None
×
887
        # search existing clients first (fast-path)
888
        if not devices:
×
889
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=[])
×
890
        # search clients again, now allowing a (slow) scan
UNCOV
891
        if client is None:
×
892
            if devices is None:
×
893
                devices = self.scan_devices()
×
UNCOV
894
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=devices)
×
895
        if client is None and force_pair:
×
896
            try:
×
897
                info = self.select_device(plugin, handler, keystore, devices,
×
898
                                          allow_user_interaction=allow_user_interaction)
899
            except CannotAutoSelectDevice:
×
900
                pass
×
901
            else:
UNCOV
902
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
×
903
        if client:
×
904
            handler.update_status(True)
×
905
            # note: if select_device was called, we might also update label etc here:
906
            keystore.opportunistically_fill_in_missing_info_from_device(client)
×
907
        self.logger.info("end client for keystore")
×
908
        return client
×
909

910
    def client_by_pairing_code(
5✔
911
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
912
        devices: Sequence['Device'],
913
    ) -> Optional['HardwareClientBase']:
UNCOV
914
        _id = self.id_by_pairing_code(pairing_code)
×
UNCOV
915
        client = self._client_by_id(_id)
×
UNCOV
916
        if client:
×
UNCOV
917
            if type(client.plugin) != type(plugin):
×
918
                return
×
919
            # An unpaired client might have another wallet's handler
920
            # from a prior scan.  Replace to fix dialog parenting.
921
            client.handler = handler
×
922
            return client
×
923

UNCOV
924
        for device in devices:
×
925
            if device.id_ == _id:
×
926
                return self.create_client(device, handler, plugin)
×
927

928
    def force_pair_keystore(
5✔
929
        self,
930
        *,
931
        plugin: 'HW_PluginBase',
932
        handler: 'HardwareHandlerBase',
933
        info: 'DeviceInfo',
934
        keystore: 'Hardware_KeyStore',
935
    ) -> 'HardwareClientBase':
UNCOV
936
        xpub = keystore.xpub
×
UNCOV
937
        derivation = keystore.get_derivation_prefix()
×
UNCOV
938
        assert derivation is not None
×
UNCOV
939
        xtype = bip32.xpub_type(xpub)
×
940
        client = self._client_by_id(info.device.id_)
×
941
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
×
942
            # See comment above for same code
943
            client.handler = handler
×
944
            # This will trigger a PIN/passphrase entry request
945
            try:
×
UNCOV
946
                client_xpub = client.get_xpub(derivation, xtype)
×
947
            except (UserCancelled, RuntimeError):
×
948
                # Bad / cancelled PIN / passphrase
949
                client_xpub = None
×
950
            if client_xpub == xpub:
×
951
                keystore.opportunistically_fill_in_missing_info_from_device(client)
×
UNCOV
952
                with self.lock:
×
953
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
×
954
                return client
×
955

956
        # The user input has wrong PIN or passphrase, or cancelled input,
957
        # or it is not pairable
958
        raise DeviceUnpairableError(
×
959
            _('Electrum cannot pair with your {}.\n\n'
960
              'Before you request bitcoins to be sent to addresses in this '
961
              'wallet, ensure you can pair with your device, or that you have '
962
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
963
              'receive will be unspendable.').format(plugin.device))
964

965
    def list_pairable_device_infos(
5✔
966
        self,
967
        *,
968
        handler: Optional['HardwareHandlerBase'],
969
        plugin: 'HW_PluginBase',
970
        devices: Sequence['Device'] = None,
971
        include_failing_clients: bool = False,
972
    ) -> List['DeviceInfo']:
973
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
974
        Already paired devices are also included, as it is okay to reuse them.
975
        """
UNCOV
976
        if not plugin.libraries_available:
×
UNCOV
977
            message = plugin.get_library_not_available_message()
×
UNCOV
978
            raise HardwarePluginLibraryUnavailable(message)
×
UNCOV
979
        if devices is None:
×
980
            devices = self.scan_devices()
×
981
        infos = []
×
982
        for device in devices:
×
983
            if not plugin.can_recognize_device(device):
×
984
                continue
×
985
            try:
×
986
                client = self.create_client(device, handler, plugin)
×
987
                if not client:
×
988
                    continue
×
989
                label = client.label()
×
990
                is_initialized = client.is_initialized()
×
991
                soft_device_id = client.get_soft_device_id()
×
992
                model_name = client.device_model_name()
×
993
            except Exception as e:
×
994
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
×
995
                if include_failing_clients:
×
996
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
×
997
                continue
×
998
            infos.append(DeviceInfo(device=device,
×
999
                                    label=label,
1000
                                    initialized=is_initialized,
1001
                                    plugin_name=plugin.name,
1002
                                    soft_device_id=soft_device_id,
1003
                                    model_name=model_name))
1004

UNCOV
1005
        return infos
×
1006

1007
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
5✔
1008
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
1009
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
1010
        """Select the device to use for keystore."""
1011
        # ideally this should not be called from the GUI thread...
1012
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
UNCOV
1013
        while True:
×
UNCOV
1014
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
×
UNCOV
1015
            if infos:
×
UNCOV
1016
                break
×
1017
            if not allow_user_interaction:
×
1018
                raise CannotAutoSelectDevice()
×
1019
            msg = _('Please insert your {}').format(plugin.device)
×
1020
            msg += " ("
×
1021
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
×
1022
                msg += f"label: {keystore.label}, "
×
1023
            msg += f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}"
×
1024
            msg += ').\n\n{}\n\n{}'.format(
×
1025
                _('Verify the cable is connected and that '
1026
                  'no other application is using it.'),
1027
                _('Try to connect again?')
1028
            )
UNCOV
1029
            if not handler.yes_no_question(msg):
×
UNCOV
1030
                raise UserCancelled()
×
UNCOV
1031
            devices = None
×
1032

1033
        # select device automatically. (but only if we have reasonable expectation it is the correct one)
1034
        # method 1: select device by id
1035
        if keystore.soft_device_id:
×
UNCOV
1036
            for info in infos:
×
UNCOV
1037
                if info.soft_device_id == keystore.soft_device_id:
×
UNCOV
1038
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
×
1039
                    return info
×
1040
        # method 2: select device by label
1041
        #           but only if not a placeholder label and only if there is no collision
1042
        device_labels = [info.label for info in infos]
×
1043
        if (keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
×
1044
                and device_labels.count(keystore.label) == 1):
UNCOV
1045
            for info in infos:
×
1046
                if info.label == keystore.label:
×
1047
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
×
UNCOV
1048
                    return info
×
1049
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
1050
        #           saved for keystore anyway, select it
1051
        if (len(infos) == 1
×
1052
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
1053
                and keystore.soft_device_id is None):
UNCOV
1054
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
×
1055
            return infos[0]
×
1056

UNCOV
1057
        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
×
1058
        if not allow_user_interaction:
×
1059
            raise CannotAutoSelectDevice()
×
1060
        # ask user to select device manually
1061
        msg = (
×
1062
                _("Could not automatically pair with device for given keystore.") + "\n"
1063
                + f"(keystore label: {keystore.label!r}, "
1064
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
1065
        msg += _("Please select which {} device to use:").format(plugin.device)
×
UNCOV
1066
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
×
UNCOV
1067
        descriptions = ["{label} ({maybe_model}{init}, {transport})"
×
1068
                        .format(label=info.label or _("An unnamed {}").format(info.plugin_name),
1069
                                init=(_("initialized") if info.initialized else _("wiped")),
1070
                                transport=info.device.transport_ui_string,
1071
                                maybe_model=f"{info.model_name}, " if info.model_name else "")
1072
                        for info in infos]
UNCOV
1073
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
×
1074
                          f"num options: {len(infos)}. options: {infos}")
UNCOV
1075
        c = handler.query_choice(msg, descriptions)
×
UNCOV
1076
        if c is None:
×
1077
            raise UserCancelled()
×
UNCOV
1078
        info = infos[c]
×
1079
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
×
1080
        # note: updated label/soft_device_id will be saved after pairing succeeds
1081
        return info
×
1082

1083
    @runs_in_hwd_thread
5✔
1084
    def _scan_devices_with_hid(self) -> List['Device']:
5✔
1085
        try:
×
UNCOV
1086
            import hid  # noqa: F811
×
UNCOV
1087
        except ImportError:
×
UNCOV
1088
            return []
×
1089

1090
        devices = []
×
1091
        for d in hid.enumerate(0, 0):
×
1092
            vendor_id = d['vendor_id']
×
UNCOV
1093
            product_key = (vendor_id, d['product_id'])
×
1094
            plugin = None
×
1095
            if product_key in self._recognised_hardware:
×
1096
                plugin = self._recognised_hardware[product_key]
×
1097
            elif vendor_id in self._recognised_vendor:
×
1098
                plugin = self._recognised_vendor[vendor_id]
×
1099
            if plugin:
×
1100
                device = plugin.create_device_from_hid_enumeration(d, product_key=product_key)
×
1101
                if device:
×
1102
                    devices.append(device)
×
1103
        return devices
×
1104

1105
    @runs_in_hwd_thread
5✔
1106
    @profiler
5✔
1107
    def scan_devices(self) -> Sequence['Device']:
5✔
UNCOV
1108
        self.logger.info("scanning devices...")
×
1109

1110
        # First see what's connected that we know about
UNCOV
1111
        devices = self._scan_devices_with_hid()
×
1112

1113
        # Let plugin handlers enumerate devices we don't know about
UNCOV
1114
        with self.lock:
×
1115
            enumerate_funcs = list(self._enumerate_func)
×
UNCOV
1116
        for f in enumerate_funcs:
×
UNCOV
1117
            try:
×
1118
                new_devices = f()
×
1119
            except BaseException as e:
×
1120
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
×
1121
            else:
1122
                devices.extend(new_devices)
×
1123

1124
        # find out what was disconnected
UNCOV
1125
        client_ids = [dev.id_ for dev in devices]
×
1126
        disconnected_clients = []
×
UNCOV
1127
        with self.lock:
×
UNCOV
1128
            connected = {}
×
1129
            for client, id_ in self.clients.items():
×
1130
                if id_ in client_ids and client.has_usable_connection_with_device():
×
1131
                    connected[client] = id_
×
1132
                else:
1133
                    disconnected_clients.append((client, id_))
×
1134
            self.clients = connected
×
1135

1136
        # Unpair disconnected devices
1137
        for client, id_ in disconnected_clients:
×
1138
            self.unpair_id(id_)
×
UNCOV
1139
            if client.handler:
×
UNCOV
1140
                client.handler.update_status(False)
×
1141

1142
        return devices
×
1143

1144
    @classmethod
5✔
1145
    def version_info(cls) -> Mapping[str, Optional[str]]:
5✔
1146
        ret = {}
×
1147
        # add libusb
UNCOV
1148
        try:
×
UNCOV
1149
            import usb1
×
1150
        except Exception as e:
×
UNCOV
1151
            ret["libusb.version"] = None
×
1152
        else:
1153
            ret["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
×
1154
            try:
×
1155
                ret["libusb.path"] = usb1.libusb1.libusb._name
×
UNCOV
1156
            except AttributeError:
×
1157
                ret["libusb.path"] = None
×
1158
        # add hidapi
1159
        try:
×
1160
            import hid  # noqa: F811
×
1161
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
×
UNCOV
1162
        except Exception as e:
×
1163
            from importlib.metadata import version
×
1164
            try:
×
1165
                ret["hidapi.version"] = version("hidapi")
×
1166
            except ImportError:
×
1167
                ret["hidapi.version"] = None
×
1168
        return ret
×
1169

1170
    def trigger_pairings(
5✔
1171
            self,
1172
            keystores: Sequence['KeyStore'],
1173
            *,
1174
            allow_user_interaction: bool = True,
1175
            devices: Sequence['Device'] = None,
1176
    ) -> None:
1177
        """Given a list of keystores, try to pair each with a connected hardware device.
1178

1179
        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
1180
        try to pair each keystore individually. Consider the following scenario:
1181
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
1182
        - assume none of the devices are paired yet
1183
        1. if we tried to individually pair keystores, we might try with ks1 first
1184
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
1185
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
1186
             which is confusing and error-prone. It's especially problematic if the hw device does
1187
             not support labels (such as Ledger), as then the user cannot easily distinguish
1188
             same-type devices. (see #4199)
1189
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
1190
           and then tell the user ks1 could not be paired (and there are no devices left to try)
1191
        """
UNCOV
1192
        from .keystore import Hardware_KeyStore
×
UNCOV
1193
        keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
×
UNCOV
1194
        if not keystores:
×
UNCOV
1195
            return
×
1196
        if devices is None:
×
1197
            devices = self.scan_devices()
×
1198
        # first pair with all devices that can be auto-selected
1199
        for ks in keystores:
×
1200
            try:
×
1201
                ks.get_client(
×
1202
                    force_pair=True,
1203
                    allow_user_interaction=False,
1204
                    devices=devices,
1205
                )
UNCOV
1206
            except UserCancelled:
×
UNCOV
1207
                pass
×
UNCOV
1208
        if allow_user_interaction:
×
1209
            # now do manual selections
1210
            for ks in keystores:
×
1211
                try:
×
1212
                    ks.get_client(
×
1213
                        force_pair=True,
1214
                        allow_user_interaction=True,
1215
                        devices=devices,
1216
                    )
UNCOV
1217
                except UserCancelled:
×
UNCOV
1218
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc