• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In
Info updated!

spesmilo / electrum / 6226630403686400

05 May 2026 01:01PM UTC coverage: 65.246% (+0.03%) from 65.214%
6226630403686400

push

CirrusCI

web-flow
Merge pull request #10629 from SomberNight/202605_toy_server2

tests: electrum protocol: ToyServer/ToyServerSession

17 of 20 new or added lines in 2 files covered. (85.0%)

1 existing line in 1 file now uncovered.

24736 of 37912 relevant lines covered (65.25%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.83
/electrum/interface.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2011 thomasv@gitorious
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25
import os
1✔
26
import re
1✔
27
import ssl
1✔
28
import sys
1✔
29
import time
1✔
30
import traceback
1✔
31
import asyncio
1✔
32
import socket
1✔
33
from typing import Tuple, Union, List, TYPE_CHECKING, Optional, Set, NamedTuple, Any, Sequence, Dict
1✔
34
from collections import defaultdict
1✔
35
from ipaddress import IPv4Network, IPv6Network, ip_address, IPv6Address, IPv4Address
1✔
36
import itertools
1✔
37
import logging
1✔
38
import hashlib
1✔
39
import functools
1✔
40
import random
1✔
41
import enum
1✔
42

43
import aiorpcx
1✔
44
from aiorpcx import RPCSession, Notification, NetAddress, NewlineFramer
1✔
45
from aiorpcx.curio import timeout_after, TaskTimeout
1✔
46
from aiorpcx.jsonrpc import JSONRPC, CodeMessageError
1✔
47
from aiorpcx.rawsocket import RSClient, RSTransport
1✔
48
import certifi
1✔
49

50
from .util import (ignore_exceptions, log_exceptions, bfh, ESocksProxy,
1✔
51
                   is_integer, is_non_negative_integer, is_hash256_str, is_hex_str,
52
                   is_int_or_float, is_non_negative_int_or_float, OldTaskGroup,
53
                   send_exception_to_crash_reporter, error_text_str_to_safe_str, versiontuple)
54
from . import util
1✔
55
from . import x509
1✔
56
from . import pem
1✔
57
from . import version
1✔
58
from . import blockchain
1✔
59
from .blockchain import Blockchain, HEADER_SIZE, CHUNK_SIZE
1✔
60
from . import bitcoin
1✔
61
from .bitcoin import DummyAddress, DummyAddressUsedInTxException
1✔
62
from . import constants
1✔
63
from .i18n import _
1✔
64
from .logging import Logger
1✔
65
from .transaction import Transaction
1✔
66
from .fee_policy import FEE_ETA_TARGETS
1✔
67
from .lrucache import LRUCache
1✔
68

69
if TYPE_CHECKING:
70
    from .network import Network
71
    from .simple_config import SimpleConfig
72

73

74
# Path of the CA bundle file reported by certifi.
ca_path = certifi.where()

BUCKET_NAME_OF_ONION_SERVERS = 'onion'

# Single-letter transport codes accepted by ServerAddr.
# 's' selects the SSL/TLS code path; any other value is treated as plaintext TCP.
KNOWN_ELEC_PROTOCOL_TRANSPORTS = {'t', 's'}
PREFERRED_NETWORK_PROTOCOL = 's'
assert PREFERRED_NETWORK_PROTOCOL in KNOWN_ELEC_PROTOCOL_TRANSPORTS

# Must be able to cover at least one full chunk of headers (checked at import time).
MAX_NUM_HEADERS_PER_REQUEST = 2016
assert MAX_NUM_HEADERS_PER_REQUEST >= CHUNK_SIZE
84

85

86
class NetworkTimeout:
    """Namespace grouping timeout presets for network operations."""
    # seconds
    class Generic:
        # Baseline presets, from tightest to most lenient.
        NORMAL = 30
        RELAXED = 45
        MOST_RELAXED = 600

    class Urgent(Generic):
        # Shorter presets; every value inherited from Generic is overridden.
        NORMAL = 10
        RELAXED = 20
        MOST_RELAXED = 60
97

98

99
def assert_non_negative_integer(val: Any) -> None:
    """Raise RequestCorrupted unless is_non_negative_integer(val) holds."""
    if is_non_negative_integer(val):
        return
    raise RequestCorrupted(f'{val!r} should be a non-negative integer')
102

103

104
def assert_integer(val: Any) -> None:
    """Raise RequestCorrupted unless is_integer(val) holds."""
    if is_integer(val):
        return
    raise RequestCorrupted(f'{val!r} should be an integer')
107

108

109
def assert_int_or_float(val: Any) -> None:
    """Raise RequestCorrupted unless is_int_or_float(val) holds."""
    if is_int_or_float(val):
        return
    raise RequestCorrupted(f'{val!r} should be int or float')
112

113

114
def assert_non_negative_int_or_float(val: Any) -> None:
    """Raise RequestCorrupted unless is_non_negative_int_or_float(val) holds."""
    if is_non_negative_int_or_float(val):
        return
    raise RequestCorrupted(f'{val!r} should be a non-negative int or float')
117

118

119
def assert_hash256_str(val: Any) -> None:
    """Raise RequestCorrupted unless is_hash256_str(val) holds."""
    if is_hash256_str(val):
        return
    raise RequestCorrupted(f'{val!r} should be a hash256 str')
122

123

124
def assert_hex_str(val: Any) -> None:
    """Raise RequestCorrupted unless is_hex_str(val) holds."""
    if is_hex_str(val):
        return
    raise RequestCorrupted(f'{val!r} should be a hex str')
127

128

129
def assert_dict_contains_field(d: Any, *, field_name: str) -> Any:
    """Return d[field_name] after checking that d is a dict holding that key.

    Raises RequestCorrupted if d is not a dict or the key is absent.
    """
    if isinstance(d, dict):
        if field_name in d:
            return d[field_name]
        raise RequestCorrupted(f'required field {field_name!r} missing from dict')
    raise RequestCorrupted(f'{d!r} should be a dict')
135

136

137
def assert_list_or_tuple(val: Any) -> None:
    """Raise RequestCorrupted unless val is a list or a tuple (subclasses included)."""
    if isinstance(val, (list, tuple)):
        return
    raise RequestCorrupted(f'{val!r} should be a list or tuple')
140

141

142
def protocol_tuple(s: Any) -> tuple[int, ...]:
    """Converts a protocol version number, such as "1.0" to a tuple (1, 0).

    If the version number is bad, (0, ) indicating version 0 is returned.
    This covers both non-str input and strings that versiontuple() rejects.
    """
    # Explicit type check instead of `assert`: asserts are stripped under
    # `python -O`, which would let non-str values reach versiontuple().
    if not isinstance(s, str):
        return (0, )
    try:
        return versiontuple(s)
    except Exception:
        return (0, )
152

153

154
class ChainResolutionMode(enum.Enum):
    """Closed set of chain-resolution modes.

    Member values come from enum.auto() and carry no meaning of their own;
    compare members by identity.
    """
    CATCHUP = enum.auto()
    BACKWARD = enum.auto()
    BINARY = enum.auto()
    FORK = enum.auto()
    NO_FORK = enum.auto()
160

161

162
class NotificationSession(RPCSession):
    """RPC session that caches subscription results and fans incoming
    notifications out to the asyncio queues registered via subscribe().
    """

    def __init__(self, *args, interface: 'Interface', **kwargs):
        super().__init__(*args, **kwargs)
        # key (see get_hashable_key_for_rpc_call) -> list of subscriber queues
        self.subscriptions = defaultdict(list)
        # key -> most recent result seen for that subscription
        self.cache = {}
        self._msg_counter = itertools.count(start=1)
        self.interface = interface
        self.taskgroup = interface.taskgroup
        self.cost_hard_limit = 0  # disable aiorpcx resource limits

        # To log pre-processed json traffic, uncomment:
        #self.logger.setLevel(logging.DEBUG)  # from aiorpcx
        #self.verbosity = 4

    async def handle_request(self, request):
        """Dispatch a notification to its subscribers.

        Anything that is not a notification for a known subscription is
        logged and causes the session to be closed.
        """
        self.maybe_log(f"--> {request}")
        try:
            if not isinstance(request, Notification):
                raise Exception('unexpected request. not a notification')
            args = request.args
            # the last arg is the new result; the ones before identify the subscription
            params, result = args[:-1], args[-1]
            key = self.get_hashable_key_for_rpc_call(request.method, params)
            if key not in self.subscriptions:
                raise Exception('unexpected notification')
            self.cache[key] = result
            for subscriber in self.subscriptions[key]:
                await subscriber.put(args)
        except Exception as e:
            self.interface.logger.info(f"error handling request {request}. exc: {repr(e)}")
            await self.close()

    async def send_request(self, *args, timeout=None, **kwargs):
        """Send a request and return the response, logging both directions.

        Raises RequestTimedOut on timeout; every other exception propagates unchanged.
        """
        # note: semaphores/timeouts/backpressure etc are handled by
        # aiorpcx. the timeout arg here in most cases should not be set
        msg_id = next(self._msg_counter)
        self.maybe_log(f"<-- {args} {kwargs} (id: {msg_id})")
        try:
            # note: RPCSession.send_request raises TaskTimeout in case of a timeout.
            # TaskTimeout is a subclass of CancelledError, which is *suppressed* in TaskGroups
            response = await util.wait_for2(
                super().send_request(*args, **kwargs),
                timeout)
        except (TaskTimeout, asyncio.TimeoutError) as e:
            self.maybe_log(f"--> request timed out: {args} (id: {msg_id})")
            raise RequestTimedOut(f'request timed out: {args} (id: {msg_id})') from e
        except BaseException as e:
            # covers CodeMessageError as well as cancellations etc.,
            # which are useful to see when debugging
            self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
            raise
        self.maybe_log(f"--> {response} (id: {msg_id})")
        return response

    def set_default_timeout(self, timeout):
        """Use `timeout` for both timeout attributes inherited from the base class."""
        for attr_name in ("sent_request_timeout", "max_send_delay"):
            assert hasattr(self, attr_name)  # in base class
            setattr(self, attr_name, timeout)

    async def subscribe(self, method: str, params: List, queue: asyncio.Queue):
        """Register `queue` for (method, params) and push the current result onto it."""
        # note: until the cache is written for the first time,
        # each 'subscribe' call might make a request on the network.
        key = self.get_hashable_key_for_rpc_call(method, params)
        self.subscriptions[key].append(queue)
        if key not in self.cache:
            self.cache[key] = await self.send_request(method, params)
        result = self.cache[key]
        await queue.put(params + [result])

    def unsubscribe(self, queue):
        """Unsubscribe a callback to free object references to enable GC."""
        # note: we can't unsubscribe from the server, so we keep receiving
        # subsequent notifications
        for subscribers in self.subscriptions.values():
            try:
                subscribers.remove(queue)
            except ValueError:
                pass  # queue was not registered under this key

    @classmethod
    def get_hashable_key_for_rpc_call(cls, method, params):
        """Hashable index for subscriptions and cache"""
        return f"{method!s}{params!r}"

    def maybe_log(self, msg: str) -> None:
        """Log `msg` at debug level if debugging is on for the interface or its network."""
        iface = self.interface
        if not iface:
            return
        if iface.debug or iface.network.debug:
            iface.logger.debug(msg)

    def default_framer(self):
        """Build the framer, taking the max message size from the network config."""
        # overridden so that max_size can be customized
        max_size = self.interface.network.config.NETWORK_MAX_INCOMING_MSG_SIZE
        assert max_size > 500_000, f"{max_size=} (< 500_000) is too small"
        return NewlineFramer(max_size=max_size)

    async def close(self, *, force_after: int = None):
        """Closes the connection and waits for it to be closed.
        We try to flush buffered data to the wire, which can take some time.
        """
        # We give up after a while and just abort the connection.
        # Note: specifically if the server is running Fulcrum, waiting seems hopeless,
        #       the connection must be aborted (see https://github.com/cculianu/Fulcrum/issues/76)
        # Note: if the ethernet cable was pulled or wifi disconnected, that too might
        #       wait until this timeout is triggered
        effective_force_after = 1 if force_after is None else force_after  # seconds
        await super().close(force_after=effective_force_after)
273

274

275
class NetworkException(Exception):
    """Common base class for the network-related exceptions in this module."""
276

277

278
class GracefulDisconnect(NetworkException):
    """Raised to disconnect from a server; carries the level at which to log it."""
    # class-wide default; may be overridden per instance through the constructor
    log_level = logging.INFO

    def __init__(self, *args, log_level=None, **kwargs):
        super().__init__(*args, **kwargs)
        if log_level is None:
            return
        self.log_level = log_level
285

286

287
class RequestTimedOut(GracefulDisconnect):
    """A network request did not complete within its timeout."""
    def __str__(self):
        # Fixed, translated text; the constructor arguments are not included.
        return _("Network request timed out.")
290

291

292
class RequestCorrupted(Exception):
    """A value failed one of the assert_* validation helpers."""


class ErrorParsingSSLCert(Exception):
    """A certificate already saved on disk could not be parsed."""


class ErrorGettingSSLCertFromServer(Exception):
    """Obtaining the server's SSL certificate for the first time failed."""


class ErrorSSLCertFingerprintMismatch(Exception):
    """NOTE(review): presumably the server certificate did not match the
    expected fingerprint -- confirm against the raise site."""


class InvalidOptionCombination(Exception):
    """Incompatible options were combined, e.g. --serverfingerprint with a CA signed server."""


class ConnectError(NetworkException):
    """Creating the connection failed; wraps the underlying OSError."""
299

300

301
class TxBroadcastError(NetworkException):
    """Base class for errors raised while broadcasting a transaction."""
    def get_message_for_gui(self):
        # Not implemented on the base class; subclasses return the text to show the user.
        raise NotImplementedError()
304

305

306
class TxBroadcastHashMismatch(TxBroadcastError):
    """The server answered a broadcast with an unexpected transaction ID."""

    def get_message_for_gui(self):
        """Return headline, advice and the exception text, ready for display."""
        headline = _("The server returned an unexpected transaction ID when broadcasting the transaction.")
        advice = _("Consider trying to connect to a different server, or updating Electrum.")
        details = str(self)
        return f"{headline}\n{advice}\n\n{details}"
312

313

314
class TxBroadcastServerReturnedError(TxBroadcastError):
    """The server answered a broadcast with an error."""

    def get_message_for_gui(self):
        """Return headline, advice and the exception text, ready for display."""
        headline = _("The server returned an error when broadcasting the transaction.")
        advice = _("Consider trying to connect to a different server, or updating Electrum.")
        details = str(self)
        return f"{headline}\n{advice}\n\n{details}"
320

321

322
class TxBroadcastUnknownError(TxBroadcastError):
    """Broadcasting failed for a reason that could not be classified."""

    def get_message_for_gui(self):
        """Return headline and advice; the exception text is deliberately left out."""
        headline = _("Unknown error when broadcasting the transaction.")
        advice = _("Consider trying to connect to a different server, or updating Electrum.")
        return f"{headline}\n{advice}"
327

328

329
class _RSClient(RSClient):
    """RSClient variant that reports OS-level connection failures as ConnectError."""

    async def create_connection(self):
        """Create the connection, translating OSError into ConnectError."""
        try:
            connection = await super().create_connection()
        except OSError as exc:
            # note: using "from exc" here will set __cause__ of ConnectError
            raise ConnectError(exc) from exc
        return connection
336

337

338
class PaddedRSTransport(RSTransport):
    """A raw socket transport that provides basic countermeasures against traffic analysis
    by padding the jsonrpc payload with whitespaces to have ~uniform-size TCP packets.
    (it is assumed that a network observer does not see plaintext transport contents,
    due to it being wrapped e.g. in TLS)

    Outgoing messages are collected in a send buffer and written out in padded
    packets whose total size is MIN_PACKET_SIZE or a larger power of two.
    """

    # Smallest total size (in bytes) of a padded packet; also the buffer fill level
    # at which data is sent without waiting any longer.
    MIN_PACKET_SIZE = 1024
    # Buffered data older than this (measured from the last send) gets sent
    # even if the buffer is still small.
    WAIT_FOR_BUFFER_GROWTH_SECONDS = 1.0
    # (unpadded) amount of bytes sent instantly before beginning with polling.
    # This makes the initial handshake where a few small messages are exchanged faster.
    WARMUP_BUDGET_SIZE = 1024

    session: Optional['RPCSession']

    def __init__(self, *args, **kwargs):
        RSTransport.__init__(self, *args, **kwargs)
        self._sbuffer = bytearray()  # "send buffer"
        self._sbuffer_task = None  # type: Optional[asyncio.Task]
        self._sbuffer_has_data_evt = asyncio.Event()
        self._last_send = time.monotonic()
        # When True, the buffer is always emptied completely and immediately.
        self._force_send = False  # type: bool

    # note: this does not call super().write() but is a complete reimplementation
    async def write(self, message):
        """Frame `message`, append it to the send buffer, and maybe flush the buffer."""
        await self._can_send.wait()
        if self.is_closing():
            return
        framed_message = self._framer.frame(message)
        self._sbuffer += framed_message
        self._sbuffer_has_data_evt.set()
        self._maybe_consume_sbuffer()

    def _maybe_consume_sbuffer(self) -> None:
        """Maybe take some data from sbuffer and send it on the wire."""
        if not self._can_send.is_set() or self.is_closing():
            return
        buf = self._sbuffer
        if not buf:
            return
        # if there is enough data in the buffer, or if we haven't sent in a while, send now:
        if not (
            self._force_send
            or len(buf) >= self.MIN_PACKET_SIZE
            or self._last_send + self.WAIT_FOR_BUFFER_GROWTH_SECONDS < time.monotonic()
            or self.session.send_size < self.WARMUP_BUDGET_SIZE
        ):
            return
        assert buf[-2:] in (b"}\n", b"]\n"), f"unexpected json-rpc terminator: {buf[-2:]=!r}"
        # either (1) pad length to next power of two, to create "lsize" packet:
        payload_lsize = len(buf)
        total_lsize = max(self.MIN_PACKET_SIZE, 2 ** (payload_lsize.bit_length()))
        npad_lsize = total_lsize - payload_lsize
        # or if that wasted a lot of bandwidth with padding, (2) defer sending some messages
        # and create a packet with half that size ("ssize", s for small)
        total_ssize = max(self.MIN_PACKET_SIZE, total_lsize // 2)
        # last message boundary that still fits into the small packet
        payload_ssize = buf.rfind(b"\n", 0, total_ssize)
        if payload_ssize != -1:
            payload_ssize += 1  # for "\n" char
            npad_ssize = total_ssize - payload_ssize
        else:
            # no complete message fits: make sure option (2) can never win below
            npad_ssize = float("inf")
        # decide between (1) and (2):
        if self._force_send or npad_lsize <= npad_ssize:
            # (1) create "lsize" packet: consume full buffer
            npad = npad_lsize
            p_idx = payload_lsize
        else:
            # (2) create "ssize" packet: consume some, but defer some for later
            npad = npad_ssize
            p_idx = payload_ssize
        # pad by adding spaces near end
        # self.session.maybe_log(
        #     f"PaddedRSTransport. calling low-level write(). "
        #     f"chose between (lsize:{payload_lsize}+{npad_lsize}, ssize:{payload_ssize}+{npad_ssize}). "
        #     f"won: {'tie' if npad_lsize == npad_ssize else 'lsize' if npad_lsize < npad_ssize else 'ssize'}."
        # )
        json_rpc_terminator = buf[p_idx-2:p_idx]
        assert json_rpc_terminator in (b"}\n", b"]\n"), f"unexpected {json_rpc_terminator=!r}"
        # the padding goes in front of the final terminator, so the packet still ends with it
        buf2 = buf[:p_idx-2] + (npad * b" ") + json_rpc_terminator
        self._asyncio_transport.write(buf2)
        self._last_send = time.monotonic()
        del self._sbuffer[:p_idx]
        if not self._sbuffer:
            self._sbuffer_has_data_evt.clear()

    async def _poll_sbuffer(self):
        """Loop until the transport is closing, sending out buffered data as it ages."""
        while not self.is_closing():
            await self._can_send.wait()
            await self._sbuffer_has_data_evt.wait()  # to avoid busy-waiting
            self._maybe_consume_sbuffer()
            # If there is still data in the buffer, sleep until it would time out.
            # note: If the transport is ~idle, when we wake up, we will send the current buf data,
            #       but if busy, we might wake up to completely new buffer contents. Either is fine.
            if len(self._sbuffer) > 0:
                timeout_abs = self._last_send + self.WAIT_FOR_BUFFER_GROWTH_SECONDS
                timeout_rel = max(0.0, timeout_abs - time.monotonic())
                await asyncio.sleep(timeout_rel)

    def connection_made(self, transport: asyncio.BaseTransport):
        """Start buffer polling for NotificationSession sessions; otherwise always force-send."""
        super().connection_made(transport)
        if isinstance(self.session, NotificationSession):
            # the polling loop is spawned into the session's taskgroup
            coro = self.session.taskgroup.spawn(self._poll_sbuffer())
            self._sbuffer_task = self.loop.create_task(coro)
        else:
            # This a short-lived "fetch_certificate"-type session.
            # No polling here, we always force-empty the buffer.
            self._force_send = True

    async def close(self, *args, **kwargs):
        '''Close the connection and return when closed.'''
        # Flush buffer before disconnecting. This makes ReplyAndDisconnect work:
        self._force_send = True
        self._maybe_consume_sbuffer()
        await super().close(*args, **kwargs)
453

454

455
class ServerAddr:
    """Address of a server: host, port and transport protocol.

    Instances compare equal and hash by (host, port, protocol).
    """

    def __init__(self, host: str, port: Union[int, str], *, protocol: Optional[str] = None):
        """Validate and normalise the address components.

        host may be wrapped in square brackets (IPv6 literal); they are stripped.
        protocol defaults to 's' when omitted.

        Raises ValueError if host is empty, if NetAddress rejects host or port,
        or if protocol is not in KNOWN_ELEC_PROTOCOL_TRANSPORTS.
        """
        assert isinstance(host, str), repr(host)
        if protocol is None:
            protocol = 's'
        if not host:
            raise ValueError('host must not be empty')
        if host[0] == '[' and host[-1] == ']':  # IPv6
            host = host[1:-1]
        try:
            net_addr = NetAddress(host, port)  # this validates host and port
        except Exception as e:
            raise ValueError(f"cannot construct ServerAddr: invalid host or port (host={host}, port={port})") from e
        if protocol not in KNOWN_ELEC_PROTOCOL_TRANSPORTS:
            raise ValueError(f"invalid network protocol: {protocol}")
        self.host = str(net_addr.host)  # canonical form (if e.g. IPv6 address)
        self.port = int(net_addr.port)
        self.protocol = protocol
        self._net_addr_str = str(net_addr)

    @classmethod
    def from_str(cls, s: str) -> 'ServerAddr':
        """Constructs a ServerAddr from "host:port:protocol" or raises ValueError."""
        # host might be IPv6 address, hence do rsplit:
        parts = str(s).rsplit(':', 2)
        if len(parts) != 3:
            # explicit message instead of the opaque tuple-unpacking error
            raise ValueError(f"cannot parse ServerAddr from {s!r}: expected 'host:port:protocol'")
        host, port, protocol = parts
        return cls(host=host, port=port, protocol=protocol)

    @classmethod
    def from_str_with_inference(cls, s: str) -> Optional['ServerAddr']:
        """Construct ServerAddr from str, guessing missing details.
        Does not raise - just returns None if guessing failed.
        Ongoing compatibility not guaranteed.
        """
        if not s:
            return None
        host = ""
        if s[0] == "[" and "]" in s:  # IPv6 address
            host_end = s.index("]")
            host = s[1:host_end]
            s = s[host_end+1:]
        items = str(s).rsplit(':', 2)
        if len(items) < 2:
            return None  # although maybe we could guess the port too?
        host = host or items[0]
        port = items[1]
        if len(items) >= 3:
            protocol = items[2]
        else:
            protocol = PREFERRED_NETWORK_PROTOCOL
        try:
            return cls(host=host, port=port, protocol=protocol)
        except ValueError:
            return None

    def to_friendly_name(self) -> str:
        """Return the address for display, omitting the protocol when it is 's'."""
        # note: this method is closely linked to from_str_with_inference
        if self.protocol == 's':  # hide trailing ":s"
            return self.net_addr_str()
        return str(self)

    def __str__(self):
        return '{}:{}'.format(self.net_addr_str(), self.protocol)

    def to_json(self) -> str:
        """Return the same string as str(self)."""
        return str(self)

    def __repr__(self):
        return f'<ServerAddr host={self.host} port={self.port} protocol={self.protocol}>'

    def net_addr_str(self) -> str:
        """Return str() of the underlying NetAddress, i.e. without the protocol."""
        return self._net_addr_str

    def __eq__(self, other):
        if not isinstance(other, ServerAddr):
            return False
        return (self.host == other.host
                and self.port == other.port
                and self.protocol == other.protocol)

    def __ne__(self, other):
        return not (self == other)

    def __hash__(self):
        return hash((self.host, self.port, self.protocol))
540

541

542
def _get_cert_path_for_host(*, config: 'SimpleConfig', host: str) -> str:
    """Return the path under <config.path>/certs used for this host's certificate.

    The file is named after the host itself, except for IPv6 addresses, which
    are named "ipv6_" followed by the hex of the packed address.
    """
    try:
        parsed_ip = ip_address(host)
    except ValueError:
        parsed_ip = None  # not an IP literal, e.g. a hostname
    if isinstance(parsed_ip, IPv6Address):
        filename = f"ipv6_{parsed_ip.packed.hex()}"
    else:
        filename = host
    return os.path.join(config.path, 'certs', filename)
552

553

554
class Interface(Logger):
1✔
555

556
    def __init__(self, *, network: 'Network', server: ServerAddr):
        """Set up state for a connection to `server` and schedule self.run().

        run() is spawned into network.taskgroup on network.asyncio_loop via
        run_coroutine_threadsafe.
        """
        assert isinstance(server, ServerAddr), f"expected ServerAddr, got {type(server)}"
        self.ready = network.asyncio_loop.create_future()
        self.got_disconnected = asyncio.Event()
        self._blockchain_updated = asyncio.Event()
        # note: self.server is assigned before Logger.__init__ is called
        self.server = server
        Logger.__init__(self)
        assert network.config.path
        self.cert_path = _get_cert_path_for_host(config=network.config, host=self.host)
        self.blockchain = None  # type: Optional[Blockchain]
        self._requested_chunks = set()  # type: Set[int]
        self.network = network
        self.session = None  # type: Optional[NotificationSession]
        self._ipaddr_bucket = None
        # Set up proxy.
        # - for servers running on localhost, the proxy is not used. If user runs their own server
        #   on same machine, this lets them enable the proxy (which is used for e.g. FX rates).
        #   note: we could maybe relax this further and bypass the proxy for all private
        #         addresses...? e.g. 192.168.x.x
        if util.is_localhost(server.host):
            self.logger.info(f"looks like localhost: not using proxy for this server")
            self.proxy = None
        else:
            self.proxy = ESocksProxy.from_network_settings(network)

        # Latest block header and corresponding height, as claimed by the server.
        # Note that these values are updated before they are verified.
        # Especially during initial header sync, verification can take a long time.
        # Failing verification will get the interface closed.
        self.tip_header = None  # type: Optional[dict]
        self.tip = 0

        self._headers_cache = {}  # type: Dict[int, bytes]
        self._rawtx_cache = LRUCache(maxsize=20)  # type: LRUCache[str, bytes]  # txid->rawtx

        self.fee_estimates_eta = {}  # type: Dict[int, int]

        self.active_protocol_tuple = (0,)  # type: Optional[tuple[int, ...]]

        # Dump network messages (only for this interface).  Set at runtime from the console.
        self.debug = False

        # Taskgroup of this interface; NotificationSession and PaddedRSTransport spawn into it.
        self.taskgroup = OldTaskGroup()

        async def spawn_task():
            # runs on the network's event loop; names the task after the server
            task = await self.network.taskgroup.spawn(self.run())
            task.set_name(f"interface::{str(server)}")
        asyncio.run_coroutine_threadsafe(spawn_task(), self.network.asyncio_loop)
604

605
    @property
    def host(self) -> str:
        """Host of the server this interface is for (self.server.host)."""
        return self.server.host
608

609
    @property
    def port(self) -> int:
        """Port of the server this interface is for (self.server.port)."""
        return self.server.port
612

613
    @property
    def protocol(self) -> str:
        """Transport protocol code of the server (self.server.protocol)."""
        return self.server.protocol
616

617
    def diagnostic_name(self) -> str:
        """Return the server's net address string (no protocol suffix)."""
        return self.server.net_addr_str()
619

620
    def __str__(self) -> str:
        # Human-readable form built around diagnostic_name().
        return f"<Interface {self.diagnostic_name()}>"
622

623
    async def is_server_ca_signed(self, ca_ssl_context: ssl.SSLContext) -> bool:
        """Probe the server using a CA-enforcing SSL context.

        Returns True if a session can be opened, and False if the only problem
        is that the server presents a self-signed certificate. Every other
        failure is re-raised.
        """
        try:
            await self.open_session(ssl_context=ca_ssl_context, exit_early=True)
        except ConnectError as e:
            cause = e.__cause__
            is_self_signed = (
                isinstance(cause, ssl.SSLCertVerificationError)
                and cause.reason == 'CERTIFICATE_VERIFY_FAILED'
                and cause.verify_code == 18  # "self signed certificate"
            )
            if not is_self_signed:
                # Not good. Cannot use this server.
                raise
            # Good. We will use this server as self-signed.
            return False
        # Good. We will use this server as CA-signed.
        return True
641

642
    async def _try_saving_ssl_cert_for_first_time(self, ca_ssl_context: ssl.SSLContext) -> None:
        """Create the file at self.cert_path for a server seen for the first time.

        For a CA signed server an empty file is written; otherwise saving is
        delegated to self._save_certificate().
        Raises InvalidOptionCombination if the server is CA signed but an
        expected fingerprint is configured.
        """
        if not await self.is_server_ca_signed(ca_ssl_context):
            await self._save_certificate()
            return
        if self._get_expected_fingerprint():
            raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
        # empty file means this is CA signed, not self-signed
        with open(self.cert_path, 'w') as marker_file:
            marker_file.write('')
652

653
    def _is_saved_ssl_cert_available(self):
        """Tell whether a usable certificate file exists at self.cert_path.

        Returns False if there is no file, or if the saved certificate fails its
        date check (the file is deleted in that case). Returns True for an empty
        file (CA signed server) and for a pinned certificate that passed the checks.
        Raises InvalidOptionCombination or ErrorParsingSSLCert; anything raised by
        the fingerprint verification propagates.
        """
        cert_path = self.cert_path
        if not os.path.exists(cert_path):
            return False
        with open(cert_path, 'r') as cert_file:
            pem_text = cert_file.read()
        if not pem_text:  # CA signed
            if self._get_expected_fingerprint():
                raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
            return True
        # pinned self-signed cert
        try:
            cert_bytes = pem.dePem(pem_text, 'CERTIFICATE')
        except SyntaxError as e:
            self.logger.info(f"error parsing already saved cert: {e}")
            raise ErrorParsingSSLCert(e) from e
        try:
            parsed_cert = x509.X509(cert_bytes)
        except Exception as e:
            self.logger.info(f"error parsing already saved cert: {e}")
            raise ErrorParsingSSLCert(e) from e
        try:
            parsed_cert.check_date()
        except x509.CertificateError as e:
            self.logger.info(f"certificate has expired: {e}")
            os.unlink(cert_path)  # delete pinned cert only in this case
            return False
        self._verify_certificate_fingerprint(bytes(cert_bytes))
        return True
681

682
    async def _get_ssl_context(self) -> Optional[ssl.SSLContext]:
        """Return the SSL context to use for this server, or None for plaintext TCP.

        If no certificate file is saved yet, one is created first. An empty file
        selects the CA-verifying context; otherwise a context pinned to the saved
        certificate is returned, with hostname checking disabled.
        Raises ErrorGettingSSLCertFromServer if creating the file fails.
        """
        if self.protocol != 's':
            # using plaintext TCP
            return None

        # see if we already have cert for this server; or get it for the first time
        ca_sslc = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
        if not self._is_saved_ssl_cert_available():
            try:
                await self._try_saving_ssl_cert_for_first_time(ca_sslc)
            except (OSError, ConnectError, aiorpcx.socks.SOCKSError) as e:
                raise ErrorGettingSSLCertFromServer(e) from e
        # now we have a file saved in our certificate store
        if os.stat(self.cert_path).st_size == 0:
            # CA signed cert
            return ca_sslc
        # pinned self-signed cert
        pinned_sslc = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=self.cert_path)
        # note: Flag "ssl.VERIFY_X509_STRICT" is enabled by default in python 3.13+ (disabled in older versions).
        #       We explicitly disable it as it breaks lots of servers.
        pinned_sslc.verify_flags &= ~ssl.VERIFY_X509_STRICT
        pinned_sslc.check_hostname = False
        return pinned_sslc
707

708
    def handle_disconnect(func):
        """Decorator for coroutine methods: run `func`, then always perform disconnect clean-up.

        GracefulDisconnect and aiorpcx.jsonrpc.RPCError raised by `func` are logged and swallowed
        (the wrapper then returns None). Any other exception propagates, but only after
        the clean-up in the `finally` block has run.
        """
        @functools.wraps(func)
        async def wrapper_func(self: 'Interface', *args, **kwargs):
            try:
                return await func(self, *args, **kwargs)
            except GracefulDisconnect as e:
                # the exception itself specifies at what level it should be logged
                self.logger.log(e.log_level, f"disconnecting due to {repr(e)}")
            except aiorpcx.jsonrpc.RPCError as e:
                self.logger.warning(f"disconnecting due to {repr(e)}")
                # the full traceback is only emitted at debug level
                self.logger.debug(f"(disconnect) trace for {repr(e)}", exc_info=True)
            finally:
                self.got_disconnected.set()
                # Make sure taskgroup gets cleaned-up. This explicit clean-up is needed here
                # in case the "with taskgroup" ctx mgr never got a chance to run:
                await self.taskgroup.cancel_remaining()
                await self.network.connection_down(self)
                # if was not 'ready' yet, schedule waiting coroutines:
                self.ready.cancel()
        return wrapper_func
1✔
727

728
    @ignore_exceptions  # do not kill network.taskgroup
    @log_exceptions
    @handle_disconnect
    async def run(self):
        """Entry point of the interface: set up the SSL context, then open the session.

        Failures to obtain/parse the cert, and connection-level failures while opening
        the session, are logged and end the coroutine without raising.
        """
        try:
            ssl_context = await self._get_ssl_context()
        except (ErrorParsingSSLCert, ErrorGettingSSLCertFromServer) as exc:
            self.logger.info(f'disconnecting due to: {repr(exc)}')
            return

        try:
            await self.open_session(ssl_context=ssl_context)
        except (asyncio.CancelledError, ConnectError, aiorpcx.socks.SOCKSError) as exc:
            # make SSL errors for main interface more visible (to help servers ops debug cert pinning issues)
            ssl_error_on_pinned_main_server = (
                isinstance(exc, ConnectError)
                and isinstance(exc.__cause__, ssl.SSLError)
                and self.is_main_server()
                and not self.network.auto_connect
            )
            if not ssl_error_on_pinned_main_server:
                self.logger.info(f'disconnecting due to: {repr(exc)}')
                return
            self.logger.warning(f'Cannot connect to main server due to SSL error '
                                f'(maybe cert changed compared to "{self.cert_path}"). Exc: {repr(exc)}')
            return
1✔
748

749
    def _mark_ready(self) -> None:
        """Resolve the `ready` future, after picking the blockchain this interface is on.

        No-op if `ready` is already done. The blockchain is the one containing the
        current tip header, or the best known chain if no chain contains it.

        Raises GracefulDisconnect if `ready` was cancelled in the meantime.
        """
        if self.ready.cancelled():
            raise GracefulDisconnect('conn establishment was too slow; *ready* future was cancelled')
        if self.ready.done():
            return

        assert self.tip_header
        chain_with_tip = blockchain.check_header(self.tip_header)
        self.blockchain = chain_with_tip if chain_with_tip else blockchain.get_best_chain()
        assert self.blockchain is not None

        self.logger.info(f"set blockchain with height {self.blockchain.height()}")

        self.ready.set_result(1)
1✔
766

767
    def is_connected_and_ready(self) -> bool:
        """Return True if the `ready` future is done and no disconnect has been flagged."""
        if not self.ready.done():
            return False
        return not self.got_disconnected.is_set()
×
769

770
    async def _save_certificate(self) -> None:
        """Fetch the server's certificate and pin it by saving it (PEM) to `self.cert_path`.

        Does nothing if a file already exists at `self.cert_path`.
        Fetching is retried up to 10 times, one second apart.

        Raises:
            GracefulDisconnect: if no certificate could be obtained after 10 tries.
            (whatever `_verify_certificate_fingerprint` raises, if the fingerprint is rejected)
        """
        if os.path.exists(self.cert_path):
            return
        # we may need to retry this a few times, in case the handshake hasn't completed
        for _ in range(10):
            dercert = await self._fetch_certificate()
            if dercert:
                self.logger.info("succeeded in getting cert")
                self._verify_certificate_fingerprint(dercert)
                # Build the complete PEM text *before* creating the file. An empty cert file
                # is interpreted as "CA signed" by the rest of this class, so we must not
                # leave one behind if the conversion fails for any reason.
                cert = ssl.DER_cert_to_PEM_cert(dercert)
                # workaround android bug
                cert = re.sub("([^\n])-----END CERTIFICATE-----", "\\1\n-----END CERTIFICATE-----", cert)
                with open(self.cert_path, 'w') as f:
                    f.write(cert)
                    # even though close flushes, we can't fsync when closed.
                    # and we must flush before fsyncing, cause flush flushes to OS buffer
                    # fsync writes to OS buffer to disk
                    f.flush()
                    os.fsync(f.fileno())
                break
            await asyncio.sleep(1)
        else:
            raise GracefulDisconnect("could not get certificate after 10 tries")
×
792

793
    async def _fetch_certificate(self) -> Optional[bytes]:
        """Connect to the server without verifying its certificate, and return the cert it presents.

        Returns the result of `SSLObject.getpeercert(binary_form=True)`, i.e. the DER-encoded
        peer certificate. note: the caller (`_save_certificate`) treats a falsy result as
        "no cert available yet" and retries.
        """
        sslc = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_CLIENT)
        # verification is turned off on purpose: this connection is only used to read the cert
        sslc.check_hostname = False
        sslc.verify_mode = ssl.CERT_NONE
        async with _RSClient(
            session_factory=RPCSession,
            host=self.host, port=self.port,
            ssl=sslc,
            proxy=self.proxy,
            transport=PaddedRSTransport,
        ) as session:
            # dig the ssl object out of the underlying asyncio transport
            asyncio_transport = session.transport._asyncio_transport  # type: asyncio.BaseTransport
            ssl_object = asyncio_transport.get_extra_info("ssl_object")  # type: ssl.SSLObject
            return ssl_object.getpeercert(binary_form=True)
×
807

808
    def _get_expected_fingerprint(self) -> Optional[str]:
        """Return the configured server cert fingerprint, which only applies to the main server.

        Returns None for any interface that is not the main server.
        """
        if not self.is_main_server():
            return None
        return self.network.config.NETWORK_SERVERFINGERPRINT
×
812

813
    def _verify_certificate_fingerprint(self, certificate: bytes) -> None:
        """Check the SHA-256 of `certificate` against the expected fingerprint, if one is set.

        The comparison is case-insensitive. Nothing is checked when no fingerprint is expected.

        Raises ErrorSSLCertFingerprintMismatch on mismatch (after triggering the
        'cert_mismatch' callback).
        """
        expected_fingerprint = self._get_expected_fingerprint()
        if not expected_fingerprint:
            return
        actual_fingerprint = hashlib.sha256(certificate).hexdigest()
        if actual_fingerprint.lower() != expected_fingerprint.lower():
            util.trigger_callback('cert_mismatch')
            raise ErrorSSLCertFingerprintMismatch('Refusing to connect to server due to cert fingerprint mismatch')
        self.logger.info("cert fingerprint verification passed")
×
823

824
    async def _maybe_warm_headers_cache(self, *, from_height: int, to_height: int, mode: ChainResolutionMode) -> None:
        """Populate header cache for block heights in range [from_height, to_height].

        If every height in the range is cached already, no request is made. Otherwise
        the whole range is requested in one batch and each returned raw header is
        stored in the cache under its height.
        """
        assert from_height <= to_height, (from_height, to_height)
        assert to_height - from_height < MAX_NUM_HEADERS_PER_REQUEST
        if not any(h not in self._headers_cache for h in range(from_height, to_height+1)):
            # cache already has all requested headers
            return
        # use lower timeout as we usually have network.bhi_lock here
        timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        num_wanted = to_height - from_height + 1
        headers = await self.get_block_headers(start_height=from_height, count=num_wanted, timeout=timeout, mode=mode)
        for header_height, raw_header in enumerate(headers, start=from_height):
            self._headers_cache[header_height] = raw_header
1✔
838

839
    async def get_block_header(self, height: int, *, mode: ChainResolutionMode) -> dict:
        """Return the deserialized block header at `height`.

        The header is taken from the headers cache if present there, otherwise it is
        requested from the server. `mode` is only used for logging.
        """
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        #self.logger.debug(f'get_block_header() {height} in {mode=}')
        # use lower timeout as we usually have network.bhi_lock here
        timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        cached_raw_header = self._headers_cache.get(height)
        if cached_raw_header:
            return blockchain.deserialize_header(cached_raw_header, height)
        self.logger.info(f'requesting block header {height} in {mode=}')
        hex_header = await self.session.send_request('blockchain.block.header', [height], timeout=timeout)
        fetched_raw_header = bytes.fromhex(hex_header)
        return blockchain.deserialize_header(fetched_raw_header, height)
×
850

851
    async def get_block_headers(
        self,
        *,
        start_height: int,
        count: int,
        timeout=None,
        mode: Optional[ChainResolutionMode] = None,
    ) -> Sequence[bytes]:
        """Fetch up to `count` consecutive raw block headers, the first one at `start_height`.

        The server may send fewer headers than requested, but this is only tolerated if
        the received range reaches `self.tip`; it must never send more than requested.
        `mode` is only used for logging.
        note: only the shape of the response is validated here; the returned headers
              are not parsed or verified at all.

        Raises:
            Exception: if `start_height` or `count` is not acceptable.
            RequestCorrupted: if the server's response is malformed or inconsistent.
        """
        if not is_non_negative_integer(start_height):
            raise Exception(f"{repr(start_height)} is not a block height")
        count_is_valid = is_non_negative_integer(count) and (0 < count <= MAX_NUM_HEADERS_PER_REQUEST)
        if not count_is_valid:
            raise Exception(f"{repr(count)} not an int in range ]0, {MAX_NUM_HEADERS_PER_REQUEST}]")
        log_msg = f"requesting block headers: [{start_height}, {start_height+count-1}], {count=}"
        if mode is not None:
            log_msg += f" (in {mode=})"
        self.logger.info(log_msg)
        res = await self.session.send_request('blockchain.block.headers', [start_height, count], timeout=timeout)
        # check response
        for required_field in ('count', 'max'):
            assert_dict_contains_field(res, field_name=required_field)
        for required_field in ('count', 'max'):
            assert_non_negative_integer(res[required_field])
        if self.active_protocol_tuple >= (1, 6):
            # one hex string per header
            hex_headers_list = assert_dict_contains_field(res, field_name='headers')
            assert_list_or_tuple(hex_headers_list)
            for hex_header in hex_headers_list:
                assert_hex_str(hex_header)
                if len(hex_header) != HEADER_SIZE * 2:
                    raise RequestCorrupted(f"invalid header size. got {len(hex_header)//2}, expected {HEADER_SIZE}")
            if len(hex_headers_list) != res['count']:
                raise RequestCorrupted(f"{len(hex_headers_list)=} != {res['count']=}")
            headers = [bfh(hex_header) for hex_header in hex_headers_list]
        else: # proto 1.4
            # all headers concatenated into a single hex string
            hex_headers_concat = assert_dict_contains_field(res, field_name='hex')
            assert_hex_str(hex_headers_concat)
            expected_hex_len = HEADER_SIZE * 2 * res['count']
            if len(hex_headers_concat) != expected_hex_len:
                raise RequestCorrupted('inconsistent chunk hex and count')
            headers = list(util.chunks(bfh(hex_headers_concat), size=HEADER_SIZE))
        # we never request more than MAX_NUM_HEADERS_PER_REQUEST headers, but we enforce those fit in a single response
        if res['max'] < MAX_NUM_HEADERS_PER_REQUEST:
            raise RequestCorrupted(f"server uses too low 'max' count for block.headers: {res['max']} < {MAX_NUM_HEADERS_PER_REQUEST}")
        num_received = res['count']
        if num_received > count:
            raise RequestCorrupted(f"asked for {count} headers but got more: {num_received}")
        if num_received < count:
            # we only tolerate getting fewer headers if it is due to reaching the tip
            last_received_height = start_height + num_received - 1
            if last_received_height < self.tip:  # still below tip. why did server not send more?!
                raise RequestCorrupted(
                    f"asked for {count} headers but got fewer: {num_received}. ({start_height=}, {self.tip=})")
        # checks done.
        return headers
1✔
907

908
    async def request_chunk_below_max_checkpoint(
        self,
        *,
        height: int,
    ) -> None:
        """Request the whole chunk of headers containing `height`, and connect it to the blockchain.

        `height` must not be above the max checkpoint. If the chunk is already being
        requested, this returns immediately without doing anything.

        Raises RequestCorrupted if the received chunk does not connect to the blockchain.
        """
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        max_cp = constants.net.max_checkpoint()
        assert height <= max_cp, f"{height=} must be <= cp={max_cp}"
        index = height // CHUNK_SIZE
        if index in self._requested_chunks:
            return None
        self.logger.debug(f"requesting chunk from height {height}")
        try:
            # the index is only marked as "requested" for the duration of the request
            self._requested_chunks.add(index)
            headers = await self.get_block_headers(start_height=index * CHUNK_SIZE, count=CHUNK_SIZE)
        finally:
            self._requested_chunks.discard(index)
        chunk_data = b"".join(headers)
        if not self.blockchain.connect_chunk(index, data=chunk_data):
            raise RequestCorrupted(f"chunk ({index=}, for {height=}) does not connect to blockchain")
        return None
×
929

930
    async def _fast_forward_chain(
        self,
        *,
        height: int,  # usually local chain tip + 1
        tip: int,  # server tip. we should not request past this.
    ) -> int:
        """Request some headers starting at `height` to grow the blockchain of this interface.

        Up to 10 chunks are requested concurrently, starting at the chunk containing
        `height` and never going past `tip`. Chunks are then connected in order,
        stopping at the first one that does not connect.

        Returns number of headers we managed to connect, starting at `height`.
        """
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        if not is_non_negative_integer(tip):
            raise Exception(f"{repr(tip)} is not a block height")
        max_cp = constants.net.max_checkpoint()
        is_above_checkpoints = height > max_cp
        is_genesis_without_checkpoints = (height == 0 == max_cp)
        if not is_above_checkpoints and not is_genesis_without_checkpoints:
            raise Exception(f"{height=} must be > cp={max_cp}")
        assert height <= tip, f"{height=} must be <= {tip=}"
        index0 = height // CHUNK_SIZE
        tasks = []  # type: List[Tuple[int, asyncio.Task[Sequence[bytes]]]]
        # Request a few chunks of headers concurrently.
        # tradeoffs:
        # - more chunks: higher memory requirements
        # - more chunks: higher concurrency => syncing needs fewer network round-trips
        # - if a chunk does not connect, bandwidth for all later chunks is wasted
        async with OldTaskGroup() as group:
            for index in range(index0, index0 + 10):
                start_height = index * CHUNK_SIZE
                if start_height > tip:
                    break
                end_height = min(start_height + CHUNK_SIZE - 1, tip)
                size = end_height - start_height + 1
                task = await group.spawn(self.get_block_headers(start_height=start_height, count=size))
                tasks.append((index, task))
        # try to connect chunks
        num_headers = 0
        for index, task in tasks:
            headers = task.result()
            if not self.blockchain.connect_chunk(index, data=b"".join(headers)):
                break
            num_headers += len(headers)
        # We started at a chunk boundary, instead of requested `height`. Need to correct for that.
        offset = height % CHUNK_SIZE
        return max(0, num_headers - offset)
×
974

975
    def is_main_server(self) -> bool:
        """Return whether this interface is (or is about to become) the network's main interface.

        True if the network's current interface is this one, or if the network has
        no interface yet and its default server is this interface's server.
        """
        if self.network.interface == self:
            return True
        return self.network.interface is None and self.network.default_server == self.server
978

979
    async def open_session(
        self,
        *,
        ssl_context: Optional[ssl.SSLContext],
        exit_early: bool = False,
    ):
        """Connect to the server, perform the handshake, then run this interface's tasks.

        Handshake steps, in order:
        - 'server.version' negotiation (if `exit_early` is set, return right after this request)
        - check that the negotiated protocol version is within the range we asked for
        - check this connection against the network's "healthy spread" of connected servers
        - 'server.features': the server must be on the same chain (genesis hash) as us
        Afterwards the long-running tasks (ping, fee estimates, header subscription,
        connection monitoring) are spawned in `self.taskgroup`, and this coroutine
        stays inside the taskgroup until it exits.

        Args:
            ssl_context: context to use for TLS, or None for plaintext TCP.
            exit_early: if True, only do the version request, then close the session.

        Raises:
            GracefulDisconnect: if the handshake fails, or if a task failed with one of
                a few selected JSON-RPC error codes.
            aiorpcx.jsonrpc.RPCError: for any other RPC error raised by the tasks.
        """
        def session_factory(*args, iface=self, **kwargs):
            return NotificationSession(*args, **kwargs, interface=iface)

        async with _RSClient(
            session_factory=session_factory,
            host=self.host, port=self.port,
            ssl=ssl_context,
            proxy=self.proxy,
            transport=PaddedRSTransport,
        ) as session:
            start = time.perf_counter()
            self.session = session  # type: NotificationSession
            self.session.set_default_timeout(self.network.get_network_timeout_seconds(NetworkTimeout.Generic))
            client_prange = [version.PROTOCOL_VERSION_MIN, version.PROTOCOL_VERSION_MAX]
            try:
                ver = await session.send_request('server.version', [self.client_name(), client_prange])
            except aiorpcx.jsonrpc.RPCError as e:
                # explicit chaining, consistent with the other GracefulDisconnect raised below
                raise GracefulDisconnect(e) from e  # probably 'unsupported protocol version'
            if exit_early:
                return
            self.active_protocol_tuple = protocol_tuple(ver[1])
            client_pmin = protocol_tuple(client_prange[0])
            client_pmax = protocol_tuple(client_prange[1])
            if not (client_pmin <= self.active_protocol_tuple <= client_pmax):
                raise GracefulDisconnect(f'server violated protocol-version-negotiation. '
                                         f'we asked for {client_prange!r}, they sent {ver[1]!r}')
            if not self.network.check_interface_against_healthy_spread_of_connected_servers(self):
                raise GracefulDisconnect(f'too many connected servers already '
                                         f'in bucket {self.bucket_based_on_ipaddress()}')

            try:
                features = await session.send_request('server.features')
                server_genesis_hash = assert_dict_contains_field(features, field_name='genesis_hash')
            except (aiorpcx.jsonrpc.RPCError, RequestCorrupted) as e:
                raise GracefulDisconnect(e) from e
            if server_genesis_hash != constants.net.GENESIS:
                raise GracefulDisconnect(f'server on different chain: {server_genesis_hash=}. ours: {constants.net.GENESIS}')
            self.logger.info(f"connection established. version: {ver}, handshake duration: {(time.perf_counter() - start) * 1000:.2f} ms")

            try:
                async with self.taskgroup as group:
                    await group.spawn(self.ping)
                    await group.spawn(self.request_fee_estimates)
                    await group.spawn(self.run_fetch_blocks)
                    await group.spawn(self.monitor_connection)
            except aiorpcx.jsonrpc.RPCError as e:
                # these error codes are treated as a reason to disconnect gracefully;
                # they are logged more prominently if this is the main server
                if e.code in (
                    JSONRPC.EXCESSIVE_RESOURCE_USAGE,
                    JSONRPC.SERVER_BUSY,
                    JSONRPC.METHOD_NOT_FOUND,
                    JSONRPC.INTERNAL_ERROR,
                ):
                    log_level = logging.WARNING if self.is_main_server() else logging.INFO
                    raise GracefulDisconnect(e, log_level=log_level) from e
                raise
            finally:
                self.got_disconnected.set()  # set this ASAP, ideally before any awaits
1✔
1040

1041
    async def monitor_connection(self):
        """Poll the session once per second; raise GracefulDisconnect once it is gone or closing."""
        while True:
            await asyncio.sleep(1)
            # If the session/transport is no longer open, we disconnect.
            # e.g. if the remote cleanly sends EOF, we would handle that here.
            # note: If the user pulls the ethernet cable or disconnects wifi,
            #       ideally we would detect that here, so that the GUI/etc can reflect that.
            #       - On Android, this seems to work reliably , where asyncio.BaseProtocol.connection_lost()
            #         gets called with e.g. ConnectionAbortedError(103, 'Software caused connection abort').
            #       - On desktop Linux/Win, it seems BaseProtocol.connection_lost() is not called in such cases.
            #         Hence, in practice the connection issue will only be detected the next time we try
            #         to send a message (plus timeout), which can take minutes...
            if self.session and not self.session.is_closing():
                continue
            raise GracefulDisconnect('session was closed')
×
1055

1056
    async def ping(self):
        """Forever: wait a random time (under 300 seconds), ping the server, maybe send noise."""
        # We periodically send a "ping" msg to make sure the server knows we are still here.
        # Adding a bit of randomness generates some noise against traffic analysis.
        max_delay_sec = 300
        while True:
            delay_sec = random.random() * max_delay_sec
            await asyncio.sleep(delay_sec)
            await self.session.send_request('server.ping')
            await self._maybe_send_noise()
×
1063

1064
    async def _maybe_send_noise(self):
        """Send a random number of extra pings (possibly zero), each after a sub-second random delay.

        Before each ping a random draw decides whether to continue (probability 0.2).
        """
        while True:
            if random.random() >= 0.2:
                return
            delay_sec = random.random()
            await asyncio.sleep(delay_sec)
            await self.session.send_request('server.ping')
×
1068

1069
    async def request_fee_estimates(self):
        """Forever: once a minute, fetch fee estimates for all ETA targets but the last one.

        The estimates are requested concurrently. Negative results are skipped; the
        others are stored in `self.fee_estimates_eta`, and then the network is notified.
        """
        while True:
            fee_tasks = []
            async with OldTaskGroup() as group:
                for nblock_target in FEE_ETA_TARGETS[:-1]:
                    task = await group.spawn(self.get_estimatefee(nblock_target))
                    fee_tasks.append((nblock_target, task))
            for nblock_target, task in fee_tasks:
                fee = task.result()
                if fee >= 0:
                    assert isinstance(fee, int)
                    self.fee_estimates_eta[nblock_target] = fee
            self.network.update_fee_estimates()
            await asyncio.sleep(60)
1✔
1082

1083
    async def close(self, *, force_after: Optional[int] = None):
        """Closes the connection and waits for it to be closed.
        We try to flush buffered data to the wire, which can take some time.

        Args:
            force_after: passed through unchanged to the session's `close()`.

        Does nothing if there is no session.
        """
        if not self.session:
            return
        await self.session.close(force_after=force_after)
        # monitor_connection will cancel tasks
1090

1091
    async def run_fetch_blocks(self):
        """Subscribe to the server's header notifications and process each new tip, forever.

        For each notification: record the new tip, mark the interface as ready,
        sync our blockchain up to the tip, then fire callbacks and let the network
        reconsider which interface to use.

        Raises GracefulDisconnect if the server's tip is below our max checkpoint.
        """
        header_queue = asyncio.Queue()
        await self.session.subscribe('blockchain.headers.subscribe', [], header_queue)
        while True:
            item = await header_queue.get()
            # first element of the notification is a dict with the 'height' and 'hex' of the tip header
            raw_header = item[0]
            height = raw_header['height']
            header_bytes = bfh(raw_header['hex'])
            header_dict = blockchain.deserialize_header(header_bytes, height)
            self.tip_header = header_dict
            self.tip = height
            if self.tip < constants.net.max_checkpoint():
                raise GracefulDisconnect(
                    f"server tip below max checkpoint. ({self.tip} < {constants.net.max_checkpoint()})")
            self._mark_ready()
            self._headers_cache.clear()  # tip changed, so assume anything could have happened with chain
            # seed the cache with the tip header we just received
            self._headers_cache[height] = header_bytes
            try:
                blockchain_updated = await self._process_header_at_tip()
            finally:
                self._headers_cache.clear()  # to reduce memory usage
            # header processing done
            if self.is_main_server() or blockchain_updated:
                self.logger.info(f"new chain tip. {height=}")
            if blockchain_updated:
                util.trigger_callback('blockchain_updated')
                # set and immediately clear the event
                # NOTE(review): presumably this wakes up coroutines currently waiting on it — confirm
                self._blockchain_updated.set()
                self._blockchain_updated.clear()
            util.trigger_callback('network_updated')
            await self.network.switch_unwanted_fork_interface()
            await self.network.switch_lagging_interface()
            await self.taskgroup.spawn(self._maybe_send_noise())
1✔
1123

1124
    async def _process_header_at_tip(self) -> bool:
1✔
1125
        """Returns:
1126
        False - boring fast-forward: we already have this header as part of this blockchain from another interface,
1127
        True - new header we didn't have, or reorg
1128
        """
1129
        height, header = self.tip, self.tip_header
1✔
1130
        async with self.network.bhi_lock:
1✔
1131
            if self.blockchain.height() >= height and self.blockchain.check_header(header):
1✔
1132
                # another interface amended the blockchain
1133
                return False
×
1134
            await self.sync_until(height)
1✔
1135
            return True
1✔
1136

1137
    async def sync_until(
        self,
        height: int,
        *,
        next_height: Optional[int] = None,  # sync target. typically the tip, except in unit tests
    ) -> Tuple[ChainResolutionMode, int]:
        """Process headers starting at `height`, until `height` has moved past `next_height`.

        The loop body runs at least once. When far from the target (more than 144 blocks),
        headers are processed in large batches; otherwise one-by-one via `step()`.

        Returns the mode of the last step taken, and the next height that would be processed.

        Raises GracefulDisconnect if batch-syncing makes no progress at or below the max checkpoint.
        """
        if next_height is None:
            next_height = self.tip
        last = None  # type: Optional[ChainResolutionMode]
        while last is None or height <= next_height:
            # remember state from before this iteration, to detect lack of progress (see assert at the end)
            prev_last, prev_height = last, height
            if next_height > height + 144:
                # We are far from the tip.
                # It is more efficient to process headers in large batches (CPU/disk_usage/logging).
                # (but this wastes a little bandwidth, if we are not on a chunk boundary)
                num_headers = await self._fast_forward_chain(
                    height=height, tip=next_height)
                if num_headers == 0:
                    if height <= constants.net.max_checkpoint():
                        raise GracefulDisconnect('server chain conflicts with checkpoints or genesis')
                    # batch did not connect: fall back to processing a single header
                    last, height = await self.step(height)
                    continue
                # report progress to gui/etc
                util.trigger_callback('blockchain_updated')
                self._blockchain_updated.set()
                self._blockchain_updated.clear()
                util.trigger_callback('network_updated')
                height += num_headers
                # NOTE(review): the condition compares against next_height, but the message reports self.tip;
                #               these differ when next_height was passed explicitly — confirm intent
                assert height <= next_height+1, (height, self.tip)
                last = ChainResolutionMode.CATCHUP
            else:
                # We are close to the tip, so process headers one-by-one.
                # (note: due to headers_cache, to save network latency, this can still batch-request headers)
                last, height = await self.step(height)
            assert (prev_last, prev_height) != (last, height), 'had to prevent infinite loop in interface.sync_until'
        return last, height
1✔
1173

1174
    async def step(
        self,
        height: int,
    ) -> Tuple[ChainResolutionMode, int]:
        """Fetch the header at `height` and try to make progress with it.

        - If the header is already part of a known chain, switch to that chain.
        - Else if it can be connected to a known chain, connect and save it.
        - Else search backwards for a header that checks or connects, and from there
          either connect that header, or binary-search for the fork point and resolve the fork.

        Returns the resolution mode used, and the next height to process.
        """
        assert 0 <= height <= self.tip, (height, self.tip)
        # pre-fetch a batch of headers starting at `height` (bounded by the tip) into the cache
        await self._maybe_warm_headers_cache(
            from_height=height,
            to_height=min(self.tip, height+MAX_NUM_HEADERS_PER_REQUEST-1),
            mode=ChainResolutionMode.CATCHUP,
        )
        header = await self.get_block_header(height, mode=ChainResolutionMode.CATCHUP)

        chain = blockchain.check_header(header)
        if chain:
            self.blockchain = chain
            # note: there is an edge case here that is not handled.
            # we might know the blockhash (enough for check_header) but
            # not have the header itself. e.g. regtest chain with only genesis.
            # this situation resolves itself on the next block
            return ChainResolutionMode.CATCHUP, height+1

        can_connect = blockchain.can_connect(header)
        if not can_connect:
            self.logger.info(f"can't connect new block: {height=}")
            # note: `bad` and `bad_header` are only bound in this branch. The binary search below
            #       is only reached via this branch, as a truthy `can_connect` returns just below.
            height, header, bad, bad_header = await self._search_headers_backwards(height, header=header)
            chain = blockchain.check_header(header)
            can_connect = blockchain.can_connect(header)
            assert chain or can_connect
        if can_connect:
            height += 1
            # `can_connect` is the chain the header connects to
            self.blockchain = can_connect
            self.blockchain.save_header(header)
            return ChainResolutionMode.CATCHUP, height

        # header at `height` is in `chain`, header at `bad` is in none: find where they diverge
        good, bad, bad_header = await self._search_headers_binary(height, bad, bad_header, chain)
        return await self._resolve_potential_chain_fork_given_forkpoint(good, bad, bad_header)
1✔
1210

1211
    async def _search_headers_binary(
        self,
        height: int,
        bad: int,
        bad_header: dict,
        chain: Optional[Blockchain],
    ) -> Tuple[int, int, dict]:
        """Binary-search between a good height and a bad height for the point of divergence.

        `height` is a height whose header checks against `chain`; `bad` is a height whose
        header (`bad_header`) does not check against any chain. The interval is halved
        until the two are adjacent. `self.blockchain` is updated along the way.

        Returns (good, bad, bad_header) with good + 1 == bad.
        """
        assert bad == bad_header['block_height']
        _assert_header_does_not_check_against_any_chain(bad_header)

        self.blockchain = chain
        good = height
        while True:
            assert 0 <= good < bad, (good, bad)
            mid = (good + bad) // 2
            self.logger.info(f"binary step. good {good}, bad {bad}, height {mid}")
            interval_is_small = bad - good + 1 <= MAX_NUM_HEADERS_PER_REQUEST
            if interval_is_small:  # if interval is small, trade some bandwidth for lower latency
                await self._maybe_warm_headers_cache(
                    from_height=good, to_height=bad, mode=ChainResolutionMode.BINARY)
            mid_header = await self.get_block_header(mid, mode=ChainResolutionMode.BINARY)
            chain = blockchain.check_header(mid_header)
            if not chain:
                # header at mid is unknown to us: the divergence is at or below mid
                bad = mid
                bad_header = mid_header
            else:
                # header at mid is on a known chain: the divergence is above mid
                self.blockchain = chain
                good = mid
            if good + 1 == bad:
                break

        if not self.blockchain.can_connect(bad_header, check_height=False):
            raise Exception(f'unexpected bad header during binary: {bad_header}')
        _assert_header_does_not_check_against_any_chain(bad_header)

        self.logger.info(f"binary search exited. good {good}, bad {bad}. {chain=}")
        return good, bad, bad_header
1✔
1247

1248
    async def _resolve_potential_chain_fork_given_forkpoint(
        self,
        good: int,
        bad: int,
        bad_header: dict,
    ) -> Tuple[ChainResolutionMode, int]:
        """Decide whether the server is simply ahead of our chain, or on a fork we don't have yet.

        If our blockchain ends exactly at `good`, there is no fork: continue from good + 1.
        Otherwise our blockchain extends past `good`, so `bad_header` starts a new fork:
        create it, switch to it, and continue from bad + 1.
        """
        assert good + 1 == bad
        assert bad == bad_header['block_height']
        _assert_header_does_not_check_against_any_chain(bad_header)
        # 'good' is the height of a block 'good_header', somewhere in self.blockchain.
        # bad_header connects to good_header; bad_header itself is NOT in self.blockchain.

        bh = self.blockchain.height()
        assert bh >= good, (bh, good)
        if bh != good:
            # this is a new fork we don't yet have
            self.logger.info(f"new fork at bad height {bad}")
            forked_chain = self.blockchain.fork(bad_header)  # type: Blockchain
            self.blockchain = forked_chain
            assert forked_chain.forkpoint == bad
            return ChainResolutionMode.FORK, bad + 1

        next_height = good + 1
        self.logger.info(f"catching up from {next_height}")
        return ChainResolutionMode.NO_FORK, next_height
1✔
1274

1275
    async def _search_headers_backwards(
        self,
        height: int,
        *,
        header: dict,
    ) -> Tuple[int, dict, int, dict]:
        """Walk backwards from a non-matching header until one checks or connects.

        `header` (at `height`) must not check against any known chain. Starting just
        below it (but no higher than one past the tallest local chain), headers are
        requested at heights that move back by a step that doubles every round.
        Heights at or below the max checkpoint are clamped to the checkpoint; if even
        that header neither checks nor connects, GracefulDisconnect is raised.

        Returns `(height, header, bad, bad_header)`: the height/header where the
        search stopped, and the last height/header seen that did not match.
        """
        async def iterate():
            # Fetches the header at the current height; returns True to keep searching.
            nonlocal height, header
            max_cp = constants.net.max_checkpoint()
            clamped_to_checkpoint = height <= max_cp
            if clamped_to_checkpoint:
                height = max_cp
            header = await self.get_block_header(height, mode=ChainResolutionMode.BACKWARD)
            checks = blockchain.check_header(header)
            connects = blockchain.can_connect(header)
            if checks or connects:
                return False
            if clamped_to_checkpoint:
                raise GracefulDisconnect("server chain conflicts with checkpoints")
            return True

        bad, bad_header = height, header
        _assert_header_does_not_check_against_any_chain(bad_header)
        with blockchain.blockchains_lock:
            chains = list(blockchain.blockchains.values())
        local_max = max([0, *(chain.height() for chain in chains)])
        # do not start above what we could possibly connect to locally
        height = min(local_max + 1, height - 1)
        assert height >= 0

        await self._maybe_warm_headers_cache(
            from_height=max(0, height-10), to_height=height, mode=ChainResolutionMode.BACKWARD)

        step = 2
        while await iterate():
            bad, bad_header = height, header
            height -= step
            step *= 2

        _assert_header_does_not_check_against_any_chain(bad_header)
        self.logger.info(f"exiting backward mode at {height}")
        return height, header, bad, bad_header
1✔
1315

1316
    @classmethod
    def client_name(cls) -> str:
        """Return the client identifier string: 'electrum/' followed by the Electrum version."""
        return 'electrum/{}'.format(version.ELECTRUM_VERSION)
×
1319

1320
    def is_tor(self):
        """Return True if this interface's host name ends with '.onion'."""
        hostname = self.host
        return hostname.endswith('.onion')
×
1322

1323
    def ip_addr(self) -> Optional[str]:
        """Return the host of the session's remote address as a string.

        Returns None if there is no session or the session has no remote address.
        """
        session = self.session
        peer_addr = session.remote_address() if session else None
        return str(peer_addr.host) if peer_addr else None
×
1329

1330
    def bucket_based_on_ipaddress(self) -> str:
        """Return a bucket name for this server, derived from its IP address.

        Onion hosts share one dedicated bucket. Otherwise the bucket is the /16
        network (IPv4) or /48 network (IPv6) containing the peer's address.
        An empty string is returned for loopback addresses and whenever the peer
        address is not a parseable IP. A non-empty result is remembered in
        self._ipaddr_bucket and reused on later calls.
        """
        def do_bucket():
            if self.is_tor():
                return BUCKET_NAME_OF_ONION_SERVERS
            try:
                addr = ip_address(self.ip_addr())  # type: Union[IPv4Address, IPv6Address]
            except ValueError:
                return ''
            if not addr:
                return ''
            if addr.is_loopback:  # localhost is exempt
                return ''
            ip_version = addr.version
            if ip_version == 4:
                return str(IPv4Network(addr).supernet(prefixlen_diff=32-16))
            if ip_version == 6:
                return str(IPv6Network(addr).supernet(prefixlen_diff=128-48))
            return ''

        bucket = self._ipaddr_bucket
        if not bucket:
            bucket = self._ipaddr_bucket = do_bucket()
        return bucket
×
1353

1354
    async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict:
        """Request the merkle branch of a transaction and sanity-check the response.

        Raises a plain Exception if the arguments are not a txid / block height.
        The response must contain 'block_height', 'merkle' and 'pos'; it is
        returned unchanged once validated.
        """
        if not is_hash256_str(tx_hash):
            raise Exception(f"{repr(tx_hash)} is not a txid")
        if not is_non_negative_integer(tx_height):
            raise Exception(f"{repr(tx_height)} is not a block height")
        # do request
        response = await self.session.send_request(
            'blockchain.transaction.get_merkle', [tx_hash, tx_height])
        # check response: first that all fields exist, then their types
        block_height, merkle_branch, tx_pos = (
            assert_dict_contains_field(response, field_name=name)
            for name in ('block_height', 'merkle', 'pos')
        )
        # note: tx_height was just a hint to the server, don't enforce the response to match it
        assert_non_negative_integer(block_height)
        assert_non_negative_integer(tx_pos)
        assert_list_or_tuple(merkle_branch)
        for node_hash in merkle_branch:
            assert_hash256_str(node_hash)
        return response
1✔
1372

1373
    async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
        """Return the raw transaction for `tx_hash` as a hex string.

        Served from this interface's rawtx cache when possible; otherwise requested
        from the server, validated (hex, deserializable, txid matches) and cached.
        Raises RequestCorrupted if the server response fails validation.
        """
        if not is_hash256_str(tx_hash):
            raise Exception(f"{repr(tx_hash)} is not a txid")
        cached_bytes = self._rawtx_cache.get(tx_hash)
        if cached_bytes:
            return cached_bytes.hex()
        raw = await self.session.send_request('blockchain.transaction.get', [tx_hash], timeout=timeout)
        # validate response
        if not is_hex_str(raw):
            raise RequestCorrupted(f"received garbage (non-hex) as tx data (txid {tx_hash}): {raw!r}")
        tx = Transaction(raw)
        try:
            tx.deserialize()  # see if raises
        except Exception as exc:
            raise RequestCorrupted(f"cannot deserialize received transaction (txid {tx_hash})") from exc
        received_txid = tx.txid()
        if received_txid != tx_hash:
            raise RequestCorrupted(f"received tx does not match expected txid {tx_hash} (got {received_txid})")
        self._rawtx_cache[tx_hash] = bytes.fromhex(raw)
        return raw
1✔
1391

1392
    async def broadcast_transaction(self, tx: 'Transaction', *, timeout=None) -> None:
        """Broadcast `tx` through this server and verify the txid it reports back.

        caller should handle TxBroadcastError and RequestTimedOut

        If `timeout` is None, the network's "Urgent" timeout is used. Transactions
        paying to a dummy address are refused before anything is sent. On success the
        raw tx is stored in this interface's rawtx cache.
        """
        expected_txid = tx.txid()
        assert expected_txid is not None
        rawtx = tx.serialize()
        assert is_hex_str(rawtx)
        if timeout is None:
            timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        for txout in tx.outputs():
            if DummyAddress.is_dummy_address(txout.address):
                raise DummyAddressUsedInTxException("tried to broadcast tx with dummy address!")
        try:
            server_txid = await self.session.send_request(
                'blockchain.transaction.broadcast', [rawtx], timeout=timeout)
            # note: both 'server_txid' and exception messages are untrusted input from the server
        except (RequestTimedOut, asyncio.CancelledError, asyncio.TimeoutError):
            raise  # pass-through
        except aiorpcx.jsonrpc.CodeMessageError as exc:
            safe_err = error_text_str_to_safe_str(repr(exc))
            self.logger.info(f"broadcast_transaction error [DO NOT TRUST THIS MESSAGE]: {safe_err}. tx={str(tx)}")
            raise TxBroadcastServerReturnedError(sanitize_tx_broadcast_response(exc.message)) from exc
        except BaseException as exc:  # intentional BaseException for sanity!
            safe_err = error_text_str_to_safe_str(repr(exc))
            self.logger.info(f"broadcast_transaction error2 [DO NOT TRUST THIS MESSAGE]: {safe_err}. tx={str(tx)}")
            send_exception_to_crash_reporter(exc)
            raise TxBroadcastUnknownError() from exc
        if server_txid != expected_txid:
            self.logger.info(f"unexpected txid for broadcast_transaction [DO NOT TRUST THIS MESSAGE]: "
                             f"{error_text_str_to_safe_str(server_txid)} != {expected_txid}. tx={str(tx)}")
            raise TxBroadcastHashMismatch(_("Server returned unexpected transaction ID."))
        # broadcast succeeded.
        # We now cache the rawtx, for *this interface only*. The tx likely touches some ismine addresses, affecting
        # the status of a scripthash we are subscribed to. Caching here will save a future get_transaction RPC.
        self._rawtx_cache[expected_txid] = bytes.fromhex(rawtx)
1✔
1422

1423
    async def broadcast_txpackage(self, txs: Sequence['Transaction']) -> bool:
        """Broadcast several transactions as one package; return True on success.

        Requires protocol version >= 1.6 (asserted). Returns False if the server
        answers with an RPC error or reports `success` as falsy. On success every
        raw tx is stored in this interface's rawtx cache.
        """
        assert self.active_protocol_tuple >= (1, 6), f"server using old protocol: {self.active_protocol_tuple}"
        rawtxs = [tx.serialize() for tx in txs]
        assert all(is_hex_str(rawtx) for rawtx in rawtxs)
        assert all(tx.txid() is not None for tx in txs)
        timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        for tx in txs:
            for txout in tx.outputs():
                if DummyAddress.is_dummy_address(txout.address):
                    raise DummyAddressUsedInTxException("tried to broadcast tx with dummy address!")
        try:
            res = await self.session.send_request(
                'blockchain.transaction.broadcast_package', [rawtxs], timeout=timeout)
        except aiorpcx.jsonrpc.CodeMessageError as exc:
            self.logger.info(f"broadcast_txpackage error [DO NOT TRUST THIS MESSAGE]: {error_text_str_to_safe_str(repr(exc))}. {rawtxs=}")
            return False
        success = assert_dict_contains_field(res, field_name='success')
        if not success:
            errors = assert_dict_contains_field(res, field_name='errors')
            self.logger.info(f"broadcast_txpackage error [DO NOT TRUST THIS MESSAGE]: {error_text_str_to_safe_str(repr(errors))}. {rawtxs=}")
            return False
        assert success
        # broadcast succeeded.
        # We now cache the rawtx, for *this interface only*. The tx likely touches some ismine addresses, affecting
        # the status of a scripthash we are subscribed to. Caching here will save a future get_transaction RPC.
        for tx, rawtx in zip(txs, rawtxs):
            self._rawtx_cache[tx.txid()] = bytes.fromhex(rawtx)
        return True
×
1449

1450
    async def get_history_for_scripthash(self, sh: str) -> List[dict]:
        """Request the history of scripthash `sh` and validate the response.

        Checks, per item: 'height' (integer >= -1) and 'tx_hash' are present and valid;
        mempool items (height -1 or 0) carry a non-negative integer 'fee'; confirmed
        items come in increasing height order and never after a mempool item.
        For protocol >= 1.6 the mempool items must also be in canonical order.
        Raises RequestCorrupted on violations, including duplicate txids.
        """
        if not is_hash256_str(sh):
            raise Exception(f"{repr(sh)} is not a scripthash")
        # do request
        res = await self.session.send_request('blockchain.scripthash.get_history', [sh])
        # check response
        assert_list_or_tuple(res)
        last_height = 1
        for entry in res:
            height = assert_dict_contains_field(entry, field_name='height')
            assert_dict_contains_field(entry, field_name='tx_hash')
            assert_integer(height)
            if height < -1:
                raise RequestCorrupted(f'{height!r} is not a valid block height')
            assert_hash256_str(entry['tx_hash'])
            if height not in (-1, 0):
                # confirmed tx: check monotonicity of heights
                if height < last_height:
                    raise RequestCorrupted('heights of confirmed txs must be in increasing order')
                last_height = height
                continue
            # mempool tx
            assert_dict_contains_field(entry, field_name='fee')
            assert_non_negative_integer(entry['fee'])
            last_height = float("inf")  # this ensures confirmed txs can't follow mempool txs
        if self.active_protocol_tuple >= (1, 6):
            # enforce order of mempool txs
            def canonical_key(entry):
                return -entry['height'], bytes.fromhex(entry['tx_hash'])

            mempool_txs = [entry for entry in res if entry['height'] <= 0]
            if mempool_txs != sorted(mempool_txs, key=canonical_key):
                raise RequestCorrupted('mempool txs not in canonical order')
        unique_txids = {entry['tx_hash'] for entry in res}
        if len(unique_txids) != len(res):
            # Either server is sending garbage... or maybe if server is race-prone
            # a recently mined tx could be included in both last block and mempool?
            # Still, it's simplest to just disregard the response.
            raise RequestCorrupted(f"server history has non-unique txids for sh={sh}")
        return res
1✔
1486

1487
    async def listunspent_for_scripthash(self, sh: str) -> List[dict]:
        """Request the unspent outputs of scripthash `sh` and validate each item.

        Every item must contain 'tx_pos', 'value', 'height' (non-negative integers)
        and 'tx_hash' (a hash string). The validated response is returned unchanged.
        """
        if not is_hash256_str(sh):
            raise Exception(f"{repr(sh)} is not a scripthash")
        # do request
        res = await self.session.send_request('blockchain.scripthash.listunspent', [sh])
        # check response
        assert_list_or_tuple(res)
        for utxo in res:
            # presence of all fields first, then their types
            for required in ('tx_pos', 'value', 'tx_hash', 'height'):
                assert_dict_contains_field(utxo, field_name=required)
            for int_field in ('tx_pos', 'value', 'height'):
                assert_non_negative_integer(utxo[int_field])
            assert_hash256_str(utxo['tx_hash'])
        return res
×
1504

1505
    async def get_balance_for_scripthash(self, sh: str) -> dict:
        """Request the balance of scripthash `sh` and validate the response.

        The response must contain 'confirmed' (non-negative integer) and
        'unconfirmed' (integer, may be negative). It is returned unchanged.
        """
        if not is_hash256_str(sh):
            raise Exception(f"{repr(sh)} is not a scripthash")
        # do request
        balance = await self.session.send_request('blockchain.scripthash.get_balance', [sh])
        # check response
        for required in ('confirmed', 'unconfirmed'):
            assert_dict_contains_field(balance, field_name=required)
        assert_non_negative_integer(balance['confirmed'])
        assert_integer(balance['unconfirmed'])
        return balance
×
1516

1517
    async def get_txid_from_txpos(self, tx_height: int, tx_pos: int, merkle: bool):
        """Look up a txid by block height and position within the block.

        With `merkle` falsy the response is validated as a bare tx hash. With `merkle`
        truthy it is validated as a dict with 'tx_hash' and a 'merkle' list of hashes.
        The validated response is returned unchanged.
        """
        if not is_non_negative_integer(tx_height):
            raise Exception(f"{repr(tx_height)} is not a block height")
        if not is_non_negative_integer(tx_pos):
            raise Exception(f"{repr(tx_pos)} should be non-negative integer")
        # do request
        request_args = [tx_height, tx_pos, merkle]
        res = await self.session.send_request('blockchain.transaction.id_from_pos', request_args)
        # check response
        if not merkle:
            assert_hash256_str(res)
            return res
        assert_dict_contains_field(res, field_name='tx_hash')
        assert_dict_contains_field(res, field_name='merkle')
        assert_hash256_str(res['tx_hash'])
        assert_list_or_tuple(res['merkle'])
        for branch_hash in res['merkle']:
            assert_hash256_str(branch_hash)
        return res
×
1538

1539
    async def get_fee_histogram(self) -> Sequence[Tuple[Union[float, int], int]]:
        """Request the mempool fee histogram from the server and validate it.

        Every entry must be a two-element list/tuple `(fee, s)` where `fee` is a
        non-negative int or float and `s` a non-negative integer, and the fees must
        be strictly decreasing from one entry to the next.

        Raises RequestCorrupted if an entry has the wrong length or the fees are not
        strictly decreasing. The assert_* helpers are presumably raising the same
        exception type for type violations -- confirm against their definitions.
        """
        # do request
        res = await self.session.send_request('mempool.get_fee_histogram')
        # check response
        assert_list_or_tuple(res)
        prev_fee = float('inf')
        for item in res:
            # The response is untrusted: verify the shape before unpacking, so that a
            # malformed entry is reported as a corrupted response instead of escaping
            # as a bare ValueError/TypeError from tuple unpacking.
            assert_list_or_tuple(item)
            if len(item) != 2:
                raise RequestCorrupted(f'{item!r} should have exactly 2 elements')
            fee, s = item
            assert_non_negative_int_or_float(fee)
            assert_non_negative_integer(s)
            if fee >= prev_fee:  # check monotonicity
                raise RequestCorrupted('fees must be in decreasing order')
            prev_fee = fee
        return res
×
1552

1553
    async def get_server_banner(self) -> str:
        """Request the server's banner text; raise RequestCorrupted if it is not a str."""
        # do request
        banner = await self.session.send_request('server.banner')
        # check response
        if isinstance(banner, str):
            return banner
        raise RequestCorrupted(f'{banner!r} should be a str')
×
1560

1561
    async def get_donation_address(self) -> str:
        """Request the server's donation address.

        Returns '' if the server sent nothing, or sent something that (after
        stripping an optional 'bitcoin:' prefix) is not recognized as an address.
        Raises RequestCorrupted if a non-empty, non-str value is received.
        """
        # do request
        res = await self.session.send_request('server.donation_address')
        # check response
        if not res:  # ignore empty string
            return ''
        if not isinstance(res, str):
            raise RequestCorrupted(f'{res!r} should be a str')
        address = res.removeprefix('bitcoin:')
        if bitcoin.is_address(address):
            return address
        # note: do not hard-fail -- allow server to use future-type
        #       bitcoin address we do not recognize
        self.logger.info(f"invalid donation address from server: {repr(res)}")
        return ''
×
1576

1577
    async def get_relay_fee(self) -> int:
        """Returns the min relay feerate in sat/kbyte.

        Protocol >= 1.6 reads 'minrelaytxfee' from 'mempool.get_info'; older
        protocols use 'blockchain.relayfee'. The value is scaled by bitcoin.COIN,
        truncated to int and floored at 0.
        """
        # do request
        if self.active_protocol_tuple >= (1, 6):
            mempool_info = await self.session.send_request('mempool.get_info')
            minrelaytxfee = assert_dict_contains_field(mempool_info, field_name='minrelaytxfee')
        else:
            minrelaytxfee = await self.session.send_request('blockchain.relayfee')
        # check response
        assert_non_negative_int_or_float(minrelaytxfee)
        return max(0, int(minrelaytxfee * bitcoin.COIN))
×
1590

1591
    async def get_estimatefee(self, num_blocks: int) -> int:
        """Returns a feerate estimate for getting confirmed within
        num_blocks blocks, in sat/kbyte.
        Returns -1 if the server could not provide an estimate.
        """
        if not is_non_negative_integer(num_blocks):
            raise Exception(f"{repr(num_blocks)} is not a num_blocks")
        # do request
        try:
            estimate = await self.session.send_request('blockchain.estimatefee', [num_blocks])
        except aiorpcx.jsonrpc.ProtocolError as exc:
            # The protocol spec says the server itself should already have returned -1
            # if it cannot provide an estimate, however apparently "electrs" does not conform
            # and sends an error instead. Convert it here:
            if "cannot estimate fee" not in exc.message:
                raise
            estimate = -1
        except aiorpcx.jsonrpc.RPCError as exc:
            # The protocol spec says the server itself should already have returned -1
            # if it cannot provide an estimate. "Fulcrum" often sends:
            #   aiorpcx.jsonrpc.RPCError: (-32603, 'internal error: bitcoind request timed out')
            if exc.code != JSONRPC.INTERNAL_ERROR:
                raise
            estimate = -1
        # check response
        if estimate == -1:
            return estimate
        assert_non_negative_int_or_float(estimate)
        return int(estimate * bitcoin.COIN)
1✔
1622

1623

1624
def _assert_header_does_not_check_against_any_chain(header: dict) -> None:
    """Raise if `blockchain.check_header` finds a chain that `header` checks against."""
    if blockchain.check_header(header):
        raise Exception('bad_header must not check!')
×
1628

1629

1630
def sanitize_tx_broadcast_response(server_msg) -> str:
    """Map an untrusted broadcast error from the server onto a known, safe message.

    The server text is never returned as-is: it is searched for substrings from
    fixed lists of known bitcoind error messages, and the first match (or a
    translated explanation attached to it) is returned. The lists are tried in
    this order: script errors, validation errors, rawtransaction/RPC errors,
    tx-verify errors, policy errors. If nothing matches, a translated
    "Unknown error" is returned.

    Within the script-error list, longer messages are tried first, so that a
    message which contains another one (e.g. "Invalid Schnorr signature size" vs
    "Invalid Schnorr signature") is reported precisely and deterministically.
    """
    # Unfortunately, bitcoind and hence the Electrum protocol doesn't return a useful error code.
    # So, we use substring matching to grok the error message.
    # server_msg is untrusted input so it should not be shown to the user. see #4968
    server_msg = str(server_msg)
    server_msg = server_msg.replace("\n", r"\n")

    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/script/script_error.cpp
    # note: an ordered tuple (not a set): some entries are substrings of others, and
    #       set iteration order varies between runs due to str hash randomization.
    script_error_messages = (
        r"Script evaluated without error but finished with a false/empty top stack element",
        r"Script failed an OP_VERIFY operation",
        r"Script failed an OP_EQUALVERIFY operation",
        r"Script failed an OP_CHECKMULTISIGVERIFY operation",
        r"Script failed an OP_CHECKSIGVERIFY operation",
        r"Script failed an OP_NUMEQUALVERIFY operation",
        r"Script is too big",
        r"Push value size limit exceeded",
        r"Operation limit exceeded",
        r"Stack size limit exceeded",
        r"Signature count negative or greater than pubkey count",
        r"Pubkey count negative or limit exceeded",
        r"Opcode missing or not understood",
        r"Attempted to use a disabled opcode",
        r"Operation not valid with the current stack size",
        r"Operation not valid with the current altstack size",
        r"OP_RETURN was encountered",
        r"Invalid OP_IF construction",
        r"Negative locktime",
        r"Locktime requirement not satisfied",
        r"Signature hash type missing or not understood",
        r"Non-canonical DER signature",
        r"Data push larger than necessary",
        r"Only push operators allowed in signatures",
        r"Non-canonical signature: S value is unnecessarily high",
        r"Dummy CHECKMULTISIG argument must be zero",
        r"OP_IF/NOTIF argument must be minimal",
        r"Signature must be zero for failed CHECK(MULTI)SIG operation",
        r"NOPx reserved for soft-fork upgrades",
        r"Witness version reserved for soft-fork upgrades",
        r"Taproot version reserved for soft-fork upgrades",
        r"OP_SUCCESSx reserved for soft-fork upgrades",
        r"Public key version reserved for soft-fork upgrades",
        r"Public key is neither compressed or uncompressed",
        r"Stack size must be exactly one after execution",
        r"Extra items left on stack after execution",
        r"Witness program has incorrect length",
        r"Witness program was passed an empty witness",
        r"Witness program hash mismatch",
        r"Witness requires empty scriptSig",
        r"Witness requires only-redeemscript scriptSig",
        r"Witness provided for non-witness script",
        r"Using non-compressed keys in segwit",
        r"Invalid Schnorr signature size",
        r"Invalid Schnorr signature hash type",
        r"Invalid Schnorr signature",
        r"Invalid Taproot control block size",
        r"Too much signature validation relative to witness weight",
        r"OP_CHECKMULTISIG(VERIFY) is not available in tapscript",
        r"OP_IF/NOTIF argument must be minimal in tapscript",
        r"Using OP_CODESEPARATOR in non-witness script",
        r"Signature is found in scriptCode",
    )
    # longest first (stable sort), so the most specific message wins
    for substring in sorted(script_error_messages, key=len, reverse=True):
        if substring in server_msg:
            return substring
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/validation.cpp
    # grep "REJECT_"
    # grep "TxValidationResult"
    # should come after script_error.cpp (due to e.g. "non-mandatory-script-verify-flag")
    validation_error_messages = {
        r"coinbase": None,
        r"tx-size-small": None,
        r"non-final": None,
        r"txn-already-in-mempool": None,
        r"txn-mempool-conflict": None,
        r"txn-already-known": None,
        r"non-BIP68-final": None,
        r"bad-txns-nonstandard-inputs": None,
        r"bad-witness-nonstandard": None,
        r"bad-txns-too-many-sigops": None,
        r"mempool min fee not met":
            ("mempool min fee not met\n" +
             _("Your transaction is paying a fee that is so low that the bitcoin node cannot "
               "fit it into its mempool. The mempool is already full of hundreds of megabytes "
               "of transactions that all pay higher fees. Try to increase the fee.")),
        r"min relay fee not met": None,
        r"absurdly-high-fee": None,
        r"max-fee-exceeded": None,
        r"too-long-mempool-chain": None,
        r"bad-txns-spends-conflicting-tx": None,
        r"insufficient fee": ("insufficient fee\n" +
             _("Your transaction is trying to replace another one in the mempool but it "
               "does not meet the rules to do so. Try to increase the fee.")),
        r"too many potential replacements": None,
        r"replacement-adds-unconfirmed": None,
        r"mempool full": None,
        r"non-mandatory-script-verify-flag": None,
        r"mandatory-script-verify-flag-failed": None,
        r"Transaction check failed": None,
    }
    for substring in validation_error_messages:
        if substring in server_msg:
            msg = validation_error_messages[substring]
            return msg if msg else substring
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/rpc/rawtransaction.cpp
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/util/error.cpp
    # https://github.com/bitcoin/bitcoin/blob/3f83c744ac28b700090e15b5dda2260724a56f49/src/common/messages.cpp#L126
    # grep "RPC_TRANSACTION"
    # grep "RPC_DESERIALIZATION_ERROR"
    # grep "TransactionError"
    rawtransaction_error_messages = {
        r"Missing inputs": None,
        r"Inputs missing or spent": None,
        r"transaction already in block chain": None,
        r"Transaction already in block chain": None,
        r"Transaction outputs already in utxo set": None,
        r"TX decode failed": None,
        r"Peer-to-peer functionality missing or disabled": None,
        r"Transaction rejected by AcceptToMemoryPool": None,
        r"AcceptToMemoryPool failed": None,
        r"Transaction rejected by mempool": None,
        r"Mempool internal error": None,
        r"Fee exceeds maximum configured by user": None,
        r"Unspendable output exceeds maximum configured by user": None,
        r"Transaction rejected due to invalid package": None,
    }
    for substring in rawtransaction_error_messages:
        if substring in server_msg:
            msg = rawtransaction_error_messages[substring]
            return msg if msg else substring
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/consensus/tx_verify.cpp
    # https://github.com/bitcoin/bitcoin/blob/c7ad94428ab6f54661d7a5441e1fdd0ebf034903/src/consensus/tx_check.cpp
    # grep "REJECT_"
    # grep "TxValidationResult"
    tx_verify_error_messages = {
        r"bad-txns-vin-empty": None,
        r"bad-txns-vout-empty": None,
        r"bad-txns-oversize": None,
        r"bad-txns-vout-negative": None,
        r"bad-txns-vout-toolarge": None,
        r"bad-txns-txouttotal-toolarge": None,
        r"bad-txns-inputs-duplicate": None,
        r"bad-cb-length": None,
        r"bad-txns-prevout-null": None,
        r"bad-txns-inputs-missingorspent":
            ("bad-txns-inputs-missingorspent\n" +
             _("You might have a local transaction in your wallet that this transaction "
               "builds on top. You need to either broadcast or remove the local tx.")),
        r"bad-txns-premature-spend-of-coinbase": None,
        r"bad-txns-inputvalues-outofrange": None,
        r"bad-txns-in-belowout": None,
        r"bad-txns-fee-outofrange": None,
    }
    for substring in tx_verify_error_messages:
        if substring in server_msg:
            msg = tx_verify_error_messages[substring]
            return msg if msg else substring
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/policy/policy.cpp
    # grep "reason ="
    # should come after validation.cpp (due to "tx-size" vs "tx-size-small")
    # should come after script_error.cpp (due to e.g. "version")
    policy_error_messages = {
        r"version": _("Transaction uses non-standard version."),
        r"tx-size": _("The transaction was rejected because it is too large (in bytes)."),
        r"scriptsig-size": None,
        r"scriptsig-not-pushonly": None,
        r"scriptpubkey":
            ("scriptpubkey\n" +
             _("Some of the outputs pay to a non-standard script.")),
        r"bare-multisig": None,
        r"dust":
            (_("Transaction could not be broadcast due to dust outputs.\n"
               "Some of the outputs are too small in value, probably lower than 1000 satoshis.\n"
               "Check the units, make sure you haven't confused e.g. mBTC and BTC.")),
        r"multi-op-return": _("The transaction was rejected because it contains multiple OP_RETURN outputs."),
    }
    for substring in policy_error_messages:
        if substring in server_msg:
            msg = policy_error_messages[substring]
            return msg if msg else substring
    # otherwise:
    return _("Unknown error")
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc