• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 4678414268891136

19 Mar 2025 09:03AM UTC coverage: 61.141% (-0.05%) from 61.188%
4678414268891136

Pull #9651

CirrusCI

f321x
separate finding and loading of plugins
Pull Request #9651: Use manifest.json instead of loading init file for plugin registration

35 of 68 new or added lines in 2 files covered. (51.47%)

22 existing lines in 2 files now uncovered.

21361 of 34937 relevant lines covered (61.14%)

3.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

41.1
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import json
5✔
27
import os
5✔
28
import pkgutil
5✔
29
import importlib.util
5✔
30
import time
5✔
31
import threading
5✔
32
import traceback
5✔
33
import sys
5✔
34
import aiohttp
5✔
35
import zipfile as zipfile_lib
5✔
36

37
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
5✔
38
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
39
import concurrent
5✔
40
import zipimport
5✔
41
from concurrent import futures
5✔
42
from functools import wraps, partial
5✔
43
from itertools import chain
5✔
44

45
from .i18n import _
5✔
46
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException)
5✔
47
from . import bip32
5✔
48
from . import plugins
5✔
49
from .simple_config import SimpleConfig
5✔
50
from .logging import get_logger, Logger
5✔
51
from .crypto import sha256
5✔
52

53
if TYPE_CHECKING:
5✔
54
    from .plugins.hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
55
    from .keystore import Hardware_KeyStore, KeyStore
×
56
    from .wallet import Abstract_Wallet
×
57

58

59
# Module-level logger, used by code in this file that has no Logger instance at hand.
_logger = get_logger(__name__)
# wallet_type -> zero-argument loader callable; filled in by Plugins.register_wallet_type.
plugin_loaders = {}
# Names of all functions that were decorated with @hook.
hook_names = set()
# hook name -> list of (plugin, bound method) pairs; maintained by BasePlugin.
hooks = {}
# path -> bool memo of Plugins._has_recursive_root_permissions results.
_root_permission_cache = {}
5✔
64

65

66
class Plugins(DaemonThread):
5✔
67

68
    LOGGING_SHORTCUT = 'p'
5✔
69
    pkgpath = os.path.dirname(plugins.__file__)
5✔
70

71
    @profiler
    def __init__(self, config: SimpleConfig, gui_name = None, cmd_only: bool = False):
        """Discover the available plugins and load the enabled ones.

        With cmd_only set, only the Logger base is initialised and no thread
        is started; plugins are found and loaded so that their init modules
        can register commands. Otherwise the daemon thread and the device
        manager are set up, plugins are found and loaded, the device
        manager's jobs are added and the thread is started.
        """
        self.config = config
        self.cmd_only = cmd_only  # type: bool
        self.internal_plugin_metadata = {}
        self.external_plugin_metadata = {}
        if not cmd_only:
            DaemonThread.__init__(self)
            self.device_manager = DeviceMgr(config)
            self.name = 'Plugins'  # set name of thread
            self.hw_wallets = {}
            self.plugins = {}  # type: Dict[str, BasePlugin]
            self.gui_name = gui_name
            self.find_plugins()
            self.load_plugins()
            self.add_jobs(self.device_manager.thread_jobs())
            self.start()
        else:
            # only import the command modules of plugins
            Logger.__init__(self)
            self.find_plugins()
            self.load_plugins()
5✔
93

94
    @property
    def descriptions(self):
        """Return a new dict merging internal and external plugin metadata.

        On a name clash the external entry wins, as it is applied last.
        """
        merged = dict(self.internal_plugin_metadata)
        merged.update(self.external_plugin_metadata)
        return merged
×
97

98
    def find_directory_plugins(self, pkg_path: str, external: bool):
        """Finds plugins in directory form from the given pkg_path and populates the metadata dicts.

        For every module found below pkg_path, its manifest.json is read and
        stored in internal_plugin_metadata or external_plugin_metadata
        (depending on `external`). Modules without a manifest, with an
        unreadable manifest, or without a 'fullname' entry are skipped.
        Unless in cmd_only mode, plugins not available for the current GUI
        are skipped too, and wallet types / keystores declared in the
        manifest are registered.

        Raises Exception if a plugin name has already been registered.
        """
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
        for loader, name, ispkg in iter_modules:
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
            #       once as data and once as code. To honor the "no duplicates" rule below,
            #       we exclude the ones packaged as *code*, here:
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
                continue
            module_path = os.path.join(pkg_path, name)
            if external and not self._has_recursive_root_permissions(module_path):
                self.logger.info(f"Not loading plugin {module_path}: directory has user write permissions")
                continue
            if self.cmd_only and self.config.get('enable_plugin_' + name) is not True:
                continue
            manifest_path = os.path.join(module_path, 'manifest.json')
            try:
                # JSON is UTF-8; do not depend on the locale's default encoding
                with open(manifest_path, 'r', encoding='utf-8') as f:
                    d = json.load(f)
            except FileNotFoundError:
                # not every module below pkg_path is a plugin
                continue
            except ValueError:
                # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
                # A broken manifest must not prevent the other plugins from being found.
                self.logger.info(f"could not load manifest.json from plugin {module_path}", exc_info=True)
                continue
            if not isinstance(d, dict) or 'fullname' not in d:
                continue
            d['display_name'] = d['fullname']
            d['path'] = module_path
            if not self.cmd_only:
                gui_good = self.gui_name in d.get('available_for', [])
                if not gui_good:
                    continue
                details = d.get('registers_wallet_type')
                if details:
                    self.register_wallet_type(name, gui_good, details)
                details = d.get('registers_keystore')
                if details:
                    self.register_keystore(name, gui_good, details)
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
                raise Exception(f"duplicate plugins? for {name=}")
            if not external:
                self.internal_plugin_metadata[name] = d
            else:
                self.external_plugin_metadata[name] = d
×
139

140
    @staticmethod
    def exec_module_from_spec(spec, path):
        """Create the module described by `spec`, register it in sys.modules as `path` and execute it.

        Returns the executed module.
        Raises Exception (chained to the original error) if the module cannot
        be created or executed. In that case the module is removed from
        sys.modules again, so that a later attempt does not find a
        half-initialised module and mistake it for a loaded one.
        """
        try:
            module = importlib.util.module_from_spec(spec)
            # sys.modules needs to be modified for relative imports to work
            # see https://stackoverflow.com/a/50395128
            sys.modules[path] = module
            try:
                spec.loader.exec_module(module)
            except BaseException:
                # mirror the import system: a module that failed to execute is not kept
                if sys.modules.get(path) is module:
                    del sys.modules[path]
                raise
        except Exception as e:
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
        return module
5✔
151

152
    def find_plugins(self):
        """Scan the internal and the external plugin location for directory and zip plugins."""
        search_locations = (
            (self.pkgpath, False),
            (self.get_external_plugin_dir(), True),
        )
        for pkg_path, external in search_locations:
            # external plugins enforce root permissions on the directory
            if not pkg_path or not os.path.exists(pkg_path):
                continue
            self.find_directory_plugins(pkg_path=pkg_path, external=external)
            self.find_zip_plugins(pkg_path=pkg_path, external=external)
5✔
160

161
    def load_plugins(self):
        """Load every found plugin that is enabled in the config and is not tied to a wallet type.

        Failures are logged per plugin and do not stop the remaining plugins
        from loading.
        """
        all_metadata = chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items())
        for name, d in all_metadata:
            if d.get('requires_wallet_type'):
                continue
            if not self.config.get('enable_plugin_' + name):
                continue
            try:
                if not self.cmd_only:
                    self.load_plugin_by_name(name)
                else:
                    # only load init method to register commands
                    self.maybe_load_plugin_init_method(name)
            except BaseException as e:
                self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
171

172
    def _has_root_permissions(self, path):
        """Return True if `path` is owned by uid 0 and the current process cannot write to it."""
        if os.stat(path).st_uid != 0:
            return False
        return not os.access(path, os.W_OK)
×
174

175
    @profiler(min_threshold=0.5)
    def _has_recursive_root_permissions(self, path):
        """Check if a directory and all its subdirectories have root permissions

        Every directory visited by os.walk and every file inside it has to
        pass _has_root_permissions. The result is memoised per path in the
        module-level _root_permission_cache.
        """
        cached = _root_permission_cache.get(path)
        if cached is not None:
            return cached
        # pessimistic entry; only replaced once the whole tree has been checked
        _root_permission_cache[path] = False
        for root, _dirs, files in os.walk(path):
            if not self._has_root_permissions(root):
                return False
            if any(not self._has_root_permissions(os.path.join(root, f)) for f in files):
                return False
        _root_permission_cache[path] = True
        return True
×
190

191
    def get_external_plugin_dir(self):
        """Return the external plugin directory, or None if it must not be used.

        None is returned on platforms other than linux, darwin and freebsd,
        if the directory does not exist, or if it fails the
        _has_root_permissions check.
        """
        platform_supported = sys.platform in ['linux', 'darwin'] or sys.platform.startswith('freebsd')
        if not platform_supported:
            return None
        pkg_path = '/opt/electrum_plugins'
        if not os.path.exists(pkg_path):
            self.logger.info(f'directory {pkg_path} does not exist')
            return None
        if self._has_root_permissions(pkg_path):
            return pkg_path
        self.logger.info(f'not loading {pkg_path}: directory has user write permissions')
        return None
×
202

203
    def zip_plugin_path(self, name):
        """Return the path of the zip file the plugin `name` was found in.

        The file name is taken from the plugin's metadata and joined with the
        internal or the external plugin directory.
        """
        filename = self.get_metadata(name)['filename']
        is_internal = name in self.internal_plugin_metadata
        pkg_path = self.pkgpath if is_internal else self.get_external_plugin_dir()
        return os.path.join(pkg_path, filename)
×
210

211
    def find_zip_plugins(self, pkg_path: str, external: bool):
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts.

        Every '*.zip' file in pkg_path is inspected. For each package inside
        it, '<package>/manifest.json' is read from the archive and stored in
        internal_plugin_metadata or external_plugin_metadata (depending on
        `external`). External zip files failing _has_root_permissions,
        archives that cannot be opened and packages whose manifest cannot be
        read are logged and skipped.

        Raises Exception if a plugin name has already been registered.
        """
        if pkg_path is None:
            return
        for filename in os.listdir(pkg_path):
            path = os.path.join(pkg_path, filename)
            if not filename.endswith('.zip'):
                continue
            if external and not self._has_root_permissions(path):
                self.logger.info(f'not loading {path}: file has user write permissions')
                continue
            try:
                importer = zipimport.zipimporter(path)
            except zipimport.ZipImportError:
                self.logger.exception(f"unable to load zip plugin '{filename}'")
                continue
            for name, is_pkg in pkgutil.iter_zipimport_modules(importer):
                if is_pkg is False:
                    continue
                if name in self.internal_plugin_metadata:
                    raise Exception(f"duplicate plugins for name={name}")
                if name in self.external_plugin_metadata:
                    raise Exception(f"duplicate plugins for name={name}")
                if self.cmd_only and not self.config.get('enable_plugin_' + name):
                    continue
                try:
                    with zipfile_lib.ZipFile(path) as file:
                        # Member names inside a zip archive always use '/',
                        # so os.path.join must not be used here (it would
                        # produce a backslash on Windows).
                        manifest_path = f"{name}/manifest.json"
                        with file.open(manifest_path, 'r') as f:
                            d = json.load(f)
                    if not isinstance(d, dict):
                        raise TypeError(f"manifest.json must contain a JSON object, got {type(d).__name__}")
                except Exception:
                    self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
                    continue
                d['filename'] = filename
                d['is_zip'] = True
                d['path'] = path
                if not self.cmd_only:
                    gui_good = self.gui_name in d.get('available_for', [])
                    if not gui_good:
                        continue
                    if 'fullname' not in d:
                        continue
                    d['display_name'] = d['fullname']
                    d['zip_hash_sha256'] = get_file_hash256(path)
                if external:
                    self.external_plugin_metadata[name] = d
                else:
                    self.internal_plugin_metadata[name] = d
×
259

260
    def get(self, name):
        """Return the loaded plugin registered as `name`, or None if it is not loaded."""
        loaded = self.plugins
        return loaded.get(name)
×
262

263
    def count(self):
        """Return the number of currently loaded plugins."""
        loaded = self.plugins
        return len(loaded)
×
265

266
    def load_plugin(self, name) -> 'BasePlugin':
        """Imports the code of the given plugin.
        note: can be called from any thread.

        Raises Exception if no metadata is known for `name`.
        """
        if not self.get_metadata(name):
            raise Exception(f"could not find plugin {name!r}")
        return self.load_plugin_by_name(name)
×
274

275
    def maybe_load_plugin_init_method(self, name: str) -> None:
        """Loads the __init__.py module of the plugin if it is not already loaded.

        The module is registered in sys.modules as
        'electrum_external_plugins.<name>' for external plugins and as
        'electrum.plugins.<name>' otherwise.
        """
        is_external = name in self.external_plugin_metadata
        package_prefix = 'electrum_external_plugins.' if is_external else 'electrum.plugins.'
        base_name = package_prefix + name
        if base_name in sys.modules:
            return
        # if the plugin was not enabled on startup the init module hasn't been loaded yet
        metadata = self.get_metadata(name)
        if metadata.get('is_zip', False):
            importer = zipimport.zipimporter(metadata['path'])
            init_spec = importer.find_spec(name)
        elif is_external:
            init_path = os.path.join(metadata['path'], '__init__.py')
            init_spec = importlib.util.spec_from_file_location(base_name, init_path)
        else:
            init_spec = importlib.util.find_spec(base_name)
        self.exec_module_from_spec(init_spec, base_name)
        if metadata.get('requires_wallet_type'):
            # removes trustedcoin after loading to not show it in the list of plugins
            self.internal_plugin_metadata.pop(name, None)
×
296

297
    def load_plugin_by_name(self, name: str) -> 'BasePlugin':
        """Return the plugin `name`, importing and instantiating it if it is not loaded yet.

        Raises RuntimeError if the plugin has no module for the current GUI,
        and Exception if importing or instantiating the plugin fails.
        """
        already_loaded = self.plugins.get(name) if name in self.plugins else None
        if name in self.plugins:
            return already_loaded

        # if the plugin was not enabled on startup the init module hasn't been loaded yet
        self.maybe_load_plugin_init_method(name)

        if name in self.external_plugin_metadata:
            package = 'electrum_external_plugins'
        else:
            package = 'electrum.plugins'
        full_name = f'{package}.{name}.{self.gui_name}'

        spec = importlib.util.find_spec(full_name)
        if spec is None:
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
        try:
            module = self.exec_module_from_spec(spec, full_name)
            plugin = module.Plugin(self, self.config, name)
        except Exception as e:
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
        self.add_jobs(plugin.thread_jobs())
        self.plugins[name] = plugin
        thread_name = threading.current_thread().name
        self.logger.info(f"loaded plugin {name!r}. (from thread: {thread_name!r})")
        return plugin
5✔
322

323
    def close_plugin(self, plugin):
        """Remove the thread jobs of `plugin` from this thread."""
        jobs = plugin.thread_jobs()
        self.remove_jobs(jobs)
×
325

326
    def enable(self, name: str) -> 'BasePlugin':
        """Persist the plugin as enabled in the config and return it, loading it if needed."""
        self.config.set_key('enable_plugin_' + name, True, save=True)
        return self.get(name) or self.load_plugin(name)
×
332

333
    def disable(self, name: str) -> None:
        """Persist the plugin as disabled in the config and close it if it is loaded."""
        self.config.set_key('enable_plugin_' + name, False, save=True)
        plugin = self.get(name)
        if plugin:
            self.plugins.pop(name)
            plugin.close()
            self.logger.info(f"closed {name}")
×
341

342
    @classmethod
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
        """Return True if `key` has the form of an 'enable_plugin_<name>' config key."""
        enabler_prefix = 'enable_plugin_'
        return key.startswith(enabler_prefix)
×
345

346
    def toggle(self, name: str) -> Optional['BasePlugin']:
        """Disable the plugin if it is loaded, enable it otherwise.

        Returns the plugin when it got enabled and None when it got disabled.
        """
        if self.get(name):
            return self.disable(name)
        return self.enable(name)
×
349

350
    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
        """Return True if the plugin `name` can be used with `wallet`.

        The plugin must have metadata, every module listed under 'requires'
        must be importable, and, if 'requires_wallet_type' is set, the
        wallet's type must be listed there.
        """
        metadata = self.descriptions.get(name)
        if not metadata:
            return False
        for dep, _label in metadata.get('requires', []):
            try:
                __import__(dep)
            except ImportError as e:
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
                return False
        wallet_types = metadata.get('requires_wallet_type', [])
        if not wallet_types:
            return True
        return wallet.wallet_type in wallet_types
×
363

364
    def get_hardware_support(self):
        """Return a HardwarePluginToScan entry for each registered hardware plugin of the current GUI.

        Plugins that load but report themselves as unavailable are left out.
        Plugins that fail to load are included with plugin=None and the
        exception that occurred.
        """
        scan_list = []
        for name, (gui_good, details) in self.hw_wallets.items():
            if not gui_good:
                continue
            try:
                p = self.get_plugin(name)
                if p.is_available():
                    usable = HardwarePluginToScan(name=name,
                                                  description=details[2],
                                                  plugin=p,
                                                  exception=None)
                    scan_list.append(usable)
            except Exception as e:
                self.logger.exception(f"cannot load plugin for: {name}")
                broken = HardwarePluginToScan(name=name,
                                              description=details[2],
                                              plugin=None,
                                              exception=e)
                scan_list.append(broken)
        return scan_list
×
382

383
    def register_wallet_type(self, name, gui_good, wallet_type):
        """Register `wallet_type` and a lazy loader for it in plugin_loaders.

        The loader fetches the plugin `name` when called and registers the
        plugin's wallet_class as constructor for the wallet type.
        """
        from .wallet import register_wallet_type, register_constructor
        registration = (wallet_type, name)
        self.logger.info(f"registering wallet type {registration}")

        def loader():
            wallet_class = self.get_plugin(name).wallet_class
            register_constructor(wallet_type, wallet_class)

        register_wallet_type(wallet_type)
        plugin_loaders[wallet_type] = loader
5✔
392

393
    def register_keystore(self, name, gui_good, details):
        """Register the keystore described by `details`, if it is a hardware keystore.

        details[0] is the keystore kind; only 'hardware' is handled here.
        details[1] is passed on to keystore.register_keystore together with a
        constructor that fetches the plugin on first use.
        """
        from .keystore import register_keystore

        def dynamic_constructor(d):
            return self.get_plugin(name).keystore_class(d)

        if details[0] != 'hardware':
            return
        self.hw_wallets[name] = (gui_good, details)
        self.logger.info(f"registering hardware {name}: {details}")
        register_keystore(details[1], dynamic_constructor)
5✔
402

403
    def get_plugin(self, name: str) -> 'BasePlugin':
        """Return the plugin `name`, loading it first if it is not loaded yet."""
        needs_loading = name not in self.plugins
        if needs_loading:
            self.load_plugin(name)
        return self.plugins[name]
5✔
407

408
    def is_plugin_zip(self, name: str) -> bool:
        """Returns True if the plugin is a zip file"""
        metadata = self.get_metadata(name)
        if metadata is None:
            return False
        return metadata.get('is_zip', False)
×
413

414
    def get_metadata(self, name: str) -> Optional[dict]:
        """Returns the metadata of the plugin

        Internal plugins are looked up first, then external ones. None is
        returned if neither has a non-empty entry for `name`.
        """
        for registry in (self.internal_plugin_metadata, self.external_plugin_metadata):
            metadata = registry.get(name)
            if metadata:
                return metadata
        return None
5✔
420

421
    def run(self):
        """Thread main loop: run the jobs roughly every 0.1s (or on wake-up) until stopped."""
        while True:
            if not self.is_running():
                break
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
            self.run_jobs()
        self.on_stop()
5✔
426

427

428
def get_file_hash256(path: str) -> str:
    '''Get the sha256 hash of a file in hex, similar to `sha256sum`.

    The file is hashed in fixed-size chunks, so memory use does not grow
    with the size of the file.
    '''
    import hashlib  # local import: hashlib is not imported at the top of this file

    digest = hashlib.sha256()
    with open(path, 'rb') as f:
        while chunk := f.read(65536):
            digest.update(chunk)
    return digest.hexdigest()
×
432

433
def hook(func):
    """Decorator: record the function's name in hook_names and return the function unchanged."""
    name = func.__name__
    hook_names.add(name)
    return func
5✔
436

437

438
def run_hook(name, *args):
    """Call the hook `name` on every enabled plugin that registered it.

    Exceptions raised by a hook are logged and treated as a falsy result.
    Returns the single truthy result, or None if there is none; at most
    one plugin is expected to return a truthy value.
    """
    collected = []
    for plugin, callback in hooks.get(name, []):
        if not plugin.is_enabled():
            continue
        try:
            outcome = callback(*args)
        except Exception:
            _logger.exception(f"Plugin error. plugin: {plugin}, hook: {name}")
            outcome = False
        if outcome:
            collected.append(outcome)

    if not collected:
        return None
    assert len(collected) == 1, collected
    return collected[0]
×
454

455

456
class BasePlugin(Logger):
    """Base class of plugin objects.

    On construction, every attribute whose name is a registered hook name is
    added to the module-level `hooks` table; close() removes those entries
    again.
    """

    def __init__(self, parent, config: 'SimpleConfig', name):
        self.parent = parent  # type: Plugins  # The plugins object
        self.name = name
        self.config = config
        self.wallet = None  # fixme: this field should not exist
        Logger.__init__(self)
        # add self to hooks
        for k in dir(self):
            if k in hook_names:
                hooks.setdefault(k, []).append((self, getattr(self, k)))

    def __str__(self):
        return self.name

    def close(self):
        """Unregister this plugin's hooks, detach it from the parent and call on_close()."""
        # remove self from hooks
        for attr_name in dir(self):
            if attr_name not in hook_names:
                continue
            # found attribute in self that is also the name of a hook
            registered = hooks.get(attr_name, [])
            try:
                registered.remove((self, getattr(self, attr_name)))
            except ValueError:
                # maybe attr name just collided with hook name and was not hook
                continue
        self.parent.close_plugin(self)
        self.on_close()

    def on_close(self):
        """Called at the end of close(); does nothing by default."""
        pass

    def requires_settings(self) -> bool:
        return False

    def thread_jobs(self):
        """Return the jobs this plugin wants to have run by the Plugins thread (none by default)."""
        return []

    def is_enabled(self):
        """Return True if the plugin is available and its 'enable_plugin_<name>' config key is True."""
        return self.is_available() and self.config.get('enable_plugin_' + self.name) is True

    def is_available(self):
        return True

    def can_user_disable(self):
        return True

    def settings_widget(self, window):
        raise NotImplementedError()

    def settings_dialog(self, window):
        raise NotImplementedError()

    def read_file(self, filename: str) -> bytes:
        """Return the content of the file `filename` that ships with this plugin.

        For zip plugins the file is read from '<plugin name>/<filename>'
        inside the archive; otherwise from the plugin's directory below the
        internal or the external plugin location.
        """
        if self.parent.is_plugin_zip(self.name):
            plugin_filename = self.parent.zip_plugin_path(self.name)
            # Member names inside a zip archive always use '/', so the name
            # must not be built with os.path.join (backslash on Windows).
            member = '/'.join((self.name, filename.replace(os.sep, '/')))
            with zipfile_lib.ZipFile(plugin_filename) as myzip:
                with myzip.open(member) as myfile:
                    return myfile.read()
        if self.name in self.parent.internal_plugin_metadata:
            path = os.path.join(os.path.dirname(__file__), 'plugins', self.name, filename)
        else:
            path = os.path.join(self.parent.get_external_plugin_dir(), self.name, filename)
        with open(path, 'rb') as myfile:
            return myfile.read()
×
526

527

528
class DeviceUnpairableError(UserFacingException):
    """UserFacingException subclass that adds no behaviour of its own."""
5✔
529
class HardwarePluginLibraryUnavailable(Exception):
    """Exception subclass that adds no behaviour of its own."""
5✔
530
class CannotAutoSelectDevice(Exception):
    """Exception subclass that adds no behaviour of its own."""
5✔
531

532

533
class Device(NamedTuple):
    """Immutable record describing one hardware device.

    NOTE(review): presumably one instance per device found during a scan;
    confirm against the code that creates these.
    """
    path: Union[str, bytes]
    interface_number: int
    id_: str
    product_key: Any   # when using hid, often Tuple[int, int]
    usage_page: int
    transport_ui_string: str
5✔
540

541

542
class DeviceInfo(NamedTuple):
    """Immutable record pairing a Device with optional details about it.

    Only `device` is required; every other field defaults to None.
    """
    device: Device
    label: Optional[str] = None
    initialized: Optional[bool] = None
    exception: Optional[Exception] = None
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
5✔
550

551

552
class HardwarePluginToScan(NamedTuple):
    """One entry of the list built by Plugins.get_hardware_support.

    Either `plugin` is set and `exception` is None (plugin loaded), or
    `plugin` is None and `exception` holds the error raised while loading.
    """
    name: str
    description: str
    plugin: Optional['HW_PluginBase']
    exception: Optional[Exception]
5✔
557

558

559
# NOTE(review): presumably the label values that count as "no real label set"
# for a hardware client; confirm against the code that uses this set.
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
5✔
560

561

562
# hidapi is not thread-safe
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
#     https://github.com/libusb/hidapi/issues/45
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
# It is not entirely clear to me, exactly what is safe and what isn't, when
# using multiple threads...
# Hence, we use a single thread for all device communications, including
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
# the following thread:
# (max_workers=1 is what guarantees that there is only ever one such thread;
# its name prefix is what run_in_hwd_thread and assert_runs_in_hwd_thread test for.)
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
    max_workers=1,
    thread_name_prefix='hwd_comms_thread'
)
576

577
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
# To keep it simple, let's just import it now, as we are likely in the main thread here.
if threading.current_thread() is not threading.main_thread():
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
try:
    import hid
except ImportError:
    # A missing hid module is tolerated here; this import exists only for its
    # side effect of loading the library from the current thread.
    pass
5✔
586

587

588
# Generic return type; used to type run_in_hwd_thread as returning whatever its callable returns.
T = TypeVar('T')
5✔
589

590

591
def run_in_hwd_thread(func: Callable[[], T]) -> T:
    """Run `func` on the hardware-device communication thread and return its result.

    If the caller is already on that thread, `func` is called directly;
    otherwise it is submitted to the executor and this call blocks until
    the result is available.
    """
    already_on_hwd_thread = threading.current_thread().name.startswith("hwd_comms_thread")
    if already_on_hwd_thread:
        return func()
    pending = _hwd_comms_executor.submit(func)
    return pending.result()
×
597
        #except (concurrent.futures.CancelledError, concurrent.futures.TimeoutError) as e:
598

599

600
def runs_in_hwd_thread(func):
    """Decorator: make every call of `func` go through run_in_hwd_thread."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        bound_call = partial(func, *args, **kwargs)
        return run_in_hwd_thread(bound_call)
    return wrapper
5✔
605

606

607
def assert_runs_in_hwd_thread():
    """Raise Exception unless the current thread is the hardware-device communication thread."""
    current_name = threading.current_thread().name
    if current_name.startswith("hwd_comms_thread"):
        return
    raise Exception("must only be called from HWD communication thread")
×
610

611

612
class DeviceMgr(ThreadJob):
5✔
613
    """Manages hardware clients.  A client communicates over a hardware
614
    channel with the device.
615

616
    In addition to tracking device HID IDs, the device manager tracks
617
    hardware wallets and manages wallet pairing.  A HID ID may be
618
    paired with a wallet when it is confirmed that the hardware device
619
    matches the wallet, i.e. they have the same master public key.  A
620
    HID ID can be unpaired if e.g. it is wiped.
621

622
    Because of hotplugging, a wallet must request its client
623
    dynamically each time it is required, rather than caching it
624
    itself.
625

626
    The device manager is shared across plugins, so just one place
627
    does hardware scans when needed.  By tracking HID IDs, if a device
628
    is plugged into a different port the wallet is automatically
629
    re-paired.
630

631
    Wallets are informed on connect / disconnect events.  It must
632
    implement connected(), disconnected() callbacks.  Being connected
633
    implies a pairing.  Callbacks can happen in any thread context,
634
    and we do them without holding the lock.
635

636
    Confusingly, the HID ID (serial number) reported by the HID system
637
    doesn't match the device ID reported by the device itself.  We use
638
    the HID IDs.
639

640
    This plugin is thread-safe.  Currently only devices supported by
641
    hidapi are implemented."""
642

643
    def __init__(self, config: SimpleConfig):
        """Create an empty device manager bound to ``config``.

        All registries start out empty; plugins populate them later through
        the ``register_*`` methods and clients are added by ``create_client``.
        """
        ThreadJob.__init__(self)

        self.config = config
        # Re-entrant lock protecting the attributes marked "Needs self.lock".
        self.lock = threading.RLock()

        # pairing_code -> id_. An item is only present while a pairing is
        # active. Needs self.lock.
        self.pairing_code_to_id: Dict[str, str] = {}
        # client -> id_. Needs self.lock.
        self.clients: Dict['HardwareClientBase', str] = {}
        # What we recognise: (vendor_id, product_id) -> plugin.
        self._recognised_hardware: Dict[Tuple[int, int], 'HW_PluginBase'] = {}
        # vendor_id -> plugin.
        self._recognised_vendor: Dict[int, 'HW_PluginBase'] = {}
        # Custom enumerate functions for devices we don't know about.
        # Needs self.lock.
        self._enumerate_func = set()
658

659
    def thread_jobs(self):
        """Return the jobs to schedule: just this object.

        The manager itself is the thread job that handles device timeouts
        (see ``run``).
        """
        jobs = [self]
        return jobs
662

663
    def run(self):
5✔
664
        '''Handle device timeouts.  Runs in the context of the Plugins
665
        thread.'''
666
        with self.lock:
5✔
667
            clients = list(self.clients.keys())
5✔
668
        cutoff = time.time() - self.config.get_session_timeout()
5✔
669
        for client in clients:
5✔
670
            client.timeout(cutoff)
×
671

672
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
5✔
673
        for pair in device_pairs:
×
674
            self._recognised_hardware[pair] = plugin
×
675

676
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
5✔
677
        for vendor_id in vendor_ids:
×
678
            self._recognised_vendor[vendor_id] = plugin
×
679

680
    def register_enumerate_func(self, func):
5✔
681
        with self.lock:
×
682
            self._enumerate_func.add(func)
×
683

684
    @runs_in_hwd_thread
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
        """Return the client for ``device``, creating and registering it if needed.

        A client already registered under ``device.id_`` is returned as-is.
        Otherwise ``plugin.create_client`` is asked for one; a truthy result
        is stored in ``self.clients``, a falsy result is returned unchanged
        and nothing is registered.
        """
        # Get from cache first
        cached = self._client_by_id(device.id_)
        if cached:
            return cached

        new_client = plugin.create_client(device, handler)
        if not new_client:
            return new_client

        self.logger.info(f"Registering {new_client}")
        with self.lock:
            self.clients[new_client] = device.id_
        return new_client
697

698
    def id_by_pairing_code(self, pairing_code):
5✔
699
        with self.lock:
×
700
            return self.pairing_code_to_id.get(pairing_code)
×
701

702
    def pairing_code_by_id(self, id_):
5✔
703
        with self.lock:
×
704
            for pairing_code, id2 in self.pairing_code_to_id.items():
×
705
                if id2 == id_:
×
706
                    return pairing_code
×
707
            return None
×
708

709
    def unpair_pairing_code(self, pairing_code):
5✔
710
        with self.lock:
×
711
            if pairing_code not in self.pairing_code_to_id:
×
712
                return
×
713
            _id = self.pairing_code_to_id.pop(pairing_code)
×
714
        self._close_client(_id)
×
715

716
    def unpair_id(self, id_):
5✔
717
        pairing_code = self.pairing_code_by_id(id_)
×
718
        if pairing_code:
×
719
            self.unpair_pairing_code(pairing_code)
×
720
        else:
721
            self._close_client(id_)
×
722

723
    def _close_client(self, id_):
5✔
724
        with self.lock:
×
725
            client = self._client_by_id(id_)
×
726
            self.clients.pop(client, None)
×
727
        if client:
×
728
            client.close()
×
729

730
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
5✔
731
        with self.lock:
×
732
            for client, client_id in self.clients.items():
×
733
                if client_id == id_:
×
734
                    return client
×
735
        return None
×
736

737
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
5✔
738
        '''Returns a client for the device ID if one is registered.  If
739
        a device is wiped or in bootloader mode pairing is impossible;
740
        in such cases we communicate by device ID and not wallet.'''
741
        if scan_now:
×
742
            self.scan_devices()
×
743
        return self._client_by_id(id_)
×
744

745
    @runs_in_hwd_thread
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
                            keystore: 'Hardware_KeyStore',
                            force_pair: bool, *,
                            devices: Sequence['Device'] = None,
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
        """Find (and optionally pair) the hardware client for ``keystore``.

        Lookup order:
          1. already-known clients for the keystore's pairing code, without
             scanning (only when no ``devices`` were passed in);
          2. the same lookup against ``devices`` (scanned now if None);
          3. if still nothing and ``force_pair`` is set: pick a device with
             ``select_device`` and pair through ``force_pair_keystore``.

        Returns the client, or None when none was found/paired (this
        includes ``select_device`` raising ``CannotAutoSelectDevice``).

        Raises:
            Exception: if ``handler`` is None.
        """
        self.logger.info("getting client for keystore")
        if handler is None:
            missing_handler_msg = (
                _("Handler not found for {}").format(plugin.name)
                + '\n'
                + _("A library is probably missing.")
            )
            raise Exception(missing_handler_msg)
        handler.update_status(False)
        pairing_code = keystore.pairing_code()

        client = None
        # search existing clients first (fast-path)
        if not devices:
            client = self.client_by_pairing_code(
                plugin=plugin, pairing_code=pairing_code, handler=handler, devices=[])
        # search clients again, now allowing a (slow) scan
        if client is None:
            if devices is None:
                devices = self.scan_devices()
            client = self.client_by_pairing_code(
                plugin=plugin, pairing_code=pairing_code, handler=handler, devices=devices)

        if force_pair and client is None:
            try:
                info = self.select_device(
                    plugin, handler, keystore, devices,
                    allow_user_interaction=allow_user_interaction,
                )
            except CannotAutoSelectDevice:
                # No device could be chosen automatically: give up quietly.
                pass
            else:
                client = self.force_pair_keystore(
                    plugin=plugin, handler=handler, info=info, keystore=keystore)

        if client:
            handler.update_status(True)
            # note: if select_device was called, we might also update label etc here:
            keystore.opportunistically_fill_in_missing_info_from_device(client)
        self.logger.info("end client for keystore")
        return client
779

780
    def client_by_pairing_code(
5✔
781
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
782
        devices: Sequence['Device'],
783
    ) -> Optional['HardwareClientBase']:
784
        _id = self.id_by_pairing_code(pairing_code)
×
785
        client = self._client_by_id(_id)
×
786
        if client:
×
787
            if type(client.plugin) != type(plugin):
×
788
                return
×
789
            # An unpaired client might have another wallet's handler
790
            # from a prior scan.  Replace to fix dialog parenting.
791
            client.handler = handler
×
792
            return client
×
793

794
        for device in devices:
×
795
            if device.id_ == _id:
×
796
                return self.create_client(device, handler, plugin)
×
797

798
    def force_pair_keystore(
        self,
        *,
        plugin: 'HW_PluginBase',
        handler: 'HardwareHandlerBase',
        info: 'DeviceInfo',
        keystore: 'Hardware_KeyStore',
    ) -> 'HardwareClientBase':
        """Pair ``keystore`` with the device described by ``info``.

        The pairing succeeds only if a client is registered for the device,
        it reports itself pairable, its plugin has exactly the type of
        ``plugin``, and the xpub it returns for the keystore's derivation
        prefix equals the keystore's xpub. On success the pairing code is
        recorded and the client returned.

        Raises:
            DeviceUnpairableError: in every other case (including a
                ``UserCancelled`` / ``RuntimeError`` while fetching the xpub).
        """
        xpub = keystore.xpub
        derivation = keystore.get_derivation_prefix()
        assert derivation is not None
        xtype = bip32.xpub_type(xpub)

        device_id = info.device.id_
        client = self._client_by_id(device_id)
        usable = client and client.is_pairable() and type(client.plugin) == type(plugin)
        if usable:
            # An unpaired client might have another wallet's handler from a
            # prior scan; replace it so dialogs are parented correctly.
            client.handler = handler
            # This will trigger a PIN/passphrase entry request
            try:
                client_xpub = client.get_xpub(derivation, xtype)
            except (UserCancelled, RuntimeError):
                # Bad / cancelled PIN / passphrase
                client_xpub = None
            if client_xpub == xpub:
                keystore.opportunistically_fill_in_missing_info_from_device(client)
                with self.lock:
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
                return client

        # The user input has wrong PIN or passphrase, or cancelled input,
        # or it is not pairable
        unpairable_msg = _(
            'Electrum cannot pair with your {}.\n\n'
            'Before you request bitcoins to be sent to addresses in this '
            'wallet, ensure you can pair with your device, or that you have '
            'its seed (and passphrase, if any).  Otherwise all bitcoins you '
            'receive will be unspendable.').format(plugin.device)
        raise DeviceUnpairableError(unpairable_msg)
834

835
    def list_pairable_device_infos(
        self,
        *,
        handler: Optional['HardwareHandlerBase'],
        plugin: 'HW_PluginBase',
        devices: Sequence['Device'] = None,
        include_failing_clients: bool = False,
    ) -> List['DeviceInfo']:
        """Return one DeviceInfo per connected device that ``plugin`` accepts.

        Already paired devices are also included, as it is okay to reuse them.

        ``devices`` defaults to a fresh ``scan_devices()``. A device whose
        client cannot be created or queried is logged and skipped, unless
        ``include_failing_clients`` is set, in which case a DeviceInfo
        carrying the exception is returned for it.

        Raises:
            HardwarePluginLibraryUnavailable: if the plugin reports that its
                libraries are not available.
        """
        if not plugin.libraries_available:
            raise HardwarePluginLibraryUnavailable(plugin.get_library_not_available_message())
        if devices is None:
            devices = self.scan_devices()

        infos = []
        for device in devices:
            if not plugin.can_recognize_device(device):
                continue
            try:
                client = self.create_client(device, handler, plugin)
                if not client:
                    # Leaving the try block via "continue" skips the else branch.
                    continue
                label = client.label()
                is_initialized = client.is_initialized()
                soft_device_id = client.get_soft_device_id()
                model_name = client.device_model_name()
            except Exception as e:
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
                if include_failing_clients:
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
            else:
                infos.append(DeviceInfo(
                    device=device,
                    label=label,
                    initialized=is_initialized,
                    plugin_name=plugin.name,
                    soft_device_id=soft_device_id,
                    model_name=model_name,
                ))

        return infos
876

877
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
        """Select the device to use for keystore.

        While no pairable device is found, the user is asked to insert one
        and the devices are rescanned. Then a device is picked automatically
        when there is a reasonable expectation it is the right one:
          1. its soft_device_id equals the one stored in the keystore;
          2. its label equals the keystore label, the label is not a
             placeholder and exactly one device carries it;
          3. it is the only device and the keystore has neither a useful
             label nor a soft_device_id.
        Otherwise the user is prompted to choose.

        Raises:
            CannotAutoSelectDevice: user interaction would be needed but
                ``allow_user_interaction`` is False.
            UserCancelled: the user declined to retry or cancelled the choice.
        """
        # ideally this should not be called from the GUI thread...
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
        while True:
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
            if infos:
                break
            if not allow_user_interaction:
                raise CannotAutoSelectDevice()
            prompt_parts = [_('Please insert your {}').format(plugin.device), " ("]
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
                prompt_parts.append(f"label: {keystore.label}, ")
            prompt_parts.append(f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}")
            prompt_parts.append(').\n\n{}\n\n{}'.format(
                _('Verify the cable is connected and that '
                  'no other application is using it.'),
                _('Try to connect again?')
            ))
            if not handler.yes_no_question("".join(prompt_parts)):
                raise UserCancelled()
            # Force a fresh scan on the next iteration.
            devices = None

        # select device automatically. (but only if we have reasonable expectation it is the correct one)
        # method 1: select device by id
        if keystore.soft_device_id:
            for candidate in infos:
                if candidate.soft_device_id == keystore.soft_device_id:
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
                    return candidate

        # method 2: select device by label
        #           but only if not a placeholder label and only if there is no collision
        device_labels = [candidate.label for candidate in infos]
        label_is_unique_match = (
            keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
            and device_labels.count(keystore.label) == 1
        )
        if label_is_unique_match:
            for candidate in infos:
                if candidate.label == keystore.label:
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
                    return candidate

        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
        #           saved for keystore anyway, select it
        only_one_and_no_hints = (
            len(infos) == 1
            and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
            and keystore.soft_device_id is None
        )
        if only_one_and_no_hints:
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
            return infos[0]

        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
        if not allow_user_interaction:
            raise CannotAutoSelectDevice()

        # ask user to select device manually
        msg = "".join([
            _("Could not automatically pair with device for given keystore.") + "\n",
            f"(keystore label: {keystore.label!r}, ",
            f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n",
            _("Please select which {} device to use:").format(plugin.device),
            "\n(" + _("Or click cancel to skip this keystore instead.") + ")",
        ])
        descriptions = [
            "{label} ({maybe_model}{init}, {transport})".format(
                label=info.label or _("An unnamed {}").format(info.plugin_name),
                init=(_("initialized") if info.initialized else _("wiped")),
                transport=info.device.transport_ui_string,
                maybe_model=f"{info.model_name}, " if info.model_name else "",
            )
            for info in infos
        ]
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
                          f"num options: {len(infos)}. options: {infos}")
        choice = handler.query_choice(msg, descriptions)
        if choice is None:
            raise UserCancelled()
        info = infos[choice]
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
        # note: updated label/soft_device_id will be saved after pairing succeeds
        return info
952

953
    @runs_in_hwd_thread
    def _scan_devices_with_hid(self) -> List['Device']:
        """Enumerate HID devices and return those a registered plugin claims.

        A plugin registered for the exact (vendor_id, product_id) pair takes
        precedence over one registered only for the vendor id. The selected
        plugin builds the Device object; falsy results are dropped.
        Returns an empty list when the ``hid`` module cannot be imported.
        """
        try:
            import hid
        except ImportError:
            return []

        found = []
        for hid_info in hid.enumerate(0, 0):
            vendor_id = hid_info['vendor_id']
            product_key = (vendor_id, hid_info['product_id'])
            try:
                plugin = self._recognised_hardware[product_key]
            except KeyError:
                # No exact match: fall back to a vendor-wide registration.
                plugin = self._recognised_vendor.get(vendor_id)
            if not plugin:
                continue
            device = plugin.create_device_from_hid_enumeration(hid_info, product_key=product_key)
            if device:
                found.append(device)
        return found
974

975
    @runs_in_hwd_thread
    @profiler
    def scan_devices(self) -> Sequence['Device']:
        """Scan for connected devices and prune clients that went away.

        Devices come from the HID enumeration plus every registered custom
        enumerate function (a failing function is logged and skipped).
        A registered client is kept only if its id is among the scanned
        devices and it still reports a usable connection; all others are
        unpaired and their handler (if any) gets ``update_status(False)``.

        Returns the list of scanned devices.
        """
        self.logger.info("scanning devices...")

        # First see what's connected that we know about
        devices = self._scan_devices_with_hid()

        # Let plugin handlers enumerate devices we don't know about
        with self.lock:
            enumerate_funcs = list(self._enumerate_func)
        for enum_func in enumerate_funcs:
            try:
                extra_devices = enum_func()
            except BaseException as e:
                self.logger.error(f'custom device enum failed. func {str(enum_func)}, error {e!r}')
                continue
            devices.extend(extra_devices)

        # find out what was disconnected
        seen_ids = [dev.id_ for dev in devices]
        disconnected_clients = []
        with self.lock:
            still_connected = {}
            for client, id_ in self.clients.items():
                if id_ not in seen_ids or not client.has_usable_connection_with_device():
                    disconnected_clients.append((client, id_))
                    continue
                still_connected[client] = id_
            self.clients = still_connected

        # Unpair disconnected devices
        # NOTE(review): self.clients was already replaced above, so
        # unpair_id() will not find these clients via _client_by_id() and
        # close() does not appear to be called on them - confirm intended.
        for client, id_ in disconnected_clients:
            self.unpair_id(id_)
            if client.handler:
                client.handler.update_status(False)

        return devices
1013

1014
    @classmethod
5✔
1015
    def version_info(cls) -> Mapping[str, Optional[str]]:
5✔
1016
        ret = {}
×
1017
        # add libusb
1018
        try:
×
1019
            import usb1
×
1020
        except Exception as e:
×
1021
            ret["libusb.version"] = None
×
1022
        else:
1023
            ret["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
×
1024
            try:
×
1025
                ret["libusb.path"] = usb1.libusb1.libusb._name
×
1026
            except AttributeError:
×
1027
                ret["libusb.path"] = None
×
1028
        # add hidapi
1029
        try:
×
1030
            import hid
×
1031
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
×
1032
        except Exception as e:
×
1033
            from importlib.metadata import version
×
1034
            try:
×
1035
                ret["hidapi.version"] = version("hidapi")
×
1036
            except ImportError:
×
1037
                ret["hidapi.version"] = None
×
1038
        return ret
×
1039

1040
    def trigger_pairings(
            self,
            keystores: Sequence['KeyStore'],
            *,
            allow_user_interaction: bool = True,
            devices: Sequence['Device'] = None,
    ) -> None:
        """Given a list of keystores, try to pair each with a connected hardware device.

        Only hardware keystores are considered. Pairing is attempted in two
        passes: first every keystore without user interaction (so anything
        that can be auto-selected gets paired), then - if
        ``allow_user_interaction`` - every keystore again with manual
        selection allowed. ``UserCancelled`` is ignored per keystore.

        Why not simply pair keystores one by one? Consider a 2-of-3 multisig
        wallet with hw keystores ks1..ks3 where only devices d2 (for ks2) and
        d3 (for ks3) are connected and nothing is paired yet:
        1. pairing individually might start with ks1, which cannot be
           auto-paired as neither d2 nor d3 matches the stored fingerprint.
           The user might then be asked to manually pair ks1 with d2 or d3,
           which is confusing and error-prone - especially if the device
           does not support labels (such as Ledger), as same-type devices
           are then hard to tell apart. (see #4199)
        2. with this method, ks2-d2 and ks3-d3 are auto-paired first, and
           only then is the user told ks1 could not be paired (with no
           devices left to try).
        """
        from .keystore import Hardware_KeyStore

        hw_keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
        if not hw_keystores:
            return
        if devices is None:
            devices = self.scan_devices()

        # Pass 1 pairs everything that can be auto-selected; pass 2 (only
        # when allowed) does the manual selections.
        interaction_passes = (False, True) if allow_user_interaction else (False,)
        for interactive in interaction_passes:
            for ks in hw_keystores:
                try:
                    ks.get_client(
                        force_pair=True,
                        allow_user_interaction=interactive,
                        devices=devices,
                    )
                except UserCancelled:
                    pass
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc