• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 4835092796801024

19 Mar 2025 10:20AM UTC coverage: 61.117% (-0.02%) from 61.133%
4835092796801024

Pull #9648

CirrusCI

ecdsa
config vars for plugins
Pull Request #9648: config vars for plugins

3 of 5 new or added lines in 1 file covered. (60.0%)

3 existing lines in 3 files now uncovered.

21353 of 34938 relevant lines covered (61.12%)

3.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

41.15
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import json
5✔
27
import os
5✔
28
import pkgutil
5✔
29
import importlib.util
5✔
30
import time
5✔
31
import threading
5✔
32
import traceback
5✔
33
import sys
5✔
34
import aiohttp
5✔
35
import zipfile as zipfile_lib
5✔
36

37
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
5✔
38
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
39
import concurrent
5✔
40
import zipimport
5✔
41
from concurrent import futures
5✔
42
from functools import wraps, partial
5✔
43
from itertools import chain
5✔
44

45
from .i18n import _
5✔
46
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException)
5✔
47
from . import bip32
5✔
48
from . import plugins
5✔
49
from .simple_config import ConfigVar, SimpleConfig
5✔
50
from .logging import get_logger, Logger
5✔
51
from .crypto import sha256
5✔
52

53
if TYPE_CHECKING:
5✔
54
    from .plugins.hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
55
    from .keystore import Hardware_KeyStore, KeyStore
×
56
    from .wallet import Abstract_Wallet
×
57

58

59
_logger = get_logger(__name__)
5✔
60
plugin_loaders = {}
5✔
61
hook_names = set()
5✔
62
hooks = {}
5✔
63
_root_permission_cache = {}
5✔
64

65

66
class Plugins(DaemonThread):
5✔
67

68
    LOGGING_SHORTCUT = 'p'
5✔
69
    pkgpath = os.path.dirname(plugins.__file__)
5✔
70

71
    @profiler
5✔
72
    def __init__(self, config: SimpleConfig, gui_name = None, cmd_only: bool = False):
5✔
73
        self.config = config
5✔
74
        self.cmd_only = cmd_only  # type: bool
5✔
75
        self.internal_plugin_metadata = {}
5✔
76
        self.external_plugin_metadata = {}
5✔
77
        if cmd_only:
5✔
78
            # only import the command modules of plugins
79
            Logger.__init__(self)
×
80
            self.find_plugins()
×
81
            self.load_plugins()
×
82
            return
×
83
        DaemonThread.__init__(self)
5✔
84
        self.device_manager = DeviceMgr(config)
5✔
85
        self.name = 'Plugins'  # set name of thread
5✔
86
        self.hw_wallets = {}
5✔
87
        self.plugins = {}  # type: Dict[str, BasePlugin]
5✔
88
        self.gui_name = gui_name
5✔
89
        self.find_plugins()
5✔
90
        self.load_plugins()
5✔
91
        self.add_jobs(self.device_manager.thread_jobs())
5✔
92
        self.start()
5✔
93

94
    @property
5✔
95
    def descriptions(self):
5✔
96
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
97

98
    def find_directory_plugins(self, pkg_path: str, external: bool):
5✔
99
        """Finds plugins in directory form from the given pkg_path and populates the metadata dicts"""
100
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
5✔
101
        for loader, name, ispkg in iter_modules:
5✔
102
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
103
            #       once as data and once as code. To honor the "no duplicates" rule below,
104
            #       we exclude the ones packaged as *code*, here:
105
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
5✔
106
                continue
×
107
            module_path = os.path.join(pkg_path, name)
5✔
108
            if external and not self._has_recursive_root_permissions(module_path):
5✔
109
                self.logger.info(f"Not loading plugin {module_path}: directory has user write permissions")
×
110
                continue
×
111
            if self.cmd_only and not self.config.get('enable_plugin_' + name) is True:
5✔
112
                continue
×
113
            try:
5✔
114
                with open(os.path.join(module_path, 'manifest.json'), 'r') as f:
5✔
115
                    d = json.load(f)
5✔
116
            except FileNotFoundError:
5✔
117
                self.logger.info(f"could not find manifest.json of plugin {name}, skipping...")
5✔
118
                continue
5✔
119
            if 'fullname' not in d:
5✔
120
                continue
×
121
            d['display_name'] = d['fullname']
5✔
122
            d['path'] = module_path
5✔
123
            if not self.cmd_only:
5✔
124
                gui_good = self.gui_name in d.get('available_for', [])
5✔
125
                if not gui_good:
5✔
126
                    continue
5✔
127
                details = d.get('registers_wallet_type')
5✔
128
                if details:
5✔
129
                    self.register_wallet_type(name, gui_good, details)
5✔
130
                details = d.get('registers_keystore')
5✔
131
                if details:
5✔
132
                    self.register_keystore(name, gui_good, details)
5✔
133
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
5✔
134
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
×
135
                raise Exception(f"duplicate plugins? for {name=}")
×
136
            if not external:
5✔
137
                self.internal_plugin_metadata[name] = d
5✔
138
            else:
139
                self.external_plugin_metadata[name] = d
×
140

141
    @staticmethod
5✔
142
    def exec_module_from_spec(spec, path):
5✔
143
        try:
5✔
144
            module = importlib.util.module_from_spec(spec)
5✔
145
            # sys.modules needs to be modified for relative imports to work
146
            # see https://stackoverflow.com/a/50395128
147
            sys.modules[path] = module
5✔
148
            spec.loader.exec_module(module)
5✔
149
        except Exception as e:
×
150
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
×
151
        return module
5✔
152

153
    def find_plugins(self):
5✔
154
        internal_plugins_path = (self.pkgpath, False)
5✔
155
        external_plugins_path = (self.get_external_plugin_dir(), True)
5✔
156
        for pkg_path, external in (internal_plugins_path, external_plugins_path):
5✔
157
            # external plugins enforce root permissions on the directory
158
            if pkg_path and os.path.exists(pkg_path):
5✔
159
                self.find_directory_plugins(pkg_path=pkg_path, external=external)
5✔
160
                self.find_zip_plugins(pkg_path=pkg_path, external=external)
5✔
161

162
    def load_plugins(self):
5✔
163
        for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
5✔
164
            if not d.get('requires_wallet_type') and self.config.get('enable_plugin_' + name):
5✔
165
                try:
×
166
                    if self.cmd_only:  # only load init method to register commands
×
167
                        self.maybe_load_plugin_init_method(name)
×
168
                    else:
169
                        self.load_plugin_by_name(name)
×
170
                except BaseException as e:
×
171
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
172

173
    def _has_root_permissions(self, path):
5✔
174
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
175

176
    @profiler(min_threshold=0.5)
5✔
177
    def _has_recursive_root_permissions(self, path):
5✔
178
        """Check if a directory and all its subdirectories have root permissions"""
179
        global _root_permission_cache
180
        if _root_permission_cache.get(path) is not None:
×
181
            return _root_permission_cache[path]
×
182
        _root_permission_cache[path] = False
×
183
        for root, dirs, files in os.walk(path):
×
184
            if not self._has_root_permissions(root):
×
185
                return False
×
186
            for f in files:
×
187
                if not self._has_root_permissions(os.path.join(root, f)):
×
188
                    return False
×
189
        _root_permission_cache[path] = True
×
190
        return True
×
191

192
    def get_external_plugin_dir(self):
5✔
193
        if sys.platform not in ['linux', 'darwin'] and not sys.platform.startswith('freebsd'):
5✔
194
            return
×
195
        pkg_path = '/opt/electrum_plugins'
5✔
196
        if not os.path.exists(pkg_path):
5✔
197
            self.logger.info(f'directory {pkg_path} does not exist')
5✔
198
            return
5✔
199
        if not self._has_root_permissions(pkg_path):
×
200
            self.logger.info(f'not loading {pkg_path}: directory has user write permissions')
×
201
            return
×
202
        return pkg_path
×
203

204
    def zip_plugin_path(self, name):
5✔
205
        filename = self.get_metadata(name)['filename']
×
206
        if name in self.internal_plugin_metadata:
×
207
            pkg_path = self.pkgpath
×
208
        else:
209
            pkg_path = self.get_external_plugin_dir()
×
210
        return os.path.join(pkg_path, filename)
×
211

212
    def find_zip_plugins(self, pkg_path: str, external: bool):
5✔
213
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts"""
214
        if pkg_path is None:
5✔
215
            return
×
216
        for filename in os.listdir(pkg_path):
5✔
217
            path = os.path.join(pkg_path, filename)
5✔
218
            if not filename.endswith('.zip'):
5✔
219
                continue
5✔
220
            if external and not self._has_root_permissions(path):
×
221
                self.logger.info(f'not loading {path}: file has user write permissions')
×
222
                continue
×
223
            try:
×
224
                zipfile = zipimport.zipimporter(path)
×
225
            except zipimport.ZipImportError:
×
226
                self.logger.exception(f"unable to load zip plugin '{filename}'")
×
227
                continue
×
228
            for name, b in pkgutil.iter_zipimport_modules(zipfile):
×
229
                if b is False:
×
230
                    continue
×
231
                if name in self.internal_plugin_metadata:
×
232
                    raise Exception(f"duplicate plugins for name={name}")
×
233
                if name in self.external_plugin_metadata:
×
234
                    raise Exception(f"duplicate plugins for name={name}")
×
235
                if self.cmd_only and not self.config.get('enable_plugin_' + name):
×
236
                    continue
×
237
                try:
×
238
                    with zipfile_lib.ZipFile(path) as file:
×
239
                        manifest_path = os.path.join(name, 'manifest.json')
×
240
                        with file.open(manifest_path, 'r') as f:
×
241
                            d = json.load(f)
×
242
                except Exception:
×
243
                    self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
×
244
                    continue
×
245
                d['filename'] = filename
×
246
                d['is_zip'] = True
×
247
                d['path'] = path
×
248
                if not self.cmd_only:
×
249
                    gui_good = self.gui_name in d.get('available_for', [])
×
250
                    if not gui_good:
×
251
                        continue
×
252
                    if 'fullname' not in d:
×
253
                        continue
×
254
                    d['display_name'] = d['fullname']
×
255
                    d['zip_hash_sha256'] = get_file_hash256(path)
×
256
                if external:
×
257
                    self.external_plugin_metadata[name] = d
×
258
                else:
259
                    self.internal_plugin_metadata[name] = d
×
260

261
    def get(self, name):
5✔
262
        return self.plugins.get(name)
×
263

264
    def count(self):
5✔
265
        return len(self.plugins)
×
266

267
    def load_plugin(self, name) -> 'BasePlugin':
5✔
268
        """Imports the code of the given plugin.
269
        note: can be called from any thread.
270
        """
271
        if self.get_metadata(name):
5✔
272
            return self.load_plugin_by_name(name)
5✔
273
        else:
274
            raise Exception(f"could not find plugin {name!r}")
×
275

276
    def maybe_load_plugin_init_method(self, name: str) -> None:
5✔
277
        """Loads the __init__.py module of the plugin if it is not already loaded."""
278
        is_external = name in self.external_plugin_metadata
5✔
279
        base_name = (f'electrum_external_plugins.' if is_external else 'electrum.plugins.') + name
5✔
280
        if base_name not in sys.modules:
5✔
281
            metadata = self.get_metadata(name)
5✔
282
            is_zip = metadata.get('is_zip', False)
5✔
283
            # if the plugin was not enabled on startup the init module hasn't been loaded yet
284
            if not is_zip:
5✔
285
                if is_external:
5✔
286
                    path = os.path.join(metadata['path'], '__init__.py')
×
287
                    init_spec = importlib.util.spec_from_file_location(base_name, path)
×
288
                else:
289
                    init_spec = importlib.util.find_spec(base_name)
5✔
290
            else:
291
                zipfile = zipimport.zipimporter(metadata['path'])
×
292
                init_spec = zipfile.find_spec(name)
×
293
            module = self.exec_module_from_spec(init_spec, base_name)
5✔
294
            # import config vars
295
            if hasattr(module, 'config_vars'):
5✔
NEW
296
                for cv in module.config_vars:
×
NEW
297
                    setattr(SimpleConfig, cv.key().upper(), cv)
×
298
            if name == "trustedcoin":
5✔
299
                # removes trustedcoin after loading to not show it in the list of plugins
300
                del self.internal_plugin_metadata[name]
×
301

302
    def load_plugin_by_name(self, name: str) -> 'BasePlugin':
5✔
303
        if name in self.plugins:
5✔
304
            return self.plugins[name]
×
305

306
        # if the plugin was not enabled on startup the init module hasn't been loaded yet
307
        self.maybe_load_plugin_init_method(name)
5✔
308

309
        is_external = name in self.external_plugin_metadata
5✔
310
        if not is_external:
5✔
311
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
5✔
312
        else:
313
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
×
314

315
        spec = importlib.util.find_spec(full_name)
5✔
316
        if spec is None:
5✔
317
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
318
        try:
5✔
319
            module = self.exec_module_from_spec(spec, full_name)
5✔
320
            plugin = module.Plugin(self, self.config, name)
5✔
321
        except Exception as e:
×
322
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
×
323
        self.add_jobs(plugin.thread_jobs())
5✔
324
        self.plugins[name] = plugin
5✔
325
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
5✔
326
        return plugin
5✔
327

328
    def close_plugin(self, plugin):
5✔
329
        self.remove_jobs(plugin.thread_jobs())
×
330

331
    def enable(self, name: str) -> 'BasePlugin':
5✔
332
        self.config.set_key('enable_plugin_' + name, True, save=True)
×
333
        p = self.get(name)
×
334
        if p:
×
335
            return p
×
336
        return self.load_plugin(name)
×
337

338
    def disable(self, name: str) -> None:
5✔
339
        self.config.set_key('enable_plugin_' + name, False, save=True)
×
340
        p = self.get(name)
×
341
        if not p:
×
342
            return
×
343
        self.plugins.pop(name)
×
344
        p.close()
×
345
        self.logger.info(f"closed {name}")
×
346

347
    @classmethod
5✔
348
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
5✔
349
        return key.startswith('enable_plugin_')
×
350

351
    def toggle(self, name: str) -> Optional['BasePlugin']:
5✔
352
        p = self.get(name)
×
353
        return self.disable(name) if p else self.enable(name)
×
354

355
    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
5✔
356
        d = self.descriptions.get(name)
×
357
        if not d:
×
358
            return False
×
359
        deps = d.get('requires', [])
×
360
        for dep, s in deps:
×
361
            try:
×
362
                __import__(dep)
×
363
            except ImportError as e:
×
364
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
×
365
                return False
×
366
        requires = d.get('requires_wallet_type', [])
×
367
        return not requires or wallet.wallet_type in requires
×
368

369
    def get_hardware_support(self):
5✔
370
        out = []
×
371
        for name, (gui_good, details) in self.hw_wallets.items():
×
372
            if gui_good:
×
373
                try:
×
374
                    p = self.get_plugin(name)
×
375
                    if p.is_available():
×
376
                        out.append(HardwarePluginToScan(name=name,
×
377
                                                        description=details[2],
378
                                                        plugin=p,
379
                                                        exception=None))
380
                except Exception as e:
×
381
                    self.logger.exception(f"cannot load plugin for: {name}")
×
382
                    out.append(HardwarePluginToScan(name=name,
×
383
                                                    description=details[2],
384
                                                    plugin=None,
385
                                                    exception=e))
386
        return out
×
387

388
    def register_wallet_type(self, name, gui_good, wallet_type):
5✔
389
        from .wallet import register_wallet_type, register_constructor
5✔
390
        self.logger.info(f"registering wallet type {(wallet_type, name)}")
5✔
391

392
        def loader():
5✔
393
            plugin = self.get_plugin(name)
5✔
394
            register_constructor(wallet_type, plugin.wallet_class)
5✔
395
        register_wallet_type(wallet_type)
5✔
396
        plugin_loaders[wallet_type] = loader
5✔
397

398
    def register_keystore(self, name, gui_good, details):
5✔
399
        from .keystore import register_keystore
5✔
400

401
        def dynamic_constructor(d):
5✔
402
            return self.get_plugin(name).keystore_class(d)
5✔
403
        if details[0] == 'hardware':
5✔
404
            self.hw_wallets[name] = (gui_good, details)
5✔
405
            self.logger.info(f"registering hardware {name}: {details}")
5✔
406
            register_keystore(details[1], dynamic_constructor)
5✔
407

408
    def get_plugin(self, name: str) -> 'BasePlugin':
5✔
409
        if name not in self.plugins:
5✔
410
            self.load_plugin(name)
5✔
411
        return self.plugins[name]
5✔
412

413
    def is_plugin_zip(self, name: str) -> bool:
5✔
414
        """Returns True if the plugin is a zip file"""
415
        if (metadata := self.get_metadata(name)) is None:
×
416
            return False
×
417
        return metadata.get('is_zip', False)
×
418

419
    def get_metadata(self, name: str) -> Optional[dict]:
5✔
420
        """Returns the metadata of the plugin"""
421
        metadata = self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name)
5✔
422
        if not metadata:
5✔
423
            return None
×
424
        return metadata
5✔
425

426
    def run(self):
5✔
427
        while self.is_running():
5✔
428
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
5✔
429
            self.run_jobs()
5✔
430
        self.on_stop()
5✔
431

432

433
def get_file_hash256(path: str) -> str:
5✔
434
    '''Get the sha256 hash of a file in hex, similar to `sha256sum`.'''
435
    with open(path, 'rb') as f:
×
436
        return sha256(f.read()).hex()
×
437

438
def hook(func):
5✔
439
    hook_names.add(func.__name__)
5✔
440
    return func
5✔
441

442

443
def run_hook(name, *args):
5✔
444
    results = []
5✔
445
    f_list = hooks.get(name, [])
5✔
446
    for p, f in f_list:
5✔
447
        if p.is_enabled():
5✔
448
            try:
5✔
449
                r = f(*args)
5✔
450
            except Exception:
×
451
                _logger.exception(f"Plugin error. plugin: {p}, hook: {name}")
×
452
                r = False
×
453
            if r:
5✔
454
                results.append(r)
×
455

456
    if results:
5✔
457
        assert len(results) == 1, results
×
458
        return results[0]
×
459

460

461
class BasePlugin(Logger):
5✔
462

463
    def __init__(self, parent, config: 'SimpleConfig', name):
5✔
464
        self.parent = parent  # type: Plugins  # The plugins object
5✔
465
        self.name = name
5✔
466
        self.config = config
5✔
467
        self.wallet = None  # fixme: this field should not exist
5✔
468
        Logger.__init__(self)
5✔
469
        # add self to hooks
470
        for k in dir(self):
5✔
471
            if k in hook_names:
5✔
472
                l = hooks.get(k, [])
5✔
473
                l.append((self, getattr(self, k)))
5✔
474
                hooks[k] = l
5✔
475

476
    def __str__(self):
5✔
477
        return self.name
×
478

479
    def close(self):
5✔
480
        # remove self from hooks
481
        for attr_name in dir(self):
×
482
            if attr_name in hook_names:
×
483
                # found attribute in self that is also the name of a hook
484
                l = hooks.get(attr_name, [])
×
485
                try:
×
486
                    l.remove((self, getattr(self, attr_name)))
×
487
                except ValueError:
×
488
                    # maybe attr name just collided with hook name and was not hook
489
                    continue
×
490
                hooks[attr_name] = l
×
491
        self.parent.close_plugin(self)
×
492
        self.on_close()
×
493

494
    def on_close(self):
5✔
495
        pass
×
496

497
    def requires_settings(self) -> bool:
5✔
498
        return False
×
499

500
    def thread_jobs(self):
5✔
501
        return []
5✔
502

503
    def is_enabled(self):
5✔
504
        return self.is_available() and self.config.get('enable_plugin_' + self.name) is True
×
505

506
    def is_available(self):
5✔
507
        return True
×
508

509
    def can_user_disable(self):
5✔
510
        return True
×
511

512
    def settings_widget(self, window):
5✔
513
        raise NotImplementedError()
×
514

515
    def settings_dialog(self, window):
5✔
516
        raise NotImplementedError()
×
517

518
    def read_file(self, filename: str) -> bytes:
5✔
519
        if self.parent.is_plugin_zip(self.name):
×
520
            plugin_filename = self.parent.zip_plugin_path(self.name)
×
521
            with zipfile_lib.ZipFile(plugin_filename) as myzip:
×
522
                with myzip.open(os.path.join(self.name, filename)) as myfile:
×
523
                    return myfile.read()
×
524
        else:
525
            if self.name in self.parent.internal_plugin_metadata:
×
526
                path = os.path.join(os.path.dirname(__file__), 'plugins', self.name, filename)
×
527
            else:
528
                path = os.path.join(self.parent.get_external_plugin_dir(), self.name, filename)
×
529
            with open(path, 'rb') as myfile:
×
530
                return myfile.read()
×
531

532

533
class DeviceUnpairableError(UserFacingException): pass
5✔
534
class HardwarePluginLibraryUnavailable(Exception): pass
5✔
535
class CannotAutoSelectDevice(Exception): pass
5✔
536

537

538
class Device(NamedTuple):
5✔
539
    path: Union[str, bytes]
5✔
540
    interface_number: int
5✔
541
    id_: str
5✔
542
    product_key: Any   # when using hid, often Tuple[int, int]
5✔
543
    usage_page: int
5✔
544
    transport_ui_string: str
5✔
545

546

547
class DeviceInfo(NamedTuple):
5✔
548
    device: Device
5✔
549
    label: Optional[str] = None
5✔
550
    initialized: Optional[bool] = None
5✔
551
    exception: Optional[Exception] = None
5✔
552
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
5✔
553
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
5✔
554
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
5✔
555

556

557
class HardwarePluginToScan(NamedTuple):
5✔
558
    name: str
5✔
559
    description: str
5✔
560
    plugin: Optional['HW_PluginBase']
5✔
561
    exception: Optional[Exception]
5✔
562

563

564
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
5✔
565

566

567
# hidapi is not thread-safe
568
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
569
#     https://github.com/libusb/hidapi/issues/45
570
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
571
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
572
# It is not entirely clear to me, exactly what is safe and what isn't, when
573
# using multiple threads...
574
# Hence, we use a single thread for all device communications, including
575
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
576
# the following thread:
577
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
5✔
578
    max_workers=1,
579
    thread_name_prefix='hwd_comms_thread'
580
)
581

582
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
583
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
584
# To keep it simple, let's just import it now, as we are likely in the main thread here.
585
if threading.current_thread() is not threading.main_thread():
5✔
586
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
×
587
try:
5✔
588
    import hid
5✔
589
except ImportError:
5✔
590
    pass
5✔
591

592

593
T = TypeVar('T')
5✔
594

595

596
def run_in_hwd_thread(func: Callable[[], T]) -> T:
5✔
597
    if threading.current_thread().name.startswith("hwd_comms_thread"):
×
598
        return func()
×
599
    else:
600
        fut = _hwd_comms_executor.submit(func)
×
601
        return fut.result()
×
602
        #except (concurrent.futures.CancelledError, concurrent.futures.TimeoutError) as e:
603

604

605
def runs_in_hwd_thread(func):
5✔
606
    @wraps(func)
5✔
607
    def wrapper(*args, **kwargs):
5✔
608
        return run_in_hwd_thread(partial(func, *args, **kwargs))
×
609
    return wrapper
5✔
610

611

612
def assert_runs_in_hwd_thread():
5✔
613
    if not threading.current_thread().name.startswith("hwd_comms_thread"):
×
614
        raise Exception("must only be called from HWD communication thread")
×
615

616

617
class DeviceMgr(ThreadJob):
5✔
618
    """Manages hardware clients.  A client communicates over a hardware
619
    channel with the device.
620

621
    In addition to tracking device HID IDs, the device manager tracks
622
    hardware wallets and manages wallet pairing.  A HID ID may be
623
    paired with a wallet when it is confirmed that the hardware device
624
    matches the wallet, i.e. they have the same master public key.  A
625
    HID ID can be unpaired if e.g. it is wiped.
626

627
    Because of hotplugging, a wallet must request its client
628
    dynamically each time it is required, rather than caching it
629
    itself.
630

631
    The device manager is shared across plugins, so just one place
632
    does hardware scans when needed.  By tracking HID IDs, if a device
633
    is plugged into a different port the wallet is automatically
634
    re-paired.
635

636
    Wallets are informed on connect / disconnect events.  It must
637
    implement connected(), disconnected() callbacks.  Being connected
638
    implies a pairing.  Callbacks can happen in any thread context,
639
    and we do them without holding the lock.
640

641
    Confusingly, the HID ID (serial number) reported by the HID system
642
    doesn't match the device ID reported by the device itself.  We use
643
    the HID IDs.
644

645
    This plugin is thread-safe.  Currently only devices supported by
646
    hidapi are implemented."""
647

648
    def __init__(self, config: SimpleConfig):
5✔
649
        ThreadJob.__init__(self)
5✔
650
        # A pairing_code->id_ map. Item only present if we have active pairing. Needs self.lock.
651
        self.pairing_code_to_id = {}  # type: Dict[str, str]
5✔
652
        # A client->id_ map. Needs self.lock.
653
        self.clients = {}  # type: Dict[HardwareClientBase, str]
5✔
654
        # What we recognise.  (vendor_id, product_id) -> Plugin
655
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
5✔
656
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]  # vendor_id -> Plugin
5✔
657
        # Custom enumerate functions for devices we don't know about.
658
        self._enumerate_func = set()  # Needs self.lock.
5✔
659

660
        self.lock = threading.RLock()
5✔
661

662
        self.config = config
5✔
663

664
    def thread_jobs(self):
5✔
665
        # Thread job to handle device timeouts
666
        return [self]
5✔
667

668
    def run(self):
5✔
669
        '''Handle device timeouts.  Runs in the context of the Plugins
670
        thread.'''
671
        with self.lock:
5✔
672
            clients = list(self.clients.keys())
5✔
673
        cutoff = time.time() - self.config.get_session_timeout()
5✔
674
        for client in clients:
5✔
675
            client.timeout(cutoff)
×
676

677
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
5✔
678
        for pair in device_pairs:
×
679
            self._recognised_hardware[pair] = plugin
×
680

681
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
5✔
682
        for vendor_id in vendor_ids:
×
683
            self._recognised_vendor[vendor_id] = plugin
×
684

685
    def register_enumerate_func(self, func):
5✔
686
        with self.lock:
×
687
            self._enumerate_func.add(func)
×
688

689
    @runs_in_hwd_thread
5✔
690
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
5✔
691
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
692
        # Get from cache first
693
        client = self._client_by_id(device.id_)
×
694
        if client:
×
695
            return client
×
696
        client = plugin.create_client(device, handler)
×
697
        if client:
×
698
            self.logger.info(f"Registering {client}")
×
699
            with self.lock:
×
700
                self.clients[client] = device.id_
×
701
        return client
×
702

703
    def id_by_pairing_code(self, pairing_code):
5✔
704
        with self.lock:
×
705
            return self.pairing_code_to_id.get(pairing_code)
×
706

707
    def pairing_code_by_id(self, id_):
5✔
708
        with self.lock:
×
709
            for pairing_code, id2 in self.pairing_code_to_id.items():
×
710
                if id2 == id_:
×
711
                    return pairing_code
×
712
            return None
×
713

714
    def unpair_pairing_code(self, pairing_code):
5✔
715
        with self.lock:
×
716
            if pairing_code not in self.pairing_code_to_id:
×
717
                return
×
718
            _id = self.pairing_code_to_id.pop(pairing_code)
×
719
        self._close_client(_id)
×
720

721
    def unpair_id(self, id_):
5✔
722
        pairing_code = self.pairing_code_by_id(id_)
×
723
        if pairing_code:
×
724
            self.unpair_pairing_code(pairing_code)
×
725
        else:
726
            self._close_client(id_)
×
727

728
    def _close_client(self, id_):
5✔
729
        with self.lock:
×
730
            client = self._client_by_id(id_)
×
731
            self.clients.pop(client, None)
×
732
        if client:
×
733
            client.close()
×
734

735
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
5✔
736
        with self.lock:
×
737
            for client, client_id in self.clients.items():
×
738
                if client_id == id_:
×
739
                    return client
×
740
        return None
×
741

742
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
5✔
743
        '''Returns a client for the device ID if one is registered.  If
744
        a device is wiped or in bootloader mode pairing is impossible;
745
        in such cases we communicate by device ID and not wallet.'''
746
        if scan_now:
×
747
            self.scan_devices()
×
748
        return self._client_by_id(id_)
×
749

750
    @runs_in_hwd_thread
5✔
751
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
5✔
752
                            keystore: 'Hardware_KeyStore',
753
                            force_pair: bool, *,
754
                            devices: Sequence['Device'] = None,
755
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
756
        self.logger.info("getting client for keystore")
×
757
        if handler is None:
×
758
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
×
759
        handler.update_status(False)
×
760
        pcode = keystore.pairing_code()
×
761
        client = None
×
762
        # search existing clients first (fast-path)
763
        if not devices:
×
764
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=[])
×
765
        # search clients again, now allowing a (slow) scan
766
        if client is None:
×
767
            if devices is None:
×
768
                devices = self.scan_devices()
×
769
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=devices)
×
770
        if client is None and force_pair:
×
771
            try:
×
772
                info = self.select_device(plugin, handler, keystore, devices,
×
773
                                          allow_user_interaction=allow_user_interaction)
774
            except CannotAutoSelectDevice:
×
775
                pass
×
776
            else:
777
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
×
778
        if client:
×
779
            handler.update_status(True)
×
780
            # note: if select_device was called, we might also update label etc here:
781
            keystore.opportunistically_fill_in_missing_info_from_device(client)
×
782
        self.logger.info("end client for keystore")
×
783
        return client
×
784

785
    def client_by_pairing_code(
5✔
786
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
787
        devices: Sequence['Device'],
788
    ) -> Optional['HardwareClientBase']:
789
        _id = self.id_by_pairing_code(pairing_code)
×
790
        client = self._client_by_id(_id)
×
791
        if client:
×
792
            if type(client.plugin) != type(plugin):
×
793
                return
×
794
            # An unpaired client might have another wallet's handler
795
            # from a prior scan.  Replace to fix dialog parenting.
796
            client.handler = handler
×
797
            return client
×
798

799
        for device in devices:
×
800
            if device.id_ == _id:
×
801
                return self.create_client(device, handler, plugin)
×
802

803
    def force_pair_keystore(
5✔
804
        self,
805
        *,
806
        plugin: 'HW_PluginBase',
807
        handler: 'HardwareHandlerBase',
808
        info: 'DeviceInfo',
809
        keystore: 'Hardware_KeyStore',
810
    ) -> 'HardwareClientBase':
811
        xpub = keystore.xpub
×
812
        derivation = keystore.get_derivation_prefix()
×
813
        assert derivation is not None
×
814
        xtype = bip32.xpub_type(xpub)
×
815
        client = self._client_by_id(info.device.id_)
×
816
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
×
817
            # See comment above for same code
818
            client.handler = handler
×
819
            # This will trigger a PIN/passphrase entry request
820
            try:
×
821
                client_xpub = client.get_xpub(derivation, xtype)
×
822
            except (UserCancelled, RuntimeError):
×
823
                # Bad / cancelled PIN / passphrase
824
                client_xpub = None
×
825
            if client_xpub == xpub:
×
826
                keystore.opportunistically_fill_in_missing_info_from_device(client)
×
827
                with self.lock:
×
828
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
×
829
                return client
×
830

831
        # The user input has wrong PIN or passphrase, or cancelled input,
832
        # or it is not pairable
833
        raise DeviceUnpairableError(
×
834
            _('Electrum cannot pair with your {}.\n\n'
835
              'Before you request bitcoins to be sent to addresses in this '
836
              'wallet, ensure you can pair with your device, or that you have '
837
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
838
              'receive will be unspendable.').format(plugin.device))
839

840
    def list_pairable_device_infos(
5✔
841
        self,
842
        *,
843
        handler: Optional['HardwareHandlerBase'],
844
        plugin: 'HW_PluginBase',
845
        devices: Sequence['Device'] = None,
846
        include_failing_clients: bool = False,
847
    ) -> List['DeviceInfo']:
848
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
849
        Already paired devices are also included, as it is okay to reuse them.
850
        """
851
        if not plugin.libraries_available:
×
852
            message = plugin.get_library_not_available_message()
×
853
            raise HardwarePluginLibraryUnavailable(message)
×
854
        if devices is None:
×
855
            devices = self.scan_devices()
×
856
        infos = []
×
857
        for device in devices:
×
858
            if not plugin.can_recognize_device(device):
×
859
                continue
×
860
            try:
×
861
                client = self.create_client(device, handler, plugin)
×
862
                if not client:
×
863
                    continue
×
864
                label = client.label()
×
865
                is_initialized = client.is_initialized()
×
866
                soft_device_id = client.get_soft_device_id()
×
867
                model_name = client.device_model_name()
×
868
            except Exception as e:
×
869
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
×
870
                if include_failing_clients:
×
871
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
×
872
                continue
×
873
            infos.append(DeviceInfo(device=device,
×
874
                                    label=label,
875
                                    initialized=is_initialized,
876
                                    plugin_name=plugin.name,
877
                                    soft_device_id=soft_device_id,
878
                                    model_name=model_name))
879

880
        return infos
×
881

882
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
5✔
883
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
884
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
885
        """Select the device to use for keystore."""
886
        # ideally this should not be called from the GUI thread...
887
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
888
        while True:
×
889
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
×
890
            if infos:
×
891
                break
×
892
            if not allow_user_interaction:
×
893
                raise CannotAutoSelectDevice()
×
894
            msg = _('Please insert your {}').format(plugin.device)
×
895
            msg += " ("
×
896
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
×
897
                msg += f"label: {keystore.label}, "
×
898
            msg += f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}"
×
899
            msg += ').\n\n{}\n\n{}'.format(
×
900
                _('Verify the cable is connected and that '
901
                  'no other application is using it.'),
902
                _('Try to connect again?')
903
            )
904
            if not handler.yes_no_question(msg):
×
905
                raise UserCancelled()
×
906
            devices = None
×
907

908
        # select device automatically. (but only if we have reasonable expectation it is the correct one)
909
        # method 1: select device by id
910
        if keystore.soft_device_id:
×
911
            for info in infos:
×
912
                if info.soft_device_id == keystore.soft_device_id:
×
913
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
×
914
                    return info
×
915
        # method 2: select device by label
916
        #           but only if not a placeholder label and only if there is no collision
917
        device_labels = [info.label for info in infos]
×
918
        if (keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
×
919
                and device_labels.count(keystore.label) == 1):
920
            for info in infos:
×
921
                if info.label == keystore.label:
×
922
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
×
923
                    return info
×
924
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
925
        #           saved for keystore anyway, select it
926
        if (len(infos) == 1
×
927
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
928
                and keystore.soft_device_id is None):
929
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
×
930
            return infos[0]
×
931

932
        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
×
933
        if not allow_user_interaction:
×
934
            raise CannotAutoSelectDevice()
×
935
        # ask user to select device manually
936
        msg = (
×
937
                _("Could not automatically pair with device for given keystore.") + "\n"
938
                + f"(keystore label: {keystore.label!r}, "
939
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
940
        msg += _("Please select which {} device to use:").format(plugin.device)
×
941
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
×
942
        descriptions = ["{label} ({maybe_model}{init}, {transport})"
×
943
                        .format(label=info.label or _("An unnamed {}").format(info.plugin_name),
944
                                init=(_("initialized") if info.initialized else _("wiped")),
945
                                transport=info.device.transport_ui_string,
946
                                maybe_model=f"{info.model_name}, " if info.model_name else "")
947
                        for info in infos]
948
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
×
949
                          f"num options: {len(infos)}. options: {infos}")
950
        c = handler.query_choice(msg, descriptions)
×
951
        if c is None:
×
952
            raise UserCancelled()
×
953
        info = infos[c]
×
954
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
×
955
        # note: updated label/soft_device_id will be saved after pairing succeeds
956
        return info
×
957

958
    @runs_in_hwd_thread
5✔
959
    def _scan_devices_with_hid(self) -> List['Device']:
5✔
960
        try:
×
961
            import hid
×
962
        except ImportError:
×
963
            return []
×
964

965
        devices = []
×
966
        for d in hid.enumerate(0, 0):
×
967
            vendor_id = d['vendor_id']
×
968
            product_key = (vendor_id, d['product_id'])
×
969
            plugin = None
×
970
            if product_key in self._recognised_hardware:
×
971
                plugin = self._recognised_hardware[product_key]
×
972
            elif vendor_id in self._recognised_vendor:
×
973
                plugin = self._recognised_vendor[vendor_id]
×
974
            if plugin:
×
975
                device = plugin.create_device_from_hid_enumeration(d, product_key=product_key)
×
976
                if device:
×
977
                    devices.append(device)
×
978
        return devices
×
979

980
    @runs_in_hwd_thread
5✔
981
    @profiler
5✔
982
    def scan_devices(self) -> Sequence['Device']:
5✔
983
        self.logger.info("scanning devices...")
×
984

985
        # First see what's connected that we know about
986
        devices = self._scan_devices_with_hid()
×
987

988
        # Let plugin handlers enumerate devices we don't know about
989
        with self.lock:
×
990
            enumerate_funcs = list(self._enumerate_func)
×
991
        for f in enumerate_funcs:
×
992
            try:
×
993
                new_devices = f()
×
994
            except BaseException as e:
×
995
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
×
996
            else:
997
                devices.extend(new_devices)
×
998

999
        # find out what was disconnected
1000
        client_ids = [dev.id_ for dev in devices]
×
1001
        disconnected_clients = []
×
1002
        with self.lock:
×
1003
            connected = {}
×
1004
            for client, id_ in self.clients.items():
×
1005
                if id_ in client_ids and client.has_usable_connection_with_device():
×
1006
                    connected[client] = id_
×
1007
                else:
1008
                    disconnected_clients.append((client, id_))
×
1009
            self.clients = connected
×
1010

1011
        # Unpair disconnected devices
1012
        for client, id_ in disconnected_clients:
×
1013
            self.unpair_id(id_)
×
1014
            if client.handler:
×
1015
                client.handler.update_status(False)
×
1016

1017
        return devices
×
1018

1019
    @classmethod
5✔
1020
    def version_info(cls) -> Mapping[str, Optional[str]]:
5✔
1021
        ret = {}
×
1022
        # add libusb
1023
        try:
×
1024
            import usb1
×
1025
        except Exception as e:
×
1026
            ret["libusb.version"] = None
×
1027
        else:
1028
            ret["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
×
1029
            try:
×
1030
                ret["libusb.path"] = usb1.libusb1.libusb._name
×
1031
            except AttributeError:
×
1032
                ret["libusb.path"] = None
×
1033
        # add hidapi
1034
        try:
×
1035
            import hid
×
1036
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
×
1037
        except Exception as e:
×
1038
            from importlib.metadata import version
×
1039
            try:
×
1040
                ret["hidapi.version"] = version("hidapi")
×
1041
            except ImportError:
×
1042
                ret["hidapi.version"] = None
×
1043
        return ret
×
1044

1045
    def trigger_pairings(
5✔
1046
            self,
1047
            keystores: Sequence['KeyStore'],
1048
            *,
1049
            allow_user_interaction: bool = True,
1050
            devices: Sequence['Device'] = None,
1051
    ) -> None:
1052
        """Given a list of keystores, try to pair each with a connected hardware device.
1053

1054
        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
1055
        try to pair each keystore individually. Consider the following scenario:
1056
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
1057
        - assume none of the devices are paired yet
1058
        1. if we tried to individually pair keystores, we might try with ks1 first
1059
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
1060
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
1061
             which is confusing and error-prone. It's especially problematic if the hw device does
1062
             not support labels (such as Ledger), as then the user cannot easily distinguish
1063
             same-type devices. (see #4199)
1064
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
1065
           and then tell the user ks1 could not be paired (and there are no devices left to try)
1066
        """
1067
        from .keystore import Hardware_KeyStore
×
1068
        keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
×
1069
        if not keystores:
×
1070
            return
×
1071
        if devices is None:
×
1072
            devices = self.scan_devices()
×
1073
        # first pair with all devices that can be auto-selected
1074
        for ks in keystores:
×
1075
            try:
×
1076
                ks.get_client(
×
1077
                    force_pair=True,
1078
                    allow_user_interaction=False,
1079
                    devices=devices,
1080
                )
1081
            except UserCancelled:
×
1082
                pass
×
1083
        if allow_user_interaction:
×
1084
            # now do manual selections
1085
            for ks in keystores:
×
1086
                try:
×
1087
                    ks.get_client(
×
1088
                        force_pair=True,
1089
                        allow_user_interaction=True,
1090
                        devices=devices,
1091
                    )
1092
                except UserCancelled:
×
1093
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc