• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5310017932361728

18 Mar 2025 10:14AM UTC coverage: 61.188% (+0.05%) from 61.139%
5310017932361728

push

CirrusCI

web-flow
Merge pull request #9649 from f321x/move_commands_to_init

Move plugin commands to init file of plugin

3 of 8 new or added lines in 2 files covered. (37.5%)

2 existing lines in 2 files now uncovered.

21371 of 34927 relevant lines covered (61.19%)

3.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

40.98
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import os
5✔
27
import pkgutil
5✔
28
import importlib.util
5✔
29
import time
5✔
30
import threading
5✔
31
import traceback
5✔
32
import sys
5✔
33
import aiohttp
5✔
34

35
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
5✔
36
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
37
import concurrent
5✔
38
import zipimport
5✔
39
from concurrent import futures
5✔
40
from functools import wraps, partial
5✔
41
from itertools import chain
5✔
42

43
from .i18n import _
5✔
44
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException)
5✔
45
from . import bip32
5✔
46
from . import plugins
5✔
47
from .simple_config import SimpleConfig
5✔
48
from .logging import get_logger, Logger
5✔
49
from .crypto import sha256
5✔
50

51
if TYPE_CHECKING:
5✔
52
    from .plugins.hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
53
    from .keystore import Hardware_KeyStore, KeyStore
×
54
    from .wallet import Abstract_Wallet
×
55

56

57
_logger = get_logger(__name__)
5✔
58
plugin_loaders = {}
5✔
59
hook_names = set()
5✔
60
hooks = {}
5✔
61

62

63
class Plugins(DaemonThread):
5✔
64

65
    LOGGING_SHORTCUT = 'p'
5✔
66
    pkgpath = os.path.dirname(plugins.__file__)
5✔
67

68
    @profiler
5✔
69
    def __init__(self, config: SimpleConfig, gui_name = None, cmd_only: bool = False):
5✔
70
        self.config = config
5✔
71
        self.cmd_only = cmd_only  # type: bool
5✔
72
        self.internal_plugin_metadata = {}
5✔
73
        self.external_plugin_metadata = {}
5✔
74
        self.loaded_command_modules = set()  # type: set[str]
5✔
75
        if cmd_only:
5✔
76
            # only import the command modules of plugins
77
            Logger.__init__(self)
×
78
            self.find_plugins()
×
79
            return
×
80
        DaemonThread.__init__(self)
5✔
81
        self.device_manager = DeviceMgr(config)
5✔
82
        self.name = 'Plugins'  # set name of thread
5✔
83
        self.hw_wallets = {}
5✔
84
        self.plugins = {}  # type: Dict[str, BasePlugin]
5✔
85
        self.gui_name = gui_name
5✔
86
        self.find_plugins()
5✔
87
        self.load_plugins()
5✔
88
        self.add_jobs(self.device_manager.thread_jobs())
5✔
89
        self.start()
5✔
90

91
    @property
5✔
92
    def descriptions(self):
5✔
93
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
94

95
    def find_directory_plugins(self, pkg_path: str, external: bool):
5✔
96
        """Finds plugins in directory form from the given pkg_path and populates the metadata dicts"""
97
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
5✔
98
        for loader, name, ispkg in iter_modules:
5✔
99
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
100
            #       once as data and once as code. To honor the "no duplicates" rule below,
101
            #       we exclude the ones packaged as *code*, here:
102
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
5✔
103
                continue
×
104
            if self.cmd_only and self.config.get('enable_plugin_' + name) is not True:
5✔
105
                continue
×
106
            base_name = 'electrum.plugins' if not external else 'electrum_external_plugins'
5✔
107
            full_name = f'{base_name}.{name}'
5✔
108
            if external:
5✔
109
                module_path = os.path.join(pkg_path, name)
×
110
                if not self._has_recursive_root_permissions(module_path):
×
111
                    self.logger.info(f"Not loading plugin {module_path}: directory has user write permissions")
×
112
                    continue
×
NEW
113
                module_path = os.path.join(module_path, '__init__.py')
×
114
                if not os.path.exists(module_path):
×
115
                    continue
×
116
                spec = importlib.util.spec_from_file_location(full_name, module_path)
×
117
            else:
118
                spec = importlib.util.find_spec(full_name)
5✔
119
            if spec is None:
5✔
120
                if self.cmd_only:
×
121
                    continue # no commands module in this plugin
×
122
                raise Exception(f"Error pre-loading {full_name}: no spec")
×
123
            module = self.exec_module_from_spec(spec, full_name)
5✔
124
            if self.cmd_only:
5✔
NEW
125
                assert name not in self.loaded_command_modules, f"tried to load commands of {name} twice"
×
126
                self.loaded_command_modules.add(name)
×
127
                continue
×
128
            d = module.__dict__
5✔
129
            if 'fullname' not in d:
5✔
130
                continue
5✔
131
            d['display_name'] = d['fullname']
5✔
132
            gui_good = self.gui_name in d.get('available_for', [])
5✔
133
            if not gui_good:
5✔
134
                continue
5✔
135
            details = d.get('registers_wallet_type')
5✔
136
            if details:
5✔
137
                self.register_wallet_type(name, gui_good, details)
5✔
138
            details = d.get('registers_keystore')
5✔
139
            if details:
5✔
140
                self.register_keystore(name, gui_good, details)
5✔
141
            if d.get('requires_wallet_type'):
5✔
142
                # trustedcoin will not be added to list
143
                continue
5✔
144
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
5✔
145
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
×
146
                raise Exception(f"duplicate plugins? for {name=}")
×
147
            if not external:
5✔
148
                self.internal_plugin_metadata[name] = d
5✔
149
            else:
150
                self.external_plugin_metadata[name] = d
×
151

152
    @staticmethod
5✔
153
    def exec_module_from_spec(spec, path):
5✔
154
        try:
5✔
155
            module = importlib.util.module_from_spec(spec)
5✔
156
            # sys.modules needs to be modified for relative imports to work
157
            # see https://stackoverflow.com/a/50395128
158
            sys.modules[path] = module
5✔
159
            spec.loader.exec_module(module)
5✔
160
        except Exception as e:
×
161
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
×
162
        return module
5✔
163

164
    def find_plugins(self):
5✔
165
        internal_plugins_path = (self.pkgpath, False)
5✔
166
        external_plugins_path = (self.get_external_plugin_dir(), True)
5✔
167
        for pkg_path, external in (internal_plugins_path, external_plugins_path):
5✔
168
            # external plugins enforce root permissions on the directory
169
            if pkg_path and os.path.exists(pkg_path):
5✔
170
                self.find_directory_plugins(pkg_path=pkg_path, external=external)
5✔
171
                self.find_zip_plugins(pkg_path=pkg_path, external=external)
5✔
172

173
    def load_plugins(self):
5✔
174
        for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
5✔
175
            if not d.get('requires_wallet_type') and self.config.get('enable_plugin_' + name):
5✔
176
                try:
×
177
                    self.load_plugin_by_name(name)
×
178
                except BaseException as e:
×
179
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
180

181
    def _has_root_permissions(self, path):
5✔
182
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
183

184
    @profiler(min_threshold=0.5)
5✔
185
    def _has_recursive_root_permissions(self, path):
5✔
186
        """Check if a directory and all its subdirectories have root permissions"""
187
        for root, dirs, files in os.walk(path):
×
188
            if not self._has_root_permissions(root):
×
189
                return False
×
190
            for f in files:
×
191
                if not self._has_root_permissions(os.path.join(root, f)):
×
192
                    return False
×
193
        return True
×
194

195
    def get_external_plugin_dir(self):
5✔
196
        if sys.platform not in ['linux', 'darwin'] and not sys.platform.startswith('freebsd'):
5✔
197
            return
×
198
        pkg_path = '/opt/electrum_plugins'
5✔
199
        if not os.path.exists(pkg_path):
5✔
200
            self.logger.info(f'directory {pkg_path} does not exist')
5✔
201
            return
5✔
202
        if not self._has_root_permissions(pkg_path):
×
203
            self.logger.info(f'not loading {pkg_path}: directory has user write permissions')
×
204
            return
×
205
        return pkg_path
×
206

207
    def zip_plugin_path(self, name):
5✔
208
        filename = self.get_metadata(name)['filename']
×
209
        if name in self.internal_plugin_metadata:
×
210
            pkg_path = self.pkgpath
×
211
        else:
212
            pkg_path = self.get_external_plugin_dir()
×
213
        return os.path.join(pkg_path, filename)
×
214

215
    def find_zip_plugins(self, pkg_path: str, external: bool):
5✔
216
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts"""
217
        if pkg_path is None:
5✔
218
            return
×
219
        for filename in os.listdir(pkg_path):
5✔
220
            path = os.path.join(pkg_path, filename)
5✔
221
            if not filename.endswith('.zip'):
5✔
222
                continue
5✔
223
            if external and not self._has_root_permissions(path):
×
224
                self.logger.info(f'not loading {path}: file has user write permissions')
×
225
                continue
×
226
            try:
×
227
                zipfile = zipimport.zipimporter(path)
×
228
            except zipimport.ZipImportError:
×
229
                self.logger.exception(f"unable to load zip plugin '{filename}'")
×
230
                continue
×
231
            for name, b in pkgutil.iter_zipimport_modules(zipfile):
×
232
                if b is False:
×
233
                    continue
×
234
                if name in self.internal_plugin_metadata:
×
235
                    raise Exception(f"duplicate plugins for name={name}")
×
236
                if name in self.external_plugin_metadata:
×
237
                    raise Exception(f"duplicate plugins for name={name}")
×
NEW
238
                if self.cmd_only and not self.config.get('enable_plugin_' + name):
×
NEW
239
                    continue
×
240
                module_path = f'electrum_external_plugins.{name}' if external else f'electrum.plugins.{name}'
×
241
                spec = zipfile.find_spec(name)
×
242
                module = self.exec_module_from_spec(spec, module_path)
×
243
                if self.cmd_only:
×
NEW
244
                    assert name not in self.loaded_command_modules, f"tried to load commands of {name} twice"
×
245
                    self.loaded_command_modules.add(name)
×
246
                    continue
×
247
                d = module.__dict__
×
248
                gui_good = self.gui_name in d.get('available_for', [])
×
249
                if not gui_good:
×
250
                    continue
×
251
                d['filename'] = filename
×
252
                if 'fullname' not in d:
×
253
                    continue
×
254
                d['display_name'] = d['fullname']
×
255
                d['zip_hash_sha256'] = get_file_hash256(path)
×
256
                d['is_zip'] = True
×
257
                if external:
×
258
                    self.external_plugin_metadata[name] = d
×
259
                else:
260
                    self.internal_plugin_metadata[name] = d
×
261

262
    def get(self, name):
5✔
263
        return self.plugins.get(name)
×
264

265
    def count(self):
5✔
266
        return len(self.plugins)
×
267

268
    def load_plugin(self, name) -> 'BasePlugin':
5✔
269
        """Imports the code of the given plugin.
270
        note: can be called from any thread.
271
        """
272
        if self.get_metadata(name):
5✔
273
            return self.load_plugin_by_name(name)
5✔
274
        else:
275
            raise Exception(f"could not find plugin {name!r}")
×
276

277
    def load_plugin_by_name(self, name) -> 'BasePlugin':
5✔
278
        if name in self.plugins:
5✔
279
            return self.plugins[name]
×
280

281
        is_zip = self.is_plugin_zip(name)
5✔
282
        is_external = name in self.external_plugin_metadata
5✔
283
        if not is_external:
5✔
284
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
5✔
285
        else:
286
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
×
287

288
        spec = importlib.util.find_spec(full_name)
5✔
289
        if spec is None:
5✔
290
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
291
        try:
5✔
292
            if is_zip:
5✔
293
                module = self.exec_module_from_spec(spec, full_name)
×
294
            else:
295
                module = importlib.util.module_from_spec(spec)
5✔
296
                spec.loader.exec_module(module)
5✔
297
            plugin = module.Plugin(self, self.config, name)
5✔
298
        except Exception as e:
×
299
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
×
300
        self.add_jobs(plugin.thread_jobs())
5✔
301
        self.plugins[name] = plugin
5✔
302
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
5✔
303
        return plugin
5✔
304

305
    def close_plugin(self, plugin):
5✔
306
        self.remove_jobs(plugin.thread_jobs())
×
307

308
    def enable(self, name: str) -> 'BasePlugin':
5✔
309
        self.config.set_key('enable_plugin_' + name, True, save=True)
×
310
        p = self.get(name)
×
311
        if p:
×
312
            return p
×
313
        return self.load_plugin(name)
×
314

315
    def disable(self, name: str) -> None:
5✔
316
        self.config.set_key('enable_plugin_' + name, False, save=True)
×
317
        p = self.get(name)
×
318
        if not p:
×
319
            return
×
320
        self.plugins.pop(name)
×
321
        p.close()
×
322
        self.logger.info(f"closed {name}")
×
323

324
    @classmethod
5✔
325
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
5✔
326
        return key.startswith('enable_plugin_')
×
327

328
    def toggle(self, name: str) -> Optional['BasePlugin']:
5✔
329
        p = self.get(name)
×
330
        return self.disable(name) if p else self.enable(name)
×
331

332
    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
5✔
333
        d = self.descriptions.get(name)
×
334
        if not d:
×
335
            return False
×
336
        deps = d.get('requires', [])
×
337
        for dep, s in deps:
×
338
            try:
×
339
                __import__(dep)
×
340
            except ImportError as e:
×
341
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
×
342
                return False
×
343
        requires = d.get('requires_wallet_type', [])
×
344
        return not requires or wallet.wallet_type in requires
×
345

346
    def get_hardware_support(self):
5✔
347
        out = []
×
348
        for name, (gui_good, details) in self.hw_wallets.items():
×
349
            if gui_good:
×
350
                try:
×
351
                    p = self.get_plugin(name)
×
352
                    if p.is_available():
×
353
                        out.append(HardwarePluginToScan(name=name,
×
354
                                                        description=details[2],
355
                                                        plugin=p,
356
                                                        exception=None))
357
                except Exception as e:
×
358
                    self.logger.exception(f"cannot load plugin for: {name}")
×
359
                    out.append(HardwarePluginToScan(name=name,
×
360
                                                    description=details[2],
361
                                                    plugin=None,
362
                                                    exception=e))
363
        return out
×
364

365
    def register_wallet_type(self, name, gui_good, wallet_type):
5✔
366
        from .wallet import register_wallet_type, register_constructor
5✔
367
        self.logger.info(f"registering wallet type {(wallet_type, name)}")
5✔
368

369
        def loader():
5✔
370
            plugin = self.get_plugin(name)
5✔
371
            register_constructor(wallet_type, plugin.wallet_class)
5✔
372
        register_wallet_type(wallet_type)
5✔
373
        plugin_loaders[wallet_type] = loader
5✔
374

375
    def register_keystore(self, name, gui_good, details):
5✔
376
        from .keystore import register_keystore
5✔
377

378
        def dynamic_constructor(d):
5✔
379
            return self.get_plugin(name).keystore_class(d)
5✔
380
        if details[0] == 'hardware':
5✔
381
            self.hw_wallets[name] = (gui_good, details)
5✔
382
            self.logger.info(f"registering hardware {name}: {details}")
5✔
383
            register_keystore(details[1], dynamic_constructor)
5✔
384

385
    def get_plugin(self, name: str) -> 'BasePlugin':
5✔
386
        if name not in self.plugins:
5✔
387
            self.load_plugin(name)
5✔
388
        return self.plugins[name]
5✔
389

390
    def is_plugin_zip(self, name: str) -> bool:
5✔
391
        """Returns True if the plugin is a zip file"""
392
        if (metadata := self.get_metadata(name)) is None:
5✔
393
            return False
5✔
394
        return metadata.get('is_zip', False)
5✔
395

396
    def get_metadata(self, name: str) -> Optional[dict]:
5✔
397
        """Returns the metadata of the plugin"""
398
        metadata = self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name)
5✔
399
        if not metadata:
5✔
400
            return None
5✔
401
        return metadata
5✔
402

403
    def run(self):
5✔
404
        while self.is_running():
5✔
405
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
5✔
406
            self.run_jobs()
5✔
407
        self.on_stop()
5✔
408

409

410
def get_file_hash256(path: str) -> str:
5✔
411
    '''Get the sha256 hash of a file in hex, similar to `sha256sum`.'''
412
    with open(path, 'rb') as f:
×
413
        return sha256(f.read()).hex()
×
414

415
def hook(func):
5✔
416
    hook_names.add(func.__name__)
5✔
417
    return func
5✔
418

419

420
def run_hook(name, *args):
5✔
421
    results = []
5✔
422
    f_list = hooks.get(name, [])
5✔
423
    for p, f in f_list:
5✔
424
        if p.is_enabled():
5✔
425
            try:
5✔
426
                r = f(*args)
5✔
427
            except Exception:
×
428
                _logger.exception(f"Plugin error. plugin: {p}, hook: {name}")
×
429
                r = False
×
430
            if r:
5✔
431
                results.append(r)
×
432

433
    if results:
5✔
434
        assert len(results) == 1, results
×
435
        return results[0]
×
436

437

438
class BasePlugin(Logger):
5✔
439

440
    def __init__(self, parent, config: 'SimpleConfig', name):
5✔
441
        self.parent = parent  # type: Plugins  # The plugins object
5✔
442
        self.name = name
5✔
443
        self.config = config
5✔
444
        self.wallet = None  # fixme: this field should not exist
5✔
445
        Logger.__init__(self)
5✔
446
        # add self to hooks
447
        for k in dir(self):
5✔
448
            if k in hook_names:
5✔
449
                l = hooks.get(k, [])
5✔
450
                l.append((self, getattr(self, k)))
5✔
451
                hooks[k] = l
5✔
452

453
    def __str__(self):
5✔
454
        return self.name
×
455

456
    def close(self):
5✔
457
        # remove self from hooks
458
        for attr_name in dir(self):
×
459
            if attr_name in hook_names:
×
460
                # found attribute in self that is also the name of a hook
461
                l = hooks.get(attr_name, [])
×
462
                try:
×
463
                    l.remove((self, getattr(self, attr_name)))
×
464
                except ValueError:
×
465
                    # maybe attr name just collided with hook name and was not hook
466
                    continue
×
467
                hooks[attr_name] = l
×
468
        self.parent.close_plugin(self)
×
469
        self.on_close()
×
470

471
    def on_close(self):
5✔
472
        pass
×
473

474
    def requires_settings(self) -> bool:
5✔
475
        return False
×
476

477
    def thread_jobs(self):
5✔
478
        return []
5✔
479

480
    def is_enabled(self):
5✔
481
        return self.is_available() and self.config.get('enable_plugin_' + self.name) is True
×
482

483
    def is_available(self):
5✔
484
        return True
×
485

486
    def can_user_disable(self):
5✔
487
        return True
×
488

489
    def settings_widget(self, window):
5✔
490
        raise NotImplementedError()
×
491

492
    def settings_dialog(self, window):
5✔
493
        raise NotImplementedError()
×
494

495
    def read_file(self, filename: str) -> bytes:
5✔
496
        import zipfile
×
497
        if self.parent.is_plugin_zip(self.name):
×
498
            plugin_filename = self.parent.zip_plugin_path(self.name)
×
499
            with zipfile.ZipFile(plugin_filename) as myzip:
×
500
                with myzip.open(os.path.join(self.name, filename)) as myfile:
×
501
                    return myfile.read()
×
502
        else:
503
            if self.name in self.parent.internal_plugin_metadata:
×
504
                path = os.path.join(os.path.dirname(__file__), 'plugins', self.name, filename)
×
505
            else:
506
                path = os.path.join(self.parent.get_external_plugin_dir(), self.name, filename)
×
507
            with open(path, 'rb') as myfile:
×
508
                return myfile.read()
×
509

510

511
class DeviceUnpairableError(UserFacingException): pass
5✔
512
class HardwarePluginLibraryUnavailable(Exception): pass
5✔
513
class CannotAutoSelectDevice(Exception): pass
5✔
514

515

516
class Device(NamedTuple):
5✔
517
    path: Union[str, bytes]
5✔
518
    interface_number: int
5✔
519
    id_: str
5✔
520
    product_key: Any   # when using hid, often Tuple[int, int]
5✔
521
    usage_page: int
5✔
522
    transport_ui_string: str
5✔
523

524

525
class DeviceInfo(NamedTuple):
5✔
526
    device: Device
5✔
527
    label: Optional[str] = None
5✔
528
    initialized: Optional[bool] = None
5✔
529
    exception: Optional[Exception] = None
5✔
530
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
5✔
531
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
5✔
532
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
5✔
533

534

535
class HardwarePluginToScan(NamedTuple):
5✔
536
    name: str
5✔
537
    description: str
5✔
538
    plugin: Optional['HW_PluginBase']
5✔
539
    exception: Optional[Exception]
5✔
540

541

542
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
5✔
543

544

545
# hidapi is not thread-safe
546
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
547
#     https://github.com/libusb/hidapi/issues/45
548
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
549
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
550
# It is not entirely clear to me, exactly what is safe and what isn't, when
551
# using multiple threads...
552
# Hence, we use a single thread for all device communications, including
553
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
554
# the following thread:
555
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
5✔
556
    max_workers=1,
557
    thread_name_prefix='hwd_comms_thread'
558
)
559

560
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
561
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
562
# To keep it simple, let's just import it now, as we are likely in the main thread here.
563
if threading.current_thread() is not threading.main_thread():
5✔
564
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
×
565
try:
5✔
566
    import hid
5✔
567
except ImportError:
5✔
568
    pass
5✔
569

570

571
T = TypeVar('T')
5✔
572

573

574
def run_in_hwd_thread(func: Callable[[], T]) -> T:
5✔
575
    if threading.current_thread().name.startswith("hwd_comms_thread"):
×
576
        return func()
×
577
    else:
578
        fut = _hwd_comms_executor.submit(func)
×
579
        return fut.result()
×
580
        #except (concurrent.futures.CancelledError, concurrent.futures.TimeoutError) as e:
581

582

583
def runs_in_hwd_thread(func):
5✔
584
    @wraps(func)
5✔
585
    def wrapper(*args, **kwargs):
5✔
586
        return run_in_hwd_thread(partial(func, *args, **kwargs))
×
587
    return wrapper
5✔
588

589

590
def assert_runs_in_hwd_thread():
5✔
591
    if not threading.current_thread().name.startswith("hwd_comms_thread"):
×
592
        raise Exception("must only be called from HWD communication thread")
×
593

594

595
class DeviceMgr(ThreadJob):
5✔
596
    """Manages hardware clients.  A client communicates over a hardware
597
    channel with the device.
598

599
    In addition to tracking device HID IDs, the device manager tracks
600
    hardware wallets and manages wallet pairing.  A HID ID may be
601
    paired with a wallet when it is confirmed that the hardware device
602
    matches the wallet, i.e. they have the same master public key.  A
603
    HID ID can be unpaired if e.g. it is wiped.
604

605
    Because of hotplugging, a wallet must request its client
606
    dynamically each time it is required, rather than caching it
607
    itself.
608

609
    The device manager is shared across plugins, so just one place
610
    does hardware scans when needed.  By tracking HID IDs, if a device
611
    is plugged into a different port the wallet is automatically
612
    re-paired.
613

614
    Wallets are informed on connect / disconnect events.  It must
615
    implement connected(), disconnected() callbacks.  Being connected
616
    implies a pairing.  Callbacks can happen in any thread context,
617
    and we do them without holding the lock.
618

619
    Confusingly, the HID ID (serial number) reported by the HID system
620
    doesn't match the device ID reported by the device itself.  We use
621
    the HID IDs.
622

623
    This plugin is thread-safe.  Currently only devices supported by
624
    hidapi are implemented."""
625

626
    def __init__(self, config: SimpleConfig):
5✔
627
        ThreadJob.__init__(self)
5✔
628
        # A pairing_code->id_ map. Item only present if we have active pairing. Needs self.lock.
629
        self.pairing_code_to_id = {}  # type: Dict[str, str]
5✔
630
        # A client->id_ map. Needs self.lock.
631
        self.clients = {}  # type: Dict[HardwareClientBase, str]
5✔
632
        # What we recognise.  (vendor_id, product_id) -> Plugin
633
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
5✔
634
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]  # vendor_id -> Plugin
5✔
635
        # Custom enumerate functions for devices we don't know about.
636
        self._enumerate_func = set()  # Needs self.lock.
5✔
637

638
        self.lock = threading.RLock()
5✔
639

640
        self.config = config
5✔
641

642
    def thread_jobs(self):
5✔
643
        # Thread job to handle device timeouts
644
        return [self]
5✔
645

646
    def run(self):
5✔
647
        '''Handle device timeouts.  Runs in the context of the Plugins
648
        thread.'''
649
        with self.lock:
5✔
650
            clients = list(self.clients.keys())
5✔
651
        cutoff = time.time() - self.config.get_session_timeout()
5✔
652
        for client in clients:
5✔
653
            client.timeout(cutoff)
×
654

655
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
5✔
656
        for pair in device_pairs:
×
657
            self._recognised_hardware[pair] = plugin
×
658

659
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
5✔
660
        for vendor_id in vendor_ids:
×
661
            self._recognised_vendor[vendor_id] = plugin
×
662

663
    def register_enumerate_func(self, func):
5✔
664
        with self.lock:
×
665
            self._enumerate_func.add(func)
×
666

667
    @runs_in_hwd_thread
5✔
668
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
5✔
669
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
670
        # Get from cache first
671
        client = self._client_by_id(device.id_)
×
672
        if client:
×
673
            return client
×
674
        client = plugin.create_client(device, handler)
×
675
        if client:
×
676
            self.logger.info(f"Registering {client}")
×
677
            with self.lock:
×
678
                self.clients[client] = device.id_
×
679
        return client
×
680

681
    def id_by_pairing_code(self, pairing_code):
5✔
682
        with self.lock:
×
683
            return self.pairing_code_to_id.get(pairing_code)
×
684

685
    def pairing_code_by_id(self, id_):
5✔
686
        with self.lock:
×
687
            for pairing_code, id2 in self.pairing_code_to_id.items():
×
688
                if id2 == id_:
×
689
                    return pairing_code
×
690
            return None
×
691

692
    def unpair_pairing_code(self, pairing_code):
5✔
693
        with self.lock:
×
694
            if pairing_code not in self.pairing_code_to_id:
×
695
                return
×
696
            _id = self.pairing_code_to_id.pop(pairing_code)
×
697
        self._close_client(_id)
×
698

699
    def unpair_id(self, id_):
5✔
700
        pairing_code = self.pairing_code_by_id(id_)
×
701
        if pairing_code:
×
702
            self.unpair_pairing_code(pairing_code)
×
703
        else:
704
            self._close_client(id_)
×
705

706
    def _close_client(self, id_):
5✔
707
        with self.lock:
×
708
            client = self._client_by_id(id_)
×
709
            self.clients.pop(client, None)
×
710
        if client:
×
711
            client.close()
×
712

713
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
5✔
714
        with self.lock:
×
715
            for client, client_id in self.clients.items():
×
716
                if client_id == id_:
×
717
                    return client
×
718
        return None
×
719

720
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
5✔
721
        '''Returns a client for the device ID if one is registered.  If
722
        a device is wiped or in bootloader mode pairing is impossible;
723
        in such cases we communicate by device ID and not wallet.'''
724
        if scan_now:
×
725
            self.scan_devices()
×
726
        return self._client_by_id(id_)
×
727

728
    @runs_in_hwd_thread
5✔
729
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
5✔
730
                            keystore: 'Hardware_KeyStore',
731
                            force_pair: bool, *,
732
                            devices: Sequence['Device'] = None,
733
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
734
        self.logger.info("getting client for keystore")
×
735
        if handler is None:
×
736
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
×
737
        handler.update_status(False)
×
738
        pcode = keystore.pairing_code()
×
739
        client = None
×
740
        # search existing clients first (fast-path)
741
        if not devices:
×
742
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=[])
×
743
        # search clients again, now allowing a (slow) scan
744
        if client is None:
×
745
            if devices is None:
×
746
                devices = self.scan_devices()
×
747
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=devices)
×
748
        if client is None and force_pair:
×
749
            try:
×
750
                info = self.select_device(plugin, handler, keystore, devices,
×
751
                                          allow_user_interaction=allow_user_interaction)
752
            except CannotAutoSelectDevice:
×
753
                pass
×
754
            else:
755
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
×
756
        if client:
×
757
            handler.update_status(True)
×
758
            # note: if select_device was called, we might also update label etc here:
759
            keystore.opportunistically_fill_in_missing_info_from_device(client)
×
760
        self.logger.info("end client for keystore")
×
761
        return client
×
762

763
    def client_by_pairing_code(
5✔
764
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
765
        devices: Sequence['Device'],
766
    ) -> Optional['HardwareClientBase']:
767
        _id = self.id_by_pairing_code(pairing_code)
×
768
        client = self._client_by_id(_id)
×
769
        if client:
×
770
            if type(client.plugin) != type(plugin):
×
771
                return
×
772
            # An unpaired client might have another wallet's handler
773
            # from a prior scan.  Replace to fix dialog parenting.
774
            client.handler = handler
×
775
            return client
×
776

777
        for device in devices:
×
778
            if device.id_ == _id:
×
779
                return self.create_client(device, handler, plugin)
×
780

781
    def force_pair_keystore(
5✔
782
        self,
783
        *,
784
        plugin: 'HW_PluginBase',
785
        handler: 'HardwareHandlerBase',
786
        info: 'DeviceInfo',
787
        keystore: 'Hardware_KeyStore',
788
    ) -> 'HardwareClientBase':
789
        xpub = keystore.xpub
×
790
        derivation = keystore.get_derivation_prefix()
×
791
        assert derivation is not None
×
792
        xtype = bip32.xpub_type(xpub)
×
793
        client = self._client_by_id(info.device.id_)
×
794
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
×
795
            # See comment above for same code
796
            client.handler = handler
×
797
            # This will trigger a PIN/passphrase entry request
798
            try:
×
799
                client_xpub = client.get_xpub(derivation, xtype)
×
800
            except (UserCancelled, RuntimeError):
×
801
                # Bad / cancelled PIN / passphrase
802
                client_xpub = None
×
803
            if client_xpub == xpub:
×
804
                keystore.opportunistically_fill_in_missing_info_from_device(client)
×
805
                with self.lock:
×
806
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
×
807
                return client
×
808

809
        # The user input has wrong PIN or passphrase, or cancelled input,
810
        # or it is not pairable
811
        raise DeviceUnpairableError(
×
812
            _('Electrum cannot pair with your {}.\n\n'
813
              'Before you request bitcoins to be sent to addresses in this '
814
              'wallet, ensure you can pair with your device, or that you have '
815
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
816
              'receive will be unspendable.').format(plugin.device))
817

818
    def list_pairable_device_infos(
5✔
819
        self,
820
        *,
821
        handler: Optional['HardwareHandlerBase'],
822
        plugin: 'HW_PluginBase',
823
        devices: Sequence['Device'] = None,
824
        include_failing_clients: bool = False,
825
    ) -> List['DeviceInfo']:
826
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
827
        Already paired devices are also included, as it is okay to reuse them.
828
        """
829
        if not plugin.libraries_available:
×
830
            message = plugin.get_library_not_available_message()
×
831
            raise HardwarePluginLibraryUnavailable(message)
×
832
        if devices is None:
×
833
            devices = self.scan_devices()
×
834
        infos = []
×
835
        for device in devices:
×
836
            if not plugin.can_recognize_device(device):
×
837
                continue
×
838
            try:
×
839
                client = self.create_client(device, handler, plugin)
×
840
                if not client:
×
841
                    continue
×
842
                label = client.label()
×
843
                is_initialized = client.is_initialized()
×
844
                soft_device_id = client.get_soft_device_id()
×
845
                model_name = client.device_model_name()
×
846
            except Exception as e:
×
847
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
×
848
                if include_failing_clients:
×
849
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
×
850
                continue
×
851
            infos.append(DeviceInfo(device=device,
×
852
                                    label=label,
853
                                    initialized=is_initialized,
854
                                    plugin_name=plugin.name,
855
                                    soft_device_id=soft_device_id,
856
                                    model_name=model_name))
857

858
        return infos
×
859

860
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
5✔
861
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
862
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
863
        """Select the device to use for keystore."""
864
        # ideally this should not be called from the GUI thread...
865
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
866
        while True:
×
867
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
×
868
            if infos:
×
869
                break
×
870
            if not allow_user_interaction:
×
871
                raise CannotAutoSelectDevice()
×
872
            msg = _('Please insert your {}').format(plugin.device)
×
873
            msg += " ("
×
874
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
×
875
                msg += f"label: {keystore.label}, "
×
876
            msg += f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}"
×
877
            msg += ').\n\n{}\n\n{}'.format(
×
878
                _('Verify the cable is connected and that '
879
                  'no other application is using it.'),
880
                _('Try to connect again?')
881
            )
882
            if not handler.yes_no_question(msg):
×
883
                raise UserCancelled()
×
884
            devices = None
×
885

886
        # select device automatically. (but only if we have reasonable expectation it is the correct one)
887
        # method 1: select device by id
888
        if keystore.soft_device_id:
×
889
            for info in infos:
×
890
                if info.soft_device_id == keystore.soft_device_id:
×
891
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
×
892
                    return info
×
893
        # method 2: select device by label
894
        #           but only if not a placeholder label and only if there is no collision
895
        device_labels = [info.label for info in infos]
×
896
        if (keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
×
897
                and device_labels.count(keystore.label) == 1):
898
            for info in infos:
×
899
                if info.label == keystore.label:
×
900
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
×
901
                    return info
×
902
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
903
        #           saved for keystore anyway, select it
904
        if (len(infos) == 1
×
905
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
906
                and keystore.soft_device_id is None):
907
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
×
908
            return infos[0]
×
909

910
        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
×
911
        if not allow_user_interaction:
×
912
            raise CannotAutoSelectDevice()
×
913
        # ask user to select device manually
914
        msg = (
×
915
                _("Could not automatically pair with device for given keystore.") + "\n"
916
                + f"(keystore label: {keystore.label!r}, "
917
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
918
        msg += _("Please select which {} device to use:").format(plugin.device)
×
919
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
×
920
        descriptions = ["{label} ({maybe_model}{init}, {transport})"
×
921
                        .format(label=info.label or _("An unnamed {}").format(info.plugin_name),
922
                                init=(_("initialized") if info.initialized else _("wiped")),
923
                                transport=info.device.transport_ui_string,
924
                                maybe_model=f"{info.model_name}, " if info.model_name else "")
925
                        for info in infos]
926
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
×
927
                          f"num options: {len(infos)}. options: {infos}")
928
        c = handler.query_choice(msg, descriptions)
×
929
        if c is None:
×
930
            raise UserCancelled()
×
931
        info = infos[c]
×
932
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
×
933
        # note: updated label/soft_device_id will be saved after pairing succeeds
934
        return info
×
935

936
    @runs_in_hwd_thread
5✔
937
    def _scan_devices_with_hid(self) -> List['Device']:
5✔
938
        try:
×
939
            import hid
×
940
        except ImportError:
×
941
            return []
×
942

943
        devices = []
×
944
        for d in hid.enumerate(0, 0):
×
945
            vendor_id = d['vendor_id']
×
946
            product_key = (vendor_id, d['product_id'])
×
947
            plugin = None
×
948
            if product_key in self._recognised_hardware:
×
949
                plugin = self._recognised_hardware[product_key]
×
950
            elif vendor_id in self._recognised_vendor:
×
951
                plugin = self._recognised_vendor[vendor_id]
×
952
            if plugin:
×
953
                device = plugin.create_device_from_hid_enumeration(d, product_key=product_key)
×
954
                if device:
×
955
                    devices.append(device)
×
956
        return devices
×
957

958
    @runs_in_hwd_thread
5✔
959
    @profiler
5✔
960
    def scan_devices(self) -> Sequence['Device']:
5✔
961
        self.logger.info("scanning devices...")
×
962

963
        # First see what's connected that we know about
964
        devices = self._scan_devices_with_hid()
×
965

966
        # Let plugin handlers enumerate devices we don't know about
967
        with self.lock:
×
968
            enumerate_funcs = list(self._enumerate_func)
×
969
        for f in enumerate_funcs:
×
970
            try:
×
971
                new_devices = f()
×
972
            except BaseException as e:
×
973
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
×
974
            else:
975
                devices.extend(new_devices)
×
976

977
        # find out what was disconnected
978
        client_ids = [dev.id_ for dev in devices]
×
979
        disconnected_clients = []
×
980
        with self.lock:
×
981
            connected = {}
×
982
            for client, id_ in self.clients.items():
×
983
                if id_ in client_ids and client.has_usable_connection_with_device():
×
984
                    connected[client] = id_
×
985
                else:
986
                    disconnected_clients.append((client, id_))
×
987
            self.clients = connected
×
988

989
        # Unpair disconnected devices
990
        for client, id_ in disconnected_clients:
×
991
            self.unpair_id(id_)
×
992
            if client.handler:
×
993
                client.handler.update_status(False)
×
994

995
        return devices
×
996

997
    @classmethod
5✔
998
    def version_info(cls) -> Mapping[str, Optional[str]]:
5✔
999
        ret = {}
×
1000
        # add libusb
1001
        try:
×
1002
            import usb1
×
1003
        except Exception as e:
×
1004
            ret["libusb.version"] = None
×
1005
        else:
1006
            ret["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
×
1007
            try:
×
1008
                ret["libusb.path"] = usb1.libusb1.libusb._name
×
1009
            except AttributeError:
×
1010
                ret["libusb.path"] = None
×
1011
        # add hidapi
1012
        try:
×
1013
            import hid
×
1014
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
×
1015
        except Exception as e:
×
1016
            from importlib.metadata import version
×
1017
            try:
×
1018
                ret["hidapi.version"] = version("hidapi")
×
1019
            except ImportError:
×
1020
                ret["hidapi.version"] = None
×
1021
        return ret
×
1022

1023
    def trigger_pairings(
5✔
1024
            self,
1025
            keystores: Sequence['KeyStore'],
1026
            *,
1027
            allow_user_interaction: bool = True,
1028
            devices: Sequence['Device'] = None,
1029
    ) -> None:
1030
        """Given a list of keystores, try to pair each with a connected hardware device.
1031

1032
        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
1033
        try to pair each keystore individually. Consider the following scenario:
1034
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
1035
        - assume none of the devices are paired yet
1036
        1. if we tried to individually pair keystores, we might try with ks1 first
1037
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
1038
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
1039
             which is confusing and error-prone. It's especially problematic if the hw device does
1040
             not support labels (such as Ledger), as then the user cannot easily distinguish
1041
             same-type devices. (see #4199)
1042
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
1043
           and then tell the user ks1 could not be paired (and there are no devices left to try)
1044
        """
1045
        from .keystore import Hardware_KeyStore
×
1046
        keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
×
1047
        if not keystores:
×
1048
            return
×
1049
        if devices is None:
×
1050
            devices = self.scan_devices()
×
1051
        # first pair with all devices that can be auto-selected
1052
        for ks in keystores:
×
1053
            try:
×
1054
                ks.get_client(
×
1055
                    force_pair=True,
1056
                    allow_user_interaction=False,
1057
                    devices=devices,
1058
                )
1059
            except UserCancelled:
×
1060
                pass
×
1061
        if allow_user_interaction:
×
1062
            # now do manual selections
1063
            for ks in keystores:
×
1064
                try:
×
1065
                    ks.get_client(
×
1066
                        force_pair=True,
1067
                        allow_user_interaction=True,
1068
                        devices=devices,
1069
                    )
1070
                except UserCancelled:
×
1071
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc