• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 6293355014914048

13 Jan 2025 12:01PM UTC coverage: 60.566% (-0.07%) from 60.635%
6293355014914048

push

CirrusCI

web-flow
Merge pull request #9418 from SomberNight/202501_bump_min_python

bump min python to 3.10

4 of 6 new or added lines in 4 files covered. (66.67%)

6 existing lines in 5 files now uncovered.

20112 of 33207 relevant lines covered (60.57%)

3.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

39.52
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import os
5✔
27
import pkgutil
5✔
28
import importlib.util
5✔
29
import time
5✔
30
import threading
5✔
31
import traceback
5✔
32
import sys
5✔
33
import aiohttp
5✔
34

35
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
5✔
36
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
37
import concurrent
5✔
38
import zipimport
5✔
39
from concurrent import futures
5✔
40
from functools import wraps, partial
5✔
41

42
from .i18n import _
5✔
43
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException)
5✔
44
from . import bip32
5✔
45
from . import plugins
5✔
46
from .simple_config import SimpleConfig
5✔
47
from .logging import get_logger, Logger
5✔
48
from .crypto import sha256
5✔
49

50
if TYPE_CHECKING:
5✔
51
    from .plugins.hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
52
    from .keystore import Hardware_KeyStore, KeyStore
×
53
    from .wallet import Abstract_Wallet
×
54

55

56
_logger = get_logger(__name__)
# wallet_type -> zero-argument loader callable; filled by Plugins.register_wallet_type
plugin_loaders = {}
# names of the functions that were decorated with @hook
hook_names = set()
# hook name -> list of (plugin instance, bound method); maintained by BasePlugin
hooks = {}
5✔
60

61

62
class Plugins(DaemonThread):
5✔
63

64
    LOGGING_SHORTCUT = 'p'
5✔
65
    pkgpath = os.path.dirname(plugins.__file__)
5✔
66

67
    @profiler
    def __init__(self, config: SimpleConfig, gui_name):
        """Set up the registries, discover and load plugins, then start the thread.

        Discovery (internal first, then external) happens before loading; the
        device manager's thread jobs are added right before the thread starts.
        """
        DaemonThread.__init__(self)
        self.name = 'Plugins'  # set name of thread
        self.config = config
        self.gui_name = gui_name
        # registries filled in by the find_* / register_* / load_* methods
        self.hw_wallets = {}
        self.plugins = {}  # type: Dict[str, BasePlugin]
        self.internal_plugin_metadata = {}
        self.external_plugin_metadata = {}
        self.device_manager = DeviceMgr(config)
        for discover in (self.find_internal_plugins, self.find_external_plugins):
            discover()
        self.load_plugins()
        self.add_jobs(self.device_manager.thread_jobs())
        self.start()
5✔
83

84
    @property
5✔
85
    def descriptions(self):
5✔
86
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
87

88
    def find_internal_plugins(self):
        """Populates self.internal_plugin_metadata

        Every module found under `self.pkgpath` is executed; its module dict is
        kept as metadata when it defines 'fullname', is available for the
        current GUI and does not set 'requires_wallet_type'. Wallet types and
        keystores declared by a module are registered along the way.
        """
        # note: this local must keep its name, it is printed via `{iter_modules=}` below
        iter_modules = list(pkgutil.iter_modules([self.pkgpath]))
        for finder, name, _ispkg in iter_modules:
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
            #       once as data and once as code. To honor the "no duplicates" rule below,
            #       we exclude the ones packaged as *code*, here:
            if finder.__class__.__qualname__ == "PyiFrozenImporter":
                continue
            full_name = f'electrum.plugins.{name}'
            spec = importlib.util.find_spec(full_name)
            if spec is None:  # pkgutil found it but importlib can't ?!
                raise Exception(f"Error pre-loading {full_name}: no spec")
            d = self.exec_module_from_spec(spec, full_name).__dict__
            if 'fullname' not in d:
                continue
            d['display_name'] = d['fullname']
            gui_good = self.gui_name in d.get('available_for', [])
            if not gui_good:
                continue
            wallet_type = d.get('registers_wallet_type')
            if wallet_type:
                self.register_wallet_type(name, gui_good, wallet_type)
            keystore_details = d.get('registers_keystore')
            if keystore_details:
                self.register_keystore(name, gui_good, keystore_details)
            if d.get('requires_wallet_type'):
                # trustedcoin will not be added to list
                continue
            if name in self.internal_plugin_metadata:
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
                raise Exception(f"duplicate plugins? for {name=}")
            self.internal_plugin_metadata[name] = d
5✔
123

124
    def exec_module_from_spec(self, spec, path):
        """Create a module from `spec`, register it as `sys.modules[path]` and execute it.

        Returns the executed module. Any failure is re-raised as an
        `Exception` ("Error pre-loading ...") chained to the original error.
        On failure the `sys.modules` entry is rolled back, so a half-executed
        module is never left behind for later imports to pick up.
        """
        missing = object()
        previous = sys.modules.get(path, missing)
        try:
            module = importlib.util.module_from_spec(spec)
            # sys.modules needs to be modified for relative imports to work
            # see https://stackoverflow.com/a/50395128
            sys.modules[path] = module
            spec.loader.exec_module(module)
        except Exception as e:
            # undo our registration: restore whatever was there before, if anything
            if previous is missing:
                sys.modules.pop(path, None)
            else:
                sys.modules[path] = previous
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
        return module
5✔
134

135
    def load_plugins(self):
        """Load the enabled internal plugins, then the enabled external ones."""
        for load_group in (self.load_internal_plugins, self.load_external_plugins):
            load_group()
5✔
138

139
    def load_internal_plugins(self):
        """Load each internal plugin whose 'enable_plugin_<name>' config key is set.

        Plugins that declare 'requires_wallet_type' are skipped. A failure to
        load one plugin is logged and does not stop the others.
        """
        for name, d in self.internal_plugin_metadata.items():
            if d.get('requires_wallet_type'):
                continue
            if not self.config.get('enable_plugin_' + name):
                continue
            try:
                self.load_internal_plugin(name)
            except BaseException as e:
                self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
146

147
    def load_external_plugin(self, name):
        """Load (or return the already loaded) external plugin `name`.

        Returns the plugin instance, or None when no metadata is known for
        `name`. Raises RuntimeError if the plugin has no implementation for
        the current GUI.
        """
        if name in self.plugins:
            return self.plugins[name]
        # If we do not have the metadata, it was not detected by `find_external_plugins`
        # on startup, or added by manual user installation after that point.
        metadata = self.external_plugin_metadata.get(name)
        if metadata is None:
            # not inside an exception handler, so log a plain error
            # (logger.exception would append a meaningless "NoneType: None" traceback)
            self.logger.error(f"attempted to load unknown external plugin {name}")
            return None
        full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
        spec = importlib.util.find_spec(full_name)
        if spec is None:
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
        module = self.exec_module_from_spec(spec, full_name)
        plugin = module.Plugin(self, self.config, name)
        self.add_jobs(plugin.thread_jobs())
        self.plugins[name] = plugin
        self.logger.info(f"loaded external plugin {name}")
        return plugin
×
166

167
    def _has_root_permissions(self, path):
5✔
168
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
169

170
    def get_external_plugin_dir(self):
        """Return the external plugin directory, or None if it must not be used.

        None is returned on platforms other than linux/darwin/freebsd, when
        '/opt/electrum_plugins' does not exist, or when that directory fails
        the `_has_root_permissions` check.
        """
        if sys.platform not in ['linux', 'darwin'] and not sys.platform.startswith('freebsd'):
            return None
        pkg_path = '/opt/electrum_plugins'
        if not os.path.exists(pkg_path):
            self.logger.info(f'directory {pkg_path} does not exist')
            return None
        if not self._has_root_permissions(pkg_path):
            self.logger.info(f'not loading {pkg_path}: directory has user write permissions')
            return None
        return pkg_path
×
181

182
    def external_plugin_path(self, name):
        """Return the path of the file that external plugin `name` was found in.

        Joins the external plugin directory with the 'filename' stored in the
        plugin's metadata. Raises KeyError for an unknown `name`.
        """
        filename = self.external_plugin_metadata[name]['filename']
        plugin_dir = self.get_external_plugin_dir()
        return os.path.join(plugin_dir, filename)
×
186

187
    def find_external_plugins(self):
        """Populate self.external_plugin_metadata from zip files in the external plugin dir.

        Files failing the `_has_root_permissions` check or not importable as
        zip archives are skipped (and logged). Each top-level package in a zip
        is executed; its module dict is stored as metadata when it is available
        for the current GUI and defines 'fullname'. A name that clashes with an
        already known plugin raises.
        """
        plugin_dir = self.get_external_plugin_dir()
        if plugin_dir is None:
            return
        for filename in os.listdir(plugin_dir):
            path = os.path.join(plugin_dir, filename)
            if not self._has_root_permissions(path):
                self.logger.info(f'not loading {path}: file has user write permissions')
                continue
            try:
                importer = zipimport.zipimporter(path)
            except zipimport.ZipImportError:
                self.logger.exception(f"unable to load zip plugin '{filename}'")
                continue
            for name, is_pkg in pkgutil.iter_zipimport_modules(importer):
                if is_pkg is False:
                    continue
                already_known = (name in self.internal_plugin_metadata
                                 or name in self.external_plugin_metadata)
                if already_known:
                    raise Exception(f"duplicate plugins for name={name}")
                spec = importer.find_spec(name)
                module = self.exec_module_from_spec(spec, f'electrum_external_plugins.{name}')
                d = module.__dict__
                if self.gui_name not in d.get('available_for', []):
                    continue
                d['filename'] = filename
                if 'fullname' not in d:
                    continue
                d['display_name'] = d['fullname']
                d['zip_hash_sha256'] = get_file_hash256(path)
                self.external_plugin_metadata[name] = d
×
221

222
    def load_external_plugins(self):
        """Load each external plugin whose 'enable_plugin_<name>' config key is set.

        A failure to load one plugin is logged (with traceback) and does not
        stop the others.
        """
        for name in self.external_plugin_metadata:
            if not self.config.get('enable_plugin_' + name):
                continue
            try:
                self.load_external_plugin(name)
            except BaseException as e:
                # logger.exception already records the traceback; do not
                # additionally dump it to stdout
                self.logger.exception(f"cannot initialize plugin {name} {e!r}")
×
230

231
    def get(self, name):
        """Return the loaded plugin called `name`, or None if it is not loaded."""
        try:
            return self.plugins[name]
        except KeyError:
            return None
×
233

234
    def count(self):
        """Return how many plugins are currently loaded."""
        loaded = self.plugins
        return len(loaded)
×
236

237
    def load_plugin(self, name) -> 'BasePlugin':
        """Imports the code of the given plugin.
        note: can be called from any thread.

        Internal metadata is consulted before external metadata; an unknown
        name raises.
        """
        if name in self.internal_plugin_metadata:
            return self.load_internal_plugin(name)
        if name in self.external_plugin_metadata:
            return self.load_external_plugin(name)
        raise Exception(f"could not find plugin {name!r}")
×
247

248
    def load_internal_plugin(self, name) -> 'BasePlugin':
        """Load (or return the already loaded) internal plugin `name`.

        Raises RuntimeError if there is no implementation for the current GUI;
        any error while executing the module or constructing its `Plugin` is
        re-raised as a chained `Exception`.
        """
        try:
            return self.plugins[name]
        except KeyError:
            pass
        module_name = f'electrum.plugins.{name}.{self.gui_name}'
        spec = importlib.util.find_spec(module_name)
        if spec is None:
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
        try:
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            plugin = module.Plugin(self, self.config, name)
        except Exception as e:
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
        self.add_jobs(plugin.thread_jobs())
        self.plugins[name] = plugin
        thread_name = threading.current_thread().name
        self.logger.info(f"loaded plugin {name!r}. (from thread: {thread_name!r})")
        return plugin
5✔
265

266
    def close_plugin(self, plugin):
        """Remove the thread jobs of `plugin` from this thread."""
        jobs = plugin.thread_jobs()
        self.remove_jobs(jobs)
×
268

269
    def enable(self, name: str) -> 'BasePlugin':
        """Persist 'enable_plugin_<name>' = True and return the plugin, loading it if needed."""
        self.config.set_key('enable_plugin_' + name, True, save=True)
        return self.get(name) or self.load_plugin(name)
×
275

276
    def disable(self, name: str) -> None:
        """Persist 'enable_plugin_<name>' = False and close the plugin if it is loaded."""
        self.config.set_key('enable_plugin_' + name, False, save=True)
        plugin = self.get(name)
        if plugin:
            self.plugins.pop(name)
            plugin.close()
            self.logger.info(f"closed {name}")
×
284

285
    @classmethod
5✔
286
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
5✔
287
        return key.startswith('enable_plugin_')
×
288

289
    def toggle(self, name: str) -> Optional['BasePlugin']:
        """Disable `name` if it is loaded, otherwise enable it.

        Returns what `disable` / `enable` returned.
        """
        if self.get(name):
            return self.disable(name)
        return self.enable(name)
×
292

293
    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
        """Tell whether plugin `name` can be used with `wallet`.

        False for unknown plugins, when one of the modules listed under
        'requires' cannot be imported, or when 'requires_wallet_type' is set
        and does not contain the wallet's type.
        """
        metadata = self.descriptions.get(name)
        if not metadata:
            return False
        for dep, _unused in metadata.get('requires', []):
            try:
                __import__(dep)
            except ImportError as e:
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
                return False
        wallet_types = metadata.get('requires_wallet_type', [])
        if not wallet_types:
            return True
        return wallet.wallet_type in wallet_types
×
306

307
    def get_hardware_support(self):
        """Return a HardwarePluginToScan entry per usable hardware wallet plugin.

        Only entries of `self.hw_wallets` flagged good for this GUI are
        considered. A plugin that loads but reports itself unavailable is left
        out; one that fails to load is included with `plugin=None` and the
        exception attached.
        """
        supported = []
        for name, (gui_good, details) in self.hw_wallets.items():
            if not gui_good:
                continue
            try:
                p = self.get_plugin(name)
                if p.is_available():
                    entry = HardwarePluginToScan(name=name,
                                                 description=details[2],
                                                 plugin=p,
                                                 exception=None)
                else:
                    entry = None
            except Exception as e:
                self.logger.exception(f"cannot load plugin for: {name}")
                entry = HardwarePluginToScan(name=name,
                                             description=details[2],
                                             plugin=None,
                                             exception=e)
            if entry is not None:
                supported.append(entry)
        return supported
×
325

326
    def register_wallet_type(self, name, gui_good, wallet_type):
        """Register `wallet_type` and store a lazy loader for it in `plugin_loaders`.

        The loader, when called, fetches plugin `name` and registers its
        `wallet_class` as the constructor for `wallet_type`.
        """
        from .wallet import register_wallet_type, register_constructor
        self.logger.info(f"registering wallet type {(wallet_type, name)}")

        def load_and_register_constructor():
            plugin = self.get_plugin(name)
            register_constructor(wallet_type, plugin.wallet_class)

        register_wallet_type(wallet_type)
        plugin_loaders[wallet_type] = load_and_register_constructor
5✔
335

336
    def register_keystore(self, name, gui_good, details):
        """Register a hardware keystore declared by plugin `name`.

        Nothing is registered unless `details[0] == 'hardware'`. The keystore
        constructor registered under `details[1]` resolves the plugin lazily.
        """
        from .keystore import register_keystore

        if details[0] != 'hardware':
            return

        def construct_via_plugin(d):
            return self.get_plugin(name).keystore_class(d)

        self.hw_wallets[name] = (gui_good, details)
        self.logger.info(f"registering hardware {name}: {details}")
        register_keystore(details[1], construct_via_plugin)
5✔
345

346
    def get_plugin(self, name: str) -> 'BasePlugin':
        """Return plugin `name`, loading it first if it is not loaded yet."""
        already_loaded = name in self.plugins
        if not already_loaded:
            self.load_plugin(name)
        return self.plugins[name]
5✔
350

351
    def run(self):
        """Thread main loop: wait up to 0.1s for a wake-up, run the jobs, repeat.

        `on_stop` is called once the thread is no longer running.
        """
        poll_interval = 0.1  # time.sleep(0.1) OR event
        while True:
            if not self.is_running():
                break
            self.wake_up_event.wait(poll_interval)
            self.run_jobs()
        self.on_stop()
5✔
356

357
def get_file_hash256(path: str) -> str:
    """Get the sha256 hash of a file in hex, similar to `sha256sum`.

    The whole file is read into memory before hashing.
    """
    with open(path, 'rb') as f:
        contents = f.read()
    return sha256(contents).hex()
×
361

362
def hook(func):
    """Decorator: record the name of `func` in `hook_names` and return `func` unchanged."""
    name = func.__name__
    hook_names.add(name)
    return func
5✔
365

366

367
def run_hook(name, *args):
    """Call hook `name` on every enabled plugin that registered it.

    A hook implementation that raises is logged and treated as having
    returned False. Returns the single truthy result, or None if there was
    none; more than one truthy result trips an assertion.
    """
    truthy_results = []
    for plugin, callback in hooks.get(name, []):
        if not plugin.is_enabled():
            continue
        try:
            outcome = callback(*args)
        except Exception:
            _logger.exception(f"Plugin error. plugin: {plugin}, hook: {name}")
            outcome = False
        if outcome:
            truthy_results.append(outcome)

    if not truthy_results:
        return None
    assert len(truthy_results) == 1, truthy_results
    return truthy_results[0]
×
383

384

385
class BasePlugin(Logger):
    """Base class for plugins.

    On construction, every attribute whose name is in `hook_names` is
    registered in the module-level `hooks` table; `close` undoes that.
    """

    def __init__(self, parent, config: 'SimpleConfig', name):
        self.parent = parent  # type: Plugins  # The plugins object
        self.name = name
        self.config = config
        self.wallet = None  # fixme: this field should not exist
        Logger.__init__(self)
        # add self to hooks
        for k in dir(self):
            if k in hook_names:
                hooks.setdefault(k, []).append((self, getattr(self, k)))

    def __str__(self):
        return self.name

    def close(self):
        """Unregister this plugin's hooks and thread jobs, then call `on_close`."""
        # remove self from hooks
        for attr_name in dir(self):
            if attr_name in hook_names:
                # found attribute in self that is also the name of a hook
                l = hooks.get(attr_name, [])
                try:
                    l.remove((self, getattr(self, attr_name)))
                except ValueError:
                    # maybe attr name just collided with hook name and was not hook
                    continue
                hooks[attr_name] = l
        self.parent.close_plugin(self)
        self.on_close()

    def on_close(self):
        """Called at the end of `close`; does nothing by default."""
        pass

    def requires_settings(self) -> bool:
        return False

    def thread_jobs(self):
        """Jobs to be run by the Plugins thread; none by default."""
        return []

    def is_enabled(self):
        """True if the plugin is available and its 'enable_plugin_<name>' config value is True."""
        return self.is_available() and self.config.get('enable_plugin_' + self.name) is True

    def is_available(self):
        return True

    def can_user_disable(self):
        return True

    def settings_widget(self, window):
        raise NotImplementedError()

    def settings_dialog(self, window):
        raise NotImplementedError()

    def read_file(self, filename: str) -> bytes:
        """Return the content of a file shipped with this plugin.

        External plugins are read from their zip archive (member
        '<plugin name>/<filename>'); internal ones from the 'plugins/<plugin name>'
        directory next to this module.
        """
        if self.name in self.parent.external_plugin_metadata:
            import posixpath
            import zipfile
            plugin_filename = self.parent.external_plugin_path(self.name)
            with zipfile.ZipFile(plugin_filename) as myzip:
                # zip member names always use '/' as separator, whatever the host OS
                with myzip.open(posixpath.join(self.name, filename)) as myfile:
                    return myfile.read()
        path = os.path.join(os.path.dirname(__file__), 'plugins', self.name, filename)
        with open(path, 'rb') as myfile:
            return myfile.read()
×
453

454

455
class DeviceUnpairableError(UserFacingException):
    """User-facing error; presumably raised when a device cannot be paired (raise sites are outside this view)."""
5✔
456
class HardwarePluginLibraryUnavailable(Exception):
    """Presumably signals a missing library for a hardware plugin (raise sites are outside this view)."""
5✔
457
class CannotAutoSelectDevice(Exception):
    """Caught around `select_device` in `DeviceMgr.client_for_keystore`, where it means no device was selected."""
5✔
458

459

460
class Device(NamedTuple):
    """Description of one hardware device.

    `id_` is the identifier DeviceMgr stores clients and pairings under.
    """
    path: Union[str, bytes]
    interface_number: int
    id_: str
    product_key: Any   # when using hid, often Tuple[int, int]
    usage_page: int
    transport_ui_string: str
5✔
467

468

469
class DeviceInfo(NamedTuple):
    """A Device plus optional details about it; every field except `device` defaults to None."""
    device: Device
    label: Optional[str] = None
    initialized: Optional[bool] = None
    exception: Optional[Exception] = None
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
5✔
477

478

479
class HardwarePluginToScan(NamedTuple):
    """Result entry of Plugins.get_hardware_support.

    Either `plugin` is set and `exception` is None, or the plugin failed to
    load: then `plugin` is None and `exception` holds the error.
    """
    name: str
    description: str
    plugin: Optional['HW_PluginBase']
    exception: Optional[Exception]
5✔
484

485

486
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
5✔
487

488

489
# hidapi is not thread-safe
490
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
491
#     https://github.com/libusb/hidapi/issues/45
492
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
493
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
494
# It is not entirely clear to me, exactly what is safe and what isn't, when
495
# using multiple threads...
496
# Hence, we use a single thread for all device communications, including
497
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
498
# the following thread:
499
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
5✔
500
    max_workers=1,
501
    thread_name_prefix='hwd_comms_thread'
502
)
503

504
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
505
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
506
# To keep it simple, let's just import it now, as we are likely in the main thread here.
507
if threading.current_thread() is not threading.main_thread():
5✔
508
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
×
509
try:
5✔
510
    import hid
5✔
511
except ImportError:
5✔
512
    pass
5✔
513

514

515
T = TypeVar('T')
5✔
516

517

518
def run_in_hwd_thread(func: Callable[[], T]) -> T:
    """Run `func` on the hardware-device comms thread and return its result.

    When already on that thread (recognised by its name prefix) `func` is
    called directly; otherwise it is submitted to the executor and the caller
    blocks until the result is available.
    """
    on_hwd_thread = threading.current_thread().name.startswith("hwd_comms_thread")
    if on_hwd_thread:
        return func()
    return _hwd_comms_executor.submit(func).result()
×
524
        #except (concurrent.futures.CancelledError, concurrent.futures.TimeoutError) as e:
525

526

527
def runs_in_hwd_thread(func):
    """Decorator: make every call of `func` go through `run_in_hwd_thread`."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        bound_call = partial(func, *args, **kwargs)
        return run_in_hwd_thread(bound_call)
    return wrapper
5✔
532

533

534
def assert_runs_in_hwd_thread():
    """Raise unless the current thread's name starts with "hwd_comms_thread"."""
    thread_name = threading.current_thread().name
    if thread_name.startswith("hwd_comms_thread"):
        return
    raise Exception("must only be called from HWD communication thread")
×
537

538

539
class DeviceMgr(ThreadJob):
5✔
540
    """Manages hardware clients.  A client communicates over a hardware
541
    channel with the device.
542

543
    In addition to tracking device HID IDs, the device manager tracks
544
    hardware wallets and manages wallet pairing.  A HID ID may be
545
    paired with a wallet when it is confirmed that the hardware device
546
    matches the wallet, i.e. they have the same master public key.  A
547
    HID ID can be unpaired if e.g. it is wiped.
548

549
    Because of hotplugging, a wallet must request its client
550
    dynamically each time it is required, rather than caching it
551
    itself.
552

553
    The device manager is shared across plugins, so just one place
554
    does hardware scans when needed.  By tracking HID IDs, if a device
555
    is plugged into a different port the wallet is automatically
556
    re-paired.
557

558
    Wallets are informed on connect / disconnect events.  It must
559
    implement connected(), disconnected() callbacks.  Being connected
560
    implies a pairing.  Callbacks can happen in any thread context,
561
    and we do them without holding the lock.
562

563
    Confusingly, the HID ID (serial number) reported by the HID system
564
    doesn't match the device ID reported by the device itself.  We use
565
    the HID IDs.
566

567
    This plugin is thread-safe.  Currently only devices supported by
568
    hidapi are implemented."""
569

570
    def __init__(self, config: SimpleConfig):
        """Create the (initially empty) registries and the lock guarding them."""
        ThreadJob.__init__(self)
        self.config = config
        self.lock = threading.RLock()

        # A pairing_code->id_ map. Item only present if we have active pairing. Needs self.lock.
        self.pairing_code_to_id = {}  # type: Dict[str, str]
        # A client->id_ map. Needs self.lock.
        self.clients = {}  # type: Dict[HardwareClientBase, str]
        # Custom enumerate functions for devices we don't know about.
        self._enumerate_func = set()  # Needs self.lock.

        # What we recognise.  (vendor_id, product_id) -> Plugin
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]  # vendor_id -> Plugin
5✔
585

586
    def thread_jobs(self):
        """Return the jobs to schedule: the manager itself, which handles device timeouts."""
        jobs = [self]
        return jobs
5✔
589

590
    def run(self):
5✔
591
        '''Handle device timeouts.  Runs in the context of the Plugins
592
        thread.'''
593
        with self.lock:
5✔
594
            clients = list(self.clients.keys())
5✔
595
        cutoff = time.time() - self.config.get_session_timeout()
5✔
596
        for client in clients:
5✔
597
            client.timeout(cutoff)
×
598

599
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
        """Map every entry of `device_pairs` to `plugin` in the recognised-hardware table."""
        self._recognised_hardware.update(dict.fromkeys(device_pairs, plugin))
×
602

603
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
        """Map every vendor id in `vendor_ids` to `plugin` in the recognised-vendor table."""
        self._recognised_vendor.update(dict.fromkeys(vendor_ids, plugin))
×
606

607
    def register_enumerate_func(self, func):
        """Add `func` to the set of custom enumerate functions (under the lock)."""
        lock = self.lock
        with lock:
            self._enumerate_func.add(func)
×
610

611
    @runs_in_hwd_thread
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
        """Return the client for `device`, creating and registering it if needed.

        A client already registered under `device.id_` is reused. Otherwise
        the plugin is asked to create one; whatever it returns is passed back,
        and registered only if truthy.
        """
        # Get from cache first
        cached = self._client_by_id(device.id_)
        if cached:
            return cached
        fresh = plugin.create_client(device, handler)
        if not fresh:
            return fresh
        self.logger.info(f"Registering {fresh}")
        with self.lock:
            self.clients[fresh] = device.id_
        return fresh
×
624

625
    def id_by_pairing_code(self, pairing_code):
        """Return the device id paired with `pairing_code`, or None if there is no pairing."""
        with self.lock:
            device_id = self.pairing_code_to_id.get(pairing_code)
        return device_id
×
628

629
    def pairing_code_by_id(self, id_):
        """Reverse lookup: return the first pairing code mapped to `id_`, or None."""
        with self.lock:
            matches = (code for code, paired_id in self.pairing_code_to_id.items()
                       if paired_id == id_)
            return next(matches, None)
×
635

636
    def unpair_pairing_code(self, pairing_code):
        """Drop the pairing for `pairing_code` (if any) and close the client of its device.

        The client is closed after the lock has been released.
        """
        not_paired = object()
        with self.lock:
            device_id = self.pairing_code_to_id.pop(pairing_code, not_paired)
        if device_id is not_paired:
            return
        self._close_client(device_id)
×
642

643
    def unpair_id(self, id_):
        """Unpair device `id_`: via its pairing code if it has one, else just close its client."""
        pairing_code = self.pairing_code_by_id(id_)
        if not pairing_code:
            self._close_client(id_)
            return
        self.unpair_pairing_code(pairing_code)
×
649

650
    def _close_client(self, id_):
5✔
651
        with self.lock:
×
652
            client = self._client_by_id(id_)
×
653
            self.clients.pop(client, None)
×
654
        if client:
×
655
            client.close()
×
656

657
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
5✔
658
        with self.lock:
×
659
            for client, client_id in self.clients.items():
×
660
                if client_id == id_:
×
661
                    return client
×
662
        return None
×
663

664
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
        '''Returns a client for the device ID if one is registered.  If
        a device is wiped or in bootloader mode pairing is impossible;
        in such cases we communicate by device ID and not wallet.

        Unless `scan_now` is false, devices are scanned before the lookup.'''
        if scan_now:
            self.scan_devices()
        client = self._client_by_id(id_)
        return client
×
671

672
    @runs_in_hwd_thread
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
                            keystore: 'Hardware_KeyStore',
                            force_pair: bool, *,
                            devices: Sequence['Device'] = None,
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
        """Find (or, with `force_pair`, establish) the client paired with `keystore`.

        Lookup order: already known clients, then the given / freshly scanned
        `devices`, then - only if `force_pair` - device selection followed by
        a forced pairing. The handler status is set to False up front and to
        True once a client was found. Raises if `handler` is None.
        """
        self.logger.info("getting client for keystore")
        if handler is None:
            handler_msg = _("Handler not found for {}").format(plugin.name)
            raise Exception(handler_msg + '\n' + _("A library is probably missing."))
        handler.update_status(False)
        lookup = partial(self.client_by_pairing_code,
                         plugin=plugin, pairing_code=keystore.pairing_code(), handler=handler)
        # search existing clients first (fast-path)
        client = lookup(devices=[]) if not devices else None
        # search clients again, now allowing a (slow) scan
        if client is None:
            if devices is None:
                devices = self.scan_devices()
            client = lookup(devices=devices)
        if client is None and force_pair:
            try:
                info = self.select_device(plugin, handler, keystore, devices,
                                          allow_user_interaction=allow_user_interaction)
            except CannotAutoSelectDevice:
                pass
            else:
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
        if client:
            handler.update_status(True)
            # note: if select_device was called, we might also update label etc here:
            keystore.opportunistically_fill_in_missing_info_from_device(client)
        self.logger.info("end client for keystore")
        return client
×
706

707
    def client_by_pairing_code(
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
        devices: Sequence['Device'],
    ) -> Optional['HardwareClientBase']:
        """Return the client for the device paired under `pairing_code`, if any.

        An already registered client is returned (with its handler replaced)
        only if it belongs to the same plugin type, otherwise None. Without a
        registered client, one is created for the first entry of `devices`
        whose id matches the pairing; None if there is no such device.
        """
        paired_id = self.id_by_pairing_code(pairing_code)
        existing = self._client_by_id(paired_id)
        if existing:
            if type(existing.plugin) != type(plugin):
                return None
            # An unpaired client might have another wallet's handler
            # from a prior scan.  Replace to fix dialog parenting.
            existing.handler = handler
            return existing

        for candidate in devices:
            if candidate.id_ != paired_id:
                continue
            return self.create_client(candidate, handler, plugin)
        return None
×
724

725
    def force_pair_keystore(
        self,
        *,
        plugin: 'HW_PluginBase',
        handler: 'HardwareHandlerBase',
        info: 'DeviceInfo',
        keystore: 'Hardware_KeyStore',
    ) -> 'HardwareClientBase':
        """Pair `keystore` with the device described by `info` and return its client.

        The device is accepted only if the xpub it reports for the keystore's
        derivation prefix equals the keystore's own xpub. On success the mapping
        from the keystore's pairing code to the device id is recorded.

        Raises DeviceUnpairableError if there is no pairable client of the same
        plugin type for the device, if the xpub does not match, or if obtaining
        the xpub failed with UserCancelled / RuntimeError.
        """
        xpub = keystore.xpub
        derivation = keystore.get_derivation_prefix()
        assert derivation is not None
        xtype = bip32.xpub_type(xpub)
        client = self._client_by_id(info.device.id_)
        if client and client.is_pairable() and type(client.plugin) is type(plugin):
            # An unpaired client might have another wallet's handler
            # from a prior scan.  Replace to fix dialog parenting.
            client.handler = handler
            # This will trigger a PIN/passphrase entry request
            try:
                client_xpub = client.get_xpub(derivation, xtype)
            except (UserCancelled, RuntimeError):
                # Bad / cancelled PIN / passphrase
                client_xpub = None
            if client_xpub == xpub:
                keystore.opportunistically_fill_in_missing_info_from_device(client)
                with self.lock:
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
                return client

        # The user input has wrong PIN or passphrase, or cancelled input,
        # or it is not pairable
        raise DeviceUnpairableError(
            _('Electrum cannot pair with your {}.\n\n'
              'Before you request bitcoins to be sent to addresses in this '
              'wallet, ensure you can pair with your device, or that you have '
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
              'receive will be unspendable.').format(plugin.device))
761

762
    def list_pairable_device_infos(
        self,
        *,
        handler: Optional['HardwareHandlerBase'],
        plugin: 'HW_PluginBase',
        devices: Optional[Sequence['Device']] = None,
        include_failing_clients: bool = False,
    ) -> List['DeviceInfo']:
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
        Already paired devices are also included, as it is okay to reuse them.

        If `devices` is None, a scan is performed first. Devices for which no client
        could be created are skipped. Devices whose client raised while being created
        or queried are logged and skipped, unless `include_failing_clients` is set,
        in which case a DeviceInfo carrying the exception is included for them.

        Raises HardwarePluginLibraryUnavailable if the plugin's libraries are not available.
        """
        if not plugin.libraries_available:
            message = plugin.get_library_not_available_message()
            raise HardwarePluginLibraryUnavailable(message)
        if devices is None:
            devices = self.scan_devices()
        infos = []
        for device in devices:
            if not plugin.can_recognize_device(device):
                continue
            try:
                client = self.create_client(device, handler, plugin)
                if not client:
                    continue
                label = client.label()
                is_initialized = client.is_initialized()
                soft_device_id = client.get_soft_device_id()
                model_name = client.device_model_name()
            except Exception as e:
                # best-effort: one misbehaving device must not hide the others
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {e!r}')
                if include_failing_clients:
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
                continue
            infos.append(DeviceInfo(device=device,
                                    label=label,
                                    initialized=is_initialized,
                                    plugin_name=plugin.name,
                                    soft_device_id=soft_device_id,
                                    model_name=model_name))

        return infos
803

804
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
                      keystore: 'Hardware_KeyStore', devices: Optional[Sequence['Device']] = None,
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
        """Select the device to use for keystore.

        While no pairable device is found, the user is asked to insert one and retry.
        Then auto-selection is attempted, in order: by soft_device_id, by a unique
        non-placeholder label, or by being the only device when the keystore has
        neither a useful label nor a soft_device_id. If that fails, the user is
        prompted to choose.

        Raises CannotAutoSelectDevice if user interaction would be needed but
        `allow_user_interaction` is False, and UserCancelled if the user declines
        to retry or cancels the manual choice.
        """
        # ideally this should not be called from the GUI thread...
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
        while True:
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
            if infos:
                break
            if not allow_user_interaction:
                raise CannotAutoSelectDevice()
            msg = _('Please insert your {}').format(plugin.device)
            msg += " ("
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
                msg += f"label: {keystore.label}, "
            msg += f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}"
            msg += ').\n\n{}\n\n{}'.format(
                _('Verify the cable is connected and that '
                  'no other application is using it.'),
                _('Try to connect again?')
            )
            if not handler.yes_no_question(msg):
                raise UserCancelled()
            # forget the stale device list, so that the next iteration rescans
            devices = None

        # select device automatically. (but only if we have reasonable expectation it is the correct one)
        # method 1: select device by id
        if keystore.soft_device_id:
            for info in infos:
                if info.soft_device_id == keystore.soft_device_id:
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
                    return info
        # method 2: select device by label
        #           but only if not a placeholder label and only if there is no collision
        device_labels = [info.label for info in infos]
        if (keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
                and device_labels.count(keystore.label) == 1):
            for info in infos:
                if info.label == keystore.label:
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
                    return info
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
        #           saved for keystore anyway, select it
        if (len(infos) == 1
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
                and keystore.soft_device_id is None):
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
            return infos[0]

        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
        if not allow_user_interaction:
            raise CannotAutoSelectDevice()
        # ask user to select device manually
        msg = (
                _("Could not automatically pair with device for given keystore.") + "\n"
                + f"(keystore label: {keystore.label!r}, "
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
        msg += _("Please select which {} device to use:").format(plugin.device)
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
        descriptions = ["{label} ({maybe_model}{init}, {transport})"
                        .format(label=info.label or _("An unnamed {}").format(info.plugin_name),
                                init=(_("initialized") if info.initialized else _("wiped")),
                                transport=info.device.transport_ui_string,
                                maybe_model=f"{info.model_name}, " if info.model_name else "")
                        for info in infos]
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
                          f"num options: {len(infos)}. options: {infos}")
        c = handler.query_choice(msg, descriptions)
        if c is None:
            raise UserCancelled()
        info = infos[c]
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
        # note: updated label/soft_device_id will be saved after pairing succeeds
        return info
879

880
    @runs_in_hwd_thread
    def _scan_devices_with_hid(self) -> List['Device']:
        """Enumerate HID devices and return those a registered plugin turns into a Device.

        A plugin registered for the exact (vendor_id, product_id) pair takes precedence
        over one registered for the vendor_id only. Returns an empty list if the
        `hid` module cannot be imported.
        """
        try:
            import hid
        except ImportError:
            return []

        devices = []
        for d in hid.enumerate(0, 0):
            vendor_id = d['vendor_id']
            product_key = (vendor_id, d['product_id'])
            plugin = None
            if product_key in self._recognised_hardware:
                plugin = self._recognised_hardware[product_key]
            elif vendor_id in self._recognised_vendor:
                plugin = self._recognised_vendor[vendor_id]
            if plugin:
                # the plugin may decline the enumerated entry by returning a falsy value
                device = plugin.create_device_from_hid_enumeration(d, product_key=product_key)
                if device:
                    devices.append(device)
        return devices
901

902
    @runs_in_hwd_thread
    @profiler
    def scan_devices(self) -> Sequence['Device']:
        """Scan for connected devices and drop clients that are no longer connected.

        Devices are collected from the HID enumeration and from every registered
        custom enumerate function. Clients whose device id is missing from the scan
        result, or that report no usable connection, are removed from `self.clients`,
        unpaired, and their handler (if any) gets its status set to False.

        Returns the devices found.
        """
        self.logger.info("scanning devices...")

        # First see what's connected that we know about
        devices = self._scan_devices_with_hid()

        # Let plugin handlers enumerate devices we don't know about
        with self.lock:
            # snapshot, so that the callbacks run without holding the lock
            enumerate_funcs = list(self._enumerate_func)
        for f in enumerate_funcs:
            try:
                new_devices = f()
            # NOTE(review): BaseException also swallows KeyboardInterrupt/SystemExit;
            # kept as-is, as it looks like deliberate best-effort -- confirm.
            except BaseException as e:
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
            else:
                devices.extend(new_devices)

        # find out what was disconnected
        client_ids = [dev.id_ for dev in devices]
        disconnected_clients = []
        with self.lock:
            connected = {}
            for client, id_ in self.clients.items():
                if id_ in client_ids and client.has_usable_connection_with_device():
                    connected[client] = id_
                else:
                    disconnected_clients.append((client, id_))
            self.clients = connected

        # Unpair disconnected devices
        for client, id_ in disconnected_clients:
            self.unpair_id(id_)
            if client.handler:
                client.handler.update_status(False)

        return devices
940

941
    @classmethod
    def version_info(cls) -> Mapping[str, Optional[str]]:
        """Return version information about the optional USB/HID libraries.

        Keys:
        - "libusb.version": libusb version string, or None if `usb1` cannot be imported
        - "libusb.path": only present if `usb1` was imported; the library path, or None
        - "hidapi.version": hidapi version string, or None if it cannot be determined
        """
        ret = {}
        # add libusb
        try:
            import usb1
        except Exception:
            ret["libusb.version"] = None
        else:
            ret["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
            try:
                ret["libusb.path"] = usb1.libusb1.libusb._name
            except AttributeError:
                ret["libusb.path"] = None
        # add hidapi
        try:
            import hid
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
        except Exception:
            # fall back to the version recorded in the installed package metadata
            from importlib.metadata import version
            try:
                ret["hidapi.version"] = version("hidapi")
            except ImportError:
                # note: PackageNotFoundError is a subclass of ImportError
                ret["hidapi.version"] = None
        return ret
966

967
    def trigger_pairings(
            self,
            keystores: Sequence['KeyStore'],
            *,
            allow_user_interaction: bool = True,
            devices: Optional[Sequence['Device']] = None,
    ) -> None:
        """Given a list of keystores, try to pair each with a connected hardware device.

        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
        try to pair each keystore individually. Consider the following scenario:
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
        - assume none of the devices are paired yet
        1. if we tried to individually pair keystores, we might try with ks1 first
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
             which is confusing and error-prone. It's especially problematic if the hw device does
             not support labels (such as Ledger), as then the user cannot easily distinguish
             same-type devices. (see #4199)
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
           and then tell the user ks1 could not be paired (and there are no devices left to try)

        Keystores that are not hardware keystores are ignored. If `devices` is None,
        a single scan is done up front and reused for all keystores. UserCancelled
        raised while pairing a keystore is swallowed, and the next keystore is tried.
        """
        from .keystore import Hardware_KeyStore
        keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
        if not keystores:
            return
        if devices is None:
            devices = self.scan_devices()
        # first pair with all devices that can be auto-selected,
        # and only then (if allowed) go over the keystores again for manual selections
        passes = [False, True] if allow_user_interaction else [False]
        for interactive in passes:
            for ks in keystores:
                try:
                    ks.get_client(
                        force_pair=True,
                        allow_user_interaction=interactive,
                        devices=devices,
                    )
                except UserCancelled:
                    pass
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc