• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 4663502477983744

15 Mar 2025 12:22PM UTC coverage: 61.237% (-0.06%) from 61.297%
4663502477983744

Pull #9629

CirrusCI

ecdsa
Plugins call with cmd_only:
 - pass temporary config to Plugins
 - load only enabled plugins
 - parse the command line again after plugins are loaded
Pull Request #9629: Allow plugins to register CLI commands

24 of 65 new or added lines in 3 files covered. (36.92%)

260 existing lines in 4 files now uncovered.

21340 of 34848 relevant lines covered (61.24%)

2.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

39.2
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import os
4✔
27
import pkgutil
4✔
28
import importlib.util
4✔
29
import time
4✔
30
import threading
4✔
31
import traceback
4✔
32
import sys
4✔
33
import aiohttp
4✔
34

35
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
4✔
36
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
37
import concurrent
4✔
38
import zipimport
4✔
39
from concurrent import futures
4✔
40
from functools import wraps, partial
4✔
41

42
from .i18n import _
4✔
43
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException)
4✔
44
from . import bip32
4✔
45
from . import plugins
4✔
46
from .simple_config import SimpleConfig
4✔
47
from .logging import get_logger, Logger
4✔
48
from .crypto import sha256
4✔
49

50
if TYPE_CHECKING:
4✔
51
    from .plugins.hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
52
    from .keystore import Hardware_KeyStore, KeyStore
×
53
    from .wallet import Abstract_Wallet
×
54

55

56
_logger = get_logger(__name__)
4✔
57
# Maps a wallet type to a zero-argument loader callable. Entries are added by
# Plugins.register_wallet_type(); calling the loader fetches the plugin and
# registers its wallet constructor.
plugin_loaders = {}
4✔
58
# Names of all functions that were decorated with @hook.
hook_names = set()
4✔
59
# Maps a hook name to a list of (plugin, bound method) pairs. Filled in by
# BasePlugin.__init__(), emptied by BasePlugin.close(), read by run_hook().
hooks = {}
4✔
60

61

62
class Plugins(DaemonThread):
    """Discovers, loads and keeps track of plugins.

    "Internal" plugins are the sub-packages of ``electrum.plugins``;
    "external" plugins are zip archives found in the external plugin
    directory.  The ``find_*`` methods execute each plugin's top-level module
    and collect its module-level metadata, the ``load_*`` methods import the
    GUI-specific implementation and instantiate its ``Plugin`` class.

    With ``cmd_only=True`` only the ``commands`` submodule of each *enabled*
    plugin is imported: no thread is started, no device manager is created
    and no plugin object is instantiated.
    """

    LOGGING_SHORTCUT = 'p'
    pkgpath = os.path.dirname(plugins.__file__)

    @profiler
    def __init__(self, config: SimpleConfig, gui_name = None, cmd_only: bool = False):
        """Find plugins and, unless `cmd_only`, load the enabled ones and start the thread.

        :param config: configuration holding the ``enable_plugin_<name>`` keys.
        :param gui_name: name of the GUI whose plugin implementations are loaded.
        :param cmd_only: if True, only import the command modules of enabled plugins.
        """
        self.config = config
        self.cmd_only = cmd_only  # type: bool
        self.internal_plugin_metadata = {}
        self.external_plugin_metadata = {}
        self.loaded_command_modules = set()  # type: set[str]
        if cmd_only:
            # only import the command modules of plugins
            # DaemonThread.__init__ is skipped on this path, but find_plugins()
            # logs through self.logger (see get_external_plugin_dir), so the
            # logger has to be set up explicitly.
            # NOTE(review): relies on DaemonThread deriving from Logger - confirm.
            Logger.__init__(self)
            self.find_plugins()
            return
        DaemonThread.__init__(self)
        self.device_manager = DeviceMgr(config)
        self.name = 'Plugins'  # set name of thread
        self.hw_wallets = {}
        self.plugins = {}  # type: Dict[str, BasePlugin]
        self.gui_name = gui_name
        self.find_plugins()
        self.load_plugins()
        self.add_jobs(self.device_manager.thread_jobs())
        self.start()

    @property
    def descriptions(self):
        """Merged metadata of internal and external plugins, keyed by plugin name.

        A fresh dict is built on every access; on a name clash the external
        entry wins.
        """
        return {**self.internal_plugin_metadata, **self.external_plugin_metadata}

    def find_internal_plugins(self):
        """Populates self.internal_plugin_metadata

        In `cmd_only` mode nothing is added to the metadata; instead the
        ``commands`` submodule of every enabled plugin is executed and the
        plugin name is recorded in self.loaded_command_modules.

        Raises Exception if a module cannot be pre-loaded or a plugin name
        shows up twice.
        """
        iter_modules = list(pkgutil.iter_modules([self.pkgpath]))
        for loader, name, ispkg in iter_modules:
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
            #       once as data and once as code. To honor the "no duplicates" rule below,
            #       we exclude the ones packaged as *code*, here:
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
                continue
            if self.cmd_only and self.config.get('enable_plugin_' + name) is not True:
                continue
            full_name = f'electrum.plugins.{name}' + ('.commands' if self.cmd_only else '')
            spec = importlib.util.find_spec(full_name)
            if spec is None:
                if self.cmd_only:
                    continue  # no commands module in this plugin
                raise Exception(f"Error pre-loading {full_name}: no spec")
            module = self.exec_module_from_spec(spec, full_name)
            if self.cmd_only:
                assert name not in self.loaded_command_modules, f"duplicate command modules for: {name}"
                self.loaded_command_modules.add(name)
                continue
            d = module.__dict__
            if 'fullname' not in d:
                continue
            d['display_name'] = d['fullname']
            gui_good = self.gui_name in d.get('available_for', [])
            if not gui_good:
                continue
            details = d.get('registers_wallet_type')
            if details:
                self.register_wallet_type(name, gui_good, details)
            details = d.get('registers_keystore')
            if details:
                self.register_keystore(name, gui_good, details)
            if d.get('requires_wallet_type'):
                # trustedcoin will not be added to list
                continue
            if name in self.internal_plugin_metadata:
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
                raise Exception(f"duplicate plugins? for {name=}")
            self.internal_plugin_metadata[name] = d

    @staticmethod
    def exec_module_from_spec(spec, path):
        """Create the module described by `spec`, register it as `path` and execute it.

        Any exception raised while doing so is re-raised as a plain Exception
        that names `path`, chained to the original error.
        """
        try:
            module = importlib.util.module_from_spec(spec)
            # sys.modules needs to be modified for relative imports to work
            # see https://stackoverflow.com/a/50395128
            sys.modules[path] = module
            spec.loader.exec_module(module)
        except Exception as e:
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
        return module

    def find_plugins(self):
        """Discover internal plugins first, then external ones."""
        self.find_internal_plugins()
        self.find_external_plugins()

    def load_plugins(self):
        """Load every enabled plugin, internal ones first."""
        self.load_internal_plugins()
        self.load_external_plugins()

    def load_internal_plugins(self):
        """Load the enabled internal plugins that do not require a wallet type.

        Failures are logged and do not prevent the remaining plugins from loading.
        """
        for name, d in self.internal_plugin_metadata.items():
            if not d.get('requires_wallet_type') and self.config.get('enable_plugin_' + name):
                try:
                    self.load_internal_plugin(name)
                except BaseException as e:
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")

    def load_external_plugin(self, name):
        """Import and instantiate the external plugin `name` for the current GUI.

        Returns the (possibly already loaded) plugin, or None if no metadata is
        known for `name`.  Raises RuntimeError if the plugin has no
        implementation for self.gui_name.
        """
        if name in self.plugins:
            return self.plugins[name]
        # If we do not have the metadata, it was not detected by `load_external_plugins`
        # on startup, or added by manual user installation after that point.
        metadata = self.external_plugin_metadata.get(name)
        if metadata is None:
            # not inside an exception handler, so there is no traceback to log
            self.logger.error(f"attempted to load unknown external plugin {name}")
            return
        full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
        spec = importlib.util.find_spec(full_name)
        if spec is None:
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
        module = self.exec_module_from_spec(spec, full_name)
        plugin = module.Plugin(self, self.config, name)
        self.add_jobs(plugin.thread_jobs())
        self.plugins[name] = plugin
        self.logger.info(f"loaded external plugin {name}")
        return plugin

    def _has_root_permissions(self, path):
        """True if `path` is owned by uid 0 and is not writable by the current process."""
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)

    def get_external_plugin_dir(self):
        """Return the external plugin directory, or None if it cannot be used.

        None is returned on unsupported platforms, if the directory does not
        exist, or if it fails the _has_root_permissions() check.
        """
        if sys.platform not in ['linux', 'darwin'] and not sys.platform.startswith('freebsd'):
            return
        pkg_path = '/opt/electrum_plugins'
        if not os.path.exists(pkg_path):
            self.logger.info(f'directory {pkg_path} does not exist')
            return
        if not self._has_root_permissions(pkg_path):
            self.logger.info(f'not loading {pkg_path}: directory has user write permissions')
            return
        return pkg_path

    def external_plugin_path(self, name):
        """Path of the zip file the external plugin `name` was found in.

        Raises KeyError if `name` is not a known external plugin.
        """
        metadata = self.external_plugin_metadata[name]
        filename = metadata['filename']
        return os.path.join(self.get_external_plugin_dir(), filename)

    def find_external_plugins(self):
        """Populates self.external_plugin_metadata from the zips in the external plugin dir.

        Files that fail the _has_root_permissions() check or are not importable
        zips are skipped.  In `cmd_only` mode only enabled plugins are touched:
        their ``commands`` submodule (if any) is executed and the plugin name is
        recorded in self.loaded_command_modules.

        Raises Exception if a plugin name is already known.
        """
        pkg_path = self.get_external_plugin_dir()
        if pkg_path is None:
            return
        for filename in os.listdir(pkg_path):
            path = os.path.join(pkg_path, filename)
            if not self._has_root_permissions(path):
                self.logger.info(f'not loading {path}: file has user write permissions')
                continue
            try:
                zip_importer = zipimport.zipimporter(path)
            except zipimport.ZipImportError:
                self.logger.exception(f"unable to load zip plugin '{filename}'")
                continue
            for name, ispkg in pkgutil.iter_zipimport_modules(zip_importer):
                if ispkg is False:
                    continue
                if name in self.internal_plugin_metadata:
                    raise Exception(f"duplicate plugins for name={name}")
                if name in self.external_plugin_metadata:
                    raise Exception(f"duplicate plugins for name={name}")
                if self.cmd_only and self.config.get('enable_plugin_' + name) is not True:
                    # do not execute any code of a plugin that is not enabled
                    # (same order of checks as in find_internal_plugins)
                    continue
                module_path = f'electrum_external_plugins.{name}'
                spec = zip_importer.find_spec(name)
                # the parent package must be executed (and registered in
                # sys.modules) before its `commands` submodule can be resolved
                module = self.exec_module_from_spec(spec, module_path)
                if self.cmd_only:
                    commands_spec = importlib.util.find_spec(module_path + '.commands')
                    if commands_spec is None:  # no commands module in this plugin
                        continue
                    self.exec_module_from_spec(commands_spec, module_path + '.commands')
                    assert name not in self.loaded_command_modules, f"duplicate command modules for: {name}"
                    self.loaded_command_modules.add(name)
                    continue
                d = module.__dict__
                gui_good = self.gui_name in d.get('available_for', [])
                if not gui_good:
                    continue
                d['filename'] = filename
                if 'fullname' not in d:
                    continue
                d['display_name'] = d['fullname']
                d['zip_hash_sha256'] = get_file_hash256(path)
                self.external_plugin_metadata[name] = d

    def load_external_plugins(self):
        """Load the enabled external plugins.

        Failures are logged (including the traceback) and do not prevent the
        remaining plugins from loading.
        """
        for name, d in self.external_plugin_metadata.items():
            if self.config.get('enable_plugin_' + name):
                try:
                    self.load_external_plugin(name)
                except BaseException as e:
                    # logger.exception() already records the traceback
                    self.logger.exception(f"cannot initialize plugin {name} {e!r}")

    def get(self, name):
        """Return the loaded plugin `name`, or None if it is not loaded."""
        return self.plugins.get(name)

    def count(self):
        """Number of currently loaded plugins."""
        return len(self.plugins)

    def load_plugin(self, name) -> 'BasePlugin':
        """Imports the code of the given plugin.
        note: can be called from any thread.

        Raises Exception if `name` is neither a known internal nor external plugin.
        """
        if name in self.internal_plugin_metadata:
            return self.load_internal_plugin(name)
        elif name in self.external_plugin_metadata:
            return self.load_external_plugin(name)
        else:
            raise Exception(f"could not find plugin {name!r}")

    def load_internal_plugin(self, name) -> 'BasePlugin':
        """Import and instantiate the internal plugin `name` for the current GUI.

        Returns the already loaded instance if there is one.  Raises
        RuntimeError if the plugin has no implementation for self.gui_name, and
        Exception if importing or instantiating it fails.
        """
        if name in self.plugins:
            return self.plugins[name]
        full_name = f'electrum.plugins.{name}.{self.gui_name}'
        spec = importlib.util.find_spec(full_name)
        if spec is None:
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
        try:
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            plugin = module.Plugin(self, self.config, name)
        except Exception as e:
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
        self.add_jobs(plugin.thread_jobs())
        self.plugins[name] = plugin
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
        return plugin

    def close_plugin(self, plugin):
        """Remove the thread jobs of `plugin` from this thread."""
        self.remove_jobs(plugin.thread_jobs())

    def enable(self, name: str) -> 'BasePlugin':
        """Persist the plugin as enabled and return it, loading it if necessary."""
        self.config.set_key('enable_plugin_' + name, True, save=True)
        p = self.get(name)
        if p:
            return p
        return self.load_plugin(name)

    def disable(self, name: str) -> None:
        """Persist the plugin as disabled and close it if it is currently loaded."""
        self.config.set_key('enable_plugin_' + name, False, save=True)
        p = self.get(name)
        if not p:
            return
        self.plugins.pop(name)
        p.close()
        self.logger.info(f"closed {name}")

    @classmethod
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
        """True if `key` is one of the ``enable_plugin_<name>`` config keys."""
        return key.startswith('enable_plugin_')

    def toggle(self, name: str) -> Optional['BasePlugin']:
        """Disable the plugin if it is loaded (returns None), otherwise enable and return it."""
        p = self.get(name)
        return self.disable(name) if p else self.enable(name)

    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
        """Whether plugin `name` can be used with `wallet`.

        False if the plugin is unknown or one of the modules listed in its
        'requires' metadata cannot be imported.  Otherwise True unless the
        plugin declares 'requires_wallet_type' and the wallet's type is not
        among them.
        """
        d = self.descriptions.get(name)
        if not d:
            return False
        deps = d.get('requires', [])
        for dep, _ in deps:  # second item of each pair is not used here
            try:
                __import__(dep)
            except ImportError as e:
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
                return False
        requires = d.get('requires_wallet_type', [])
        return not requires or wallet.wallet_type in requires

    def get_hardware_support(self):
        """Return a HardwarePluginToScan entry for each usable hardware wallet plugin.

        Plugins that fail to load are included with `plugin=None` and the
        exception attached; plugins that load but report themselves as
        unavailable are left out.
        """
        out = []
        for name, (gui_good, details) in self.hw_wallets.items():
            if gui_good:
                try:
                    p = self.get_plugin(name)
                    if p.is_available():
                        out.append(HardwarePluginToScan(name=name,
                                                        description=details[2],
                                                        plugin=p,
                                                        exception=None))
                except Exception as e:
                    self.logger.exception(f"cannot load plugin for: {name}")
                    out.append(HardwarePluginToScan(name=name,
                                                    description=details[2],
                                                    plugin=None,
                                                    exception=e))
        return out

    def register_wallet_type(self, name, gui_good, wallet_type):
        """Register `wallet_type` and a lazy loader that fetches its constructor from plugin `name`."""
        from .wallet import register_wallet_type, register_constructor
        self.logger.info(f"registering wallet type {(wallet_type, name)}")

        def loader():
            # the plugin is only loaded once the wallet type is actually needed
            plugin = self.get_plugin(name)
            register_constructor(wallet_type, plugin.wallet_class)
        register_wallet_type(wallet_type)
        plugin_loaders[wallet_type] = loader

    def register_keystore(self, name, gui_good, details):
        """Register a hardware keystore provided by plugin `name`.

        Nothing happens unless details[0] == 'hardware'; in that case
        details[1] is the key the keystore constructor is registered under.
        """
        from .keystore import register_keystore

        def dynamic_constructor(d):
            # the plugin is only loaded once a keystore is actually constructed
            return self.get_plugin(name).keystore_class(d)
        if details[0] == 'hardware':
            self.hw_wallets[name] = (gui_good, details)
            self.logger.info(f"registering hardware {name}: {details}")
            register_keystore(details[1], dynamic_constructor)

    def get_plugin(self, name: str) -> 'BasePlugin':
        """Return the plugin `name`, loading it first if it is not loaded yet."""
        if name not in self.plugins:
            self.load_plugin(name)
        return self.plugins[name]

    def run(self):
        """Thread main loop: run the registered jobs until the thread is stopped."""
        while self.is_running():
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
            self.run_jobs()
        self.on_stop()
4✔
383

384

385
def get_file_hash256(path: str) -> str:
    '''Get the sha256 hash of a file in hex, similar to `sha256sum`.

    The file is hashed in fixed-size chunks, so memory use does not grow
    with the size of the file.

    Raises OSError if the file cannot be opened or read.
    '''
    # NOTE(review): the previous version used the project's `sha256` helper on
    # the whole file content; this assumes that helper is plain SHA-256.
    import hashlib
    digest = hashlib.sha256()
    with open(path, 'rb') as f:
        while chunk := f.read(1 << 16):
            digest.update(chunk)
    return digest.hexdigest()
×
389

390
def hook(func):
    """Decorator that records the function's name as a hook name.

    The function itself is returned unchanged.
    """
    name = func.__name__
    hook_names.add(name)
    return func
4✔
393

394

395
def run_hook(name, *args):
    """Call hook `name` with `args` on every enabled plugin that registered it.

    An exception raised by a hook is logged and treated as a falsy result.
    Returns the single truthy result, or None if there was none; more than
    one truthy result trips an assertion.
    """
    truthy_results = []
    for plugin, callback in hooks.get(name, []):
        if not plugin.is_enabled():
            continue
        try:
            outcome = callback(*args)
        except Exception:
            _logger.exception(f"Plugin error. plugin: {plugin}, hook: {name}")
            outcome = False
        if outcome:
            truthy_results.append(outcome)

    if not truthy_results:
        return None
    assert len(truthy_results) == 1, truthy_results
    return truthy_results[0]
×
411

412

413
class BasePlugin(Logger):
    """Base class of plugin objects.

    On construction every attribute whose name is a registered hook name is
    added to the global `hooks` table; close() removes those entries again.
    """

    def __init__(self, parent, config: 'SimpleConfig', name):
        """Store the given references and register this plugin's hooks.

        :param parent: the Plugins object that owns this plugin.
        :param config: configuration used to look up ``enable_plugin_<name>``.
        :param name: name of the plugin.
        """
        self.parent = parent  # type: Plugins  # The plugins object
        self.name = name
        self.config = config
        self.wallet = None  # fixme: this field should not exist
        Logger.__init__(self)
        # add self to hooks
        for k in dir(self):
            if k in hook_names:
                method = getattr(self, k)
                hooks.setdefault(k, []).append((self, method))

    def __str__(self):
        return self.name

    def close(self):
        """Unregister this plugin's hooks, detach its jobs and call on_close()."""
        # remove self from hooks
        for attr_name in dir(self):
            if attr_name not in hook_names:
                continue
            # found attribute in self that is also the name of a hook
            registered = hooks.get(attr_name, [])
            try:
                # mutates the list stored in `hooks` in place
                registered.remove((self, getattr(self, attr_name)))
            except ValueError:
                # maybe attr name just collided with hook name and was not hook
                continue
        self.parent.close_plugin(self)
        self.on_close()

    def on_close(self):
        """Called at the end of close(); does nothing by default."""
        pass

    def requires_settings(self) -> bool:
        """Returns False by default."""
        return False

    def thread_jobs(self):
        """Jobs to run on the Plugins thread; none by default."""
        return []

    def is_enabled(self):
        """True if the plugin is available and its ``enable_plugin_<name>`` key is exactly True."""
        return self.is_available() and self.config.get('enable_plugin_' + self.name) is True

    def is_available(self):
        """Returns True by default."""
        return True

    def can_user_disable(self):
        """Returns True by default."""
        return True

    def settings_widget(self, window):
        raise NotImplementedError()

    def settings_dialog(self, window):
        raise NotImplementedError()

    def read_file(self, filename: str) -> bytes:
        """Return the content of `filename` from this plugin's directory.

        For an external plugin the file is read from inside the plugin's zip
        archive, otherwise from the ``plugins/<name>`` directory next to this
        module.
        """
        if self.name in self.parent.external_plugin_metadata:
            import posixpath
            import zipfile
            plugin_filename = self.parent.external_plugin_path(self.name)
            # member names inside a zip archive always use '/', whatever the platform
            member = posixpath.join(self.name, filename)
            with zipfile.ZipFile(plugin_filename) as myzip:
                with myzip.open(member) as myfile:
                    return myfile.read()
        else:
            path = os.path.join(os.path.dirname(__file__), 'plugins', self.name, filename)
            with open(path, 'rb') as myfile:
                return myfile.read()
×
481

482

483
class DeviceUnpairableError(UserFacingException):
    """User-facing error type for a device that cannot be paired.

    NOTE(review): meaning taken from the name; raise sites are not in view.
    """
4✔
484
class HardwarePluginLibraryUnavailable(Exception):
    """Error type for a hardware plugin whose library is unavailable.

    NOTE(review): meaning taken from the name; raise sites are not in view.
    """
4✔
485
class CannotAutoSelectDevice(Exception):
    """Error type for a device that could not be selected automatically.

    NOTE(review): meaning taken from the name; raise sites are not in view.
    """
4✔
486

487

488
class Device(NamedTuple):
    """Immutable record describing one hardware device.

    NOTE(review): the exact meaning of each field is set by the code that
    builds these records, which is not in view here.
    """
    path: Union[str, bytes]
    interface_number: int
    id_: str
    product_key: Any   # when using hid, often Tuple[int, int]
    usage_page: int
    transport_ui_string: str
4✔
495

496

497
class DeviceInfo(NamedTuple):
    """A Device together with optional details about it.

    Only `device` is required; every other field defaults to None.
    """
    device: Device
    label: Optional[str] = None
    initialized: Optional[bool] = None
    exception: Optional[Exception] = None
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
4✔
505

506

507
class HardwarePluginToScan(NamedTuple):
    """Outcome of trying to load one hardware wallet plugin.

    Built by Plugins.get_hardware_support(): on success `plugin` is set and
    `exception` is None, on failure it is the other way round.
    """
    name: str
    description: str
    plugin: Optional['HW_PluginBase']
    exception: Optional[Exception]
4✔
512

513

514
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
4✔
515

516

517
# hidapi is not thread-safe
518
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
519
#     https://github.com/libusb/hidapi/issues/45
520
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
521
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
522
# It is not entirely clear to me, exactly what is safe and what isn't, when
523
# using multiple threads...
524
# Hence, we use a single thread for all device communications, including
525
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
526
# the following thread:
527
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
4✔
528
    max_workers=1,
529
    thread_name_prefix='hwd_comms_thread'
530
)
531

532
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
533
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
534
# To keep it simple, let's just import it now, as we are likely in the main thread here.
535
if threading.current_thread() is not threading.main_thread():
4✔
536
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
×
537
try:
4✔
538
    import hid
4✔
539
except ImportError:
4✔
540
    pass
4✔
541

542

543
T = TypeVar('T')
4✔
544

545

546
def run_in_hwd_thread(func: Callable[[], T]) -> T:
    """Run `func` on the hardware-device communication thread and return its result.

    If the caller is already on that thread, `func` is called directly;
    otherwise it is submitted to the single-worker executor and this call
    blocks until the result is available.
    """
    already_on_hwd_thread = threading.current_thread().name.startswith("hwd_comms_thread")
    if already_on_hwd_thread:
        return func()
    pending = _hwd_comms_executor.submit(func)
    return pending.result()
553

554

555
def runs_in_hwd_thread(func):
    """Decorator: every call of `func` is routed through run_in_hwd_thread()."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        # bind the arguments now, so that a zero-argument callable can be handed over
        bound_call = partial(func, *args, **kwargs)
        return run_in_hwd_thread(bound_call)
    return wrapper
4✔
560

561

562
def assert_runs_in_hwd_thread():
    """Raise Exception unless called from the hardware-device communication thread.

    The thread is recognised by its name starting with "hwd_comms_thread".
    """
    current_name = threading.current_thread().name
    if current_name.startswith("hwd_comms_thread"):
        return
    raise Exception("must only be called from HWD communication thread")
×
565

566

567
class DeviceMgr(ThreadJob):
    """Manages hardware clients.  A client communicates over a hardware
    channel with the device.

    In addition to tracking device HID IDs, the device manager tracks
    hardware wallets and manages wallet pairing.  A HID ID may be
    paired with a wallet when it is confirmed that the hardware device
    matches the wallet, i.e. they have the same master public key.  A
    HID ID can be unpaired if e.g. it is wiped.

    Because of hotplugging, a wallet must request its client
    dynamically each time it is required, rather than caching it
    itself.

    The device manager is shared across plugins, so just one place
    does hardware scans when needed.  By tracking HID IDs, if a device
    is plugged into a different port the wallet is automatically
    re-paired.

    Wallets are informed on connect / disconnect events.  They must
    implement connected(), disconnected() callbacks.  Being connected
    implies a pairing.  Callbacks can happen in any thread context,
    and we do them without holding the lock.

    Confusingly, the HID ID (serial number) reported by the HID system
    doesn't match the device ID reported by the device itself.  We use
    the HID IDs.

    This plugin is thread-safe.  Currently only devices supported by
    hidapi are implemented."""

    def __init__(self, config: SimpleConfig):
        ThreadJob.__init__(self)
        # A pairing_code->id_ map. Item only present if we have active pairing. Needs self.lock.
        self.pairing_code_to_id = {}  # type: Dict[str, str]
        # A client->id_ map. Needs self.lock.
        self.clients = {}  # type: Dict[HardwareClientBase, str]
        # What we recognise.  (vendor_id, product_id) -> Plugin
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]  # vendor_id -> Plugin
        # Custom enumerate functions for devices we don't know about.
        self._enumerate_func = set()  # Needs self.lock.

        self.lock = threading.RLock()

        self.config = config

    def thread_jobs(self):
        """Return the jobs to schedule: the manager itself, which handles device timeouts."""
        return [self]

    def run(self):
        """Handle device timeouts.  Runs in the context of the Plugins
        thread."""
        # Snapshot under the lock; client.timeout() is called without holding it.
        with self.lock:
            clients = list(self.clients.keys())
        cutoff = time.time() - self.config.get_session_timeout()
        for client in clients:
            client.timeout(cutoff)

    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
        """Associate each (vendor_id, product_id) pair in ``device_pairs`` with ``plugin``."""
        for pair in device_pairs:
            self._recognised_hardware[pair] = plugin

    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
        """Associate each vendor id in ``vendor_ids`` with ``plugin``."""
        for vendor_id in vendor_ids:
            self._recognised_vendor[vendor_id] = plugin

    def register_enumerate_func(self, func):
        """Register a custom enumeration callable; scan_devices() calls it without arguments."""
        with self.lock:
            self._enumerate_func.add(func)

    @runs_in_hwd_thread
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
        """Return the client for ``device``, creating and registering it if needed.

        An already registered client for ``device.id_`` is returned as-is.
        Otherwise ``plugin.create_client`` is asked for one; a falsy result
        is returned unregistered.
        """
        # Get from cache first
        client = self._client_by_id(device.id_)
        if client:
            return client
        client = plugin.create_client(device, handler)
        if client:
            self.logger.info(f"Registering {client}")
            with self.lock:
                self.clients[client] = device.id_
        return client

    def id_by_pairing_code(self, pairing_code):
        """Return the device id paired with ``pairing_code``, or None."""
        with self.lock:
            return self.pairing_code_to_id.get(pairing_code)

    def pairing_code_by_id(self, id_):
        """Return the first pairing code mapped to device id ``id_``, or None."""
        with self.lock:
            for pairing_code, paired_id in self.pairing_code_to_id.items():
                if paired_id == id_:
                    return pairing_code
            return None

    def unpair_pairing_code(self, pairing_code):
        """Forget the pairing for ``pairing_code`` (if any) and close the device's client."""
        with self.lock:
            if pairing_code not in self.pairing_code_to_id:
                return
            _id = self.pairing_code_to_id.pop(pairing_code)
        self._close_client(_id)

    def unpair_id(self, id_):
        """Remove the pairing of device ``id_`` if there is one, and close its client."""
        pairing_code = self.pairing_code_by_id(id_)
        if pairing_code:
            self.unpair_pairing_code(pairing_code)
        else:
            self._close_client(id_)

    def _close_client(self, id_):
        """Unregister the client of device ``id_`` (if any) and close it outside the lock."""
        with self.lock:
            client = self._client_by_id(id_)
            if client is not None:
                self.clients.pop(client, None)
        if client:
            client.close()

    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
        """Return the registered client whose device id equals ``id_``, or None."""
        with self.lock:
            for client, client_id in self.clients.items():
                if client_id == id_:
                    return client
        return None

    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
        """Returns a client for the device ID if one is registered.  If
        a device is wiped or in bootloader mode pairing is impossible;
        in such cases we communicate by device ID and not wallet.

        With ``scan_now`` (the default) a device scan is performed first.
        """
        if scan_now:
            self.scan_devices()
        return self._client_by_id(id_)

    @runs_in_hwd_thread
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
                            keystore: 'Hardware_KeyStore',
                            force_pair: bool, *,
                            devices: Optional[Sequence['Device']] = None,
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
        """Find (and, with ``force_pair``, pair) the client for ``keystore``.

        Lookup order: already known clients by pairing code; then the given
        ``devices`` (a scan is done if ``devices`` is None); finally, if
        ``force_pair`` is set, a device is selected via select_device() and
        paired via force_pair_keystore().

        Returns the client, or None if none was found.  CannotAutoSelectDevice
        from select_device() is swallowed; other exceptions raised while
        selecting or pairing propagate.

        Raises:
            Exception: if ``handler`` is None.
        """
        self.logger.info("getting client for keystore")
        if handler is None:
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
        handler.update_status(False)
        pcode = keystore.pairing_code()
        client = None
        # search existing clients first (fast-path)
        if not devices:
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=[])
        # search clients again, now allowing a (slow) scan
        if client is None:
            if devices is None:
                devices = self.scan_devices()
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=devices)
        if client is None and force_pair:
            try:
                info = self.select_device(plugin, handler, keystore, devices,
                                          allow_user_interaction=allow_user_interaction)
            except CannotAutoSelectDevice:
                pass
            else:
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
        if client:
            handler.update_status(True)
            # note: if select_device was called, we might also update label etc here:
            keystore.opportunistically_fill_in_missing_info_from_device(client)
        self.logger.info("end client for keystore")
        return client

    def client_by_pairing_code(
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
        devices: Sequence['Device'],
    ) -> Optional['HardwareClientBase']:
        """Return the client of the device paired with ``pairing_code``, or None.

        A registered client is only returned if its plugin has exactly the
        same type as ``plugin``; its handler is then replaced by ``handler``.
        If no client is registered, one is created for the entry of
        ``devices`` whose id matches the pairing.
        """
        _id = self.id_by_pairing_code(pairing_code)
        client = self._client_by_id(_id)
        if client:
            if type(client.plugin) is not type(plugin):
                return None
            # An unpaired client might have another wallet's handler
            # from a prior scan.  Replace to fix dialog parenting.
            client.handler = handler
            return client

        for device in devices:
            if device.id_ == _id:
                return self.create_client(device, handler, plugin)
        return None

    def force_pair_keystore(
        self,
        *,
        plugin: 'HW_PluginBase',
        handler: 'HardwareHandlerBase',
        info: 'DeviceInfo',
        keystore: 'Hardware_KeyStore',
    ) -> 'HardwareClientBase':
        """Pair ``keystore`` with the device described by ``info``.

        The pairing is recorded only if the device's registered client is
        pairable, belongs to a plugin of exactly the same type as ``plugin``,
        and reports the same xpub as the keystore for the keystore's
        derivation prefix.

        Raises:
            DeviceUnpairableError: if any of those conditions does not hold
                (including a cancelled or failed xpub request).
        """
        xpub = keystore.xpub
        derivation = keystore.get_derivation_prefix()
        assert derivation is not None
        xtype = bip32.xpub_type(xpub)
        client = self._client_by_id(info.device.id_)
        if client and client.is_pairable() and type(client.plugin) is type(plugin):
            # An unpaired client might have another wallet's handler
            # from a prior scan.  Replace to fix dialog parenting.
            client.handler = handler
            # This will trigger a PIN/passphrase entry request
            try:
                client_xpub = client.get_xpub(derivation, xtype)
            except (UserCancelled, RuntimeError):
                # Bad / cancelled PIN / passphrase
                client_xpub = None
            if client_xpub == xpub:
                keystore.opportunistically_fill_in_missing_info_from_device(client)
                with self.lock:
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
                return client

        # The user input has wrong PIN or passphrase, or cancelled input,
        # or it is not pairable
        raise DeviceUnpairableError(
            _('Electrum cannot pair with your {}.\n\n'
              'Before you request bitcoins to be sent to addresses in this '
              'wallet, ensure you can pair with your device, or that you have '
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
              'receive will be unspendable.').format(plugin.device))

    def list_pairable_device_infos(
        self,
        *,
        handler: Optional['HardwareHandlerBase'],
        plugin: 'HW_PluginBase',
        devices: Optional[Sequence['Device']] = None,
        include_failing_clients: bool = False,
    ) -> List['DeviceInfo']:
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
        Already paired devices are also included, as it is okay to reuse them.

        If ``devices`` is None a scan is performed.  A device whose client
        cannot be created or queried is logged and skipped, unless
        ``include_failing_clients`` is set, in which case a DeviceInfo
        carrying the exception is included instead.

        Raises:
            HardwarePluginLibraryUnavailable: if ``plugin.libraries_available`` is falsy.
        """
        if not plugin.libraries_available:
            message = plugin.get_library_not_available_message()
            raise HardwarePluginLibraryUnavailable(message)
        if devices is None:
            devices = self.scan_devices()
        infos = []
        for device in devices:
            if not plugin.can_recognize_device(device):
                continue
            try:
                client = self.create_client(device, handler, plugin)
                if not client:
                    continue
                label = client.label()
                is_initialized = client.is_initialized()
                soft_device_id = client.get_soft_device_id()
                model_name = client.device_model_name()
            except Exception as e:
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {e!r}')
                if include_failing_clients:
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
                continue
            infos.append(DeviceInfo(device=device,
                                    label=label,
                                    initialized=is_initialized,
                                    plugin_name=plugin.name,
                                    soft_device_id=soft_device_id,
                                    model_name=model_name))

        return infos

    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
                      keystore: 'Hardware_KeyStore', devices: Optional[Sequence['Device']] = None,
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
        """Select the device to use for keystore.

        While no pairable device is found, the user is asked (through
        ``handler``) whether to retry.  Once there are candidates, one is
        auto-selected by soft_device_id, by unique non-placeholder label, or
        because it is the only candidate and the keystore has no usable
        label/soft_device_id.  Failing that, the user is asked to choose.

        Raises:
            CannotAutoSelectDevice: if user interaction would be needed but
                ``allow_user_interaction`` is False.
            UserCancelled: if the user declines to retry or cancels the choice.
        """
        # ideally this should not be called from the GUI thread...
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
        while True:
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
            if infos:
                break
            if not allow_user_interaction:
                raise CannotAutoSelectDevice()
            msg = _('Please insert your {}').format(plugin.device)
            msg += " ("
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
                msg += f"label: {keystore.label}, "
            msg += f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}"
            msg += ').\n\n{}\n\n{}'.format(
                _('Verify the cable is connected and that '
                  'no other application is using it.'),
                _('Try to connect again?')
            )
            if not handler.yes_no_question(msg):
                raise UserCancelled()
            # Drop the caller-supplied list so that the next iteration rescans.
            devices = None

        # select device automatically. (but only if we have reasonable expectation it is the correct one)
        # method 1: select device by id
        if keystore.soft_device_id:
            for info in infos:
                if info.soft_device_id == keystore.soft_device_id:
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
                    return info
        # method 2: select device by label
        #           but only if not a placeholder label and only if there is no collision
        device_labels = [info.label for info in infos]
        if (keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
                and device_labels.count(keystore.label) == 1):
            for info in infos:
                if info.label == keystore.label:
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
                    return info
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
        #           saved for keystore anyway, select it
        if (len(infos) == 1
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
                and keystore.soft_device_id is None):
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
            return infos[0]

        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
        if not allow_user_interaction:
            raise CannotAutoSelectDevice()
        # ask user to select device manually
        msg = (
                _("Could not automatically pair with device for given keystore.") + "\n"
                + f"(keystore label: {keystore.label!r}, "
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
        msg += _("Please select which {} device to use:").format(plugin.device)
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
        descriptions = ["{label} ({maybe_model}{init}, {transport})"
                        .format(label=info.label or _("An unnamed {}").format(info.plugin_name),
                                init=(_("initialized") if info.initialized else _("wiped")),
                                transport=info.device.transport_ui_string,
                                maybe_model=f"{info.model_name}, " if info.model_name else "")
                        for info in infos]
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
                          f"num options: {len(infos)}. options: {infos}")
        c = handler.query_choice(msg, descriptions)
        if c is None:
            raise UserCancelled()
        info = infos[c]
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
        # note: updated label/soft_device_id will be saved after pairing succeeds
        return info

    @runs_in_hwd_thread
    def _scan_devices_with_hid(self) -> List['Device']:
        """Enumerate HID devices and return those a registered plugin turns into a Device.

        Returns an empty list if the ``hid`` module cannot be imported.  A
        (vendor_id, product_id) registration takes precedence over a
        vendor-id-only registration.
        """
        try:
            import hid
        except ImportError:
            return []

        devices = []
        for d in hid.enumerate(0, 0):
            vendor_id = d['vendor_id']
            product_key = (vendor_id, d['product_id'])
            plugin = None
            if product_key in self._recognised_hardware:
                plugin = self._recognised_hardware[product_key]
            elif vendor_id in self._recognised_vendor:
                plugin = self._recognised_vendor[vendor_id]
            if plugin:
                device = plugin.create_device_from_hid_enumeration(d, product_key=product_key)
                if device:
                    devices.append(device)
        return devices

    @runs_in_hwd_thread
    @profiler
    def scan_devices(self) -> Sequence['Device']:
        """Scan for connected devices and drop clients that are no longer usable.

        Combines the HID enumeration with the results of all registered
        custom enumeration functions (a failing function is logged and
        skipped).  Registered clients whose device id was not found, or that
        report no usable connection, are removed and unpaired, and their
        handler (if any) is told the device is no longer connected.

        Returns the list of devices found.
        """
        self.logger.info("scanning devices...")

        # First see what's connected that we know about
        devices = self._scan_devices_with_hid()

        # Let plugin handlers enumerate devices we don't know about
        with self.lock:
            enumerate_funcs = list(self._enumerate_func)
        for f in enumerate_funcs:
            try:
                new_devices = f()
            except BaseException as e:
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
            else:
                devices.extend(new_devices)

        # find out what was disconnected
        present_ids = {dev.id_ for dev in devices}  # set: O(1) membership tests below
        disconnected_clients = []
        with self.lock:
            connected = {}
            for client, id_ in self.clients.items():
                if id_ in present_ids and client.has_usable_connection_with_device():
                    connected[client] = id_
                else:
                    disconnected_clients.append((client, id_))
            self.clients = connected

        # Unpair disconnected devices
        # NOTE(review): self.clients has already been replaced above, so the
        # _close_client() call reached through unpair_id() finds no client for
        # id_ and close() is not invoked on these clients -- confirm this is intended.
        for client, id_ in disconnected_clients:
            self.unpair_id(id_)
            if client.handler:
                client.handler.update_status(False)

        return devices

    @classmethod
    def version_info(cls) -> Mapping[str, Optional[str]]:
        """Return version information about libusb and hidapi.

        Keys: ``libusb.version`` (None if ``usb1`` cannot be imported),
        ``libusb.path`` (only set when ``usb1`` imports; None if the path
        attribute is missing) and ``hidapi.version`` (None if it cannot be
        determined).
        """
        ret = {}
        # add libusb
        try:
            import usb1
        except Exception:
            ret["libusb.version"] = None
        else:
            ret["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
            try:
                ret["libusb.path"] = usb1.libusb1.libusb._name
            except AttributeError:
                ret["libusb.path"] = None
        # add hidapi
        try:
            import hid
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
        except Exception:
            # Fall back to the installed distribution's metadata.
            from importlib.metadata import version
            try:
                ret["hidapi.version"] = version("hidapi")
            except ImportError:  # includes importlib.metadata.PackageNotFoundError
                ret["hidapi.version"] = None
        return ret

    def trigger_pairings(
            self,
            keystores: Sequence['KeyStore'],
            *,
            allow_user_interaction: bool = True,
            devices: Optional[Sequence['Device']] = None,
    ) -> None:
        """Given a list of keystores, try to pair each with a connected hardware device.

        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
        try to pair each keystore individually. Consider the following scenario:
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
        - assume none of the devices are paired yet
        1. if we tried to individually pair keystores, we might try with ks1 first
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
             which is confusing and error-prone. It's especially problematic if the hw device does
             not support labels (such as Ledger), as then the user cannot easily distinguish
             same-type devices. (see #4199)
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
           and then tell the user ks1 could not be paired (and there are no devices left to try)
        """
        from .keystore import Hardware_KeyStore
        keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
        if not keystores:
            return
        if devices is None:
            devices = self.scan_devices()
        # first pair with all devices that can be auto-selected,
        # then (if allowed) do a second pass with manual selections
        interaction_passes = [False, True] if allow_user_interaction else [False]
        for interactive in interaction_passes:
            for ks in keystores:
                try:
                    ks.get_client(
                        force_pair=True,
                        allow_user_interaction=interactive,
                        devices=devices,
                    )
                except UserCancelled:
                    pass
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc