• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5140274696945664

04 May 2026 03:45PM UTC coverage: 65.106% (-0.1%) from 65.246%
5140274696945664

Pull #10627

CirrusCI

ecdsa
synchronizer: separation between addresses_up_to_date and outpoints_up_to_date
Pull Request #10627: Protocol 1.7

100 of 165 new or added lines in 12 files covered. (60.61%)

46 existing lines in 6 files now uncovered.

24733 of 37989 relevant lines covered (65.11%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.13
/electrum/interface.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2011 thomasv@gitorious
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25
import os
1✔
26
import re
1✔
27
import ssl
1✔
28
import sys
1✔
29
import time
1✔
30
import traceback
1✔
31
import asyncio
1✔
32
import socket
1✔
33
from typing import Tuple, Union, List, TYPE_CHECKING, Optional, Set, NamedTuple, Any, Sequence, Dict
1✔
34
from collections import defaultdict
1✔
35
from ipaddress import IPv4Network, IPv6Network, ip_address, IPv6Address, IPv4Address
1✔
36
import itertools
1✔
37
import logging
1✔
38
import hashlib
1✔
39
import functools
1✔
40
import random
1✔
41
import enum
1✔
42

43
import aiorpcx
1✔
44
from aiorpcx import RPCSession, Notification, NetAddress, NewlineFramer
1✔
45
from aiorpcx.curio import timeout_after, TaskTimeout
1✔
46
from aiorpcx.jsonrpc import JSONRPC, CodeMessageError
1✔
47
from aiorpcx.rawsocket import RSClient, RSTransport
1✔
48
import certifi
1✔
49

50
from .util import (ignore_exceptions, log_exceptions, bfh, ESocksProxy,
1✔
51
                   is_integer, is_non_negative_integer, is_hash256_str, is_hex_str,
52
                   is_int_or_float, is_non_negative_int_or_float, OldTaskGroup,
53
                   send_exception_to_crash_reporter, error_text_str_to_safe_str, versiontuple)
54
from . import util
1✔
55
from . import x509
1✔
56
from . import pem
1✔
57
from . import version
1✔
58
from . import blockchain
1✔
59
from .blockchain import Blockchain, HEADER_SIZE, CHUNK_SIZE
1✔
60
from . import bitcoin
1✔
61
from .bitcoin import DummyAddress, DummyAddressUsedInTxException, script_to_scripthash
1✔
62
from . import constants
1✔
63
from .i18n import _
1✔
64
from .logging import Logger
1✔
65
from .transaction import Transaction
1✔
66
from .fee_policy import FEE_ETA_TARGETS
1✔
67
from .lrucache import LRUCache
1✔
68

69
if TYPE_CHECKING:
70
    from .network import Network
71
    from .simple_config import SimpleConfig
72

73

74
# Path of the CA bundle provided by certifi; used as `cafile` for the
# CA-enforcing SSL context in Interface._get_ssl_context.
ca_path = certifi.where()

BUCKET_NAME_OF_ONION_SERVERS = 'onion'

# Single-letter transport codes accepted by ServerAddr.
# 's' selects SSL; Interface._get_ssl_context treats anything else as plaintext TCP.
KNOWN_ELEC_PROTOCOL_TRANSPORTS = {'t', 's'}
# Transport assumed by ServerAddr.from_str_with_inference when none is given.
PREFERRED_NETWORK_PROTOCOL = 's'
assert PREFERRED_NETWORK_PROTOCOL in KNOWN_ELEC_PROTOCOL_TRANSPORTS

# Must be able to cover at least one full chunk of headers (checked below).
MAX_NUM_HEADERS_PER_REQUEST = 2016
assert MAX_NUM_HEADERS_PER_REQUEST >= CHUNK_SIZE

# Server error code that NotificationSession.send_request maps to HistoryTooLong.
RPC_ERROR_HISTORY_TOO_LONG = 10001
86

87
class HistoryTooLong(Exception):
    """Raised when the server answers with RPC_ERROR_HISTORY_TOO_LONG.

    We should not close the connection in that case.
    """
90

91
class NetworkTimeout:
    """Namespace for network timeout presets. All values are in seconds."""

    class Generic:
        """Default timeout presets."""
        NORMAL = 30
        RELAXED = 45
        MOST_RELAXED = 600

    class Urgent(Generic):
        """Shorter presets; overrides every value of Generic."""
        NORMAL = 10
        RELAXED = 20
        MOST_RELAXED = 60
102

103

104
def assert_non_negative_integer(val: Any) -> None:
    """Raise RequestCorrupted unless `val` passes is_non_negative_integer()."""
    if is_non_negative_integer(val):
        return
    raise RequestCorrupted(f'{val!r} should be a non-negative integer')
107

108

109
def assert_integer(val: Any) -> None:
    """Raise RequestCorrupted unless `val` passes is_integer()."""
    if is_integer(val):
        return
    raise RequestCorrupted(f'{val!r} should be an integer')
112

113

114
def assert_int_or_float(val: Any) -> None:
    """Raise RequestCorrupted unless `val` passes is_int_or_float()."""
    if is_int_or_float(val):
        return
    raise RequestCorrupted(f'{val!r} should be int or float')
117

118

119
def assert_non_negative_int_or_float(val: Any) -> None:
    """Raise RequestCorrupted unless `val` passes is_non_negative_int_or_float()."""
    if is_non_negative_int_or_float(val):
        return
    raise RequestCorrupted(f'{val!r} should be a non-negative int or float')
122

123

124
def assert_hash256_str(val: Any) -> None:
    """Raise RequestCorrupted unless `val` passes is_hash256_str()."""
    if is_hash256_str(val):
        return
    raise RequestCorrupted(f'{val!r} should be a hash256 str')
127

128

129
def assert_hex_str(val: Any) -> None:
    """Raise RequestCorrupted unless `val` passes is_hex_str()."""
    if is_hex_str(val):
        return
    raise RequestCorrupted(f'{val!r} should be a hex str')
132

133

134
def assert_dict_contains_field(d: Any, *, field_name: str) -> Any:
    """Return `d[field_name]`, validating the container first.

    Raises RequestCorrupted if `d` is not a dict, or if it lacks `field_name`.
    """
    if isinstance(d, dict):
        if field_name in d:
            return d[field_name]
        raise RequestCorrupted(f'required field {field_name!r} missing from dict')
    raise RequestCorrupted(f'{d!r} should be a dict')
140

141

142
def assert_list_or_tuple(val: Any) -> None:
    """Raise RequestCorrupted unless `val` is a list or a tuple."""
    if isinstance(val, (list, tuple)):
        return
    raise RequestCorrupted(f'{val!r} should be a list or tuple')
145

146

147
def protocol_tuple(s: Any) -> tuple[int, ...]:
    """Converts a protocol version number, such as "1.0" to a tuple (1, 0).

    If the version number is bad, (0, ) indicating version 0 is returned.
    Anything that is not a str counts as bad.
    """
    # Explicit type check rather than `assert`: asserts are stripped when
    # Python runs with -O, which would let non-str input reach versiontuple().
    if not isinstance(s, str):
        return (0, )
    try:
        return versiontuple(s)
    except Exception:
        # any parsing failure means "bad version"
        return (0, )
157

158

159
class ChainResolutionMode(enum.Enum):
    """Modes used while resolving a server's header chain.

    NOTE(review): the exact meaning of each member is defined by the code
    that consumes this enum, which is not part of this section.
    """
    CATCHUP = enum.auto()
    BACKWARD = enum.auto()
    BINARY = enum.auto()
    FORK = enum.auto()
    NO_FORK = enum.auto()
165

166

167
class NotificationSession(RPCSession):
    """RPC session that routes server notifications to subscriber queues.

    The latest result for each (method, params) key is cached, and every
    notification is put on all queues registered for its key.
    """

    def __init__(self, *args, interface: 'Interface', **kwargs):
        super(NotificationSession, self).__init__(*args, **kwargs)
        self.subscriptions = defaultdict(list)  # key -> list of subscriber queues
        self.cache = {}  # key -> latest result seen for that key
        self._msg_counter = itertools.count(start=1)
        self.interface = interface
        self.taskgroup = interface.taskgroup
        self.cost_hard_limit = 0  # disable aiorpcx resource limits

    async def handle_request(self, request):
        """Dispatch an incoming notification to its subscribers.

        Anything other than a notification for a known subscription is
        logged and causes the session to be closed.
        """
        self.maybe_log(f"--> {request}")
        try:
            if isinstance(request, Notification):
                # the last arg is the result; the preceding args identify the subscription
                params, result = request.args[:-1], request.args[-1]
                key = self.get_hashable_key_for_rpc_call(request.method, params)
                if key in self.subscriptions:
                    self.cache[key] = result
                    for queue in self.subscriptions[key]:
                        await queue.put(request.args)
                else:
                    # report only the offending key: interpolating the whole
                    # subscriptions dict would flood the log
                    raise Exception(f'unexpected notification (no subscription for key {key!r})')
            else:
                raise Exception('unexpected request. not a notification')
        except Exception as e:
            self.interface.logger.info(f"error handling request {request}. exc: {repr(e)}")
            await self.close()

    async def send_request(self, *args, timeout=None, **kwargs):
        """Send a request and return the response.

        Raises RequestTimedOut on timeout, HistoryTooLong if the server
        replies with RPC_ERROR_HISTORY_TOO_LONG, and re-raises anything else.
        """
        # note: semaphores/timeouts/backpressure etc are handled by
        # aiorpcx. the timeout arg here in most cases should not be set
        msg_id = next(self._msg_counter)
        self.maybe_log(f"<-- {args} {kwargs} (id: {msg_id})")
        try:
            # note: RPCSession.send_request raises TaskTimeout in case of a timeout.
            # TaskTimeout is a subclass of CancelledError, which is *suppressed* in TaskGroups
            response = await util.wait_for2(
                super().send_request(*args, **kwargs),
                timeout)
        except (TaskTimeout, asyncio.TimeoutError) as e:
            self.maybe_log(f"--> request timed out: {args} (id: {msg_id})")
            raise RequestTimedOut(f'request timed out: {args} (id: {msg_id})') from e
        except CodeMessageError as e:
            self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
            if e.code == RPC_ERROR_HISTORY_TOO_LONG:
                # keep the server error as the explicit cause
                raise HistoryTooLong() from e
            else:
                raise
        except BaseException as e:  # cancellations, etc. are useful for debugging
            self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
            raise
        else:
            self.maybe_log(f"--> {response} (id: {msg_id})")
            return response

    def set_default_timeout(self, timeout):
        """Set both timeout attributes inherited from the base class to `timeout`."""
        assert hasattr(self, "sent_request_timeout")  # in base class
        self.sent_request_timeout = timeout
        assert hasattr(self, "max_send_delay")        # in base class
        self.max_send_delay = timeout

    async def subscribe(self, method: str, params: List, queue: asyncio.Queue):
        """Register `queue` for a subscription and put the current result on it.

        If the server reports that the history is too long, the error is
        logged and nothing is put on the queue.
        """
        # note: until the cache is written for the first time,
        # each 'subscribe' call might make a request on the network.
        params2 = params[:]
        if method == 'blockchain.scriptpubkey.subscribe':
            # the key is built from the scripthash, not from the raw script
            params2[0] = script_to_scripthash(bytes.fromhex(params2[0]))
        key = self.get_hashable_key_for_rpc_call(method, params2)
        self.subscriptions[key].append(queue)
        if key in self.cache:
            result = self.cache[key]
        else:
            try:
                result = await self.send_request(method, params)
            except HistoryTooLong:
                # use the interface logger, like the rest of this class
                self.interface.logger.info(f"history too long {params}")
                # do not raise, do not kill task group
                return
            self.cache[key] = result
        await queue.put(params2 + [result])

    def unsubscribe(self, queue):
        """Unsubscribe a callback to free object references to enable GC."""
        # note: we can't unsubscribe from the server, so we keep receiving
        # subsequent notifications
        for v in self.subscriptions.values():
            if queue in v:
                v.remove(queue)

    @classmethod
    def get_hashable_key_for_rpc_call(cls, method, params):
        """Hashable index for subscriptions and cache"""
        return str(method) + repr(params)

    def maybe_log(self, msg: str) -> None:
        """Log `msg` at debug level if debugging is enabled on the interface or network."""
        if not self.interface: return
        if self.interface.debug or self.interface.network.debug:
            self.interface.logger.debug(msg)

    def default_framer(self):
        # overridden so that max_size can be customized
        max_size = self.interface.network.config.NETWORK_MAX_INCOMING_MSG_SIZE
        assert max_size > 500_000, f"{max_size=} (< 500_000) is too small"
        return NewlineFramer(max_size=max_size)

    async def close(self, *, force_after: Optional[int] = None):
        """Closes the connection and waits for it to be closed.
        We try to flush buffered data to the wire, which can take some time.
        """
        if force_after is None:
            # We give up after a while and just abort the connection.
            # Note: specifically if the server is running Fulcrum, waiting seems hopeless,
            #       the connection must be aborted (see https://github.com/cculianu/Fulcrum/issues/76)
            # Note: if the ethernet cable was pulled or wifi disconnected, that too might
            #       wait until this timeout is triggered
            force_after = 1  # seconds
        await super().close(force_after=force_after)
285

286

287
class NetworkException(Exception):
    """Base class for the network-related exceptions defined in this module."""
288

289

290
class GracefulDisconnect(NetworkException):
    """Exception carrying the log level at which the disconnect is reported.

    Interface.handle_disconnect logs it at `log_level` instead of letting it propagate.
    """

    # class-wide default; instances may override it via the constructor
    log_level = logging.INFO

    def __init__(self, *args, log_level=None, **kwargs):
        Exception.__init__(self, *args, **kwargs)
        if log_level is None:
            return
        self.log_level = log_level
297

298

299
class RequestTimedOut(GracefulDisconnect):
    """Raised by NotificationSession.send_request when a request times out."""

    def __str__(self):
        # fixed, translated text; the constructor args are not shown
        message = _("Network request timed out.")
        return message
302

303

304
class RequestCorrupted(Exception):
    """Raised by the assert_* validation helpers when a value has an unexpected shape."""
305

306
class ErrorParsingSSLCert(Exception):
    """An already saved certificate could not be parsed."""


class ErrorGettingSSLCertFromServer(Exception):
    """Obtaining the certificate from the server for the first time failed."""


class ErrorSSLCertFingerprintMismatch(Exception):
    """NOTE(review): presumably a certificate did not match the expected fingerprint; the raising code is not shown here."""


class InvalidOptionCombination(Exception):
    """Mutually incompatible options were used, e.g. --serverfingerprint with a CA signed server."""
310
class ConnectError(NetworkException):
    """Raised by _RSClient.create_connection in place of an OSError, which is kept as __cause__."""
311

312

313
class TxBroadcastError(NetworkException):
    """Base class for transaction broadcast failures."""

    def get_message_for_gui(self):
        """Return the text to show to the user; subclasses must override this."""
        raise NotImplementedError()
316

317

318
class TxBroadcastHashMismatch(TxBroadcastError):
    """The server returned an unexpected transaction ID for a broadcast."""

    def get_message_for_gui(self):
        """Return a translated explanation followed by the exception text."""
        summary = _("The server returned an unexpected transaction ID when broadcasting the transaction.")
        advice = _("Consider trying to connect to a different server, or updating Electrum.")
        details = str(self)
        return "{}\n{}\n\n{}".format(summary, advice, details)
324

325

326
class TxBroadcastServerReturnedError(TxBroadcastError):
    """The server answered a broadcast with an error."""

    def get_message_for_gui(self):
        """Return a translated explanation followed by the exception text."""
        summary = _("The server returned an error when broadcasting the transaction.")
        advice = _("Consider trying to connect to a different server, or updating Electrum.")
        details = str(self)
        return "{}\n{}\n\n{}".format(summary, advice, details)
332

333

334
class TxBroadcastUnknownError(TxBroadcastError):
    """A broadcast failed for an unknown reason."""

    def get_message_for_gui(self):
        """Return a translated explanation; the exception text itself is not included."""
        summary = _("Unknown error when broadcasting the transaction.")
        advice = _("Consider trying to connect to a different server, or updating Electrum.")
        return "{}\n{}".format(summary, advice)
339

340

341
class _RSClient(RSClient):
    """RSClient whose connection failures surface as ConnectError."""

    async def create_connection(self):
        """Create the connection, converting any OSError into ConnectError."""
        try:
            connection = await super().create_connection()
        except OSError as e:
            # note: using "from e" here will set __cause__ of ConnectError
            raise ConnectError(e) from e
        return connection
348

349

350
class PaddedRSTransport(RSTransport):
    """A raw socket transport that provides basic countermeasures against traffic analysis
    by padding the jsonrpc payload with whitespaces to have ~uniform-size TCP packets.
    (it is assumed that a network observer does not see plaintext transport contents,
    due to it being wrapped e.g. in TLS)
    """

    MIN_PACKET_SIZE = 1024
    WAIT_FOR_BUFFER_GROWTH_SECONDS = 1.0
    # (unpadded) amount of bytes sent instantly before beginning with polling.
    # This makes the initial handshake where a few small messages are exchanged faster.
    WARMUP_BUDGET_SIZE = 1024

    session: Optional['RPCSession']

    def __init__(self, *args, **kwargs):
        RSTransport.__init__(self, *args, **kwargs)
        self._sbuffer = bytearray()  # "send buffer"
        self._sbuffer_task = None  # type: Optional[asyncio.Task]
        self._sbuffer_has_data_evt = asyncio.Event()
        self._last_send = time.monotonic()
        self._force_send = False  # type: bool

    # note: this does not call super().write() but is a complete reimplementation
    async def write(self, message):
        """Frame `message`, append it to the send buffer and maybe flush the buffer."""
        await self._can_send.wait()
        if self.is_closing():
            return
        self._sbuffer += self._framer.frame(message)
        self._sbuffer_has_data_evt.set()
        self._maybe_consume_sbuffer()

    def _maybe_consume_sbuffer(self) -> None:
        """Maybe take some data from sbuffer and send it on the wire."""
        if not self._can_send.is_set() or self.is_closing():
            return
        buf = self._sbuffer
        if not buf:
            return
        # Hold back only if ALL of these are true: not forced, buffer still small,
        # last send was recent, and the warmup budget is already used up.
        if (
            not self._force_send
            and len(buf) < self.MIN_PACKET_SIZE
            and self._last_send + self.WAIT_FOR_BUFFER_GROWTH_SECONDS >= time.monotonic()
            and self.session.send_size >= self.WARMUP_BUDGET_SIZE
        ):
            return
        assert buf[-2:] in (b"}\n", b"]\n"), f"unexpected json-rpc terminator: {buf[-2:]=!r}"
        # candidate (1): "lsize" packet, i.e. the whole buffer padded up to a power of two
        payload_lsize = len(buf)
        total_lsize = max(self.MIN_PACKET_SIZE, 2 ** (payload_lsize.bit_length()))
        npad_lsize = total_lsize - payload_lsize
        # candidate (2): "ssize" packet (s for small), half that size; it carries only
        # the complete messages that fit and defers the rest for later
        total_ssize = max(self.MIN_PACKET_SIZE, total_lsize // 2)
        last_newline = buf.rfind(b"\n", 0, total_ssize)
        if last_newline == -1:
            # no complete message fits: option (2) is not available
            payload_ssize = last_newline
            npad_ssize = float("inf")
        else:
            payload_ssize = last_newline + 1  # for "\n" char
            npad_ssize = total_ssize - payload_ssize
        # pick whichever needs less padding; when forced, always consume the full buffer
        use_lsize = self._force_send or npad_lsize <= npad_ssize
        if use_lsize:
            npad, p_idx = npad_lsize, payload_lsize
        else:
            npad, p_idx = npad_ssize, payload_ssize
        # pad by adding spaces just before the terminator of the last message sent
        json_rpc_terminator = buf[p_idx-2:p_idx]
        assert json_rpc_terminator in (b"}\n", b"]\n"), f"unexpected {json_rpc_terminator=!r}"
        padding = npad * b" "
        buf2 = buf[:p_idx-2] + padding + json_rpc_terminator
        self._asyncio_transport.write(buf2)
        self._last_send = time.monotonic()
        del self._sbuffer[:p_idx]
        if not self._sbuffer:
            self._sbuffer_has_data_evt.clear()

    async def _poll_sbuffer(self):
        """Keep trying to flush the send buffer until the transport is closing."""
        while not self.is_closing():
            await self._can_send.wait()
            await self._sbuffer_has_data_evt.wait()  # to avoid busy-waiting
            self._maybe_consume_sbuffer()
            # If there is still data in the buffer, sleep until it would time out.
            # note: If the transport is ~idle, when we wake up, we will send the current buf data,
            #       but if busy, we might wake up to completely new buffer contents. Either is fine.
            if len(self._sbuffer) == 0:
                continue
            timeout_abs = self._last_send + self.WAIT_FOR_BUFFER_GROWTH_SECONDS
            timeout_rel = max(0.0, timeout_abs - time.monotonic())
            await asyncio.sleep(timeout_rel)

    def connection_made(self, transport: asyncio.BaseTransport):
        super().connection_made(transport)
        if not isinstance(self.session, NotificationSession):
            # This a short-lived "fetch_certificate"-type session.
            # No polling here, we always force-empty the buffer.
            self._force_send = True
            return
        coro = self.session.taskgroup.spawn(self._poll_sbuffer())
        self._sbuffer_task = self.loop.create_task(coro)

    async def close(self, *args, **kwargs):
        '''Close the connection and return when closed.'''
        # Flush buffer before disconnecting. This makes ReplyAndDisconnect work:
        self._force_send = True
        self._maybe_consume_sbuffer()
        await super().close(*args, **kwargs)
465

466

467
class ServerAddr:
    """Address of a server: host, port and single-letter transport protocol.

    Instances compare equal, and hash, by (host, port, protocol).
    """

    def __init__(self, host: str, port: Union[int, str], *, protocol: Optional[str] = None):
        """Validate and store the address.

        Raises ValueError if host is empty, if host/port are rejected by
        NetAddress, or if protocol is not a known transport.
        A missing protocol defaults to 's'.
        """
        assert isinstance(host, str), repr(host)
        if protocol is None:
            protocol = 's'
        if not host:
            raise ValueError('host must not be empty')
        if host[0] == '[' and host[-1] == ']':  # IPv6
            host = host[1:-1]
        try:
            net_addr = NetAddress(host, port)  # this validates host and port
        except Exception as e:
            raise ValueError(f"cannot construct ServerAddr: invalid host or port (host={host}, port={port})") from e
        if protocol not in KNOWN_ELEC_PROTOCOL_TRANSPORTS:
            raise ValueError(f"invalid network protocol: {protocol}")
        self.host = str(net_addr.host)  # canonical form (if e.g. IPv6 address)
        self.port = int(net_addr.port)
        self.protocol = protocol
        self._net_addr_str = str(net_addr)

    @classmethod
    def from_str(cls, s: str) -> 'ServerAddr':
        """Constructs a ServerAddr or raises ValueError."""
        # host might be IPv6 address, hence do rsplit:
        host, port, protocol = str(s).rsplit(':', 2)
        # build via cls so that subclasses get instances of their own type
        return cls(host=host, port=port, protocol=protocol)

    @classmethod
    def from_str_with_inference(cls, s: str) -> Optional['ServerAddr']:
        """Construct ServerAddr from str, guessing missing details.
        Does not raise - just returns None if guessing failed.
        Ongoing compatibility not guaranteed.
        """
        if not s:
            return None
        # coerce before indexing, so that non-str input cannot raise
        s = str(s)
        host = ""
        if s[0] == "[" and "]" in s:  # IPv6 address
            host_end = s.index("]")
            host = s[1:host_end]
            s = s[host_end+1:]
        items = s.rsplit(':', 2)
        if len(items) < 2:
            return None  # although maybe we could guess the port too?
        host = host or items[0]
        port = items[1]
        if len(items) >= 3:
            protocol = items[2]
        else:
            protocol = PREFERRED_NETWORK_PROTOCOL
        try:
            return cls(host=host, port=port, protocol=protocol)
        except ValueError:
            return None

    def to_friendly_name(self) -> str:
        """Return the address as a str, omitting the protocol suffix when it is 's'."""
        # note: this method is closely linked to from_str_with_inference
        if self.protocol == 's':  # hide trailing ":s"
            return self.net_addr_str()
        return str(self)

    def __str__(self):
        return '{}:{}'.format(self.net_addr_str(), self.protocol)

    def to_json(self) -> str:
        return str(self)

    def __repr__(self):
        return f'<ServerAddr host={self.host} port={self.port} protocol={self.protocol}>'

    def net_addr_str(self) -> str:
        """Return str(NetAddress) as computed at construction time."""
        return self._net_addr_str

    def __eq__(self, other):
        if not isinstance(other, ServerAddr):
            return False
        return (self.host == other.host
                and self.port == other.port
                and self.protocol == other.protocol)

    def __ne__(self, other):
        return not (self == other)

    def __hash__(self):
        return hash((self.host, self.port, self.protocol))
552

553

554
def _get_cert_path_for_host(*, config: 'SimpleConfig', host: str) -> str:
    """Return the path of the certificate file for `host` under `<config.path>/certs`.

    The file is named after the host itself, except for IPv6 addresses,
    which are named "ipv6_" followed by the hex of the packed address.
    """
    try:
        parsed = ip_address(host)
    except ValueError:
        # not an IP literal: the host is used verbatim
        is_ipv6 = False
    else:
        is_ipv6 = isinstance(parsed, IPv6Address)
    filename = f"ipv6_{parsed.packed.hex()}" if is_ipv6 else host
    return os.path.join(config.path, 'certs', filename)
564

565

566
class Interface(Logger):
1✔
567

568
    def __init__(self, *, network: 'Network', server: ServerAddr):
        """Set up state for a connection to `server` and schedule self.run() on the network's event loop."""
        assert isinstance(server, ServerAddr), f"expected ServerAddr, got {type(server)}"
        self.ready = network.asyncio_loop.create_future()
        self.got_disconnected = asyncio.Event()
        self._blockchain_updated = asyncio.Event()
        self.server = server
        Logger.__init__(self)
        assert network.config.path
        self.cert_path = _get_cert_path_for_host(config=network.config, host=self.host)
        self.blockchain = None  # type: Optional[Blockchain]
        self._requested_chunks = set()  # type: Set[int]
        self.network = network
        self.session = None  # type: Optional[NotificationSession]
        self._ipaddr_bucket = None
        # Set up proxy.
        # - for servers running on localhost, the proxy is not used. If user runs their own server
        #   on same machine, this lets them enable the proxy (which is used for e.g. FX rates).
        #   note: we could maybe relax this further and bypass the proxy for all private
        #         addresses...? e.g. 192.168.x.x
        if not util.is_localhost(server.host):
            self.proxy = ESocksProxy.from_network_settings(network)
        else:
            self.logger.info(f"looks like localhost: not using proxy for this server")
            self.proxy = None

        # Latest block header and corresponding height, as claimed by the server.
        # Note that these values are updated before they are verified.
        # Especially during initial header sync, verification can take a long time.
        # Failing verification will get the interface closed.
        self.tip_header = None  # type: Optional[dict]
        self.tip = 0

        self._headers_cache = {}  # type: Dict[int, bytes]
        self._rawtx_cache = LRUCache(maxsize=20)  # type: LRUCache[str, bytes]  # txid->rawtx

        self.fee_estimates_eta = {}  # type: Dict[int, int]

        self.active_protocol_tuple = (0,)  # type: Optional[tuple[int, ...]]

        # Dump network messages (only for this interface).  Set at runtime from the console.
        self.debug = False

        self.taskgroup = OldTaskGroup()

        async def spawn_task():
            # run this interface inside the network's taskgroup, under a recognisable task name
            run_task = await self.network.taskgroup.spawn(self.run())
            run_task.set_name(f"interface::{str(server)}")

        # scheduled thread-safely on the network's event loop
        asyncio.run_coroutine_threadsafe(spawn_task(), self.network.asyncio_loop)
616

617
    @property
    def host(self):
        """Host of the server this interface is for."""
        server = self.server
        return server.host
620

621
    @property
    def port(self):
        """Port of the server this interface is for."""
        server = self.server
        return server.port
624

625
    @property
    def protocol(self):
        """Transport protocol code of the server this interface is for."""
        server = self.server
        return server.protocol
628

629
    def diagnostic_name(self):
        """Return the server's network-address string."""
        server = self.server
        return server.net_addr_str()
631

632
    def __str__(self):
        name = self.diagnostic_name()
        return f"<Interface {name}>"
634

635
    async def is_server_ca_signed(self, ca_ssl_context: ssl.SSLContext) -> bool:
        """Given a CA enforcing SSL context, returns True if the connection
        can be established. Returns False if the server has a self-signed
        certificate but otherwise is okay. Any other failures raise.
        """
        try:
            await self.open_session(ssl_context=ca_ssl_context, exit_early=True)
        except ConnectError as e:
            cause = e.__cause__
            is_self_signed = (
                isinstance(cause, ssl.SSLCertVerificationError)
                and cause.reason == 'CERTIFICATE_VERIFY_FAILED'
                and cause.verify_code == 18  # "self signed certificate"
            )
            if not is_self_signed:
                # Not good. Cannot use this server.
                raise
            # Good. We will use this server as self-signed.
            return False
        # Good. We will use this server as CA-signed.
        return True
653

654
    async def _try_saving_ssl_cert_for_first_time(self, ca_ssl_context: ssl.SSLContext) -> None:
        """Create the cert file for this server: empty if CA signed, otherwise the saved certificate.

        Raises InvalidOptionCombination if an expected fingerprint is set
        for a CA signed server.
        """
        if not await self.is_server_ca_signed(ca_ssl_context):
            await self._save_certificate()
            return
        if self._get_expected_fingerprint():
            raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
        # empty file means this is CA signed, not self-signed
        with open(self.cert_path, 'w') as f:
            f.write('')
664

665
    def _is_saved_ssl_cert_available(self):
        """Return whether a usable cert file is already saved for this server.

        An empty file marks a CA signed server. Otherwise the file must hold
        a parseable, unexpired certificate; an expired one is deleted and
        False is returned.

        Raises InvalidOptionCombination if an expected fingerprint is set for
        a CA signed server, and ErrorParsingSSLCert if the saved file cannot
        be parsed.
        """
        if not os.path.exists(self.cert_path):
            return False
        try:
            with open(self.cert_path, 'r') as f:
                contents = f.read()
        except FileNotFoundError:
            # The path depends on the host only, so the file can be deleted
            # (see the expiry branch below) between the check and the open.
            return False
        if contents == '':  # CA signed
            if self._get_expected_fingerprint():
                raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
            return True
        # pinned self-signed cert
        try:
            b = pem.dePem(contents, 'CERTIFICATE')
        except SyntaxError as e:
            self.logger.info(f"error parsing already saved cert: {e}")
            raise ErrorParsingSSLCert(e) from e
        try:
            x = x509.X509(b)
        except Exception as e:
            self.logger.info(f"error parsing already saved cert: {e}")
            raise ErrorParsingSSLCert(e) from e
        try:
            x.check_date()
        except x509.CertificateError as e:
            self.logger.info(f"certificate has expired: {e}")
            try:
                os.unlink(self.cert_path)  # delete pinned cert only in this case
            except FileNotFoundError:
                pass  # already deleted, e.g. by a concurrent caller
            return False
        self._verify_certificate_fingerprint(bytes(b))
        return True
693

694
    async def _get_ssl_context(self) -> Optional[ssl.SSLContext]:
        """Return the SSL context for this server, or None for plaintext TCP.

        Fetches and saves the server's certificate first if none is saved yet;
        failures doing so are raised as ErrorGettingSSLCertFromServer.
        """
        if self.protocol != 's':
            # using plaintext TCP
            return None

        # see if we already have cert for this server; or get it for the first time
        ca_sslc = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
        if not self._is_saved_ssl_cert_available():
            try:
                await self._try_saving_ssl_cert_for_first_time(ca_sslc)
            except (OSError, ConnectError, aiorpcx.socks.SOCKSError) as e:
                raise ErrorGettingSSLCertFromServer(e) from e
        # now we have a file saved in our certificate store
        if os.stat(self.cert_path).st_size == 0:
            # CA signed cert
            return ca_sslc
        # pinned self-signed cert
        pinned_sslc = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=self.cert_path)
        # note: Flag "ssl.VERIFY_X509_STRICT" is enabled by default in python 3.13+ (disabled in older versions).
        #       We explicitly disable it as it breaks lots of servers.
        pinned_sslc.verify_flags &= ~ssl.VERIFY_X509_STRICT
        pinned_sslc.check_hostname = False
        return pinned_sslc
719

720
    def handle_disconnect(func):
        """Decorator for coroutine methods that runs disconnect clean-up.

        The wrapped coroutine is awaited and its result returned.
        GracefulDisconnect and aiorpcx RPCError are logged and swallowed;
        other exceptions propagate. The ``finally`` clean-up runs either way.
        """
        @functools.wraps(func)
        async def _guarded(self: 'Interface', *args, **kwargs):
            try:
                return await func(self, *args, **kwargs)
            except GracefulDisconnect as exc:
                self.logger.log(exc.log_level, f"disconnecting due to {exc!r}")
            except aiorpcx.jsonrpc.RPCError as exc:
                self.logger.warning(f"disconnecting due to {exc!r}")
                self.logger.debug(f"(disconnect) trace for {exc!r}", exc_info=True)
            finally:
                self.got_disconnected.set()
                # Make sure taskgroup gets cleaned-up. This explicit clean-up is needed here
                # in case the "with taskgroup" ctx mgr never got a chance to run:
                await self.taskgroup.cancel_remaining()
                await self.network.connection_down(self)
                # if was not 'ready' yet, schedule waiting coroutines:
                self.ready.cancel()
        return _guarded
739

740
    @ignore_exceptions  # do not kill network.taskgroup
    @log_exceptions
    @handle_disconnect
    async def run(self):
        """Obtain the SSL context, then open the session.

        Cert-related errors while obtaining the context, and connection
        errors while opening the session, are logged and end the coroutine.
        """
        try:
            ssl_context = await self._get_ssl_context()
        except (ErrorParsingSSLCert, ErrorGettingSSLCertFromServer) as exc:
            self.logger.info(f'disconnecting due to: {exc!r}')
            return
        try:
            await self.open_session(ssl_context=ssl_context)
        except (asyncio.CancelledError, ConnectError, aiorpcx.socks.SOCKSError) as exc:
            # make SSL errors for main interface more visible (to help servers ops debug cert pinning issues)
            visible_ssl_failure = (
                isinstance(exc, ConnectError)
                and isinstance(exc.__cause__, ssl.SSLError)
                and self.is_main_server()
                and not self.network.auto_connect
            )
            if not visible_ssl_failure:
                self.logger.info(f'disconnecting due to: {exc!r}')
            else:
                self.logger.warning(f'Cannot connect to main server due to SSL error '
                                    f'(maybe cert changed compared to "{self.cert_path}"). Exc: {exc!r}')
760

761
    def _mark_ready(self) -> None:
        """Resolve the ``ready`` future once, after choosing ``self.blockchain``.

        Raises GracefulDisconnect if ``ready`` was already cancelled; does
        nothing if it is already done. Otherwise ``self.blockchain`` becomes the
        chain that checks against ``self.tip_header``, or the best chain if none
        does.
        """
        if self.ready.cancelled():
            raise GracefulDisconnect('conn establishment was too slow; *ready* future was cancelled')
        if self.ready.done():
            return

        assert self.tip_header
        matching_chain = blockchain.check_header(self.tip_header)
        self.blockchain = matching_chain if matching_chain else blockchain.get_best_chain()
        assert self.blockchain is not None

        self.logger.info(f"set blockchain with height {self.blockchain.height()}")

        self.ready.set_result(1)
778

779
    def is_connected_and_ready(self) -> bool:
        """Return True iff ``ready`` is done and ``got_disconnected`` is not set."""
        if not self.ready.done():
            return False
        return not self.got_disconnected.is_set()
781

782
    async def _save_certificate(self) -> None:
1✔
783
        if not os.path.exists(self.cert_path):
×
784
            # we may need to retry this a few times, in case the handshake hasn't completed
785
            for _ in range(10):
×
786
                dercert = await self._fetch_certificate()
×
787
                if dercert:
×
788
                    self.logger.info("succeeded in getting cert")
×
789
                    self._verify_certificate_fingerprint(dercert)
×
790
                    with open(self.cert_path, 'w') as f:
×
791
                        cert = ssl.DER_cert_to_PEM_cert(dercert)
×
792
                        # workaround android bug
793
                        cert = re.sub("([^\n])-----END CERTIFICATE-----","\\1\n-----END CERTIFICATE-----",cert)
×
794
                        f.write(cert)
×
795
                        # even though close flushes, we can't fsync when closed.
796
                        # and we must flush before fsyncing, cause flush flushes to OS buffer
797
                        # fsync writes to OS buffer to disk
798
                        f.flush()
×
799
                        os.fsync(f.fileno())
×
800
                    break
×
801
                await asyncio.sleep(1)
×
802
            else:
803
                raise GracefulDisconnect("could not get certificate after 10 tries")
×
804

805
    async def _fetch_certificate(self) -> bytes:
        """Connect without any cert verification and return the peer cert.

        The return value is whatever ``getpeercert(binary_form=True)`` yields
        on the connection's SSL object.
        """
        # verification is deliberately off: we only want to read the cert
        unverified_sslc = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_CLIENT)
        unverified_sslc.check_hostname = False
        unverified_sslc.verify_mode = ssl.CERT_NONE
        client = _RSClient(
            session_factory=RPCSession,
            host=self.host, port=self.port,
            ssl=unverified_sslc,
            proxy=self.proxy,
            transport=PaddedRSTransport,
        )
        async with client as session:
            raw_transport = session.transport._asyncio_transport  # type: asyncio.BaseTransport
            peer_ssl = raw_transport.get_extra_info("ssl_object")  # type: ssl.SSLObject
            return peer_ssl.getpeercert(binary_form=True)
819

820
    def _get_expected_fingerprint(self) -> Optional[str]:
1✔
821
        if self.is_main_server():
×
822
            return self.network.config.NETWORK_SERVERFINGERPRINT
×
823
        return None
×
824

825
    def _verify_certificate_fingerprint(self, certificate: bytes) -> None:
1✔
826
        expected_fingerprint = self._get_expected_fingerprint()
×
827
        if not expected_fingerprint:
×
828
            return
×
829
        fingerprint = hashlib.sha256(certificate).hexdigest()
×
830
        fingerprints_match = fingerprint.lower() == expected_fingerprint.lower()
×
831
        if not fingerprints_match:
×
832
            util.trigger_callback('cert_mismatch')
×
833
            raise ErrorSSLCertFingerprintMismatch('Refusing to connect to server due to cert fingerprint mismatch')
×
834
        self.logger.info("cert fingerprint verification passed")
×
835

836
    async def _maybe_warm_headers_cache(self, *, from_height: int, to_height: int, mode: ChainResolutionMode) -> None:
        """Populate header cache for block heights in range [from_height, to_height].

        Nothing is requested if every height in the range is already cached.
        Otherwise the whole range is fetched in one request and each returned
        header is stored under its height.
        """
        assert from_height <= to_height, (from_height, to_height)
        assert to_height - from_height < MAX_NUM_HEADERS_PER_REQUEST
        any_missing = any(
            wanted not in self._headers_cache
            for wanted in range(from_height, to_height+1)
        )
        if not any_missing:
            # cache already has all requested headers
            return
        # use lower timeout as we usually have network.bhi_lock here
        timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        num_wanted = to_height - from_height + 1
        headers = await self.get_block_headers(start_height=from_height, count=num_wanted, timeout=timeout, mode=mode)
        for header_height, raw_header in enumerate(headers, start=from_height):
            self._headers_cache[header_height] = raw_header
850

851
    async def get_block_header(self, height: int, *, mode: ChainResolutionMode) -> dict:
        """Return the deserialized header at ``height``.

        A truthy entry in the headers cache is used if present; otherwise the
        header is requested from the server. ``mode`` is only used for logging.
        Raises Exception if ``height`` is not a non-negative integer.
        """
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        #self.logger.debug(f'get_block_header() {height} in {mode=}')
        # use lower timeout as we usually have network.bhi_lock here
        timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        cached_raw = self._headers_cache.get(height)
        if cached_raw:
            return blockchain.deserialize_header(cached_raw, height)
        self.logger.info(f'requesting block header {height} in {mode=}')
        hex_header = await self.session.send_request('blockchain.block.header', [height], timeout=timeout)
        return blockchain.deserialize_header(bytes.fromhex(hex_header), height)
862

863
    async def get_block_headers(
        self,
        *,
        start_height: int,
        count: int,
        timeout=None,
        mode: Optional[ChainResolutionMode] = None,
    ) -> Sequence[bytes]:
        """Request a number of consecutive block headers, starting at `start_height`.

        `count` is the num of requested headers, BUT note the server might return fewer than this
        (if range would extend beyond its tip).
        `mode` is only used in the log message.
        Raises Exception for invalid arguments and RequestCorrupted for a malformed
        or inconsistent server response.
        note: the returned headers are not verified or parsed at all.
        """
        if not is_non_negative_integer(start_height):
            raise Exception(f"{repr(start_height)} is not a block height")
        if not (is_non_negative_integer(count) and 0 < count <= MAX_NUM_HEADERS_PER_REQUEST):
            raise Exception(f"{repr(count)} not an int in range ]0, {MAX_NUM_HEADERS_PER_REQUEST}]")
        log_msg = f"requesting block headers: [{start_height}, {start_height+count-1}], {count=}"
        if mode is not None:
            log_msg += f" (in {mode=})"
        self.logger.info(log_msg)
        res = await self.session.send_request('blockchain.block.headers', [start_height, count], timeout=timeout)

        # check response: both fields must exist before either is type-checked
        for field_name in ('count', 'max'):
            assert_dict_contains_field(res, field_name=field_name)
        for field_name in ('count', 'max'):
            assert_non_negative_integer(res[field_name])

        if self.active_protocol_tuple >= (1, 6):
            # headers arrive as a list of hex strings, one per header
            hex_headers_list = assert_dict_contains_field(res, field_name='headers')
            assert_list_or_tuple(hex_headers_list)
            for hex_header in hex_headers_list:
                assert_hex_str(hex_header)
                if len(hex_header) != HEADER_SIZE * 2:
                    raise RequestCorrupted(f"invalid header size. got {len(hex_header)//2}, expected {HEADER_SIZE}")
            if len(hex_headers_list) != res['count']:
                raise RequestCorrupted(f"{len(hex_headers_list)=} != {res['count']=}")
            headers = [bfh(hex_header) for hex_header in hex_headers_list]
        else:  # proto 1.4
            # headers arrive as one concatenated hex string
            concatenated_hex = assert_dict_contains_field(res, field_name='hex')
            assert_hex_str(concatenated_hex)
            if len(concatenated_hex) != HEADER_SIZE * 2 * res['count']:
                raise RequestCorrupted('inconsistent chunk hex and count')
            headers = list(util.chunks(bfh(concatenated_hex), size=HEADER_SIZE))

        # we never request more than MAX_NUM_HEADERS_PER_REQUEST headers, but we enforce those fit in a single response
        if res['max'] < MAX_NUM_HEADERS_PER_REQUEST:
            raise RequestCorrupted(f"server uses too low 'max' count for block.headers: {res['max']} < {MAX_NUM_HEADERS_PER_REQUEST}")
        if res['count'] > count:
            raise RequestCorrupted(f"asked for {count} headers but got more: {res['count']}")
        if res['count'] < count:
            # we only tolerate getting fewer headers if it is due to reaching the tip
            end_height = start_height + res['count'] - 1
            if end_height < self.tip:  # still below tip. why did server not send more?!
                raise RequestCorrupted(
                    f"asked for {count} headers but got fewer: {res['count']}. ({start_height=}, {self.tip=})")
        # checks done.
        return headers
919

920
    async def request_chunk_below_max_checkpoint(
        self,
        *,
        height: int,
    ) -> None:
        """Fetch the whole chunk containing ``height`` and connect it to the chain.

        Returns without doing anything if that chunk index is already recorded
        in ``_requested_chunks``. The index is recorded for the duration of the
        request only. Raises RequestCorrupted if the chunk does not connect.
        """
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        assert height <= constants.net.max_checkpoint(), f"{height=} must be <= cp={constants.net.max_checkpoint()}"
        index = height // CHUNK_SIZE
        if index in self._requested_chunks:
            return
        self.logger.debug(f"requesting chunk from height {height}")
        self._requested_chunks.add(index)
        try:
            headers = await self.get_block_headers(start_height=index * CHUNK_SIZE, count=CHUNK_SIZE)
        finally:
            self._requested_chunks.discard(index)
        connected = self.blockchain.connect_chunk(index, data=b"".join(headers))
        if not connected:
            raise RequestCorrupted(f"chunk ({index=}, for {height=}) does not connect to blockchain")
941

942
    async def _fast_forward_chain(
        self,
        *,
        height: int,  # usually local chain tip + 1
        tip: int,  # server tip. we should not request past this.
    ) -> int:
        """Request some headers starting at `height` to grow the blockchain of this interface.

        Up to 10 chunks are requested concurrently, starting at the chunk that
        contains `height` and never going past `tip`. Chunks are then connected
        in order, stopping at the first one that does not connect.
        Returns number of headers we managed to connect, starting at `height`.
        """
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        if not is_non_negative_integer(tip):
            raise Exception(f"{repr(tip)} is not a block height")
        if not (height > constants.net.max_checkpoint()
                or height == 0 == constants.net.max_checkpoint()):
            raise Exception(f"{height=} must be > cp={constants.net.max_checkpoint()}")
        assert height <= tip, f"{height=} must be <= {tip=}"
        # Request a few chunks of headers concurrently.
        # tradeoffs:
        # - more chunks: higher memory requirements
        # - more chunks: higher concurrency => syncing needs fewer network round-trips
        # - if a chunk does not connect, bandwidth for all later chunks is wasted
        async with OldTaskGroup() as group:
            tasks = []  # type: List[Tuple[int, asyncio.Task[Sequence[bytes]]]]
            index0 = height // CHUNK_SIZE
            for index in range(index0, index0 + 10):
                start_height = index * CHUNK_SIZE
                if start_height > tip:
                    break
                end_height = min(start_height + CHUNK_SIZE - 1, tip)
                size = end_height - start_height + 1
                fetch = self.get_block_headers(start_height=start_height, count=size)
                tasks.append((index, await group.spawn(fetch)))
        # try to connect chunks
        num_connected = 0
        for index, task in tasks:
            chunk_headers = task.result()
            if not self.blockchain.connect_chunk(index, data=b"".join(chunk_headers)):
                break
            num_connected += len(chunk_headers)
        # We started at a chunk boundary, instead of requested `height`. Need to correct for that.
        offset = height - index0 * CHUNK_SIZE
        return max(0, num_connected - offset)
986

987
    def is_main_server(self) -> bool:
        """Return whether this interface is the network's main one.

        True if ``network.interface`` equals this object, or if the network has
        no interface yet and its default server equals ``self.server``.
        """
        if self.network.interface == self:
            return True
        if self.network.interface is not None:
            return False
        return self.network.default_server == self.server
990

991
    async def open_session(
        self,
        *,
        ssl_context: Optional[ssl.SSLContext],
        exit_early: bool = False,
    ):
        """Connect to the server, negotiate the protocol and run the interface tasks.

        :param ssl_context: context passed to the client as ``ssl``
            (None means no TLS).
        :param exit_early: if True, return right after a successful
            'server.version' request.

        Raises GracefulDisconnect when version negotiation fails or is violated,
        when the server-spread check rejects this interface, when
        'server.features' fails or reports a different genesis hash, and for a
        fixed set of RPC error codes raised by the tasks. Other RPC errors
        propagate unchanged.
        """
        def session_factory(*args, iface=self, **kwargs):
            # each session gets a reference back to this interface
            return NotificationSession(*args, **kwargs, interface=iface)

        async with _RSClient(
            session_factory=session_factory,
            host=self.host, port=self.port,
            ssl=ssl_context,
            proxy=self.proxy,
            transport=PaddedRSTransport,
        ) as session:
            start = time.perf_counter()
            self.session = session  # type: NotificationSession
            self.session.set_default_timeout(self.network.get_network_timeout_seconds(NetworkTimeout.Generic))
            client_prange = [version.PROTOCOL_VERSION_MIN, version.PROTOCOL_VERSION_MAX]
            try:
                ver = await session.send_request('server.version', [self.client_name(), client_prange])
            except aiorpcx.jsonrpc.RPCError as e:
                # probably 'unsupported protocol version'
                raise GracefulDisconnect(e) from e
            if exit_early:
                return
            self.active_protocol_tuple = protocol_tuple(ver[1])
            client_pmin = protocol_tuple(client_prange[0])
            client_pmax = protocol_tuple(client_prange[1])
            if not (client_pmin <= self.active_protocol_tuple <= client_pmax):
                raise GracefulDisconnect(f'server violated protocol-version-negotiation. '
                                         f'we asked for {client_prange!r}, they sent {ver[1]!r}')
            if not self.network.check_interface_against_healthy_spread_of_connected_servers(self):
                raise GracefulDisconnect(f'too many connected servers already '
                                         f'in bucket {self.bucket_based_on_ipaddress()}')

            try:
                features = await session.send_request('server.features')
                server_genesis_hash = assert_dict_contains_field(features, field_name='genesis_hash')
            except (aiorpcx.jsonrpc.RPCError, RequestCorrupted) as e:
                raise GracefulDisconnect(e) from e
            if server_genesis_hash != constants.net.GENESIS:
                raise GracefulDisconnect(f'server on different chain: {server_genesis_hash=}. ours: {constants.net.GENESIS}')
            self.logger.info(f"connection established. version: {ver}, handshake duration: {(time.perf_counter() - start) * 1000:.2f} ms")

            try:
                async with self.taskgroup as group:
                    await group.spawn(self.ping)
                    await group.spawn(self.request_fee_estimates)
                    await group.spawn(self.run_fetch_blocks)
                    await group.spawn(self.monitor_connection)
            except aiorpcx.jsonrpc.RPCError as e:
                if e.code in (
                    JSONRPC.EXCESSIVE_RESOURCE_USAGE,
                    JSONRPC.SERVER_BUSY,
                    JSONRPC.METHOD_NOT_FOUND,
                    JSONRPC.INTERNAL_ERROR,
                ):
                    # these are logged more loudly for the main server only
                    log_level = logging.WARNING if self.is_main_server() else logging.INFO
                    raise GracefulDisconnect(e, log_level=log_level) from e
                raise
            finally:
                self.got_disconnected.set()  # set this ASAP, ideally before any awaits
1052

1053
    async def monitor_connection(self):
        """Poll once per second and raise GracefulDisconnect once the session is gone.

        "Gone" means ``self.session`` is falsy or reports ``is_closing()``.
        """
        while True:
            await asyncio.sleep(1)
            # If the session/transport is no longer open, we disconnect.
            # e.g. if the remote cleanly sends EOF, we would handle that here.
            # note: If the user pulls the ethernet cable or disconnects wifi,
            #       ideally we would detect that here, so that the GUI/etc can reflect that.
            #       - On Android, this seems to work reliably, where asyncio.BaseProtocol.connection_lost()
            #         gets called with e.g. ConnectionAbortedError(103, 'Software caused connection abort').
            #       - On desktop Linux/Win, it seems BaseProtocol.connection_lost() is not called in such cases.
            #         Hence, in practice the connection issue will only be detected the next time we try
            #         to send a message (plus timeout), which can take minutes...
            session_alive = self.session and not self.session.is_closing()
            if not session_alive:
                raise GracefulDisconnect('session was closed')
1067

1068
    async def ping(self):
        """Send 'server.ping' forever, at random intervals of up to 300 seconds."""
        # We periodically send a "ping" msg to make sure the server knows we are still here.
        # Adding a bit of randomness generates some noise against traffic analysis.
        while True:
            delay = random.random() * 300
            await asyncio.sleep(delay)
            await self.session.send_request('server.ping')
            await self._maybe_send_noise()
1075

1076
    async def _maybe_send_noise(self):
        """Send a random number of extra pings, possibly none.

        Each round continues with probability 0.2 and is preceded by a random
        sleep of less than one second.
        """
        while True:
            if random.random() >= 0.2:
                break
            await asyncio.sleep(random.random())
            await self.session.send_request('server.ping')
1080

1081
    async def request_fee_estimates(self):
        """Refresh the fee estimates forever, sleeping 60 seconds between rounds.

        Each round queries all targets in ``FEE_ETA_TARGETS`` except the last
        one concurrently. Negative results are skipped; the others are stored
        in ``self.fee_estimates_eta`` before the network is notified.
        """
        while True:
            async with OldTaskGroup() as group:
                pending = []
                for nblock_target in FEE_ETA_TARGETS[0:-1]:
                    task = await group.spawn(self.get_estimatefee(nblock_target))
                    pending.append((nblock_target, task))
            # the task group has exited here, so every task has finished
            for nblock_target, task in pending:
                fee = task.result()
                if fee < 0:
                    continue
                assert isinstance(fee, int)
                self.fee_estimates_eta[nblock_target] = fee
            self.network.update_fee_estimates()
            await asyncio.sleep(60)
1094

1095
    async def close(self, *, force_after: Optional[int] = None):
        """Closes the connection and waits for it to be closed.

        We try to flush buffered data to the wire, which can take some time.

        :param force_after: forwarded unchanged to ``session.close()``.
            Nothing happens if there is no session.
        """
        if self.session:
            await self.session.close(force_after=force_after)
        # monitor_connection will cancel tasks
1102

1103
    async def run_fetch_blocks(self):
        """Subscribe to header notifications and process each new tip forever.

        For every notification: record the tip, mark the interface ready,
        process the header, fire the update callbacks, let the network
        reconsider its interfaces, and spawn a noise task. Raises
        GracefulDisconnect if the server tip is below the max checkpoint.
        """
        header_queue = asyncio.Queue()
        await self.session.subscribe('blockchain.headers.subscribe', [], header_queue)
        while True:
            notification = await header_queue.get()
            tip_info = notification[0]
            height = tip_info['height']
            header_bytes = bfh(tip_info['hex'])
            self.tip_header = blockchain.deserialize_header(header_bytes, height)
            self.tip = height
            if self.tip < constants.net.max_checkpoint():
                raise GracefulDisconnect(
                    f"server tip below max checkpoint. ({self.tip} < {constants.net.max_checkpoint()})")
            self._mark_ready()
            self._headers_cache.clear()  # tip changed, so assume anything could have happened with chain
            self._headers_cache[height] = header_bytes
            try:
                blockchain_updated = await self._process_header_at_tip()
            finally:
                self._headers_cache.clear()  # to reduce memory usage
            # header processing done
            if self.is_main_server() or blockchain_updated:
                self.logger.info(f"new chain tip. {height=}")
            if blockchain_updated:
                util.trigger_callback('blockchain_updated')
                # set+clear in a row: the event is pulsed and left cleared
                self._blockchain_updated.set()
                self._blockchain_updated.clear()
            util.trigger_callback('network_updated')
            await self.network.switch_unwanted_fork_interface()
            await self.network.switch_lagging_interface()
            await self.taskgroup.spawn(self._maybe_send_noise())
1135

1136
    async def _process_header_at_tip(self) -> bool:
1✔
1137
        """Returns:
1138
        False - boring fast-forward: we already have this header as part of this blockchain from another interface,
1139
        True - new header we didn't have, or reorg
1140
        """
1141
        height, header = self.tip, self.tip_header
1✔
1142
        async with self.network.bhi_lock:
1✔
1143
            if self.blockchain.height() >= height and self.blockchain.check_header(header):
1✔
1144
                # another interface amended the blockchain
1145
                return False
×
1146
            await self.sync_until(height)
1✔
1147
            return True
1✔
1148

1149
    async def sync_until(
        self,
        height: int,
        *,
        next_height: Optional[int] = None,  # sync target. typically the tip, except in unit tests
    ) -> Tuple[ChainResolutionMode, int]:
        """Advance this interface's chain from ``height`` up to ``next_height``.

        :param height: height to start processing at.
        :param next_height: sync target; defaults to ``self.tip``.
        :return: the last resolution mode and the height reached.

        More than 144 blocks below the target, headers are fetched in chunks;
        otherwise they are processed one by one via ``step``. At least one
        iteration always runs. Raises GracefulDisconnect if a chunk fails to
        connect at or below the max checkpoint.
        """
        if next_height is None:
            next_height = self.tip
        last = None  # type: Optional[ChainResolutionMode]
        while last is None or height <= next_height:
            prev_last, prev_height = last, height
            if next_height > height + 144:
                # We are far from the tip.
                # It is more efficient to process headers in large batches (CPU/disk_usage/logging).
                # (but this wastes a little bandwidth, if we are not on a chunk boundary)
                num_headers = await self._fast_forward_chain(
                    height=height, tip=next_height)
                if num_headers == 0:
                    if height <= constants.net.max_checkpoint():
                        raise GracefulDisconnect('server chain conflicts with checkpoints or genesis')
                    # nothing connected: fall back to a single step for this height
                    last, height = await self.step(height)
                    continue
                # report progress to gui/etc
                util.trigger_callback('blockchain_updated')
                self._blockchain_updated.set()
                self._blockchain_updated.clear()
                util.trigger_callback('network_updated')
                height += num_headers
                # report the bound actually compared against, which may differ from self.tip
                assert height <= next_height+1, (height, next_height)
                last = ChainResolutionMode.CATCHUP
            else:
                # We are close to the tip, so process headers one-by-one.
                # (note: due to headers_cache, to save network latency, this can still batch-request headers)
                last, height = await self.step(height)
            assert (prev_last, prev_height) != (last, height), 'had to prevent infinite loop in interface.sync_until'
        return last, height
1185

1186
    async def step(
        self,
        height: int,
    ) -> Tuple[ChainResolutionMode, int]:
        """Process the single header at ``height`` and return (mode, next height).

        If the header checks against a known chain, that chain is adopted. If
        it can connect to one, it is saved there. Otherwise the backwards and
        binary searches are used to locate the fork point.
        """
        assert 0 <= height <= self.tip, (height, self.tip)
        warm_until = min(self.tip, height+MAX_NUM_HEADERS_PER_REQUEST-1)
        await self._maybe_warm_headers_cache(
            from_height=height,
            to_height=warm_until,
            mode=ChainResolutionMode.CATCHUP,
        )
        header = await self.get_block_header(height, mode=ChainResolutionMode.CATCHUP)

        known_chain = blockchain.check_header(header)
        if known_chain:
            self.blockchain = known_chain
            # note: there is an edge case here that is not handled.
            # we might know the blockhash (enough for check_header) but
            # not have the header itself. e.g. regtest chain with only genesis.
            # this situation resolves itself on the next block
            return ChainResolutionMode.CATCHUP, height+1

        connectable_chain = blockchain.can_connect(header)
        if not connectable_chain:
            self.logger.info(f"can't connect new block: {height=}")
            # walk back until a header is found that checks or connects
            height, header, bad, bad_header = await self._search_headers_backwards(height, header=header)
            known_chain = blockchain.check_header(header)
            connectable_chain = blockchain.can_connect(header)
            assert known_chain or connectable_chain
        if connectable_chain:
            height += 1
            self.blockchain = connectable_chain
            self.blockchain.save_header(header)
            return ChainResolutionMode.CATCHUP, height

        # only reachable after the backwards search, so bad/bad_header are set
        good, bad, bad_header = await self._search_headers_binary(height, bad, bad_header, known_chain)
        return await self._resolve_potential_chain_fork_given_forkpoint(good, bad, bad_header)
1222

1223
    async def _search_headers_binary(
        self,
        height: int,
        bad: int,
        bad_header: dict,
        chain: Optional[Blockchain],
    ) -> Tuple[int, int, dict]:
        """Bisect between a good height and a bad height until they are adjacent.

        ``height`` is the initial good height; ``bad``/``bad_header`` identify a
        header that checks against no chain. Returns (good, bad, bad_header)
        with ``good + 1 == bad``. ``self.blockchain`` follows the chain of the
        latest header that checked.
        """
        assert bad == bad_header['block_height']
        _assert_header_does_not_check_against_any_chain(bad_header)

        self.blockchain = chain
        good = height
        while True:
            assert 0 <= good < bad, (good, bad)
            mid = (good + bad) // 2
            self.logger.info(f"binary step. good {good}, bad {bad}, height {mid}")
            interval_is_small = bad - good + 1 <= MAX_NUM_HEADERS_PER_REQUEST
            if interval_is_small:  # trade some bandwidth for lower latency
                await self._maybe_warm_headers_cache(
                    from_height=good, to_height=bad, mode=ChainResolutionMode.BINARY)
            header = await self.get_block_header(mid, mode=ChainResolutionMode.BINARY)
            chain = blockchain.check_header(header)
            if not chain:
                bad, bad_header = mid, header
            else:
                self.blockchain = chain
                good = mid
            if good + 1 == bad:
                break

        if not self.blockchain.can_connect(bad_header, check_height=False):
            raise Exception('unexpected bad header during binary: {}'.format(bad_header))
        _assert_header_does_not_check_against_any_chain(bad_header)

        self.logger.info(f"binary search exited. good {good}, bad {bad}. {chain=}")
        return good, bad, bad_header
1259

1260
    async def _resolve_potential_chain_fork_given_forkpoint(
        self,
        good: int,
        bad: int,
        bad_header: dict,
    ) -> Tuple[ChainResolutionMode, int]:
        """Decide between extending the current chain and forking, at a known fork point.

        Returns (NO_FORK, good + 1) when ``self.blockchain`` ends exactly at
        ``good``. Otherwise a fork is created from ``bad_header``, adopted as
        ``self.blockchain``, and (FORK, bad + 1) is returned.
        """
        assert good + 1 == bad
        assert bad == bad_header['block_height']
        _assert_header_does_not_check_against_any_chain(bad_header)
        # 'good' is the height of a block 'good_header', somewhere in self.blockchain.
        # bad_header connects to good_header; bad_header itself is NOT in self.blockchain.

        local_height = self.blockchain.height()
        assert local_height >= good, (local_height, good)
        if local_height == good:
            height = good + 1
            self.logger.info(f"catching up from {height}")
            return ChainResolutionMode.NO_FORK, height

        # this is a new fork we don't yet have
        self.logger.info(f"new fork at bad height {bad}")
        forked_chain = self.blockchain.fork(bad_header)  # type: Blockchain
        self.blockchain = forked_chain
        assert forked_chain.forkpoint == bad
        return ChainResolutionMode.FORK, bad + 1
1286

1287
    async def _search_headers_backwards(
        self,
        height: int,
        *,
        header: dict,
    ) -> Tuple[int, dict, int, dict]:
        """Walk backwards from a bad header, in growing steps, until a header fits.

        `header` (at `height`) must not check against any local chain. Starting
        just below it (but no higher than one above the tallest local chain),
        headers are requested at heights that decrease by 2, 4, 8, ... until one
        either checks against a local chain or can connect to one. Heights at or
        below the last checkpoint are clamped to the checkpoint; if even that
        header does not fit, GracefulDisconnect is raised.

        Returns (height, header, bad, bad_header): the height/header where the
        search stopped, and the lowest height/header seen that did not fit.
        """
        bad, bad_header = height, header
        _assert_header_does_not_check_against_any_chain(bad_header)
        with blockchain.blockchains_lock:
            chains = list(blockchain.blockchains.values())
        local_max = max([0, *(c.height() for c in chains)])
        height = min(local_max + 1, height - 1)
        assert height >= 0

        await self._maybe_warm_headers_cache(
            from_height=max(0, height-10), to_height=height, mode=ChainResolutionMode.BACKWARD)

        step = 2
        while True:
            at_checkpoint = False
            if height <= constants.net.max_checkpoint():
                height = constants.net.max_checkpoint()
                at_checkpoint = True
            header = await self.get_block_header(height, mode=ChainResolutionMode.BACKWARD)
            chain = blockchain.check_header(header)
            can_connect = blockchain.can_connect(header)
            if chain or can_connect:
                break  # found a header that fits our local view
            if at_checkpoint:
                raise GracefulDisconnect("server chain conflicts with checkpoints")
            # still bad: remember it, then jump further back with a doubled step
            bad, bad_header = height, header
            height -= step
            step *= 2

        _assert_header_does_not_check_against_any_chain(bad_header)
        self.logger.info(f"exiting backward mode at {height}")
        return height, header, bad, bad_header
1✔
1327

1328
    @classmethod
    def client_name(cls) -> str:
        """Return the client identification string: 'electrum/' plus the version."""
        return 'electrum/{}'.format(version.ELECTRUM_VERSION)
1✔
1331

1332
    def is_tor(self):
        """Return True if the server's host name ends with '.onion'."""
        onion_suffix = '.onion'
        return self.host.endswith(onion_suffix)
×
1334

1335
    def ip_addr(self) -> Optional[str]:
        """Return the remote host of the current session as a string.

        Returns None if there is no session, or the session reports no
        remote address.
        """
        current_session = self.session
        if not current_session:
            return None
        remote = current_session.remote_address()
        if not remote:
            return None
        return str(remote.host)
×
1341

1342
    def bucket_based_on_ipaddress(self) -> str:
        """Return a coarse network "bucket" name for this server.

        - onion hosts all share BUCKET_NAME_OF_ONION_SERVERS
        - IPv4 addresses map to their enclosing /16 network
        - IPv6 addresses map to their enclosing /48 network
        - '' is returned for loopback addresses and when the address
          cannot be parsed

        A non-empty result is stored in self._ipaddr_bucket and reused;
        an empty result is recomputed on the next call.
        """
        def compute_bucket():
            if self.is_tor():
                return BUCKET_NAME_OF_ONION_SERVERS
            try:
                addr = ip_address(self.ip_addr())  # type: Union[IPv4Address, IPv6Address]
            except ValueError:
                return ''
            if not addr:
                return ''
            if addr.is_loopback:  # localhost is exempt
                return ''
            if addr.version == 4:
                return str(IPv4Network(addr).supernet(prefixlen_diff=32-16))
            if addr.version == 6:
                return str(IPv6Network(addr).supernet(prefixlen_diff=128-48))
            return ''

        bucket = self._ipaddr_bucket
        if not bucket:
            bucket = compute_bucket()
            self._ipaddr_bucket = bucket
        return bucket
×
1365

1366
    async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict:
        """Request the merkle branch of a transaction and sanity-check the reply.

        Raises a plain Exception if the arguments are not a txid / block height.
        The response dict must contain 'block_height', 'merkle' and 'pos'; it is
        returned unchanged once those fields have been validated.
        """
        if not is_hash256_str(tx_hash):
            raise Exception(f"{tx_hash!r} is not a txid")
        if not is_non_negative_integer(tx_height):
            raise Exception(f"{tx_height!r} is not a block height")
        # do request
        response = await self.session.send_request(
            'blockchain.transaction.get_merkle', [tx_hash, tx_height])
        # check response: all fields must be present before their values are inspected
        block_height = assert_dict_contains_field(response, field_name='block_height')
        merkle_branch = assert_dict_contains_field(response, field_name='merkle')
        tx_position = assert_dict_contains_field(response, field_name='pos')
        # note: tx_height was just a hint to the server, don't enforce the response to match it
        assert_non_negative_integer(block_height)
        assert_non_negative_integer(tx_position)
        assert_list_or_tuple(merkle_branch)
        for node_hash in merkle_branch:
            assert_hash256_str(node_hash)
        return response
1✔
1384

1385
    async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
        """Return the raw transaction with id `tx_hash`, as a hex string.

        Served from self._rawtx_cache when possible. Otherwise the tx is
        requested from the server and only accepted (and cached) if it is hex,
        deserializes, and hashes to `tx_hash`; RequestCorrupted is raised if not.
        """
        if not is_hash256_str(tx_hash):
            raise Exception(f"{tx_hash!r} is not a txid")
        cached_bytes = self._rawtx_cache.get(tx_hash)
        if cached_bytes:
            return cached_bytes.hex()
        raw = await self.session.send_request(
            'blockchain.transaction.get', [tx_hash], timeout=timeout)
        # validate response
        if not is_hex_str(raw):
            raise RequestCorrupted(f"received garbage (non-hex) as tx data (txid {tx_hash}): {raw!r}")
        tx = Transaction(raw)
        try:
            tx.deserialize()  # see if raises
        except Exception as exc:
            raise RequestCorrupted(f"cannot deserialize received transaction (txid {tx_hash})") from exc
        if tx.txid() != tx_hash:
            raise RequestCorrupted(f"received tx does not match expected txid {tx_hash} (got {tx.txid()})")
        self._rawtx_cache[tx_hash] = bytes.fromhex(raw)
        return raw
1✔
1403

1404
    async def broadcast_transaction(self, tx: 'Transaction', *, timeout=None) -> None:
        """Broadcast `tx` via the server; on success, cache its raw bytes.

        caller should handle TxBroadcastError and RequestTimedOut
        """
        txid_calc = tx.txid()
        assert txid_calc is not None
        rawtx = tx.serialize()
        assert is_hex_str(rawtx)
        if timeout is None:
            timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        pays_to_dummy = any(DummyAddress.is_dummy_address(txout.address) for txout in tx.outputs())
        if pays_to_dummy:
            raise DummyAddressUsedInTxException("tried to broadcast tx with dummy address!")
        try:
            out = await self.session.send_request(
                'blockchain.transaction.broadcast', [rawtx], timeout=timeout)
            # note: both 'out' and exception messages are untrusted input from the server
        except (RequestTimedOut, asyncio.CancelledError, asyncio.TimeoutError):
            raise  # pass-through
        except aiorpcx.jsonrpc.CodeMessageError as e:
            safe_text = error_text_str_to_safe_str(repr(e))
            self.logger.info(f"broadcast_transaction error [DO NOT TRUST THIS MESSAGE]: {safe_text}. tx={str(tx)}")
            raise TxBroadcastServerReturnedError(sanitize_tx_broadcast_response(e.message)) from e
        except BaseException as e:  # intentional BaseException for sanity!
            safe_text = error_text_str_to_safe_str(repr(e))
            self.logger.info(f"broadcast_transaction error2 [DO NOT TRUST THIS MESSAGE]: {safe_text}. tx={str(tx)}")
            send_exception_to_crash_reporter(e)
            raise TxBroadcastUnknownError() from e
        if out != txid_calc:
            safe_out = error_text_str_to_safe_str(out)
            self.logger.info(f"unexpected txid for broadcast_transaction [DO NOT TRUST THIS MESSAGE]: "
                             f"{safe_out} != {txid_calc}. tx={str(tx)}")
            raise TxBroadcastHashMismatch(_("Server returned unexpected transaction ID."))
        # broadcast succeeded.
        # We now cache the rawtx, for *this interface only*. The tx likely touches some ismine addresses, affecting
        # the status of a scriptpubkey we are subscribed to. Caching here will save a future get_transaction RPC.
        self._rawtx_cache[txid_calc] = bytes.fromhex(rawtx)
1✔
1434

1435
    async def broadcast_txpackage(self, txs: Sequence['Transaction']) -> bool:
        """Broadcast several transactions together as one package.

        Returns True if the server reports success (the raw txs are then
        cached), False if the server returns a CodeMessageError or a response
        whose 'success' field is falsy. Asserts that the active protocol
        version is at least 1.6.
        """
        assert self.active_protocol_tuple >= (1, 6), f"server using old protocol: {self.active_protocol_tuple}"
        rawtxs = [tx.serialize() for tx in txs]
        assert all(is_hex_str(rawtx) for rawtx in rawtxs)
        assert all(tx.txid() is not None for tx in txs)
        timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        if any(DummyAddress.is_dummy_address(txout.address)
               for tx in txs
               for txout in tx.outputs()):
            raise DummyAddressUsedInTxException("tried to broadcast tx with dummy address!")
        try:
            res = await self.session.send_request(
                'blockchain.transaction.broadcast_package', [rawtxs], timeout=timeout)
        except aiorpcx.jsonrpc.CodeMessageError as e:
            safe_text = error_text_str_to_safe_str(repr(e))
            self.logger.info(f"broadcast_txpackage error [DO NOT TRUST THIS MESSAGE]: {safe_text}. {rawtxs=}")
            return False
        success = assert_dict_contains_field(res, field_name='success')
        if not success:
            errors = assert_dict_contains_field(res, field_name='errors')
            safe_text = error_text_str_to_safe_str(repr(errors))
            self.logger.info(f"broadcast_txpackage error [DO NOT TRUST THIS MESSAGE]: {safe_text}. {rawtxs=}")
            return False
        # broadcast succeeded.
        # We now cache the rawtx, for *this interface only*. The tx likely touches some ismine addresses, affecting
        # the status of a scriptpubkey we are subscribed to. Caching here will save a future get_transaction RPC.
        for tx, rawtx in zip(txs, rawtxs):
            self._rawtx_cache[tx.txid()] = bytes.fromhex(rawtx)
        return True
×
1461

1462
    async def get_history_for_spk(self, spk: str) -> Optional[List[dict]]:
        """Fetch the history of scriptpubkey `spk` and validate the response.

        Returns the list of history items as sent by the server, or None if the
        request failed with HistoryTooLong (that error is deliberately swallowed
        so that it does not kill the caller's task group).

        Checks performed on the response:
        - each item has an integer 'height' >= -1 and a 'tx_hash' that is a hash256 hex str
        - mempool items (height -1 or 0) carry a non-negative integer 'fee'
        - confirmed items are sorted by increasing height, and none follows a mempool item
        - for protocol >= 1.6, mempool items are in canonical order
        - no txid appears twice
        RequestCorrupted is raised if the height, ordering or uniqueness checks fail.
        """
        # do request
        try:
            res = await self.session.send_request('blockchain.scriptpubkey.get_history', [spk])
        except HistoryTooLong:
            # do not raise, do not kill task group
            return None
        # check response
        assert_list_or_tuple(res)
        prev_height = 1
        for tx_item in res:
            height = assert_dict_contains_field(tx_item, field_name='height')
            assert_dict_contains_field(tx_item, field_name='tx_hash')
            assert_integer(height)
            if height < -1:
                raise RequestCorrupted(f'{height!r} is not a valid block height')
            assert_hash256_str(tx_item['tx_hash'])
            if height in (-1, 0):
                assert_dict_contains_field(tx_item, field_name='fee')
                assert_non_negative_integer(tx_item['fee'])
                prev_height = float("inf")  # this ensures confirmed txs can't follow mempool txs
            else:
                # check monotonicity of heights
                if height < prev_height:
                    raise RequestCorrupted('heights of confirmed txs must be in increasing order')
                prev_height = height
        if self.active_protocol_tuple >= (1, 6):
            # enforce order of mempool txs
            mempool_txs = [tx_item for tx_item in res if tx_item['height'] <= 0]
            canonical = sorted(mempool_txs, key=lambda x: (-x['height'], bytes.fromhex(x['tx_hash'])))
            if mempool_txs != canonical:
                raise RequestCorrupted('mempool txs not in canonical order')
        hashes = {tx_item['tx_hash'] for tx_item in res}
        if len(hashes) != len(res):
            # Either server is sending garbage... or maybe if server is race-prone
            # a recently mined tx could be included in both last block and mempool?
            # Still, it's simplest to just disregard the response.
            raise RequestCorrupted(f"server history has non-unique txids for spk={spk}")
        return res
1✔
1500

1501
    async def listunspent_for_spk(self, spk: str) -> List[dict]:
        """Fetch the unspent outputs of scriptpubkey `spk` and validate the response.

        Each item must contain 'tx_pos', 'value' and 'height' (non-negative
        integers) and 'tx_hash' (a hash256 hex str). The list is returned as
        received.
        """
        # do request
        utxos = await self.session.send_request('blockchain.scriptpubkey.listunspent', [spk])
        # check response
        assert_list_or_tuple(utxos)
        for utxo_item in utxos:
            # presence of every field is checked before any value is inspected
            for required in ('tx_pos', 'value', 'tx_hash', 'height'):
                assert_dict_contains_field(utxo_item, field_name=required)
            for int_field in ('tx_pos', 'value', 'height'):
                assert_non_negative_integer(utxo_item[int_field])
            assert_hash256_str(utxo_item['tx_hash'])
        return utxos
×
1516

1517
    async def get_balance_for_spk(self, spk: str) -> dict:
        """Fetch the balance of scriptpubkey `spk` and validate the response.

        The response must contain 'confirmed' (a non-negative integer) and
        'unconfirmed' (an integer); it is returned as received.
        """
        # do request
        balance = await self.session.send_request('blockchain.scriptpubkey.get_balance', [spk])
        # check response
        for required in ('confirmed', 'unconfirmed'):
            assert_dict_contains_field(balance, field_name=required)
        assert_non_negative_integer(balance['confirmed'])
        assert_integer(balance['unconfirmed'])
        return balance
×
1526

1527
    async def get_txid_from_txpos(self, tx_height: int, tx_pos: int, merkle: bool):
        """Look up a txid by its block height and position within the block.

        If `merkle` is truthy, the response is validated as a dict with
        'tx_hash' and 'merkle' (a list of hash256 hex strs); otherwise it is
        validated as a bare hash256 hex str. The response is returned as
        received.
        """
        if not is_non_negative_integer(tx_height):
            raise Exception(f"{tx_height!r} is not a block height")
        if not is_non_negative_integer(tx_pos):
            raise Exception(f"{tx_pos!r} should be non-negative integer")
        # do request
        request_args = [tx_height, tx_pos, merkle]
        res = await self.session.send_request('blockchain.transaction.id_from_pos', request_args)
        # check response
        if not merkle:
            assert_hash256_str(res)
            return res
        assert_dict_contains_field(res, field_name='tx_hash')
        assert_dict_contains_field(res, field_name='merkle')
        assert_hash256_str(res['tx_hash'])
        assert_list_or_tuple(res['merkle'])
        for node_hash in res['merkle']:
            assert_hash256_str(node_hash)
        return res
×
1548

1549
    async def get_fee_histogram(self) -> Sequence[Tuple[Union[float, int], int]]:
        """Fetch the server's mempool fee histogram and validate it.

        The response must be a list of two-element entries; they are unpacked
        as (fee, s), where fee is a non-negative number and s a non-negative
        integer, and the fees must be strictly decreasing. The list is returned
        as received.

        Raises RequestCorrupted if an entry is not a two-element sequence or the
        fees are not strictly decreasing.
        """
        # do request
        res = await self.session.send_request('mempool.get_fee_histogram')
        # check response
        assert_list_or_tuple(res)
        prev_fee = float('inf')
        for item in res:
            # The server is untrusted: validate the shape of each entry explicitly,
            # so that a malformed one surfaces as RequestCorrupted rather than as
            # a bare ValueError/TypeError from tuple unpacking.
            assert_list_or_tuple(item)
            if len(item) != 2:
                raise RequestCorrupted(f'{item!r} should be a pair')
            fee, s = item
            assert_non_negative_int_or_float(fee)
            assert_non_negative_integer(s)
            if fee >= prev_fee:  # check monotonicity
                raise RequestCorrupted('fees must be in decreasing order')
            prev_fee = fee
        return res
×
1562

1563
    async def get_server_banner(self) -> str:
        """Fetch the server banner; raise RequestCorrupted unless it is a str."""
        # do request
        banner = await self.session.send_request('server.banner')
        # check response
        if isinstance(banner, str):
            return banner
        raise RequestCorrupted(f'{banner!r} should be a str')
×
1570

1571
    async def get_donation_address(self) -> str:
        """Fetch the server's donation address.

        An optional 'bitcoin:' prefix is stripped. Returns '' if the server
        sent an empty value or an address that bitcoin.is_address rejects.
        Raises RequestCorrupted if the (non-empty) response is not a str.
        """
        # do request
        res = await self.session.send_request('server.donation_address')
        # check response
        if not res:  # ignore empty string
            return ''
        if not isinstance(res, str):
            raise RequestCorrupted(f'{res!r} should be a str')
        address = res.removeprefix('bitcoin:')
        if bitcoin.is_address(address):
            return address
        # note: do not hard-fail -- allow server to use future-type
        #       bitcoin address we do not recognize
        self.logger.info(f"invalid donation address from server: {res!r}")
        return ''
×
1586

1587
    async def get_relay_fee(self) -> int:
        """Returns the min relay feerate in sat/kbyte.

        For protocol >= 1.6 the value is read from the 'minrelaytxfee' field of
        'mempool.get_info'; older protocols use 'blockchain.relayfee'.
        """
        # do request
        uses_mempool_info = self.active_protocol_tuple >= (1, 6)
        if uses_mempool_info:
            info = await self.session.send_request('mempool.get_info')
            minrelaytxfee = assert_dict_contains_field(info, field_name='minrelaytxfee')
        else:
            minrelaytxfee = await self.session.send_request('blockchain.relayfee')
        # check response
        assert_non_negative_int_or_float(minrelaytxfee)
        # scale by bitcoin.COIN and clamp at zero
        return max(0, int(minrelaytxfee * bitcoin.COIN))
×
1600

1601
    async def get_estimatefee(self, num_blocks: int) -> int:
        """Returns a feerate estimate for getting confirmed within
        num_blocks blocks, in sat/kbyte.
        Returns -1 if the server could not provide an estimate.
        """
        if not is_non_negative_integer(num_blocks):
            raise Exception(f"{num_blocks!r} is not a num_blocks")
        no_estimate = -1
        # do request
        try:
            res = await self.session.send_request('blockchain.estimatefee', [num_blocks])
        except aiorpcx.jsonrpc.ProtocolError as e:
            # The protocol spec says the server itself should already have returned -1
            # if it cannot provide an estimate, however apparently "electrs" does not conform
            # and sends an error instead. Convert it here:
            if "cannot estimate fee" not in e.message:
                raise
            res = no_estimate
        except aiorpcx.jsonrpc.RPCError as e:
            # The protocol spec says the server itself should already have returned -1
            # if it cannot provide an estimate. "Fulcrum" often sends:
            #   aiorpcx.jsonrpc.RPCError: (-32603, 'internal error: bitcoind request timed out')
            if e.code != JSONRPC.INTERNAL_ERROR:
                raise
            res = no_estimate
        # check response
        if res == no_estimate:
            return res
        assert_non_negative_int_or_float(res)
        return int(res * bitcoin.COIN)
1✔
1632

1633

1634
def _assert_header_does_not_check_against_any_chain(header: dict) -> None:
    """Raise if blockchain.check_header returns a truthy result for `header`."""
    if blockchain.check_header(header):
        raise Exception('bad_header must not check!')
×
1638

1639

1640
def sanitize_tx_broadcast_response(server_msg) -> str:
    """Map an untrusted broadcast error from the server to a known, safe message.

    The server's text is never returned as-is: it is only searched for known
    substrings (taken from the bitcoind sources referenced below), and the
    result is either the matched substring itself or a message written for it.
    If nothing matches, a generic "Unknown error" is returned.

    The tables are consulted in a fixed order. Within the script-error table the
    longest matching entry wins; within the other tables the first matching
    entry (in listed order) wins.
    """
    # Unfortunately, bitcoind and hence the Electrum protocol doesn't return a useful error code.
    # So, we use substring matching to grok the error message.
    # server_msg is untrusted input so it should not be shown to the user. see #4968
    server_msg = str(server_msg)
    server_msg = server_msg.replace("\n", r"\n")

    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/script/script_error.cpp
    # note: this is a tuple, not a set. Some entries are substrings of others
    #       (e.g. "Invalid Schnorr signature" vs "Invalid Schnorr signature size"),
    #       so iterating a set would make the returned message depend on hash ordering.
    script_error_messages = (
        r"Script evaluated without error but finished with a false/empty top stack element",
        r"Script failed an OP_VERIFY operation",
        r"Script failed an OP_EQUALVERIFY operation",
        r"Script failed an OP_CHECKMULTISIGVERIFY operation",
        r"Script failed an OP_CHECKSIGVERIFY operation",
        r"Script failed an OP_NUMEQUALVERIFY operation",
        r"Script is too big",
        r"Push value size limit exceeded",
        r"Operation limit exceeded",
        r"Stack size limit exceeded",
        r"Signature count negative or greater than pubkey count",
        r"Pubkey count negative or limit exceeded",
        r"Opcode missing or not understood",
        r"Attempted to use a disabled opcode",
        r"Operation not valid with the current stack size",
        r"Operation not valid with the current altstack size",
        r"OP_RETURN was encountered",
        r"Invalid OP_IF construction",
        r"Negative locktime",
        r"Locktime requirement not satisfied",
        r"Signature hash type missing or not understood",
        r"Non-canonical DER signature",
        r"Data push larger than necessary",
        r"Only push operators allowed in signatures",
        r"Non-canonical signature: S value is unnecessarily high",
        r"Dummy CHECKMULTISIG argument must be zero",
        r"OP_IF/NOTIF argument must be minimal",
        r"Signature must be zero for failed CHECK(MULTI)SIG operation",
        r"NOPx reserved for soft-fork upgrades",
        r"Witness version reserved for soft-fork upgrades",
        r"Taproot version reserved for soft-fork upgrades",
        r"OP_SUCCESSx reserved for soft-fork upgrades",
        r"Public key version reserved for soft-fork upgrades",
        r"Public key is neither compressed or uncompressed",
        r"Stack size must be exactly one after execution",
        r"Extra items left on stack after execution",
        r"Witness program has incorrect length",
        r"Witness program was passed an empty witness",
        r"Witness program hash mismatch",
        r"Witness requires empty scriptSig",
        r"Witness requires only-redeemscript scriptSig",
        r"Witness provided for non-witness script",
        r"Using non-compressed keys in segwit",
        r"Invalid Schnorr signature size",
        r"Invalid Schnorr signature hash type",
        r"Invalid Schnorr signature",
        r"Invalid Taproot control block size",
        r"Too much signature validation relative to witness weight",
        r"OP_CHECKMULTISIG(VERIFY) is not available in tapscript",
        r"OP_IF/NOTIF argument must be minimal in tapscript",
        r"Using OP_CODESEPARATOR in non-witness script",
        r"Signature is found in scriptCode",
    )
    script_matches = [substring for substring in script_error_messages if substring in server_msg]
    if script_matches:
        # prefer the most specific (longest) entry when several match
        return max(script_matches, key=len)
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/validation.cpp
    # grep "REJECT_"
    # grep "TxValidationResult"
    # should come after script_error.cpp (due to e.g. "non-mandatory-script-verify-flag")
    validation_error_messages = {
        # note: listed here, ahead of the generic "coinbase", because otherwise "coinbase"
        #       would match first and hide this more specific tx_verify.cpp reason.
        r"bad-txns-premature-spend-of-coinbase": None,
        r"coinbase": None,
        r"tx-size-small": None,
        r"non-final": None,
        r"txn-already-in-mempool": None,
        r"txn-mempool-conflict": None,
        r"txn-already-known": None,
        r"non-BIP68-final": None,
        r"bad-txns-nonstandard-inputs": None,
        r"bad-witness-nonstandard": None,
        r"bad-txns-too-many-sigops": None,
        r"mempool min fee not met":
            ("mempool min fee not met\n" +
             _("Your transaction is paying a fee that is so low that the bitcoin node cannot "
               "fit it into its mempool. The mempool is already full of hundreds of megabytes "
               "of transactions that all pay higher fees. Try to increase the fee.")),
        r"min relay fee not met": None,
        r"absurdly-high-fee": None,
        r"max-fee-exceeded": None,
        r"too-long-mempool-chain": None,
        r"bad-txns-spends-conflicting-tx": None,
        r"insufficient fee": ("insufficient fee\n" +
             _("Your transaction is trying to replace another one in the mempool but it "
               "does not meet the rules to do so. Try to increase the fee.")),
        r"too many potential replacements": None,
        r"replacement-adds-unconfirmed": None,
        r"mempool full": None,
        r"non-mandatory-script-verify-flag": None,
        r"mandatory-script-verify-flag-failed": None,
        r"Transaction check failed": None,
    }
    for substring in validation_error_messages:
        if substring in server_msg:
            msg = validation_error_messages[substring]
            return msg if msg else substring
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/rpc/rawtransaction.cpp
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/util/error.cpp
    # https://github.com/bitcoin/bitcoin/blob/3f83c744ac28b700090e15b5dda2260724a56f49/src/common/messages.cpp#L126
    # grep "RPC_TRANSACTION"
    # grep "RPC_DESERIALIZATION_ERROR"
    # grep "TransactionError"
    rawtransaction_error_messages = {
        r"Missing inputs": None,
        r"Inputs missing or spent": None,
        r"transaction already in block chain": None,
        r"Transaction already in block chain": None,
        r"Transaction outputs already in utxo set": None,
        r"TX decode failed": None,
        r"Peer-to-peer functionality missing or disabled": None,
        r"Transaction rejected by AcceptToMemoryPool": None,
        r"AcceptToMemoryPool failed": None,
        r"Transaction rejected by mempool": None,
        r"Mempool internal error": None,
        r"Fee exceeds maximum configured by user": None,
        r"Unspendable output exceeds maximum configured by user": None,
        r"Transaction rejected due to invalid package": None,
    }
    for substring in rawtransaction_error_messages:
        if substring in server_msg:
            msg = rawtransaction_error_messages[substring]
            return msg if msg else substring
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/consensus/tx_verify.cpp
    # https://github.com/bitcoin/bitcoin/blob/c7ad94428ab6f54661d7a5441e1fdd0ebf034903/src/consensus/tx_check.cpp
    # grep "REJECT_"
    # grep "TxValidationResult"
    tx_verify_error_messages = {
        r"bad-txns-vin-empty": None,
        r"bad-txns-vout-empty": None,
        r"bad-txns-oversize": None,
        r"bad-txns-vout-negative": None,
        r"bad-txns-vout-toolarge": None,
        r"bad-txns-txouttotal-toolarge": None,
        r"bad-txns-inputs-duplicate": None,
        r"bad-cb-length": None,
        r"bad-txns-prevout-null": None,
        r"bad-txns-inputs-missingorspent":
            ("bad-txns-inputs-missingorspent\n" +
             _("You might have a local transaction in your wallet that this transaction "
               "builds on top. You need to either broadcast or remove the local tx.")),
        r"bad-txns-premature-spend-of-coinbase": None,
        r"bad-txns-inputvalues-outofrange": None,
        r"bad-txns-in-belowout": None,
        r"bad-txns-fee-outofrange": None,
    }
    for substring in tx_verify_error_messages:
        if substring in server_msg:
            msg = tx_verify_error_messages[substring]
            return msg if msg else substring
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/policy/policy.cpp
    # grep "reason ="
    # should come after validation.cpp (due to "tx-size" vs "tx-size-small")
    # should come after script_error.cpp (due to e.g. "version")
    policy_error_messages = {
        r"version": _("Transaction uses non-standard version."),
        r"tx-size": _("The transaction was rejected because it is too large (in bytes)."),
        r"scriptsig-size": None,
        r"scriptsig-not-pushonly": None,
        r"scriptpubkey":
            ("scriptpubkey\n" +
             _("Some of the outputs pay to a non-standard script.")),
        r"bare-multisig": None,
        r"dust":
            (_("Transaction could not be broadcast due to dust outputs.\n"
               "Some of the outputs are too small in value, probably lower than 1000 satoshis.\n"
               "Check the units, make sure you haven't confused e.g. mBTC and BTC.")),
        r"multi-op-return": _("The transaction was rejected because it contains multiple OP_RETURN outputs."),
    }
    for substring in policy_error_messages:
        if substring in server_msg:
            msg = policy_error_messages[substring]
            return msg if msg else substring
    # otherwise:
    return _("Unknown error")
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc