• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 4976067045228544

19 Mar 2025 10:35AM UTC coverage: 61.184% (+0.05%) from 61.133%
4976067045228544

Pull #9654

CirrusCI

ecdsa
Commands: add option documentation in docstring.

This allows plugins to document the commands they add.

The docstring is parsed as a regular expression:

    arg:<type>:<name>:<description>\n

Types are defined in commands.arg_types.

Note that this commit removes support for single letter
shortcuts in command options.

If a command is not properly documented, a warning is issued
with print(), because no logger is available at this point.
Pull Request #9654: Improve cmdline help

16 of 64 new or added lines in 1 file covered. (25.0%)

264 existing lines in 5 files now uncovered.

21379 of 34942 relevant lines covered (61.18%)

3.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

40.98
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import os
5✔
27
import pkgutil
5✔
28
import importlib.util
5✔
29
import time
5✔
30
import threading
5✔
31
import traceback
5✔
32
import sys
5✔
33
import aiohttp
5✔
34

35
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
5✔
36
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
37
import concurrent
5✔
38
import zipimport
5✔
39
from concurrent import futures
5✔
40
from functools import wraps, partial
5✔
41
from itertools import chain
5✔
42

43
from .i18n import _
5✔
44
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException)
5✔
45
from . import bip32
5✔
46
from . import plugins
5✔
47
from .simple_config import SimpleConfig
5✔
48
from .logging import get_logger, Logger
5✔
49
from .crypto import sha256
5✔
50

51
if TYPE_CHECKING:
5✔
UNCOV
52
    from .plugins.hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
UNCOV
53
    from .keystore import Hardware_KeyStore, KeyStore
×
54
    from .wallet import Abstract_Wallet
×
55

56

57
_logger = get_logger(__name__)
5✔
58
plugin_loaders = {}
5✔
59
hook_names = set()
5✔
60
hooks = {}
5✔
61

62

63
class Plugins(DaemonThread):
5✔
64

65
    LOGGING_SHORTCUT = 'p'
5✔
66
    pkgpath = os.path.dirname(plugins.__file__)
5✔
67

68
    @profiler
5✔
69
    def __init__(self, config: SimpleConfig, gui_name = None, cmd_only: bool = False):
5✔
70
        self.config = config
5✔
71
        self.cmd_only = cmd_only  # type: bool
5✔
72
        self.internal_plugin_metadata = {}
5✔
73
        self.external_plugin_metadata = {}
5✔
74
        self.loaded_command_modules = set()  # type: set[str]
5✔
75
        if cmd_only:
5✔
76
            # only import the command modules of plugins
UNCOV
77
            Logger.__init__(self)
×
UNCOV
78
            self.find_plugins()
×
79
            return
×
80
        DaemonThread.__init__(self)
5✔
81
        self.device_manager = DeviceMgr(config)
5✔
82
        self.name = 'Plugins'  # set name of thread
5✔
83
        self.hw_wallets = {}
5✔
84
        self.plugins = {}  # type: Dict[str, BasePlugin]
5✔
85
        self.gui_name = gui_name
5✔
86
        self.find_plugins()
5✔
87
        self.load_plugins()
5✔
88
        self.add_jobs(self.device_manager.thread_jobs())
5✔
89
        self.start()
5✔
90

91
    @property
5✔
92
    def descriptions(self):
5✔
UNCOV
93
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
94

95
    def find_directory_plugins(self, pkg_path: str, external: bool):
5✔
96
        """Finds plugins in directory form from the given pkg_path and populates the metadata dicts"""
97
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
5✔
98
        for loader, name, ispkg in iter_modules:
5✔
99
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
100
            #       once as data and once as code. To honor the "no duplicates" rule below,
101
            #       we exclude the ones packaged as *code*, here:
102
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
5✔
UNCOV
103
                continue
×
104
            if self.cmd_only and self.config.get('enable_plugin_' + name) is not True:
5✔
UNCOV
105
                continue
×
106
            base_name = 'electrum.plugins' if not external else 'electrum_external_plugins'
5✔
107
            full_name = f'{base_name}.{name}'
5✔
108
            if external:
5✔
109
                module_path = os.path.join(pkg_path, name)
×
110
                if not self._has_recursive_root_permissions(module_path):
×
UNCOV
111
                    self.logger.info(f"Not loading plugin {module_path}: directory has user write permissions")
×
112
                    continue
×
UNCOV
113
                module_path = os.path.join(module_path, '__init__.py')
×
UNCOV
114
                if not os.path.exists(module_path):
×
UNCOV
115
                    continue
×
UNCOV
116
                spec = importlib.util.spec_from_file_location(full_name, module_path)
×
117
            else:
118
                spec = importlib.util.find_spec(full_name)
5✔
119
            if spec is None:
5✔
120
                if self.cmd_only:
×
UNCOV
121
                    continue # no commands module in this plugin
×
UNCOV
122
                raise Exception(f"Error pre-loading {full_name}: no spec")
×
123
            module = self.exec_module_from_spec(spec, full_name)
5✔
124
            if self.cmd_only:
5✔
UNCOV
125
                assert name not in self.loaded_command_modules, f"tried to load commands of {name} twice"
×
UNCOV
126
                self.loaded_command_modules.add(name)
×
UNCOV
127
                continue
×
128
            d = module.__dict__
5✔
129
            if 'fullname' not in d:
5✔
130
                continue
5✔
131
            d['display_name'] = d['fullname']
5✔
132
            gui_good = self.gui_name in d.get('available_for', [])
5✔
133
            if not gui_good:
5✔
134
                continue
5✔
135
            details = d.get('registers_wallet_type')
5✔
136
            if details:
5✔
137
                self.register_wallet_type(name, gui_good, details)
5✔
138
            details = d.get('registers_keystore')
5✔
139
            if details:
5✔
140
                self.register_keystore(name, gui_good, details)
5✔
141
            if d.get('requires_wallet_type'):
5✔
142
                # trustedcoin will not be added to list
143
                continue
5✔
144
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
5✔
UNCOV
145
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
×
UNCOV
146
                raise Exception(f"duplicate plugins? for {name=}")
×
147
            if not external:
5✔
148
                self.internal_plugin_metadata[name] = d
5✔
149
            else:
150
                self.external_plugin_metadata[name] = d
×
151

152
    @staticmethod
5✔
153
    def exec_module_from_spec(spec, path):
5✔
154
        try:
5✔
155
            module = importlib.util.module_from_spec(spec)
5✔
156
            # sys.modules needs to be modified for relative imports to work
157
            # see https://stackoverflow.com/a/50395128
158
            sys.modules[path] = module
5✔
159
            spec.loader.exec_module(module)
5✔
UNCOV
160
        except Exception as e:
×
UNCOV
161
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
×
162
        return module
5✔
163

164
    def find_plugins(self):
5✔
165
        internal_plugins_path = (self.pkgpath, False)
5✔
166
        external_plugins_path = (self.get_external_plugin_dir(), True)
5✔
167
        for pkg_path, external in (internal_plugins_path, external_plugins_path):
5✔
168
            # external plugins enforce root permissions on the directory
169
            if pkg_path and os.path.exists(pkg_path):
5✔
170
                self.find_directory_plugins(pkg_path=pkg_path, external=external)
5✔
171
                self.find_zip_plugins(pkg_path=pkg_path, external=external)
5✔
172

173
    def load_plugins(self):
5✔
174
        for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
5✔
175
            if not d.get('requires_wallet_type') and self.config.get('enable_plugin_' + name):
5✔
UNCOV
176
                try:
×
UNCOV
177
                    self.load_plugin_by_name(name)
×
UNCOV
178
                except BaseException as e:
×
UNCOV
179
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
180

181
    def _has_root_permissions(self, path):
5✔
182
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
183

184
    @profiler(min_threshold=0.5)
5✔
185
    def _has_recursive_root_permissions(self, path):
5✔
186
        """Check if a directory and all its subdirectories have root permissions"""
187
        for root, dirs, files in os.walk(path):
×
188
            if not self._has_root_permissions(root):
×
189
                return False
×
190
            for f in files:
×
UNCOV
191
                if not self._has_root_permissions(os.path.join(root, f)):
×
UNCOV
192
                    return False
×
UNCOV
193
        return True
×
194

195
    def get_external_plugin_dir(self):
5✔
196
        if sys.platform not in ['linux', 'darwin'] and not sys.platform.startswith('freebsd'):
5✔
UNCOV
197
            return
×
198
        pkg_path = '/opt/electrum_plugins'
5✔
199
        if not os.path.exists(pkg_path):
5✔
200
            self.logger.info(f'directory {pkg_path} does not exist')
5✔
201
            return
5✔
202
        if not self._has_root_permissions(pkg_path):
×
UNCOV
203
            self.logger.info(f'not loading {pkg_path}: directory has user write permissions')
×
UNCOV
204
            return
×
205
        return pkg_path
×
206

207
    def zip_plugin_path(self, name):
5✔
UNCOV
208
        filename = self.get_metadata(name)['filename']
×
209
        if name in self.internal_plugin_metadata:
×
210
            pkg_path = self.pkgpath
×
211
        else:
UNCOV
212
            pkg_path = self.get_external_plugin_dir()
×
UNCOV
213
        return os.path.join(pkg_path, filename)
×
214

215
    def find_zip_plugins(self, pkg_path: str, external: bool):
5✔
216
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts"""
217
        if pkg_path is None:
5✔
UNCOV
218
            return
×
219
        for filename in os.listdir(pkg_path):
5✔
220
            path = os.path.join(pkg_path, filename)
5✔
221
            if not filename.endswith('.zip'):
5✔
222
                continue
5✔
223
            if external and not self._has_root_permissions(path):
×
224
                self.logger.info(f'not loading {path}: file has user write permissions')
×
225
                continue
×
226
            try:
×
227
                zipfile = zipimport.zipimporter(path)
×
228
            except zipimport.ZipImportError:
×
229
                self.logger.exception(f"unable to load zip plugin '{filename}'")
×
230
                continue
×
231
            for name, b in pkgutil.iter_zipimport_modules(zipfile):
×
232
                if b is False:
×
233
                    continue
×
234
                if name in self.internal_plugin_metadata:
×
235
                    raise Exception(f"duplicate plugins for name={name}")
×
236
                if name in self.external_plugin_metadata:
×
237
                    raise Exception(f"duplicate plugins for name={name}")
×
238
                if self.cmd_only and not self.config.get('enable_plugin_' + name):
×
239
                    continue
×
240
                module_path = f'electrum_external_plugins.{name}' if external else f'electrum.plugins.{name}'
×
241
                spec = zipfile.find_spec(name)
×
242
                module = self.exec_module_from_spec(spec, module_path)
×
243
                if self.cmd_only:
×
244
                    assert name not in self.loaded_command_modules, f"tried to load commands of {name} twice"
×
245
                    self.loaded_command_modules.add(name)
×
246
                    continue
×
247
                d = module.__dict__
×
248
                gui_good = self.gui_name in d.get('available_for', [])
×
249
                if not gui_good:
×
250
                    continue
×
251
                d['filename'] = filename
×
252
                if 'fullname' not in d:
×
253
                    continue
×
254
                d['display_name'] = d['fullname']
×
255
                d['zip_hash_sha256'] = get_file_hash256(path)
×
256
                d['is_zip'] = True
×
257
                if external:
×
UNCOV
258
                    self.external_plugin_metadata[name] = d
×
259
                else:
UNCOV
260
                    self.internal_plugin_metadata[name] = d
×
261

262
    def get(self, name):
5✔
UNCOV
263
        return self.plugins.get(name)
×
264

265
    def count(self):
5✔
UNCOV
266
        return len(self.plugins)
×
267

268
    def load_plugin(self, name) -> 'BasePlugin':
5✔
269
        """Imports the code of the given plugin.
270
        note: can be called from any thread.
271
        """
272
        if self.get_metadata(name):
5✔
273
            return self.load_plugin_by_name(name)
5✔
274
        else:
UNCOV
275
            raise Exception(f"could not find plugin {name!r}")
×
276

277
    def load_plugin_by_name(self, name) -> 'BasePlugin':
5✔
278
        if name in self.plugins:
5✔
UNCOV
279
            return self.plugins[name]
×
280

281
        is_zip = self.is_plugin_zip(name)
5✔
282
        is_external = name in self.external_plugin_metadata
5✔
283
        if not is_external:
5✔
284
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
5✔
285
        else:
286
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
×
287

288
        spec = importlib.util.find_spec(full_name)
5✔
289
        if spec is None:
5✔
UNCOV
290
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
291
        try:
5✔
292
            if is_zip:
5✔
UNCOV
293
                module = self.exec_module_from_spec(spec, full_name)
×
294
            else:
295
                module = importlib.util.module_from_spec(spec)
5✔
296
                spec.loader.exec_module(module)
5✔
297
            plugin = module.Plugin(self, self.config, name)
5✔
UNCOV
298
        except Exception as e:
×
UNCOV
299
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
×
300
        self.add_jobs(plugin.thread_jobs())
5✔
301
        self.plugins[name] = plugin
5✔
302
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
5✔
303
        return plugin
5✔
304

305
    def close_plugin(self, plugin):
5✔
UNCOV
306
        self.remove_jobs(plugin.thread_jobs())
×
307

308
    def enable(self, name: str) -> 'BasePlugin':
5✔
309
        self.config.set_key('enable_plugin_' + name, True, save=True)
×
UNCOV
310
        p = self.get(name)
×
UNCOV
311
        if p:
×
UNCOV
312
            return p
×
313
        return self.load_plugin(name)
×
314

315
    def disable(self, name: str) -> None:
5✔
UNCOV
316
        self.config.set_key('enable_plugin_' + name, False, save=True)
×
317
        p = self.get(name)
×
318
        if not p:
×
UNCOV
319
            return
×
UNCOV
320
        self.plugins.pop(name)
×
UNCOV
321
        p.close()
×
UNCOV
322
        self.logger.info(f"closed {name}")
×
323

324
    @classmethod
5✔
325
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
5✔
UNCOV
326
        return key.startswith('enable_plugin_')
×
327

328
    def toggle(self, name: str) -> Optional['BasePlugin']:
5✔
329
        p = self.get(name)
×
330
        return self.disable(name) if p else self.enable(name)
×
331

332
    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
5✔
UNCOV
333
        d = self.descriptions.get(name)
×
UNCOV
334
        if not d:
×
335
            return False
×
336
        deps = d.get('requires', [])
×
337
        for dep, s in deps:
×
338
            try:
×
339
                __import__(dep)
×
340
            except ImportError as e:
×
341
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
×
UNCOV
342
                return False
×
UNCOV
343
        requires = d.get('requires_wallet_type', [])
×
UNCOV
344
        return not requires or wallet.wallet_type in requires
×
345

346
    def get_hardware_support(self):
5✔
UNCOV
347
        out = []
×
348
        for name, (gui_good, details) in self.hw_wallets.items():
×
349
            if gui_good:
×
UNCOV
350
                try:
×
UNCOV
351
                    p = self.get_plugin(name)
×
352
                    if p.is_available():
×
353
                        out.append(HardwarePluginToScan(name=name,
×
354
                                                        description=details[2],
355
                                                        plugin=p,
356
                                                        exception=None))
357
                except Exception as e:
×
358
                    self.logger.exception(f"cannot load plugin for: {name}")
×
359
                    out.append(HardwarePluginToScan(name=name,
×
360
                                                    description=details[2],
361
                                                    plugin=None,
362
                                                    exception=e))
363
        return out
×
364

365
    def register_wallet_type(self, name, gui_good, wallet_type):
5✔
366
        from .wallet import register_wallet_type, register_constructor
5✔
367
        self.logger.info(f"registering wallet type {(wallet_type, name)}")
5✔
368

369
        def loader():
5✔
370
            plugin = self.get_plugin(name)
5✔
371
            register_constructor(wallet_type, plugin.wallet_class)
5✔
372
        register_wallet_type(wallet_type)
5✔
373
        plugin_loaders[wallet_type] = loader
5✔
374

375
    def register_keystore(self, name, gui_good, details):
5✔
376
        from .keystore import register_keystore
5✔
377

378
        def dynamic_constructor(d):
5✔
379
            return self.get_plugin(name).keystore_class(d)
5✔
380
        if details[0] == 'hardware':
5✔
381
            self.hw_wallets[name] = (gui_good, details)
5✔
382
            self.logger.info(f"registering hardware {name}: {details}")
5✔
383
            register_keystore(details[1], dynamic_constructor)
5✔
384

385
    def get_plugin(self, name: str) -> 'BasePlugin':
5✔
386
        if name not in self.plugins:
5✔
387
            self.load_plugin(name)
5✔
388
        return self.plugins[name]
5✔
389

390
    def is_plugin_zip(self, name: str) -> bool:
5✔
391
        """Returns True if the plugin is a zip file"""
392
        if (metadata := self.get_metadata(name)) is None:
5✔
393
            return False
5✔
394
        return metadata.get('is_zip', False)
5✔
395

396
    def get_metadata(self, name: str) -> Optional[dict]:
5✔
397
        """Returns the metadata of the plugin"""
398
        metadata = self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name)
5✔
399
        if not metadata:
5✔
400
            return None
5✔
401
        return metadata
5✔
402

403
    def run(self):
5✔
404
        while self.is_running():
5✔
405
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
5✔
406
            self.run_jobs()
5✔
407
        self.on_stop()
5✔
408

409

410
def get_file_hash256(path: str) -> str:
5✔
411
    '''Get the sha256 hash of a file in hex, similar to `sha256sum`.'''
412
    with open(path, 'rb') as f:
×
413
        return sha256(f.read()).hex()
×
414

415
def hook(func):
5✔
416
    hook_names.add(func.__name__)
5✔
417
    return func
5✔
418

419

420
def run_hook(name, *args):
5✔
421
    results = []
5✔
422
    f_list = hooks.get(name, [])
5✔
423
    for p, f in f_list:
5✔
424
        if p.is_enabled():
5✔
425
            try:
5✔
426
                r = f(*args)
5✔
UNCOV
427
            except Exception:
×
UNCOV
428
                _logger.exception(f"Plugin error. plugin: {p}, hook: {name}")
×
UNCOV
429
                r = False
×
430
            if r:
5✔
431
                results.append(r)
×
432

433
    if results:
5✔
UNCOV
434
        assert len(results) == 1, results
×
UNCOV
435
        return results[0]
×
436

437

438
class BasePlugin(Logger):
5✔
439

440
    def __init__(self, parent, config: 'SimpleConfig', name):
5✔
441
        self.parent = parent  # type: Plugins  # The plugins object
5✔
442
        self.name = name
5✔
443
        self.config = config
5✔
444
        self.wallet = None  # fixme: this field should not exist
5✔
445
        Logger.__init__(self)
5✔
446
        # add self to hooks
447
        for k in dir(self):
5✔
448
            if k in hook_names:
5✔
449
                l = hooks.get(k, [])
5✔
450
                l.append((self, getattr(self, k)))
5✔
451
                hooks[k] = l
5✔
452

453
    def __str__(self):
5✔
454
        return self.name
×
455

456
    def close(self):
5✔
457
        # remove self from hooks
UNCOV
458
        for attr_name in dir(self):
×
UNCOV
459
            if attr_name in hook_names:
×
460
                # found attribute in self that is also the name of a hook
UNCOV
461
                l = hooks.get(attr_name, [])
×
UNCOV
462
                try:
×
UNCOV
463
                    l.remove((self, getattr(self, attr_name)))
×
UNCOV
464
                except ValueError:
×
465
                    # maybe attr name just collided with hook name and was not hook
UNCOV
466
                    continue
×
UNCOV
467
                hooks[attr_name] = l
×
UNCOV
468
        self.parent.close_plugin(self)
×
UNCOV
469
        self.on_close()
×
470

471
    def on_close(self):
5✔
UNCOV
472
        pass
×
473

474
    def requires_settings(self) -> bool:
5✔
UNCOV
475
        return False
×
476

477
    def thread_jobs(self):
5✔
478
        return []
5✔
479

480
    def is_enabled(self):
5✔
481
        return self.is_available() and self.config.get('enable_plugin_' + self.name) is True
×
482

483
    def is_available(self):
5✔
UNCOV
484
        return True
×
485

486
    def can_user_disable(self):
5✔
487
        return True
×
488

489
    def settings_widget(self, window):
5✔
UNCOV
490
        raise NotImplementedError()
×
491

492
    def settings_dialog(self, window):
5✔
UNCOV
493
        raise NotImplementedError()
×
494

495
    def read_file(self, filename: str) -> bytes:
5✔
UNCOV
496
        import zipfile
×
UNCOV
497
        if self.parent.is_plugin_zip(self.name):
×
UNCOV
498
            plugin_filename = self.parent.zip_plugin_path(self.name)
×
UNCOV
499
            with zipfile.ZipFile(plugin_filename) as myzip:
×
500
                with myzip.open(os.path.join(self.name, filename)) as myfile:
×
UNCOV
501
                    return myfile.read()
×
502
        else:
503
            if self.name in self.parent.internal_plugin_metadata:
×
UNCOV
504
                path = os.path.join(os.path.dirname(__file__), 'plugins', self.name, filename)
×
505
            else:
506
                path = os.path.join(self.parent.get_external_plugin_dir(), self.name, filename)
×
UNCOV
507
            with open(path, 'rb') as myfile:
×
UNCOV
508
                return myfile.read()
×
509

510

511
class DeviceUnpairableError(UserFacingException): pass
5✔
512
class HardwarePluginLibraryUnavailable(Exception): pass
5✔
513
class CannotAutoSelectDevice(Exception): pass
5✔
514

515

516
class Device(NamedTuple):
5✔
517
    path: Union[str, bytes]
5✔
518
    interface_number: int
5✔
519
    id_: str
5✔
520
    product_key: Any   # when using hid, often Tuple[int, int]
5✔
521
    usage_page: int
5✔
522
    transport_ui_string: str
5✔
523

524

525
class DeviceInfo(NamedTuple):
5✔
526
    device: Device
5✔
527
    label: Optional[str] = None
5✔
528
    initialized: Optional[bool] = None
5✔
529
    exception: Optional[Exception] = None
5✔
530
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
5✔
531
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
5✔
532
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
5✔
533

534

535
class HardwarePluginToScan(NamedTuple):
5✔
536
    name: str
5✔
537
    description: str
5✔
538
    plugin: Optional['HW_PluginBase']
5✔
539
    exception: Optional[Exception]
5✔
540

541

542
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
5✔
543

544

545
# hidapi is not thread-safe
546
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
547
#     https://github.com/libusb/hidapi/issues/45
548
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
549
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
550
# It is not entirely clear to me, exactly what is safe and what isn't, when
551
# using multiple threads...
552
# Hence, we use a single thread for all device communications, including
553
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
554
# the following thread:
555
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
5✔
556
    max_workers=1,
557
    thread_name_prefix='hwd_comms_thread'
558
)
559

560
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
561
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
562
# To keep it simple, let's just import it now, as we are likely in the main thread here.
563
if threading.current_thread() is not threading.main_thread():
5✔
UNCOV
564
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
×
565
try:
5✔
566
    import hid
5✔
567
except ImportError:
5✔
568
    pass
5✔
569

570

571
T = TypeVar('T')
5✔
572

573

574
def run_in_hwd_thread(func: Callable[[], T]) -> T:
5✔
UNCOV
575
    if threading.current_thread().name.startswith("hwd_comms_thread"):
×
UNCOV
576
        return func()
×
577
    else:
UNCOV
578
        fut = _hwd_comms_executor.submit(func)
×
UNCOV
579
        return fut.result()
×
580
        #except (concurrent.futures.CancelledError, concurrent.futures.TimeoutError) as e:
581

582

583
def runs_in_hwd_thread(func):
5✔
584
    @wraps(func)
5✔
585
    def wrapper(*args, **kwargs):
5✔
UNCOV
586
        return run_in_hwd_thread(partial(func, *args, **kwargs))
×
587
    return wrapper
5✔
588

589

590
def assert_runs_in_hwd_thread():
5✔
UNCOV
591
    if not threading.current_thread().name.startswith("hwd_comms_thread"):
×
UNCOV
592
        raise Exception("must only be called from HWD communication thread")
×
593

594

595
class DeviceMgr(ThreadJob):
5✔
596
    """Manages hardware clients.  A client communicates over a hardware
597
    channel with the device.
598

599
    In addition to tracking device HID IDs, the device manager tracks
600
    hardware wallets and manages wallet pairing.  A HID ID may be
601
    paired with a wallet when it is confirmed that the hardware device
602
    matches the wallet, i.e. they have the same master public key.  A
603
    HID ID can be unpaired if e.g. it is wiped.
604

605
    Because of hotplugging, a wallet must request its client
606
    dynamically each time it is required, rather than caching it
607
    itself.
608

609
    The device manager is shared across plugins, so just one place
610
    does hardware scans when needed.  By tracking HID IDs, if a device
611
    is plugged into a different port the wallet is automatically
612
    re-paired.
613

614
    Wallets are informed on connect / disconnect events.  It must
615
    implement connected(), disconnected() callbacks.  Being connected
616
    implies a pairing.  Callbacks can happen in any thread context,
617
    and we do them without holding the lock.
618

619
    Confusingly, the HID ID (serial number) reported by the HID system
620
    doesn't match the device ID reported by the device itself.  We use
621
    the HID IDs.
622

623
    This plugin is thread-safe.  Currently only devices supported by
624
    hidapi are implemented."""
625

626
    def __init__(self, config: SimpleConfig):
5✔
627
        ThreadJob.__init__(self)
5✔
628
        # A pairing_code->id_ map. Item only present if we have active pairing. Needs self.lock.
629
        self.pairing_code_to_id = {}  # type: Dict[str, str]
5✔
630
        # A client->id_ map. Needs self.lock.
631
        self.clients = {}  # type: Dict[HardwareClientBase, str]
5✔
632
        # What we recognise.  (vendor_id, product_id) -> Plugin
633
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
5✔
634
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]  # vendor_id -> Plugin
5✔
635
        # Custom enumerate functions for devices we don't know about.
636
        self._enumerate_func = set()  # Needs self.lock.
5✔
637

638
        self.lock = threading.RLock()
5✔
639

640
        self.config = config
5✔
641

642
    def thread_jobs(self):
5✔
643
        # Thread job to handle device timeouts
644
        return [self]
5✔
645

646
    def run(self):
5✔
647
        '''Handle device timeouts.  Runs in the context of the Plugins
648
        thread.'''
649
        with self.lock:
5✔
650
            clients = list(self.clients.keys())
5✔
651
        cutoff = time.time() - self.config.get_session_timeout()
5✔
652
        for client in clients:
5✔
UNCOV
653
            client.timeout(cutoff)
×
654

655
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
5✔
UNCOV
656
        for pair in device_pairs:
×
UNCOV
657
            self._recognised_hardware[pair] = plugin
×
658

659
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
5✔
UNCOV
660
        for vendor_id in vendor_ids:
×
UNCOV
661
            self._recognised_vendor[vendor_id] = plugin
×
662

663
    def register_enumerate_func(self, func):
5✔
UNCOV
664
        with self.lock:
×
UNCOV
665
            self._enumerate_func.add(func)
×
666

667
    @runs_in_hwd_thread
5✔
668
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
5✔
669
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
670
        # Get from cache first
671
        client = self._client_by_id(device.id_)
×
UNCOV
672
        if client:
×
UNCOV
673
            return client
×
674
        client = plugin.create_client(device, handler)
×
675
        if client:
×
UNCOV
676
            self.logger.info(f"Registering {client}")
×
UNCOV
677
            with self.lock:
×
678
                self.clients[client] = device.id_
×
679
        return client
×
680

681
    def id_by_pairing_code(self, pairing_code):
5✔
682
        with self.lock:
×
683
            return self.pairing_code_to_id.get(pairing_code)
×
684

685
    def pairing_code_by_id(self, id_):
5✔
UNCOV
686
        with self.lock:
×
UNCOV
687
            for pairing_code, id2 in self.pairing_code_to_id.items():
×
UNCOV
688
                if id2 == id_:
×
689
                    return pairing_code
×
690
            return None
×
691

692
    def unpair_pairing_code(self, pairing_code):
5✔
693
        with self.lock:
×
694
            if pairing_code not in self.pairing_code_to_id:
×
695
                return
×
696
            _id = self.pairing_code_to_id.pop(pairing_code)
×
697
        self._close_client(_id)
×
698

699
    def unpair_id(self, id_):
5✔
700
        pairing_code = self.pairing_code_by_id(id_)
×
701
        if pairing_code:
×
UNCOV
702
            self.unpair_pairing_code(pairing_code)
×
703
        else:
704
            self._close_client(id_)
×
705

706
    def _close_client(self, id_):
5✔
707
        with self.lock:
×
708
            client = self._client_by_id(id_)
×
UNCOV
709
            self.clients.pop(client, None)
×
UNCOV
710
        if client:
×
711
            client.close()
×
712

713
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
5✔
714
        with self.lock:
×
715
            for client, client_id in self.clients.items():
×
UNCOV
716
                if client_id == id_:
×
UNCOV
717
                    return client
×
718
        return None
×
719

720
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
5✔
721
        '''Returns a client for the device ID if one is registered.  If
722
        a device is wiped or in bootloader mode pairing is impossible;
723
        in such cases we communicate by device ID and not wallet.'''
UNCOV
724
        if scan_now:
×
725
            self.scan_devices()
×
726
        return self._client_by_id(id_)
×
727

728
    @runs_in_hwd_thread
5✔
729
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
5✔
730
                            keystore: 'Hardware_KeyStore',
731
                            force_pair: bool, *,
732
                            devices: Sequence['Device'] = None,
733
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
734
        self.logger.info("getting client for keystore")
×
735
        if handler is None:
×
736
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
×
UNCOV
737
        handler.update_status(False)
×
UNCOV
738
        pcode = keystore.pairing_code()
×
UNCOV
739
        client = None
×
740
        # search existing clients first (fast-path)
UNCOV
741
        if not devices:
×
742
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=[])
×
743
        # search clients again, now allowing a (slow) scan
744
        if client is None:
×
UNCOV
745
            if devices is None:
×
UNCOV
746
                devices = self.scan_devices()
×
UNCOV
747
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=devices)
×
UNCOV
748
        if client is None and force_pair:
×
UNCOV
749
            try:
×
UNCOV
750
                info = self.select_device(plugin, handler, keystore, devices,
×
751
                                          allow_user_interaction=allow_user_interaction)
752
            except CannotAutoSelectDevice:
×
753
                pass
×
754
            else:
755
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
×
756
        if client:
×
757
            handler.update_status(True)
×
758
            # note: if select_device was called, we might also update label etc here:
759
            keystore.opportunistically_fill_in_missing_info_from_device(client)
×
760
        self.logger.info("end client for keystore")
×
UNCOV
761
        return client
×
762

763
    def client_by_pairing_code(
5✔
764
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
765
        devices: Sequence['Device'],
766
    ) -> Optional['HardwareClientBase']:
767
        _id = self.id_by_pairing_code(pairing_code)
×
768
        client = self._client_by_id(_id)
×
UNCOV
769
        if client:
×
770
            if type(client.plugin) != type(plugin):
×
771
                return
×
772
            # An unpaired client might have another wallet's handler
773
            # from a prior scan.  Replace to fix dialog parenting.
774
            client.handler = handler
×
775
            return client
×
776

777
        for device in devices:
×
778
            if device.id_ == _id:
×
779
                return self.create_client(device, handler, plugin)
×
780

781
    def force_pair_keystore(
5✔
782
        self,
783
        *,
784
        plugin: 'HW_PluginBase',
785
        handler: 'HardwareHandlerBase',
786
        info: 'DeviceInfo',
787
        keystore: 'Hardware_KeyStore',
788
    ) -> 'HardwareClientBase':
789
        xpub = keystore.xpub
×
UNCOV
790
        derivation = keystore.get_derivation_prefix()
×
UNCOV
791
        assert derivation is not None
×
792
        xtype = bip32.xpub_type(xpub)
×
793
        client = self._client_by_id(info.device.id_)
×
UNCOV
794
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
×
795
            # See comment above for same code
796
            client.handler = handler
×
797
            # This will trigger a PIN/passphrase entry request
UNCOV
798
            try:
×
UNCOV
799
                client_xpub = client.get_xpub(derivation, xtype)
×
UNCOV
800
            except (UserCancelled, RuntimeError):
×
801
                # Bad / cancelled PIN / passphrase
UNCOV
802
                client_xpub = None
×
UNCOV
803
            if client_xpub == xpub:
×
UNCOV
804
                keystore.opportunistically_fill_in_missing_info_from_device(client)
×
UNCOV
805
                with self.lock:
×
UNCOV
806
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
×
807
                return client
×
808

809
        # The user input has wrong PIN or passphrase, or cancelled input,
810
        # or it is not pairable
811
        raise DeviceUnpairableError(
×
812
            _('Electrum cannot pair with your {}.\n\n'
813
              'Before you request bitcoins to be sent to addresses in this '
814
              'wallet, ensure you can pair with your device, or that you have '
815
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
816
              'receive will be unspendable.').format(plugin.device))
817

818
    def list_pairable_device_infos(
5✔
819
        self,
820
        *,
821
        handler: Optional['HardwareHandlerBase'],
822
        plugin: 'HW_PluginBase',
823
        devices: Sequence['Device'] = None,
824
        include_failing_clients: bool = False,
825
    ) -> List['DeviceInfo']:
826
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
827
        Already paired devices are also included, as it is okay to reuse them.
828
        """
829
        if not plugin.libraries_available:
×
UNCOV
830
            message = plugin.get_library_not_available_message()
×
UNCOV
831
            raise HardwarePluginLibraryUnavailable(message)
×
UNCOV
832
        if devices is None:
×
UNCOV
833
            devices = self.scan_devices()
×
UNCOV
834
        infos = []
×
UNCOV
835
        for device in devices:
×
UNCOV
836
            if not plugin.can_recognize_device(device):
×
UNCOV
837
                continue
×
UNCOV
838
            try:
×
UNCOV
839
                client = self.create_client(device, handler, plugin)
×
UNCOV
840
                if not client:
×
UNCOV
841
                    continue
×
UNCOV
842
                label = client.label()
×
UNCOV
843
                is_initialized = client.is_initialized()
×
UNCOV
844
                soft_device_id = client.get_soft_device_id()
×
UNCOV
845
                model_name = client.device_model_name()
×
UNCOV
846
            except Exception as e:
×
847
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
×
848
                if include_failing_clients:
×
849
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
×
850
                continue
×
851
            infos.append(DeviceInfo(device=device,
×
852
                                    label=label,
853
                                    initialized=is_initialized,
854
                                    plugin_name=plugin.name,
855
                                    soft_device_id=soft_device_id,
856
                                    model_name=model_name))
857

858
        return infos
×
859

860
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
5✔
861
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
862
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
863
        """Select the device to use for keystore."""
864
        # ideally this should not be called from the GUI thread...
865
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
866
        while True:
×
867
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
×
868
            if infos:
×
869
                break
×
UNCOV
870
            if not allow_user_interaction:
×
UNCOV
871
                raise CannotAutoSelectDevice()
×
UNCOV
872
            msg = _('Please insert your {}').format(plugin.device)
×
UNCOV
873
            msg += " ("
×
UNCOV
874
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
×
UNCOV
875
                msg += f"label: {keystore.label}, "
×
876
            msg += f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}"
×
UNCOV
877
            msg += ').\n\n{}\n\n{}'.format(
×
878
                _('Verify the cable is connected and that '
879
                  'no other application is using it.'),
880
                _('Try to connect again?')
881
            )
UNCOV
882
            if not handler.yes_no_question(msg):
×
UNCOV
883
                raise UserCancelled()
×
884
            devices = None
×
885

886
        # select device automatically. (but only if we have reasonable expectation it is the correct one)
887
        # method 1: select device by id
888
        if keystore.soft_device_id:
×
889
            for info in infos:
×
890
                if info.soft_device_id == keystore.soft_device_id:
×
891
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
×
892
                    return info
×
893
        # method 2: select device by label
894
        #           but only if not a placeholder label and only if there is no collision
895
        device_labels = [info.label for info in infos]
×
UNCOV
896
        if (keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
×
897
                and device_labels.count(keystore.label) == 1):
UNCOV
898
            for info in infos:
×
UNCOV
899
                if info.label == keystore.label:
×
900
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
×
901
                    return info
×
902
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
903
        #           saved for keystore anyway, select it
UNCOV
904
        if (len(infos) == 1
×
905
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
906
                and keystore.soft_device_id is None):
907
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
×
908
            return infos[0]
×
909

910
        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
×
UNCOV
911
        if not allow_user_interaction:
×
UNCOV
912
            raise CannotAutoSelectDevice()
×
913
        # ask user to select device manually
914
        msg = (
×
915
                _("Could not automatically pair with device for given keystore.") + "\n"
916
                + f"(keystore label: {keystore.label!r}, "
917
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
918
        msg += _("Please select which {} device to use:").format(plugin.device)
×
919
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
×
UNCOV
920
        descriptions = ["{label} ({maybe_model}{init}, {transport})"
×
921
                        .format(label=info.label or _("An unnamed {}").format(info.plugin_name),
922
                                init=(_("initialized") if info.initialized else _("wiped")),
923
                                transport=info.device.transport_ui_string,
924
                                maybe_model=f"{info.model_name}, " if info.model_name else "")
925
                        for info in infos]
926
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
×
927
                          f"num options: {len(infos)}. options: {infos}")
928
        c = handler.query_choice(msg, descriptions)
×
929
        if c is None:
×
930
            raise UserCancelled()
×
UNCOV
931
        info = infos[c]
×
932
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
×
933
        # note: updated label/soft_device_id will be saved after pairing succeeds
UNCOV
934
        return info
×
935

936
    @runs_in_hwd_thread
5✔
937
    def _scan_devices_with_hid(self) -> List['Device']:
5✔
938
        try:
×
UNCOV
939
            import hid
×
UNCOV
940
        except ImportError:
×
UNCOV
941
            return []
×
942

UNCOV
943
        devices = []
×
944
        for d in hid.enumerate(0, 0):
×
UNCOV
945
            vendor_id = d['vendor_id']
×
946
            product_key = (vendor_id, d['product_id'])
×
947
            plugin = None
×
948
            if product_key in self._recognised_hardware:
×
949
                plugin = self._recognised_hardware[product_key]
×
950
            elif vendor_id in self._recognised_vendor:
×
UNCOV
951
                plugin = self._recognised_vendor[vendor_id]
×
952
            if plugin:
×
UNCOV
953
                device = plugin.create_device_from_hid_enumeration(d, product_key=product_key)
×
UNCOV
954
                if device:
×
UNCOV
955
                    devices.append(device)
×
956
        return devices
×
957

958
    @runs_in_hwd_thread
5✔
959
    @profiler
5✔
960
    def scan_devices(self) -> Sequence['Device']:
5✔
961
        self.logger.info("scanning devices...")
×
962

963
        # First see what's connected that we know about
964
        devices = self._scan_devices_with_hid()
×
965

966
        # Let plugin handlers enumerate devices we don't know about
967
        with self.lock:
×
968
            enumerate_funcs = list(self._enumerate_func)
×
969
        for f in enumerate_funcs:
×
970
            try:
×
971
                new_devices = f()
×
972
            except BaseException as e:
×
973
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
×
974
            else:
UNCOV
975
                devices.extend(new_devices)
×
976

977
        # find out what was disconnected
UNCOV
978
        client_ids = [dev.id_ for dev in devices]
×
979
        disconnected_clients = []
×
UNCOV
980
        with self.lock:
×
UNCOV
981
            connected = {}
×
982
            for client, id_ in self.clients.items():
×
UNCOV
983
                if id_ in client_ids and client.has_usable_connection_with_device():
×
UNCOV
984
                    connected[client] = id_
×
985
                else:
986
                    disconnected_clients.append((client, id_))
×
987
            self.clients = connected
×
988

989
        # Unpair disconnected devices
990
        for client, id_ in disconnected_clients:
×
991
            self.unpair_id(id_)
×
UNCOV
992
            if client.handler:
×
993
                client.handler.update_status(False)
×
994

UNCOV
995
        return devices
×
996

997
    @classmethod
5✔
998
    def version_info(cls) -> Mapping[str, Optional[str]]:
5✔
999
        ret = {}
×
1000
        # add libusb
1001
        try:
×
1002
            import usb1
×
UNCOV
1003
        except Exception as e:
×
1004
            ret["libusb.version"] = None
×
1005
        else:
UNCOV
1006
            ret["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
×
UNCOV
1007
            try:
×
1008
                ret["libusb.path"] = usb1.libusb1.libusb._name
×
1009
            except AttributeError:
×
1010
                ret["libusb.path"] = None
×
1011
        # add hidapi
UNCOV
1012
        try:
×
1013
            import hid
×
UNCOV
1014
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
×
UNCOV
1015
        except Exception as e:
×
UNCOV
1016
            from importlib.metadata import version
×
1017
            try:
×
UNCOV
1018
                ret["hidapi.version"] = version("hidapi")
×
1019
            except ImportError:
×
1020
                ret["hidapi.version"] = None
×
1021
        return ret
×
1022

1023
    def trigger_pairings(
5✔
1024
            self,
1025
            keystores: Sequence['KeyStore'],
1026
            *,
1027
            allow_user_interaction: bool = True,
1028
            devices: Sequence['Device'] = None,
1029
    ) -> None:
1030
        """Given a list of keystores, try to pair each with a connected hardware device.
1031

1032
        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
1033
        try to pair each keystore individually. Consider the following scenario:
1034
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
1035
        - assume none of the devices are paired yet
1036
        1. if we tried to individually pair keystores, we might try with ks1 first
1037
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
1038
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
1039
             which is confusing and error-prone. It's especially problematic if the hw device does
1040
             not support labels (such as Ledger), as then the user cannot easily distinguish
1041
             same-type devices. (see #4199)
1042
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
1043
           and then tell the user ks1 could not be paired (and there are no devices left to try)
1044
        """
UNCOV
1045
        from .keystore import Hardware_KeyStore
×
UNCOV
1046
        keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
×
UNCOV
1047
        if not keystores:
×
UNCOV
1048
            return
×
UNCOV
1049
        if devices is None:
×
UNCOV
1050
            devices = self.scan_devices()
×
1051
        # first pair with all devices that can be auto-selected
UNCOV
1052
        for ks in keystores:
×
UNCOV
1053
            try:
×
UNCOV
1054
                ks.get_client(
×
1055
                    force_pair=True,
1056
                    allow_user_interaction=False,
1057
                    devices=devices,
1058
                )
UNCOV
1059
            except UserCancelled:
×
UNCOV
1060
                pass
×
UNCOV
1061
        if allow_user_interaction:
×
1062
            # now do manual selections
1063
            for ks in keystores:
×
1064
                try:
×
1065
                    ks.get_client(
×
1066
                        force_pair=True,
1067
                        allow_user_interaction=True,
1068
                        devices=devices,
1069
                    )
1070
                except UserCancelled:
×
1071
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc