• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 6400884898267136

03 Apr 2025 04:39PM UTC coverage: 61.073% (-0.003%) from 61.076%
6400884898267136

Pull #9681

CirrusCI

f321x
make txbatcher wait for network connection
Pull Request #9681: wallet: make txbatcher wait for network connection

6 of 11 new or added lines in 1 file covered. (54.55%)

1633 existing lines in 12 files now uncovered.

21362 of 34978 relevant lines covered (61.07%)

2.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

41.12
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import json
4✔
27
import os
4✔
28
import pkgutil
4✔
29
import importlib.util
4✔
30
import time
4✔
31
import threading
4✔
32
import traceback
4✔
33
import sys
4✔
34
import aiohttp
4✔
35
import zipfile as zipfile_lib
4✔
36

37
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
4✔
38
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
39
import concurrent
4✔
40
import zipimport
4✔
41
from concurrent import futures
4✔
42
from functools import wraps, partial
4✔
43
from itertools import chain
4✔
44

45
from .i18n import _
4✔
46
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException)
4✔
47
from . import bip32
4✔
48
from . import plugins
4✔
49
from .simple_config import ConfigVar, SimpleConfig
4✔
50
from .logging import get_logger, Logger
4✔
51
from .crypto import sha256
4✔
52

53
if TYPE_CHECKING:
4✔
54
    from .plugins.hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
55
    from .keystore import Hardware_KeyStore, KeyStore
×
56
    from .wallet import Abstract_Wallet
×
57

58

59
_logger = get_logger(__name__)
4✔
60
plugin_loaders = {}
4✔
61
hook_names = set()
4✔
62
hooks = {}
4✔
63
_root_permission_cache = {}
4✔
64
_exec_module_failure = {}  # type: Dict[str, Exception]
4✔
65

66

67
class Plugins(DaemonThread):
4✔
68

69
    LOGGING_SHORTCUT = 'p'
4✔
70
    pkgpath = os.path.dirname(plugins.__file__)
4✔
71

72
    @profiler
4✔
73
    def __init__(self, config: SimpleConfig, gui_name = None, cmd_only: bool = False):
4✔
74
        self.config = config
4✔
75
        self.cmd_only = cmd_only  # type: bool
4✔
76
        self.internal_plugin_metadata = {}
4✔
77
        self.external_plugin_metadata = {}
4✔
78
        if cmd_only:
4✔
79
            # only import the command modules of plugins
80
            Logger.__init__(self)
×
81
            self.find_plugins()
×
82
            self.load_plugins()
×
83
            return
×
84
        DaemonThread.__init__(self)
4✔
85
        self.device_manager = DeviceMgr(config)
4✔
86
        self.name = 'Plugins'  # set name of thread
4✔
87
        self.hw_wallets = {}
4✔
88
        self.plugins = {}  # type: Dict[str, BasePlugin]
4✔
89
        self.gui_name = gui_name
4✔
90
        self.find_plugins()
4✔
91
        self.load_plugins()
4✔
92
        self.add_jobs(self.device_manager.thread_jobs())
4✔
93
        self.start()
4✔
94

95
    @property
4✔
96
    def descriptions(self):
4✔
97
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
98

99
    def find_directory_plugins(self, pkg_path: str, external: bool):
4✔
100
        """Finds plugins in directory form from the given pkg_path and populates the metadata dicts"""
101
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
4✔
102
        for loader, name, ispkg in iter_modules:
4✔
103
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
104
            #       once as data and once as code. To honor the "no duplicates" rule below,
105
            #       we exclude the ones packaged as *code*, here:
106
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
4✔
107
                continue
×
108
            module_path = os.path.join(pkg_path, name)
4✔
109
            if external and not self._has_recursive_root_permissions(module_path):
4✔
110
                self.logger.info(f"Not loading plugin {module_path}: directory has user write permissions")
×
111
                continue
×
112
            if self.cmd_only and not self.config.get('enable_plugin_' + name) is True:
4✔
113
                continue
×
114
            try:
4✔
115
                with open(os.path.join(module_path, 'manifest.json'), 'r') as f:
4✔
116
                    d = json.load(f)
4✔
117
            except FileNotFoundError:
4✔
118
                self.logger.info(f"could not find manifest.json of plugin {name}, skipping...")
4✔
119
                continue
4✔
120
            if 'fullname' not in d:
4✔
121
                continue
×
122
            d['display_name'] = d['fullname']
4✔
123
            d['path'] = module_path
4✔
124
            if not self.cmd_only:
4✔
125
                gui_good = self.gui_name in d.get('available_for', [])
4✔
126
                if not gui_good:
4✔
127
                    continue
4✔
128
                details = d.get('registers_wallet_type')
4✔
129
                if details:
4✔
130
                    self.register_wallet_type(name, gui_good, details)
4✔
131
                details = d.get('registers_keystore')
4✔
132
                if details:
4✔
133
                    self.register_keystore(name, gui_good, details)
4✔
134
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
4✔
135
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
×
136
                raise Exception(f"duplicate plugins? for {name=}")
×
137
            if not external:
4✔
138
                self.internal_plugin_metadata[name] = d
4✔
139
            else:
140
                self.external_plugin_metadata[name] = d
×
141

142
    @staticmethod
4✔
143
    def exec_module_from_spec(spec, path: str):
4✔
144
        if prev_fail := _exec_module_failure.get(path):
4✔
145
            raise Exception(f"exec_module already failed once before, with: {prev_fail!r}")
×
146
        try:
4✔
147
            module = importlib.util.module_from_spec(spec)
4✔
148
            # sys.modules needs to be modified for relative imports to work
149
            # see https://stackoverflow.com/a/50395128
150
            sys.modules[path] = module
4✔
151
            spec.loader.exec_module(module)
4✔
152
        except Exception as e:
×
153
            # We can't undo all side-effects, but we at least rm the module from sys.modules,
154
            # so the import system knows it failed. If called again for the same plugin, we do not
155
            # retry due to potential interactions with not-undone side-effects (e.g. plugin
156
            # might have defined commands).
157
            _exec_module_failure[path] = e
×
158
            if path in sys.modules:
×
159
                sys.modules.pop(path, None)
×
160
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
×
161
        return module
4✔
162

163
    def find_plugins(self):
4✔
164
        internal_plugins_path = (self.pkgpath, False)
4✔
165
        external_plugins_path = (self.get_external_plugin_dir(), True)
4✔
166
        for pkg_path, external in (internal_plugins_path, external_plugins_path):
4✔
167
            # external plugins enforce root permissions on the directory
168
            if pkg_path and os.path.exists(pkg_path):
4✔
169
                self.find_directory_plugins(pkg_path=pkg_path, external=external)
4✔
170
                self.find_zip_plugins(pkg_path=pkg_path, external=external)
4✔
171

172
    def load_plugins(self):
4✔
173
        for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
4✔
174
            if not d.get('requires_wallet_type') and self.config.get('enable_plugin_' + name):
4✔
175
                try:
×
176
                    if self.cmd_only:  # only load init method to register commands
×
177
                        self.maybe_load_plugin_init_method(name)
×
178
                    else:
179
                        self.load_plugin_by_name(name)
×
180
                except BaseException as e:
×
181
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
182

183
    def _has_root_permissions(self, path):
4✔
184
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
185

186
    @profiler(min_threshold=0.5)
4✔
187
    def _has_recursive_root_permissions(self, path):
4✔
188
        """Check if a directory and all its subdirectories have root permissions"""
189
        global _root_permission_cache
190
        if _root_permission_cache.get(path) is not None:
×
191
            return _root_permission_cache[path]
×
192
        _root_permission_cache[path] = False
×
193
        for root, dirs, files in os.walk(path):
×
194
            if not self._has_root_permissions(root):
×
195
                return False
×
196
            for f in files:
×
197
                if not self._has_root_permissions(os.path.join(root, f)):
×
198
                    return False
×
199
        _root_permission_cache[path] = True
×
UNCOV
200
        return True
×
201

202
    def get_external_plugin_dir(self):
4✔
203
        if sys.platform not in ['linux', 'darwin'] and not sys.platform.startswith('freebsd'):
4✔
UNCOV
204
            return
×
205
        pkg_path = '/opt/electrum_plugins'
4✔
206
        if not os.path.exists(pkg_path):
4✔
207
            self.logger.info(f'directory {pkg_path} does not exist')
4✔
208
            return
4✔
209
        if not self._has_root_permissions(pkg_path):
×
210
            self.logger.info(f'not loading {pkg_path}: directory has user write permissions')
×
211
            return
×
UNCOV
212
        return pkg_path
×
213

214
    def zip_plugin_path(self, name):
4✔
215
        filename = self.get_metadata(name)['filename']
×
216
        if name in self.internal_plugin_metadata:
×
UNCOV
217
            pkg_path = self.pkgpath
×
218
        else:
219
            pkg_path = self.get_external_plugin_dir()
×
UNCOV
220
        return os.path.join(pkg_path, filename)
×
221

222
    def find_zip_plugins(self, pkg_path: str, external: bool):
4✔
223
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts"""
224
        if pkg_path is None:
4✔
UNCOV
225
            return
×
226
        for filename in os.listdir(pkg_path):
4✔
227
            path = os.path.join(pkg_path, filename)
4✔
228
            if not filename.endswith('.zip'):
4✔
229
                continue
4✔
230
            if external and not self._has_root_permissions(path):
×
231
                self.logger.info(f'not loading {path}: file has user write permissions')
×
232
                continue
×
233
            try:
×
234
                zipfile = zipimport.zipimporter(path)
×
235
            except zipimport.ZipImportError:
×
236
                self.logger.exception(f"unable to load zip plugin '{filename}'")
×
237
                continue
×
238
            for name, b in pkgutil.iter_zipimport_modules(zipfile):
×
239
                if b is False:
×
240
                    continue
×
241
                if name in self.internal_plugin_metadata:
×
242
                    raise Exception(f"duplicate plugins for name={name}")
×
243
                if name in self.external_plugin_metadata:
×
244
                    raise Exception(f"duplicate plugins for name={name}")
×
245
                if self.cmd_only and not self.config.get('enable_plugin_' + name):
×
246
                    continue
×
247
                try:
×
248
                    with zipfile_lib.ZipFile(path) as file:
×
249
                        manifest_path = os.path.join(name, 'manifest.json')
×
250
                        with file.open(manifest_path, 'r') as f:
×
251
                            d = json.load(f)
×
252
                except Exception:
×
253
                    self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
×
254
                    continue
×
255
                d['filename'] = filename
×
256
                d['is_zip'] = True
×
257
                d['path'] = path
×
258
                if not self.cmd_only:
×
259
                    gui_good = self.gui_name in d.get('available_for', [])
×
260
                    if not gui_good:
×
261
                        continue
×
262
                    if 'fullname' not in d:
×
263
                        continue
×
264
                    d['display_name'] = d['fullname']
×
265
                    d['zip_hash_sha256'] = get_file_hash256(path)
×
266
                if external:
×
UNCOV
267
                    self.external_plugin_metadata[name] = d
×
268
                else:
UNCOV
269
                    self.internal_plugin_metadata[name] = d
×
270

271
    def get(self, name):
4✔
UNCOV
272
        return self.plugins.get(name)
×
273

274
    def count(self):
4✔
UNCOV
275
        return len(self.plugins)
×
276

277
    def load_plugin(self, name) -> 'BasePlugin':
4✔
278
        """Imports the code of the given plugin.
279
        note: can be called from any thread.
280
        """
281
        if self.get_metadata(name):
4✔
282
            return self.load_plugin_by_name(name)
4✔
283
        else:
UNCOV
284
            raise Exception(f"could not find plugin {name!r}")
×
285

286
    def maybe_load_plugin_init_method(self, name: str) -> None:
4✔
287
        """Loads the __init__.py module of the plugin if it is not already loaded."""
288
        is_external = name in self.external_plugin_metadata
4✔
289
        base_name = (f'electrum_external_plugins.' if is_external else 'electrum.plugins.') + name
4✔
290
        if base_name not in sys.modules:
4✔
291
            metadata = self.get_metadata(name)
4✔
292
            is_zip = metadata.get('is_zip', False)
4✔
293
            # if the plugin was not enabled on startup the init module hasn't been loaded yet
294
            if not is_zip:
4✔
295
                if is_external:
4✔
296
                    path = os.path.join(metadata['path'], '__init__.py')
×
UNCOV
297
                    init_spec = importlib.util.spec_from_file_location(base_name, path)
×
298
                else:
299
                    init_spec = importlib.util.find_spec(base_name)
4✔
300
            else:
301
                zipfile = zipimport.zipimporter(metadata['path'])
×
UNCOV
302
                init_spec = zipfile.find_spec(name)
×
303
            self.exec_module_from_spec(init_spec, base_name)
4✔
304
            if name == "trustedcoin":
4✔
305
                # removes trustedcoin after loading to not show it in the list of plugins
UNCOV
306
                del self.internal_plugin_metadata[name]
×
307

308
    def load_plugin_by_name(self, name: str) -> 'BasePlugin':
4✔
309
        if name in self.plugins:
4✔
UNCOV
310
            return self.plugins[name]
×
311

312
        # if the plugin was not enabled on startup the init module hasn't been loaded yet
313
        self.maybe_load_plugin_init_method(name)
4✔
314

315
        is_external = name in self.external_plugin_metadata
4✔
316
        if not is_external:
4✔
317
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
4✔
318
        else:
UNCOV
319
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
×
320

321
        spec = importlib.util.find_spec(full_name)
4✔
322
        if spec is None:
4✔
UNCOV
323
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
324
        try:
4✔
325
            module = self.exec_module_from_spec(spec, full_name)
4✔
326
            plugin = module.Plugin(self, self.config, name)
4✔
327
        except Exception as e:
×
UNCOV
328
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
×
329
        self.add_jobs(plugin.thread_jobs())
4✔
330
        self.plugins[name] = plugin
4✔
331
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
4✔
332
        return plugin
4✔
333

334
    def close_plugin(self, plugin):
4✔
UNCOV
335
        self.remove_jobs(plugin.thread_jobs())
×
336

337
    def enable(self, name: str) -> 'BasePlugin':
4✔
338
        self.config.set_key('enable_plugin_' + name, True, save=True)
×
339
        p = self.get(name)
×
340
        if p:
×
341
            return p
×
UNCOV
342
        return self.load_plugin(name)
×
343

344
    def disable(self, name: str) -> None:
4✔
345
        self.config.set_key('enable_plugin_' + name, False, save=True)
×
346
        p = self.get(name)
×
347
        if not p:
×
348
            return
×
349
        self.plugins.pop(name)
×
350
        p.close()
×
UNCOV
351
        self.logger.info(f"closed {name}")
×
352

353
    @classmethod
4✔
354
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
4✔
UNCOV
355
        return key.startswith('enable_plugin_')
×
356

357
    def toggle(self, name: str) -> Optional['BasePlugin']:
4✔
358
        p = self.get(name)
×
UNCOV
359
        return self.disable(name) if p else self.enable(name)
×
360

361
    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
4✔
362
        d = self.descriptions.get(name)
×
363
        if not d:
×
364
            return False
×
365
        deps = d.get('requires', [])
×
366
        for dep, s in deps:
×
367
            try:
×
368
                __import__(dep)
×
369
            except ImportError as e:
×
370
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
×
371
                return False
×
372
        requires = d.get('requires_wallet_type', [])
×
UNCOV
373
        return not requires or wallet.wallet_type in requires
×
374

375
    def get_hardware_support(self):
4✔
376
        out = []
×
377
        for name, (gui_good, details) in self.hw_wallets.items():
×
378
            if gui_good:
×
379
                try:
×
380
                    p = self.get_plugin(name)
×
381
                    if p.is_available():
×
UNCOV
382
                        out.append(HardwarePluginToScan(name=name,
×
383
                                                        description=details[2],
384
                                                        plugin=p,
385
                                                        exception=None))
386
                except Exception as e:
×
387
                    self.logger.exception(f"cannot load plugin for: {name}")
×
UNCOV
388
                    out.append(HardwarePluginToScan(name=name,
×
389
                                                    description=details[2],
390
                                                    plugin=None,
391
                                                    exception=e))
UNCOV
392
        return out
×
393

394
    def register_wallet_type(self, name, gui_good, wallet_type):
4✔
395
        from .wallet import register_wallet_type, register_constructor
4✔
396
        self.logger.info(f"registering wallet type {(wallet_type, name)}")
4✔
397

398
        def loader():
4✔
399
            plugin = self.get_plugin(name)
4✔
400
            register_constructor(wallet_type, plugin.wallet_class)
4✔
401
        register_wallet_type(wallet_type)
4✔
402
        plugin_loaders[wallet_type] = loader
4✔
403

404
    def register_keystore(self, name, gui_good, details):
4✔
405
        from .keystore import register_keystore
4✔
406

407
        def dynamic_constructor(d):
4✔
408
            return self.get_plugin(name).keystore_class(d)
4✔
409
        if details[0] == 'hardware':
4✔
410
            self.hw_wallets[name] = (gui_good, details)
4✔
411
            self.logger.info(f"registering hardware {name}: {details}")
4✔
412
            register_keystore(details[1], dynamic_constructor)
4✔
413

414
    def get_plugin(self, name: str) -> 'BasePlugin':
4✔
415
        if name not in self.plugins:
4✔
416
            self.load_plugin(name)
4✔
417
        return self.plugins[name]
4✔
418

419
    def is_plugin_zip(self, name: str) -> bool:
4✔
420
        """Returns True if the plugin is a zip file"""
421
        if (metadata := self.get_metadata(name)) is None:
×
422
            return False
×
UNCOV
423
        return metadata.get('is_zip', False)
×
424

425
    def get_metadata(self, name: str) -> Optional[dict]:
4✔
426
        """Returns the metadata of the plugin"""
427
        metadata = self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name)
4✔
428
        if not metadata:
4✔
UNCOV
429
            return None
×
430
        return metadata
4✔
431

432
    def run(self):
4✔
433
        while self.is_running():
4✔
434
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
4✔
435
            self.run_jobs()
4✔
436
        self.on_stop()
4✔
437

438

439
def get_file_hash256(path: str) -> str:
4✔
440
    '''Get the sha256 hash of a file in hex, similar to `sha256sum`.'''
441
    with open(path, 'rb') as f:
×
UNCOV
442
        return sha256(f.read()).hex()
×
443

444
def hook(func):
4✔
445
    hook_names.add(func.__name__)
4✔
446
    return func
4✔
447

448

449
def run_hook(name, *args):
4✔
450
    results = []
4✔
451
    f_list = hooks.get(name, [])
4✔
452
    for p, f in f_list:
4✔
453
        if p.is_enabled():
4✔
454
            try:
4✔
455
                r = f(*args)
4✔
456
            except Exception:
×
457
                _logger.exception(f"Plugin error. plugin: {p}, hook: {name}")
×
UNCOV
458
                r = False
×
459
            if r:
4✔
UNCOV
460
                results.append(r)
×
461

462
    if results:
4✔
463
        assert len(results) == 1, results
×
UNCOV
464
        return results[0]
×
465

466

467
class BasePlugin(Logger):
4✔
468

469
    def __init__(self, parent, config: 'SimpleConfig', name):
4✔
470
        self.parent = parent  # type: Plugins  # The plugins object
4✔
471
        self.name = name
4✔
472
        self.config = config
4✔
473
        self.wallet = None  # fixme: this field should not exist
4✔
474
        Logger.__init__(self)
4✔
475
        # add self to hooks
476
        for k in dir(self):
4✔
477
            if k in hook_names:
4✔
478
                l = hooks.get(k, [])
4✔
479
                l.append((self, getattr(self, k)))
4✔
480
                hooks[k] = l
4✔
481

482
    def __str__(self):
4✔
UNCOV
483
        return self.name
×
484

485
    def close(self):
4✔
486
        # remove self from hooks
487
        for attr_name in dir(self):
×
UNCOV
488
            if attr_name in hook_names:
×
489
                # found attribute in self that is also the name of a hook
490
                l = hooks.get(attr_name, [])
×
491
                try:
×
492
                    l.remove((self, getattr(self, attr_name)))
×
UNCOV
493
                except ValueError:
×
494
                    # maybe attr name just collided with hook name and was not hook
495
                    continue
×
496
                hooks[attr_name] = l
×
497
        self.parent.close_plugin(self)
×
UNCOV
498
        self.on_close()
×
499

500
    def on_close(self):
4✔
UNCOV
501
        pass
×
502

503
    def requires_settings(self) -> bool:
4✔
UNCOV
504
        return False
×
505

506
    def thread_jobs(self):
4✔
507
        return []
4✔
508

509
    def is_enabled(self):
4✔
UNCOV
510
        return self.is_available() and self.config.get('enable_plugin_' + self.name) is True
×
511

512
    def is_available(self):
4✔
UNCOV
513
        return True
×
514

515
    def can_user_disable(self):
4✔
UNCOV
516
        return True
×
517

518
    def settings_widget(self, window):
4✔
UNCOV
519
        raise NotImplementedError()
×
520

521
    def settings_dialog(self, window):
4✔
UNCOV
522
        raise NotImplementedError()
×
523

524
    def read_file(self, filename: str) -> bytes:
4✔
525
        if self.parent.is_plugin_zip(self.name):
×
526
            plugin_filename = self.parent.zip_plugin_path(self.name)
×
527
            with zipfile_lib.ZipFile(plugin_filename) as myzip:
×
528
                with myzip.open(os.path.join(self.name, filename)) as myfile:
×
UNCOV
529
                    return myfile.read()
×
530
        else:
531
            if self.name in self.parent.internal_plugin_metadata:
×
UNCOV
532
                path = os.path.join(os.path.dirname(__file__), 'plugins', self.name, filename)
×
533
            else:
534
                path = os.path.join(self.parent.get_external_plugin_dir(), self.name, filename)
×
535
            with open(path, 'rb') as myfile:
×
UNCOV
536
                return myfile.read()
×
537

538

539
class DeviceUnpairableError(UserFacingException): pass
4✔
540
class HardwarePluginLibraryUnavailable(Exception): pass
4✔
541
class CannotAutoSelectDevice(Exception): pass
4✔
542

543

544
class Device(NamedTuple):
4✔
545
    path: Union[str, bytes]
4✔
546
    interface_number: int
4✔
547
    id_: str
4✔
548
    product_key: Any   # when using hid, often Tuple[int, int]
4✔
549
    usage_page: int
4✔
550
    transport_ui_string: str
4✔
551

552

553
class DeviceInfo(NamedTuple):
4✔
554
    device: Device
4✔
555
    label: Optional[str] = None
4✔
556
    initialized: Optional[bool] = None
4✔
557
    exception: Optional[Exception] = None
4✔
558
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
4✔
559
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
4✔
560
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
4✔
561

562

563
class HardwarePluginToScan(NamedTuple):
4✔
564
    name: str
4✔
565
    description: str
4✔
566
    plugin: Optional['HW_PluginBase']
4✔
567
    exception: Optional[Exception]
4✔
568

569

570
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
4✔
571

572

573
# hidapi is not thread-safe
574
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
575
#     https://github.com/libusb/hidapi/issues/45
576
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
577
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
578
# It is not entirely clear to me, exactly what is safe and what isn't, when
579
# using multiple threads...
580
# Hence, we use a single thread for all device communications, including
581
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
582
# the following thread:
583
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
4✔
584
    max_workers=1,
585
    thread_name_prefix='hwd_comms_thread'
586
)
587

588
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
589
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
590
# To keep it simple, let's just import it now, as we are likely in the main thread here.
591
if threading.current_thread() is not threading.main_thread():
4✔
UNCOV
592
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
×
593
try:
4✔
594
    import hid
4✔
595
except ImportError:
4✔
596
    pass
4✔
597

598

599
T = TypeVar('T')
4✔
600

601

602
def run_in_hwd_thread(func: Callable[[], T]) -> T:
4✔
603
    if threading.current_thread().name.startswith("hwd_comms_thread"):
×
UNCOV
604
        return func()
×
605
    else:
606
        fut = _hwd_comms_executor.submit(func)
×
UNCOV
607
        return fut.result()
×
608
        #except (concurrent.futures.CancelledError, concurrent.futures.TimeoutError) as e:
609

610

611
def runs_in_hwd_thread(func):
4✔
612
    @wraps(func)
4✔
613
    def wrapper(*args, **kwargs):
4✔
UNCOV
614
        return run_in_hwd_thread(partial(func, *args, **kwargs))
×
615
    return wrapper
4✔
616

617

618
def assert_runs_in_hwd_thread():
4✔
619
    if not threading.current_thread().name.startswith("hwd_comms_thread"):
×
UNCOV
620
        raise Exception("must only be called from HWD communication thread")
×
621

622

623
class DeviceMgr(ThreadJob):
4✔
624
    """Manages hardware clients.  A client communicates over a hardware
625
    channel with the device.
626

627
    In addition to tracking device HID IDs, the device manager tracks
628
    hardware wallets and manages wallet pairing.  A HID ID may be
629
    paired with a wallet when it is confirmed that the hardware device
630
    matches the wallet, i.e. they have the same master public key.  A
631
    HID ID can be unpaired if e.g. it is wiped.
632

633
    Because of hotplugging, a wallet must request its client
634
    dynamically each time it is required, rather than caching it
635
    itself.
636

637
    The device manager is shared across plugins, so just one place
638
    does hardware scans when needed.  By tracking HID IDs, if a device
639
    is plugged into a different port the wallet is automatically
640
    re-paired.
641

642
    Wallets are informed on connect / disconnect events.  It must
643
    implement connected(), disconnected() callbacks.  Being connected
644
    implies a pairing.  Callbacks can happen in any thread context,
645
    and we do them without holding the lock.
646

647
    Confusingly, the HID ID (serial number) reported by the HID system
648
    doesn't match the device ID reported by the device itself.  We use
649
    the HID IDs.
650

651
    This plugin is thread-safe.  Currently only devices supported by
652
    hidapi are implemented."""
653

654
    def __init__(self, config: SimpleConfig):
4✔
655
        ThreadJob.__init__(self)
4✔
656
        # A pairing_code->id_ map. Item only present if we have active pairing. Needs self.lock.
657
        self.pairing_code_to_id = {}  # type: Dict[str, str]
4✔
658
        # A client->id_ map. Needs self.lock.
659
        self.clients = {}  # type: Dict[HardwareClientBase, str]
4✔
660
        # What we recognise.  (vendor_id, product_id) -> Plugin
661
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
4✔
662
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]  # vendor_id -> Plugin
4✔
663
        # Custom enumerate functions for devices we don't know about.
664
        self._enumerate_func = set()  # Needs self.lock.
4✔
665

666
        self.lock = threading.RLock()
4✔
667

668
        self.config = config
4✔
669

670
    def thread_jobs(self):
4✔
671
        # Thread job to handle device timeouts
672
        return [self]
4✔
673

674
    def run(self):
4✔
675
        '''Handle device timeouts.  Runs in the context of the Plugins
676
        thread.'''
677
        with self.lock:
4✔
678
            clients = list(self.clients.keys())
4✔
679
        cutoff = time.time() - self.config.get_session_timeout()
4✔
680
        for client in clients:
4✔
UNCOV
681
            client.timeout(cutoff)
×
682

683
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
4✔
684
        for pair in device_pairs:
×
UNCOV
685
            self._recognised_hardware[pair] = plugin
×
686

687
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
4✔
688
        for vendor_id in vendor_ids:
×
UNCOV
689
            self._recognised_vendor[vendor_id] = plugin
×
690

691
    def register_enumerate_func(self, func):
4✔
692
        with self.lock:
×
UNCOV
693
            self._enumerate_func.add(func)
×
694

695
    @runs_in_hwd_thread
4✔
696
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
4✔
697
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
698
        # Get from cache first
699
        client = self._client_by_id(device.id_)
×
700
        if client:
×
701
            return client
×
702
        client = plugin.create_client(device, handler)
×
703
        if client:
×
704
            self.logger.info(f"Registering {client}")
×
705
            with self.lock:
×
706
                self.clients[client] = device.id_
×
UNCOV
707
        return client
×
708

709
    def id_by_pairing_code(self, pairing_code):
4✔
710
        with self.lock:
×
UNCOV
711
            return self.pairing_code_to_id.get(pairing_code)
×
712

713
    def pairing_code_by_id(self, id_):
4✔
714
        with self.lock:
×
715
            for pairing_code, id2 in self.pairing_code_to_id.items():
×
716
                if id2 == id_:
×
717
                    return pairing_code
×
UNCOV
718
            return None
×
719

720
    def unpair_pairing_code(self, pairing_code):
4✔
721
        with self.lock:
×
722
            if pairing_code not in self.pairing_code_to_id:
×
723
                return
×
724
            _id = self.pairing_code_to_id.pop(pairing_code)
×
UNCOV
725
        self._close_client(_id)
×
726

727
    def unpair_id(self, id_):
4✔
728
        pairing_code = self.pairing_code_by_id(id_)
×
729
        if pairing_code:
×
UNCOV
730
            self.unpair_pairing_code(pairing_code)
×
731
        else:
UNCOV
732
            self._close_client(id_)
×
733

734
    def _close_client(self, id_):
4✔
735
        with self.lock:
×
736
            client = self._client_by_id(id_)
×
737
            self.clients.pop(client, None)
×
738
        if client:
×
UNCOV
739
            client.close()
×
740

741
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
4✔
742
        with self.lock:
×
743
            for client, client_id in self.clients.items():
×
744
                if client_id == id_:
×
745
                    return client
×
UNCOV
746
        return None
×
747

748
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
4✔
749
        '''Returns a client for the device ID if one is registered.  If
750
        a device is wiped or in bootloader mode pairing is impossible;
751
        in such cases we communicate by device ID and not wallet.'''
752
        if scan_now:
×
753
            self.scan_devices()
×
UNCOV
754
        return self._client_by_id(id_)
×
755

756
    @runs_in_hwd_thread
4✔
757
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
4✔
758
                            keystore: 'Hardware_KeyStore',
759
                            force_pair: bool, *,
760
                            devices: Sequence['Device'] = None,
761
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
762
        self.logger.info("getting client for keystore")
×
763
        if handler is None:
×
764
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
×
765
        handler.update_status(False)
×
766
        pcode = keystore.pairing_code()
×
UNCOV
767
        client = None
×
768
        # search existing clients first (fast-path)
769
        if not devices:
×
UNCOV
770
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=[])
×
771
        # search clients again, now allowing a (slow) scan
772
        if client is None:
×
773
            if devices is None:
×
774
                devices = self.scan_devices()
×
775
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=devices)
×
776
        if client is None and force_pair:
×
777
            try:
×
UNCOV
778
                info = self.select_device(plugin, handler, keystore, devices,
×
779
                                          allow_user_interaction=allow_user_interaction)
780
            except CannotAutoSelectDevice:
×
UNCOV
781
                pass
×
782
            else:
783
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
×
784
        if client:
×
UNCOV
785
            handler.update_status(True)
×
786
            # note: if select_device was called, we might also update label etc here:
787
            keystore.opportunistically_fill_in_missing_info_from_device(client)
×
788
        self.logger.info("end client for keystore")
×
UNCOV
789
        return client
×
790

791
    def client_by_pairing_code(
4✔
792
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
793
        devices: Sequence['Device'],
794
    ) -> Optional['HardwareClientBase']:
795
        _id = self.id_by_pairing_code(pairing_code)
×
796
        client = self._client_by_id(_id)
×
797
        if client:
×
798
            if type(client.plugin) != type(plugin):
×
UNCOV
799
                return
×
800
            # An unpaired client might have another wallet's handler
801
            # from a prior scan.  Replace to fix dialog parenting.
802
            client.handler = handler
×
UNCOV
803
            return client
×
804

805
        for device in devices:
×
806
            if device.id_ == _id:
×
UNCOV
807
                return self.create_client(device, handler, plugin)
×
808

809
    def force_pair_keystore(
4✔
810
        self,
811
        *,
812
        plugin: 'HW_PluginBase',
813
        handler: 'HardwareHandlerBase',
814
        info: 'DeviceInfo',
815
        keystore: 'Hardware_KeyStore',
816
    ) -> 'HardwareClientBase':
817
        xpub = keystore.xpub
×
818
        derivation = keystore.get_derivation_prefix()
×
819
        assert derivation is not None
×
820
        xtype = bip32.xpub_type(xpub)
×
821
        client = self._client_by_id(info.device.id_)
×
UNCOV
822
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
×
823
            # See comment above for same code
UNCOV
824
            client.handler = handler
×
825
            # This will trigger a PIN/passphrase entry request
826
            try:
×
827
                client_xpub = client.get_xpub(derivation, xtype)
×
UNCOV
828
            except (UserCancelled, RuntimeError):
×
829
                # Bad / cancelled PIN / passphrase
830
                client_xpub = None
×
831
            if client_xpub == xpub:
×
832
                keystore.opportunistically_fill_in_missing_info_from_device(client)
×
833
                with self.lock:
×
834
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
×
UNCOV
835
                return client
×
836

837
        # The user input has wrong PIN or passphrase, or cancelled input,
838
        # or it is not pairable
UNCOV
839
        raise DeviceUnpairableError(
×
840
            _('Electrum cannot pair with your {}.\n\n'
841
              'Before you request bitcoins to be sent to addresses in this '
842
              'wallet, ensure you can pair with your device, or that you have '
843
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
844
              'receive will be unspendable.').format(plugin.device))
845

846
    def list_pairable_device_infos(
4✔
847
        self,
848
        *,
849
        handler: Optional['HardwareHandlerBase'],
850
        plugin: 'HW_PluginBase',
851
        devices: Sequence['Device'] = None,
852
        include_failing_clients: bool = False,
853
    ) -> List['DeviceInfo']:
854
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
855
        Already paired devices are also included, as it is okay to reuse them.
856
        """
857
        if not plugin.libraries_available:
×
858
            message = plugin.get_library_not_available_message()
×
859
            raise HardwarePluginLibraryUnavailable(message)
×
860
        if devices is None:
×
861
            devices = self.scan_devices()
×
862
        infos = []
×
863
        for device in devices:
×
864
            if not plugin.can_recognize_device(device):
×
865
                continue
×
866
            try:
×
867
                client = self.create_client(device, handler, plugin)
×
868
                if not client:
×
869
                    continue
×
870
                label = client.label()
×
871
                is_initialized = client.is_initialized()
×
872
                soft_device_id = client.get_soft_device_id()
×
873
                model_name = client.device_model_name()
×
874
            except Exception as e:
×
875
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
×
876
                if include_failing_clients:
×
877
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
×
878
                continue
×
UNCOV
879
            infos.append(DeviceInfo(device=device,
×
880
                                    label=label,
881
                                    initialized=is_initialized,
882
                                    plugin_name=plugin.name,
883
                                    soft_device_id=soft_device_id,
884
                                    model_name=model_name))
885

UNCOV
886
        return infos
×
887

888
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
4✔
889
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
890
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
891
        """Select the device to use for keystore."""
892
        # ideally this should not be called from the GUI thread...
893
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
894
        while True:
×
895
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
×
896
            if infos:
×
897
                break
×
898
            if not allow_user_interaction:
×
899
                raise CannotAutoSelectDevice()
×
900
            msg = _('Please insert your {}').format(plugin.device)
×
901
            msg += " ("
×
902
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
×
903
                msg += f"label: {keystore.label}, "
×
904
            msg += f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}"
×
UNCOV
905
            msg += ').\n\n{}\n\n{}'.format(
×
906
                _('Verify the cable is connected and that '
907
                  'no other application is using it.'),
908
                _('Try to connect again?')
909
            )
910
            if not handler.yes_no_question(msg):
×
911
                raise UserCancelled()
×
UNCOV
912
            devices = None
×
913

914
        # select device automatically. (but only if we have reasonable expectation it is the correct one)
915
        # method 1: select device by id
916
        if keystore.soft_device_id:
×
917
            for info in infos:
×
918
                if info.soft_device_id == keystore.soft_device_id:
×
919
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
×
UNCOV
920
                    return info
×
921
        # method 2: select device by label
922
        #           but only if not a placeholder label and only if there is no collision
923
        device_labels = [info.label for info in infos]
×
UNCOV
924
        if (keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
×
925
                and device_labels.count(keystore.label) == 1):
926
            for info in infos:
×
927
                if info.label == keystore.label:
×
928
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
×
UNCOV
929
                    return info
×
930
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
931
        #           saved for keystore anyway, select it
UNCOV
932
        if (len(infos) == 1
×
933
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
934
                and keystore.soft_device_id is None):
935
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
×
UNCOV
936
            return infos[0]
×
937

938
        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
×
939
        if not allow_user_interaction:
×
UNCOV
940
            raise CannotAutoSelectDevice()
×
941
        # ask user to select device manually
UNCOV
942
        msg = (
×
943
                _("Could not automatically pair with device for given keystore.") + "\n"
944
                + f"(keystore label: {keystore.label!r}, "
945
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
946
        msg += _("Please select which {} device to use:").format(plugin.device)
×
947
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
×
UNCOV
948
        descriptions = ["{label} ({maybe_model}{init}, {transport})"
×
949
                        .format(label=info.label or _("An unnamed {}").format(info.plugin_name),
950
                                init=(_("initialized") if info.initialized else _("wiped")),
951
                                transport=info.device.transport_ui_string,
952
                                maybe_model=f"{info.model_name}, " if info.model_name else "")
953
                        for info in infos]
UNCOV
954
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
×
955
                          f"num options: {len(infos)}. options: {infos}")
956
        c = handler.query_choice(msg, descriptions)
×
957
        if c is None:
×
958
            raise UserCancelled()
×
959
        info = infos[c]
×
UNCOV
960
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
×
961
        # note: updated label/soft_device_id will be saved after pairing succeeds
UNCOV
962
        return info
×
963

964
    @runs_in_hwd_thread
4✔
965
    def _scan_devices_with_hid(self) -> List['Device']:
4✔
966
        try:
×
967
            import hid
×
968
        except ImportError:
×
UNCOV
969
            return []
×
970

971
        devices = []
×
972
        for d in hid.enumerate(0, 0):
×
973
            vendor_id = d['vendor_id']
×
974
            product_key = (vendor_id, d['product_id'])
×
975
            plugin = None
×
976
            if product_key in self._recognised_hardware:
×
977
                plugin = self._recognised_hardware[product_key]
×
978
            elif vendor_id in self._recognised_vendor:
×
979
                plugin = self._recognised_vendor[vendor_id]
×
980
            if plugin:
×
981
                device = plugin.create_device_from_hid_enumeration(d, product_key=product_key)
×
982
                if device:
×
983
                    devices.append(device)
×
UNCOV
984
        return devices
×
985

986
    @runs_in_hwd_thread
4✔
987
    @profiler
4✔
988
    def scan_devices(self) -> Sequence['Device']:
4✔
UNCOV
989
        self.logger.info("scanning devices...")
×
990

991
        # First see what's connected that we know about
UNCOV
992
        devices = self._scan_devices_with_hid()
×
993

994
        # Let plugin handlers enumerate devices we don't know about
995
        with self.lock:
×
996
            enumerate_funcs = list(self._enumerate_func)
×
997
        for f in enumerate_funcs:
×
998
            try:
×
999
                new_devices = f()
×
1000
            except BaseException as e:
×
UNCOV
1001
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
×
1002
            else:
UNCOV
1003
                devices.extend(new_devices)
×
1004

1005
        # find out what was disconnected
1006
        client_ids = [dev.id_ for dev in devices]
×
1007
        disconnected_clients = []
×
1008
        with self.lock:
×
1009
            connected = {}
×
1010
            for client, id_ in self.clients.items():
×
1011
                if id_ in client_ids and client.has_usable_connection_with_device():
×
UNCOV
1012
                    connected[client] = id_
×
1013
                else:
1014
                    disconnected_clients.append((client, id_))
×
UNCOV
1015
            self.clients = connected
×
1016

1017
        # Unpair disconnected devices
1018
        for client, id_ in disconnected_clients:
×
1019
            self.unpair_id(id_)
×
1020
            if client.handler:
×
UNCOV
1021
                client.handler.update_status(False)
×
1022

UNCOV
1023
        return devices
×
1024

1025
    @classmethod
4✔
1026
    def version_info(cls) -> Mapping[str, Optional[str]]:
4✔
UNCOV
1027
        ret = {}
×
1028
        # add libusb
1029
        try:
×
1030
            import usb1
×
1031
        except Exception as e:
×
UNCOV
1032
            ret["libusb.version"] = None
×
1033
        else:
1034
            ret["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
×
1035
            try:
×
1036
                ret["libusb.path"] = usb1.libusb1.libusb._name
×
1037
            except AttributeError:
×
UNCOV
1038
                ret["libusb.path"] = None
×
1039
        # add hidapi
1040
        try:
×
1041
            import hid
×
1042
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
×
1043
        except Exception as e:
×
1044
            from importlib.metadata import version
×
1045
            try:
×
1046
                ret["hidapi.version"] = version("hidapi")
×
1047
            except ImportError:
×
1048
                ret["hidapi.version"] = None
×
UNCOV
1049
        return ret
×
1050

1051
    def trigger_pairings(
4✔
1052
            self,
1053
            keystores: Sequence['KeyStore'],
1054
            *,
1055
            allow_user_interaction: bool = True,
1056
            devices: Sequence['Device'] = None,
1057
    ) -> None:
1058
        """Given a list of keystores, try to pair each with a connected hardware device.
1059

1060
        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
1061
        try to pair each keystore individually. Consider the following scenario:
1062
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
1063
        - assume none of the devices are paired yet
1064
        1. if we tried to individually pair keystores, we might try with ks1 first
1065
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
1066
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
1067
             which is confusing and error-prone. It's especially problematic if the hw device does
1068
             not support labels (such as Ledger), as then the user cannot easily distinguish
1069
             same-type devices. (see #4199)
1070
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
1071
           and then tell the user ks1 could not be paired (and there are no devices left to try)
1072
        """
1073
        from .keystore import Hardware_KeyStore
×
1074
        keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
×
1075
        if not keystores:
×
1076
            return
×
1077
        if devices is None:
×
UNCOV
1078
            devices = self.scan_devices()
×
1079
        # first pair with all devices that can be auto-selected
1080
        for ks in keystores:
×
1081
            try:
×
UNCOV
1082
                ks.get_client(
×
1083
                    force_pair=True,
1084
                    allow_user_interaction=False,
1085
                    devices=devices,
1086
                )
1087
            except UserCancelled:
×
1088
                pass
×
UNCOV
1089
        if allow_user_interaction:
×
1090
            # now do manual selections
1091
            for ks in keystores:
×
1092
                try:
×
UNCOV
1093
                    ks.get_client(
×
1094
                        force_pair=True,
1095
                        allow_user_interaction=True,
1096
                        devices=devices,
1097
                    )
1098
                except UserCancelled:
×
UNCOV
1099
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc