• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 6418809642287104

11 Apr 2025 10:23AM UTC coverage: 60.932% (+0.6%) from 60.284%
6418809642287104

Pull #9723

CirrusCI

f321x
Fix assertion error in ln payments when using same seed in multiple wallets.

Make path calculation check if channel is not in our sending channels but still uses our nodeID as starting node of the path.

I noticed an assertion error when trying to pay an invoice from a seed i have opened channels with in different wallet instances (same seed, different wallet).
Because the channel seemed suitable for sending the payment path finding included the channel for sending in the first position of the route but then
in pay_to_route the channel for route[0] could not be found as it is not included in our channel list, causing the assert and payment to fail.
Pull Request #9723: lightning: prevent usage of unavailable channels for payment

1 of 2 new or added lines in 1 file covered. (50.0%)

253 existing lines in 2 files now uncovered.

21423 of 35159 relevant lines covered (60.93%)

2.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

41.12
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import json
4✔
27
import os
4✔
28
import pkgutil
4✔
29
import importlib.util
4✔
30
import time
4✔
31
import threading
4✔
32
import traceback
4✔
33
import sys
4✔
34
import aiohttp
4✔
35
import zipfile as zipfile_lib
4✔
36

37
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
4✔
38
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
39
import concurrent
4✔
40
import zipimport
4✔
41
from concurrent import futures
4✔
42
from functools import wraps, partial
4✔
43
from itertools import chain
4✔
44

45
from .i18n import _
4✔
46
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException)
4✔
47
from . import bip32
4✔
48
from . import plugins
4✔
49
from .simple_config import ConfigVar, SimpleConfig
4✔
50
from .logging import get_logger, Logger
4✔
51
from .crypto import sha256
4✔
52

53
if TYPE_CHECKING:
4✔
UNCOV
54
    from .plugins.hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
UNCOV
55
    from .keystore import Hardware_KeyStore, KeyStore
×
UNCOV
56
    from .wallet import Abstract_Wallet
×
57

58

59
_logger = get_logger(__name__)
4✔
60
plugin_loaders = {}
4✔
61
hook_names = set()
4✔
62
hooks = {}
4✔
63
_root_permission_cache = {}
4✔
64
_exec_module_failure = {}  # type: Dict[str, Exception]
4✔
65

66

67
class Plugins(DaemonThread):
4✔
68

69
    LOGGING_SHORTCUT = 'p'
4✔
70
    pkgpath = os.path.dirname(plugins.__file__)
4✔
71

72
    @profiler
4✔
73
    def __init__(self, config: SimpleConfig, gui_name = None, cmd_only: bool = False):
4✔
74
        self.config = config
4✔
75
        self.cmd_only = cmd_only  # type: bool
4✔
76
        self.internal_plugin_metadata = {}
4✔
77
        self.external_plugin_metadata = {}
4✔
78
        if cmd_only:
4✔
79
            # only import the command modules of plugins
UNCOV
80
            Logger.__init__(self)
×
UNCOV
81
            self.find_plugins()
×
UNCOV
82
            self.load_plugins()
×
UNCOV
83
            return
×
84
        DaemonThread.__init__(self)
4✔
85
        self.device_manager = DeviceMgr(config)
4✔
86
        self.name = 'Plugins'  # set name of thread
4✔
87
        self.hw_wallets = {}
4✔
88
        self.plugins = {}  # type: Dict[str, BasePlugin]
4✔
89
        self.gui_name = gui_name
4✔
90
        self.find_plugins()
4✔
91
        self.load_plugins()
4✔
92
        self.add_jobs(self.device_manager.thread_jobs())
4✔
93
        self.start()
4✔
94

95
    @property
4✔
96
    def descriptions(self):
4✔
UNCOV
97
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
98

99
    def find_directory_plugins(self, pkg_path: str, external: bool):
4✔
100
        """Finds plugins in directory form from the given pkg_path and populates the metadata dicts"""
101
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
4✔
102
        for loader, name, ispkg in iter_modules:
4✔
103
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
104
            #       once as data and once as code. To honor the "no duplicates" rule below,
105
            #       we exclude the ones packaged as *code*, here:
106
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
4✔
UNCOV
107
                continue
×
108
            module_path = os.path.join(pkg_path, name)
4✔
109
            if external and not self._has_recursive_root_permissions(module_path):
4✔
UNCOV
110
                self.logger.info(f"Not loading plugin {module_path}: directory has user write permissions")
×
UNCOV
111
                continue
×
112
            if self.cmd_only and not self.config.get('enable_plugin_' + name) is True:
4✔
113
                continue
×
114
            try:
4✔
115
                with open(os.path.join(module_path, 'manifest.json'), 'r') as f:
4✔
116
                    d = json.load(f)
4✔
117
            except FileNotFoundError:
4✔
118
                self.logger.info(f"could not find manifest.json of plugin {name}, skipping...")
4✔
119
                continue
4✔
120
            if 'fullname' not in d:
4✔
121
                continue
×
122
            d['display_name'] = d['fullname']
4✔
123
            d['path'] = module_path
4✔
124
            if not self.cmd_only:
4✔
125
                gui_good = self.gui_name in d.get('available_for', [])
4✔
126
                if not gui_good:
4✔
127
                    continue
4✔
128
                details = d.get('registers_wallet_type')
4✔
129
                if details:
4✔
130
                    self.register_wallet_type(name, gui_good, details)
4✔
131
                details = d.get('registers_keystore')
4✔
132
                if details:
4✔
133
                    self.register_keystore(name, gui_good, details)
4✔
134
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
4✔
UNCOV
135
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
×
UNCOV
136
                raise Exception(f"duplicate plugins? for {name=}")
×
137
            if not external:
4✔
138
                self.internal_plugin_metadata[name] = d
4✔
139
            else:
UNCOV
140
                self.external_plugin_metadata[name] = d
×
141

142
    @staticmethod
4✔
143
    def exec_module_from_spec(spec, path: str):
4✔
144
        if prev_fail := _exec_module_failure.get(path):
4✔
UNCOV
145
            raise Exception(f"exec_module already failed once before, with: {prev_fail!r}")
×
146
        try:
4✔
147
            module = importlib.util.module_from_spec(spec)
4✔
148
            # sys.modules needs to be modified for relative imports to work
149
            # see https://stackoverflow.com/a/50395128
150
            sys.modules[path] = module
4✔
151
            spec.loader.exec_module(module)
4✔
UNCOV
152
        except Exception as e:
×
153
            # We can't undo all side-effects, but we at least rm the module from sys.modules,
154
            # so the import system knows it failed. If called again for the same plugin, we do not
155
            # retry due to potential interactions with not-undone side-effects (e.g. plugin
156
            # might have defined commands).
UNCOV
157
            _exec_module_failure[path] = e
×
UNCOV
158
            if path in sys.modules:
×
159
                sys.modules.pop(path, None)
×
160
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
×
161
        return module
4✔
162

163
    def find_plugins(self):
4✔
164
        internal_plugins_path = (self.pkgpath, False)
4✔
165
        external_plugins_path = (self.get_external_plugin_dir(), True)
4✔
166
        for pkg_path, external in (internal_plugins_path, external_plugins_path):
4✔
167
            # external plugins enforce root permissions on the directory
168
            if pkg_path and os.path.exists(pkg_path):
4✔
169
                self.find_directory_plugins(pkg_path=pkg_path, external=external)
4✔
170
                self.find_zip_plugins(pkg_path=pkg_path, external=external)
4✔
171

172
    def load_plugins(self):
4✔
173
        for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
4✔
174
            if not d.get('requires_wallet_type') and self.config.get('enable_plugin_' + name):
4✔
UNCOV
175
                try:
×
UNCOV
176
                    if self.cmd_only:  # only load init method to register commands
×
UNCOV
177
                        self.maybe_load_plugin_init_method(name)
×
178
                    else:
179
                        self.load_plugin_by_name(name)
×
180
                except BaseException as e:
×
UNCOV
181
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
182

183
    def _has_root_permissions(self, path):
4✔
184
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
185

186
    @profiler(min_threshold=0.5)
4✔
187
    def _has_recursive_root_permissions(self, path):
4✔
188
        """Check if a directory and all its subdirectories have root permissions"""
UNCOV
189
        if _root_permission_cache.get(path) is not None:
×
190
            return _root_permission_cache[path]
×
191
        _root_permission_cache[path] = False
×
192
        for root, dirs, files in os.walk(path):
×
193
            if not self._has_root_permissions(root):
×
194
                return False
×
UNCOV
195
            for f in files:
×
UNCOV
196
                if not self._has_root_permissions(os.path.join(root, f)):
×
197
                    return False
×
198
        _root_permission_cache[path] = True
×
199
        return True
×
200

201
    def get_external_plugin_dir(self):
4✔
202
        if sys.platform not in ['linux', 'darwin'] and not sys.platform.startswith('freebsd'):
4✔
203
            return
×
204
        pkg_path = '/opt/electrum_plugins'
4✔
205
        if not os.path.exists(pkg_path):
4✔
206
            self.logger.info(f'directory {pkg_path} does not exist')
4✔
207
            return
4✔
UNCOV
208
        if not self._has_root_permissions(pkg_path):
×
UNCOV
209
            self.logger.info(f'not loading {pkg_path}: directory has user write permissions')
×
UNCOV
210
            return
×
UNCOV
211
        return pkg_path
×
212

213
    def zip_plugin_path(self, name):
4✔
214
        filename = self.get_metadata(name)['filename']
×
215
        if name in self.internal_plugin_metadata:
×
216
            pkg_path = self.pkgpath
×
217
        else:
218
            pkg_path = self.get_external_plugin_dir()
×
219
        return os.path.join(pkg_path, filename)
×
220

221
    def find_zip_plugins(self, pkg_path: str, external: bool):
4✔
222
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts"""
223
        if pkg_path is None:
4✔
UNCOV
224
            return
×
225
        for filename in os.listdir(pkg_path):
4✔
226
            path = os.path.join(pkg_path, filename)
4✔
227
            if not filename.endswith('.zip'):
4✔
228
                continue
4✔
229
            if external and not self._has_root_permissions(path):
×
230
                self.logger.info(f'not loading {path}: file has user write permissions')
×
231
                continue
×
232
            try:
×
233
                zipfile = zipimport.zipimporter(path)
×
234
            except zipimport.ZipImportError:
×
235
                self.logger.exception(f"unable to load zip plugin '{filename}'")
×
236
                continue
×
UNCOV
237
            for name, b in pkgutil.iter_zipimport_modules(zipfile):
×
238
                if b is False:
×
239
                    continue
×
240
                if name in self.internal_plugin_metadata:
×
UNCOV
241
                    raise Exception(f"duplicate plugins for name={name}")
×
UNCOV
242
                if name in self.external_plugin_metadata:
×
UNCOV
243
                    raise Exception(f"duplicate plugins for name={name}")
×
UNCOV
244
                if self.cmd_only and not self.config.get('enable_plugin_' + name):
×
UNCOV
245
                    continue
×
UNCOV
246
                try:
×
UNCOV
247
                    with zipfile_lib.ZipFile(path) as file:
×
UNCOV
248
                        manifest_path = os.path.join(name, 'manifest.json')
×
249
                        with file.open(manifest_path, 'r') as f:
×
250
                            d = json.load(f)
×
251
                except Exception:
×
252
                    self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
×
253
                    continue
×
254
                d['filename'] = filename
×
255
                d['is_zip'] = True
×
256
                d['path'] = path
×
257
                if not self.cmd_only:
×
258
                    gui_good = self.gui_name in d.get('available_for', [])
×
UNCOV
259
                    if not gui_good:
×
UNCOV
260
                        continue
×
UNCOV
261
                    if 'fullname' not in d:
×
262
                        continue
×
263
                    d['display_name'] = d['fullname']
×
264
                    d['zip_hash_sha256'] = get_file_hash256(path)
×
265
                if external:
×
UNCOV
266
                    self.external_plugin_metadata[name] = d
×
267
                else:
268
                    self.internal_plugin_metadata[name] = d
×
269

270
    def get(self, name):
4✔
271
        return self.plugins.get(name)
×
272

273
    def count(self):
4✔
274
        return len(self.plugins)
×
275

276
    def load_plugin(self, name) -> 'BasePlugin':
4✔
277
        """Imports the code of the given plugin.
278
        note: can be called from any thread.
279
        """
280
        if self.get_metadata(name):
4✔
281
            return self.load_plugin_by_name(name)
4✔
282
        else:
283
            raise Exception(f"could not find plugin {name!r}")
×
284

285
    def maybe_load_plugin_init_method(self, name: str) -> None:
4✔
286
        """Loads the __init__.py module of the plugin if it is not already loaded."""
287
        is_external = name in self.external_plugin_metadata
4✔
288
        base_name = (f'electrum_external_plugins.' if is_external else 'electrum.plugins.') + name
4✔
289
        if base_name not in sys.modules:
4✔
290
            metadata = self.get_metadata(name)
4✔
291
            is_zip = metadata.get('is_zip', False)
4✔
292
            # if the plugin was not enabled on startup the init module hasn't been loaded yet
293
            if not is_zip:
4✔
294
                if is_external:
4✔
295
                    path = os.path.join(metadata['path'], '__init__.py')
×
296
                    init_spec = importlib.util.spec_from_file_location(base_name, path)
×
297
                else:
298
                    init_spec = importlib.util.find_spec(base_name)
4✔
299
            else:
300
                zipfile = zipimport.zipimporter(metadata['path'])
×
301
                init_spec = zipfile.find_spec(name)
×
302
            self.exec_module_from_spec(init_spec, base_name)
4✔
303
            if name == "trustedcoin":
4✔
304
                # removes trustedcoin after loading to not show it in the list of plugins
305
                del self.internal_plugin_metadata[name]
×
306

307
    def load_plugin_by_name(self, name: str) -> 'BasePlugin':
4✔
308
        if name in self.plugins:
4✔
309
            return self.plugins[name]
×
310

311
        # if the plugin was not enabled on startup the init module hasn't been loaded yet
312
        self.maybe_load_plugin_init_method(name)
4✔
313

314
        is_external = name in self.external_plugin_metadata
4✔
315
        if not is_external:
4✔
316
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
4✔
317
        else:
318
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
×
319

320
        spec = importlib.util.find_spec(full_name)
4✔
321
        if spec is None:
4✔
322
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
323
        try:
4✔
324
            module = self.exec_module_from_spec(spec, full_name)
4✔
325
            plugin = module.Plugin(self, self.config, name)
4✔
326
        except Exception as e:
×
UNCOV
327
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
×
328
        self.add_jobs(plugin.thread_jobs())
4✔
329
        self.plugins[name] = plugin
4✔
330
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
4✔
331
        return plugin
4✔
332

333
    def close_plugin(self, plugin):
4✔
UNCOV
334
        self.remove_jobs(plugin.thread_jobs())
×
335

336
    def enable(self, name: str) -> 'BasePlugin':
4✔
UNCOV
337
        self.config.set_key('enable_plugin_' + name, True, save=True)
×
UNCOV
338
        p = self.get(name)
×
UNCOV
339
        if p:
×
UNCOV
340
            return p
×
341
        return self.load_plugin(name)
×
342

343
    def disable(self, name: str) -> None:
4✔
UNCOV
344
        self.config.set_key('enable_plugin_' + name, False, save=True)
×
UNCOV
345
        p = self.get(name)
×
UNCOV
346
        if not p:
×
UNCOV
347
            return
×
UNCOV
348
        self.plugins.pop(name)
×
UNCOV
349
        p.close()
×
UNCOV
350
        self.logger.info(f"closed {name}")
×
351

352
    @classmethod
4✔
353
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
4✔
354
        return key.startswith('enable_plugin_')
×
355

356
    def toggle(self, name: str) -> Optional['BasePlugin']:
4✔
UNCOV
357
        p = self.get(name)
×
UNCOV
358
        return self.disable(name) if p else self.enable(name)
×
359

360
    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
4✔
361
        d = self.descriptions.get(name)
×
UNCOV
362
        if not d:
×
UNCOV
363
            return False
×
UNCOV
364
        deps = d.get('requires', [])
×
UNCOV
365
        for dep, s in deps:
×
366
            try:
×
UNCOV
367
                __import__(dep)
×
UNCOV
368
            except ImportError as e:
×
UNCOV
369
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
×
370
                return False
×
UNCOV
371
        requires = d.get('requires_wallet_type', [])
×
UNCOV
372
        return not requires or wallet.wallet_type in requires
×
373

374
    def get_hardware_support(self):
4✔
375
        out = []
×
376
        for name, (gui_good, details) in self.hw_wallets.items():
×
UNCOV
377
            if gui_good:
×
UNCOV
378
                try:
×
UNCOV
379
                    p = self.get_plugin(name)
×
380
                    if p.is_available():
×
UNCOV
381
                        out.append(HardwarePluginToScan(name=name,
×
382
                                                        description=details[2],
383
                                                        plugin=p,
384
                                                        exception=None))
UNCOV
385
                except Exception as e:
×
UNCOV
386
                    self.logger.exception(f"cannot load plugin for: {name}")
×
UNCOV
387
                    out.append(HardwarePluginToScan(name=name,
×
388
                                                    description=details[2],
389
                                                    plugin=None,
390
                                                    exception=e))
UNCOV
391
        return out
×
392

393
    def register_wallet_type(self, name, gui_good, wallet_type):
4✔
394
        from .wallet import register_wallet_type, register_constructor
4✔
395
        self.logger.info(f"registering wallet type {(wallet_type, name)}")
4✔
396

397
        def loader():
4✔
398
            plugin = self.get_plugin(name)
4✔
399
            register_constructor(wallet_type, plugin.wallet_class)
4✔
400
        register_wallet_type(wallet_type)
4✔
401
        plugin_loaders[wallet_type] = loader
4✔
402

403
    def register_keystore(self, name, gui_good, details):
4✔
404
        from .keystore import register_keystore
4✔
405

406
        def dynamic_constructor(d):
4✔
407
            return self.get_plugin(name).keystore_class(d)
4✔
408
        if details[0] == 'hardware':
4✔
409
            self.hw_wallets[name] = (gui_good, details)
4✔
410
            self.logger.info(f"registering hardware {name}: {details}")
4✔
411
            register_keystore(details[1], dynamic_constructor)
4✔
412

413
    def get_plugin(self, name: str) -> 'BasePlugin':
4✔
414
        if name not in self.plugins:
4✔
415
            self.load_plugin(name)
4✔
416
        return self.plugins[name]
4✔
417

418
    def is_plugin_zip(self, name: str) -> bool:
4✔
419
        """Returns True if the plugin is a zip file"""
420
        if (metadata := self.get_metadata(name)) is None:
×
421
            return False
×
422
        return metadata.get('is_zip', False)
×
423

424
    def get_metadata(self, name: str) -> Optional[dict]:
4✔
425
        """Returns the metadata of the plugin"""
426
        metadata = self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name)
4✔
427
        if not metadata:
4✔
428
            return None
×
429
        return metadata
4✔
430

431
    def run(self):
4✔
432
        while self.is_running():
4✔
433
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
4✔
434
            self.run_jobs()
4✔
435
        self.on_stop()
4✔
436

437

438
def get_file_hash256(path: str) -> str:
4✔
439
    '''Get the sha256 hash of a file in hex, similar to `sha256sum`.'''
UNCOV
440
    with open(path, 'rb') as f:
×
441
        return sha256(f.read()).hex()
×
442

443
def hook(func):
4✔
444
    hook_names.add(func.__name__)
4✔
445
    return func
4✔
446

447

448
def run_hook(name, *args):
4✔
449
    results = []
4✔
450
    f_list = hooks.get(name, [])
4✔
451
    for p, f in f_list:
4✔
452
        if p.is_enabled():
4✔
453
            try:
4✔
454
                r = f(*args)
4✔
455
            except Exception:
×
UNCOV
456
                _logger.exception(f"Plugin error. plugin: {p}, hook: {name}")
×
UNCOV
457
                r = False
×
458
            if r:
4✔
459
                results.append(r)
×
460

461
    if results:
4✔
462
        assert len(results) == 1, results
×
463
        return results[0]
×
464

465

466
class BasePlugin(Logger):
4✔
467

468
    def __init__(self, parent, config: 'SimpleConfig', name):
4✔
469
        self.parent = parent  # type: Plugins  # The plugins object
4✔
470
        self.name = name
4✔
471
        self.config = config
4✔
472
        self.wallet = None  # fixme: this field should not exist
4✔
473
        Logger.__init__(self)
4✔
474
        # add self to hooks
475
        for k in dir(self):
4✔
476
            if k in hook_names:
4✔
477
                l = hooks.get(k, [])
4✔
478
                l.append((self, getattr(self, k)))
4✔
479
                hooks[k] = l
4✔
480

481
    def __str__(self):
4✔
482
        return self.name
×
483

484
    def close(self):
4✔
485
        # remove self from hooks
UNCOV
486
        for attr_name in dir(self):
×
UNCOV
487
            if attr_name in hook_names:
×
488
                # found attribute in self that is also the name of a hook
UNCOV
489
                l = hooks.get(attr_name, [])
×
UNCOV
490
                try:
×
UNCOV
491
                    l.remove((self, getattr(self, attr_name)))
×
UNCOV
492
                except ValueError:
×
493
                    # maybe attr name just collided with hook name and was not hook
UNCOV
494
                    continue
×
UNCOV
495
                hooks[attr_name] = l
×
UNCOV
496
        self.parent.close_plugin(self)
×
UNCOV
497
        self.on_close()
×
498

499
    def on_close(self):
4✔
UNCOV
500
        pass
×
501

502
    def requires_settings(self) -> bool:
4✔
UNCOV
503
        return False
×
504

505
    def thread_jobs(self):
4✔
506
        return []
4✔
507

508
    def is_enabled(self):
4✔
UNCOV
509
        return self.is_available() and self.config.get('enable_plugin_' + self.name) is True
×
510

511
    def is_available(self):
4✔
UNCOV
512
        return True
×
513

514
    def can_user_disable(self):
4✔
UNCOV
515
        return True
×
516

517
    def settings_widget(self, window):
4✔
518
        raise NotImplementedError()
×
519

520
    def settings_dialog(self, window):
4✔
UNCOV
521
        raise NotImplementedError()
×
522

523
    def read_file(self, filename: str) -> bytes:
4✔
UNCOV
524
        if self.parent.is_plugin_zip(self.name):
×
525
            plugin_filename = self.parent.zip_plugin_path(self.name)
×
UNCOV
526
            with zipfile_lib.ZipFile(plugin_filename) as myzip:
×
UNCOV
527
                with myzip.open(os.path.join(self.name, filename)) as myfile:
×
UNCOV
528
                    return myfile.read()
×
529
        else:
UNCOV
530
            if self.name in self.parent.internal_plugin_metadata:
×
UNCOV
531
                path = os.path.join(os.path.dirname(__file__), 'plugins', self.name, filename)
×
532
            else:
UNCOV
533
                path = os.path.join(self.parent.get_external_plugin_dir(), self.name, filename)
×
UNCOV
534
            with open(path, 'rb') as myfile:
×
UNCOV
535
                return myfile.read()
×
536

537

538
class DeviceUnpairableError(UserFacingException): pass
4✔
539
class HardwarePluginLibraryUnavailable(Exception): pass
4✔
540
class CannotAutoSelectDevice(Exception): pass
4✔
541

542

543
class Device(NamedTuple):
4✔
544
    path: Union[str, bytes]
4✔
545
    interface_number: int
4✔
546
    id_: str
4✔
547
    product_key: Any   # when using hid, often Tuple[int, int]
4✔
548
    usage_page: int
4✔
549
    transport_ui_string: str
4✔
550

551

552
class DeviceInfo(NamedTuple):
4✔
553
    device: Device
4✔
554
    label: Optional[str] = None
4✔
555
    initialized: Optional[bool] = None
4✔
556
    exception: Optional[Exception] = None
4✔
557
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
4✔
558
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
4✔
559
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
4✔
560

561

562
class HardwarePluginToScan(NamedTuple):
4✔
563
    name: str
4✔
564
    description: str
4✔
565
    plugin: Optional['HW_PluginBase']
4✔
566
    exception: Optional[Exception]
4✔
567

568

569
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
4✔
570

571

572
# hidapi is not thread-safe
573
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
574
#     https://github.com/libusb/hidapi/issues/45
575
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
576
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
577
# It is not entirely clear to me, exactly what is safe and what isn't, when
578
# using multiple threads...
579
# Hence, we use a single thread for all device communications, including
580
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
581
# the following thread:
582
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
4✔
583
    max_workers=1,
584
    thread_name_prefix='hwd_comms_thread'
585
)
586

587
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
588
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
589
# To keep it simple, let's just import it now, as we are likely in the main thread here.
590
if threading.current_thread() is not threading.main_thread():
4✔
UNCOV
591
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
×
592
try:
4✔
593
    import hid
4✔
594
except ImportError:
4✔
595
    pass
4✔
596

597

598
T = TypeVar('T')
4✔
599

600

601
def run_in_hwd_thread(func: Callable[[], T]) -> T:
4✔
UNCOV
602
    if threading.current_thread().name.startswith("hwd_comms_thread"):
×
UNCOV
603
        return func()
×
604
    else:
UNCOV
605
        fut = _hwd_comms_executor.submit(func)
×
UNCOV
606
        return fut.result()
×
607
        #except (concurrent.futures.CancelledError, concurrent.futures.TimeoutError) as e:
608

609

610
def runs_in_hwd_thread(func):
4✔
611
    @wraps(func)
4✔
612
    def wrapper(*args, **kwargs):
4✔
613
        return run_in_hwd_thread(partial(func, *args, **kwargs))
×
614
    return wrapper
4✔
615

616

617
def assert_runs_in_hwd_thread():
4✔
UNCOV
618
    if not threading.current_thread().name.startswith("hwd_comms_thread"):
×
619
        raise Exception("must only be called from HWD communication thread")
×
620

621

622
class DeviceMgr(ThreadJob):
4✔
623
    """Manages hardware clients.  A client communicates over a hardware
624
    channel with the device.
625

626
    In addition to tracking device HID IDs, the device manager tracks
627
    hardware wallets and manages wallet pairing.  A HID ID may be
628
    paired with a wallet when it is confirmed that the hardware device
629
    matches the wallet, i.e. they have the same master public key.  A
630
    HID ID can be unpaired if e.g. it is wiped.
631

632
    Because of hotplugging, a wallet must request its client
633
    dynamically each time it is required, rather than caching it
634
    itself.
635

636
    The device manager is shared across plugins, so just one place
637
    does hardware scans when needed.  By tracking HID IDs, if a device
638
    is plugged into a different port the wallet is automatically
639
    re-paired.
640

641
    Wallets are informed on connect / disconnect events.  It must
642
    implement connected(), disconnected() callbacks.  Being connected
643
    implies a pairing.  Callbacks can happen in any thread context,
644
    and we do them without holding the lock.
645

646
    Confusingly, the HID ID (serial number) reported by the HID system
647
    doesn't match the device ID reported by the device itself.  We use
648
    the HID IDs.
649

650
    This plugin is thread-safe.  Currently only devices supported by
651
    hidapi are implemented."""
652

653
    def __init__(self, config: SimpleConfig):
4✔
654
        ThreadJob.__init__(self)
4✔
655
        # A pairing_code->id_ map. Item only present if we have active pairing. Needs self.lock.
656
        self.pairing_code_to_id = {}  # type: Dict[str, str]
4✔
657
        # A client->id_ map. Needs self.lock.
658
        self.clients = {}  # type: Dict[HardwareClientBase, str]
4✔
659
        # What we recognise.  (vendor_id, product_id) -> Plugin
660
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
4✔
661
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]  # vendor_id -> Plugin
4✔
662
        # Custom enumerate functions for devices we don't know about.
663
        self._enumerate_func = set()  # Needs self.lock.
4✔
664

665
        self.lock = threading.RLock()
4✔
666

667
        self.config = config
4✔
668

669
    def thread_jobs(self):
4✔
670
        # Thread job to handle device timeouts
671
        return [self]
4✔
672

673
    def run(self):
4✔
674
        '''Handle device timeouts.  Runs in the context of the Plugins
675
        thread.'''
676
        with self.lock:
4✔
677
            clients = list(self.clients.keys())
4✔
678
        cutoff = time.time() - self.config.get_session_timeout()
4✔
679
        for client in clients:
4✔
UNCOV
680
            client.timeout(cutoff)
×
681

682
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
4✔
UNCOV
683
        for pair in device_pairs:
×
UNCOV
684
            self._recognised_hardware[pair] = plugin
×
685

686
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
4✔
UNCOV
687
        for vendor_id in vendor_ids:
×
UNCOV
688
            self._recognised_vendor[vendor_id] = plugin
×
689

690
    def register_enumerate_func(self, func):
4✔
691
        with self.lock:
×
UNCOV
692
            self._enumerate_func.add(func)
×
693

694
    @runs_in_hwd_thread
4✔
695
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
4✔
696
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
697
        # Get from cache first
UNCOV
698
        client = self._client_by_id(device.id_)
×
UNCOV
699
        if client:
×
UNCOV
700
            return client
×
UNCOV
701
        client = plugin.create_client(device, handler)
×
702
        if client:
×
703
            self.logger.info(f"Registering {client}")
×
UNCOV
704
            with self.lock:
×
705
                self.clients[client] = device.id_
×
706
        return client
×
707

708
    def id_by_pairing_code(self, pairing_code):
4✔
UNCOV
709
        with self.lock:
×
UNCOV
710
            return self.pairing_code_to_id.get(pairing_code)
×
711

712
    def pairing_code_by_id(self, id_):
4✔
713
        with self.lock:
×
UNCOV
714
            for pairing_code, id2 in self.pairing_code_to_id.items():
×
UNCOV
715
                if id2 == id_:
×
UNCOV
716
                    return pairing_code
×
UNCOV
717
            return None
×
718

719
    def unpair_pairing_code(self, pairing_code):
4✔
UNCOV
720
        with self.lock:
×
UNCOV
721
            if pairing_code not in self.pairing_code_to_id:
×
UNCOV
722
                return
×
UNCOV
723
            _id = self.pairing_code_to_id.pop(pairing_code)
×
UNCOV
724
        self._close_client(_id)
×
725

726
    def unpair_id(self, id_):
4✔
UNCOV
727
        pairing_code = self.pairing_code_by_id(id_)
×
UNCOV
728
        if pairing_code:
×
UNCOV
729
            self.unpair_pairing_code(pairing_code)
×
730
        else:
UNCOV
731
            self._close_client(id_)
×
732

733
    def _close_client(self, id_):
4✔
UNCOV
734
        with self.lock:
×
UNCOV
735
            client = self._client_by_id(id_)
×
UNCOV
736
            self.clients.pop(client, None)
×
UNCOV
737
        if client:
×
UNCOV
738
            client.close()
×
739

740
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
4✔
UNCOV
741
        with self.lock:
×
UNCOV
742
            for client, client_id in self.clients.items():
×
UNCOV
743
                if client_id == id_:
×
UNCOV
744
                    return client
×
UNCOV
745
        return None
×
746

747
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
4✔
748
        '''Returns a client for the device ID if one is registered.  If
749
        a device is wiped or in bootloader mode pairing is impossible;
750
        in such cases we communicate by device ID and not wallet.'''
UNCOV
751
        if scan_now:
×
UNCOV
752
            self.scan_devices()
×
UNCOV
753
        return self._client_by_id(id_)
×
754

755
    @runs_in_hwd_thread
4✔
756
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
4✔
757
                            keystore: 'Hardware_KeyStore',
758
                            force_pair: bool, *,
759
                            devices: Sequence['Device'] = None,
760
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
UNCOV
761
        self.logger.info("getting client for keystore")
×
UNCOV
762
        if handler is None:
×
UNCOV
763
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
×
UNCOV
764
        handler.update_status(False)
×
UNCOV
765
        pcode = keystore.pairing_code()
×
UNCOV
766
        client = None
×
767
        # search existing clients first (fast-path)
UNCOV
768
        if not devices:
×
UNCOV
769
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=[])
×
770
        # search clients again, now allowing a (slow) scan
UNCOV
771
        if client is None:
×
UNCOV
772
            if devices is None:
×
UNCOV
773
                devices = self.scan_devices()
×
UNCOV
774
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=devices)
×
UNCOV
775
        if client is None and force_pair:
×
UNCOV
776
            try:
×
UNCOV
777
                info = self.select_device(plugin, handler, keystore, devices,
×
778
                                          allow_user_interaction=allow_user_interaction)
UNCOV
779
            except CannotAutoSelectDevice:
×
780
                pass
×
781
            else:
UNCOV
782
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
×
783
        if client:
×
784
            handler.update_status(True)
×
785
            # note: if select_device was called, we might also update label etc here:
UNCOV
786
            keystore.opportunistically_fill_in_missing_info_from_device(client)
×
787
        self.logger.info("end client for keystore")
×
788
        return client
×
789

790
    def client_by_pairing_code(
4✔
791
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
792
        devices: Sequence['Device'],
793
    ) -> Optional['HardwareClientBase']:
UNCOV
794
        _id = self.id_by_pairing_code(pairing_code)
×
UNCOV
795
        client = self._client_by_id(_id)
×
UNCOV
796
        if client:
×
UNCOV
797
            if type(client.plugin) != type(plugin):
×
798
                return
×
799
            # An unpaired client might have another wallet's handler
800
            # from a prior scan.  Replace to fix dialog parenting.
801
            client.handler = handler
×
802
            return client
×
803

804
        for device in devices:
×
805
            if device.id_ == _id:
×
806
                return self.create_client(device, handler, plugin)
×
807

808
    def force_pair_keystore(
4✔
809
        self,
810
        *,
811
        plugin: 'HW_PluginBase',
812
        handler: 'HardwareHandlerBase',
813
        info: 'DeviceInfo',
814
        keystore: 'Hardware_KeyStore',
815
    ) -> 'HardwareClientBase':
816
        xpub = keystore.xpub
×
817
        derivation = keystore.get_derivation_prefix()
×
UNCOV
818
        assert derivation is not None
×
UNCOV
819
        xtype = bip32.xpub_type(xpub)
×
820
        client = self._client_by_id(info.device.id_)
×
821
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
×
822
            # See comment above for same code
823
            client.handler = handler
×
824
            # This will trigger a PIN/passphrase entry request
UNCOV
825
            try:
×
UNCOV
826
                client_xpub = client.get_xpub(derivation, xtype)
×
827
            except (UserCancelled, RuntimeError):
×
828
                # Bad / cancelled PIN / passphrase
829
                client_xpub = None
×
UNCOV
830
            if client_xpub == xpub:
×
831
                keystore.opportunistically_fill_in_missing_info_from_device(client)
×
UNCOV
832
                with self.lock:
×
UNCOV
833
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
×
834
                return client
×
835

836
        # The user input has wrong PIN or passphrase, or cancelled input,
837
        # or it is not pairable
838
        raise DeviceUnpairableError(
×
839
            _('Electrum cannot pair with your {}.\n\n'
840
              'Before you request bitcoins to be sent to addresses in this '
841
              'wallet, ensure you can pair with your device, or that you have '
842
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
843
              'receive will be unspendable.').format(plugin.device))
844

845
    def list_pairable_device_infos(
4✔
846
        self,
847
        *,
848
        handler: Optional['HardwareHandlerBase'],
849
        plugin: 'HW_PluginBase',
850
        devices: Sequence['Device'] = None,
851
        include_failing_clients: bool = False,
852
    ) -> List['DeviceInfo']:
853
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
854
        Already paired devices are also included, as it is okay to reuse them.
855
        """
UNCOV
856
        if not plugin.libraries_available:
×
UNCOV
857
            message = plugin.get_library_not_available_message()
×
UNCOV
858
            raise HardwarePluginLibraryUnavailable(message)
×
UNCOV
859
        if devices is None:
×
UNCOV
860
            devices = self.scan_devices()
×
861
        infos = []
×
862
        for device in devices:
×
863
            if not plugin.can_recognize_device(device):
×
864
                continue
×
865
            try:
×
866
                client = self.create_client(device, handler, plugin)
×
UNCOV
867
                if not client:
×
868
                    continue
×
869
                label = client.label()
×
UNCOV
870
                is_initialized = client.is_initialized()
×
871
                soft_device_id = client.get_soft_device_id()
×
872
                model_name = client.device_model_name()
×
873
            except Exception as e:
×
874
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
×
875
                if include_failing_clients:
×
876
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
×
877
                continue
×
UNCOV
878
            infos.append(DeviceInfo(device=device,
×
879
                                    label=label,
880
                                    initialized=is_initialized,
881
                                    plugin_name=plugin.name,
882
                                    soft_device_id=soft_device_id,
883
                                    model_name=model_name))
884

UNCOV
885
        return infos
×
886

887
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
4✔
888
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
889
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
890
        """Select the device to use for keystore."""
891
        # ideally this should not be called from the GUI thread...
892
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
UNCOV
893
        while True:
×
894
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
×
895
            if infos:
×
896
                break
×
897
            if not allow_user_interaction:
×
898
                raise CannotAutoSelectDevice()
×
UNCOV
899
            msg = _('Please insert your {}').format(plugin.device)
×
UNCOV
900
            msg += " ("
×
901
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
×
902
                msg += f"label: {keystore.label}, "
×
UNCOV
903
            msg += f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}"
×
904
            msg += ').\n\n{}\n\n{}'.format(
×
905
                _('Verify the cable is connected and that '
906
                  'no other application is using it.'),
907
                _('Try to connect again?')
908
            )
UNCOV
909
            if not handler.yes_no_question(msg):
×
UNCOV
910
                raise UserCancelled()
×
UNCOV
911
            devices = None
×
912

913
        # select device automatically. (but only if we have reasonable expectation it is the correct one)
914
        # method 1: select device by id
UNCOV
915
        if keystore.soft_device_id:
×
916
            for info in infos:
×
917
                if info.soft_device_id == keystore.soft_device_id:
×
918
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
×
919
                    return info
×
920
        # method 2: select device by label
921
        #           but only if not a placeholder label and only if there is no collision
UNCOV
922
        device_labels = [info.label for info in infos]
×
923
        if (keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
×
924
                and device_labels.count(keystore.label) == 1):
925
            for info in infos:
×
926
                if info.label == keystore.label:
×
927
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
×
UNCOV
928
                    return info
×
929
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
930
        #           saved for keystore anyway, select it
931
        if (len(infos) == 1
×
932
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
933
                and keystore.soft_device_id is None):
934
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
×
UNCOV
935
            return infos[0]
×
936

UNCOV
937
        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
×
938
        if not allow_user_interaction:
×
UNCOV
939
            raise CannotAutoSelectDevice()
×
940
        # ask user to select device manually
UNCOV
941
        msg = (
×
942
                _("Could not automatically pair with device for given keystore.") + "\n"
943
                + f"(keystore label: {keystore.label!r}, "
944
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
UNCOV
945
        msg += _("Please select which {} device to use:").format(plugin.device)
×
UNCOV
946
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
×
UNCOV
947
        descriptions = ["{label} ({maybe_model}{init}, {transport})"
×
948
                        .format(label=info.label or _("An unnamed {}").format(info.plugin_name),
949
                                init=(_("initialized") if info.initialized else _("wiped")),
950
                                transport=info.device.transport_ui_string,
951
                                maybe_model=f"{info.model_name}, " if info.model_name else "")
952
                        for info in infos]
UNCOV
953
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
×
954
                          f"num options: {len(infos)}. options: {infos}")
UNCOV
955
        c = handler.query_choice(msg, descriptions)
×
956
        if c is None:
×
957
            raise UserCancelled()
×
958
        info = infos[c]
×
959
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
×
960
        # note: updated label/soft_device_id will be saved after pairing succeeds
961
        return info
×
962

963
    @runs_in_hwd_thread
4✔
964
    def _scan_devices_with_hid(self) -> List['Device']:
4✔
965
        try:
×
966
            import hid  # noqa: F811
×
967
        except ImportError:
×
968
            return []
×
969

970
        devices = []
×
971
        for d in hid.enumerate(0, 0):
×
972
            vendor_id = d['vendor_id']
×
973
            product_key = (vendor_id, d['product_id'])
×
974
            plugin = None
×
975
            if product_key in self._recognised_hardware:
×
976
                plugin = self._recognised_hardware[product_key]
×
977
            elif vendor_id in self._recognised_vendor:
×
978
                plugin = self._recognised_vendor[vendor_id]
×
UNCOV
979
            if plugin:
×
UNCOV
980
                device = plugin.create_device_from_hid_enumeration(d, product_key=product_key)
×
UNCOV
981
                if device:
×
UNCOV
982
                    devices.append(device)
×
UNCOV
983
        return devices
×
984

985
    @runs_in_hwd_thread
4✔
986
    @profiler
4✔
987
    def scan_devices(self) -> Sequence['Device']:
4✔
UNCOV
988
        self.logger.info("scanning devices...")
×
989

990
        # First see what's connected that we know about
UNCOV
991
        devices = self._scan_devices_with_hid()
×
992

993
        # Let plugin handlers enumerate devices we don't know about
994
        with self.lock:
×
995
            enumerate_funcs = list(self._enumerate_func)
×
996
        for f in enumerate_funcs:
×
997
            try:
×
998
                new_devices = f()
×
999
            except BaseException as e:
×
1000
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
×
1001
            else:
1002
                devices.extend(new_devices)
×
1003

1004
        # find out what was disconnected
UNCOV
1005
        client_ids = [dev.id_ for dev in devices]
×
UNCOV
1006
        disconnected_clients = []
×
UNCOV
1007
        with self.lock:
×
UNCOV
1008
            connected = {}
×
1009
            for client, id_ in self.clients.items():
×
1010
                if id_ in client_ids and client.has_usable_connection_with_device():
×
1011
                    connected[client] = id_
×
1012
                else:
UNCOV
1013
                    disconnected_clients.append((client, id_))
×
UNCOV
1014
            self.clients = connected
×
1015

1016
        # Unpair disconnected devices
1017
        for client, id_ in disconnected_clients:
×
1018
            self.unpair_id(id_)
×
1019
            if client.handler:
×
UNCOV
1020
                client.handler.update_status(False)
×
1021

1022
        return devices
×
1023

1024
    @classmethod
4✔
1025
    def version_info(cls) -> Mapping[str, Optional[str]]:
4✔
1026
        ret = {}
×
1027
        # add libusb
1028
        try:
×
UNCOV
1029
            import usb1
×
UNCOV
1030
        except Exception as e:
×
1031
            ret["libusb.version"] = None
×
1032
        else:
UNCOV
1033
            ret["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
×
1034
            try:
×
1035
                ret["libusb.path"] = usb1.libusb1.libusb._name
×
UNCOV
1036
            except AttributeError:
×
1037
                ret["libusb.path"] = None
×
1038
        # add hidapi
1039
        try:
×
UNCOV
1040
            import hid  # noqa: F811
×
1041
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
×
UNCOV
1042
        except Exception as e:
×
UNCOV
1043
            from importlib.metadata import version
×
UNCOV
1044
            try:
×
1045
                ret["hidapi.version"] = version("hidapi")
×
1046
            except ImportError:
×
1047
                ret["hidapi.version"] = None
×
UNCOV
1048
        return ret
×
1049

1050
    def trigger_pairings(
4✔
1051
            self,
1052
            keystores: Sequence['KeyStore'],
1053
            *,
1054
            allow_user_interaction: bool = True,
1055
            devices: Sequence['Device'] = None,
1056
    ) -> None:
1057
        """Given a list of keystores, try to pair each with a connected hardware device.
1058

1059
        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
1060
        try to pair each keystore individually. Consider the following scenario:
1061
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
1062
        - assume none of the devices are paired yet
1063
        1. if we tried to individually pair keystores, we might try with ks1 first
1064
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
1065
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
1066
             which is confusing and error-prone. It's especially problematic if the hw device does
1067
             not support labels (such as Ledger), as then the user cannot easily distinguish
1068
             same-type devices. (see #4199)
1069
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
1070
           and then tell the user ks1 could not be paired (and there are no devices left to try)
1071
        """
1072
        from .keystore import Hardware_KeyStore
×
1073
        keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
×
1074
        if not keystores:
×
1075
            return
×
1076
        if devices is None:
×
1077
            devices = self.scan_devices()
×
1078
        # first pair with all devices that can be auto-selected
1079
        for ks in keystores:
×
1080
            try:
×
1081
                ks.get_client(
×
1082
                    force_pair=True,
1083
                    allow_user_interaction=False,
1084
                    devices=devices,
1085
                )
UNCOV
1086
            except UserCancelled:
×
UNCOV
1087
                pass
×
1088
        if allow_user_interaction:
×
1089
            # now do manual selections
UNCOV
1090
            for ks in keystores:
×
1091
                try:
×
UNCOV
1092
                    ks.get_client(
×
1093
                        force_pair=True,
1094
                        allow_user_interaction=True,
1095
                        devices=devices,
1096
                    )
1097
                except UserCancelled:
×
1098
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc