• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5667497631809536

16 Mar 2025 11:09AM UTC coverage: 61.258% (-0.06%) from 61.317%
5667497631809536

push

CirrusCI

web-flow
Merge pull request #9629 from f321x/plugin-commands

Allow plugins to register CLI commands

27 of 64 new or added lines in 3 files covered. (42.19%)

2 existing lines in 2 files now uncovered.

21344 of 34843 relevant lines covered (61.26%)

3.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

39.14
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import os
5✔
27
import pkgutil
5✔
28
import importlib.util
5✔
29
import time
5✔
30
import threading
5✔
31
import traceback
5✔
32
import sys
5✔
33
import aiohttp
5✔
34

35
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
5✔
36
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
37
import concurrent
5✔
38
import zipimport
5✔
39
from concurrent import futures
5✔
40
from functools import wraps, partial
5✔
41

42
from .i18n import _
5✔
43
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException)
5✔
44
from . import bip32
5✔
45
from . import plugins
5✔
46
from .simple_config import SimpleConfig
5✔
47
from .logging import get_logger, Logger
5✔
48
from .crypto import sha256
5✔
49

50
if TYPE_CHECKING:
5✔
51
    from .plugins.hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
52
    from .keystore import Hardware_KeyStore, KeyStore
×
53
    from .wallet import Abstract_Wallet
×
54

55

56
_logger = get_logger(__name__)
5✔
57
plugin_loaders = {}
5✔
58
hook_names = set()
5✔
59
hooks = {}
5✔
60

61

62
class Plugins(DaemonThread):
5✔
63

64
    LOGGING_SHORTCUT = 'p'
5✔
65
    pkgpath = os.path.dirname(plugins.__file__)
5✔
66

67
    @profiler
5✔
68
    def __init__(self, config: SimpleConfig, gui_name = None, cmd_only: bool = False):
5✔
69
        self.config = config
5✔
70
        self.cmd_only = cmd_only  # type: bool
5✔
71
        self.internal_plugin_metadata = {}
5✔
72
        self.external_plugin_metadata = {}
5✔
73
        self.loaded_command_modules = set()  # type: set[str]
5✔
74
        if cmd_only:
5✔
75
            # only import the command modules of plugins
NEW
76
            Logger.__init__(self)
×
NEW
77
            self.find_plugins()
×
NEW
78
            return
×
79
        DaemonThread.__init__(self)
5✔
80
        self.device_manager = DeviceMgr(config)
5✔
81
        self.name = 'Plugins'  # set name of thread
5✔
82
        self.hw_wallets = {}
5✔
83
        self.plugins = {}  # type: Dict[str, BasePlugin]
5✔
84
        self.gui_name = gui_name
5✔
85
        self.find_plugins()
5✔
86
        self.load_plugins()
5✔
87
        self.add_jobs(self.device_manager.thread_jobs())
5✔
88
        self.start()
5✔
89

90
    @property
5✔
91
    def descriptions(self):
5✔
92
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
93

94
    def find_internal_plugins(self):
5✔
95
        """Populates self.internal_plugin_metadata"""
96
        iter_modules = list(pkgutil.iter_modules([self.pkgpath]))
5✔
97
        for loader, name, ispkg in iter_modules:
5✔
98
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
99
            #       once as data and once as code. To honor the "no duplicates" rule below,
100
            #       we exclude the ones packaged as *code*, here:
101
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
5✔
102
                continue
×
103
            if self.cmd_only and self.config.get('enable_plugin_' + name) is not True:
5✔
NEW
104
                continue
×
105
            full_name = f'electrum.plugins.{name}' + ('.commands' if self.cmd_only else '')
5✔
106
            spec = importlib.util.find_spec(full_name)
5✔
107
            if spec is None:
5✔
NEW
108
                if self.cmd_only:
×
NEW
109
                    continue # no commands module in this plugin
×
UNCOV
110
                raise Exception(f"Error pre-loading {full_name}: no spec")
×
111
            module = self.exec_module_from_spec(spec, full_name)
5✔
112
            if self.cmd_only:
5✔
NEW
113
                assert name not in self.loaded_command_modules, f"duplicate command modules for: {name}"
×
NEW
114
                self.loaded_command_modules.add(name)
×
NEW
115
                continue
×
116
            d = module.__dict__
5✔
117
            if 'fullname' not in d:
5✔
118
                continue
5✔
119
            d['display_name'] = d['fullname']
5✔
120
            gui_good = self.gui_name in d.get('available_for', [])
5✔
121
            if not gui_good:
5✔
122
                continue
5✔
123
            details = d.get('registers_wallet_type')
5✔
124
            if details:
5✔
125
                self.register_wallet_type(name, gui_good, details)
5✔
126
            details = d.get('registers_keystore')
5✔
127
            if details:
5✔
128
                self.register_keystore(name, gui_good, details)
5✔
129
            if d.get('requires_wallet_type'):
5✔
130
                # trustedcoin will not be added to list
131
                continue
5✔
132
            if name in self.internal_plugin_metadata:
5✔
133
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
×
134
                raise Exception(f"duplicate plugins? for {name=}")
×
135
            self.internal_plugin_metadata[name] = d
5✔
136

137
    @staticmethod
5✔
138
    def exec_module_from_spec(spec, path):
5✔
139
        try:
5✔
140
            module = importlib.util.module_from_spec(spec)
5✔
141
            # sys.modules needs to be modified for relative imports to work
142
            # see https://stackoverflow.com/a/50395128
143
            sys.modules[path] = module
5✔
144
            spec.loader.exec_module(module)
5✔
145
        except Exception as e:
×
146
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
×
147
        return module
5✔
148

149
    def find_plugins(self):
5✔
150
        self.find_internal_plugins()
5✔
151
        self.find_external_plugins()
5✔
152

153
    def load_plugins(self):
5✔
154
        self.load_internal_plugins()
5✔
155
        self.load_external_plugins()
5✔
156

157
    def load_internal_plugins(self):
5✔
158
        for name, d in self.internal_plugin_metadata.items():
5✔
159
            if not d.get('requires_wallet_type') and self.config.get('enable_plugin_' + name):
5✔
160
                try:
×
161
                    self.load_internal_plugin(name)
×
162
                except BaseException as e:
×
163
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
164

165
    def load_external_plugin(self, name):
5✔
166
        if name in self.plugins:
×
167
            return self.plugins[name]
×
168
        # If we do not have the metadata, it was not detected by `load_external_plugins`
169
        # on startup, or added by manual user installation after that point.
170
        metadata = self.external_plugin_metadata.get(name)
×
171
        if metadata is None:
×
172
            self.logger.exception(f"attempted to load unknown external plugin {name}")
×
173
            return
×
174
        full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
×
175
        spec = importlib.util.find_spec(full_name)
×
176
        if spec is None:
×
177
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
178
        module = self.exec_module_from_spec(spec, full_name)
×
179
        plugin = module.Plugin(self, self.config, name)
×
180
        self.add_jobs(plugin.thread_jobs())
×
181
        self.plugins[name] = plugin
×
182
        self.logger.info(f"loaded external plugin {name}")
×
183
        return plugin
×
184

185
    def _has_root_permissions(self, path):
5✔
186
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
187

188
    def get_external_plugin_dir(self):
5✔
189
        if sys.platform not in ['linux', 'darwin'] and not sys.platform.startswith('freebsd'):
5✔
190
            return
×
191
        pkg_path = '/opt/electrum_plugins'
5✔
192
        if not os.path.exists(pkg_path):
5✔
193
            self.logger.info(f'directory {pkg_path} does not exist')
5✔
194
            return
5✔
195
        if not self._has_root_permissions(pkg_path):
×
196
            self.logger.info(f'not loading {pkg_path}: directory has user write permissions')
×
197
            return
×
198
        return pkg_path
×
199

200
    def external_plugin_path(self, name):
5✔
201
        metadata = self.external_plugin_metadata[name]
×
202
        filename = metadata['filename']
×
203
        return os.path.join(self.get_external_plugin_dir(), filename)
×
204

205
    def find_external_plugins(self):
5✔
206
        pkg_path = self.get_external_plugin_dir()
5✔
207
        if pkg_path is None:
5✔
208
            return
5✔
209
        for filename in os.listdir(pkg_path):
×
210
            path = os.path.join(pkg_path, filename)
×
211
            if not self._has_root_permissions(path):
×
212
                self.logger.info(f'not loading {path}: file has user write permissions')
×
213
                continue
×
214
            try:
×
215
                zipfile = zipimport.zipimporter(path)
×
216
            except zipimport.ZipImportError:
×
217
                self.logger.exception(f"unable to load zip plugin '{filename}'")
×
218
                continue
×
219
            for name, b in pkgutil.iter_zipimport_modules(zipfile):
×
220
                if b is False:
×
221
                    continue
×
222
                if name in self.internal_plugin_metadata:
×
223
                    raise Exception(f"duplicate plugins for name={name}")
×
224
                if name in self.external_plugin_metadata:
×
225
                    raise Exception(f"duplicate plugins for name={name}")
×
226
                module_path = f'electrum_external_plugins.{name}'
×
227
                spec = zipfile.find_spec(name)
×
228
                module = self.exec_module_from_spec(spec, module_path)
×
NEW
229
                if self.cmd_only:
×
NEW
230
                    if self.config.get('enable_plugin_' + name) is not True:
×
NEW
231
                        continue
×
NEW
232
                    spec2 = importlib.util.find_spec(module_path + '.commands')
×
NEW
233
                    if spec2 is None:  # no commands module in this plugin
×
NEW
234
                        continue
×
NEW
235
                    self.exec_module_from_spec(spec2, module_path + '.commands')
×
NEW
236
                    assert name not in self.loaded_command_modules, f"duplicate command modules for: {name}"
×
NEW
237
                    self.loaded_command_modules.add(name)
×
NEW
238
                    continue
×
239
                d = module.__dict__
×
240
                gui_good = self.gui_name in d.get('available_for', [])
×
241
                if not gui_good:
×
242
                    continue
×
243
                d['filename'] = filename
×
244
                if 'fullname' not in d:
×
245
                    continue
×
246
                d['display_name'] = d['fullname']
×
247
                d['zip_hash_sha256'] = get_file_hash256(path)
×
248
                self.external_plugin_metadata[name] = d
×
249

250
    def load_external_plugins(self):
5✔
251
        for name, d in self.external_plugin_metadata.items():
5✔
252
            if self.config.get('enable_plugin_' + name):
×
253
                try:
×
254
                    self.load_external_plugin(name)
×
255
                except BaseException as e:
×
256
                    traceback.print_exc(file=sys.stdout)  # shouldn't this be... suppressed unless -v?
×
257
                    self.logger.exception(f"cannot initialize plugin {name} {e!r}")
×
258

259
    def get(self, name):
5✔
260
        return self.plugins.get(name)
×
261

262
    def count(self):
5✔
263
        return len(self.plugins)
×
264

265
    def load_plugin(self, name) -> 'BasePlugin':
5✔
266
        """Imports the code of the given plugin.
267
        note: can be called from any thread.
268
        """
269
        if name in self.internal_plugin_metadata:
5✔
270
            return self.load_internal_plugin(name)
5✔
271
        elif name in self.external_plugin_metadata:
×
272
            return self.load_external_plugin(name)
×
273
        else:
274
            raise Exception(f"could not find plugin {name!r}")
×
275

276
    def load_internal_plugin(self, name) -> 'BasePlugin':
5✔
277
        if name in self.plugins:
5✔
278
            return self.plugins[name]
×
279
        full_name = f'electrum.plugins.{name}.{self.gui_name}'
5✔
280
        spec = importlib.util.find_spec(full_name)
5✔
281
        if spec is None:
5✔
282
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
283
        try:
5✔
284
            module = importlib.util.module_from_spec(spec)
5✔
285
            spec.loader.exec_module(module)
5✔
286
            plugin = module.Plugin(self, self.config, name)
5✔
287
        except Exception as e:
×
288
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
×
289
        self.add_jobs(plugin.thread_jobs())
5✔
290
        self.plugins[name] = plugin
5✔
291
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
5✔
292
        return plugin
5✔
293

294
    def close_plugin(self, plugin):
5✔
295
        self.remove_jobs(plugin.thread_jobs())
×
296

297
    def enable(self, name: str) -> 'BasePlugin':
5✔
298
        self.config.set_key('enable_plugin_' + name, True, save=True)
×
299
        p = self.get(name)
×
300
        if p:
×
301
            return p
×
302
        return self.load_plugin(name)
×
303

304
    def disable(self, name: str) -> None:
5✔
305
        self.config.set_key('enable_plugin_' + name, False, save=True)
×
306
        p = self.get(name)
×
307
        if not p:
×
308
            return
×
309
        self.plugins.pop(name)
×
310
        p.close()
×
311
        self.logger.info(f"closed {name}")
×
312

313
    @classmethod
5✔
314
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
5✔
315
        return key.startswith('enable_plugin_')
×
316

317
    def toggle(self, name: str) -> Optional['BasePlugin']:
5✔
318
        p = self.get(name)
×
319
        return self.disable(name) if p else self.enable(name)
×
320

321
    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
5✔
322
        d = self.descriptions.get(name)
×
323
        if not d:
×
324
            return False
×
325
        deps = d.get('requires', [])
×
326
        for dep, s in deps:
×
327
            try:
×
328
                __import__(dep)
×
329
            except ImportError as e:
×
330
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
×
331
                return False
×
332
        requires = d.get('requires_wallet_type', [])
×
333
        return not requires or wallet.wallet_type in requires
×
334

335
    def get_hardware_support(self):
5✔
336
        out = []
×
337
        for name, (gui_good, details) in self.hw_wallets.items():
×
338
            if gui_good:
×
339
                try:
×
340
                    p = self.get_plugin(name)
×
341
                    if p.is_available():
×
342
                        out.append(HardwarePluginToScan(name=name,
×
343
                                                        description=details[2],
344
                                                        plugin=p,
345
                                                        exception=None))
346
                except Exception as e:
×
347
                    self.logger.exception(f"cannot load plugin for: {name}")
×
348
                    out.append(HardwarePluginToScan(name=name,
×
349
                                                    description=details[2],
350
                                                    plugin=None,
351
                                                    exception=e))
352
        return out
×
353

354
    def register_wallet_type(self, name, gui_good, wallet_type):
5✔
355
        from .wallet import register_wallet_type, register_constructor
5✔
356
        self.logger.info(f"registering wallet type {(wallet_type, name)}")
5✔
357

358
        def loader():
5✔
359
            plugin = self.get_plugin(name)
5✔
360
            register_constructor(wallet_type, plugin.wallet_class)
5✔
361
        register_wallet_type(wallet_type)
5✔
362
        plugin_loaders[wallet_type] = loader
5✔
363

364
    def register_keystore(self, name, gui_good, details):
5✔
365
        from .keystore import register_keystore
5✔
366

367
        def dynamic_constructor(d):
5✔
368
            return self.get_plugin(name).keystore_class(d)
5✔
369
        if details[0] == 'hardware':
5✔
370
            self.hw_wallets[name] = (gui_good, details)
5✔
371
            self.logger.info(f"registering hardware {name}: {details}")
5✔
372
            register_keystore(details[1], dynamic_constructor)
5✔
373

374
    def get_plugin(self, name: str) -> 'BasePlugin':
5✔
375
        if name not in self.plugins:
5✔
376
            self.load_plugin(name)
5✔
377
        return self.plugins[name]
5✔
378

379
    def run(self):
5✔
380
        while self.is_running():
5✔
381
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
5✔
382
            self.run_jobs()
5✔
383
        self.on_stop()
5✔
384

385

386
def get_file_hash256(path: str) -> str:
5✔
387
    '''Get the sha256 hash of a file in hex, similar to `sha256sum`.'''
388
    with open(path, 'rb') as f:
×
389
        return sha256(f.read()).hex()
×
390

391
def hook(func):
5✔
392
    hook_names.add(func.__name__)
5✔
393
    return func
5✔
394

395

396
def run_hook(name, *args):
5✔
397
    results = []
5✔
398
    f_list = hooks.get(name, [])
5✔
399
    for p, f in f_list:
5✔
400
        if p.is_enabled():
5✔
401
            try:
5✔
402
                r = f(*args)
5✔
403
            except Exception:
×
404
                _logger.exception(f"Plugin error. plugin: {p}, hook: {name}")
×
405
                r = False
×
406
            if r:
5✔
407
                results.append(r)
×
408

409
    if results:
5✔
410
        assert len(results) == 1, results
×
411
        return results[0]
×
412

413

414
class BasePlugin(Logger):
5✔
415

416
    def __init__(self, parent, config: 'SimpleConfig', name):
5✔
417
        self.parent = parent  # type: Plugins  # The plugins object
5✔
418
        self.name = name
5✔
419
        self.config = config
5✔
420
        self.wallet = None  # fixme: this field should not exist
5✔
421
        Logger.__init__(self)
5✔
422
        # add self to hooks
423
        for k in dir(self):
5✔
424
            if k in hook_names:
5✔
425
                l = hooks.get(k, [])
5✔
426
                l.append((self, getattr(self, k)))
5✔
427
                hooks[k] = l
5✔
428

429
    def __str__(self):
5✔
430
        return self.name
×
431

432
    def close(self):
5✔
433
        # remove self from hooks
434
        for attr_name in dir(self):
×
435
            if attr_name in hook_names:
×
436
                # found attribute in self that is also the name of a hook
437
                l = hooks.get(attr_name, [])
×
438
                try:
×
439
                    l.remove((self, getattr(self, attr_name)))
×
440
                except ValueError:
×
441
                    # maybe attr name just collided with hook name and was not hook
442
                    continue
×
443
                hooks[attr_name] = l
×
444
        self.parent.close_plugin(self)
×
445
        self.on_close()
×
446

447
    def on_close(self):
5✔
448
        pass
×
449

450
    def requires_settings(self) -> bool:
5✔
451
        return False
×
452

453
    def thread_jobs(self):
5✔
454
        return []
5✔
455

456
    def is_enabled(self):
5✔
457
        return self.is_available() and self.config.get('enable_plugin_' + self.name) is True
×
458

459
    def is_available(self):
5✔
460
        return True
×
461

462
    def can_user_disable(self):
5✔
463
        return True
×
464

465
    def settings_widget(self, window):
5✔
466
        raise NotImplementedError()
×
467

468
    def settings_dialog(self, window):
5✔
469
        raise NotImplementedError()
×
470

471
    def read_file(self, filename: str) -> bytes:
5✔
472
        import zipfile
×
473
        if self.name in self.parent.external_plugin_metadata:
×
474
            plugin_filename = self.parent.external_plugin_path(self.name)
×
475
            with zipfile.ZipFile(plugin_filename) as myzip:
×
476
                with myzip.open(os.path.join(self.name, filename)) as myfile:
×
477
                    return myfile.read()
×
478
        else:
479
            path = os.path.join(os.path.dirname(__file__), 'plugins', self.name, filename)
×
480
            with open(path, 'rb') as myfile:
×
481
                return myfile.read()
×
482

483

484
class DeviceUnpairableError(UserFacingException): pass
5✔
485
class HardwarePluginLibraryUnavailable(Exception): pass
5✔
486
class CannotAutoSelectDevice(Exception): pass
5✔
487

488

489
class Device(NamedTuple):
5✔
490
    path: Union[str, bytes]
5✔
491
    interface_number: int
5✔
492
    id_: str
5✔
493
    product_key: Any   # when using hid, often Tuple[int, int]
5✔
494
    usage_page: int
5✔
495
    transport_ui_string: str
5✔
496

497

498
class DeviceInfo(NamedTuple):
5✔
499
    device: Device
5✔
500
    label: Optional[str] = None
5✔
501
    initialized: Optional[bool] = None
5✔
502
    exception: Optional[Exception] = None
5✔
503
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
5✔
504
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
5✔
505
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
5✔
506

507

508
class HardwarePluginToScan(NamedTuple):
5✔
509
    name: str
5✔
510
    description: str
5✔
511
    plugin: Optional['HW_PluginBase']
5✔
512
    exception: Optional[Exception]
5✔
513

514

515
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
5✔
516

517

518
# hidapi is not thread-safe
519
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
520
#     https://github.com/libusb/hidapi/issues/45
521
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
522
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
523
# It is not entirely clear to me, exactly what is safe and what isn't, when
524
# using multiple threads...
525
# Hence, we use a single thread for all device communications, including
526
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
527
# the following thread:
528
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
5✔
529
    max_workers=1,
530
    thread_name_prefix='hwd_comms_thread'
531
)
532

533
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
534
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
535
# To keep it simple, let's just import it now, as we are likely in the main thread here.
536
if threading.current_thread() is not threading.main_thread():
5✔
537
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
×
538
try:
5✔
539
    import hid
5✔
540
except ImportError:
5✔
541
    pass
5✔
542

543

544
T = TypeVar('T')
5✔
545

546

547
def run_in_hwd_thread(func: Callable[[], T]) -> T:
5✔
548
    if threading.current_thread().name.startswith("hwd_comms_thread"):
×
549
        return func()
×
550
    else:
551
        fut = _hwd_comms_executor.submit(func)
×
552
        return fut.result()
×
553
        #except (concurrent.futures.CancelledError, concurrent.futures.TimeoutError) as e:
554

555

556
def runs_in_hwd_thread(func):
5✔
557
    @wraps(func)
5✔
558
    def wrapper(*args, **kwargs):
5✔
559
        return run_in_hwd_thread(partial(func, *args, **kwargs))
×
560
    return wrapper
5✔
561

562

563
def assert_runs_in_hwd_thread():
5✔
564
    if not threading.current_thread().name.startswith("hwd_comms_thread"):
×
565
        raise Exception("must only be called from HWD communication thread")
×
566

567

568
class DeviceMgr(ThreadJob):
5✔
569
    """Manages hardware clients.  A client communicates over a hardware
570
    channel with the device.
571

572
    In addition to tracking device HID IDs, the device manager tracks
573
    hardware wallets and manages wallet pairing.  A HID ID may be
574
    paired with a wallet when it is confirmed that the hardware device
575
    matches the wallet, i.e. they have the same master public key.  A
576
    HID ID can be unpaired if e.g. it is wiped.
577

578
    Because of hotplugging, a wallet must request its client
579
    dynamically each time it is required, rather than caching it
580
    itself.
581

582
    The device manager is shared across plugins, so just one place
583
    does hardware scans when needed.  By tracking HID IDs, if a device
584
    is plugged into a different port the wallet is automatically
585
    re-paired.
586

587
    Wallets are informed on connect / disconnect events.  It must
588
    implement connected(), disconnected() callbacks.  Being connected
589
    implies a pairing.  Callbacks can happen in any thread context,
590
    and we do them without holding the lock.
591

592
    Confusingly, the HID ID (serial number) reported by the HID system
593
    doesn't match the device ID reported by the device itself.  We use
594
    the HID IDs.
595

596
    This plugin is thread-safe.  Currently only devices supported by
597
    hidapi are implemented."""
598

599
    def __init__(self, config: SimpleConfig):
5✔
600
        ThreadJob.__init__(self)
5✔
601
        # A pairing_code->id_ map. Item only present if we have active pairing. Needs self.lock.
602
        self.pairing_code_to_id = {}  # type: Dict[str, str]
5✔
603
        # A client->id_ map. Needs self.lock.
604
        self.clients = {}  # type: Dict[HardwareClientBase, str]
5✔
605
        # What we recognise.  (vendor_id, product_id) -> Plugin
606
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
5✔
607
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]  # vendor_id -> Plugin
5✔
608
        # Custom enumerate functions for devices we don't know about.
609
        self._enumerate_func = set()  # Needs self.lock.
5✔
610

611
        self.lock = threading.RLock()
5✔
612

613
        self.config = config
5✔
614

615
    def thread_jobs(self):
5✔
616
        # Thread job to handle device timeouts
617
        return [self]
5✔
618

619
    def run(self):
5✔
620
        '''Handle device timeouts.  Runs in the context of the Plugins
621
        thread.'''
622
        with self.lock:
5✔
623
            clients = list(self.clients.keys())
5✔
624
        cutoff = time.time() - self.config.get_session_timeout()
5✔
625
        for client in clients:
5✔
626
            client.timeout(cutoff)
×
627

628
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
5✔
629
        for pair in device_pairs:
×
630
            self._recognised_hardware[pair] = plugin
×
631

632
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
5✔
633
        for vendor_id in vendor_ids:
×
634
            self._recognised_vendor[vendor_id] = plugin
×
635

636
    def register_enumerate_func(self, func):
5✔
637
        with self.lock:
×
638
            self._enumerate_func.add(func)
×
639

640
    @runs_in_hwd_thread
5✔
641
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
5✔
642
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
643
        # Get from cache first
644
        client = self._client_by_id(device.id_)
×
645
        if client:
×
646
            return client
×
647
        client = plugin.create_client(device, handler)
×
648
        if client:
×
649
            self.logger.info(f"Registering {client}")
×
650
            with self.lock:
×
651
                self.clients[client] = device.id_
×
652
        return client
×
653

654
    def id_by_pairing_code(self, pairing_code):
5✔
655
        with self.lock:
×
656
            return self.pairing_code_to_id.get(pairing_code)
×
657

658
    def pairing_code_by_id(self, id_):
5✔
659
        with self.lock:
×
660
            for pairing_code, id2 in self.pairing_code_to_id.items():
×
661
                if id2 == id_:
×
662
                    return pairing_code
×
663
            return None
×
664

665
    def unpair_pairing_code(self, pairing_code):
5✔
666
        with self.lock:
×
667
            if pairing_code not in self.pairing_code_to_id:
×
668
                return
×
669
            _id = self.pairing_code_to_id.pop(pairing_code)
×
670
        self._close_client(_id)
×
671

672
    def unpair_id(self, id_):
5✔
673
        pairing_code = self.pairing_code_by_id(id_)
×
674
        if pairing_code:
×
675
            self.unpair_pairing_code(pairing_code)
×
676
        else:
677
            self._close_client(id_)
×
678

679
    def _close_client(self, id_):
5✔
680
        with self.lock:
×
681
            client = self._client_by_id(id_)
×
682
            self.clients.pop(client, None)
×
683
        if client:
×
684
            client.close()
×
685

686
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
5✔
687
        with self.lock:
×
688
            for client, client_id in self.clients.items():
×
689
                if client_id == id_:
×
690
                    return client
×
691
        return None
×
692

693
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
5✔
694
        '''Returns a client for the device ID if one is registered.  If
695
        a device is wiped or in bootloader mode pairing is impossible;
696
        in such cases we communicate by device ID and not wallet.'''
697
        if scan_now:
×
698
            self.scan_devices()
×
699
        return self._client_by_id(id_)
×
700

701
    @runs_in_hwd_thread
5✔
702
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
5✔
703
                            keystore: 'Hardware_KeyStore',
704
                            force_pair: bool, *,
705
                            devices: Sequence['Device'] = None,
706
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
707
        self.logger.info("getting client for keystore")
×
708
        if handler is None:
×
709
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
×
710
        handler.update_status(False)
×
711
        pcode = keystore.pairing_code()
×
712
        client = None
×
713
        # search existing clients first (fast-path)
714
        if not devices:
×
715
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=[])
×
716
        # search clients again, now allowing a (slow) scan
717
        if client is None:
×
718
            if devices is None:
×
719
                devices = self.scan_devices()
×
720
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=devices)
×
721
        if client is None and force_pair:
×
722
            try:
×
723
                info = self.select_device(plugin, handler, keystore, devices,
×
724
                                          allow_user_interaction=allow_user_interaction)
725
            except CannotAutoSelectDevice:
×
726
                pass
×
727
            else:
728
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
×
729
        if client:
×
730
            handler.update_status(True)
×
731
            # note: if select_device was called, we might also update label etc here:
732
            keystore.opportunistically_fill_in_missing_info_from_device(client)
×
733
        self.logger.info("end client for keystore")
×
734
        return client
×
735

736
    def client_by_pairing_code(
5✔
737
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
738
        devices: Sequence['Device'],
739
    ) -> Optional['HardwareClientBase']:
740
        _id = self.id_by_pairing_code(pairing_code)
×
741
        client = self._client_by_id(_id)
×
742
        if client:
×
743
            if type(client.plugin) != type(plugin):
×
744
                return
×
745
            # An unpaired client might have another wallet's handler
746
            # from a prior scan.  Replace to fix dialog parenting.
747
            client.handler = handler
×
748
            return client
×
749

750
        for device in devices:
×
751
            if device.id_ == _id:
×
752
                return self.create_client(device, handler, plugin)
×
753

754
    def force_pair_keystore(
5✔
755
        self,
756
        *,
757
        plugin: 'HW_PluginBase',
758
        handler: 'HardwareHandlerBase',
759
        info: 'DeviceInfo',
760
        keystore: 'Hardware_KeyStore',
761
    ) -> 'HardwareClientBase':
762
        xpub = keystore.xpub
×
763
        derivation = keystore.get_derivation_prefix()
×
764
        assert derivation is not None
×
765
        xtype = bip32.xpub_type(xpub)
×
766
        client = self._client_by_id(info.device.id_)
×
767
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
×
768
            # See comment above for same code
769
            client.handler = handler
×
770
            # This will trigger a PIN/passphrase entry request
771
            try:
×
772
                client_xpub = client.get_xpub(derivation, xtype)
×
773
            except (UserCancelled, RuntimeError):
×
774
                # Bad / cancelled PIN / passphrase
775
                client_xpub = None
×
776
            if client_xpub == xpub:
×
777
                keystore.opportunistically_fill_in_missing_info_from_device(client)
×
778
                with self.lock:
×
779
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
×
780
                return client
×
781

782
        # The user input has wrong PIN or passphrase, or cancelled input,
783
        # or it is not pairable
784
        raise DeviceUnpairableError(
×
785
            _('Electrum cannot pair with your {}.\n\n'
786
              'Before you request bitcoins to be sent to addresses in this '
787
              'wallet, ensure you can pair with your device, or that you have '
788
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
789
              'receive will be unspendable.').format(plugin.device))
790

791
    def list_pairable_device_infos(
5✔
792
        self,
793
        *,
794
        handler: Optional['HardwareHandlerBase'],
795
        plugin: 'HW_PluginBase',
796
        devices: Sequence['Device'] = None,
797
        include_failing_clients: bool = False,
798
    ) -> List['DeviceInfo']:
799
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
800
        Already paired devices are also included, as it is okay to reuse them.
801
        """
802
        if not plugin.libraries_available:
×
803
            message = plugin.get_library_not_available_message()
×
804
            raise HardwarePluginLibraryUnavailable(message)
×
805
        if devices is None:
×
806
            devices = self.scan_devices()
×
807
        infos = []
×
808
        for device in devices:
×
809
            if not plugin.can_recognize_device(device):
×
810
                continue
×
811
            try:
×
812
                client = self.create_client(device, handler, plugin)
×
813
                if not client:
×
814
                    continue
×
815
                label = client.label()
×
816
                is_initialized = client.is_initialized()
×
817
                soft_device_id = client.get_soft_device_id()
×
818
                model_name = client.device_model_name()
×
819
            except Exception as e:
×
820
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
×
821
                if include_failing_clients:
×
822
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
×
823
                continue
×
824
            infos.append(DeviceInfo(device=device,
×
825
                                    label=label,
826
                                    initialized=is_initialized,
827
                                    plugin_name=plugin.name,
828
                                    soft_device_id=soft_device_id,
829
                                    model_name=model_name))
830

831
        return infos
×
832

833
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
5✔
834
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
835
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
836
        """Select the device to use for keystore."""
837
        # ideally this should not be called from the GUI thread...
838
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
839
        while True:
×
840
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
×
841
            if infos:
×
842
                break
×
843
            if not allow_user_interaction:
×
844
                raise CannotAutoSelectDevice()
×
845
            msg = _('Please insert your {}').format(plugin.device)
×
846
            msg += " ("
×
847
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
×
848
                msg += f"label: {keystore.label}, "
×
849
            msg += f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}"
×
850
            msg += ').\n\n{}\n\n{}'.format(
×
851
                _('Verify the cable is connected and that '
852
                  'no other application is using it.'),
853
                _('Try to connect again?')
854
            )
855
            if not handler.yes_no_question(msg):
×
856
                raise UserCancelled()
×
857
            devices = None
×
858

859
        # select device automatically. (but only if we have reasonable expectation it is the correct one)
860
        # method 1: select device by id
861
        if keystore.soft_device_id:
×
862
            for info in infos:
×
863
                if info.soft_device_id == keystore.soft_device_id:
×
864
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
×
865
                    return info
×
866
        # method 2: select device by label
867
        #           but only if not a placeholder label and only if there is no collision
868
        device_labels = [info.label for info in infos]
×
869
        if (keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
×
870
                and device_labels.count(keystore.label) == 1):
871
            for info in infos:
×
872
                if info.label == keystore.label:
×
873
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
×
874
                    return info
×
875
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
876
        #           saved for keystore anyway, select it
877
        if (len(infos) == 1
×
878
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
879
                and keystore.soft_device_id is None):
880
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
×
881
            return infos[0]
×
882

883
        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
×
884
        if not allow_user_interaction:
×
885
            raise CannotAutoSelectDevice()
×
886
        # ask user to select device manually
887
        msg = (
×
888
                _("Could not automatically pair with device for given keystore.") + "\n"
889
                + f"(keystore label: {keystore.label!r}, "
890
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
891
        msg += _("Please select which {} device to use:").format(plugin.device)
×
892
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
×
893
        descriptions = ["{label} ({maybe_model}{init}, {transport})"
×
894
                        .format(label=info.label or _("An unnamed {}").format(info.plugin_name),
895
                                init=(_("initialized") if info.initialized else _("wiped")),
896
                                transport=info.device.transport_ui_string,
897
                                maybe_model=f"{info.model_name}, " if info.model_name else "")
898
                        for info in infos]
899
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
×
900
                          f"num options: {len(infos)}. options: {infos}")
901
        c = handler.query_choice(msg, descriptions)
×
902
        if c is None:
×
903
            raise UserCancelled()
×
904
        info = infos[c]
×
905
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
×
906
        # note: updated label/soft_device_id will be saved after pairing succeeds
907
        return info
×
908

909
    @runs_in_hwd_thread
5✔
910
    def _scan_devices_with_hid(self) -> List['Device']:
5✔
911
        try:
×
912
            import hid
×
913
        except ImportError:
×
914
            return []
×
915

916
        devices = []
×
917
        for d in hid.enumerate(0, 0):
×
918
            vendor_id = d['vendor_id']
×
919
            product_key = (vendor_id, d['product_id'])
×
920
            plugin = None
×
921
            if product_key in self._recognised_hardware:
×
922
                plugin = self._recognised_hardware[product_key]
×
923
            elif vendor_id in self._recognised_vendor:
×
924
                plugin = self._recognised_vendor[vendor_id]
×
925
            if plugin:
×
926
                device = plugin.create_device_from_hid_enumeration(d, product_key=product_key)
×
927
                if device:
×
928
                    devices.append(device)
×
929
        return devices
×
930

931
    @runs_in_hwd_thread
5✔
932
    @profiler
5✔
933
    def scan_devices(self) -> Sequence['Device']:
5✔
934
        self.logger.info("scanning devices...")
×
935

936
        # First see what's connected that we know about
937
        devices = self._scan_devices_with_hid()
×
938

939
        # Let plugin handlers enumerate devices we don't know about
940
        with self.lock:
×
941
            enumerate_funcs = list(self._enumerate_func)
×
942
        for f in enumerate_funcs:
×
943
            try:
×
944
                new_devices = f()
×
945
            except BaseException as e:
×
946
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
×
947
            else:
948
                devices.extend(new_devices)
×
949

950
        # find out what was disconnected
951
        client_ids = [dev.id_ for dev in devices]
×
952
        disconnected_clients = []
×
953
        with self.lock:
×
954
            connected = {}
×
955
            for client, id_ in self.clients.items():
×
956
                if id_ in client_ids and client.has_usable_connection_with_device():
×
957
                    connected[client] = id_
×
958
                else:
959
                    disconnected_clients.append((client, id_))
×
960
            self.clients = connected
×
961

962
        # Unpair disconnected devices
963
        for client, id_ in disconnected_clients:
×
964
            self.unpair_id(id_)
×
965
            if client.handler:
×
966
                client.handler.update_status(False)
×
967

968
        return devices
×
969

970
    @classmethod
5✔
971
    def version_info(cls) -> Mapping[str, Optional[str]]:
5✔
972
        ret = {}
×
973
        # add libusb
974
        try:
×
975
            import usb1
×
976
        except Exception as e:
×
977
            ret["libusb.version"] = None
×
978
        else:
979
            ret["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
×
980
            try:
×
981
                ret["libusb.path"] = usb1.libusb1.libusb._name
×
982
            except AttributeError:
×
983
                ret["libusb.path"] = None
×
984
        # add hidapi
985
        try:
×
986
            import hid
×
987
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
×
988
        except Exception as e:
×
989
            from importlib.metadata import version
×
990
            try:
×
991
                ret["hidapi.version"] = version("hidapi")
×
992
            except ImportError:
×
993
                ret["hidapi.version"] = None
×
994
        return ret
×
995

996
    def trigger_pairings(
5✔
997
            self,
998
            keystores: Sequence['KeyStore'],
999
            *,
1000
            allow_user_interaction: bool = True,
1001
            devices: Sequence['Device'] = None,
1002
    ) -> None:
1003
        """Given a list of keystores, try to pair each with a connected hardware device.
1004

1005
        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
1006
        try to pair each keystore individually. Consider the following scenario:
1007
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
1008
        - assume none of the devices are paired yet
1009
        1. if we tried to individually pair keystores, we might try with ks1 first
1010
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
1011
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
1012
             which is confusing and error-prone. It's especially problematic if the hw device does
1013
             not support labels (such as Ledger), as then the user cannot easily distinguish
1014
             same-type devices. (see #4199)
1015
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
1016
           and then tell the user ks1 could not be paired (and there are no devices left to try)
1017
        """
1018
        from .keystore import Hardware_KeyStore
×
1019
        keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
×
1020
        if not keystores:
×
1021
            return
×
1022
        if devices is None:
×
1023
            devices = self.scan_devices()
×
1024
        # first pair with all devices that can be auto-selected
1025
        for ks in keystores:
×
1026
            try:
×
1027
                ks.get_client(
×
1028
                    force_pair=True,
1029
                    allow_user_interaction=False,
1030
                    devices=devices,
1031
                )
1032
            except UserCancelled:
×
1033
                pass
×
1034
        if allow_user_interaction:
×
1035
            # now do manual selections
1036
            for ks in keystores:
×
1037
                try:
×
1038
                    ks.get_client(
×
1039
                        force_pair=True,
1040
                        allow_user_interaction=True,
1041
                        devices=devices,
1042
                    )
1043
                except UserCancelled:
×
1044
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc