• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 4981978665058304

17 Mar 2025 03:41PM UTC coverage: 61.18% (+0.02%) from 61.158%
4981978665058304

push

CirrusCI

web-flow
Merge pull request #9646 from f321x/plugins_import_dir_and_zip

Allow all plugins to be either zip or directory based

42 of 77 new or added lines in 1 file covered. (54.55%)

14 existing lines in 6 files now uncovered.

21370 of 34930 relevant lines covered (61.18%)

3.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

40.75
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import os
5✔
27
import pkgutil
5✔
28
import importlib.util
5✔
29
import time
5✔
30
import threading
5✔
31
import traceback
5✔
32
import sys
5✔
33
import aiohttp
5✔
34

35
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
5✔
36
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
37
import concurrent
5✔
38
import zipimport
5✔
39
from concurrent import futures
5✔
40
from functools import wraps, partial
5✔
41
from itertools import chain
5✔
42

43
from .i18n import _
5✔
44
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException)
5✔
45
from . import bip32
5✔
46
from . import plugins
5✔
47
from .simple_config import SimpleConfig
5✔
48
from .logging import get_logger, Logger
5✔
49
from .crypto import sha256
5✔
50

51
if TYPE_CHECKING:
5✔
52
    from .plugins.hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
53
    from .keystore import Hardware_KeyStore, KeyStore
×
54
    from .wallet import Abstract_Wallet
×
55

56

57
_logger = get_logger(__name__)
5✔
58
plugin_loaders = {}
5✔
59
hook_names = set()
5✔
60
hooks = {}
5✔
61

62

63
class Plugins(DaemonThread):
5✔
64

65
    LOGGING_SHORTCUT = 'p'
5✔
66
    pkgpath = os.path.dirname(plugins.__file__)
5✔
67

68
    @profiler
    def __init__(self, config: SimpleConfig, gui_name = None, cmd_only: bool = False):
        """Discover plugins and, unless `cmd_only` is set, start the plugins thread.

        :param config: configuration object; stored on the instance and handed to DeviceMgr.
        :param gui_name: name of the running GUI; compared against each plugin's
            'available_for' list. It is not stored when `cmd_only` is True.
        :param cmd_only: if True, only the logger is initialised and find_plugins()
            is run; no thread, device manager or plugin instances are created.
        """
        self.config = config
        self.cmd_only = cmd_only  # type: bool
        # plugin name -> metadata dict (the plugin module's __dict__), split by origin
        self.internal_plugin_metadata = {}
        self.external_plugin_metadata = {}
        self.loaded_command_modules = set()  # type: set[str]
        if cmd_only:
            # only import the command modules of plugins
            Logger.__init__(self)
            self.find_plugins()
            return
        DaemonThread.__init__(self)
        self.device_manager = DeviceMgr(config)
        self.name = 'Plugins'  # set name of thread
        # plugin name -> (gui_good, details), filled by register_keystore()
        self.hw_wallets = {}
        self.plugins = {}  # type: Dict[str, BasePlugin]
        self.gui_name = gui_name
        self.find_plugins()
        self.load_plugins()
        self.add_jobs(self.device_manager.thread_jobs())
        self.start()
5✔
90

91
    @property
5✔
92
    def descriptions(self):
5✔
93
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
94

95
    def find_directory_plugins(self, pkg_path: str, external: bool):
        """Finds plugins in directory form from the given pkg_path and populates the metadata dicts

        :param pkg_path: directory that is scanned with pkgutil.iter_modules().
        :param external: True if pkg_path is the external plugin directory; such
            plugins are imported as 'electrum_external_plugins.<name>' and must
            pass the recursive root-permission check.
        :raises Exception: if a plugin cannot be pre-loaded (outside cmd_only mode
            a missing spec is an error) or if a plugin name was already registered.

        In cmd_only mode only the '<plugin>.commands' module of plugins enabled in
        the config is executed, and no metadata is recorded.
        """
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
        for loader, name, ispkg in iter_modules:
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
            #       once as data and once as code. To honor the "no duplicates" rule below,
            #       we exclude the ones packaged as *code*, here:
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
                continue
            # in cmd_only mode, plugins that are not explicitly enabled are not imported at all
            if self.cmd_only and self.config.get('enable_plugin_' + name) is not True:
                continue
            base_name = 'electrum.plugins' if not external else 'electrum_external_plugins'
            full_name = f'{base_name}.{name}' + ('.commands' if self.cmd_only else '')
            if external:
                module_path = os.path.join(pkg_path, name)
                if not self._has_recursive_root_permissions(module_path):
                    self.logger.info(f"Not loading plugin {module_path}: directory has user write permissions")
                    continue
                # external plugins are loaded straight from their file, not via sys.path
                module_path = os.path.join(module_path, 'commands.py' if self.cmd_only else '__init__.py')
                if not os.path.exists(module_path):
                    continue
                spec = importlib.util.spec_from_file_location(full_name, module_path)
            else:
                spec = importlib.util.find_spec(full_name)
            if spec is None:
                if self.cmd_only:
                    continue # no commands module in this plugin
                raise Exception(f"Error pre-loading {full_name}: no spec")
            module = self.exec_module_from_spec(spec, full_name)
            if self.cmd_only:
                assert name not in self.loaded_command_modules, f"duplicate command modules for: {name}"
                self.loaded_command_modules.add(name)
                continue
            # the module namespace itself serves as the plugin's metadata dict
            d = module.__dict__
            if 'fullname' not in d:
                continue
            d['display_name'] = d['fullname']
            gui_good = self.gui_name in d.get('available_for', [])
            if not gui_good:
                continue
            details = d.get('registers_wallet_type')
            if details:
                self.register_wallet_type(name, gui_good, details)
            details = d.get('registers_keystore')
            if details:
                self.register_keystore(name, gui_good, details)
            if d.get('requires_wallet_type'):
                # trustedcoin will not be added to list
                continue
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
                raise Exception(f"duplicate plugins? for {name=}")
            if not external:
                self.internal_plugin_metadata[name] = d
            else:
                self.external_plugin_metadata[name] = d
×
151

152
    @staticmethod
5✔
153
    def exec_module_from_spec(spec, path):
5✔
154
        try:
5✔
155
            module = importlib.util.module_from_spec(spec)
5✔
156
            # sys.modules needs to be modified for relative imports to work
157
            # see https://stackoverflow.com/a/50395128
158
            sys.modules[path] = module
5✔
159
            spec.loader.exec_module(module)
5✔
160
        except Exception as e:
×
161
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
×
162
        return module
5✔
163

164
    def find_plugins(self):
5✔
165
        internal_plugins_path = (self.pkgpath, False)
5✔
166
        external_plugins_path = (self.get_external_plugin_dir(), True)
5✔
167
        for pkg_path, external in (internal_plugins_path, external_plugins_path):
5✔
168
            # external plugins enforce root permissions on the directory
169
            if pkg_path and os.path.exists(pkg_path):
5✔
170
                self.find_directory_plugins(pkg_path=pkg_path, external=external)
5✔
171
                self.find_zip_plugins(pkg_path=pkg_path, external=external)
5✔
172

173
    def load_plugins(self):
5✔
174
        for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
5✔
175
            if not d.get('requires_wallet_type') and self.config.get('enable_plugin_' + name):
5✔
176
                try:
×
NEW
177
                    self.load_plugin_by_name(name)
×
178
                except BaseException as e:
×
179
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
180

181
    def _has_root_permissions(self, path):
5✔
182
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
183

184
    @profiler(min_threshold=0.5)
    def _has_recursive_root_permissions(self, path):
        """Return True if `path`, every directory below it and every file in them
        pass _has_root_permissions().

        The tree is traversed with os.walk(); the first failing entry ends the check.
        """
        passes = self._has_root_permissions
        for dirpath, _subdirs, filenames in os.walk(path):
            if not passes(dirpath):
                return False
            if not all(passes(os.path.join(dirpath, fname)) for fname in filenames):
                return False
        return True
×
194

195
    def get_external_plugin_dir(self):
5✔
196
        if sys.platform not in ['linux', 'darwin'] and not sys.platform.startswith('freebsd'):
5✔
197
            return
×
198
        pkg_path = '/opt/electrum_plugins'
5✔
199
        if not os.path.exists(pkg_path):
5✔
200
            self.logger.info(f'directory {pkg_path} does not exist')
5✔
201
            return
5✔
202
        if not self._has_root_permissions(pkg_path):
×
203
            self.logger.info(f'not loading {pkg_path}: directory has user write permissions')
×
204
            return
×
205
        return pkg_path
×
206

207
    def zip_plugin_path(self, name):
5✔
NEW
208
        filename = self.get_metadata(name)['filename']
×
NEW
209
        if name in self.internal_plugin_metadata:
×
NEW
210
            pkg_path = self.pkgpath
×
211
        else:
NEW
212
            pkg_path = self.get_external_plugin_dir()
×
NEW
213
        return os.path.join(pkg_path, filename)
×
214

215
    def find_zip_plugins(self, pkg_path: str, external: bool):
5✔
216
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts"""
217
        if pkg_path is None:
5✔
UNCOV
218
            return
×
219
        for filename in os.listdir(pkg_path):
5✔
220
            path = os.path.join(pkg_path, filename)
5✔
221
            if not filename.endswith('.zip'):
5✔
222
                continue
5✔
NEW
223
            if external and not self._has_root_permissions(path):
×
224
                self.logger.info(f'not loading {path}: file has user write permissions')
×
225
                continue
×
226
            try:
×
227
                zipfile = zipimport.zipimporter(path)
×
228
            except zipimport.ZipImportError:
×
229
                self.logger.exception(f"unable to load zip plugin '{filename}'")
×
230
                continue
×
231
            for name, b in pkgutil.iter_zipimport_modules(zipfile):
×
232
                if b is False:
×
233
                    continue
×
234
                if name in self.internal_plugin_metadata:
×
235
                    raise Exception(f"duplicate plugins for name={name}")
×
236
                if name in self.external_plugin_metadata:
×
237
                    raise Exception(f"duplicate plugins for name={name}")
×
NEW
238
                module_path = f'electrum_external_plugins.{name}' if external else f'electrum.plugins.{name}'
×
239
                spec = zipfile.find_spec(name)
×
240
                module = self.exec_module_from_spec(spec, module_path)
×
241
                if self.cmd_only:
×
242
                    if self.config.get('enable_plugin_' + name) is not True:
×
243
                        continue
×
244
                    spec2 = importlib.util.find_spec(module_path + '.commands')
×
245
                    if spec2 is None:  # no commands module in this plugin
×
246
                        continue
×
247
                    self.exec_module_from_spec(spec2, module_path + '.commands')
×
248
                    assert name not in self.loaded_command_modules, f"duplicate command modules for: {name}"
×
249
                    self.loaded_command_modules.add(name)
×
250
                    continue
×
251
                d = module.__dict__
×
252
                gui_good = self.gui_name in d.get('available_for', [])
×
253
                if not gui_good:
×
254
                    continue
×
255
                d['filename'] = filename
×
256
                if 'fullname' not in d:
×
257
                    continue
×
258
                d['display_name'] = d['fullname']
×
259
                d['zip_hash_sha256'] = get_file_hash256(path)
×
NEW
260
                d['is_zip'] = True
×
NEW
261
                if external:
×
NEW
262
                    self.external_plugin_metadata[name] = d
×
263
                else:
NEW
264
                    self.internal_plugin_metadata[name] = d
×
265

266
    def get(self, name):
        """Return the entry stored in self.plugins under `name`, or None if there is none."""
        return self.plugins.get(name)
×
268

269
    def count(self):
        """Return the number of entries in self.plugins."""
        return len(self.plugins)
×
271

272
    def load_plugin(self, name) -> 'BasePlugin':
5✔
273
        """Imports the code of the given plugin.
274
        note: can be called from any thread.
275
        """
276
        if self.get_metadata(name):
5✔
277
            return self.load_plugin_by_name(name)
5✔
278
        else:
279
            raise Exception(f"could not find plugin {name!r}")
×
280

281
    def load_plugin_by_name(self, name) -> 'BasePlugin':
5✔
282
        if name in self.plugins:
5✔
283
            return self.plugins[name]
×
284

285
        is_zip = self.is_plugin_zip(name)
5✔
286
        is_external = name in self.external_plugin_metadata
5✔
287
        if not is_external:
5✔
288
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
5✔
289
        else:
NEW
290
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
×
291

292
        spec = importlib.util.find_spec(full_name)
5✔
293
        if spec is None:
5✔
294
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
295
        try:
5✔
296
            if is_zip:
5✔
NEW
297
                module = self.exec_module_from_spec(spec, full_name)
×
298
            else:
299
                module = importlib.util.module_from_spec(spec)
5✔
300
                spec.loader.exec_module(module)
5✔
301
            plugin = module.Plugin(self, self.config, name)
5✔
302
        except Exception as e:
×
303
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
×
304
        self.add_jobs(plugin.thread_jobs())
5✔
305
        self.plugins[name] = plugin
5✔
306
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
5✔
307
        return plugin
5✔
308

309
    def close_plugin(self, plugin):
        """Pass the jobs returned by plugin.thread_jobs() to remove_jobs()."""
        self.remove_jobs(plugin.thread_jobs())
×
311

312
    def enable(self, name: str) -> 'BasePlugin':
5✔
313
        self.config.set_key('enable_plugin_' + name, True, save=True)
×
314
        p = self.get(name)
×
315
        if p:
×
316
            return p
×
317
        return self.load_plugin(name)
×
318

319
    def disable(self, name: str) -> None:
5✔
320
        self.config.set_key('enable_plugin_' + name, False, save=True)
×
321
        p = self.get(name)
×
322
        if not p:
×
323
            return
×
324
        self.plugins.pop(name)
×
325
        p.close()
×
326
        self.logger.info(f"closed {name}")
×
327

328
    @classmethod
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
        """Return True if `key` starts with the 'enable_plugin_' prefix."""
        return key.startswith('enable_plugin_')
×
331

332
    def toggle(self, name: str) -> Optional['BasePlugin']:
5✔
333
        p = self.get(name)
×
334
        return self.disable(name) if p else self.enable(name)
×
335

336
    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
5✔
337
        d = self.descriptions.get(name)
×
338
        if not d:
×
339
            return False
×
340
        deps = d.get('requires', [])
×
341
        for dep, s in deps:
×
342
            try:
×
343
                __import__(dep)
×
344
            except ImportError as e:
×
345
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
×
346
                return False
×
347
        requires = d.get('requires_wallet_type', [])
×
348
        return not requires or wallet.wallet_type in requires
×
349

350
    def get_hardware_support(self):
        """Return a list of HardwarePluginToScan, one per usable hardware plugin.

        Only entries of self.hw_wallets flagged gui_good are considered. A
        plugin that loads but reports is_available() false is left out; a plugin
        that raises is included with plugin=None and the exception attached.
        """
        scan_list = []
        for name, (gui_good, details) in self.hw_wallets.items():
            if not gui_good:
                continue
            description = details[2]
            try:
                plugin = self.get_plugin(name)
                if plugin.is_available():
                    entry = HardwarePluginToScan(name=name,
                                                 description=description,
                                                 plugin=plugin,
                                                 exception=None)
                    scan_list.append(entry)
            except Exception as e:
                self.logger.exception(f"cannot load plugin for: {name}")
                entry = HardwarePluginToScan(name=name,
                                             description=description,
                                             plugin=None,
                                             exception=e)
                scan_list.append(entry)
        return scan_list
×
368

369
    def register_wallet_type(self, name, gui_good, wallet_type):
        """Register `wallet_type` with the wallet module and install a lazy loader for it.

        The loader stored in plugin_loaders[wallet_type] fetches plugin `name`
        on demand and registers its wallet_class as the constructor.
        `gui_good` is accepted but not used here.
        """
        from .wallet import register_wallet_type, register_constructor
        self.logger.info(f"registering wallet type {(wallet_type, name)}")
        register_wallet_type(wallet_type)

        def loader():
            wallet_class = self.get_plugin(name).wallet_class
            register_constructor(wallet_type, wallet_class)

        plugin_loaders[wallet_type] = loader
5✔
378

379
    def register_keystore(self, name, gui_good, details):
        """Register a hardware keystore announced by plugin `name`.

        Nothing is registered unless details[0] == 'hardware'. Otherwise the
        plugin is recorded in self.hw_wallets and a constructor that resolves
        the plugin's keystore_class on demand is registered under details[1].
        """
        from .keystore import register_keystore
        if details[0] != 'hardware':
            return

        def dynamic_constructor(d):
            return self.get_plugin(name).keystore_class(d)

        self.hw_wallets[name] = (gui_good, details)
        self.logger.info(f"registering hardware {name}: {details}")
        register_keystore(details[1], dynamic_constructor)
5✔
388

389
    def get_plugin(self, name: str) -> 'BasePlugin':
5✔
390
        if name not in self.plugins:
5✔
391
            self.load_plugin(name)
5✔
392
        return self.plugins[name]
5✔
393

394
    def is_plugin_zip(self, name: str) -> bool:
5✔
395
        """Returns True if the plugin is a zip file"""
396
        if (metadata := self.get_metadata(name)) is None:
5✔
397
            return False
5✔
398
        return metadata.get('is_zip', False)
5✔
399

400
    def get_metadata(self, name: str) -> Optional[dict]:
5✔
401
        """Returns the metadata of the plugin"""
402
        metadata = self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name)
5✔
403
        if not metadata:
5✔
404
            return None
5✔
405
        return metadata
5✔
406

407
    def run(self):
        """Thread body: while is_running(), wait on wake_up_event (0.1 s timeout)
        and then run the registered jobs; call on_stop() once the loop ends."""
        while self.is_running():
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
            self.run_jobs()
        self.on_stop()
5✔
412

413

414
def get_file_hash256(path: str) -> str:
    """Get the sha256 hash of a file in hex, similar to `sha256sum`.

    The whole file is read into memory before hashing.
    """
    with open(path, 'rb') as fh:
        contents = fh.read()
        return sha256(contents).hex()
×
418

419
def hook(func):
    """Decorator: record func.__name__ in the module-level `hook_names` set.

    The function itself is returned unchanged.
    """
    hook_names.add(func.__name__)
    return func
5✔
422

423

424
def run_hook(name, *args):
    """Call every callback registered for hook `name` whose plugin is enabled.

    A callback that raises is logged and treated as having returned False.
    If no callback returns a truthy value, None is returned. Otherwise exactly
    one truthy result is expected (asserted) and returned.
    """
    truthy_results = []
    for plugin, callback in hooks.get(name, []):
        if not plugin.is_enabled():
            continue
        try:
            outcome = callback(*args)
        except Exception:
            _logger.exception(f"Plugin error. plugin: {plugin}, hook: {name}")
            outcome = False
        if outcome:
            truthy_results.append(outcome)

    if not truthy_results:
        return None
    assert len(truthy_results) == 1, truthy_results
    return truthy_results[0]
×
440

441

442
class BasePlugin(Logger):
    """Base class of plugin implementations.

    On construction, every attribute of the instance whose name is in the
    module-level `hook_names` set is appended to the `hooks` table as
    (plugin, bound attribute); close() removes those entries again.
    """

    def __init__(self, parent, config: 'SimpleConfig', name):
        """
        :param parent: the Plugins object that owns this plugin.
        :param config: configuration object, used e.g. by is_enabled().
        :param name: plugin name; also the suffix of its 'enable_plugin_' config key.
        """
        self.parent = parent  # type: Plugins  # The plugins object
        self.name = name
        self.config = config
        self.wallet = None  # fixme: this field should not exist
        Logger.__init__(self)
        # add self to hooks
        for k in dir(self):
            if k in hook_names:
                l = hooks.get(k, [])
                l.append((self, getattr(self, k)))
                hooks[k] = l

    def __str__(self):
        return self.name

    def close(self):
        """Unregister this plugin's hooks, detach it from the parent and call on_close()."""
        # remove self from hooks
        for attr_name in dir(self):
            if attr_name in hook_names:
                # found attribute in self that is also the name of a hook
                l = hooks.get(attr_name, [])
                try:
                    l.remove((self, getattr(self, attr_name)))
                except ValueError:
                    # maybe attr name just collided with hook name and was not hook
                    continue
                hooks[attr_name] = l
        self.parent.close_plugin(self)
        self.on_close()

    def on_close(self):
        """Called at the end of close(); does nothing by default."""
        pass

    def requires_settings(self) -> bool:
        return False

    def thread_jobs(self):
        """Jobs to be run by the parent Plugins thread; none by default."""
        return []

    def is_enabled(self):
        """True if the plugin is available and its 'enable_plugin_<name>' config value is exactly True."""
        return self.is_available() and self.config.get('enable_plugin_' + self.name) is True

    def is_available(self):
        return True

    def can_user_disable(self):
        return True

    def settings_widget(self, window):
        raise NotImplementedError()

    def settings_dialog(self, window):
        raise NotImplementedError()

    def read_file(self, filename: str) -> bytes:
        """Return the content of a file that ships with this plugin.

        :param filename: path of the file relative to the plugin's own directory.
        :return: the raw bytes of the file.

        Zip plugins are read from their archive (member '<plugin name>/<filename>').
        Other plugins are read from the external plugin directory if the plugin
        is registered as external, and from the bundled 'plugins' package
        directory otherwise.
        """
        import zipfile
        if self.parent.is_plugin_zip(self.name):
            plugin_filename = self.parent.zip_plugin_path(self.name)
            with zipfile.ZipFile(plugin_filename) as myzip:
                # member names inside a zip archive always use '/', on every OS,
                # so they must not be built with os.path.join
                with myzip.open(f'{self.name}/{filename}') as myfile:
                    return myfile.read()
        else:
            # The metadata dicts are keyed by *plugin name*, not by the requested
            # file name. Only plugins known to be external live in the external
            # directory; everything else (including internal plugins that are
            # not listed in the metadata) is bundled with the application.
            if self.name in self.parent.external_plugin_metadata:
                path = os.path.join(self.parent.get_external_plugin_dir(), self.name, filename)
            else:
                path = os.path.join(os.path.dirname(__file__), 'plugins', self.name, filename)
            with open(path, 'rb') as myfile:
                return myfile.read()
×
513

514

515
# Exception types of the hardware-device layer. Their raise sites are not part
# of this section of the file; the names are the only description available here.
# DeviceUnpairableError derives from UserFacingException, the other two from Exception.
class DeviceUnpairableError(UserFacingException): pass
class HardwarePluginLibraryUnavailable(Exception): pass
class CannotAutoSelectDevice(Exception): pass
5✔
518

519

520
class Device(NamedTuple):
    """Immutable record describing one hardware device.

    NOTE(review): presumably produced during device enumeration — confirm
    against DeviceMgr, which is outside this view.
    """
    path: Union[str, bytes]
    interface_number: int
    id_: str
    product_key: Any   # when using hid, often Tuple[int, int]
    usage_page: int
    transport_ui_string: str
5✔
527

528

529
class DeviceInfo(NamedTuple):
    """A Device together with optional details about it; every field except
    `device` defaults to None."""
    device: Device
    label: Optional[str] = None
    initialized: Optional[bool] = None
    exception: Optional[Exception] = None
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
5✔
537

538

539
class HardwarePluginToScan(NamedTuple):
    """Result entry of Plugins.get_hardware_support() for one hardware plugin.

    When the plugin could be loaded, `plugin` is set and `exception` is None;
    when loading failed, `plugin` is None and `exception` holds the error.
    """
    name: str
    description: str
    plugin: Optional['HW_PluginBase']
    exception: Optional[Exception]
5✔
544

545

546
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
5✔
547

548

549
# hidapi is not thread-safe
550
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
551
#     https://github.com/libusb/hidapi/issues/45
552
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
553
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
554
# It is not entirely clear to me, exactly what is safe and what isn't, when
555
# using multiple threads...
556
# Hence, we use a single thread for all device communications, including
557
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
558
# the following thread:
559
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
5✔
560
    max_workers=1,
561
    thread_name_prefix='hwd_comms_thread'
562
)
563

564
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
565
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
566
# To keep it simple, let's just import it now, as we are likely in the main thread here.
567
if threading.current_thread() is not threading.main_thread():
5✔
568
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
×
569
try:
5✔
570
    import hid
5✔
571
except ImportError:
5✔
572
    pass
5✔
573

574

575
T = TypeVar('T')
5✔
576

577

578
def run_in_hwd_thread(func: Callable[[], T]) -> T:
    """Run `func` on the hardware-device communication thread and return its result.

    If the caller is already on a thread whose name starts with
    "hwd_comms_thread", `func` is called directly; otherwise it is submitted to
    the single-worker executor and the call blocks until it has finished.
    """
    already_on_hwd_thread = threading.current_thread().name.startswith("hwd_comms_thread")
    if already_on_hwd_thread:
        return func()
    future = _hwd_comms_executor.submit(func)
    return future.result()
585

586

587
def runs_in_hwd_thread(func):
    """Decorator: route every call of `func` through run_in_hwd_thread()."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        bound_call = partial(func, *args, **kwargs)
        return run_in_hwd_thread(bound_call)
    return wrapper
5✔
592

593

594
def assert_runs_in_hwd_thread():
    """Raise an Exception unless the current thread's name starts with "hwd_comms_thread"."""
    current_name = threading.current_thread().name
    if current_name.startswith("hwd_comms_thread"):
        return
    raise Exception("must only be called from HWD communication thread")
×
597

598

599
class DeviceMgr(ThreadJob):
5✔
600
    """Manages hardware clients.  A client communicates over a hardware
601
    channel with the device.
602

603
    In addition to tracking device HID IDs, the device manager tracks
604
    hardware wallets and manages wallet pairing.  A HID ID may be
605
    paired with a wallet when it is confirmed that the hardware device
606
    matches the wallet, i.e. they have the same master public key.  A
607
    HID ID can be unpaired if e.g. it is wiped.
608

609
    Because of hotplugging, a wallet must request its client
610
    dynamically each time it is required, rather than caching it
611
    itself.
612

613
    The device manager is shared across plugins, so just one place
614
    does hardware scans when needed.  By tracking HID IDs, if a device
615
    is plugged into a different port the wallet is automatically
616
    re-paired.
617

618
    Wallets are informed on connect / disconnect events.  It must
619
    implement connected(), disconnected() callbacks.  Being connected
620
    implies a pairing.  Callbacks can happen in any thread context,
621
    and we do them without holding the lock.
622

623
    Confusingly, the HID ID (serial number) reported by the HID system
624
    doesn't match the device ID reported by the device itself.  We use
625
    the HID IDs.
626

627
    This plugin is thread-safe.  Currently only devices supported by
628
    hidapi are implemented."""
629

630
    def __init__(self, config: SimpleConfig):
        """Create an empty device manager.

        :param config: stored as ``self.config``; run() reads the session
            timeout from it.
        """
        ThreadJob.__init__(self)

        self.config = config
        # Re-entrant lock guarding the attributes marked "Needs self.lock".
        self.lock = threading.RLock()

        # pairing_code -> id_. An entry exists only while the pairing is active.
        # Needs self.lock.
        self.pairing_code_to_id = dict()  # type: Dict[str, str]
        # client -> id_. Needs self.lock.
        self.clients = dict()  # type: Dict[HardwareClientBase, str]

        # Hardware we recognise, keyed by (vendor_id, product_id) -> plugin ...
        self._recognised_hardware = dict()  # type: Dict[Tuple[int, int], HW_PluginBase]
        # ... or keyed by vendor_id alone -> plugin.
        self._recognised_vendor = dict()  # type: Dict[int, HW_PluginBase]
        # Custom enumerate functions for devices we don't know about.
        # Needs self.lock.
        self._enumerate_func = set()
5✔
645

646
    def thread_jobs(self):
        """Return the jobs to run periodically: just this manager, whose run() handles device timeouts."""
        jobs = [self]
        return jobs
5✔
649

650
    def run(self):
        """Handle device timeouts.

        Runs in the context of the Plugins thread. Every registered client is
        passed the cutoff timestamp (now minus the configured session timeout)
        through its ``timeout()`` method.
        """
        # Snapshot under the lock; the clients are called without holding it.
        with self.lock:
            snapshot = tuple(self.clients)
        cutoff = time.time() - self.config.get_session_timeout()
        for hw_client in snapshot:
            hw_client.timeout(cutoff)
×
658

659
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
        """Map every key in *device_pairs* to *plugin* in the recognised-hardware table.

        The table is looked up by ``(vendor_id, product_id)`` during HID scans.
        """
        self._recognised_hardware.update((pair, plugin) for pair in device_pairs)
×
662

663
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
        """Map every vendor id in *vendor_ids* to *plugin* in the recognised-vendor table.

        HID scans consult this table when the exact (vendor_id, product_id)
        pair is not registered.
        """
        self._recognised_vendor.update((vid, plugin) for vid in vendor_ids)
×
666

667
    def register_enumerate_func(self, func):
        """Register *func* as a custom device enumerator.

        scan_devices() calls each registered function with no arguments and
        extends its device list with the result.
        """
        with self.lock:
            registry = self._enumerate_func
            registry.add(func)
×
670

671
    @runs_in_hwd_thread
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
        """Return the client for *device*, creating and registering one if needed.

        A client already registered under ``device.id_`` is returned as-is.
        Otherwise *plugin* is asked to create one; a falsy result from the
        plugin is returned unchanged and nothing is registered.
        """
        # Get from cache first
        cached = self._client_by_id(device.id_)
        if cached:
            return cached

        fresh = plugin.create_client(device, handler)
        if not fresh:
            return fresh

        self.logger.info(f"Registering {fresh}")
        with self.lock:
            self.clients[fresh] = device.id_
        return fresh
×
684

685
    def id_by_pairing_code(self, pairing_code):
        """Return the device id paired with *pairing_code*, or None if there is no such pairing."""
        with self.lock:
            paired_id = self.pairing_code_to_id.get(pairing_code)
        return paired_id
×
688

689
    def pairing_code_by_id(self, id_):
        """Return the first pairing code whose paired device id equals *id_*, or None."""
        with self.lock:
            matching_codes = (
                code
                for code, paired_id in self.pairing_code_to_id.items()
                if paired_id == id_
            )
            return next(matching_codes, None)
×
695

696
    def unpair_pairing_code(self, pairing_code):
        """Drop the pairing for *pairing_code* and close the client of the paired device.

        Does nothing when *pairing_code* is not currently paired.
        """
        with self.lock:
            try:
                _id = self.pairing_code_to_id.pop(pairing_code)
            except KeyError:
                # not paired
                return
        # The client is closed after the lock has been released.
        self._close_client(_id)
×
702

703
    def unpair_id(self, id_):
        """Remove any pairing that points at device *id_* and close its client.

        When no pairing code maps to *id_*, only the client is closed.
        """
        pairing_code = self.pairing_code_by_id(id_)
        if not pairing_code:
            self._close_client(id_)
            return
        self.unpair_pairing_code(pairing_code)
×
709

710
    def _close_client(self, id_):
        """Unregister the client registered under *id_* (if any) and close it."""
        with self.lock:
            stale = self._client_by_id(id_)
            # popping with a default is harmless when nothing was found
            self.clients.pop(stale, None)
        if not stale:
            return
        # close() is called without holding the lock
        stale.close()
×
716

717
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
        """Return the first registered client whose device id equals *id_*, or None."""
        with self.lock:
            candidates = (
                client
                for client, client_id in self.clients.items()
                if client_id == id_
            )
            return next(candidates, None)
×
723

724
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
        """Return the client registered for device ID *id_*, if any.

        If a device is wiped or in bootloader mode pairing is impossible;
        in such cases we communicate by device ID and not wallet.

        :param scan_now: when true, scan_devices() is run first so that the
            client registry is refreshed before the lookup.
        """
        if scan_now:
            # only the side effect on the registry matters here
            self.scan_devices()
        found = self._client_by_id(id_)
        return found
×
731

732
    @runs_in_hwd_thread
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
                            keystore: 'Hardware_KeyStore',
                            force_pair: bool, *,
                            devices: Optional[Sequence['Device']] = None,
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
        """Return a client for *keystore*, pairing with a device if requested.

        The lookup goes through these stages, stopping at the first hit:

        1. among already registered clients, by the keystore's pairing code
           (only when *devices* is None or empty);
        2. among *devices* (scanned now if *devices* is None), which may
           create a client for the paired device;
        3. if *force_pair* is set: select a device and pair with it.

        :param handler: must not be None.
        :param force_pair: whether to try selecting and pairing a device when
            no paired client was found.
        :param devices: devices to consider; None triggers a scan when needed.
        :param allow_user_interaction: forwarded to select_device().
        :return: the client, or None if none was found (including the case
            where select_device() raised CannotAutoSelectDevice).
        :raises Exception: if *handler* is None. Exceptions from
            select_device() other than CannotAutoSelectDevice, and from
            force_pair_keystore(), propagate.
        """
        self.logger.info("getting client for keystore")
        if handler is None:
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
        # Status is reset first and set back to True only if a client is found.
        handler.update_status(False)
        pcode = keystore.pairing_code()
        client = None
        # search existing clients first (fast-path)
        if not devices:
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=[])
        # search clients again, now allowing a (slow) scan
        if client is None:
            if devices is None:
                devices = self.scan_devices()
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=devices)
        if client is None and force_pair:
            try:
                info = self.select_device(plugin, handler, keystore, devices,
                                          allow_user_interaction=allow_user_interaction)
            except CannotAutoSelectDevice:
                # no device could be picked without asking the user: leave client as None
                pass
            else:
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
        if client:
            handler.update_status(True)
            # note: if select_device was called, we might also update label etc here:
            keystore.opportunistically_fill_in_missing_info_from_device(client)
        self.logger.info("end client for keystore")
        return client
×
766

767
    def client_by_pairing_code(
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
        devices: Sequence['Device'],
    ) -> Optional['HardwareClientBase']:
        """Return the client of the device currently paired with *pairing_code*.

        An already registered client is reused (with its handler replaced by
        *handler*), but only if its plugin has exactly the same type as
        *plugin*; otherwise None is returned. If no client is registered, a
        client is created for the first entry of *devices* whose id matches
        the paired id. Returns None when nothing matches.
        """
        paired_id = self.id_by_pairing_code(pairing_code)
        existing = self._client_by_id(paired_id)
        if existing:
            if type(existing.plugin) != type(plugin):
                return None
            # An unpaired client might have another wallet's handler
            # from a prior scan.  Replace to fix dialog parenting.
            existing.handler = handler
            return existing

        matching_device = next((dev for dev in devices if dev.id_ == paired_id), None)
        if matching_device is None:
            return None
        return self.create_client(matching_device, handler, plugin)
×
784

785
    def force_pair_keystore(
        self,
        *,
        plugin: 'HW_PluginBase',
        handler: 'HardwareHandlerBase',
        info: 'DeviceInfo',
        keystore: 'Hardware_KeyStore',
    ) -> 'HardwareClientBase':
        """Pair *keystore* with the device described by *info* and return its client.

        The pairing is recorded only if the registered client for the device
        is pairable, belongs to a plugin of exactly the same type as *plugin*,
        and reports the same xpub as the keystore for the keystore's
        derivation prefix.

        :raises DeviceUnpairableError: if there is no suitable client, the
            xpub differs, or fetching the xpub raised UserCancelled or
            RuntimeError.
        """
        expected_xpub = keystore.xpub
        derivation = keystore.get_derivation_prefix()
        assert derivation is not None
        xtype = bip32.xpub_type(expected_xpub)

        client = self._client_by_id(info.device.id_)
        suitable = client and client.is_pairable() and type(client.plugin) == type(plugin)
        if suitable:
            # Replace the handler, as client_by_pairing_code() does, since the
            # client might carry another wallet's handler.
            client.handler = handler
            # This will trigger a PIN/passphrase entry request
            try:
                device_xpub = client.get_xpub(derivation, xtype)
            except (UserCancelled, RuntimeError):
                # Bad / cancelled PIN / passphrase
                device_xpub = None
            if device_xpub == expected_xpub:
                keystore.opportunistically_fill_in_missing_info_from_device(client)
                with self.lock:
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
                return client

        # The user input has wrong PIN or passphrase, or cancelled input,
        # or it is not pairable
        raise DeviceUnpairableError(
            _('Electrum cannot pair with your {}.\n\n'
              'Before you request bitcoins to be sent to addresses in this '
              'wallet, ensure you can pair with your device, or that you have '
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
              'receive will be unspendable.').format(plugin.device))
821

822
    def list_pairable_device_infos(
        self,
        *,
        handler: Optional['HardwareHandlerBase'],
        plugin: 'HW_PluginBase',
        devices: Sequence['Device'] = None,
        include_failing_clients: bool = False,
    ) -> List['DeviceInfo']:
        """Return one DeviceInfo per connected device that *plugin* accepts.

        Already paired devices are included too, as it is okay to reuse them.

        :param devices: devices to inspect; scanned now when None.
        :param include_failing_clients: when true, a device whose client could
            not be created or queried still gets a DeviceInfo, carrying the
            exception instead of the device details.
        :raises HardwarePluginLibraryUnavailable: if the plugin reports that
            its libraries are not available.
        """
        if not plugin.libraries_available:
            raise HardwarePluginLibraryUnavailable(plugin.get_library_not_available_message())
        if devices is None:
            devices = self.scan_devices()

        infos = []
        for device in devices:
            if not plugin.can_recognize_device(device):
                continue
            try:
                client = self.create_client(device, handler, plugin)
                if not client:
                    continue
                details = dict(
                    label=client.label(),
                    initialized=client.is_initialized(),
                    soft_device_id=client.get_soft_device_id(),
                    model_name=client.device_model_name(),
                )
            except Exception as e:
                # best effort: one failing device must not hide the others
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
                if include_failing_clients:
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
            else:
                infos.append(DeviceInfo(device=device, plugin_name=plugin.name, **details))

        return infos
×
863

864
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
                      keystore: 'Hardware_KeyStore', devices: Optional[Sequence['Device']] = None,
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
        """Select the device to use for keystore.

        First waits until at least one pairable device is listed, asking the
        user to plug one in and retry. Then tries to pick a device
        automatically (by soft_device_id, by unique non-placeholder label, or
        because it is the only device and the keystore has no useful
        label/soft_device_id). If that fails, the user is asked to choose.

        :param devices: devices to consider on the first attempt; None means
            they are scanned by list_pairable_device_infos().
        :param allow_user_interaction: when false, no question is ever shown
            to the user.
        :return: the DeviceInfo of the selected device.
        :raises CannotAutoSelectDevice: if user interaction is not allowed
            and either no device is available or none could be auto-selected.
        :raises UserCancelled: if the user declines to retry, or cancels the
            manual selection.
        """
        # ideally this should not be called from the GUI thread...
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
        while True:
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
            if infos:
                break
            if not allow_user_interaction:
                raise CannotAutoSelectDevice()
            msg = _('Please insert your {}').format(plugin.device)
            msg += " ("
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
                msg += f"label: {keystore.label}, "
            msg += f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}"
            msg += ').\n\n{}\n\n{}'.format(
                _('Verify the cable is connected and that '
                  'no other application is using it.'),
                _('Try to connect again?')
            )
            if not handler.yes_no_question(msg):
                raise UserCancelled()
            # forget the caller-supplied list so that the next attempt rescans
            devices = None

        # select device automatically. (but only if we have reasonable expectation it is the correct one)
        # method 1: select device by id
        if keystore.soft_device_id:
            for info in infos:
                if info.soft_device_id == keystore.soft_device_id:
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
                    return info
        # method 2: select device by label
        #           but only if not a placeholder label and only if there is no collision
        device_labels = [info.label for info in infos]
        if (keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
                and device_labels.count(keystore.label) == 1):
            for info in infos:
                if info.label == keystore.label:
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
                    return info
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
        #           saved for keystore anyway, select it
        if (len(infos) == 1
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
                and keystore.soft_device_id is None):
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
            return infos[0]

        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
        if not allow_user_interaction:
            raise CannotAutoSelectDevice()
        # ask user to select device manually
        msg = (
                _("Could not automatically pair with device for given keystore.") + "\n"
                + f"(keystore label: {keystore.label!r}, "
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
        msg += _("Please select which {} device to use:").format(plugin.device)
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
        # one human-readable line per candidate, in the same order as infos
        descriptions = ["{label} ({maybe_model}{init}, {transport})"
                        .format(label=info.label or _("An unnamed {}").format(info.plugin_name),
                                init=(_("initialized") if info.initialized else _("wiped")),
                                transport=info.device.transport_ui_string,
                                maybe_model=f"{info.model_name}, " if info.model_name else "")
                        for info in infos]
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
                          f"num options: {len(infos)}. options: {infos}")
        # c is used as an index into infos; None is treated as "cancelled"
        c = handler.query_choice(msg, descriptions)
        if c is None:
            raise UserCancelled()
        info = infos[c]
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
        # note: updated label/soft_device_id will be saved after pairing succeeds
        return info
×
939

940
    @runs_in_hwd_thread
    def _scan_devices_with_hid(self) -> List['Device']:
        """Return the recognised devices among the entries of ``hid.enumerate(0, 0)``.

        An entry is recognised if its (vendor_id, product_id) pair is
        registered, or failing that, if its vendor_id is registered. The
        owning plugin turns the entry into a Device; entries for which the
        plugin returns a falsy value are skipped. Returns an empty list when
        the ``hid`` module cannot be imported.
        """
        try:
            import hid
        except ImportError:
            return []

        found = []
        for d in hid.enumerate(0, 0):
            vendor_id = d['vendor_id']
            product_key = (vendor_id, d['product_id'])
            # an exact product match takes precedence over a vendor-wide match
            if product_key in self._recognised_hardware:
                plugin = self._recognised_hardware[product_key]
            elif vendor_id in self._recognised_vendor:
                plugin = self._recognised_vendor[vendor_id]
            else:
                continue
            if not plugin:
                continue
            device = plugin.create_device_from_hid_enumeration(d, product_key=product_key)
            if device:
                found.append(device)
        return found
×
961

962
    @runs_in_hwd_thread
    @profiler
    def scan_devices(self) -> Sequence['Device']:
        """Scan for connected devices and refresh the client registry.

        Devices come from the HID scan plus every registered custom enumerate
        function; an enumerate function that raises is logged and skipped.
        Registered clients whose device id is no longer present, or that
        report no usable connection, are dropped from the registry, unpaired,
        and their handler (if any) is told the status is False.

        :return: the devices found by this scan.
        """
        self.logger.info("scanning devices...")

        # First see what's connected that we know about
        devices = self._scan_devices_with_hid()

        # Let plugin handlers enumerate devices we don't know about
        with self.lock:
            enumerate_funcs = tuple(self._enumerate_func)
        for f in enumerate_funcs:
            try:
                new_devices = f()
            except BaseException as e:
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
                continue
            devices.extend(new_devices)

        # find out what was disconnected
        present_ids = [dev.id_ for dev in devices]
        still_connected = {}
        dropped = []
        with self.lock:
            for client, id_ in self.clients.items():
                # the connection is only probed for clients whose device was seen
                alive = id_ in present_ids and client.has_usable_connection_with_device()
                if alive:
                    still_connected[client] = id_
                else:
                    dropped.append((client, id_))
            self.clients = still_connected

        # Unpair disconnected devices
        for client, id_ in dropped:
            self.unpair_id(id_)
            if client.handler:
                client.handler.update_status(False)

        return devices
×
1000

1001
    @classmethod
    def version_info(cls) -> Mapping[str, Optional[str]]:
        """Report the versions of the optional USB/HID libraries.

        Keys of the returned mapping:

        - ``"libusb.version"``: dotted version from ``usb1.getVersion()``,
          or None if ``usb1`` cannot be imported.
        - ``"libusb.path"``: only present when ``usb1`` was imported; None if
          the library name attribute is missing.
        - ``"hidapi.version"``: ``hid.__version__``, else the installed
          package metadata version of "hidapi", else None.
        """
        info = {}

        # add libusb
        try:
            import usb1
        except Exception:
            info["libusb.version"] = None
        else:
            info["libusb.version"] = ".".join(str(part) for part in usb1.getVersion()[:4])
            try:
                libusb_path = usb1.libusb1.libusb._name
            except AttributeError:
                libusb_path = None
            info["libusb.path"] = libusb_path

        # add hidapi
        try:
            import hid
            hidapi_version = hid.__version__  # available starting with 0.12.0.post2
        except Exception:
            from importlib.metadata import version
            try:
                hidapi_version = version("hidapi")
            except ImportError:
                hidapi_version = None
        info["hidapi.version"] = hidapi_version

        return info
×
1026

1027
    def trigger_pairings(
            self,
            keystores: Sequence['KeyStore'],
            *,
            allow_user_interaction: bool = True,
            devices: Sequence['Device'] = None,
    ) -> None:
        """Try to pair each hardware keystore in *keystores* with a connected device.

        For a multisig wallet this is more user-friendly than pairing each
        keystore on its own. Example: a 2-of-3 wallet with hw keystores
        ks1, ks2, ks3, where only devices d2 (for ks2) and d3 (for ks3) are
        connected and nothing is paired yet.

        - Pairing individually might start with ks1. It cannot be paired
          automatically, so the user may be asked to manually pair ks1 with
          d2 or d3, which is confusing and error-prone, especially for devices
          without labels (such as Ledger), where same-type devices are hard
          to tell apart. (see #4199)
        - This method instead makes a first pass without user interaction,
          auto-pairing ks2-d2 and ks3-d3, and only then (if allowed) a second
          pass with user interaction for what is left.

        Keystores that are not Hardware_KeyStore instances are ignored.
        UserCancelled raised while pairing a keystore is swallowed, and the
        remaining keystores are still tried.

        :param devices: devices to consider; scanned now when None.
        """
        from .keystore import Hardware_KeyStore

        hw_keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
        if not hw_keystores:
            return
        if devices is None:
            devices = self.scan_devices()

        # Pass 1 pairs everything that can be auto-selected; pass 2 (optional)
        # does the manual selections.
        interaction_passes = (False, True) if allow_user_interaction else (False,)
        for interactive in interaction_passes:
            for ks in hw_keystores:
                try:
                    ks.get_client(
                        force_pair=True,
                        allow_user_interaction=interactive,
                        devices=devices,
                    )
                except UserCancelled:
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc