• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5113580489539584

10 Apr 2025 11:20AM UTC coverage: 60.296% (-0.04%) from 60.334%
5113580489539584

Pull #9718

CirrusCI

ecdsa
plugins: save salt with the serialized plugins key
Pull Request #9718: Userspace plugins:

26 of 139 new or added lines in 1 file covered. (18.71%)

5 existing lines in 3 files now uncovered.

21562 of 35760 relevant lines covered (60.3%)

3.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

38.11
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import json
5✔
27
import os
5✔
28
import pkgutil
5✔
29
import importlib.util
5✔
30
import time
5✔
31
import threading
5✔
32
import sys
5✔
33
import aiohttp
5✔
34
import zipfile as zipfile_lib
5✔
35
from urllib.parse import urlparse
5✔
36

37
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
5✔
38
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
39
import concurrent
5✔
40
import zipimport
5✔
41
from functools import wraps, partial
5✔
42
from itertools import chain
5✔
43

44
from electrum_ecc import ECPrivkey, ECPubkey
5✔
45

46
from ._vendor.distutils.version import StrictVersion
5✔
47
from .version import ELECTRUM_VERSION
5✔
48
from .i18n import _
5✔
49
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException)
5✔
50
from . import bip32
5✔
51
from . import plugins
5✔
52
from .simple_config import SimpleConfig
5✔
53
from .logging import get_logger, Logger
5✔
54
from .crypto import sha256
5✔
55

56
if TYPE_CHECKING:
5✔
57
    from .hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
58
    from .keystore import Hardware_KeyStore, KeyStore
×
59
    from .wallet import Abstract_Wallet
×
60

61

62
_logger = get_logger(__name__)
5✔
63
plugin_loaders = {}
5✔
64
hook_names = set()
5✔
65
hooks = {}
5✔
66
_exec_module_failure = {}  # type: Dict[str, Exception]
5✔
67

68

69
class Plugins(DaemonThread):
5✔
70

71
    LOGGING_SHORTCUT = 'p'
5✔
72
    pkgpath = os.path.dirname(plugins.__file__)
5✔
73
    keyfile_linux = '/etc/electrum/plugins_key'
5✔
74
    keyfile_windows = 'C:\\HKEY_LOCAL_MACHINE\\SOFTWARE\\Electrum\\PluginsKey'
5✔
75

76
    @profiler
5✔
77
    def __init__(self, config: SimpleConfig, gui_name: str = None, cmd_only: bool = False):
5✔
78
        self.config = config
5✔
79
        self.cmd_only = cmd_only  # type: bool
5✔
80
        self.internal_plugin_metadata = {}
5✔
81
        self.external_plugin_metadata = {}
5✔
82
        if cmd_only:
5✔
83
            # only import the command modules of plugins
84
            Logger.__init__(self)
×
85
            self.find_plugins()
×
86
            self.load_plugins()
×
87
            return
×
88
        DaemonThread.__init__(self)
5✔
89
        self.device_manager = DeviceMgr(config)
5✔
90
        self.name = 'Plugins'  # set name of thread
5✔
91
        self.hw_wallets = {}
5✔
92
        self.plugins = {}  # type: Dict[str, BasePlugin]
5✔
93
        self.gui_name = gui_name
5✔
94
        self.find_plugins()
5✔
95
        self.load_plugins()
5✔
96
        self.add_jobs(self.device_manager.thread_jobs())
5✔
97
        self.start()
5✔
98

99
    @property
5✔
100
    def descriptions(self):
5✔
101
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
102

103
    def find_directory_plugins(self, pkg_path: str, external: bool):
5✔
104
        """Finds plugins in directory form from the given pkg_path and populates the metadata dicts"""
105
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
5✔
106
        for loader, name, ispkg in iter_modules:
5✔
107
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
108
            #       once as data and once as code. To honor the "no duplicates" rule below,
109
            #       we exclude the ones packaged as *code*, here:
110
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
5✔
111
                continue
×
112
            module_path = os.path.join(pkg_path, name)
5✔
113
            if self.cmd_only and not self.config.get('enable_plugin_' + name) is True:
5✔
114
                continue
×
115
            try:
5✔
116
                with open(os.path.join(module_path, 'manifest.json'), 'r') as f:
5✔
117
                    d = json.load(f)
5✔
118
            except FileNotFoundError:
×
119
                self.logger.info(f"could not find manifest.json of plugin {name}, skipping...")
×
120
                continue
×
121
            if 'fullname' not in d:
5✔
122
                continue
×
123
            d['path'] = module_path
5✔
124
            if not self.cmd_only:
5✔
125
                gui_good = self.gui_name in d.get('available_for', [])
5✔
126
                if not gui_good:
5✔
127
                    continue
5✔
128
                details = d.get('registers_wallet_type')
5✔
129
                if details:
5✔
130
                    self.register_wallet_type(name, gui_good, details)
5✔
131
                details = d.get('registers_keystore')
5✔
132
                if details:
5✔
133
                    self.register_keystore(name, gui_good, details)
5✔
134
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
5✔
135
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
×
136
                raise Exception(f"duplicate plugins? for {name=}")
×
137
            if not external:
5✔
138
                self.internal_plugin_metadata[name] = d
5✔
139
            else:
140
                self.external_plugin_metadata[name] = d
×
141

142
    @staticmethod
5✔
143
    def exec_module_from_spec(spec, path: str):
5✔
144
        if prev_fail := _exec_module_failure.get(path):
5✔
145
            raise Exception(f"exec_module already failed once before, with: {prev_fail!r}")
×
146
        try:
5✔
147
            module = importlib.util.module_from_spec(spec)
5✔
148
            # sys.modules needs to be modified for relative imports to work
149
            # see https://stackoverflow.com/a/50395128
150
            sys.modules[path] = module
5✔
151
            spec.loader.exec_module(module)
5✔
152
        except Exception as e:
×
153
            # We can't undo all side-effects, but we at least rm the module from sys.modules,
154
            # so the import system knows it failed. If called again for the same plugin, we do not
155
            # retry due to potential interactions with not-undone side-effects (e.g. plugin
156
            # might have defined commands).
157
            _exec_module_failure[path] = e
×
158
            if path in sys.modules:
×
159
                sys.modules.pop(path, None)
×
160
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
×
161
        return module
5✔
162

163
    def find_plugins(self):
5✔
164
        internal_plugins_path = (self.pkgpath, False)
5✔
165
        external_plugins_path = (self.get_external_plugin_dir(), True)
5✔
166
        for pkg_path, external in (internal_plugins_path, external_plugins_path):
5✔
167
            if pkg_path and os.path.exists(pkg_path):
5✔
168
                if not external:
5✔
169
                    self.find_directory_plugins(pkg_path=pkg_path, external=external)
5✔
170
                else:
171
                    self.find_zip_plugins(pkg_path=pkg_path, external=external)
5✔
172

173
    def load_plugins(self):
5✔
174
        for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
5✔
175
            if not d.get('requires_wallet_type') and self.config.get('enable_plugin_' + name):
5✔
176
                try:
×
177
                    if self.cmd_only:  # only load init method to register commands
×
178
                        self.maybe_load_plugin_init_method(name)
×
179
                    else:
180
                        self.load_plugin_by_name(name)
×
181
                except BaseException as e:
×
182
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
183

184
    def _has_root_permissions(self, path):
5✔
185
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
186

187
    def get_keyfile_path(self) -> Tuple[str, str]:
5✔
NEW
188
        if sys.platform in ['win32']:
×
NEW
189
            keyfile_path = self.keyfile_windows
×
NEW
190
            keyfile_help = _('This file can be edited with Regdit')
×
NEW
191
        elif sys.platform in ['linux', 'darwin'] or sys.platform.startswith('freebsd'):
×
NEW
192
            keyfile_path = self.keyfile_linux
×
NEW
193
            keyfile_help = _('The file must have root permissions')
×
194
        else:
NEW
195
            raise Exception('platform not supported')
×
NEW
196
        return keyfile_path, keyfile_help
×
197

198
    def get_pubkey_bytes(self) -> Tuple[Optional[bytes], bytes]:
5✔
199
        """
200
        returns pubkey, salt
201
        returns None, None if the pubkey has not been set
202
        """
NEW
203
        if sys.platform in ['win32']:
×
NEW
204
            import winreg
×
NEW
205
            with winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE) as hkey:
×
NEW
206
                try:
×
NEW
207
                    with winreg.OpenKey(hkey, r"SOFTWARE\\Electrum") as key:
×
NEW
208
                        key_hex = winreg.QueryValue(key, "PluginsKey")
×
NEW
209
                except Exception as e:
×
NEW
210
                    self.logger.info(f'winreg error: {e}')
×
NEW
211
                    return None, None
×
NEW
212
        elif sys.platform in ['linux', 'darwin'] or sys.platform.startswith('freebsd'):
×
NEW
213
            if not os.path.exists(self.keyfile_linux):
×
NEW
214
                return None, None
×
NEW
215
            if not self._has_root_permissions(self.keyfile_linux):
×
NEW
216
                return
×
NEW
217
            with open(self.keyfile_linux) as f:
×
NEW
218
                key_hex = f.read()
×
219
        else:
NEW
220
            return None, None
×
221
        # all good
NEW
222
        key = bytes.fromhex(key_hex)
×
NEW
223
        salt = key[0:32]
×
NEW
224
        pubkey = key[32:]
×
NEW
225
        return pubkey, salt
×
226

227
    def get_external_plugin_dir(self) -> str:
5✔
228
        pkg_path = os.path.join(self.config.electrum_path(), 'plugins')
5✔
229
        if not os.path.exists(pkg_path):
5✔
230
            os.mkdir(pkg_path)
5✔
231
        return pkg_path
5✔
232

233
    async def download_external_plugin(self, url):
5✔
NEW
234
        filename = os.path.basename(urlparse(url).path)
×
NEW
235
        pkg_path = self.get_external_plugin_dir()
×
NEW
236
        path = os.path.join(pkg_path, filename)
×
NEW
237
        async with aiohttp.ClientSession() as session:
×
NEW
238
            async with session.get(url) as resp:
×
NEW
239
                if resp.status == 200:
×
NEW
240
                    with open(path, 'wb') as fd:
×
NEW
241
                        async for chunk in resp.content.iter_chunked(10):
×
NEW
242
                            fd.write(chunk)
×
NEW
243
        return path
×
244

245
    def read_manifest(self, path) -> dict:
5✔
246
        """ return json dict """
NEW
247
        with zipfile_lib.ZipFile(path) as file:
×
NEW
248
            for filename in file.namelist():
×
NEW
249
                if filename.endswith('manifest.json'):
×
NEW
250
                    break
×
251
            else:
NEW
252
                raise Exception('could not find manifest.json in zip archive')
×
NEW
253
            with file.open(filename, 'r') as f:
×
NEW
254
                manifest = json.load(f)
×
NEW
255
                manifest['path'] = path  # external, path of the zipfile
×
NEW
256
                manifest['dirname'] = os.path.dirname(filename)  # internal
×
NEW
257
                manifest['is_zip'] = True
×
NEW
258
                manifest['zip_hash_sha256'] = get_file_hash256(path).hex()
×
NEW
259
                return manifest
×
260

261
    def zip_plugin_path(self, name) -> str:
5✔
NEW
262
        path = self.get_metadata(name)['path']
×
NEW
263
        filename = os.path.basename(path)
×
264
        if name in self.internal_plugin_metadata:
×
265
            pkg_path = self.pkgpath
×
266
        else:
267
            pkg_path = self.get_external_plugin_dir()
×
268
        return os.path.join(pkg_path, filename)
×
269

270
    def find_zip_plugins(self, pkg_path: str, external: bool):
5✔
271
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts"""
272
        if pkg_path is None:
5✔
273
            return
×
274
        for filename in os.listdir(pkg_path):
5✔
UNCOV
275
            path = os.path.join(pkg_path, filename)
×
UNCOV
276
            if not filename.endswith('.zip'):
×
UNCOV
277
                continue
×
278
            try:
×
NEW
279
                d = self.read_manifest(path)
×
NEW
280
            except Exception:
×
NEW
281
                self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
×
282
                continue
×
NEW
283
            name = d['name']
×
NEW
284
            if name in self.internal_plugin_metadata:
×
NEW
285
                raise Exception(f"duplicate plugins for name={name}")
×
NEW
286
            if name in self.external_plugin_metadata:
×
NEW
287
                raise Exception(f"duplicate plugins for name={name}")
×
NEW
288
            if self.cmd_only and not self.config.get('enable_plugin_' + name):
×
NEW
289
                continue
×
NEW
290
            min_version = d.get('min_electrum_version')
×
NEW
291
            if min_version and StrictVersion(min_version) > StrictVersion(ELECTRUM_VERSION):
×
NEW
292
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
NEW
293
                continue
×
NEW
294
            max_version = d.get('max_electrum_version')
×
NEW
295
            if max_version and StrictVersion(max_version) < StrictVersion(ELECTRUM_VERSION):
×
NEW
296
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
NEW
297
                continue
×
298

NEW
299
            if not self.cmd_only:
×
NEW
300
                gui_good = self.gui_name in d.get('available_for', [])
×
NEW
301
                if not gui_good:
×
302
                    continue
×
NEW
303
                if 'fullname' not in d:
×
304
                    continue
×
NEW
305
            if external:
×
NEW
306
                self.external_plugin_metadata[name] = d
×
307
            else:
NEW
308
                self.internal_plugin_metadata[name] = d
×
309

310
    def get(self, name):
5✔
311
        return self.plugins.get(name)
×
312

313
    def count(self):
5✔
314
        return len(self.plugins)
×
315

316
    def load_plugin(self, name) -> 'BasePlugin':
5✔
317
        """Imports the code of the given plugin.
318
        note: can be called from any thread.
319
        """
320
        if self.get_metadata(name):
5✔
321
            return self.load_plugin_by_name(name)
5✔
322
        else:
323
            raise Exception(f"could not find plugin {name!r}")
×
324

325
    def maybe_load_plugin_init_method(self, name: str) -> None:
5✔
326
        """Loads the __init__.py module of the plugin if it is not already loaded."""
327
        is_external = name in self.external_plugin_metadata
5✔
328
        base_name = ('electrum_external_plugins.' if is_external else 'electrum.plugins.') + name
5✔
329
        if base_name not in sys.modules:
5✔
330
            metadata = self.get_metadata(name)
5✔
331
            is_zip = metadata.get('is_zip', False)
5✔
332
            # if the plugin was not enabled on startup the init module hasn't been loaded yet
333
            if not is_zip:
5✔
334
                if is_external:
5✔
335
                    # this branch is deprecated: external plugins are always zip files
336
                    path = os.path.join(metadata['path'], '__init__.py')
×
337
                    init_spec = importlib.util.spec_from_file_location(base_name, path)
×
338
                else:
339
                    init_spec = importlib.util.find_spec(base_name)
5✔
340
            else:
341
                zipfile = zipimport.zipimporter(metadata['path'])
×
NEW
342
                dirname = metadata['dirname']
×
NEW
343
                init_spec = zipfile.find_spec(dirname)
×
344

345
            self.exec_module_from_spec(init_spec, base_name)
5✔
346
            if name == "trustedcoin":
5✔
347
                # removes trustedcoin after loading to not show it in the list of plugins
348
                del self.internal_plugin_metadata[name]
×
349

350
    def load_plugin_by_name(self, name: str) -> 'BasePlugin':
5✔
351
        if name in self.plugins:
5✔
352
            return self.plugins[name]
×
353
        # if the plugin was not enabled on startup the init module hasn't been loaded yet
354
        self.maybe_load_plugin_init_method(name)
5✔
355
        is_external = name in self.external_plugin_metadata
5✔
356
        if is_external and not self.is_authorized(name):
5✔
NEW
357
            self.logger.info(f'plugin not authorized {name}')
×
NEW
358
            return
×
359
        if not is_external:
5✔
360
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
5✔
361
        else:
362
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
×
363

364
        spec = importlib.util.find_spec(full_name)
5✔
365
        if spec is None:
5✔
366
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
367
        try:
5✔
368
            module = self.exec_module_from_spec(spec, full_name)
5✔
369
            plugin = module.Plugin(self, self.config, name)
5✔
370
        except Exception as e:
×
371
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
×
372
        self.add_jobs(plugin.thread_jobs())
5✔
373
        self.plugins[name] = plugin
5✔
374
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
5✔
375
        return plugin
5✔
376

377
    def close_plugin(self, plugin):
5✔
378
        self.remove_jobs(plugin.thread_jobs())
×
379

380
    def derive_privkey(self, pw: str, salt:bytes) -> ECPrivkey:
5✔
NEW
381
        from hashlib import pbkdf2_hmac
×
NEW
382
        secret = pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations=1024)
×
NEW
383
        return ECPrivkey(secret)
×
384

385
    def is_installed(self, name) -> bool:
5✔
386
        """an external plugin may be installed but not authorized """
NEW
387
        return name in self.internal_plugin_metadata or name in self.external_plugin_metadata
×
388

389
    def is_authorized(self, name) -> bool:
5✔
NEW
390
        if name in self.internal_plugin_metadata:
×
NEW
391
            return True
×
NEW
392
        if name not in self.external_plugin_metadata:
×
NEW
393
            return False
×
NEW
394
        pubkey_bytes, salt = self.get_pubkey_bytes()
×
NEW
395
        if not pubkey_bytes:
×
NEW
396
            return False
×
NEW
397
        if not self.is_plugin_zip(name):
×
NEW
398
            return False
×
NEW
399
        filename = self.zip_plugin_path(name)
×
NEW
400
        plugin_hash = get_file_hash256(filename)
×
NEW
401
        sig = self.config.get('authorize_plugin_' + name)
×
NEW
402
        if not sig:
×
NEW
403
            return False
×
NEW
404
        pubkey = ECPubkey(pubkey_bytes)
×
NEW
405
        return pubkey.ecdsa_verify(bytes.fromhex(sig), plugin_hash)
×
406

407
    def authorize_plugin(self, name: str, filename, privkey: ECPrivkey):
5✔
NEW
408
        pubkey_bytes, salt = self.get_pubkey_bytes()
×
NEW
409
        assert pubkey_bytes == privkey.get_public_key_bytes()
×
NEW
410
        plugin_hash = get_file_hash256(filename)
×
NEW
411
        sig = privkey.ecdsa_sign(plugin_hash)
×
NEW
412
        value = sig.hex()
×
NEW
413
        self.config.set_key('authorize_plugin_' + name, value, save=True)
×
414

415
    def enable(self, name: str) -> 'BasePlugin':
5✔
416
        self.config.set_key('enable_plugin_' + name, True, save=True)
×
417
        p = self.get(name)
×
418
        if p:
×
419
            return p
×
420
        return self.load_plugin(name)
×
421

422
    def disable(self, name: str) -> None:
5✔
423
        self.config.set_key('enable_plugin_' + name, False, save=True)
×
424
        p = self.get(name)
×
425
        if not p:
×
426
            return
×
427
        self.plugins.pop(name)
×
428
        p.close()
×
429
        self.logger.info(f"closed {name}")
×
430

431
    @classmethod
5✔
432
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
5✔
NEW
433
        return key.startswith('enable_plugin_') or key.startswith('authorize_plugin_')
×
434

435
    def toggle(self, name: str) -> Optional['BasePlugin']:
5✔
436
        p = self.get(name)
×
437
        return self.disable(name) if p else self.enable(name)
×
438

439
    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
5✔
440
        d = self.descriptions.get(name)
×
441
        if not d:
×
442
            return False
×
443
        deps = d.get('requires', [])
×
444
        for dep, s in deps:
×
445
            try:
×
446
                __import__(dep)
×
447
            except ImportError as e:
×
448
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
×
449
                return False
×
450
        requires = d.get('requires_wallet_type', [])
×
451
        return not requires or wallet.wallet_type in requires
×
452

453
    def get_hardware_support(self):
5✔
454
        out = []
×
455
        for name, (gui_good, details) in self.hw_wallets.items():
×
456
            if gui_good:
×
457
                try:
×
458
                    p = self.get_plugin(name)
×
459
                    if p.is_available():
×
460
                        out.append(HardwarePluginToScan(name=name,
×
461
                                                        description=details[2],
462
                                                        plugin=p,
463
                                                        exception=None))
464
                except Exception as e:
×
465
                    self.logger.exception(f"cannot load plugin for: {name}")
×
466
                    out.append(HardwarePluginToScan(name=name,
×
467
                                                    description=details[2],
468
                                                    plugin=None,
469
                                                    exception=e))
470
        return out
×
471

472
    def register_wallet_type(self, name, gui_good, wallet_type):
5✔
473
        from .wallet import register_wallet_type, register_constructor
5✔
474
        self.logger.info(f"registering wallet type {(wallet_type, name)}")
5✔
475

476
        def loader():
5✔
477
            plugin = self.get_plugin(name)
5✔
478
            register_constructor(wallet_type, plugin.wallet_class)
5✔
479
        register_wallet_type(wallet_type)
5✔
480
        plugin_loaders[wallet_type] = loader
5✔
481

482
    def register_keystore(self, name, gui_good, details):
5✔
483
        from .keystore import register_keystore
5✔
484

485
        def dynamic_constructor(d):
5✔
486
            return self.get_plugin(name).keystore_class(d)
5✔
487
        if details[0] == 'hardware':
5✔
488
            self.hw_wallets[name] = (gui_good, details)
5✔
489
            self.logger.info(f"registering hardware {name}: {details}")
5✔
490
            register_keystore(details[1], dynamic_constructor)
5✔
491

492
    def get_plugin(self, name: str) -> 'BasePlugin':
5✔
493
        if name not in self.plugins:
5✔
494
            self.load_plugin(name)
5✔
495
        return self.plugins[name]
5✔
496

497
    def is_plugin_zip(self, name: str) -> bool:
5✔
498
        """Returns True if the plugin is a zip file"""
499
        if (metadata := self.get_metadata(name)) is None:
×
500
            return False
×
501
        return metadata.get('is_zip', False)
×
502

503
    def get_metadata(self, name: str) -> Optional[dict]:
5✔
504
        """Returns the metadata of the plugin"""
505
        metadata = self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name)
5✔
506
        if not metadata:
5✔
507
            return None
×
508
        return metadata
5✔
509

510
    def run(self):
5✔
511
        while self.is_running():
5✔
512
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
5✔
513
            self.run_jobs()
5✔
514
        self.on_stop()
5✔
515

516

517
def get_file_hash256(path: str) -> bytes:
5✔
518
    '''Get the sha256 hash of a file in hex, similar to `sha256sum`.'''
519
    with open(path, 'rb') as f:
×
NEW
520
        return sha256(f.read())
×
521

522

523
def hook(func):
5✔
524
    hook_names.add(func.__name__)
5✔
525
    return func
5✔
526

527

528
def run_hook(name, *args):
5✔
529
    results = []
5✔
530
    f_list = hooks.get(name, [])
5✔
531
    for p, f in f_list:
5✔
532
        if p.is_enabled():
5✔
533
            try:
5✔
534
                r = f(*args)
5✔
535
            except Exception:
×
536
                _logger.exception(f"Plugin error. plugin: {p}, hook: {name}")
×
537
                r = False
×
538
            if r:
5✔
539
                results.append(r)
×
540

541
    if results:
5✔
542
        assert len(results) == 1, results
×
543
        return results[0]
×
544

545

546
class BasePlugin(Logger):
5✔
547

548
    def __init__(self, parent, config: 'SimpleConfig', name):
5✔
549
        self.parent = parent  # type: Plugins  # The plugins object
5✔
550
        self.name = name
5✔
551
        self.config = config
5✔
552
        self.wallet = None  # fixme: this field should not exist
5✔
553
        Logger.__init__(self)
5✔
554
        # add self to hooks
555
        for k in dir(self):
5✔
556
            if k in hook_names:
5✔
557
                l = hooks.get(k, [])
5✔
558
                l.append((self, getattr(self, k)))
5✔
559
                hooks[k] = l
5✔
560

561
    def __str__(self):
5✔
562
        return self.name
×
563

564
    def close(self):
5✔
565
        # remove self from hooks
566
        for attr_name in dir(self):
×
567
            if attr_name in hook_names:
×
568
                # found attribute in self that is also the name of a hook
569
                l = hooks.get(attr_name, [])
×
570
                try:
×
571
                    l.remove((self, getattr(self, attr_name)))
×
572
                except ValueError:
×
573
                    # maybe attr name just collided with hook name and was not hook
574
                    continue
×
575
                hooks[attr_name] = l
×
576
        self.parent.close_plugin(self)
×
577
        self.on_close()
×
578

579
    def on_close(self):
5✔
580
        pass
×
581

582
    def requires_settings(self) -> bool:
5✔
583
        return False
×
584

585
    def thread_jobs(self):
5✔
586
        return []
5✔
587

588
    def is_enabled(self):
5✔
589
        return self.is_available() and self.config.get('enable_plugin_' + self.name) is True
×
590

591
    def is_available(self):
5✔
592
        return True
×
593

594
    def can_user_disable(self):
5✔
595
        return True
×
596

597
    def settings_widget(self, window):
5✔
598
        raise NotImplementedError()
×
599

600
    def settings_dialog(self, window):
5✔
601
        raise NotImplementedError()
×
602

603
    def read_file(self, filename: str) -> bytes:
5✔
604
        if self.parent.is_plugin_zip(self.name):
×
605
            plugin_filename = self.parent.zip_plugin_path(self.name)
×
NEW
606
            metadata = self.parent.external_plugin_metadata[self.name]
×
NEW
607
            dirname = metadata['dirname']
×
608
            with zipfile_lib.ZipFile(plugin_filename) as myzip:
×
NEW
609
                with myzip.open(os.path.join(dirname, filename)) as myfile:
×
610
                    return myfile.read()
×
611
        else:
612
            if self.name in self.parent.internal_plugin_metadata:
×
613
                path = os.path.join(os.path.dirname(__file__), 'plugins', self.name, filename)
×
614
            else:
615
                path = os.path.join(self.parent.get_external_plugin_dir(), self.name, filename)
×
616
            with open(path, 'rb') as myfile:
×
617
                return myfile.read()
×
618

619

620
class DeviceUnpairableError(UserFacingException): pass
5✔
621
class HardwarePluginLibraryUnavailable(Exception): pass
5✔
622
class CannotAutoSelectDevice(Exception): pass
5✔
623

624

625
class Device(NamedTuple):
5✔
626
    path: Union[str, bytes]
5✔
627
    interface_number: int
5✔
628
    id_: str
5✔
629
    product_key: Any   # when using hid, often Tuple[int, int]
5✔
630
    usage_page: int
5✔
631
    transport_ui_string: str
5✔
632

633

634
class DeviceInfo(NamedTuple):
5✔
635
    device: Device
5✔
636
    label: Optional[str] = None
5✔
637
    initialized: Optional[bool] = None
5✔
638
    exception: Optional[Exception] = None
5✔
639
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
5✔
640
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
5✔
641
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
5✔
642

643

644
class HardwarePluginToScan(NamedTuple):
5✔
645
    name: str
5✔
646
    description: str
5✔
647
    plugin: Optional['HW_PluginBase']
5✔
648
    exception: Optional[Exception]
5✔
649

650

651
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
5✔
652

653

654
# hidapi is not thread-safe
655
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
656
#     https://github.com/libusb/hidapi/issues/45
657
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
658
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
659
# It is not entirely clear to me, exactly what is safe and what isn't, when
660
# using multiple threads...
661
# Hence, we use a single thread for all device communications, including
662
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
663
# the following thread:
664
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
5✔
665
    max_workers=1,
666
    thread_name_prefix='hwd_comms_thread'
667
)
668

669
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
670
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
671
# To keep it simple, let's just import it now, as we are likely in the main thread here.
672
if threading.current_thread() is not threading.main_thread():
5✔
673
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
×
674
try:
5✔
675
    import hid
5✔
676
except ImportError:
5✔
677
    pass
5✔
678

679

680
T = TypeVar('T')
5✔
681

682

683
def run_in_hwd_thread(func: Callable[[], T]) -> T:
5✔
684
    if threading.current_thread().name.startswith("hwd_comms_thread"):
×
685
        return func()
×
686
    else:
687
        fut = _hwd_comms_executor.submit(func)
×
688
        return fut.result()
×
689
        #except (concurrent.futures.CancelledError, concurrent.futures.TimeoutError) as e:
690

691

692
def runs_in_hwd_thread(func):
5✔
693
    @wraps(func)
5✔
694
    def wrapper(*args, **kwargs):
5✔
695
        return run_in_hwd_thread(partial(func, *args, **kwargs))
×
696
    return wrapper
5✔
697

698

699
def assert_runs_in_hwd_thread():
5✔
700
    if not threading.current_thread().name.startswith("hwd_comms_thread"):
×
701
        raise Exception("must only be called from HWD communication thread")
×
702

703

704
class DeviceMgr(ThreadJob):
5✔
705
    """Manages hardware clients.  A client communicates over a hardware
706
    channel with the device.
707

708
    In addition to tracking device HID IDs, the device manager tracks
709
    hardware wallets and manages wallet pairing.  A HID ID may be
710
    paired with a wallet when it is confirmed that the hardware device
711
    matches the wallet, i.e. they have the same master public key.  A
712
    HID ID can be unpaired if e.g. it is wiped.
713

714
    Because of hotplugging, a wallet must request its client
715
    dynamically each time it is required, rather than caching it
716
    itself.
717

718
    The device manager is shared across plugins, so just one place
719
    does hardware scans when needed.  By tracking HID IDs, if a device
720
    is plugged into a different port the wallet is automatically
721
    re-paired.
722

723
    Wallets are informed on connect / disconnect events.  It must
724
    implement connected(), disconnected() callbacks.  Being connected
725
    implies a pairing.  Callbacks can happen in any thread context,
726
    and we do them without holding the lock.
727

728
    Confusingly, the HID ID (serial number) reported by the HID system
729
    doesn't match the device ID reported by the device itself.  We use
730
    the HID IDs.
731

732
    This plugin is thread-safe.  Currently only devices supported by
733
    hidapi are implemented."""
734

735
    def __init__(self, config: SimpleConfig):
5✔
736
        ThreadJob.__init__(self)
5✔
737
        # A pairing_code->id_ map. Item only present if we have active pairing. Needs self.lock.
738
        self.pairing_code_to_id = {}  # type: Dict[str, str]
5✔
739
        # A client->id_ map. Needs self.lock.
740
        self.clients = {}  # type: Dict[HardwareClientBase, str]
5✔
741
        # What we recognise.  (vendor_id, product_id) -> Plugin
742
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
5✔
743
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]  # vendor_id -> Plugin
5✔
744
        # Custom enumerate functions for devices we don't know about.
745
        self._enumerate_func = set()  # Needs self.lock.
5✔
746

747
        self.lock = threading.RLock()
5✔
748

749
        self.config = config
5✔
750

751
    def thread_jobs(self):
5✔
752
        # Thread job to handle device timeouts
753
        return [self]
5✔
754

755
    def run(self):
5✔
756
        '''Handle device timeouts.  Runs in the context of the Plugins
757
        thread.'''
758
        with self.lock:
5✔
759
            clients = list(self.clients.keys())
5✔
760
        cutoff = time.time() - self.config.get_session_timeout()
5✔
761
        for client in clients:
5✔
762
            client.timeout(cutoff)
×
763

764
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
5✔
765
        for pair in device_pairs:
×
766
            self._recognised_hardware[pair] = plugin
×
767

768
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
5✔
769
        for vendor_id in vendor_ids:
×
770
            self._recognised_vendor[vendor_id] = plugin
×
771

772
    def register_enumerate_func(self, func):
5✔
773
        with self.lock:
×
774
            self._enumerate_func.add(func)
×
775

776
    @runs_in_hwd_thread
5✔
777
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
5✔
778
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
779
        # Get from cache first
780
        client = self._client_by_id(device.id_)
×
781
        if client:
×
782
            return client
×
783
        client = plugin.create_client(device, handler)
×
784
        if client:
×
785
            self.logger.info(f"Registering {client}")
×
786
            with self.lock:
×
787
                self.clients[client] = device.id_
×
788
        return client
×
789

790
    def id_by_pairing_code(self, pairing_code):
5✔
791
        with self.lock:
×
792
            return self.pairing_code_to_id.get(pairing_code)
×
793

794
    def pairing_code_by_id(self, id_):
5✔
795
        with self.lock:
×
796
            for pairing_code, id2 in self.pairing_code_to_id.items():
×
797
                if id2 == id_:
×
798
                    return pairing_code
×
799
            return None
×
800

801
    def unpair_pairing_code(self, pairing_code):
5✔
802
        with self.lock:
×
803
            if pairing_code not in self.pairing_code_to_id:
×
804
                return
×
805
            _id = self.pairing_code_to_id.pop(pairing_code)
×
806
        self._close_client(_id)
×
807

808
    def unpair_id(self, id_):
5✔
809
        pairing_code = self.pairing_code_by_id(id_)
×
810
        if pairing_code:
×
811
            self.unpair_pairing_code(pairing_code)
×
812
        else:
813
            self._close_client(id_)
×
814

815
    def _close_client(self, id_):
5✔
816
        with self.lock:
×
817
            client = self._client_by_id(id_)
×
818
            self.clients.pop(client, None)
×
819
        if client:
×
820
            client.close()
×
821

822
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
5✔
823
        with self.lock:
×
824
            for client, client_id in self.clients.items():
×
825
                if client_id == id_:
×
826
                    return client
×
827
        return None
×
828

829
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
5✔
830
        '''Returns a client for the device ID if one is registered.  If
831
        a device is wiped or in bootloader mode pairing is impossible;
832
        in such cases we communicate by device ID and not wallet.'''
833
        if scan_now:
×
834
            self.scan_devices()
×
835
        return self._client_by_id(id_)
×
836

837
    @runs_in_hwd_thread
5✔
838
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
5✔
839
                            keystore: 'Hardware_KeyStore',
840
                            force_pair: bool, *,
841
                            devices: Sequence['Device'] = None,
842
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
843
        self.logger.info("getting client for keystore")
×
844
        if handler is None:
×
845
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
×
846
        handler.update_status(False)
×
847
        pcode = keystore.pairing_code()
×
848
        client = None
×
849
        # search existing clients first (fast-path)
850
        if not devices:
×
851
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=[])
×
852
        # search clients again, now allowing a (slow) scan
853
        if client is None:
×
854
            if devices is None:
×
855
                devices = self.scan_devices()
×
856
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=devices)
×
857
        if client is None and force_pair:
×
858
            try:
×
859
                info = self.select_device(plugin, handler, keystore, devices,
×
860
                                          allow_user_interaction=allow_user_interaction)
861
            except CannotAutoSelectDevice:
×
862
                pass
×
863
            else:
864
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
×
865
        if client:
×
866
            handler.update_status(True)
×
867
            # note: if select_device was called, we might also update label etc here:
868
            keystore.opportunistically_fill_in_missing_info_from_device(client)
×
869
        self.logger.info("end client for keystore")
×
870
        return client
×
871

872
    def client_by_pairing_code(
5✔
873
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
874
        devices: Sequence['Device'],
875
    ) -> Optional['HardwareClientBase']:
876
        _id = self.id_by_pairing_code(pairing_code)
×
877
        client = self._client_by_id(_id)
×
878
        if client:
×
879
            if type(client.plugin) != type(plugin):
×
880
                return
×
881
            # An unpaired client might have another wallet's handler
882
            # from a prior scan.  Replace to fix dialog parenting.
883
            client.handler = handler
×
884
            return client
×
885

886
        for device in devices:
×
887
            if device.id_ == _id:
×
888
                return self.create_client(device, handler, plugin)
×
889

890
    def force_pair_keystore(
5✔
891
        self,
892
        *,
893
        plugin: 'HW_PluginBase',
894
        handler: 'HardwareHandlerBase',
895
        info: 'DeviceInfo',
896
        keystore: 'Hardware_KeyStore',
897
    ) -> 'HardwareClientBase':
898
        xpub = keystore.xpub
×
899
        derivation = keystore.get_derivation_prefix()
×
900
        assert derivation is not None
×
901
        xtype = bip32.xpub_type(xpub)
×
902
        client = self._client_by_id(info.device.id_)
×
903
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
×
904
            # See comment above for same code
905
            client.handler = handler
×
906
            # This will trigger a PIN/passphrase entry request
907
            try:
×
908
                client_xpub = client.get_xpub(derivation, xtype)
×
909
            except (UserCancelled, RuntimeError):
×
910
                # Bad / cancelled PIN / passphrase
911
                client_xpub = None
×
912
            if client_xpub == xpub:
×
913
                keystore.opportunistically_fill_in_missing_info_from_device(client)
×
914
                with self.lock:
×
915
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
×
916
                return client
×
917

918
        # The user input has wrong PIN or passphrase, or cancelled input,
919
        # or it is not pairable
920
        raise DeviceUnpairableError(
×
921
            _('Electrum cannot pair with your {}.\n\n'
922
              'Before you request bitcoins to be sent to addresses in this '
923
              'wallet, ensure you can pair with your device, or that you have '
924
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
925
              'receive will be unspendable.').format(plugin.device))
926

927
    def list_pairable_device_infos(
5✔
928
        self,
929
        *,
930
        handler: Optional['HardwareHandlerBase'],
931
        plugin: 'HW_PluginBase',
932
        devices: Sequence['Device'] = None,
933
        include_failing_clients: bool = False,
934
    ) -> List['DeviceInfo']:
935
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
936
        Already paired devices are also included, as it is okay to reuse them.
937
        """
938
        if not plugin.libraries_available:
×
939
            message = plugin.get_library_not_available_message()
×
940
            raise HardwarePluginLibraryUnavailable(message)
×
941
        if devices is None:
×
942
            devices = self.scan_devices()
×
943
        infos = []
×
944
        for device in devices:
×
945
            if not plugin.can_recognize_device(device):
×
946
                continue
×
947
            try:
×
948
                client = self.create_client(device, handler, plugin)
×
949
                if not client:
×
950
                    continue
×
951
                label = client.label()
×
952
                is_initialized = client.is_initialized()
×
953
                soft_device_id = client.get_soft_device_id()
×
954
                model_name = client.device_model_name()
×
955
            except Exception as e:
×
956
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
×
957
                if include_failing_clients:
×
958
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
×
959
                continue
×
960
            infos.append(DeviceInfo(device=device,
×
961
                                    label=label,
962
                                    initialized=is_initialized,
963
                                    plugin_name=plugin.name,
964
                                    soft_device_id=soft_device_id,
965
                                    model_name=model_name))
966

967
        return infos
×
968

969
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
5✔
970
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
971
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
972
        """Select the device to use for keystore."""
973
        # ideally this should not be called from the GUI thread...
974
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
975
        while True:
×
976
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
×
977
            if infos:
×
978
                break
×
979
            if not allow_user_interaction:
×
980
                raise CannotAutoSelectDevice()
×
981
            msg = _('Please insert your {}').format(plugin.device)
×
982
            msg += " ("
×
983
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
×
984
                msg += f"label: {keystore.label}, "
×
985
            msg += f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}"
×
986
            msg += ').\n\n{}\n\n{}'.format(
×
987
                _('Verify the cable is connected and that '
988
                  'no other application is using it.'),
989
                _('Try to connect again?')
990
            )
991
            if not handler.yes_no_question(msg):
×
992
                raise UserCancelled()
×
993
            devices = None
×
994

995
        # select device automatically. (but only if we have reasonable expectation it is the correct one)
996
        # method 1: select device by id
997
        if keystore.soft_device_id:
×
998
            for info in infos:
×
999
                if info.soft_device_id == keystore.soft_device_id:
×
1000
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
×
1001
                    return info
×
1002
        # method 2: select device by label
1003
        #           but only if not a placeholder label and only if there is no collision
1004
        device_labels = [info.label for info in infos]
×
1005
        if (keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
×
1006
                and device_labels.count(keystore.label) == 1):
1007
            for info in infos:
×
1008
                if info.label == keystore.label:
×
1009
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
×
1010
                    return info
×
1011
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
1012
        #           saved for keystore anyway, select it
1013
        if (len(infos) == 1
×
1014
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
1015
                and keystore.soft_device_id is None):
1016
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
×
1017
            return infos[0]
×
1018

1019
        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
×
1020
        if not allow_user_interaction:
×
1021
            raise CannotAutoSelectDevice()
×
1022
        # ask user to select device manually
1023
        msg = (
×
1024
                _("Could not automatically pair with device for given keystore.") + "\n"
1025
                + f"(keystore label: {keystore.label!r}, "
1026
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
1027
        msg += _("Please select which {} device to use:").format(plugin.device)
×
1028
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
×
1029
        descriptions = ["{label} ({maybe_model}{init}, {transport})"
×
1030
                        .format(label=info.label or _("An unnamed {}").format(info.plugin_name),
1031
                                init=(_("initialized") if info.initialized else _("wiped")),
1032
                                transport=info.device.transport_ui_string,
1033
                                maybe_model=f"{info.model_name}, " if info.model_name else "")
1034
                        for info in infos]
1035
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
×
1036
                          f"num options: {len(infos)}. options: {infos}")
1037
        c = handler.query_choice(msg, descriptions)
×
1038
        if c is None:
×
1039
            raise UserCancelled()
×
1040
        info = infos[c]
×
1041
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
×
1042
        # note: updated label/soft_device_id will be saved after pairing succeeds
1043
        return info
×
1044

1045
    @runs_in_hwd_thread
5✔
1046
    def _scan_devices_with_hid(self) -> List['Device']:
5✔
1047
        try:
×
1048
            import hid  # noqa: F811
×
1049
        except ImportError:
×
1050
            return []
×
1051

1052
        devices = []
×
1053
        for d in hid.enumerate(0, 0):
×
1054
            vendor_id = d['vendor_id']
×
1055
            product_key = (vendor_id, d['product_id'])
×
1056
            plugin = None
×
1057
            if product_key in self._recognised_hardware:
×
1058
                plugin = self._recognised_hardware[product_key]
×
1059
            elif vendor_id in self._recognised_vendor:
×
1060
                plugin = self._recognised_vendor[vendor_id]
×
1061
            if plugin:
×
1062
                device = plugin.create_device_from_hid_enumeration(d, product_key=product_key)
×
1063
                if device:
×
1064
                    devices.append(device)
×
1065
        return devices
×
1066

1067
    @runs_in_hwd_thread
5✔
1068
    @profiler
5✔
1069
    def scan_devices(self) -> Sequence['Device']:
5✔
1070
        self.logger.info("scanning devices...")
×
1071

1072
        # First see what's connected that we know about
1073
        devices = self._scan_devices_with_hid()
×
1074

1075
        # Let plugin handlers enumerate devices we don't know about
1076
        with self.lock:
×
1077
            enumerate_funcs = list(self._enumerate_func)
×
1078
        for f in enumerate_funcs:
×
1079
            try:
×
1080
                new_devices = f()
×
1081
            except BaseException as e:
×
1082
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
×
1083
            else:
1084
                devices.extend(new_devices)
×
1085

1086
        # find out what was disconnected
1087
        client_ids = [dev.id_ for dev in devices]
×
1088
        disconnected_clients = []
×
1089
        with self.lock:
×
1090
            connected = {}
×
1091
            for client, id_ in self.clients.items():
×
1092
                if id_ in client_ids and client.has_usable_connection_with_device():
×
1093
                    connected[client] = id_
×
1094
                else:
1095
                    disconnected_clients.append((client, id_))
×
1096
            self.clients = connected
×
1097

1098
        # Unpair disconnected devices
1099
        for client, id_ in disconnected_clients:
×
1100
            self.unpair_id(id_)
×
1101
            if client.handler:
×
1102
                client.handler.update_status(False)
×
1103

1104
        return devices
×
1105

1106
    @classmethod
5✔
1107
    def version_info(cls) -> Mapping[str, Optional[str]]:
5✔
1108
        ret = {}
×
1109
        # add libusb
1110
        try:
×
1111
            import usb1
×
1112
        except Exception as e:
×
1113
            ret["libusb.version"] = None
×
1114
        else:
1115
            ret["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
×
1116
            try:
×
1117
                ret["libusb.path"] = usb1.libusb1.libusb._name
×
1118
            except AttributeError:
×
1119
                ret["libusb.path"] = None
×
1120
        # add hidapi
1121
        try:
×
1122
            import hid  # noqa: F811
×
1123
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
×
1124
        except Exception as e:
×
1125
            from importlib.metadata import version
×
1126
            try:
×
1127
                ret["hidapi.version"] = version("hidapi")
×
1128
            except ImportError:
×
1129
                ret["hidapi.version"] = None
×
1130
        return ret
×
1131

1132
    def trigger_pairings(
5✔
1133
            self,
1134
            keystores: Sequence['KeyStore'],
1135
            *,
1136
            allow_user_interaction: bool = True,
1137
            devices: Sequence['Device'] = None,
1138
    ) -> None:
1139
        """Given a list of keystores, try to pair each with a connected hardware device.
1140

1141
        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
1142
        try to pair each keystore individually. Consider the following scenario:
1143
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
1144
        - assume none of the devices are paired yet
1145
        1. if we tried to individually pair keystores, we might try with ks1 first
1146
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
1147
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
1148
             which is confusing and error-prone. It's especially problematic if the hw device does
1149
             not support labels (such as Ledger), as then the user cannot easily distinguish
1150
             same-type devices. (see #4199)
1151
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
1152
           and then tell the user ks1 could not be paired (and there are no devices left to try)
1153
        """
1154
        from .keystore import Hardware_KeyStore
×
1155
        keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
×
1156
        if not keystores:
×
1157
            return
×
1158
        if devices is None:
×
1159
            devices = self.scan_devices()
×
1160
        # first pair with all devices that can be auto-selected
1161
        for ks in keystores:
×
1162
            try:
×
1163
                ks.get_client(
×
1164
                    force_pair=True,
1165
                    allow_user_interaction=False,
1166
                    devices=devices,
1167
                )
1168
            except UserCancelled:
×
1169
                pass
×
1170
        if allow_user_interaction:
×
1171
            # now do manual selections
1172
            for ks in keystores:
×
1173
                try:
×
1174
                    ks.get_client(
×
1175
                        force_pair=True,
1176
                        allow_user_interaction=True,
1177
                        devices=devices,
1178
                    )
1179
                except UserCancelled:
×
1180
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc