• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 4917020841476096

19 Mar 2025 04:27PM UTC coverage: 61.11% (-0.005%) from 61.115%
4917020841476096

Pull #9655

CirrusCI

SomberNight
plugins: better handle exceptions in __init__

When importing a plugin, if it raised an exception in its `__init__` file, we
ignored it, and still loaded the plugin, in a potentially half-broken state.
This is because maybe_load_plugin_init_method only calls exec_module_from_spec
if the plugin is not already in sys.modules, but exec_module_from_spec
will put the plugin into sys.modules even if it errors.

Consider this patch to test with, enable the "labels" plugin:
```patch
diff --git a/electrum/plugins/labels/__init__.py b/electrum/plugins/labels/__init__.py
index b68127df8e..0d6d95abce 100644
--- a/electrum/plugins/labels/__init__.py
+++ b/electrum/plugins/labels/__init__.py
@@ -21,3 +21,5 @@ async def pull(self: 'Commands', plugin: 'LabelsPlugin' = None, wallet=None, for
     arg:bool:force:pull all labels
     """
     return await plugin.pull_thread(wallet, force=force)
+
+raise Exception("heyheyhey")

```

I would expect we don't load the labels plugin due to the error, but we do:
```
>>> plugins.get_plugin("labels")
<electrum.plugins.labels.qt.Plugin object at 0x7801df30fb50>
```

Log:
```
$ ./run_electrum -v --testnet -o
  0.75 | I | simple_config.SimpleConfig | electrum directory /home/user/.electrum/testnet
  0.75 | E | p/plugin.Plugins | cannot initialize plugin labels: Error pre-loading electrum.plugins.labels: Exception('heyheyhey')
Traceback (most recent call last):
  File "/home/user/wspace/electrum/electrum/plugin.py", line 148, in exec_module_from_spec
    spec.loader.exec_module(module)
  File "<frozen importlib._bootstrap_external>", line 883, in exec_module
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "/home/user/wspace/electrum/electrum/plugins/labels/__init__.py", line 25, in <module>
    raise Exception("heyheyhey")
Exception: heyheyhey

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/user/wspace/electrum/electrum/plu... (continued)
Pull Request #9655: plugins: better handle exceptions in `__init__`

3 of 7 new or added lines in 1 file covered. (42.86%)

2 existing lines in 2 files now uncovered.

21366 of 34963 relevant lines covered (61.11%)

3.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

41.09
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import json
5✔
27
import os
5✔
28
import pkgutil
5✔
29
import importlib.util
5✔
30
import time
5✔
31
import threading
5✔
32
import traceback
5✔
33
import sys
5✔
34
import aiohttp
5✔
35
import zipfile as zipfile_lib
5✔
36

37
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
5✔
38
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
39
import concurrent
5✔
40
import zipimport
5✔
41
from concurrent import futures
5✔
42
from functools import wraps, partial
5✔
43
from itertools import chain
5✔
44

45
from .i18n import _
5✔
46
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException)
5✔
47
from . import bip32
5✔
48
from . import plugins
5✔
49
from .simple_config import ConfigVar, SimpleConfig
5✔
50
from .logging import get_logger, Logger
5✔
51
from .crypto import sha256
5✔
52

53
if TYPE_CHECKING:
5✔
54
    from .plugins.hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
55
    from .keystore import Hardware_KeyStore, KeyStore
×
56
    from .wallet import Abstract_Wallet
×
57

58

59
# Module-level logger for this file.
_logger = get_logger(__name__)
5✔
60
# wallet_type -> zero-argument loader callable; filled in by Plugins.register_wallet_type.
plugin_loaders = {}
5✔
61
# Names of the functions that were decorated with @hook.
hook_names = set()
5✔
62
# hook name -> list of (plugin, bound method) pairs; maintained by BasePlugin.__init__ and BasePlugin.close.
hooks = {}
5✔
63
# path -> bool result cache used by Plugins._has_recursive_root_permissions.
_root_permission_cache = {}
5✔
64
# module path -> exception raised the first time Plugins.exec_module_from_spec failed for that path.
_exec_module_failure = {}  # type: Dict[str, Exception]
5✔
65

66

67
class Plugins(DaemonThread):
5✔
68

69
    LOGGING_SHORTCUT = 'p'
5✔
70
    pkgpath = os.path.dirname(plugins.__file__)
5✔
71

72
    @profiler
    def __init__(self, config: SimpleConfig, gui_name = None, cmd_only: bool = False):
        """Discover the available plugins and load the enabled ones.

        In cmd_only mode only the logger is initialised before plugins are
        found and loaded: no device manager is created and no thread is started.
        Otherwise the thread base class is initialised, plugins are found and
        loaded, the device manager's jobs are added and the thread is started.
        """
        self.config = config
        self.cmd_only = cmd_only  # type: bool
        self.internal_plugin_metadata = {}
        self.external_plugin_metadata = {}
        if not cmd_only:
            DaemonThread.__init__(self)
            self.device_manager = DeviceMgr(config)
            self.name = 'Plugins'  # set name of thread
            self.hw_wallets = {}
            self.plugins = {}  # type: Dict[str, BasePlugin]
            self.gui_name = gui_name
            self.find_plugins()
            self.load_plugins()
            self.add_jobs(self.device_manager.thread_jobs())
            self.start()
        else:
            # only import the command modules of plugins
            Logger.__init__(self)
            self.find_plugins()
            self.load_plugins()
5✔
94

95
    @property
5✔
96
    def descriptions(self):
5✔
97
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
98

99
    def find_directory_plugins(self, pkg_path: str, external: bool):
        """Finds plugins in directory form from the given pkg_path and populates the metadata dicts.

        A candidate is skipped (not registered) if:
        - it was found by the pyinstaller frozen importer,
        - it is external and fails the recursive root-permission check,
        - we are in cmd_only mode and it is not enabled in the config,
        - its manifest.json is missing, unreadable, malformed, or lacks 'fullname',
        - we are not in cmd_only mode and the current GUI is not in 'available_for'.

        Raises:
            Exception: if a plugin with the same name was already registered.
        """
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
        for loader, name, _ispkg in iter_modules:
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
            #       once as data and once as code. To honor the "no duplicates" rule below,
            #       we exclude the ones packaged as *code*, here:
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
                continue
            module_path = os.path.join(pkg_path, name)
            if external and not self._has_recursive_root_permissions(module_path):
                self.logger.info(f"Not loading plugin {module_path}: directory has user write permissions")
                continue
            if self.cmd_only and self.config.get('enable_plugin_' + name) is not True:
                continue
            try:
                # be explicit about the encoding instead of relying on the locale default
                with open(os.path.join(module_path, 'manifest.json'), 'r', encoding='utf-8') as f:
                    d = json.load(f)
            except FileNotFoundError:
                self.logger.info(f"could not find manifest.json of plugin {name}, skipping...")
                continue
            except (OSError, ValueError):
                # Unreadable or malformed manifest (json.JSONDecodeError and UnicodeDecodeError
                # are ValueErrors). One broken plugin must not abort discovery of all the others.
                self.logger.info(f"could not load manifest.json of plugin {name}, skipping...", exc_info=True)
                continue
            if not isinstance(d, dict) or 'fullname' not in d:
                continue
            d['display_name'] = d['fullname']
            d['path'] = module_path
            if not self.cmd_only:
                gui_good = self.gui_name in d.get('available_for', [])
                if not gui_good:
                    continue
                details = d.get('registers_wallet_type')
                if details:
                    self.register_wallet_type(name, gui_good, details)
                details = d.get('registers_keystore')
                if details:
                    self.register_keystore(name, gui_good, details)
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
                raise Exception(f"duplicate plugins? for {name=}")
            if not external:
                self.internal_plugin_metadata[name] = d
            else:
                self.external_plugin_metadata[name] = d
×
141

142
    @staticmethod
    def exec_module_from_spec(spec, path: str):
        """Create the module described by `spec`, register it in sys.modules under `path`, and execute it.

        Args:
            spec: the module spec to load.
            path: the dotted module name used as the sys.modules key.

        Returns:
            The executed module.

        Raises:
            Exception: if executing the module fails (the original error is chained as the
                cause), or if a previous call for the same `path` already failed.
        """
        if prev_fail := _exec_module_failure.get(path):
            raise Exception(f"exec_module already failed once before, with: {prev_fail!r}")
        try:
            module = importlib.util.module_from_spec(spec)
            # sys.modules needs to be modified for relative imports to work
            # see https://stackoverflow.com/a/50395128
            sys.modules[path] = module
            spec.loader.exec_module(module)
        except Exception as e:
            # We can't undo all side-effects, but we at least rm the module from sys.modules,
            # so the import system knows it failed. If called again for the same plugin, we do not
            # retry due to potential interactions with not-undone side-effects (e.g. plugin
            # might have defined commands).
            _exec_module_failure[path] = e
            # pop() with a default is already a no-op when the key is absent
            sys.modules.pop(path, None)
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
        return module
5✔
162

163
    def find_plugins(self):
        """Scan the internal plugin directory, then the external one, for directory and zip plugins.

        A location is ignored if its path is falsy or does not exist.
        """
        search_locations = (
            (self.pkgpath, False),
            # external plugins enforce root permissions on the directory
            (self.get_external_plugin_dir(), True),
        )
        for directory, is_external in search_locations:
            if not directory or not os.path.exists(directory):
                continue
            self.find_directory_plugins(pkg_path=directory, external=is_external)
            self.find_zip_plugins(pkg_path=directory, external=is_external)
5✔
171

172
    def load_plugins(self):
5✔
173
        for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
5✔
174
            if not d.get('requires_wallet_type') and self.config.get('enable_plugin_' + name):
5✔
175
                try:
×
176
                    if self.cmd_only:  # only load init method to register commands
×
177
                        self.maybe_load_plugin_init_method(name)
×
178
                    else:
179
                        self.load_plugin_by_name(name)
×
180
                except BaseException as e:
×
181
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
182

183
    def _has_root_permissions(self, path):
5✔
184
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
185

186
    @profiler(min_threshold=0.5)
    def _has_recursive_root_permissions(self, path):
        """Check if a directory and all its subdirectories have root permissions

        Every directory visited by os.walk and every file inside them must pass
        _has_root_permissions. The outcome is cached per path in the module-level
        _root_permission_cache.
        """
        cached = _root_permission_cache.get(path)
        if cached is not None:
            return cached
        # pessimistic default: stays in the cache if any check below fails
        _root_permission_cache[path] = False
        for dirpath, _subdirs, filenames in os.walk(path):
            if not self._has_root_permissions(dirpath):
                return False
            if not all(self._has_root_permissions(os.path.join(dirpath, fn)) for fn in filenames):
                return False
        _root_permission_cache[path] = True
        return True
×
201

202
    def get_external_plugin_dir(self):
        """Return the external plugin directory, or None if it cannot be used.

        None is returned on platforms other than linux/darwin/freebsd, when the
        directory does not exist, or when it fails the root-permission check.
        """
        platform_supported = sys.platform in ['linux', 'darwin'] or sys.platform.startswith('freebsd')
        if not platform_supported:
            return
        pkg_path = '/opt/electrum_plugins'
        if not os.path.exists(pkg_path):
            self.logger.info(f'directory {pkg_path} does not exist')
        elif not self._has_root_permissions(pkg_path):
            self.logger.info(f'not loading {pkg_path}: directory has user write permissions')
        else:
            return pkg_path
        return
×
213

214
    def zip_plugin_path(self, name):
5✔
215
        filename = self.get_metadata(name)['filename']
×
216
        if name in self.internal_plugin_metadata:
×
217
            pkg_path = self.pkgpath
×
218
        else:
219
            pkg_path = self.get_external_plugin_dir()
×
220
        return os.path.join(pkg_path, filename)
×
221

222
    def find_zip_plugins(self, pkg_path: str, external: bool):
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts.

        Only files ending in '.zip' are considered. External zip files must pass the
        root-permission check. For every package inside a zip, '<name>/manifest.json'
        is read from the archive; entries whose manifest cannot be loaded are skipped.

        Raises:
            Exception: if a plugin with the same name was already registered.
        """
        if pkg_path is None:
            return
        for filename in os.listdir(pkg_path):
            path = os.path.join(pkg_path, filename)
            if not filename.endswith('.zip'):
                continue
            if external and not self._has_root_permissions(path):
                self.logger.info(f'not loading {path}: file has user write permissions')
                continue
            try:
                zipfile = zipimport.zipimporter(path)
            except zipimport.ZipImportError:
                self.logger.exception(f"unable to load zip plugin '{filename}'")
                continue
            for name, is_pkg in pkgutil.iter_zipimport_modules(zipfile):
                if is_pkg is False:
                    continue
                if name in self.internal_plugin_metadata:
                    raise Exception(f"duplicate plugins for name={name}")
                if name in self.external_plugin_metadata:
                    raise Exception(f"duplicate plugins for name={name}")
                if self.cmd_only and not self.config.get('enable_plugin_' + name):
                    continue
                try:
                    with zipfile_lib.ZipFile(path) as file:
                        # Names of zip members always use '/' as separator. os.path.join would
                        # produce 'name\\manifest.json' on Windows, which never matches a member.
                        manifest_path = f"{name}/manifest.json"
                        with file.open(manifest_path, 'r') as f:
                            d = json.load(f)
                except Exception:
                    self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
                    continue
                d['filename'] = filename
                d['is_zip'] = True
                d['path'] = path
                if not self.cmd_only:
                    gui_good = self.gui_name in d.get('available_for', [])
                    if not gui_good:
                        continue
                    if 'fullname' not in d:
                        continue
                    d['display_name'] = d['fullname']
                    d['zip_hash_sha256'] = get_file_hash256(path)
                if external:
                    self.external_plugin_metadata[name] = d
                else:
                    self.internal_plugin_metadata[name] = d
×
270

271
    def get(self, name):
5✔
272
        return self.plugins.get(name)
×
273

274
    def count(self):
5✔
275
        return len(self.plugins)
×
276

277
    def load_plugin(self, name) -> 'BasePlugin':
5✔
278
        """Imports the code of the given plugin.
279
        note: can be called from any thread.
280
        """
281
        if self.get_metadata(name):
5✔
282
            return self.load_plugin_by_name(name)
5✔
283
        else:
284
            raise Exception(f"could not find plugin {name!r}")
×
285

286
    def maybe_load_plugin_init_method(self, name: str) -> None:
5✔
287
        """Loads the __init__.py module of the plugin if it is not already loaded."""
288
        is_external = name in self.external_plugin_metadata
5✔
289
        base_name = (f'electrum_external_plugins.' if is_external else 'electrum.plugins.') + name
5✔
290
        if base_name not in sys.modules:
5✔
291
            metadata = self.get_metadata(name)
5✔
292
            is_zip = metadata.get('is_zip', False)
5✔
293
            # if the plugin was not enabled on startup the init module hasn't been loaded yet
294
            if not is_zip:
5✔
295
                if is_external:
5✔
296
                    path = os.path.join(metadata['path'], '__init__.py')
×
297
                    init_spec = importlib.util.spec_from_file_location(base_name, path)
×
298
                else:
299
                    init_spec = importlib.util.find_spec(base_name)
5✔
300
            else:
301
                zipfile = zipimport.zipimporter(metadata['path'])
×
302
                init_spec = zipfile.find_spec(name)
×
303
            module = self.exec_module_from_spec(init_spec, base_name)
5✔
304
            # import config vars
305
            if hasattr(module, 'config_vars'):
5✔
306
                for cv in module.config_vars:
×
307
                    setattr(SimpleConfig, cv.key().upper(), cv)
×
308
            if name == "trustedcoin":
5✔
309
                # removes trustedcoin after loading to not show it in the list of plugins
310
                del self.internal_plugin_metadata[name]
×
311

312
    def load_plugin_by_name(self, name: str) -> 'BasePlugin':
5✔
313
        if name in self.plugins:
5✔
314
            return self.plugins[name]
×
315

316
        # if the plugin was not enabled on startup the init module hasn't been loaded yet
317
        self.maybe_load_plugin_init_method(name)
5✔
318

319
        is_external = name in self.external_plugin_metadata
5✔
320
        if not is_external:
5✔
321
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
5✔
322
        else:
323
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
×
324

325
        spec = importlib.util.find_spec(full_name)
5✔
326
        if spec is None:
5✔
327
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
328
        try:
5✔
329
            module = self.exec_module_from_spec(spec, full_name)
5✔
330
            plugin = module.Plugin(self, self.config, name)
5✔
331
        except Exception as e:
×
332
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
×
333
        self.add_jobs(plugin.thread_jobs())
5✔
334
        self.plugins[name] = plugin
5✔
335
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
5✔
336
        return plugin
5✔
337

338
    def close_plugin(self, plugin):
        """Remove the thread jobs of `plugin` from this thread's job list."""
        jobs = plugin.thread_jobs()
        self.remove_jobs(jobs)
×
340

341
    def enable(self, name: str) -> 'BasePlugin':
        """Persist the plugin as enabled in the config and return it, loading it if necessary."""
        self.config.set_key('enable_plugin_' + name, True, save=True)
        return self.get(name) or self.load_plugin(name)
×
347

348
    def disable(self, name: str) -> None:
        """Persist the plugin as disabled in the config and, if it is loaded, unregister and close it."""
        self.config.set_key('enable_plugin_' + name, False, save=True)
        p = self.get(name)
        if p:
            self.plugins.pop(name)
            p.close()
            self.logger.info(f"closed {name}")
×
356

357
    @classmethod
5✔
358
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
5✔
359
        return key.startswith('enable_plugin_')
×
360

361
    def toggle(self, name: str) -> Optional['BasePlugin']:
        """Disable the plugin if it is currently loaded, otherwise enable it.

        Returns what disable() or enable() returned.
        """
        if self.get(name):
            return self.disable(name)
        return self.enable(name)
×
364

365
    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
5✔
366
        d = self.descriptions.get(name)
×
367
        if not d:
×
368
            return False
×
369
        deps = d.get('requires', [])
×
370
        for dep, s in deps:
×
371
            try:
×
372
                __import__(dep)
×
373
            except ImportError as e:
×
374
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
×
375
                return False
×
376
        requires = d.get('requires_wallet_type', [])
×
377
        return not requires or wallet.wallet_type in requires
×
378

379
    def get_hardware_support(self):
5✔
380
        out = []
×
381
        for name, (gui_good, details) in self.hw_wallets.items():
×
382
            if gui_good:
×
383
                try:
×
384
                    p = self.get_plugin(name)
×
385
                    if p.is_available():
×
386
                        out.append(HardwarePluginToScan(name=name,
×
387
                                                        description=details[2],
388
                                                        plugin=p,
389
                                                        exception=None))
390
                except Exception as e:
×
391
                    self.logger.exception(f"cannot load plugin for: {name}")
×
392
                    out.append(HardwarePluginToScan(name=name,
×
393
                                                    description=details[2],
394
                                                    plugin=None,
395
                                                    exception=e))
396
        return out
×
397

398
    def register_wallet_type(self, name, gui_good, wallet_type):
        """Register `wallet_type` and store a loader for it in the module-level plugin_loaders.

        The stored loader, when called, gets plugin `name` and registers its
        wallet_class as the constructor for `wallet_type`.
        """
        from .wallet import register_wallet_type, register_constructor
        self.logger.info(f"registering wallet type {(wallet_type, name)}")

        def load_and_register_constructor():
            register_constructor(wallet_type, self.get_plugin(name).wallet_class)

        register_wallet_type(wallet_type)
        plugin_loaders[wallet_type] = load_and_register_constructor
5✔
407

408
    def register_keystore(self, name, gui_good, details):
        """Register a hardware keystore for plugin `name`.

        Nothing is registered unless details[0] is 'hardware'. For hardware entries,
        (gui_good, details) is recorded in self.hw_wallets and a constructor that
        defers to the plugin's keystore_class is registered under details[1].
        """
        from .keystore import register_keystore

        if details[0] != 'hardware':
            return

        def dynamic_constructor(d):
            plugin = self.get_plugin(name)
            return plugin.keystore_class(d)

        self.hw_wallets[name] = (gui_good, details)
        self.logger.info(f"registering hardware {name}: {details}")
        register_keystore(details[1], dynamic_constructor)
5✔
417

418
    def get_plugin(self, name: str) -> 'BasePlugin':
5✔
419
        if name not in self.plugins:
5✔
420
            self.load_plugin(name)
5✔
421
        return self.plugins[name]
5✔
422

423
    def is_plugin_zip(self, name: str) -> bool:
5✔
424
        """Returns True if the plugin is a zip file"""
425
        if (metadata := self.get_metadata(name)) is None:
×
426
            return False
×
427
        return metadata.get('is_zip', False)
×
428

429
    def get_metadata(self, name: str) -> Optional[dict]:
5✔
430
        """Returns the metadata of the plugin"""
431
        metadata = self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name)
5✔
432
        if not metadata:
5✔
433
            return None
×
434
        return metadata
5✔
435

436
    def run(self):
        """Thread body: run the jobs repeatedly while is_running(), then call on_stop()."""
        while True:
            if not self.is_running():
                break
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
            self.run_jobs()
        self.on_stop()
5✔
441

442

443
def get_file_hash256(path: str) -> str:
    """Get the sha256 hash of a file in hex, similar to `sha256sum`."""
    with open(path, 'rb') as f:
        contents = f.read()
    return sha256(contents).hex()
×
447

448
def hook(func):
    """Decorator: record the function's name in hook_names and return the function unchanged."""
    name = func.__name__
    hook_names.add(name)
    return func
5✔
451

452

453
def run_hook(name, *args):
    """Call hook `name` with `args` on every enabled plugin registered for it.

    An exception raised by a hook is logged and that hook contributes no result.
    At most one hook may return a truthy value (asserted); that value is returned.
    If no hook returns a truthy value, None is returned.
    """
    results = []
    for p, f in hooks.get(name, []):
        if not p.is_enabled():
            continue
        try:
            r = f(*args)
        except Exception:
            _logger.exception(f"Plugin error. plugin: {p}, hook: {name}")
            continue
        if r:
            results.append(r)

    if results:
        assert len(results) == 1, results
        return results[0]
×
469

470

471
class BasePlugin(Logger):
    """Base class for plugins.

    On construction, every attribute of the instance whose name was registered
    via @hook is added to the module-level `hooks` table; close() removes those
    entries again.
    """

    def __init__(self, parent, config: 'SimpleConfig', name):
        self.parent = parent  # type: Plugins  # The plugins object
        self.name = name
        self.config = config
        self.wallet = None  # fixme: this field should not exist
        Logger.__init__(self)
        # add self to hooks
        for attr_name in dir(self):
            if attr_name in hook_names:
                hooks.setdefault(attr_name, []).append((self, getattr(self, attr_name)))

    def __str__(self):
        return self.name

    def close(self):
        """Unregister this plugin's hooks, detach it from the parent, then call on_close()."""
        # remove self from hooks
        for attr_name in dir(self):
            if attr_name in hook_names:
                # found attribute in self that is also the name of a hook
                registered = hooks.get(attr_name, [])
                try:
                    registered.remove((self, getattr(self, attr_name)))
                except ValueError:
                    # maybe attr name just collided with hook name and was not hook
                    continue
                hooks[attr_name] = registered
        self.parent.close_plugin(self)
        self.on_close()

    def on_close(self):
        """Called at the end of close(); does nothing by default."""
        pass

    def requires_settings(self) -> bool:
        return False

    def thread_jobs(self):
        """Return the jobs this plugin wants run on the Plugins thread; none by default."""
        return []

    def is_enabled(self):
        """True if the plugin is available and its 'enable_plugin_<name>' config value is exactly True."""
        return self.is_available() and self.config.get('enable_plugin_' + self.name) is True

    def is_available(self):
        return True

    def can_user_disable(self):
        return True

    def settings_widget(self, window):
        raise NotImplementedError()

    def settings_dialog(self, window):
        raise NotImplementedError()

    def read_file(self, filename: str) -> bytes:
        """Return the contents of `filename`, a path relative to this plugin's own directory.

        For zip plugins the file is read from inside the archive; otherwise it is
        read from the internal or external plugin directory on disk.
        """
        if self.parent.is_plugin_zip(self.name):
            plugin_filename = self.parent.zip_plugin_path(self.name)
            # Names of zip members always use '/' as separator. os.path.join would
            # produce a backslash-separated name on Windows that matches no member.
            member_name = f"{self.name}/{filename.replace(os.sep, '/')}"
            with zipfile_lib.ZipFile(plugin_filename) as myzip:
                with myzip.open(member_name) as myfile:
                    return myfile.read()
        else:
            if self.name in self.parent.internal_plugin_metadata:
                path = os.path.join(os.path.dirname(__file__), 'plugins', self.name, filename)
            else:
                path = os.path.join(self.parent.get_external_plugin_dir(), self.name, filename)
            with open(path, 'rb') as myfile:
                return myfile.read()
×
541

542

543
class DeviceUnpairableError(UserFacingException):
    """User-facing error; presumably raised when a device cannot be paired (raise sites are outside this section)."""
5✔
544
class HardwarePluginLibraryUnavailable(Exception):
    """Presumably raised when a library needed by a hardware plugin is missing (raise sites are outside this section)."""
5✔
545
class CannotAutoSelectDevice(Exception):
    """Presumably raised when a device cannot be chosen automatically (raise sites are outside this section)."""
5✔
546

547

548
# Immutable record describing one hardware device.
# NOTE(review): the field meanings are not established by this section (the code that
# builds Device instances is elsewhere); confirm there before relying on them.
class Device(NamedTuple):
5✔
549
    path: Union[str, bytes]
5✔
550
    interface_number: int
5✔
551
    # trailing underscore presumably just avoids shadowing the builtin `id`
    id_: str
5✔
552
    product_key: Any   # when using hid, often Tuple[int, int]
5✔
553
    usage_page: int
5✔
554
    transport_ui_string: str
5✔
555

556

557
# Immutable record pairing a Device with optional details about it.
# Every field except `device` defaults to None.
class DeviceInfo(NamedTuple):
5✔
558
    device: Device
5✔
559
    label: Optional[str] = None
5✔
560
    initialized: Optional[bool] = None
5✔
561
    exception: Optional[Exception] = None
5✔
562
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
5✔
563
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
5✔
564
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
5✔
565

566

567
# One entry of the list returned by Plugins.get_hardware_support().
# That method fills in either `plugin` (with exception=None) when the plugin loaded,
# or `exception` (with plugin=None) when loading it raised.
class HardwarePluginToScan(NamedTuple):
5✔
568
    name: str
5✔
569
    # taken from details[2] of the registered keystore details
    description: str
5✔
570
    plugin: Optional['HW_PluginBase']
5✔
571
    exception: Optional[Exception]
5✔
572

573

574
# Label values treated as placeholders: None, the empty string and a single space.
# NOTE(review): where this set is consulted is not visible in this section.
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
5✔
575

576

577
# hidapi is not thread-safe
578
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
579
#     https://github.com/libusb/hidapi/issues/45
580
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
581
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
582
# It is not entirely clear to me, exactly what is safe and what isn't, when
583
# using multiple threads...
584
# Hence, we use a single thread for all device communications, including
585
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
586
# the following thread:
587
# max_workers=1 gives the pool a single worker thread. Its name starts with
# 'hwd_comms_thread', which is the prefix run_in_hwd_thread() and
# assert_runs_in_hwd_thread() look for.
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
5✔
588
    max_workers=1,
589
    thread_name_prefix='hwd_comms_thread'
590
)
591

592
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
593
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
594
# To keep it simple, let's just import it now, as we are likely in the main thread here.
595
# Only warn (do not fail) if this module happens to be imported from a non-main thread.
if threading.current_thread() is not threading.main_thread():
5✔
596
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
×
597
try:
5✔
598
    import hid
5✔
599
# The import is best-effort: if hid cannot be imported, that is silently ignored here.
except ImportError:
5✔
600
    pass
5✔
601

602

603
T = TypeVar('T')
5✔
604

605

606
def run_in_hwd_thread(func: Callable[[], T]) -> T:
    """Run `func` on the hardware-communications thread and return its result.

    If the caller is already on a thread whose name starts with "hwd_comms_thread",
    `func` is called directly; otherwise it is submitted to _hwd_comms_executor and
    the call blocks until the result is available.
    """
    already_on_hwd_thread = threading.current_thread().name.startswith("hwd_comms_thread")
    if already_on_hwd_thread:
        return func()
    return _hwd_comms_executor.submit(func).result()
×
612
        #except (concurrent.futures.CancelledError, concurrent.futures.TimeoutError) as e:
613

614

615
def runs_in_hwd_thread(func):
    """Decorator: every call of the decorated function goes through run_in_hwd_thread()."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        bound_call = partial(func, *args, **kwargs)
        return run_in_hwd_thread(bound_call)
    return wrapper
5✔
620

621

622
def assert_runs_in_hwd_thread():
    """Raise an Exception unless the current thread's name starts with "hwd_comms_thread"."""
    thread_name = threading.current_thread().name
    if thread_name.startswith("hwd_comms_thread"):
        return
    raise Exception("must only be called from HWD communication thread")
×
625

626

627
class DeviceMgr(ThreadJob):
5✔
628
    """Manages hardware clients.  A client communicates over a hardware
629
    channel with the device.
630

631
    In addition to tracking device HID IDs, the device manager tracks
632
    hardware wallets and manages wallet pairing.  A HID ID may be
633
    paired with a wallet when it is confirmed that the hardware device
634
    matches the wallet, i.e. they have the same master public key.  A
635
    HID ID can be unpaired if e.g. it is wiped.
636

637
    Because of hotplugging, a wallet must request its client
638
    dynamically each time it is required, rather than caching it
639
    itself.
640

641
    The device manager is shared across plugins, so just one place
642
    does hardware scans when needed.  By tracking HID IDs, if a device
643
    is plugged into a different port the wallet is automatically
644
    re-paired.
645

646
    Wallets are informed on connect / disconnect events.  It must
647
    implement connected(), disconnected() callbacks.  Being connected
648
    implies a pairing.  Callbacks can happen in any thread context,
649
    and we do them without holding the lock.
650

651
    Confusingly, the HID ID (serial number) reported by the HID system
652
    doesn't match the device ID reported by the device itself.  We use
653
    the HID IDs.
654

655
    This plugin is thread-safe.  Currently only devices supported by
656
    hidapi are implemented."""
657

658
    def __init__(self, config: SimpleConfig):
        """Create a manager with no clients, no pairings and no recognised hardware.

        config is kept on the instance; run() asks it for the session timeout.
        """
        ThreadJob.__init__(self)

        self.config = config
        self.lock = threading.RLock()

        # pairing_code -> id_. An entry exists only while the pairing is active.
        # Needs self.lock.
        self.pairing_code_to_id: Dict[str, str] = {}
        # client -> id_. Needs self.lock.
        self.clients: Dict['HardwareClientBase', str] = {}
        # What we recognise: (vendor_id, product_id) -> Plugin
        self._recognised_hardware: Dict[Tuple[int, int], 'HW_PluginBase'] = {}
        # vendor_id -> Plugin
        self._recognised_vendor: Dict[int, 'HW_PluginBase'] = {}
        # Custom enumerate functions for devices we don't know about. Needs self.lock.
        self._enumerate_func = set()
673

674
    def thread_jobs(self):
5✔
675
        # Thread job to handle device timeouts
676
        return [self]
5✔
677

678
    def run(self):
5✔
679
        '''Handle device timeouts.  Runs in the context of the Plugins
680
        thread.'''
681
        with self.lock:
5✔
682
            clients = list(self.clients.keys())
5✔
683
        cutoff = time.time() - self.config.get_session_timeout()
5✔
684
        for client in clients:
5✔
685
            client.timeout(cutoff)
×
686

687
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
5✔
688
        for pair in device_pairs:
×
689
            self._recognised_hardware[pair] = plugin
×
690

691
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
5✔
692
        for vendor_id in vendor_ids:
×
693
            self._recognised_vendor[vendor_id] = plugin
×
694

695
    def register_enumerate_func(self, func):
5✔
696
        with self.lock:
×
697
            self._enumerate_func.add(func)
×
698

699
    @runs_in_hwd_thread
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
        """Return the client for device, creating and registering it if needed.

        A client already registered under device.id_ is returned as-is.
        Otherwise plugin.create_client() is asked for one; a truthy result is
        stored in self.clients, a falsy result is returned unchanged.
        """
        # Get from cache first
        cached = self._client_by_id(device.id_)
        if cached:
            return cached
        fresh = plugin.create_client(device, handler)
        if not fresh:
            return fresh
        self.logger.info(f"Registering {fresh}")
        with self.lock:
            self.clients[fresh] = device.id_
        return fresh
712

713
    def id_by_pairing_code(self, pairing_code):
5✔
714
        with self.lock:
×
715
            return self.pairing_code_to_id.get(pairing_code)
×
716

717
    def pairing_code_by_id(self, id_):
5✔
718
        with self.lock:
×
719
            for pairing_code, id2 in self.pairing_code_to_id.items():
×
720
                if id2 == id_:
×
721
                    return pairing_code
×
722
            return None
×
723

724
    def unpair_pairing_code(self, pairing_code):
5✔
725
        with self.lock:
×
726
            if pairing_code not in self.pairing_code_to_id:
×
727
                return
×
728
            _id = self.pairing_code_to_id.pop(pairing_code)
×
729
        self._close_client(_id)
×
730

731
    def unpair_id(self, id_):
5✔
732
        pairing_code = self.pairing_code_by_id(id_)
×
733
        if pairing_code:
×
734
            self.unpair_pairing_code(pairing_code)
×
735
        else:
736
            self._close_client(id_)
×
737

738
    def _close_client(self, id_):
5✔
739
        with self.lock:
×
740
            client = self._client_by_id(id_)
×
741
            self.clients.pop(client, None)
×
742
        if client:
×
743
            client.close()
×
744

745
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
5✔
746
        with self.lock:
×
747
            for client, client_id in self.clients.items():
×
748
                if client_id == id_:
×
749
                    return client
×
750
        return None
×
751

752
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
5✔
753
        '''Returns a client for the device ID if one is registered.  If
754
        a device is wiped or in bootloader mode pairing is impossible;
755
        in such cases we communicate by device ID and not wallet.'''
756
        if scan_now:
×
757
            self.scan_devices()
×
758
        return self._client_by_id(id_)
×
759

760
    @runs_in_hwd_thread
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
                            keystore: 'Hardware_KeyStore',
                            force_pair: bool, *,
                            devices: Sequence['Device'] = None,
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
        """Return the client paired with keystore, optionally pairing one first.

        Lookup order:
          1. among already known clients, by the keystore's pairing code
             (skipped when a non-empty devices list was passed in);
          2. again by pairing code, against devices (scanned now if devices is None);
          3. if still nothing and force_pair is set: select a device and force
             the pairing.  Failure to auto-select a device is not an error here.

        Returns None when no client could be obtained.
        Raises Exception when handler is None.
        """
        self.logger.info("getting client for keystore")
        if handler is None:
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
        handler.update_status(False)
        find_paired = partial(
            self.client_by_pairing_code,
            plugin=plugin,
            pairing_code=keystore.pairing_code(),
            handler=handler,
        )

        # search existing clients first (fast-path)
        client = None if devices else find_paired(devices=[])

        # search clients again, now allowing a (slow) scan
        if client is None:
            if devices is None:
                devices = self.scan_devices()
            client = find_paired(devices=devices)

        if client is None and force_pair:
            try:
                info = self.select_device(
                    plugin, handler, keystore, devices,
                    allow_user_interaction=allow_user_interaction)
            except CannotAutoSelectDevice:
                pass
            else:
                client = self.force_pair_keystore(
                    plugin=plugin, handler=handler, info=info, keystore=keystore)

        if client:
            handler.update_status(True)
            # note: if select_device was called, we might also update label etc here:
            keystore.opportunistically_fill_in_missing_info_from_device(client)
        self.logger.info("end client for keystore")
        return client
794

795
    def client_by_pairing_code(
5✔
796
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
797
        devices: Sequence['Device'],
798
    ) -> Optional['HardwareClientBase']:
799
        _id = self.id_by_pairing_code(pairing_code)
×
800
        client = self._client_by_id(_id)
×
801
        if client:
×
802
            if type(client.plugin) != type(plugin):
×
803
                return
×
804
            # An unpaired client might have another wallet's handler
805
            # from a prior scan.  Replace to fix dialog parenting.
806
            client.handler = handler
×
807
            return client
×
808

809
        for device in devices:
×
810
            if device.id_ == _id:
×
811
                return self.create_client(device, handler, plugin)
×
812

813
    def force_pair_keystore(
        self,
        *,
        plugin: 'HW_PluginBase',
        handler: 'HardwareHandlerBase',
        info: 'DeviceInfo',
        keystore: 'Hardware_KeyStore',
    ) -> 'HardwareClientBase':
        """Pair keystore with the device described by info, or raise.

        The pairing is recorded only if the registered client for that device
        is pairable, belongs to the same plugin type, and reports the same
        xpub as the keystore at the keystore's derivation prefix.

        Raises DeviceUnpairableError in every other case.
        """
        expected_xpub = keystore.xpub
        derivation = keystore.get_derivation_prefix()
        assert derivation is not None
        xtype = bip32.xpub_type(expected_xpub)
        device_id = info.device.id_
        candidate = self._client_by_id(device_id)
        if candidate and candidate.is_pairable() and type(candidate.plugin) == type(plugin):
            # An unpaired client might have another wallet's handler
            # from a prior scan; replace it.
            candidate.handler = handler
            # This will trigger a PIN/passphrase entry request
            try:
                device_xpub = candidate.get_xpub(derivation, xtype)
            except (UserCancelled, RuntimeError):
                # Bad / cancelled PIN / passphrase
                device_xpub = None
            if device_xpub == expected_xpub:
                keystore.opportunistically_fill_in_missing_info_from_device(candidate)
                with self.lock:
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
                return candidate

        # The user input has wrong PIN or passphrase, or cancelled input,
        # or it is not pairable
        raise DeviceUnpairableError(
            _('Electrum cannot pair with your {}.\n\n'
              'Before you request bitcoins to be sent to addresses in this '
              'wallet, ensure you can pair with your device, or that you have '
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
              'receive will be unspendable.').format(plugin.device))
849

850
    def list_pairable_device_infos(
        self,
        *,
        handler: Optional['HardwareHandlerBase'],
        plugin: 'HW_PluginBase',
        devices: Sequence['Device'] = None,
        include_failing_clients: bool = False,
    ) -> List['DeviceInfo']:
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
        Already paired devices are also included, as it is okay to reuse them.

        devices is scanned now if None.  A device whose client cannot be
        created or queried is logged and skipped, unless
        include_failing_clients is set, in which case it is reported with the
        exception attached.

        Raises HardwarePluginLibraryUnavailable if the plugin's libraries are
        not available.
        """
        if not plugin.libraries_available:
            raise HardwarePluginLibraryUnavailable(plugin.get_library_not_available_message())
        if devices is None:
            devices = self.scan_devices()

        infos = []
        for device in devices:
            if not plugin.can_recognize_device(device):
                continue
            try:
                client = self.create_client(device, handler, plugin)
                if not client:
                    continue
                details = dict(
                    label=client.label(),
                    initialized=client.is_initialized(),
                    soft_device_id=client.get_soft_device_id(),
                    model_name=client.device_model_name(),
                )
            except Exception as e:
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {e!r}')
                if include_failing_clients:
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
                continue
            infos.append(DeviceInfo(device=device, plugin_name=plugin.name, **details))

        return infos
891

892
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
        """Select the device to use for keystore.

        While no pairable device is found, the user is asked to insert one and
        retry.  Then the device is auto-selected if possible (by
        soft_device_id, by unique label, or because it is the only device and
        the keystore has no identifying info); otherwise the user is asked to
        choose.

        Raises CannotAutoSelectDevice if a user prompt would be needed but
        allow_user_interaction is False, and UserCancelled if the user declines
        a prompt.
        """
        # ideally this should not be called from the GUI thread...
        infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
        while not infos:
            if not allow_user_interaction:
                raise CannotAutoSelectDevice()
            prompt = [_('Please insert your {}').format(plugin.device), " ("]
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
                prompt.append(f"label: {keystore.label}, ")
            prompt.append(f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}")
            prompt.append(').\n\n{}\n\n{}'.format(
                _('Verify the cable is connected and that '
                  'no other application is using it.'),
                _('Try to connect again?'),
            ))
            if not handler.yes_no_question("".join(prompt)):
                raise UserCancelled()
            # the passed-in device list is stale by now: devices=None triggers a fresh scan
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=None)

        # select device automatically. (but only if we have reasonable expectation it is the correct one)
        # method 1: select device by id
        if keystore.soft_device_id:
            by_id = next(
                (info for info in infos if info.soft_device_id == keystore.soft_device_id),
                None)
            if by_id is not None:
                self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
                return by_id
        # method 2: select device by label
        #           but only if not a placeholder label and only if there is no collision
        if keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
            by_label = [info for info in infos if info.label == keystore.label]
            if len(by_label) == 1:
                self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
                return by_label[0]
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
        #           saved for keystore anyway, select it
        keystore_is_anonymous = (
            keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
            and keystore.soft_device_id is None
        )
        if len(infos) == 1 and keystore_is_anonymous:
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
            return infos[0]

        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
        if not allow_user_interaction:
            raise CannotAutoSelectDevice()

        # ask user to select device manually
        msg = "".join([
            _("Could not automatically pair with device for given keystore."),
            "\n",
            f"(keystore label: {keystore.label!r}, ",
            f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n",
            _("Please select which {} device to use:").format(plugin.device),
            "\n(",
            _("Or click cancel to skip this keystore instead."),
            ")",
        ])
        descriptions = []
        for info in infos:
            descriptions.append(
                "{label} ({maybe_model}{init}, {transport})".format(
                    label=info.label or _("An unnamed {}").format(info.plugin_name),
                    init=(_("initialized") if info.initialized else _("wiped")),
                    transport=info.device.transport_ui_string,
                    maybe_model=f"{info.model_name}, " if info.model_name else "",
                )
            )
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
                          f"num options: {len(infos)}. options: {infos}")
        choice = handler.query_choice(msg, descriptions)
        if choice is None:
            raise UserCancelled()
        chosen = infos[choice]
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {chosen}")
        # note: updated label/soft_device_id will be saved after pairing succeeds
        return chosen
967

968
    @runs_in_hwd_thread
    def _scan_devices_with_hid(self) -> List['Device']:
        """Enumerate HID devices and return those a registered plugin turns into a Device.

        The plugin is looked up by (vendor_id, product_id) first and by
        vendor_id alone otherwise.  Returns an empty list if the hid module
        cannot be imported.
        """
        try:
            import hid
        except ImportError:
            return []

        found = []
        for entry in hid.enumerate(0, 0):
            vendor_id = entry['vendor_id']
            product_key = (vendor_id, entry['product_id'])
            if product_key in self._recognised_hardware:
                plugin = self._recognised_hardware[product_key]
            elif vendor_id in self._recognised_vendor:
                plugin = self._recognised_vendor[vendor_id]
            else:
                # nobody registered for this hardware
                continue
            if not plugin:
                continue
            device = plugin.create_device_from_hid_enumeration(entry, product_key=product_key)
            if device:
                found.append(device)
        return found
989

990
    @runs_in_hwd_thread
    @profiler
    def scan_devices(self) -> Sequence['Device']:
        """Scan for connected devices and drop clients whose device is gone.

        Combines the HID scan with the results of all registered custom
        enumeration functions (a failing function is logged and skipped).
        Clients whose id is no longer present, or that report no usable
        connection, are unregistered and unpaired, and their handler (if any)
        is told the status is False.

        Returns the list of devices found.
        """
        self.logger.info("scanning devices...")

        # First see what's connected that we know about
        devices = self._scan_devices_with_hid()

        # Let plugin handlers enumerate devices we don't know about
        with self.lock:
            enumerators = tuple(self._enumerate_func)
        for enumerate_func in enumerators:
            try:
                extra_devices = enumerate_func()
            except BaseException as e:
                self.logger.error(f'custom device enum failed. func {str(enumerate_func)}, error {e!r}')
                continue
            devices.extend(extra_devices)

        # find out what was disconnected
        present_ids = [dev.id_ for dev in devices]
        still_connected = {}
        gone = []
        with self.lock:
            for client, id_ in self.clients.items():
                if id_ in present_ids and client.has_usable_connection_with_device():
                    still_connected[client] = id_
                    continue
                gone.append((client, id_))
            self.clients = still_connected

        # Unpair disconnected devices (outside the lock)
        for client, id_ in gone:
            self.unpair_id(id_)
            if client.handler:
                client.handler.update_status(False)

        return devices
1028

1029
    @classmethod
5✔
1030
    def version_info(cls) -> Mapping[str, Optional[str]]:
5✔
1031
        ret = {}
×
1032
        # add libusb
1033
        try:
×
1034
            import usb1
×
1035
        except Exception as e:
×
1036
            ret["libusb.version"] = None
×
1037
        else:
1038
            ret["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
×
1039
            try:
×
1040
                ret["libusb.path"] = usb1.libusb1.libusb._name
×
1041
            except AttributeError:
×
1042
                ret["libusb.path"] = None
×
1043
        # add hidapi
1044
        try:
×
1045
            import hid
×
1046
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
×
1047
        except Exception as e:
×
1048
            from importlib.metadata import version
×
1049
            try:
×
1050
                ret["hidapi.version"] = version("hidapi")
×
1051
            except ImportError:
×
1052
                ret["hidapi.version"] = None
×
1053
        return ret
×
1054

1055
    def trigger_pairings(
            self,
            keystores: Sequence['KeyStore'],
            *,
            allow_user_interaction: bool = True,
            devices: Sequence['Device'] = None,
    ) -> None:
        """Given a list of keystores, try to pair each with a connected hardware device.

        Non-hardware keystores are ignored.  Pairing happens in two passes over
        all hardware keystores: first without user interaction, so that every
        keystore that can be auto-matched gets its device, and then (only if
        allow_user_interaction is set) once more with manual selection allowed.

        This is friendlier than pairing keystores one by one.  Example: a
        2-of-3 multisig wallet with hw keystores ks1..ks3, where only devices
        d2 (for ks2) and d3 (for ks3) are connected and nothing is paired yet:
        - pairing individually might start with ks1, which matches neither
          device; the user would then be asked to manually pick d2 or d3 for
          ks1, which is confusing and error-prone, especially if the device
          does not support labels (such as Ledger) and same-type devices
          cannot easily be told apart. (see #4199)
        - with this method, ks2-d2 and ks3-d3 are auto-paired first, and only
          then is the user told that ks1 could not be paired.

        A UserCancelled raised for one keystore is ignored and the remaining
        keystores are still tried.
        """
        from .keystore import Hardware_KeyStore
        hw_keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
        if not hw_keystores:
            return
        if devices is None:
            devices = self.scan_devices()

        # first pair with all devices that can be auto-selected,
        # then (if allowed) do manual selections
        passes = (False, True) if allow_user_interaction else (False,)
        for interactive in passes:
            for ks in hw_keystores:
                try:
                    ks.get_client(
                        force_pair=True,
                        allow_user_interaction=interactive,
                        devices=devices,
                    )
                except UserCancelled:
                    pass
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc