spesmilo / electrum / build 5545353681829888

28 Nov 2025 04:57PM UTC. Coverage: 62.332% (-0.008%) from 62.34%.
Build trigger: push, via CirrusCI (committed by web-flow).

Merge pull request #10330 from SomberNight/202511_lnsweep_refactor_preimage

lnsweep: factor out "maybe_reveal_preimage_for_htlc"

1 of 13 new or added lines in 1 file covered (7.69%).
9 existing lines in 5 files now uncovered.
23622 of 37897 relevant lines covered (62.33%).
0.62 hits per line.
Source File: /electrum/interface.py (60.66% covered)

#!/usr/bin/env python
#
# Electrum - lightweight Bitcoin client
# Copyright (C) 2011 thomasv@gitorious
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import os
import re
import ssl
import sys
import time
import traceback
import asyncio
import socket
from typing import Tuple, Union, List, TYPE_CHECKING, Optional, Set, NamedTuple, Any, Sequence, Dict
from collections import defaultdict
from ipaddress import IPv4Network, IPv6Network, ip_address, IPv6Address, IPv4Address
import itertools
import logging
import hashlib
import functools
import random
import enum

import aiorpcx
from aiorpcx import RPCSession, Notification, NetAddress, NewlineFramer
from aiorpcx.curio import timeout_after, TaskTimeout
from aiorpcx.jsonrpc import JSONRPC, CodeMessageError
from aiorpcx.rawsocket import RSClient, RSTransport
import certifi

from .util import (ignore_exceptions, log_exceptions, bfh, ESocksProxy,
                   is_integer, is_non_negative_integer, is_hash256_str, is_hex_str,
                   is_int_or_float, is_non_negative_int_or_float, OldTaskGroup,
                   send_exception_to_crash_reporter, error_text_str_to_safe_str, versiontuple)
from . import util
from . import x509
from . import pem
from . import version
from . import blockchain
from .blockchain import Blockchain, HEADER_SIZE, CHUNK_SIZE
from . import bitcoin
from .bitcoin import DummyAddress, DummyAddressUsedInTxException
from . import constants
from .i18n import _
from .logging import Logger
from .transaction import Transaction
from .fee_policy import FEE_ETA_TARGETS
from .lrucache import LRUCache

if TYPE_CHECKING:
    from .network import Network
    from .simple_config import SimpleConfig


ca_path = certifi.where()

BUCKET_NAME_OF_ONION_SERVERS = 'onion'

_KNOWN_NETWORK_PROTOCOLS = {'t', 's'}
PREFERRED_NETWORK_PROTOCOL = 's'
assert PREFERRED_NETWORK_PROTOCOL in _KNOWN_NETWORK_PROTOCOLS

MAX_NUM_HEADERS_PER_REQUEST = 2016
assert MAX_NUM_HEADERS_PER_REQUEST >= CHUNK_SIZE


class NetworkTimeout:
    # seconds
    class Generic:
        NORMAL = 30
        RELAXED = 45
        MOST_RELAXED = 600

    class Urgent(Generic):
        NORMAL = 10
        RELAXED = 20
        MOST_RELAXED = 60


def assert_non_negative_integer(val: Any) -> None:
    if not is_non_negative_integer(val):
        raise RequestCorrupted(f'{val!r} should be a non-negative integer')


def assert_integer(val: Any) -> None:
    if not is_integer(val):
        raise RequestCorrupted(f'{val!r} should be an integer')


def assert_int_or_float(val: Any) -> None:
    if not is_int_or_float(val):
        raise RequestCorrupted(f'{val!r} should be int or float')


def assert_non_negative_int_or_float(val: Any) -> None:
    if not is_non_negative_int_or_float(val):
        raise RequestCorrupted(f'{val!r} should be a non-negative int or float')


def assert_hash256_str(val: Any) -> None:
    if not is_hash256_str(val):
        raise RequestCorrupted(f'{val!r} should be a hash256 str')


def assert_hex_str(val: Any) -> None:
    if not is_hex_str(val):
        raise RequestCorrupted(f'{val!r} should be a hex str')


def assert_dict_contains_field(d: Any, *, field_name: str) -> Any:
    if not isinstance(d, dict):
        raise RequestCorrupted(f'{d!r} should be a dict')
    if field_name not in d:
        raise RequestCorrupted(f'required field {field_name!r} missing from dict')
    return d[field_name]


def assert_list_or_tuple(val: Any) -> None:
    if not isinstance(val, (list, tuple)):
        raise RequestCorrupted(f'{val!r} should be a list or tuple')


def protocol_tuple(s: Any) -> tuple[int, ...]:
    """Converts a protocol version number, such as "1.0" to a tuple (1, 0).

    If the version number is bad, (0, ) indicating version 0 is returned.
    """
    try:
        assert isinstance(s, str)
        return versiontuple(s)
    except Exception:
        return (0, )
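

# A minimal usage sketch (illustrative, not part of the original file): how the
# response-validation helpers and `protocol_tuple` above are typically used.
# Assumes `versiontuple` (imported from .util) behaves like
# `tuple(int(p) for p in s.split('.'))` for well-formed version strings.
def _demo_response_validation() -> None:
    # e.g. a (fake) 'server.features'-style response:
    res = {'genesis_hash': '00' * 32, 'protocol_max': '1.6'}
    genesis = assert_dict_contains_field(res, field_name='genesis_hash')  # returns the value
    assert_hex_str(genesis)                   # raises RequestCorrupted on bad data
    assert protocol_tuple('1.6') == (1, 6)    # well-formed version string
    assert protocol_tuple('garbage') == (0,)  # malformed input degrades to version 0
    assert protocol_tuple(None) == (0,)       # non-str input degrades too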


class ChainResolutionMode(enum.Enum):
    CATCHUP = enum.auto()
    BACKWARD = enum.auto()
    BINARY = enum.auto()
    FORK = enum.auto()
    NO_FORK = enum.auto()


class NotificationSession(RPCSession):

    def __init__(self, *args, interface: 'Interface', **kwargs):
        super(NotificationSession, self).__init__(*args, **kwargs)
        self.subscriptions = defaultdict(list)
        self.cache = {}
        self._msg_counter = itertools.count(start=1)
        self.interface = interface
        self.taskgroup = interface.taskgroup
        self.cost_hard_limit = 0  # disable aiorpcx resource limits

    async def handle_request(self, request):
        self.maybe_log(f"--> {request}")
        try:
            if isinstance(request, Notification):
                params, result = request.args[:-1], request.args[-1]
                key = self.get_hashable_key_for_rpc_call(request.method, params)
                if key in self.subscriptions:
                    self.cache[key] = result
                    for queue in self.subscriptions[key]:
                        await queue.put(request.args)
                else:
                    raise Exception('unexpected notification')
            else:
                raise Exception('unexpected request. not a notification')
        except Exception as e:
            self.interface.logger.info(f"error handling request {request}. exc: {repr(e)}")
            await self.close()

    async def send_request(self, *args, timeout=None, **kwargs):
        # note: semaphores/timeouts/backpressure etc are handled by
        # aiorpcx. the timeout arg here in most cases should not be set
        msg_id = next(self._msg_counter)
        self.maybe_log(f"<-- {args} {kwargs} (id: {msg_id})")
        try:
            # note: RPCSession.send_request raises TaskTimeout in case of a timeout.
            # TaskTimeout is a subclass of CancelledError, which is *suppressed* in TaskGroups
            response = await util.wait_for2(
                super().send_request(*args, **kwargs),
                timeout)
        except (TaskTimeout, asyncio.TimeoutError) as e:
            self.maybe_log(f"--> request timed out: {args} (id: {msg_id})")
            raise RequestTimedOut(f'request timed out: {args} (id: {msg_id})') from e
        except CodeMessageError as e:
            self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
            raise
        except BaseException as e:  # cancellations, etc. are useful for debugging
            self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
            raise
        else:
            self.maybe_log(f"--> {response} (id: {msg_id})")
            return response

    def set_default_timeout(self, timeout):
        assert hasattr(self, "sent_request_timeout")  # in base class
        self.sent_request_timeout = timeout
        assert hasattr(self, "max_send_delay")        # in base class
        self.max_send_delay = timeout

    async def subscribe(self, method: str, params: List, queue: asyncio.Queue):
        # note: until the cache is written for the first time,
        # each 'subscribe' call might make a request on the network.
        key = self.get_hashable_key_for_rpc_call(method, params)
        self.subscriptions[key].append(queue)
        if key in self.cache:
            result = self.cache[key]
        else:
            result = await self.send_request(method, params)
            self.cache[key] = result
        await queue.put(params + [result])

    def unsubscribe(self, queue):
        """Unsubscribe a callback to free object references to enable GC."""
        # note: we can't unsubscribe from the server, so we keep receiving
        # subsequent notifications
        for v in self.subscriptions.values():
            if queue in v:
                v.remove(queue)

    @classmethod
    def get_hashable_key_for_rpc_call(cls, method, params):
        """Hashable index for subscriptions and cache"""
        return str(method) + repr(params)

    def maybe_log(self, msg: str) -> None:
        if not self.interface: return
        if self.interface.debug or self.interface.network.debug:
            self.interface.logger.debug(msg)

    def default_framer(self):
        # overridden so that max_size can be customized
        max_size = self.interface.network.config.NETWORK_MAX_INCOMING_MSG_SIZE
        assert max_size > 500_000, f"{max_size=} (< 500_000) is too small"
        return NewlineFramer(max_size=max_size)

    async def close(self, *, force_after: int = None):
        """Closes the connection and waits for it to be closed.
        We try to flush buffered data to the wire, which can take some time.
        """
        if force_after is None:
            # We give up after a while and just abort the connection.
            # Note: specifically if the server is running Fulcrum, waiting seems hopeless,
            #       the connection must be aborted (see https://github.com/cculianu/Fulcrum/issues/76)
            # Note: if the ethernet cable was pulled or wifi disconnected, that too might
            #       wait until this timeout is triggered
            force_after = 1  # seconds
        await super().close(force_after=force_after)
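

# Illustrative sketch (not from the original file): the subscription/cache key is
# just `str(method) + repr(params)`, so the same method with the same params maps
# to the same queue list and cached result. The param values below are made up
# for the example.
def _demo_subscription_key() -> None:
    key1 = NotificationSession.get_hashable_key_for_rpc_call(
        'blockchain.headers.subscribe', [])
    key2 = NotificationSession.get_hashable_key_for_rpc_call(
        'blockchain.scripthash.subscribe', ['ab' * 32])
    assert key1 == 'blockchain.headers.subscribe[]'
    assert key1 != key2  # different method/params -> separate subscription buckets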


class NetworkException(Exception): pass


class GracefulDisconnect(NetworkException):
    log_level = logging.INFO

    def __init__(self, *args, log_level=None, **kwargs):
        Exception.__init__(self, *args, **kwargs)
        if log_level is not None:
            self.log_level = log_level


class RequestTimedOut(GracefulDisconnect):
    def __str__(self):
        return _("Network request timed out.")


class RequestCorrupted(Exception): pass

class ErrorParsingSSLCert(Exception): pass
class ErrorGettingSSLCertFromServer(Exception): pass
class ErrorSSLCertFingerprintMismatch(Exception): pass
class InvalidOptionCombination(Exception): pass
class ConnectError(NetworkException): pass


class TxBroadcastError(NetworkException):
    def get_message_for_gui(self):
        raise NotImplementedError()


class TxBroadcastHashMismatch(TxBroadcastError):
    def get_message_for_gui(self):
        return "{}\n{}\n\n{}" \
            .format(_("The server returned an unexpected transaction ID when broadcasting the transaction."),
                    _("Consider trying to connect to a different server, or updating Electrum."),
                    str(self))


class TxBroadcastServerReturnedError(TxBroadcastError):
    def get_message_for_gui(self):
        return "{}\n{}\n\n{}" \
            .format(_("The server returned an error when broadcasting the transaction."),
                    _("Consider trying to connect to a different server, or updating Electrum."),
                    str(self))


class TxBroadcastUnknownError(TxBroadcastError):
    def get_message_for_gui(self):
        return "{}\n{}" \
            .format(_("Unknown error when broadcasting the transaction."),
                    _("Consider trying to connect to a different server, or updating Electrum."))


class _RSClient(RSClient):
    async def create_connection(self):
        try:
            return await super().create_connection()
        except OSError as e:
            # note: using "from e" here will set __cause__ of ConnectError
            raise ConnectError(e) from e


class PaddedRSTransport(RSTransport):
    """A raw socket transport that provides basic countermeasures against traffic analysis
    by padding the jsonrpc payload with whitespaces to have ~uniform-size TCP packets.
    (it is assumed that a network observer does not see plaintext transport contents,
    due to it being wrapped e.g. in TLS)
    """

    MIN_PACKET_SIZE = 1024
    WAIT_FOR_BUFFER_GROWTH_SECONDS = 1.0
    # (unpadded) amount of bytes sent instantly before beginning with polling.
    # This makes the initial handshake where a few small messages are exchanged faster.
    WARMUP_BUDGET_SIZE = 1024

    session: Optional['RPCSession']

    def __init__(self, *args, **kwargs):
        RSTransport.__init__(self, *args, **kwargs)
        self._sbuffer = bytearray()  # "send buffer"
        self._sbuffer_task = None  # type: Optional[asyncio.Task]
        self._sbuffer_has_data_evt = asyncio.Event()
        self._last_send = time.monotonic()
        self._force_send = False  # type: bool

    # note: this does not call super().write() but is a complete reimplementation
    async def write(self, message):
        await self._can_send.wait()
        if self.is_closing():
            return
        framed_message = self._framer.frame(message)
        self._sbuffer += framed_message
        self._sbuffer_has_data_evt.set()
        self._maybe_consume_sbuffer()

    def _maybe_consume_sbuffer(self) -> None:
        """Maybe take some data from sbuffer and send it on the wire."""
        if not self._can_send.is_set() or self.is_closing():
            return
        buf = self._sbuffer
        if not buf:
            return
        # if there is enough data in the buffer, or if we haven't sent in a while, send now:
        if not (
            self._force_send
            or len(buf) >= self.MIN_PACKET_SIZE
            or self._last_send + self.WAIT_FOR_BUFFER_GROWTH_SECONDS < time.monotonic()
            or self.session.send_size < self.WARMUP_BUDGET_SIZE
        ):
            return
        assert buf[-2:] in (b"}\n", b"]\n"), f"unexpected json-rpc terminator: {buf[-2:]=!r}"
        # either (1) pad length to next power of two, to create "lsize" packet:
        payload_lsize = len(buf)
        total_lsize = max(self.MIN_PACKET_SIZE, 2 ** (payload_lsize.bit_length()))
        npad_lsize = total_lsize - payload_lsize
        # or if that wasted a lot of bandwidth with padding, (2) defer sending some messages
        # and create a packet with half that size ("ssize", s for small)
        total_ssize = max(self.MIN_PACKET_SIZE, total_lsize // 2)
        payload_ssize = buf.rfind(b"\n", 0, total_ssize)
        if payload_ssize != -1:
            payload_ssize += 1  # for "\n" char
            npad_ssize = total_ssize - payload_ssize
        else:
            npad_ssize = float("inf")
        # decide between (1) and (2):
        if self._force_send or npad_lsize <= npad_ssize:
            # (1) create "lsize" packet: consume full buffer
            npad = npad_lsize
            p_idx = payload_lsize
        else:
            # (2) create "ssize" packet: consume some, but defer some for later
            npad = npad_ssize
            p_idx = payload_ssize
        # pad by adding spaces near end
        # self.session.maybe_log(
        #     f"PaddedRSTransport. calling low-level write(). "
        #     f"chose between (lsize:{payload_lsize}+{npad_lsize}, ssize:{payload_ssize}+{npad_ssize}). "
        #     f"won: {'tie' if npad_lsize == npad_ssize else 'lsize' if npad_lsize < npad_ssize else 'ssize'}."
        # )
        json_rpc_terminator = buf[p_idx-2:p_idx]
        assert json_rpc_terminator in (b"}\n", b"]\n"), f"unexpected {json_rpc_terminator=!r}"
        buf2 = buf[:p_idx-2] + (npad * b" ") + json_rpc_terminator
        self._asyncio_transport.write(buf2)
        self._last_send = time.monotonic()
        del self._sbuffer[:p_idx]
        if not self._sbuffer:
            self._sbuffer_has_data_evt.clear()

    async def _poll_sbuffer(self):
        while not self.is_closing():
            await self._can_send.wait()
            await self._sbuffer_has_data_evt.wait()  # to avoid busy-waiting
            self._maybe_consume_sbuffer()
            # If there is still data in the buffer, sleep until it would time out.
            # note: If the transport is ~idle, when we wake up, we will send the current buf data,
            #       but if busy, we might wake up to completely new buffer contents. Either is fine.
            if len(self._sbuffer) > 0:
                timeout_abs = self._last_send + self.WAIT_FOR_BUFFER_GROWTH_SECONDS
                timeout_rel = max(0.0, timeout_abs - time.monotonic())
                await asyncio.sleep(timeout_rel)

    def connection_made(self, transport: asyncio.BaseTransport):
        super().connection_made(transport)
        if isinstance(self.session, NotificationSession):
            coro = self.session.taskgroup.spawn(self._poll_sbuffer())
            self._sbuffer_task = self.loop.create_task(coro)
        else:
            # This is a short-lived "fetch_certificate"-type session.
            # No polling here, we always force-empty the buffer.
            self._force_send = True

    async def close(self, *args, **kwargs):
        '''Close the connection and return when closed.'''
        # Flush buffer before disconnecting. This makes ReplyAndDisconnect work:
        self._force_send = True
        self._maybe_consume_sbuffer()
        await super().close(*args, **kwargs)
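

# Worked example (illustrative, not from the original file) of the padding
# arithmetic in PaddedRSTransport._maybe_consume_sbuffer(), using plain ints:
# a buffered payload is grown to the next power of two ("lsize"), with a floor
# of MIN_PACKET_SIZE = 1024 bytes.
def _demo_padding_sizes() -> None:
    MIN_PACKET_SIZE = 1024
    for payload_len, expected_total in [(100, 1024), (1025, 2048), (1500, 2048), (5000, 8192)]:
        total_lsize = max(MIN_PACKET_SIZE, 2 ** payload_len.bit_length())
        npad = total_lsize - payload_len  # whitespace bytes inserted before the terminator
        assert total_lsize == expected_total, (payload_len, total_lsize)
        assert npad >= 0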


class ServerAddr:

    def __init__(self, host: str, port: Union[int, str], *, protocol: str = None):
        assert isinstance(host, str), repr(host)
        if protocol is None:
            protocol = 's'
        if not host:
            raise ValueError('host must not be empty')
        if host[0] == '[' and host[-1] == ']':  # IPv6
            host = host[1:-1]
        try:
            net_addr = NetAddress(host, port)  # this validates host and port
        except Exception as e:
            raise ValueError(f"cannot construct ServerAddr: invalid host or port (host={host}, port={port})") from e
        if protocol not in _KNOWN_NETWORK_PROTOCOLS:
            raise ValueError(f"invalid network protocol: {protocol}")
        self.host = str(net_addr.host)  # canonical form (if e.g. IPv6 address)
        self.port = int(net_addr.port)
        self.protocol = protocol
        self._net_addr_str = str(net_addr)

    @classmethod
    def from_str(cls, s: str) -> 'ServerAddr':
        """Constructs a ServerAddr or raises ValueError."""
        # host might be IPv6 address, hence do rsplit:
        host, port, protocol = str(s).rsplit(':', 2)
        return ServerAddr(host=host, port=port, protocol=protocol)

    @classmethod
    def from_str_with_inference(cls, s: str) -> Optional['ServerAddr']:
        """Construct ServerAddr from str, guessing missing details.
        Does not raise - just returns None if guessing failed.
        Ongoing compatibility not guaranteed.
        """
        if not s:
            return None
        host = ""
        if s[0] == "[" and "]" in s:  # IPv6 address
            host_end = s.index("]")
            host = s[1:host_end]
            s = s[host_end+1:]
        items = str(s).rsplit(':', 2)
        if len(items) < 2:
            return None  # although maybe we could guess the port too?
        host = host or items[0]
        port = items[1]
        if len(items) >= 3:
            protocol = items[2]
        else:
            protocol = PREFERRED_NETWORK_PROTOCOL
        try:
            return ServerAddr(host=host, port=port, protocol=protocol)
        except ValueError:
            return None

    def to_friendly_name(self) -> str:
        # note: this method is closely linked to from_str_with_inference
        if self.protocol == 's':  # hide trailing ":s"
            return self.net_addr_str()
        return str(self)

    def __str__(self):
        return '{}:{}'.format(self.net_addr_str(), self.protocol)

    def to_json(self) -> str:
        return str(self)

    def __repr__(self):
        return f'<ServerAddr host={self.host} port={self.port} protocol={self.protocol}>'

    def net_addr_str(self) -> str:
        return self._net_addr_str

    def __eq__(self, other):
        if not isinstance(other, ServerAddr):
            return False
        return (self.host == other.host
                and self.port == other.port
                and self.protocol == other.protocol)

    def __ne__(self, other):
        return not (self == other)

    def __hash__(self):
        return hash((self.host, self.port, self.protocol))
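

# Illustrative sketch (not from the original file): how "host:port:protocol"
# strings split. `rsplit(':', 2)` is used so that colons inside a bracketed
# IPv6 host are not mistaken for separators. The server names are made up.
def _demo_server_addr_parsing() -> None:
    assert 'electrum.example.org:50002:s'.rsplit(':', 2) == ['electrum.example.org', '50002', 's']
    # with inference, a missing ":s" / ":t" suffix defaults to PREFERRED_NETWORK_PROTOCOL ('s'):
    assert 'electrum.example.org:50002'.rsplit(':', 2) == ['electrum.example.org', '50002']
    # bracketed IPv6: from_str_with_inference() strips "[...]" before splitting the rest:
    s = '[2001:db8::1]:50002:s'
    host_end = s.index(']')
    assert s[1:host_end] == '2001:db8::1'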


def _get_cert_path_for_host(*, config: 'SimpleConfig', host: str) -> str:
    filename = host
    try:
        ip = ip_address(host)
    except ValueError:
        pass
    else:
        if isinstance(ip, IPv6Address):
            filename = f"ipv6_{ip.packed.hex()}"
    return os.path.join(config.path, 'certs', filename)
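

# Illustrative sketch (not from the original file): IPv6 hosts get a filesystem-safe
# cert filename (colons are not portable in file names); other hosts keep their name.
def _demo_cert_filename() -> None:
    ip = ip_address('2001:db8::1')
    assert isinstance(ip, IPv6Address)
    assert f"ipv6_{ip.packed.hex()}" == "ipv6_20010db8000000000000000000000001"
    # a hostname (or IPv4 address) is used as the filename unchanged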


class Interface(Logger):

    def __init__(self, *, network: 'Network', server: ServerAddr):
        assert isinstance(server, ServerAddr), f"expected ServerAddr, got {type(server)}"
        self.ready = network.asyncio_loop.create_future()
        self.got_disconnected = asyncio.Event()
        self._blockchain_updated = asyncio.Event()
        self.server = server
        Logger.__init__(self)
        assert network.config.path
        self.cert_path = _get_cert_path_for_host(config=network.config, host=self.host)
        self.blockchain = None  # type: Optional[Blockchain]
        self._requested_chunks = set()  # type: Set[int]
        self.network = network
        self.session = None  # type: Optional[NotificationSession]
        self._ipaddr_bucket = None
        # Set up proxy.
        # - for servers running on localhost, the proxy is not used. If user runs their own server
        #   on same machine, this lets them enable the proxy (which is used for e.g. FX rates).
        #   note: we could maybe relax this further and bypass the proxy for all private
        #         addresses...? e.g. 192.168.x.x
        if util.is_localhost(server.host):
            self.logger.info("looks like localhost: not using proxy for this server")
            self.proxy = None
        else:
            self.proxy = ESocksProxy.from_network_settings(network)

        # Latest block header and corresponding height, as claimed by the server.
        # Note that these values are updated before they are verified.
        # Especially during initial header sync, verification can take a long time.
        # Failing verification will get the interface closed.
        self.tip_header = None  # type: Optional[dict]
        self.tip = 0

        self._headers_cache = {}  # type: Dict[int, bytes]
        self._rawtx_cache = LRUCache(maxsize=20)  # type: LRUCache[str, bytes]  # txid->rawtx

        self.fee_estimates_eta = {}  # type: Dict[int, int]

        self.active_protocol_tuple = (0,)  # type: Optional[tuple[int, ...]]

        # Dump network messages (only for this interface).  Set at runtime from the console.
        self.debug = False

        self.taskgroup = OldTaskGroup()

        async def spawn_task():
            task = await self.network.taskgroup.spawn(self.run())
            task.set_name(f"interface::{str(server)}")
        asyncio.run_coroutine_threadsafe(spawn_task(), self.network.asyncio_loop)

    @property
    def host(self):
        return self.server.host

    @property
    def port(self):
        return self.server.port

    @property
    def protocol(self):
        return self.server.protocol

    def diagnostic_name(self):
        return self.server.net_addr_str()

    def __str__(self):
        return f"<Interface {self.diagnostic_name()}>"

    async def is_server_ca_signed(self, ca_ssl_context: ssl.SSLContext) -> bool:
        """Given a CA enforcing SSL context, returns True if the connection
        can be established. Returns False if the server has a self-signed
        certificate but otherwise is okay. Any other failures raise.
        """
        try:
            await self.open_session(ssl_context=ca_ssl_context, exit_early=True)
        except ConnectError as e:
            cause = e.__cause__
            if (isinstance(cause, ssl.SSLCertVerificationError)
                    and cause.reason == 'CERTIFICATE_VERIFY_FAILED'
                    and cause.verify_code == 18):  # "self signed certificate"
                # Good. We will use this server as self-signed.
                return False
            # Not good. Cannot use this server.
            raise
        # Good. We will use this server as CA-signed.
        return True

    async def _try_saving_ssl_cert_for_first_time(self, ca_ssl_context: ssl.SSLContext) -> None:
        ca_signed = await self.is_server_ca_signed(ca_ssl_context)
        if ca_signed:
            if self._get_expected_fingerprint():
                raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
            with open(self.cert_path, 'w') as f:
                # empty file means this is CA signed, not self-signed
                f.write('')
        else:
            await self._save_certificate()

    def _is_saved_ssl_cert_available(self):
        if not os.path.exists(self.cert_path):
            return False
        with open(self.cert_path, 'r') as f:
            contents = f.read()
        if contents == '':  # CA signed
            if self._get_expected_fingerprint():
                raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
            return True
        # pinned self-signed cert
        try:
            b = pem.dePem(contents, 'CERTIFICATE')
        except SyntaxError as e:
            self.logger.info(f"error parsing already saved cert: {e}")
            raise ErrorParsingSSLCert(e) from e
        try:
            x = x509.X509(b)
        except Exception as e:
            self.logger.info(f"error parsing already saved cert: {e}")
            raise ErrorParsingSSLCert(e) from e
        try:
            x.check_date()
        except x509.CertificateError as e:
            self.logger.info(f"certificate has expired: {e}")
            os.unlink(self.cert_path)  # delete pinned cert only in this case
            return False
        self._verify_certificate_fingerprint(bytes(b))
        return True

    async def _get_ssl_context(self) -> Optional[ssl.SSLContext]:
        if self.protocol != 's':
            # using plaintext TCP
            return None

        # see if we already have cert for this server; or get it for the first time
        ca_sslc = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
        if not self._is_saved_ssl_cert_available():
            try:
                await self._try_saving_ssl_cert_for_first_time(ca_sslc)
            except (OSError, ConnectError, aiorpcx.socks.SOCKSError) as e:
                raise ErrorGettingSSLCertFromServer(e) from e
        # now we have a file saved in our certificate store
        siz = os.stat(self.cert_path).st_size
        if siz == 0:
            # CA signed cert
            sslc = ca_sslc
        else:
            # pinned self-signed cert
            sslc = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=self.cert_path)
            # note: Flag "ssl.VERIFY_X509_STRICT" is enabled by default in python 3.13+ (disabled in older versions).
            #       We explicitly disable it as it breaks lots of servers.
            sslc.verify_flags &= ~ssl.VERIFY_X509_STRICT
            sslc.check_hostname = False
        return sslc

    def handle_disconnect(func):
        @functools.wraps(func)
        async def wrapper_func(self: 'Interface', *args, **kwargs):
            try:
                return await func(self, *args, **kwargs)
            except GracefulDisconnect as e:
                self.logger.log(e.log_level, f"disconnecting due to {repr(e)}")
            except aiorpcx.jsonrpc.RPCError as e:
                self.logger.warning(f"disconnecting due to {repr(e)}")
                self.logger.debug(f"(disconnect) trace for {repr(e)}", exc_info=True)
            finally:
                self.got_disconnected.set()
                # Make sure taskgroup gets cleaned-up. This explicit clean-up is needed here
                # in case the "with taskgroup" ctx mgr never got a chance to run:
                await self.taskgroup.cancel_remaining()
                await self.network.connection_down(self)
                # if was not 'ready' yet, schedule waiting coroutines:
                self.ready.cancel()
        return wrapper_func

    @ignore_exceptions  # do not kill network.taskgroup
    @log_exceptions
    @handle_disconnect
    async def run(self):
        try:
            ssl_context = await self._get_ssl_context()
        except (ErrorParsingSSLCert, ErrorGettingSSLCertFromServer) as e:
            self.logger.info(f'disconnecting due to: {repr(e)}')
            return
        try:
            await self.open_session(ssl_context=ssl_context)
        except (asyncio.CancelledError, ConnectError, aiorpcx.socks.SOCKSError) as e:
            # make SSL errors for main interface more visible (to help server ops debug cert pinning issues)
            if (isinstance(e, ConnectError) and isinstance(e.__cause__, ssl.SSLError)
                    and self.is_main_server() and not self.network.auto_connect):
                self.logger.warning(f'Cannot connect to main server due to SSL error '
                                    f'(maybe cert changed compared to "{self.cert_path}"). Exc: {repr(e)}')
            else:
                self.logger.info(f'disconnecting due to: {repr(e)}')
            return

    def _mark_ready(self) -> None:
        if self.ready.cancelled():
            raise GracefulDisconnect('conn establishment was too slow; *ready* future was cancelled')
        if self.ready.done():
            return

        assert self.tip_header
        chain = blockchain.check_header(self.tip_header)
        if not chain:
            self.blockchain = blockchain.get_best_chain()
        else:
            self.blockchain = chain
        assert self.blockchain is not None

        self.logger.info(f"set blockchain with height {self.blockchain.height()}")

        self.ready.set_result(1)

    def is_connected_and_ready(self) -> bool:
        return self.ready.done() and not self.got_disconnected.is_set()

    async def _save_certificate(self) -> None:
        if not os.path.exists(self.cert_path):
            # we may need to retry this a few times, in case the handshake hasn't completed
            for _ in range(10):
                dercert = await self._fetch_certificate()
                if dercert:
                    self.logger.info("succeeded in getting cert")
                    self._verify_certificate_fingerprint(dercert)
                    with open(self.cert_path, 'w') as f:
                        cert = ssl.DER_cert_to_PEM_cert(dercert)
                        # workaround android bug
                        cert = re.sub("([^\n])-----END CERTIFICATE-----", "\\1\n-----END CERTIFICATE-----", cert)
                        f.write(cert)
                        # even though close() flushes, we can't fsync after the file is closed.
                        # and we must flush before fsyncing: flush writes to the OS buffer,
                        # and fsync writes the OS buffer to disk.
                        f.flush()
                        os.fsync(f.fileno())
                    break
                await asyncio.sleep(1)
            else:
                raise GracefulDisconnect("could not get certificate after 10 tries")

    async def _fetch_certificate(self) -> bytes:
        sslc = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_CLIENT)
        sslc.check_hostname = False
        sslc.verify_mode = ssl.CERT_NONE
        async with _RSClient(
            session_factory=RPCSession,
            host=self.host, port=self.port,
            ssl=sslc,
            proxy=self.proxy,
            transport=PaddedRSTransport,
        ) as session:
            asyncio_transport = session.transport._asyncio_transport  # type: asyncio.BaseTransport
            ssl_object = asyncio_transport.get_extra_info("ssl_object")  # type: ssl.SSLObject
            return ssl_object.getpeercert(binary_form=True)

    def _get_expected_fingerprint(self) -> Optional[str]:
        if self.is_main_server():
            return self.network.config.NETWORK_SERVERFINGERPRINT
        return None

    def _verify_certificate_fingerprint(self, certificate: bytes) -> None:
        expected_fingerprint = self._get_expected_fingerprint()
        if not expected_fingerprint:
            return
        fingerprint = hashlib.sha256(certificate).hexdigest()
        fingerprints_match = fingerprint.lower() == expected_fingerprint.lower()
        if not fingerprints_match:
            util.trigger_callback('cert_mismatch')
            raise ErrorSSLCertFingerprintMismatch('Refusing to connect to server due to cert fingerprint mismatch')
        self.logger.info("cert fingerprint verification passed")
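
    # Illustrative sketch: the pinned-cert check above is just a case-insensitive
    # compare of the SHA256 hex digest of the DER certificate bytes against the
    # configured --serverfingerprint value. The DER bytes below are placeholders.
    @staticmethod
    def _demo_fingerprint_check() -> None:
        der = b"\x30\x82\x01\x0a"  # placeholder DER prefix, for the example only
        fingerprint = hashlib.sha256(der).hexdigest()
        expected = fingerprint.upper()  # e.g. user-configured, in any letter case
        assert fingerprint.lower() == expected.lower()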

    async def _maybe_warm_headers_cache(self, *, from_height: int, to_height: int, mode: ChainResolutionMode) -> None:
        """Populate header cache for block heights in range [from_height, to_height]."""
        assert from_height <= to_height, (from_height, to_height)
        assert to_height - from_height < MAX_NUM_HEADERS_PER_REQUEST
        if all(height in self._headers_cache for height in range(from_height, to_height+1)):
            # cache already has all requested headers
            return
        # use lower timeout as we usually have network.bhi_lock here
        timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        count = to_height - from_height + 1
        headers = await self.get_block_headers(start_height=from_height, count=count, timeout=timeout, mode=mode)
        for idx, raw_header in enumerate(headers):
            header_height = from_height + idx
            self._headers_cache[header_height] = raw_header

    async def get_block_header(self, height: int, *, mode: ChainResolutionMode) -> dict:
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        #self.logger.debug(f'get_block_header() {height} in {mode=}')
        # use lower timeout as we usually have network.bhi_lock here
        timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        if raw_header := self._headers_cache.get(height):
            return blockchain.deserialize_header(raw_header, height)
        self.logger.info(f'requesting block header {height} in {mode=}')
        res = await self.session.send_request('blockchain.block.header', [height], timeout=timeout)
        return blockchain.deserialize_header(bytes.fromhex(res), height)

    async def get_block_headers(
        self,
        *,
        start_height: int,
        count: int,
        timeout=None,
        mode: Optional[ChainResolutionMode] = None,
    ) -> Sequence[bytes]:
        """Request a number of consecutive block headers, starting at `start_height`.
        `count` is the num of requested headers, BUT note the server might return fewer than this
        (if range would extend beyond its tip).
        note: the returned headers are not verified or parsed at all.
        """
        if not is_non_negative_integer(start_height):
            raise Exception(f"{repr(start_height)} is not a block height")
        if not is_non_negative_integer(count) or not (0 < count <= MAX_NUM_HEADERS_PER_REQUEST):
            raise Exception(f"{repr(count)} not an int in range ]0, {MAX_NUM_HEADERS_PER_REQUEST}]")
        self.logger.info(
            f"requesting block headers: [{start_height}, {start_height+count-1}], {count=}"
            + (f" (in {mode=})" if mode is not None else "")
        )
        res = await self.session.send_request('blockchain.block.headers', [start_height, count], timeout=timeout)
        # check response
        assert_dict_contains_field(res, field_name='count')
        assert_dict_contains_field(res, field_name='max')
        assert_non_negative_integer(res['count'])
        assert_non_negative_integer(res['max'])
        if self.active_protocol_tuple >= (1, 6):
            hex_headers_list = assert_dict_contains_field(res, field_name='headers')
            assert_list_or_tuple(hex_headers_list)
            for item in hex_headers_list:
                assert_hex_str(item)
                if len(item) != HEADER_SIZE * 2:
                    raise RequestCorrupted(f"invalid header size. got {len(item)//2}, expected {HEADER_SIZE}")
            if len(hex_headers_list) != res['count']:
                raise RequestCorrupted(f"{len(hex_headers_list)=} != {res['count']=}")
            headers = list(bfh(hex_header) for hex_header in hex_headers_list)
        else:  # proto 1.4
            hex_headers_concat = assert_dict_contains_field(res, field_name='hex')
            assert_hex_str(hex_headers_concat)
            if len(hex_headers_concat) != HEADER_SIZE * 2 * res['count']:
                raise RequestCorrupted('inconsistent chunk hex and count')
            headers = list(util.chunks(bfh(hex_headers_concat), size=HEADER_SIZE))
        # we never request more than MAX_NUM_HEADERS_PER_REQUEST headers, but we enforce those fit in a single response
        if res['max'] < MAX_NUM_HEADERS_PER_REQUEST:
            raise RequestCorrupted(f"server uses too low 'max' count for block.headers: {res['max']} < {MAX_NUM_HEADERS_PER_REQUEST}")
        if res['count'] > count:
            raise RequestCorrupted(f"asked for {count} headers but got more: {res['count']}")
        elif res['count'] < count:
            # we only tolerate getting fewer headers if it is due to reaching the tip
            end_height = start_height + res['count'] - 1
            if end_height < self.tip:  # still below tip. why did server not send more?!
                raise RequestCorrupted(
                    f"asked for {count} headers but got fewer: {res['count']}. ({start_height=}, {self.tip=})")
        # checks done.
        return headers
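
    # Illustrative sketch: the shape of 'blockchain.block.headers' responses that
    # get_block_headers() accepts. Values are made up; assumes HEADER_SIZE == 80
    # (Bitcoin block headers), so each hex-encoded header is 160 chars.
    @staticmethod
    def _demo_block_headers_response_shapes() -> None:
        fake_header_hex = "00" * 80  # 80 zero bytes, standing in for a serialized header
        # protocol >= 1.6: a list of per-header hex strings
        res_16 = {'count': 2, 'max': 2016, 'headers': [fake_header_hex, fake_header_hex]}
        assert len(res_16['headers']) == res_16['count']
        # protocol 1.4: one concatenated hex string
        res_14 = {'count': 2, 'max': 2016, 'hex': fake_header_hex * 2}
        assert len(res_14['hex']) == 80 * 2 * res_14['count']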

    async def request_chunk_below_max_checkpoint(
        self,
        *,
        height: int,
    ) -> None:
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        assert height <= constants.net.max_checkpoint(), f"{height=} must be <= cp={constants.net.max_checkpoint()}"
        index = height // CHUNK_SIZE
        if index in self._requested_chunks:
            return None
        self.logger.debug(f"requesting chunk from height {height}")
        try:
            self._requested_chunks.add(index)
            headers = await self.get_block_headers(start_height=index * CHUNK_SIZE, count=CHUNK_SIZE)
        finally:
            self._requested_chunks.discard(index)
        conn = self.blockchain.connect_chunk(index, data=b"".join(headers))
        if not conn:
            raise RequestCorrupted(f"chunk ({index=}, for {height=}) does not connect to blockchain")
        return None

    async def _fast_forward_chain(
        self,
        *,
        height: int,  # usually local chain tip + 1
        tip: int,  # server tip. we should not request past this.
    ) -> int:
        """Request some headers starting at `height` to grow the blockchain of this interface.
        Returns number of headers we managed to connect, starting at `height`.
        """
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        if not is_non_negative_integer(tip):
            raise Exception(f"{repr(tip)} is not a block height")
        if not (height > constants.net.max_checkpoint()
                or height == 0 == constants.net.max_checkpoint()):
            raise Exception(f"{height=} must be > cp={constants.net.max_checkpoint()}")
        assert height <= tip, f"{height=} must be <= {tip=}"
        # Request a few chunks of headers concurrently.
        # tradeoffs:
        # - more chunks: higher memory requirements
        # - more chunks: higher concurrency => syncing needs fewer network round-trips
        # - if a chunk does not connect, bandwidth for all later chunks is wasted
        async with OldTaskGroup() as group:
            tasks = []  # type: List[Tuple[int, asyncio.Task[Sequence[bytes]]]]
            index0 = height // CHUNK_SIZE
            for chunk_cnt in range(10):
                index = index0 + chunk_cnt
                start_height = index * CHUNK_SIZE
                if start_height > tip:
                    break
                end_height = min(start_height + CHUNK_SIZE - 1, tip)
                size = end_height - start_height + 1
                tasks.append((index, await group.spawn(self.get_block_headers(start_height=start_height, count=size))))
        # try to connect chunks
        num_headers = 0
        for index, task in tasks:
            headers = task.result()
            conn = self.blockchain.connect_chunk(index, data=b"".join(headers))
            if not conn:
                break
            num_headers += len(headers)
        # We started at a chunk boundary, instead of requested `height`. Need to correct for that.
        offset = height - index0 * CHUNK_SIZE
        return max(0, num_headers - offset)
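
    # Illustrative sketch of the chunk arithmetic used above, assuming
    # CHUNK_SIZE == 2016 (one difficulty-retarget period of headers):
    @staticmethod
    def _demo_chunk_arithmetic() -> None:
        CHUNK = 2016
        height = 100_000               # first header we actually want
        index0 = height // CHUNK       # chunk that contains it
        assert index0 == 49
        start_height = index0 * CHUNK  # requests start at the chunk boundary...
        assert start_height == 98_784
        offset = height - start_height  # ...so this many leading headers are discounted
        assert offset == 1_216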

    def is_main_server(self) -> bool:
        return (self.network.interface == self or
                self.network.interface is None and self.network.default_server == self.server)

    async def open_session(
        self,
        *,
        ssl_context: Optional[ssl.SSLContext],
        exit_early: bool = False,
    ):
        session_factory = lambda *args, iface=self, **kwargs: NotificationSession(*args, **kwargs, interface=iface)
        async with _RSClient(
            session_factory=session_factory,
            host=self.host, port=self.port,
            ssl=ssl_context,
            proxy=self.proxy,
            transport=PaddedRSTransport,
        ) as session:
            start = time.perf_counter()
            self.session = session  # type: NotificationSession
            self.session.set_default_timeout(self.network.get_network_timeout_seconds(NetworkTimeout.Generic))
            client_prange = [version.PROTOCOL_VERSION_MIN, version.PROTOCOL_VERSION_MAX]
            try:
                ver = await session.send_request('server.version', [self.client_name(), client_prange])
            except aiorpcx.jsonrpc.RPCError as e:
                raise GracefulDisconnect(e)  # probably 'unsupported protocol version'
            if exit_early:
                return
            self.active_protocol_tuple = protocol_tuple(ver[1])
            client_pmin = protocol_tuple(client_prange[0])
            client_pmax = protocol_tuple(client_prange[1])
            if not (client_pmin <= self.active_protocol_tuple <= client_pmax):
                raise GracefulDisconnect(f'server violated protocol-version-negotiation. '
                                         f'we asked for {client_prange!r}, they sent {ver[1]!r}')
            if not self.network.check_interface_against_healthy_spread_of_connected_servers(self):
                raise GracefulDisconnect(f'too many connected servers already '
                                         f'in bucket {self.bucket_based_on_ipaddress()}')

            try:
                features = await session.send_request('server.features')
                server_genesis_hash = assert_dict_contains_field(features, field_name='genesis_hash')
            except (aiorpcx.jsonrpc.RPCError, RequestCorrupted) as e:
                raise GracefulDisconnect(e)
            if server_genesis_hash != constants.net.GENESIS:
                raise GracefulDisconnect(f'server on different chain: {server_genesis_hash=}. ours: {constants.net.GENESIS}')
            self.logger.info(f"connection established. version: {ver}, handshake duration: {(time.perf_counter() - start) * 1000:.2f} ms")

            try:
                async with self.taskgroup as group:
                    await group.spawn(self.ping)
                    await group.spawn(self.request_fee_estimates)
                    await group.spawn(self.run_fetch_blocks)
                    await group.spawn(self.monitor_connection)
            except aiorpcx.jsonrpc.RPCError as e:
                if e.code in (
                    JSONRPC.EXCESSIVE_RESOURCE_USAGE,
                    JSONRPC.SERVER_BUSY,
                    JSONRPC.METHOD_NOT_FOUND,
                    JSONRPC.INTERNAL_ERROR,
                ):
                    log_level = logging.WARNING if self.is_main_server() else logging.INFO
                    raise GracefulDisconnect(e, log_level=log_level) from e
                raise
            finally:
                self.got_disconnected.set()  # set this ASAP, ideally before any awaits
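
    # Illustrative sketch: protocol-version negotiation reduces to a lexicographic
    # tuple comparison. The client range below is made up for the example.
    @staticmethod
    def _demo_version_negotiation() -> None:
        client_pmin, client_pmax = (1, 4), (1, 6)
        assert client_pmin <= protocol_tuple('1.5') <= client_pmax          # acceptable
        assert not (client_pmin <= protocol_tuple('2.0') <= client_pmax)    # too new
        assert not (client_pmin <= protocol_tuple('bogus') <= client_pmax)  # parses to (0,)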

    async def monitor_connection(self):
        while True:
            await asyncio.sleep(1)
            # If the session/transport is no longer open, we disconnect.
            # e.g. if the remote cleanly sends EOF, we would handle that here.
            # note: If the user pulls the ethernet cable or disconnects wifi,
            #       ideally we would detect that here, so that the GUI/etc can reflect that.
            #       - On Android, this seems to work reliably, where asyncio.BaseProtocol.connection_lost()
            #         gets called with e.g. ConnectionAbortedError(103, 'Software caused connection abort').
            #       - On desktop Linux/Win, it seems BaseProtocol.connection_lost() is not called in such cases.
            #         Hence, in practice the connection issue will only be detected the next time we try
            #         to send a message (plus timeout), which can take minutes...
            if not self.session or self.session.is_closing():
                raise GracefulDisconnect('session was closed')

    async def ping(self):
        # We periodically send a "ping" msg to make sure the server knows we are still here.
        # Adding a bit of randomness generates some noise against traffic analysis.
        while True:
            await asyncio.sleep(random.random() * 300)
            await self.session.send_request('server.ping')
            await self._maybe_send_noise()

    async def _maybe_send_noise(self):
        while random.random() < 0.2:
            await asyncio.sleep(random.random())
            await self.session.send_request('server.ping')
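
    # Illustrative sketch: _maybe_send_noise() keeps looping while a uniform draw
    # stays below 0.2, so the number of extra pings is geometrically distributed
    # with mean 0.2 / (1 - 0.2) = 0.25 pings per call.
    @staticmethod
    def _demo_noise_ping_count() -> None:
        rng = random.Random(42)  # seeded for a deterministic example
        extra_pings = 0
        while rng.random() < 0.2:
            extra_pings += 1
        assert extra_pings >= 0  # usually 0; occasionally a short burst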

1065
    async def request_fee_estimates(self):
1✔
1066
        while True:
1✔
1067
            async with OldTaskGroup() as group:
1✔
1068
                fee_tasks = []
1✔
1069
                for i in FEE_ETA_TARGETS[0:-1]:
1✔
1070
                    fee_tasks.append((i, await group.spawn(self.get_estimatefee(i))))
1✔
1071
            for nblock_target, task in fee_tasks:
1✔
1072
                fee = task.result()
1✔
1073
                if fee < 0: continue
1✔
1074
                assert isinstance(fee, int)
1✔
1075
                self.fee_estimates_eta[nblock_target] = fee
1✔
1076
            self.network.update_fee_estimates()
1✔
1077
            await asyncio.sleep(60)
1✔
1078

1079
    async def close(self, *, force_after: int = None):
1✔
1080
        """Closes the connection and waits for it to be closed.
1081
        We try to flush buffered data to the wire, which can take some time.
1082
        """
1083
        if self.session:
1✔
1084
            await self.session.close(force_after=force_after)
1✔
1085
        # monitor_connection will cancel tasks
1086

1087
    async def run_fetch_blocks(self):
1✔
1088
        header_queue = asyncio.Queue()
1✔
1089
        await self.session.subscribe('blockchain.headers.subscribe', [], header_queue)
1✔
1090
        while True:
1✔
1091
            item = await header_queue.get()
1✔
1092
            raw_header = item[0]
1✔
1093
            height = raw_header['height']
1✔
1094
            header_bytes = bfh(raw_header['hex'])
1✔
1095
            header_dict = blockchain.deserialize_header(header_bytes, height)
1✔
1096
            self.tip_header = header_dict
1✔
1097
            self.tip = height
1✔
1098
            if self.tip < constants.net.max_checkpoint():
1✔
1099
                raise GracefulDisconnect(
×
1100
                    f"server tip below max checkpoint. ({self.tip} < {constants.net.max_checkpoint()})")
1101
            self._mark_ready()
1✔
1102
            self._headers_cache.clear()  # tip changed, so assume anything could have happened with chain
1✔
1103
            self._headers_cache[height] = header_bytes
1✔
1104
            try:
1✔
1105
                blockchain_updated = await self._process_header_at_tip()
1✔
1106
            finally:
1107
                self._headers_cache.clear()  # to reduce memory usage
1✔
1108
            # header processing done
1109
            if self.is_main_server() or blockchain_updated:
1✔
1110
                self.logger.info(f"new chain tip. {height=}")
1✔
1111
            if blockchain_updated:
1✔
1112
                util.trigger_callback('blockchain_updated')
1✔
1113
                self._blockchain_updated.set()
1✔
1114
                self._blockchain_updated.clear()
1✔
1115
            util.trigger_callback('network_updated')
1✔
1116
            await self.network.switch_unwanted_fork_interface()
1✔
1117
            await self.network.switch_lagging_interface()
1✔
1118
            await self.taskgroup.spawn(self._maybe_send_noise())
1✔
1119

1120
    async def _process_header_at_tip(self) -> bool:
        """Returns:
        False - boring fast-forward: we already have this header as part of this blockchain (e.g. from another interface),
        True - new header we didn't have, or a reorg
        """
        height, header = self.tip, self.tip_header
        async with self.network.bhi_lock:
            if self.blockchain.height() >= height and self.blockchain.check_header(header):
                # another interface amended the blockchain
                return False
            await self.sync_until(height)
            return True

    async def sync_until(
        self,
        height: int,
        *,
        next_height: Optional[int] = None,  # sync target. typically the tip, except in unit tests
    ) -> Tuple[ChainResolutionMode, int]:
        if next_height is None:
            next_height = self.tip
        last = None  # type: Optional[ChainResolutionMode]
        while last is None or height <= next_height:
            prev_last, prev_height = last, height
            if next_height > height + 144:
                # We are far from the tip.
                # It is more efficient to process headers in large batches (CPU/disk_usage/logging).
                # (but this wastes a little bandwidth, if we are not on a chunk boundary)
                num_headers = await self._fast_forward_chain(
                    height=height, tip=next_height)
                if num_headers == 0:
                    if height <= constants.net.max_checkpoint():
                        raise GracefulDisconnect('server chain conflicts with checkpoints or genesis')
                    last, height = await self.step(height)
                    continue
                # report progress to gui/etc
                util.trigger_callback('blockchain_updated')
                self._blockchain_updated.set()
                self._blockchain_updated.clear()
                util.trigger_callback('network_updated')
                height += num_headers
                assert height <= next_height+1, (height, self.tip)
                last = ChainResolutionMode.CATCHUP
            else:
                # We are close to the tip, so process headers one-by-one.
                # (note: due to headers_cache, to save network latency, this can still batch-request headers)
                last, height = await self.step(height)
            assert (prev_last, prev_height) != (last, height), 'had to prevent infinite loop in interface.sync_until'
        return last, height

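    # sync_until (above): the 144-block threshold is roughly one day of blocks at
    # ~10 minutes each; beyond that we catch up in chunks, otherwise header-by-header.
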
    async def step(
        self,
        height: int,
    ) -> Tuple[ChainResolutionMode, int]:
        assert 0 <= height <= self.tip, (height, self.tip)
        await self._maybe_warm_headers_cache(
            from_height=height,
            to_height=min(self.tip, height+MAX_NUM_HEADERS_PER_REQUEST-1),
            mode=ChainResolutionMode.CATCHUP,
        )
        header = await self.get_block_header(height, mode=ChainResolutionMode.CATCHUP)

        chain = blockchain.check_header(header)
        if chain:
            self.blockchain = chain
            # note: there is an edge case here that is not handled.
            # we might know the blockhash (enough for check_header) but
            # not have the header itself. e.g. regtest chain with only genesis.
            # this situation resolves itself on the next block
            return ChainResolutionMode.CATCHUP, height+1

        can_connect = blockchain.can_connect(header)
        if not can_connect:
            self.logger.info(f"can't connect new block: {height=}")
            height, header, bad, bad_header = await self._search_headers_backwards(height, header=header)
            chain = blockchain.check_header(header)
            can_connect = blockchain.can_connect(header)
            assert chain or can_connect
        if can_connect:
            height += 1
            self.blockchain = can_connect
            self.blockchain.save_header(header)
            return ChainResolutionMode.CATCHUP, height

        good, bad, bad_header = await self._search_headers_binary(height, bad, bad_header, chain)
        return await self._resolve_potential_chain_fork_given_forkpoint(good, bad, bad_header)

    async def _search_headers_binary(
        self,
        height: int,
        bad: int,
        bad_header: dict,
        chain: Optional[Blockchain],
    ) -> Tuple[int, int, dict]:
        assert bad == bad_header['block_height']
        _assert_header_does_not_check_against_any_chain(bad_header)

        self.blockchain = chain
        good = height
        while True:
            assert 0 <= good < bad, (good, bad)
            height = (good + bad) // 2
            self.logger.info(f"binary step. good {good}, bad {bad}, height {height}")
            if bad - good + 1 <= MAX_NUM_HEADERS_PER_REQUEST:  # if interval is small, trade some bandwidth for lower latency
                await self._maybe_warm_headers_cache(
                    from_height=good, to_height=bad, mode=ChainResolutionMode.BINARY)
            header = await self.get_block_header(height, mode=ChainResolutionMode.BINARY)
            chain = blockchain.check_header(header)
            if chain:
                self.blockchain = chain
                good = height
            else:
                bad = height
                bad_header = header
            if good + 1 == bad:
                break

        if not self.blockchain.can_connect(bad_header, check_height=False):
            raise Exception('unexpected bad header during binary: {}'.format(bad_header))
        _assert_header_does_not_check_against_any_chain(bad_header)

        self.logger.info(f"binary search exited. good {good}, bad {bad}. {chain=}")
        return good, bad, bad_header

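    # _search_headers_binary (above): plain bisection on height with the invariant
    # that 'good' always checks against a chain we have and 'bad' never does.
    # A hypothetical trace with good=100, bad=107: probe 103, then 105 (or 101), ...
    # until good + 1 == bad, which pins down the first header where we diverge.
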
    async def _resolve_potential_chain_fork_given_forkpoint(
        self,
        good: int,
        bad: int,
        bad_header: dict,
    ) -> Tuple[ChainResolutionMode, int]:
        assert good + 1 == bad
        assert bad == bad_header['block_height']
        _assert_header_does_not_check_against_any_chain(bad_header)
        # 'good' is the height of a block 'good_header', somewhere in self.blockchain.
        # bad_header connects to good_header; bad_header itself is NOT in self.blockchain.

        bh = self.blockchain.height()
        assert bh >= good, (bh, good)
        if bh == good:
            height = good + 1
            self.logger.info(f"catching up from {height}")
            return ChainResolutionMode.NO_FORK, height

        # this is a new fork we don't yet have
        height = bad + 1
        self.logger.info(f"new fork at bad height {bad}")
        b = self.blockchain.fork(bad_header)  # type: Blockchain
        self.blockchain = b
        assert b.forkpoint == bad
        return ChainResolutionMode.FORK, height

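    # _resolve_potential_chain_fork_given_forkpoint (above), schematically:
    #   server chain: ... ── good ── bad ── ...
    #   NO_FORK: our chain simply ends at 'good', so we just keep catching up;
    #   FORK: our chain already extends past 'good' with a different block at
    #   'bad', so we split off a new Blockchain object whose forkpoint is 'bad'.
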
    async def _search_headers_backwards(
        self,
        height: int,
        *,
        header: dict,
    ) -> Tuple[int, dict, int, dict]:
        async def iterate():
            nonlocal height, header
            checkp = False
            if height <= constants.net.max_checkpoint():
                height = constants.net.max_checkpoint()
                checkp = True
            header = await self.get_block_header(height, mode=ChainResolutionMode.BACKWARD)
            chain = blockchain.check_header(header)
            can_connect = blockchain.can_connect(header)
            if chain or can_connect:
                return False
            if checkp:
                raise GracefulDisconnect("server chain conflicts with checkpoints")
            return True

        bad, bad_header = height, header
        _assert_header_does_not_check_against_any_chain(bad_header)
        with blockchain.blockchains_lock:
            chains = list(blockchain.blockchains.values())
        local_max = max([0] + [x.height() for x in chains])
        height = min(local_max + 1, height - 1)
        assert height >= 0

        await self._maybe_warm_headers_cache(
            from_height=max(0, height-10), to_height=height, mode=ChainResolutionMode.BACKWARD)

        delta = 2
        while await iterate():
            bad, bad_header = height, header
            height -= delta
            delta *= 2

        _assert_header_does_not_check_against_any_chain(bad_header)
        self.logger.info(f"exiting backward mode at {height}")
        return height, header, bad, bad_header

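    # _search_headers_backwards (above): the step size doubles each round, so after
    # the first probe at height h the walk visits h-2, h-6, h-14, ... This locates
    # a header we recognize in O(log n) requests rather than O(n).
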
    @classmethod
    def client_name(cls) -> str:
        return f'electrum/{version.ELECTRUM_VERSION}'

    def is_tor(self):
        return self.host.endswith('.onion')

    def ip_addr(self) -> Optional[str]:
        session = self.session
        if not session:
            return None
        peer_addr = session.remote_address()
        if not peer_addr:
            return None
        return str(peer_addr.host)

    def bucket_based_on_ipaddress(self) -> str:
        def do_bucket():
            if self.is_tor():
                return BUCKET_NAME_OF_ONION_SERVERS
            try:
                ip_addr = ip_address(self.ip_addr())  # type: Union[IPv4Address, IPv6Address]
            except ValueError:
                return ''
            if not ip_addr:
                return ''
            if ip_addr.is_loopback:  # localhost is exempt
                return ''
            if ip_addr.version == 4:
                slash16 = IPv4Network(ip_addr).supernet(prefixlen_diff=32-16)
                return str(slash16)
            elif ip_addr.version == 6:
                slash48 = IPv6Network(ip_addr).supernet(prefixlen_diff=128-48)
                return str(slash48)
            return ''

        if not self._ipaddr_bucket:
            self._ipaddr_bucket = do_bucket()
        return self._ipaddr_bucket

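    # bucket_based_on_ipaddress (above): grouping by /16 (IPv4) or /48 (IPv6) means
    # many adjacent addresses run by one operator count as a single bucket.
    # For example (illustrative address): 203.0.113.5 buckets to '203.0.0.0/16'.
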
    async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict:
        if not is_hash256_str(tx_hash):
            raise Exception(f"{repr(tx_hash)} is not a txid")
        if not is_non_negative_integer(tx_height):
            raise Exception(f"{repr(tx_height)} is not a block height")
        # do request
        res = await self.session.send_request('blockchain.transaction.get_merkle', [tx_hash, tx_height])
        # check response
        block_height = assert_dict_contains_field(res, field_name='block_height')
        merkle = assert_dict_contains_field(res, field_name='merkle')
        pos = assert_dict_contains_field(res, field_name='pos')
        # note: tx_height was just a hint to the server, don't enforce the response to match it
        assert_non_negative_integer(block_height)
        assert_non_negative_integer(pos)
        assert_list_or_tuple(merkle)
        for item in merkle:
            assert_hash256_str(item)
        return res

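    # get_merkle_for_transaction (above): a well-formed response looks like
    # (illustrative values):
    #   {'block_height': 450538,
    #    'merkle': ['713d6c7e...', '03dbaec8...'],   # sibling hashes, leaf to root
    #    'pos': 710}                                 # index of the tx within the block
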
    async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
        if not is_hash256_str(tx_hash):
            raise Exception(f"{repr(tx_hash)} is not a txid")
        if rawtx_bytes := self._rawtx_cache.get(tx_hash):
            return rawtx_bytes.hex()
        raw = await self.session.send_request('blockchain.transaction.get', [tx_hash], timeout=timeout)
        # validate response
        if not is_hex_str(raw):
            raise RequestCorrupted(f"received garbage (non-hex) as tx data (txid {tx_hash}): {raw!r}")
        tx = Transaction(raw)
        try:
            tx.deserialize()  # see if raises
        except Exception as e:
            raise RequestCorrupted(f"cannot deserialize received transaction (txid {tx_hash})") from e
        if tx.txid() != tx_hash:
            raise RequestCorrupted(f"received tx does not match expected txid {tx_hash} (got {tx.txid()})")
        self._rawtx_cache[tx_hash] = bytes.fromhex(raw)
        return raw

    async def broadcast_transaction(self, tx: 'Transaction', *, timeout=None) -> None:
        """caller should handle TxBroadcastError and RequestTimedOut"""
        txid_calc = tx.txid()
        assert txid_calc is not None
        rawtx = tx.serialize()
        assert is_hex_str(rawtx)
        if timeout is None:
            timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        if any(DummyAddress.is_dummy_address(txout.address) for txout in tx.outputs()):
            raise DummyAddressUsedInTxException("tried to broadcast tx with dummy address!")
        try:
            out = await self.session.send_request('blockchain.transaction.broadcast', [rawtx], timeout=timeout)
            # note: both 'out' and exception messages are untrusted input from the server
        except (RequestTimedOut, asyncio.CancelledError, asyncio.TimeoutError):
            raise  # pass-through
        except aiorpcx.jsonrpc.CodeMessageError as e:
            self.logger.info(f"broadcast_transaction error [DO NOT TRUST THIS MESSAGE]: {error_text_str_to_safe_str(repr(e))}. tx={str(tx)}")
            raise TxBroadcastServerReturnedError(sanitize_tx_broadcast_response(e.message)) from e
        except BaseException as e:  # intentional BaseException for sanity!
            self.logger.info(f"broadcast_transaction error2 [DO NOT TRUST THIS MESSAGE]: {error_text_str_to_safe_str(repr(e))}. tx={str(tx)}")
            send_exception_to_crash_reporter(e)
            raise TxBroadcastUnknownError() from e
        if out != txid_calc:
            self.logger.info(f"unexpected txid for broadcast_transaction [DO NOT TRUST THIS MESSAGE]: "
                             f"{error_text_str_to_safe_str(out)} != {txid_calc}. tx={str(tx)}")
            raise TxBroadcastHashMismatch(_("Server returned unexpected transaction ID."))
        # broadcast succeeded.
        # We now cache the rawtx, for *this interface only*. The tx likely touches some ismine addresses, affecting
        # the status of a scripthash we are subscribed to. Caching here will save a future get_transaction RPC.
        self._rawtx_cache[txid_calc] = bytes.fromhex(rawtx)

    async def broadcast_txpackage(self, txs: Sequence['Transaction']) -> bool:
        assert self.active_protocol_tuple >= (1, 6), f"server using old protocol: {self.active_protocol_tuple}"
        rawtxs = [tx.serialize() for tx in txs]
        assert all(is_hex_str(rawtx) for rawtx in rawtxs)
        assert all(tx.txid() is not None for tx in txs)
        timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        for tx in txs:
            if any(DummyAddress.is_dummy_address(txout.address) for txout in tx.outputs()):
                raise DummyAddressUsedInTxException("tried to broadcast tx with dummy address!")
        try:
            res = await self.session.send_request('blockchain.transaction.broadcast_package', [rawtxs], timeout=timeout)
        except aiorpcx.jsonrpc.CodeMessageError as e:
            self.logger.info(f"broadcast_txpackage error [DO NOT TRUST THIS MESSAGE]: {error_text_str_to_safe_str(repr(e))}. {rawtxs=}")
            return False
        success = assert_dict_contains_field(res, field_name='success')
        if not success:
            errors = assert_dict_contains_field(res, field_name='errors')
            self.logger.info(f"broadcast_txpackage error [DO NOT TRUST THIS MESSAGE]: {error_text_str_to_safe_str(repr(errors))}. {rawtxs=}")
            return False
        assert success
        # broadcast succeeded.
        # We now cache the rawtx, for *this interface only*. The tx likely touches some ismine addresses, affecting
        # the status of a scripthash we are subscribed to. Caching here will save a future get_transaction RPC.
        for tx, rawtx in zip(txs, rawtxs):
            self._rawtx_cache[tx.txid()] = bytes.fromhex(rawtx)
        return True

    async def get_history_for_scripthash(self, sh: str) -> List[dict]:
        if not is_hash256_str(sh):
            raise Exception(f"{repr(sh)} is not a scripthash")
        # do request
        res = await self.session.send_request('blockchain.scripthash.get_history', [sh])
        # check response
        assert_list_or_tuple(res)
        prev_height = 1
        for tx_item in res:
            height = assert_dict_contains_field(tx_item, field_name='height')
            assert_dict_contains_field(tx_item, field_name='tx_hash')
            assert_integer(height)
            if height < -1:
                raise RequestCorrupted(f'{height!r} is not a valid block height')
            assert_hash256_str(tx_item['tx_hash'])
            if height in (-1, 0):
                assert_dict_contains_field(tx_item, field_name='fee')
                assert_non_negative_integer(tx_item['fee'])
                prev_height = float("inf")  # this ensures confirmed txs can't follow mempool txs
            else:
                # check monotonicity of heights
                if height < prev_height:
                    raise RequestCorrupted('heights of confirmed txs must be in increasing order')
                prev_height = height
        if self.active_protocol_tuple >= (1, 6):
            # enforce order of mempool txs
            mempool_txs = [tx_item for tx_item in res if tx_item['height'] <= 0]
            if mempool_txs != sorted(mempool_txs, key=lambda x: (-x['height'], bytes.fromhex(x['tx_hash']))):
                raise RequestCorrupted('mempool txs not in canonical order')
        hashes = set(map(lambda item: item['tx_hash'], res))
        if len(hashes) != len(res):
            # Either the server is sending garbage, or maybe, if the server is race-prone,
            # a recently mined tx could be included in both the last block and the mempool?
            # Still, it's simplest to just disregard the response.
            raise RequestCorrupted(f"server history has non-unique txids for sh={sh}")
        return res

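    # get_history_for_scripthash (above): a well-formed history lists confirmed txs
    # with non-decreasing heights first, then mempool txs with height 0 (all inputs
    # confirmed) or -1 (spending unconfirmed inputs), each carrying a 'fee'.
    # Illustrative values:
    #   [{'height': 200003, 'tx_hash': 'acc3758b...'},
    #    {'height': 215010, 'tx_hash': 'f3e1bf48...'},
    #    {'height': 0, 'fee': 24310, 'tx_hash': '9fbed79a...'}]
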
    async def listunspent_for_scripthash(self, sh: str) -> List[dict]:
        if not is_hash256_str(sh):
            raise Exception(f"{repr(sh)} is not a scripthash")
        # do request
        res = await self.session.send_request('blockchain.scripthash.listunspent', [sh])
        # check response
        assert_list_or_tuple(res)
        for utxo_item in res:
            assert_dict_contains_field(utxo_item, field_name='tx_pos')
            assert_dict_contains_field(utxo_item, field_name='value')
            assert_dict_contains_field(utxo_item, field_name='tx_hash')
            assert_dict_contains_field(utxo_item, field_name='height')
            assert_non_negative_integer(utxo_item['tx_pos'])
            assert_non_negative_integer(utxo_item['value'])
            assert_non_negative_integer(utxo_item['height'])
            assert_hash256_str(utxo_item['tx_hash'])
        return res

    async def get_balance_for_scripthash(self, sh: str) -> dict:
        if not is_hash256_str(sh):
            raise Exception(f"{repr(sh)} is not a scripthash")
        # do request
        res = await self.session.send_request('blockchain.scripthash.get_balance', [sh])
        # check response
        assert_dict_contains_field(res, field_name='confirmed')
        assert_dict_contains_field(res, field_name='unconfirmed')
        assert_non_negative_integer(res['confirmed'])
        assert_integer(res['unconfirmed'])
        return res

    async def get_txid_from_txpos(self, tx_height: int, tx_pos: int, merkle: bool):
        if not is_non_negative_integer(tx_height):
            raise Exception(f"{repr(tx_height)} is not a block height")
        if not is_non_negative_integer(tx_pos):
            raise Exception(f"{repr(tx_pos)} should be a non-negative integer")
        # do request
        res = await self.session.send_request(
            'blockchain.transaction.id_from_pos',
            [tx_height, tx_pos, merkle],
        )
        # check response
        if merkle:
            assert_dict_contains_field(res, field_name='tx_hash')
            assert_dict_contains_field(res, field_name='merkle')
            assert_hash256_str(res['tx_hash'])
            assert_list_or_tuple(res['merkle'])
            for node_hash in res['merkle']:
                assert_hash256_str(node_hash)
        else:
            assert_hash256_str(res)
        return res

    async def get_fee_histogram(self) -> Sequence[Tuple[Union[float, int], int]]:
        # do request
        res = await self.session.send_request('mempool.get_fee_histogram')
        # check response
        assert_list_or_tuple(res)
        prev_fee = float('inf')
        for fee, s in res:
            assert_non_negative_int_or_float(fee)
            assert_non_negative_integer(s)
            if fee >= prev_fee:  # check monotonicity
                raise RequestCorrupted('fees must be in decreasing order')
            prev_fee = fee
        return res

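    # get_fee_histogram (above): the response is a list of [feerate, vsize] pairs in
    # strictly decreasing feerate order, e.g. (illustrative values):
    #   [[50, 120000], [20, 380000], [5, 910000]]
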
    async def get_server_banner(self) -> str:
        # do request
        res = await self.session.send_request('server.banner')
        # check response
        if not isinstance(res, str):
            raise RequestCorrupted(f'{res!r} should be a str')
        return res

    async def get_donation_address(self) -> str:
        # do request
        res = await self.session.send_request('server.donation_address')
        # check response
        if not res:  # ignore empty string
            return ''
        if not bitcoin.is_address(res):
            # note: do not hard-fail -- allow server to use future-type
            #       bitcoin address we do not recognize
            self.logger.info(f"invalid donation address from server: {repr(res)}")
            res = ''
        return res

    async def get_relay_fee(self) -> int:
        """Returns the min relay feerate in sat/kbyte."""
        # do request
        if self.active_protocol_tuple >= (1, 6):
            res = await self.session.send_request('mempool.get_info')
            minrelaytxfee = assert_dict_contains_field(res, field_name='minrelaytxfee')
        else:
            minrelaytxfee = await self.session.send_request('blockchain.relayfee')
        # check response
        assert_non_negative_int_or_float(minrelaytxfee)
        relayfee = int(minrelaytxfee * bitcoin.COIN)
        relayfee = max(0, relayfee)
        return relayfee

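    # get_relay_fee (above): the server reports BTC/kbyte, converted here to
    # sat/kbyte, e.g. 0.00001 BTC/kbyte * COIN (100_000_000) = 1000 sat/kbyte.
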
    async def get_estimatefee(self, num_blocks: int) -> int:
        """Returns a feerate estimate for getting confirmed within
        num_blocks blocks, in sat/kbyte.
        Returns -1 if the server could not provide an estimate.
        """
        if not is_non_negative_integer(num_blocks):
            raise Exception(f"{repr(num_blocks)} is not a num_blocks")
        # do request
        try:
            res = await self.session.send_request('blockchain.estimatefee', [num_blocks])
        except aiorpcx.jsonrpc.ProtocolError as e:
            # The protocol spec says the server itself should already have returned -1
            # if it cannot provide an estimate, however apparently "electrs" does not conform
            # and sends an error instead. Convert it here:
            if "cannot estimate fee" in e.message:
                res = -1
            else:
                raise
        except aiorpcx.jsonrpc.RPCError as e:
            # The protocol spec says the server itself should already have returned -1
            # if it cannot provide an estimate. "Fulcrum" often sends:
            #   aiorpcx.jsonrpc.RPCError: (-32603, 'internal error: bitcoind request timed out')
            if e.code == JSONRPC.INTERNAL_ERROR:
                res = -1
            else:
                raise
        # check response
        if res != -1:
            assert_non_negative_int_or_float(res)
            res = int(res * bitcoin.COIN)
        return res


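# get_estimatefee (above): same unit conversion as get_relay_fee, e.g. a server
# estimate of roughly 0.00005 BTC/kbyte becomes ~5000 sat/kbyte; -1 is passed
# through to signal "no estimate available".

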
def _assert_header_does_not_check_against_any_chain(header: dict) -> None:
    chain_bad = blockchain.check_header(header)
    if chain_bad:
        raise Exception('bad_header must not check!')


def sanitize_tx_broadcast_response(server_msg) -> str:
    # Unfortunately, bitcoind and hence the Electrum protocol doesn't return a useful error code.
    # So, we use substring matching to grok the error message.
    # server_msg is untrusted input so it should not be shown to the user. see #4968
    server_msg = str(server_msg)
    server_msg = server_msg.replace("\n", r"\n")

    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/script/script_error.cpp
    script_error_messages = {
        r"Script evaluated without error but finished with a false/empty top stack element",
        r"Script failed an OP_VERIFY operation",
        r"Script failed an OP_EQUALVERIFY operation",
        r"Script failed an OP_CHECKMULTISIGVERIFY operation",
        r"Script failed an OP_CHECKSIGVERIFY operation",
        r"Script failed an OP_NUMEQUALVERIFY operation",
        r"Script is too big",
        r"Push value size limit exceeded",
        r"Operation limit exceeded",
        r"Stack size limit exceeded",
        r"Signature count negative or greater than pubkey count",
        r"Pubkey count negative or limit exceeded",
        r"Opcode missing or not understood",
        r"Attempted to use a disabled opcode",
        r"Operation not valid with the current stack size",
        r"Operation not valid with the current altstack size",
        r"OP_RETURN was encountered",
        r"Invalid OP_IF construction",
        r"Negative locktime",
        r"Locktime requirement not satisfied",
        r"Signature hash type missing or not understood",
        r"Non-canonical DER signature",
        r"Data push larger than necessary",
        r"Only push operators allowed in signatures",
        r"Non-canonical signature: S value is unnecessarily high",
        r"Dummy CHECKMULTISIG argument must be zero",
        r"OP_IF/NOTIF argument must be minimal",
        r"Signature must be zero for failed CHECK(MULTI)SIG operation",
        r"NOPx reserved for soft-fork upgrades",
        r"Witness version reserved for soft-fork upgrades",
        r"Taproot version reserved for soft-fork upgrades",
        r"OP_SUCCESSx reserved for soft-fork upgrades",
        r"Public key version reserved for soft-fork upgrades",
        r"Public key is neither compressed or uncompressed",
        r"Stack size must be exactly one after execution",
        r"Extra items left on stack after execution",
        r"Witness program has incorrect length",
        r"Witness program was passed an empty witness",
        r"Witness program hash mismatch",
        r"Witness requires empty scriptSig",
        r"Witness requires only-redeemscript scriptSig",
        r"Witness provided for non-witness script",
        r"Using non-compressed keys in segwit",
        r"Invalid Schnorr signature size",
        r"Invalid Schnorr signature hash type",
        r"Invalid Schnorr signature",
        r"Invalid Taproot control block size",
        r"Too much signature validation relative to witness weight",
        r"OP_CHECKMULTISIG(VERIFY) is not available in tapscript",
        r"OP_IF/NOTIF argument must be minimal in tapscript",
        r"Using OP_CODESEPARATOR in non-witness script",
        r"Signature is found in scriptCode",
    }
    for substring in script_error_messages:
        if substring in server_msg:
            return substring
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/validation.cpp
    # grep "REJECT_"
    # grep "TxValidationResult"
    # should come after script_error.cpp (due to e.g. "non-mandatory-script-verify-flag")
    validation_error_messages = {
        r"coinbase": None,
        r"tx-size-small": None,
        r"non-final": None,
        r"txn-already-in-mempool": None,
        r"txn-mempool-conflict": None,
        r"txn-already-known": None,
        r"non-BIP68-final": None,
        r"bad-txns-nonstandard-inputs": None,
        r"bad-witness-nonstandard": None,
        r"bad-txns-too-many-sigops": None,
        r"mempool min fee not met":
            ("mempool min fee not met\n" +
             _("Your transaction is paying a fee that is so low that the bitcoin node cannot "
               "fit it into its mempool. The mempool is already full of hundreds of megabytes "
               "of transactions that all pay higher fees. Try to increase the fee.")),
        r"min relay fee not met": None,
        r"absurdly-high-fee": None,
        r"max-fee-exceeded": None,
        r"too-long-mempool-chain": None,
        r"bad-txns-spends-conflicting-tx": None,
        r"insufficient fee": ("insufficient fee\n" +
             _("Your transaction is trying to replace another one in the mempool but it "
               "does not meet the rules to do so. Try to increase the fee.")),
        r"too many potential replacements": None,
        r"replacement-adds-unconfirmed": None,
        r"mempool full": None,
        r"non-mandatory-script-verify-flag": None,
        r"mandatory-script-verify-flag-failed": None,
        r"Transaction check failed": None,
    }
    for substring in validation_error_messages:
        if substring in server_msg:
            msg = validation_error_messages[substring]
            return msg if msg else substring
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/rpc/rawtransaction.cpp
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/util/error.cpp
    # https://github.com/bitcoin/bitcoin/blob/3f83c744ac28b700090e15b5dda2260724a56f49/src/common/messages.cpp#L126
    # grep "RPC_TRANSACTION"
    # grep "RPC_DESERIALIZATION_ERROR"
    # grep "TransactionError"
    rawtransaction_error_messages = {
        r"Missing inputs": None,
        r"Inputs missing or spent": None,
        r"transaction already in block chain": None,
        r"Transaction already in block chain": None,
        r"Transaction outputs already in utxo set": None,
        r"TX decode failed": None,
        r"Peer-to-peer functionality missing or disabled": None,
        r"Transaction rejected by AcceptToMemoryPool": None,
        r"AcceptToMemoryPool failed": None,
        r"Transaction rejected by mempool": None,
        r"Mempool internal error": None,
        r"Fee exceeds maximum configured by user": None,
        r"Unspendable output exceeds maximum configured by user": None,
        r"Transaction rejected due to invalid package": None,
    }
    for substring in rawtransaction_error_messages:
        if substring in server_msg:
            msg = rawtransaction_error_messages[substring]
            return msg if msg else substring
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/consensus/tx_verify.cpp
    # https://github.com/bitcoin/bitcoin/blob/c7ad94428ab6f54661d7a5441e1fdd0ebf034903/src/consensus/tx_check.cpp
    # grep "REJECT_"
    # grep "TxValidationResult"
    tx_verify_error_messages = {
        r"bad-txns-vin-empty": None,
        r"bad-txns-vout-empty": None,
        r"bad-txns-oversize": None,
        r"bad-txns-vout-negative": None,
        r"bad-txns-vout-toolarge": None,
        r"bad-txns-txouttotal-toolarge": None,
        r"bad-txns-inputs-duplicate": None,
        r"bad-cb-length": None,
        r"bad-txns-prevout-null": None,
        r"bad-txns-inputs-missingorspent":
            ("bad-txns-inputs-missingorspent\n" +
             _("You might have a local transaction in your wallet that this transaction "
               "builds on top. You need to either broadcast or remove the local tx.")),
        r"bad-txns-premature-spend-of-coinbase": None,
        r"bad-txns-inputvalues-outofrange": None,
        r"bad-txns-in-belowout": None,
        r"bad-txns-fee-outofrange": None,
    }
    for substring in tx_verify_error_messages:
        if substring in server_msg:
            msg = tx_verify_error_messages[substring]
            return msg if msg else substring
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/policy/policy.cpp
    # grep "reason ="
    # should come after validation.cpp (due to "tx-size" vs "tx-size-small")
    # should come after script_error.cpp (due to e.g. "version")
    policy_error_messages = {
        r"version": _("Transaction uses non-standard version."),
        r"tx-size": _("The transaction was rejected because it is too large (in bytes)."),
        r"scriptsig-size": None,
        r"scriptsig-not-pushonly": None,
        r"scriptpubkey":
            ("scriptpubkey\n" +
             _("Some of the outputs pay to a non-standard script.")),
        r"bare-multisig": None,
        r"dust":
            (_("Transaction could not be broadcast due to dust outputs.\n"
               "Some of the outputs are too small in value, probably lower than 1000 satoshis.\n"
               "Check the units, make sure you haven't confused e.g. mBTC and BTC.")),
        r"multi-op-return": _("The transaction was rejected because it contains multiple OP_RETURN outputs."),
    }
    for substring in policy_error_messages:
        if substring in server_msg:
            msg = policy_error_messages[substring]
            return msg if msg else substring
    # otherwise:
    return _("Unknown error")


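# sanitize_tx_broadcast_response (above) maps untrusted server text onto one of
# the fixed strings above, e.g. (hypothetical server message):
#   sanitize_tx_broadcast_response("...rejected by network rules.\n\ndust (code 64)")
# returns the translated 'dust' explanation rather than echoing the server.
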
def check_cert(host, cert):
    try:
        b = pem.dePem(cert, 'CERTIFICATE')
        x = x509.X509(b)
    except Exception:
        traceback.print_exc(file=sys.stdout)
        return

    try:
        x.check_date()
        expired = False
    except Exception:
        expired = True

    m = f"host: {host}\n"
    m += f"has_expired: {expired}\n"
    util.print_msg(m)


# Used by tests
def _match_hostname(name, val):
    if val == name:
        return True

    return val.startswith('*.') and name.endswith(val[1:])


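# _match_hostname (above) accepts exact matches and '*.' wildcards (the wildcard
# matches any subdomain depth), e.g.:
#   _match_hostname('foo.example.com', '*.example.com')  # True
#   _match_hostname('example.com', '*.example.com')      # False
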
def test_certificates():
    from .simple_config import SimpleConfig
    config = SimpleConfig()
    mydir = os.path.join(config.path, "certs")
    certs = os.listdir(mydir)
    for c in certs:
        p = os.path.join(mydir, c)
        with open(p, encoding='utf-8') as f:
            cert = f.read()
        check_cert(c, cert)

if __name__ == "__main__":
    test_certificates()