• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 6444389930631168

13 Mar 2025 11:52AM UTC coverage: 60.671% (-0.06%) from 60.733%
6444389930631168

Pull #9629

CirrusCI

f321x
add handling of plugin commands
Pull Request #9629: Allow plugins to register CLI commands

26 of 74 new or added lines in 3 files covered. (35.14%)

160 existing lines in 1 file now uncovered.

21137 of 34839 relevant lines covered (60.67%)

3.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

39.26
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import os
5✔
27
import pkgutil
5✔
28
import importlib.util
5✔
29
import time
5✔
30
import threading
5✔
31
import traceback
5✔
32
import sys
5✔
33
import aiohttp
5✔
34

35
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
5✔
36
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
37
import concurrent
5✔
38
import zipimport
5✔
39
from concurrent import futures
5✔
40
from functools import wraps, partial
5✔
41

42
from .i18n import _
5✔
43
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException)
5✔
44
from . import bip32
5✔
45
from . import plugins
5✔
46
from .simple_config import SimpleConfig
5✔
47
from .logging import get_logger, Logger
5✔
48
from .crypto import sha256
5✔
49

50
if TYPE_CHECKING:
5✔
51
    from .plugins.hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
52
    from .keystore import Hardware_KeyStore, KeyStore
×
53
    from .wallet import Abstract_Wallet
×
54

55

56
_logger = get_logger(__name__)
# Maps a wallet type to a zero-argument loader callable; filled in by
# Plugins.register_wallet_type.
plugin_loaders = {}
# Names of all functions that have been passed through the `hook` decorator.
hook_names = set()
# Maps a hook name to a list of (plugin, bound method) pairs; entries are
# added in BasePlugin.__init__ and removed in BasePlugin.close.
hooks = {}
5✔
60

61

62
class Plugins(DaemonThread):
5✔
63

64
    LOGGING_SHORTCUT = 'p'
5✔
65
    pkgpath = os.path.dirname(plugins.__file__)
5✔
66

67
    @profiler
    def __init__(self, config: Optional[SimpleConfig] = None, gui_name: Optional[str] = None, cmd_only: bool = False):
        """Discover plugins and, unless `cmd_only` is set, load them and start the thread.

        :param config: configuration object, used to look up the
            'enable_plugin_<name>' keys and handed to the DeviceMgr.
        :param gui_name: name of the GUI; only plugins whose metadata lists it
            in 'available_for' are registered.
        :param cmd_only: if True, only the 'commands' modules of the plugins
            are imported and the constructor returns early.
        """
        self.cmd_only = cmd_only  # type: bool
        self.internal_plugin_metadata = {}
        self.external_plugin_metadata = {}
        self.loaded_command_modules = set()  # type: set[str]
        if cmd_only:
            # only import the command modules of plugins
            # note: DaemonThread.__init__ is not called on this path, and
            # config / gui_name / plugins / device_manager are never assigned.
            self.find_plugins()
            return
        DaemonThread.__init__(self)
        self.device_manager = DeviceMgr(config)
        self.name = 'Plugins'  # set name of thread
        self.config = config
        self.hw_wallets = {}
        self.plugins = {}  # type: Dict[str, BasePlugin]
        self.gui_name = gui_name
        # discovery must precede loading: load_plugins() iterates the metadata
        # dicts that find_plugins() populates
        self.find_plugins()
        self.load_plugins()
        self.add_jobs(self.device_manager.thread_jobs())
        self.start()
5✔
88

89
    def __getattr__(self, item):
5✔
90
        # to prevent accessing of a cmd_only instance of this class
NEW
91
        if self.cmd_only:
×
NEW
92
            raise Exception(f"This instance of Plugins is only for command importing")
×
93

94
    @property
5✔
95
    def descriptions(self):
5✔
NEW
96
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
97

98
    def find_internal_plugins(self):
        """Scan the built-in plugins directory and populate self.internal_plugin_metadata.

        Each module found under `self.pkgpath` is executed. In `cmd_only`
        mode only its `commands` submodule (if any) is executed and recorded
        in `self.loaded_command_modules`. Otherwise the module's globals are
        used as the plugin's metadata dict, and any wallet type / keystore it
        declares is registered.

        Raises Exception if a module has no spec, fails to execute, or if two
        modules share a plugin name.
        """
        iter_modules = list(pkgutil.iter_modules([self.pkgpath]))
        for loader, name, ispkg in iter_modules:
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
            #       once as data and once as code. To honor the "no duplicates" rule below,
            #       we exclude the ones packaged as *code*, here:
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
                continue
            full_name = f'electrum.plugins.{name}' + ('.commands' if self.cmd_only else '')
            spec = importlib.util.find_spec(full_name)
            if spec is None:
                if self.cmd_only:
                    continue # no commands module in this plugin
                raise Exception(f"Error pre-loading {full_name}: no spec")
            module = self.exec_module_from_spec(spec, full_name)
            if self.cmd_only:
                # the commands module was executed for its import side effects;
                # no metadata is collected in this mode
                assert name not in self.loaded_command_modules, f"duplicate command modules for: {name}"
                self.loaded_command_modules.add(name)
                continue
            # the module's global namespace doubles as the plugin metadata dict
            d = module.__dict__
            if 'fullname' not in d:
                continue
            d['display_name'] = d['fullname']
            gui_good = self.gui_name in d.get('available_for', [])
            if not gui_good:
                continue
            details = d.get('registers_wallet_type')
            if details:
                self.register_wallet_type(name, gui_good, details)
            details = d.get('registers_keystore')
            if details:
                self.register_keystore(name, gui_good, details)
            if d.get('requires_wallet_type'):
                # trustedcoin will not be added to list
                continue
            if name in self.internal_plugin_metadata:
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
                raise Exception(f"duplicate plugins? for {name=}")
            self.internal_plugin_metadata[name] = d
5✔
138

139
    @staticmethod
5✔
140
    def exec_module_from_spec(spec, path):
5✔
141
        try:
5✔
142
            module = importlib.util.module_from_spec(spec)
5✔
143
            # sys.modules needs to be modified for relative imports to work
144
            # see https://stackoverflow.com/a/50395128
145
            sys.modules[path] = module
5✔
146
            spec.loader.exec_module(module)
5✔
UNCOV
147
        except Exception as e:
×
UNCOV
148
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
×
149
        return module
5✔
150

151
    def find_plugins(self):
        """Discover internal plugins first, then external ones."""
        self.find_internal_plugins()
        self.find_external_plugins()
5✔
154

155
    def load_plugins(self):
        """Load the enabled internal plugins first, then the enabled external ones."""
        self.load_internal_plugins()
        self.load_external_plugins()
5✔
158

159
    def load_internal_plugins(self):
5✔
160
        for name, d in self.internal_plugin_metadata.items():
5✔
161
            if not d.get('requires_wallet_type') and self.config.get('enable_plugin_' + name):
5✔
UNCOV
162
                try:
×
UNCOV
163
                    self.load_internal_plugin(name)
×
UNCOV
164
                except BaseException as e:
×
UNCOV
165
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
166

167
    def load_external_plugin(self, name):
5✔
168
        if name in self.plugins:
×
169
            return self.plugins[name]
×
170
        # If we do not have the metadata, it was not detected by `load_external_plugins`
171
        # on startup, or added by manual user installation after that point.
172
        metadata = self.external_plugin_metadata.get(name)
×
173
        if metadata is None:
×
UNCOV
174
            self.logger.exception(f"attempted to load unknown external plugin {name}")
×
UNCOV
175
            return
×
176
        full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
×
177
        spec = importlib.util.find_spec(full_name)
×
178
        if spec is None:
×
179
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
180
        module = self.exec_module_from_spec(spec, full_name)
×
181
        plugin = module.Plugin(self, self.config, name)
×
182
        self.add_jobs(plugin.thread_jobs())
×
183
        self.plugins[name] = plugin
×
184
        self.logger.info(f"loaded external plugin {name}")
×
185
        return plugin
×
186

187
    def _has_root_permissions(self, path):
5✔
188
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
189

190
    def get_external_plugin_dir(self):
5✔
191
        if sys.platform not in ['linux', 'darwin'] and not sys.platform.startswith('freebsd'):
5✔
192
            return
×
193
        pkg_path = '/opt/electrum_plugins'
5✔
194
        if not os.path.exists(pkg_path):
5✔
195
            self.logger.info(f'directory {pkg_path} does not exist')
5✔
196
            return
5✔
UNCOV
197
        if not self._has_root_permissions(pkg_path):
×
UNCOV
198
            self.logger.info(f'not loading {pkg_path}: directory has user write permissions')
×
NEW
199
            return
×
UNCOV
200
        return pkg_path
×
201

202
    def external_plugin_path(self, name):
5✔
203
        metadata = self.external_plugin_metadata[name]
×
204
        filename = metadata['filename']
×
UNCOV
205
        return os.path.join(self.get_external_plugin_dir(), filename)
×
206

207
    def find_external_plugins(self):
        """Scan the external plugin directory and populate self.external_plugin_metadata.

        Every entry of the directory is opened as a zip archive; entries that
        fail the `_has_root_permissions` check or are not importable zips are
        skipped. Each module found inside is executed under the
        'electrum_external_plugins.' prefix. In `cmd_only` mode only the
        plugin's 'commands' submodule is additionally executed and recorded;
        otherwise the module globals become the plugin's metadata dict.

        Raises Exception if a plugin name clashes with an already known
        internal or external plugin, or if a module fails to execute.
        """
        pkg_path = self.get_external_plugin_dir()
        if pkg_path is None:
            return
        for filename in os.listdir(pkg_path):
            path = os.path.join(pkg_path, filename)
            if not self._has_root_permissions(path):
                self.logger.info(f'not loading {path}: file has user write permissions')
                continue
            try:
                # note: this local name shadows the stdlib `zipfile` module name
                zipfile = zipimport.zipimporter(path)
            except zipimport.ZipImportError:
                self.logger.exception(f"unable to load zip plugin '{filename}'")
                continue
            for name, b in pkgutil.iter_zipimport_modules(zipfile):
                # presumably `b` is the is-package flag, so plain modules are
                # skipped here -- TODO confirm against pkgutil
                if b is False:
                    continue
                if name in self.internal_plugin_metadata:
                    raise Exception(f"duplicate plugins for name={name}")
                if name in self.external_plugin_metadata:
                    raise Exception(f"duplicate plugins for name={name}")
                module_path = f'electrum_external_plugins.{name}'
                spec = zipfile.find_spec(name)
                # the plugin's top-level module is executed in both modes
                module = self.exec_module_from_spec(spec, module_path)
                if self.cmd_only:
                    spec2 = importlib.util.find_spec(module_path + '.commands')
                    if spec2 is None:  # no commands module in this plugin
                        continue
                    self.exec_module_from_spec(spec2, module_path + '.commands')
                    assert name not in self.loaded_command_modules, f"duplicate command modules for: {name}"
                    self.loaded_command_modules.add(name)
                    continue
                # the module's global namespace doubles as the plugin metadata dict
                d = module.__dict__
                gui_good = self.gui_name in d.get('available_for', [])
                if not gui_good:
                    continue
                # remembered so that external_plugin_path() can locate the zip later
                d['filename'] = filename
                if 'fullname' not in d:
                    continue
                d['display_name'] = d['fullname']
                d['zip_hash_sha256'] = get_file_hash256(path)
                self.external_plugin_metadata[name] = d
×
249

250
    def load_external_plugins(self):
5✔
251
        for name, d in self.external_plugin_metadata.items():
5✔
252
            if self.config.get('enable_plugin_' + name):
×
UNCOV
253
                try:
×
UNCOV
254
                    self.load_external_plugin(name)
×
UNCOV
255
                except BaseException as e:
×
256
                    traceback.print_exc(file=sys.stdout)  # shouldn't this be... suppressed unless -v?
×
257
                    self.logger.exception(f"cannot initialize plugin {name} {e!r}")
×
258

259
    def get(self, name: str) -> Optional['BasePlugin']:
        """Return the loaded plugin called `name`, or None if it is not loaded."""
        return self.plugins.get(name)
×
261

262
    def count(self) -> int:
        """Return the number of currently loaded plugins."""
        return len(self.plugins)
×
264

265
    def load_plugin(self, name) -> 'BasePlugin':
5✔
266
        """Imports the code of the given plugin.
267
        note: can be called from any thread.
268
        """
269
        if name in self.internal_plugin_metadata:
5✔
270
            return self.load_internal_plugin(name)
5✔
UNCOV
271
        elif name in self.external_plugin_metadata:
×
UNCOV
272
            return self.load_external_plugin(name)
×
273
        else:
UNCOV
274
            raise Exception(f"could not find plugin {name!r}")
×
275

276
    def load_internal_plugin(self, name) -> 'BasePlugin':
5✔
277
        if name in self.plugins:
5✔
278
            return self.plugins[name]
×
279
        full_name = f'electrum.plugins.{name}.{self.gui_name}'
5✔
280
        spec = importlib.util.find_spec(full_name)
5✔
281
        if spec is None:
5✔
282
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
283
        try:
5✔
284
            module = importlib.util.module_from_spec(spec)
5✔
285
            spec.loader.exec_module(module)
5✔
286
            plugin = module.Plugin(self, self.config, name)
5✔
UNCOV
287
        except Exception as e:
×
UNCOV
288
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
×
289
        self.add_jobs(plugin.thread_jobs())
5✔
290
        self.plugins[name] = plugin
5✔
291
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
5✔
292
        return plugin
5✔
293

294
    def close_plugin(self, plugin):
        """Remove the thread jobs of `plugin` from this thread's job list."""
        self.remove_jobs(plugin.thread_jobs())
×
296

297
    def enable(self, name: str) -> 'BasePlugin':
5✔
UNCOV
298
        self.config.set_key('enable_plugin_' + name, True, save=True)
×
299
        p = self.get(name)
×
UNCOV
300
        if p:
×
UNCOV
301
            return p
×
302
        return self.load_plugin(name)
×
303

304
    def disable(self, name: str) -> None:
5✔
305
        self.config.set_key('enable_plugin_' + name, False, save=True)
×
306
        p = self.get(name)
×
UNCOV
307
        if not p:
×
UNCOV
308
            return
×
309
        self.plugins.pop(name)
×
310
        p.close()
×
311
        self.logger.info(f"closed {name}")
×
312

313
    @classmethod
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
        """Return True if `key` is one of the 'enable_plugin_<name>' config keys."""
        return key.startswith('enable_plugin_')
×
316

317
    def toggle(self, name: str) -> Optional['BasePlugin']:
5✔
UNCOV
318
        p = self.get(name)
×
319
        return self.disable(name) if p else self.enable(name)
×
320

321
    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
5✔
322
        d = self.descriptions.get(name)
×
323
        if not d:
×
UNCOV
324
            return False
×
UNCOV
325
        deps = d.get('requires', [])
×
326
        for dep, s in deps:
×
327
            try:
×
328
                __import__(dep)
×
329
            except ImportError as e:
×
330
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
×
331
                return False
×
332
        requires = d.get('requires_wallet_type', [])
×
333
        return not requires or wallet.wallet_type in requires
×
334

335
    def get_hardware_support(self):
5✔
336
        out = []
×
337
        for name, (gui_good, details) in self.hw_wallets.items():
×
UNCOV
338
            if gui_good:
×
UNCOV
339
                try:
×
340
                    p = self.get_plugin(name)
×
341
                    if p.is_available():
×
342
                        out.append(HardwarePluginToScan(name=name,
×
343
                                                        description=details[2],
344
                                                        plugin=p,
345
                                                        exception=None))
346
                except Exception as e:
×
UNCOV
347
                    self.logger.exception(f"cannot load plugin for: {name}")
×
UNCOV
348
                    out.append(HardwarePluginToScan(name=name,
×
349
                                                    description=details[2],
350
                                                    plugin=None,
351
                                                    exception=e))
352
        return out
×
353

354
    def register_wallet_type(self, name, gui_good, wallet_type):
        """Register `wallet_type` and remember how to load the plugin that implements it.

        The wallet type is registered immediately. The plugin itself is only
        loaded when the loader stored in `plugin_loaders[wallet_type]` is
        called, which then registers the plugin's `wallet_class` as the
        constructor. `gui_good` is not used by this method.
        """
        from .wallet import register_wallet_type, register_constructor
        self.logger.info(f"registering wallet type {(wallet_type, name)}")

        def loader():
            # deferred: loads the plugin on first use
            plugin = self.get_plugin(name)
            register_constructor(wallet_type, plugin.wallet_class)
        register_wallet_type(wallet_type)
        plugin_loaders[wallet_type] = loader
5✔
363

364
    def register_keystore(self, name, gui_good, details):
        """Register the keystore declared by plugin `name`.

        Only details whose first element is 'hardware' are handled: the plugin
        is recorded in `self.hw_wallets`, and a constructor that loads the
        plugin on demand is registered under `details[1]`. Any other kind is
        ignored.
        """
        from .keystore import register_keystore
        if details[0] != 'hardware':
            return

        def dynamic_constructor(d):
            # loads the plugin lazily, the first time a keystore is built
            return self.get_plugin(name).keystore_class(d)

        self.hw_wallets[name] = (gui_good, details)
        self.logger.info(f"registering hardware {name}: {details}")
        register_keystore(details[1], dynamic_constructor)
5✔
373

374
    def get_plugin(self, name: str) -> 'BasePlugin':
        """Return the plugin `name`, loading it first if it is not loaded yet."""
        if name not in self.plugins:
            self.load_plugin(name)
        # read back from the dict: the load_* methods store the plugin there
        return self.plugins[name]
5✔
378

379
    def run(self):
        """Thread main loop: run the registered jobs until the thread is stopped."""
        while self.is_running():
            # wait for at most 0.1s, or less if the wake-up event gets set
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
            self.run_jobs()
        self.on_stop()
5✔
384

385

386
def get_file_hash256(path: str) -> str:
    """Get the sha256 hash of a file in hex, similar to `sha256sum`.

    The file is hashed in fixed-size chunks, so memory use stays bounded
    regardless of the file size.

    :param path: path of the file to hash.
    :return: lowercase hex digest of the file contents.
    :raises OSError: if the file cannot be opened or read.
    """
    import hashlib  # function-scope import keeps the file's import block untouched
    digest = hashlib.sha256()
    with open(path, 'rb') as f:
        while chunk := f.read(1 << 20):
            digest.update(chunk)
    return digest.hexdigest()
×
390

391
def hook(func):
    """Decorator that records the function's name in `hook_names`.

    The function itself is returned unchanged.
    """
    hook_names.add(func.__name__)
    return func
5✔
394

395

396
def run_hook(name, *args):
    """Call the hook `name` on every enabled plugin that registered it.

    A handler that raises is logged and treated as having returned False.
    At most one handler may return a truthy value; that value is returned.
    If no handler returns a truthy value, the result is None.
    """
    results = []
    for plugin, handler in hooks.get(name, []):
        if not plugin.is_enabled():
            continue
        try:
            result = handler(*args)
        except Exception:
            _logger.exception(f"Plugin error. plugin: {plugin}, hook: {name}")
            result = False
        if result:
            results.append(result)

    if results:
        assert len(results) == 1, results
        return results[0]
×
412

413

414
class BasePlugin(Logger):
    """Base class of plugins.

    On construction, every attribute whose name was recorded by the `hook`
    decorator is registered in the module-level `hooks` table; `close()`
    removes those registrations again.
    """

    def __init__(self, parent, config: 'SimpleConfig', name):
        self.parent = parent  # type: Plugins  # The plugins object
        self.name = name
        self.config = config
        self.wallet = None  # fixme: this field should not exist
        Logger.__init__(self)
        # add self to hooks
        for attr_name in dir(self):
            if attr_name not in hook_names:
                continue
            entry = (self, getattr(self, attr_name))
            hooks.setdefault(attr_name, []).append(entry)

    def __str__(self):
        return self.name

    def close(self):
        """Unregister this plugin's hooks, detach it from the parent and call on_close()."""
        # remove self from hooks
        for attr_name in dir(self):
            if attr_name not in hook_names:
                continue
            # found attribute in self that is also the name of a hook
            registered = hooks.get(attr_name, [])
            try:
                registered.remove((self, getattr(self, attr_name)))
            except ValueError:
                # maybe attr name just collided with hook name and was not hook
                continue
            hooks[attr_name] = registered
        self.parent.close_plugin(self)
        self.on_close()

    def on_close(self):
        """Called at the end of close(); does nothing by default."""
        pass

    def requires_settings(self) -> bool:
        return False

    def thread_jobs(self):
        """Return the jobs this plugin wants run on the Plugins thread (none by default)."""
        return []

    def is_enabled(self):
        """Return True if the plugin is available and its config key is exactly True."""
        if not self.is_available():
            return False
        return self.config.get('enable_plugin_' + self.name) is True

    def is_available(self):
        return True

    def can_user_disable(self):
        return True

    def settings_widget(self, window):
        raise NotImplementedError()

    def settings_dialog(self, window):
        raise NotImplementedError()

    def read_file(self, filename: str) -> bytes:
        """Return the contents of `filename` from this plugin's own files.

        For an external plugin the file is read from the plugin's zip
        archive; otherwise it is read from the 'plugins/<name>' directory
        next to this module.
        """
        import zipfile
        if self.name not in self.parent.external_plugin_metadata:
            path = os.path.join(os.path.dirname(__file__), 'plugins', self.name, filename)
            with open(path, 'rb') as myfile:
                return myfile.read()
        plugin_filename = self.parent.external_plugin_path(self.name)
        with zipfile.ZipFile(plugin_filename) as myzip:
            with myzip.open(os.path.join(self.name, filename)) as myfile:
                return myfile.read()
×
482

483

484
# Exception types for hardware device handling.
# NOTE(review): the raise sites are outside this view; the names suggest a
# device that cannot be paired, a missing hardware library, and a failed
# automatic device selection -- confirm against callers.
class DeviceUnpairableError(UserFacingException): pass
class HardwarePluginLibraryUnavailable(Exception): pass
class CannotAutoSelectDevice(Exception): pass
5✔
487

488

489
class Device(NamedTuple):
    """Immutable record describing one hardware device.

    NOTE(review): the code that builds these records is outside this view;
    field meanings beyond their names and types should be confirmed there.
    """
    path: Union[str, bytes]
    interface_number: int
    id_: str
    product_key: Any   # when using hid, often Tuple[int, int]
    usage_page: int
    transport_ui_string: str
5✔
496

497

498
class DeviceInfo(NamedTuple):
    """A Device together with optional details about it.

    Only `device` is required; every other field defaults to None.
    """
    device: Device
    label: Optional[str] = None
    initialized: Optional[bool] = None
    exception: Optional[Exception] = None
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
5✔
506

507

508
class HardwarePluginToScan(NamedTuple):
    """Result entry of Plugins.get_hardware_support() for one hardware plugin.

    Either `plugin` is set and `exception` is None, or the plugin failed to
    load, in which case `plugin` is None and `exception` holds the error.
    """
    name: str
    description: str
    plugin: Optional['HW_PluginBase']
    exception: Optional[Exception]
5✔
513

514

515
# Label values regarded as placeholders: None, the empty string and a single space.
# NOTE(review): the code consuming this set is outside this view.
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
5✔
516

517

518
# hidapi is not thread-safe
519
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
520
#     https://github.com/libusb/hidapi/issues/45
521
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
522
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
523
# It is not entirely clear to me, exactly what is safe and what isn't, when
524
# using multiple threads...
525
# Hence, we use a single thread for all device communications, including
526
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
527
# the following thread:
528
# Single-worker pool: submitted callables run one at a time on a thread whose
# name starts with 'hwd_comms_thread' (the helpers below test for that prefix).
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
    max_workers=1,
    thread_name_prefix='hwd_comms_thread'
)
532

533
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
534
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
535
# To keep it simple, let's just import it now, as we are likely in the main thread here.
536
if threading.current_thread() is not threading.main_thread():
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
try:
    import hid
except ImportError:
    # a failed import of hid is deliberately ignored here
    pass
5✔
542

543

544
# Return type of the callable handed to run_in_hwd_thread.
T = TypeVar('T')
5✔
545

546

547
def run_in_hwd_thread(func: Callable[[], T]) -> T:
    """Run `func` on the hardware-device communication thread and return its result.

    If the caller is already on that thread, `func` is called directly.
    Otherwise it is submitted to the executor and this call blocks until the
    result is available.
    """
    on_hwd_thread = threading.current_thread().name.startswith("hwd_comms_thread")
    if on_hwd_thread:
        return func()
    return _hwd_comms_executor.submit(func).result()
×
553
        #except (concurrent.futures.CancelledError, concurrent.futures.TimeoutError) as e:
554

555

556
def runs_in_hwd_thread(func):
    """Decorator: every call of `func` is executed via run_in_hwd_thread."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        bound_call = partial(func, *args, **kwargs)
        return run_in_hwd_thread(bound_call)
    return wrapper
5✔
561

562

563
def assert_runs_in_hwd_thread():
    """Raise an Exception unless called from the hardware-device communication thread."""
    thread_name = threading.current_thread().name
    if thread_name.startswith("hwd_comms_thread"):
        return
    raise Exception("must only be called from HWD communication thread")
×
566

567

568
class DeviceMgr(ThreadJob):
    """Manages hardware clients.  A client communicates over a hardware
    channel with the device.

    In addition to tracking device HID IDs, the device manager tracks
    hardware wallets and manages wallet pairing.  A HID ID may be
    paired with a wallet when it is confirmed that the hardware device
    matches the wallet, i.e. they have the same master public key.  A
    HID ID can be unpaired if e.g. it is wiped.

    Because of hotplugging, a wallet must request its client
    dynamically each time it is required, rather than caching it
    itself.

    The device manager is shared across plugins, so just one place
    does hardware scans when needed.  By tracking HID IDs, if a device
    is plugged into a different port the wallet is automatically
    re-paired.

    Wallets are informed on connect / disconnect events.  It must
    implement connected(), disconnected() callbacks.  Being connected
    implies a pairing.  Callbacks can happen in any thread context,
    and we do them without holding the lock.

    Confusingly, the HID ID (serial number) reported by the HID system
    doesn't match the device ID reported by the device itself.  We use
    the HID IDs.

    This plugin is thread-safe.  Currently only devices supported by
    hidapi are implemented."""

    def __init__(self, config: SimpleConfig):
        """Create an empty manager; ``config`` is later queried for the session timeout."""
        ThreadJob.__init__(self)
        # A pairing_code->id_ map. Item only present if we have active pairing. Needs self.lock.
        self.pairing_code_to_id = {}  # type: Dict[str, str]
        # A client->id_ map. Needs self.lock.
        self.clients = {}  # type: Dict[HardwareClientBase, str]
        # What we recognise.  (vendor_id, product_id) -> Plugin
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]  # vendor_id -> Plugin
        # Custom enumerate functions for devices we don't know about.
        self._enumerate_func = set()  # Needs self.lock.

        # Re-entrant, because locked methods call other locked methods
        # (e.g. _close_client -> _client_by_id).
        self.lock = threading.RLock()

        self.config = config

    def thread_jobs(self):
        """Return the jobs to schedule: the manager itself, whose run() handles timeouts."""
        # Thread job to handle device timeouts
        return [self]

    def run(self):
        """Handle device timeouts.  Runs in the context of the Plugins
        thread."""
        # Snapshot the clients under the lock; the timeout callbacks run without it.
        with self.lock:
            clients = list(self.clients.keys())
        cutoff = time.time() - self.config.get_session_timeout()
        for client in clients:
            client.timeout(cutoff)

    def register_devices(self, device_pairs: Iterable[Tuple[int, int]], *, plugin: 'HW_PluginBase') -> None:
        """Map each (vendor_id, product_id) pair in ``device_pairs`` to ``plugin``."""
        for pair in device_pairs:
            self._recognised_hardware[pair] = plugin

    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase') -> None:
        """Map each vendor id to ``plugin``; consulted when no exact product match exists."""
        for vendor_id in vendor_ids:
            self._recognised_vendor[vendor_id] = plugin

    def register_enumerate_func(self, func) -> None:
        """Register a zero-argument callable that scan_devices() calls to find extra devices."""
        with self.lock:
            self._enumerate_func.add(func)

    @runs_in_hwd_thread
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
        """Return the client for ``device``, creating and registering it via ``plugin`` if needed.

        Returns whatever ``plugin.create_client`` returns when nothing is cached;
        a falsy result is passed through without being registered.
        """
        # Get from cache first
        client = self._client_by_id(device.id_)
        if client:
            return client
        client = plugin.create_client(device, handler)
        if client:
            self.logger.info(f"Registering {client}")
            with self.lock:
                self.clients[client] = device.id_
        return client

    def id_by_pairing_code(self, pairing_code) -> Optional[str]:
        """Return the device id paired with ``pairing_code``, or None if there is no pairing."""
        with self.lock:
            return self.pairing_code_to_id.get(pairing_code)

    def pairing_code_by_id(self, id_) -> Optional[str]:
        """Return the first pairing code mapped to device id ``id_``, or None."""
        with self.lock:
            for pairing_code, id2 in self.pairing_code_to_id.items():
                if id2 == id_:
                    return pairing_code
            return None

    def unpair_pairing_code(self, pairing_code) -> None:
        """Drop the pairing for ``pairing_code`` (if any) and close the client of its device."""
        with self.lock:
            if pairing_code not in self.pairing_code_to_id:
                return
            _id = self.pairing_code_to_id.pop(pairing_code)
        # Close outside the lock.
        self._close_client(_id)

    def unpair_id(self, id_) -> None:
        """Unpair device ``id_`` if it is paired; otherwise just close its client."""
        pairing_code = self.pairing_code_by_id(id_)
        if pairing_code:
            self.unpair_pairing_code(pairing_code)
        else:
            self._close_client(id_)

    def _close_client(self, id_) -> None:
        """Forget the registered client for device ``id_`` and close it, if there is one."""
        with self.lock:
            client = self._client_by_id(id_)
            # When no client was found this pops the key None, which is a no-op.
            self.clients.pop(client, None)
        if client:
            # close() is called without holding the lock.
            client.close()

    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
        """Return the registered client whose device id equals ``id_``, or None. Does not scan."""
        with self.lock:
            for client, client_id in self.clients.items():
                if client_id == id_:
                    return client
        return None

    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
        """Returns a client for the device ID if one is registered.  If
        a device is wiped or in bootloader mode pairing is impossible;
        in such cases we communicate by device ID and not wallet.

        With ``scan_now`` (the default) a device scan is performed first.
        """
        if scan_now:
            self.scan_devices()
        return self._client_by_id(id_)

    @runs_in_hwd_thread
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
                            keystore: 'Hardware_KeyStore',
                            force_pair: bool, *,
                            devices: Optional[Sequence['Device']] = None,
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
        """Return a client for ``keystore``, pairing it with a device if necessary.

        Lookup order: existing pairing without scanning, then existing pairing
        against ``devices`` (scanned if None), then -- only if ``force_pair`` --
        device selection followed by force_pair_keystore().

        Raises Exception if ``handler`` is None. Exceptions from select_device()
        other than CannotAutoSelectDevice, and from force_pair_keystore(),
        propagate to the caller.
        """
        self.logger.info("getting client for keystore")
        if handler is None:
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
        handler.update_status(False)
        pcode = keystore.pairing_code()
        client = None
        # search existing clients first (fast-path)
        if not devices:
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=[])
        # search clients again, now allowing a (slow) scan
        if client is None:
            if devices is None:
                devices = self.scan_devices()
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=devices)
        if client is None and force_pair:
            try:
                info = self.select_device(plugin, handler, keystore, devices,
                                          allow_user_interaction=allow_user_interaction)
            except CannotAutoSelectDevice:
                # No unambiguous device and we may not ask the user: leave client as None.
                pass
            else:
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
        if client:
            handler.update_status(True)
            # note: if select_device was called, we might also update label etc here:
            keystore.opportunistically_fill_in_missing_info_from_device(client)
        self.logger.info("end client for keystore")
        return client

    def client_by_pairing_code(
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
        devices: Sequence['Device'],
    ) -> Optional['HardwareClientBase']:
        """Return the client of the device currently paired with ``pairing_code``.

        An already registered client is reused only if its plugin has exactly the
        same type as ``plugin``. Otherwise a client is created for the paired
        device if it appears in ``devices``. Returns None if nothing matches.
        """
        _id = self.id_by_pairing_code(pairing_code)
        client = self._client_by_id(_id)
        if client:
            if type(client.plugin) != type(plugin):
                return None
            # An unpaired client might have another wallet's handler
            # from a prior scan.  Replace to fix dialog parenting.
            client.handler = handler
            return client

        for device in devices:
            if device.id_ == _id:
                return self.create_client(device, handler, plugin)
        return None

    def force_pair_keystore(
        self,
        *,
        plugin: 'HW_PluginBase',
        handler: 'HardwareHandlerBase',
        info: 'DeviceInfo',
        keystore: 'Hardware_KeyStore',
    ) -> 'HardwareClientBase':
        """Pair ``keystore`` with the device in ``info`` if the device yields the keystore's xpub.

        Only an already registered, pairable client of the same plugin type is
        considered. On success the pairing is recorded and the client returned.

        Raises DeviceUnpairableError in every other case: no suitable client,
        xpub mismatch, or the xpub request raised UserCancelled / RuntimeError.
        """
        xpub = keystore.xpub
        derivation = keystore.get_derivation_prefix()
        assert derivation is not None
        xtype = bip32.xpub_type(xpub)
        client = self._client_by_id(info.device.id_)
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
            # An unpaired client might have another wallet's handler
            # from a prior scan.  Replace to fix dialog parenting.
            client.handler = handler
            # This will trigger a PIN/passphrase entry request
            try:
                client_xpub = client.get_xpub(derivation, xtype)
            except (UserCancelled, RuntimeError):
                # Bad / cancelled PIN / passphrase
                client_xpub = None
            if client_xpub == xpub:
                keystore.opportunistically_fill_in_missing_info_from_device(client)
                with self.lock:
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
                return client

        # The user input has wrong PIN or passphrase, or cancelled input,
        # or it is not pairable
        raise DeviceUnpairableError(
            _('Electrum cannot pair with your {}.\n\n'
              'Before you request bitcoins to be sent to addresses in this '
              'wallet, ensure you can pair with your device, or that you have '
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
              'receive will be unspendable.').format(plugin.device))

    def list_pairable_device_infos(
        self,
        *,
        handler: Optional['HardwareHandlerBase'],
        plugin: 'HW_PluginBase',
        devices: Optional[Sequence['Device']] = None,
        include_failing_clients: bool = False,
    ) -> List['DeviceInfo']:
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
        Already paired devices are also included, as it is okay to reuse them.

        ``devices`` defaults to a fresh scan. Devices whose client cannot be
        created or queried are logged and skipped, or -- with
        ``include_failing_clients`` -- reported as a DeviceInfo carrying the exception.

        Raises HardwarePluginLibraryUnavailable if the plugin's libraries are missing.
        """
        if not plugin.libraries_available:
            message = plugin.get_library_not_available_message()
            raise HardwarePluginLibraryUnavailable(message)
        if devices is None:
            devices = self.scan_devices()
        infos = []
        for device in devices:
            if not plugin.can_recognize_device(device):
                continue
            try:
                client = self.create_client(device, handler, plugin)
                if not client:
                    continue
                label = client.label()
                is_initialized = client.is_initialized()
                soft_device_id = client.get_soft_device_id()
                model_name = client.device_model_name()
            except Exception as e:
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
                if include_failing_clients:
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
                continue
            infos.append(DeviceInfo(device=device,
                                    label=label,
                                    initialized=is_initialized,
                                    plugin_name=plugin.name,
                                    soft_device_id=soft_device_id,
                                    model_name=model_name))

        return infos

    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
                      keystore: 'Hardware_KeyStore', devices: Optional[Sequence['Device']] = None,
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
        """Select the device to use for keystore.

        Waits (prompting the user to retry) until at least one pairable device
        is found, then tries to auto-select by soft_device_id, by unique label,
        or because it is the only device; finally asks the user to choose.

        Raises CannotAutoSelectDevice when user interaction would be needed but
        is not allowed, and UserCancelled when the user declines a prompt.
        """
        # ideally this should not be called from the GUI thread...
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
        while True:
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
            if infos:
                break
            if not allow_user_interaction:
                raise CannotAutoSelectDevice()
            msg = _('Please insert your {}').format(plugin.device)
            msg += " ("
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
                msg += f"label: {keystore.label}, "
            msg += f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}"
            msg += ').\n\n{}\n\n{}'.format(
                _('Verify the cable is connected and that '
                  'no other application is using it.'),
                _('Try to connect again?')
            )
            if not handler.yes_no_question(msg):
                raise UserCancelled()
            # Force a fresh scan on the next iteration instead of reusing the caller's list.
            devices = None

        # select device automatically. (but only if we have reasonable expectation it is the correct one)
        # method 1: select device by id
        if keystore.soft_device_id:
            for info in infos:
                if info.soft_device_id == keystore.soft_device_id:
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
                    return info
        # method 2: select device by label
        #           but only if not a placeholder label and only if there is no collision
        device_labels = [info.label for info in infos]
        if (keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
                and device_labels.count(keystore.label) == 1):
            for info in infos:
                if info.label == keystore.label:
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
                    return info
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
        #           saved for keystore anyway, select it
        if (len(infos) == 1
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
                and keystore.soft_device_id is None):
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
            return infos[0]

        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
        if not allow_user_interaction:
            raise CannotAutoSelectDevice()
        # ask user to select device manually
        msg = (
                _("Could not automatically pair with device for given keystore.") + "\n"
                + f"(keystore label: {keystore.label!r}, "
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
        msg += _("Please select which {} device to use:").format(plugin.device)
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
        descriptions = ["{label} ({maybe_model}{init}, {transport})"
                        .format(label=info.label or _("An unnamed {}").format(info.plugin_name),
                                init=(_("initialized") if info.initialized else _("wiped")),
                                transport=info.device.transport_ui_string,
                                maybe_model=f"{info.model_name}, " if info.model_name else "")
                        for info in infos]
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
                          f"num options: {len(infos)}. options: {infos}")
        c = handler.query_choice(msg, descriptions)
        if c is None:
            raise UserCancelled()
        info = infos[c]
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
        # note: updated label/soft_device_id will be saved after pairing succeeds
        return info

    @runs_in_hwd_thread
    def _scan_devices_with_hid(self) -> List['Device']:
        """Enumerate HID devices and return those a registered plugin turns into a Device.

        Returns an empty list if the ``hid`` module cannot be imported.
        """
        try:
            import hid
        except ImportError:
            return []

        devices = []
        for d in hid.enumerate(0, 0):
            vendor_id = d['vendor_id']
            product_key = (vendor_id, d['product_id'])
            plugin = None
            # An exact (vendor_id, product_id) registration wins over a vendor-wide one.
            if product_key in self._recognised_hardware:
                plugin = self._recognised_hardware[product_key]
            elif vendor_id in self._recognised_vendor:
                plugin = self._recognised_vendor[vendor_id]
            if plugin:
                device = plugin.create_device_from_hid_enumeration(d, product_key=product_key)
                if device:
                    devices.append(device)
        return devices

    @runs_in_hwd_thread
    @profiler
    def scan_devices(self) -> Sequence['Device']:
        """Scan for connected devices and drop clients whose device has gone away.

        Combines the HID enumeration with the results of all registered custom
        enumerate functions (a failing function is logged and skipped). Clients
        whose device id is no longer present, or that report no usable
        connection, are removed from ``self.clients`` and unpaired, and their
        handler (if any) is told the device is not connected.

        Returns the list of devices found.
        """
        self.logger.info("scanning devices...")

        # First see what's connected that we know about
        devices = self._scan_devices_with_hid()

        # Let plugin handlers enumerate devices we don't know about
        with self.lock:
            enumerate_funcs = list(self._enumerate_func)
        for f in enumerate_funcs:
            try:
                new_devices = f()
            except BaseException as e:
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
            else:
                devices.extend(new_devices)

        # find out what was disconnected
        # (a set, so the membership test in the loop below is O(1) per client)
        client_ids = {dev.id_ for dev in devices}
        disconnected_clients = []
        with self.lock:
            connected = {}
            for client, id_ in self.clients.items():
                if id_ in client_ids and client.has_usable_connection_with_device():
                    connected[client] = id_
                else:
                    disconnected_clients.append((client, id_))
            self.clients = connected

        # Unpair disconnected devices
        # NOTE(review): these clients are already gone from self.clients at this point, so the
        # _close_client() call reached via unpair_id() will not find them and client.close()
        # is not invoked for them here -- confirm this is intended.
        for client, id_ in disconnected_clients:
            self.unpair_id(id_)
            if client.handler:
                client.handler.update_status(False)

        return devices

    @classmethod
    def version_info(cls) -> Mapping[str, Optional[str]]:
        """Return version details of the USB/HID libraries; a value is None when unavailable.

        Keys: "libusb.version" and "hidapi.version" are always present;
        "libusb.path" is only present when ``usb1`` could be imported.
        """
        ret = {}  # type: Dict[str, Optional[str]]
        # add libusb
        try:
            import usb1
        except Exception:
            ret["libusb.version"] = None
        else:
            ret["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
            try:
                ret["libusb.path"] = usb1.libusb1.libusb._name
            except AttributeError:
                ret["libusb.path"] = None
        # add hidapi
        try:
            import hid
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
        except Exception:
            # Either hid is missing or too old to expose __version__: ask the package metadata.
            from importlib.metadata import version
            try:
                ret["hidapi.version"] = version("hidapi")
            except ImportError:
                # importlib.metadata.PackageNotFoundError is a subclass of ImportError.
                ret["hidapi.version"] = None
        return ret

    def trigger_pairings(
            self,
            keystores: Sequence['KeyStore'],
            *,
            allow_user_interaction: bool = True,
            devices: Optional[Sequence['Device']] = None,
    ) -> None:
        """Given a list of keystores, try to pair each with a connected hardware device.

        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
        try to pair each keystore individually. Consider the following scenario:
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
        - assume none of the devices are paired yet
        1. if we tried to individually pair keystores, we might try with ks1 first
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
             which is confusing and error-prone. It's especially problematic if the hw device does
             not support labels (such as Ledger), as then the user cannot easily distinguish
             same-type devices. (see #4199)
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
           and then tell the user ks1 could not be paired (and there are no devices left to try)

        Keystores that are not Hardware_KeyStore instances are ignored. ``devices``
        defaults to a fresh scan. UserCancelled raised while pairing a keystore is
        swallowed so that the remaining keystores are still attempted.
        """
        from .keystore import Hardware_KeyStore
        keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
        if not keystores:
            return
        if devices is None:
            devices = self.scan_devices()
        # first pair with all devices that can be auto-selected
        for ks in keystores:
            try:
                ks.get_client(
                    force_pair=True,
                    allow_user_interaction=False,
                    devices=devices,
                )
            except UserCancelled:
                pass
        if allow_user_interaction:
            # now do manual selections
            for ks in keystores:
                try:
                    ks.get_client(
                        force_pair=True,
                        allow_user_interaction=True,
                        devices=devices,
                    )
                except UserCancelled:
                    pass
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc