• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 4630045018292224

18 Mar 2025 01:56PM UTC coverage: 61.127% (-0.06%) from 61.188%
4630045018292224

Pull #9651

CirrusCI

f321x
use manifest.json instead of loading init file for plugin registration
Pull Request #9651: Use manifest.json instead of loading init file for plugin registration

21 of 56 new or added lines in 2 files covered. (37.5%)

18 existing lines in 2 files now uncovered.

21364 of 34950 relevant lines covered (61.13%)

3.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

40.79
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import json
5✔
27
import os
5✔
28
import pkgutil
5✔
29
import importlib.util
5✔
30
import time
5✔
31
import threading
5✔
32
import traceback
5✔
33
import sys
5✔
34
import aiohttp
5✔
35
import zipfile as zipfile_lib
5✔
36

37
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
5✔
38
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
39
import concurrent
5✔
40
import zipimport
5✔
41
from concurrent import futures
5✔
42
from functools import wraps, partial
5✔
43
from itertools import chain
5✔
44

45
from .i18n import _
5✔
46
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException)
5✔
47
from . import bip32
5✔
48
from . import plugins
5✔
49
from .simple_config import SimpleConfig
5✔
50
from .logging import get_logger, Logger
5✔
51
from .crypto import sha256
5✔
52

53
if TYPE_CHECKING:
5✔
54
    from .plugins.hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
55
    from .keystore import Hardware_KeyStore, KeyStore
×
56
    from .wallet import Abstract_Wallet
×
57

58

59
# Module-level logger, used by code in this file that has no Logger instance.
_logger = get_logger(__name__)
# wallet_type -> zero-argument callable that loads the plugin providing that
# wallet type (filled by Plugins.register_wallet_type).
plugin_loaders = {}
# Names of all functions that were decorated with @hook.
hook_names = set()
# hook name -> list of (plugin instance, bound method); maintained by
# BasePlugin.__init__ and BasePlugin.close, read by run_hook.
hooks = {}
# path -> bool; results of Plugins._has_recursive_root_permissions.
_root_permission_cache = {}
5✔
64

65

66
class Plugins(DaemonThread):
5✔
67

68
    LOGGING_SHORTCUT = 'p'
5✔
69
    pkgpath = os.path.dirname(plugins.__file__)
5✔
70

71
    @profiler
    def __init__(self, config: SimpleConfig, gui_name = None, cmd_only: bool = False):
        """Discover the available plugins and, unless ``cmd_only``, load the enabled ones.

        With ``cmd_only`` set only the logger is initialised and ``find_plugins``
        is run (which then imports the command modules of enabled plugins);
        no thread is started and no device manager is created.
        Otherwise the daemon thread is initialised, plugins are discovered and
        loaded, the device manager's jobs are added and the thread is started.
        """
        self.config = config
        self.cmd_only: bool = cmd_only
        self.internal_plugin_metadata = {}
        self.external_plugin_metadata = {}
        self.loaded_command_modules: set[str] = set()
        if cmd_only:
            # only import the command modules of plugins
            Logger.__init__(self)
            self.find_plugins()
        else:
            DaemonThread.__init__(self)
            self.name = 'Plugins'  # set name of thread
            self.gui_name = gui_name
            self.plugins: Dict[str, 'BasePlugin'] = {}
            self.hw_wallets = {}
            self.device_manager = DeviceMgr(config)
            self.find_plugins()
            self.load_plugins()
            self.add_jobs(self.device_manager.thread_jobs())
            self.start()
5✔
93

94
    @property
5✔
95
    def descriptions(self):
5✔
96
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
97

98
    def find_directory_plugins(self, pkg_path: str, external: bool):
        """Finds plugins in directory form from the given pkg_path and populates the metadata dicts.

        In command-only mode (``self.cmd_only``) no metadata is collected:
        for every enabled plugin the package's init module is executed and the
        plugin name is added to ``self.loaded_command_modules``.

        Otherwise ``manifest.json`` of each plugin is read as UTF-8. A plugin
        is skipped if the manifest is missing, cannot be parsed, has no
        ``fullname`` entry or is not available for ``self.gui_name``.
        Wallet types and keystores declared in the manifest are registered, and
        plugins without ``requires_wallet_type`` are stored in the internal or
        external metadata dict.

        Args:
            pkg_path: directory containing one sub-directory per plugin.
            external: True if ``pkg_path`` is the external plugin directory; such
                plugins are only considered if their whole directory tree
                passes ``_has_recursive_root_permissions``.

        Raises:
            Exception: if a plugin name is found twice, or if executing an
                init module in command-only mode fails.
        """
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
        for loader, name, ispkg in iter_modules:
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
            #       once as data and once as code. To honor the "no duplicates" rule below,
            #       we exclude the ones packaged as *code*, here:
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
                continue
            module_path = os.path.join(pkg_path, name)
            if external and not self._has_recursive_root_permissions(module_path):
                self.logger.info(f"Not loading plugin {module_path}: directory has user write permissions")
                continue
            if self.cmd_only:
                if self.config.get('enable_plugin_' + name) is not True:
                    # we only look for commands but this plugin is not enabled
                    continue
                # we only want to load the init module for the commands if the plugin is enabled
                base_name = 'electrum.plugins' if not external else 'electrum_external_plugins'
                full_name = f'{base_name}.{name}'
                if external:
                    init_path = os.path.join(module_path, '__init__.py')
                    spec = importlib.util.spec_from_file_location(full_name, init_path)
                else:
                    spec = importlib.util.find_spec(full_name)
                if spec is None:
                    continue  # no commands module in this plugin
                self.exec_module_from_spec(spec, full_name)
                assert name not in self.loaded_command_modules, f"tried to load commands of {name} twice"
                self.loaded_command_modules.add(name)
                continue
            manifest_path = os.path.join(module_path, 'manifest.json')
            try:
                # JSON is UTF-8 by specification; do not depend on the locale encoding
                with open(manifest_path, 'r', encoding='utf-8') as f:
                    d = json.load(f)
            except FileNotFoundError:
                continue
            except (json.JSONDecodeError, UnicodeDecodeError):
                # one broken manifest must not abort the discovery of all other plugins
                self.logger.exception(f"could not parse {manifest_path}")
                continue
            if not isinstance(d, dict) or 'fullname' not in d:
                continue
            d['display_name'] = d['fullname']
            gui_good = self.gui_name in d.get('available_for', [])
            if not gui_good:
                continue
            details = d.get('registers_wallet_type')
            if details:
                self.register_wallet_type(name, gui_good, details)
            details = d.get('registers_keystore')
            if details:
                self.register_keystore(name, gui_good, details)
            if d.get('requires_wallet_type'):
                # trustedcoin will not be added to list
                continue
            d['path'] = module_path
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
                raise Exception(f"duplicate plugins? for {name=}")
            if not external:
                self.internal_plugin_metadata[name] = d
            else:
                self.external_plugin_metadata[name] = d
×
157

158
    @staticmethod
5✔
159
    def exec_module_from_spec(spec, path):
5✔
160
        try:
5✔
161
            module = importlib.util.module_from_spec(spec)
5✔
162
            # sys.modules needs to be modified for relative imports to work
163
            # see https://stackoverflow.com/a/50395128
164
            sys.modules[path] = module
5✔
165
            spec.loader.exec_module(module)
5✔
166
        except Exception as e:
×
167
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
×
168
        return module
5✔
169

170
    def find_plugins(self):
        """Scan the internal and the external plugin location for plugins.

        Each existing location is searched for directory plugins first and for
        zip plugins second. The external location is whatever
        ``get_external_plugin_dir`` returns, which may be None.
        """
        search_locations = [
            (self.pkgpath, False),
            (self.get_external_plugin_dir(), True),
        ]
        for location, is_external in search_locations:
            # external plugins enforce root permissions on the directory
            if not location or not os.path.exists(location):
                continue
            self.find_directory_plugins(pkg_path=location, external=is_external)
            self.find_zip_plugins(pkg_path=location, external=is_external)
5✔
178

179
    def load_plugins(self):
        """Load every discovered plugin that is enabled in the config.

        Plugins whose metadata has ``requires_wallet_type`` are not loaded here.
        A plugin that fails to load is logged and skipped.
        """
        all_metadata = chain(
            self.internal_plugin_metadata.items(),
            self.external_plugin_metadata.items(),
        )
        for name, metadata in all_metadata:
            if metadata.get('requires_wallet_type'):
                continue
            if not self.config.get('enable_plugin_' + name):
                continue
            try:
                self.load_plugin_by_name(name)
            except BaseException as e:
                self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
186

187
    def _has_root_permissions(self, path):
5✔
188
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
189

190
    @profiler(min_threshold=0.5)
    def _has_recursive_root_permissions(self, path):
        """Check if a directory, all its subdirectories and all contained files have root permissions.

        The result is memoised per path in the module-level
        ``_root_permission_cache``.
        """
        cached = _root_permission_cache.get(path)
        if cached is not None:
            return cached
        # pessimistic entry; it stays in place if the walk finds a violation
        _root_permission_cache[path] = False
        for dirpath, _subdirs, filenames in os.walk(path):
            # check the directory itself first, then each of its files
            to_check = chain([dirpath], (os.path.join(dirpath, fn) for fn in filenames))
            if not all(self._has_root_permissions(entry) for entry in to_check):
                return False
        _root_permission_cache[path] = True
        return True
×
205

206
    def get_external_plugin_dir(self):
        """Return the external plugin directory, or None if it cannot be used.

        None is returned on platforms other than linux, darwin and freebsd,
        if ``/opt/electrum_plugins`` does not exist, or if that directory does
        not pass ``_has_root_permissions``.
        """
        platform_supported = sys.platform in ('linux', 'darwin') or sys.platform.startswith('freebsd')
        if not platform_supported:
            return None
        pkg_path = '/opt/electrum_plugins'
        if not os.path.exists(pkg_path):
            self.logger.info(f'directory {pkg_path} does not exist')
            return None
        if self._has_root_permissions(pkg_path):
            return pkg_path
        self.logger.info(f'not loading {pkg_path}: directory has user write permissions')
        return None
×
217

218
    def zip_plugin_path(self, name):
5✔
219
        filename = self.get_metadata(name)['filename']
×
220
        if name in self.internal_plugin_metadata:
×
221
            pkg_path = self.pkgpath
×
222
        else:
223
            pkg_path = self.get_external_plugin_dir()
×
224
        return os.path.join(pkg_path, filename)
×
225

226
    def find_zip_plugins(self, pkg_path: str, external: bool):
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts.

        Every ``*.zip`` file in ``pkg_path`` is opened with ``zipimport`` and
        each package found inside is treated as a plugin.

        In command-only mode (``self.cmd_only``) the package of every enabled
        plugin is executed and its name is added to
        ``self.loaded_command_modules``; no metadata is collected.

        Otherwise ``<plugin>/manifest.json`` is read from the archive. Plugins
        whose manifest cannot be loaded, that are not available for
        ``self.gui_name`` or that lack a ``fullname`` entry are skipped; the
        others are stored in the internal or external metadata dict together
        with the zip file name, path and sha256 hash.

        Args:
            pkg_path: directory to search; nothing is done if it is None.
            external: True if ``pkg_path`` is the external plugin directory; zip
                files there must pass ``_has_root_permissions``.

        Raises:
            Exception: if a plugin name is already present in the metadata, or
                if executing a plugin package in command-only mode fails.
        """
        if pkg_path is None:
            return
        for filename in os.listdir(pkg_path):
            if not filename.endswith('.zip'):
                continue
            path = os.path.join(pkg_path, filename)
            if external and not self._has_root_permissions(path):
                self.logger.info(f'not loading {path}: file has user write permissions')
                continue
            try:
                importer = zipimport.zipimporter(path)
            except zipimport.ZipImportError:
                self.logger.exception(f"unable to load zip plugin '{filename}'")
                continue
            for name, ispkg in pkgutil.iter_zipimport_modules(importer):
                if ispkg is False:
                    continue
                if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
                    raise Exception(f"duplicate plugins for name={name}")
                if self.cmd_only:
                    if not self.config.get('enable_plugin_' + name):
                        continue
                    # load the init module of the plugin to register its commands
                    full_name = f'electrum_external_plugins.{name}' if external else f'electrum.plugins.{name}'
                    spec = importer.find_spec(name)
                    self.exec_module_from_spec(spec, full_name)
                    assert name not in self.loaded_command_modules, f"tried to load commands of {name} twice"
                    self.loaded_command_modules.add(name)
                    continue
                try:
                    with zipfile_lib.ZipFile(path) as file:
                        # member names inside a zip archive always use '/',
                        # so os.path.join (backslash on Windows) must not be used here
                        manifest_path = f'{name}/manifest.json'
                        with file.open(manifest_path, 'r') as f:
                            d = json.load(f)
                except Exception:
                    self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
                    continue
                gui_good = self.gui_name in d.get('available_for', [])
                if not gui_good:
                    continue
                d['filename'] = filename
                if 'fullname' not in d:
                    continue
                d['display_name'] = d['fullname']
                d['zip_hash_sha256'] = get_file_hash256(path)
                d['is_zip'] = True
                d['path'] = path
                if external:
                    self.external_plugin_metadata[name] = d
                else:
                    self.internal_plugin_metadata[name] = d
×
281

282
    def get(self, name):
        """Return the loaded plugin instance called ``name``, or None if it is not loaded."""
        return self.plugins.get(name)
×
284

285
    def count(self):
        """Return the number of currently loaded plugins."""
        return len(self.plugins)
×
287

288
    def load_plugin(self, name) -> 'BasePlugin':
5✔
289
        """Imports the code of the given plugin.
290
        note: can be called from any thread.
291
        """
292
        if self.get_metadata(name):
5✔
293
            return self.load_plugin_by_name(name)
5✔
294
        else:
295
            raise Exception(f"could not find plugin {name!r}")
×
296

297
    def load_plugin_by_name(self, name) -> 'BasePlugin':
5✔
298
        if name in self.plugins:
5✔
299
            return self.plugins[name]
×
300

301
        is_zip = self.is_plugin_zip(name)
5✔
302
        is_external = name in self.external_plugin_metadata
5✔
303
        base_name = (f'electrum_external_plugins.' if is_external else 'electrum.plugins.') + name
5✔
304
        if base_name not in sys.modules:
5✔
305
            metadata = self.get_metadata(name)
5✔
306
            # if the plugin was not enabled on startup the init module hasn't been loaded yet
307
            if not is_zip:
5✔
308
                if is_external:
5✔
NEW
309
                    path = os.path.join(metadata['path'], '__init__.py')
×
NEW
310
                    init_spec = importlib.util.spec_from_file_location(base_name, path)
×
311
                else:
312
                    init_spec = importlib.util.find_spec(base_name)
5✔
313
            else:
NEW
314
                zipfile = zipimport.zipimporter(metadata['path'])
×
NEW
315
                init_spec = zipfile.find_spec(name)
×
316
            self.exec_module_from_spec(init_spec, base_name)
5✔
317

318
        if not is_external:
5✔
319
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
5✔
320
        else:
321
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
×
322

323
        spec = importlib.util.find_spec(full_name)
5✔
324
        if spec is None:
5✔
325
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
326
        try:
5✔
327
            module = self.exec_module_from_spec(spec, full_name)
5✔
328
            plugin = module.Plugin(self, self.config, name)
5✔
329
        except Exception as e:
×
330
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
×
331
        self.add_jobs(plugin.thread_jobs())
5✔
332
        self.plugins[name] = plugin
5✔
333
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
5✔
334
        return plugin
5✔
335

336
    def close_plugin(self, plugin):
        """Remove the thread jobs of ``plugin`` from this thread (called by BasePlugin.close)."""
        self.remove_jobs(plugin.thread_jobs())
×
338

339
    def enable(self, name: str) -> 'BasePlugin':
        """Persist ``enable_plugin_<name> = True`` and return the plugin, loading it if necessary."""
        self.config.set_key('enable_plugin_' + name, True, save=True)
        return self.get(name) or self.load_plugin(name)
×
345

346
    def disable(self, name: str) -> None:
        """Persist ``enable_plugin_<name> = False`` and close the plugin if it is loaded."""
        self.config.set_key('enable_plugin_' + name, False, save=True)
        plugin = self.get(name)
        if plugin:
            self.plugins.pop(name)
            plugin.close()
            self.logger.info(f"closed {name}")
×
354

355
    @classmethod
5✔
356
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
5✔
357
        return key.startswith('enable_plugin_')
×
358

359
    def toggle(self, name: str) -> Optional['BasePlugin']:
        """Disable the plugin if it is loaded, otherwise enable it.

        Returns the plugin instance when it was enabled, None when it was disabled.
        """
        if self.get(name):
            return self.disable(name)
        return self.enable(name)
×
362

363
    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
5✔
364
        d = self.descriptions.get(name)
×
365
        if not d:
×
366
            return False
×
367
        deps = d.get('requires', [])
×
368
        for dep, s in deps:
×
369
            try:
×
370
                __import__(dep)
×
371
            except ImportError as e:
×
372
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
×
373
                return False
×
374
        requires = d.get('requires_wallet_type', [])
×
375
        return not requires or wallet.wallet_type in requires
×
376

377
    def get_hardware_support(self):
5✔
378
        out = []
×
379
        for name, (gui_good, details) in self.hw_wallets.items():
×
380
            if gui_good:
×
381
                try:
×
382
                    p = self.get_plugin(name)
×
383
                    if p.is_available():
×
384
                        out.append(HardwarePluginToScan(name=name,
×
385
                                                        description=details[2],
386
                                                        plugin=p,
387
                                                        exception=None))
388
                except Exception as e:
×
389
                    self.logger.exception(f"cannot load plugin for: {name}")
×
390
                    out.append(HardwarePluginToScan(name=name,
×
391
                                                    description=details[2],
392
                                                    plugin=None,
393
                                                    exception=e))
394
        return out
×
395

396
    def register_wallet_type(self, name, gui_good, wallet_type):
        """Register ``wallet_type`` as provided by plugin ``name``.

        The wallet type is registered with the wallet module right away; the
        plugin itself is only loaded when the callable stored in
        ``plugin_loaders[wallet_type]`` is invoked. ``gui_good`` is not used.
        """
        from .wallet import register_wallet_type, register_constructor
        self.logger.info(f"registering wallet type {(wallet_type, name)}")
        register_wallet_type(wallet_type)

        def loader():
            # loads the plugin on demand and registers its wallet class
            register_constructor(wallet_type, self.get_plugin(name).wallet_class)

        plugin_loaders[wallet_type] = loader
5✔
405

406
    def register_keystore(self, name, gui_good, details):
        """Register the keystore declared by plugin ``name``.

        Only entries whose first item is ``'hardware'`` are handled: the plugin
        is recorded in ``self.hw_wallets`` and a constructor that loads the
        plugin on demand is registered under ``details[1]``.
        """
        from .keystore import register_keystore
        if details[0] != 'hardware':
            return

        def dynamic_constructor(d):
            return self.get_plugin(name).keystore_class(d)

        self.hw_wallets[name] = (gui_good, details)
        self.logger.info(f"registering hardware {name}: {details}")
        register_keystore(details[1], dynamic_constructor)
5✔
415

416
    def get_plugin(self, name: str) -> 'BasePlugin':
5✔
417
        if name not in self.plugins:
5✔
418
            self.load_plugin(name)
5✔
419
        return self.plugins[name]
5✔
420

421
    def is_plugin_zip(self, name: str) -> bool:
5✔
422
        """Returns True if the plugin is a zip file"""
423
        if (metadata := self.get_metadata(name)) is None:
5✔
424
            return False
5✔
425
        return metadata.get('is_zip', False)
5✔
426

427
    def get_metadata(self, name: str) -> Optional[dict]:
5✔
428
        """Returns the metadata of the plugin"""
429
        metadata = self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name)
5✔
430
        if not metadata:
5✔
431
            return None
5✔
432
        return metadata
5✔
433

434
    def run(self):
        """Thread main loop: run the registered jobs until the thread stops.

        Each iteration waits up to 0.1 s on ``wake_up_event`` and then calls
        ``run_jobs``; once ``is_running()`` returns False, ``on_stop`` is called.
        """
        while self.is_running():
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
            self.run_jobs()
        self.on_stop()
5✔
439

440

441
def get_file_hash256(path: str) -> str:
    """Get the sha256 hash of a file in hex, similar to `sha256sum`."""
    with open(path, 'rb') as f:
        content = f.read()
    return sha256(content).hex()
×
445

446
def hook(func):
    """Decorator that records ``func.__name__`` in ``hook_names`` and returns ``func`` unchanged."""
    hook_names.add(func.__name__)
    return func
5✔
449

450

451
def run_hook(name, *args):
    """Call hook ``name`` with ``args`` on every enabled plugin that registered it.

    Exceptions raised by a plugin are logged and treated as "no result".
    Returns the single truthy result, or None if there is none; more than one
    truthy result trips an assertion.
    """
    results = []
    for plugin, callback in hooks.get(name, []):
        if not plugin.is_enabled():
            continue
        try:
            result = callback(*args)
        except Exception:
            _logger.exception(f"Plugin error. plugin: {plugin}, hook: {name}")
            continue
        if result:
            results.append(result)

    if not results:
        return None
    assert len(results) == 1, results
    return results[0]
×
467

468

469
class BasePlugin(Logger):
    """Base class for plugin implementations.

    On construction, every attribute of the instance whose name is in
    ``hook_names`` is registered in the module-level ``hooks`` table;
    ``close`` removes those registrations again.
    """

    def __init__(self, parent, config: 'SimpleConfig', name):
        self.parent = parent  # type: Plugins  # The plugins object
        self.name = name
        self.config = config
        self.wallet = None  # fixme: this field should not exist
        Logger.__init__(self)
        # add self to hooks
        for k in dir(self):
            if k in hook_names:
                registered = hooks.get(k, [])
                registered.append((self, getattr(self, k)))
                hooks[k] = registered

    def __str__(self):
        return self.name

    def close(self):
        """Unregister this plugin's hooks, detach it from the parent and call ``on_close``."""
        # remove self from hooks
        for attr_name in dir(self):
            if attr_name in hook_names:
                # found attribute in self that is also the name of a hook
                registered = hooks.get(attr_name, [])
                try:
                    registered.remove((self, getattr(self, attr_name)))
                except ValueError:
                    # maybe attr name just collided with hook name and was not hook
                    continue
                hooks[attr_name] = registered
        self.parent.close_plugin(self)
        self.on_close()

    def on_close(self):
        """Called at the end of ``close``; does nothing by default."""
        pass

    def requires_settings(self) -> bool:
        return False

    def thread_jobs(self):
        """Return the jobs the Plugins thread should run for this plugin (none by default)."""
        return []

    def is_enabled(self):
        """True if the plugin is available and ``enable_plugin_<name>`` is exactly True in the config."""
        return self.is_available() and self.config.get('enable_plugin_' + self.name) is True

    def is_available(self):
        return True

    def can_user_disable(self):
        return True

    def settings_widget(self, window):
        raise NotImplementedError()

    def settings_dialog(self, window):
        raise NotImplementedError()

    def read_file(self, filename: str) -> bytes:
        """Return the content of ``filename`` from this plugin's own directory.

        For zip plugins the file is read from the archive, otherwise from the
        plugin's directory in the internal or external plugin location.
        """
        if self.parent.is_plugin_zip(self.name):
            plugin_filename = self.parent.zip_plugin_path(self.name)
            # member names inside a zip archive always use '/', whatever os.sep is,
            # so the name must not be built with os.path.join
            member_name = f'{self.name}/{filename}'.replace(os.sep, '/')
            with zipfile_lib.ZipFile(plugin_filename) as myzip:
                with myzip.open(member_name) as myfile:
                    return myfile.read()
        else:
            if self.name in self.parent.internal_plugin_metadata:
                path = os.path.join(os.path.dirname(__file__), 'plugins', self.name, filename)
            else:
                path = os.path.join(self.parent.get_external_plugin_dir(), self.name, filename)
            with open(path, 'rb') as myfile:
                return myfile.read()
×
539

540

541
class DeviceUnpairableError(UserFacingException):
    """User-facing error; by its name, for a device that cannot be paired (raised outside this view)."""
5✔
542
class HardwarePluginLibraryUnavailable(Exception):
    """By its name: a library needed by a hardware plugin is unavailable (raised outside this view)."""


class CannotAutoSelectDevice(Exception):
    """By its name: a device could not be selected automatically (raised outside this view)."""
5✔
544

545

546
class Device(NamedTuple):
    """Immutable record describing one hardware device.

    NOTE(review): the code that fills these fields is outside this view; the
    field meanings are taken from their names and types only.
    """
    path: Union[str, bytes]
    interface_number: int
    id_: str
    product_key: Any   # when using hid, often Tuple[int, int]
    usage_page: int
    transport_ui_string: str
5✔
553

554

555
class DeviceInfo(NamedTuple):
    """A ``Device`` together with optional information about it.

    Every field except ``device`` defaults to None.
    """
    device: Device
    label: Optional[str] = None
    initialized: Optional[bool] = None
    exception: Optional[Exception] = None
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
5✔
563

564

565
class HardwarePluginToScan(NamedTuple):
    """Result entry of ``Plugins.get_hardware_support`` for one hardware wallet plugin."""
    name: str  # plugin name
    description: str  # third item of the plugin's registered keystore details
    plugin: Optional['HW_PluginBase']  # None if the plugin could not be loaded
    exception: Optional[Exception]  # the load error, None on success
5✔
570

571

572
# NOTE(review): presumably the label values that count as "no real label set"
# for a hardware client; the users of this constant are outside this view.
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
5✔
573

574

575
# hidapi is not thread-safe
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
#     https://github.com/libusb/hidapi/issues/45
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
# It is not entirely clear to me, exactly what is safe and what isn't, when
# using multiple threads...
# Hence, we use a single thread for all device communications, including
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
# the following thread:
# (max_workers=1 means submitted calls run one at a time, on one thread whose
# name starts with 'hwd_comms_thread'; run_in_hwd_thread relies on that name.)
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
    max_workers=1,
    thread_name_prefix='hwd_comms_thread'
)
589

590
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
# To keep it simple, let's just import it now, as we are likely in the main thread here.
if threading.current_thread() is not threading.main_thread():
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
try:
    import hid
except ImportError:
    # hid is optional here: a missing module is silently ignored
    pass
5✔
599

600

601
# Return type of the callable passed to run_in_hwd_thread.
T = TypeVar('T')
5✔
602

603

604
def run_in_hwd_thread(func: Callable[[], T]) -> T:
    """Run ``func`` on the hardware-device communication thread and return its result.

    When already running on that thread, ``func`` is called directly.
    Otherwise it is submitted to the single-worker executor and the caller
    blocks until the result is available; whatever ``Future.result()`` raises
    propagates to the caller.
    """
    on_hwd_thread = threading.current_thread().name.startswith("hwd_comms_thread")
    if on_hwd_thread:
        return func()
    return _hwd_comms_executor.submit(func).result()
611

612

613
def runs_in_hwd_thread(func):
    """Decorator: every call of ``func`` is executed via ``run_in_hwd_thread``."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        bound_call = partial(func, *args, **kwargs)
        return run_in_hwd_thread(bound_call)
    return wrapper
5✔
618

619

620
def assert_runs_in_hwd_thread():
    """Raise an Exception unless the current thread's name starts with ``hwd_comms_thread``."""
    current_name = threading.current_thread().name
    if current_name.startswith("hwd_comms_thread"):
        return
    raise Exception("must only be called from HWD communication thread")
×
623

624

625
class DeviceMgr(ThreadJob):
5✔
626
    """Manages hardware clients.  A client communicates over a hardware
627
    channel with the device.
628

629
    In addition to tracking device HID IDs, the device manager tracks
630
    hardware wallets and manages wallet pairing.  A HID ID may be
631
    paired with a wallet when it is confirmed that the hardware device
632
    matches the wallet, i.e. they have the same master public key.  A
633
    HID ID can be unpaired if e.g. it is wiped.
634

635
    Because of hotplugging, a wallet must request its client
636
    dynamically each time it is required, rather than caching it
637
    itself.
638

639
    The device manager is shared across plugins, so just one place
640
    does hardware scans when needed.  By tracking HID IDs, if a device
641
    is plugged into a different port the wallet is automatically
642
    re-paired.
643

644
    Wallets are informed on connect / disconnect events.  It must
645
    implement connected(), disconnected() callbacks.  Being connected
646
    implies a pairing.  Callbacks can happen in any thread context,
647
    and we do them without holding the lock.
648

649
    Confusingly, the HID ID (serial number) reported by the HID system
650
    doesn't match the device ID reported by the device itself.  We use
651
    the HID IDs.
652

653
    This plugin is thread-safe.  Currently only devices supported by
654
    hidapi are implemented."""
655

656
    def __init__(self, config: SimpleConfig):
        """Create a manager with empty client, pairing and hardware registries.

        config is stored on the instance; run() reads the session timeout from it.
        """
        ThreadJob.__init__(self)

        self.config = config

        # Protects the attributes marked "Needs self.lock" below.
        self.lock = threading.RLock()

        # pairing_code -> id_. An entry exists only while that pairing is active. Needs self.lock.
        self.pairing_code_to_id: Dict[str, str] = {}
        # client -> id_. Needs self.lock.
        self.clients: Dict[HardwareClientBase, str] = {}
        # Hardware we recognise: (vendor_id, product_id) -> Plugin
        self._recognised_hardware: Dict[Tuple[int, int], HW_PluginBase] = {}
        # Vendors we recognise: vendor_id -> Plugin
        self._recognised_vendor: Dict[int, HW_PluginBase] = {}
        # Custom enumerate functions for devices we don't know about. Needs self.lock.
        self._enumerate_func = set()
5✔
671

672
    def thread_jobs(self):
        """Return the list of thread jobs: just this manager itself.

        Its run() method is the job that handles device timeouts.
        """
        jobs = [self]
        return jobs
5✔
675

676
    def run(self):
        '''Handle device timeouts: every registered client is asked to time out
        against a cutoff of "now minus the configured session timeout".

        Runs in the context of the Plugins thread.'''
        with self.lock:
            # snapshot under the lock; the timeout() calls below run without holding it
            snapshot = list(self.clients)
        session_timeout = self.config.get_session_timeout()
        cutoff = time.time() - session_timeout
        for client in snapshot:
            client.timeout(cutoff)
×
684

685
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
        """Register *plugin* as the handler of every (vendor_id, product_id) pair in device_pairs.

        A pair that was registered before is re-assigned to *plugin*.
        """
        self._recognised_hardware.update((pair, plugin) for pair in device_pairs)
×
688

689
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
        """Register *plugin* as the handler of every vendor id in vendor_ids.

        A vendor id that was registered before is re-assigned to *plugin*.
        """
        self._recognised_vendor.update((vid, plugin) for vid in vendor_ids)
×
692

693
    def register_enumerate_func(self, func):
        """Add *func* to the set of custom enumeration callbacks used by scan_devices().

        Registering the same callable twice has no additional effect.
        """
        with self.lock:
            self._enumerate_func.update((func,))
×
696

697
    @runs_in_hwd_thread
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
        """Return the client for *device*, creating and registering one if necessary.

        A client already registered under device.id_ is returned as-is. Otherwise
        plugin.create_client() is asked for one; a truthy result is stored in
        self.clients, a falsy one is returned to the caller without being registered.
        """
        cached = self._client_by_id(device.id_)
        if cached:
            return cached
        fresh = plugin.create_client(device, handler)
        if not fresh:
            return fresh
        self.logger.info(f"Registering {fresh}")
        with self.lock:
            self.clients[fresh] = device.id_
        return fresh
×
710

711
    def id_by_pairing_code(self, pairing_code):
        """Return the id_ currently paired with *pairing_code*, or None if there is no such pairing."""
        with self.lock:
            paired_id = self.pairing_code_to_id.get(pairing_code)
        return paired_id
×
714

715
    def pairing_code_by_id(self, id_):
        """Reverse lookup: return the first pairing code that maps to *id_*, or None."""
        with self.lock:
            matching_codes = (
                code
                for code, paired_id in self.pairing_code_to_id.items()
                if paired_id == id_
            )
            return next(matching_codes, None)
×
721

722
    def unpair_pairing_code(self, pairing_code):
        """Forget the pairing stored for *pairing_code* and close the client of its device.

        Does nothing if no such pairing exists.
        """
        with self.lock:
            try:
                unpaired_id = self.pairing_code_to_id.pop(pairing_code)
            except KeyError:
                return
        # the client is closed after the lock has been released
        self._close_client(unpaired_id)
×
728

729
    def unpair_id(self, id_):
        """Unpair the device *id_* and close its client.

        If no pairing code maps to *id_*, only the client is closed.
        """
        pairing_code = self.pairing_code_by_id(id_)
        if not pairing_code:
            self._close_client(id_)
            return
        self.unpair_pairing_code(pairing_code)
×
735

736
    def _close_client(self, id_):
        """Unregister the client stored under *id_* and close it, if there is one."""
        with self.lock:
            client = self._client_by_id(id_)
            self.clients.pop(client, None)
        if not client:
            return
        # close() is called after the lock has been released
        client.close()
×
742

743
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
        """Return the first registered client whose id equals *id_*, or None. Never triggers a scan."""
        with self.lock:
            return next(
                (client for client, client_id in self.clients.items() if client_id == id_),
                None,
            )
×
749

750
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
        '''Return the client registered for device ID *id_*, or None.

        Pairing is impossible for a device that is wiped or in bootloader
        mode; in such cases we communicate by device ID and not wallet.

        Unless scan_now is False, devices are re-scanned before the lookup.'''
        if scan_now:
            self.scan_devices()
        found = self._client_by_id(id_)
        return found
×
757

758
    @runs_in_hwd_thread
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
                            keystore: 'Hardware_KeyStore',
                            force_pair: bool, *,
                            devices: Optional[Sequence['Device']] = None,
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
        """Return a client for *keystore*, pairing with a device first if requested.

        Lookup order:
          1. if no devices were passed in: an already known client for the
             keystore's pairing code, without scanning (fast-path);
          2. the same lookup against *devices* (scanned now if devices is None);
          3. if force_pair is set: select_device() followed by force_pair_keystore().

        The handler's status is set to False on entry and to True once a client
        has been found. allow_user_interaction is forwarded to select_device().

        Returns None if no client could be obtained.

        Raises:
            Exception: if handler is None.
            Anything raised by select_device() / force_pair_keystore(), except
            CannotAutoSelectDevice, which is swallowed here (no client is returned
            in that case).
        """
        self.logger.info("getting client for keystore")
        if handler is None:
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
        handler.update_status(False)
        pcode = keystore.pairing_code()
        client = None
        # search existing clients first (fast-path)
        if not devices:
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=[])
        # search clients again, now allowing a (slow) scan
        if client is None:
            if devices is None:
                devices = self.scan_devices()
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=devices)
        if client is None and force_pair:
            try:
                info = self.select_device(plugin, handler, keystore, devices,
                                          allow_user_interaction=allow_user_interaction)
            except CannotAutoSelectDevice:
                # no device could be picked without asking the user: give up quietly
                pass
            else:
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
        if client:
            handler.update_status(True)
            # note: if select_device was called, we might also update label etc here:
            keystore.opportunistically_fill_in_missing_info_from_device(client)
        self.logger.info("end client for keystore")
        return client
×
792

793
    def client_by_pairing_code(
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
        devices: Sequence['Device'],
    ) -> Optional['HardwareClientBase']:
        """Return the client of the device paired with *pairing_code*, or None.

        A client that is already registered for the paired id is reused (with its
        handler replaced by *handler*), unless it belongs to a different plugin type,
        in which case None is returned. If no client is registered, one is created
        for the first entry of *devices* whose id_ equals the paired id.
        """
        paired_id = self.id_by_pairing_code(pairing_code)
        existing = self._client_by_id(paired_id)
        if existing:
            if type(existing.plugin) != type(plugin):
                return None
            # An unpaired client might have another wallet's handler
            # from a prior scan.  Replace to fix dialog parenting.
            existing.handler = handler
            return existing

        for candidate in devices:
            if candidate.id_ != paired_id:
                continue
            return self.create_client(candidate, handler, plugin)
        return None
×
810

811
    def force_pair_keystore(
        self,
        *,
        plugin: 'HW_PluginBase',
        handler: 'HardwareHandlerBase',
        info: 'DeviceInfo',
        keystore: 'Hardware_KeyStore',
    ) -> 'HardwareClientBase':
        """Pair *keystore* with the device described by *info* and return its client.

        The pairing succeeds only if a pairable client of the same plugin type is
        registered for info.device.id_ and the xpub it reports at the keystore's
        derivation prefix equals the keystore's xpub. On success the pairing code
        of the keystore is mapped to the device id.

        Raises:
            DeviceUnpairableError: in every other case (no suitable client, wrong
                or cancelled PIN / passphrase, xpub mismatch).
        """
        expected_xpub = keystore.xpub
        derivation = keystore.get_derivation_prefix()
        assert derivation is not None
        xtype = bip32.xpub_type(expected_xpub)
        device_id = info.device.id_
        client = self._client_by_id(device_id)
        suitable = client and client.is_pairable() and type(client.plugin) == type(plugin)
        if suitable:
            # An unpaired client might still carry another wallet's handler
            # from a prior scan; replace it to fix dialog parenting.
            client.handler = handler
            # This will trigger a PIN/passphrase entry request
            try:
                device_xpub = client.get_xpub(derivation, xtype)
            except (UserCancelled, RuntimeError):
                # Bad / cancelled PIN / passphrase
                device_xpub = None
            if device_xpub == expected_xpub:
                keystore.opportunistically_fill_in_missing_info_from_device(client)
                with self.lock:
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
                return client

        # The user input has wrong PIN or passphrase, or cancelled input,
        # or it is not pairable
        error_text = _('Electrum cannot pair with your {}.\n\n'
                       'Before you request bitcoins to be sent to addresses in this '
                       'wallet, ensure you can pair with your device, or that you have '
                       'its seed (and passphrase, if any).  Otherwise all bitcoins you '
                       'receive will be unspendable.').format(plugin.device)
        raise DeviceUnpairableError(error_text)
847

848
    def list_pairable_device_infos(
        self,
        *,
        handler: Optional['HardwareHandlerBase'],
        plugin: 'HW_PluginBase',
        devices: Optional[Sequence['Device']] = None,
        include_failing_clients: bool = False,
    ) -> List['DeviceInfo']:
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
        Already paired devices are also included, as it is okay to reuse them.

        Args:
            handler: passed on to create_client() for every accepted device.
            plugin: decides which devices are accepted (can_recognize_device).
            devices: devices to consider; if None, a scan is done now.
            include_failing_clients: if set, a device whose client could not be
                created or queried is still reported, as a DeviceInfo carrying the
                exception. Otherwise such devices are only logged and skipped.

        Raises:
            HardwarePluginLibraryUnavailable: if plugin.libraries_available is falsy.
        """
        if not plugin.libraries_available:
            message = plugin.get_library_not_available_message()
            raise HardwarePluginLibraryUnavailable(message)
        if devices is None:
            devices = self.scan_devices()
        infos = []
        for device in devices:
            if not plugin.can_recognize_device(device):
                continue
            try:
                client = self.create_client(device, handler, plugin)
                if not client:
                    continue
                label = client.label()
                is_initialized = client.is_initialized()
                soft_device_id = client.get_soft_device_id()
                model_name = client.device_model_name()
            except Exception as e:
                # a failure for one device must not prevent listing the others
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
                if include_failing_clients:
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
                continue
            infos.append(DeviceInfo(device=device,
                                    label=label,
                                    initialized=is_initialized,
                                    plugin_name=plugin.name,
                                    soft_device_id=soft_device_id,
                                    model_name=model_name))

        return infos
×
889

890
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
                      keystore: 'Hardware_KeyStore', devices: Optional[Sequence['Device']] = None,
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
        """Select the device to use for keystore.

        Step 1: obtain the list of pairable devices. While that list is empty, the
        user is asked whether to try again (a fresh scan is done on every retry).

        Step 2: try to select a device automatically:
          1. by soft_device_id, if the keystore has one saved;
          2. by label, if the keystore's label is not a placeholder and exactly one
             device carries it;
          3. if there is only a single device and the keystore has neither a useful
             label nor a soft_device_id saved.

        Step 3: otherwise ask the user to choose among the devices.

        Raises:
            CannotAutoSelectDevice: user interaction would be required but
                allow_user_interaction is False.
            UserCancelled: the user declined to retry, or cancelled the selection.
        """
        # ideally this should not be called from the GUI thread...
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
        while True:
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
            if infos:
                break
            if not allow_user_interaction:
                raise CannotAutoSelectDevice()
            msg = _('Please insert your {}').format(plugin.device)
            msg += " ("
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
                msg += f"label: {keystore.label}, "
            msg += f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}"
            msg += ').\n\n{}\n\n{}'.format(
                _('Verify the cable is connected and that '
                  'no other application is using it.'),
                _('Try to connect again?')
            )
            if not handler.yes_no_question(msg):
                raise UserCancelled()
            # forget the caller-supplied list, so that the next iteration re-scans
            devices = None

        # select device automatically. (but only if we have reasonable expectation it is the correct one)
        # method 1: select device by id
        if keystore.soft_device_id:
            for info in infos:
                if info.soft_device_id == keystore.soft_device_id:
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
                    return info
        # method 2: select device by label
        #           but only if not a placeholder label and only if there is no collision
        device_labels = [info.label for info in infos]
        if (keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
                and device_labels.count(keystore.label) == 1):
            for info in infos:
                if info.label == keystore.label:
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
                    return info
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
        #           saved for keystore anyway, select it
        if (len(infos) == 1
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
                and keystore.soft_device_id is None):
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
            return infos[0]

        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
        if not allow_user_interaction:
            raise CannotAutoSelectDevice()
        # ask user to select device manually
        msg = (
                _("Could not automatically pair with device for given keystore.") + "\n"
                + f"(keystore label: {keystore.label!r}, "
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
        msg += _("Please select which {} device to use:").format(plugin.device)
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
        descriptions = ["{label} ({maybe_model}{init}, {transport})"
                        .format(label=info.label or _("An unnamed {}").format(info.plugin_name),
                                init=(_("initialized") if info.initialized else _("wiped")),
                                transport=info.device.transport_ui_string,
                                maybe_model=f"{info.model_name}, " if info.model_name else "")
                        for info in infos]
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
                          f"num options: {len(infos)}. options: {infos}")
        c = handler.query_choice(msg, descriptions)
        if c is None:
            raise UserCancelled()
        info = infos[c]
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
        # note: updated label/soft_device_id will be saved after pairing succeeds
        return info
×
965

966
    @runs_in_hwd_thread
    def _scan_devices_with_hid(self) -> List['Device']:
        """Enumerate HID devices and return those that a registered plugin turns into a Device.

        Returns an empty list if the hid module cannot be imported.
        """
        try:
            import hid
        except ImportError:
            return []

        found = []
        for entry in hid.enumerate(0, 0):
            vendor_id = entry['vendor_id']
            product_key = (vendor_id, entry['product_id'])
            # an exact (vendor_id, product_id) registration wins over a vendor-only one
            if product_key in self._recognised_hardware:
                plugin = self._recognised_hardware[product_key]
            elif vendor_id in self._recognised_vendor:
                plugin = self._recognised_vendor[vendor_id]
            else:
                continue
            if not plugin:
                continue
            device = plugin.create_device_from_hid_enumeration(entry, product_key=product_key)
            if device:
                found.append(device)
        return found
×
987

988
    @runs_in_hwd_thread
    @profiler
    def scan_devices(self) -> Sequence['Device']:
        """Enumerate the connected devices and drop clients whose device is gone.

        Devices are collected from the HID enumeration and from every registered
        custom enumerate function (a failing function is logged and skipped).
        Registered clients whose id is no longer among the found devices, or which
        report no usable connection, are removed from self.clients and unpaired,
        and their handler (if any) gets its status set to False.

        Returns the devices found.
        """
        self.logger.info("scanning devices...")

        # First see what's connected that we know about
        devices = self._scan_devices_with_hid()

        # Let plugin handlers enumerate devices we don't know about
        with self.lock:
            enumerate_funcs = list(self._enumerate_func)
        for f in enumerate_funcs:
            try:
                new_devices = f()
            except BaseException as e:
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
            else:
                devices.extend(new_devices)

        # find out what was disconnected
        # (a set, so that the membership test per client below is O(1))
        present_ids = {dev.id_ for dev in devices}
        disconnected_clients = []
        with self.lock:
            connected = {}
            for client, id_ in self.clients.items():
                if id_ in present_ids and client.has_usable_connection_with_device():
                    connected[client] = id_
                else:
                    disconnected_clients.append((client, id_))
            self.clients = connected

        # Unpair disconnected devices
        # NOTE(review): these clients were already removed from self.clients above, so
        # the _close_client() lookup done via unpair_id() will not find them and
        # client.close() is not called for them here -- confirm this is intended.
        for client, id_ in disconnected_clients:
            self.unpair_id(id_)
            if client.handler:
                client.handler.update_status(False)

        return devices
×
1026

1027
    @classmethod
    def version_info(cls) -> Mapping[str, Optional[str]]:
        """Report the versions of the low-level device libraries.

        Keys in the result:
          - "libusb.version": dotted version from usb1, or None if usb1 cannot be imported
          - "libusb.path": name of the loaded libusb library, or None if it cannot be
            determined (key only present if usb1 could be imported)
          - "hidapi.version": hid.__version__, else the installed "hidapi" distribution
            version, else None
        """
        info = {}
        # add libusb
        try:
            import usb1
        except Exception:
            info["libusb.version"] = None
        else:
            info["libusb.version"] = ".".join(str(part) for part in usb1.getVersion()[:4])
            try:
                info["libusb.path"] = usb1.libusb1.libusb._name
            except AttributeError:
                info["libusb.path"] = None
        # add hidapi
        try:
            import hid
            info["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
        except Exception:
            # fall back to the package metadata
            from importlib.metadata import version
            try:
                info["hidapi.version"] = version("hidapi")
            except ImportError:
                # importlib.metadata.PackageNotFoundError is an ImportError subclass
                info["hidapi.version"] = None
        return info
×
1052

1053
    def trigger_pairings(
            self,
            keystores: Sequence['KeyStore'],
            *,
            allow_user_interaction: bool = True,
            devices: Optional[Sequence['Device']] = None,
    ) -> None:
        """Given a list of keystores, try to pair each with a connected hardware device.

        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
        try to pair each keystore individually. Consider the following scenario:
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
        - assume none of the devices are paired yet
        1. if we tried to individually pair keystores, we might try with ks1 first
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
             which is confusing and error-prone. It's especially problematic if the hw device does
             not support labels (such as Ledger), as then the user cannot easily distinguish
             same-type devices. (see #4199)
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
           and then tell the user ks1 could not be paired (and there are no devices left to try)

        Args:
            keystores: keystores to pair; entries that are not a Hardware_KeyStore are ignored.
            allow_user_interaction: if False, only the automatic pass is done.
            devices: devices to pair with; if None, a scan is done now (only if there is
                at least one hardware keystore).
        """
        from .keystore import Hardware_KeyStore
        keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
        if not keystores:
            return
        if devices is None:
            devices = self.scan_devices()
        # first pair with all devices that can be auto-selected,
        # and only afterwards (if allowed) do the manual selections
        passes = (False, True) if allow_user_interaction else (False,)
        for interactive in passes:
            for ks in keystores:
                try:
                    ks.get_client(
                        force_pair=True,
                        allow_user_interaction=interactive,
                        devices=devices,
                    )
                except UserCancelled:
                    # skip this keystore; carry on with the others
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc