• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 6739420176449536

06 May 2025 11:07AM UTC coverage: 60.181% (+0.4%) from 59.748%
6739420176449536

Pull #9791

CirrusCI

f321x
plugins: structure plugin storage in wallet

store all plugin data by plugin name in a root dictionary `plugin_data`
inside the wallet db so that plugin data can get deleted again.
Prunes the data of plugins from the wallet db on wallet stop if the
plugin is not installed anymore.
Pull Request #9791: plugins: structure plugin storage in wallet db and prune uninstalled plugins data

12 of 20 new or added lines in 5 files covered. (60.0%)

1255 existing lines in 12 files now uncovered.

21626 of 35935 relevant lines covered (60.18%)

2.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.44
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import json
4✔
27
import os
4✔
28
import pkgutil
4✔
29
import importlib.util
4✔
30
import time
4✔
31
import threading
4✔
32
import sys
4✔
33
import aiohttp
4✔
34
import zipfile as zipfile_lib
4✔
35
from urllib.parse import urlparse
4✔
36

37
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
4✔
38
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
39
import concurrent
4✔
40
import zipimport
4✔
41
from functools import wraps, partial
4✔
42
from itertools import chain
4✔
43

44
from electrum_ecc import ECPrivkey, ECPubkey
4✔
45

46
from ._vendor.distutils.version import StrictVersion
4✔
47
from .version import ELECTRUM_VERSION
4✔
48
from .i18n import _
4✔
49
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException)
4✔
50
from . import bip32
4✔
51
from . import plugins
4✔
52
from .simple_config import SimpleConfig
4✔
53
from .logging import get_logger, Logger
4✔
54
from .crypto import sha256
4✔
55

56
if TYPE_CHECKING:
4✔
57
    from .hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
58
    from .keystore import Hardware_KeyStore, KeyStore
×
59
    from .wallet import Abstract_Wallet
×
60

61

62
_logger = get_logger(__name__)
4✔
63
plugin_loaders = {}
4✔
64
hook_names = set()
4✔
65
hooks = {}
4✔
66
_exec_module_failure = {}  # type: Dict[str, Exception]
4✔
67

68
PLUGIN_PASSWORD_VERSION = 1
4✔
69

70

71
class Plugins(DaemonThread):
4✔
72

73
    LOGGING_SHORTCUT = 'p'
4✔
74
    pkgpath = os.path.dirname(plugins.__file__)
4✔
75
    keyfile_linux = '/etc/electrum/plugins_key'
4✔
76
    keyfile_windows = 'C:\\HKEY_LOCAL_MACHINE\\SOFTWARE\\Electrum\\PluginsKey'
4✔
77

78
    @profiler
4✔
79
    def __init__(self, config: SimpleConfig, gui_name: str = None, cmd_only: bool = False):
4✔
80
        self.config = config
4✔
81
        self.cmd_only = cmd_only  # type: bool
4✔
82
        self.internal_plugin_metadata = {}
4✔
83
        self.external_plugin_metadata = {}
4✔
84
        if cmd_only:
4✔
85
            # only import the command modules of plugins
86
            Logger.__init__(self)
×
87
            self.find_plugins()
×
88
            self.load_plugins()
×
89
            return
×
90
        DaemonThread.__init__(self)
4✔
91
        self.device_manager = DeviceMgr(config)
4✔
92
        self.name = 'Plugins'  # set name of thread
4✔
93
        self.hw_wallets = {}
4✔
94
        self.plugins = {}  # type: Dict[str, BasePlugin]
4✔
95
        self.gui_name = gui_name
4✔
96
        self.find_plugins()
4✔
97
        self.load_plugins()
4✔
98
        self.add_jobs(self.device_manager.thread_jobs())
4✔
99
        self.start()
4✔
100

101
    @property
4✔
102
    def descriptions(self):
4✔
103
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
104

105
    def find_directory_plugins(self, pkg_path: str, external: bool):
4✔
106
        """Finds plugins in directory form from the given pkg_path and populates the metadata dicts"""
107
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
4✔
108
        for loader, name, ispkg in iter_modules:
4✔
109
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
110
            #       once as data and once as code. To honor the "no duplicates" rule below,
111
            #       we exclude the ones packaged as *code*, here:
112
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
4✔
113
                continue
×
114
            module_path = os.path.join(pkg_path, name)
4✔
115
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled') is True:
4✔
116
                continue
×
117
            try:
4✔
118
                with open(os.path.join(module_path, 'manifest.json'), 'r') as f:
4✔
119
                    d = json.load(f)
4✔
120
            except FileNotFoundError:
×
121
                self.logger.info(f"could not find manifest.json of plugin {name}, skipping...")
×
122
                continue
×
123
            if 'fullname' not in d:
4✔
124
                continue
×
125
            d['path'] = module_path
4✔
126
            if not self.cmd_only:
4✔
127
                gui_good = self.gui_name in d.get('available_for', [])
4✔
128
                if not gui_good:
4✔
129
                    continue
4✔
130
                details = d.get('registers_wallet_type')
4✔
131
                if details:
4✔
132
                    self.register_wallet_type(name, gui_good, details)
4✔
133
                details = d.get('registers_keystore')
4✔
134
                if details:
4✔
135
                    self.register_keystore(name, gui_good, details)
4✔
136
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
4✔
137
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
×
138
                _logger.info(f"duplicate plugins? for {name=}")
×
139
                continue
×
140
            if not external:
4✔
141
                self.internal_plugin_metadata[name] = d
4✔
142
            else:
143
                self.external_plugin_metadata[name] = d
×
144

145
    @staticmethod
4✔
146
    def exec_module_from_spec(spec, path: str):
4✔
147
        if prev_fail := _exec_module_failure.get(path):
4✔
148
            raise Exception(f"exec_module already failed once before, with: {prev_fail!r}")
×
149
        try:
4✔
150
            module = importlib.util.module_from_spec(spec)
4✔
151
            # sys.modules needs to be modified for relative imports to work
152
            # see https://stackoverflow.com/a/50395128
153
            sys.modules[path] = module
4✔
154
            spec.loader.exec_module(module)
4✔
155
        except Exception as e:
×
156
            # We can't undo all side-effects, but we at least rm the module from sys.modules,
157
            # so the import system knows it failed. If called again for the same plugin, we do not
158
            # retry due to potential interactions with not-undone side-effects (e.g. plugin
159
            # might have defined commands).
160
            _exec_module_failure[path] = e
×
161
            if path in sys.modules:
×
162
                sys.modules.pop(path, None)
×
163
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
×
164
        return module
4✔
165

166
    def find_plugins(self):
4✔
167
        internal_plugins_path = (self.pkgpath, False)
4✔
168
        external_plugins_path = (self.get_external_plugin_dir(), True)
4✔
169
        for pkg_path, external in (internal_plugins_path, external_plugins_path):
4✔
170
            if pkg_path and os.path.exists(pkg_path):
4✔
171
                if not external:
4✔
172
                    self.find_directory_plugins(pkg_path=pkg_path, external=external)
4✔
173
                else:
174
                    self.find_zip_plugins(pkg_path=pkg_path, external=external)
4✔
175

176
    def load_plugins(self):
4✔
177
        for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
4✔
178
            if not d.get('requires_wallet_type') and self.config.get(f'plugins.{name}.enabled'):
4✔
179
                try:
×
180
                    if self.cmd_only:  # only load init method to register commands
×
181
                        self.maybe_load_plugin_init_method(name)
×
182
                    else:
183
                        self.load_plugin_by_name(name)
×
184
                except BaseException as e:
×
185
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
186

187
    def _has_root_permissions(self, path):
4✔
188
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
189

190
    def get_keyfile_path(self) -> Tuple[str, str]:
4✔
191
        if sys.platform in ['windows', 'win32']:
×
192
            keyfile_path = self.keyfile_windows
×
193
            keyfile_help = _('This file can be edited with Regdit')
×
194
        elif 'ANDROID_DATA' in os.environ:
×
195
            raise Exception('platform not supported')
×
196
        else:
197
            # treat unknown platforms as linux-like
198
            keyfile_path = self.keyfile_linux
×
199
            keyfile_help = _('The file must have root permissions')
×
200
        return keyfile_path, keyfile_help
×
201

202
    def create_new_key(self, password:str) -> str:
4✔
203
        salt = os.urandom(32)
×
204
        privkey = self.derive_privkey(password, salt)
×
205
        pubkey = privkey.get_public_key_bytes()
×
206
        key = bytes([PLUGIN_PASSWORD_VERSION]) + salt + pubkey
×
207
        return key.hex()
×
208

209
    def get_pubkey_bytes(self) -> Tuple[Optional[bytes], bytes]:
4✔
210
        """
211
        returns pubkey, salt
212
        returns None, None if the pubkey has not been set
213
        """
214
        if sys.platform in ['windows', 'win32']:
×
215
            import winreg
×
216
            with winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE) as hkey:
×
217
                try:
×
218
                    with winreg.OpenKey(hkey, r"SOFTWARE\\Electrum") as key:
×
219
                        key_hex = winreg.QueryValue(key, "PluginsKey")
×
220
                except Exception as e:
×
221
                    self.logger.info(f'winreg error: {e}')
×
222
                    return None, None
×
223
        elif 'ANDROID_DATA' in os.environ:
×
224
            return None, None
×
225
        else:
226
            # treat unknown platforms as linux-like
227
            if not os.path.exists(self.keyfile_linux):
×
228
                return None, None
×
229
            if not self._has_root_permissions(self.keyfile_linux):
×
230
                return
×
231
            with open(self.keyfile_linux) as f:
×
232
                key_hex = f.read()
×
233
        key = bytes.fromhex(key_hex)
×
234
        version = key[0]
×
235
        if version != PLUGIN_PASSWORD_VERSION:
×
236
            self.logger.info(f'unknown plugin password version: {version}')
×
237
            return None, None
×
238
        # all good
239
        salt = key[1:1+32]
×
240
        pubkey = key[1+32:]
×
241
        return pubkey, salt
×
242

243
    def get_external_plugin_dir(self) -> str:
4✔
244
        pkg_path = os.path.join(self.config.electrum_path(), 'plugins')
4✔
245
        if not os.path.exists(pkg_path):
4✔
246
            os.mkdir(pkg_path)
4✔
247
        return pkg_path
4✔
248

249
    async def download_external_plugin(self, url: str) -> str:
4✔
250
        filename = os.path.basename(urlparse(url).path)
×
251
        pkg_path = self.get_external_plugin_dir()
×
252
        path = os.path.join(pkg_path, filename)
×
253
        if os.path.exists(path):
×
254
            raise FileExistsError(f"Plugin {filename} already exists at {path}")
×
255
        async with aiohttp.ClientSession() as session:
×
256
            async with session.get(url) as resp:
×
257
                if resp.status == 200:
×
258
                    with open(path, 'wb') as fd:
×
259
                        async for chunk in resp.content.iter_chunked(10):
×
260
                            fd.write(chunk)
×
261
        return path
×
262

263
    def read_manifest(self, path) -> dict:
4✔
264
        """ return json dict """
265
        with zipfile_lib.ZipFile(path) as file:
×
266
            for filename in file.namelist():
×
267
                if filename.endswith('manifest.json'):
×
268
                    break
×
269
            else:
270
                raise Exception('could not find manifest.json in zip archive')
×
271
            with file.open(filename, 'r') as f:
×
272
                manifest = json.load(f)
×
273
                manifest['path'] = path  # external, path of the zipfile
×
274
                manifest['dirname'] = os.path.dirname(filename)  # internal
×
275
                manifest['is_zip'] = True
×
276
                manifest['zip_hash_sha256'] = get_file_hash256(path).hex()
×
277
                return manifest
×
278

279
    def zip_plugin_path(self, name) -> str:
4✔
280
        path = self.get_metadata(name)['path']
×
281
        filename = os.path.basename(path)
×
282
        if name in self.internal_plugin_metadata:
×
283
            pkg_path = self.pkgpath
×
284
        else:
285
            pkg_path = self.get_external_plugin_dir()
×
286
        return os.path.join(pkg_path, filename)
×
287

288
    def find_zip_plugins(self, pkg_path: str, external: bool):
4✔
289
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts"""
290
        if pkg_path is None:
4✔
291
            return
×
292
        for filename in os.listdir(pkg_path):
4✔
293
            path = os.path.join(pkg_path, filename)
×
294
            if not filename.endswith('.zip'):
×
295
                continue
×
296
            try:
×
297
                d = self.read_manifest(path)
×
298
                name = d['name']
×
299
            except Exception:
×
300
                self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
×
301
                continue
×
302
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
×
303
                self.logger.info(f"duplicate plugins for {name=}")
×
304
                continue
×
305
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled'):
×
306
                continue
×
307
            min_version = d.get('min_electrum_version')
×
308
            if min_version and StrictVersion(min_version) > StrictVersion(ELECTRUM_VERSION):
×
309
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
310
                continue
×
311
            max_version = d.get('max_electrum_version')
×
312
            if max_version and StrictVersion(max_version) < StrictVersion(ELECTRUM_VERSION):
×
313
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
314
                continue
×
315

316
            if not self.cmd_only:
×
317
                gui_good = self.gui_name in d.get('available_for', [])
×
318
                if not gui_good:
×
319
                    continue
×
320
                if 'fullname' not in d:
×
321
                    continue
×
322
                details = d.get('registers_keystore')
×
323
                if details:
×
324
                    self.register_keystore(name, gui_good, details)
×
325
            if external:
×
326
                self.external_plugin_metadata[name] = d
×
327
            else:
328
                self.internal_plugin_metadata[name] = d
×
329

330
    def get(self, name):
4✔
331
        return self.plugins.get(name)
×
332

333
    def count(self):
4✔
334
        return len(self.plugins)
×
335

336
    def load_plugin(self, name) -> 'BasePlugin':
4✔
337
        """Imports the code of the given plugin.
338
        note: can be called from any thread.
339
        """
340
        if self.get_metadata(name):
4✔
341
            return self.load_plugin_by_name(name)
4✔
342
        else:
343
            raise Exception(f"could not find plugin {name!r}")
×
344

345
    def maybe_load_plugin_init_method(self, name: str) -> None:
4✔
346
        """Loads the __init__.py module of the plugin if it is not already loaded."""
347
        is_external = name in self.external_plugin_metadata
4✔
348
        base_name = ('electrum_external_plugins.' if is_external else 'electrum.plugins.') + name
4✔
349
        if base_name not in sys.modules:
4✔
350
            metadata = self.get_metadata(name)
4✔
351
            is_zip = metadata.get('is_zip', False)
4✔
352
            # if the plugin was not enabled on startup the init module hasn't been loaded yet
353
            if not is_zip:
4✔
354
                if is_external:
4✔
355
                    # this branch is deprecated: external plugins are always zip files
356
                    path = os.path.join(metadata['path'], '__init__.py')
×
357
                    init_spec = importlib.util.spec_from_file_location(base_name, path)
×
358
                else:
359
                    init_spec = importlib.util.find_spec(base_name)
4✔
360
            else:
361
                zipfile = zipimport.zipimporter(metadata['path'])
×
362
                dirname = metadata['dirname']
×
363
                init_spec = zipfile.find_spec(dirname)
×
364

365
            self.exec_module_from_spec(init_spec, base_name)
4✔
366

367
    def load_plugin_by_name(self, name: str) -> 'BasePlugin':
4✔
368
        if name in self.plugins:
4✔
369
            return self.plugins[name]
4✔
370
        # if the plugin was not enabled on startup the init module hasn't been loaded yet
371
        self.maybe_load_plugin_init_method(name)
4✔
372
        is_external = name in self.external_plugin_metadata
4✔
373
        if is_external and not self.is_authorized(name):
4✔
374
            self.logger.info(f'plugin not authorized {name}')
×
375
            return
×
376
        if not is_external:
4✔
377
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
4✔
378
        else:
379
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
×
380

381
        spec = importlib.util.find_spec(full_name)
4✔
382
        if spec is None:
4✔
383
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
384
        try:
4✔
385
            module = self.exec_module_from_spec(spec, full_name)
4✔
386
            plugin = module.Plugin(self, self.config, name)
4✔
387
        except Exception as e:
×
388
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
×
389
        self.add_jobs(plugin.thread_jobs())
4✔
390
        self.plugins[name] = plugin
4✔
391
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
4✔
392
        return plugin
4✔
393

394
    def close_plugin(self, plugin):
4✔
395
        self.remove_jobs(plugin.thread_jobs())
×
396

397
    def derive_privkey(self, pw: str, salt:bytes) -> ECPrivkey:
4✔
398
        from hashlib import pbkdf2_hmac
×
399
        secret = pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations=10**5)
×
400
        return ECPrivkey(secret)
×
401

402
    def install_internal_plugin(self, name):
4✔
403
        self.config.set_key(f'plugins.{name}.enabled', [])
×
404

405
    def install_external_plugin(self, name, path, privkey, manifest):
4✔
406
        # uninstall old version first to get rid of old zip files when updating plugin
407
        self.uninstall(name)
×
408
        self.external_plugin_metadata[name] = manifest
×
409
        self.authorize_plugin(name, path, privkey)
×
410

411
    def uninstall(self, name: str):
4✔
412
        self.config.set_key(f'plugins.{name}', None)
×
413
        if name in self.external_plugin_metadata:
×
414
            zipfile = self.zip_plugin_path(name)
×
415
            os.unlink(zipfile)
×
416
            self.external_plugin_metadata.pop(name)
×
417

418
    def is_internal(self, name) -> bool:
4✔
419
        return name in self.internal_plugin_metadata
×
420

421
    def is_auto_loaded(self, name):
4✔
422
        metadata = self.external_plugin_metadata.get(name) or self.internal_plugin_metadata.get(name)
×
423
        return metadata and (metadata.get('registers_keystore') or metadata.get('registers_wallet_type'))
×
424

425
    def is_installed(self, name) -> bool:
4✔
426
        """an external plugin may be installed but not authorized """
427
        return (name in self.internal_plugin_metadata and self.config.get(f'plugins.{name}'))\
×
428
            or name in self.external_plugin_metadata
429

430
    def is_authorized(self, name) -> bool:
4✔
431
        if name in self.internal_plugin_metadata:
×
432
            return True
×
433
        if name not in self.external_plugin_metadata:
×
434
            return False
×
435
        pubkey_bytes, salt = self.get_pubkey_bytes()
×
436
        if not pubkey_bytes:
×
437
            return False
×
438
        if not self.is_plugin_zip(name):
×
439
            return False
×
440
        filename = self.zip_plugin_path(name)
×
441
        plugin_hash = get_file_hash256(filename)
×
442
        sig = self.config.get(f'plugins.{name}.authorized')
×
443
        if not sig:
×
444
            return False
×
445
        pubkey = ECPubkey(pubkey_bytes)
×
446
        return pubkey.ecdsa_verify(bytes.fromhex(sig), plugin_hash)
×
447

448
    def authorize_plugin(self, name: str, filename, privkey: ECPrivkey):
4✔
449
        pubkey_bytes, salt = self.get_pubkey_bytes()
×
450
        assert pubkey_bytes == privkey.get_public_key_bytes()
×
451
        plugin_hash = get_file_hash256(filename)
×
452
        sig = privkey.ecdsa_sign(plugin_hash)
×
453
        value = sig.hex()
×
454
        self.config.set_key(f'plugins.{name}.authorized', value, save=True)
×
455

456
    def enable(self, name: str) -> 'BasePlugin':
4✔
457
        self.config.enable_plugin(name)
×
458
        p = self.get(name)
×
459
        if p:
×
460
            return p
×
461
        return self.load_plugin(name)
×
462

463
    def disable(self, name: str) -> None:
4✔
464
        self.config.disable_plugin(name)
×
465
        p = self.get(name)
×
466
        if not p:
×
467
            return
×
468
        self.plugins.pop(name)
×
469
        p.close()
×
470
        self.logger.info(f"closed {name}")
×
471

472
    @classmethod
4✔
473
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
4✔
474
        return key.startswith('plugins.')
×
475

476
    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
4✔
477
        d = self.descriptions.get(name)
×
478
        if not d:
×
479
            return False
×
480
        deps = d.get('requires', [])
×
481
        for dep, s in deps:
×
482
            try:
×
483
                __import__(dep)
×
484
            except ImportError as e:
×
485
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
×
486
                return False
×
487
        requires = d.get('requires_wallet_type', [])
×
488
        return not requires or wallet.wallet_type in requires
×
489

490
    def get_hardware_support(self):
4✔
491
        out = []
×
492
        for name, (gui_good, details) in self.hw_wallets.items():
×
493
            if gui_good:
×
494
                try:
×
495
                    p = self.get_plugin(name)
×
496
                    if p.is_available():
×
497
                        out.append(HardwarePluginToScan(name=name,
×
498
                                                        description=details[2],
499
                                                        plugin=p,
500
                                                        exception=None))
501
                except Exception as e:
×
502
                    self.logger.exception(f"cannot load plugin for: {name}")
×
503
                    out.append(HardwarePluginToScan(name=name,
×
504
                                                    description=details[2],
505
                                                    plugin=None,
506
                                                    exception=e))
507
        return out
×
508

509
    def register_wallet_type(self, name, gui_good, wallet_type):
4✔
510
        from .wallet import register_wallet_type, register_constructor
4✔
511
        self.logger.info(f"registering wallet type {(wallet_type, name)}")
4✔
512

513
        def loader():
4✔
514
            plugin = self.get_plugin(name)
4✔
515
            register_constructor(wallet_type, plugin.wallet_class)
4✔
516
        register_wallet_type(wallet_type)
4✔
517
        plugin_loaders[wallet_type] = loader
4✔
518

519
    def register_keystore(self, name, gui_good, details):
4✔
520
        from .keystore import register_keystore
4✔
521

522
        def dynamic_constructor(d):
4✔
523
            return self.get_plugin(name).keystore_class(d)
4✔
524
        if details[0] == 'hardware':
4✔
525
            self.hw_wallets[name] = (gui_good, details)
4✔
526
            self.logger.info(f"registering hardware {name}: {details}")
4✔
527
            register_keystore(details[1], dynamic_constructor)
4✔
528

529
    def get_plugin(self, name: str) -> 'BasePlugin':
4✔
530
        if name not in self.plugins:
4✔
531
            self.load_plugin(name)
4✔
532
        return self.plugins[name]
4✔
533

534
    def is_plugin_zip(self, name: str) -> bool:
4✔
535
        """Returns True if the plugin is a zip file"""
536
        if (metadata := self.get_metadata(name)) is None:
×
537
            return False
×
538
        return metadata.get('is_zip', False)
×
539

540
    def get_metadata(self, name: str) -> Optional[dict]:
4✔
541
        """Returns the metadata of the plugin"""
542
        metadata = self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name)
4✔
543
        if not metadata:
4✔
544
            return None
×
545
        return metadata
4✔
546

547
    def run(self):
4✔
548
        while self.is_running():
4✔
549
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
4✔
550
            self.run_jobs()
4✔
551
        self.on_stop()
4✔
552

553
    def read_file(self, name: str, filename: str) -> bytes:
4✔
554
        if self.is_plugin_zip(name):
×
555
            plugin_filename = self.zip_plugin_path(name)
×
556
            metadata = self.external_plugin_metadata[name]
×
557
            dirname = metadata['dirname']
×
558
            with zipfile_lib.ZipFile(plugin_filename) as myzip:
×
559
                with myzip.open(os.path.join(dirname, filename)) as myfile:
×
560
                    return myfile.read()
×
561
        else:
562
            assert name in self.internal_plugin_metadata
×
563
            path = os.path.join(os.path.dirname(__file__), 'plugins', name, filename)
×
564
            with open(path, 'rb') as myfile:
×
565
                return myfile.read()
×
566

567
def get_file_hash256(path: str) -> bytes:
4✔
568
    '''Get the sha256 hash of a file, similar to `sha256sum`.'''
569
    with open(path, 'rb') as f:
×
570
        return sha256(f.read())
×
571

572

573
def hook(func):
4✔
574
    hook_names.add(func.__name__)
4✔
575
    return func
4✔
576

577

578
def run_hook(name, *args):
4✔
579
    results = []
4✔
580
    f_list = hooks.get(name, [])
4✔
581
    for p, f in f_list:
4✔
582
        if p.is_enabled():
4✔
583
            try:
4✔
584
                r = f(*args)
4✔
585
            except Exception:
×
586
                _logger.exception(f"Plugin error. plugin: {p}, hook: {name}")
×
587
                r = False
×
588
            if r:
4✔
589
                results.append(r)
×
590

591
    if results:
4✔
592
        assert len(results) == 1, results
×
593
        return results[0]
×
594

595

596
class BasePlugin(Logger):
4✔
597

598
    def __init__(self, parent, config: 'SimpleConfig', name):
4✔
599
        self.parent = parent  # type: Plugins  # The plugins object
4✔
600
        self.name = name
4✔
601
        self.config = config
4✔
602
        Logger.__init__(self)
4✔
603
        # add self to hooks
604
        for k in dir(self):
4✔
605
            if k in hook_names:
4✔
606
                l = hooks.get(k, [])
4✔
607
                l.append((self, getattr(self, k)))
4✔
608
                hooks[k] = l
4✔
609

610
    def __str__(self):
4✔
611
        return self.name
×
612

613
    def close(self):
4✔
614
        # remove self from hooks
615
        for attr_name in dir(self):
×
616
            if attr_name in hook_names:
×
617
                # found attribute in self that is also the name of a hook
618
                l = hooks.get(attr_name, [])
×
619
                try:
×
620
                    l.remove((self, getattr(self, attr_name)))
×
621
                except ValueError:
×
622
                    # maybe attr name just collided with hook name and was not hook
623
                    continue
×
624
                hooks[attr_name] = l
×
625
        self.parent.close_plugin(self)
×
626
        self.on_close()
×
627

628
    def on_close(self):
4✔
629
        pass
×
630

631
    def requires_settings(self) -> bool:
4✔
632
        return False
×
633

634
    def thread_jobs(self):
4✔
635
        return []
4✔
636

637
    def is_enabled(self):
4✔
638
        if not self.is_available():
×
639
            return False
×
640
        return self.config.is_plugin_enabled(self.name)
×
641

642
    def is_available(self):
4✔
643
        return True
×
644

645
    def can_user_disable(self):
4✔
646
        return True
×
647

648
    def settings_widget(self, window):
4✔
649
        raise NotImplementedError()
×
650

651
    def settings_dialog(self, window):
4✔
652
        raise NotImplementedError()
×
653

654
    def read_file(self, filename: str) -> bytes:
4✔
655
        return self.parent.read_file(self.name, filename)
×
656

657
    def get_storage(self, wallet: 'Abstract_Wallet') -> dict:
4✔
658
        """Returns a dict which is persisted in the per-wallet database."""
NEW
659
        plugin_storage = wallet.db.get_plugin_storage()
×
NEW
660
        return plugin_storage.setdefault(self.name, {})
×
661

662
class DeviceUnpairableError(UserFacingException): pass
4✔
663
class HardwarePluginLibraryUnavailable(Exception): pass
4✔
664
class CannotAutoSelectDevice(Exception): pass
4✔
665

666

667
class Device(NamedTuple):
4✔
668
    path: Union[str, bytes]
4✔
669
    interface_number: int
4✔
670
    id_: str
4✔
671
    product_key: Any   # when using hid, often Tuple[int, int]
4✔
672
    usage_page: int
4✔
673
    transport_ui_string: str
4✔
674

675

676
class DeviceInfo(NamedTuple):
4✔
677
    device: Device
4✔
678
    label: Optional[str] = None
4✔
679
    initialized: Optional[bool] = None
4✔
680
    exception: Optional[Exception] = None
4✔
681
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
4✔
682
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
4✔
683
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
4✔
684

685

686
class HardwarePluginToScan(NamedTuple):
4✔
687
    name: str
4✔
688
    description: str
4✔
689
    plugin: Optional['HW_PluginBase']
4✔
690
    exception: Optional[Exception]
4✔
691

692

693
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
4✔
694

695

696
# hidapi is not thread-safe
697
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
698
#     https://github.com/libusb/hidapi/issues/45
699
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
700
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
701
# It is not entirely clear to me, exactly what is safe and what isn't, when
702
# using multiple threads...
703
# Hence, we use a single thread for all device communications, including
704
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
705
# the following thread:
706
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
4✔
707
    max_workers=1,
708
    thread_name_prefix='hwd_comms_thread'
709
)
710

711
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
712
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
713
# To keep it simple, let's just import it now, as we are likely in the main thread here.
714
if threading.current_thread() is not threading.main_thread():
4✔
715
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
×
716
try:
4✔
717
    import hid
4✔
718
except ImportError:
4✔
719
    pass
4✔
720

721

722
T = TypeVar('T')
4✔
723

724

725
def run_in_hwd_thread(func: Callable[[], T]) -> T:
4✔
726
    if threading.current_thread().name.startswith("hwd_comms_thread"):
×
727
        return func()
×
728
    else:
729
        fut = _hwd_comms_executor.submit(func)
×
730
        return fut.result()
×
731
        #except (concurrent.futures.CancelledError, concurrent.futures.TimeoutError) as e:
732

733

734
def runs_in_hwd_thread(func):
4✔
735
    @wraps(func)
4✔
736
    def wrapper(*args, **kwargs):
4✔
737
        return run_in_hwd_thread(partial(func, *args, **kwargs))
×
738
    return wrapper
4✔
739

740

741
def assert_runs_in_hwd_thread():
4✔
742
    if not threading.current_thread().name.startswith("hwd_comms_thread"):
×
743
        raise Exception("must only be called from HWD communication thread")
×
744

745

746
class DeviceMgr(ThreadJob):
4✔
747
    """Manages hardware clients.  A client communicates over a hardware
748
    channel with the device.
749

750
    In addition to tracking device HID IDs, the device manager tracks
751
    hardware wallets and manages wallet pairing.  A HID ID may be
752
    paired with a wallet when it is confirmed that the hardware device
753
    matches the wallet, i.e. they have the same master public key.  A
754
    HID ID can be unpaired if e.g. it is wiped.
755

756
    Because of hotplugging, a wallet must request its client
757
    dynamically each time it is required, rather than caching it
758
    itself.
759

760
    The device manager is shared across plugins, so just one place
761
    does hardware scans when needed.  By tracking HID IDs, if a device
762
    is plugged into a different port the wallet is automatically
763
    re-paired.
764

765
    Wallets are informed on connect / disconnect events.  It must
766
    implement connected(), disconnected() callbacks.  Being connected
767
    implies a pairing.  Callbacks can happen in any thread context,
768
    and we do them without holding the lock.
769

770
    Confusingly, the HID ID (serial number) reported by the HID system
771
    doesn't match the device ID reported by the device itself.  We use
772
    the HID IDs.
773

774
    This plugin is thread-safe.  Currently only devices supported by
775
    hidapi are implemented."""
776

777
    def __init__(self, config: SimpleConfig):
4✔
778
        ThreadJob.__init__(self)
4✔
779
        # A pairing_code->id_ map. Item only present if we have active pairing. Needs self.lock.
780
        self.pairing_code_to_id = {}  # type: Dict[str, str]
4✔
781
        # A client->id_ map. Needs self.lock.
782
        self.clients = {}  # type: Dict[HardwareClientBase, str]
4✔
783
        # What we recognise.  (vendor_id, product_id) -> Plugin
784
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
4✔
785
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]  # vendor_id -> Plugin
4✔
786
        # Custom enumerate functions for devices we don't know about.
787
        self._enumerate_func = set()  # Needs self.lock.
4✔
788

789
        self.lock = threading.RLock()
4✔
790

791
        self.config = config
4✔
792

793
    def thread_jobs(self):
4✔
794
        # Thread job to handle device timeouts
795
        return [self]
4✔
796

797
    def run(self):
4✔
798
        '''Handle device timeouts.  Runs in the context of the Plugins
799
        thread.'''
800
        with self.lock:
4✔
801
            clients = list(self.clients.keys())
4✔
802
        cutoff = time.time() - self.config.get_session_timeout()
4✔
803
        for client in clients:
4✔
804
            client.timeout(cutoff)
×
805

806
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
4✔
807
        for pair in device_pairs:
×
808
            self._recognised_hardware[pair] = plugin
×
809

810
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
4✔
811
        for vendor_id in vendor_ids:
×
812
            self._recognised_vendor[vendor_id] = plugin
×
813

814
    def register_enumerate_func(self, func):
4✔
815
        with self.lock:
×
816
            self._enumerate_func.add(func)
×
817

818
    @runs_in_hwd_thread
4✔
819
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
4✔
820
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
821
        # Get from cache first
822
        client = self._client_by_id(device.id_)
×
823
        if client:
×
824
            return client
×
825
        client = plugin.create_client(device, handler)
×
826
        if client:
×
827
            self.logger.info(f"Registering {client}")
×
828
            with self.lock:
×
829
                self.clients[client] = device.id_
×
830
        return client
×
831

832
    def id_by_pairing_code(self, pairing_code):
4✔
833
        with self.lock:
×
834
            return self.pairing_code_to_id.get(pairing_code)
×
835

836
    def pairing_code_by_id(self, id_):
4✔
837
        with self.lock:
×
838
            for pairing_code, id2 in self.pairing_code_to_id.items():
×
839
                if id2 == id_:
×
840
                    return pairing_code
×
841
            return None
×
842

843
    def unpair_pairing_code(self, pairing_code):
4✔
844
        with self.lock:
×
845
            if pairing_code not in self.pairing_code_to_id:
×
846
                return
×
847
            _id = self.pairing_code_to_id.pop(pairing_code)
×
848
        self._close_client(_id)
×
849

850
    def unpair_id(self, id_):
4✔
851
        pairing_code = self.pairing_code_by_id(id_)
×
852
        if pairing_code:
×
853
            self.unpair_pairing_code(pairing_code)
×
854
        else:
855
            self._close_client(id_)
×
856

857
    def _close_client(self, id_):
4✔
858
        with self.lock:
×
859
            client = self._client_by_id(id_)
×
860
            self.clients.pop(client, None)
×
861
        if client:
×
862
            client.close()
×
863

864
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
4✔
865
        with self.lock:
×
866
            for client, client_id in self.clients.items():
×
867
                if client_id == id_:
×
868
                    return client
×
869
        return None
×
870

871
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
4✔
872
        '''Returns a client for the device ID if one is registered.  If
873
        a device is wiped or in bootloader mode pairing is impossible;
874
        in such cases we communicate by device ID and not wallet.'''
875
        if scan_now:
×
876
            self.scan_devices()
×
877
        return self._client_by_id(id_)
×
878

879
    @runs_in_hwd_thread
4✔
880
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
4✔
881
                            keystore: 'Hardware_KeyStore',
882
                            force_pair: bool, *,
883
                            devices: Sequence['Device'] = None,
884
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
885
        self.logger.info("getting client for keystore")
×
886
        if handler is None:
×
887
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
×
888
        handler.update_status(False)
×
889
        pcode = keystore.pairing_code()
×
890
        client = None
×
891
        # search existing clients first (fast-path)
892
        if not devices:
×
893
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=[])
×
894
        # search clients again, now allowing a (slow) scan
895
        if client is None:
×
896
            if devices is None:
×
897
                devices = self.scan_devices()
×
898
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=devices)
×
899
        if client is None and force_pair:
×
900
            try:
×
901
                info = self.select_device(plugin, handler, keystore, devices,
×
902
                                          allow_user_interaction=allow_user_interaction)
903
            except CannotAutoSelectDevice:
×
904
                pass
×
905
            else:
906
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
×
907
        if client:
×
908
            handler.update_status(True)
×
909
            # note: if select_device was called, we might also update label etc here:
910
            keystore.opportunistically_fill_in_missing_info_from_device(client)
×
911
        self.logger.info("end client for keystore")
×
912
        return client
×
913

914
    def client_by_pairing_code(
4✔
915
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
916
        devices: Sequence['Device'],
917
    ) -> Optional['HardwareClientBase']:
918
        _id = self.id_by_pairing_code(pairing_code)
×
919
        client = self._client_by_id(_id)
×
920
        if client:
×
921
            if type(client.plugin) != type(plugin):
×
922
                return
×
923
            # An unpaired client might have another wallet's handler
924
            # from a prior scan.  Replace to fix dialog parenting.
925
            client.handler = handler
×
926
            return client
×
927

928
        for device in devices:
×
929
            if device.id_ == _id:
×
930
                return self.create_client(device, handler, plugin)
×
931

932
    def force_pair_keystore(
4✔
933
        self,
934
        *,
935
        plugin: 'HW_PluginBase',
936
        handler: 'HardwareHandlerBase',
937
        info: 'DeviceInfo',
938
        keystore: 'Hardware_KeyStore',
939
    ) -> 'HardwareClientBase':
940
        xpub = keystore.xpub
×
941
        derivation = keystore.get_derivation_prefix()
×
942
        assert derivation is not None
×
943
        xtype = bip32.xpub_type(xpub)
×
944
        client = self._client_by_id(info.device.id_)
×
945
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
×
946
            # See comment above for same code
947
            client.handler = handler
×
948
            # This will trigger a PIN/passphrase entry request
949
            try:
×
950
                client_xpub = client.get_xpub(derivation, xtype)
×
951
            except (UserCancelled, RuntimeError):
×
952
                # Bad / cancelled PIN / passphrase
953
                client_xpub = None
×
954
            if client_xpub == xpub:
×
955
                keystore.opportunistically_fill_in_missing_info_from_device(client)
×
956
                with self.lock:
×
957
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
×
958
                return client
×
959

960
        # The user input has wrong PIN or passphrase, or cancelled input,
961
        # or it is not pairable
962
        raise DeviceUnpairableError(
×
963
            _('Electrum cannot pair with your {}.\n\n'
964
              'Before you request bitcoins to be sent to addresses in this '
965
              'wallet, ensure you can pair with your device, or that you have '
966
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
967
              'receive will be unspendable.').format(plugin.device))
968

969
    def list_pairable_device_infos(
4✔
970
        self,
971
        *,
972
        handler: Optional['HardwareHandlerBase'],
973
        plugin: 'HW_PluginBase',
974
        devices: Sequence['Device'] = None,
975
        include_failing_clients: bool = False,
976
    ) -> List['DeviceInfo']:
977
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
978
        Already paired devices are also included, as it is okay to reuse them.
979
        """
980
        if not plugin.libraries_available:
×
981
            message = plugin.get_library_not_available_message()
×
982
            raise HardwarePluginLibraryUnavailable(message)
×
983
        if devices is None:
×
984
            devices = self.scan_devices()
×
985
        infos = []
×
986
        for device in devices:
×
987
            if not plugin.can_recognize_device(device):
×
988
                continue
×
989
            try:
×
990
                client = self.create_client(device, handler, plugin)
×
991
                if not client:
×
992
                    continue
×
993
                label = client.label()
×
994
                is_initialized = client.is_initialized()
×
995
                soft_device_id = client.get_soft_device_id()
×
996
                model_name = client.device_model_name()
×
997
            except Exception as e:
×
998
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
×
999
                if include_failing_clients:
×
1000
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
×
1001
                continue
×
1002
            infos.append(DeviceInfo(device=device,
×
1003
                                    label=label,
1004
                                    initialized=is_initialized,
1005
                                    plugin_name=plugin.name,
1006
                                    soft_device_id=soft_device_id,
1007
                                    model_name=model_name))
1008

1009
        return infos
×
1010

1011
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
4✔
1012
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
1013
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
1014
        """Select the device to use for keystore."""
1015
        # ideally this should not be called from the GUI thread...
1016
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
1017
        while True:
×
1018
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
×
1019
            if infos:
×
1020
                break
×
1021
            if not allow_user_interaction:
×
1022
                raise CannotAutoSelectDevice()
×
1023
            msg = _('Please insert your {}').format(plugin.device)
×
1024
            msg += " ("
×
1025
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
×
1026
                msg += f"label: {keystore.label}, "
×
1027
            msg += f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}"
×
1028
            msg += ').\n\n{}\n\n{}'.format(
×
1029
                _('Verify the cable is connected and that '
1030
                  'no other application is using it.'),
1031
                _('Try to connect again?')
1032
            )
1033
            if not handler.yes_no_question(msg):
×
1034
                raise UserCancelled()
×
1035
            devices = None
×
1036

1037
        # select device automatically. (but only if we have reasonable expectation it is the correct one)
1038
        # method 1: select device by id
1039
        if keystore.soft_device_id:
×
1040
            for info in infos:
×
1041
                if info.soft_device_id == keystore.soft_device_id:
×
1042
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
×
1043
                    return info
×
1044
        # method 2: select device by label
1045
        #           but only if not a placeholder label and only if there is no collision
1046
        device_labels = [info.label for info in infos]
×
1047
        if (keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
×
1048
                and device_labels.count(keystore.label) == 1):
1049
            for info in infos:
×
1050
                if info.label == keystore.label:
×
1051
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
×
1052
                    return info
×
1053
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
1054
        #           saved for keystore anyway, select it
1055
        if (len(infos) == 1
×
1056
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
1057
                and keystore.soft_device_id is None):
1058
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
×
1059
            return infos[0]
×
1060

1061
        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
×
1062
        if not allow_user_interaction:
×
1063
            raise CannotAutoSelectDevice()
×
1064
        # ask user to select device manually
1065
        msg = (
×
1066
                _("Could not automatically pair with device for given keystore.") + "\n"
1067
                + f"(keystore label: {keystore.label!r}, "
1068
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
1069
        msg += _("Please select which {} device to use:").format(plugin.device)
×
1070
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
×
1071
        descriptions = ["{label} ({maybe_model}{init}, {transport})"
×
1072
                        .format(label=info.label or _("An unnamed {}").format(info.plugin_name),
1073
                                init=(_("initialized") if info.initialized else _("wiped")),
1074
                                transport=info.device.transport_ui_string,
1075
                                maybe_model=f"{info.model_name}, " if info.model_name else "")
1076
                        for info in infos]
1077
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
×
1078
                          f"num options: {len(infos)}. options: {infos}")
1079
        c = handler.query_choice(msg, descriptions)
×
1080
        if c is None:
×
1081
            raise UserCancelled()
×
1082
        info = infos[c]
×
1083
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
×
1084
        # note: updated label/soft_device_id will be saved after pairing succeeds
1085
        return info
×
1086

1087
    @runs_in_hwd_thread
4✔
1088
    def _scan_devices_with_hid(self) -> List['Device']:
4✔
1089
        try:
×
1090
            import hid  # noqa: F811
×
1091
        except ImportError:
×
1092
            return []
×
1093

1094
        devices = []
×
1095
        for d in hid.enumerate(0, 0):
×
1096
            vendor_id = d['vendor_id']
×
1097
            product_key = (vendor_id, d['product_id'])
×
1098
            plugin = None
×
1099
            if product_key in self._recognised_hardware:
×
1100
                plugin = self._recognised_hardware[product_key]
×
1101
            elif vendor_id in self._recognised_vendor:
×
1102
                plugin = self._recognised_vendor[vendor_id]
×
1103
            if plugin:
×
1104
                device = plugin.create_device_from_hid_enumeration(d, product_key=product_key)
×
1105
                if device:
×
1106
                    devices.append(device)
×
1107
        return devices
×
1108

1109
    @runs_in_hwd_thread
4✔
1110
    @profiler
4✔
1111
    def scan_devices(self) -> Sequence['Device']:
4✔
1112
        self.logger.info("scanning devices...")
×
1113

1114
        # First see what's connected that we know about
1115
        devices = self._scan_devices_with_hid()
×
1116

1117
        # Let plugin handlers enumerate devices we don't know about
1118
        with self.lock:
×
1119
            enumerate_funcs = list(self._enumerate_func)
×
1120
        for f in enumerate_funcs:
×
1121
            try:
×
1122
                new_devices = f()
×
1123
            except BaseException as e:
×
1124
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
×
1125
            else:
1126
                devices.extend(new_devices)
×
1127

1128
        # find out what was disconnected
1129
        client_ids = [dev.id_ for dev in devices]
×
1130
        disconnected_clients = []
×
1131
        with self.lock:
×
1132
            connected = {}
×
1133
            for client, id_ in self.clients.items():
×
1134
                if id_ in client_ids and client.has_usable_connection_with_device():
×
1135
                    connected[client] = id_
×
1136
                else:
1137
                    disconnected_clients.append((client, id_))
×
1138
            self.clients = connected
×
1139

1140
        # Unpair disconnected devices
1141
        for client, id_ in disconnected_clients:
×
1142
            self.unpair_id(id_)
×
1143
            if client.handler:
×
1144
                client.handler.update_status(False)
×
1145

1146
        return devices
×
1147

1148
    @classmethod
4✔
1149
    def version_info(cls) -> Mapping[str, Optional[str]]:
4✔
1150
        ret = {}
×
1151
        # add libusb
1152
        try:
×
1153
            import usb1
×
1154
        except Exception as e:
×
1155
            ret["libusb.version"] = None
×
1156
        else:
1157
            ret["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
×
1158
            try:
×
1159
                ret["libusb.path"] = usb1.libusb1.libusb._name
×
1160
            except AttributeError:
×
1161
                ret["libusb.path"] = None
×
1162
        # add hidapi
1163
        try:
×
1164
            import hid  # noqa: F811
×
1165
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
×
1166
        except Exception as e:
×
1167
            from importlib.metadata import version
×
1168
            try:
×
1169
                ret["hidapi.version"] = version("hidapi")
×
1170
            except ImportError:
×
1171
                ret["hidapi.version"] = None
×
1172
        return ret
×
1173

1174
    def trigger_pairings(
4✔
1175
            self,
1176
            keystores: Sequence['KeyStore'],
1177
            *,
1178
            allow_user_interaction: bool = True,
1179
            devices: Sequence['Device'] = None,
1180
    ) -> None:
1181
        """Given a list of keystores, try to pair each with a connected hardware device.
1182

1183
        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
1184
        try to pair each keystore individually. Consider the following scenario:
1185
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
1186
        - assume none of the devices are paired yet
1187
        1. if we tried to individually pair keystores, we might try with ks1 first
1188
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
1189
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
1190
             which is confusing and error-prone. It's especially problematic if the hw device does
1191
             not support labels (such as Ledger), as then the user cannot easily distinguish
1192
             same-type devices. (see #4199)
1193
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
1194
           and then tell the user ks1 could not be paired (and there are no devices left to try)
1195
        """
1196
        from .keystore import Hardware_KeyStore
×
1197
        keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
×
1198
        if not keystores:
×
1199
            return
×
1200
        if devices is None:
×
1201
            devices = self.scan_devices()
×
1202
        # first pair with all devices that can be auto-selected
1203
        for ks in keystores:
×
1204
            try:
×
1205
                ks.get_client(
×
1206
                    force_pair=True,
1207
                    allow_user_interaction=False,
1208
                    devices=devices,
1209
                )
1210
            except UserCancelled:
×
1211
                pass
×
1212
        if allow_user_interaction:
×
1213
            # now do manual selections
1214
            for ks in keystores:
×
1215
                try:
×
1216
                    ks.get_client(
×
1217
                        force_pair=True,
1218
                        allow_user_interaction=True,
1219
                        devices=devices,
1220
                    )
1221
                except UserCancelled:
×
1222
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc