• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5756773899960320

22 May 2026 11:52AM UTC coverage: 65.07% (-0.2%) from 65.246%
5756773899960320

Pull #10627

CirrusCI

SomberNight
interface: protocol 1.7: impl new "server.ping" semantics
Pull Request #10627: Protocol 1.7

107 of 199 new or added lines in 13 files covered. (53.77%)

47 existing lines in 7 files now uncovered.

24748 of 38033 relevant lines covered (65.07%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.42
/electrum/interface.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2011 thomasv@gitorious
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25
import os
1✔
26
import re
1✔
27
import ssl
1✔
28
import sys
1✔
29
import time
1✔
30
import traceback
1✔
31
import asyncio
1✔
32
import socket
1✔
33
from typing import Tuple, Union, List, TYPE_CHECKING, Optional, Set, NamedTuple, Any, Sequence, Dict
1✔
34
from collections import defaultdict
1✔
35
from ipaddress import IPv4Network, IPv6Network, ip_address, IPv6Address, IPv4Address
1✔
36
import itertools
1✔
37
import logging
1✔
38
import hashlib
1✔
39
import functools
1✔
40
import random
1✔
41
import enum
1✔
42

43
import aiorpcx
1✔
44
from aiorpcx import RPCSession, Notification, NetAddress, NewlineFramer
1✔
45
from aiorpcx.curio import timeout_after, TaskTimeout
1✔
46
from aiorpcx.jsonrpc import JSONRPC, CodeMessageError
1✔
47
from aiorpcx.rawsocket import RSClient, RSTransport
1✔
48
import certifi
1✔
49

50
from .util import (ignore_exceptions, log_exceptions, bfh, ESocksProxy,
1✔
51
                   is_integer, is_non_negative_integer, is_hash256_str, is_hex_str,
52
                   is_int_or_float, is_non_negative_int_or_float, OldTaskGroup,
53
                   send_exception_to_crash_reporter, error_text_str_to_safe_str, versiontuple)
54
from . import util
1✔
55
from . import x509
1✔
56
from . import pem
1✔
57
from . import version
1✔
58
from . import blockchain
1✔
59
from .blockchain import Blockchain, HEADER_SIZE, CHUNK_SIZE
1✔
60
from . import bitcoin
1✔
61
from .bitcoin import DummyAddress, DummyAddressUsedInTxException, script_to_scripthash
1✔
62
from . import constants
1✔
63
from .i18n import _
1✔
64
from .logging import Logger
1✔
65
from .transaction import Transaction
1✔
66
from .fee_policy import FEE_ETA_TARGETS
1✔
67
from .lrucache import LRUCache
1✔
68

69
if TYPE_CHECKING:
70
    from .network import Network
71
    from .simple_config import SimpleConfig
72

73

74
# Path to the CA bundle shipped with certifi; used as `cafile` when building
# the CA-verifying SSL context for a server connection.
ca_path = certifi.where()

# NOTE(review): not referenced in the visible part of this file; presumably the
# bucket key used to group Tor (.onion) servers -- confirm against callers.
BUCKET_NAME_OF_ONION_SERVERS = 'onion'

# One-letter transport codes accepted by ServerAddr. 's' selects an SSL
# connection; any other code is treated as plaintext TCP by the interface.
KNOWN_ELEC_PROTOCOL_TRANSPORTS = {'t', 's'}
# Transport assumed by ServerAddr.from_str_with_inference when none is given.
PREFERRED_NETWORK_PROTOCOL = 's'
assert PREFERRED_NETWORK_PROTOCOL in KNOWN_ELEC_PROTOCOL_TRANSPORTS

# Import-time sanity check below: one request must be able to cover a full chunk.
MAX_NUM_HEADERS_PER_REQUEST = 2016
assert MAX_NUM_HEADERS_PER_REQUEST >= CHUNK_SIZE

# JSON-RPC error code that NotificationSession.send_request maps to HistoryTooLong.
RPC_ERROR_HISTORY_TOO_LONG = 10001
1✔
86

87
class HistoryTooLong(Exception):
    """Raised when the server answers with RPC_ERROR_HISTORY_TOO_LONG.

    We should not close the connection in that case.
    """
1✔
90

91
class NetworkTimeout:
    """Namespace of timeout presets; all values are in seconds."""
    # seconds
    class Generic:
        # Default presets, from strictest (NORMAL) to most lenient (MOST_RELAXED).
        NORMAL = 30
        RELAXED = 45
        MOST_RELAXED = 600

    class Urgent(Generic):
        # Shorter presets; every value of Generic is overridden here.
        NORMAL = 10
        RELAXED = 20
        MOST_RELAXED = 60
1✔
102

103

104
def assert_non_negative_integer(val: Any) -> None:
    """Raise RequestCorrupted unless `is_non_negative_integer(val)` holds."""
    if is_non_negative_integer(val):
        return
    raise RequestCorrupted(f'{val!r} should be a non-negative integer')
×
107

108

109
def assert_integer(val: Any) -> None:
    """Raise RequestCorrupted unless `is_integer(val)` holds."""
    if is_integer(val):
        return
    raise RequestCorrupted(f'{val!r} should be an integer')
×
112

113

114
def assert_int_or_float(val: Any) -> None:
    """Raise RequestCorrupted unless `is_int_or_float(val)` holds."""
    if is_int_or_float(val):
        return
    raise RequestCorrupted(f'{val!r} should be int or float')
×
117

118

119
def assert_non_negative_int_or_float(val: Any) -> None:
    """Raise RequestCorrupted unless `is_non_negative_int_or_float(val)` holds."""
    if is_non_negative_int_or_float(val):
        return
    raise RequestCorrupted(f'{val!r} should be a non-negative int or float')
×
122

123

124
def assert_hash256_str(val: Any) -> None:
    """Raise RequestCorrupted unless `is_hash256_str(val)` holds."""
    if is_hash256_str(val):
        return
    raise RequestCorrupted(f'{val!r} should be a hash256 str')
×
127

128

129
def assert_hex_str(val: Any, *, allow_odd_len: bool = False) -> None:
    """Raise RequestCorrupted unless `is_hex_str` accepts *val*.

    `allow_odd_len` is forwarded unchanged to `is_hex_str`.
    """
    if is_hex_str(val, allow_odd_len=allow_odd_len):
        return
    raise RequestCorrupted(f'{val!r} should be a hex str')
×
132

133

134
def assert_dict_contains_field(d: Any, *, field_name: str) -> Any:
    """Return `d[field_name]`, validating the container first.

    Raises RequestCorrupted if *d* is not a dict, or if *field_name*
    is not one of its keys.
    """
    if isinstance(d, dict):
        if field_name in d:
            return d[field_name]
        raise RequestCorrupted(f'required field {field_name!r} missing from dict')
    raise RequestCorrupted(f'{d!r} should be a dict')
1✔
140

141

142
def assert_list_or_tuple(val: Any) -> None:
    """Raise RequestCorrupted unless *val* is a list or a tuple (or a subclass)."""
    if isinstance(val, (list, tuple)):
        return
    raise RequestCorrupted(f'{val!r} should be a list or tuple')
×
145

146

147
def protocol_tuple(s: Any) -> tuple[int, ...]:
    """Converts a protocol version number, such as "1.0" to a tuple (1, 0).

    If the version number is bad, (0, ) indicating version 0 is returned.
    This function never raises: non-str input and any error raised by
    `versiontuple` both map to (0, ).
    """
    # Explicit type check instead of `assert`: asserts are stripped under
    # `python -O`, and the rejection of non-str input must not depend on that.
    if not isinstance(s, str):
        return (0, )
    try:
        return versiontuple(s)
    except Exception:
        # deliberately broad: any malformed version string means "version 0"
        return (0, )
×
157

158

159
class ChainResolutionMode(enum.Enum):
    # NOTE(review): the members are not used in the visible part of this file.
    # Presumably they name the phases/outcomes of the header-chain
    # synchronisation logic -- confirm against the code that uses them.
    # Values are assigned by enum.auto() in declaration order.
    CATCHUP = enum.auto()
    BACKWARD = enum.auto()
    BINARY = enum.auto()
    FORK = enum.auto()
    NO_FORK = enum.auto()
1✔
165

166

167
class NotificationSession(RPCSession):
    """RPC session that dispatches server notifications to subscriber queues.

    Subscriptions and the cache of latest results are keyed by
    `get_hashable_key_for_rpc_call(method, params)`.
    """

    def __init__(self, *args, interface: 'Interface', **kwargs):
        super(NotificationSession, self).__init__(*args, **kwargs)
        # key -> list of queues that receive notifications for that key
        self.subscriptions = defaultdict(list)
        # key -> most recent result seen for that key
        self.cache = {}
        # source of the message ids used in log lines (see send_request)
        self._msg_counter = itertools.count(start=1)
        self.interface = interface
        self.taskgroup = interface.taskgroup
        self.cost_hard_limit = 0  # disable aiorpcx resource limits

        # To log pre-processed json traffic, uncomment:
        #self.logger.setLevel(logging.DEBUG)  # from aiorpcx
        #self.verbosity = 4

    async def handle_request(self, request: aiorpcx.Request | aiorpcx.Notification) -> None:
        """Handle a message initiated by the server.

        Only notifications are accepted. A "server.ping" notification is
        forwarded to the interface. Any other notification is treated as a
        subscription update: its last arg is the result, the preceding args
        are the params. Any exception raised here (including for unexpected
        messages) is logged and the session is closed.
        """
        self.maybe_log(f"--> {request}")
        # TODO handle request.args being a dict
        try:
            if isinstance(request, Notification):
                if request.method == "server.ping":
                    data = request.args[0]
                    await self.interface.phandle_on_ping_notification(data=data)
                    return
                params, result = request.args[:-1], request.args[-1]
                key = self.get_hashable_key_for_rpc_call(request.method, params)
                # note: unsubscribe() leaves (possibly empty) lists behind, so a key
                #       that was ever subscribed to keeps being accepted here.
                if key in self.subscriptions:
                    self.cache[key] = result
                    for queue in self.subscriptions[key]:
                        await queue.put(request.args)
                else:
                    raise Exception(f'unexpected notification')
            else:
                raise Exception(f'unexpected request. not a notification')
        except Exception as e:
            self.interface.logger.info(f"error handling request {request}. exc: {repr(e)}")
            await self.close()

    async def send_request(self, *args, timeout=None, **kwargs):
        """Send a request and return the response, logging both directions.

        Raises RequestTimedOut on TaskTimeout/asyncio.TimeoutError, and
        HistoryTooLong if the server error code is RPC_ERROR_HISTORY_TOO_LONG.
        Any other CodeMessageError (and any other exception) is re-raised as is.
        """
        # note: semaphores/timeouts/backpressure etc are handled by
        # aiorpcx. the timeout arg here in most cases should not be set
        msg_id = next(self._msg_counter)
        self.maybe_log(f"<-- {args} {kwargs} (id: {msg_id})")
        try:
            # note: RPCSession.send_request raises TaskTimeout in case of a timeout.
            # TaskTimeout is a subclass of CancelledError, which is *suppressed* in TaskGroups
            response = await util.wait_for2(
                super().send_request(*args, **kwargs),
                timeout)
        except (TaskTimeout, asyncio.TimeoutError) as e:
            self.maybe_log(f"--> request timed out: {args} (id: {msg_id})")
            raise RequestTimedOut(f'request timed out: {args} (id: {msg_id})') from e
        except CodeMessageError as e:
            self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
            if e.code == RPC_ERROR_HISTORY_TOO_LONG:
                raise HistoryTooLong()
            else:
                raise
        except BaseException as e:  # cancellations, etc. are useful for debugging
            self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
            raise
        else:
            self.maybe_log(f"--> {response} (id: {msg_id})")
            return response

    def set_default_timeout(self, timeout) -> None:
        """Set both base-class timeout attributes to *timeout*."""
        assert hasattr(self, "sent_request_timeout")  # in base class
        self.sent_request_timeout = timeout
        assert hasattr(self, "max_send_delay")        # in base class
        self.max_send_delay = timeout

    async def subscribe(self, method: str, params: List, queue: asyncio.Queue) -> None:
        """Register *queue* for (method, params) and put the current result on it.

        The item put on the queue is `params2 + [result]`, where the result
        comes from the cache if present, otherwise from a network request.
        """
        # note: until the cache is written for the first time,
        # each 'subscribe' call might make a request on the network.
        params2 = params[:]
        if method == 'blockchain.scriptpubkey.subscribe':
            # The key (and the queued item) use the scripthash, while the request
            # below is sent with the original params.
            # NOTE(review): presumably the server's notifications for this method
            # carry the scripthash -- confirm against the protocol spec.
            params2[0] = script_to_scripthash(bytes.fromhex(params2[0]))
        key = self.get_hashable_key_for_rpc_call(method, params2)
        self.subscriptions[key].append(queue)
        if key in self.cache:
            result = self.cache[key]
        else:
            result = await self.send_request(method, params)
            self.cache[key] = result
        await queue.put(params2 + [result])

    def unsubscribe(self, queue) -> None:
        """Unsubscribe a callback to free object references to enable GC."""
        # note: we can't unsubscribe from the server, so we keep receiving
        # subsequent notifications
        for v in self.subscriptions.values():
            if queue in v:
                v.remove(queue)

    @classmethod
    def get_hashable_key_for_rpc_call(cls, method, params) -> str:
        """Hashable index for subscriptions and cache"""
        return str(method) + repr(params)

    def maybe_log(self, msg: str) -> None:
        """Log *msg* at debug level if debugging is on for the interface or network."""
        if not self.interface: return
        if self.interface.debug or self.interface.network.debug:
            self.interface.logger.debug(msg)

    def default_framer(self):
        """Return a NewlineFramer limited to the configured max incoming message size."""
        # overridden so that max_size can be customized
        max_size = self.interface.network.config.NETWORK_MAX_INCOMING_MSG_SIZE
        assert max_size > 500_000, f"{max_size=} (< 500_000) is too small"
        return NewlineFramer(max_size=max_size)

    async def close(self, *, force_after: Optional[int] = None):
        """Closes the connection and waits for it to be closed.
        We try to flush buffered data to the wire, which can take some time.

        force_after: seconds passed on to the base class close(); defaults to 1.
        """
        if force_after is None:
            # We give up after a while and just abort the connection.
            # Note: specifically if the server is running Fulcrum, waiting seems hopeless,
            #       the connection must be aborted (see https://github.com/cculianu/Fulcrum/issues/76)
            # Note: if the ethernet cable was pulled or wifi disconnected, that too might
            #       wait until this timeout is triggered
            force_after = 1  # seconds
        await super().close(force_after=force_after)
1✔
289

290

291
class NetworkException(Exception):
    """Common base class of the network-related exceptions defined in this module."""
1✔
292

293

294
class GracefulDisconnect(NetworkException):
    """Disconnect reason that carries the level it should be logged at.

    The class-level default is INFO; passing `log_level=...` to the
    constructor overrides it for that instance only.
    """
    log_level = logging.INFO

    def __init__(self, *args, log_level=None, **kwargs):
        super().__init__(*args, **kwargs)
        if log_level is None:
            # keep the class-level default
            return
        self.log_level = log_level
×
301

302

303
class RequestTimedOut(GracefulDisconnect):
    """Raised by NotificationSession.send_request when a request times out."""

    def __str__(self):
        # always the same translated text, regardless of the constructor args
        message = _("Network request timed out.")
        return message
×
306

307

308
class RequestCorrupted(Exception):
    """Raised by the assert_* validation helpers when a value has an unexpected type or shape."""
1✔
309

310
class ErrorParsingSSLCert(Exception):
    """Raised when an already saved (pinned) certificate file cannot be parsed."""
1✔
311
class ErrorGettingSSLCertFromServer(Exception):
    """Raised when fetching/saving the server certificate for the first time fails."""
1✔
312
class ErrorSSLCertFingerprintMismatch(Exception):
    # NOTE(review): not raised in the visible part of this file; presumably signals that
    # a certificate does not match the expected fingerprint -- confirm at the raise site.
    pass
1✔
313
class InvalidOptionCombination(Exception):
    """Raised for incompatible settings, e.g. --serverfingerprint with a CA signed server."""
1✔
314
class ConnectError(NetworkException):
    """Raised when establishing the connection fails (wraps the underlying OSError)."""
1✔
315

316

317
class TxBroadcastError(NetworkException):
    """Base class of the transaction-broadcast errors; subclasses supply the GUI text."""

    def get_message_for_gui(self):
        """Return the message to show to the user. Must be overridden by subclasses."""
        raise NotImplementedError()
×
320

321

322
class TxBroadcastHashMismatch(TxBroadcastError):
    """Broadcast error for a server that returned an unexpected transaction ID."""

    def get_message_for_gui(self):
        """Return headline, advice and, after a blank line, str(self)."""
        headline = _("The server returned an unexpected transaction ID when broadcasting the transaction.")
        advice = _("Consider trying to connect to a different server, or updating Electrum.")
        details = str(self)
        return "{}\n{}\n\n{}".format(headline, advice, details)
328

329

330
class TxBroadcastServerReturnedError(TxBroadcastError):
    """Broadcast error for a server that answered the broadcast with an error."""

    def get_message_for_gui(self):
        """Return headline, advice and, after a blank line, str(self)."""
        headline = _("The server returned an error when broadcasting the transaction.")
        advice = _("Consider trying to connect to a different server, or updating Electrum.")
        details = str(self)
        return "{}\n{}\n\n{}".format(headline, advice, details)
336

337

338
class TxBroadcastUnknownError(TxBroadcastError):
    """Broadcast error of unknown cause; the GUI text omits str(self)."""

    def get_message_for_gui(self):
        """Return headline and advice, one per line."""
        headline = _("Unknown error when broadcasting the transaction.")
        advice = _("Consider trying to connect to a different server, or updating Electrum.")
        return "{}\n{}".format(headline, advice)
343

344

345
class _RSClient(RSClient):
    """RSClient that reports OS-level connection failures as ConnectError."""

    async def create_connection(self):
        """Create the connection; an OSError is re-raised as ConnectError."""
        try:
            connection = await super().create_connection()
        except OSError as exc:
            # note: using "from exc" here will set __cause__ of ConnectError
            raise ConnectError(exc) from exc
        return connection
×
352

353

354
class PaddedRSTransport(RSTransport):
    """A raw socket transport that provides basic countermeasures against traffic analysis
    by padding the jsonrpc payload with whitespaces to have ~uniform-size TCP packets.
    (it is assumed that a network observer does not see plaintext transport contents,
    due to it being wrapped e.g. in TLS)

    Outgoing framed messages are collected in a send buffer and written out in
    padded packets by `_maybe_consume_sbuffer`.
    """

    # smallest size (in bytes) of a padded packet
    MIN_PACKET_SIZE = 1024
    # buffered data is sent once this long has passed since the last send
    WAIT_FOR_BUFFER_GROWTH_SECONDS = 1.0
    # (unpadded) amount of bytes sent instantly before beginning with polling.
    # This makes the initial handshake where a few small messages are exchanged faster.
    WARMUP_BUDGET_SIZE = 1024

    session: Optional['RPCSession']

    def __init__(self, *args, **kwargs):
        RSTransport.__init__(self, *args, **kwargs)
        self._sbuffer = bytearray()  # "send buffer"
        self._sbuffer_task = None  # type: Optional[asyncio.Task]
        # set while _sbuffer is non-empty; lets the poller sleep without busy-waiting
        self._sbuffer_has_data_evt = asyncio.Event()
        self._last_send = time.monotonic()
        # when True, the buffer is always flushed in full (no deferring)
        self._force_send = False  # type: bool

    # note: this does not call super().write() but is a complete reimplementation
    async def write(self, message):
        """Frame *message*, append it to the send buffer, and maybe send."""
        await self._can_send.wait()
        if self.is_closing():
            return
        framed_message = self._framer.frame(message)
        self._sbuffer += framed_message
        self._sbuffer_has_data_evt.set()
        self._maybe_consume_sbuffer()

    def _maybe_consume_sbuffer(self) -> None:
        """Maybe take some data from sbuffer and send it on the wire."""
        if not self._can_send.is_set() or self.is_closing():
            return
        buf = self._sbuffer
        if not buf:
            return
        # if there is enough data in the buffer, or if we haven't sent in a while, send now:
        if not (
            self._force_send
            or len(buf) >= self.MIN_PACKET_SIZE
            or self._last_send + self.WAIT_FOR_BUFFER_GROWTH_SECONDS < time.monotonic()
            or self.session.send_size < self.WARMUP_BUDGET_SIZE
        ):
            return
        assert buf[-2:] in (b"}\n", b"]\n"), f"unexpected json-rpc terminator: {buf[-2:]=!r}"
        # either (1) pad length to next power of two, to create "lsize" packet:
        # note: 2 ** bit_length() is the smallest power of two strictly greater than the payload size
        payload_lsize = len(buf)
        total_lsize = max(self.MIN_PACKET_SIZE, 2 ** (payload_lsize.bit_length()))
        npad_lsize = total_lsize - payload_lsize
        # or if that wasted a lot of bandwidth with padding, (2) defer sending some messages
        # and create a packet with half that size ("ssize", s for small)
        total_ssize = max(self.MIN_PACKET_SIZE, total_lsize // 2)
        # last message boundary that still fits into the small packet
        payload_ssize = buf.rfind(b"\n", 0, total_ssize)
        if payload_ssize != -1:
            payload_ssize += 1  # for "\n" char
            npad_ssize = total_ssize - payload_ssize
        else:
            # no complete message fits: infinite cost, so option (1) wins below
            npad_ssize = float("inf")
        # decide between (1) and (2):
        if self._force_send or npad_lsize <= npad_ssize:
            # (1) create "lsize" packet: consume full buffer
            npad = npad_lsize
            p_idx = payload_lsize
        else:
            # (2) create "ssize" packet: consume some, but defer some for later
            npad = npad_ssize
            p_idx = payload_ssize
        # pad by adding spaces near end
        # self.session.maybe_log(
        #     f"PaddedRSTransport. calling low-level write(). "
        #     f"chose between (lsize:{payload_lsize}+{npad_lsize}, ssize:{payload_ssize}+{npad_ssize}). "
        #     f"won: {'tie' if npad_lsize == npad_ssize else 'lsize' if npad_lsize < npad_ssize else 'ssize'}."
        # )
        # the spaces go right before the closing bracket + newline of the last message sent
        json_rpc_terminator = buf[p_idx-2:p_idx]
        assert json_rpc_terminator in (b"}\n", b"]\n"), f"unexpected {json_rpc_terminator=!r}"
        buf2 = buf[:p_idx-2] + (npad * b" ") + json_rpc_terminator
        self._asyncio_transport.write(buf2)
        self._last_send = time.monotonic()
        del self._sbuffer[:p_idx]
        if not self._sbuffer:
            self._sbuffer_has_data_evt.clear()

    async def _poll_sbuffer(self):
        """Until the transport is closing, keep trying to send buffered data."""
        while not self.is_closing():
            await self._can_send.wait()
            await self._sbuffer_has_data_evt.wait()  # to avoid busy-waiting
            self._maybe_consume_sbuffer()
            # If there is still data in the buffer, sleep until it would time out.
            # note: If the transport is ~idle, when we wake up, we will send the current buf data,
            #       but if busy, we might wake up to completely new buffer contents. Either is fine.
            if len(self._sbuffer) > 0:
                timeout_abs = self._last_send + self.WAIT_FOR_BUFFER_GROWTH_SECONDS
                timeout_rel = max(0.0, timeout_abs - time.monotonic())
                await asyncio.sleep(timeout_rel)

    def connection_made(self, transport: asyncio.BaseTransport):
        """Start the buffer poller for a NotificationSession; otherwise always force-send."""
        super().connection_made(transport)
        if isinstance(self.session, NotificationSession):
            coro = self.session.taskgroup.spawn(self._poll_sbuffer())
            self._sbuffer_task = self.loop.create_task(coro)
        else:
            # This a short-lived "fetch_certificate"-type session.
            # No polling here, we always force-empty the buffer.
            self._force_send = True

    async def close(self, *args, **kwargs):
        '''Close the connection and return when closed.'''
        # Flush buffer before disconnecting. This makes ReplyAndDisconnect work:
        self._force_send = True
        self._maybe_consume_sbuffer()
        await super().close(*args, **kwargs)
1✔
469

470

471
class ServerAddr:
    """Address of a server: host, port and one-letter transport protocol.

    Instances compare equal (and hash equal) when host, port and protocol match.
    The string form is "<host>:<port>:<protocol>".
    """

    def __init__(self, host: str, port: Union[int, str], *, protocol: Optional[str] = None):
        """Validate and store the address.

        host: hostname or IP address; an IPv6 address may be wrapped in "[...]".
        port: validated (together with host) by NetAddress.
        protocol: one of KNOWN_ELEC_PROTOCOL_TRANSPORTS; defaults to 's'.

        Raises ValueError if host is empty, if host/port are rejected by
        NetAddress, or if the protocol is unknown.
        """
        assert isinstance(host, str), repr(host)
        if protocol is None:
            protocol = 's'
        if not host:
            raise ValueError('host must not be empty')
        if host[0] == '[' and host[-1] == ']':  # IPv6
            host = host[1:-1]
        try:
            net_addr = NetAddress(host, port)  # this validates host and port
        except Exception as e:
            raise ValueError(f"cannot construct ServerAddr: invalid host or port (host={host}, port={port})") from e
        if protocol not in KNOWN_ELEC_PROTOCOL_TRANSPORTS:
            raise ValueError(f"invalid network protocol: {protocol}")
        self.host = str(net_addr.host)  # canonical form (if e.g. IPv6 address)
        self.port = int(net_addr.port)
        self.protocol = protocol
        self._net_addr_str = str(net_addr)

    @classmethod
    def from_str(cls, s: str) -> 'ServerAddr':
        """Constructs a ServerAddr or raises ValueError."""
        # host might be IPv6 address, hence do rsplit:
        # (unpacking fewer than three parts raises ValueError as well)
        host, port, protocol = str(s).rsplit(':', 2)
        # use cls (not ServerAddr) so that subclasses get instances of their own type
        return cls(host=host, port=port, protocol=protocol)

    @classmethod
    def from_str_with_inference(cls, s: str) -> Optional['ServerAddr']:
        """Construct ServerAddr from str, guessing missing details.
        Does not raise - just returns None if guessing failed.
        Ongoing compatibility not guaranteed.
        """
        if not s:
            return None
        host = ""
        if s[0] == "[" and "]" in s:  # IPv6 address
            host_end = s.index("]")
            host = s[1:host_end]
            s = s[host_end+1:]
        items = str(s).rsplit(':', 2)
        if len(items) < 2:
            return None  # although maybe we could guess the port too?
        # for a bracketed IPv6 host, items[0] is the empty remainder before the port
        host = host or items[0]
        port = items[1]
        if len(items) >= 3:
            protocol = items[2]
        else:
            protocol = PREFERRED_NETWORK_PROTOCOL
        try:
            # use cls (not ServerAddr) so that subclasses get instances of their own type
            return cls(host=host, port=port, protocol=protocol)
        except ValueError:
            return None

    def to_friendly_name(self) -> str:
        """Return the address as a string, without the protocol suffix if it is 's'."""
        # note: this method is closely linked to from_str_with_inference
        if self.protocol == 's':  # hide trailing ":s"
            return self.net_addr_str()
        return str(self)

    def __str__(self):
        return '{}:{}'.format(self.net_addr_str(), self.protocol)

    def to_json(self) -> str:
        """Return the same string as str(self)."""
        return str(self)

    def __repr__(self):
        return f'<ServerAddr host={self.host} port={self.port} protocol={self.protocol}>'

    def net_addr_str(self) -> str:
        """Return str() of the NetAddress built in __init__ (host and port, no protocol)."""
        return self._net_addr_str

    def __eq__(self, other):
        if not isinstance(other, ServerAddr):
            return False
        return (self.host == other.host
                and self.port == other.port
                and self.protocol == other.protocol)

    def __ne__(self, other):
        return not (self == other)

    def __hash__(self):
        # must stay consistent with __eq__: same three fields
        return hash((self.host, self.port, self.protocol))
×
556

557

558
def _get_cert_path_for_host(*, config: 'SimpleConfig', host: str) -> str:
1✔
559
    filename = host
1✔
560
    try:
1✔
561
        ip = ip_address(host)
1✔
562
    except ValueError:
1✔
563
        pass
1✔
564
    else:
565
        if isinstance(ip, IPv6Address):
1✔
566
            filename = f"ipv6_{ip.packed.hex()}"
×
567
    return os.path.join(config.path, 'certs', filename)
1✔
568

569

570
class Interface(Logger):
1✔
571

572
    def __init__(self, *, network: 'Network', server: ServerAddr):
1✔
573
        assert isinstance(server, ServerAddr), f"expected ServerAddr, got {type(server)}"
1✔
574
        self.ready = network.asyncio_loop.create_future()
1✔
575
        self.got_disconnected = asyncio.Event()
1✔
576
        self._blockchain_updated = asyncio.Event()
1✔
577
        self.server = server
1✔
578
        Logger.__init__(self)
1✔
579
        assert network.config.path
1✔
580
        self.cert_path = _get_cert_path_for_host(config=network.config, host=self.host)
1✔
581
        self.blockchain = None  # type: Optional[Blockchain]
1✔
582
        self._requested_chunks = set()  # type: Set[int]
1✔
583
        self.network = network
1✔
584
        self.session = None  # type: Optional[NotificationSession]
1✔
585
        self._ipaddr_bucket = None
1✔
586
        # Set up proxy.
587
        # - for servers running on localhost, the proxy is not used. If user runs their own server
588
        #   on same machine, this lets them enable the proxy (which is used for e.g. FX rates).
589
        #   note: we could maybe relax this further and bypass the proxy for all private
590
        #         addresses...? e.g. 192.168.x.x
591
        if util.is_localhost(server.host):
1✔
592
            self.logger.info(f"looks like localhost: not using proxy for this server")
1✔
593
            self.proxy = None
1✔
594
        else:
595
            self.proxy = ESocksProxy.from_network_settings(network)
1✔
596

597
        # Latest block header and corresponding height, as claimed by the server.
598
        # Note that these values are updated before they are verified.
599
        # Especially during initial header sync, verification can take a long time.
600
        # Failing verification will get the interface closed.
601
        self.tip_header = None  # type: Optional[dict]
1✔
602
        self.tip = 0
1✔
603

604
        self._headers_cache = {}  # type: Dict[int, bytes]
1✔
605
        self._rawtx_cache = LRUCache(maxsize=20)  # type: LRUCache[str, bytes]  # txid->rawtx
1✔
606

607
        self.fee_estimates_eta = {}  # type: Dict[int, int]
1✔
608

609
        self.active_protocol_tuple = (0,)  # type: Optional[tuple[int, ...]]
1✔
610

611
        # Dump network messages (only for this interface).  Set at runtime from the console.
612
        self.debug = False
1✔
613

614
        self.taskgroup = OldTaskGroup()
1✔
615

616
        async def spawn_task():
1✔
617
            task = await self.network.taskgroup.spawn(self.run())
1✔
618
            task.set_name(f"interface::{str(server)}")
1✔
619
        asyncio.run_coroutine_threadsafe(spawn_task(), self.network.asyncio_loop)
1✔
620

621
    @property
1✔
622
    def host(self):
1✔
623
        return self.server.host
1✔
624

625
    @property
1✔
626
    def port(self):
1✔
627
        return self.server.port
1✔
628

629
    @property
1✔
630
    def protocol(self):
1✔
631
        return self.server.protocol
1✔
632

633
    def diagnostic_name(self):
1✔
634
        return self.server.net_addr_str()
1✔
635

636
    def __str__(self):
1✔
637
        return f"<Interface {self.diagnostic_name()}>"
×
638

639
    async def is_server_ca_signed(self, ca_ssl_context: ssl.SSLContext) -> bool:
1✔
640
        """Given a CA enforcing SSL context, returns True if the connection
641
        can be established. Returns False if the server has a self-signed
642
        certificate but otherwise is okay. Any other failures raise.
643
        """
644
        try:
×
645
            await self.open_session(ssl_context=ca_ssl_context, exit_early=True)
×
646
        except ConnectError as e:
×
647
            cause = e.__cause__
×
648
            if (isinstance(cause, ssl.SSLCertVerificationError)
×
649
                    and cause.reason == 'CERTIFICATE_VERIFY_FAILED'
650
                    and cause.verify_code == 18):  # "self signed certificate"
651
                # Good. We will use this server as self-signed.
652
                return False
×
653
            # Not good. Cannot use this server.
654
            raise
×
655
        # Good. We will use this server as CA-signed.
656
        return True
×
657

658
    async def _try_saving_ssl_cert_for_first_time(self, ca_ssl_context: ssl.SSLContext) -> None:
1✔
659
        ca_signed = await self.is_server_ca_signed(ca_ssl_context)
×
660
        if ca_signed:
×
661
            if self._get_expected_fingerprint():
×
662
                raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
×
663
            with open(self.cert_path, 'w') as f:
×
664
                # empty file means this is CA signed, not self-signed
665
                f.write('')
×
666
        else:
667
            await self._save_certificate()
×
668

669
    def _is_saved_ssl_cert_available(self):
1✔
670
        if not os.path.exists(self.cert_path):
×
671
            return False
×
672
        with open(self.cert_path, 'r') as f:
×
673
            contents = f.read()
×
674
        if contents == '':  # CA signed
×
675
            if self._get_expected_fingerprint():
×
676
                raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
×
677
            return True
×
678
        # pinned self-signed cert
679
        try:
×
680
            b = pem.dePem(contents, 'CERTIFICATE')
×
681
        except SyntaxError as e:
×
682
            self.logger.info(f"error parsing already saved cert: {e}")
×
683
            raise ErrorParsingSSLCert(e) from e
×
684
        try:
×
685
            x = x509.X509(b)
×
686
        except Exception as e:
×
687
            self.logger.info(f"error parsing already saved cert: {e}")
×
688
            raise ErrorParsingSSLCert(e) from e
×
689
        try:
×
690
            x.check_date()
×
691
        except x509.CertificateError as e:
×
692
            self.logger.info(f"certificate has expired: {e}")
×
693
            os.unlink(self.cert_path)  # delete pinned cert only in this case
×
694
            return False
×
695
        self._verify_certificate_fingerprint(bytes(b))
×
696
        return True
×
697

698
    async def _get_ssl_context(self) -> Optional[ssl.SSLContext]:
1✔
699
        if self.protocol != 's':
1✔
700
            # using plaintext TCP
701
            return None
1✔
702

703
        # see if we already have cert for this server; or get it for the first time
704
        ca_sslc = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
×
705
        if not self._is_saved_ssl_cert_available():
×
706
            try:
×
707
                await self._try_saving_ssl_cert_for_first_time(ca_sslc)
×
708
            except (OSError, ConnectError, aiorpcx.socks.SOCKSError) as e:
×
709
                raise ErrorGettingSSLCertFromServer(e) from e
×
710
        # now we have a file saved in our certificate store
711
        siz = os.stat(self.cert_path).st_size
×
712
        if siz == 0:
×
713
            # CA signed cert
714
            sslc = ca_sslc
×
715
        else:
716
            # pinned self-signed cert
717
            sslc = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=self.cert_path)
×
718
            # note: Flag "ssl.VERIFY_X509_STRICT" is enabled by default in python 3.13+ (disabled in older versions).
719
            #       We explicitly disable it as it breaks lots of servers.
720
            sslc.verify_flags &= ~ssl.VERIFY_X509_STRICT
×
721
            sslc.check_hostname = False
×
722
        return sslc
×
723

724
    def handle_disconnect(func):
        """Decorator for Interface coroutine methods that handles disconnects.

        GracefulDisconnect and aiorpcx RPCError raised by `func` are logged
        and swallowed. Whether or not `func` raised, the clean-up in the
        `finally` clause is run afterwards.
        """
        @functools.wraps(func)
        async def _with_disconnect_handling(self: 'Interface', *args, **kwargs):
            try:
                return await func(self, *args, **kwargs)
            except GracefulDisconnect as e:
                self.logger.log(e.log_level, f"disconnecting due to {repr(e)}")
            except aiorpcx.jsonrpc.RPCError as e:
                self.logger.warning(f"disconnecting due to {repr(e)}")
                self.logger.debug(f"(disconnect) trace for {repr(e)}", exc_info=True)
            finally:
                self.got_disconnected.set()
                # Explicitly clean up the taskgroup here, as the "with taskgroup"
                # ctx mgr might never have gotten a chance to run:
                await self.taskgroup.cancel_remaining()
                await self.network.connection_down(self)
                # if was not 'ready' yet, schedule waiting coroutines:
                self.ready.cancel()
        return _with_disconnect_handling
743

744
    @ignore_exceptions  # do not kill network.taskgroup
    @log_exceptions
    @handle_disconnect
    async def run(self):
        """Entry point of the interface: obtain the SSL context, then open the session.

        Failures to get the SSL context, and connection-level failures of
        `open_session`, are logged and make this coroutine return.
        """
        try:
            ssl_context = await self._get_ssl_context()
        except (ErrorParsingSSLCert, ErrorGettingSSLCertFromServer) as e:
            self.logger.info(f'disconnecting due to: {repr(e)}')
            return
        try:
            await self.open_session(ssl_context=ssl_context)
        except (asyncio.CancelledError, ConnectError, aiorpcx.socks.SOCKSError) as e:
            ssl_error_on_fixed_main_server = (
                isinstance(e, ConnectError)
                and isinstance(e.__cause__, ssl.SSLError)
                and self.is_main_server()
                and not self.network.auto_connect
            )
            if not ssl_error_on_fixed_main_server:
                self.logger.info(f'disconnecting due to: {repr(e)}')
                return
            # make SSL errors for main interface more visible (to help servers ops debug cert pinning issues)
            self.logger.warning(f'Cannot connect to main server due to SSL error '
                                f'(maybe cert changed compared to "{self.cert_path}"). Exc: {repr(e)}')
764

765
    def _mark_ready(self) -> None:
1✔
766
        if self.ready.cancelled():
1✔
767
            raise GracefulDisconnect('conn establishment was too slow; *ready* future was cancelled')
×
768
        if self.ready.done():
1✔
769
            return
1✔
770

771
        assert self.tip_header
1✔
772
        chain = blockchain.check_header(self.tip_header)
1✔
773
        if not chain:
1✔
774
            self.blockchain = blockchain.get_best_chain()
1✔
775
        else:
776
            self.blockchain = chain
×
777
        assert self.blockchain is not None
1✔
778

779
        self.logger.info(f"set blockchain with height {self.blockchain.height()}")
1✔
780

781
        self.ready.set_result(1)
1✔
782

783
    def is_connected_and_ready(self) -> bool:
        """Return True iff `ready` is done and `got_disconnected` is not set."""
        if not self.ready.done():
            return False
        return not self.got_disconnected.is_set()
785

786
    async def _save_certificate(self) -> None:
1✔
787
        if not os.path.exists(self.cert_path):
×
788
            # we may need to retry this a few times, in case the handshake hasn't completed
789
            for _ in range(10):
×
790
                dercert = await self._fetch_certificate()
×
791
                if dercert:
×
792
                    self.logger.info("succeeded in getting cert")
×
793
                    self._verify_certificate_fingerprint(dercert)
×
794
                    with open(self.cert_path, 'w') as f:
×
795
                        cert = ssl.DER_cert_to_PEM_cert(dercert)
×
796
                        # workaround android bug
797
                        cert = re.sub("([^\n])-----END CERTIFICATE-----","\\1\n-----END CERTIFICATE-----",cert)
×
798
                        f.write(cert)
×
799
                        # even though close flushes, we can't fsync when closed.
800
                        # and we must flush before fsyncing, cause flush flushes to OS buffer
801
                        # fsync writes to OS buffer to disk
802
                        f.flush()
×
803
                        os.fsync(f.fileno())
×
804
                    break
×
805
                await asyncio.sleep(1)
×
806
            else:
807
                raise GracefulDisconnect("could not get certificate after 10 tries")
×
808

809
    async def _fetch_certificate(self) -> bytes:
        """Connect to the server without any cert verification and return its cert.

        The return value is whatever `getpeercert(binary_form=True)` gives for
        the connection; the caller treats a falsy result as "not available yet".
        """
        # hostname checking and cert verification are both turned off here
        unverified_sslc = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_CLIENT)
        unverified_sslc.check_hostname = False
        unverified_sslc.verify_mode = ssl.CERT_NONE
        client = _RSClient(
            session_factory=RPCSession,
            host=self.host, port=self.port,
            ssl=unverified_sslc,
            proxy=self.proxy,
            transport=PaddedRSTransport,
        )
        async with client as session:
            asyncio_transport = session.transport._asyncio_transport  # type: asyncio.BaseTransport
            ssl_object = asyncio_transport.get_extra_info("ssl_object")  # type: ssl.SSLObject
            return ssl_object.getpeercert(binary_form=True)
823

824
    def _get_expected_fingerprint(self) -> Optional[str]:
1✔
825
        if self.is_main_server():
×
826
            return self.network.config.NETWORK_SERVERFINGERPRINT
×
827
        return None
×
828

829
    def _verify_certificate_fingerprint(self, certificate: bytes) -> None:
1✔
830
        expected_fingerprint = self._get_expected_fingerprint()
×
831
        if not expected_fingerprint:
×
832
            return
×
833
        fingerprint = hashlib.sha256(certificate).hexdigest()
×
834
        fingerprints_match = fingerprint.lower() == expected_fingerprint.lower()
×
835
        if not fingerprints_match:
×
836
            util.trigger_callback('cert_mismatch')
×
837
            raise ErrorSSLCertFingerprintMismatch('Refusing to connect to server due to cert fingerprint mismatch')
×
838
        self.logger.info("cert fingerprint verification passed")
×
839

840
    async def _maybe_warm_headers_cache(self, *, from_height: int, to_height: int, mode: ChainResolutionMode) -> None:
        """Make sure the headers cache covers heights [from_height, to_height].

        If any height in the range is missing from the cache, the whole range
        is requested in a single `get_block_headers` call and the returned
        headers are stored by height.
        """
        assert from_height <= to_height, (from_height, to_height)
        assert to_height - from_height < MAX_NUM_HEADERS_PER_REQUEST
        wanted_heights = range(from_height, to_height+1)
        if all(height in self._headers_cache for height in wanted_heights):
            # cache already has all requested headers
            return
        # use lower timeout as we usually have network.bhi_lock here
        timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        headers = await self.get_block_headers(
            start_height=from_height,
            count=to_height - from_height + 1,
            timeout=timeout,
            mode=mode,
        )
        for header_height, raw_header in enumerate(headers, start=from_height):
            self._headers_cache[header_height] = raw_header
854

855
    async def get_block_header(self, height: int, *, mode: ChainResolutionMode) -> dict:
        """Return the deserialized header at `height`.

        The headers cache is consulted first; on a miss the header is
        requested from the server via 'blockchain.block.header'.
        `mode` is only used for logging here.
        """
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        #self.logger.debug(f'get_block_header() {height} in {mode=}')
        # use lower timeout as we usually have network.bhi_lock here
        timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        cached_raw_header = self._headers_cache.get(height)
        if cached_raw_header:
            return blockchain.deserialize_header(cached_raw_header, height)
        self.logger.info(f'requesting block header {height} in {mode=}')
        hex_header = await self.session.send_request('blockchain.block.header', [height], timeout=timeout)
        return blockchain.deserialize_header(bytes.fromhex(hex_header), height)
866

867
    async def get_block_headers(
        self,
        *,
        start_height: int,
        count: int,
        timeout=None,
        mode: Optional[ChainResolutionMode] = None,
    ) -> Sequence[bytes]:
        """Request `count` consecutive block headers, the first being at `start_height`.

        The server is allowed to return fewer than `count` headers, but only
        if the requested range would extend beyond its tip.
        note: the returned headers are raw bytes; they are not verified or parsed at all.

        Raises RequestCorrupted if the response is malformed or inconsistent.
        """
        if not is_non_negative_integer(start_height):
            raise Exception(f"{repr(start_height)} is not a block height")
        if not (is_non_negative_integer(count) and 0 < count <= MAX_NUM_HEADERS_PER_REQUEST):
            raise Exception(f"{repr(count)} not an int in range ]0, {MAX_NUM_HEADERS_PER_REQUEST}]")
        self.logger.info(
            f"requesting block headers: [{start_height}, {start_height+count-1}], {count=}"
            + (f" (in {mode=})" if mode is not None else "")
        )
        res = await self.session.send_request('blockchain.block.headers', [start_height, count], timeout=timeout)

        # sanity-check the fields common to all protocol versions
        assert_dict_contains_field(res, field_name='count')
        assert_dict_contains_field(res, field_name='max')
        assert_non_negative_integer(res['count'])
        assert_non_negative_integer(res['max'])

        if self.active_protocol_tuple >= (1, 6):
            # headers arrive as a list of hex strings, one per header
            hex_headers_list = assert_dict_contains_field(res, field_name='headers')
            assert_list_or_tuple(hex_headers_list)
            for item in hex_headers_list:
                assert_hex_str(item)
                if len(item) != HEADER_SIZE * 2:
                    raise RequestCorrupted(f"invalid header size. got {len(item)//2}, expected {HEADER_SIZE}")
            if len(hex_headers_list) != res['count']:
                raise RequestCorrupted(f"{len(hex_headers_list)=} != {res['count']=}")
            headers = [bfh(hex_header) for hex_header in hex_headers_list]
        else:  # proto 1.4
            # headers arrive concatenated, as a single hex string
            hex_headers_concat = assert_dict_contains_field(res, field_name='hex')
            assert_hex_str(hex_headers_concat)
            if len(hex_headers_concat) != HEADER_SIZE * 2 * res['count']:
                raise RequestCorrupted('inconsistent chunk hex and count')
            headers = list(util.chunks(bfh(hex_headers_concat), size=HEADER_SIZE))

        # we never request more than MAX_NUM_HEADERS_PER_REQUEST headers, but we enforce those fit in a single response
        if res['max'] < MAX_NUM_HEADERS_PER_REQUEST:
            raise RequestCorrupted(f"server uses too low 'max' count for block.headers: {res['max']} < {MAX_NUM_HEADERS_PER_REQUEST}")
        if res['count'] > count:
            raise RequestCorrupted(f"asked for {count} headers but got more: {res['count']}")
        if res['count'] < count:
            # we only tolerate getting fewer headers if it is due to reaching the tip
            end_height = start_height + res['count'] - 1
            if end_height < self.tip:  # still below tip. why did server not send more?!
                raise RequestCorrupted(
                    f"asked for {count} headers but got fewer: {res['count']}. ({start_height=}, {self.tip=})")
        # checks done.
        return headers
923

924
    async def request_chunk_below_max_checkpoint(
        self,
        *,
        height: int,
    ) -> None:
        """Request the chunk of headers containing `height` and connect it to the blockchain.

        `height` must not be above the max checkpoint. Returns without doing
        anything if the chunk is currently being requested already.

        Raises RequestCorrupted if the chunk does not connect.
        """
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        assert height <= constants.net.max_checkpoint(), f"{height=} must be <= cp={constants.net.max_checkpoint()}"
        index = height // CHUNK_SIZE
        if index in self._requested_chunks:
            return None
        self.logger.debug(f"requesting chunk from height {height}")
        # `_requested_chunks` tracks in-flight requests only: the index is
        # removed again as soon as the request finishes, whatever the outcome.
        try:
            self._requested_chunks.add(index)
            headers = await self.get_block_headers(start_height=index * CHUNK_SIZE, count=CHUNK_SIZE)
        finally:
            self._requested_chunks.discard(index)
        if not self.blockchain.connect_chunk(index, data=b"".join(headers)):
            raise RequestCorrupted(f"chunk ({index=}, for {height=}) does not connect to blockchain")
        return None
945

946
    async def _fast_forward_chain(
        self,
        *,
        height: int,  # usually local chain tip + 1
        tip: int,  # server tip. we should not request past this.
    ) -> int:
        """Grow the blockchain of this interface by requesting headers starting at `height`.

        Up to 10 chunks are requested concurrently, starting at the chunk that
        contains `height` and never going past `tip`.

        Returns the number of headers we managed to connect, counted from `height`.
        """
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        if not is_non_negative_integer(tip):
            raise Exception(f"{repr(tip)} is not a block height")
        if not (height > constants.net.max_checkpoint()
                or height == 0 == constants.net.max_checkpoint()):
            raise Exception(f"{height=} must be > cp={constants.net.max_checkpoint()}")
        assert height <= tip, f"{height=} must be <= {tip=}"
        # Request a few chunks of headers concurrently.
        # tradeoffs:
        # - more chunks: higher memory requirements
        # - more chunks: higher concurrency => syncing needs fewer network round-trips
        # - if a chunk does not connect, bandwidth for all later chunks is wasted
        index0 = height // CHUNK_SIZE
        async with OldTaskGroup() as group:
            tasks = []  # type: List[Tuple[int, asyncio.Task[Sequence[bytes]]]]
            for index in range(index0, index0 + 10):
                start_height = index * CHUNK_SIZE
                if start_height > tip:
                    break
                # the last chunk is cut short so that it ends at the tip
                end_height = min(start_height + CHUNK_SIZE - 1, tip)
                request = self.get_block_headers(start_height=start_height, count=end_height - start_height + 1)
                tasks.append((index, await group.spawn(request)))
        # try to connect chunks, in order; stop at the first one that does not connect
        num_headers = 0
        for index, task in tasks:
            headers = task.result()
            if not self.blockchain.connect_chunk(index, data=b"".join(headers)):
                break
            num_headers += len(headers)
        # We started at a chunk boundary, instead of requested `height`. Need to correct for that.
        offset = height - index0 * CHUNK_SIZE
        return max(0, num_headers - offset)
990

991
    def is_main_server(self) -> bool:
        """Return whether this interface is the network's main one.

        That is the case if it is `network.interface`, or if the network has
        no interface set and this interface's server is the default server.
        """
        network = self.network
        return (network.interface == self
                or (network.interface is None and network.default_server == self.server))
994

995
    async def open_session(
        self,
        *,
        ssl_context: Optional[ssl.SSLContext],
        exit_early: bool = False,
    ):
        """Connect to the server, do the handshake, then run the interface's tasks.

        The handshake consists of:
        - 'server.version': the version sent back by the server must lie within
          [PROTOCOL_VERSION_MIN, PROTOCOL_VERSION_MAX],
        - a check against the network's spread of already connected servers,
        - 'server.features': the server's genesis hash must match ours.
        Afterwards the long-running tasks are spawned in `self.taskgroup`, and
        this coroutine stays inside the taskgroup until it exits.

        If `exit_early` is set, return right after the 'server.version' request.

        Raises GracefulDisconnect if any handshake step fails, or if one of
        the tasks fails with one of a fixed set of RPC error codes. Other RPC
        errors from the tasks are re-raised as-is.
        """
        session_factory = lambda *args, iface=self, **kwargs: NotificationSession(*args, **kwargs, interface=iface)
        async with _RSClient(
            session_factory=session_factory,
            host=self.host, port=self.port,
            ssl=ssl_context,
            proxy=self.proxy,
            transport=PaddedRSTransport,
        ) as session:
            start = time.perf_counter()
            self.session = session  # type: NotificationSession
            self.session.set_default_timeout(self.network.get_network_timeout_seconds(NetworkTimeout.Generic))
            client_prange = [version.PROTOCOL_VERSION_MIN, version.PROTOCOL_VERSION_MAX]
            try:
                ver = await session.send_request('server.version', [self.client_name(), client_prange])
            except aiorpcx.jsonrpc.RPCError as e:
                # probably 'unsupported protocol version'
                # (chained explicitly, like the other GracefulDisconnect raises below)
                raise GracefulDisconnect(e) from e
            if exit_early:
                return
            self.active_protocol_tuple = protocol_tuple(ver[1])
            client_pmin = protocol_tuple(client_prange[0])
            client_pmax = protocol_tuple(client_prange[1])
            if not (client_pmin <= self.active_protocol_tuple <= client_pmax):
                raise GracefulDisconnect(f'server violated protocol-version-negotiation. '
                                         f'we asked for {client_prange!r}, they sent {ver[1]!r}')
            if not self.network.check_interface_against_healthy_spread_of_connected_servers(self):
                raise GracefulDisconnect(f'too many connected servers already '
                                         f'in bucket {self.bucket_based_on_ipaddress()}')

            try:
                features = await session.send_request('server.features')
                server_genesis_hash = assert_dict_contains_field(features, field_name='genesis_hash')
            except (aiorpcx.jsonrpc.RPCError, RequestCorrupted) as e:
                raise GracefulDisconnect(e) from e
            if server_genesis_hash != constants.net.GENESIS:
                raise GracefulDisconnect(f'server on different chain: {server_genesis_hash=}. ours: {constants.net.GENESIS}')
            self.logger.info(f"connection established. version: {ver}, handshake duration: {(time.perf_counter() - start) * 1000:.2f} ms")

            try:
                async with self.taskgroup as group:
                    await group.spawn(self.ping)
                    await group.spawn(self.request_fee_estimates)
                    await group.spawn(self.run_fetch_blocks)
                    await group.spawn(self.monitor_connection)
            except aiorpcx.jsonrpc.RPCError as e:
                # These error codes are turned into a GracefulDisconnect;
                # for the main server they are logged at WARNING instead of INFO.
                if e.code in (
                    JSONRPC.EXCESSIVE_RESOURCE_USAGE,
                    JSONRPC.SERVER_BUSY,
                    JSONRPC.METHOD_NOT_FOUND,
                    JSONRPC.INTERNAL_ERROR,
                ):
                    log_level = logging.WARNING if self.is_main_server() else logging.INFO
                    raise GracefulDisconnect(e, log_level=log_level) from e
                raise
            finally:
                self.got_disconnected.set()  # set this ASAP, ideally before any awaits
1056

1057
    async def monitor_connection(self):
        """Poll the session once per second; raise GracefulDisconnect once it is gone or closing."""
        while True:
            await asyncio.sleep(1)
            # If the session/transport is no longer open, we disconnect.
            # e.g. if the remote cleanly sends EOF, we would handle that here.
            # note: If the user pulls the ethernet cable or disconnects wifi,
            #       ideally we would detect that here, so that the GUI/etc can reflect that.
            #       - On Android, this seems to work reliably, where asyncio.BaseProtocol.connection_lost()
            #         gets called with e.g. ConnectionAbortedError(103, 'Software caused connection abort').
            #       - On desktop Linux/Win, it seems BaseProtocol.connection_lost() is not called in such cases.
            #         Hence, in practice the connection issue will only be detected the next time we try
            #         to send a message (plus timeout), which can take minutes...
            session = self.session
            if not session or session.is_closing():
                raise GracefulDisconnect('session was closed')
1071

1072
    async def ping(self):
        """Forever: sleep a random time below 300 seconds, send a ping, then maybe send noise."""
        # We periodically send a "ping" msg to make sure the server knows we are still here.
        # Adding a bit of randomness generates some noise against traffic analysis.
        while True:
            delay = random.random() * 300
            await asyncio.sleep(delay)
            await self.send_ping()
            await self._maybe_send_noise()
1079

1080
    async def _maybe_send_noise(self):
        """Send a random number of extra pings, possibly none.

        Before each ping a random draw decides whether to continue (20%
        chance); each ping is preceded by a random sleep of less than a second.
        """
        while True:
            if not (random.random() < 0.2):
                break
            await asyncio.sleep(random.random())
            await self.send_ping()
1084

1085
    async def request_fee_estimates(self):
        """Forever: refresh `self.fee_estimates_eta` from the server, then sleep 60 seconds.

        Estimates are requested concurrently for all of FEE_ETA_TARGETS except
        the last entry. Negative results are skipped.
        """
        while True:
            async with OldTaskGroup() as group:
                fee_tasks = []
                for nblock_target in FEE_ETA_TARGETS[:-1]:
                    task = await group.spawn(self.get_estimatefee(nblock_target))
                    fee_tasks.append((nblock_target, task))
            # the taskgroup has exited, so all tasks are done by now
            for nblock_target, task in fee_tasks:
                fee = task.result()
                if fee < 0:
                    continue
                assert isinstance(fee, int)
                self.fee_estimates_eta[nblock_target] = fee
            self.network.update_fee_estimates()
            await asyncio.sleep(60)
1098

1099
    async def close(self, *, force_after: Optional[int] = None):
        """Closes the connection and waits for it to be closed.
        We try to flush buffered data to the wire, which can take some time.

        `force_after` is passed through unchanged to the session's `close()`.
        Does nothing if there is no session.
        """
        if self.session:
            await self.session.close(force_after=force_after)
        # monitor_connection will cancel tasks
1106

1107
    async def run_fetch_blocks(self):
        """Subscribe to header notifications and process each new tip as it arrives.

        For every notification: update `tip`/`tip_header`, mark the interface
        ready, process the header at the tip, fire the relevant callbacks, and
        let the network reconsider which interface to use.

        Raises GracefulDisconnect if the server's tip is below the max checkpoint.
        """
        header_queue = asyncio.Queue()
        await self.session.subscribe('blockchain.headers.subscribe', [], header_queue)
        while True:
            notification = await header_queue.get()
            raw_header = notification[0]
            height = raw_header['height']
            header_bytes = bfh(raw_header['hex'])
            self.tip_header = blockchain.deserialize_header(header_bytes, height)
            self.tip = height
            if self.tip < constants.net.max_checkpoint():
                raise GracefulDisconnect(
                    f"server tip below max checkpoint. ({self.tip} < {constants.net.max_checkpoint()})")
            self._mark_ready()
            # tip changed, so assume anything could have happened with chain:
            # restart the cache with just the new tip header in it
            self._headers_cache.clear()
            self._headers_cache[height] = header_bytes
            try:
                blockchain_updated = await self._process_header_at_tip()
            finally:
                self._headers_cache.clear()  # to reduce memory usage
            # header processing done
            if self.is_main_server() or blockchain_updated:
                self.logger.info(f"new chain tip. {height=}")
            if blockchain_updated:
                util.trigger_callback('blockchain_updated')
                # set-then-clear: wake up current waiters, without leaving the event set
                self._blockchain_updated.set()
                self._blockchain_updated.clear()
            util.trigger_callback('network_updated')
            await self.network.switch_unwanted_fork_interface()
            await self.network.switch_lagging_interface()
            await self.taskgroup.spawn(self._maybe_send_noise())
1139

1140
    async def _process_header_at_tip(self) -> bool:
1✔
1141
        """Returns:
1142
        False - boring fast-forward: we already have this header as part of this blockchain from another interface,
1143
        True - new header we didn't have, or reorg
1144
        """
1145
        height, header = self.tip, self.tip_header
1✔
1146
        async with self.network.bhi_lock:
1✔
1147
            if self.blockchain.height() >= height and self.blockchain.check_header(header):
1✔
1148
                # another interface amended the blockchain
1149
                return False
×
1150
            await self.sync_until(height)
1✔
1151
            return True
1✔
1152

1153
    async def sync_until(
        self,
        height: int,
        *,
        next_height: Optional[int] = None,  # sync target. typically the tip, except in unit tests
    ) -> Tuple[ChainResolutionMode, int]:
        """Process headers starting at `height`, until `height` has moved past `next_height`.

        `next_height` defaults to `self.tip`. At least one iteration is always run.
        When more than 144 headers are missing, headers are processed in
        batches via `_fast_forward_chain`; otherwise one at a time via `step`.

        Returns the mode of the last iteration, and the height reached.

        Raises GracefulDisconnect if batch-syncing connected no headers at a
        height that is not above the max checkpoint.
        """
        if next_height is None:
            next_height = self.tip
        last = None  # type: Optional[ChainResolutionMode]
        while last is None or height <= next_height:
            prev_last, prev_height = last, height
            if next_height > height + 144:
                # We are far from the tip.
                # It is more efficient to process headers in large batches (CPU/disk_usage/logging).
                # (but this wastes a little bandwidth, if we are not on a chunk boundary)
                num_headers = await self._fast_forward_chain(
                    height=height, tip=next_height)
                if num_headers == 0:
                    if height <= constants.net.max_checkpoint():
                        raise GracefulDisconnect('server chain conflicts with checkpoints or genesis')
                    # nothing connected: fall back to processing this height on its own
                    last, height = await self.step(height)
                    continue
                # report progress to gui/etc
                util.trigger_callback('blockchain_updated')
                self._blockchain_updated.set()
                self._blockchain_updated.clear()
                util.trigger_callback('network_updated')
                height += num_headers
                # report the actual sync target: it can differ from self.tip if next_height was passed in
                assert height <= next_height+1, (height, next_height)
                last = ChainResolutionMode.CATCHUP
            else:
                # We are close to the tip, so process headers one-by-one.
                # (note: due to headers_cache, to save network latency, this can still batch-request headers)
                last, height = await self.step(height)
            assert (prev_last, prev_height) != (last, height), 'had to prevent infinite loop in interface.sync_until'
        return last, height
1189

1190
    async def step(
        self,
        height: int,
    ) -> Tuple[ChainResolutionMode, int]:
        """Process the header at `height`.

        Three outcomes, tried in order:
        - the header checks against a known chain: adopt that chain,
        - the header (or one found by searching backwards) connects to a
          chain: save it on that chain,
        - otherwise: binary-search for the fork point and resolve the fork.

        Returns the resulting mode together with a height; the caller
        (`sync_until`) continues from that height.
        """
        assert 0 <= height <= self.tip, (height, self.tip)
        catchup_mode = ChainResolutionMode.CATCHUP
        await self._maybe_warm_headers_cache(
            from_height=height,
            to_height=min(self.tip, height+MAX_NUM_HEADERS_PER_REQUEST-1),
            mode=catchup_mode,
        )
        header = await self.get_block_header(height, mode=catchup_mode)

        chain = blockchain.check_header(header)
        if chain:
            self.blockchain = chain
            # note: there is an edge case here that is not handled.
            # we might know the blockhash (enough for check_header) but
            # not have the header itself. e.g. regtest chain with only genesis.
            # this situation resolves itself on the next block
            return catchup_mode, height+1

        can_connect = blockchain.can_connect(header)
        if not can_connect:
            self.logger.info(f"can't connect new block: {height=}")
            height, header, bad, bad_header = await self._search_headers_backwards(height, header=header)
            chain = blockchain.check_header(header)
            can_connect = blockchain.can_connect(header)
            assert chain or can_connect
        if can_connect:
            self.blockchain = can_connect
            self.blockchain.save_header(header)
            return catchup_mode, height + 1

        # Only reachable via the backwards search above, so `bad` and `bad_header` are set.
        good, bad, bad_header = await self._search_headers_binary(height, bad, bad_header, chain)
        return await self._resolve_potential_chain_fork_given_forkpoint(good, bad, bad_header)
1226

1227
    async def _search_headers_binary(
        self,
        height: int,
        bad: int,
        bad_header: dict,
        chain: Optional[Blockchain],
    ) -> Tuple[int, int, dict]:
        """Binary-search between a good height and a bad height.

        `height` is the initial 'good' height; `bad` is a height whose header
        (`bad_header`) does not check against any chain. The interval is
        halved until good + 1 == bad. Along the way `self.blockchain` is set
        to the chain of the most recent good header.

        Returns (good, bad, bad_header) for the final interval.
        """
        assert bad == bad_header['block_height']
        _assert_header_does_not_check_against_any_chain(bad_header)

        self.blockchain = chain
        good = height
        binary_mode = ChainResolutionMode.BINARY
        while True:
            assert 0 <= good < bad, (good, bad)
            height = (good + bad) // 2
            self.logger.info(f"binary step. good {good}, bad {bad}, height {height}")
            interval_size = bad - good + 1
            if interval_size <= MAX_NUM_HEADERS_PER_REQUEST:  # if interval is small, trade some bandwidth for lower latency
                await self._maybe_warm_headers_cache(
                    from_height=good, to_height=bad, mode=binary_mode)
            header = await self.get_block_header(height, mode=binary_mode)
            chain = blockchain.check_header(header)
            if chain:
                self.blockchain = chain
                good = height
            else:
                bad, bad_header = height, header
            if good + 1 == bad:
                break

        if not self.blockchain.can_connect(bad_header, check_height=False):
            raise Exception('unexpected bad header during binary: {}'.format(bad_header))
        _assert_header_does_not_check_against_any_chain(bad_header)

        self.logger.info(f"binary search exited. good {good}, bad {bad}. {chain=}")
        return good, bad, bad_header
1263

1264
    async def _resolve_potential_chain_fork_given_forkpoint(
        self,
        good: int,
        bad: int,
        bad_header: dict,
    ) -> Tuple[ChainResolutionMode, int]:
        """Decide, given adjacent good/bad heights, whether to extend our chain or fork it.

        'good' is the height of a block somewhere in self.blockchain.
        bad_header (at height good + 1) connects to that block, but is itself
        NOT in self.blockchain.

        Returns (mode, height) where height is the next height to continue from:
        - (NO_FORK, good + 1) if self.blockchain's tip is exactly at 'good';
        - (FORK, bad + 1) otherwise, after forking self.blockchain at bad_header
          and switching self.blockchain to the new fork.
        """
        assert good + 1 == bad
        assert bad == bad_header['block_height']
        _assert_header_does_not_check_against_any_chain(bad_header)

        local_tip = self.blockchain.height()
        assert local_tip >= good, (local_tip, good)

        if local_tip != good:
            # this is a new fork we don't yet have
            self.logger.info(f"new fork at bad height {bad}")
            forked = self.blockchain.fork(bad_header)  # type: Blockchain
            self.blockchain = forked
            assert forked.forkpoint == bad
            return ChainResolutionMode.FORK, bad + 1

        # our chain simply ends at 'good': just keep downloading from the next height
        next_height = good + 1
        self.logger.info(f"catching up from {next_height}")
        return ChainResolutionMode.NO_FORK, next_height
1✔
1290

1291
    async def _search_headers_backwards(
        self,
        height: int,
        *,
        header: dict,
    ) -> Tuple[int, dict, int, dict]:
        """Step backwards from a bad header, with growing strides, until a usable header is found.

        `header` (at `height`) must not check against any known chain. Starting just below it
        (capped at one past the highest local chain tip), headers are requested from the server
        at heights that decrease by 2, 4, 8, ... until one either checks against a known chain
        or can be connected to one.

        Returns (height, header, bad, bad_header): the height/header where the search stopped,
        and the last height/header that was found to be bad.
        Raises GracefulDisconnect if the search reaches the max checkpoint height and the
        header there still neither checks nor connects.
        """
        async def iterate() -> bool:
            """Fetch the header at the current height; return True if the search must continue."""
            nonlocal height, header
            checkp = False
            # never go below the last checkpoint: clamp to it and remember that we did
            if height <= constants.net.max_checkpoint():
                height = constants.net.max_checkpoint()
                checkp = True
            header = await self.get_block_header(height, mode=ChainResolutionMode.BACKWARD)
            chain = blockchain.check_header(header)
            can_connect = blockchain.can_connect(header)
            if chain or can_connect:
                return False
            if checkp:
                raise GracefulDisconnect("server chain conflicts with checkpoints")
            return True

        bad, bad_header = height, header
        _assert_header_does_not_check_against_any_chain(bad_header)
        # snapshot the known chains under the lock, then compute the highest local tip
        with blockchain.blockchains_lock: chains = list(blockchain.blockchains.values())
        local_max = max([0] + [x.height() for x in chains])
        # first candidate: just below the bad header, but not above (highest local tip + 1)
        height = min(local_max + 1, height - 1)
        assert height >= 0

        await self._maybe_warm_headers_cache(
            from_height=max(0, height-10), to_height=height, mode=ChainResolutionMode.BACKWARD)

        delta = 2
        while await iterate():
            # header at 'height' was bad too: record it and jump further back, doubling the stride
            bad, bad_header = height, header
            height -= delta
            delta *= 2

        _assert_header_does_not_check_against_any_chain(bad_header)
        self.logger.info(f"exiting backward mode at {height}")
        return height, header, bad, bad_header
1✔
1331

1332
    @classmethod
    def client_name(cls) -> str:
        """Return the client identification string: 'electrum/' followed by the Electrum version."""
        return 'electrum/{}'.format(version.ELECTRUM_VERSION)
×
1335

1336
    def is_tor(self):
        """Return True if this interface's host name ends with '.onion'."""
        onion_suffix = '.onion'
        return self.host.endswith(onion_suffix)
×
1338

1339
    def ip_addr(self) -> Optional[str]:
        """Return the host of the session's remote address as a string.

        Returns None if there is no session, or if the session has no remote address.
        """
        session = self.session
        if session:
            remote = session.remote_address()
            if remote:
                return str(remote.host)
        return None
×
1345

1346
    def bucket_based_on_ipaddress(self) -> str:
        """Return the name of the network "bucket" this server's address falls into.

        - '.onion' hosts all share BUCKET_NAME_OF_ONION_SERVERS;
        - IPv4 addresses are bucketed by their /16 network;
        - IPv6 addresses are bucketed by their /48 network;
        - loopback or unparsable/unknown addresses yield '' (no bucket).

        A non-empty result is cached in self._ipaddr_bucket; an empty result is
        recomputed on the next call.
        """
        def compute_bucket():
            if self.is_tor():
                return BUCKET_NAME_OF_ONION_SERVERS
            try:
                addr = ip_address(self.ip_addr())  # type: Union[IPv4Address, IPv6Address]
            except ValueError:
                return ''
            if not addr:
                return ''
            if addr.is_loopback:  # localhost is exempt
                return ''
            ip_version = addr.version
            if ip_version == 4:
                return str(IPv4Network(addr).supernet(prefixlen_diff=32-16))
            if ip_version == 6:
                return str(IPv6Network(addr).supernet(prefixlen_diff=128-48))
            return ''

        cached = self._ipaddr_bucket
        if cached:
            return cached
        self._ipaddr_bucket = compute_bucket()
        return self._ipaddr_bucket
×
1369

1370
    async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict:
        """Request the merkle branch for a transaction from the server and validate the reply.

        Raises Exception if the arguments are malformed. The response must contain
        'block_height', 'merkle' and 'pos'; the validators reject anything else.
        Returns the server's response dict unchanged.
        """
        if not is_hash256_str(tx_hash):
            raise Exception(f"{repr(tx_hash)} is not a txid")
        if not is_non_negative_integer(tx_height):
            raise Exception(f"{repr(tx_height)} is not a block height")
        # do request
        response = await self.session.send_request(
            'blockchain.transaction.get_merkle', [tx_hash, tx_height])
        # check response
        block_height = assert_dict_contains_field(response, field_name='block_height')
        merkle_branch = assert_dict_contains_field(response, field_name='merkle')
        tx_pos = assert_dict_contains_field(response, field_name='pos')
        # note: tx_height was just a hint to the server, don't enforce the response to match it
        assert_non_negative_integer(block_height)
        assert_non_negative_integer(tx_pos)
        assert_list_or_tuple(merkle_branch)
        for node_hash in merkle_branch:
            assert_hash256_str(node_hash)
        return response
1✔
1388

1389
    async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
        """Return the raw transaction with the given txid, as a hex string.

        Served from this interface's raw-tx cache when possible; otherwise requested
        from the server, validated (hex, deserializable, txid matches) and then cached.

        Raises Exception if tx_hash is not a txid, and RequestCorrupted if the
        server's response fails validation.
        """
        if not is_hash256_str(tx_hash):
            raise Exception(f"{repr(tx_hash)} is not a txid")
        cached_bytes = self._rawtx_cache.get(tx_hash)
        if cached_bytes:
            return cached_bytes.hex()
        raw = await self.session.send_request(
            'blockchain.transaction.get', [tx_hash], timeout=timeout)
        # validate response
        if not is_hex_str(raw):
            raise RequestCorrupted(f"received garbage (non-hex) as tx data (txid {tx_hash}): {raw!r}")
        tx = Transaction(raw)
        try:
            tx.deserialize()  # see if raises
        except Exception as e:
            raise RequestCorrupted(f"cannot deserialize received transaction (txid {tx_hash})") from e
        received_txid = tx.txid()
        if received_txid != tx_hash:
            raise RequestCorrupted(f"received tx does not match expected txid {tx_hash} (got {received_txid})")
        self._rawtx_cache[tx_hash] = bytes.fromhex(raw)
        return raw
1✔
1407

1408
    async def broadcast_transaction(self, tx: 'Transaction', *, timeout=None) -> None:
        """Broadcast `tx` via this server and verify the txid the server reports back.

        caller should handle TxBroadcastError and RequestTimedOut

        If `timeout` is None, the network's "Urgent" timeout is used.

        Raises:
            DummyAddressUsedInTxException: if any output pays to a dummy address.
            TxBroadcastServerReturnedError: if the server replied with a JSON-RPC error
                (the message is sanitized first, as it is untrusted).
            TxBroadcastUnknownError: on any other unexpected exception.
            TxBroadcastHashMismatch: if the server's returned txid differs from ours.
            RequestTimedOut / asyncio.CancelledError / asyncio.TimeoutError: passed through.

        On success, the raw tx is stored in this interface's raw-tx cache.
        """
        txid_calc = tx.txid()
        assert txid_calc is not None
        rawtx = tx.serialize()
        assert is_hex_str(rawtx)
        if timeout is None:
            timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        # refuse to broadcast txs that still contain placeholder (dummy) addresses
        if any(DummyAddress.is_dummy_address(txout.address) for txout in tx.outputs()):
            raise DummyAddressUsedInTxException("tried to broadcast tx with dummy address!")
        try:
            out = await self.session.send_request('blockchain.transaction.broadcast', [rawtx], timeout=timeout)
            # note: both 'out' and exception messages are untrusted input from the server
        except (RequestTimedOut, asyncio.CancelledError, asyncio.TimeoutError):
            # must come before the BaseException clause below, which would otherwise swallow these
            raise  # pass-through
        except aiorpcx.jsonrpc.CodeMessageError as e:
            self.logger.info(f"broadcast_transaction error [DO NOT TRUST THIS MESSAGE]: {error_text_str_to_safe_str(repr(e))}. tx={str(tx)}")
            raise TxBroadcastServerReturnedError(sanitize_tx_broadcast_response(e.message)) from e
        except BaseException as e:  # intentional BaseException for sanity!
            self.logger.info(f"broadcast_transaction error2 [DO NOT TRUST THIS MESSAGE]: {error_text_str_to_safe_str(repr(e))}. tx={str(tx)}")
            send_exception_to_crash_reporter(e)
            raise TxBroadcastUnknownError() from e
        # the server is expected to echo back the txid; compare with the one we calculated locally
        if out != txid_calc:
            self.logger.info(f"unexpected txid for broadcast_transaction [DO NOT TRUST THIS MESSAGE]: "
                             f"{error_text_str_to_safe_str(out)} != {txid_calc}. tx={str(tx)}")
            raise TxBroadcastHashMismatch(_("Server returned unexpected transaction ID."))
        # broadcast succeeded.
        # We now cache the rawtx, for *this interface only*. The tx likely touches some ismine addresses, affecting
        # the status of a scriptpubkey we are subscribed to. Caching here will save a future get_transaction RPC.
        self._rawtx_cache[txid_calc] = bytes.fromhex(rawtx)
1✔
1438

1439
    async def broadcast_txpackage(self, txs: Sequence['Transaction']) -> bool:
        """Broadcast several transactions together as one package.

        Requires the active protocol version to be at least 1.6.
        Returns True if the server reports success (the raw txs are then cached for
        this interface), and False if the server returned a JSON-RPC error or
        reported failure.

        Raises DummyAddressUsedInTxException if any output pays to a dummy address.
        """
        assert self.active_protocol_tuple >= (1, 6), f"server using old protocol: {self.active_protocol_tuple}"
        rawtxs = [tx.serialize() for tx in txs]
        assert all(is_hex_str(rawtx) for rawtx in rawtxs)
        assert all(tx.txid() is not None for tx in txs)
        timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        pays_to_dummy = any(
            DummyAddress.is_dummy_address(txout.address)
            for tx in txs
            for txout in tx.outputs()
        )
        if pays_to_dummy:
            raise DummyAddressUsedInTxException("tried to broadcast tx with dummy address!")
        try:
            res = await self.session.send_request('blockchain.transaction.broadcast_package', [rawtxs], timeout=timeout)
        except aiorpcx.jsonrpc.CodeMessageError as e:
            self.logger.info(f"broadcast_txpackage error [DO NOT TRUST THIS MESSAGE]: {error_text_str_to_safe_str(repr(e))}. {rawtxs=}")
            return False
        if assert_dict_contains_field(res, field_name='success'):
            # broadcast succeeded.
            # We now cache the rawtx, for *this interface only*. The tx likely touches some ismine addresses, affecting
            # the status of a scriptpubkey we are subscribed to. Caching here will save a future get_transaction RPC.
            for tx, rawtx in zip(txs, rawtxs):
                self._rawtx_cache[tx.txid()] = bytes.fromhex(rawtx)
            return True
        errors = assert_dict_contains_field(res, field_name='errors')
        self.logger.info(f"broadcast_txpackage error [DO NOT TRUST THIS MESSAGE]: {error_text_str_to_safe_str(repr(errors))}. {rawtxs=}")
        return False
×
1465

1466
    async def get_history_for_spk(self, spk: str) -> List[dict]:
        """Fetch the transaction history of a scriptpubkey and validate the server's reply.

        Checks performed on the response:
        - each item has an integer 'height' >= -1 and a valid 'tx_hash';
        - mempool items (height -1 or 0) carry a non-negative integer 'fee';
        - confirmed items come in non-decreasing height order, and never after a mempool item;
        - for protocol >= 1.6, mempool items are in canonical order;
        - txids are unique.

        Raises RequestCorrupted if an ordering/uniqueness check fails.
        Returns the response unchanged.
        """
        # do request
        history = await self.session.send_request('blockchain.scriptpubkey.get_history', [spk])
        # check response
        assert_list_or_tuple(history)
        last_height = 1
        for entry in history:
            height = assert_dict_contains_field(entry, field_name='height')
            assert_dict_contains_field(entry, field_name='tx_hash')
            assert_integer(height)
            if height < -1:
                raise RequestCorrupted(f'{height!r} is not a valid block height')
            assert_hash256_str(entry['tx_hash'])
            if height in (-1, 0):
                assert_dict_contains_field(entry, field_name='fee')
                assert_non_negative_integer(entry['fee'])
                last_height = float("inf")  # this ensures confirmed txs can't follow mempool txs
                continue
            # check monotonicity of heights
            if height < last_height:
                raise RequestCorrupted(f'heights of confirmed txs must be in increasing order')
            last_height = height

        if self.active_protocol_tuple >= (1, 6):
            # enforce order of mempool txs
            def canonical_mempool_key(entry):
                return -entry['height'], bytes.fromhex(entry['tx_hash'])

            mempool_entries = [entry for entry in history if entry['height'] <= 0]
            if mempool_entries != sorted(mempool_entries, key=canonical_mempool_key):
                raise RequestCorrupted(f'mempool txs not in canonical order')

        unique_txids = {entry['tx_hash'] for entry in history}
        if len(unique_txids) != len(history):
            # Either server is sending garbage... or maybe if server is race-prone
            # a recently mined tx could be included in both last block and mempool?
            # Still, it's simplest to just disregard the response.
            raise RequestCorrupted(f"server history has non-unique txids for spk={spk}")
        return history
1✔
1500

1501
    async def listunspent_for_spk(self, spk: str) -> List[dict]:
        """Fetch the unspent outputs of a scriptpubkey and validate the server's reply.

        Each item must contain 'tx_pos', 'value', 'tx_hash' and 'height'; the first,
        second and last must be non-negative integers and 'tx_hash' a valid hash.
        Returns the response unchanged.
        """
        # do request
        utxos = await self.session.send_request('blockchain.scriptpubkey.listunspent', [spk])
        # check response
        assert_list_or_tuple(utxos)
        required_fields = ('tx_pos', 'value', 'tx_hash', 'height')
        for utxo in utxos:
            for field in required_fields:
                assert_dict_contains_field(utxo, field_name=field)
            for int_field in ('tx_pos', 'value', 'height'):
                assert_non_negative_integer(utxo[int_field])
            assert_hash256_str(utxo['tx_hash'])
        return utxos
×
1516

1517
    async def get_balance_for_spk(self, spk: str) -> dict:
        """Fetch the balance of a scriptpubkey and validate the server's reply.

        The response must contain 'confirmed' (non-negative integer) and
        'unconfirmed' (integer, may be negative). Returns the response unchanged.
        """
        # do request
        balance = await self.session.send_request('blockchain.scriptpubkey.get_balance', [spk])
        # check response
        for field in ('confirmed', 'unconfirmed'):
            assert_dict_contains_field(balance, field_name=field)
        assert_non_negative_integer(balance['confirmed'])
        assert_integer(balance['unconfirmed'])
        return balance
×
1526

1527
    async def get_txid_from_txpos(self, tx_height: int, tx_pos: int, merkle: bool):
        """Ask the server for the txid at position `tx_pos` in the block at `tx_height`.

        If `merkle` is false, the validated response is the txid itself.
        If `merkle` is true, the validated response is a dict with 'tx_hash' and
        'merkle' (a list/tuple of hashes).

        Raises Exception if tx_height or tx_pos is not a non-negative integer.
        Returns the server's response unchanged.
        """
        if not is_non_negative_integer(tx_height):
            raise Exception(f"{repr(tx_height)} is not a block height")
        if not is_non_negative_integer(tx_pos):
            raise Exception(f"{repr(tx_pos)} should be non-negative integer")
        # do request
        request_args = [tx_height, tx_pos, merkle]
        response = await self.session.send_request('blockchain.transaction.id_from_pos', request_args)
        # check response
        if not merkle:
            assert_hash256_str(response)
            return response
        assert_dict_contains_field(response, field_name='tx_hash')
        assert_dict_contains_field(response, field_name='merkle')
        assert_hash256_str(response['tx_hash'])
        branch = response['merkle']
        assert_list_or_tuple(branch)
        for node_hash in branch:
            assert_hash256_str(node_hash)
        return response
×
1548

1549
    async def get_fee_histogram(self) -> Sequence[Tuple[Union[float, int], int]]:
        """Fetch the mempool fee histogram from the server and validate it.

        The response must be a list/tuple of (fee, size) pairs, where fee is a
        non-negative int/float, size is a non-negative integer, and the fees are
        strictly decreasing.

        Raises RequestCorrupted if an entry is not a pair or the fees are not
        strictly decreasing; the assert_* validators reject malformed values
        (presumably also via RequestCorrupted -- confirm against their definitions).
        Returns the response unchanged.
        """
        # do request
        res = await self.session.send_request('mempool.get_fee_histogram')
        # check response
        assert_list_or_tuple(res)
        prev_fee = float('inf')
        for item in res:
            # The server is untrusted: validate the shape of each entry before unpacking,
            # so that malformed items (e.g. an int, a 3-tuple, a dict) are reported as a
            # corrupted response rather than surfacing as a bare TypeError/ValueError.
            assert_list_or_tuple(item)
            if len(item) != 2:
                raise RequestCorrupted(f'{item!r} should be a (fee, size) pair')
            fee, s = item
            assert_non_negative_int_or_float(fee)
            assert_non_negative_integer(s)
            if fee >= prev_fee:  # check monotonicity
                raise RequestCorrupted(f'fees must be in decreasing order')
            prev_fee = fee
        return res
×
1562

1563
    async def get_server_banner(self) -> str:
        """Fetch the server's banner text.

        Raises RequestCorrupted if the response is not a str.
        """
        # do request
        banner = await self.session.send_request('server.banner')
        # check response
        if isinstance(banner, str):
            return banner
        raise RequestCorrupted(f'{banner!r} should be a str')
×
1570

1571
    async def get_donation_address(self) -> str:
        """Fetch the server operator's donation address.

        Returns the address with any leading 'bitcoin:' prefix removed, or '' if
        the server sent nothing or sent something we do not recognise as an address.
        Raises RequestCorrupted if the (non-empty) response is not a str.
        """
        # do request
        res = await self.session.send_request('server.donation_address')
        # check response
        if not res:  # ignore empty string
            return ''
        if not isinstance(res, str):
            raise RequestCorrupted(f'{res!r} should be a str')
        candidate = res.removeprefix('bitcoin:')
        if bitcoin.is_address(candidate):
            return candidate
        # note: do not hard-fail -- allow server to use future-type
        #       bitcoin address we do not recognize
        self.logger.info(f"invalid donation address from server: {repr(res)}")
        return ''
×
1586

1587
    async def get_relay_fee(self) -> int:
        """Returns the min relay feerate in sat/kbyte.

        For protocol >= 1.6 the value is read from the 'minrelaytxfee' field of
        'mempool.get_info'; for older protocols 'blockchain.relayfee' is used.
        """
        # do request
        uses_mempool_info = self.active_protocol_tuple >= (1, 6)
        if uses_mempool_info:
            info = await self.session.send_request('mempool.get_info')
            minrelaytxfee = assert_dict_contains_field(info, field_name='minrelaytxfee')
        else:
            minrelaytxfee = await self.session.send_request('blockchain.relayfee')
        # check response
        assert_non_negative_int_or_float(minrelaytxfee)
        # scale by bitcoin.COIN and truncate to an integer; never return a negative value
        return max(0, int(minrelaytxfee * bitcoin.COIN))
×
1600

1601
    async def get_estimatefee(self, num_blocks: int) -> int:
        """Returns a feerate estimate for getting confirmed within
        num_blocks blocks, in sat/kbyte.
        Returns -1 if the server could not provide an estimate.

        Raises Exception if num_blocks is not a non-negative integer.
        """
        if not is_non_negative_integer(num_blocks):
            raise Exception(f"{repr(num_blocks)} is not a num_blocks")
        # do request
        try:
            estimate = await self.session.send_request('blockchain.estimatefee', [num_blocks])
        except aiorpcx.jsonrpc.ProtocolError as e:
            # The protocol spec says the server itself should already have returned -1
            # if it cannot provide an estimate, however apparently "electrs" does not conform
            # and sends an error instead. Convert it here:
            if "cannot estimate fee" not in e.message:
                raise
            estimate = -1
        except aiorpcx.jsonrpc.RPCError as e:
            # The protocol spec says the server itself should already have returned -1
            # if it cannot provide an estimate. "Fulcrum" often sends:
            #   aiorpcx.jsonrpc.RPCError: (-32603, 'internal error: bitcoind request timed out')
            if e.code != JSONRPC.INTERNAL_ERROR:
                raise
            estimate = -1
        # check response
        if estimate == -1:
            return estimate
        assert_non_negative_int_or_float(estimate)
        return int(estimate * bitcoin.COIN)
1✔
1632

1633
    async def phandle_on_ping_notification(self, data=""):
        """Handle a ping notification.

        Only sanity-checks the input: the active protocol must be at least 1.7
        and `data` must be a hex string (odd length allowed). No reply is built here.
        """
        assert self.active_protocol_tuple >= (1, 7)
        assert_hex_str(data, allow_odd_len=True)
        # nothing to do.
        return None
1637

1638
    async def send_ping(self, pong_len: int = None, data: str = None) -> dict[str, Any]:
        """Send a "server.ping" request and validate the reply.

        `pong_len` is the length requested for the 'data' field of the reply, and
        `data` is the hex payload sent along. Each defaults to a random choice
        (a length in 0..128, and that many "0" characters, respectively).

        Raises RequestCorrupted if the length of the reply's 'data' differs from pong_len.
        Returns the server's response dict.
        """
        if pong_len is None:
            pong_len = random.randint(0, 128)  # simply to exercise server logic
        if data is None:
            data = "0" * random.randint(0, 128)  # simply to exercise server logic
        assert isinstance(pong_len, int), repr(pong_len)
        assert is_hex_str(data, allow_odd_len=True), repr(data)
        request_args = (pong_len, data)
        response = await self.session.send_request("server.ping", request_args)
        data2 = assert_dict_contains_field(response, field_name='data')
        assert_hex_str(data2, allow_odd_len=True)
        if len(data2) == pong_len:
            return response
        raise RequestCorrupted(f'length of data ({len(data2)}) does not match requested {pong_len=}')
×
1651

1652

1653
def _assert_header_does_not_check_against_any_chain(header: dict) -> None:
    """Raise Exception if blockchain.check_header finds a chain matching `header`."""
    if blockchain.check_header(header):
        raise Exception('bad_header must not check!')
×
1657

1658

1659
def sanitize_tx_broadcast_response(server_msg) -> str:
    """Map an untrusted server error message to a known, safe error string.

    The server's text is never returned verbatim: it is only searched for known
    substrings, and the matching known string (or a translated explanation for it)
    is returned. If nothing matches, a generic "Unknown error" is returned.

    The tables are consulted in a fixed order (script errors, validation errors,
    rawtransaction errors, tx_verify errors, policy errors). Within the script-error
    table the longest matching entry wins, since some entries are prefixes of others.
    """
    # Unfortunately, bitcoind and hence the Electrum protocol doesn't return a useful error code.
    # So, we use substring matching to grok the error message.
    # server_msg is untrusted input so it should not be shown to the user. see #4968
    server_msg = str(server_msg)
    server_msg = server_msg.replace("\n", r"\n")

    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/script/script_error.cpp
    # note: this is an ordered tuple (not a set) so that the result does not depend on
    #       hash-randomized iteration order.
    script_error_messages = (
        r"Script evaluated without error but finished with a false/empty top stack element",
        r"Script failed an OP_VERIFY operation",
        r"Script failed an OP_EQUALVERIFY operation",
        r"Script failed an OP_CHECKMULTISIGVERIFY operation",
        r"Script failed an OP_CHECKSIGVERIFY operation",
        r"Script failed an OP_NUMEQUALVERIFY operation",
        r"Script is too big",
        r"Push value size limit exceeded",
        r"Operation limit exceeded",
        r"Stack size limit exceeded",
        r"Signature count negative or greater than pubkey count",
        r"Pubkey count negative or limit exceeded",
        r"Opcode missing or not understood",
        r"Attempted to use a disabled opcode",
        r"Operation not valid with the current stack size",
        r"Operation not valid with the current altstack size",
        r"OP_RETURN was encountered",
        r"Invalid OP_IF construction",
        r"Negative locktime",
        r"Locktime requirement not satisfied",
        r"Signature hash type missing or not understood",
        r"Non-canonical DER signature",
        r"Data push larger than necessary",
        r"Only push operators allowed in signatures",
        r"Non-canonical signature: S value is unnecessarily high",
        r"Dummy CHECKMULTISIG argument must be zero",
        r"OP_IF/NOTIF argument must be minimal",
        r"Signature must be zero for failed CHECK(MULTI)SIG operation",
        r"NOPx reserved for soft-fork upgrades",
        r"Witness version reserved for soft-fork upgrades",
        r"Taproot version reserved for soft-fork upgrades",
        r"OP_SUCCESSx reserved for soft-fork upgrades",
        r"Public key version reserved for soft-fork upgrades",
        r"Public key is neither compressed or uncompressed",
        r"Stack size must be exactly one after execution",
        r"Extra items left on stack after execution",
        r"Witness program has incorrect length",
        r"Witness program was passed an empty witness",
        r"Witness program hash mismatch",
        r"Witness requires empty scriptSig",
        r"Witness requires only-redeemscript scriptSig",
        r"Witness provided for non-witness script",
        r"Using non-compressed keys in segwit",
        r"Invalid Schnorr signature size",
        r"Invalid Schnorr signature hash type",
        r"Invalid Schnorr signature",
        r"Invalid Taproot control block size",
        r"Too much signature validation relative to witness weight",
        r"OP_CHECKMULTISIG(VERIFY) is not available in tapscript",
        r"OP_IF/NOTIF argument must be minimal in tapscript",
        r"Using OP_CODESEPARATOR in non-witness script",
        r"Signature is found in scriptCode",
    )
    # Some entries are prefixes of others (e.g. "Invalid Schnorr signature" vs
    # "Invalid Schnorr signature size"), so pick the longest, i.e. most specific, match.
    script_matches = [substring for substring in script_error_messages if substring in server_msg]
    if script_matches:
        return max(script_matches, key=len)
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/validation.cpp
    # grep "REJECT_"
    # grep "TxValidationResult"
    # should come after script_error.cpp (due to e.g. "non-mandatory-script-verify-flag")
    validation_error_messages = {
        r"coinbase": None,
        r"tx-size-small": None,
        r"non-final": None,
        r"txn-already-in-mempool": None,
        r"txn-mempool-conflict": None,
        r"txn-already-known": None,
        r"non-BIP68-final": None,
        r"bad-txns-nonstandard-inputs": None,
        r"bad-witness-nonstandard": None,
        r"bad-txns-too-many-sigops": None,
        r"mempool min fee not met":
            ("mempool min fee not met\n" +
             _("Your transaction is paying a fee that is so low that the bitcoin node cannot "
               "fit it into its mempool. The mempool is already full of hundreds of megabytes "
               "of transactions that all pay higher fees. Try to increase the fee.")),
        r"min relay fee not met": None,
        r"absurdly-high-fee": None,
        r"max-fee-exceeded": None,
        r"too-long-mempool-chain": None,
        r"bad-txns-spends-conflicting-tx": None,
        r"insufficient fee": ("insufficient fee\n" +
             _("Your transaction is trying to replace another one in the mempool but it "
               "does not meet the rules to do so. Try to increase the fee.")),
        r"too many potential replacements": None,
        r"replacement-adds-unconfirmed": None,
        r"mempool full": None,
        r"non-mandatory-script-verify-flag": None,
        r"mandatory-script-verify-flag-failed": None,
        r"Transaction check failed": None,
    }
    for substring in validation_error_messages:
        if substring in server_msg:
            msg = validation_error_messages[substring]
            return msg if msg else substring
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/rpc/rawtransaction.cpp
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/util/error.cpp
    # https://github.com/bitcoin/bitcoin/blob/3f83c744ac28b700090e15b5dda2260724a56f49/src/common/messages.cpp#L126
    # grep "RPC_TRANSACTION"
    # grep "RPC_DESERIALIZATION_ERROR"
    # grep "TransactionError"
    rawtransaction_error_messages = {
        r"Missing inputs": None,
        r"Inputs missing or spent": None,
        r"transaction already in block chain": None,
        r"Transaction already in block chain": None,
        r"Transaction outputs already in utxo set": None,
        r"TX decode failed": None,
        r"Peer-to-peer functionality missing or disabled": None,
        r"Transaction rejected by AcceptToMemoryPool": None,
        r"AcceptToMemoryPool failed": None,
        r"Transaction rejected by mempool": None,
        r"Mempool internal error": None,
        r"Fee exceeds maximum configured by user": None,
        r"Unspendable output exceeds maximum configured by user": None,
        r"Transaction rejected due to invalid package": None,
    }
    for substring in rawtransaction_error_messages:
        if substring in server_msg:
            msg = rawtransaction_error_messages[substring]
            return msg if msg else substring
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/consensus/tx_verify.cpp
    # https://github.com/bitcoin/bitcoin/blob/c7ad94428ab6f54661d7a5441e1fdd0ebf034903/src/consensus/tx_check.cpp
    # grep "REJECT_"
    # grep "TxValidationResult"
    tx_verify_error_messages = {
        r"bad-txns-vin-empty": None,
        r"bad-txns-vout-empty": None,
        r"bad-txns-oversize": None,
        r"bad-txns-vout-negative": None,
        r"bad-txns-vout-toolarge": None,
        r"bad-txns-txouttotal-toolarge": None,
        r"bad-txns-inputs-duplicate": None,
        r"bad-cb-length": None,
        r"bad-txns-prevout-null": None,
        r"bad-txns-inputs-missingorspent":
            ("bad-txns-inputs-missingorspent\n" +
             _("You might have a local transaction in your wallet that this transaction "
               "builds on top. You need to either broadcast or remove the local tx.")),
        r"bad-txns-premature-spend-of-coinbase": None,
        r"bad-txns-inputvalues-outofrange": None,
        r"bad-txns-in-belowout": None,
        r"bad-txns-fee-outofrange": None,
    }
    for substring in tx_verify_error_messages:
        if substring in server_msg:
            msg = tx_verify_error_messages[substring]
            return msg if msg else substring
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/policy/policy.cpp
    # grep "reason ="
    # should come after validation.cpp (due to "tx-size" vs "tx-size-small")
    # should come after script_error.cpp (due to e.g. "version")
    policy_error_messages = {
        r"version": _("Transaction uses non-standard version."),
        r"tx-size": _("The transaction was rejected because it is too large (in bytes)."),
        r"scriptsig-size": None,
        r"scriptsig-not-pushonly": None,
        r"scriptpubkey":
            ("scriptpubkey\n" +
             _("Some of the outputs pay to a non-standard script.")),
        r"bare-multisig": None,
        r"dust":
            (_("Transaction could not be broadcast due to dust outputs.\n"
               "Some of the outputs are too small in value, probably lower than 1000 satoshis.\n"
               "Check the units, make sure you haven't confused e.g. mBTC and BTC.")),
        r"multi-op-return": _("The transaction was rejected because it contains multiple OP_RETURN outputs."),
    }
    for substring in policy_error_messages:
        if substring in server_msg:
            msg = policy_error_messages[substring]
            return msg if msg else substring
    # otherwise:
    return _("Unknown error")
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc