
spesmilo / electrum / 5875401303523328

16 Jul 2025 12:15AM UTC coverage: 59.762% (-0.04%) from 59.8%

Pull #10033 (CirrusCI), by SomberNight
interface: parallel header-chunks download

We request chunks concurrently. This makes header-sync much faster
when we are many blocks behind (a minimal sketch of the idea follows the notes below).

notes:
- all chunks are downloaded from the same interface, just for simplicity
- we request up to 10 chunks concurrently (so 10*2016 headers)
  - more chunks: higher memory requirements
  - more chunks: higher concurrency => syncing needs fewer network round-trips
  - if a chunk does not connect, bandwidth for all later chunks is wasted
  - we can tweak the constant, make it dynamic, or make it a configvar later
- without this, we progress the chain tip by around 1 chunk per second
  - 52k blocks (1 year on mainnet) takes around 26 seconds
  - this is probably not *that* interesting for mainnet,
    but for testnet3, which sometimes has 200x the block-rate of mainnet,
    it is extremely useful
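
Below is a minimal, self-contained sketch of the pattern the notes describe: fetch several consecutive header chunks concurrently from a single server, then connect them strictly in order, stopping at the first chunk that fails to connect. The helpers fetch_chunk and connect_chunk are hypothetical stand-ins for the real network request and chain validation, not Electrum's actual API.

import asyncio
from typing import List

CHUNK_SIZE = 2016          # headers per chunk (one difficulty-retarget period)
HEADER_SIZE = 80           # bytes per serialized block header
MAX_PARALLEL_CHUNKS = 10   # the constant discussed in the notes

async def fetch_chunk(index: int) -> bytes:
    """Hypothetical stand-in: request CHUNK_SIZE raw headers
    starting at height index * CHUNK_SIZE from the server."""
    await asyncio.sleep(0.1)  # simulate one network round-trip
    return b"\x00" * (HEADER_SIZE * CHUNK_SIZE)

def connect_chunk(index: int, data: bytes) -> bool:
    """Hypothetical stand-in: validate a chunk against the local chain."""
    return True

async def fast_forward(height: int, tip: int) -> int:
    """Fetch up to MAX_PARALLEL_CHUNKS chunks concurrently, then connect
    them strictly in order. Returns the number of headers connected."""
    index0 = height // CHUNK_SIZE
    indices: List[int] = [
        index0 + i for i in range(MAX_PARALLEL_CHUNKS)
        if (index0 + i) * CHUNK_SIZE <= tip
    ]
    # all requests are in flight at once, so the batch costs ~one round-trip:
    chunks = await asyncio.gather(*(fetch_chunk(i) for i in indices))
    num_headers = 0
    for index, data in zip(indices, chunks):
        if not connect_chunk(index, data):
            break  # bandwidth already spent on later chunks is wasted
        num_headers += len(data) // HEADER_SIZE
    return num_headers

# e.g. asyncio.run(fast_forward(height=100_000, tip=200_000))

The speedup comes from the batch sharing one network round-trip: if each chunk request costs roughly a second of latency, ten sequential requests take about ten seconds, while the concurrent batch takes about one.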

2 of 40 new or added lines in 3 files covered. (5.0%)

7 existing lines in 1 file now uncovered.

21983 of 36784 relevant lines covered (59.76%)

2.99 hits per line

Source File: /electrum/interface.py (35.22% covered)
#!/usr/bin/env python
#
# Electrum - lightweight Bitcoin client
# Copyright (C) 2011 thomasv@gitorious
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import os
import re
import ssl
import sys
import time
import traceback
import asyncio
import socket
from typing import Tuple, Union, List, TYPE_CHECKING, Optional, Set, NamedTuple, Any, Sequence, Dict
from collections import defaultdict
from ipaddress import IPv4Network, IPv6Network, ip_address, IPv6Address, IPv4Address
import itertools
import logging
import hashlib
import functools
import random
import enum

import aiorpcx
from aiorpcx import RPCSession, Notification, NetAddress, NewlineFramer
from aiorpcx.curio import timeout_after, TaskTimeout
from aiorpcx.jsonrpc import JSONRPC, CodeMessageError
from aiorpcx.rawsocket import RSClient, RSTransport
import certifi

from .util import (ignore_exceptions, log_exceptions, bfh, ESocksProxy,
                   is_integer, is_non_negative_integer, is_hash256_str, is_hex_str,
                   is_int_or_float, is_non_negative_int_or_float, OldTaskGroup)
from . import util
from . import x509
from . import pem
from . import version
from . import blockchain
from .blockchain import Blockchain, HEADER_SIZE, CHUNK_SIZE
from . import bitcoin
from . import constants
from .i18n import _
from .logging import Logger
from .transaction import Transaction
from .fee_policy import FEE_ETA_TARGETS

if TYPE_CHECKING:
    from .network import Network
    from .simple_config import SimpleConfig


ca_path = certifi.where()

BUCKET_NAME_OF_ONION_SERVERS = 'onion'

_KNOWN_NETWORK_PROTOCOLS = {'t', 's'}
PREFERRED_NETWORK_PROTOCOL = 's'
assert PREFERRED_NETWORK_PROTOCOL in _KNOWN_NETWORK_PROTOCOLS

MAX_NUM_HEADERS_PER_REQUEST = 2016
assert MAX_NUM_HEADERS_PER_REQUEST >= CHUNK_SIZE


class NetworkTimeout:
    # seconds
    class Generic:
        NORMAL = 30
        RELAXED = 45
        MOST_RELAXED = 600

    class Urgent(Generic):
        NORMAL = 10
        RELAXED = 20
        MOST_RELAXED = 60


def assert_non_negative_integer(val: Any) -> None:
    if not is_non_negative_integer(val):
        raise RequestCorrupted(f'{val!r} should be a non-negative integer')


def assert_integer(val: Any) -> None:
    if not is_integer(val):
        raise RequestCorrupted(f'{val!r} should be an integer')


def assert_int_or_float(val: Any) -> None:
    if not is_int_or_float(val):
        raise RequestCorrupted(f'{val!r} should be int or float')


def assert_non_negative_int_or_float(val: Any) -> None:
    if not is_non_negative_int_or_float(val):
        raise RequestCorrupted(f'{val!r} should be a non-negative int or float')


def assert_hash256_str(val: Any) -> None:
    if not is_hash256_str(val):
        raise RequestCorrupted(f'{val!r} should be a hash256 str')


def assert_hex_str(val: Any) -> None:
    if not is_hex_str(val):
        raise RequestCorrupted(f'{val!r} should be a hex str')


def assert_dict_contains_field(d: Any, *, field_name: str) -> Any:
    if not isinstance(d, dict):
        raise RequestCorrupted(f'{d!r} should be a dict')
    if field_name not in d:
        raise RequestCorrupted(f'required field {field_name!r} missing from dict')
    return d[field_name]


def assert_list_or_tuple(val: Any) -> None:
    if not isinstance(val, (list, tuple)):
        raise RequestCorrupted(f'{val!r} should be a list or tuple')


class ChainResolutionMode(enum.Enum):
    CATCHUP = enum.auto()
    BACKWARD = enum.auto()
    BINARY = enum.auto()
    FORK = enum.auto()
    NO_FORK = enum.auto()

class NotificationSession(RPCSession):

    def __init__(self, *args, interface: 'Interface', **kwargs):
        super(NotificationSession, self).__init__(*args, **kwargs)
        self.subscriptions = defaultdict(list)
        self.cache = {}
        self._msg_counter = itertools.count(start=1)
        self.interface = interface
        self.taskgroup = interface.taskgroup
        self.cost_hard_limit = 0  # disable aiorpcx resource limits

    async def handle_request(self, request):
        self.maybe_log(f"--> {request}")
        try:
            if isinstance(request, Notification):
                params, result = request.args[:-1], request.args[-1]
                key = self.get_hashable_key_for_rpc_call(request.method, params)
                if key in self.subscriptions:
                    self.cache[key] = result
                    for queue in self.subscriptions[key]:
                        await queue.put(request.args)
                else:
                    raise Exception(f'unexpected notification')
            else:
                raise Exception(f'unexpected request. not a notification')
        except Exception as e:
            self.interface.logger.info(f"error handling request {request}. exc: {repr(e)}")
            await self.close()

    async def send_request(self, *args, timeout=None, **kwargs):
        # note: semaphores/timeouts/backpressure etc are handled by
        # aiorpcx. the timeout arg here in most cases should not be set
        msg_id = next(self._msg_counter)
        self.maybe_log(f"<-- {args} {kwargs} (id: {msg_id})")
        try:
            # note: RPCSession.send_request raises TaskTimeout in case of a timeout.
            # TaskTimeout is a subclass of CancelledError, which is *suppressed* in TaskGroups
            response = await util.wait_for2(
                super().send_request(*args, **kwargs),
                timeout)
        except (TaskTimeout, asyncio.TimeoutError) as e:
            self.maybe_log(f"--> request timed out: {args} (id: {msg_id})")
            raise RequestTimedOut(f'request timed out: {args} (id: {msg_id})') from e
        except CodeMessageError as e:
            self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
            raise
        except BaseException as e:  # cancellations, etc. are useful for debugging
            self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
            raise
        else:
            self.maybe_log(f"--> {response} (id: {msg_id})")
            return response

    def set_default_timeout(self, timeout):
        assert hasattr(self, "sent_request_timeout")  # in base class
        self.sent_request_timeout = timeout
        assert hasattr(self, "max_send_delay")        # in base class
        self.max_send_delay = timeout

    async def subscribe(self, method: str, params: List, queue: asyncio.Queue):
        # note: until the cache is written for the first time,
        # each 'subscribe' call might make a request on the network.
        key = self.get_hashable_key_for_rpc_call(method, params)
        self.subscriptions[key].append(queue)
        if key in self.cache:
            result = self.cache[key]
        else:
            result = await self.send_request(method, params)
            self.cache[key] = result
        await queue.put(params + [result])

    def unsubscribe(self, queue):
        """Unsubscribe a callback to free object references to enable GC."""
        # note: we can't unsubscribe from the server, so we keep receiving
        # subsequent notifications
        for v in self.subscriptions.values():
            if queue in v:
                v.remove(queue)

    @classmethod
    def get_hashable_key_for_rpc_call(cls, method, params):
        """Hashable index for subscriptions and cache"""
        return str(method) + repr(params)

    def maybe_log(self, msg: str) -> None:
        if not self.interface: return
        if self.interface.debug or self.interface.network.debug:
            self.interface.logger.debug(msg)

    def default_framer(self):
        # overridden so that max_size can be customized
        max_size = self.interface.network.config.NETWORK_MAX_INCOMING_MSG_SIZE
        assert max_size > 500_000, f"{max_size=} (< 500_000) is too small"
        return NewlineFramer(max_size=max_size)

    async def close(self, *, force_after: int = None):
        """Closes the connection and waits for it to be closed.
        We try to flush buffered data to the wire, which can take some time.
        """
        if force_after is None:
            # We give up after a while and just abort the connection.
            # Note: specifically if the server is running Fulcrum, waiting seems hopeless,
            #       the connection must be aborted (see https://github.com/cculianu/Fulcrum/issues/76)
            # Note: if the ethernet cable was pulled or wifi disconnected, that too might
            #       wait until this timeout is triggered
            force_after = 1  # seconds
        await super().close(force_after=force_after)

class NetworkException(Exception): pass


class GracefulDisconnect(NetworkException):
    log_level = logging.INFO

    def __init__(self, *args, log_level=None, **kwargs):
        Exception.__init__(self, *args, **kwargs)
        if log_level is not None:
            self.log_level = log_level


class RequestTimedOut(GracefulDisconnect):
    def __str__(self):
        return _("Network request timed out.")


class RequestCorrupted(Exception): pass

class ErrorParsingSSLCert(Exception): pass
class ErrorGettingSSLCertFromServer(Exception): pass
class ErrorSSLCertFingerprintMismatch(Exception): pass
class InvalidOptionCombination(Exception): pass
class ConnectError(NetworkException): pass


class _RSClient(RSClient):
    async def create_connection(self):
        try:
            return await super().create_connection()
        except OSError as e:
            # note: using "from e" here will set __cause__ of ConnectError
            raise ConnectError(e) from e

class PaddedRSTransport(RSTransport):
    """A raw socket transport that provides basic countermeasures against traffic analysis
    by padding the jsonrpc payload with whitespaces to have ~uniform-size TCP packets.
    (it is assumed that a network observer does not see plaintext transport contents,
    due to it being wrapped e.g. in TLS)
    """

    MIN_PACKET_SIZE = 1024
    WAIT_FOR_BUFFER_GROWTH_SECONDS = 1.0

    session: Optional['RPCSession']

    def __init__(self, *args, **kwargs):
        RSTransport.__init__(self, *args, **kwargs)
        self._sbuffer = bytearray()  # "send buffer"
        self._sbuffer_task = None  # type: Optional[asyncio.Task]
        self._sbuffer_has_data_evt = asyncio.Event()
        self._last_send = time.monotonic()
        self._force_send = False  # type: bool

    # note: this does not call super().write() but is a complete reimplementation
    async def write(self, message):
        await self._can_send.wait()
        if self.is_closing():
            return
        framed_message = self._framer.frame(message)
        self._sbuffer += framed_message
        self._sbuffer_has_data_evt.set()
        self._maybe_consume_sbuffer()

    def _maybe_consume_sbuffer(self) -> None:
        """Maybe take some data from sbuffer and send it on the wire."""
        if not self._can_send.is_set() or self.is_closing():
            return
        buf = self._sbuffer
        if not buf:
            return
        # if there is enough data in the buffer, or if we haven't sent in a while, send now:
        if not (
            self._force_send
            or len(buf) >= self.MIN_PACKET_SIZE
            or self._last_send + self.WAIT_FOR_BUFFER_GROWTH_SECONDS < time.monotonic()
        ):
            return
        assert buf[-2:] in (b"}\n", b"]\n"), f"unexpected json-rpc terminator: {buf[-2:]=!r}"
        # either (1) pad length to next power of two, to create "lsize" packet:
        payload_lsize = len(buf)
        total_lsize = max(self.MIN_PACKET_SIZE, 2 ** (payload_lsize.bit_length()))
        npad_lsize = total_lsize - payload_lsize
        # or if that wasted a lot of bandwidth with padding, (2) defer sending some messages
        # and create a packet with half that size ("ssize", s for small)
        total_ssize = max(self.MIN_PACKET_SIZE, total_lsize // 2)
        payload_ssize = buf.rfind(b"\n", 0, total_ssize)
        if payload_ssize != -1:
            payload_ssize += 1  # for "\n" char
            npad_ssize = total_ssize - payload_ssize
        else:
            npad_ssize = float("inf")
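        # e.g. for a 1500-byte buffer: (1) would pad to total_lsize=2048 (npad_lsize=548);
        # if the last message boundary within the first 1024 bytes is at offset 900, then
        # (2) would send those 900 bytes padded to total_ssize=1024 (npad_ssize=124) instead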
        # decide between (1) and (2):
        if self._force_send or npad_lsize <= npad_ssize:
            # (1) create "lsize" packet: consume full buffer
            npad = npad_lsize
            p_idx = payload_lsize
        else:
            # (2) create "ssize" packet: consume some, but defer some for later
            npad = npad_ssize
            p_idx = payload_ssize
        # pad by adding spaces near end
        # self.session.maybe_log(
        #     f"PaddedRSTransport. calling low-level write(). "
        #     f"chose between (lsize:{payload_lsize}+{npad_lsize}, ssize:{payload_ssize}+{npad_ssize}). "
        #     f"won: {'tie' if npad_lsize == npad_ssize else 'lsize' if npad_lsize < npad_ssize else 'ssize'}."
        # )
        json_rpc_terminator = buf[p_idx-2:p_idx]
        assert json_rpc_terminator in (b"}\n", b"]\n"), f"unexpected {json_rpc_terminator=!r}"
        buf2 = buf[:p_idx-2] + (npad * b" ") + json_rpc_terminator
        self._asyncio_transport.write(buf2)
        self._last_send = time.monotonic()
        del self._sbuffer[:p_idx]
        if not self._sbuffer:
            self._sbuffer_has_data_evt.clear()

    async def _poll_sbuffer(self):
        while not self.is_closing():
            await self._can_send.wait()
            await self._sbuffer_has_data_evt.wait()  # to avoid busy-waiting
            self._maybe_consume_sbuffer()
            # If there is still data in the buffer, sleep until it would time out.
            # note: If the transport is ~idle, when we wake up, we will send the current buf data,
            #       but if busy, we might wake up to completely new buffer contents. Either is fine.
            if len(self._sbuffer) > 0:
                timeout_abs = self._last_send + self.WAIT_FOR_BUFFER_GROWTH_SECONDS
                timeout_rel = max(0.0, timeout_abs - time.monotonic())
                await asyncio.sleep(timeout_rel)

    def connection_made(self, transport: asyncio.BaseTransport):
        super().connection_made(transport)
        if isinstance(self.session, NotificationSession):
            coro = self.session.taskgroup.spawn(self._poll_sbuffer())
            self._sbuffer_task = self.loop.create_task(coro)
        else:
            # This is a short-lived "fetch_certificate"-type session.
            # No polling here, we always force-empty the buffer.
            self._force_send = True

class ServerAddr:

    def __init__(self, host: str, port: Union[int, str], *, protocol: str = None):
        assert isinstance(host, str), repr(host)
        if protocol is None:
            protocol = 's'
        if not host:
            raise ValueError('host must not be empty')
        if host[0] == '[' and host[-1] == ']':  # IPv6
            host = host[1:-1]
        try:
            net_addr = NetAddress(host, port)  # this validates host and port
        except Exception as e:
            raise ValueError(f"cannot construct ServerAddr: invalid host or port (host={host}, port={port})") from e
        if protocol not in _KNOWN_NETWORK_PROTOCOLS:
            raise ValueError(f"invalid network protocol: {protocol}")
        self.host = str(net_addr.host)  # canonical form (if e.g. IPv6 address)
        self.port = int(net_addr.port)
        self.protocol = protocol
        self._net_addr_str = str(net_addr)

    @classmethod
    def from_str(cls, s: str) -> 'ServerAddr':
        """Constructs a ServerAddr or raises ValueError."""
        # host might be IPv6 address, hence do rsplit:
        host, port, protocol = str(s).rsplit(':', 2)
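        # e.g. "example.org:50002:s" -> host="example.org", port="50002", protocol="s"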
        return ServerAddr(host=host, port=port, protocol=protocol)

    @classmethod
    def from_str_with_inference(cls, s: str) -> Optional['ServerAddr']:
        """Construct ServerAddr from str, guessing missing details.
        Does not raise - just returns None if guessing failed.
        Ongoing compatibility not guaranteed.
        """
        if not s:
            return None
        host = ""
        if s[0] == "[" and "]" in s:  # IPv6 address
            host_end = s.index("]")
            host = s[1:host_end]
            s = s[host_end+1:]
        items = str(s).rsplit(':', 2)
        if len(items) < 2:
            return None  # although maybe we could guess the port too?
        host = host or items[0]
        port = items[1]
        if len(items) >= 3:
            protocol = items[2]
        else:
            protocol = PREFERRED_NETWORK_PROTOCOL
        try:
            return ServerAddr(host=host, port=port, protocol=protocol)
        except ValueError:
            return None

    def to_friendly_name(self) -> str:
        # note: this method is closely linked to from_str_with_inference
        if self.protocol == 's':  # hide trailing ":s"
            return self.net_addr_str()
        return str(self)

    def __str__(self):
        return '{}:{}'.format(self.net_addr_str(), self.protocol)

    def to_json(self) -> str:
        return str(self)

    def __repr__(self):
        return f'<ServerAddr host={self.host} port={self.port} protocol={self.protocol}>'

    def net_addr_str(self) -> str:
        return self._net_addr_str

    def __eq__(self, other):
        if not isinstance(other, ServerAddr):
            return False
        return (self.host == other.host
                and self.port == other.port
                and self.protocol == other.protocol)

    def __ne__(self, other):
        return not (self == other)

    def __hash__(self):
        return hash((self.host, self.port, self.protocol))


def _get_cert_path_for_host(*, config: 'SimpleConfig', host: str) -> str:
    filename = host
    try:
        ip = ip_address(host)
    except ValueError:
        pass
    else:
        if isinstance(ip, IPv6Address):
            filename = f"ipv6_{ip.packed.hex()}"
    return os.path.join(config.path, 'certs', filename)

class Interface(Logger):

    def __init__(self, *, network: 'Network', server: ServerAddr):
        self.ready = network.asyncio_loop.create_future()
        self.got_disconnected = asyncio.Event()
        self.server = server
        Logger.__init__(self)
        assert network.config.path
        self.cert_path = _get_cert_path_for_host(config=network.config, host=self.host)
        self.blockchain = None  # type: Optional[Blockchain]
        self._requested_chunks = set()  # type: Set[int]
        self.network = network
        self.session = None  # type: Optional[NotificationSession]
        self._ipaddr_bucket = None
        # Set up proxy.
        # - for servers running on localhost, the proxy is not used. If user runs their own server
        #   on same machine, this lets them enable the proxy (which is used for e.g. FX rates).
        #   note: we could maybe relax this further and bypass the proxy for all private
        #         addresses...? e.g. 192.168.x.x
        if util.is_localhost(server.host):
            self.logger.info(f"looks like localhost: not using proxy for this server")
            self.proxy = None
        else:
            self.proxy = ESocksProxy.from_network_settings(network)

        # Latest block header and corresponding height, as claimed by the server.
        # Note that these values are updated before they are verified.
        # Especially during initial header sync, verification can take a long time.
        # Failing verification will get the interface closed.
        self.tip_header = None  # type: Optional[dict]
        self.tip = 0

        self._headers_cache = {}  # type: Dict[int, bytes]

        self.fee_estimates_eta = {}  # type: Dict[int, int]

        # Dump network messages (only for this interface).  Set at runtime from the console.
        self.debug = False

        self.taskgroup = OldTaskGroup()

        async def spawn_task():
            task = await self.network.taskgroup.spawn(self.run())
            task.set_name(f"interface::{str(server)}")
        asyncio.run_coroutine_threadsafe(spawn_task(), self.network.asyncio_loop)

    @property
    def host(self):
        return self.server.host

    @property
    def port(self):
        return self.server.port

    @property
    def protocol(self):
        return self.server.protocol

    def diagnostic_name(self):
        return self.server.net_addr_str()

    def __str__(self):
        return f"<Interface {self.diagnostic_name()}>"

    async def is_server_ca_signed(self, ca_ssl_context: ssl.SSLContext) -> bool:
        """Given a CA enforcing SSL context, returns True if the connection
        can be established. Returns False if the server has a self-signed
        certificate but otherwise is okay. Any other failures raise.
        """
        try:
            await self.open_session(ssl_context=ca_ssl_context, exit_early=True)
        except ConnectError as e:
            cause = e.__cause__
            if (isinstance(cause, ssl.SSLCertVerificationError)
                    and cause.reason == 'CERTIFICATE_VERIFY_FAILED'
                    and cause.verify_code == 18):  # "self signed certificate"
                # Good. We will use this server as self-signed.
                return False
            # Not good. Cannot use this server.
            raise
        # Good. We will use this server as CA-signed.
        return True

    async def _try_saving_ssl_cert_for_first_time(self, ca_ssl_context: ssl.SSLContext) -> None:
        ca_signed = await self.is_server_ca_signed(ca_ssl_context)
        if ca_signed:
            if self._get_expected_fingerprint():
                raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
            with open(self.cert_path, 'w') as f:
                # empty file means this is CA signed, not self-signed
                f.write('')
        else:
            await self._save_certificate()

    def _is_saved_ssl_cert_available(self):
        if not os.path.exists(self.cert_path):
            return False
        with open(self.cert_path, 'r') as f:
            contents = f.read()
        if contents == '':  # CA signed
            if self._get_expected_fingerprint():
                raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
            return True
        # pinned self-signed cert
        try:
            b = pem.dePem(contents, 'CERTIFICATE')
        except SyntaxError as e:
            self.logger.info(f"error parsing already saved cert: {e}")
            raise ErrorParsingSSLCert(e) from e
        try:
            x = x509.X509(b)
        except Exception as e:
            self.logger.info(f"error parsing already saved cert: {e}")
            raise ErrorParsingSSLCert(e) from e
        try:
            x.check_date()
        except x509.CertificateError as e:
            self.logger.info(f"certificate has expired: {e}")
            os.unlink(self.cert_path)  # delete pinned cert only in this case
            return False
        self._verify_certificate_fingerprint(bytes(b))
        return True

    async def _get_ssl_context(self) -> Optional[ssl.SSLContext]:
        if self.protocol != 's':
            # using plaintext TCP
            return None

        # see if we already have cert for this server; or get it for the first time
        ca_sslc = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
        if not self._is_saved_ssl_cert_available():
            try:
                await self._try_saving_ssl_cert_for_first_time(ca_sslc)
            except (OSError, ConnectError, aiorpcx.socks.SOCKSError) as e:
                raise ErrorGettingSSLCertFromServer(e) from e
        # now we have a file saved in our certificate store
        siz = os.stat(self.cert_path).st_size
        if siz == 0:
            # CA signed cert
            sslc = ca_sslc
        else:
            # pinned self-signed cert
            sslc = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=self.cert_path)
            # note: Flag "ssl.VERIFY_X509_STRICT" is enabled by default in python 3.13+ (disabled in older versions).
            #       We explicitly disable it as it breaks lots of servers.
            sslc.verify_flags &= ~ssl.VERIFY_X509_STRICT
            sslc.check_hostname = False
        return sslc

    def handle_disconnect(func):
        @functools.wraps(func)
        async def wrapper_func(self: 'Interface', *args, **kwargs):
            try:
                return await func(self, *args, **kwargs)
            except GracefulDisconnect as e:
                self.logger.log(e.log_level, f"disconnecting due to {repr(e)}")
            except aiorpcx.jsonrpc.RPCError as e:
                self.logger.warning(f"disconnecting due to {repr(e)}")
                self.logger.debug(f"(disconnect) trace for {repr(e)}", exc_info=True)
            finally:
                self.got_disconnected.set()
                # Make sure taskgroup gets cleaned-up. This explicit clean-up is needed here
                # in case the "with taskgroup" ctx mgr never got a chance to run:
                await self.taskgroup.cancel_remaining()
                await self.network.connection_down(self)
                # if was not 'ready' yet, schedule waiting coroutines:
                self.ready.cancel()
        return wrapper_func

    @ignore_exceptions  # do not kill network.taskgroup
    @log_exceptions
    @handle_disconnect
    async def run(self):
        try:
            ssl_context = await self._get_ssl_context()
        except (ErrorParsingSSLCert, ErrorGettingSSLCertFromServer) as e:
            self.logger.info(f'disconnecting due to: {repr(e)}')
            return
        try:
            await self.open_session(ssl_context=ssl_context)
        except (asyncio.CancelledError, ConnectError, aiorpcx.socks.SOCKSError) as e:
            # make SSL errors for main interface more visible (to help server ops debug cert pinning issues)
            if (isinstance(e, ConnectError) and isinstance(e.__cause__, ssl.SSLError)
                    and self.is_main_server() and not self.network.auto_connect):
                self.logger.warning(f'Cannot connect to main server due to SSL error '
                                    f'(maybe cert changed compared to "{self.cert_path}"). Exc: {repr(e)}')
            else:
                self.logger.info(f'disconnecting due to: {repr(e)}')
            return

    def _mark_ready(self) -> None:
        if self.ready.cancelled():
            raise GracefulDisconnect('conn establishment was too slow; *ready* future was cancelled')
        if self.ready.done():
            return

        assert self.tip_header
        chain = blockchain.check_header(self.tip_header)
        if not chain:
            self.blockchain = blockchain.get_best_chain()
        else:
            self.blockchain = chain
        assert self.blockchain is not None

        self.logger.info(f"set blockchain with height {self.blockchain.height()}")

        self.ready.set_result(1)

    def is_connected_and_ready(self) -> bool:
        return self.ready.done() and not self.got_disconnected.is_set()

    async def _save_certificate(self) -> None:
        if not os.path.exists(self.cert_path):
            # we may need to retry this a few times, in case the handshake hasn't completed
            for _ in range(10):
                dercert = await self._fetch_certificate()
                if dercert:
                    self.logger.info("succeeded in getting cert")
                    self._verify_certificate_fingerprint(dercert)
                    with open(self.cert_path, 'w') as f:
                        cert = ssl.DER_cert_to_PEM_cert(dercert)
                        # workaround android bug
                        cert = re.sub("([^\n])-----END CERTIFICATE-----","\\1\n-----END CERTIFICATE-----",cert)
                        f.write(cert)
                        # even though close flushes, we can't fsync when closed.
                        # we must flush before fsyncing: flush writes to the OS buffer,
                        # fsync writes the OS buffer to disk
                        f.flush()
                        os.fsync(f.fileno())
                    break
                await asyncio.sleep(1)
            else:
                raise GracefulDisconnect("could not get certificate after 10 tries")

    async def _fetch_certificate(self) -> bytes:
        sslc = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_CLIENT)
        sslc.check_hostname = False
        sslc.verify_mode = ssl.CERT_NONE
        async with _RSClient(
            session_factory=RPCSession,
            host=self.host, port=self.port,
            ssl=sslc,
            proxy=self.proxy,
            transport=PaddedRSTransport,
        ) as session:
            asyncio_transport = session.transport._asyncio_transport  # type: asyncio.BaseTransport
            ssl_object = asyncio_transport.get_extra_info("ssl_object")  # type: ssl.SSLObject
            return ssl_object.getpeercert(binary_form=True)

    def _get_expected_fingerprint(self) -> Optional[str]:
        if self.is_main_server():
            return self.network.config.NETWORK_SERVERFINGERPRINT
        return None

    def _verify_certificate_fingerprint(self, certificate: bytes) -> None:
        expected_fingerprint = self._get_expected_fingerprint()
        if not expected_fingerprint:
            return
        fingerprint = hashlib.sha256(certificate).hexdigest()
        fingerprints_match = fingerprint.lower() == expected_fingerprint.lower()
        if not fingerprints_match:
            util.trigger_callback('cert_mismatch')
            raise ErrorSSLCertFingerprintMismatch('Refusing to connect to server due to cert fingerprint mismatch')
        self.logger.info("cert fingerprint verification passed")

    async def _maybe_warm_headers_cache(self, *, from_height: int, to_height: int, mode: ChainResolutionMode) -> None:
        """Populate header cache for block heights in range [from_height, to_height]."""
        assert from_height <= to_height, (from_height, to_height)
        assert to_height - from_height < MAX_NUM_HEADERS_PER_REQUEST
        if all(height in self._headers_cache for height in range(from_height, to_height+1)):
            # cache already has all requested headers
            return
        # use lower timeout as we usually have network.bhi_lock here
        timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        count = to_height - from_height + 1
        headers = await self.get_block_headers(start_height=from_height, count=count, timeout=timeout, mode=mode)
        for idx, raw_header in enumerate(headers):
            header_height = from_height + idx
            self._headers_cache[header_height] = raw_header

    async def get_block_header(self, height: int, *, mode: ChainResolutionMode) -> dict:
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        #self.logger.debug(f'get_block_header() {height} in {mode=}')
        # use lower timeout as we usually have network.bhi_lock here
        timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        if raw_header := self._headers_cache.get(height):
            return blockchain.deserialize_header(raw_header, height)
        self.logger.info(f'requesting block header {height} in {mode=}')
        res = await self.session.send_request('blockchain.block.header', [height], timeout=timeout)
        return blockchain.deserialize_header(bytes.fromhex(res), height)

    async def get_block_headers(
        self,
        *,
        start_height: int,
        count: int,
        timeout=None,
        mode: Optional[ChainResolutionMode] = None,
    ) -> Sequence[bytes]:
        """Request a number of consecutive block headers, starting at `start_height`.
        `count` is the num of requested headers, BUT note the server might return fewer than this
        (if the range would extend beyond its tip).
        note: the returned headers are not verified or parsed at all.
        """
        if not is_non_negative_integer(start_height):
            raise Exception(f"{repr(start_height)} is not a block height")
        if not is_non_negative_integer(count) or not (0 < count <= MAX_NUM_HEADERS_PER_REQUEST):
            raise Exception(f"{repr(count)} not an int in range ]0, {MAX_NUM_HEADERS_PER_REQUEST}]")
        self.logger.info(
            f"requesting block headers: [{start_height}, {start_height+count-1}], {count=}"
            + (f" (in {mode=})" if mode is not None else "")
        )
        res = await self.session.send_request('blockchain.block.headers', [start_height, count], timeout=timeout)
        # check response
        assert_dict_contains_field(res, field_name='count')
        assert_dict_contains_field(res, field_name='hex')
        assert_dict_contains_field(res, field_name='max')
        assert_non_negative_integer(res['count'])
        assert_non_negative_integer(res['max'])
        assert_hex_str(res['hex'])
        if len(res['hex']) != HEADER_SIZE * 2 * res['count']:
            raise RequestCorrupted('inconsistent chunk hex and count')
        # we never request more than MAX_NUM_HEADERS_PER_REQUEST headers, but we enforce those fit in a single response
        if res['max'] < MAX_NUM_HEADERS_PER_REQUEST:
            raise RequestCorrupted(f"server uses too low 'max' count for block.headers: {res['max']} < {MAX_NUM_HEADERS_PER_REQUEST}")
        if res['count'] > count:
            raise RequestCorrupted(f"asked for {count} headers but got more: {res['count']}")
        elif res['count'] < count:
            # we only tolerate getting fewer headers if it is due to reaching the tip
            end_height = start_height + res['count'] - 1
            if end_height < self.tip:  # still below tip. why did server not send more?!
                raise RequestCorrupted(
                    f"asked for {count} headers but got fewer: {res['count']}. ({start_height=}, {self.tip=})")
        # checks done.
        headers = list(util.chunks(bfh(res['hex']), size=HEADER_SIZE))
        return headers

    async def request_chunk_below_max_checkpoint(
        self,
        *,
        height: int,
    ) -> None:
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        assert height <= constants.net.max_checkpoint(), f"{height=} must be <= cp={constants.net.max_checkpoint()}"
        index = height // CHUNK_SIZE
        if index in self._requested_chunks:
            return None
        self.logger.debug(f"requesting chunk from height {height}")
        try:
            self._requested_chunks.add(index)
            headers = await self.get_block_headers(start_height=index * CHUNK_SIZE, count=CHUNK_SIZE)
        finally:
            self._requested_chunks.discard(index)
        conn = self.blockchain.connect_chunk(index, data=b"".join(headers))
        if not conn:
            raise RequestCorrupted(f"chunk ({index=}, for {height=}) does not connect to blockchain")
        return None

    async def _fast_forward_chain(
        self,
        *,
        height: int,  # usually local chain tip + 1
        tip: int,  # server tip. we should not request past this.
    ) -> int:
        """Request some headers starting at `height` to grow the blockchain of this interface.
        Returns number of headers we managed to connect, starting at `height`.
        """
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        if not is_non_negative_integer(tip):
            raise Exception(f"{repr(tip)} is not a block height")
        if not (height > constants.net.max_checkpoint()
                or height == 0 == constants.net.max_checkpoint()):
            raise Exception(f"{height=} must be > cp={constants.net.max_checkpoint()}")
        assert height <= tip, f"{height=} must be <= {tip=}"
        # Request a few chunks of headers concurrently.
        # tradeoffs:
        # - more chunks: higher memory requirements
        # - more chunks: higher concurrency => syncing needs fewer network round-trips
        # - if a chunk does not connect, bandwidth for all later chunks is wasted
        async with OldTaskGroup() as group:
            tasks = []  # type: List[Tuple[int, asyncio.Task[Sequence[bytes]]]]
            index0 = height // CHUNK_SIZE
            for chunk_cnt in range(10):
                index = index0 + chunk_cnt
                start_height = index * CHUNK_SIZE
                if start_height > tip:
                    break
                end_height = min(start_height + CHUNK_SIZE - 1, tip)
                size = end_height - start_height + 1
                tasks.append((index, await group.spawn(self.get_block_headers(start_height=start_height, count=size))))
        # try to connect chunks
        num_headers = 0
        for index, task in tasks:
            headers = task.result()
            conn = self.blockchain.connect_chunk(index, data=b"".join(headers))
            if not conn:
                break
            num_headers += len(headers)
        # We started at a chunk boundary, instead of requested `height`. Need to correct for that.
        offset = height - index0 * CHUNK_SIZE
        return max(0, num_headers - offset)

    def is_main_server(self) -> bool:
        return (self.network.interface == self or
                self.network.interface is None and self.network.default_server == self.server)

    async def open_session(
        self,
        *,
        ssl_context: Optional[ssl.SSLContext],
        exit_early: bool = False,
    ):
        session_factory = lambda *args, iface=self, **kwargs: NotificationSession(*args, **kwargs, interface=iface)
        async with _RSClient(
            session_factory=session_factory,
            host=self.host, port=self.port,
            ssl=ssl_context,
            proxy=self.proxy,
            transport=PaddedRSTransport,
        ) as session:
            self.session = session  # type: NotificationSession
            self.session.set_default_timeout(self.network.get_network_timeout_seconds(NetworkTimeout.Generic))
            try:
                ver = await session.send_request('server.version', [self.client_name(), version.PROTOCOL_VERSION])
            except aiorpcx.jsonrpc.RPCError as e:
                raise GracefulDisconnect(e)  # probably 'unsupported protocol version'
            if exit_early:
                return
            if ver[1] != version.PROTOCOL_VERSION:
                raise GracefulDisconnect(f'server violated protocol-version-negotiation. '
                                         f'we asked for {version.PROTOCOL_VERSION!r}, they sent {ver[1]!r}')
            if not self.network.check_interface_against_healthy_spread_of_connected_servers(self):
                raise GracefulDisconnect(f'too many connected servers already '
                                         f'in bucket {self.bucket_based_on_ipaddress()}')
            self.logger.info(f"connection established. version: {ver}")

            try:
                async with self.taskgroup as group:
                    await group.spawn(self.ping)
                    await group.spawn(self.request_fee_estimates)
                    await group.spawn(self.run_fetch_blocks)
                    await group.spawn(self.monitor_connection)
            except aiorpcx.jsonrpc.RPCError as e:
                if e.code in (
                    JSONRPC.EXCESSIVE_RESOURCE_USAGE,
                    JSONRPC.SERVER_BUSY,
                    JSONRPC.METHOD_NOT_FOUND,
                    JSONRPC.INTERNAL_ERROR,
                ):
                    log_level = logging.WARNING if self.is_main_server() else logging.INFO
                    raise GracefulDisconnect(e, log_level=log_level) from e
                raise
            finally:
                self.got_disconnected.set()  # set this ASAP, ideally before any awaits

    async def monitor_connection(self):
        while True:
            await asyncio.sleep(1)
            # If the session/transport is no longer open, we disconnect.
            # e.g. if the remote cleanly sends EOF, we would handle that here.
            # note: If the user pulls the ethernet cable or disconnects wifi,
            #       ideally we would detect that here, so that the GUI/etc can reflect that.
            #       - On Android, this seems to work reliably, where asyncio.BaseProtocol.connection_lost()
            #         gets called with e.g. ConnectionAbortedError(103, 'Software caused connection abort').
            #       - On desktop Linux/Win, it seems BaseProtocol.connection_lost() is not called in such cases.
            #         Hence, in practice the connection issue will only be detected the next time we try
            #         to send a message (plus timeout), which can take minutes...
            if not self.session or self.session.is_closing():
                raise GracefulDisconnect('session was closed')

    async def ping(self):
        # We periodically send a "ping" msg to make sure the server knows we are still here.
        # Adding a bit of randomness generates some noise against traffic analysis.
        while True:
            await asyncio.sleep(random.random() * 300)
            await self.session.send_request('server.ping')
            await self._maybe_send_noise()

    async def _maybe_send_noise(self):
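        # geometric number of extra pings: each iteration continues with
        # probability 0.2 (0.25 extra pings in expectation), adding noise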
        while random.random() < 0.2:
            await asyncio.sleep(random.random())
            await self.session.send_request('server.ping')

    async def request_fee_estimates(self):
        while True:
            async with OldTaskGroup() as group:
                fee_tasks = []
                for i in FEE_ETA_TARGETS[0:-1]:
                    fee_tasks.append((i, await group.spawn(self.get_estimatefee(i))))
            for nblock_target, task in fee_tasks:
                fee = task.result()
                if fee < 0: continue
                assert isinstance(fee, int)
                self.fee_estimates_eta[nblock_target] = fee
            self.network.update_fee_estimates()
            await asyncio.sleep(60)

    async def close(self, *, force_after: int = None):
        """Closes the connection and waits for it to be closed.
        We try to flush buffered data to the wire, which can take some time.
        """
        if self.session:
            await self.session.close(force_after=force_after)
        # monitor_connection will cancel tasks

    async def run_fetch_blocks(self):
        header_queue = asyncio.Queue()
        await self.session.subscribe('blockchain.headers.subscribe', [], header_queue)
        while True:
            item = await header_queue.get()
            raw_header = item[0]
            height = raw_header['height']
            header_bytes = bfh(raw_header['hex'])
            header_dict = blockchain.deserialize_header(header_bytes, height)
            self.tip_header = header_dict
            self.tip = height
            if self.tip < constants.net.max_checkpoint():
                raise GracefulDisconnect(
                    f"server tip below max checkpoint. ({self.tip} < {constants.net.max_checkpoint()})")
            self._mark_ready()
            self._headers_cache.clear()  # tip changed, so assume anything could have happened with chain
            self._headers_cache[height] = header_bytes
            try:
                blockchain_updated = await self._process_header_at_tip()
            finally:
                self._headers_cache.clear()  # to reduce memory usage
            # header processing done
            if self.is_main_server() or blockchain_updated:
                self.logger.info(f"new chain tip. {height=}")
            if blockchain_updated:
                util.trigger_callback('blockchain_updated')
            util.trigger_callback('network_updated')
            await self.network.switch_unwanted_fork_interface()
            await self.network.switch_lagging_interface()
            await self.taskgroup.spawn(self._maybe_send_noise())

    async def _process_header_at_tip(self) -> bool:
        """Returns:
        False - boring fast-forward: we already have this header as part of this blockchain from another interface,
        True - new header we didn't have, or reorg
        """
        height, header = self.tip, self.tip_header
        async with self.network.bhi_lock:
            if self.blockchain.height() >= height and self.blockchain.check_header(header):
                # another interface amended the blockchain
                return False
            await self.sync_until(height)
            return True

    async def sync_until(
        self,
        height: int,
        *,
        next_height: Optional[int] = None,  # sync target. typically the tip, except in unit tests
    ) -> Tuple[ChainResolutionMode, int]:
        if next_height is None:
            next_height = self.tip
        last = None  # type: Optional[ChainResolutionMode]
        while last is None or height <= next_height:
            prev_last, prev_height = last, height
            if next_height > height + 144:
                # We are far from the tip.
                # It is more efficient to process headers in large batches (CPU/disk_usage/logging).
                # (but this wastes a little bandwidth, if we are not on a chunk boundary)
                num_headers = await self._fast_forward_chain(
                    height=height, tip=next_height)
                if num_headers == 0:
                    if height <= constants.net.max_checkpoint():
                        raise GracefulDisconnect('server chain conflicts with checkpoints or genesis')
                    last, height = await self.step(height)
                    continue
                # report progress to gui/etc
                util.trigger_callback('blockchain_updated')
                util.trigger_callback('network_updated')
                height += num_headers
                assert height <= next_height+1, (height, self.tip)
                last = ChainResolutionMode.CATCHUP
            else:
                # We are close to the tip, so process headers one-by-one.
                # (note: due to headers_cache, to save network latency, this can still batch-request headers)
                last, height = await self.step(height)
            assert (prev_last, prev_height) != (last, height), 'had to prevent infinite loop in interface.sync_until'
        return last, height

    async def step(
        self,
        height: int,
    ) -> Tuple[ChainResolutionMode, int]:
        assert 0 <= height <= self.tip, (height, self.tip)
        await self._maybe_warm_headers_cache(
            from_height=height,
            to_height=min(self.tip, height+MAX_NUM_HEADERS_PER_REQUEST-1),
            mode=ChainResolutionMode.CATCHUP,
        )
        header = await self.get_block_header(height, mode=ChainResolutionMode.CATCHUP)

        chain = blockchain.check_header(header)
        if chain:
            self.blockchain = chain
            # note: there is an edge case here that is not handled.
            # we might know the blockhash (enough for check_header) but
            # not have the header itself. e.g. regtest chain with only genesis.
            # this situation resolves itself on the next block
            return ChainResolutionMode.CATCHUP, height+1

        can_connect = blockchain.can_connect(header)
        if not can_connect:
            self.logger.info(f"can't connect new block: {height=}")
            height, header, bad, bad_header = await self._search_headers_backwards(height, header=header)
            chain = blockchain.check_header(header)
            can_connect = blockchain.can_connect(header)
            assert chain or can_connect
        if can_connect:
            height += 1
            self.blockchain = can_connect
            self.blockchain.save_header(header)
            return ChainResolutionMode.CATCHUP, height

        good, bad, bad_header = await self._search_headers_binary(height, bad, bad_header, chain)
        return await self._resolve_potential_chain_fork_given_forkpoint(good, bad, bad_header)

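    # step() above resolves the header at `height` into one of three outcomes:
    #   1. the header is already on a known chain       -> switch to it, advance by one
    #   2. the header connects to a known chain's tip   -> save it, advance by one
    #   3. neither -> walk backwards to the last header we agree on, then
    #      bisect the exact forkpoint and fork/catch up from there
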
    async def _search_headers_binary(
        self,
        height: int,
        bad: int,
        bad_header: dict,
        chain: Optional[Blockchain],
    ) -> Tuple[int, int, dict]:
        assert bad == bad_header['block_height']
        _assert_header_does_not_check_against_any_chain(bad_header)

        self.blockchain = chain
        good = height
        while True:
            assert 0 <= good < bad, (good, bad)
            height = (good + bad) // 2
            self.logger.info(f"binary step. good {good}, bad {bad}, height {height}")
            if bad - good + 1 <= MAX_NUM_HEADERS_PER_REQUEST:  # if interval is small, trade some bandwidth for lower latency
                await self._maybe_warm_headers_cache(
                    from_height=good, to_height=bad, mode=ChainResolutionMode.BINARY)
            header = await self.get_block_header(height, mode=ChainResolutionMode.BINARY)
            chain = blockchain.check_header(header)
            if chain:
                self.blockchain = chain
                good = height
            else:
                bad = height
                bad_header = header
            if good + 1 == bad:
                break

        if not self.blockchain.can_connect(bad_header, check_height=False):
            raise Exception('unexpected bad header during binary: {}'.format(bad_header))
        _assert_header_does_not_check_against_any_chain(bad_header)

        self.logger.info(f"binary search exited. good {good}, bad {bad}. {chain=}")
        return good, bad, bad_header

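    # A minimal model of the bisection above, assuming a hypothetical predicate
    # on_chain(h) ("the server's header at height h is on one of our chains"):
    #
    #     while good + 1 < bad:
    #         mid = (good + bad) // 2
    #         if on_chain(mid):
    #             good = mid
    #         else:
    #             bad = mid
    #
    # Invariant: on_chain(good) and not on_chain(bad). On exit, `bad` is the
    # lowest height where the server's chain diverges from ours.
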
    async def _resolve_potential_chain_fork_given_forkpoint(
        self,
        good: int,
        bad: int,
        bad_header: dict,
    ) -> Tuple[ChainResolutionMode, int]:
        assert good + 1 == bad
        assert bad == bad_header['block_height']
        _assert_header_does_not_check_against_any_chain(bad_header)
        # 'good' is the height of a block 'good_header', somewhere in self.blockchain.
        # bad_header connects to good_header; bad_header itself is NOT in self.blockchain.

        bh = self.blockchain.height()
        assert bh >= good, (bh, good)
        if bh == good:
            height = good + 1
            self.logger.info(f"catching up from {height}")
            return ChainResolutionMode.NO_FORK, height

        # this is a new fork we don't yet have
        height = bad + 1
        self.logger.info(f"new fork at bad height {bad}")
        b = self.blockchain.fork(bad_header)  # type: Blockchain
        self.blockchain = b
        assert b.forkpoint == bad
        return ChainResolutionMode.FORK, height

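    # The two outcomes above, by example (heights hypothetical; good == 99,
    # bad == 100):
    #   - our chain ends exactly at 99: nothing to fork, simply resume
    #     catching up at height 100 (NO_FORK)
    #   - our chain already extends past 99 (say, to 105): heights 100..105
    #     disagree with the server, so fork off a new Blockchain whose
    #     forkpoint is 100 and continue from 101 (FORK)
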
    async def _search_headers_backwards(
        self,
        height: int,
        *,
        header: dict,
    ) -> Tuple[int, dict, int, dict]:
        async def iterate():
            nonlocal height, header
            checkp = False
            if height <= constants.net.max_checkpoint():
                height = constants.net.max_checkpoint()
                checkp = True
            header = await self.get_block_header(height, mode=ChainResolutionMode.BACKWARD)
            chain = blockchain.check_header(header)
            can_connect = blockchain.can_connect(header)
            if chain or can_connect:
                return False
            if checkp:
                raise GracefulDisconnect("server chain conflicts with checkpoints")
            return True

        bad, bad_header = height, header
        _assert_header_does_not_check_against_any_chain(bad_header)
        with blockchain.blockchains_lock:
            chains = list(blockchain.blockchains.values())
        local_max = max([0] + [x.height() for x in chains])
        height = min(local_max + 1, height - 1)
        assert height >= 0

        await self._maybe_warm_headers_cache(
            from_height=max(0, height-10), to_height=height, mode=ChainResolutionMode.BACKWARD)

        delta = 2
        while await iterate():
            bad, bad_header = height, header
            height -= delta
            delta *= 2

        _assert_header_does_not_check_against_any_chain(bad_header)
        self.logger.info(f"exiting backward mode at {height}")
        return height, header, bad, bad_header

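    # The backward search above starts at min(local_max + 1, height - 1) and,
    # on each failed probe, steps back by a doubling delta (2, 4, 8, ...),
    # i.e. it visits roughly h, h-2, h-6, h-14, ... (h - (2**k - 2)). It thus
    # needs only O(log n) round-trips even if the server disagrees with us
    # about the last n headers.
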
    @classmethod
    def client_name(cls) -> str:
        return f'electrum/{version.ELECTRUM_VERSION}'

    def is_tor(self):
        return self.host.endswith('.onion')

    def ip_addr(self) -> Optional[str]:
        session = self.session
        if not session:
            return None
        peer_addr = session.remote_address()
        if not peer_addr:
            return None
        return str(peer_addr.host)

    def bucket_based_on_ipaddress(self) -> str:
        def do_bucket():
            if self.is_tor():
                return BUCKET_NAME_OF_ONION_SERVERS
            try:
                ip_addr = ip_address(self.ip_addr())  # type: Union[IPv4Address, IPv6Address]
            except ValueError:
                return ''
            if not ip_addr:
                return ''
            if ip_addr.is_loopback:  # localhost is exempt
                return ''
            if ip_addr.version == 4:
                slash16 = IPv4Network(ip_addr).supernet(prefixlen_diff=32-16)
                return str(slash16)
            elif ip_addr.version == 6:
                slash48 = IPv6Network(ip_addr).supernet(prefixlen_diff=128-48)
                return str(slash48)
            return ''

        if not self._ipaddr_bucket:
            self._ipaddr_bucket = do_bucket()
        return self._ipaddr_bucket

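    # Examples of the bucketing above (addresses hypothetical):
    #   '203.0.113.5'      -> '203.0.0.0/16'        (IPv4: /16 supernet)
    #   '2001:db8:1234::7' -> '2001:db8:1234::/48'  (IPv6: /48 supernet)
    # Onion hosts all share one named bucket; loopback and unparseable
    # addresses get the empty bucket ''.
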
    async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict:
        if not is_hash256_str(tx_hash):
            raise Exception(f"{repr(tx_hash)} is not a txid")
        if not is_non_negative_integer(tx_height):
            raise Exception(f"{repr(tx_height)} is not a block height")
        # do request
        res = await self.session.send_request('blockchain.transaction.get_merkle', [tx_hash, tx_height])
        # check response
        block_height = assert_dict_contains_field(res, field_name='block_height')
        merkle = assert_dict_contains_field(res, field_name='merkle')
        pos = assert_dict_contains_field(res, field_name='pos')
        # note: tx_height was just a hint to the server, don't enforce the response to match it
        assert_non_negative_integer(block_height)
        assert_non_negative_integer(pos)
        assert_list_or_tuple(merkle)
        for item in merkle:
            assert_hash256_str(item)
        return res

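    # Shape of a response accepted above (values hypothetical; '<hash256>'
    # stands for a 64-char hex string):
    #   {'block_height': 123456,
    #    'merkle': ['<hash256>', '<hash256>', ...],
    #    'pos': 3}
    # where 'merkle' is the branch of sibling hashes and 'pos' is the
    # transaction's index within the block.
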
    async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
        if not is_hash256_str(tx_hash):
            raise Exception(f"{repr(tx_hash)} is not a txid")
        raw = await self.session.send_request('blockchain.transaction.get', [tx_hash], timeout=timeout)
        # validate response
        if not is_hex_str(raw):
            raise RequestCorrupted(f"received garbage (non-hex) as tx data (txid {tx_hash}): {raw!r}")
        tx = Transaction(raw)
        try:
            tx.deserialize()  # see if raises
        except Exception as e:
            raise RequestCorrupted(f"cannot deserialize received transaction (txid {tx_hash})") from e
        if tx.txid() != tx_hash:
            raise RequestCorrupted(f"received tx does not match expected txid {tx_hash} (got {tx.txid()})")
        return raw

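    # get_transaction() above validates the response in three layers: the raw
    # payload must be hex, it must deserialize as a transaction, and it must
    # hash back to the requested txid -- so a server cannot substitute a
    # different transaction without being detected.
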
    async def get_history_for_scripthash(self, sh: str) -> List[dict]:
        if not is_hash256_str(sh):
            raise Exception(f"{repr(sh)} is not a scripthash")
        # do request
        res = await self.session.send_request('blockchain.scripthash.get_history', [sh])
        # check response
        assert_list_or_tuple(res)
        prev_height = 1
        for tx_item in res:
            height = assert_dict_contains_field(tx_item, field_name='height')
            assert_dict_contains_field(tx_item, field_name='tx_hash')
            assert_integer(height)
            assert_hash256_str(tx_item['tx_hash'])
            if height in (-1, 0):
                assert_dict_contains_field(tx_item, field_name='fee')
                assert_non_negative_integer(tx_item['fee'])
                prev_height = float("inf")  # this ensures confirmed txs can't follow mempool txs
            else:
                # check monotonicity of heights
                if height < prev_height:
                    raise RequestCorrupted('heights of confirmed txs must be in increasing order')
                prev_height = height
        hashes = set(map(lambda item: item['tx_hash'], res))
        if len(hashes) != len(res):
            # Either the server is sending garbage... or maybe, if the server is
            # race-prone, a recently mined tx could be included in both the last
            # block and the mempool? Still, it's simplest to just disregard the response.
            raise RequestCorrupted(f"server history has non-unique txids for sh={sh}")
        return res

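    # A history that passes the checks above (txids hypothetical): confirmed
    # txs in non-decreasing height order, mempool txs (height 0 or -1, which
    # must carry a 'fee' field) only at the end:
    #   [{'height': 100001, 'tx_hash': '<txid>'},
    #    {'height': 100004, 'tx_hash': '<txid>'},
    #    {'height': 0, 'tx_hash': '<txid>', 'fee': 20000}]
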
    async def listunspent_for_scripthash(self, sh: str) -> List[dict]:
        if not is_hash256_str(sh):
            raise Exception(f"{repr(sh)} is not a scripthash")
        # do request
        res = await self.session.send_request('blockchain.scripthash.listunspent', [sh])
        # check response
        assert_list_or_tuple(res)
        for utxo_item in res:
            assert_dict_contains_field(utxo_item, field_name='tx_pos')
            assert_dict_contains_field(utxo_item, field_name='value')
            assert_dict_contains_field(utxo_item, field_name='tx_hash')
            assert_dict_contains_field(utxo_item, field_name='height')
            assert_non_negative_integer(utxo_item['tx_pos'])
            assert_non_negative_integer(utxo_item['value'])
            assert_non_negative_integer(utxo_item['height'])
            assert_hash256_str(utxo_item['tx_hash'])
        return res

    async def get_balance_for_scripthash(self, sh: str) -> dict:
        if not is_hash256_str(sh):
            raise Exception(f"{repr(sh)} is not a scripthash")
        # do request
        res = await self.session.send_request('blockchain.scripthash.get_balance', [sh])
        # check response
        assert_dict_contains_field(res, field_name='confirmed')
        assert_dict_contains_field(res, field_name='unconfirmed')
        assert_non_negative_integer(res['confirmed'])
        assert_integer(res['unconfirmed'])
        return res

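    # Shape of a response accepted above (values hypothetical, in satoshis):
    #   {'confirmed': 150000, 'unconfirmed': -20000}
    # 'unconfirmed' may legitimately be negative (e.g. mempool txs spending
    # from this scripthash), hence assert_integer instead of
    # assert_non_negative_integer.
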
    async def get_txid_from_txpos(self, tx_height: int, tx_pos: int, merkle: bool):
        if not is_non_negative_integer(tx_height):
            raise Exception(f"{repr(tx_height)} is not a block height")
        if not is_non_negative_integer(tx_pos):
            raise Exception(f"{repr(tx_pos)} should be a non-negative integer")
        # do request
        res = await self.session.send_request(
            'blockchain.transaction.id_from_pos',
            [tx_height, tx_pos, merkle],
        )
        # check response
        if merkle:
            assert_dict_contains_field(res, field_name='tx_hash')
            assert_dict_contains_field(res, field_name='merkle')
            assert_hash256_str(res['tx_hash'])
            assert_list_or_tuple(res['merkle'])
            for node_hash in res['merkle']:
                assert_hash256_str(node_hash)
        else:
            assert_hash256_str(res)
        return res

    async def get_fee_histogram(self) -> Sequence[Tuple[Union[float, int], int]]:
        # do request
        res = await self.session.send_request('mempool.get_fee_histogram')
        # check response
        assert_list_or_tuple(res)
        prev_fee = float('inf')
        for fee, s in res:
            assert_non_negative_int_or_float(fee)
            assert_non_negative_integer(s)
            if fee >= prev_fee:  # check monotonicity
                raise RequestCorrupted('fees must be in decreasing order')
            prev_fee = fee
        return res

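    # A histogram that passes the monotonicity check above (values
    # hypothetical): [fee, vsize] pairs with strictly decreasing fees, e.g.
    #   [[53.0, 102030], [38.5, 110000], [1.0, 890000]]
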
    async def get_server_banner(self) -> str:
        # do request
        res = await self.session.send_request('server.banner')
        # check response
        if not isinstance(res, str):
            raise RequestCorrupted(f'{res!r} should be a str')
        return res

    async def get_donation_address(self) -> str:
        # do request
        res = await self.session.send_request('server.donation_address')
        # check response
        if not res:  # ignore empty string
            return ''
        if not bitcoin.is_address(res):
            # note: do not hard-fail -- allow server to use future-type
            #       bitcoin address we do not recognize
            self.logger.info(f"invalid donation address from server: {repr(res)}")
            res = ''
        return res

    async def get_relay_fee(self) -> int:
        """Returns the min relay feerate in sat/kbyte."""
        # do request
        res = await self.session.send_request('blockchain.relayfee')
        # check response
        assert_non_negative_int_or_float(res)
        relayfee = int(res * bitcoin.COIN)
        relayfee = max(0, relayfee)
        return relayfee

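    # Unit-conversion note for get_relay_fee()/get_estimatefee() (values
    # hypothetical): servers report feerates in BTC/kbyte, so a response of
    # 0.00001 becomes int(0.00001 * bitcoin.COIN) == 1000 sat/kbyte.
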
    async def get_estimatefee(self, num_blocks: int) -> int:
        """Returns a feerate estimate for getting confirmed within
        num_blocks blocks, in sat/kbyte.
        Returns -1 if the server could not provide an estimate.
        """
        if not is_non_negative_integer(num_blocks):
            raise Exception(f"{repr(num_blocks)} is not a num_blocks")
        # do request
        try:
            res = await self.session.send_request('blockchain.estimatefee', [num_blocks])
        except aiorpcx.jsonrpc.ProtocolError as e:
            # The protocol spec says the server itself should already have returned -1
            # if it cannot provide an estimate, however apparently "electrs" does not conform
            # and sends an error instead. Convert it here:
            if "cannot estimate fee" in e.message:
                res = -1
            else:
                raise
        except aiorpcx.jsonrpc.RPCError as e:
            # The protocol spec says the server itself should already have returned -1
            # if it cannot provide an estimate. "Fulcrum" often sends:
            #   aiorpcx.jsonrpc.RPCError: (-32603, 'internal error: bitcoind request timed out')
            if e.code == JSONRPC.INTERNAL_ERROR:
                res = -1
            else:
                raise
        # check response
        if res != -1:
            assert_non_negative_int_or_float(res)
            res = int(res * bitcoin.COIN)
        return res


def _assert_header_does_not_check_against_any_chain(header: dict) -> None:
    chain_bad = blockchain.check_header(header)
    if chain_bad:
        raise Exception('bad_header must not check!')


def check_cert(host, cert):
    try:
        b = pem.dePem(cert, 'CERTIFICATE')
        x = x509.X509(b)
    except Exception:
        traceback.print_exc(file=sys.stdout)
        return

    try:
        x.check_date()
        expired = False
    except Exception:
        expired = True

    m = "host: %s\n" % host
    m += "has_expired: %s\n" % expired
    util.print_msg(m)


# Used by tests
def _match_hostname(name, val):
    if val == name:
        return True

    return val.startswith('*.') and name.endswith(val[1:])


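# Examples for the matcher above (names hypothetical):
#   _match_hostname('foo.example.com', '*.example.com')  -> True
#   _match_hostname('example.com',     '*.example.com')  -> False
#   _match_hostname('example.com',     'example.com')    -> True
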
def test_certificates():
    from .simple_config import SimpleConfig
    config = SimpleConfig()
    mydir = os.path.join(config.path, "certs")
    certs = os.listdir(mydir)
    for c in certs:
        p = os.path.join(mydir, c)
        with open(p, encoding='utf-8') as f:
            cert = f.read()
        check_cert(c, cert)

if __name__ == "__main__":
    test_certificates()