• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 6041435855650816

10 Jan 2025 03:19PM UTC coverage: 60.634% (-0.002%) from 60.636%
6041435855650816

push

CirrusCI

ecdsa
swaps: make the zeroconf option non-persisted

Since we allow swaps with random servers, we should not persist that setting.

1 of 4 new or added lines in 1 file covered. (25.0%)

65 existing lines in 25 files now uncovered.

20116 of 33176 relevant lines covered (60.63%)

3.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

39.59
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import os
6✔
27
import pkgutil
6✔
28
import importlib.util
6✔
29
import time
6✔
30
import threading
6✔
31
import traceback
6✔
32
import sys
6✔
33
import aiohttp
6✔
34

35
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
6✔
36
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
37
import concurrent
6✔
38
import zipimport
6✔
39
from concurrent import futures
6✔
40
from functools import wraps, partial
6✔
41

42
from .i18n import _
6✔
43
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException)
6✔
44
from . import bip32
6✔
45
from . import plugins
6✔
46
from .simple_config import SimpleConfig
6✔
47
from .logging import get_logger, Logger
6✔
48
from .crypto import sha256
6✔
49

50
if TYPE_CHECKING:
6✔
51
    from .plugins.hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
52
    from .keystore import Hardware_KeyStore, KeyStore
×
53
    from .wallet import Abstract_Wallet
×
54

55

56
_logger = get_logger(__name__)
6✔
57
plugin_loaders = {}
6✔
58
hook_names = set()
6✔
59
hooks = {}
6✔
60

61

62
class Plugins(DaemonThread):
    """Registry and loader for Electrum plugins, run as a daemon thread.

    On construction it collects metadata for internal plugins (modules found
    under ``pkgpath``) and external plugins (zip files in a root-owned
    directory), loads the ones enabled in the config, registers the device
    manager's thread jobs and starts the thread, whose loop keeps calling
    ``run_jobs()``.
    """

    LOGGING_SHORTCUT = 'p'
    pkgpath = os.path.dirname(plugins.__file__)

    @profiler
    def __init__(self, config: SimpleConfig, gui_name):
        DaemonThread.__init__(self)
        self.name = 'Plugins'  # set name of thread
        self.config = config
        self.hw_wallets = {}  # plugin name -> (gui_good, details); filled by register_keystore
        self.plugins = {}  # type: Dict[str, BasePlugin]
        self.internal_plugin_metadata = {}  # plugin name -> module __dict__
        self.external_plugin_metadata = {}  # plugin name -> module __dict__
        self.gui_name = gui_name
        self.device_manager = DeviceMgr(config)
        self.find_internal_plugins()
        self.find_external_plugins()
        self.load_plugins()
        self.add_jobs(self.device_manager.thread_jobs())
        self.start()

    @property
    def descriptions(self):
        """Merged metadata of all known plugins (external entries win on a name clash)."""
        return {**self.internal_plugin_metadata, **self.external_plugin_metadata}

    def find_internal_plugins(self):
        """Populates self.internal_plugin_metadata

        Executes every module found under ``pkgpath`` and keeps its ``__dict__``
        as metadata when it declares ``fullname`` and lists the current GUI in
        ``available_for``. Wallet types and keystores announced by a module are
        registered along the way.

        Raises Exception if a module has no import spec, fails to execute, or
        the same plugin name is found twice.
        """
        iter_modules = list(pkgutil.iter_modules([self.pkgpath]))
        for loader, name, ispkg in iter_modules:
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
            #       once as data and once as code. To honor the "no duplicates" rule below,
            #       we exclude the ones packaged as *code*, here:
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
                continue
            full_name = f'electrum.plugins.{name}'
            spec = importlib.util.find_spec(full_name)
            if spec is None:  # pkgutil found it but importlib can't ?!
                raise Exception(f"Error pre-loading {full_name}: no spec")
            module = self.exec_module_from_spec(spec, full_name)
            d = module.__dict__
            if 'fullname' not in d:
                continue
            d['display_name'] = d['fullname']
            gui_good = self.gui_name in d.get('available_for', [])
            if not gui_good:
                continue
            details = d.get('registers_wallet_type')
            if details:
                self.register_wallet_type(name, gui_good, details)
            details = d.get('registers_keystore')
            if details:
                self.register_keystore(name, gui_good, details)
            if d.get('requires_wallet_type'):
                # trustedcoin will not be added to list
                continue
            if name in self.internal_plugin_metadata:
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
                raise Exception(f"duplicate plugins? for {name=}")
            self.internal_plugin_metadata[name] = d

    def exec_module_from_spec(self, spec, path):
        """Create the module described by `spec`, register it in sys.modules as `path`, and execute it.

        Returns the executed module. Any failure is re-raised as a plain
        Exception chained to the original error.
        """
        try:
            module = importlib.util.module_from_spec(spec)
            # sys.modules needs to be modified for relative imports to work
            # see https://stackoverflow.com/a/50395128
            sys.modules[path] = module
            if sys.version_info >= (3, 10):
                spec.loader.exec_module(module)
            else:
                module = spec.loader.load_module(path)
        except Exception as e:
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
        return module

    def load_plugins(self):
        """Load all enabled plugins, internal ones first."""
        self.load_internal_plugins()
        self.load_external_plugins()

    def load_internal_plugins(self):
        """Load every internal plugin enabled in the config; failures are logged, not raised."""
        for name, d in self.internal_plugin_metadata.items():
            if not d.get('requires_wallet_type') and self.config.get('enable_plugin_' + name):
                try:
                    self.load_internal_plugin(name)
                except BaseException as e:
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")

    def load_external_plugin(self, name):
        """Load the external plugin `name` for the current GUI and return its Plugin object.

        Returns the cached instance if already loaded, and None (after logging
        an error) if no metadata is known for `name`.

        Raises RuntimeError if the plugin has no implementation for this GUI.
        """
        if name in self.plugins:
            return self.plugins[name]
        # If we do not have the metadata, it was not detected by `load_external_plugins`
        # on startup, or added by manual user installation after that point.
        metadata = self.external_plugin_metadata.get(name)
        if metadata is None:
            # note: not inside an exception handler, so logger.exception() would
            #       only append a meaningless "NoneType: None" traceback.
            self.logger.error(f"attempted to load unknown external plugin {name}")
            return
        full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
        spec = importlib.util.find_spec(full_name)
        if spec is None:
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
        module = self.exec_module_from_spec(spec, full_name)
        plugin = module.Plugin(self, self.config, name)
        self.add_jobs(plugin.thread_jobs())
        self.plugins[name] = plugin
        self.logger.info(f"loaded external plugin {name}")
        return plugin

    def _has_root_permissions(self, path):
        """True if `path` is owned by uid 0 and the current process cannot write to it."""
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)

    def get_external_plugin_dir(self):
        """Return the external plugin directory, or None if it must not be used.

        None is returned on unsupported platforms, when the directory does not
        exist, or when it fails the `_has_root_permissions` check.
        """
        if sys.platform not in ['linux', 'darwin'] and not sys.platform.startswith('freebsd'):
            return
        pkg_path = '/opt/electrum_plugins'
        if not os.path.exists(pkg_path):
            self.logger.info(f'directory {pkg_path} does not exist')
            return
        if not self._has_root_permissions(pkg_path):
            self.logger.info(f'not loading {pkg_path}: directory has user write permissions')
            return
        return pkg_path

    def external_plugin_path(self, name):
        """Return the path of the zip file the external plugin `name` was found in."""
        metadata = self.external_plugin_metadata[name]
        filename = metadata['filename']
        return os.path.join(self.get_external_plugin_dir(), filename)

    def find_external_plugins(self):
        """Populates self.external_plugin_metadata from the zip files in the external plugin dir.

        Files that fail the `_has_root_permissions` check or cannot be opened
        as zip archives are skipped. Raises Exception on a plugin name that is
        already known (internal or external).
        """
        pkg_path = self.get_external_plugin_dir()
        if pkg_path is None:
            return
        for filename in os.listdir(pkg_path):
            path = os.path.join(pkg_path, filename)
            if not self._has_root_permissions(path):
                self.logger.info(f'not loading {path}: file has user write permissions')
                continue
            try:
                zip_importer = zipimport.zipimporter(path)
            except zipimport.ZipImportError:
                self.logger.exception(f"unable to load zip plugin '{filename}'")
                continue
            for name, is_pkg in pkgutil.iter_zipimport_modules(zip_importer):
                if not is_pkg:
                    continue
                if name in self.internal_plugin_metadata:
                    raise Exception(f"duplicate plugins for name={name}")
                if name in self.external_plugin_metadata:
                    raise Exception(f"duplicate plugins for name={name}")
                module_path = f'electrum_external_plugins.{name}'
                if sys.version_info >= (3, 10):
                    spec = zip_importer.find_spec(name)
                    module = self.exec_module_from_spec(spec, module_path)
                else:
                    module = zip_importer.load_module(name)
                    sys.modules[module_path] = module
                d = module.__dict__
                gui_good = self.gui_name in d.get('available_for', [])
                if not gui_good:
                    continue
                d['filename'] = filename
                if 'fullname' not in d:
                    continue
                d['display_name'] = d['fullname']
                d['zip_hash_sha256'] = get_file_hash256(path)
                self.external_plugin_metadata[name] = d

    def load_external_plugins(self):
        """Load every external plugin enabled in the config; failures are logged, not raised."""
        for name in self.external_plugin_metadata:
            if self.config.get('enable_plugin_' + name):
                try:
                    self.load_external_plugin(name)
                except BaseException as e:
                    traceback.print_exc(file=sys.stdout)  # shouldn't this be... suppressed unless -v?
                    self.logger.exception(f"cannot initialize plugin {name} {e!r}")

    def get(self, name):
        """Return the loaded plugin `name`, or None if it is not loaded."""
        return self.plugins.get(name)

    def count(self):
        """Number of currently loaded plugins."""
        return len(self.plugins)

    def load_plugin(self, name) -> 'BasePlugin':
        """Imports the code of the given plugin.
        note: can be called from any thread.

        Raises Exception if `name` is neither a known internal nor external plugin.
        """
        if name in self.internal_plugin_metadata:
            return self.load_internal_plugin(name)
        elif name in self.external_plugin_metadata:
            return self.load_external_plugin(name)
        else:
            raise Exception(f"could not find plugin {name!r}")

    def load_internal_plugin(self, name) -> 'BasePlugin':
        """Load the internal plugin `name` for the current GUI and return its Plugin object.

        Returns the cached instance if already loaded. Raises RuntimeError if
        there is no implementation for this GUI, and Exception (chained) if
        importing or instantiating the plugin fails.
        """
        if name in self.plugins:
            return self.plugins[name]
        full_name = f'electrum.plugins.{name}.{self.gui_name}'
        spec = importlib.util.find_spec(full_name)
        if spec is None:
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
        try:
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            plugin = module.Plugin(self, self.config, name)
        except Exception as e:
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
        self.add_jobs(plugin.thread_jobs())
        self.plugins[name] = plugin
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
        return plugin

    def close_plugin(self, plugin):
        """Stop running the thread jobs of `plugin`."""
        self.remove_jobs(plugin.thread_jobs())

    def enable(self, name: str) -> 'BasePlugin':
        """Persist the plugin as enabled and return it, loading it if necessary."""
        self.config.set_key('enable_plugin_' + name, True, save=True)
        p = self.get(name)
        if p:
            return p
        return self.load_plugin(name)

    def disable(self, name: str) -> None:
        """Persist the plugin as disabled and, if it is loaded, unregister and close it."""
        self.config.set_key('enable_plugin_' + name, False, save=True)
        p = self.get(name)
        if not p:
            return
        self.plugins.pop(name)
        p.close()
        self.logger.info(f"closed {name}")

    @classmethod
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
        """True if `key` is a config key of the form 'enable_plugin_<name>'."""
        return key.startswith('enable_plugin_')

    def toggle(self, name: str) -> Optional['BasePlugin']:
        """Disable the plugin if it is loaded (returns None), else enable and return it."""
        p = self.get(name)
        return self.disable(name) if p else self.enable(name)

    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
        """True if the plugin is known, its 'requires' modules import, and the wallet type fits."""
        d = self.descriptions.get(name)
        if not d:
            return False
        deps = d.get('requires', [])
        for dep, _dep_info in deps:
            try:
                __import__(dep)
            except ImportError as e:
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
                return False
        requires = d.get('requires_wallet_type', [])
        return not requires or wallet.wallet_type in requires

    def get_hardware_support(self):
        """Return a HardwarePluginToScan for each registered hardware plugin usable with this GUI.

        Plugins that fail to load are included with `plugin=None` and the
        exception attached; plugins that load but report themselves as
        unavailable are left out.
        """
        out = []
        for name, (gui_good, details) in self.hw_wallets.items():
            if gui_good:
                try:
                    p = self.get_plugin(name)
                    if p.is_available():
                        out.append(HardwarePluginToScan(name=name,
                                                        description=details[2],
                                                        plugin=p,
                                                        exception=None))
                except Exception as e:
                    self.logger.exception(f"cannot load plugin for: {name}")
                    out.append(HardwarePluginToScan(name=name,
                                                    description=details[2],
                                                    plugin=None,
                                                    exception=e))
        return out

    def register_wallet_type(self, name, gui_good, wallet_type):
        """Register `wallet_type` and a lazy loader that pulls in plugin `name` on demand."""
        from .wallet import register_wallet_type, register_constructor
        self.logger.info(f"registering wallet type {(wallet_type, name)}")

        def loader():
            plugin = self.get_plugin(name)
            register_constructor(wallet_type, plugin.wallet_class)
        register_wallet_type(wallet_type)
        plugin_loaders[wallet_type] = loader

    def register_keystore(self, name, gui_good, details):
        """Register the keystore announced by plugin `name`; only 'hardware' keystores are handled."""
        from .keystore import register_keystore

        def dynamic_constructor(d):
            # loads the plugin lazily, the first time a keystore is constructed
            return self.get_plugin(name).keystore_class(d)
        if details[0] == 'hardware':
            self.hw_wallets[name] = (gui_good, details)
            self.logger.info(f"registering hardware {name}: {details}")
            register_keystore(details[1], dynamic_constructor)

    def get_plugin(self, name: str) -> 'BasePlugin':
        """Return the plugin `name`, loading it first if needed."""
        if name not in self.plugins:
            self.load_plugin(name)
        return self.plugins[name]

    def run(self):
        """Thread main loop: run the registered jobs until the thread is stopped."""
        while self.is_running():
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
            self.run_jobs()
        self.on_stop()
363

364
def get_file_hash256(path: str) -> str:
    '''Get the sha256 hash of a file in hex, similar to `sha256sum`.

    The file is read in fixed-size chunks, so it is never held in memory
    as a whole.
    '''
    import hashlib  # local import: the module top-level does not import hashlib
    digest = hashlib.sha256()
    with open(path, 'rb') as f:
        while chunk := f.read(1 << 16):
            digest.update(chunk)
    return digest.hexdigest()
368

369
def hook(func):
    """Decorator: record the name of `func` in `hook_names` and return `func` unchanged."""
    registered_name = func.__name__
    hook_names.add(registered_name)
    return func
372

373

374
def run_hook(name, *args):
    """Call the hook `name` on every enabled plugin that registered it.

    Exceptions raised by a hook are logged and treated as "no result".
    Returns the single truthy result, or None if there was none; more than
    one truthy result trips an assertion.
    """
    truthy_results = []
    for p, callback in hooks.get(name, []):
        if not p.is_enabled():
            continue
        try:
            outcome = callback(*args)
        except Exception:
            _logger.exception(f"Plugin error. plugin: {p}, hook: {name}")
            continue
        if outcome:
            truthy_results.append(outcome)

    if not truthy_results:
        return None
    assert len(truthy_results) == 1, truthy_results
    return truthy_results[0]
390

391

392
class BasePlugin(Logger):
    """Base class of plugin objects managed by a `Plugins` instance.

    On construction, every attribute whose name is in `hook_names` is
    registered in the module-level `hooks` table; `close()` undoes that.
    """

    def __init__(self, parent, config: 'SimpleConfig', name):
        self.parent = parent  # type: Plugins  # The plugins object
        self.name = name
        self.config = config
        self.wallet = None  # fixme: this field should not exist
        Logger.__init__(self)
        # add self to hooks
        for k in dir(self):
            if k in hook_names:
                hooks.setdefault(k, []).append((self, getattr(self, k)))

    def __str__(self):
        return self.name

    def close(self):
        """Unregister this plugin's hooks, stop its thread jobs, then call `on_close()`."""
        # remove self from hooks
        for attr_name in dir(self):
            if attr_name in hook_names:
                # found attribute in self that is also the name of a hook
                registered = hooks.get(attr_name, [])
                try:
                    registered.remove((self, getattr(self, attr_name)))
                except ValueError:
                    # maybe attr name just collided with hook name and was not hook
                    continue
        self.parent.close_plugin(self)
        self.on_close()

    def on_close(self):
        """Called at the end of `close()`; does nothing by default."""
        pass

    def requires_settings(self) -> bool:
        return False

    def thread_jobs(self):
        """Jobs to be run by the `Plugins` thread while this plugin is loaded; none by default."""
        return []

    def is_enabled(self):
        """True if the plugin is available and its 'enable_plugin_<name>' config value is exactly True."""
        return self.is_available() and self.config.get('enable_plugin_' + self.name) is True

    def is_available(self):
        return True

    def can_user_disable(self):
        return True

    def settings_widget(self, window):
        raise NotImplementedError()

    def settings_dialog(self, window):
        raise NotImplementedError()

    def read_file(self, filename: str) -> bytes:
        """Return the content of a file shipped with this plugin.

        For an external plugin the file is read from the plugin's zip archive,
        otherwise from the plugin's directory under ``plugins/``.
        """
        import posixpath
        import zipfile
        if self.name in self.parent.external_plugin_metadata:
            plugin_filename = self.parent.external_plugin_path(self.name)
            with zipfile.ZipFile(plugin_filename) as myzip:
                # zip member names always use '/', regardless of the OS path separator
                with myzip.open(posixpath.join(self.name, filename)) as myfile:
                    return myfile.read()
        else:
            path = os.path.join(os.path.dirname(__file__), 'plugins', self.name, filename)
            with open(path, 'rb') as myfile:
                return myfile.read()
460

461

462
class DeviceUnpairableError(UserFacingException):
    """User-facing error; presumably raised when a device cannot be paired (not raised in this part of the file)."""
463
class HardwarePluginLibraryUnavailable(Exception):
    """Presumably signals a missing library needed by a hardware plugin (not raised in this part of the file)."""
464
class CannotAutoSelectDevice(Exception):
    """Caught around `select_device()` in `DeviceMgr.client_for_keystore`, which then skips forced pairing."""
465

466

467
class Device(NamedTuple):
    """Description of one enumerated hardware device."""

    path: Union[str, bytes]
    interface_number: int
    id_: str  # key used by DeviceMgr to look up the client for this device
    product_key: Any   # when using hid, often Tuple[int, int]
    usage_page: int
    transport_ui_string: str
474

475

476
class DeviceInfo(NamedTuple):
    """A `Device` plus optional details about it; every field except `device` defaults to None."""

    device: Device
    label: Optional[str] = None
    initialized: Optional[bool] = None
    exception: Optional[Exception] = None
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
484

485

486
class HardwarePluginToScan(NamedTuple):
    """Entry produced by `Plugins.get_hardware_support()` for one hardware plugin."""

    name: str
    description: str
    plugin: Optional['HW_PluginBase']  # None when loading the plugin failed
    exception: Optional[Exception]  # the load error, or None on success
491

492

493
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
6✔
494

495

496
# hidapi is not thread-safe
497
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
498
#     https://github.com/libusb/hidapi/issues/45
499
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
500
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
501
# It is not entirely clear to me, exactly what is safe and what isn't, when
502
# using multiple threads...
503
# Hence, we use a single thread for all device communications, including
504
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
505
# the following thread:
506
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
6✔
507
    max_workers=1,
508
    thread_name_prefix='hwd_comms_thread'
509
)
510

511
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
512
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
513
# To keep it simple, let's just import it now, as we are likely in the main thread here.
514
if threading.current_thread() is not threading.main_thread():
6✔
515
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
×
516
try:
6✔
517
    import hid
6✔
518
except ImportError:
6✔
519
    pass
6✔
520

521

522
T = TypeVar('T')
6✔
523

524

525
def run_in_hwd_thread(func: Callable[[], T]) -> T:
    """Run `func` on the hardware-device comms thread and return its result.

    If the caller is already on that thread (recognised by its name prefix),
    `func` is called directly; otherwise it is submitted to the executor and
    the caller blocks until it finishes.
    """
    already_on_hwd_thread = threading.current_thread().name.startswith("hwd_comms_thread")
    if already_on_hwd_thread:
        return func()
    future = _hwd_comms_executor.submit(func)
    return future.result()
532

533

534
def runs_in_hwd_thread(func):
    """Decorator: route every call of `func` through `run_in_hwd_thread`."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        bound_call = partial(func, *args, **kwargs)
        return run_in_hwd_thread(bound_call)
    return wrapper
539

540

541
def assert_runs_in_hwd_thread():
    """Raise Exception unless the current thread's name starts with "hwd_comms_thread"."""
    current_name = threading.current_thread().name
    if current_name.startswith("hwd_comms_thread"):
        return
    raise Exception("must only be called from HWD communication thread")
544

545

546
class DeviceMgr(ThreadJob):
6✔
547
    """Manages hardware clients.  A client communicates over a hardware
548
    channel with the device.
549

550
    In addition to tracking device HID IDs, the device manager tracks
551
    hardware wallets and manages wallet pairing.  A HID ID may be
552
    paired with a wallet when it is confirmed that the hardware device
553
    matches the wallet, i.e. they have the same master public key.  A
554
    HID ID can be unpaired if e.g. it is wiped.
555

556
    Because of hotplugging, a wallet must request its client
557
    dynamically each time it is required, rather than caching it
558
    itself.
559

560
    The device manager is shared across plugins, so just one place
561
    does hardware scans when needed.  By tracking HID IDs, if a device
562
    is plugged into a different port the wallet is automatically
563
    re-paired.
564

565
    Wallets are informed on connect / disconnect events.  It must
566
    implement connected(), disconnected() callbacks.  Being connected
567
    implies a pairing.  Callbacks can happen in any thread context,
568
    and we do them without holding the lock.
569

570
    Confusingly, the HID ID (serial number) reported by the HID system
571
    doesn't match the device ID reported by the device itself.  We use
572
    the HID IDs.
573

574
    This plugin is thread-safe.  Currently only devices supported by
575
    hidapi are implemented."""
576

577
    def __init__(self, config: SimpleConfig):
        """Create empty registries and the re-entrant lock that guards them."""
        ThreadJob.__init__(self)
        self.config = config
        self.lock = threading.RLock()

        # pairing_code -> id_. An entry exists only while a pairing is active. Needs self.lock.
        self.pairing_code_to_id = {}  # type: Dict[str, str]
        # client -> id_. Needs self.lock.
        self.clients = {}  # type: Dict[HardwareClientBase, str]
        # Hardware we recognise: (vendor_id, product_id) -> Plugin
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]  # vendor_id -> Plugin
        # Custom enumerate functions for devices we don't know about. Needs self.lock.
        self._enumerate_func = set()
592

593
    def thread_jobs(self):
        """Return the jobs to schedule: the manager itself, whose run() handles device timeouts."""
        jobs = [self]
        return jobs
596

597
    def run(self):
        '''Handle device timeouts.  Runs in the context of the Plugins
        thread.'''
        # snapshot under the lock; the timeout callbacks run without it
        with self.lock:
            snapshot = list(self.clients)
        cutoff = time.time() - self.config.get_session_timeout()
        for hw_client in snapshot:
            hw_client.timeout(cutoff)
605

606
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
        """Map every key in `device_pairs` to `plugin` in the recognised-hardware table."""
        self._recognised_hardware.update((pair, plugin) for pair in device_pairs)
609

610
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
        """Map every id in `vendor_ids` to `plugin` in the recognised-vendor table."""
        self._recognised_vendor.update((vendor_id, plugin) for vendor_id in vendor_ids)
613

614
    def register_enumerate_func(self, func):
        """Add `func` to the set of custom enumerate functions (under the lock)."""
        with self.lock:
            enumerate_funcs = self._enumerate_func
            enumerate_funcs.add(func)
617

618
    @runs_in_hwd_thread
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
        """Return the client for `device`, asking `plugin` to create one if none is cached.

        A newly created (truthy) client is recorded under the device id.
        """
        # Get from cache first
        cached = self._client_by_id(device.id_)
        if cached:
            return cached
        client = plugin.create_client(device, handler)
        if not client:
            return client
        self.logger.info(f"Registering {client}")
        with self.lock:
            self.clients[client] = device.id_
        return client
631

632
    def id_by_pairing_code(self, pairing_code):
        """Return the device id paired with `pairing_code`, or None if there is no such pairing."""
        with self.lock:
            device_id = self.pairing_code_to_id.get(pairing_code)
        return device_id
635

636
    def pairing_code_by_id(self, id_):
        """Reverse lookup: return the first pairing code mapped to device id `id_`, or None."""
        with self.lock:
            matches = (code for code, paired_id in self.pairing_code_to_id.items() if paired_id == id_)
            return next(matches, None)
642

643
    def unpair_pairing_code(self, pairing_code):
        """Drop the pairing for `pairing_code` (if any) and close the client of the paired device."""
        with self.lock:
            try:
                device_id = self.pairing_code_to_id.pop(pairing_code)
            except KeyError:
                # not paired: nothing to do
                return
        # close outside the lock
        self._close_client(device_id)
649

650
    def unpair_id(self, id_):
        """Unpair device `id_` if it has a pairing code; otherwise just close its client."""
        pairing_code = self.pairing_code_by_id(id_)
        if not pairing_code:
            self._close_client(id_)
            return
        self.unpair_pairing_code(pairing_code)
656

657
    def _close_client(self, id_):
        """Forget the client registered for device `id_` and close it, if there is one."""
        with self.lock:
            hw_client = self._client_by_id(id_)
            self.clients.pop(hw_client, None)
        # close() is called without holding the lock
        if not hw_client:
            return
        hw_client.close()
663

664
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
        """Return the first registered client whose device id equals `id_`, or None."""
        with self.lock:
            candidates = (hw_client for hw_client, client_id in self.clients.items()
                          if client_id == id_)
            return next(candidates, None)
670

671
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
        '''Returns a client for the device ID if one is registered.  If
        a device is wiped or in bootloader mode pairing is impossible;
        in such cases we communicate by device ID and not wallet.

        Unless `scan_now` is false, devices are re-scanned before the lookup.'''
        if scan_now:
            self.scan_devices()
        found = self._client_by_id(id_)
        return found
678

679
    @runs_in_hwd_thread
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
                            keystore: 'Hardware_KeyStore',
                            force_pair: bool, *,
                            devices: Sequence['Device'] = None,
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
        """Find (or, with `force_pair`, establish) the client paired with `keystore`.

        Lookup order: already-known clients, then the given or freshly scanned
        `devices`, then - only if `force_pair` - device selection followed by
        forced pairing. Returns None if no client could be obtained.

        Raises Exception if `handler` is None.
        """
        self.logger.info("getting client for keystore")
        if handler is None:
            not_found_msg = _("Handler not found for {}").format(plugin.name)
            hint_msg = _("A library is probably missing.")
            raise Exception(not_found_msg + '\n' + hint_msg)
        handler.update_status(False)
        pairing_code = keystore.pairing_code()
        # search existing clients first (fast-path)
        client = (
            self.client_by_pairing_code(plugin=plugin, pairing_code=pairing_code, handler=handler, devices=[])
            if not devices else None
        )
        # search clients again, now allowing a (slow) scan
        if client is None:
            if devices is None:
                devices = self.scan_devices()
            client = self.client_by_pairing_code(
                plugin=plugin, pairing_code=pairing_code, handler=handler, devices=devices)
        if client is None and force_pair:
            try:
                info = self.select_device(plugin, handler, keystore, devices,
                                          allow_user_interaction=allow_user_interaction)
            except CannotAutoSelectDevice:
                pass
            else:
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
        if client:
            handler.update_status(True)
            # note: if select_device was called, we might also update label etc here:
            keystore.opportunistically_fill_in_missing_info_from_device(client)
        self.logger.info("end client for keystore")
        return client
713

714
    def client_by_pairing_code(
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
        devices: Sequence['Device'],
    ) -> Optional['HardwareClientBase']:
        """Return the client associated with `pairing_code`, or None.

        An already-known client is reused only if its plugin has exactly the
        same type as `plugin`; otherwise None is returned. If no client is
        known yet, a new one is created for the first entry of `devices`
        whose id matches the id stored for the pairing code.
        """
        device_id = self.id_by_pairing_code(pairing_code)
        known_client = self._client_by_id(device_id)
        if known_client:
            if type(known_client.plugin) != type(plugin):
                return None
            # The client may still carry the handler of another wallet from
            # a prior scan; swap it so dialogs get the right parent.
            known_client.handler = handler
            return known_client

        matching_device = next((dev for dev in devices if dev.id_ == device_id), None)
        if matching_device is None:
            return None
        return self.create_client(matching_device, handler, plugin)
×
731

732
    def force_pair_keystore(
        self,
        *,
        plugin: 'HW_PluginBase',
        handler: 'HardwareHandlerBase',
        info: 'DeviceInfo',
        keystore: 'Hardware_KeyStore',
    ) -> 'HardwareClientBase':
        """Pair `keystore` with the device described by `info` and return its client.

        Pairing succeeds when the xpub the device reports for the keystore's
        derivation prefix equals the keystore's own xpub; the pairing code is
        then mapped to the device id. In every other case (no suitable client,
        xpub mismatch, cancelled or failed xpub request) DeviceUnpairableError
        is raised.
        """
        expected_xpub = keystore.xpub
        derivation = keystore.get_derivation_prefix()
        assert derivation is not None
        xtype = bip32.xpub_type(expected_xpub)
        client = self._client_by_id(info.device.id_)
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
            # The client may still carry the handler of another wallet from
            # a prior scan; swap it so dialogs get the right parent.
            client.handler = handler
            # Requesting the xpub will trigger a PIN/passphrase entry request
            try:
                device_xpub = client.get_xpub(derivation, xtype)
            except (UserCancelled, RuntimeError):
                # Bad / cancelled PIN / passphrase
                device_xpub = None
            if device_xpub == expected_xpub:
                keystore.opportunistically_fill_in_missing_info_from_device(client)
                with self.lock:
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
                return client

        # Wrong PIN or passphrase, cancelled input, or the device is not pairable
        raise DeviceUnpairableError(
            _('Electrum cannot pair with your {}.\n\n'
              'Before you request bitcoins to be sent to addresses in this '
              'wallet, ensure you can pair with your device, or that you have '
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
              'receive will be unspendable.').format(plugin.device))
768

769
    def list_pairable_device_infos(
        self,
        *,
        handler: Optional['HardwareHandlerBase'],
        plugin: 'HW_PluginBase',
        devices: Sequence['Device'] = None,
        include_failing_clients: bool = False,
    ) -> List['DeviceInfo']:
        """Return one DeviceInfo per connected device that `plugin` recognises.

        Devices that are already paired are listed too, since reusing them is fine.
        If `devices` is None, a scan is performed first. Devices for which no
        client is created are skipped. When creating or querying a client
        raises, the device is skipped as well, unless `include_failing_clients`
        is set, in which case a DeviceInfo carrying the exception is listed.

        Raises HardwarePluginLibraryUnavailable if the plugin's libraries are
        not available.
        """
        if not plugin.libraries_available:
            raise HardwarePluginLibraryUnavailable(plugin.get_library_not_available_message())
        if devices is None:
            devices = self.scan_devices()
        infos = []
        for device in devices:
            if not plugin.can_recognize_device(device):
                continue
            try:
                client = self.create_client(device, handler, plugin)
                if not client:
                    continue
                # queried inside the try: talking to the device may fail
                details = dict(
                    label=client.label(),
                    initialized=client.is_initialized(),
                    soft_device_id=client.get_soft_device_id(),
                    model_name=client.device_model_name(),
                )
            except Exception as e:
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
                if include_failing_clients:
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
                continue
            infos.append(DeviceInfo(device=device, plugin_name=plugin.name, **details))

        return infos
×
810

811
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
        """Pick the device to use for `keystore` and return its DeviceInfo.

        While no pairable device is found, the user is asked whether to retry
        (each retry rescans). A device is then auto-selected, in this order, by
        matching soft_device_id, by a unique non-placeholder label, or by being
        the only candidate when the keystore has no useful label/soft_device_id.
        If none of that applies, the user is asked to choose.

        Raises CannotAutoSelectDevice whenever the user would have to be asked
        but `allow_user_interaction` is False, and UserCancelled if the user
        declines to retry or cancels the choice.
        """
        # note: ideally this should not be called from the GUI thread...
        infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
        while not infos:
            if not allow_user_interaction:
                raise CannotAutoSelectDevice()
            prompt_parts = [_('Please insert your {}').format(plugin.device), " ("]
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
                prompt_parts.append(f"label: {keystore.label}, ")
            prompt_parts.append(f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}")
            prompt_parts.append(').\n\n{}\n\n{}'.format(
                _('Verify the cable is connected and that '
                  'no other application is using it.'),
                _('Try to connect again?')
            ))
            if not handler.yes_no_question("".join(prompt_parts)):
                raise UserCancelled()
            # devices=None makes the listing rescan instead of reusing the stale list
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=None)

        # select device automatically. (but only if we have reasonable expectation it is the correct one)
        # method 1: select device by id
        if keystore.soft_device_id:
            by_id = next(
                (cand for cand in infos if cand.soft_device_id == keystore.soft_device_id),
                None,
            )
            if by_id is not None:
                self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
                return by_id
        # method 2: select device by label
        #           but only if not a placeholder label and only if there is no collision
        device_labels = [cand.label for cand in infos]
        if keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
            if device_labels.count(keystore.label) == 1:
                self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
                return infos[device_labels.index(keystore.label)]
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
        #           saved for keystore anyway, select it
        if (len(infos) == 1
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
                and keystore.soft_device_id is None):
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
            return infos[0]

        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
        if not allow_user_interaction:
            raise CannotAutoSelectDevice()
        # ask user to select device manually
        msg = (
                _("Could not automatically pair with device for given keystore.") + "\n"
                + f"(keystore label: {keystore.label!r}, "
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
        msg += _("Please select which {} device to use:").format(plugin.device)
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
        descriptions = []
        for cand in infos:
            shown_label = cand.label or _("An unnamed {}").format(cand.plugin_name)
            init_state = _("initialized") if cand.initialized else _("wiped")
            transport = cand.device.transport_ui_string
            maybe_model = f"{cand.model_name}, " if cand.model_name else ""
            descriptions.append(
                "{label} ({maybe_model}{init}, {transport})".format(
                    label=shown_label,
                    init=init_state,
                    transport=transport,
                    maybe_model=maybe_model,
                )
            )
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
                          f"num options: {len(infos)}. options: {infos}")
        choice = handler.query_choice(msg, descriptions)
        if choice is None:
            raise UserCancelled()
        info = infos[choice]
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
        # note: updated label/soft_device_id will be saved after pairing succeeds
        return info
×
886

887
    @runs_in_hwd_thread
    def _scan_devices_with_hid(self) -> List['Device']:
        """Enumerate HID devices and return those that a registered plugin accepts.

        Returns an empty list if the `hid` module cannot be imported.
        """
        try:
            import hid
        except ImportError:
            return []

        found = []
        for entry in hid.enumerate(0, 0):
            vendor_id = entry['vendor_id']
            product_key = (vendor_id, entry['product_id'])
            # an exact (vendor_id, product_id) match takes precedence over a vendor-only match
            if product_key in self._recognised_hardware:
                plugin = self._recognised_hardware[product_key]
            elif vendor_id in self._recognised_vendor:
                plugin = self._recognised_vendor[vendor_id]
            else:
                continue
            if not plugin:
                continue
            device = plugin.create_device_from_hid_enumeration(entry, product_key=product_key)
            if device:
                found.append(device)
        return found
×
908

909
    @runs_in_hwd_thread
    @profiler
    def scan_devices(self) -> Sequence['Device']:
        """Scan for connected devices and return them.

        Combines the HID scan with the registered custom enumeration functions.
        As a side effect, clients whose device was not found (or whose
        connection is no longer usable) are dropped and unpaired, and their
        handler, if any, is told the device is gone.
        """
        self.logger.info("scanning devices...")

        # First see what's connected that we know about
        devices = self._scan_devices_with_hid()

        # Let plugin handlers enumerate devices we don't know about.
        # The function list is copied under the lock, but called outside of it.
        with self.lock:
            enumerate_funcs = list(self._enumerate_func)
        for enumerate_func in enumerate_funcs:
            try:
                extra_devices = enumerate_func()
            # NOTE(review): BaseException also covers KeyboardInterrupt/SystemExit - confirm this breadth is intended
            except BaseException as exc:
                self.logger.error(f'custom device enum failed. func {str(enumerate_func)}, error {exc!r}')
                continue
            devices.extend(extra_devices)

        # find out what was disconnected
        seen_ids = [dev.id_ for dev in devices]
        with self.lock:
            still_connected = {}
            gone = []
            for client, id_ in self.clients.items():
                if id_ in seen_ids and client.has_usable_connection_with_device():
                    still_connected[client] = id_
                else:
                    gone.append((client, id_))
            self.clients = still_connected

        # Unpair disconnected devices
        for client, id_ in gone:
            self.unpair_id(id_)
            if client.handler:
                client.handler.update_status(False)

        return devices
×
947

948
    @classmethod
    def version_info(cls) -> Mapping[str, Optional[str]]:
        """Report the versions of the optional libusb and hidapi libraries.

        The result always has the keys "libusb.version" and "hidapi.version"
        (None if the version could not be determined). "libusb.path" is only
        present when `usb1` could be imported.
        """
        versions = {}
        # add libusb
        try:
            import usb1
        except Exception:
            usb1 = None
        if usb1 is None:
            versions["libusb.version"] = None
        else:
            versions["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
            try:
                versions["libusb.path"] = usb1.libusb1.libusb._name
            except AttributeError:
                versions["libusb.path"] = None
        # add hidapi
        try:
            import hid
            hidapi_version = hid.__version__  # available starting with 0.12.0.post2
        except Exception:
            # fall back to the installed package metadata
            from importlib.metadata import version
            try:
                hidapi_version = version("hidapi")
            except ImportError:
                hidapi_version = None
        versions["hidapi.version"] = hidapi_version
        return versions
×
973

974
    def trigger_pairings(
            self,
            keystores: Sequence['KeyStore'],
            *,
            allow_user_interaction: bool = True,
            devices: Sequence['Device'] = None,
    ) -> None:
        """Try to pair each hardware keystore in `keystores` with a connected device.

        Keystores that are not hardware keystores are ignored. Every keystore
        first gets a pairing attempt without user interaction; only after that,
        and only if `allow_user_interaction` is set, each keystore gets an
        attempt during which the user may be prompted. UserCancelled is
        swallowed for each individual attempt.

        Doing the automatic pass first is friendlier than pairing keystores one
        by one, e.g. for a multisig wallet. Consider a 2-of-3 multisig with hw
        keystores ks1, ks2, ks3, where only devices d2 (for ks2) and d3 (for ks3)
        are connected and nothing is paired yet:
        1. pairing individually, we might start with ks1
           - ks1 cannot be auto-paired, as neither d2 nor d3 matches the stored fingerprint
           - the user might then be asked to manually pair ks1 with d2 or d3,
             which is confusing and error-prone, especially for devices without
             label support (such as Ledger), where same-type devices are hard
             to tell apart. (see #4199)
        2. with this method, ks2-d2 and ks3-d3 are auto-paired first, and then
           the user is told ks1 could not be paired (and there are no devices left to try)
        """
        from .keystore import Hardware_KeyStore
        hw_keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
        if not hw_keystores:
            return
        if devices is None:
            devices = self.scan_devices()
        # pass 1: pair with all devices that can be auto-selected
        # pass 2 (optional): manual selections
        interaction_modes = (False, True) if allow_user_interaction else (False,)
        for interactive in interaction_modes:
            for ks in hw_keystores:
                try:
                    ks.get_client(
                        force_pair=True,
                        allow_user_interaction=interactive,
                        devices=devices,
                    )
                except UserCancelled:
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc