• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 6689879406411776

04 Jun 2025 09:05PM UTC coverage: 59.527% (-0.2%) from 59.68%
6689879406411776

Pull #9883

CirrusCI

SomberNight
android build: downgrade cython
Pull Request #9883: WIP: upgrade build deps

21735 of 36513 relevant lines covered (59.53%)

2.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

35.5
/electrum/interface.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2011 thomasv@gitorious
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25
import os
5✔
26
import re
5✔
27
import ssl
5✔
28
import sys
5✔
29
import time
5✔
30
import traceback
5✔
31
import asyncio
5✔
32
import socket
5✔
33
from typing import Tuple, Union, List, TYPE_CHECKING, Optional, Set, NamedTuple, Any, Sequence, Dict
5✔
34
from collections import defaultdict
5✔
35
from ipaddress import IPv4Network, IPv6Network, ip_address, IPv6Address, IPv4Address
5✔
36
import itertools
5✔
37
import logging
5✔
38
import hashlib
5✔
39
import functools
5✔
40
import random
5✔
41

42
import aiorpcx
5✔
43
from aiorpcx import RPCSession, Notification, NetAddress, NewlineFramer
5✔
44
from aiorpcx.curio import timeout_after, TaskTimeout
5✔
45
from aiorpcx.jsonrpc import JSONRPC, CodeMessageError
5✔
46
from aiorpcx.rawsocket import RSClient, RSTransport
5✔
47
import certifi
5✔
48

49
from .util import (ignore_exceptions, log_exceptions, bfh, ESocksProxy,
5✔
50
                   is_integer, is_non_negative_integer, is_hash256_str, is_hex_str,
51
                   is_int_or_float, is_non_negative_int_or_float, OldTaskGroup)
52
from . import util
5✔
53
from . import x509
5✔
54
from . import pem
5✔
55
from . import version
5✔
56
from . import blockchain
5✔
57
from .blockchain import Blockchain, HEADER_SIZE
5✔
58
from . import bitcoin
5✔
59
from . import constants
5✔
60
from .i18n import _
5✔
61
from .logging import Logger
5✔
62
from .transaction import Transaction
5✔
63
from .fee_policy import FEE_ETA_TARGETS
5✔
64

65
if TYPE_CHECKING:
5✔
66
    from .network import Network
×
67
    from .simple_config import SimpleConfig
×
68

69

70
# Path to the bundled CA certificate store; used to validate CA-signed server certs.
ca_path = certifi.where()

# Bucket name used for Tor hidden-service (.onion) servers.
BUCKET_NAME_OF_ONION_SERVERS = 'onion'

# 't' = plaintext TCP, 's' = TLS (see Interface._get_ssl_context).
_KNOWN_NETWORK_PROTOCOLS = {'t', 's'}
PREFERRED_NETWORK_PROTOCOL = 's'
assert PREFERRED_NETWORK_PROTOCOL in _KNOWN_NETWORK_PROTOCOLS
77

78

79
class NetworkTimeout:
    """Namespaces of timeout constants (seconds) for network requests."""
    # seconds
    class Generic:
        NORMAL = 30
        RELAXED = 45
        MOST_RELAXED = 600

    class Urgent(Generic):
        # Tighter deadlines than Generic, for requests that should fail fast.
        NORMAL = 10
        RELAXED = 20
        MOST_RELAXED = 60
90

91

92
def assert_non_negative_integer(val: Any) -> None:
    """Raise RequestCorrupted unless *val* is a non-negative integer."""
    if is_non_negative_integer(val):
        return
    raise RequestCorrupted(f'{val!r} should be a non-negative integer')
95

96

97
def assert_integer(val: Any) -> None:
    """Raise RequestCorrupted unless *val* is an integer."""
    if is_integer(val):
        return
    raise RequestCorrupted(f'{val!r} should be an integer')
100

101

102
def assert_int_or_float(val: Any) -> None:
    """Raise RequestCorrupted unless *val* is an int or a float."""
    if is_int_or_float(val):
        return
    raise RequestCorrupted(f'{val!r} should be int or float')
105

106

107
def assert_non_negative_int_or_float(val: Any) -> None:
    """Raise RequestCorrupted unless *val* is a non-negative int or float."""
    if is_non_negative_int_or_float(val):
        return
    raise RequestCorrupted(f'{val!r} should be a non-negative int or float')
110

111

112
def assert_hash256_str(val: Any) -> None:
    """Raise RequestCorrupted unless *val* is a hash256 hex string."""
    if is_hash256_str(val):
        return
    raise RequestCorrupted(f'{val!r} should be a hash256 str')
115

116

117
def assert_hex_str(val: Any) -> None:
    """Raise RequestCorrupted unless *val* is a hex string."""
    if is_hex_str(val):
        return
    raise RequestCorrupted(f'{val!r} should be a hex str')
120

121

122
def assert_dict_contains_field(d: Any, *, field_name: str) -> Any:
    """Return ``d[field_name]``.

    Raises RequestCorrupted if *d* is not a dict or lacks *field_name*.
    """
    if not isinstance(d, dict):
        raise RequestCorrupted(f'{d!r} should be a dict')
    if field_name in d:
        return d[field_name]
    raise RequestCorrupted(f'required field {field_name!r} missing from dict')
128

129

130
def assert_list_or_tuple(val: Any) -> None:
    """Raise RequestCorrupted unless *val* is a list or a tuple."""
    if isinstance(val, (list, tuple)):
        return
    raise RequestCorrupted(f'{val!r} should be a list or tuple')
133

134

135
class NotificationSession(RPCSession):
    """RPCSession that routes server-pushed notifications into per-subscription
    asyncio queues, caching the latest result for each subscription key.
    """

    def __init__(self, *args, interface: 'Interface', **kwargs):
        super(NotificationSession, self).__init__(*args, **kwargs)
        self.subscriptions = defaultdict(list)  # subscription key -> list of subscriber queues
        self.cache = {}  # subscription key -> latest known result
        self._msg_counter = itertools.count(start=1)  # local ids, only used to correlate log lines
        self.interface = interface
        self.taskgroup = interface.taskgroup
        self.cost_hard_limit = 0  # disable aiorpcx resource limits

    async def handle_request(self, request):
        # Incoming messages from the server. Only notifications for keys we
        # subscribed to are expected; anything else closes the session.
        self.maybe_log(f"--> {request}")
        try:
            if isinstance(request, Notification):
                # last arg is the result, preceding args are the params
                params, result = request.args[:-1], request.args[-1]
                key = self.get_hashable_key_for_rpc_call(request.method, params)
                if key in self.subscriptions:
                    self.cache[key] = result
                    for queue in self.subscriptions[key]:
                        await queue.put(request.args)
                else:
                    raise Exception(f'unexpected notification')
            else:
                raise Exception(f'unexpected request. not a notification')
        except Exception as e:
            self.interface.logger.info(f"error handling request {request}. exc: {repr(e)}")
            await self.close()

    async def send_request(self, *args, timeout=None, **kwargs):
        """Send a request; raises RequestTimedOut on timeout.

        note: semaphores/timeouts/backpressure etc are handled by
        aiorpcx. the timeout arg here in most cases should not be set.
        """
        msg_id = next(self._msg_counter)
        self.maybe_log(f"<-- {args} {kwargs} (id: {msg_id})")
        try:
            # note: RPCSession.send_request raises TaskTimeout in case of a timeout.
            # TaskTimeout is a subclass of CancelledError, which is *suppressed* in TaskGroups
            response = await util.wait_for2(
                super().send_request(*args, **kwargs),
                timeout)
        except (TaskTimeout, asyncio.TimeoutError) as e:
            self.maybe_log(f"--> request timed out: {args} (id: {msg_id})")
            raise RequestTimedOut(f'request timed out: {args} (id: {msg_id})') from e
        except CodeMessageError as e:
            self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
            raise
        except BaseException as e:  # cancellations, etc. are useful for debugging
            self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
            raise
        else:
            self.maybe_log(f"--> {response} (id: {msg_id})")
            return response

    def set_default_timeout(self, timeout):
        """Set default request/send timeouts (seconds) on the base session."""
        assert hasattr(self, "sent_request_timeout")  # in base class
        self.sent_request_timeout = timeout
        assert hasattr(self, "max_send_delay")        # in base class
        self.max_send_delay = timeout

    async def subscribe(self, method: str, params: List, queue: asyncio.Queue):
        """Register *queue* for (method, params) notifications and immediately
        deliver the current value (cached, or freshly requested)."""
        # note: until the cache is written for the first time,
        # each 'subscribe' call might make a request on the network.
        key = self.get_hashable_key_for_rpc_call(method, params)
        self.subscriptions[key].append(queue)
        if key in self.cache:
            result = self.cache[key]
        else:
            result = await self.send_request(method, params)
            self.cache[key] = result
        await queue.put(params + [result])

    def unsubscribe(self, queue):
        """Unsubscribe a callback to free object references to enable GC."""
        # note: we can't unsubscribe from the server, so we keep receiving
        # subsequent notifications
        for v in self.subscriptions.values():
            if queue in v:
                v.remove(queue)

    @classmethod
    def get_hashable_key_for_rpc_call(cls, method, params):
        """Hashable index for subscriptions and cache"""
        return str(method) + repr(params)

    def maybe_log(self, msg: str) -> None:
        # Log only if debugging is enabled for this interface or the whole network.
        if not self.interface: return
        if self.interface.debug or self.interface.network.debug:
            self.interface.logger.debug(msg)

    def default_framer(self):
        # overridden so that max_size can be customized
        max_size = self.interface.network.config.NETWORK_MAX_INCOMING_MSG_SIZE
        assert max_size > 500_000, f"{max_size=} (< 500_000) is too small"
        return NewlineFramer(max_size=max_size)

    async def close(self, *, force_after: Optional[int] = None):
        """Closes the connection and waits for it to be closed.
        We try to flush buffered data to the wire, which can take some time.
        """
        if force_after is None:
            # We give up after a while and just abort the connection.
            # Note: specifically if the server is running Fulcrum, waiting seems hopeless,
            #       the connection must be aborted (see https://github.com/cculianu/Fulcrum/issues/76)
            # Note: if the ethernet cable was pulled or wifi disconnected, that too might
            #       wait until this timeout is triggered
            force_after = 1  # seconds
        await super().close(force_after=force_after)
242

243

244
class NetworkException(Exception):
    """Base class for network-layer errors in this module."""
245

246

247
class GracefulDisconnect(NetworkException):
    """Raised to cleanly tear down a server connection.

    Carries a logging level used when the disconnect is reported.
    """
    log_level = logging.INFO  # class-level default severity

    def __init__(self, *args, log_level=None, **kwargs):
        Exception.__init__(self, *args, **kwargs)
        if log_level is None:
            return
        # per-instance override of the class default
        self.log_level = log_level
254

255

256
class RequestTimedOut(GracefulDisconnect):
    def __str__(self):
        # User-facing message, hence translated.
        return _("Network request timed out.")
259

260

261
class RequestCorrupted(Exception):
    """A server response failed validation."""


class ErrorParsingSSLCert(Exception):
    """A stored TLS certificate could not be parsed."""


class ErrorGettingSSLCertFromServer(Exception):
    """The server's TLS certificate could not be fetched."""


class ErrorSSLCertFingerprintMismatch(Exception):
    """The certificate does not match the expected fingerprint."""


class InvalidOptionCombination(Exception):
    """Mutually incompatible configuration options were combined."""
267
class ConnectError(NetworkException):
    """Establishing the transport connection failed."""
268

269

270
class _RSClient(RSClient):
    """RSClient that normalizes low-level socket errors into ConnectError."""

    async def create_connection(self):
        try:
            return await super().create_connection()
        except OSError as e:
            # note: using "from e" here will set __cause__ of ConnectError
            raise ConnectError(e) from e
277

278

279
class PaddedRSTransport(RSTransport):
    """A raw socket transport that provides basic countermeasures against traffic analysis
    by padding the jsonrpc payload with whitespaces to have ~uniform-size TCP packets.
    (it is assumed that a network observer does not see plaintext transport contents,
    due to it being wrapped e.g. in TLS)
    """

    MIN_PACKET_SIZE = 1024  # pad every wire write up to at least this many bytes
    WAIT_FOR_BUFFER_GROWTH_SECONDS = 1.0  # max time data may sit in the send buffer

    session: Optional['RPCSession']

    def __init__(self, *args, **kwargs):
        RSTransport.__init__(self, *args, **kwargs)
        self._sbuffer = bytearray()  # "send buffer"
        self._sbuffer_task = None  # type: Optional[asyncio.Task]  # polls the buffer (see _poll_sbuffer)
        self._sbuffer_has_data_evt = asyncio.Event()  # set iff _sbuffer is non-empty
        self._last_send = time.monotonic()  # time of last wire write
        self._force_send = False  # type: bool  # when True, buffered data is flushed on next attempt

    # note: this does not call super().write() but is a complete reimplementation
    async def write(self, message):
        """Frame *message* and append it to the send buffer; the buffer is
        flushed to the wire according to the padding heuristics."""
        await self._can_send.wait()
        if self.is_closing():
            return
        framed_message = self._framer.frame(message)
        self._sbuffer += framed_message
        self._sbuffer_has_data_evt.set()
        self._maybe_consume_sbuffer()

    def _maybe_consume_sbuffer(self) -> None:
        """Maybe take some data from sbuffer and send it on the wire."""
        if not self._can_send.is_set() or self.is_closing():
            return
        buf = self._sbuffer
        if not buf:
            return
        # if there is enough data in the buffer, or if we haven't sent in a while, send now:
        if not (
            self._force_send
            or len(buf) >= self.MIN_PACKET_SIZE
            or self._last_send + self.WAIT_FOR_BUFFER_GROWTH_SECONDS < time.monotonic()
        ):
            return
        # buffer must end on a complete newline-framed json-rpc message
        assert buf[-2:] in (b"}\n", b"]\n"), f"unexpected json-rpc terminator: {buf[-2:]=!r}"
        # either (1) pad length to next power of two, to create "lsize" packet:
        payload_lsize = len(buf)
        total_lsize = max(self.MIN_PACKET_SIZE, 2 ** (payload_lsize.bit_length()))
        npad_lsize = total_lsize - payload_lsize
        # or if that wasted a lot of bandwidth with padding, (2) defer sending some messages
        # and create a packet with half that size ("ssize", s for small)
        total_ssize = max(self.MIN_PACKET_SIZE, total_lsize // 2)
        payload_ssize = buf.rfind(b"\n", 0, total_ssize)  # last full message boundary that fits
        if payload_ssize != -1:
            payload_ssize += 1  # for "\n" char
            npad_ssize = total_ssize - payload_ssize
        else:
            # no message boundary fits in the small packet; option (2) unavailable
            npad_ssize = float("inf")
        # decide between (1) and (2): prefer whichever wastes fewer pad bytes
        if self._force_send or npad_lsize <= npad_ssize:
            # (1) create "lsize" packet: consume full buffer
            npad = npad_lsize
            p_idx = payload_lsize
        else:
            # (2) create "ssize" packet: consume some, but defer some for later
            npad = npad_ssize
            p_idx = payload_ssize
        # pad by adding spaces near end
        # self.session.maybe_log(
        #     f"PaddedRSTransport. calling low-level write(). "
        #     f"chose between (lsize:{payload_lsize}+{npad_lsize}, ssize:{payload_ssize}+{npad_ssize}). "
        #     f"won: {'tie' if npad_lsize == npad_ssize else 'lsize' if npad_lsize < npad_ssize else 'ssize'}."
        # )
        json_rpc_terminator = buf[p_idx-2:p_idx]
        assert json_rpc_terminator in (b"}\n", b"]\n"), f"unexpected {json_rpc_terminator=!r}"
        # insert the whitespace padding just before the final terminator (valid json whitespace)
        buf2 = buf[:p_idx-2] + (npad * b" ") + json_rpc_terminator
        self._asyncio_transport.write(buf2)
        self._last_send = time.monotonic()
        del self._sbuffer[:p_idx]
        if not self._sbuffer:
            self._sbuffer_has_data_evt.clear()

    async def _poll_sbuffer(self):
        """Background task: periodically flush deferred buffer contents."""
        while True:
            await self._sbuffer_has_data_evt.wait()  # to avoid busy-waiting
            self._maybe_consume_sbuffer()
            # If there is still data in the buffer, sleep until it would time out.
            # note: If the transport is ~idle, when we wake up, we will send the current buf data,
            #       but if busy, we might wake up to completely new buffer contents. Either is fine.
            if len(self._sbuffer) > 0:
                timeout_abs = self._last_send + self.WAIT_FOR_BUFFER_GROWTH_SECONDS
                timeout_rel = max(0.0, timeout_abs - time.monotonic())
                await asyncio.sleep(timeout_rel)

    def connection_made(self, transport: asyncio.BaseTransport):
        super().connection_made(transport)
        if isinstance(self.session, NotificationSession):
            coro = self.session.taskgroup.spawn(self._poll_sbuffer())
            self._sbuffer_task = self.loop.create_task(coro)
        else:
            # This a short-lived "fetch_certificate"-type session.
            # No polling here, we always force-empty the buffer.
            self._force_send = True
382

383

384
class ServerAddr:
    """Address of an Electrum server: host, port, and network protocol
    ('s' for TLS, 't' for plaintext TCP)."""

    def __init__(self, host: str, port: Union[int, str], *, protocol: str = None):
        """Raises ValueError for an invalid host, port, or protocol."""
        assert isinstance(host, str), repr(host)
        if protocol is None:
            protocol = 's'
        if not host:
            raise ValueError('host must not be empty')
        if host[0] == '[' and host[-1] == ']':  # IPv6
            host = host[1:-1]
        try:
            net_addr = NetAddress(host, port)  # this validates host and port
        except Exception as e:
            raise ValueError(f"cannot construct ServerAddr: invalid host or port (host={host}, port={port})") from e
        if protocol not in _KNOWN_NETWORK_PROTOCOLS:
            raise ValueError(f"invalid network protocol: {protocol}")
        self.host = str(net_addr.host)  # canonical form (if e.g. IPv6 address)
        self.port = int(net_addr.port)
        self.protocol = protocol
        self._net_addr_str = str(net_addr)

    @classmethod
    def from_str(cls, s: str) -> 'ServerAddr':
        """Constructs a ServerAddr or raises ValueError."""
        # host might be IPv6 address, hence do rsplit:
        host, port, protocol = str(s).rsplit(':', 2)
        return ServerAddr(host=host, port=port, protocol=protocol)

    @classmethod
    def from_str_with_inference(cls, s: str) -> Optional['ServerAddr']:
        """Construct ServerAddr from str, guessing missing details.
        Does not raise - just returns None if guessing failed.
        Ongoing compatibility not guaranteed.
        """
        if not s:
            return None
        host = ""
        if s[0] == "[" and "]" in s:  # IPv6 address
            host_end = s.index("]")
            host = s[1:host_end]
            s = s[host_end+1:]  # keep only ":port[:protocol]" remainder
        items = str(s).rsplit(':', 2)
        if len(items) < 2:
            return None  # although maybe we could guess the port too?
        host = host or items[0]
        port = items[1]
        if len(items) >= 3:
            protocol = items[2]
        else:
            # protocol omitted: assume the preferred one
            protocol = PREFERRED_NETWORK_PROTOCOL
        try:
            return ServerAddr(host=host, port=port, protocol=protocol)
        except ValueError:
            return None

    def to_friendly_name(self) -> str:
        """Short display form for the UI."""
        # note: this method is closely linked to from_str_with_inference
        if self.protocol == 's':  # hide trailing ":s"
            return self.net_addr_str()
        return str(self)

    def __str__(self):
        return '{}:{}'.format(self.net_addr_str(), self.protocol)

    def to_json(self) -> str:
        return str(self)

    def __repr__(self):
        return f'<ServerAddr host={self.host} port={self.port} protocol={self.protocol}>'

    def net_addr_str(self) -> str:
        # "host:port" without the protocol suffix
        return self._net_addr_str

    def __eq__(self, other):
        if not isinstance(other, ServerAddr):
            return False
        return (self.host == other.host
                and self.port == other.port
                and self.protocol == other.protocol)

    def __ne__(self, other):
        return not (self == other)

    def __hash__(self):
        # consistent with __eq__: same (host, port, protocol) triple
        return hash((self.host, self.port, self.protocol))
469

470

471
def _get_cert_path_for_host(*, config: 'SimpleConfig', host: str) -> str:
    """Return the path of the pinned-certificate file for *host*,
    inside the config dir's 'certs' folder."""
    filename = host
    try:
        parsed = ip_address(host)
    except ValueError:
        parsed = None  # not an IP literal; keep the hostname as filename
    if isinstance(parsed, IPv6Address):
        # IPv6 literals contain ':' chars — presumably unsafe in filenames,
        # so a hex-encoded form is used instead (TODO confirm rationale)
        filename = f"ipv6_{parsed.packed.hex()}"
    return os.path.join(config.path, 'certs', filename)
481

482

483
class Interface(Logger):
5✔
484

485
    def __init__(self, *, network: 'Network', server: ServerAddr):
        """Create an Interface for *server* and schedule its main loop
        (self.run) on the network's taskgroup, thread-safely via the
        network's event loop."""
        self.ready = network.asyncio_loop.create_future()  # resolved in _mark_ready(); cancelled on disconnect
        self.got_disconnected = asyncio.Event()
        self.server = server
        Logger.__init__(self)
        assert network.config.path
        self.cert_path = _get_cert_path_for_host(config=network.config, host=self.host)
        self.blockchain = None  # type: Optional[Blockchain]
        self._requested_chunks = set()  # type: Set[int]
        self.network = network
        self.session = None  # type: Optional[NotificationSession]
        self._ipaddr_bucket = None
        # Set up proxy.
        # - for servers running on localhost, the proxy is not used. If user runs their own server
        #   on same machine, this lets them enable the proxy (which is used for e.g. FX rates).
        #   note: we could maybe relax this further and bypass the proxy for all private
        #         addresses...? e.g. 192.168.x.x
        if util.is_localhost(server.host):
            self.logger.info(f"looks like localhost: not using proxy for this server")
            self.proxy = None
        else:
            self.proxy = ESocksProxy.from_network_settings(network)

        # Latest block header and corresponding height, as claimed by the server.
        # Note that these values are updated before they are verified.
        # Especially during initial header sync, verification can take a long time.
        # Failing verification will get the interface closed.
        self.tip_header = None
        self.tip = 0

        self.fee_estimates_eta = {}  # type: Dict[int, int]

        # Dump network messages (only for this interface).  Set at runtime from the console.
        self.debug = False

        self.taskgroup = OldTaskGroup()

        async def spawn_task():
            task = await self.network.taskgroup.spawn(self.run())
            task.set_name(f"interface::{str(server)}")
        # __init__ may run on a different thread than the event loop; schedule safely
        asyncio.run_coroutine_threadsafe(spawn_task(), self.network.asyncio_loop)
526

527
    # Thin conveniences delegating to the underlying ServerAddr.
    @property
    def host(self):
        return self.server.host

    @property
    def port(self):
        return self.server.port

    @property
    def protocol(self):
        return self.server.protocol
538

539
    def diagnostic_name(self):
        # name used in log lines for this interface (Logger hook — presumably)
        return self.server.net_addr_str()
541

542
    def __str__(self):
        # human-readable form, e.g. "<Interface host:port>"
        return f"<Interface {self.diagnostic_name()}>"
544

545
    async def is_server_ca_signed(self, ca_ssl_context):
        """Given a CA enforcing SSL context, returns True if the connection
        can be established. Returns False if the server has a self-signed
        certificate but otherwise is okay. Any other failures raise.
        """
        try:
            await self.open_session(ca_ssl_context, exit_early=True)
        except ConnectError as e:
            cause = e.__cause__
            if (isinstance(cause, ssl.SSLCertVerificationError)
                    and cause.reason == 'CERTIFICATE_VERIFY_FAILED'
                    and cause.verify_code == 18):  # "self signed certificate"
                # Good. We will use this server as self-signed.
                return False
            # Not good. Cannot use this server.
            raise
        # Good. We will use this server as CA-signed.
        return True
563

564
    async def _try_saving_ssl_cert_for_first_time(self, ca_ssl_context):
        """Probe the server and persist its certificate status on disk:
        an empty file marks a CA-signed server; otherwise the self-signed
        certificate itself is pinned."""
        ca_signed = await self.is_server_ca_signed(ca_ssl_context)
        if ca_signed:
            if self._get_expected_fingerprint():
                raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
            with open(self.cert_path, 'w') as f:
                # empty file means this is CA signed, not self-signed
                f.write('')
        else:
            await self._save_certificate()
574

575
    def _is_saved_ssl_cert_available(self):
        """Return True iff a usable cert record for this server exists on disk.

        An empty file means the server is CA-signed; otherwise the file must
        hold a parseable, non-expired, fingerprint-matching pinned certificate.
        Raises ErrorParsingSSLCert / InvalidOptionCombination on bad records.
        """
        if not os.path.exists(self.cert_path):
            return False
        with open(self.cert_path, 'r') as f:
            contents = f.read()
        if contents == '':  # CA signed
            if self._get_expected_fingerprint():
                raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
            return True
        # pinned self-signed cert
        try:
            b = pem.dePem(contents, 'CERTIFICATE')
        except SyntaxError as e:
            self.logger.info(f"error parsing already saved cert: {e}")
            raise ErrorParsingSSLCert(e) from e
        try:
            x = x509.X509(b)
        except Exception as e:
            self.logger.info(f"error parsing already saved cert: {e}")
            raise ErrorParsingSSLCert(e) from e
        try:
            x.check_date()
        except x509.CertificateError as e:
            self.logger.info(f"certificate has expired: {e}")
            os.unlink(self.cert_path)  # delete pinned cert only in this case
            return False
        self._verify_certificate_fingerprint(bytearray(b))
        return True
603

604
    async def _get_ssl_context(self):
        """Return the SSLContext for this server: None for plaintext TCP,
        a CA-validating context for CA-signed servers, or a context pinned
        to the stored self-signed certificate."""
        if self.protocol != 's':
            # using plaintext TCP
            return None

        # see if we already have cert for this server; or get it for the first time
        ca_sslc = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
        if not self._is_saved_ssl_cert_available():
            try:
                await self._try_saving_ssl_cert_for_first_time(ca_sslc)
            except (OSError, ConnectError, aiorpcx.socks.SOCKSError) as e:
                raise ErrorGettingSSLCertFromServer(e) from e
        # now we have a file saved in our certificate store
        siz = os.stat(self.cert_path).st_size
        if siz == 0:
            # CA signed cert
            sslc = ca_sslc
        else:
            # pinned self-signed cert
            sslc = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=self.cert_path)
            # note: Flag "ssl.VERIFY_X509_STRICT" is enabled by default in python 3.13+ (disabled in older versions).
            #       We explicitly disable it as it breaks lots of servers.
            sslc.verify_flags &= ~ssl.VERIFY_X509_STRICT
            sslc.check_hostname = False
        return sslc
629

630
    def handle_disconnect(func):
        """Decorator for Interface coroutine methods: swallows graceful/RPC
        disconnect errors, and in all cases marks the interface as
        disconnected and notifies the network afterwards."""
        @functools.wraps(func)
        async def wrapper_func(self: 'Interface', *args, **kwargs):
            try:
                return await func(self, *args, **kwargs)
            except GracefulDisconnect as e:
                # log at the level carried by the exception instance
                self.logger.log(e.log_level, f"disconnecting due to {repr(e)}")
            except aiorpcx.jsonrpc.RPCError as e:
                self.logger.warning(f"disconnecting due to {repr(e)}")
                self.logger.debug(f"(disconnect) trace for {repr(e)}", exc_info=True)
            finally:
                self.got_disconnected.set()
                await self.network.connection_down(self)
                # if was not 'ready' yet, schedule waiting coroutines:
                self.ready.cancel()
        return wrapper_func
646

647
    @ignore_exceptions  # do not kill network.taskgroup
    @log_exceptions
    @handle_disconnect
    async def run(self):
        """Main entry point: build the SSL context and open the session.
        Expected connection failures are logged and lead to a clean return
        (disconnect handling is done by @handle_disconnect)."""
        try:
            ssl_context = await self._get_ssl_context()
        except (ErrorParsingSSLCert, ErrorGettingSSLCertFromServer) as e:
            self.logger.info(f'disconnecting due to: {repr(e)}')
            return
        try:
            await self.open_session(ssl_context)
        except (asyncio.CancelledError, ConnectError, aiorpcx.socks.SOCKSError) as e:
            # make SSL errors for main interface more visible (to help servers ops debug cert pinning issues)
            if (isinstance(e, ConnectError) and isinstance(e.__cause__, ssl.SSLError)
                    and self.is_main_server() and not self.network.auto_connect):
                self.logger.warning(f'Cannot connect to main server due to SSL error '
                                    f'(maybe cert changed compared to "{self.cert_path}"). Exc: {repr(e)}')
            else:
                self.logger.info(f'disconnecting due to: {repr(e)}')
            return
667

668
    def _mark_ready(self) -> None:
        """Assign a blockchain to this interface and resolve the *ready* future.

        Requires self.tip_header to be set. Raises GracefulDisconnect if the
        future was already cancelled (connection establishment too slow).
        """
        if self.ready.cancelled():
            raise GracefulDisconnect('conn establishment was too slow; *ready* future was cancelled')
        if self.ready.done():
            return  # already marked ready earlier

        assert self.tip_header
        chain = blockchain.check_header(self.tip_header)
        if not chain:
            # tip not on any known chain: fall back to the best known chain
            self.blockchain = blockchain.get_best_chain()
        else:
            self.blockchain = chain
        assert self.blockchain is not None

        self.logger.info(f"set blockchain with height {self.blockchain.height()}")

        self.ready.set_result(1)
685

686
    def is_connected_and_ready(self) -> bool:
        # ready resolved and no disconnect observed since
        return self.ready.done() and not self.got_disconnected.is_set()
688

689
    async def _save_certificate(self) -> None:
        """Fetch the server's (self-signed) certificate and pin it to disk.

        Retries up to 10 times (1s apart); raises GracefulDisconnect if the
        certificate could never be obtained.
        """
        if not os.path.exists(self.cert_path):
            # we may need to retry this a few times, in case the handshake hasn't completed
            for _ in range(10):
                dercert = await self._fetch_certificate()
                if dercert:
                    self.logger.info("succeeded in getting cert")
                    self._verify_certificate_fingerprint(dercert)
                    with open(self.cert_path, 'w') as f:
                        cert = ssl.DER_cert_to_PEM_cert(dercert)
                        # workaround android bug
                        cert = re.sub("([^\n])-----END CERTIFICATE-----","\\1\n-----END CERTIFICATE-----",cert)
                        f.write(cert)
                        # even though close flushes, we can't fsync when closed.
                        # and we must flush before fsyncing, cause flush flushes to OS buffer
                        # fsync writes to OS buffer to disk
                        f.flush()
                        os.fsync(f.fileno())
                    break
                await asyncio.sleep(1)
            else:
                raise GracefulDisconnect("could not get certificate after 10 tries")
711

712
    async def _fetch_certificate(self) -> bytes:
        """Connect with certificate verification disabled and return the peer cert in DER form.

        Verification is intentionally off here: the point is to *obtain* the cert
        (for later pinning/saving), not to trust it yet.
        NOTE(review): ssl.SSLObject.getpeercert can return None if the handshake
        has not completed — callers handle a falsy result by retrying.
        """
        sslc = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_CLIENT)
        sslc.check_hostname = False
        sslc.verify_mode = ssl.CERT_NONE
        async with _RSClient(
            session_factory=RPCSession,
            host=self.host, port=self.port,
            ssl=sslc,
            proxy=self.proxy,
            transport=PaddedRSTransport,
        ) as session:
            asyncio_transport = session.transport._asyncio_transport  # type: asyncio.BaseTransport
            ssl_object = asyncio_transport.get_extra_info("ssl_object")  # type: ssl.SSLObject
            return ssl_object.getpeercert(binary_form=True)
726

727
    def _get_expected_fingerprint(self) -> Optional[str]:
        """Return the user-configured cert fingerprint; only enforced for the main server."""
        if not self.is_main_server():
            return None
        return self.network.config.NETWORK_SERVERFINGERPRINT
730

731
    def _verify_certificate_fingerprint(self, certificate):
        """Compare the DER cert's sha256 against the pinned fingerprint, if one is configured.

        Raises ErrorSSLCertFingerprintMismatch (and fires the 'cert_mismatch'
        callback) on mismatch; silently returns when no fingerprint is pinned.
        """
        expected = self._get_expected_fingerprint()
        if not expected:
            return
        actual = hashlib.sha256(certificate).hexdigest()
        if actual.lower() != expected.lower():
            util.trigger_callback('cert_mismatch')
            raise ErrorSSLCertFingerprintMismatch('Refusing to connect to server due to cert fingerprint mismatch')
        self.logger.info("cert fingerprint verification passed")
741

742
    async def get_block_header(self, height, assert_mode):
        """Fetch and deserialize the block header at *height* from the server."""
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        self.logger.info(f'requesting block header {height} in mode {assert_mode}')
        # use lower timeout as we usually have network.bhi_lock here
        urgent_timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        header_hex = await self.session.send_request('blockchain.block.header', [height], timeout=urgent_timeout)
        return blockchain.deserialize_header(bytes.fromhex(header_hex), height)
750

751
    async def request_chunk(self, height: int, tip=None, *, can_return_early=False):
        """Download the 2016-header chunk containing *height* and try to connect it.

        If *tip* is given, the request is truncated to not go past the tip.
        With can_return_early=True, returns None if the same chunk is already
        being requested concurrently (dedup via self._requested_chunks).
        Returns (connect_result, num_headers) otherwise; num_headers is 0 on
        failure to connect. Raises RequestCorrupted on malformed responses.
        """
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        index = height // 2016
        if can_return_early and index in self._requested_chunks:
            return
        self.logger.info(f"requesting chunk from height {height}")
        size = 2016
        if tip is not None:
            # don't request beyond the server's tip
            size = min(size, tip - index * 2016 + 1)
            size = max(size, 0)
        try:
            # mark in-flight so concurrent callers can dedup
            self._requested_chunks.add(index)
            res = await self.session.send_request('blockchain.block.headers', [index * 2016, size])
        finally:
            self._requested_chunks.discard(index)
        assert_dict_contains_field(res, field_name='count')
        assert_dict_contains_field(res, field_name='hex')
        assert_dict_contains_field(res, field_name='max')
        assert_non_negative_integer(res['count'])
        assert_non_negative_integer(res['max'])
        assert_hex_str(res['hex'])
        if len(res['hex']) != HEADER_SIZE * 2 * res['count']:
            raise RequestCorrupted('inconsistent chunk hex and count')
        # we never request more than 2016 headers, but we enforce those fit in a single response
        if res['max'] < 2016:
            raise RequestCorrupted(f"server uses too low 'max' count for block.headers: {res['max']} < 2016")
        if res['count'] != size:
            raise RequestCorrupted(f"expected {size} headers but only got {res['count']}")
        conn = self.blockchain.connect_chunk(index, res['hex'])
        if not conn:
            return conn, 0
        return conn, res['count']
784

785
    def is_main_server(self) -> bool:
        """Whether this interface is (or is about to become) the one the network follows."""
        if self.network.interface == self:
            return True
        return self.network.interface is None and self.network.default_server == self.server
788

789
    async def open_session(self, sslc, exit_early=False):
        """Open the JSON-RPC session, negotiate the protocol version, and run the
        long-lived per-interface tasks (ping, fee estimates, header sync, monitor).

        With exit_early=True, returns right after a successful server.version
        handshake (used e.g. to just probe a server).
        Raises GracefulDisconnect on protocol-negotiation failure, unhealthy
        server-bucket spread, or selected server-side RPC errors.
        """
        session_factory = lambda *args, iface=self, **kwargs: NotificationSession(*args, **kwargs, interface=iface)
        async with _RSClient(
            session_factory=session_factory,
            host=self.host, port=self.port,
            ssl=sslc,
            proxy=self.proxy,
            transport=PaddedRSTransport,
        ) as session:
            self.session = session  # type: NotificationSession
            self.session.set_default_timeout(self.network.get_network_timeout_seconds(NetworkTimeout.Generic))
            try:
                ver = await session.send_request('server.version', [self.client_name(), version.PROTOCOL_VERSION])
            except aiorpcx.jsonrpc.RPCError as e:
                raise GracefulDisconnect(e)  # probably 'unsupported protocol version'
            if exit_early:
                return
            # server must echo back exactly the protocol version we asked for
            if ver[1] != version.PROTOCOL_VERSION:
                raise GracefulDisconnect(f'server violated protocol-version-negotiation. '
                                         f'we asked for {version.PROTOCOL_VERSION!r}, they sent {ver[1]!r}')
            if not self.network.check_interface_against_healthy_spread_of_connected_servers(self):
                raise GracefulDisconnect(f'too many connected servers already '
                                         f'in bucket {self.bucket_based_on_ipaddress()}')
            self.logger.info(f"connection established. version: {ver}")

            try:
                async with self.taskgroup as group:
                    await group.spawn(self.ping)
                    await group.spawn(self.request_fee_estimates)
                    await group.spawn(self.run_fetch_blocks)
                    await group.spawn(self.monitor_connection)
            except aiorpcx.jsonrpc.RPCError as e:
                # treat known server-side overload/limitation errors as a graceful disconnect
                if e.code in (
                    JSONRPC.EXCESSIVE_RESOURCE_USAGE,
                    JSONRPC.SERVER_BUSY,
                    JSONRPC.METHOD_NOT_FOUND,
                    JSONRPC.INTERNAL_ERROR,
                ):
                    log_level = logging.WARNING if self.is_main_server() else logging.INFO
                    raise GracefulDisconnect(e, log_level=log_level) from e
                raise
            finally:
                self.got_disconnected.set()  # set this ASAP, ideally before any awaits
832

833
    async def monitor_connection(self):
        """Poll once per second and raise GracefulDisconnect when the session closes."""
        while True:
            await asyncio.sleep(1)
            # If the session/transport is no longer open, we disconnect.
            # e.g. if the remote cleanly sends EOF, we would handle that here.
            # note: If the user pulls the ethernet cable or disconnects wifi,
            #       ideally we would detect that here, so that the GUI/etc can reflect that.
            #       - On Android, this seems to work reliably , where asyncio.BaseProtocol.connection_lost()
            #         gets called with e.g. ConnectionAbortedError(103, 'Software caused connection abort').
            #       - On desktop Linux/Win, it seems BaseProtocol.connection_lost() is not called in such cases.
            #         Hence, in practice the connection issue will only be detected the next time we try
            #         to send a message (plus timeout), which can take minutes...
            if not self.session or self.session.is_closing():
                raise GracefulDisconnect('session was closed')
847

848
    async def ping(self):
        """Keep the server aware of us with periodic pings; random delay adds cover noise."""
        while True:
            delay = random.random() * 300
            await asyncio.sleep(delay)
            await self.session.send_request('server.ping')
            await self._maybe_send_noise()
855

856
    async def _maybe_send_noise(self):
        """Probabilistically send a few extra pings as decoy traffic (20% chance per round)."""
        while True:
            if random.random() >= 0.2:
                break
            await asyncio.sleep(random.random())
            await self.session.send_request('server.ping')
860

861
    async def request_fee_estimates(self):
        """Every 60s, query fee estimates for the ETA targets and push them to the network.

        Negative results (server could not estimate) are skipped.
        """
        while True:
            async with OldTaskGroup() as group:
                fee_tasks = []
                # query all targets concurrently; last FEE_ETA_TARGETS entry is excluded
                for i in FEE_ETA_TARGETS[0:-1]:
                    fee_tasks.append((i, await group.spawn(self.get_estimatefee(i))))
            for nblock_target, task in fee_tasks:
                fee = task.result()
                if fee < 0: continue
                assert isinstance(fee, int)
                self.fee_estimates_eta[nblock_target] = fee
            self.network.update_fee_estimates()
            await asyncio.sleep(60)
874

875
    async def close(self, *, force_after: Optional[int] = None):
        """Closes the connection and waits for it to be closed.
        We try to flush buffered data to the wire, which can take some time.

        force_after: seconds after which the close is forced (passed through
            to the session); None uses the session's default behavior.
        """
        if self.session:
            await self.session.close(force_after=force_after)
        # monitor_connection will cancel tasks
881
        # monitor_connection will cancel tasks
882

883
    async def run_fetch_blocks(self):
        """Subscribe to new-block notifications and process each announced tip.

        Updates self.tip/self.tip_header, marks the interface ready on the first
        header, syncs/forks the local blockchain, fires GUI callbacks, and lets
        the network re-evaluate which interface to follow.
        Raises GracefulDisconnect if the server tip is below the checkpointed height.
        """
        header_queue = asyncio.Queue()
        await self.session.subscribe('blockchain.headers.subscribe', [], header_queue)
        while True:
            item = await header_queue.get()
            raw_header = item[0]
            height = raw_header['height']
            header = blockchain.deserialize_header(bfh(raw_header['hex']), height)
            self.tip_header = header
            self.tip = height
            if self.tip < constants.net.max_checkpoint():
                raise GracefulDisconnect('server tip below max checkpoint')
            self._mark_ready()
            blockchain_updated = await self._process_header_at_tip()
            # header processing done
            if self.is_main_server():
                self.logger.info(f"new chain tip on main interface. {height=}")
            if blockchain_updated:
                util.trigger_callback('blockchain_updated')
            util.trigger_callback('network_updated')
            await self.network.switch_unwanted_fork_interface()
            await self.network.switch_lagging_interface()
            await self.taskgroup.spawn(self._maybe_send_noise())
906

907
    async def _process_header_at_tip(self) -> bool:
        """Integrate self.tip_header into a local blockchain (under network.bhi_lock).

        Returns:
        False - boring fast-forward: we already have this header as part of this blockchain from another interface,
        True - new header we didn't have, or reorg
        """
        height, header = self.tip, self.tip_header
        async with self.network.bhi_lock:
            if self.blockchain.height() >= height and self.blockchain.check_header(header):
                # another interface amended the blockchain
                return False
            _, height = await self.step(height, header)
            # in the simple case, height == self.tip+1
            if height <= self.tip:
                # step() landed below the announced tip -> catch up the rest
                await self.sync_until(height)
            return True
922

923
    async def sync_until(self, height, next_height=None):
        """Download headers from *height* up to *next_height* (default: server tip).

        Uses bulk chunk requests when more than 10 headers behind, otherwise
        single-header step()s. Returns the last (mode, height) pair.
        Raises GracefulDisconnect if the server chain conflicts with
        checkpoints/genesis.
        """
        if next_height is None:
            next_height = self.tip
        last = None
        while last is None or height <= next_height:
            # remember previous state to detect lack of progress
            prev_last, prev_height = last, height
            if next_height > height + 10:
                could_connect, num_headers = await self.request_chunk(height, next_height)
                if not could_connect:
                    if height <= constants.net.max_checkpoint():
                        raise GracefulDisconnect('server chain conflicts with checkpoints or genesis')
                    # fall back to single-step to locate the fork point
                    last, height = await self.step(height)
                    continue
                util.trigger_callback('blockchain_updated')
                util.trigger_callback('network_updated')
                # advance to the first height after the connected chunk
                height = (height // 2016 * 2016) + num_headers
                assert height <= next_height+1, (height, self.tip)
                last = 'catchup'
            else:
                last, height = await self.step(height)
            assert (prev_last, prev_height) != (last, height), 'had to prevent infinite loop in interface.sync_until'
        return last, height
945

946
    async def step(self, height, header=None):
        """Process one header at *height* and return (mode, next_height).

        mode is 'catchup' when the header extends/matches a known chain;
        otherwise delegates to backward + binary search to locate a fork point
        and returns the result of _resolve_potential_chain_fork_given_forkpoint.
        Headers carrying a 'mock' key route chain checks through the mock's
        check/connect callables (used by tests).
        """
        assert 0 <= height <= self.tip, (height, self.tip)
        if header is None:
            header = await self.get_block_header(height, 'catchup')

        chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
        if chain:
            self.blockchain = chain if isinstance(chain, Blockchain) else self.blockchain
            # note: there is an edge case here that is not handled.
            # we might know the blockhash (enough for check_header) but
            # not have the header itself. e.g. regtest chain with only genesis.
            # this situation resolves itself on the next block
            return 'catchup', height+1

        can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height)
        if not can_connect:
            self.logger.info(f"can't connect new block: {height=}")
            height, header, bad, bad_header = await self._search_headers_backwards(height, header)
            chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
            can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height)
            assert chain or can_connect
        if can_connect:
            self.logger.info(f"new block: {height=}")
            height += 1
            if isinstance(can_connect, Blockchain):  # not when mocking
                self.blockchain = can_connect
                self.blockchain.save_header(header)
            return 'catchup', height

        # header checks against a chain but does not connect -> potential fork
        good, bad, bad_header = await self._search_headers_binary(height, bad, bad_header, chain)
        return await self._resolve_potential_chain_fork_given_forkpoint(good, bad, bad_header)
977

978
    async def _search_headers_binary(self, height, bad, bad_header, chain):
        """Binary-search between a known-good *height* and a known-bad *bad* height.

        Narrows until good + 1 == bad, i.e. the exact fork boundary.
        Returns (good, bad, bad_header); bad_header must not check against any
        local chain on exit (sanity-asserted).
        """
        assert bad == bad_header['block_height']
        _assert_header_does_not_check_against_any_chain(bad_header)

        self.blockchain = chain if isinstance(chain, Blockchain) else self.blockchain
        good = height
        while True:
            assert good < bad, (good, bad)
            height = (good + bad) // 2
            self.logger.info(f"binary step. good {good}, bad {bad}, height {height}")
            header = await self.get_block_header(height, 'binary')
            chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
            if chain:
                # midpoint is on a known chain -> fork point is above it
                self.blockchain = chain if isinstance(chain, Blockchain) else self.blockchain
                good = height
            else:
                bad = height
                bad_header = header
            if good + 1 == bad:
                break

        # the first bad header must connect on top of the last good one
        mock = 'mock' in bad_header and bad_header['mock']['connect'](height)
        real = not mock and self.blockchain.can_connect(bad_header, check_height=False)
        if not real and not mock:
            raise Exception('unexpected bad header during binary: {}'.format(bad_header))
        _assert_header_does_not_check_against_any_chain(bad_header)

        self.logger.info(f"binary search exited. good {good}, bad {bad}")
        return good, bad, bad_header
1007

1008
    async def _resolve_potential_chain_fork_given_forkpoint(self, good, bad, bad_header):
        """Given the fork boundary (good, bad), either resume catchup or fork the chain.

        Returns ('no_fork', height) when our chain simply ends at *good*,
        or ('fork', height) after creating a new Blockchain forked at *bad*.
        """
        assert good + 1 == bad
        assert bad == bad_header['block_height']
        _assert_header_does_not_check_against_any_chain(bad_header)
        # 'good' is the height of a block 'good_header', somewhere in self.blockchain.
        # bad_header connects to good_header; bad_header itself is NOT in self.blockchain.

        bh = self.blockchain.height()
        assert bh >= good, (bh, good)
        if bh == good:
            height = good + 1
            self.logger.info(f"catching up from {height}")
            return 'no_fork', height

        # this is a new fork we don't yet have
        height = bad + 1
        self.logger.info(f"new fork at bad height {bad}")
        forkfun = self.blockchain.fork if 'mock' not in bad_header else bad_header['mock']['fork']
        b = forkfun(bad_header)  # type: Blockchain
        self.blockchain = b
        assert b.forkpoint == bad
        return 'fork', height
1030

1031
    async def _search_headers_backwards(self, height, header):
        """Walk backwards (doubling the stride) until a header checks/connects locally.

        Returns (height, header, bad, bad_header) where (height, header) is the
        first locally-known point found and (bad, bad_header) the last point
        confirmed unknown. Raises GracefulDisconnect if even the checkpointed
        height conflicts with the server's chain.
        """
        async def iterate():
            # one backward probe; returns True while the header is still unknown locally
            nonlocal height, header
            checkp = False
            if height <= constants.net.max_checkpoint():
                # never probe below the checkpoint; conflict there is fatal
                height = constants.net.max_checkpoint()
                checkp = True
            header = await self.get_block_header(height, 'backward')
            chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
            can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height)
            if chain or can_connect:
                return False
            if checkp:
                raise GracefulDisconnect("server chain conflicts with checkpoints")
            return True

        bad, bad_header = height, header
        _assert_header_does_not_check_against_any_chain(bad_header)
        with blockchain.blockchains_lock: chains = list(blockchain.blockchains.values())
        local_max = max([0] + [x.height() for x in chains]) if 'mock' not in header else float('inf')
        # start just above our highest local chain, but never above height-1
        height = min(local_max + 1, height - 1)
        while await iterate():
            bad, bad_header = height, header
            # exponential backoff: double the distance from the server tip
            delta = self.tip - height
            height = self.tip - 2 * delta

        _assert_header_does_not_check_against_any_chain(bad_header)
        self.logger.info(f"exiting backward mode at {height}")
        return height, header, bad, bad_header
1060

1061
    @classmethod
    def client_name(cls) -> str:
        """Client identifier we announce to servers in server.version."""
        return 'electrum/' + version.ELECTRUM_VERSION
1064

1065
    def is_tor(self):
        """True when the configured server host is a Tor hidden service."""
        host = self.host
        return host.endswith('.onion')
1067

1068
    def ip_addr(self) -> Optional[str]:
        """Remote IP address of the current session, or None if not connected."""
        session = self.session
        if not session:
            return None
        peer = session.remote_address()
        if not peer:
            return None
        return str(peer.host)
1074

1075
    def bucket_based_on_ipaddress(self) -> str:
        """Group servers into buckets by network locality (cached after first call).

        Onion hosts share one bucket; IPv4 buckets by /16, IPv6 by /48.
        Loopback and unparsable/missing addresses map to '' (exempt).
        """
        def do_bucket():
            if self.is_tor():
                return BUCKET_NAME_OF_ONION_SERVERS
            try:
                ip_addr = ip_address(self.ip_addr())  # type: Union[IPv4Address, IPv6Address]
            except ValueError:
                return ''
            if not ip_addr:
                return ''
            if ip_addr.is_loopback:  # localhost is exempt
                return ''
            if ip_addr.version == 4:
                slash16 = IPv4Network(ip_addr).supernet(prefixlen_diff=32-16)
                return str(slash16)
            elif ip_addr.version == 6:
                slash48 = IPv6Network(ip_addr).supernet(prefixlen_diff=128-48)
                return str(slash48)
            return ''

        # note: '' results are falsy and hence recomputed on every call
        if not self._ipaddr_bucket:
            self._ipaddr_bucket = do_bucket()
        return self._ipaddr_bucket
1098

1099
    async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict:
        """Request the merkle proof for *tx_hash* and sanity-check the reply shape."""
        if not is_hash256_str(tx_hash):
            raise Exception(f"{repr(tx_hash)} is not a txid")
        if not is_non_negative_integer(tx_height):
            raise Exception(f"{repr(tx_height)} is not a block height")
        # do request
        reply = await self.session.send_request('blockchain.transaction.get_merkle', [tx_hash, tx_height])
        # check response
        block_height = assert_dict_contains_field(reply, field_name='block_height')
        merkle_branch = assert_dict_contains_field(reply, field_name='merkle')
        tx_pos = assert_dict_contains_field(reply, field_name='pos')
        # note: tx_height was just a hint to the server, don't enforce the response to match it
        assert_non_negative_integer(block_height)
        assert_non_negative_integer(tx_pos)
        assert_list_or_tuple(merkle_branch)
        for branch_hash in merkle_branch:
            assert_hash256_str(branch_hash)
        return reply
1117

1118
    async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
        """Fetch a raw transaction by txid; validates it is hex, deserializes, and hashes back."""
        if not is_hash256_str(tx_hash):
            raise Exception(f"{repr(tx_hash)} is not a txid")
        raw = await self.session.send_request('blockchain.transaction.get', [tx_hash], timeout=timeout)
        # validate response
        if not is_hex_str(raw):
            raise RequestCorrupted(f"received garbage (non-hex) as tx data (txid {tx_hash}): {raw!r}")
        parsed_tx = Transaction(raw)
        try:
            parsed_tx.deserialize()  # see if raises
        except Exception as e:
            raise RequestCorrupted(f"cannot deserialize received transaction (txid {tx_hash})") from e
        if parsed_tx.txid() != tx_hash:
            raise RequestCorrupted(f"received tx does not match expected txid {tx_hash} (got {parsed_tx.txid()})")
        return raw
1133

1134
    async def get_history_for_scripthash(self, sh: str) -> List[dict]:
        """Fetch the tx history for a scripthash and validate the server's response.

        Enforces: each item has an integer 'height' and a txid; mempool items
        (height -1/0) carry a non-negative 'fee'; confirmed heights are
        non-decreasing and never follow mempool items; txids are unique.
        Raises RequestCorrupted on any violation.
        """
        if not is_hash256_str(sh):
            raise Exception(f"{repr(sh)} is not a scripthash")
        # do request
        res = await self.session.send_request('blockchain.scripthash.get_history', [sh])
        # check response
        assert_list_or_tuple(res)
        prev_height = 1
        for tx_item in res:
            height = assert_dict_contains_field(tx_item, field_name='height')
            assert_dict_contains_field(tx_item, field_name='tx_hash')
            assert_integer(height)
            assert_hash256_str(tx_item['tx_hash'])
            if height in (-1, 0):
                assert_dict_contains_field(tx_item, field_name='fee')
                assert_non_negative_integer(tx_item['fee'])
                prev_height = float("inf")  # this ensures confirmed txs can't follow mempool txs
            else:
                # check monotonicity of heights
                if height < prev_height:
                    raise RequestCorrupted(f'heights of confirmed txs must be in increasing order')
                prev_height = height
        hashes = set(map(lambda item: item['tx_hash'], res))
        if len(hashes) != len(res):
            # Either server is sending garbage... or maybe if server is race-prone
            # a recently mined tx could be included in both last block and mempool?
            # Still, it's simplest to just disregard the response.
            raise RequestCorrupted(f"server history has non-unique txids for sh={sh}")
        return res
1163

1164
    async def listunspent_for_scripthash(self, sh: str) -> List[dict]:
        """List the UTXOs of a scripthash, validating every entry's fields."""
        if not is_hash256_str(sh):
            raise Exception(f"{repr(sh)} is not a scripthash")
        # do request
        reply = await self.session.send_request('blockchain.scripthash.listunspent', [sh])
        # check response
        assert_list_or_tuple(reply)
        for utxo in reply:
            for required_field in ('tx_pos', 'value', 'tx_hash', 'height'):
                assert_dict_contains_field(utxo, field_name=required_field)
            assert_non_negative_integer(utxo['tx_pos'])
            assert_non_negative_integer(utxo['value'])
            assert_non_negative_integer(utxo['height'])
            assert_hash256_str(utxo['tx_hash'])
        return reply
1181

1182
    async def get_balance_for_scripthash(self, sh: str) -> dict:
        """Return the server-reported balance dict ('confirmed'/'unconfirmed') for a scripthash."""
        if not is_hash256_str(sh):
            raise Exception(f"{repr(sh)} is not a scripthash")
        # do request
        reply = await self.session.send_request('blockchain.scripthash.get_balance', [sh])
        # check response shape; unconfirmed may be negative
        assert_dict_contains_field(reply, field_name='confirmed')
        assert_dict_contains_field(reply, field_name='unconfirmed')
        assert_non_negative_integer(reply['confirmed'])
        assert_integer(reply['unconfirmed'])
        return reply
1193

1194
    async def get_txid_from_txpos(self, tx_height: int, tx_pos: int, merkle: bool):
        """Resolve (height, pos) to a txid; with merkle=True also returns the proof branch."""
        if not is_non_negative_integer(tx_height):
            raise Exception(f"{repr(tx_height)} is not a block height")
        if not is_non_negative_integer(tx_pos):
            raise Exception(f"{repr(tx_pos)} should be non-negative integer")
        # do request
        reply = await self.session.send_request(
            'blockchain.transaction.id_from_pos',
            [tx_height, tx_pos, merkle],
        )
        # check response: dict with proof when merkle requested, bare txid otherwise
        if not merkle:
            assert_hash256_str(reply)
            return reply
        assert_dict_contains_field(reply, field_name='tx_hash')
        assert_dict_contains_field(reply, field_name='merkle')
        assert_hash256_str(reply['tx_hash'])
        assert_list_or_tuple(reply['merkle'])
        for node_hash in reply['merkle']:
            assert_hash256_str(node_hash)
        return reply
1215

1216
    async def get_fee_histogram(self) -> Sequence[Tuple[Union[float, int], int]]:
        """Fetch the mempool fee histogram; enforces strictly decreasing fee levels."""
        # do request
        histogram = await self.session.send_request('mempool.get_fee_histogram')
        # check response
        assert_list_or_tuple(histogram)
        prev_fee = float('inf')
        for fee, s in histogram:
            assert_non_negative_int_or_float(fee)
            assert_non_negative_integer(s)
            if fee >= prev_fee:  # check monotonicity
                raise RequestCorrupted(f'fees must be in decreasing order')
            prev_fee = fee
        return histogram
1229

1230
    async def get_server_banner(self) -> str:
        """Return the server's banner text; rejects non-string replies."""
        banner = await self.session.send_request('server.banner')
        if not isinstance(banner, str):
            raise RequestCorrupted(f'{banner!r} should be a str')
        return banner
1237

1238
    async def get_donation_address(self) -> str:
        """Return the server's donation address, or '' if absent or unrecognized."""
        addr = await self.session.send_request('server.donation_address')
        if not addr:  # ignore empty string
            return ''
        if bitcoin.is_address(addr):
            return addr
        # note: do not hard-fail -- allow server to use future-type
        #       bitcoin address we do not recognize
        self.logger.info(f"invalid donation address from server: {repr(addr)}")
        return ''
1250

1251
    async def get_relay_fee(self) -> int:
        """Returns the min relay feerate in sat/kbyte (clamped to be non-negative)."""
        reply = await self.session.send_request('blockchain.relayfee')
        assert_non_negative_int_or_float(reply)
        return max(0, int(reply * bitcoin.COIN))
1260

1261
    async def get_estimatefee(self, num_blocks: int) -> int:
        """Returns a feerate estimate for getting confirmed within
        num_blocks blocks, in sat/kbyte.
        Returns -1 if the server could not provide an estimate.
        """
        if not is_non_negative_integer(num_blocks):
            raise Exception(f"{repr(num_blocks)} is not a num_blocks")
        # do request
        try:
            res = await self.session.send_request('blockchain.estimatefee', [num_blocks])
        except aiorpcx.jsonrpc.ProtocolError as e:
            # The protocol spec says the server itself should already have returned -1
            # if it cannot provide an estimate, however apparently "electrs" does not conform
            # and sends an error instead. Convert it here:
            if "cannot estimate fee" in e.message:
                res = -1
            else:
                raise
        except aiorpcx.jsonrpc.RPCError as e:
            # The protocol spec says the server itself should already have returned -1
            # if it cannot provide an estimate. "Fulcrum" often sends:
            #   aiorpcx.jsonrpc.RPCError: (-32603, 'internal error: bitcoind request timed out')
            if e.code == JSONRPC.INTERNAL_ERROR:
                res = -1
            else:
                raise
        # check response
        if res != -1:
            assert_non_negative_int_or_float(res)
            # server replies in BTC/kbyte; convert to sat/kbyte
            res = int(res * bitcoin.COIN)
        return res
1292

1293

1294
def _assert_header_does_not_check_against_any_chain(header: dict) -> None:
5✔
1295
    chain_bad = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
5✔
1296
    if chain_bad:
5✔
1297
        raise Exception('bad_header must not check!')
×
1298

1299

1300
def check_cert(host, cert):
    """Parse a PEM certificate and print whether it has expired (diagnostic helper).

    On parse failure, prints the traceback to stdout and returns silently.
    """
    try:
        b = pem.dePem(cert, 'CERTIFICATE')
        x = x509.X509(b)
    except Exception:
        traceback.print_exc(file=sys.stdout)
        return

    try:
        # check_date raises when the cert is outside its validity window
        x.check_date()
        expired = False
    except Exception:
        expired = True

    m = "host: %s\n"%host
    m += "has_expired: %s\n"% expired
    util.print_msg(m)
1317

1318

1319
# Used by tests
1320
def _match_hostname(name, val):
5✔
1321
    if val == name:
×
1322
        return True
×
1323

1324
    return val.startswith('*.') and name.endswith(val[1:])
×
1325

1326

1327
def test_certificates():
    """Run check_cert over every certificate saved under the config's certs/ dir."""
    from .simple_config import SimpleConfig
    config = SimpleConfig()
    mydir = os.path.join(config.path, "certs")
    certs = os.listdir(mydir)
    for c in certs:
        p = os.path.join(mydir,c)
        with open(p, encoding='utf-8') as f:
            cert = f.read()
        check_cert(c, cert)
1337

1338
if __name__ == "__main__":
    # manual diagnostic entry point: validate all locally-saved server certificates
    test_certificates()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc