• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 4961272002772992

05 May 2026 01:19PM UTC coverage: 63.437% (-1.8%) from 65.246%
4961272002772992

Pull #10627

CirrusCI

ecdsa
synchronizer:
 - do not end session if a wallet address receives HISTORY_TOO_LONG,
   so that lnwatcher keeps working
 - separation between addresses_up_to_date and outpoints_up_to_date
Pull Request #10627: Protocol 1.7

66 of 169 new or added lines in 12 files covered. (39.05%)

644 existing lines in 14 files now uncovered.

24111 of 38008 relevant lines covered (63.44%)

0.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

33.27
/electrum/interface.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2011 thomasv@gitorious
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25
import os
1✔
26
import re
1✔
27
import ssl
1✔
28
import sys
1✔
29
import time
1✔
30
import traceback
1✔
31
import asyncio
1✔
32
import socket
1✔
33
from typing import Tuple, Union, List, TYPE_CHECKING, Optional, Set, NamedTuple, Any, Sequence, Dict
1✔
34
from collections import defaultdict
1✔
35
from ipaddress import IPv4Network, IPv6Network, ip_address, IPv6Address, IPv4Address
1✔
36
import itertools
1✔
37
import logging
1✔
38
import hashlib
1✔
39
import functools
1✔
40
import random
1✔
41
import enum
1✔
42

43
import aiorpcx
1✔
44
from aiorpcx import RPCSession, Notification, NetAddress, NewlineFramer
1✔
45
from aiorpcx.curio import timeout_after, TaskTimeout
1✔
46
from aiorpcx.jsonrpc import JSONRPC, CodeMessageError
1✔
47
from aiorpcx.rawsocket import RSClient, RSTransport
1✔
48
import certifi
1✔
49

50
from .util import (ignore_exceptions, log_exceptions, bfh, ESocksProxy,
1✔
51
                   is_integer, is_non_negative_integer, is_hash256_str, is_hex_str,
52
                   is_int_or_float, is_non_negative_int_or_float, OldTaskGroup,
53
                   send_exception_to_crash_reporter, error_text_str_to_safe_str, versiontuple)
54
from . import util
1✔
55
from . import x509
1✔
56
from . import pem
1✔
57
from . import version
1✔
58
from . import blockchain
1✔
59
from .blockchain import Blockchain, HEADER_SIZE, CHUNK_SIZE
1✔
60
from . import bitcoin
1✔
61
from .bitcoin import DummyAddress, DummyAddressUsedInTxException, script_to_scripthash
1✔
62
from . import constants
1✔
63
from .i18n import _
1✔
64
from .logging import Logger
1✔
65
from .transaction import Transaction
1✔
66
from .fee_policy import FEE_ETA_TARGETS
1✔
67
from .lrucache import LRUCache
1✔
68

69
if TYPE_CHECKING:
70
    from .network import Network
71
    from .simple_config import SimpleConfig
72

73

74
# Location of the CA bundle, as reported by certifi.
ca_path = certifi.where()

BUCKET_NAME_OF_ONION_SERVERS = 'onion'

# Single-letter transport codes accepted by ServerAddr.
# NOTE(review): presumably 't' = plain TCP and 's' = SSL/TLS -- confirm.
KNOWN_ELEC_PROTOCOL_TRANSPORTS = {'t', 's'}
# Transport that ServerAddr.from_str_with_inference falls back to when the
# input string does not name one.
PREFERRED_NETWORK_PROTOCOL = 's'
assert PREFERRED_NETWORK_PROTOCOL in KNOWN_ELEC_PROTOCOL_TRANSPORTS

# Must not be smaller than blockchain.CHUNK_SIZE (checked right below).
MAX_NUM_HEADERS_PER_REQUEST = 2016
assert MAX_NUM_HEADERS_PER_REQUEST >= CHUNK_SIZE

# JSON-RPC error code that NotificationSession.send_request converts into
# a HistoryTooLong exception.
RPC_ERROR_HISTORY_TOO_LONG = 10001
1✔
86

87
class HistoryTooLong(Exception):
    """The server answered a request with error code RPC_ERROR_HISTORY_TOO_LONG.

    We should not close the connection in that case.
    """
1✔
90

91
class NetworkTimeout:
    """Namespace of timeout presets. All values are in seconds."""

    class Generic:
        """Baseline presets."""
        NORMAL = 30
        RELAXED = 45
        MOST_RELAXED = 600

    class Urgent(Generic):
        """Shorter presets; overrides every value of Generic."""
        NORMAL = 10
        RELAXED = 20
        MOST_RELAXED = 60
1✔
102

103

104
def assert_non_negative_integer(val: Any) -> None:
    """Raise RequestCorrupted unless *val* passes ``is_non_negative_integer``."""
    if is_non_negative_integer(val):
        return
    raise RequestCorrupted(f'{val!r} should be a non-negative integer')
×
107

108

109
def assert_integer(val: Any) -> None:
    """Raise RequestCorrupted unless *val* passes ``is_integer``."""
    if is_integer(val):
        return
    raise RequestCorrupted(f'{val!r} should be an integer')
×
112

113

114
def assert_int_or_float(val: Any) -> None:
    """Raise RequestCorrupted unless *val* passes ``is_int_or_float``."""
    if is_int_or_float(val):
        return
    raise RequestCorrupted(f'{val!r} should be int or float')
×
117

118

119
def assert_non_negative_int_or_float(val: Any) -> None:
    """Raise RequestCorrupted unless *val* passes ``is_non_negative_int_or_float``."""
    if is_non_negative_int_or_float(val):
        return
    raise RequestCorrupted(f'{val!r} should be a non-negative int or float')
×
122

123

124
def assert_hash256_str(val: Any) -> None:
    """Raise RequestCorrupted unless *val* passes ``is_hash256_str``."""
    if is_hash256_str(val):
        return
    raise RequestCorrupted(f'{val!r} should be a hash256 str')
×
127

128

129
def assert_hex_str(val: Any) -> None:
    """Raise RequestCorrupted unless *val* passes ``is_hex_str``."""
    if is_hex_str(val):
        return
    raise RequestCorrupted(f'{val!r} should be a hex str')
×
132

133

134
def assert_dict_contains_field(d: Any, *, field_name: str) -> Any:
    """Return ``d[field_name]`` after checking that it can be looked up.

    Raises RequestCorrupted if *d* is not a dict, or if *field_name* is not
    one of its keys.
    """
    if isinstance(d, dict):
        if field_name in d:
            return d[field_name]
        raise RequestCorrupted(f'required field {field_name!r} missing from dict')
    raise RequestCorrupted(f'{d!r} should be a dict')
×
140

141

142
def assert_list_or_tuple(val: Any) -> None:
    """Raise RequestCorrupted unless *val* is a list or a tuple (subclasses included)."""
    if isinstance(val, (list, tuple)):
        return
    raise RequestCorrupted(f'{val!r} should be a list or tuple')
×
145

146

147
def protocol_tuple(s: Any) -> tuple[int, ...]:
    """Converts a protocol version number, such as "1.0" to a tuple (1, 0).

    If the version number is bad (not a str, or ``versiontuple`` raises on it),
    (0, ) indicating version 0 is returned. This function never raises.
    """
    # Explicit type check rather than `assert`: assert statements are removed
    # when Python runs with -O, which would let non-str values reach
    # versiontuple().
    if not isinstance(s, str):
        return (0, )
    try:
        return versiontuple(s)
    except Exception:
        return (0, )
×
157

158

159
class ChainResolutionMode(enum.Enum):
    """Closed set of mode labels; values are auto-assigned in declaration order."""

    CATCHUP = enum.auto()
    BACKWARD = enum.auto()
    BINARY = enum.auto()
    FORK = enum.auto()
    NO_FORK = enum.auto()
1✔
165

166

167
class NotificationSession(RPCSession):
    """RPC session that fans server notifications out to subscriber queues.

    Subscriptions are indexed by a string key built from the RPC method and
    its params (see get_hashable_key_for_rpc_call). The latest result seen for
    each key is cached, so later subscribers to the same key are served
    without another network request.
    """

    def __init__(self, *args, interface: 'Interface', **kwargs):
        super().__init__(*args, **kwargs)
        # key -> queues that receive notifications for that key
        self.subscriptions = defaultdict(list)
        # key -> most recent result (initial response or notification payload)
        self.cache = {}
        self._msg_counter = itertools.count(start=1)
        self.interface = interface
        self.taskgroup = interface.taskgroup
        self.cost_hard_limit = 0  # disable aiorpcx resource limits

        # To log pre-processed json traffic, uncomment:
        #self.logger.setLevel(logging.DEBUG)  # from aiorpcx
        #self.verbosity = 4

    async def handle_request(self, request):
        """Handle a message initiated by the server.

        Only notifications for keys we have subscribed to are accepted: their
        payload is cached and put on every subscribed queue. Anything else is
        logged and the session is closed.
        """
        self.maybe_log(f"--> {request}")
        try:
            if isinstance(request, Notification):
                # the last arg is the new result, the ones before identify the subscription
                params, result = request.args[:-1], request.args[-1]
                key = self.get_hashable_key_for_rpc_call(request.method, params)
                if key in self.subscriptions:
                    self.cache[key] = result
                    for queue in self.subscriptions[key]:
                        await queue.put(request.args)
                else:
                    # report the offending key, not the whole subscriptions mapping
                    raise Exception(f'unexpected notification. not subscribed to {key!r}')
            else:
                raise Exception('unexpected request. not a notification')
        except Exception as e:
            self.interface.logger.info(f"error handling request {request}. exc: {repr(e)}")
            await self.close()

    async def send_request(self, *args, timeout=None, **kwargs):
        """Send a request and return the server's response.

        Raises:
            RequestTimedOut: on TaskTimeout / asyncio.TimeoutError.
            HistoryTooLong: if the server replies with RPC_ERROR_HISTORY_TOO_LONG.
            CodeMessageError: any other error reply is re-raised unchanged.
        """
        # note: semaphores/timeouts/backpressure etc are handled by
        # aiorpcx. the timeout arg here in most cases should not be set
        msg_id = next(self._msg_counter)
        self.maybe_log(f"<-- {args} {kwargs} (id: {msg_id})")
        try:
            # note: RPCSession.send_request raises TaskTimeout in case of a timeout.
            # TaskTimeout is a subclass of CancelledError, which is *suppressed* in TaskGroups
            response = await util.wait_for2(
                super().send_request(*args, **kwargs),
                timeout)
        except (TaskTimeout, asyncio.TimeoutError) as e:
            self.maybe_log(f"--> request timed out: {args} (id: {msg_id})")
            raise RequestTimedOut(f'request timed out: {args} (id: {msg_id})') from e
        except CodeMessageError as e:
            self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
            if e.code == RPC_ERROR_HISTORY_TOO_LONG:
                # keep the server's error reachable as __cause__
                raise HistoryTooLong() from e
            raise
        except BaseException as e:  # cancellations, etc. are useful for debugging
            self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
            raise
        else:
            self.maybe_log(f"--> {response} (id: {msg_id})")
            return response

    def set_default_timeout(self, timeout):
        """Set both timeout attributes inherited from the base class to *timeout*."""
        assert hasattr(self, "sent_request_timeout")  # in base class
        self.sent_request_timeout = timeout
        assert hasattr(self, "max_send_delay")        # in base class
        self.max_send_delay = timeout

    async def subscribe(self, method: str, params: List, queue: asyncio.Queue):
        """Register *queue* for (method, params) and put the current result on it.

        The item put on the queue is ``params + [result]``, except that for
        'blockchain.scriptpubkey.subscribe' the first param (a hex script) is
        replaced by its scripthash, which is also what the subscription is
        keyed by.
        NOTE(review): presumably because notifications for that method carry
        the scripthash rather than the script -- confirm.
        """
        # note: until the cache is written for the first time,
        # each 'subscribe' call might make a request on the network.
        params2 = list(params)  # mutable copy; never modify the caller's sequence
        if method == 'blockchain.scriptpubkey.subscribe':
            params2[0] = script_to_scripthash(bytes.fromhex(params2[0]))
        key = self.get_hashable_key_for_rpc_call(method, params2)
        self.subscriptions[key].append(queue)
        if key in self.cache:
            result = self.cache[key]
        else:
            # the request itself is sent with the original, unmodified params
            result = await self.send_request(method, params)
            self.cache[key] = result
        await queue.put(params2 + [result])

    def unsubscribe(self, queue):
        """Unsubscribe a callback to free object references to enable GC."""
        # note: we can't unsubscribe from the server, so we keep receiving
        # subsequent notifications
        for v in self.subscriptions.values():
            if queue in v:
                v.remove(queue)

    @classmethod
    def get_hashable_key_for_rpc_call(cls, method, params):
        """Hashable index for subscriptions and cache"""
        return str(method) + repr(params)

    def maybe_log(self, msg: str) -> None:
        """Log *msg* at debug level if debugging is on for the interface or the network."""
        if not self.interface: return
        if self.interface.debug or self.interface.network.debug:
            self.interface.logger.debug(msg)

    def default_framer(self):
        """Return a NewlineFramer whose max_size comes from the network config."""
        # overridden so that max_size can be customized
        max_size = self.interface.network.config.NETWORK_MAX_INCOMING_MSG_SIZE
        assert max_size > 500_000, f"{max_size=} (< 500_000) is too small"
        return NewlineFramer(max_size=max_size)

    async def close(self, *, force_after: Optional[int] = None):
        """Closes the connection and waits for it to be closed.
        We try to flush buffered data to the wire, which can take some time.

        force_after: seconds to wait before giving up and aborting the
        connection; defaults to 1 when not given.
        """
        if force_after is None:
            # We give up after a while and just abort the connection.
            # Note: specifically if the server is running Fulcrum, waiting seems hopeless,
            #       the connection must be aborted (see https://github.com/cculianu/Fulcrum/issues/76)
            # Note: if the ethernet cable was pulled or wifi disconnected, that too might
            #       wait until this timeout is triggered
            force_after = 1  # seconds
        await super().close(force_after=force_after)
×
284

285

286
class NetworkException(Exception):
    """Common base class of GracefulDisconnect, ConnectError and TxBroadcastError."""
1✔
287

288

289
class GracefulDisconnect(NetworkException):
    """NetworkException with a ``log_level`` attribute.

    The level defaults to ``logging.INFO`` and can be overridden per instance
    through the keyword-only ``log_level`` constructor argument.
    """

    # class-wide default; shadowed on the instance when a level is passed in
    log_level = logging.INFO

    def __init__(self, *args, log_level=None, **kwargs):
        Exception.__init__(self, *args, **kwargs)
        if log_level is None:
            return
        self.log_level = log_level
×
296

297

298
class RequestTimedOut(GracefulDisconnect):
    """Raised by NotificationSession.send_request when a request times out."""

    def __str__(self):
        # fixed, translated text; the constructor args are not part of it
        message = _("Network request timed out.")
        return message
×
301

302

303
class RequestCorrupted(Exception):
    """Raised by the ``assert_*`` validation helpers of this module on bad values."""
1✔
304

305
class ErrorParsingSSLCert(Exception):
    """SSL certificate parsing error.

    NOTE(review): raise sites are not in this part of the file; meaning taken
    from the class name -- confirm.
    """
1✔
306
class ErrorGettingSSLCertFromServer(Exception):
    """Error while obtaining the SSL certificate from a server.

    NOTE(review): raise sites are not in this part of the file; meaning taken
    from the class name -- confirm.
    """
1✔
307
class ErrorSSLCertFingerprintMismatch(Exception):
    """SSL certificate fingerprint mismatch.

    NOTE(review): raise sites are not in this part of the file; meaning taken
    from the class name -- confirm.
    """
1✔
308
class InvalidOptionCombination(Exception):
    """Invalid combination of options.

    NOTE(review): raise sites are not in this part of the file; which options
    are meant should be documented here -- confirm.
    """
1✔
309
class ConnectError(NetworkException):
    """Raised by _RSClient.create_connection when connecting fails with an OSError."""
1✔
310

311

312
class TxBroadcastError(NetworkException):
    """Base class of the TxBroadcast* errors; subclasses supply the GUI message."""

    def get_message_for_gui(self):
        """Return the text to show to the user. Must be overridden by subclasses."""
        raise NotImplementedError()
×
315

316

317
class TxBroadcastHashMismatch(TxBroadcastError):
    """Broadcast error: the server returned an unexpected transaction ID."""

    def get_message_for_gui(self):
        """Return translated explanation and advice, followed by ``str(self)``."""
        explanation = _("The server returned an unexpected transaction ID when broadcasting the transaction.")
        advice = _("Consider trying to connect to a different server, or updating Electrum.")
        details = str(self)
        return "{}\n{}\n\n{}".format(explanation, advice, details)
323

324

325
class TxBroadcastServerReturnedError(TxBroadcastError):
    """Broadcast error: the server returned an error."""

    def get_message_for_gui(self):
        """Return translated explanation and advice, followed by ``str(self)``."""
        explanation = _("The server returned an error when broadcasting the transaction.")
        advice = _("Consider trying to connect to a different server, or updating Electrum.")
        details = str(self)
        return "{}\n{}\n\n{}".format(explanation, advice, details)
331

332

333
class TxBroadcastUnknownError(TxBroadcastError):
    """Broadcast error of unknown kind."""

    def get_message_for_gui(self):
        """Return translated explanation and advice; ``str(self)`` is not included."""
        explanation = _("Unknown error when broadcasting the transaction.")
        advice = _("Consider trying to connect to a different server, or updating Electrum.")
        return "{}\n{}".format(explanation, advice)
338

339

340
class _RSClient(RSClient):
    """RSClient that reports OSError during connecting as ConnectError."""

    async def create_connection(self):
        """Delegate to the base class, translating OSError into ConnectError."""
        try:
            connection = await super().create_connection()
        except OSError as e:
            # note: using "from e" here will set __cause__ of ConnectError
            raise ConnectError(e) from e
        return connection
×
347

348

349
class PaddedRSTransport(RSTransport):
    """A raw socket transport that provides basic countermeasures against traffic analysis
    by padding the jsonrpc payload with whitespaces to have ~uniform-size TCP packets.
    (it is assumed that a network observer does not see plaintext transport contents,
    due to it being wrapped e.g. in TLS)

    Outgoing framed messages are collected in a send buffer and written out by
    _maybe_consume_sbuffer, which is triggered from write(), from a polling
    task (_poll_sbuffer) and from close().
    """

    # Smallest packet written to the wire; smaller payloads are padded up to it.
    MIN_PACKET_SIZE = 1024
    # Buffered data is sent regardless of its size once this much time has
    # passed since the last send.
    WAIT_FOR_BUFFER_GROWTH_SECONDS = 1.0
    # (unpadded) amount of bytes sent instantly before beginning with polling.
    # This makes the initial handshake where a few small messages are exchanged faster.
    WARMUP_BUDGET_SIZE = 1024

    session: Optional['RPCSession']

    def __init__(self, *args, **kwargs):
        RSTransport.__init__(self, *args, **kwargs)
        self._sbuffer = bytearray()  # "send buffer"
        self._sbuffer_task = None  # type: Optional[asyncio.Task]
        # set while _sbuffer is non-empty; lets _poll_sbuffer sleep otherwise
        self._sbuffer_has_data_evt = asyncio.Event()
        self._last_send = time.monotonic()
        # when True, the whole buffer is sent on every consume, without waiting
        self._force_send = False  # type: bool

    # note: this does not call super().write() but is a complete reimplementation
    async def write(self, message):
        """Frame *message*, append it to the send buffer and maybe send buffered data."""
        await self._can_send.wait()
        if self.is_closing():
            return
        framed_message = self._framer.frame(message)
        self._sbuffer += framed_message
        self._sbuffer_has_data_evt.set()
        self._maybe_consume_sbuffer()

    def _maybe_consume_sbuffer(self) -> None:
        """Maybe take some data from sbuffer and send it on the wire."""
        if not self._can_send.is_set() or self.is_closing():
            return
        buf = self._sbuffer
        if not buf:
            return
        # if there is enough data in the buffer, or if we haven't sent in a while, send now:
        if not (
            self._force_send
            or len(buf) >= self.MIN_PACKET_SIZE
            or self._last_send + self.WAIT_FOR_BUFFER_GROWTH_SECONDS < time.monotonic()
            or self.session.send_size < self.WARMUP_BUDGET_SIZE
        ):
            return
        assert buf[-2:] in (b"}\n", b"]\n"), f"unexpected json-rpc terminator: {buf[-2:]=!r}"
        # either (1) pad length to next power of two, to create "lsize" packet:
        # (2 ** bit_length() is strictly greater than the payload length,
        #  so at least one padding byte is added in this variant)
        payload_lsize = len(buf)
        total_lsize = max(self.MIN_PACKET_SIZE, 2 ** (payload_lsize.bit_length()))
        npad_lsize = total_lsize - payload_lsize
        # or if that wasted a lot of bandwidth with padding, (2) defer sending some messages
        # and create a packet with half that size ("ssize", s for small)
        total_ssize = max(self.MIN_PACKET_SIZE, total_lsize // 2)
        # end of the last complete message that starts within the first total_ssize bytes
        payload_ssize = buf.rfind(b"\n", 0, total_ssize)
        if payload_ssize != -1:
            payload_ssize += 1  # for "\n" char
            npad_ssize = total_ssize - payload_ssize
        else:
            # no message boundary in range: make variant (2) lose the comparison below
            npad_ssize = float("inf")
        # decide between (1) and (2):
        if self._force_send or npad_lsize <= npad_ssize:
            # (1) create "lsize" packet: consume full buffer
            npad = npad_lsize
            p_idx = payload_lsize
        else:
            # (2) create "ssize" packet: consume some, but defer some for later
            npad = npad_ssize
            p_idx = payload_ssize
        # pad by adding spaces near end
        # self.session.maybe_log(
        #     f"PaddedRSTransport. calling low-level write(). "
        #     f"chose between (lsize:{payload_lsize}+{npad_lsize}, ssize:{payload_ssize}+{npad_ssize}). "
        #     f"won: {'tie' if npad_lsize == npad_ssize else 'lsize' if npad_lsize < npad_ssize else 'ssize'}."
        # )
        # the padding goes right before the final two bytes of the consumed payload
        json_rpc_terminator = buf[p_idx-2:p_idx]
        assert json_rpc_terminator in (b"}\n", b"]\n"), f"unexpected {json_rpc_terminator=!r}"
        buf2 = buf[:p_idx-2] + (npad * b" ") + json_rpc_terminator
        self._asyncio_transport.write(buf2)
        self._last_send = time.monotonic()
        # drop what was sent; anything after p_idx stays buffered for a later call
        del self._sbuffer[:p_idx]
        if not self._sbuffer:
            self._sbuffer_has_data_evt.clear()

    async def _poll_sbuffer(self):
        """Keep retrying to send buffered data until the transport is closing."""
        while not self.is_closing():
            await self._can_send.wait()
            await self._sbuffer_has_data_evt.wait()  # to avoid busy-waiting
            self._maybe_consume_sbuffer()
            # If there is still data in the buffer, sleep until it would time out.
            # note: If the transport is ~idle, when we wake up, we will send the current buf data,
            #       but if busy, we might wake up to completely new buffer contents. Either is fine.
            if len(self._sbuffer) > 0:
                timeout_abs = self._last_send + self.WAIT_FOR_BUFFER_GROWTH_SECONDS
                timeout_rel = max(0.0, timeout_abs - time.monotonic())
                await asyncio.sleep(timeout_rel)

    def connection_made(self, transport: asyncio.BaseTransport):
        """Start the polling task for NotificationSession sessions; otherwise force-send."""
        super().connection_made(transport)
        if isinstance(self.session, NotificationSession):
            # the poller is spawned in the session's taskgroup
            coro = self.session.taskgroup.spawn(self._poll_sbuffer())
            self._sbuffer_task = self.loop.create_task(coro)
        else:
            # This is a short-lived "fetch_certificate"-type session.
            # No polling here, we always force-empty the buffer.
            self._force_send = True

    async def close(self, *args, **kwargs):
        '''Close the connection and return when closed.'''
        # Flush buffer before disconnecting. This makes ReplyAndDisconnect work:
        self._force_send = True
        self._maybe_consume_sbuffer()
        await super().close(*args, **kwargs)
×
464

465

466
class ServerAddr:
    """A validated (host, port, protocol) triple identifying a server.

    Host and port are validated through aiorpcx's NetAddress; the protocol
    must be one of KNOWN_ELEC_PROTOCOL_TRANSPORTS. Instances compare equal,
    and hash, by these three fields.
    """

    def __init__(self, host: str, port: Union[int, str], *, protocol: str = None):
        """Validate the arguments and store them; raises ValueError if invalid.

        A host wrapped in square brackets (IPv6 notation) has them removed.
        A missing protocol defaults to 's'.
        """
        assert isinstance(host, str), repr(host)
        protocol = 's' if protocol is None else protocol
        if not host:
            raise ValueError('host must not be empty')
        is_bracketed = host[0] == '[' and host[-1] == ']'
        if is_bracketed:  # IPv6
            host = host[1:-1]
        try:
            net_addr = NetAddress(host, port)  # this validates host and port
        except Exception as e:
            raise ValueError(f"cannot construct ServerAddr: invalid host or port (host={host}, port={port})") from e
        if protocol not in KNOWN_ELEC_PROTOCOL_TRANSPORTS:
            raise ValueError(f"invalid network protocol: {protocol}")
        self.host = str(net_addr.host)  # canonical form (if e.g. IPv6 address)
        self.port = int(net_addr.port)
        self.protocol = protocol
        self._net_addr_str = str(net_addr)

    @classmethod
    def from_str(cls, s: str) -> 'ServerAddr':
        """Constructs a ServerAddr or raises ValueError."""
        # host might be IPv6 address, hence split from the right:
        parts = str(s).rsplit(':', 2)
        # unpacking raises ValueError when there are fewer than three parts
        host, port, protocol = parts
        return ServerAddr(host=host, port=port, protocol=protocol)

    @classmethod
    def from_str_with_inference(cls, s: str) -> Optional['ServerAddr']:
        """Construct ServerAddr from str, guessing missing details.
        Does not raise - just returns None if guessing failed.
        Ongoing compatibility not guaranteed.
        """
        if not s:
            return None
        bracketed_host = ""
        if s[0] == "[" and "]" in s:  # IPv6 address
            closing = s.index("]")
            bracketed_host = s[1:closing]
            s = s[closing+1:]
        items = str(s).rsplit(':', 2)
        if len(items) < 2:
            return None  # although maybe we could guess the port too?
        host = bracketed_host or items[0]
        port = items[1]
        # a missing protocol is guessed to be the preferred one
        protocol = items[2] if len(items) >= 3 else PREFERRED_NETWORK_PROTOCOL
        try:
            return ServerAddr(host=host, port=port, protocol=protocol)
        except ValueError:
            return None

    def to_friendly_name(self) -> str:
        """Like str(self), but without the protocol suffix when the protocol is 's'."""
        # note: this method is closely linked to from_str_with_inference
        if self.protocol != 's':
            return str(self)
        return self.net_addr_str()  # hide trailing ":s"

    def __str__(self):
        addr = self.net_addr_str()
        return '{}:{}'.format(addr, self.protocol)

    def to_json(self) -> str:
        """Serialise as the same string that __str__ returns."""
        return str(self)

    def __repr__(self):
        return f'<ServerAddr host={self.host} port={self.port} protocol={self.protocol}>'

    def net_addr_str(self) -> str:
        """Return str() of the NetAddress built in the constructor."""
        return self._net_addr_str

    def __eq__(self, other):
        return (isinstance(other, ServerAddr)
                and (self.host, self.port, self.protocol)
                == (other.host, other.port, other.protocol))

    def __ne__(self, other):
        return not (self == other)

    def __hash__(self):
        identity = (self.host, self.port, self.protocol)
        return hash(identity)
×
551

552

553
def _get_cert_path_for_host(*, config: 'SimpleConfig', host: str) -> str:
    """Return the path of the certificate file for *host* under ``<config.path>/certs``.

    The file is named after the host itself, except for IPv6 address literals,
    which are stored as ``ipv6_<hex of the packed address>``.
    """
    try:
        parsed_ip = ip_address(host)
    except ValueError:
        # not an IP literal (e.g. a hostname)
        parsed_ip = None
    if isinstance(parsed_ip, IPv6Address):
        cert_name = f"ipv6_{parsed_ip.packed.hex()}"
    else:
        cert_name = host
    return os.path.join(config.path, 'certs', cert_name)
1✔
563

564

565
class Interface(Logger):
1✔
566

567
    def __init__(self, *, network: 'Network', server: ServerAddr):
        """Initialise per-connection state and schedule `run()` on the network's event loop.

        `network` supplies the asyncio loop, the config (its `path` must be set)
        and the task group that `run()` is spawned into. `server` is the address
        this interface connects to.
        """
        assert isinstance(server, ServerAddr), f"expected ServerAddr, got {type(server)}"
        # `ready` is resolved by _mark_ready() and cancelled by the handle_disconnect wrapper.
        self.ready = network.asyncio_loop.create_future()
        self.got_disconnected = asyncio.Event()
        # set and immediately cleared in run_fetch_blocks() whenever the blockchain was updated
        self._blockchain_updated = asyncio.Event()
        self.server = server
        # NOTE(review): presumably Logger.__init__ relies on diagnostic_name(), which reads
        # self.server, hence the ordering - confirm against Logger.
        Logger.__init__(self)
        assert network.config.path
        # self.host is a property that reads self.server, so self.server must be set before this line
        self.cert_path = _get_cert_path_for_host(config=network.config, host=self.host)
        self.blockchain = None  # type: Optional[Blockchain]
        # chunk indices with a request currently in flight (see request_chunk_below_max_checkpoint)
        self._requested_chunks = set()  # type: Set[int]
        self.network = network
        self.session = None  # type: Optional[NotificationSession]
        self._ipaddr_bucket = None
        # Set up proxy.
        # - for servers running on localhost, the proxy is not used. If user runs their own server
        #   on same machine, this lets them enable the proxy (which is used for e.g. FX rates).
        #   note: we could maybe relax this further and bypass the proxy for all private
        #         addresses...? e.g. 192.168.x.x
        if util.is_localhost(server.host):
            self.logger.info(f"looks like localhost: not using proxy for this server")
            self.proxy = None
        else:
            self.proxy = ESocksProxy.from_network_settings(network)

        # Latest block header and corresponding height, as claimed by the server.
        # Note that these values are updated before they are verified.
        # Especially during initial header sync, verification can take a long time.
        # Failing verification will get the interface closed.
        self.tip_header = None  # type: Optional[dict]
        self.tip = 0

        # height -> raw header; filled and cleared around the processing of each new tip
        self._headers_cache = {}  # type: Dict[int, bytes]
        self._rawtx_cache = LRUCache(maxsize=20)  # type: LRUCache[str, bytes]  # txid->rawtx

        # block target -> fee estimate, as filled in by request_fee_estimates()
        self.fee_estimates_eta = {}  # type: Dict[int, int]

        # placeholder until open_session() stores the version the server replied with
        self.active_protocol_tuple = (0,)  # type: Optional[tuple[int, ...]]

        # Dump network messages (only for this interface).  Set at runtime from the console.
        self.debug = False

        self.taskgroup = OldTaskGroup()

        async def spawn_task():
            # run() is spawned into the network-wide task group, and the task is named after the server
            task = await self.network.taskgroup.spawn(self.run())
            task.set_name(f"interface::{str(server)}")
        # schedule onto the network's loop via the thread-safe entry point
        asyncio.run_coroutine_threadsafe(spawn_task(), self.network.asyncio_loop)
1✔
615

616
    @property
1✔
617
    def host(self):
1✔
618
        return self.server.host
1✔
619

620
    @property
1✔
621
    def port(self):
1✔
UNCOV
622
        return self.server.port
×
623

624
    @property
1✔
625
    def protocol(self):
1✔
UNCOV
626
        return self.server.protocol
×
627

628
    def diagnostic_name(self):
1✔
629
        return self.server.net_addr_str()
1✔
630

631
    def __str__(self):
1✔
632
        return f"<Interface {self.diagnostic_name()}>"
×
633

634
    async def is_server_ca_signed(self, ca_ssl_context: ssl.SSLContext) -> bool:
1✔
635
        """Given a CA enforcing SSL context, returns True if the connection
636
        can be established. Returns False if the server has a self-signed
637
        certificate but otherwise is okay. Any other failures raise.
638
        """
639
        try:
×
640
            await self.open_session(ssl_context=ca_ssl_context, exit_early=True)
×
641
        except ConnectError as e:
×
642
            cause = e.__cause__
×
643
            if (isinstance(cause, ssl.SSLCertVerificationError)
×
644
                    and cause.reason == 'CERTIFICATE_VERIFY_FAILED'
645
                    and cause.verify_code == 18):  # "self signed certificate"
646
                # Good. We will use this server as self-signed.
647
                return False
×
648
            # Not good. Cannot use this server.
649
            raise
×
650
        # Good. We will use this server as CA-signed.
651
        return True
×
652

653
    async def _try_saving_ssl_cert_for_first_time(self, ca_ssl_context: ssl.SSLContext) -> None:
1✔
654
        ca_signed = await self.is_server_ca_signed(ca_ssl_context)
×
655
        if ca_signed:
×
656
            if self._get_expected_fingerprint():
×
657
                raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
×
658
            with open(self.cert_path, 'w') as f:
×
659
                # empty file means this is CA signed, not self-signed
660
                f.write('')
×
661
        else:
662
            await self._save_certificate()
×
663

664
    def _is_saved_ssl_cert_available(self):
1✔
665
        if not os.path.exists(self.cert_path):
×
666
            return False
×
667
        with open(self.cert_path, 'r') as f:
×
668
            contents = f.read()
×
669
        if contents == '':  # CA signed
×
670
            if self._get_expected_fingerprint():
×
671
                raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
×
672
            return True
×
673
        # pinned self-signed cert
674
        try:
×
675
            b = pem.dePem(contents, 'CERTIFICATE')
×
676
        except SyntaxError as e:
×
677
            self.logger.info(f"error parsing already saved cert: {e}")
×
678
            raise ErrorParsingSSLCert(e) from e
×
679
        try:
×
680
            x = x509.X509(b)
×
681
        except Exception as e:
×
682
            self.logger.info(f"error parsing already saved cert: {e}")
×
683
            raise ErrorParsingSSLCert(e) from e
×
684
        try:
×
685
            x.check_date()
×
686
        except x509.CertificateError as e:
×
687
            self.logger.info(f"certificate has expired: {e}")
×
688
            os.unlink(self.cert_path)  # delete pinned cert only in this case
×
689
            return False
×
690
        self._verify_certificate_fingerprint(bytes(b))
×
691
        return True
×
692

693
    async def _get_ssl_context(self) -> Optional[ssl.SSLContext]:
1✔
UNCOV
694
        if self.protocol != 's':
×
695
            # using plaintext TCP
UNCOV
696
            return None
×
697

698
        # see if we already have cert for this server; or get it for the first time
699
        ca_sslc = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
×
700
        if not self._is_saved_ssl_cert_available():
×
701
            try:
×
702
                await self._try_saving_ssl_cert_for_first_time(ca_sslc)
×
703
            except (OSError, ConnectError, aiorpcx.socks.SOCKSError) as e:
×
704
                raise ErrorGettingSSLCertFromServer(e) from e
×
705
        # now we have a file saved in our certificate store
706
        siz = os.stat(self.cert_path).st_size
×
707
        if siz == 0:
×
708
            # CA signed cert
709
            sslc = ca_sslc
×
710
        else:
711
            # pinned self-signed cert
712
            sslc = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=self.cert_path)
×
713
            # note: Flag "ssl.VERIFY_X509_STRICT" is enabled by default in python 3.13+ (disabled in older versions).
714
            #       We explicitly disable it as it breaks lots of servers.
715
            sslc.verify_flags &= ~ssl.VERIFY_X509_STRICT
×
716
            sslc.check_hostname = False
×
717
        return sslc
×
718

719
    def handle_disconnect(func):
        """Decorator for a coroutine method of Interface that performs disconnect clean-up.

        The wrapper awaits `func` and returns its result. GracefulDisconnect and
        aiorpcx RPCError are logged and swallowed (the wrapper then returns None);
        any other exception propagates. In every case the `finally` clause marks
        the interface as disconnected and runs the clean-up steps below.
        """
        @functools.wraps(func)
        async def wrapper_func(self: 'Interface', *args, **kwargs):
            try:
                return await func(self, *args, **kwargs)
            except GracefulDisconnect as e:
                # the exception carries the level it wants to be logged at
                self.logger.log(e.log_level, f"disconnecting due to {repr(e)}")
            except aiorpcx.jsonrpc.RPCError as e:
                self.logger.warning(f"disconnecting due to {repr(e)}")
                self.logger.debug(f"(disconnect) trace for {repr(e)}", exc_info=True)
            finally:
                self.got_disconnected.set()
                # Make sure taskgroup gets cleaned-up. This explicit clean-up is needed here
                # in case the "with taskgroup" ctx mgr never got a chance to run:
                await self.taskgroup.cancel_remaining()
                await self.network.connection_down(self)
                # if was not 'ready' yet, schedule waiting coroutines:
                self.ready.cancel()
        return wrapper_func
1✔
738

739
    @ignore_exceptions  # do not kill network.taskgroup
    @log_exceptions
    @handle_disconnect
    async def run(self):
        """Entry point of the interface task: get the SSL context, then open the session.

        Known failures while obtaining the SSL context or connecting are logged,
        after which this coroutine simply returns; the `handle_disconnect`
        wrapper then performs the disconnect clean-up.
        """
        try:
            ssl_context = await self._get_ssl_context()
        except (ErrorParsingSSLCert, ErrorGettingSSLCertFromServer) as e:
            self.logger.info(f'disconnecting due to: {repr(e)}')
            return
        try:
            await self.open_session(ssl_context=ssl_context)
        # note: asyncio.CancelledError is caught here too and is not re-raised
        except (asyncio.CancelledError, ConnectError, aiorpcx.socks.SOCKSError) as e:
            # make SSL errors for main interface more visible (to help servers ops debug cert pinning issues)
            if (isinstance(e, ConnectError) and isinstance(e.__cause__, ssl.SSLError)
                    and self.is_main_server() and not self.network.auto_connect):
                self.logger.warning(f'Cannot connect to main server due to SSL error '
                                    f'(maybe cert changed compared to "{self.cert_path}"). Exc: {repr(e)}')
            else:
                self.logger.info(f'disconnecting due to: {repr(e)}')
            return
×
759

760
    def _mark_ready(self) -> None:
1✔
UNCOV
761
        if self.ready.cancelled():
×
762
            raise GracefulDisconnect('conn establishment was too slow; *ready* future was cancelled')
×
UNCOV
763
        if self.ready.done():
×
UNCOV
764
            return
×
765

UNCOV
766
        assert self.tip_header
×
UNCOV
767
        chain = blockchain.check_header(self.tip_header)
×
UNCOV
768
        if not chain:
×
UNCOV
769
            self.blockchain = blockchain.get_best_chain()
×
770
        else:
771
            self.blockchain = chain
×
UNCOV
772
        assert self.blockchain is not None
×
773

UNCOV
774
        self.logger.info(f"set blockchain with height {self.blockchain.height()}")
×
775

UNCOV
776
        self.ready.set_result(1)
×
777

778
    def is_connected_and_ready(self) -> bool:
1✔
779
        return self.ready.done() and not self.got_disconnected.is_set()
×
780

781
    async def _save_certificate(self) -> None:
        """Fetch the server's certificate and write it in PEM form to `self.cert_path`.

        Does nothing if the file already exists. Otherwise makes up to 10 fetch
        attempts, one second apart, and raises GracefulDisconnect if none of them
        yields a certificate. The fingerprint is verified before the file is written.
        """
        if not os.path.exists(self.cert_path):
            # we may need to retry this a few times, in case the handshake hasn't completed
            for _ in range(10):
                dercert = await self._fetch_certificate()
                if dercert:
                    self.logger.info("succeeded in getting cert")
                    self._verify_certificate_fingerprint(dercert)
                    with open(self.cert_path, 'w') as f:
                        cert = ssl.DER_cert_to_PEM_cert(dercert)
                        # workaround android bug: make sure the END marker starts on its own line
                        cert = re.sub("([^\n])-----END CERTIFICATE-----","\\1\n-----END CERTIFICATE-----",cert)
                        f.write(cert)
                        # Closing the file would flush it too, but fsync needs the file
                        # to be still open. Flush first (Python buffer -> OS buffer),
                        # then fsync (OS buffer -> disk).
                        f.flush()
                        os.fsync(f.fileno())
                    break
                await asyncio.sleep(1)
            else:
                # the loop ran out of attempts without reaching `break`
                raise GracefulDisconnect("could not get certificate after 10 tries")
×
803

804
    async def _fetch_certificate(self) -> Optional[bytes]:
        """Connect to the server without any certificate verification and return its cert.

        The result is whatever `getpeercert(binary_form=True)` yields on the
        connection's SSL object. The caller (`_save_certificate`) treats a falsy
        result as "no certificate yet" and retries.
        """
        # verification is switched off: this connection only serves to read the server's cert
        sslc = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_CLIENT)
        sslc.check_hostname = False
        sslc.verify_mode = ssl.CERT_NONE
        async with _RSClient(
            session_factory=RPCSession,
            host=self.host, port=self.port,
            ssl=sslc,
            proxy=self.proxy,
            transport=PaddedRSTransport,
        ) as session:
            # reaches into a private attribute of the transport to get the asyncio transport
            asyncio_transport = session.transport._asyncio_transport  # type: asyncio.BaseTransport
            ssl_object = asyncio_transport.get_extra_info("ssl_object")  # type: ssl.SSLObject
            return ssl_object.getpeercert(binary_form=True)
×
818

819
    def _get_expected_fingerprint(self) -> Optional[str]:
1✔
820
        if self.is_main_server():
×
821
            return self.network.config.NETWORK_SERVERFINGERPRINT
×
822
        return None
×
823

824
    def _verify_certificate_fingerprint(self, certificate: bytes) -> None:
1✔
825
        expected_fingerprint = self._get_expected_fingerprint()
×
826
        if not expected_fingerprint:
×
827
            return
×
828
        fingerprint = hashlib.sha256(certificate).hexdigest()
×
829
        fingerprints_match = fingerprint.lower() == expected_fingerprint.lower()
×
830
        if not fingerprints_match:
×
831
            util.trigger_callback('cert_mismatch')
×
832
            raise ErrorSSLCertFingerprintMismatch('Refusing to connect to server due to cert fingerprint mismatch')
×
833
        self.logger.info("cert fingerprint verification passed")
×
834

835
    async def _maybe_warm_headers_cache(self, *, from_height: int, to_height: int, mode: ChainResolutionMode) -> None:
        """Fill the header cache for all heights in [from_height, to_height].

        No request is sent if every height in the range is already cached.
        """
        assert from_height <= to_height, (from_height, to_height)
        assert to_height - from_height < MAX_NUM_HEADERS_PER_REQUEST
        wanted_heights = range(from_height, to_height + 1)
        if all(h in self._headers_cache for h in wanted_heights):
            # cache already has all requested headers
            return
        # use lower timeout as we usually have network.bhi_lock here
        timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        headers = await self.get_block_headers(
            start_height=from_height, count=len(wanted_heights), timeout=timeout, mode=mode)
        for header_height, raw_header in enumerate(headers, start=from_height):
            self._headers_cache[header_height] = raw_header
×
849

850
    async def get_block_header(self, height: int, *, mode: ChainResolutionMode) -> dict:
        """Return the deserialized header at `height`, from the cache or from the server.

        Raises Exception if `height` is not a non-negative integer.
        """
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        #self.logger.debug(f'get_block_header() {height} in {mode=}')
        # use lower timeout as we usually have network.bhi_lock here
        timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        cached_raw = self._headers_cache.get(height)
        if cached_raw:
            return blockchain.deserialize_header(cached_raw, height)
        self.logger.info(f'requesting block header {height} in {mode=}')
        hex_header = await self.session.send_request('blockchain.block.header', [height], timeout=timeout)
        return blockchain.deserialize_header(bytes.fromhex(hex_header), height)
×
861

862
    async def get_block_headers(
        self,
        *,
        start_height: int,
        count: int,
        timeout=None,
        mode: Optional[ChainResolutionMode] = None,
    ) -> Sequence[bytes]:
        """Request a number of consecutive block headers, starting at `start_height`.
        `count` is the num of requested headers, BUT note the server might return fewer than this
        (if range would extend beyond its tip).
        `mode` is only used for the log message.
        note: the returned headers are not verified or parsed at all.

        Raises Exception for an invalid `start_height` or `count`, and RequestCorrupted
        when the response is malformed or inconsistent with the request.
        """
        if not is_non_negative_integer(start_height):
            raise Exception(f"{repr(start_height)} is not a block height")
        if not is_non_negative_integer(count) or not (0 < count <= MAX_NUM_HEADERS_PER_REQUEST):
            raise Exception(f"{repr(count)} not an int in range ]0, {MAX_NUM_HEADERS_PER_REQUEST}]")
        self.logger.info(
            f"requesting block headers: [{start_height}, {start_height+count-1}], {count=}"
            + (f" (in {mode=})" if mode is not None else "")
        )
        res = await self.session.send_request('blockchain.block.headers', [start_height, count], timeout=timeout)
        # check response
        assert_dict_contains_field(res, field_name='count')
        assert_dict_contains_field(res, field_name='max')
        assert_non_negative_integer(res['count'])
        assert_non_negative_integer(res['max'])
        if self.active_protocol_tuple >= (1, 6):
            # protocol >= 1.6: 'headers' is a list with one hex string per header
            hex_headers_list = assert_dict_contains_field(res, field_name='headers')
            assert_list_or_tuple(hex_headers_list)
            for item in hex_headers_list:
                assert_hex_str(item)
                if len(item) != HEADER_SIZE * 2:
                    raise RequestCorrupted(f"invalid header size. got {len(item)//2}, expected {HEADER_SIZE}")
            if len(hex_headers_list) != res['count']:
                raise RequestCorrupted(f"{len(hex_headers_list)=} != {res['count']=}")
            headers = list(bfh(hex_header) for hex_header in hex_headers_list)
        else: # older protocol (e.g. 1.4): 'hex' is all headers concatenated into one hex string
            hex_headers_concat = assert_dict_contains_field(res, field_name='hex')
            assert_hex_str(hex_headers_concat)
            if len(hex_headers_concat) != HEADER_SIZE * 2 * res['count']:
                raise RequestCorrupted('inconsistent chunk hex and count')
            headers = list(util.chunks(bfh(hex_headers_concat), size=HEADER_SIZE))
        # we never request more than MAX_NUM_HEADERS_PER_REQUEST headers, but we enforce those fit in a single response
        if res['max'] < MAX_NUM_HEADERS_PER_REQUEST:
            raise RequestCorrupted(f"server uses too low 'max' count for block.headers: {res['max']} < {MAX_NUM_HEADERS_PER_REQUEST}")
        if res['count'] > count:
            raise RequestCorrupted(f"asked for {count} headers but got more: {res['count']}")
        elif res['count'] < count:
            # we only tolerate getting fewer headers if it is due to reaching the tip
            end_height = start_height + res['count'] - 1
            if end_height < self.tip:  # still below tip. why did server not send more?!
                raise RequestCorrupted(
                    f"asked for {count} headers but got fewer: {res['count']}. ({start_height=}, {self.tip=})")
        # checks done.
        return headers
×
918

919
    async def request_chunk_below_max_checkpoint(
        self,
        *,
        height: int,
    ) -> None:
        """Fetch the whole chunk containing `height` and connect it to the blockchain.

        `height` must not exceed the max checkpoint. Returns without doing
        anything if that chunk is already being requested. Raises
        RequestCorrupted if the chunk does not connect.
        """
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        assert height <= constants.net.max_checkpoint(), f"{height=} must be <= cp={constants.net.max_checkpoint()}"
        index = height // CHUNK_SIZE
        if index in self._requested_chunks:
            return
        self.logger.debug(f"requesting chunk from height {height}")
        chunk_start = index * CHUNK_SIZE
        try:
            # remember the chunk as in-flight for the duration of the request
            self._requested_chunks.add(index)
            headers = await self.get_block_headers(start_height=chunk_start, count=CHUNK_SIZE)
        finally:
            self._requested_chunks.discard(index)
        connected = self.blockchain.connect_chunk(index, data=b"".join(headers))
        if not connected:
            raise RequestCorrupted(f"chunk ({index=}, for {height=}) does not connect to blockchain")
×
940

941
    async def _fast_forward_chain(
        self,
        *,
        height: int,  # usually local chain tip + 1
        tip: int,  # server tip. we should not request past this.
    ) -> int:
        """Request some headers starting at `height` to grow the blockchain of this interface.
        Returns number of headers we managed to connect, starting at `height`.

        Up to 10 chunks are requested concurrently, beginning at the start of the
        chunk that contains `height`. `height` must be above the max checkpoint
        (or both must be 0) and must not exceed `tip`.
        """
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        if not is_non_negative_integer(tip):
            raise Exception(f"{repr(tip)} is not a block height")
        if not (height > constants.net.max_checkpoint()
                or height == 0 == constants.net.max_checkpoint()):
            raise Exception(f"{height=} must be > cp={constants.net.max_checkpoint()}")
        assert height <= tip, f"{height=} must be <= {tip=}"
        # Request a few chunks of headers concurrently.
        # tradeoffs:
        # - more chunks: higher memory requirements
        # - more chunks: higher concurrency => syncing needs fewer network round-trips
        # - if a chunk does not connect, bandwidth for all later chunks is wasted
        async with OldTaskGroup() as group:
            tasks = []  # type: List[Tuple[int, asyncio.Task[Sequence[bytes]]]]
            index0 = height // CHUNK_SIZE
            for chunk_cnt in range(10):
                index = index0 + chunk_cnt
                start_height = index * CHUNK_SIZE
                if start_height > tip:
                    break
                # the last chunk is truncated so that we never ask for headers past the tip
                end_height = min(start_height + CHUNK_SIZE - 1, tip)
                size = end_height - start_height + 1
                tasks.append((index, await group.spawn(self.get_block_headers(start_height=start_height, count=size))))
        # try to connect chunks, in order; stop at the first chunk that does not connect
        num_headers = 0
        for index, task in tasks:
            headers = task.result()
            conn = self.blockchain.connect_chunk(index, data=b"".join(headers))
            if not conn:
                break
            num_headers += len(headers)
        # We started at a chunk boundary, instead of requested `height`. Need to correct for that.
        offset = height - index0 * CHUNK_SIZE
        return max(0, num_headers - offset)
×
985

986
    def is_main_server(self) -> bool:
1✔
UNCOV
987
        return (self.network.interface == self or
×
988
                self.network.interface is None and self.network.default_server == self.server)
989

990
    async def open_session(
        self,
        *,
        ssl_context: Optional[ssl.SSLContext],
        exit_early: bool = False,
    ):
        """Connect to the server, perform the handshake, then run this interface's tasks.

        `ssl_context` is passed through to the client; `_get_ssl_context` returns
        None for plaintext TCP. With `exit_early=True` the coroutine returns right
        after a successful 'server.version' request (used to probe the server's
        certificate). Otherwise it validates the negotiated protocol version and
        the genesis hash, and then stays inside the task group until it exits.

        Raises GracefulDisconnect on handshake failures and on a selection of
        RPC error codes raised from within the task group.
        """
        # `iface=self` is bound as a default so that the session gets this interface passed in
        session_factory = lambda *args, iface=self, **kwargs: NotificationSession(*args, **kwargs, interface=iface)
        async with _RSClient(
            session_factory=session_factory,
            host=self.host, port=self.port,
            ssl=ssl_context,
            proxy=self.proxy,
            transport=PaddedRSTransport,
        ) as session:
            start = time.perf_counter()  # for the handshake duration in the log line below
            self.session = session  # type: NotificationSession
            self.session.set_default_timeout(self.network.get_network_timeout_seconds(NetworkTimeout.Generic))
            client_prange = [version.PROTOCOL_VERSION_MIN, version.PROTOCOL_VERSION_MAX]
            try:
                ver = await session.send_request('server.version', [self.client_name(), client_prange])
            except aiorpcx.jsonrpc.RPCError as e:
                raise GracefulDisconnect(e)  # probably 'unsupported protocol version'
            if exit_early:
                return
            # ver[1] is the protocol version the server replied with
            self.active_protocol_tuple = protocol_tuple(ver[1])
            client_pmin = protocol_tuple(client_prange[0])
            client_pmax = protocol_tuple(client_prange[1])
            if not (client_pmin <= self.active_protocol_tuple <= client_pmax):
                raise GracefulDisconnect(f'server violated protocol-version-negotiation. '
                                         f'we asked for {client_prange!r}, they sent {ver[1]!r}')
            if not self.network.check_interface_against_healthy_spread_of_connected_servers(self):
                raise GracefulDisconnect(f'too many connected servers already '
                                         f'in bucket {self.bucket_based_on_ipaddress()}')

            try:
                features = await session.send_request('server.features')
                server_genesis_hash = assert_dict_contains_field(features, field_name='genesis_hash')
            except (aiorpcx.jsonrpc.RPCError, RequestCorrupted) as e:
                raise GracefulDisconnect(e)
            if server_genesis_hash != constants.net.GENESIS:
                raise GracefulDisconnect(f'server on different chain: {server_genesis_hash=}. ours: {constants.net.GENESIS}')
            self.logger.info(f"connection established. version: {ver}, handshake duration: {(time.perf_counter() - start) * 1000:.2f} ms")

            try:
                # the long-running tasks of this interface; the `async with` block is
                # only left once the task group exits
                async with self.taskgroup as group:
                    await group.spawn(self.ping)
                    await group.spawn(self.request_fee_estimates)
                    await group.spawn(self.run_fetch_blocks)
                    await group.spawn(self.monitor_connection)
            except aiorpcx.jsonrpc.RPCError as e:
                # these codes lead to a graceful disconnect (logged more loudly for
                # the main server); any other RPC error is re-raised as-is
                if e.code in (
                    JSONRPC.EXCESSIVE_RESOURCE_USAGE,
                    JSONRPC.SERVER_BUSY,
                    JSONRPC.METHOD_NOT_FOUND,
                    JSONRPC.INTERNAL_ERROR,
                ):
                    log_level = logging.WARNING if self.is_main_server() else logging.INFO
                    raise GracefulDisconnect(e, log_level=log_level) from e
                raise
            finally:
                self.got_disconnected.set()  # set this ASAP, ideally before any awaits
×
1051

1052
    async def monitor_connection(self):
        """Check once per second that the session is still open; raise GracefulDisconnect when it is not."""
        while True:
            await asyncio.sleep(1)
            if self.session and not self.session.is_closing():
                continue
            # If the session/transport is no longer open, we disconnect.
            # e.g. if the remote cleanly sends EOF, we would handle that here.
            # note: If the user pulls the ethernet cable or disconnects wifi,
            #       ideally we would detect that here, so that the GUI/etc can reflect that.
            #       - On Android, this seems to work reliably, where asyncio.BaseProtocol.connection_lost()
            #         gets called with e.g. ConnectionAbortedError(103, 'Software caused connection abort').
            #       - On desktop Linux/Win, it seems BaseProtocol.connection_lost() is not called in such cases.
            #         Hence, in practice the connection issue will only be detected the next time we try
            #         to send a message (plus timeout), which can take minutes...
            raise GracefulDisconnect('session was closed')
×
1066

1067
    async def ping(self):
        """Loop forever: sleep a random time below 300 seconds, send 'server.ping', maybe send noise."""
        # We periodically send a "ping" msg to make sure the server knows we are still here.
        # Adding a bit of randomness generates some noise against traffic analysis.
        while True:
            delay = random.random() * 300
            await asyncio.sleep(delay)
            await self.session.send_request('server.ping')
            await self._maybe_send_noise()
×
1073
            await self._maybe_send_noise()
×
1074

1075
    async def _maybe_send_noise(self):
        """Send extra 'server.ping' requests: each round happens with probability 0.2,
        after a random sleep of less than one second.
        """
        while True:
            if random.random() >= 0.2:
                return
            await asyncio.sleep(random.random())
            await self.session.send_request('server.ping')
×
1079

1080
    async def request_fee_estimates(self):
        """Every 60 seconds, fetch fee estimates for all but the last FEE_ETA_TARGETS entry.

        Negative estimates are skipped; the others are stored in
        `self.fee_estimates_eta`, after which the network is told to update.
        """
        while True:
            pending = []
            # the requests run concurrently; leaving the group means all of them finished
            async with OldTaskGroup() as group:
                for target in FEE_ETA_TARGETS[:-1]:
                    task = await group.spawn(self.get_estimatefee(target))
                    pending.append((target, task))
            for target, task in pending:
                estimate = task.result()
                if estimate < 0:
                    continue
                assert isinstance(estimate, int)
                self.fee_estimates_eta[target] = estimate
            self.network.update_fee_estimates()
            await asyncio.sleep(60)
×
1093

1094
    async def close(self, *, force_after: int = None):
1✔
1095
        """Closes the connection and waits for it to be closed.
1096
        We try to flush buffered data to the wire, which can take some time.
1097
        """
UNCOV
1098
        if self.session:
×
UNCOV
1099
            await self.session.close(force_after=force_after)
×
1100
        # monitor_connection will cancel tasks
1101

1102
    async def run_fetch_blocks(self):
        """Subscribe to header notifications and process every new tip the server announces.

        For each notification: record the claimed tip, disconnect if it is below
        the max checkpoint, mark the interface ready, process the header, and fire
        the update callbacks. Runs until an exception is raised or the task is cancelled.
        """
        header_queue = asyncio.Queue()
        await self.session.subscribe('blockchain.headers.subscribe', [], header_queue)
        while True:
            item = await header_queue.get()
            raw_header = item[0]
            height = raw_header['height']
            header_bytes = bfh(raw_header['hex'])
            header_dict = blockchain.deserialize_header(header_bytes, height)
            # these are the server's claims; they are stored before being verified
            self.tip_header = header_dict
            self.tip = height
            if self.tip < constants.net.max_checkpoint():
                raise GracefulDisconnect(
                    f"server tip below max checkpoint. ({self.tip} < {constants.net.max_checkpoint()})")
            self._mark_ready()
            self._headers_cache.clear()  # tip changed, so assume anything could have happened with chain
            self._headers_cache[height] = header_bytes
            try:
                blockchain_updated = await self._process_header_at_tip()
            finally:
                self._headers_cache.clear()  # to reduce memory usage
            # header processing done
            if self.is_main_server() or blockchain_updated:
                self.logger.info(f"new chain tip. {height=}")
            if blockchain_updated:
                util.trigger_callback('blockchain_updated')
                # set() followed directly by clear(): wakes tasks currently waiting on the
                # event, while leaving it unset for later waiters
                self._blockchain_updated.set()
                self._blockchain_updated.clear()
            util.trigger_callback('network_updated')
            await self.network.switch_unwanted_fork_interface()
            await self.network.switch_lagging_interface()
            await self.taskgroup.spawn(self._maybe_send_noise())
×
1134

1135
    async def _process_header_at_tip(self) -> bool:
        """Bring our local chain up to the tip announced by the server.

        Returns:
        False - boring fast-forward: we already have this header as part of this blockchain from another interface,
        True - new header we didn't have, or reorg
        """
        height = self.tip
        header = self.tip_header
        async with self.network.bhi_lock:
            already_have_it = self.blockchain.height() >= height and self.blockchain.check_header(header)
            if already_have_it:
                # another interface amended the blockchain
                return False
            await self.sync_until(height)
            return True
1147

1148
    async def sync_until(
        self,
        height: int,
        *,
        next_height: Optional[int] = None,  # sync target. typically the tip, except in unit tests
    ) -> Tuple[ChainResolutionMode, int]:
        """Process headers starting at `height` until past `next_height`.

        At least one iteration is always run. While more than 144 blocks below the
        target, headers are fetched in bulk via _fast_forward_chain(); otherwise
        (or if the bulk call reports 0 headers) a single step() is done.

        Returns (mode of the last iteration, next height to process).
        Raises GracefulDisconnect if bulk sync makes no progress at or below the
        max checkpoint.
        """
        if next_height is None:
            next_height = self.tip
        last = None  # type: Optional[ChainResolutionMode]
        while last is None or height <= next_height:
            state_before = (last, height)
            close_to_tip = not (next_height > height + 144)
            if close_to_tip:
                # We are close to the tip, so process headers one-by-one.
                # (note: due to headers_cache, to save network latency, this can still batch-request headers)
                last, height = await self.step(height)
            else:
                # We are far from the tip.
                # It is more efficient to process headers in large batches (CPU/disk_usage/logging).
                # (but this wastes a little bandwidth, if we are not on a chunk boundary)
                num_headers = await self._fast_forward_chain(
                    height=height, tip=next_height)
                if num_headers == 0:
                    if height <= constants.net.max_checkpoint():
                        raise GracefulDisconnect('server chain conflicts with checkpoints or genesis')
                    last, height = await self.step(height)
                    # note: this path skips the no-progress assert below
                    continue
                # report progress to gui/etc
                util.trigger_callback('blockchain_updated')
                self._blockchain_updated.set()
                self._blockchain_updated.clear()
                util.trigger_callback('network_updated')
                height += num_headers
                assert height <= next_height+1, (height, self.tip)
                last = ChainResolutionMode.CATCHUP
            assert state_before != (last, height), 'had to prevent infinite loop in interface.sync_until'
        return last, height
1184

1185
    async def step(
        self,
        height: int,
    ) -> Tuple[ChainResolutionMode, int]:
        """Process the single header at `height`.

        - If the header checks against one of our chains, adopt that chain.
        - Else if it connects to one of our chains, adopt that chain and save the header.
        - Else search backwards for a header that checks or connects, and if it
          only checks, binary-search for the fork point and resolve it.

        Returns (resolution mode, next height to process).
        """
        assert 0 <= height <= self.tip, (height, self.tip)
        catchup_mode = ChainResolutionMode.CATCHUP
        await self._maybe_warm_headers_cache(
            from_height=height,
            to_height=min(self.tip, height+MAX_NUM_HEADERS_PER_REQUEST-1),
            mode=catchup_mode,
        )
        header = await self.get_block_header(height, mode=catchup_mode)

        chain = blockchain.check_header(header)
        if chain:
            self.blockchain = chain
            # note: there is an edge case here that is not handled.
            # we might know the blockhash (enough for check_header) but
            # not have the header itself. e.g. regtest chain with only genesis.
            # this situation resolves itself on the next block
            return catchup_mode, height+1

        can_connect = blockchain.can_connect(header)
        if not can_connect:
            self.logger.info(f"can't connect new block: {height=}")
            # walk back until we find a header that checks or connects
            height, header, bad, bad_header = await self._search_headers_backwards(height, header=header)
            chain = blockchain.check_header(header)
            can_connect = blockchain.can_connect(header)
            assert chain or can_connect
        if can_connect:
            self.blockchain = can_connect
            self.blockchain.save_header(header)
            return catchup_mode, height + 1

        # here the header at `height` checks but does not connect; `bad`/`bad_header`
        # were set by the backwards search above
        good, bad, bad_header = await self._search_headers_binary(height, bad, bad_header, chain)
        return await self._resolve_potential_chain_fork_given_forkpoint(good, bad, bad_header)
1221

1222
    async def _search_headers_binary(
        self,
        height: int,
        bad: int,
        bad_header: dict,
        chain: Optional[Blockchain],
    ) -> Tuple[int, int, dict]:
        """Bisect between a good height and a bad height until they are adjacent.

        `height` is the initial "good" bound; `bad`/`bad_header` is a header that
        checks against none of our chains. Every probed header that checks raises
        the good bound (and sets self.blockchain to its chain); every other probe
        lowers the bad bound.

        Returns (good, bad, bad_header), with good + 1 == bad.
        Raises Exception if the final bad_header cannot connect to self.blockchain.
        """
        assert bad == bad_header['block_height']
        _assert_header_does_not_check_against_any_chain(bad_header)

        self.blockchain = chain
        good = height
        binary_mode = ChainResolutionMode.BINARY
        # note: always probes at least once, even if the bounds start out adjacent
        while True:
            assert 0 <= good < bad, (good, bad)
            height = (good + bad) // 2
            self.logger.info(f"binary step. good {good}, bad {bad}, height {height}")
            interval_is_small = bad - good + 1 <= MAX_NUM_HEADERS_PER_REQUEST
            if interval_is_small:
                # trade some bandwidth for lower latency
                await self._maybe_warm_headers_cache(
                    from_height=good, to_height=bad, mode=binary_mode)
            header = await self.get_block_header(height, mode=binary_mode)
            chain = blockchain.check_header(header)
            if not chain:
                bad, bad_header = height, header
            else:
                self.blockchain = chain
                good = height
            if bad - good == 1:
                break

        if not self.blockchain.can_connect(bad_header, check_height=False):
            raise Exception('unexpected bad header during binary: {}'.format(bad_header))
        _assert_header_does_not_check_against_any_chain(bad_header)

        self.logger.info(f"binary search exited. good {good}, bad {bad}. {chain=}")
        return good, bad, bad_header
1258

1259
    async def _resolve_potential_chain_fork_given_forkpoint(
        self,
        good: int,
        bad: int,
        bad_header: dict,
    ) -> Tuple[ChainResolutionMode, int]:
        """Decide, given adjacent good/bad heights, whether we need a new fork.

        If self.blockchain ends exactly at `good`, there is nothing to fork: we just
        continue from good + 1. Otherwise self.blockchain is forked at `bad_header`
        and the new fork becomes self.blockchain.

        Returns (NO_FORK, good + 1) or (FORK, bad + 1).
        """
        assert good + 1 == bad
        assert bad == bad_header['block_height']
        _assert_header_does_not_check_against_any_chain(bad_header)
        # 'good' is the height of a block 'good_header', somewhere in self.blockchain.
        # bad_header connects to good_header; bad_header itself is NOT in self.blockchain.

        local_height = self.blockchain.height()
        assert local_height >= good, (local_height, good)
        if local_height != good:
            # this is a new fork we don't yet have
            self.logger.info(f"new fork at bad height {bad}")
            forked_chain = self.blockchain.fork(bad_header)  # type: Blockchain
            self.blockchain = forked_chain
            assert forked_chain.forkpoint == bad
            return ChainResolutionMode.FORK, bad + 1

        height = good + 1
        self.logger.info(f"catching up from {height}")
        return ChainResolutionMode.NO_FORK, height
1285

1286
    async def _search_headers_backwards(
        self,
        height: int,
        *,
        header: dict,
    ) -> Tuple[int, dict, int, dict]:
        """Walk back from a bad header, with doubling strides, until a header checks or connects.

        Returns (height, header, bad, bad_header): the first header found that
        checks against or connects to one of our chains, and the last header
        probed before it that did neither.

        Raises GracefulDisconnect if even the header at the max checkpoint
        neither checks nor connects.
        """
        async def probe_is_still_bad() -> bool:
            """Fetch the header at `height` (clamped up to the max checkpoint).

            Returns False once the header checks or connects, True otherwise.
            """
            nonlocal height, header
            at_checkpoint = height <= constants.net.max_checkpoint()
            if at_checkpoint:
                height = constants.net.max_checkpoint()
            header = await self.get_block_header(height, mode=ChainResolutionMode.BACKWARD)
            chain = blockchain.check_header(header)
            can_connect = blockchain.can_connect(header)
            if chain or can_connect:
                return False
            if at_checkpoint:
                raise GracefulDisconnect("server chain conflicts with checkpoints")
            return True

        bad, bad_header = height, header
        _assert_header_does_not_check_against_any_chain(bad_header)
        with blockchain.blockchains_lock:
            chains = list(blockchain.blockchains.values())
        # start just above the highest local chain, but strictly below the bad header
        local_max = max([0, *(chain.height() for chain in chains)])
        height = min(local_max + 1, height - 1)
        assert height >= 0

        await self._maybe_warm_headers_cache(
            from_height=max(0, height-10), to_height=height, mode=ChainResolutionMode.BACKWARD)

        stride = 2
        while await probe_is_still_bad():
            bad, bad_header = height, header
            height -= stride
            stride *= 2

        _assert_header_does_not_check_against_any_chain(bad_header)
        self.logger.info(f"exiting backward mode at {height}")
        return height, header, bad, bad_header
1326

1327
    @classmethod
    def client_name(cls) -> str:
        """Return the client identifier string: 'electrum/' followed by the version."""
        return 'electrum/{}'.format(version.ELECTRUM_VERSION)
1330

1331
    def is_tor(self):
        """Return True if the server's host name ends with '.onion'."""
        onion_suffix = '.onion'
        return self.host.endswith(onion_suffix)
1333

1334
    def ip_addr(self) -> Optional[str]:
        """Return the host of the session's remote address as a str.

        Returns None if there is no session, or it has no remote address.
        """
        session = self.session
        peer_addr = session.remote_address() if session else None
        if not peer_addr:
            return None
        return str(peer_addr.host)
1340

1341
    def bucket_based_on_ipaddress(self) -> str:
        """Return a coarse "bucket" name for the server, derived from its address.

        - onion servers all share BUCKET_NAME_OF_ONION_SERVERS
        - IPv4 peers are bucketed by their enclosing /16 network
        - IPv6 peers are bucketed by their enclosing /48 network
        - '' if there is no peer address, it does not parse as an IP address,
          or it is a loopback address (localhost is exempt)

        A non-empty result is cached in self._ipaddr_bucket. An empty result is
        falsy, so it is recomputed on the next call.
        """
        def do_bucket() -> str:
            if self.is_tor():
                return BUCKET_NAME_OF_ONION_SERVERS
            raw_addr = self.ip_addr()
            if raw_addr is None:
                # no session / no remote address
                return ''
            try:
                ip_addr = ip_address(raw_addr)  # type: Union[IPv4Address, IPv6Address]
            except ValueError:
                # not an IP address literal
                return ''
            if ip_addr.is_loopback:  # localhost is exempt
                return ''
            if ip_addr.version == 4:
                slash16 = IPv4Network(ip_addr).supernet(prefixlen_diff=32-16)
                return str(slash16)
            if ip_addr.version == 6:
                slash48 = IPv6Network(ip_addr).supernet(prefixlen_diff=128-48)
                return str(slash48)
            return ''

        if not self._ipaddr_bucket:
            self._ipaddr_bucket = do_bucket()
        return self._ipaddr_bucket
1364

1365
    async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict:
        """Request the merkle proof of a transaction and validate the response.

        Raises Exception on invalid arguments. The response must contain
        'block_height', 'merkle' and 'pos'; it is returned unchanged once validated.
        """
        if not is_hash256_str(tx_hash):
            raise Exception(f"{repr(tx_hash)} is not a txid")
        if not is_non_negative_integer(tx_height):
            raise Exception(f"{repr(tx_height)} is not a block height")
        # do request
        proof = await self.session.send_request('blockchain.transaction.get_merkle', [tx_hash, tx_height])
        # check response: first presence of all fields, then their types
        block_height, merkle, pos = [
            assert_dict_contains_field(proof, field_name=name)
            for name in ('block_height', 'merkle', 'pos')
        ]
        # note: tx_height was just a hint to the server, don't enforce the response to match it
        assert_non_negative_integer(block_height)
        assert_non_negative_integer(pos)
        assert_list_or_tuple(merkle)
        for node_hash in merkle:
            assert_hash256_str(node_hash)
        return proof
1383

1384
    async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
        """Return the raw transaction with txid `tx_hash`, as a hex string.

        Served from this interface's rawtx cache when possible; otherwise requested
        from the server, validated (hex, deserializable, txid matches) and cached.

        Raises Exception if tx_hash is not a txid, RequestCorrupted on a bad response.
        """
        if not is_hash256_str(tx_hash):
            raise Exception(f"{repr(tx_hash)} is not a txid")
        cached_rawtx = self._rawtx_cache.get(tx_hash)
        if cached_rawtx:
            return cached_rawtx.hex()
        raw = await self.session.send_request('blockchain.transaction.get', [tx_hash], timeout=timeout)
        # validate response
        if not is_hex_str(raw):
            raise RequestCorrupted(f"received garbage (non-hex) as tx data (txid {tx_hash}): {raw!r}")
        tx = Transaction(raw)
        try:
            tx.deserialize()  # see if raises
        except Exception as e:
            raise RequestCorrupted(f"cannot deserialize received transaction (txid {tx_hash})") from e
        if tx.txid() != tx_hash:
            raise RequestCorrupted(f"received tx does not match expected txid {tx_hash} (got {tx.txid()})")
        self._rawtx_cache[tx_hash] = bytes.fromhex(raw)
        return raw
1402

1403
    async def broadcast_transaction(self, tx: 'Transaction', *, timeout=None) -> None:
        """Broadcast `tx` via this server.

        caller should handle TxBroadcastError and RequestTimedOut

        If `timeout` is None, the network's "Urgent" timeout is used.
        Raises DummyAddressUsedInTxException if any output pays to a dummy address.
        On success the raw tx is stored in this interface's rawtx cache.
        """
        txid_calc = tx.txid()
        assert txid_calc is not None
        rawtx = tx.serialize()
        assert is_hex_str(rawtx)
        if timeout is None:
            timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        for txout in tx.outputs():
            if DummyAddress.is_dummy_address(txout.address):
                raise DummyAddressUsedInTxException("tried to broadcast tx with dummy address!")
        request_args = [rawtx]
        try:
            out = await self.session.send_request('blockchain.transaction.broadcast', request_args, timeout=timeout)
            # note: both 'out' and exception messages are untrusted input from the server
        except (RequestTimedOut, asyncio.CancelledError, asyncio.TimeoutError):
            raise  # pass-through
        except aiorpcx.jsonrpc.CodeMessageError as e:
            self.logger.info(f"broadcast_transaction error [DO NOT TRUST THIS MESSAGE]: {error_text_str_to_safe_str(repr(e))}. tx={str(tx)}")
            raise TxBroadcastServerReturnedError(sanitize_tx_broadcast_response(e.message)) from e
        except BaseException as e:  # intentional BaseException for sanity!
            self.logger.info(f"broadcast_transaction error2 [DO NOT TRUST THIS MESSAGE]: {error_text_str_to_safe_str(repr(e))}. tx={str(tx)}")
            send_exception_to_crash_reporter(e)
            raise TxBroadcastUnknownError() from e
        # the server is expected to echo back the txid we calculated locally
        if out != txid_calc:
            self.logger.info(f"unexpected txid for broadcast_transaction [DO NOT TRUST THIS MESSAGE]: "
                             f"{error_text_str_to_safe_str(out)} != {txid_calc}. tx={str(tx)}")
            raise TxBroadcastHashMismatch(_("Server returned unexpected transaction ID."))
        # broadcast succeeded.
        # We now cache the rawtx, for *this interface only*. The tx likely touches some ismine addresses, affecting
        # the status of a scriptpubkey we are subscribed to. Caching here will save a future get_transaction RPC.
        self._rawtx_cache[txid_calc] = bytes.fromhex(rawtx)
1433

1434
    async def broadcast_txpackage(self, txs: Sequence['Transaction']) -> bool:
        """Broadcast several transactions as one package.

        Requires the active protocol version to be at least (1, 6).
        Returns True on success; False if the server answered with an error
        or reported success=False. On success all raw txs are cached.
        Raises DummyAddressUsedInTxException if any output pays to a dummy address.
        """
        assert self.active_protocol_tuple >= (1, 6), f"server using old protocol: {self.active_protocol_tuple}"
        rawtxs = [tx.serialize() for tx in txs]
        assert all(is_hex_str(rawtx) for rawtx in rawtxs)
        assert all(tx.txid() is not None for tx in txs)
        timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        for tx in txs:
            for txout in tx.outputs():
                if DummyAddress.is_dummy_address(txout.address):
                    raise DummyAddressUsedInTxException("tried to broadcast tx with dummy address!")
        try:
            res = await self.session.send_request('blockchain.transaction.broadcast_package', [rawtxs], timeout=timeout)
        except aiorpcx.jsonrpc.CodeMessageError as e:
            self.logger.info(f"broadcast_txpackage error [DO NOT TRUST THIS MESSAGE]: {error_text_str_to_safe_str(repr(e))}. {rawtxs=}")
            return False
        success = assert_dict_contains_field(res, field_name='success')
        if not success:
            errors = assert_dict_contains_field(res, field_name='errors')
            self.logger.info(f"broadcast_txpackage error [DO NOT TRUST THIS MESSAGE]: {error_text_str_to_safe_str(repr(errors))}. {rawtxs=}")
            return False
        assert success
        # broadcast succeeded.
        # We now cache the rawtx, for *this interface only*. The tx likely touches some ismine addresses, affecting
        # the status of a scriptpubkey we are subscribed to. Caching here will save a future get_transaction RPC.
        for tx, rawtx in zip(txs, rawtxs):
            txid = tx.txid()
            self._rawtx_cache[txid] = bytes.fromhex(rawtx)
        return True
1460

1461
    async def get_history_for_spk(self, spk: str) -> List[dict]:
        """Request the history of a scriptpubkey and validate the response.

        Checks, per item: 'height' (integer >= -1) and 'tx_hash' are present and
        well-formed; items with height -1 or 0 also carry a non-negative 'fee';
        other heights are non-decreasing, start at 1 or above, and do not follow
        a height -1/0 item.
        For protocol >= (1, 6) the height <= 0 items must be in canonical order.
        Txids must be unique.

        Raises RequestCorrupted where a check done directly here fails.
        Returns the response unchanged once validated.
        """
        # do request
        history = await self.session.send_request('blockchain.scriptpubkey.get_history', [spk])
        # check response
        assert_list_or_tuple(history)
        lowest_allowed_height = 1
        for tx_item in history:
            height = assert_dict_contains_field(tx_item, field_name='height')
            assert_dict_contains_field(tx_item, field_name='tx_hash')
            assert_integer(height)
            if height < -1:
                raise RequestCorrupted(f'{height!r} is not a valid block height')
            assert_hash256_str(tx_item['tx_hash'])
            if height not in (-1, 0):
                # check monotonicity of heights
                if height < lowest_allowed_height:
                    raise RequestCorrupted('heights of confirmed txs must be in increasing order')
                lowest_allowed_height = height
                continue
            assert_dict_contains_field(tx_item, field_name='fee')
            assert_non_negative_integer(tx_item['fee'])
            # this ensures confirmed txs can't follow mempool txs
            lowest_allowed_height = float("inf")
        if self.active_protocol_tuple >= (1, 6):
            # enforce order of mempool txs
            mempool_txs = [tx_item for tx_item in history if tx_item['height'] <= 0]
            canonical_order = sorted(mempool_txs, key=lambda x: (-x['height'], bytes.fromhex(x['tx_hash'])))
            if mempool_txs != canonical_order:
                raise RequestCorrupted('mempool txs not in canonical order')
        unique_txids = {tx_item['tx_hash'] for tx_item in history}
        if len(unique_txids) != len(history):
            # Either server is sending garbage... or maybe if server is race-prone
            # a recently mined tx could be included in both last block and mempool?
            # Still, it's simplest to just disregard the response.
            raise RequestCorrupted(f"server history has non-unique txids for spk={spk}")
        return history
1495

1496
    async def listunspent_for_spk(self, spk: str) -> List[dict]:
        """Request the unspent outputs of a scriptpubkey and validate the response.

        Every item must contain 'tx_pos', 'value', 'height' (non-negative integers)
        and 'tx_hash' (hash256 hex string). Returns the response unchanged.
        """
        # do request
        utxos = await self.session.send_request('blockchain.scriptpubkey.listunspent', [spk])
        # check response
        assert_list_or_tuple(utxos)
        for utxo_item in utxos:
            # presence of all fields first, then their types
            for required_field in ('tx_pos', 'value', 'tx_hash', 'height'):
                assert_dict_contains_field(utxo_item, field_name=required_field)
            for int_field in ('tx_pos', 'value', 'height'):
                assert_non_negative_integer(utxo_item[int_field])
            assert_hash256_str(utxo_item['tx_hash'])
        return utxos
1511

1512
    async def get_balance_for_spk(self, spk: str) -> dict:
        """Request the balance of a scriptpubkey and validate the response.

        The response must contain 'confirmed' (non-negative integer) and
        'unconfirmed' (integer). Returns the response unchanged.
        """
        # do request
        balance = await self.session.send_request('blockchain.scriptpubkey.get_balance', [spk])
        # check response
        for required_field in ('confirmed', 'unconfirmed'):
            assert_dict_contains_field(balance, field_name=required_field)
        assert_non_negative_integer(balance['confirmed'])
        assert_integer(balance['unconfirmed'])
        return balance
1521

1522
    async def get_txid_from_txpos(self, tx_height: int, tx_pos: int, merkle: bool):
        """Look up a txid by block height and position within the block.

        If `merkle` is falsy the validated response is a hash256 hex string;
        otherwise it is a dict with 'tx_hash' and 'merkle' (a list of hashes).
        Raises Exception on invalid arguments.
        """
        if not is_non_negative_integer(tx_height):
            raise Exception(f"{repr(tx_height)} is not a block height")
        if not is_non_negative_integer(tx_pos):
            raise Exception(f"{repr(tx_pos)} should be non-negative integer")
        # do request
        request_args = [tx_height, tx_pos, merkle]
        res = await self.session.send_request('blockchain.transaction.id_from_pos', request_args)
        # check response
        if not merkle:
            assert_hash256_str(res)
            return res
        assert_dict_contains_field(res, field_name='tx_hash')
        assert_dict_contains_field(res, field_name='merkle')
        assert_hash256_str(res['tx_hash'])
        merkle_branch = res['merkle']
        assert_list_or_tuple(merkle_branch)
        for node_hash in merkle_branch:
            assert_hash256_str(node_hash)
        return res
1543

1544
    async def get_fee_histogram(self) -> Sequence[Tuple[Union[float, int], int]]:
        """Request the mempool fee histogram and validate the response.

        The response must be a list/tuple of 2-item list/tuples `(fee, s)`, where
        `fee` is a non-negative int or float and `s` a non-negative integer, with
        fees in strictly decreasing order.

        Raises RequestCorrupted if an item is not a pair or fees are not
        decreasing. Returns the response unchanged once validated.
        """
        # do request
        res = await self.session.send_request('mempool.get_fee_histogram')
        # check response
        assert_list_or_tuple(res)
        prev_fee = float('inf')
        for item in res:
            # The server is untrusted: validate the shape of each item before
            # unpacking, so that garbage is reported as a corrupted response
            # instead of surfacing as a bare ValueError/TypeError.
            assert_list_or_tuple(item)
            if len(item) != 2:
                raise RequestCorrupted(f'{item!r} should be a pair of (fee, size)')
            fee, s = item
            assert_non_negative_int_or_float(fee)
            assert_non_negative_integer(s)
            if fee >= prev_fee:  # check monotonicity
                raise RequestCorrupted('fees must be in decreasing order')
            prev_fee = fee
        return res
1557

1558
    async def get_server_banner(self) -> str:
        """Request the server banner. Raises RequestCorrupted if it is not a str."""
        # do request
        banner = await self.session.send_request('server.banner')
        # check response
        if isinstance(banner, str):
            return banner
        raise RequestCorrupted(f'{banner!r} should be a str')
1565

1566
    async def get_donation_address(self) -> str:
        """Request the server's donation address.

        Returns '' if the server sent a falsy value, or an address we do not
        recognize. A leading 'bitcoin:' prefix is stripped.
        Raises RequestCorrupted if the (truthy) response is not a str.
        """
        # do request
        res = await self.session.send_request('server.donation_address')
        # check response
        if not res:  # ignore empty string
            return ''
        if not isinstance(res, str):
            raise RequestCorrupted(f'{res!r} should be a str')
        address = res.removeprefix('bitcoin:')
        if bitcoin.is_address(address):
            return address
        # note: do not hard-fail -- allow server to use future-type
        #       bitcoin address we do not recognize
        self.logger.info(f"invalid donation address from server: {repr(res)}")
        return ''
1581

1582
    async def get_relay_fee(self) -> int:
        """Returns the min relay feerate in sat/kbyte.

        Uses 'mempool.get_info' (field 'minrelaytxfee') for protocol >= (1, 6),
        and 'blockchain.relayfee' otherwise. The server's value is multiplied
        by bitcoin.COIN and truncated to int.
        """
        # do request
        if self.active_protocol_tuple < (1, 6):
            minrelaytxfee = await self.session.send_request('blockchain.relayfee')
        else:
            mempool_info = await self.session.send_request('mempool.get_info')
            minrelaytxfee = assert_dict_contains_field(mempool_info, field_name='minrelaytxfee')
        # check response
        assert_non_negative_int_or_float(minrelaytxfee)
        return max(0, int(minrelaytxfee * bitcoin.COIN))
1595

1596
    async def get_estimatefee(self, num_blocks: int) -> int:
        """Returns a feerate estimate for getting confirmed within
        num_blocks blocks, in sat/kbyte.
        Returns -1 if the server could not provide an estimate.

        Raises Exception if num_blocks is not a non-negative integer.
        """
        if not is_non_negative_integer(num_blocks):
            raise Exception(f"{repr(num_blocks)} is not a num_blocks")
        # do request
        try:
            estimate = await self.session.send_request('blockchain.estimatefee', [num_blocks])
        except aiorpcx.jsonrpc.ProtocolError as e:
            # The protocol spec says the server itself should already have returned -1
            # if it cannot provide an estimate, however apparently "electrs" does not conform
            # and sends an error instead. Convert it here:
            if "cannot estimate fee" not in e.message:
                raise
            estimate = -1
        except aiorpcx.jsonrpc.RPCError as e:
            # The protocol spec says the server itself should already have returned -1
            # if it cannot provide an estimate. "Fulcrum" often sends:
            #   aiorpcx.jsonrpc.RPCError: (-32603, 'internal error: bitcoind request timed out')
            if e.code != JSONRPC.INTERNAL_ERROR:
                raise
            estimate = -1
        # check response
        if estimate == -1:
            return estimate
        assert_non_negative_int_or_float(estimate)
        return int(estimate * bitcoin.COIN)
1627

1628

1629
def _assert_header_does_not_check_against_any_chain(header: dict) -> None:
    """Raise Exception if blockchain.check_header() returns something truthy for `header`."""
    if blockchain.check_header(header):
        raise Exception('bad_header must not check!')
1633

1634

1635
def sanitize_tx_broadcast_response(server_msg) -> str:
    """Map an untrusted tx-broadcast error from the server to a known-safe string.

    Unfortunately, bitcoind and hence the Electrum protocol doesn't return a useful
    error code, so substring matching against known bitcoind messages is used to
    grok the error message. server_msg is untrusted input so it should not be shown
    to the user (see #4968): only a string from the tables below is ever returned.

    server_msg: anything; it is converted with str() before matching.
    Returns the matched known substring, or a (translated) explanation where the
    table provides one, or the translated "Unknown error" if nothing matches.
    """
    server_msg = str(server_msg)
    server_msg = server_msg.replace("\n", r"\n")

    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/script/script_error.cpp
    # Kept as an ordered tuple (not a set): some entries are substrings of others
    # (e.g. "Invalid Schnorr signature" vs "Invalid Schnorr signature size"), and
    # iterating a set of str would make the result depend on hash randomization.
    script_error_messages = (
        r"Script evaluated without error but finished with a false/empty top stack element",
        r"Script failed an OP_VERIFY operation",
        r"Script failed an OP_EQUALVERIFY operation",
        r"Script failed an OP_CHECKMULTISIGVERIFY operation",
        r"Script failed an OP_CHECKSIGVERIFY operation",
        r"Script failed an OP_NUMEQUALVERIFY operation",
        r"Script is too big",
        r"Push value size limit exceeded",
        r"Operation limit exceeded",
        r"Stack size limit exceeded",
        r"Signature count negative or greater than pubkey count",
        r"Pubkey count negative or limit exceeded",
        r"Opcode missing or not understood",
        r"Attempted to use a disabled opcode",
        r"Operation not valid with the current stack size",
        r"Operation not valid with the current altstack size",
        r"OP_RETURN was encountered",
        r"Invalid OP_IF construction",
        r"Negative locktime",
        r"Locktime requirement not satisfied",
        r"Signature hash type missing or not understood",
        r"Non-canonical DER signature",
        r"Data push larger than necessary",
        r"Only push operators allowed in signatures",
        r"Non-canonical signature: S value is unnecessarily high",
        r"Dummy CHECKMULTISIG argument must be zero",
        r"OP_IF/NOTIF argument must be minimal",
        r"Signature must be zero for failed CHECK(MULTI)SIG operation",
        r"NOPx reserved for soft-fork upgrades",
        r"Witness version reserved for soft-fork upgrades",
        r"Taproot version reserved for soft-fork upgrades",
        r"OP_SUCCESSx reserved for soft-fork upgrades",
        r"Public key version reserved for soft-fork upgrades",
        r"Public key is neither compressed or uncompressed",
        r"Stack size must be exactly one after execution",
        r"Extra items left on stack after execution",
        r"Witness program has incorrect length",
        r"Witness program was passed an empty witness",
        r"Witness program hash mismatch",
        r"Witness requires empty scriptSig",
        r"Witness requires only-redeemscript scriptSig",
        r"Witness provided for non-witness script",
        r"Using non-compressed keys in segwit",
        r"Invalid Schnorr signature size",
        r"Invalid Schnorr signature hash type",
        r"Invalid Schnorr signature",
        r"Invalid Taproot control block size",
        r"Too much signature validation relative to witness weight",
        r"OP_CHECKMULTISIG(VERIFY) is not available in tapscript",
        r"OP_IF/NOTIF argument must be minimal in tapscript",
        r"Using OP_CODESEPARATOR in non-witness script",
        r"Signature is found in scriptCode",
    )
    script_matches = [substring for substring in script_error_messages
                      if substring in server_msg]
    if script_matches:
        # prefer the most specific (longest) known message
        return max(script_matches, key=len)

    # The remaining tables map a known substring to either None (return the
    # substring itself) or a replacement text to return instead.

    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/validation.cpp
    # grep "REJECT_"
    # grep "TxValidationResult"
    # should come after script_error.cpp (due to e.g. "non-mandatory-script-verify-flag")
    # NOTE(review): "coinbase" is also a substring of
    # "bad-txns-premature-spend-of-coinbase" (tx_verify table below), so that
    # error is currently reported as "coinbase".
    validation_error_messages = {
        r"coinbase": None,
        r"tx-size-small": None,
        r"non-final": None,
        r"txn-already-in-mempool": None,
        r"txn-mempool-conflict": None,
        r"txn-already-known": None,
        r"non-BIP68-final": None,
        r"bad-txns-nonstandard-inputs": None,
        r"bad-witness-nonstandard": None,
        r"bad-txns-too-many-sigops": None,
        r"mempool min fee not met":
            ("mempool min fee not met\n" +
             _("Your transaction is paying a fee that is so low that the bitcoin node cannot "
               "fit it into its mempool. The mempool is already full of hundreds of megabytes "
               "of transactions that all pay higher fees. Try to increase the fee.")),
        r"min relay fee not met": None,
        r"absurdly-high-fee": None,
        r"max-fee-exceeded": None,
        r"too-long-mempool-chain": None,
        r"bad-txns-spends-conflicting-tx": None,
        r"insufficient fee": ("insufficient fee\n" +
             _("Your transaction is trying to replace another one in the mempool but it "
               "does not meet the rules to do so. Try to increase the fee.")),
        r"too many potential replacements": None,
        r"replacement-adds-unconfirmed": None,
        r"mempool full": None,
        r"non-mandatory-script-verify-flag": None,
        r"mandatory-script-verify-flag-failed": None,
        r"Transaction check failed": None,
    }
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/rpc/rawtransaction.cpp
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/util/error.cpp
    # https://github.com/bitcoin/bitcoin/blob/3f83c744ac28b700090e15b5dda2260724a56f49/src/common/messages.cpp#L126
    # grep "RPC_TRANSACTION"
    # grep "RPC_DESERIALIZATION_ERROR"
    # grep "TransactionError"
    rawtransaction_error_messages = {
        r"Missing inputs": None,
        r"Inputs missing or spent": None,
        r"transaction already in block chain": None,
        r"Transaction already in block chain": None,
        r"Transaction outputs already in utxo set": None,
        r"TX decode failed": None,
        r"Peer-to-peer functionality missing or disabled": None,
        r"Transaction rejected by AcceptToMemoryPool": None,
        r"AcceptToMemoryPool failed": None,
        r"Transaction rejected by mempool": None,
        r"Mempool internal error": None,
        r"Fee exceeds maximum configured by user": None,
        r"Unspendable output exceeds maximum configured by user": None,
        r"Transaction rejected due to invalid package": None,
    }
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/consensus/tx_verify.cpp
    # https://github.com/bitcoin/bitcoin/blob/c7ad94428ab6f54661d7a5441e1fdd0ebf034903/src/consensus/tx_check.cpp
    # grep "REJECT_"
    # grep "TxValidationResult"
    tx_verify_error_messages = {
        r"bad-txns-vin-empty": None,
        r"bad-txns-vout-empty": None,
        r"bad-txns-oversize": None,
        r"bad-txns-vout-negative": None,
        r"bad-txns-vout-toolarge": None,
        r"bad-txns-txouttotal-toolarge": None,
        r"bad-txns-inputs-duplicate": None,
        r"bad-cb-length": None,
        r"bad-txns-prevout-null": None,
        r"bad-txns-inputs-missingorspent":
            ("bad-txns-inputs-missingorspent\n" +
             _("You might have a local transaction in your wallet that this transaction "
               "builds on top. You need to either broadcast or remove the local tx.")),
        r"bad-txns-premature-spend-of-coinbase": None,
        r"bad-txns-inputvalues-outofrange": None,
        r"bad-txns-in-belowout": None,
        r"bad-txns-fee-outofrange": None,
    }
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/policy/policy.cpp
    # grep "reason ="
    # should come after validation.cpp (due to "tx-size" vs "tx-size-small")
    # should come after script_error.cpp (due to e.g. "version")
    policy_error_messages = {
        r"version": _("Transaction uses non-standard version."),
        r"tx-size": _("The transaction was rejected because it is too large (in bytes)."),
        r"scriptsig-size": None,
        r"scriptsig-not-pushonly": None,
        r"scriptpubkey":
            ("scriptpubkey\n" +
             _("Some of the outputs pay to a non-standard script.")),
        r"bare-multisig": None,
        r"dust":
            (_("Transaction could not be broadcast due to dust outputs.\n"
               "Some of the outputs are too small in value, probably lower than 1000 satoshis.\n"
               "Check the units, make sure you haven't confused e.g. mBTC and BTC.")),
        r"multi-op-return": _("The transaction was rejected because it contains multiple OP_RETURN outputs."),
    }

    # Table order matters (see the "should come after" notes above); within a
    # table, entries are tried in insertion order and the first hit wins.
    ordered_tables = (
        validation_error_messages,
        rawtransaction_error_messages,
        tx_verify_error_messages,
        policy_error_messages,
    )
    for table in ordered_tables:
        for substring, msg in table.items():
            if substring in server_msg:
                return msg if msg else substring
    # otherwise:
    return _("Unknown error")
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc