• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 6548963425255424

09 Apr 2025 11:03AM UTC coverage: 60.93% (-0.03%) from 60.957%
6548963425255424

Pull #9718

CirrusCI

ecdsa
Userspace plugins:
 - Allow plugins saved as zipfiles in user data dir
 - plugins are authorized with a user chosen password
 - pubkey derived from password is saved with admin permissions
Pull Request #9718: Userspace plugins:

26 of 138 new or added lines in 1 file covered. (18.84%)

100 existing lines in 3 files now uncovered.

21442 of 35191 relevant lines covered (60.93%)

3.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

38.49
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import json
5✔
27
import os
5✔
28
import pkgutil
5✔
29
import importlib.util
5✔
30
import time
5✔
31
import threading
5✔
32
import sys
5✔
33
import aiohttp
5✔
34
import zipfile as zipfile_lib
5✔
35
from urllib.parse import urlparse
5✔
36

37
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
5✔
38
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
39
import concurrent
5✔
40
import zipimport
5✔
41
from functools import wraps, partial
5✔
42
from itertools import chain
5✔
43

44
from electrum_ecc import ECPrivkey, ECPubkey
5✔
45

46
from ._vendor.distutils.version import StrictVersion
5✔
47
from .version import ELECTRUM_VERSION
5✔
48
from .i18n import _
5✔
49
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException)
5✔
50
from . import bip32
5✔
51
from . import plugins
5✔
52
from .simple_config import SimpleConfig
5✔
53
from .logging import get_logger, Logger
5✔
54
from .crypto import sha256
5✔
55

56
if TYPE_CHECKING:
5✔
57
    from .plugins.hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
58
    from .keystore import Hardware_KeyStore, KeyStore
×
59
    from .wallet import Abstract_Wallet
×
60

61

62
_logger = get_logger(__name__)
5✔
63
plugin_loaders = {}
5✔
64
hook_names = set()
5✔
65
hooks = {}
5✔
66
_exec_module_failure = {}  # type: Dict[str, Exception]
5✔
67

68

69
class Plugins(DaemonThread):
5✔
70

71
    LOGGING_SHORTCUT = 'p'
5✔
72
    pkgpath = os.path.dirname(plugins.__file__)
5✔
73
    keyfile_linux = '/etc/electrum/plugins_key'
5✔
74
    keyfile_windows = 'C:\\HKEY_LOCAL_MACHINE\\SOFTWARE\\Electrum\\PluginsKey'
5✔
75

76
    @profiler
5✔
77
    def __init__(self, config: SimpleConfig, gui_name: str = None, cmd_only: bool = False):
5✔
78
        self.config = config
5✔
79
        self.cmd_only = cmd_only  # type: bool
5✔
80
        self.internal_plugin_metadata = {}
5✔
81
        self.external_plugin_metadata = {}
5✔
82
        if cmd_only:
5✔
83
            # only import the command modules of plugins
84
            Logger.__init__(self)
×
85
            self.find_plugins()
×
86
            self.load_plugins()
×
87
            return
×
88
        DaemonThread.__init__(self)
5✔
89
        self.device_manager = DeviceMgr(config)
5✔
90
        self.name = 'Plugins'  # set name of thread
5✔
91
        self.hw_wallets = {}
5✔
92
        self.plugins = {}  # type: Dict[str, BasePlugin]
5✔
93
        self.gui_name = gui_name
5✔
94
        self.find_plugins()
5✔
95
        self.load_plugins()
5✔
96
        self.add_jobs(self.device_manager.thread_jobs())
5✔
97
        self.start()
5✔
98

99
    @property
5✔
100
    def descriptions(self):
5✔
101
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
102

103
    def find_directory_plugins(self, pkg_path: str, external: bool):
5✔
104
        """Finds plugins in directory form from the given pkg_path and populates the metadata dicts"""
105
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
5✔
106
        for loader, name, ispkg in iter_modules:
5✔
107
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
108
            #       once as data and once as code. To honor the "no duplicates" rule below,
109
            #       we exclude the ones packaged as *code*, here:
110
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
5✔
111
                continue
×
112
            module_path = os.path.join(pkg_path, name)
5✔
113
            if self.cmd_only and not self.config.get('enable_plugin_' + name) is True:
5✔
114
                continue
×
115
            try:
5✔
116
                with open(os.path.join(module_path, 'manifest.json'), 'r') as f:
5✔
117
                    d = json.load(f)
5✔
118
            except FileNotFoundError:
5✔
119
                self.logger.info(f"could not find manifest.json of plugin {name}, skipping...")
5✔
120
                continue
5✔
121
            if 'fullname' not in d:
5✔
122
                continue
×
123
            d['path'] = module_path
5✔
124
            if not self.cmd_only:
5✔
125
                gui_good = self.gui_name in d.get('available_for', [])
5✔
126
                if not gui_good:
5✔
127
                    continue
5✔
128
                details = d.get('registers_wallet_type')
5✔
129
                if details:
5✔
130
                    self.register_wallet_type(name, gui_good, details)
5✔
131
                details = d.get('registers_keystore')
5✔
132
                if details:
5✔
133
                    self.register_keystore(name, gui_good, details)
5✔
134
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
5✔
135
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
×
136
                raise Exception(f"duplicate plugins? for {name=}")
×
137
            if not external:
5✔
138
                self.internal_plugin_metadata[name] = d
5✔
139
            else:
140
                self.external_plugin_metadata[name] = d
×
141

142
    @staticmethod
5✔
143
    def exec_module_from_spec(spec, path: str):
5✔
144
        if prev_fail := _exec_module_failure.get(path):
5✔
145
            raise Exception(f"exec_module already failed once before, with: {prev_fail!r}")
×
146
        try:
5✔
147
            module = importlib.util.module_from_spec(spec)
5✔
148
            # sys.modules needs to be modified for relative imports to work
149
            # see https://stackoverflow.com/a/50395128
150
            sys.modules[path] = module
5✔
151
            spec.loader.exec_module(module)
5✔
152
        except Exception as e:
×
153
            # We can't undo all side-effects, but we at least rm the module from sys.modules,
154
            # so the import system knows it failed. If called again for the same plugin, we do not
155
            # retry due to potential interactions with not-undone side-effects (e.g. plugin
156
            # might have defined commands).
157
            _exec_module_failure[path] = e
×
158
            if path in sys.modules:
×
159
                sys.modules.pop(path, None)
×
160
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
×
161
        return module
5✔
162

163
    def find_plugins(self):
5✔
164
        internal_plugins_path = (self.pkgpath, False)
5✔
165
        external_plugins_path = (self.get_external_plugin_dir(), True)
5✔
166
        for pkg_path, external in (internal_plugins_path, external_plugins_path):
5✔
167
            if pkg_path and os.path.exists(pkg_path):
5✔
168
                if not external:
5✔
169
                    self.find_directory_plugins(pkg_path=pkg_path, external=external)
5✔
170
                else:
171
                    self.find_zip_plugins(pkg_path=pkg_path, external=external)
5✔
172

173
    def load_plugins(self):
5✔
174
        for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
5✔
175
            if not d.get('requires_wallet_type') and self.config.get('enable_plugin_' + name):
5✔
176
                try:
×
177
                    if self.cmd_only:  # only load init method to register commands
×
178
                        self.maybe_load_plugin_init_method(name)
×
179
                    else:
180
                        self.load_plugin_by_name(name)
×
181
                except BaseException as e:
×
182
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
183

184
    def _has_root_permissions(self, path):
5✔
185
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
186

187
    def get_keyfile_path(self) -> Tuple[str, str]:
5✔
NEW
188
        if sys.platform in ['win32']:
×
NEW
189
            keyfile_path = self.keyfile_windows
×
NEW
190
            keyfile_help = _('This file can be edited with Regdit')
×
NEW
191
        elif sys.platform in ['linux', 'darwin'] or sys.platform.startswith('freebsd'):
×
NEW
192
            keyfile_path = self.keyfile_linux
×
NEW
193
            keyfile_help = _('The file must have root permissions')
×
194
        else:
NEW
195
            raise Exception('platform not supported')
×
NEW
196
        return keyfile_path, keyfile_help
×
197

198
    def get_pubkey_bytes(self) -> Optional[str]:
5✔
199
        """ returns None if the pubkey has not been set """
NEW
200
        if sys.platform in ['win32']:
×
NEW
201
            import winreg
×
NEW
202
            with winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE) as hkey:
×
NEW
203
                try:
×
NEW
204
                    with winreg.OpenKey(hkey, r"SOFTWARE\\Electrum") as key:
×
NEW
205
                        pubkey_hex = winreg.QueryValue(key, "PluginsKey")
×
NEW
206
                except Exception as e:
×
NEW
207
                    self.logger.info(f'winreg error: {e}')
×
NEW
208
                    return
×
NEW
209
        elif sys.platform in ['linux', 'darwin'] or sys.platform.startswith('freebsd'):
×
NEW
210
            if not os.path.exists(self.keyfile_linux):
×
NEW
211
                return
×
NEW
212
            if not self._has_root_permissions(self.keyfile_linux):
×
NEW
213
                return
×
NEW
214
            with open(self.keyfile_linux) as f:
×
NEW
215
                pubkey_hex = f.read()
×
216
        else:
UNCOV
217
            return
×
218
        # all good
NEW
219
        return bytes.fromhex(pubkey_hex)
×
220

221
    def get_external_plugin_dir(self) -> str:
5✔
222
        pkg_path = os.path.join(self.config.electrum_path(), 'plugins')
5✔
223
        if not os.path.exists(pkg_path):
5✔
224
            os.mkdir(pkg_path)
5✔
225
        return pkg_path
5✔
226

227
    async def download_external_plugin(self, url):
5✔
NEW
228
        filename = os.path.basename(urlparse(url).path)
×
NEW
229
        pkg_path = self.get_external_plugin_dir()
×
NEW
230
        path = os.path.join(pkg_path, filename)
×
NEW
231
        async with aiohttp.ClientSession() as session:
×
NEW
232
            async with session.get(url) as resp:
×
NEW
233
                if resp.status == 200:
×
NEW
234
                    with open(path, 'wb') as fd:
×
NEW
235
                        async for chunk in resp.content.iter_chunked(10):
×
NEW
236
                            fd.write(chunk)
×
NEW
237
        return path
×
238

239
    def read_manifest(self, path) -> dict:
5✔
240
        """ return json dict """
NEW
241
        with zipfile_lib.ZipFile(path) as file:
×
NEW
242
            for filename in file.namelist():
×
NEW
243
                if filename.endswith('manifest.json'):
×
NEW
244
                    break
×
245
            else:
NEW
246
                raise Exception('could not find manifest.json in zip archive')
×
NEW
247
            with file.open(filename, 'r') as f:
×
NEW
248
                manifest = json.load(f)
×
NEW
249
                manifest['path'] = path  # external, path of the zipfile
×
NEW
250
                manifest['dirname'] = os.path.dirname(filename)  # internal
×
NEW
251
                manifest['is_zip'] = True
×
NEW
252
                manifest['zip_hash_sha256'] = get_file_hash256(path).hex()
×
NEW
253
                return manifest
×
254

255
    def zip_plugin_path(self, name) -> str:
5✔
NEW
256
        path = self.get_metadata(name)['path']
×
NEW
257
        filename = os.path.basename(path)
×
258
        if name in self.internal_plugin_metadata:
×
259
            pkg_path = self.pkgpath
×
260
        else:
261
            pkg_path = self.get_external_plugin_dir()
×
262
        return os.path.join(pkg_path, filename)
×
263

264
    def find_zip_plugins(self, pkg_path: str, external: bool):
5✔
265
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts"""
266
        if pkg_path is None:
5✔
267
            return
×
268
        for filename in os.listdir(pkg_path):
5✔
UNCOV
269
            path = os.path.join(pkg_path, filename)
×
UNCOV
270
            if not filename.endswith('.zip'):
×
UNCOV
271
                continue
×
272
            try:
×
NEW
273
                d = self.read_manifest(path)
×
NEW
274
            except Exception:
×
NEW
275
                self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
×
276
                continue
×
NEW
277
            name = d['name']
×
NEW
278
            if name in self.internal_plugin_metadata:
×
NEW
279
                raise Exception(f"duplicate plugins for name={name}")
×
NEW
280
            if name in self.external_plugin_metadata:
×
NEW
281
                raise Exception(f"duplicate plugins for name={name}")
×
NEW
282
            if self.cmd_only and not self.config.get('enable_plugin_' + name):
×
NEW
283
                continue
×
NEW
284
            min_version = d.get('min_electrum_version')
×
NEW
285
            if min_version and StrictVersion(min_version) > StrictVersion(ELECTRUM_VERSION):
×
NEW
286
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
NEW
287
                continue
×
NEW
288
            max_version = d.get('max_electrum_version')
×
NEW
289
            if max_version and StrictVersion(max_version) < StrictVersion(ELECTRUM_VERSION):
×
NEW
290
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
NEW
291
                continue
×
292

NEW
293
            if not self.cmd_only:
×
NEW
294
                gui_good = self.gui_name in d.get('available_for', [])
×
NEW
295
                if not gui_good:
×
296
                    continue
×
NEW
297
                if 'fullname' not in d:
×
298
                    continue
×
NEW
299
            if external:
×
NEW
300
                self.external_plugin_metadata[name] = d
×
301
            else:
NEW
302
                self.internal_plugin_metadata[name] = d
×
303

304
    def get(self, name):
5✔
305
        return self.plugins.get(name)
×
306

307
    def count(self):
5✔
308
        return len(self.plugins)
×
309

310
    def load_plugin(self, name) -> 'BasePlugin':
5✔
311
        """Imports the code of the given plugin.
312
        note: can be called from any thread.
313
        """
314
        if self.get_metadata(name):
5✔
315
            return self.load_plugin_by_name(name)
5✔
316
        else:
317
            raise Exception(f"could not find plugin {name!r}")
×
318

319
    def maybe_load_plugin_init_method(self, name: str) -> None:
5✔
320
        """Loads the __init__.py module of the plugin if it is not already loaded."""
321
        is_external = name in self.external_plugin_metadata
5✔
322
        base_name = ('electrum_external_plugins.' if is_external else 'electrum.plugins.') + name
5✔
323
        if base_name not in sys.modules:
5✔
324
            metadata = self.get_metadata(name)
5✔
325
            is_zip = metadata.get('is_zip', False)
5✔
326
            # if the plugin was not enabled on startup the init module hasn't been loaded yet
327
            if not is_zip:
5✔
328
                if is_external:
5✔
329
                    # this branch is deprecated: external plugins are always zip files
330
                    path = os.path.join(metadata['path'], '__init__.py')
×
331
                    init_spec = importlib.util.spec_from_file_location(base_name, path)
×
332
                else:
333
                    init_spec = importlib.util.find_spec(base_name)
5✔
334
            else:
335
                zipfile = zipimport.zipimporter(metadata['path'])
×
NEW
336
                dirname = metadata['dirname']
×
NEW
337
                init_spec = zipfile.find_spec(dirname)
×
338

339
            self.exec_module_from_spec(init_spec, base_name)
5✔
340
            if name == "trustedcoin":
5✔
341
                # removes trustedcoin after loading to not show it in the list of plugins
342
                del self.internal_plugin_metadata[name]
×
343

344
    def load_plugin_by_name(self, name: str) -> 'BasePlugin':
5✔
345
        if name in self.plugins:
5✔
346
            return self.plugins[name]
×
347
        # if the plugin was not enabled on startup the init module hasn't been loaded yet
348
        self.maybe_load_plugin_init_method(name)
5✔
349
        is_external = name in self.external_plugin_metadata
5✔
350
        if is_external and not self.is_authorized(name):
5✔
NEW
351
            self.logger.info(f'plugin not authorized {name}')
×
NEW
352
            return
×
353
        if not is_external:
5✔
354
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
5✔
355
        else:
356
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
×
357

358
        spec = importlib.util.find_spec(full_name)
5✔
359
        if spec is None:
5✔
360
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
361
        try:
5✔
362
            module = self.exec_module_from_spec(spec, full_name)
5✔
363
            plugin = module.Plugin(self, self.config, name)
5✔
364
        except Exception as e:
×
365
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
×
366
        self.add_jobs(plugin.thread_jobs())
5✔
367
        self.plugins[name] = plugin
5✔
368
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
5✔
369
        return plugin
5✔
370

371
    def close_plugin(self, plugin):
5✔
372
        self.remove_jobs(plugin.thread_jobs())
×
373

374
    def derive_privkey(self, pw: str) -> ECPrivkey:
5✔
NEW
375
        from hashlib import pbkdf2_hmac
×
NEW
376
        salt = self.config.get('plugins_salt')
×
NEW
377
        if salt is None:
×
NEW
378
            salt = os.urandom(32).hex()
×
NEW
379
            self.config.set_key('plugins_salt', salt)
×
NEW
380
        secret = pbkdf2_hmac('sha256', pw.encode('utf-8'), bytes.fromhex(salt), iterations=1024)
×
NEW
381
        return ECPrivkey(secret)
×
382

383
    def is_installed(self, name) -> bool:
5✔
384
        """an external plugin may be installed but not authorized """
NEW
385
        return name in self.internal_plugin_metadata or name in self.external_plugin_metadata
×
386

387
    def is_authorized(self, name) -> bool:
5✔
NEW
388
        if name in self.internal_plugin_metadata:
×
NEW
389
            return True
×
NEW
390
        if name not in self.external_plugin_metadata:
×
NEW
391
            return False
×
NEW
392
        pubkey_bytes = self.get_pubkey_bytes()
×
NEW
393
        if not pubkey_bytes:
×
NEW
394
            return False
×
NEW
395
        if not self.is_plugin_zip(name):
×
NEW
396
            return False
×
NEW
397
        filename = self.zip_plugin_path(name)
×
NEW
398
        plugin_hash = get_file_hash256(filename)
×
NEW
399
        sig = self.config.get('authorize_plugin_' + name)
×
NEW
400
        if not sig:
×
NEW
401
            return False
×
NEW
402
        pubkey = ECPubkey(pubkey_bytes)
×
NEW
403
        return pubkey.ecdsa_verify(bytes.fromhex(sig), plugin_hash)
×
404

405
    def authorize_plugin(self, name: str, filename, privkey: ECPrivkey):
5✔
NEW
406
        assert self.get_pubkey_bytes() == privkey.get_public_key_bytes()
×
NEW
407
        plugin_hash = get_file_hash256(filename)
×
NEW
408
        sig = privkey.ecdsa_sign(plugin_hash)
×
NEW
409
        value = sig.hex()
×
NEW
410
        self.config.set_key('authorize_plugin_' + name, value, save=True)
×
411

412
    def enable(self, name: str) -> 'BasePlugin':
5✔
413
        self.config.set_key('enable_plugin_' + name, True, save=True)
×
414
        p = self.get(name)
×
415
        if p:
×
416
            return p
×
417
        return self.load_plugin(name)
×
418

419
    def disable(self, name: str) -> None:
5✔
420
        self.config.set_key('enable_plugin_' + name, False, save=True)
×
421
        p = self.get(name)
×
422
        if not p:
×
423
            return
×
424
        self.plugins.pop(name)
×
425
        p.close()
×
426
        self.logger.info(f"closed {name}")
×
427

428
    @classmethod
5✔
429
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
5✔
NEW
430
        return key.startswith('enable_plugin_') or key.startswith('authorize_plugin_')
×
431

432
    def toggle(self, name: str) -> Optional['BasePlugin']:
5✔
433
        p = self.get(name)
×
434
        return self.disable(name) if p else self.enable(name)
×
435

436
    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
5✔
437
        d = self.descriptions.get(name)
×
438
        if not d:
×
439
            return False
×
440
        deps = d.get('requires', [])
×
441
        for dep, s in deps:
×
442
            try:
×
443
                __import__(dep)
×
444
            except ImportError as e:
×
445
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
×
446
                return False
×
447
        requires = d.get('requires_wallet_type', [])
×
448
        return not requires or wallet.wallet_type in requires
×
449

450
    def get_hardware_support(self):
5✔
451
        out = []
×
452
        for name, (gui_good, details) in self.hw_wallets.items():
×
453
            if gui_good:
×
454
                try:
×
455
                    p = self.get_plugin(name)
×
456
                    if p.is_available():
×
457
                        out.append(HardwarePluginToScan(name=name,
×
458
                                                        description=details[2],
459
                                                        plugin=p,
460
                                                        exception=None))
461
                except Exception as e:
×
462
                    self.logger.exception(f"cannot load plugin for: {name}")
×
463
                    out.append(HardwarePluginToScan(name=name,
×
464
                                                    description=details[2],
465
                                                    plugin=None,
466
                                                    exception=e))
467
        return out
×
468

469
    def register_wallet_type(self, name, gui_good, wallet_type):
5✔
470
        from .wallet import register_wallet_type, register_constructor
5✔
471
        self.logger.info(f"registering wallet type {(wallet_type, name)}")
5✔
472

473
        def loader():
5✔
474
            plugin = self.get_plugin(name)
5✔
475
            register_constructor(wallet_type, plugin.wallet_class)
5✔
476
        register_wallet_type(wallet_type)
5✔
477
        plugin_loaders[wallet_type] = loader
5✔
478

479
    def register_keystore(self, name, gui_good, details):
5✔
480
        from .keystore import register_keystore
5✔
481

482
        def dynamic_constructor(d):
5✔
483
            return self.get_plugin(name).keystore_class(d)
5✔
484
        if details[0] == 'hardware':
5✔
485
            self.hw_wallets[name] = (gui_good, details)
5✔
486
            self.logger.info(f"registering hardware {name}: {details}")
5✔
487
            register_keystore(details[1], dynamic_constructor)
5✔
488

489
    def get_plugin(self, name: str) -> 'BasePlugin':
5✔
490
        if name not in self.plugins:
5✔
491
            self.load_plugin(name)
5✔
492
        return self.plugins[name]
5✔
493

494
    def is_plugin_zip(self, name: str) -> bool:
5✔
495
        """Returns True if the plugin is a zip file"""
496
        if (metadata := self.get_metadata(name)) is None:
×
497
            return False
×
498
        return metadata.get('is_zip', False)
×
499

500
    def get_metadata(self, name: str) -> Optional[dict]:
5✔
501
        """Returns the metadata of the plugin"""
502
        metadata = self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name)
5✔
503
        if not metadata:
5✔
504
            return None
×
505
        return metadata
5✔
506

507
    def run(self):
5✔
508
        while self.is_running():
5✔
509
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
5✔
510
            self.run_jobs()
5✔
511
        self.on_stop()
5✔
512

513

514
def get_file_hash256(path: str) -> bytes:
5✔
515
    '''Get the sha256 hash of a file in hex, similar to `sha256sum`.'''
516
    with open(path, 'rb') as f:
×
NEW
517
        return sha256(f.read())
×
518

519

520
def hook(func):
5✔
521
    hook_names.add(func.__name__)
5✔
522
    return func
5✔
523

524

525
def run_hook(name, *args):
5✔
526
    results = []
5✔
527
    f_list = hooks.get(name, [])
5✔
528
    for p, f in f_list:
5✔
529
        if p.is_enabled():
5✔
530
            try:
5✔
531
                r = f(*args)
5✔
532
            except Exception:
×
533
                _logger.exception(f"Plugin error. plugin: {p}, hook: {name}")
×
534
                r = False
×
535
            if r:
5✔
536
                results.append(r)
×
537

538
    if results:
5✔
539
        assert len(results) == 1, results
×
540
        return results[0]
×
541

542

543
class BasePlugin(Logger):
5✔
544

545
    def __init__(self, parent, config: 'SimpleConfig', name):
5✔
546
        self.parent = parent  # type: Plugins  # The plugins object
5✔
547
        self.name = name
5✔
548
        self.config = config
5✔
549
        self.wallet = None  # fixme: this field should not exist
5✔
550
        Logger.__init__(self)
5✔
551
        # add self to hooks
552
        for k in dir(self):
5✔
553
            if k in hook_names:
5✔
554
                l = hooks.get(k, [])
5✔
555
                l.append((self, getattr(self, k)))
5✔
556
                hooks[k] = l
5✔
557

558
    def __str__(self):
5✔
559
        return self.name
×
560

561
    def close(self):
5✔
562
        # remove self from hooks
563
        for attr_name in dir(self):
×
564
            if attr_name in hook_names:
×
565
                # found attribute in self that is also the name of a hook
566
                l = hooks.get(attr_name, [])
×
567
                try:
×
568
                    l.remove((self, getattr(self, attr_name)))
×
569
                except ValueError:
×
570
                    # maybe attr name just collided with hook name and was not hook
571
                    continue
×
572
                hooks[attr_name] = l
×
573
        self.parent.close_plugin(self)
×
574
        self.on_close()
×
575

576
    def on_close(self):
5✔
577
        pass
×
578

579
    def requires_settings(self) -> bool:
5✔
580
        return False
×
581

582
    def thread_jobs(self):
5✔
583
        return []
5✔
584

585
    def is_enabled(self):
5✔
586
        return self.is_available() and self.config.get('enable_plugin_' + self.name) is True
×
587

588
    def is_available(self):
5✔
589
        return True
×
590

591
    def can_user_disable(self):
5✔
592
        return True
×
593

594
    def settings_widget(self, window):
5✔
595
        raise NotImplementedError()
×
596

597
    def settings_dialog(self, window):
5✔
598
        raise NotImplementedError()
×
599

600
    def read_file(self, filename: str) -> bytes:
5✔
601
        if self.parent.is_plugin_zip(self.name):
×
602
            plugin_filename = self.parent.zip_plugin_path(self.name)
×
NEW
603
            metadata = self.parent.external_plugin_metadata[self.name]
×
NEW
604
            dirname = metadata['dirname']
×
605
            with zipfile_lib.ZipFile(plugin_filename) as myzip:
×
NEW
606
                with myzip.open(os.path.join(dirname, filename)) as myfile:
×
607
                    return myfile.read()
×
608
        else:
609
            if self.name in self.parent.internal_plugin_metadata:
×
610
                path = os.path.join(os.path.dirname(__file__), 'plugins', self.name, filename)
×
611
            else:
612
                path = os.path.join(self.parent.get_external_plugin_dir(), self.name, filename)
×
613
            with open(path, 'rb') as myfile:
×
614
                return myfile.read()
×
615

616

617
class DeviceUnpairableError(UserFacingException): pass
5✔
618
class HardwarePluginLibraryUnavailable(Exception): pass
5✔
619
class CannotAutoSelectDevice(Exception): pass
5✔
620

621

622
class Device(NamedTuple):
5✔
623
    path: Union[str, bytes]
5✔
624
    interface_number: int
5✔
625
    id_: str
5✔
626
    product_key: Any   # when using hid, often Tuple[int, int]
5✔
627
    usage_page: int
5✔
628
    transport_ui_string: str
5✔
629

630

631
class DeviceInfo(NamedTuple):
5✔
632
    device: Device
5✔
633
    label: Optional[str] = None
5✔
634
    initialized: Optional[bool] = None
5✔
635
    exception: Optional[Exception] = None
5✔
636
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
5✔
637
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
5✔
638
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
5✔
639

640

641
class HardwarePluginToScan(NamedTuple):
5✔
642
    name: str
5✔
643
    description: str
5✔
644
    plugin: Optional['HW_PluginBase']
5✔
645
    exception: Optional[Exception]
5✔
646

647

648
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
5✔
649

650

651
# hidapi is not thread-safe
652
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
653
#     https://github.com/libusb/hidapi/issues/45
654
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
655
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
656
# It is not entirely clear to me, exactly what is safe and what isn't, when
657
# using multiple threads...
658
# Hence, we use a single thread for all device communications, including
659
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
660
# the following thread:
661
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
5✔
662
    max_workers=1,
663
    thread_name_prefix='hwd_comms_thread'
664
)
665

666
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
667
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
668
# To keep it simple, let's just import it now, as we are likely in the main thread here.
669
if threading.current_thread() is not threading.main_thread():
5✔
670
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
×
671
try:
5✔
672
    import hid
5✔
673
except ImportError:
5✔
674
    pass
5✔
675

676

677
T = TypeVar('T')
5✔
678

679

680
def run_in_hwd_thread(func: Callable[[], T]) -> T:
5✔
681
    if threading.current_thread().name.startswith("hwd_comms_thread"):
×
682
        return func()
×
683
    else:
684
        fut = _hwd_comms_executor.submit(func)
×
685
        return fut.result()
×
686
        #except (concurrent.futures.CancelledError, concurrent.futures.TimeoutError) as e:
687

688

689
def runs_in_hwd_thread(func):
5✔
690
    @wraps(func)
5✔
691
    def wrapper(*args, **kwargs):
5✔
692
        return run_in_hwd_thread(partial(func, *args, **kwargs))
×
693
    return wrapper
5✔
694

695

696
def assert_runs_in_hwd_thread():
5✔
697
    if not threading.current_thread().name.startswith("hwd_comms_thread"):
×
698
        raise Exception("must only be called from HWD communication thread")
×
699

700

701
class DeviceMgr(ThreadJob):
5✔
702
    """Manages hardware clients.  A client communicates over a hardware
703
    channel with the device.
704

705
    In addition to tracking device HID IDs, the device manager tracks
706
    hardware wallets and manages wallet pairing.  A HID ID may be
707
    paired with a wallet when it is confirmed that the hardware device
708
    matches the wallet, i.e. they have the same master public key.  A
709
    HID ID can be unpaired if e.g. it is wiped.
710

711
    Because of hotplugging, a wallet must request its client
712
    dynamically each time it is required, rather than caching it
713
    itself.
714

715
    The device manager is shared across plugins, so just one place
716
    does hardware scans when needed.  By tracking HID IDs, if a device
717
    is plugged into a different port the wallet is automatically
718
    re-paired.
719

720
    Wallets are informed on connect / disconnect events.  It must
721
    implement connected(), disconnected() callbacks.  Being connected
722
    implies a pairing.  Callbacks can happen in any thread context,
723
    and we do them without holding the lock.
724

725
    Confusingly, the HID ID (serial number) reported by the HID system
726
    doesn't match the device ID reported by the device itself.  We use
727
    the HID IDs.
728

729
    This plugin is thread-safe.  Currently only devices supported by
730
    hidapi are implemented."""
731

732
    def __init__(self, config: SimpleConfig):
5✔
733
        ThreadJob.__init__(self)
5✔
734
        # A pairing_code->id_ map. Item only present if we have active pairing. Needs self.lock.
735
        self.pairing_code_to_id = {}  # type: Dict[str, str]
5✔
736
        # A client->id_ map. Needs self.lock.
737
        self.clients = {}  # type: Dict[HardwareClientBase, str]
5✔
738
        # What we recognise.  (vendor_id, product_id) -> Plugin
739
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
5✔
740
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]  # vendor_id -> Plugin
5✔
741
        # Custom enumerate functions for devices we don't know about.
742
        self._enumerate_func = set()  # Needs self.lock.
5✔
743

744
        self.lock = threading.RLock()
5✔
745

746
        self.config = config
5✔
747

748
    def thread_jobs(self):
5✔
749
        # Thread job to handle device timeouts
750
        return [self]
5✔
751

752
    def run(self):
5✔
753
        '''Handle device timeouts.  Runs in the context of the Plugins
754
        thread.'''
755
        with self.lock:
5✔
756
            clients = list(self.clients.keys())
5✔
757
        cutoff = time.time() - self.config.get_session_timeout()
5✔
758
        for client in clients:
5✔
759
            client.timeout(cutoff)
×
760

761
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
5✔
762
        for pair in device_pairs:
×
763
            self._recognised_hardware[pair] = plugin
×
764

765
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
5✔
766
        for vendor_id in vendor_ids:
×
767
            self._recognised_vendor[vendor_id] = plugin
×
768

769
    def register_enumerate_func(self, func):
5✔
770
        with self.lock:
×
771
            self._enumerate_func.add(func)
×
772

773
    @runs_in_hwd_thread
5✔
774
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
5✔
775
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
776
        # Get from cache first
777
        client = self._client_by_id(device.id_)
×
778
        if client:
×
779
            return client
×
780
        client = plugin.create_client(device, handler)
×
781
        if client:
×
782
            self.logger.info(f"Registering {client}")
×
783
            with self.lock:
×
784
                self.clients[client] = device.id_
×
785
        return client
×
786

787
    def id_by_pairing_code(self, pairing_code):
5✔
788
        with self.lock:
×
789
            return self.pairing_code_to_id.get(pairing_code)
×
790

791
    def pairing_code_by_id(self, id_):
5✔
792
        with self.lock:
×
793
            for pairing_code, id2 in self.pairing_code_to_id.items():
×
794
                if id2 == id_:
×
795
                    return pairing_code
×
796
            return None
×
797

798
    def unpair_pairing_code(self, pairing_code):
5✔
799
        with self.lock:
×
800
            if pairing_code not in self.pairing_code_to_id:
×
801
                return
×
802
            _id = self.pairing_code_to_id.pop(pairing_code)
×
803
        self._close_client(_id)
×
804

805
    def unpair_id(self, id_):
5✔
806
        pairing_code = self.pairing_code_by_id(id_)
×
807
        if pairing_code:
×
808
            self.unpair_pairing_code(pairing_code)
×
809
        else:
810
            self._close_client(id_)
×
811

812
    def _close_client(self, id_):
5✔
813
        with self.lock:
×
814
            client = self._client_by_id(id_)
×
815
            self.clients.pop(client, None)
×
816
        if client:
×
817
            client.close()
×
818

819
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
5✔
820
        with self.lock:
×
821
            for client, client_id in self.clients.items():
×
822
                if client_id == id_:
×
823
                    return client
×
824
        return None
×
825

826
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
5✔
827
        '''Returns a client for the device ID if one is registered.  If
828
        a device is wiped or in bootloader mode pairing is impossible;
829
        in such cases we communicate by device ID and not wallet.'''
830
        if scan_now:
×
831
            self.scan_devices()
×
832
        return self._client_by_id(id_)
×
833

834
    @runs_in_hwd_thread
5✔
835
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
5✔
836
                            keystore: 'Hardware_KeyStore',
837
                            force_pair: bool, *,
838
                            devices: Sequence['Device'] = None,
839
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
840
        self.logger.info("getting client for keystore")
×
841
        if handler is None:
×
842
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
×
843
        handler.update_status(False)
×
844
        pcode = keystore.pairing_code()
×
845
        client = None
×
846
        # search existing clients first (fast-path)
847
        if not devices:
×
848
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=[])
×
849
        # search clients again, now allowing a (slow) scan
850
        if client is None:
×
851
            if devices is None:
×
852
                devices = self.scan_devices()
×
853
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=devices)
×
854
        if client is None and force_pair:
×
855
            try:
×
856
                info = self.select_device(plugin, handler, keystore, devices,
×
857
                                          allow_user_interaction=allow_user_interaction)
858
            except CannotAutoSelectDevice:
×
859
                pass
×
860
            else:
861
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
×
862
        if client:
×
863
            handler.update_status(True)
×
864
            # note: if select_device was called, we might also update label etc here:
865
            keystore.opportunistically_fill_in_missing_info_from_device(client)
×
866
        self.logger.info("end client for keystore")
×
867
        return client
×
868

869
    def client_by_pairing_code(
5✔
870
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
871
        devices: Sequence['Device'],
872
    ) -> Optional['HardwareClientBase']:
873
        _id = self.id_by_pairing_code(pairing_code)
×
874
        client = self._client_by_id(_id)
×
875
        if client:
×
876
            if type(client.plugin) != type(plugin):
×
877
                return
×
878
            # An unpaired client might have another wallet's handler
879
            # from a prior scan.  Replace to fix dialog parenting.
880
            client.handler = handler
×
881
            return client
×
882

883
        for device in devices:
×
884
            if device.id_ == _id:
×
885
                return self.create_client(device, handler, plugin)
×
886

887
    def force_pair_keystore(
5✔
888
        self,
889
        *,
890
        plugin: 'HW_PluginBase',
891
        handler: 'HardwareHandlerBase',
892
        info: 'DeviceInfo',
893
        keystore: 'Hardware_KeyStore',
894
    ) -> 'HardwareClientBase':
895
        xpub = keystore.xpub
×
896
        derivation = keystore.get_derivation_prefix()
×
897
        assert derivation is not None
×
898
        xtype = bip32.xpub_type(xpub)
×
899
        client = self._client_by_id(info.device.id_)
×
900
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
×
901
            # See comment above for same code
902
            client.handler = handler
×
903
            # This will trigger a PIN/passphrase entry request
904
            try:
×
905
                client_xpub = client.get_xpub(derivation, xtype)
×
906
            except (UserCancelled, RuntimeError):
×
907
                # Bad / cancelled PIN / passphrase
908
                client_xpub = None
×
909
            if client_xpub == xpub:
×
910
                keystore.opportunistically_fill_in_missing_info_from_device(client)
×
911
                with self.lock:
×
912
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
×
913
                return client
×
914

915
        # The user input has wrong PIN or passphrase, or cancelled input,
916
        # or it is not pairable
917
        raise DeviceUnpairableError(
×
918
            _('Electrum cannot pair with your {}.\n\n'
919
              'Before you request bitcoins to be sent to addresses in this '
920
              'wallet, ensure you can pair with your device, or that you have '
921
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
922
              'receive will be unspendable.').format(plugin.device))
923

924
    def list_pairable_device_infos(
5✔
925
        self,
926
        *,
927
        handler: Optional['HardwareHandlerBase'],
928
        plugin: 'HW_PluginBase',
929
        devices: Sequence['Device'] = None,
930
        include_failing_clients: bool = False,
931
    ) -> List['DeviceInfo']:
932
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
933
        Already paired devices are also included, as it is okay to reuse them.
934
        """
935
        if not plugin.libraries_available:
×
936
            message = plugin.get_library_not_available_message()
×
937
            raise HardwarePluginLibraryUnavailable(message)
×
938
        if devices is None:
×
939
            devices = self.scan_devices()
×
940
        infos = []
×
941
        for device in devices:
×
942
            if not plugin.can_recognize_device(device):
×
943
                continue
×
944
            try:
×
945
                client = self.create_client(device, handler, plugin)
×
946
                if not client:
×
947
                    continue
×
948
                label = client.label()
×
949
                is_initialized = client.is_initialized()
×
950
                soft_device_id = client.get_soft_device_id()
×
951
                model_name = client.device_model_name()
×
952
            except Exception as e:
×
953
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
×
954
                if include_failing_clients:
×
955
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
×
956
                continue
×
957
            infos.append(DeviceInfo(device=device,
×
958
                                    label=label,
959
                                    initialized=is_initialized,
960
                                    plugin_name=plugin.name,
961
                                    soft_device_id=soft_device_id,
962
                                    model_name=model_name))
963

964
        return infos
×
965

966
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
5✔
967
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
968
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
969
        """Select the device to use for keystore."""
970
        # ideally this should not be called from the GUI thread...
971
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
972
        while True:
×
973
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
×
974
            if infos:
×
975
                break
×
976
            if not allow_user_interaction:
×
977
                raise CannotAutoSelectDevice()
×
978
            msg = _('Please insert your {}').format(plugin.device)
×
979
            msg += " ("
×
980
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
×
981
                msg += f"label: {keystore.label}, "
×
982
            msg += f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}"
×
983
            msg += ').\n\n{}\n\n{}'.format(
×
984
                _('Verify the cable is connected and that '
985
                  'no other application is using it.'),
986
                _('Try to connect again?')
987
            )
988
            if not handler.yes_no_question(msg):
×
989
                raise UserCancelled()
×
990
            devices = None
×
991

992
        # select device automatically. (but only if we have reasonable expectation it is the correct one)
993
        # method 1: select device by id
994
        if keystore.soft_device_id:
×
995
            for info in infos:
×
996
                if info.soft_device_id == keystore.soft_device_id:
×
997
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
×
998
                    return info
×
999
        # method 2: select device by label
1000
        #           but only if not a placeholder label and only if there is no collision
1001
        device_labels = [info.label for info in infos]
×
1002
        if (keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
×
1003
                and device_labels.count(keystore.label) == 1):
1004
            for info in infos:
×
1005
                if info.label == keystore.label:
×
1006
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
×
1007
                    return info
×
1008
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
1009
        #           saved for keystore anyway, select it
1010
        if (len(infos) == 1
×
1011
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
1012
                and keystore.soft_device_id is None):
1013
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
×
1014
            return infos[0]
×
1015

1016
        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
×
1017
        if not allow_user_interaction:
×
1018
            raise CannotAutoSelectDevice()
×
1019
        # ask user to select device manually
1020
        msg = (
×
1021
                _("Could not automatically pair with device for given keystore.") + "\n"
1022
                + f"(keystore label: {keystore.label!r}, "
1023
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
1024
        msg += _("Please select which {} device to use:").format(plugin.device)
×
1025
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
×
1026
        descriptions = ["{label} ({maybe_model}{init}, {transport})"
×
1027
                        .format(label=info.label or _("An unnamed {}").format(info.plugin_name),
1028
                                init=(_("initialized") if info.initialized else _("wiped")),
1029
                                transport=info.device.transport_ui_string,
1030
                                maybe_model=f"{info.model_name}, " if info.model_name else "")
1031
                        for info in infos]
1032
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
×
1033
                          f"num options: {len(infos)}. options: {infos}")
1034
        c = handler.query_choice(msg, descriptions)
×
1035
        if c is None:
×
1036
            raise UserCancelled()
×
1037
        info = infos[c]
×
1038
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
×
1039
        # note: updated label/soft_device_id will be saved after pairing succeeds
1040
        return info
×
1041

1042
    @runs_in_hwd_thread
5✔
1043
    def _scan_devices_with_hid(self) -> List['Device']:
5✔
1044
        try:
×
1045
            import hid  # noqa: F811
×
1046
        except ImportError:
×
1047
            return []
×
1048

1049
        devices = []
×
1050
        for d in hid.enumerate(0, 0):
×
1051
            vendor_id = d['vendor_id']
×
1052
            product_key = (vendor_id, d['product_id'])
×
1053
            plugin = None
×
1054
            if product_key in self._recognised_hardware:
×
1055
                plugin = self._recognised_hardware[product_key]
×
1056
            elif vendor_id in self._recognised_vendor:
×
1057
                plugin = self._recognised_vendor[vendor_id]
×
1058
            if plugin:
×
1059
                device = plugin.create_device_from_hid_enumeration(d, product_key=product_key)
×
1060
                if device:
×
1061
                    devices.append(device)
×
1062
        return devices
×
1063

1064
    @runs_in_hwd_thread
5✔
1065
    @profiler
5✔
1066
    def scan_devices(self) -> Sequence['Device']:
5✔
1067
        self.logger.info("scanning devices...")
×
1068

1069
        # First see what's connected that we know about
1070
        devices = self._scan_devices_with_hid()
×
1071

1072
        # Let plugin handlers enumerate devices we don't know about
1073
        with self.lock:
×
1074
            enumerate_funcs = list(self._enumerate_func)
×
1075
        for f in enumerate_funcs:
×
1076
            try:
×
1077
                new_devices = f()
×
1078
            except BaseException as e:
×
1079
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
×
1080
            else:
1081
                devices.extend(new_devices)
×
1082

1083
        # find out what was disconnected
1084
        client_ids = [dev.id_ for dev in devices]
×
1085
        disconnected_clients = []
×
1086
        with self.lock:
×
1087
            connected = {}
×
1088
            for client, id_ in self.clients.items():
×
1089
                if id_ in client_ids and client.has_usable_connection_with_device():
×
1090
                    connected[client] = id_
×
1091
                else:
1092
                    disconnected_clients.append((client, id_))
×
1093
            self.clients = connected
×
1094

1095
        # Unpair disconnected devices
1096
        for client, id_ in disconnected_clients:
×
1097
            self.unpair_id(id_)
×
1098
            if client.handler:
×
1099
                client.handler.update_status(False)
×
1100

1101
        return devices
×
1102

1103
    @classmethod
5✔
1104
    def version_info(cls) -> Mapping[str, Optional[str]]:
5✔
1105
        ret = {}
×
1106
        # add libusb
1107
        try:
×
1108
            import usb1
×
1109
        except Exception as e:
×
1110
            ret["libusb.version"] = None
×
1111
        else:
1112
            ret["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
×
1113
            try:
×
1114
                ret["libusb.path"] = usb1.libusb1.libusb._name
×
1115
            except AttributeError:
×
1116
                ret["libusb.path"] = None
×
1117
        # add hidapi
1118
        try:
×
1119
            import hid  # noqa: F811
×
1120
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
×
1121
        except Exception as e:
×
1122
            from importlib.metadata import version
×
1123
            try:
×
1124
                ret["hidapi.version"] = version("hidapi")
×
1125
            except ImportError:
×
1126
                ret["hidapi.version"] = None
×
1127
        return ret
×
1128

1129
    def trigger_pairings(
5✔
1130
            self,
1131
            keystores: Sequence['KeyStore'],
1132
            *,
1133
            allow_user_interaction: bool = True,
1134
            devices: Sequence['Device'] = None,
1135
    ) -> None:
1136
        """Given a list of keystores, try to pair each with a connected hardware device.
1137

1138
        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
1139
        try to pair each keystore individually. Consider the following scenario:
1140
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
1141
        - assume none of the devices are paired yet
1142
        1. if we tried to individually pair keystores, we might try with ks1 first
1143
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
1144
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
1145
             which is confusing and error-prone. It's especially problematic if the hw device does
1146
             not support labels (such as Ledger), as then the user cannot easily distinguish
1147
             same-type devices. (see #4199)
1148
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
1149
           and then tell the user ks1 could not be paired (and there are no devices left to try)
1150
        """
1151
        from .keystore import Hardware_KeyStore
×
1152
        keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
×
1153
        if not keystores:
×
1154
            return
×
1155
        if devices is None:
×
1156
            devices = self.scan_devices()
×
1157
        # first pair with all devices that can be auto-selected
1158
        for ks in keystores:
×
1159
            try:
×
1160
                ks.get_client(
×
1161
                    force_pair=True,
1162
                    allow_user_interaction=False,
1163
                    devices=devices,
1164
                )
1165
            except UserCancelled:
×
1166
                pass
×
1167
        if allow_user_interaction:
×
1168
            # now do manual selections
1169
            for ks in keystores:
×
1170
                try:
×
1171
                    ks.get_client(
×
1172
                        force_pair=True,
1173
                        allow_user_interaction=True,
1174
                        devices=devices,
1175
                    )
1176
                except UserCancelled:
×
1177
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc