
spesmilo / electrum, build 6248983879745536 (push, CirrusCI, web-flow)

27 Oct 2025 05:55PM UTC. Coverage: 61.357% (+0.007%) from 61.35%

Merge pull request #10281 from f321x/dont_connect_to_other_networks

interface: check genesis hash on connection

7 of 10 new or added lines in 1 file covered (70.0%).
1 existing line in 1 file now uncovered.
22894 of 37313 relevant lines covered (61.36%).
0.61 hits per line.

Source File: /electrum/interface.py (57.68% of lines covered)
#!/usr/bin/env python
#
# Electrum - lightweight Bitcoin client
# Copyright (C) 2011 thomasv@gitorious
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import os
import re
import ssl
import sys
import time
import traceback
import asyncio
import socket
from typing import Tuple, Union, List, TYPE_CHECKING, Optional, Set, NamedTuple, Any, Sequence, Dict
from collections import defaultdict
from ipaddress import IPv4Network, IPv6Network, ip_address, IPv6Address, IPv4Address
import itertools
import logging
import hashlib
import functools
import random
import enum

import aiorpcx
from aiorpcx import RPCSession, Notification, NetAddress, NewlineFramer
from aiorpcx.curio import timeout_after, TaskTimeout
from aiorpcx.jsonrpc import JSONRPC, CodeMessageError
from aiorpcx.rawsocket import RSClient, RSTransport
import certifi

from .util import (ignore_exceptions, log_exceptions, bfh, ESocksProxy,
                   is_integer, is_non_negative_integer, is_hash256_str, is_hex_str,
                   is_int_or_float, is_non_negative_int_or_float, OldTaskGroup,
                   send_exception_to_crash_reporter, error_text_str_to_safe_str)
from . import util
from . import x509
from . import pem
from . import version
from . import blockchain
from .blockchain import Blockchain, HEADER_SIZE, CHUNK_SIZE
from . import bitcoin
from .bitcoin import DummyAddress, DummyAddressUsedInTxException
from . import constants
from .i18n import _
from .logging import Logger
from .transaction import Transaction
from .fee_policy import FEE_ETA_TARGETS
from .lrucache import LRUCache

if TYPE_CHECKING:
    from .network import Network
    from .simple_config import SimpleConfig


ca_path = certifi.where()

BUCKET_NAME_OF_ONION_SERVERS = 'onion'

_KNOWN_NETWORK_PROTOCOLS = {'t', 's'}
PREFERRED_NETWORK_PROTOCOL = 's'
assert PREFERRED_NETWORK_PROTOCOL in _KNOWN_NETWORK_PROTOCOLS

MAX_NUM_HEADERS_PER_REQUEST = 2016
assert MAX_NUM_HEADERS_PER_REQUEST >= CHUNK_SIZE


class NetworkTimeout:
    # seconds
    class Generic:
        NORMAL = 30
        RELAXED = 45
        MOST_RELAXED = 600

    class Urgent(Generic):
        NORMAL = 10
        RELAXED = 20
        MOST_RELAXED = 60


def assert_non_negative_integer(val: Any) -> None:
    if not is_non_negative_integer(val):
        raise RequestCorrupted(f'{val!r} should be a non-negative integer')


def assert_integer(val: Any) -> None:
    if not is_integer(val):
        raise RequestCorrupted(f'{val!r} should be an integer')


def assert_int_or_float(val: Any) -> None:
    if not is_int_or_float(val):
        raise RequestCorrupted(f'{val!r} should be int or float')


def assert_non_negative_int_or_float(val: Any) -> None:
    if not is_non_negative_int_or_float(val):
        raise RequestCorrupted(f'{val!r} should be a non-negative int or float')


def assert_hash256_str(val: Any) -> None:
    if not is_hash256_str(val):
        raise RequestCorrupted(f'{val!r} should be a hash256 str')


def assert_hex_str(val: Any) -> None:
    if not is_hex_str(val):
        raise RequestCorrupted(f'{val!r} should be a hex str')


def assert_dict_contains_field(d: Any, *, field_name: str) -> Any:
    if not isinstance(d, dict):
        raise RequestCorrupted(f'{d!r} should be a dict')
    if field_name not in d:
        raise RequestCorrupted(f'required field {field_name!r} missing from dict')
    return d[field_name]


def assert_list_or_tuple(val: Any) -> None:
    if not isinstance(val, (list, tuple)):
        raise RequestCorrupted(f'{val!r} should be a list or tuple')


class ChainResolutionMode(enum.Enum):
    CATCHUP = enum.auto()
    BACKWARD = enum.auto()
    BINARY = enum.auto()
    FORK = enum.auto()
    NO_FORK = enum.auto()


class NotificationSession(RPCSession):

    def __init__(self, *args, interface: 'Interface', **kwargs):
        super(NotificationSession, self).__init__(*args, **kwargs)
        self.subscriptions = defaultdict(list)
        self.cache = {}
        self._msg_counter = itertools.count(start=1)
        self.interface = interface
        self.taskgroup = interface.taskgroup
        self.cost_hard_limit = 0  # disable aiorpcx resource limits

    async def handle_request(self, request):
        self.maybe_log(f"--> {request}")
        try:
            if isinstance(request, Notification):
                params, result = request.args[:-1], request.args[-1]
                key = self.get_hashable_key_for_rpc_call(request.method, params)
                if key in self.subscriptions:
                    self.cache[key] = result
                    for queue in self.subscriptions[key]:
                        await queue.put(request.args)
                else:
                    raise Exception('unexpected notification')
            else:
                raise Exception('unexpected request. not a notification')
        except Exception as e:
            self.interface.logger.info(f"error handling request {request}. exc: {repr(e)}")
            await self.close()
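    # Illustrative note (not part of the original file): for a subscription such
    # as 'blockchain.scripthash.subscribe', a notification would arrive with
    # request.args == [scripthash, status], so params == [scripthash] and
    # result == status; the subscriber queues then receive the full args list.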

    async def send_request(self, *args, timeout=None, **kwargs):
        # note: semaphores/timeouts/backpressure etc are handled by
        # aiorpcx. the timeout arg here in most cases should not be set
        msg_id = next(self._msg_counter)
        self.maybe_log(f"<-- {args} {kwargs} (id: {msg_id})")
        try:
            # note: RPCSession.send_request raises TaskTimeout in case of a timeout.
            # TaskTimeout is a subclass of CancelledError, which is *suppressed* in TaskGroups
            response = await util.wait_for2(
                super().send_request(*args, **kwargs),
                timeout)
        except (TaskTimeout, asyncio.TimeoutError) as e:
            self.maybe_log(f"--> request timed out: {args} (id: {msg_id})")
            raise RequestTimedOut(f'request timed out: {args} (id: {msg_id})') from e
        except CodeMessageError as e:
            self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
            raise
        except BaseException as e:  # cancellations, etc. are useful for debugging
            self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
            raise
        else:
            self.maybe_log(f"--> {response} (id: {msg_id})")
            return response

    def set_default_timeout(self, timeout):
        assert hasattr(self, "sent_request_timeout")  # in base class
        self.sent_request_timeout = timeout
        assert hasattr(self, "max_send_delay")        # in base class
        self.max_send_delay = timeout

    async def subscribe(self, method: str, params: List, queue: asyncio.Queue):
        # note: until the cache is written for the first time,
        # each 'subscribe' call might make a request on the network.
        key = self.get_hashable_key_for_rpc_call(method, params)
        self.subscriptions[key].append(queue)
        if key in self.cache:
            result = self.cache[key]
        else:
            result = await self.send_request(method, params)
            self.cache[key] = result
        await queue.put(params + [result])
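    # Illustrative usage (a sketch, not part of the original file):
    #   queue = asyncio.Queue()
    #   await session.subscribe('blockchain.headers.subscribe', [], queue)
    #   item = await queue.get()  # -> params + [result], here [header_dict]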

    def unsubscribe(self, queue):
        """Unsubscribe a callback to free object references to enable GC."""
        # note: we can't unsubscribe from the server, so we keep receiving
        # subsequent notifications
        for v in self.subscriptions.values():
            if queue in v:
                v.remove(queue)

    @classmethod
    def get_hashable_key_for_rpc_call(cls, method, params):
        """Hashable index for subscriptions and cache"""
        return str(method) + repr(params)
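    # Illustrative example (not part of the original file): the key is just the
    # method name concatenated with repr() of the params, e.g.
    #   get_hashable_key_for_rpc_call('blockchain.scripthash.subscribe', ['ab12'])
    #   -> "blockchain.scripthash.subscribe['ab12']"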

    def maybe_log(self, msg: str) -> None:
        if not self.interface: return
        if self.interface.debug or self.interface.network.debug:
            self.interface.logger.debug(msg)

    def default_framer(self):
        # overridden so that max_size can be customized
        max_size = self.interface.network.config.NETWORK_MAX_INCOMING_MSG_SIZE
        assert max_size > 500_000, f"{max_size=} (< 500_000) is too small"
        return NewlineFramer(max_size=max_size)

    async def close(self, *, force_after: int = None):
        """Closes the connection and waits for it to be closed.
        We try to flush buffered data to the wire, which can take some time.
        """
        if force_after is None:
            # We give up after a while and just abort the connection.
            # Note: specifically if the server is running Fulcrum, waiting seems hopeless,
            #       the connection must be aborted (see https://github.com/cculianu/Fulcrum/issues/76)
            # Note: if the ethernet cable was pulled or wifi disconnected, that too might
            #       wait until this timeout is triggered
            force_after = 1  # seconds
        await super().close(force_after=force_after)


class NetworkException(Exception): pass


class GracefulDisconnect(NetworkException):
    log_level = logging.INFO

    def __init__(self, *args, log_level=None, **kwargs):
        Exception.__init__(self, *args, **kwargs)
        if log_level is not None:
            self.log_level = log_level


class RequestTimedOut(GracefulDisconnect):
    def __str__(self):
        return _("Network request timed out.")


class RequestCorrupted(Exception): pass

class ErrorParsingSSLCert(Exception): pass
class ErrorGettingSSLCertFromServer(Exception): pass
class ErrorSSLCertFingerprintMismatch(Exception): pass
class InvalidOptionCombination(Exception): pass
class ConnectError(NetworkException): pass


class TxBroadcastError(NetworkException):
    def get_message_for_gui(self):
        raise NotImplementedError()


class TxBroadcastHashMismatch(TxBroadcastError):
    def get_message_for_gui(self):
        return "{}\n{}\n\n{}" \
            .format(_("The server returned an unexpected transaction ID when broadcasting the transaction."),
                    _("Consider trying to connect to a different server, or updating Electrum."),
                    str(self))


class TxBroadcastServerReturnedError(TxBroadcastError):
    def get_message_for_gui(self):
        return "{}\n{}\n\n{}" \
            .format(_("The server returned an error when broadcasting the transaction."),
                    _("Consider trying to connect to a different server, or updating Electrum."),
                    str(self))


class TxBroadcastUnknownError(TxBroadcastError):
    def get_message_for_gui(self):
        return "{}\n{}" \
            .format(_("Unknown error when broadcasting the transaction."),
                    _("Consider trying to connect to a different server, or updating Electrum."))


class _RSClient(RSClient):
    async def create_connection(self):
        try:
            return await super().create_connection()
        except OSError as e:
            # note: using "from e" here will set __cause__ of ConnectError
            raise ConnectError(e) from e


class PaddedRSTransport(RSTransport):
    """A raw socket transport that provides basic countermeasures against traffic analysis
    by padding the jsonrpc payload with whitespace to have ~uniform-size TCP packets.
    (it is assumed that a network observer does not see plaintext transport contents,
    due to it being wrapped e.g. in TLS)
    """

    MIN_PACKET_SIZE = 1024
    WAIT_FOR_BUFFER_GROWTH_SECONDS = 1.0
    # (unpadded) amount of bytes sent instantly before beginning with polling.
    # This makes the initial handshake where a few small messages are exchanged faster.
    WARMUP_BUDGET_SIZE = 1024

    session: Optional['RPCSession']

    def __init__(self, *args, **kwargs):
        RSTransport.__init__(self, *args, **kwargs)
        self._sbuffer = bytearray()  # "send buffer"
        self._sbuffer_task = None  # type: Optional[asyncio.Task]
        self._sbuffer_has_data_evt = asyncio.Event()
        self._last_send = time.monotonic()
        self._force_send = False  # type: bool

    # note: this does not call super().write() but is a complete reimplementation
    async def write(self, message):
        await self._can_send.wait()
        if self.is_closing():
            return
        framed_message = self._framer.frame(message)
        self._sbuffer += framed_message
        self._sbuffer_has_data_evt.set()
        self._maybe_consume_sbuffer()

    def _maybe_consume_sbuffer(self) -> None:
        """Maybe take some data from sbuffer and send it on the wire."""
        if not self._can_send.is_set() or self.is_closing():
            return
        buf = self._sbuffer
        if not buf:
            return
        # if there is enough data in the buffer, or if we haven't sent in a while, send now:
        if not (
            self._force_send
            or len(buf) >= self.MIN_PACKET_SIZE
            or self._last_send + self.WAIT_FOR_BUFFER_GROWTH_SECONDS < time.monotonic()
            or self.session.send_size < self.WARMUP_BUDGET_SIZE
        ):
            return
        assert buf[-2:] in (b"}\n", b"]\n"), f"unexpected json-rpc terminator: {buf[-2:]=!r}"
        # either (1) pad length to next power of two, to create "lsize" packet:
        payload_lsize = len(buf)
        total_lsize = max(self.MIN_PACKET_SIZE, 2 ** (payload_lsize.bit_length()))
        npad_lsize = total_lsize - payload_lsize
        # or if that wasted a lot of bandwidth with padding, (2) defer sending some messages
        # and create a packet with half that size ("ssize", s for small)
        total_ssize = max(self.MIN_PACKET_SIZE, total_lsize // 2)
        payload_ssize = buf.rfind(b"\n", 0, total_ssize)
        if payload_ssize != -1:
            payload_ssize += 1  # for "\n" char
            npad_ssize = total_ssize - payload_ssize
        else:
            npad_ssize = float("inf")
        # decide between (1) and (2):
        if self._force_send or npad_lsize <= npad_ssize:
            # (1) create "lsize" packet: consume full buffer
            npad = npad_lsize
            p_idx = payload_lsize
        else:
            # (2) create "ssize" packet: consume some, but defer some for later
            npad = npad_ssize
            p_idx = payload_ssize
        # pad by adding spaces near end
        # self.session.maybe_log(
        #     f"PaddedRSTransport. calling low-level write(). "
        #     f"chose between (lsize:{payload_lsize}+{npad_lsize}, ssize:{payload_ssize}+{npad_ssize}). "
        #     f"won: {'tie' if npad_lsize == npad_ssize else 'lsize' if npad_lsize < npad_ssize else 'ssize'}."
        # )
        json_rpc_terminator = buf[p_idx-2:p_idx]
        assert json_rpc_terminator in (b"}\n", b"]\n"), f"unexpected {json_rpc_terminator=!r}"
        buf2 = buf[:p_idx-2] + (npad * b" ") + json_rpc_terminator
        self._asyncio_transport.write(buf2)
        self._last_send = time.monotonic()
        del self._sbuffer[:p_idx]
        if not self._sbuffer:
            self._sbuffer_has_data_evt.clear()
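    # Worked example (illustrative numbers, not part of the original file): with a
    # buffered payload of 1500 bytes, (1500).bit_length() == 11, so
    # total_lsize == 2048 and npad_lsize == 548. The "ssize" alternative looks for
    # a message boundary (b"\n") below total_ssize == 1024; if one ends at byte
    # 900, npad_ssize == 124, so the smaller-waste option (2) wins: 900 bytes are
    # padded to 1024 and sent, and the remaining 600 bytes stay buffered.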

    async def _poll_sbuffer(self):
        while not self.is_closing():
            await self._can_send.wait()
            await self._sbuffer_has_data_evt.wait()  # to avoid busy-waiting
            self._maybe_consume_sbuffer()
            # If there is still data in the buffer, sleep until it would time out.
            # note: If the transport is ~idle, when we wake up, we will send the current buf data,
            #       but if busy, we might wake up to completely new buffer contents. Either is fine.
            if len(self._sbuffer) > 0:
                timeout_abs = self._last_send + self.WAIT_FOR_BUFFER_GROWTH_SECONDS
                timeout_rel = max(0.0, timeout_abs - time.monotonic())
                await asyncio.sleep(timeout_rel)

    def connection_made(self, transport: asyncio.BaseTransport):
        super().connection_made(transport)
        if isinstance(self.session, NotificationSession):
            coro = self.session.taskgroup.spawn(self._poll_sbuffer())
            self._sbuffer_task = self.loop.create_task(coro)
        else:
            # This is a short-lived "fetch_certificate"-type session.
            # No polling here, we always force-empty the buffer.
            self._force_send = True


class ServerAddr:

    def __init__(self, host: str, port: Union[int, str], *, protocol: str = None):
        assert isinstance(host, str), repr(host)
        if protocol is None:
            protocol = 's'
        if not host:
            raise ValueError('host must not be empty')
        if host[0] == '[' and host[-1] == ']':  # IPv6
            host = host[1:-1]
        try:
            net_addr = NetAddress(host, port)  # this validates host and port
        except Exception as e:
            raise ValueError(f"cannot construct ServerAddr: invalid host or port (host={host}, port={port})") from e
        if protocol not in _KNOWN_NETWORK_PROTOCOLS:
            raise ValueError(f"invalid network protocol: {protocol}")
        self.host = str(net_addr.host)  # canonical form (if e.g. IPv6 address)
        self.port = int(net_addr.port)
        self.protocol = protocol
        self._net_addr_str = str(net_addr)

    @classmethod
    def from_str(cls, s: str) -> 'ServerAddr':
        """Constructs a ServerAddr or raises ValueError."""
        # host might be IPv6 address, hence do rsplit:
        host, port, protocol = str(s).rsplit(':', 2)
        return ServerAddr(host=host, port=port, protocol=protocol)

    @classmethod
    def from_str_with_inference(cls, s: str) -> Optional['ServerAddr']:
        """Construct ServerAddr from str, guessing missing details.
        Does not raise - just returns None if guessing failed.
        Ongoing compatibility not guaranteed.
        """
        if not s:
            return None
        host = ""
        if s[0] == "[" and "]" in s:  # IPv6 address
            host_end = s.index("]")
            host = s[1:host_end]
            s = s[host_end+1:]
        items = str(s).rsplit(':', 2)
        if len(items) < 2:
            return None  # although maybe we could guess the port too?
        host = host or items[0]
        port = items[1]
        if len(items) >= 3:
            protocol = items[2]
        else:
            protocol = PREFERRED_NETWORK_PROTOCOL
        try:
            return ServerAddr(host=host, port=port, protocol=protocol)
        except ValueError:
            return None
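    # Illustrative examples (assumed hostnames, not part of the original file):
    #   ServerAddr.from_str("electrum.example.com:50002:s")  # explicit protocol
    #   ServerAddr.from_str_with_inference("electrum.example.com:50002")
    #   # -> protocol defaults to PREFERRED_NETWORK_PROTOCOL, i.e. 's' (TLS)
    #   ServerAddr.from_str_with_inference("[2001:db8::1]:50001:t")  # IPv6, plaintext TCP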

    def to_friendly_name(self) -> str:
        # note: this method is closely linked to from_str_with_inference
        if self.protocol == 's':  # hide trailing ":s"
            return self.net_addr_str()
        return str(self)

    def __str__(self):
        return '{}:{}'.format(self.net_addr_str(), self.protocol)

    def to_json(self) -> str:
        return str(self)

    def __repr__(self):
        return f'<ServerAddr host={self.host} port={self.port} protocol={self.protocol}>'

    def net_addr_str(self) -> str:
        return self._net_addr_str

    def __eq__(self, other):
        if not isinstance(other, ServerAddr):
            return False
        return (self.host == other.host
                and self.port == other.port
                and self.protocol == other.protocol)

    def __ne__(self, other):
        return not (self == other)

    def __hash__(self):
        return hash((self.host, self.port, self.protocol))


def _get_cert_path_for_host(*, config: 'SimpleConfig', host: str) -> str:
    filename = host
    try:
        ip = ip_address(host)
    except ValueError:
        pass
    else:
        if isinstance(ip, IPv6Address):
            filename = f"ipv6_{ip.packed.hex()}"
    return os.path.join(config.path, 'certs', filename)
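# Illustrative examples (assumed values, not part of the original file):
#   host "electrum.example.com" -> <config.path>/certs/electrum.example.com
#   host "::1"                  -> <config.path>/certs/ipv6_00000000000000000000000000000001
# (IPv6 addresses are hex-encoded, since ':' is not filesystem-safe everywhere)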


class Interface(Logger):

    def __init__(self, *, network: 'Network', server: ServerAddr):
        assert isinstance(server, ServerAddr), f"expected ServerAddr, got {type(server)}"
        self.ready = network.asyncio_loop.create_future()
        self.got_disconnected = asyncio.Event()
        self._blockchain_updated = asyncio.Event()
        self.server = server
        Logger.__init__(self)
        assert network.config.path
        self.cert_path = _get_cert_path_for_host(config=network.config, host=self.host)
        self.blockchain = None  # type: Optional[Blockchain]
        self._requested_chunks = set()  # type: Set[int]
        self.network = network
        self.session = None  # type: Optional[NotificationSession]
        self._ipaddr_bucket = None
        # Set up proxy.
        # - for servers running on localhost, the proxy is not used. If user runs their own server
        #   on same machine, this lets them enable the proxy (which is used for e.g. FX rates).
        #   note: we could maybe relax this further and bypass the proxy for all private
        #         addresses...? e.g. 192.168.x.x
        if util.is_localhost(server.host):
            self.logger.info(f"looks like localhost: not using proxy for this server")
            self.proxy = None
        else:
            self.proxy = ESocksProxy.from_network_settings(network)

        # Latest block header and corresponding height, as claimed by the server.
        # Note that these values are updated before they are verified.
        # Especially during initial header sync, verification can take a long time.
        # Failing verification will get the interface closed.
        self.tip_header = None  # type: Optional[dict]
        self.tip = 0

        self._headers_cache = {}  # type: Dict[int, bytes]
        self._rawtx_cache = LRUCache(maxsize=20)  # type: LRUCache[str, bytes]  # txid->rawtx

        self.fee_estimates_eta = {}  # type: Dict[int, int]

        # Dump network messages (only for this interface).  Set at runtime from the console.
        self.debug = False

        self.taskgroup = OldTaskGroup()

        async def spawn_task():
            task = await self.network.taskgroup.spawn(self.run())
            task.set_name(f"interface::{str(server)}")
        asyncio.run_coroutine_threadsafe(spawn_task(), self.network.asyncio_loop)

    @property
    def host(self):
        return self.server.host

    @property
    def port(self):
        return self.server.port

    @property
    def protocol(self):
        return self.server.protocol

    def diagnostic_name(self):
        return self.server.net_addr_str()

    def __str__(self):
        return f"<Interface {self.diagnostic_name()}>"

    async def is_server_ca_signed(self, ca_ssl_context: ssl.SSLContext) -> bool:
        """Given a CA enforcing SSL context, returns True if the connection
        can be established. Returns False if the server has a self-signed
        certificate but otherwise is okay. Any other failures raise.
        """
        try:
            await self.open_session(ssl_context=ca_ssl_context, exit_early=True)
        except ConnectError as e:
            cause = e.__cause__
            if (isinstance(cause, ssl.SSLCertVerificationError)
                    and cause.reason == 'CERTIFICATE_VERIFY_FAILED'
                    and cause.verify_code == 18):  # "self signed certificate"
                # Good. We will use this server as self-signed.
                return False
            # Not good. Cannot use this server.
            raise
        # Good. We will use this server as CA-signed.
        return True

    async def _try_saving_ssl_cert_for_first_time(self, ca_ssl_context: ssl.SSLContext) -> None:
        ca_signed = await self.is_server_ca_signed(ca_ssl_context)
        if ca_signed:
            if self._get_expected_fingerprint():
                raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
            with open(self.cert_path, 'w') as f:
                # empty file means this is CA signed, not self-signed
                f.write('')
        else:
            await self._save_certificate()

    def _is_saved_ssl_cert_available(self):
        if not os.path.exists(self.cert_path):
            return False
        with open(self.cert_path, 'r') as f:
            contents = f.read()
        if contents == '':  # CA signed
            if self._get_expected_fingerprint():
                raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
            return True
        # pinned self-signed cert
        try:
            b = pem.dePem(contents, 'CERTIFICATE')
        except SyntaxError as e:
            self.logger.info(f"error parsing already saved cert: {e}")
            raise ErrorParsingSSLCert(e) from e
        try:
            x = x509.X509(b)
        except Exception as e:
            self.logger.info(f"error parsing already saved cert: {e}")
            raise ErrorParsingSSLCert(e) from e
        try:
            x.check_date()
        except x509.CertificateError as e:
            self.logger.info(f"certificate has expired: {e}")
            os.unlink(self.cert_path)  # delete pinned cert only in this case
            return False
        self._verify_certificate_fingerprint(bytes(b))
        return True

    async def _get_ssl_context(self) -> Optional[ssl.SSLContext]:
        if self.protocol != 's':
            # using plaintext TCP
            return None

        # see if we already have cert for this server; or get it for the first time
        ca_sslc = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
        if not self._is_saved_ssl_cert_available():
            try:
                await self._try_saving_ssl_cert_for_first_time(ca_sslc)
            except (OSError, ConnectError, aiorpcx.socks.SOCKSError) as e:
                raise ErrorGettingSSLCertFromServer(e) from e
        # now we have a file saved in our certificate store
        siz = os.stat(self.cert_path).st_size
        if siz == 0:
            # CA signed cert
            sslc = ca_sslc
        else:
            # pinned self-signed cert
            sslc = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=self.cert_path)
            # note: Flag "ssl.VERIFY_X509_STRICT" is enabled by default in python 3.13+ (disabled in older versions).
            #       We explicitly disable it as it breaks lots of servers.
            sslc.verify_flags &= ~ssl.VERIFY_X509_STRICT
            sslc.check_hostname = False
        return sslc

    def handle_disconnect(func):
        @functools.wraps(func)
        async def wrapper_func(self: 'Interface', *args, **kwargs):
            try:
                return await func(self, *args, **kwargs)
            except GracefulDisconnect as e:
                self.logger.log(e.log_level, f"disconnecting due to {repr(e)}")
            except aiorpcx.jsonrpc.RPCError as e:
                self.logger.warning(f"disconnecting due to {repr(e)}")
                self.logger.debug(f"(disconnect) trace for {repr(e)}", exc_info=True)
            finally:
                self.got_disconnected.set()
                # Make sure taskgroup gets cleaned-up. This explicit clean-up is needed here
                # in case the "with taskgroup" ctx mgr never got a chance to run:
                await self.taskgroup.cancel_remaining()
                await self.network.connection_down(self)
                # if was not 'ready' yet, schedule waiting coroutines:
                self.ready.cancel()
        return wrapper_func

    @ignore_exceptions  # do not kill network.taskgroup
    @log_exceptions
    @handle_disconnect
    async def run(self):
        try:
            ssl_context = await self._get_ssl_context()
        except (ErrorParsingSSLCert, ErrorGettingSSLCertFromServer) as e:
            self.logger.info(f'disconnecting due to: {repr(e)}')
            return
        try:
            await self.open_session(ssl_context=ssl_context)
        except (asyncio.CancelledError, ConnectError, aiorpcx.socks.SOCKSError) as e:
            # make SSL errors for main interface more visible (to help server ops debug cert pinning issues)
            if (isinstance(e, ConnectError) and isinstance(e.__cause__, ssl.SSLError)
                    and self.is_main_server() and not self.network.auto_connect):
                self.logger.warning(f'Cannot connect to main server due to SSL error '
                                    f'(maybe cert changed compared to "{self.cert_path}"). Exc: {repr(e)}')
            else:
                self.logger.info(f'disconnecting due to: {repr(e)}')
            return

    def _mark_ready(self) -> None:
        if self.ready.cancelled():
            raise GracefulDisconnect('conn establishment was too slow; *ready* future was cancelled')
        if self.ready.done():
            return

        assert self.tip_header
        chain = blockchain.check_header(self.tip_header)
        if not chain:
            self.blockchain = blockchain.get_best_chain()
        else:
            self.blockchain = chain
        assert self.blockchain is not None

        self.logger.info(f"set blockchain with height {self.blockchain.height()}")

        self.ready.set_result(1)

    def is_connected_and_ready(self) -> bool:
        return self.ready.done() and not self.got_disconnected.is_set()

    async def _save_certificate(self) -> None:
        if not os.path.exists(self.cert_path):
            # we may need to retry this a few times, in case the handshake hasn't completed
            for _ in range(10):
                dercert = await self._fetch_certificate()
                if dercert:
                    self.logger.info("succeeded in getting cert")
                    self._verify_certificate_fingerprint(dercert)
                    with open(self.cert_path, 'w') as f:
                        cert = ssl.DER_cert_to_PEM_cert(dercert)
                        # workaround android bug
                        cert = re.sub("([^\n])-----END CERTIFICATE-----","\\1\n-----END CERTIFICATE-----",cert)
                        f.write(cert)
                        # even though close() flushes, we can't fsync once the file is closed.
                        # and we must flush before fsyncing: flush() writes to the OS buffer,
                        # while fsync() writes the OS buffer to disk.
                        f.flush()
                        os.fsync(f.fileno())
                    break
                await asyncio.sleep(1)
            else:
                raise GracefulDisconnect("could not get certificate after 10 tries")

    async def _fetch_certificate(self) -> bytes:
        sslc = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_CLIENT)
        sslc.check_hostname = False
        sslc.verify_mode = ssl.CERT_NONE
        async with _RSClient(
            session_factory=RPCSession,
            host=self.host, port=self.port,
            ssl=sslc,
            proxy=self.proxy,
            transport=PaddedRSTransport,
        ) as session:
            asyncio_transport = session.transport._asyncio_transport  # type: asyncio.BaseTransport
            ssl_object = asyncio_transport.get_extra_info("ssl_object")  # type: ssl.SSLObject
            return ssl_object.getpeercert(binary_form=True)

    def _get_expected_fingerprint(self) -> Optional[str]:
        if self.is_main_server():
            return self.network.config.NETWORK_SERVERFINGERPRINT
        return None

    def _verify_certificate_fingerprint(self, certificate: bytes) -> None:
        expected_fingerprint = self._get_expected_fingerprint()
        if not expected_fingerprint:
            return
        fingerprint = hashlib.sha256(certificate).hexdigest()
        fingerprints_match = fingerprint.lower() == expected_fingerprint.lower()
        if not fingerprints_match:
            util.trigger_callback('cert_mismatch')
            raise ErrorSSLCertFingerprintMismatch('Refusing to connect to server due to cert fingerprint mismatch')
        self.logger.info("cert fingerprint verification passed")

    async def _maybe_warm_headers_cache(self, *, from_height: int, to_height: int, mode: ChainResolutionMode) -> None:
        """Populate header cache for block heights in range [from_height, to_height]."""
        assert from_height <= to_height, (from_height, to_height)
        assert to_height - from_height < MAX_NUM_HEADERS_PER_REQUEST
        if all(height in self._headers_cache for height in range(from_height, to_height+1)):
            # cache already has all requested headers
            return
        # use lower timeout as we usually have network.bhi_lock here
        timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        count = to_height - from_height + 1
        headers = await self.get_block_headers(start_height=from_height, count=count, timeout=timeout, mode=mode)
        for idx, raw_header in enumerate(headers):
            header_height = from_height + idx
            self._headers_cache[header_height] = raw_header

    async def get_block_header(self, height: int, *, mode: ChainResolutionMode) -> dict:
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        #self.logger.debug(f'get_block_header() {height} in {mode=}')
        # use lower timeout as we usually have network.bhi_lock here
        timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        if raw_header := self._headers_cache.get(height):
            return blockchain.deserialize_header(raw_header, height)
        self.logger.info(f'requesting block header {height} in {mode=}')
        res = await self.session.send_request('blockchain.block.header', [height], timeout=timeout)
        return blockchain.deserialize_header(bytes.fromhex(res), height)

    async def get_block_headers(
        self,
        *,
        start_height: int,
        count: int,
        timeout=None,
        mode: Optional[ChainResolutionMode] = None,
    ) -> Sequence[bytes]:
        """Request a number of consecutive block headers, starting at `start_height`.
        `count` is the num of requested headers, BUT note the server might return fewer than this
        (if the range would extend beyond its tip).
        note: the returned headers are not verified or parsed at all.
        """
        if not is_non_negative_integer(start_height):
            raise Exception(f"{repr(start_height)} is not a block height")
        if not is_non_negative_integer(count) or not (0 < count <= MAX_NUM_HEADERS_PER_REQUEST):
            raise Exception(f"{repr(count)} not an int in range ]0, {MAX_NUM_HEADERS_PER_REQUEST}]")
        self.logger.info(
            f"requesting block headers: [{start_height}, {start_height+count-1}], {count=}"
            + (f" (in {mode=})" if mode is not None else "")
        )
        res = await self.session.send_request('blockchain.block.headers', [start_height, count], timeout=timeout)
        # check response
        assert_dict_contains_field(res, field_name='count')
        assert_dict_contains_field(res, field_name='hex')
        assert_dict_contains_field(res, field_name='max')
        assert_non_negative_integer(res['count'])
        assert_non_negative_integer(res['max'])
        assert_hex_str(res['hex'])
        if len(res['hex']) != HEADER_SIZE * 2 * res['count']:
            raise RequestCorrupted('inconsistent chunk hex and count')
        # we never request more than MAX_NUM_HEADERS_PER_REQUEST headers, but we enforce those fit in a single response
        if res['max'] < MAX_NUM_HEADERS_PER_REQUEST:
            raise RequestCorrupted(f"server uses too low 'max' count for block.headers: {res['max']} < {MAX_NUM_HEADERS_PER_REQUEST}")
        if res['count'] > count:
            raise RequestCorrupted(f"asked for {count} headers but got more: {res['count']}")
        elif res['count'] < count:
            # we only tolerate getting fewer headers if it is due to reaching the tip
            end_height = start_height + res['count'] - 1
            if end_height < self.tip:  # still below tip. why did server not send more?!
                raise RequestCorrupted(
                    f"asked for {count} headers but got fewer: {res['count']}. ({start_height=}, {self.tip=})")
        # checks done.
        headers = list(util.chunks(bfh(res['hex']), size=HEADER_SIZE))
        return headers

    async def request_chunk_below_max_checkpoint(
        self,
        *,
        height: int,
    ) -> None:
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        assert height <= constants.net.max_checkpoint(), f"{height=} must be <= cp={constants.net.max_checkpoint()}"
        index = height // CHUNK_SIZE
        if index in self._requested_chunks:
            return None
        self.logger.debug(f"requesting chunk from height {height}")
        try:
            self._requested_chunks.add(index)
            headers = await self.get_block_headers(start_height=index * CHUNK_SIZE, count=CHUNK_SIZE)
        finally:
            self._requested_chunks.discard(index)
        conn = self.blockchain.connect_chunk(index, data=b"".join(headers))
        if not conn:
            raise RequestCorrupted(f"chunk ({index=}, for {height=}) does not connect to blockchain")
        return None

    async def _fast_forward_chain(
        self,
        *,
        height: int,  # usually local chain tip + 1
        tip: int,  # server tip. we should not request past this.
    ) -> int:
        """Request some headers starting at `height` to grow the blockchain of this interface.
        Returns the number of headers we managed to connect, starting at `height`.
        """
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        if not is_non_negative_integer(tip):
            raise Exception(f"{repr(tip)} is not a block height")
        if not (height > constants.net.max_checkpoint()
                or height == 0 == constants.net.max_checkpoint()):
            raise Exception(f"{height=} must be > cp={constants.net.max_checkpoint()}")
        assert height <= tip, f"{height=} must be <= {tip=}"
        # Request a few chunks of headers concurrently.
        # tradeoffs:
        # - more chunks: higher memory requirements
        # - more chunks: higher concurrency => syncing needs fewer network round-trips
        # - if a chunk does not connect, bandwidth for all later chunks is wasted
        async with OldTaskGroup() as group:
            tasks = []  # type: List[Tuple[int, asyncio.Task[Sequence[bytes]]]]
            index0 = height // CHUNK_SIZE
            for chunk_cnt in range(10):
                index = index0 + chunk_cnt
                start_height = index * CHUNK_SIZE
                if start_height > tip:
                    break
                end_height = min(start_height + CHUNK_SIZE - 1, tip)
                size = end_height - start_height + 1
                tasks.append((index, await group.spawn(self.get_block_headers(start_height=start_height, count=size))))
        # try to connect chunks
        num_headers = 0
        for index, task in tasks:
            headers = task.result()
            conn = self.blockchain.connect_chunk(index, data=b"".join(headers))
            if not conn:
                break
            num_headers += len(headers)
        # We started at a chunk boundary, instead of the requested `height`. Need to correct for that.
        offset = height - index0 * CHUNK_SIZE
        return max(0, num_headers - offset)

    def is_main_server(self) -> bool:
        return (self.network.interface == self or
                self.network.interface is None and self.network.default_server == self.server)

    async def open_session(
        self,
        *,
        ssl_context: Optional[ssl.SSLContext],
        exit_early: bool = False,
    ):
        session_factory = lambda *args, iface=self, **kwargs: NotificationSession(*args, **kwargs, interface=iface)
        async with _RSClient(
            session_factory=session_factory,
            host=self.host, port=self.port,
            ssl=ssl_context,
            proxy=self.proxy,
            transport=PaddedRSTransport,
        ) as session:
            start = time.perf_counter()
            self.session = session  # type: NotificationSession
            self.session.set_default_timeout(self.network.get_network_timeout_seconds(NetworkTimeout.Generic))
            try:
                ver = await session.send_request('server.version', [self.client_name(), version.PROTOCOL_VERSION])
            except aiorpcx.jsonrpc.RPCError as e:
                raise GracefulDisconnect(e)  # probably 'unsupported protocol version'
            if exit_early:
                return
            if ver[1] != version.PROTOCOL_VERSION:
                raise GracefulDisconnect(f'server violated protocol-version-negotiation. '
                                         f'we asked for {version.PROTOCOL_VERSION!r}, they sent {ver[1]!r}')
            if not self.network.check_interface_against_healthy_spread_of_connected_servers(self):
                raise GracefulDisconnect(f'too many connected servers already '
                                         f'in bucket {self.bucket_based_on_ipaddress()}')

            try:
                features = await session.send_request('server.features')
                server_genesis_hash = assert_dict_contains_field(features, field_name='genesis_hash')
            except (aiorpcx.jsonrpc.RPCError, RequestCorrupted) as e:
                raise GracefulDisconnect(e)
            if server_genesis_hash != constants.net.GENESIS:
                raise GracefulDisconnect(f'server on different chain: {server_genesis_hash=}. ours: {constants.net.GENESIS}')
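            # For reference (a well-known constant, not part of the original file):
            # on mainnet, constants.net.GENESIS is the hash of block 0,
            # "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f",
            # so a server reporting a different genesis_hash is serving another
            # chain (e.g. testnet or a fork) and gets disconnected here.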
            self.logger.info(f"connection established. version: {ver}, handshake duration: {(time.perf_counter() - start) * 1000:.2f} ms")

            try:
                async with self.taskgroup as group:
                    await group.spawn(self.ping)
                    await group.spawn(self.request_fee_estimates)
                    await group.spawn(self.run_fetch_blocks)
                    await group.spawn(self.monitor_connection)
            except aiorpcx.jsonrpc.RPCError as e:
                if e.code in (
                    JSONRPC.EXCESSIVE_RESOURCE_USAGE,
                    JSONRPC.SERVER_BUSY,
                    JSONRPC.METHOD_NOT_FOUND,
                    JSONRPC.INTERNAL_ERROR,
                ):
                    log_level = logging.WARNING if self.is_main_server() else logging.INFO
                    raise GracefulDisconnect(e, log_level=log_level) from e
                raise
            finally:
                self.got_disconnected.set()  # set this ASAP, ideally before any awaits

    async def monitor_connection(self):
        while True:
            await asyncio.sleep(1)
            # If the session/transport is no longer open, we disconnect.
            # e.g. if the remote cleanly sends EOF, we would handle that here.
            # note: If the user pulls the ethernet cable or disconnects wifi,
            #       ideally we would detect that here, so that the GUI/etc can reflect that.
            #       - On Android, this seems to work reliably, where asyncio.BaseProtocol.connection_lost()
            #         gets called with e.g. ConnectionAbortedError(103, 'Software caused connection abort').
            #       - On desktop Linux/Win, it seems BaseProtocol.connection_lost() is not called in such cases.
            #         Hence, in practice the connection issue will only be detected the next time we try
            #         to send a message (plus timeout), which can take minutes...
            if not self.session or self.session.is_closing():
                raise GracefulDisconnect('session was closed')

    async def ping(self):
        # We periodically send a "ping" msg to make sure the server knows we are still here.
        # Adding a bit of randomness generates some noise against traffic analysis.
        while True:
            await asyncio.sleep(random.random() * 300)
            await self.session.send_request('server.ping')
            await self._maybe_send_noise()

    async def _maybe_send_noise(self):
        while random.random() < 0.2:
            await asyncio.sleep(random.random())
            await self.session.send_request('server.ping')

    async def request_fee_estimates(self):
        while True:
            async with OldTaskGroup() as group:
                fee_tasks = []
                for i in FEE_ETA_TARGETS[0:-1]:
                    fee_tasks.append((i, await group.spawn(self.get_estimatefee(i))))
            for nblock_target, task in fee_tasks:
                fee = task.result()
                if fee < 0: continue
                assert isinstance(fee, int)
                self.fee_estimates_eta[nblock_target] = fee
            self.network.update_fee_estimates()
            await asyncio.sleep(60)

    async def close(self, *, force_after: int = None):
        """Closes the connection and waits for it to be closed.
        We try to flush buffered data to the wire, which can take some time.
        """
        if self.session:
            await self.session.close(force_after=force_after)
        # monitor_connection will cancel tasks

    async def run_fetch_blocks(self):
        header_queue = asyncio.Queue()
        await self.session.subscribe('blockchain.headers.subscribe', [], header_queue)
        while True:
            item = await header_queue.get()
            raw_header = item[0]
            height = raw_header['height']
            header_bytes = bfh(raw_header['hex'])
            header_dict = blockchain.deserialize_header(header_bytes, height)
            self.tip_header = header_dict
            self.tip = height
            if self.tip < constants.net.max_checkpoint():
                raise GracefulDisconnect(
                    f"server tip below max checkpoint. ({self.tip} < {constants.net.max_checkpoint()})")
            self._mark_ready()
            self._headers_cache.clear()  # tip changed, so assume anything could have happened with chain
            self._headers_cache[height] = header_bytes
            try:
                blockchain_updated = await self._process_header_at_tip()
            finally:
                self._headers_cache.clear()  # to reduce memory usage
            # header processing done
            if self.is_main_server() or blockchain_updated:
                self.logger.info(f"new chain tip. {height=}")
            if blockchain_updated:
                util.trigger_callback('blockchain_updated')
                self._blockchain_updated.set()
                self._blockchain_updated.clear()
            util.trigger_callback('network_updated')
            await self.network.switch_unwanted_fork_interface()
            await self.network.switch_lagging_interface()
            await self.taskgroup.spawn(self._maybe_send_noise())

    async def _process_header_at_tip(self) -> bool:
        """Returns:
        False - boring fast-forward: we already have this header as part of this blockchain from another interface,
        True - new header we didn't have, or reorg
        """
        height, header = self.tip, self.tip_header
        async with self.network.bhi_lock:
            if self.blockchain.height() >= height and self.blockchain.check_header(header):
                # another interface amended the blockchain
                return False
            await self.sync_until(height)
            return True

    async def sync_until(
        self,
        height: int,
        *,
        next_height: Optional[int] = None,  # sync target. typically the tip, except in unit tests
    ) -> Tuple[ChainResolutionMode, int]:
        if next_height is None:
            next_height = self.tip
        last = None  # type: Optional[ChainResolutionMode]
        while last is None or height <= next_height:
            prev_last, prev_height = last, height
            if next_height > height + 144:
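                # (context note, not part of the original file: 144 blocks is
                # roughly one day of Bitcoin blocks at ~10 minutes per block)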
1109
                # We are far from the tip.
1110
                # It is more efficient to process headers in large batches (CPU/disk_usage/logging).
1111
                # (but this wastes a little bandwidth, if we are not on a chunk boundary)
1112
                num_headers = await self._fast_forward_chain(
×
1113
                    height=height, tip=next_height)
1114
                if num_headers == 0:
×
1115
                    if height <= constants.net.max_checkpoint():
×
1116
                        raise GracefulDisconnect('server chain conflicts with checkpoints or genesis')
×
1117
                    last, height = await self.step(height)
×
1118
                    continue
×
1119
                # report progress to gui/etc
1120
                util.trigger_callback('blockchain_updated')
×
1121
                self._blockchain_updated.set()
×
1122
                self._blockchain_updated.clear()
×
1123
                util.trigger_callback('network_updated')
×
1124
                height += num_headers
×
1125
                assert height <= next_height+1, (height, self.tip)
×
1126
                last = ChainResolutionMode.CATCHUP
×
1127
            else:
1128
                # We are close to the tip, so process headers one-by-one.
1129
                # (note: due to headers_cache, to save network latency, this can still batch-request headers)
1130
                last, height = await self.step(height)
1✔
1131
            assert (prev_last, prev_height) != (last, height), 'had to prevent infinite loop in interface.sync_until'
1✔
1132
        return last, height
1✔
1133

1134
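    # sync_until example (illustrative numbers): with height=800_000 and
    # next_height=800_500, the gap exceeds 144, so _fast_forward_chain() is asked
    # to batch-download headers; if it returns num_headers=500, height jumps to
    # 800_500 in a single loop iteration instead of 500 step() round-trips.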
    async def step(
        self,
        height: int,
    ) -> Tuple[ChainResolutionMode, int]:
        assert 0 <= height <= self.tip, (height, self.tip)
        await self._maybe_warm_headers_cache(
            from_height=height,
            to_height=min(self.tip, height+MAX_NUM_HEADERS_PER_REQUEST-1),
            mode=ChainResolutionMode.CATCHUP,
        )
        header = await self.get_block_header(height, mode=ChainResolutionMode.CATCHUP)

        chain = blockchain.check_header(header)
        if chain:
            self.blockchain = chain
            # note: there is an edge case here that is not handled.
            # we might know the blockhash (enough for check_header) but
            # not have the header itself. e.g. regtest chain with only genesis.
            # this situation resolves itself on the next block
            return ChainResolutionMode.CATCHUP, height+1

        can_connect = blockchain.can_connect(header)
        if not can_connect:
            self.logger.info(f"can't connect new block: {height=}")
            height, header, bad, bad_header = await self._search_headers_backwards(height, header=header)
            chain = blockchain.check_header(header)
            can_connect = blockchain.can_connect(header)
            assert chain or can_connect
        if can_connect:
            height += 1
            self.blockchain = can_connect
            self.blockchain.save_header(header)
            return ChainResolutionMode.CATCHUP, height

        good, bad, bad_header = await self._search_headers_binary(height, bad, bad_header, chain)
        return await self._resolve_potential_chain_fork_given_forkpoint(good, bad, bad_header)

    async def _search_headers_binary(
        self,
        height: int,
        bad: int,
        bad_header: dict,
        chain: Optional[Blockchain],
    ) -> Tuple[int, int, dict]:
        assert bad == bad_header['block_height']
        _assert_header_does_not_check_against_any_chain(bad_header)

        self.blockchain = chain
        good = height
        while True:
            assert 0 <= good < bad, (good, bad)
            height = (good + bad) // 2
            self.logger.info(f"binary step. good {good}, bad {bad}, height {height}")
            if bad - good + 1 <= MAX_NUM_HEADERS_PER_REQUEST:  # if interval is small, trade some bandwidth for lower latency
                await self._maybe_warm_headers_cache(
                    from_height=good, to_height=bad, mode=ChainResolutionMode.BINARY)
            header = await self.get_block_header(height, mode=ChainResolutionMode.BINARY)
            chain = blockchain.check_header(header)
            if chain:
                self.blockchain = chain
                good = height
            else:
                bad = height
                bad_header = header
            if good + 1 == bad:
                break

        if not self.blockchain.can_connect(bad_header, check_height=False):
            raise Exception(f'unexpected bad header during binary search: {bad_header}')
        _assert_header_does_not_check_against_any_chain(bad_header)

        self.logger.info(f"binary search exited. good {good}, bad {bad}. {chain=}")
        return good, bad, bad_header

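    # Invariant of _search_headers_binary (summary comment): 'good' is always a
    # height whose header checks against some locally known chain, and 'bad' is
    # always one whose header does not; the loop halves the interval until
    # good + 1 == bad, so 'bad' is the first height where the server's chain
    # diverges from ours.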
    async def _resolve_potential_chain_fork_given_forkpoint(
        self,
        good: int,
        bad: int,
        bad_header: dict,
    ) -> Tuple[ChainResolutionMode, int]:
        assert good + 1 == bad
        assert bad == bad_header['block_height']
        _assert_header_does_not_check_against_any_chain(bad_header)
        # 'good' is the height of a block 'good_header', somewhere in self.blockchain.
        # bad_header connects to good_header; bad_header itself is NOT in self.blockchain.

        bh = self.blockchain.height()
        assert bh >= good, (bh, good)
        if bh == good:
            height = good + 1
            self.logger.info(f"catching up from {height}")
            return ChainResolutionMode.NO_FORK, height

        # this is a new fork we don't yet have
        height = bad + 1
        self.logger.info(f"new fork at bad height {bad}")
        b = self.blockchain.fork(bad_header)  # type: Blockchain
        self.blockchain = b
        assert b.forkpoint == bad
        return ChainResolutionMode.FORK, height

    async def _search_headers_backwards(
        self,
        height: int,
        *,
        header: dict,
    ) -> Tuple[int, dict, int, dict]:
        async def iterate():
            nonlocal height, header
            checkp = False
            if height <= constants.net.max_checkpoint():
                height = constants.net.max_checkpoint()
                checkp = True
            header = await self.get_block_header(height, mode=ChainResolutionMode.BACKWARD)
            chain = blockchain.check_header(header)
            can_connect = blockchain.can_connect(header)
            if chain or can_connect:
                return False
            if checkp:
                raise GracefulDisconnect("server chain conflicts with checkpoints")
            return True

        bad, bad_header = height, header
        _assert_header_does_not_check_against_any_chain(bad_header)
        with blockchain.blockchains_lock:
            chains = list(blockchain.blockchains.values())
        local_max = max([0] + [x.height() for x in chains])
        height = min(local_max + 1, height - 1)
        assert height >= 0

        await self._maybe_warm_headers_cache(
            from_height=max(0, height-10), to_height=height, mode=ChainResolutionMode.BACKWARD)

        delta = 2
        while await iterate():
            bad, bad_header = height, header
            height -= delta
            delta *= 2

        _assert_header_does_not_check_against_any_chain(bad_header)
        self.logger.info(f"exiting backward mode at {height}")
        return height, header, bad, bad_header

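    # _search_headers_backwards example (illustrative numbers): if our best local
    # chain is at height 900_000 and the server's header at 900_005 does not
    # connect, the walk starts at min(900_001, 900_004) = 900_001 and steps back
    # by a doubling delta (2, 4, 8, ...) until some header checks or connects,
    # clamping at max_checkpoint(), below which a persistent mismatch becomes a
    # GracefulDisconnect.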
    @classmethod
    def client_name(cls) -> str:
        return f'electrum/{version.ELECTRUM_VERSION}'

    def is_tor(self) -> bool:
        return self.host.endswith('.onion')

    def ip_addr(self) -> Optional[str]:
        session = self.session
        if not session:
            return None
        peer_addr = session.remote_address()
        if not peer_addr:
            return None
        return str(peer_addr.host)

    def bucket_based_on_ipaddress(self) -> str:
        def do_bucket():
            if self.is_tor():
                return BUCKET_NAME_OF_ONION_SERVERS
            try:
                ip_addr = ip_address(self.ip_addr())  # type: Union[IPv4Address, IPv6Address]
            except ValueError:
                return ''
            if not ip_addr:
                return ''
            if ip_addr.is_loopback:  # localhost is exempt
                return ''
            if ip_addr.version == 4:
                slash16 = IPv4Network(ip_addr).supernet(prefixlen_diff=32-16)
                return str(slash16)
            elif ip_addr.version == 6:
                slash48 = IPv6Network(ip_addr).supernet(prefixlen_diff=128-48)
                return str(slash48)
            return ''

        if not self._ipaddr_bucket:
            self._ipaddr_bucket = do_bucket()
        return self._ipaddr_bucket

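    # bucket_based_on_ipaddress example (illustrative addresses): do_bucket()
    # maps 203.0.113.7 to the "203.0.0.0/16" bucket and 2001:db8:abcd::1 to
    # "2001:db8:abcd::/48", so servers from the same netblock end up sharing a
    # single bucket.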
    async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict:
        if not is_hash256_str(tx_hash):
            raise Exception(f"{repr(tx_hash)} is not a txid")
        if not is_non_negative_integer(tx_height):
            raise Exception(f"{repr(tx_height)} is not a block height")
        # do request
        res = await self.session.send_request('blockchain.transaction.get_merkle', [tx_hash, tx_height])
        # check response
        block_height = assert_dict_contains_field(res, field_name='block_height')
        merkle = assert_dict_contains_field(res, field_name='merkle')
        pos = assert_dict_contains_field(res, field_name='pos')
        # note: tx_height was just a hint to the server, don't enforce the response to match it
        assert_non_negative_integer(block_height)
        assert_non_negative_integer(pos)
        assert_list_or_tuple(merkle)
        for item in merkle:
            assert_hash256_str(item)
        return res

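    # get_merkle_for_transaction response shape (illustrative values):
    #   {"block_height": 450538, "pos": 710, "merkle": ["<hash256 hex>", ...]}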
    async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
        if not is_hash256_str(tx_hash):
            raise Exception(f"{repr(tx_hash)} is not a txid")
        if rawtx_bytes := self._rawtx_cache.get(tx_hash):
            return rawtx_bytes.hex()
        raw = await self.session.send_request('blockchain.transaction.get', [tx_hash], timeout=timeout)
        # validate response
        if not is_hex_str(raw):
            raise RequestCorrupted(f"received garbage (non-hex) as tx data (txid {tx_hash}): {raw!r}")
        tx = Transaction(raw)
        try:
            tx.deserialize()  # see if raises
        except Exception as e:
            raise RequestCorrupted(f"cannot deserialize received transaction (txid {tx_hash})") from e
        if tx.txid() != tx_hash:
            raise RequestCorrupted(f"received tx does not match expected txid {tx_hash} (got {tx.txid()})")
        self._rawtx_cache[tx_hash] = bytes.fromhex(raw)
        return raw

    async def broadcast_transaction(self, tx: 'Transaction', *, timeout=None) -> None:
        """caller should handle TxBroadcastError and RequestTimedOut"""
        txid_calc = tx.txid()
        assert txid_calc is not None
        rawtx = tx.serialize()
        assert is_hex_str(rawtx)
        if timeout is None:
            timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        if any(DummyAddress.is_dummy_address(txout.address) for txout in tx.outputs()):
            raise DummyAddressUsedInTxException("tried to broadcast tx with dummy address!")
        try:
            out = await self.session.send_request('blockchain.transaction.broadcast', [rawtx], timeout=timeout)
            # note: both 'out' and exception messages are untrusted input from the server
        except (RequestTimedOut, asyncio.CancelledError, asyncio.TimeoutError):
            raise  # pass-through
        except aiorpcx.jsonrpc.CodeMessageError as e:
            self.logger.info(f"broadcast_transaction error [DO NOT TRUST THIS MESSAGE]: {error_text_str_to_safe_str(repr(e))}. tx={str(tx)}")
            raise TxBroadcastServerReturnedError(sanitize_tx_broadcast_response(e.message)) from e
        except BaseException as e:  # intentional BaseException for sanity!
            self.logger.info(f"broadcast_transaction error2 [DO NOT TRUST THIS MESSAGE]: {error_text_str_to_safe_str(repr(e))}. tx={str(tx)}")
            send_exception_to_crash_reporter(e)
            raise TxBroadcastUnknownError() from e
        if out != txid_calc:
            self.logger.info(f"unexpected txid for broadcast_transaction [DO NOT TRUST THIS MESSAGE]: "
                             f"{error_text_str_to_safe_str(out)} != {txid_calc}. tx={str(tx)}")
            raise TxBroadcastHashMismatch(_("Server returned unexpected transaction ID."))
        # broadcast succeeded.
        # We now cache the rawtx, for *this interface only*. The tx likely touches some ismine addresses, affecting
        # the status of a scripthash we are subscribed to. Caching here will save a future get_transaction RPC.
        self._rawtx_cache[txid_calc] = bytes.fromhex(rawtx)

    async def get_history_for_scripthash(self, sh: str) -> List[dict]:
        if not is_hash256_str(sh):
            raise Exception(f"{repr(sh)} is not a scripthash")
        # do request
        res = await self.session.send_request('blockchain.scripthash.get_history', [sh])
        # check response
        assert_list_or_tuple(res)
        prev_height = 1
        for tx_item in res:
            height = assert_dict_contains_field(tx_item, field_name='height')
            assert_dict_contains_field(tx_item, field_name='tx_hash')
            assert_integer(height)
            assert_hash256_str(tx_item['tx_hash'])
            if height in (-1, 0):
                assert_dict_contains_field(tx_item, field_name='fee')
                assert_non_negative_integer(tx_item['fee'])
                prev_height = float("inf")  # this ensures confirmed txs can't follow mempool txs
            else:
                # check monotonicity of heights
                if height < prev_height:
                    raise RequestCorrupted('heights of confirmed txs must be non-decreasing')
                prev_height = height
        hashes = {item['tx_hash'] for item in res}
        if len(hashes) != len(res):
            # Either the server is sending garbage, or, if the server is race-prone,
            # a recently mined tx might appear in both the last block and the mempool.
            # Either way, it is simplest to just disregard the response.
            raise RequestCorrupted(f"server history has non-unique txids for sh={sh}")
        return res

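    # get_history_for_scripthash response shape (illustrative values): confirmed
    # txs first, sorted by height; mempool txs (height 0 or -1, carrying a "fee"
    # field) last, e.g.:
    #   [{"height": 200004, "tx_hash": "<hash256 hex>"},
    #    {"height": 0, "tx_hash": "<hash256 hex>", "fee": 133}]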
    async def listunspent_for_scripthash(self, sh: str) -> List[dict]:
        if not is_hash256_str(sh):
            raise Exception(f"{repr(sh)} is not a scripthash")
        # do request
        res = await self.session.send_request('blockchain.scripthash.listunspent', [sh])
        # check response
        assert_list_or_tuple(res)
        for utxo_item in res:
            assert_dict_contains_field(utxo_item, field_name='tx_pos')
            assert_dict_contains_field(utxo_item, field_name='value')
            assert_dict_contains_field(utxo_item, field_name='tx_hash')
            assert_dict_contains_field(utxo_item, field_name='height')
            assert_non_negative_integer(utxo_item['tx_pos'])
            assert_non_negative_integer(utxo_item['value'])
            assert_non_negative_integer(utxo_item['height'])
            assert_hash256_str(utxo_item['tx_hash'])
        return res

    async def get_balance_for_scripthash(self, sh: str) -> dict:
        if not is_hash256_str(sh):
            raise Exception(f"{repr(sh)} is not a scripthash")
        # do request
        res = await self.session.send_request('blockchain.scripthash.get_balance', [sh])
        # check response
        assert_dict_contains_field(res, field_name='confirmed')
        assert_dict_contains_field(res, field_name='unconfirmed')
        assert_non_negative_integer(res['confirmed'])
        assert_integer(res['unconfirmed'])
        return res

    async def get_txid_from_txpos(self, tx_height: int, tx_pos: int, merkle: bool) -> Union[str, dict]:
        if not is_non_negative_integer(tx_height):
            raise Exception(f"{repr(tx_height)} is not a block height")
        if not is_non_negative_integer(tx_pos):
            raise Exception(f"{repr(tx_pos)} should be a non-negative integer")
        # do request
        res = await self.session.send_request(
            'blockchain.transaction.id_from_pos',
            [tx_height, tx_pos, merkle],
        )
        # check response
        if merkle:
            assert_dict_contains_field(res, field_name='tx_hash')
            assert_dict_contains_field(res, field_name='merkle')
            assert_hash256_str(res['tx_hash'])
            assert_list_or_tuple(res['merkle'])
            for node_hash in res['merkle']:
                assert_hash256_str(node_hash)
        else:
            assert_hash256_str(res)
        return res

    async def get_fee_histogram(self) -> Sequence[Tuple[Union[float, int], int]]:
        # do request
        res = await self.session.send_request('mempool.get_fee_histogram')
        # check response
        assert_list_or_tuple(res)
        prev_fee = float('inf')
        for fee, vsize in res:
            assert_non_negative_int_or_float(fee)
            assert_non_negative_integer(vsize)
            if fee >= prev_fee:  # check monotonicity
                raise RequestCorrupted('fees must be in decreasing order')
            prev_fee = fee
        return res

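    # get_fee_histogram response shape (illustrative values): [fee_rate, vsize]
    # pairs in strictly decreasing fee order, e.g. [[53.01, 102030], [38.56, 110000]],
    # where each vsize aggregates the mempool transactions around that fee rate.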
    async def get_server_banner(self) -> str:
        # do request
        res = await self.session.send_request('server.banner')
        # check response
        if not isinstance(res, str):
            raise RequestCorrupted(f'{res!r} should be a str')
        return res

    async def get_donation_address(self) -> str:
        # do request
        res = await self.session.send_request('server.donation_address')
        # check response
        if not res:  # ignore empty string
            return ''
        if not bitcoin.is_address(res):
            # note: do not hard-fail -- allow the server to use a future-type
            #       bitcoin address we do not recognize
            self.logger.info(f"invalid donation address from server: {repr(res)}")
            res = ''
        return res

    async def get_relay_fee(self) -> int:
        """Returns the min relay feerate in sat/kbyte."""
        # do request
        res = await self.session.send_request('blockchain.relayfee')
        # check response
        assert_non_negative_int_or_float(res)
        relayfee = int(res * bitcoin.COIN)
        relayfee = max(0, relayfee)
        return relayfee

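    # get_relay_fee worked conversion (illustrative value): the server reports
    # BTC per kbyte, e.g. 0.00001, and int(0.00001 * bitcoin.COIN) == 1000,
    # i.e. 1000 sat/kbyte.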
    async def get_estimatefee(self, num_blocks: int) -> int:
        """Returns a feerate estimate for getting confirmed within
        num_blocks blocks, in sat/kbyte.
        Returns -1 if the server could not provide an estimate.
        """
        if not is_non_negative_integer(num_blocks):
            raise Exception(f"{repr(num_blocks)} is not a valid num_blocks")
        # do request
        try:
            res = await self.session.send_request('blockchain.estimatefee', [num_blocks])
        except aiorpcx.jsonrpc.ProtocolError as e:
            # The protocol spec says the server itself should already have returned -1
            # if it cannot provide an estimate, however apparently "electrs" does not conform
            # and sends an error instead. Convert it here:
            if "cannot estimate fee" in e.message:
                res = -1
            else:
                raise
        except aiorpcx.jsonrpc.RPCError as e:
            # The protocol spec says the server itself should already have returned -1
            # if it cannot provide an estimate. "Fulcrum" often sends:
            #   aiorpcx.jsonrpc.RPCError: (-32603, 'internal error: bitcoind request timed out')
            if e.code == JSONRPC.INTERNAL_ERROR:
                res = -1
            else:
                raise
        # check response
        if res != -1:
            assert_non_negative_int_or_float(res)
            res = int(res * bitcoin.COIN)
        return res


def _assert_header_does_not_check_against_any_chain(header: dict) -> None:
    chain_bad = blockchain.check_header(header)
    if chain_bad:
        raise Exception('bad_header must not check against any chain!')


def sanitize_tx_broadcast_response(server_msg) -> str:
    # Unfortunately, bitcoind (and hence the Electrum protocol) does not return a useful
    # error code, so we use substring matching to interpret the error message.
    # server_msg is untrusted input, so it should not be shown to the user. see #4968
    server_msg = str(server_msg)
    server_msg = server_msg.replace("\n", r"\n")

    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/script/script_error.cpp
    script_error_messages = {
        r"Script evaluated without error but finished with a false/empty top stack element",
        r"Script failed an OP_VERIFY operation",
        r"Script failed an OP_EQUALVERIFY operation",
        r"Script failed an OP_CHECKMULTISIGVERIFY operation",
        r"Script failed an OP_CHECKSIGVERIFY operation",
        r"Script failed an OP_NUMEQUALVERIFY operation",
        r"Script is too big",
        r"Push value size limit exceeded",
        r"Operation limit exceeded",
        r"Stack size limit exceeded",
        r"Signature count negative or greater than pubkey count",
        r"Pubkey count negative or limit exceeded",
        r"Opcode missing or not understood",
        r"Attempted to use a disabled opcode",
        r"Operation not valid with the current stack size",
        r"Operation not valid with the current altstack size",
        r"OP_RETURN was encountered",
        r"Invalid OP_IF construction",
        r"Negative locktime",
        r"Locktime requirement not satisfied",
        r"Signature hash type missing or not understood",
        r"Non-canonical DER signature",
        r"Data push larger than necessary",
        r"Only push operators allowed in signatures",
        r"Non-canonical signature: S value is unnecessarily high",
        r"Dummy CHECKMULTISIG argument must be zero",
        r"OP_IF/NOTIF argument must be minimal",
        r"Signature must be zero for failed CHECK(MULTI)SIG operation",
        r"NOPx reserved for soft-fork upgrades",
        r"Witness version reserved for soft-fork upgrades",
        r"Taproot version reserved for soft-fork upgrades",
        r"OP_SUCCESSx reserved for soft-fork upgrades",
        r"Public key version reserved for soft-fork upgrades",
        r"Public key is neither compressed or uncompressed",
        r"Stack size must be exactly one after execution",
        r"Extra items left on stack after execution",
        r"Witness program has incorrect length",
        r"Witness program was passed an empty witness",
        r"Witness program hash mismatch",
        r"Witness requires empty scriptSig",
        r"Witness requires only-redeemscript scriptSig",
        r"Witness provided for non-witness script",
        r"Using non-compressed keys in segwit",
        r"Invalid Schnorr signature size",
        r"Invalid Schnorr signature hash type",
        r"Invalid Schnorr signature",
        r"Invalid Taproot control block size",
        r"Too much signature validation relative to witness weight",
        r"OP_CHECKMULTISIG(VERIFY) is not available in tapscript",
        r"OP_IF/NOTIF argument must be minimal in tapscript",
        r"Using OP_CODESEPARATOR in non-witness script",
        r"Signature is found in scriptCode",
    }
    for substring in script_error_messages:
        if substring in server_msg:
            return substring
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/validation.cpp
    # grep "REJECT_"
    # grep "TxValidationResult"
    # should come after script_error.cpp (due to e.g. "non-mandatory-script-verify-flag")
    validation_error_messages = {
        r"coinbase": None,
        r"tx-size-small": None,
        r"non-final": None,
        r"txn-already-in-mempool": None,
        r"txn-mempool-conflict": None,
        r"txn-already-known": None,
        r"non-BIP68-final": None,
        r"bad-txns-nonstandard-inputs": None,
        r"bad-witness-nonstandard": None,
        r"bad-txns-too-many-sigops": None,
        r"mempool min fee not met":
            ("mempool min fee not met\n" +
             _("Your transaction is paying a fee that is so low that the bitcoin node cannot "
               "fit it into its mempool. The mempool is already full of hundreds of megabytes "
               "of transactions that all pay higher fees. Try to increase the fee.")),
        r"min relay fee not met": None,
        r"absurdly-high-fee": None,
        r"max-fee-exceeded": None,
        r"too-long-mempool-chain": None,
        r"bad-txns-spends-conflicting-tx": None,
        r"insufficient fee": ("insufficient fee\n" +
             _("Your transaction is trying to replace another one in the mempool but it "
               "does not meet the rules to do so. Try to increase the fee.")),
        r"too many potential replacements": None,
        r"replacement-adds-unconfirmed": None,
        r"mempool full": None,
        r"non-mandatory-script-verify-flag": None,
        r"mandatory-script-verify-flag-failed": None,
        r"Transaction check failed": None,
    }
    for substring in validation_error_messages:
        if substring in server_msg:
            msg = validation_error_messages[substring]
            return msg if msg else substring
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/rpc/rawtransaction.cpp
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/util/error.cpp
    # https://github.com/bitcoin/bitcoin/blob/3f83c744ac28b700090e15b5dda2260724a56f49/src/common/messages.cpp#L126
    # grep "RPC_TRANSACTION"
    # grep "RPC_DESERIALIZATION_ERROR"
    # grep "TransactionError"
    rawtransaction_error_messages = {
        r"Missing inputs": None,
        r"Inputs missing or spent": None,
        r"transaction already in block chain": None,
        r"Transaction already in block chain": None,
        r"Transaction outputs already in utxo set": None,
        r"TX decode failed": None,
        r"Peer-to-peer functionality missing or disabled": None,
        r"Transaction rejected by AcceptToMemoryPool": None,
        r"AcceptToMemoryPool failed": None,
        r"Transaction rejected by mempool": None,
        r"Mempool internal error": None,
        r"Fee exceeds maximum configured by user": None,
        r"Unspendable output exceeds maximum configured by user": None,
        r"Transaction rejected due to invalid package": None,
    }
    for substring in rawtransaction_error_messages:
        if substring in server_msg:
            msg = rawtransaction_error_messages[substring]
            return msg if msg else substring
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/consensus/tx_verify.cpp
    # https://github.com/bitcoin/bitcoin/blob/c7ad94428ab6f54661d7a5441e1fdd0ebf034903/src/consensus/tx_check.cpp
    # grep "REJECT_"
    # grep "TxValidationResult"
    tx_verify_error_messages = {
        r"bad-txns-vin-empty": None,
        r"bad-txns-vout-empty": None,
        r"bad-txns-oversize": None,
        r"bad-txns-vout-negative": None,
        r"bad-txns-vout-toolarge": None,
        r"bad-txns-txouttotal-toolarge": None,
        r"bad-txns-inputs-duplicate": None,
        r"bad-cb-length": None,
        r"bad-txns-prevout-null": None,
        r"bad-txns-inputs-missingorspent":
            ("bad-txns-inputs-missingorspent\n" +
             _("You might have a local transaction in your wallet that this transaction "
               "builds on top of. You need to either broadcast or remove the local tx.")),
        r"bad-txns-premature-spend-of-coinbase": None,
        r"bad-txns-inputvalues-outofrange": None,
        r"bad-txns-in-belowout": None,
        r"bad-txns-fee-outofrange": None,
    }
    for substring in tx_verify_error_messages:
        if substring in server_msg:
            msg = tx_verify_error_messages[substring]
            return msg if msg else substring
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/policy/policy.cpp
    # grep "reason ="
    # should come after validation.cpp (due to "tx-size" vs "tx-size-small")
    # should come after script_error.cpp (due to e.g. "version")
    policy_error_messages = {
        r"version": _("Transaction uses non-standard version."),
        r"tx-size": _("The transaction was rejected because it is too large (in bytes)."),
        r"scriptsig-size": None,
        r"scriptsig-not-pushonly": None,
        r"scriptpubkey":
            ("scriptpubkey\n" +
             _("Some of the outputs pay to a non-standard script.")),
        r"bare-multisig": None,
        r"dust":
            (_("Transaction could not be broadcast due to dust outputs.\n"
               "Some of the outputs are too small in value, probably lower than 1000 satoshis.\n"
               "Check the units, make sure you haven't confused e.g. mBTC and BTC.")),
        r"multi-op-return": _("The transaction was rejected because it contains multiple OP_RETURN outputs."),
    }
    for substring in policy_error_messages:
        if substring in server_msg:
            msg = policy_error_messages[substring]
            return msg if msg else substring
    # otherwise:
    return _("Unknown error")


def check_cert(host, cert):
    try:
        b = pem.dePem(cert, 'CERTIFICATE')
        x = x509.X509(b)
    except Exception:
        traceback.print_exc(file=sys.stdout)
        return

    try:
        x.check_date()
        expired = False
    except Exception:
        expired = True

    m = f"host: {host}\n"
    m += f"has_expired: {expired}\n"
    util.print_msg(m)


# Used by tests
def _match_hostname(name, val):
    if val == name:
        return True

    return val.startswith('*.') and name.endswith(val[1:])


def test_certificates():
    from .simple_config import SimpleConfig
    config = SimpleConfig()
    mydir = os.path.join(config.path, "certs")
    certs = os.listdir(mydir)
    for c in certs:
        p = os.path.join(mydir, c)
        with open(p, encoding='utf-8') as f:
            cert = f.read()
        check_cert(c, cert)


if __name__ == "__main__":
    test_certificates()