• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5638339052699648

21 Oct 2025 04:57PM UTC coverage: 61.355% (-0.003%) from 61.358%
5638339052699648

push

CirrusCI

web-flow
Merge pull request #10275 from SomberNight/202510_android_desc

fastlane: revert full_description to plain text

22888 of 37304 relevant lines covered (61.36%)

0.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.61
/electrum/interface.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2011 thomasv@gitorious
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25
import os
1✔
26
import re
1✔
27
import ssl
1✔
28
import sys
1✔
29
import time
1✔
30
import traceback
1✔
31
import asyncio
1✔
32
import socket
1✔
33
from typing import Tuple, Union, List, TYPE_CHECKING, Optional, Set, NamedTuple, Any, Sequence, Dict
1✔
34
from collections import defaultdict
1✔
35
from ipaddress import IPv4Network, IPv6Network, ip_address, IPv6Address, IPv4Address
1✔
36
import itertools
1✔
37
import logging
1✔
38
import hashlib
1✔
39
import functools
1✔
40
import random
1✔
41
import enum
1✔
42

43
import aiorpcx
1✔
44
from aiorpcx import RPCSession, Notification, NetAddress, NewlineFramer
1✔
45
from aiorpcx.curio import timeout_after, TaskTimeout
1✔
46
from aiorpcx.jsonrpc import JSONRPC, CodeMessageError
1✔
47
from aiorpcx.rawsocket import RSClient, RSTransport
1✔
48
import certifi
1✔
49

50
from .util import (ignore_exceptions, log_exceptions, bfh, ESocksProxy,
1✔
51
                   is_integer, is_non_negative_integer, is_hash256_str, is_hex_str,
52
                   is_int_or_float, is_non_negative_int_or_float, OldTaskGroup,
53
                   send_exception_to_crash_reporter, error_text_str_to_safe_str)
54
from . import util
1✔
55
from . import x509
1✔
56
from . import pem
1✔
57
from . import version
1✔
58
from . import blockchain
1✔
59
from .blockchain import Blockchain, HEADER_SIZE, CHUNK_SIZE
1✔
60
from . import bitcoin
1✔
61
from .bitcoin import DummyAddress, DummyAddressUsedInTxException
1✔
62
from . import constants
1✔
63
from .i18n import _
1✔
64
from .logging import Logger
1✔
65
from .transaction import Transaction
1✔
66
from .fee_policy import FEE_ETA_TARGETS
1✔
67
from .lrucache import LRUCache
1✔
68

69
if TYPE_CHECKING:
70
    from .network import Network
71
    from .simple_config import SimpleConfig
72

73

74
# Path to the bundled CA certificate store (used to validate CA-signed server certs).
ca_path = certifi.where()

# Server-bucket name for Tor onion services (presumably used by server-selection logic
# elsewhere in the package — confirm against network.py).
BUCKET_NAME_OF_ONION_SERVERS = 'onion'

# Transport protocols: 't' = plaintext TCP, 's' = TLS (see Interface._get_ssl_context).
_KNOWN_NETWORK_PROTOCOLS = {'t', 's'}
PREFERRED_NETWORK_PROTOCOL = 's'
assert PREFERRED_NETWORK_PROTOCOL in _KNOWN_NETWORK_PROTOCOLS

# Upper bound on headers requested in a single RPC; must be able to cover a full chunk.
MAX_NUM_HEADERS_PER_REQUEST = 2016
assert MAX_NUM_HEADERS_PER_REQUEST >= CHUNK_SIZE
1✔
84

85

86
class NetworkTimeout:
    """Namespaces of request-timeout presets, in seconds.

    ``Generic`` is the default profile; ``Urgent`` shortens every tier
    for requests where failing fast is preferable.
    """
    # seconds
    class Generic:
        NORMAL = 30
        RELAXED = 45
        MOST_RELAXED = 600

    class Urgent(Generic):
        NORMAL = 10
        RELAXED = 20
        MOST_RELAXED = 60
1✔
97

98

99
def assert_non_negative_integer(val: Any) -> None:
    """Raise RequestCorrupted unless *val* is a non-negative integer."""
    if is_non_negative_integer(val):
        return
    raise RequestCorrupted(f'{val!r} should be a non-negative integer')
×
102

103

104
def assert_integer(val: Any) -> None:
    """Raise RequestCorrupted unless *val* is an integer."""
    if is_integer(val):
        return
    raise RequestCorrupted(f'{val!r} should be an integer')
×
107

108

109
def assert_int_or_float(val: Any) -> None:
    """Raise RequestCorrupted unless *val* is an int or a float."""
    if is_int_or_float(val):
        return
    raise RequestCorrupted(f'{val!r} should be int or float')
×
112

113

114
def assert_non_negative_int_or_float(val: Any) -> None:
    """Raise RequestCorrupted unless *val* is a non-negative int or float."""
    if is_non_negative_int_or_float(val):
        return
    raise RequestCorrupted(f'{val!r} should be a non-negative int or float')
×
117

118

119
def assert_hash256_str(val: Any) -> None:
    """Raise RequestCorrupted unless *val* is a hash256 hex string."""
    if is_hash256_str(val):
        return
    raise RequestCorrupted(f'{val!r} should be a hash256 str')
×
122

123

124
def assert_hex_str(val: Any) -> None:
    """Raise RequestCorrupted unless *val* is a hex string."""
    if is_hex_str(val):
        return
    raise RequestCorrupted(f'{val!r} should be a hex str')
×
127

128

129
def assert_dict_contains_field(d: Any, *, field_name: str) -> Any:
    """Check that *d* is a dict containing *field_name*; return that field's value.

    Raises RequestCorrupted if *d* is not a dict or lacks the field.
    """
    if not isinstance(d, dict):
        raise RequestCorrupted(f'{d!r} should be a dict')
    if field_name in d:
        return d[field_name]
    raise RequestCorrupted(f'required field {field_name!r} missing from dict')
1✔
135

136

137
def assert_list_or_tuple(val: Any) -> None:
    """Raise RequestCorrupted unless *val* is a list or a tuple."""
    if isinstance(val, (list, tuple)):
        return
    raise RequestCorrupted(f'{val!r} should be a list or tuple')
×
140

141

142
class ChainResolutionMode(enum.Enum):
    """Modes used while locating a server's chain relative to our local chains.

    NOTE(review): the state machine that drives these lives outside this view;
    the member semantics are inferred from their names — confirm against the
    header-sync code before relying on them.
    """
    CATCHUP = enum.auto()
    BACKWARD = enum.auto()
    BINARY = enum.auto()
    FORK = enum.auto()
    NO_FORK = enum.auto()
1✔
148

149

150
class NotificationSession(RPCSession):
    """aiorpcx RPC session that fans out server notifications to subscriber queues.

    Subscriptions are keyed by (method, params); the latest result per key is
    cached so repeated subscribe() calls can be answered without a network
    round-trip.
    """

    def __init__(self, *args, interface: 'Interface', **kwargs):
        super(NotificationSession, self).__init__(*args, **kwargs)
        self.subscriptions = defaultdict(list)  # key -> list of subscriber asyncio.Queues
        self.cache = {}  # key -> latest result seen for that subscription
        self._msg_counter = itertools.count(start=1)  # ids to correlate requests/responses in logs
        self.interface = interface
        self.taskgroup = interface.taskgroup
        self.cost_hard_limit = 0  # disable aiorpcx resource limits

    async def handle_request(self, request):
        # Entry point for server-initiated messages. Only subscription
        # notifications are expected; anything else closes the session.
        self.maybe_log(f"--> {request}")
        try:
            if isinstance(request, Notification):
                # by convention the last arg is the result, the rest are the sub params
                params, result = request.args[:-1], request.args[-1]
                key = self.get_hashable_key_for_rpc_call(request.method, params)
                if key in self.subscriptions:
                    self.cache[key] = result
                    for queue in self.subscriptions[key]:
                        await queue.put(request.args)
                else:
                    raise Exception(f'unexpected notification')
            else:
                raise Exception(f'unexpected request. not a notification')
        except Exception as e:
            self.interface.logger.info(f"error handling request {request}. exc: {repr(e)}")
            await self.close()

    async def send_request(self, *args, timeout=None, **kwargs):
        # note: semaphores/timeouts/backpressure etc are handled by
        # aiorpcx. the timeout arg here in most cases should not be set
        msg_id = next(self._msg_counter)
        self.maybe_log(f"<-- {args} {kwargs} (id: {msg_id})")
        try:
            # note: RPCSession.send_request raises TaskTimeout in case of a timeout.
            # TaskTimeout is a subclass of CancelledError, which is *suppressed* in TaskGroups
            response = await util.wait_for2(
                super().send_request(*args, **kwargs),
                timeout)
        except (TaskTimeout, asyncio.TimeoutError) as e:
            self.maybe_log(f"--> request timed out: {args} (id: {msg_id})")
            raise RequestTimedOut(f'request timed out: {args} (id: {msg_id})') from e
        except CodeMessageError as e:
            self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
            raise
        except BaseException as e:  # cancellations, etc. are useful for debugging
            self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
            raise
        else:
            self.maybe_log(f"--> {response} (id: {msg_id})")
            return response

    def set_default_timeout(self, timeout):
        # Propagate the timeout into both aiorpcx base-class knobs.
        assert hasattr(self, "sent_request_timeout")  # in base class
        self.sent_request_timeout = timeout
        assert hasattr(self, "max_send_delay")        # in base class
        self.max_send_delay = timeout

    async def subscribe(self, method: str, params: List, queue: asyncio.Queue):
        # note: until the cache is written for the first time,
        # each 'subscribe' call might make a request on the network.
        key = self.get_hashable_key_for_rpc_call(method, params)
        self.subscriptions[key].append(queue)
        if key in self.cache:
            result = self.cache[key]
        else:
            result = await self.send_request(method, params)
            self.cache[key] = result
        # deliver the initial result in the same shape as later notifications
        await queue.put(params + [result])

    def unsubscribe(self, queue):
        """Unsubscribe a callback to free object references to enable GC."""
        # note: we can't unsubscribe from the server, so we keep receiving
        # subsequent notifications
        for v in self.subscriptions.values():
            if queue in v:
                v.remove(queue)

    @classmethod
    def get_hashable_key_for_rpc_call(cls, method, params):
        """Hashable index for subscriptions and cache"""
        return str(method) + repr(params)

    def maybe_log(self, msg: str) -> None:
        # Log only if debugging is enabled for this interface or network-wide.
        if not self.interface: return
        if self.interface.debug or self.interface.network.debug:
            self.interface.logger.debug(msg)

    def default_framer(self):
        # overridden so that max_size can be customized
        max_size = self.interface.network.config.NETWORK_MAX_INCOMING_MSG_SIZE
        assert max_size > 500_000, f"{max_size=} (< 500_000) is too small"
        return NewlineFramer(max_size=max_size)

    async def close(self, *, force_after: int = None):
        """Closes the connection and waits for it to be closed.
        We try to flush buffered data to the wire, which can take some time.
        """
        if force_after is None:
            # We give up after a while and just abort the connection.
            # Note: specifically if the server is running Fulcrum, waiting seems hopeless,
            #       the connection must be aborted (see https://github.com/cculianu/Fulcrum/issues/76)
            # Note: if the ethernet cable was pulled or wifi disconnected, that too might
            #       wait until this timeout is triggered
            force_after = 1  # seconds
        await super().close(force_after=force_after)
1✔
257

258

259
# Root of this module's network-error hierarchy.
class NetworkException(Exception): pass
1✔
260

261

262
class GracefulDisconnect(NetworkException):
    """Raised to cleanly tear down a connection; logged at ``log_level``."""
    log_level = logging.INFO

    def __init__(self, *args, log_level=None, **kwargs):
        Exception.__init__(self, *args, **kwargs)
        if log_level is None:
            return
        # override the class-level default on this instance only
        self.log_level = log_level
×
269

270

271
class RequestTimedOut(GracefulDisconnect):
    """A network request timed out; treated as a graceful disconnect."""
    def __str__(self):
        return _("Network request timed out.")
×
274

275

276
# Server sent a malformed or semantically invalid response.
class RequestCorrupted(Exception): pass

class ErrorParsingSSLCert(Exception): pass  # saved certificate could not be parsed
class ErrorGettingSSLCertFromServer(Exception): pass  # could not fetch cert on first connect
class ErrorSSLCertFingerprintMismatch(Exception): pass  # cert differs from expected fingerprint
class InvalidOptionCombination(Exception): pass  # e.g. --serverfingerprint with a CA-signed server
class ConnectError(NetworkException): pass  # low-level failure establishing the connection
1✔
283

284

285
class TxBroadcastError(NetworkException):
    """Base class for failures while broadcasting a transaction."""
    def get_message_for_gui(self):
        # subclasses return a translated, user-displayable message
        raise NotImplementedError()
×
288

289

290
class TxBroadcastHashMismatch(TxBroadcastError):
    def get_message_for_gui(self):
        """User-facing message: the server acked the broadcast with a wrong txid."""
        headline = _("The server returned an unexpected transaction ID when broadcasting the transaction.")
        advice = _("Consider trying to connect to a different server, or updating Electrum.")
        return f"{headline}\n{advice}\n\n{str(self)}"
296

297

298
class TxBroadcastServerReturnedError(TxBroadcastError):
    def get_message_for_gui(self):
        """User-facing message: the server explicitly rejected the broadcast."""
        headline = _("The server returned an error when broadcasting the transaction.")
        advice = _("Consider trying to connect to a different server, or updating Electrum.")
        return f"{headline}\n{advice}\n\n{str(self)}"
304

305

306
class TxBroadcastUnknownError(TxBroadcastError):
    def get_message_for_gui(self):
        """User-facing message when the broadcast failure reason is unknown."""
        headline = _("Unknown error when broadcasting the transaction.")
        advice = _("Consider trying to connect to a different server, or updating Electrum.")
        return f"{headline}\n{advice}"
311

312

313
class _RSClient(RSClient):
    """RSClient that normalizes OS-level connect failures into ConnectError."""
    async def create_connection(self):
        try:
            return await super().create_connection()
        except OSError as e:
            # note: using "from e" here will set __cause__ of ConnectError
            raise ConnectError(e) from e
×
320

321

322
class PaddedRSTransport(RSTransport):
    """A raw socket transport that provides basic countermeasures against traffic analysis
    by padding the jsonrpc payload with whitespaces to have ~uniform-size TCP packets.
    (it is assumed that a network observer does not see plaintext transport contents,
    due to it being wrapped e.g. in TLS)
    """

    MIN_PACKET_SIZE = 1024  # packets are padded up to at least this size
    WAIT_FOR_BUFFER_GROWTH_SECONDS = 1.0  # max time a queued message waits for more data

    session: Optional['RPCSession']

    def __init__(self, *args, **kwargs):
        RSTransport.__init__(self, *args, **kwargs)
        self._sbuffer = bytearray()  # "send buffer"
        self._sbuffer_task = None  # type: Optional[asyncio.Task]  # background flusher (see _poll_sbuffer)
        self._sbuffer_has_data_evt = asyncio.Event()  # set while _sbuffer is non-empty
        self._last_send = time.monotonic()  # timestamp of last low-level write
        self._force_send = False  # type: bool  # when True, never defer/wait for more data

    # note: this does not call super().write() but is a complete reimplementation
    async def write(self, message):
        # Frame the message and append it to the send buffer; actual wire writes
        # happen in _maybe_consume_sbuffer (possibly deferred for padding).
        await self._can_send.wait()
        if self.is_closing():
            return
        framed_message = self._framer.frame(message)
        self._sbuffer += framed_message
        self._sbuffer_has_data_evt.set()
        self._maybe_consume_sbuffer()

    def _maybe_consume_sbuffer(self) -> None:
        """Maybe take some data from sbuffer and send it on the wire."""
        if not self._can_send.is_set() or self.is_closing():
            return
        buf = self._sbuffer
        if not buf:
            return
        # if there is enough data in the buffer, or if we haven't sent in a while, send now:
        if not (
            self._force_send
            or len(buf) >= self.MIN_PACKET_SIZE
            or self._last_send + self.WAIT_FOR_BUFFER_GROWTH_SECONDS < time.monotonic()
        ):
            return
        # buffer always ends on a complete newline-framed json-rpc message
        assert buf[-2:] in (b"}\n", b"]\n"), f"unexpected json-rpc terminator: {buf[-2:]=!r}"
        # either (1) pad length to next power of two, to create "lsize" packet:
        payload_lsize = len(buf)
        total_lsize = max(self.MIN_PACKET_SIZE, 2 ** (payload_lsize.bit_length()))
        npad_lsize = total_lsize - payload_lsize
        # or if that wasted a lot of bandwidth with padding, (2) defer sending some messages
        # and create a packet with half that size ("ssize", s for small)
        total_ssize = max(self.MIN_PACKET_SIZE, total_lsize // 2)
        payload_ssize = buf.rfind(b"\n", 0, total_ssize)  # last whole message that fits in ssize
        if payload_ssize != -1:
            payload_ssize += 1  # for "\n" char
            npad_ssize = total_ssize - payload_ssize
        else:
            npad_ssize = float("inf")  # no whole message fits: option (2) unavailable
        # decide between (1) and (2):
        if self._force_send or npad_lsize <= npad_ssize:
            # (1) create "lsize" packet: consume full buffer
            npad = npad_lsize
            p_idx = payload_lsize
        else:
            # (2) create "ssize" packet: consume some, but defer some for later
            npad = npad_ssize
            p_idx = payload_ssize
        # pad by adding spaces near end
        # self.session.maybe_log(
        #     f"PaddedRSTransport. calling low-level write(). "
        #     f"chose between (lsize:{payload_lsize}+{npad_lsize}, ssize:{payload_ssize}+{npad_ssize}). "
        #     f"won: {'tie' if npad_lsize == npad_ssize else 'lsize' if npad_lsize < npad_ssize else 'ssize'}."
        # )
        json_rpc_terminator = buf[p_idx-2:p_idx]
        assert json_rpc_terminator in (b"}\n", b"]\n"), f"unexpected {json_rpc_terminator=!r}"
        # whitespace inserted just before the terminator keeps the payload valid JSON
        buf2 = buf[:p_idx-2] + (npad * b" ") + json_rpc_terminator
        self._asyncio_transport.write(buf2)
        self._last_send = time.monotonic()
        del self._sbuffer[:p_idx]
        if not self._sbuffer:
            self._sbuffer_has_data_evt.clear()

    async def _poll_sbuffer(self):
        # Background task: flushes data that write() deferred while waiting for
        # the buffer to grow (so deferred messages are not stuck forever).
        while not self.is_closing():
            await self._can_send.wait()
            await self._sbuffer_has_data_evt.wait()  # to avoid busy-waiting
            self._maybe_consume_sbuffer()
            # If there is still data in the buffer, sleep until it would time out.
            # note: If the transport is ~idle, when we wake up, we will send the current buf data,
            #       but if busy, we might wake up to completely new buffer contents. Either is fine.
            if len(self._sbuffer) > 0:
                timeout_abs = self._last_send + self.WAIT_FOR_BUFFER_GROWTH_SECONDS
                timeout_rel = max(0.0, timeout_abs - time.monotonic())
                await asyncio.sleep(timeout_rel)

    def connection_made(self, transport: asyncio.BaseTransport):
        super().connection_made(transport)
        if isinstance(self.session, NotificationSession):
            coro = self.session.taskgroup.spawn(self._poll_sbuffer())
            self._sbuffer_task = self.loop.create_task(coro)
        else:
            # This a short-lived "fetch_certificate"-type session.
            # No polling here, we always force-empty the buffer.
            self._force_send = True
×
426

427

428
class ServerAddr:
    """Value object identifying a server: host, port and protocol ('t'/'s').

    Host and port are validated and canonicalized via aiorpcx.NetAddress.
    Instances are hashable and compare by (host, port, protocol).
    """

    def __init__(self, host: str, port: Union[int, str], *, protocol: str = None):
        assert isinstance(host, str), repr(host)
        if protocol is None:
            protocol = 's'  # default to TLS
        if not host:
            raise ValueError('host must not be empty')
        if host[0] == '[' and host[-1] == ']':  # IPv6
            host = host[1:-1]
        try:
            net_addr = NetAddress(host, port)  # this validates host and port
        except Exception as e:
            raise ValueError(f"cannot construct ServerAddr: invalid host or port (host={host}, port={port})") from e
        if protocol not in _KNOWN_NETWORK_PROTOCOLS:
            raise ValueError(f"invalid network protocol: {protocol}")
        self.host = str(net_addr.host)  # canonical form (if e.g. IPv6 address)
        self.port = int(net_addr.port)
        self.protocol = protocol
        self._net_addr_str = str(net_addr)  # "host:port", precomputed

    @classmethod
    def from_str(cls, s: str) -> 'ServerAddr':
        """Constructs a ServerAddr or raises ValueError."""
        # host might be IPv6 address, hence do rsplit:
        host, port, protocol = str(s).rsplit(':', 2)
        return ServerAddr(host=host, port=port, protocol=protocol)

    @classmethod
    def from_str_with_inference(cls, s: str) -> Optional['ServerAddr']:
        """Construct ServerAddr from str, guessing missing details.
        Does not raise - just returns None if guessing failed.
        Ongoing compatibility not guaranteed.
        """
        if not s:
            return None
        host = ""
        if s[0] == "[" and "]" in s:  # IPv6 address
            host_end = s.index("]")
            host = s[1:host_end]
            s = s[host_end+1:]  # keep remainder (":port" / ":port:protocol") for splitting
        items = str(s).rsplit(':', 2)
        if len(items) < 2:
            return None  # although maybe we could guess the port too?
        host = host or items[0]
        port = items[1]
        if len(items) >= 3:
            protocol = items[2]
        else:
            protocol = PREFERRED_NETWORK_PROTOCOL
        try:
            return ServerAddr(host=host, port=port, protocol=protocol)
        except ValueError:
            return None

    def to_friendly_name(self) -> str:
        """Short display form; omits the suffix for the preferred protocol."""
        # note: this method is closely linked to from_str_with_inference
        if self.protocol == 's':  # hide trailing ":s"
            return self.net_addr_str()
        return str(self)

    def __str__(self):
        return '{}:{}'.format(self.net_addr_str(), self.protocol)

    def to_json(self) -> str:
        return str(self)

    def __repr__(self):
        return f'<ServerAddr host={self.host} port={self.port} protocol={self.protocol}>'

    def net_addr_str(self) -> str:
        # "host:port" without the protocol suffix
        return self._net_addr_str

    def __eq__(self, other):
        if not isinstance(other, ServerAddr):
            return False
        return (self.host == other.host
                and self.port == other.port
                and self.protocol == other.protocol)

    def __ne__(self, other):
        return not (self == other)

    def __hash__(self):
        return hash((self.host, self.port, self.protocol))
×
513

514

515
def _get_cert_path_for_host(*, config: 'SimpleConfig', host: str) -> str:
    """Return the path under ``<config.path>/certs`` where *host*'s cert is stored.

    IPv6 literals get a filesystem-safe filename (``ipv6_<hex>``); every other
    host is used verbatim as the filename.
    """
    filename = host
    try:
        parsed = ip_address(host)
    except ValueError:
        parsed = None  # not an IP literal (e.g. a hostname)
    if isinstance(parsed, IPv6Address):
        filename = "ipv6_" + parsed.packed.hex()
    return os.path.join(config.path, 'certs', filename)
1✔
525

526

527
class Interface(Logger):
1✔
528

529
    def __init__(self, *, network: 'Network', server: ServerAddr):
        """Set up state for one server connection and schedule its run() task.

        The actual connection work happens asynchronously in self.run(),
        spawned onto the network's task group at the end of this method.
        """
        assert isinstance(server, ServerAddr), f"expected ServerAddr, got {type(server)}"
        # resolved in _mark_ready (or cancelled on disconnect); awaited by callers
        self.ready = network.asyncio_loop.create_future()
        self.got_disconnected = asyncio.Event()
        self._blockchain_updated = asyncio.Event()
        self.server = server
        Logger.__init__(self)
        assert network.config.path
        self.cert_path = _get_cert_path_for_host(config=network.config, host=self.host)
        self.blockchain = None  # type: Optional[Blockchain]
        self._requested_chunks = set()  # type: Set[int]
        self.network = network
        self.session = None  # type: Optional[NotificationSession]
        self._ipaddr_bucket = None
        # Set up proxy.
        # - for servers running on localhost, the proxy is not used. If user runs their own server
        #   on same machine, this lets them enable the proxy (which is used for e.g. FX rates).
        #   note: we could maybe relax this further and bypass the proxy for all private
        #         addresses...? e.g. 192.168.x.x
        if util.is_localhost(server.host):
            self.logger.info(f"looks like localhost: not using proxy for this server")
            self.proxy = None
        else:
            self.proxy = ESocksProxy.from_network_settings(network)

        # Latest block header and corresponding height, as claimed by the server.
        # Note that these values are updated before they are verified.
        # Especially during initial header sync, verification can take a long time.
        # Failing verification will get the interface closed.
        self.tip_header = None  # type: Optional[dict]
        self.tip = 0

        self._headers_cache = {}  # type: Dict[int, bytes]
        self._rawtx_cache = LRUCache(maxsize=20)  # type: LRUCache[str, bytes]  # txid->rawtx

        self.fee_estimates_eta = {}  # type: Dict[int, int]

        # Dump network messages (only for this interface).  Set at runtime from the console.
        self.debug = False

        self.taskgroup = OldTaskGroup()

        async def spawn_task():
            # name the task after the server for easier debugging
            task = await self.network.taskgroup.spawn(self.run())
            task.set_name(f"interface::{str(server)}")
        asyncio.run_coroutine_threadsafe(spawn_task(), self.network.asyncio_loop)
1✔
575

576
    @property
    def host(self):
        # convenience accessor for self.server.host
        return self.server.host
1✔
579

580
    @property
    def port(self):
        # convenience accessor for self.server.port
        return self.server.port
1✔
583

584
    @property
    def protocol(self):
        # convenience accessor for self.server.protocol ('t' or 's')
        return self.server.protocol
1✔
587

588
    def diagnostic_name(self):
        # identifier used in log lines for this interface
        return self.server.net_addr_str()
1✔
590

591
    def __str__(self):
        # e.g. "<Interface example.com:50002>"
        return f"<Interface {self.diagnostic_name()}>"
×
593

594
    async def is_server_ca_signed(self, ca_ssl_context: ssl.SSLContext) -> bool:
        """Given a CA enforcing SSL context, returns True if the connection
        can be established. Returns False if the server has a self-signed
        certificate but otherwise is okay. Any other failures raise.
        """
        try:
            # exit_early presumably aborts right after the handshake — confirm
            # against open_session (defined outside this view)
            await self.open_session(ssl_context=ca_ssl_context, exit_early=True)
        except ConnectError as e:
            cause = e.__cause__
            if (isinstance(cause, ssl.SSLCertVerificationError)
                    and cause.reason == 'CERTIFICATE_VERIFY_FAILED'
                    and cause.verify_code == 18):  # "self signed certificate"
                # Good. We will use this server as self-signed.
                return False
            # Not good. Cannot use this server.
            raise
        # Good. We will use this server as CA-signed.
        return True
×
612

613
    async def _try_saving_ssl_cert_for_first_time(self, ca_ssl_context: ssl.SSLContext) -> None:
        # First contact with this server: classify it as CA-signed or
        # self-signed, and persist that decision at self.cert_path.
        ca_signed = await self.is_server_ca_signed(ca_ssl_context)
        if ca_signed:
            if self._get_expected_fingerprint():
                raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
            with open(self.cert_path, 'w') as f:
                # empty file means this is CA signed, not self-signed
                f.write('')
        else:
            await self._save_certificate()
×
623

624
    def _is_saved_ssl_cert_available(self):
        """Return True if a usable saved cert decision exists at self.cert_path.

        An empty file means "CA-signed"; otherwise the file must hold a parseable,
        unexpired, fingerprint-matching pinned self-signed certificate.
        Raises ErrorParsingSSLCert / InvalidOptionCombination on bad state.
        """
        if not os.path.exists(self.cert_path):
            return False
        with open(self.cert_path, 'r') as f:
            contents = f.read()
        if contents == '':  # CA signed
            if self._get_expected_fingerprint():
                raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
            return True
        # pinned self-signed cert
        try:
            b = pem.dePem(contents, 'CERTIFICATE')
        except SyntaxError as e:
            self.logger.info(f"error parsing already saved cert: {e}")
            raise ErrorParsingSSLCert(e) from e
        try:
            x = x509.X509(b)
        except Exception as e:
            self.logger.info(f"error parsing already saved cert: {e}")
            raise ErrorParsingSSLCert(e) from e
        try:
            x.check_date()
        except x509.CertificateError as e:
            self.logger.info(f"certificate has expired: {e}")
            os.unlink(self.cert_path)  # delete pinned cert only in this case
            return False
        self._verify_certificate_fingerprint(bytes(b))
        return True
×
652

653
    async def _get_ssl_context(self) -> Optional[ssl.SSLContext]:
        """Return the SSLContext for this server, or None for plaintext TCP.

        For TLS servers this either trusts the bundled CA store (empty saved
        file) or pins the server's previously saved self-signed certificate.
        May raise ErrorGettingSSLCertFromServer / ErrorParsingSSLCert.
        """
        if self.protocol != 's':
            # using plaintext TCP
            return None

        # see if we already have cert for this server; or get it for the first time
        ca_sslc = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
        if not self._is_saved_ssl_cert_available():
            try:
                await self._try_saving_ssl_cert_for_first_time(ca_sslc)
            except (OSError, ConnectError, aiorpcx.socks.SOCKSError) as e:
                raise ErrorGettingSSLCertFromServer(e) from e
        # now we have a file saved in our certificate store
        siz = os.stat(self.cert_path).st_size
        if siz == 0:
            # CA signed cert
            sslc = ca_sslc
        else:
            # pinned self-signed cert
            sslc = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=self.cert_path)
            # note: Flag "ssl.VERIFY_X509_STRICT" is enabled by default in python 3.13+ (disabled in older versions).
            #       We explicitly disable it as it breaks lots of servers.
            sslc.verify_flags &= ~ssl.VERIFY_X509_STRICT
            sslc.check_hostname = False
        return sslc
×
678

679
    def handle_disconnect(func):
        # Decorator (defined at class scope, applied to coroutine methods):
        # guarantees the disconnect book-keeping in `finally` runs however
        # `func` exits, and downgrades expected disconnect exceptions to logs.
        @functools.wraps(func)
        async def wrapper_func(self: 'Interface', *args, **kwargs):
            try:
                return await func(self, *args, **kwargs)
            except GracefulDisconnect as e:
                self.logger.log(e.log_level, f"disconnecting due to {repr(e)}")
            except aiorpcx.jsonrpc.RPCError as e:
                self.logger.warning(f"disconnecting due to {repr(e)}")
                self.logger.debug(f"(disconnect) trace for {repr(e)}", exc_info=True)
            finally:
                self.got_disconnected.set()
                # Make sure taskgroup gets cleaned-up. This explicit clean-up is needed here
                # in case the "with taskgroup" ctx mgr never got a chance to run:
                await self.taskgroup.cancel_remaining()
                await self.network.connection_down(self)
                # if was not 'ready' yet, schedule waiting coroutines:
                self.ready.cancel()
        return wrapper_func
1✔
698

699
    @ignore_exceptions  # do not kill network.taskgroup
    @log_exceptions
    @handle_disconnect
    async def run(self):
        """Main entry point for this interface: obtain an SSL context, then open
        and run the RPC session until disconnect.

        Connection errors are logged and swallowed here; the actual teardown
        (taskgroup cancellation, notifying Network) is done by @handle_disconnect.
        """
        try:
            ssl_context = await self._get_ssl_context()
        except (ErrorParsingSSLCert, ErrorGettingSSLCertFromServer) as e:
            self.logger.info(f'disconnecting due to: {repr(e)}')
            return
        try:
            await self.open_session(ssl_context=ssl_context)
        except (asyncio.CancelledError, ConnectError, aiorpcx.socks.SOCKSError) as e:
            # make SSL errors for main interface more visible (to help servers ops debug cert pinning issues)
            if (isinstance(e, ConnectError) and isinstance(e.__cause__, ssl.SSLError)
                    and self.is_main_server() and not self.network.auto_connect):
                self.logger.warning(f'Cannot connect to main server due to SSL error '
                                    f'(maybe cert changed compared to "{self.cert_path}"). Exc: {repr(e)}')
            else:
                self.logger.info(f'disconnecting due to: {repr(e)}')
            return
719

720
    def _mark_ready(self) -> None:
        """Transition this interface to the 'ready' state (first server tip seen).

        Selects which Blockchain object this interface follows (based on whether
        the tip header checks against a known chain) and resolves the 'ready'
        future. Idempotent: returns immediately if already ready.
        Raises GracefulDisconnect if 'ready' was cancelled (connection too slow).
        """
        if self.ready.cancelled():
            raise GracefulDisconnect('conn establishment was too slow; *ready* future was cancelled')
        if self.ready.done():
            return

        assert self.tip_header
        chain = blockchain.check_header(self.tip_header)
        if not chain:
            # tip header not on any known chain (yet): tentatively follow the best one
            self.blockchain = blockchain.get_best_chain()
        else:
            self.blockchain = chain
        assert self.blockchain is not None

        self.logger.info(f"set blockchain with height {self.blockchain.height()}")

        self.ready.set_result(1)
737

738
    def is_connected_and_ready(self) -> bool:
        """Whether the connection was fully established and has not been lost since."""
        if not self.ready.done():
            return False
        return not self.got_disconnected.is_set()
740

741
    async def _save_certificate(self) -> None:
        """Fetch the server's TLS certificate and pin it to disk at self.cert_path.

        Retries up to 10 times (1s apart), as the TLS handshake may not have
        completed yet on early attempts. Verifies the cert against the expected
        fingerprint (if configured) before writing.
        Raises GracefulDisconnect if no cert could be obtained.
        """
        if not os.path.exists(self.cert_path):
            # we may need to retry this a few times, in case the handshake hasn't completed
            for _ in range(10):
                dercert = await self._fetch_certificate()
                if dercert:
                    self.logger.info("succeeded in getting cert")
                    self._verify_certificate_fingerprint(dercert)
                    with open(self.cert_path, 'w') as f:
                        cert = ssl.DER_cert_to_PEM_cert(dercert)
                        # workaround android bug
                        cert = re.sub("([^\n])-----END CERTIFICATE-----","\\1\n-----END CERTIFICATE-----",cert)
                        f.write(cert)
                        # even though close flushes, we can't fsync when closed.
                        # and we must flush before fsyncing, cause flush flushes to OS buffer
                        # fsync writes to OS buffer to disk
                        f.flush()
                        os.fsync(f.fileno())
                    break
                await asyncio.sleep(1)
            else:
                raise GracefulDisconnect("could not get certificate after 10 tries")
763

764
    async def _fetch_certificate(self) -> bytes:
        """Connect to the server with certificate verification disabled and
        return its certificate in DER (binary) form.
        """
        # verification is intentionally off: we only want to *obtain* the cert here
        sslc = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_CLIENT)
        sslc.check_hostname = False
        sslc.verify_mode = ssl.CERT_NONE
        async with _RSClient(
            session_factory=RPCSession,
            host=self.host, port=self.port,
            ssl=sslc,
            proxy=self.proxy,
            transport=PaddedRSTransport,
        ) as session:
            asyncio_transport = session.transport._asyncio_transport  # type: asyncio.BaseTransport
            ssl_object = asyncio_transport.get_extra_info("ssl_object")  # type: ssl.SSLObject
            return ssl_object.getpeercert(binary_form=True)
778

779
    def _get_expected_fingerprint(self) -> Optional[str]:
1✔
780
        if self.is_main_server():
×
781
            return self.network.config.NETWORK_SERVERFINGERPRINT
×
782
        return None
×
783

784
    def _verify_certificate_fingerprint(self, certificate: bytes) -> None:
        """Check a DER-encoded certificate against the expected fingerprint.

        No-op if no fingerprint is configured (non-main servers). Fingerprint is
        the (case-insensitive) hex sha256 of the DER bytes.
        Raises ErrorSSLCertFingerprintMismatch on mismatch, after firing the
        'cert_mismatch' callback so the GUI can alert the user.
        """
        expected_fingerprint = self._get_expected_fingerprint()
        if not expected_fingerprint:
            return
        fingerprint = hashlib.sha256(certificate).hexdigest()
        fingerprints_match = fingerprint.lower() == expected_fingerprint.lower()
        if not fingerprints_match:
            util.trigger_callback('cert_mismatch')
            raise ErrorSSLCertFingerprintMismatch('Refusing to connect to server due to cert fingerprint mismatch')
        self.logger.info("cert fingerprint verification passed")
794

795
    async def _maybe_warm_headers_cache(self, *, from_height: int, to_height: int, mode: ChainResolutionMode) -> None:
        """Populate header cache for block heights in range [from_height, to_height].

        No-op if every height in the range is already cached. The range must fit
        in a single server request (< MAX_NUM_HEADERS_PER_REQUEST headers).
        note: the server may return fewer headers than requested (near its tip);
              only the returned ones get cached.
        """
        assert from_height <= to_height, (from_height, to_height)
        assert to_height - from_height < MAX_NUM_HEADERS_PER_REQUEST
        if all(height in self._headers_cache for height in range(from_height, to_height+1)):
            # cache already has all requested headers
            return
        # use lower timeout as we usually have network.bhi_lock here
        timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        count = to_height - from_height + 1
        headers = await self.get_block_headers(start_height=from_height, count=count, timeout=timeout, mode=mode)
        for idx, raw_header in enumerate(headers):
            header_height = from_height + idx
            self._headers_cache[header_height] = raw_header
809

810
    async def get_block_header(self, height: int, *, mode: ChainResolutionMode) -> dict:
        """Return the deserialized block header at `height`.

        Served from self._headers_cache when possible; otherwise requested from
        the server (one header per request). `mode` is used for logging only here.
        Raises if `height` is not a non-negative integer.
        """
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        #self.logger.debug(f'get_block_header() {height} in {mode=}')
        # use lower timeout as we usually have network.bhi_lock here
        timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        if raw_header := self._headers_cache.get(height):
            return blockchain.deserialize_header(raw_header, height)
        self.logger.info(f'requesting block header {height} in {mode=}')
        res = await self.session.send_request('blockchain.block.header', [height], timeout=timeout)
        return blockchain.deserialize_header(bytes.fromhex(res), height)
821

822
    async def get_block_headers(
        self,
        *,
        start_height: int,
        count: int,
        timeout=None,
        mode: Optional[ChainResolutionMode] = None,
    ) -> Sequence[bytes]:
        """Request a number of consecutive block headers, starting at `start_height`.
        `count` is the num of requested headers, BUT note the server might return fewer than this
        (if range would extend beyond its tip).
        note: the returned headers are not verified or parsed at all.
        Raises RequestCorrupted if the server response is malformed or inconsistent.
        """
        if not is_non_negative_integer(start_height):
            raise Exception(f"{repr(start_height)} is not a block height")
        if not is_non_negative_integer(count) or not (0 < count <= MAX_NUM_HEADERS_PER_REQUEST):
            raise Exception(f"{repr(count)} not an int in range ]0, {MAX_NUM_HEADERS_PER_REQUEST}]")
        self.logger.info(
            f"requesting block headers: [{start_height}, {start_height+count-1}], {count=}"
            + (f" (in {mode=})" if mode is not None else "")
        )
        res = await self.session.send_request('blockchain.block.headers', [start_height, count], timeout=timeout)
        # check response
        assert_dict_contains_field(res, field_name='count')
        assert_dict_contains_field(res, field_name='hex')
        assert_dict_contains_field(res, field_name='max')
        assert_non_negative_integer(res['count'])
        assert_non_negative_integer(res['max'])
        assert_hex_str(res['hex'])
        if len(res['hex']) != HEADER_SIZE * 2 * res['count']:
            raise RequestCorrupted('inconsistent chunk hex and count')
        # we never request more than MAX_NUM_HEADERS_IN_REQUEST headers, but we enforce those fit in a single response
        if res['max'] < MAX_NUM_HEADERS_PER_REQUEST:
            raise RequestCorrupted(f"server uses too low 'max' count for block.headers: {res['max']} < {MAX_NUM_HEADERS_PER_REQUEST}")
        if res['count'] > count:
            raise RequestCorrupted(f"asked for {count} headers but got more: {res['count']}")
        elif res['count'] < count:
            # we only tolerate getting fewer headers if it is due to reaching the tip
            end_height = start_height + res['count'] - 1
            if end_height < self.tip:  # still below tip. why did server not send more?!
                raise RequestCorrupted(
                    f"asked for {count} headers but got fewer: {res['count']}. ({start_height=}, {self.tip=})")
        # checks done.
        headers = list(util.chunks(bfh(res['hex']), size=HEADER_SIZE))
        return headers
867

868
    async def request_chunk_below_max_checkpoint(
        self,
        *,
        height: int,
    ) -> None:
        """Download and connect the CHUNK_SIZE-header chunk containing `height`.

        Only valid for heights at or below the max checkpoint. Concurrent
        duplicate requests for the same chunk index are de-duplicated via
        self._requested_chunks.
        Raises RequestCorrupted if the chunk does not connect to our blockchain.
        """
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        assert height <= constants.net.max_checkpoint(), f"{height=} must be <= cp={constants.net.max_checkpoint()}"
        index = height // CHUNK_SIZE
        if index in self._requested_chunks:
            # another task is already fetching this chunk
            return None
        self.logger.debug(f"requesting chunk from height {height}")
        try:
            self._requested_chunks.add(index)
            headers = await self.get_block_headers(start_height=index * CHUNK_SIZE, count=CHUNK_SIZE)
        finally:
            self._requested_chunks.discard(index)
        conn = self.blockchain.connect_chunk(index, data=b"".join(headers))
        if not conn:
            raise RequestCorrupted(f"chunk ({index=}, for {height=}) does not connect to blockchain")
        return None
889

890
    async def _fast_forward_chain(
        self,
        *,
        height: int,  # usually local chain tip + 1
        tip: int,  # server tip. we should not request past this.
    ) -> int:
        """Request some headers starting at `height` to grow the blockchain of this interface.
        Returns number of headers we managed to connect, starting at `height`.

        Fetches up to 10 chunks concurrently, aligned to chunk boundaries, then
        connects them in order, stopping at the first chunk that fails to connect.
        """
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        if not is_non_negative_integer(tip):
            raise Exception(f"{repr(tip)} is not a block height")
        if not (height > constants.net.max_checkpoint()
                or height == 0 == constants.net.max_checkpoint()):
            raise Exception(f"{height=} must be > cp={constants.net.max_checkpoint()}")
        assert height <= tip, f"{height=} must be <= {tip=}"
        # Request a few chunks of headers concurrently.
        # tradeoffs:
        # - more chunks: higher memory requirements
        # - more chunks: higher concurrency => syncing needs fewer network round-trips
        # - if a chunk does not connect, bandwidth for all later chunks is wasted
        async with OldTaskGroup() as group:
            tasks = []  # type: List[Tuple[int, asyncio.Task[Sequence[bytes]]]]
            index0 = height // CHUNK_SIZE
            for chunk_cnt in range(10):
                index = index0 + chunk_cnt
                start_height = index * CHUNK_SIZE
                if start_height > tip:
                    break
                end_height = min(start_height + CHUNK_SIZE - 1, tip)
                size = end_height - start_height + 1
                tasks.append((index, await group.spawn(self.get_block_headers(start_height=start_height, count=size))))
        # try to connect chunks
        num_headers = 0
        for index, task in tasks:
            headers = task.result()
            conn = self.blockchain.connect_chunk(index, data=b"".join(headers))
            if not conn:
                break
            num_headers += len(headers)
        # We started at a chunk boundary, instead of requested `height`. Need to correct for that.
        offset = height - index0 * CHUNK_SIZE
        return max(0, num_headers - offset)
934

935
    def is_main_server(self) -> bool:
        """True if this interface is the network's current main interface, or,
        while no interface is selected yet, if it targets the default server."""
        if self.network.interface == self:
            return True
        return self.network.interface is None and self.network.default_server == self.server
938

939
    async def open_session(
        self,
        *,
        ssl_context: Optional[ssl.SSLContext],
        exit_early: bool = False,
    ):
        """Establish the RPC session and run the interface's tasks until disconnect.

        Connects, negotiates 'server.version', and then spawns the long-running
        per-interface tasks (ping, fee polling, header fetching, connection
        watchdog) inside self.taskgroup. With exit_early=True, returns right
        after version negotiation.
        Raises GracefulDisconnect on protocol violations or server-side errors.
        """
        session_factory = lambda *args, iface=self, **kwargs: NotificationSession(*args, **kwargs, interface=iface)
        async with _RSClient(
            session_factory=session_factory,
            host=self.host, port=self.port,
            ssl=ssl_context,
            proxy=self.proxy,
            transport=PaddedRSTransport,
        ) as session:
            self.session = session  # type: NotificationSession
            self.session.set_default_timeout(self.network.get_network_timeout_seconds(NetworkTimeout.Generic))
            try:
                ver = await session.send_request('server.version', [self.client_name(), version.PROTOCOL_VERSION])
            except aiorpcx.jsonrpc.RPCError as e:
                raise GracefulDisconnect(e)  # probably 'unsupported protocol version'
            if exit_early:
                return
            if ver[1] != version.PROTOCOL_VERSION:
                raise GracefulDisconnect(f'server violated protocol-version-negotiation. '
                                         f'we asked for {version.PROTOCOL_VERSION!r}, they sent {ver[1]!r}')
            if not self.network.check_interface_against_healthy_spread_of_connected_servers(self):
                raise GracefulDisconnect(f'too many connected servers already '
                                         f'in bucket {self.bucket_based_on_ipaddress()}')
            self.logger.info(f"connection established. version: {ver}")

            try:
                async with self.taskgroup as group:
                    await group.spawn(self.ping)
                    await group.spawn(self.request_fee_estimates)
                    await group.spawn(self.run_fetch_blocks)
                    await group.spawn(self.monitor_connection)
            except aiorpcx.jsonrpc.RPCError as e:
                if e.code in (
                    JSONRPC.EXCESSIVE_RESOURCE_USAGE,
                    JSONRPC.SERVER_BUSY,
                    JSONRPC.METHOD_NOT_FOUND,
                    JSONRPC.INTERNAL_ERROR,
                ):
                    # transient / capability errors: disconnect quietly instead of crashing
                    log_level = logging.WARNING if self.is_main_server() else logging.INFO
                    raise GracefulDisconnect(e, log_level=log_level) from e
                raise
            finally:
                self.got_disconnected.set()  # set this ASAP, ideally before any awaits
987

988
    async def monitor_connection(self):
        """Watchdog task: poll once per second and raise GracefulDisconnect as
        soon as the session/transport is no longer open."""
        while True:
            await asyncio.sleep(1)
            # If the session/transport is no longer open, we disconnect.
            # e.g. if the remote cleanly sends EOF, we would handle that here.
            # note: If the user pulls the ethernet cable or disconnects wifi,
            #       ideally we would detect that here, so that the GUI/etc can reflect that.
            #       - On Android, this seems to work reliably , where asyncio.BaseProtocol.connection_lost()
            #         gets called with e.g. ConnectionAbortedError(103, 'Software caused connection abort').
            #       - On desktop Linux/Win, it seems BaseProtocol.connection_lost() is not called in such cases.
            #         Hence, in practice the connection issue will only be detected the next time we try
            #         to send a message (plus timeout), which can take minutes...
            if not self.session or self.session.is_closing():
                raise GracefulDisconnect('session was closed')
1002

1003
    async def ping(self):
        """Keep-alive task: periodically ping the server so it knows we are
        still here; random intervals add noise against traffic analysis."""
        while True:
            delay = random.random() * 300
            await asyncio.sleep(delay)
            await self.session.send_request('server.ping')
            await self._maybe_send_noise()
1010

1011
    async def _maybe_send_noise(self):
        """Probabilistically send a few extra pings (20% chance per round) as
        cover traffic against traffic analysis."""
        while True:
            if random.random() >= 0.2:
                break
            await asyncio.sleep(random.random())
            await self.session.send_request('server.ping')
1015

1016
    async def request_fee_estimates(self):
        """Poll ETA-based fee estimates from the server (once per minute) and
        store them in self.fee_estimates_eta, notifying the Network after each
        round. Negative results (server has no estimate) are skipped."""
        while True:
            async with OldTaskGroup() as group:
                fee_tasks = []
                # request all targets concurrently (last target excluded)
                for i in FEE_ETA_TARGETS[0:-1]:
                    fee_tasks.append((i, await group.spawn(self.get_estimatefee(i))))
            for nblock_target, task in fee_tasks:
                fee = task.result()
                if fee < 0: continue  # server could not provide an estimate
                assert isinstance(fee, int)
                self.fee_estimates_eta[nblock_target] = fee
            self.network.update_fee_estimates()
            await asyncio.sleep(60)
1029

1030
    async def close(self, *, force_after: Optional[int] = None):
        """Closes the connection and waits for it to be closed.
        We try to flush buffered data to the wire, which can take some time.

        force_after: passed through to session.close();
                     presumably a max wait (seconds) before forcing the close — confirm against NotificationSession.close
        """
        if self.session:
            await self.session.close(force_after=force_after)
        # monitor_connection will cancel tasks
1037

1038
    async def run_fetch_blocks(self):
        """Subscribe to block header notifications and process each new tip:
        update self.tip/self.tip_header, sync/resolve the local chain, and fire
        GUI/network callbacks. Runs until the interface disconnects.
        Raises GracefulDisconnect if the server's tip is below the max checkpoint.
        """
        header_queue = asyncio.Queue()
        await self.session.subscribe('blockchain.headers.subscribe', [], header_queue)
        while True:
            item = await header_queue.get()
            raw_header = item[0]
            height = raw_header['height']
            header_bytes = bfh(raw_header['hex'])
            header_dict = blockchain.deserialize_header(header_bytes, height)
            self.tip_header = header_dict
            self.tip = height
            if self.tip < constants.net.max_checkpoint():
                raise GracefulDisconnect(
                    f"server tip below max checkpoint. ({self.tip} < {constants.net.max_checkpoint()})")
            self._mark_ready()
            self._headers_cache.clear()  # tip changed, so assume anything could have happened with chain
            self._headers_cache[height] = header_bytes
            try:
                blockchain_updated = await self._process_header_at_tip()
            finally:
                self._headers_cache.clear()  # to reduce memory usage
            # header processing done
            if self.is_main_server() or blockchain_updated:
                self.logger.info(f"new chain tip. {height=}")
            if blockchain_updated:
                util.trigger_callback('blockchain_updated')
                # pulse the event so current waiters wake up, then re-arm it
                self._blockchain_updated.set()
                self._blockchain_updated.clear()
            util.trigger_callback('network_updated')
            await self.network.switch_unwanted_fork_interface()
            await self.network.switch_lagging_interface()
            await self.taskgroup.spawn(self._maybe_send_noise())
1070

1071
    async def _process_header_at_tip(self) -> bool:
        """Process self.tip_header under the network-wide bhi_lock.

        Returns:
        False - boring fast-forward: we already have this header as part of this blockchain from another interface,
        True - new header we didn't have, or reorg
        """
        height, header = self.tip, self.tip_header
        async with self.network.bhi_lock:
            if self.blockchain.height() >= height and self.blockchain.check_header(header):
                # another interface amended the blockchain
                return False
            await self.sync_until(height)
            return True
1083

1084
    async def sync_until(
        self,
        height: int,
        *,
        next_height: Optional[int] = None,  # sync target. typically the tip, except in unit tests
    ) -> Tuple[ChainResolutionMode, int]:
        """Grow/repair this interface's blockchain from `height` up to `next_height`.

        Far from the target (>144 blocks behind), headers are batch-fetched via
        _fast_forward_chain; near the target they are processed one-by-one via
        step(), which also handles fork resolution.
        Returns the last resolution mode used and the final height reached.
        """
        if next_height is None:
            next_height = self.tip
        last = None  # type: Optional[ChainResolutionMode]
        while last is None or height <= next_height:
            prev_last, prev_height = last, height
            if next_height > height + 144:
                # We are far from the tip.
                # It is more efficient to process headers in large batches (CPU/disk_usage/logging).
                # (but this wastes a little bandwidth, if we are not on a chunk boundary)
                num_headers = await self._fast_forward_chain(
                    height=height, tip=next_height)
                if num_headers == 0:
                    if height <= constants.net.max_checkpoint():
                        raise GracefulDisconnect('server chain conflicts with checkpoints or genesis')
                    # batch did not connect: fall back to single-header stepping
                    last, height = await self.step(height)
                    continue
                # report progress to gui/etc
                util.trigger_callback('blockchain_updated')
                self._blockchain_updated.set()
                self._blockchain_updated.clear()
                util.trigger_callback('network_updated')
                height += num_headers
                assert height <= next_height+1, (height, self.tip)
                last = ChainResolutionMode.CATCHUP
            else:
                # We are close to the tip, so process headers one-by-one.
                # (note: due to headers_cache, to save network latency, this can still batch-request headers)
                last, height = await self.step(height)
            assert (prev_last, prev_height) != (last, height), 'had to prevent infinite loop in interface.sync_until'
        return last, height
1120

1121
    async def step(
        self,
        height: int,
    ) -> Tuple[ChainResolutionMode, int]:
        """Process a single header at `height`.

        Either attaches it to a locally known chain, connects it at a chain tip,
        or — if neither works — starts fork resolution (backward search for a
        connecting ancestor, then binary search for the exact forkpoint).
        Returns the resolution mode used and the next height to process.
        """
        assert 0 <= height <= self.tip, (height, self.tip)
        await self._maybe_warm_headers_cache(
            from_height=height,
            to_height=min(self.tip, height+MAX_NUM_HEADERS_PER_REQUEST-1),
            mode=ChainResolutionMode.CATCHUP,
        )
        header = await self.get_block_header(height, mode=ChainResolutionMode.CATCHUP)

        chain = blockchain.check_header(header)
        if chain:
            self.blockchain = chain
            # note: there is an edge case here that is not handled.
            # we might know the blockhash (enough for check_header) but
            # not have the header itself. e.g. regtest chain with only genesis.
            # this situation resolves itself on the next block
            return ChainResolutionMode.CATCHUP, height+1

        can_connect = blockchain.can_connect(header)
        if not can_connect:
            self.logger.info(f"can't connect new block: {height=}")
            height, header, bad, bad_header = await self._search_headers_backwards(height, header=header)
            chain = blockchain.check_header(header)
            can_connect = blockchain.can_connect(header)
            assert chain or can_connect
        if can_connect:
            height += 1
            self.blockchain = can_connect
            self.blockchain.save_header(header)
            return ChainResolutionMode.CATCHUP, height

        good, bad, bad_header = await self._search_headers_binary(height, bad, bad_header, chain)
        return await self._resolve_potential_chain_fork_given_forkpoint(good, bad, bad_header)
1157

1158
    async def _search_headers_binary(
        self,
        height: int,
        bad: int,
        bad_header: dict,
        chain: Optional[Blockchain],
    ) -> Tuple[int, int, dict]:
        """Binary-search the exact forkpoint between `height` (whose header is
        on a locally known chain) and `bad` (whose header is not).

        Updates self.blockchain along the way as headers check against chains.
        Returns (good, bad, bad_header), with good + 1 == bad on exit.
        """
        assert bad == bad_header['block_height']
        _assert_header_does_not_check_against_any_chain(bad_header)

        self.blockchain = chain
        good = height
        while True:
            assert 0 <= good < bad, (good, bad)
            height = (good + bad) // 2
            self.logger.info(f"binary step. good {good}, bad {bad}, height {height}")
            if bad - good + 1 <= MAX_NUM_HEADERS_PER_REQUEST:  # if interval is small, trade some bandwidth for lower latency
                await self._maybe_warm_headers_cache(
                    from_height=good, to_height=bad, mode=ChainResolutionMode.BINARY)
            header = await self.get_block_header(height, mode=ChainResolutionMode.BINARY)
            chain = blockchain.check_header(header)
            if chain:
                self.blockchain = chain
                good = height
            else:
                bad = height
                bad_header = header
            if good + 1 == bad:
                break

        if not self.blockchain.can_connect(bad_header, check_height=False):
            raise Exception('unexpected bad header during binary: {}'.format(bad_header))
        _assert_header_does_not_check_against_any_chain(bad_header)

        self.logger.info(f"binary search exited. good {good}, bad {bad}. {chain=}")
        return good, bad, bad_header
1194

1195
    async def _resolve_potential_chain_fork_given_forkpoint(
        self,
        good: int,
        bad: int,
        bad_header: dict,
    ) -> Tuple[ChainResolutionMode, int]:
        """Given adjacent heights around a forkpoint, either resume catch-up on
        the current chain (if bad_header merely extends it) or create a new
        fork Blockchain at `bad`. Returns the mode and next height to request.
        """
        assert good + 1 == bad
        assert bad == bad_header['block_height']
        _assert_header_does_not_check_against_any_chain(bad_header)
        # 'good' is the height of a block 'good_header', somewhere in self.blockchain.
        # bad_header connects to good_header; bad_header itself is NOT in self.blockchain.

        bh = self.blockchain.height()
        assert bh >= good, (bh, good)
        if bh == good:
            height = good + 1
            self.logger.info(f"catching up from {height}")
            return ChainResolutionMode.NO_FORK, height

        # this is a new fork we don't yet have
        height = bad + 1
        self.logger.info(f"new fork at bad height {bad}")
        b = self.blockchain.fork(bad_header)  # type: Blockchain
        self.blockchain = b
        assert b.forkpoint == bad
        return ChainResolutionMode.FORK, height
1221

1222
    async def _search_headers_backwards(
        self,
        height: int,
        *,
        header: dict,
    ) -> Tuple[int, dict, int, dict]:
        """Walk backwards from `height` (exponentially growing stride) until a
        header is found that checks against or connects to a local chain.

        Returns (height, header, bad, bad_header) where `bad` is the lowest
        height seen whose header did NOT check against any local chain.
        Raises GracefulDisconnect if even the checkpoint header conflicts.
        """
        async def iterate():
            # returns True to keep searching backwards, False once header fits
            nonlocal height, header
            checkp = False
            if height <= constants.net.max_checkpoint():
                height = constants.net.max_checkpoint()
                checkp = True
            header = await self.get_block_header(height, mode=ChainResolutionMode.BACKWARD)
            chain = blockchain.check_header(header)
            can_connect = blockchain.can_connect(header)
            if chain or can_connect:
                return False
            if checkp:
                raise GracefulDisconnect("server chain conflicts with checkpoints")
            return True

        bad, bad_header = height, header
        _assert_header_does_not_check_against_any_chain(bad_header)
        with blockchain.blockchains_lock: chains = list(blockchain.blockchains.values())
        local_max = max([0] + [x.height() for x in chains])
        # start just above the highest local chain, or just below `height`
        height = min(local_max + 1, height - 1)
        assert height >= 0

        await self._maybe_warm_headers_cache(
            from_height=max(0, height-10), to_height=height, mode=ChainResolutionMode.BACKWARD)

        delta = 2
        while await iterate():
            bad, bad_header = height, header
            height -= delta
            delta *= 2

        _assert_header_does_not_check_against_any_chain(bad_header)
        self.logger.info(f"exiting backward mode at {height}")
        return height, header, bad, bad_header
1262

1263
    @classmethod
    def client_name(cls) -> str:
        """User-agent string sent to the server during version negotiation."""
        return 'electrum/' + version.ELECTRUM_VERSION
1266

1267
    def is_tor(self):
        """Whether the server is a Tor hidden service (hostname ends in .onion)."""
        hostname = self.host
        return hostname.endswith('.onion')
1269

1270
    def ip_addr(self) -> Optional[str]:
        """Return the remote peer's IP address as a string, or None if unknown
        (no session, or session has no remote address)."""
        session = self.session
        if not session:
            return None
        peer = session.remote_address()
        return str(peer.host) if peer else None
1276

1277
    def bucket_based_on_ipaddress(self) -> str:
        """Bucket servers by network proximity (to diversify our connections).

        Onion servers share one bucket; IPv4 is bucketed by /16 and IPv6 by /48
        supernet. Localhost and unresolvable addresses map to '' (exempt).
        The result is computed once and cached in self._ipaddr_bucket.
        """
        def do_bucket():
            if self.is_tor():
                return BUCKET_NAME_OF_ONION_SERVERS
            try:
                ip_addr = ip_address(self.ip_addr())  # type: Union[IPv4Address, IPv6Address]
            except ValueError:
                # not a parseable IP address (e.g. ip_addr() returned None)
                return ''
            if not ip_addr:
                return ''
            if ip_addr.is_loopback:  # localhost is exempt
                return ''
            if ip_addr.version == 4:
                slash16 = IPv4Network(ip_addr).supernet(prefixlen_diff=32-16)
                return str(slash16)
            elif ip_addr.version == 6:
                slash48 = IPv6Network(ip_addr).supernet(prefixlen_diff=128-48)
                return str(slash48)
            return ''

        if not self._ipaddr_bucket:
            self._ipaddr_bucket = do_bucket()
        return self._ipaddr_bucket
1300

1301
    async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict:
        """Request the merkle proof for a confirmed tx and return the validated response."""
        if not is_hash256_str(tx_hash):
            raise Exception(f"{repr(tx_hash)} is not a txid")
        if not is_non_negative_integer(tx_height):
            raise Exception(f"{repr(tx_height)} is not a block height")
        # do request
        res = await self.session.send_request('blockchain.transaction.get_merkle', [tx_hash, tx_height])
        # validate untrusted server response
        block_height = assert_dict_contains_field(res, field_name='block_height')
        merkle = assert_dict_contains_field(res, field_name='merkle')
        pos = assert_dict_contains_field(res, field_name='pos')
        # note: tx_height was just a hint to the server, don't enforce the response to match it
        assert_non_negative_integer(block_height)
        assert_non_negative_integer(pos)
        assert_list_or_tuple(merkle)
        for node_hash in merkle:
            assert_hash256_str(node_hash)
        return res
1319

1320
    async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
        """Fetch a raw transaction (hex) by txid, using the per-interface cache when possible.

        Raises RequestCorrupted if the server response is not a valid transaction
        that matches the requested txid.
        """
        if not is_hash256_str(tx_hash):
            raise Exception(f"{repr(tx_hash)} is not a txid")
        cached = self._rawtx_cache.get(tx_hash)
        if cached:
            return cached.hex()
        raw = await self.session.send_request('blockchain.transaction.get', [tx_hash], timeout=timeout)
        # validate untrusted server response
        if not is_hex_str(raw):
            raise RequestCorrupted(f"received garbage (non-hex) as tx data (txid {tx_hash}): {raw!r}")
        tx = Transaction(raw)
        try:
            tx.deserialize()  # see if raises
        except Exception as e:
            raise RequestCorrupted(f"cannot deserialize received transaction (txid {tx_hash})") from e
        if tx.txid() != tx_hash:
            raise RequestCorrupted(f"received tx does not match expected txid {tx_hash} (got {tx.txid()})")
        self._rawtx_cache[tx_hash] = bytes.fromhex(raw)
        return raw
1338

1339
    async def broadcast_transaction(self, tx: 'Transaction', *, timeout=None) -> None:
        """Broadcast `tx` to the network via this server.

        Caller should handle TxBroadcastError and RequestTimedOut.
        Raises DummyAddressUsedInTxException if any output pays to a dummy address.
        On success, the raw tx is cached on this interface.
        """
        txid_calc = tx.txid()  # presumably None for incomplete txs -- we require a computable txid here
        assert txid_calc is not None
        rawtx = tx.serialize()
        assert is_hex_str(rawtx)
        if timeout is None:
            timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        # refuse to broadcast txs that still contain placeholder outputs
        if any(DummyAddress.is_dummy_address(txout.address) for txout in tx.outputs()):
            raise DummyAddressUsedInTxException("tried to broadcast tx with dummy address!")
        try:
            out = await self.session.send_request('blockchain.transaction.broadcast', [rawtx], timeout=timeout)
            # note: both 'out' and exception messages are untrusted input from the server
        except (RequestTimedOut, asyncio.CancelledError, asyncio.TimeoutError):
            raise  # pass-through
        except aiorpcx.jsonrpc.CodeMessageError as e:
            # server rejected the tx: map the untrusted message to a known safe string
            self.logger.info(f"broadcast_transaction error [DO NOT TRUST THIS MESSAGE]: {error_text_str_to_safe_str(repr(e))}. tx={str(tx)}")
            raise TxBroadcastServerReturnedError(sanitize_tx_broadcast_response(e.message)) from e
        except BaseException as e:  # intentional BaseException for sanity!
            self.logger.info(f"broadcast_transaction error2 [DO NOT TRUST THIS MESSAGE]: {error_text_str_to_safe_str(repr(e))}. tx={str(tx)}")
            send_exception_to_crash_reporter(e)
            raise TxBroadcastUnknownError() from e
        # server should echo back our txid; anything else is suspect
        if out != txid_calc:
            self.logger.info(f"unexpected txid for broadcast_transaction [DO NOT TRUST THIS MESSAGE]: "
                             f"{error_text_str_to_safe_str(out)} != {txid_calc}. tx={str(tx)}")
            raise TxBroadcastHashMismatch(_("Server returned unexpected transaction ID."))
        # broadcast succeeded.
        # We now cache the rawtx, for *this interface only*. The tx likely touches some ismine addresses, affecting
        # the status of a scripthash we are subscribed to. Caching here will save a future get_transaction RPC.
        self._rawtx_cache[txid_calc] = bytes.fromhex(rawtx)
1369

1370
    async def get_history_for_scripthash(self, sh: str) -> List[dict]:
        """Request the transaction history of a scripthash and return the validated response."""
        if not is_hash256_str(sh):
            raise Exception(f"{repr(sh)} is not a scripthash")
        # do request
        res = await self.session.send_request('blockchain.scripthash.get_history', [sh])
        # validate untrusted server response
        assert_list_or_tuple(res)
        prev_height = 1
        for tx_item in res:
            height = assert_dict_contains_field(tx_item, field_name='height')
            assert_dict_contains_field(tx_item, field_name='tx_hash')
            assert_integer(height)
            assert_hash256_str(tx_item['tx_hash'])
            if height in (-1, 0):
                # mempool entry: must carry a fee field
                assert_dict_contains_field(tx_item, field_name='fee')
                assert_non_negative_integer(tx_item['fee'])
                prev_height = float("inf")  # this ensures confirmed txs can't follow mempool txs
            else:
                # check monotonicity of heights
                if height < prev_height:
                    raise RequestCorrupted(f'heights of confirmed txs must be in increasing order')
                prev_height = height
        txids = {tx_item['tx_hash'] for tx_item in res}
        if len(txids) != len(res):
            # Either server is sending garbage... or maybe if server is race-prone
            # a recently mined tx could be included in both last block and mempool?
            # Still, it's simplest to just disregard the response.
            raise RequestCorrupted(f"server history has non-unique txids for sh={sh}")
        return res
1399

1400
    async def listunspent_for_scripthash(self, sh: str) -> List[dict]:
        """Request the UTXO set of a scripthash and return the validated response."""
        if not is_hash256_str(sh):
            raise Exception(f"{repr(sh)} is not a scripthash")
        # do request
        res = await self.session.send_request('blockchain.scripthash.listunspent', [sh])
        # validate untrusted server response
        assert_list_or_tuple(res)
        for utxo_item in res:
            for field in ('tx_pos', 'value', 'tx_hash', 'height'):
                assert_dict_contains_field(utxo_item, field_name=field)
            assert_non_negative_integer(utxo_item['tx_pos'])
            assert_non_negative_integer(utxo_item['value'])
            assert_non_negative_integer(utxo_item['height'])
            assert_hash256_str(utxo_item['tx_hash'])
        return res
1417

1418
    async def get_balance_for_scripthash(self, sh: str) -> dict:
        """Request confirmed/unconfirmed balance of a scripthash; return validated response."""
        if not is_hash256_str(sh):
            raise Exception(f"{repr(sh)} is not a scripthash")
        # do request
        res = await self.session.send_request('blockchain.scripthash.get_balance', [sh])
        # validate untrusted server response
        assert_dict_contains_field(res, field_name='confirmed')
        assert_dict_contains_field(res, field_name='unconfirmed')
        assert_non_negative_integer(res['confirmed'])
        assert_integer(res['unconfirmed'])  # note: only checked to be an int -- may be negative
        return res
1429

1430
    async def get_txid_from_txpos(self, tx_height: int, tx_pos: int, merkle: bool):
        """Request the txid at a given (block height, position); optionally with merkle proof."""
        if not is_non_negative_integer(tx_height):
            raise Exception(f"{repr(tx_height)} is not a block height")
        if not is_non_negative_integer(tx_pos):
            raise Exception(f"{repr(tx_pos)} should be non-negative integer")
        # do request
        res = await self.session.send_request(
            'blockchain.transaction.id_from_pos',
            [tx_height, tx_pos, merkle],
        )
        # validate untrusted server response
        if not merkle:
            assert_hash256_str(res)
            return res
        assert_dict_contains_field(res, field_name='tx_hash')
        assert_dict_contains_field(res, field_name='merkle')
        assert_hash256_str(res['tx_hash'])
        assert_list_or_tuple(res['merkle'])
        for node_hash in res['merkle']:
            assert_hash256_str(node_hash)
        return res
1451

1452
    async def get_fee_histogram(self) -> Sequence[Tuple[Union[float, int], int]]:
        """Request the mempool fee histogram and return the validated (fee, vsize) pairs."""
        # do request
        res = await self.session.send_request('mempool.get_fee_histogram')
        # validate untrusted server response
        assert_list_or_tuple(res)
        last_fee = float('inf')
        for fee, vsize in res:
            assert_non_negative_int_or_float(fee)
            assert_non_negative_integer(vsize)
            if fee >= last_fee:  # check monotonicity
                raise RequestCorrupted(f'fees must be in decreasing order')
            last_fee = fee
        return res
1465

1466
    async def get_server_banner(self) -> str:
        """Request the server's banner text; raises RequestCorrupted if it is not a str."""
        # do request
        res = await self.session.send_request('server.banner')
        # validate untrusted server response
        if not isinstance(res, str):
            raise RequestCorrupted(f'{res!r} should be a str')
        return res
1473

1474
    async def get_donation_address(self) -> str:
        """Request the server's donation address; return '' if absent or unrecognized."""
        # do request
        res = await self.session.send_request('server.donation_address')
        # validate untrusted server response
        if not res:  # ignore empty string
            return ''
        if bitcoin.is_address(res):
            return res
        # note: do not hard-fail -- allow server to use future-type
        #       bitcoin address we do not recognize
        self.logger.info(f"invalid donation address from server: {repr(res)}")
        return ''
1486

1487
    async def get_relay_fee(self) -> int:
        """Returns the min relay feerate in sat/kbyte."""
        # do request
        res = await self.session.send_request('blockchain.relayfee')
        # validate untrusted server response
        assert_non_negative_int_or_float(res)
        # scale to sat and clamp to a non-negative amount
        return max(0, int(res * bitcoin.COIN))
1496

1497
    async def get_estimatefee(self, num_blocks: int) -> int:
        """Returns a feerate estimate for getting confirmed within
        num_blocks blocks, in sat/kbyte.
        Returns -1 if the server could not provide an estimate.
        """
        if not is_non_negative_integer(num_blocks):
            raise Exception(f"{repr(num_blocks)} is not a num_blocks")
        # do request
        try:
            res = await self.session.send_request('blockchain.estimatefee', [num_blocks])
        except aiorpcx.jsonrpc.ProtocolError as e:
            # The protocol spec says the server itself should already have returned -1
            # if it cannot provide an estimate, however apparently "electrs" does not conform
            # and sends an error instead. Convert it here:
            if "cannot estimate fee" in e.message:
                res = -1
            else:
                raise
        except aiorpcx.jsonrpc.RPCError as e:
            # The protocol spec says the server itself should already have returned -1
            # if it cannot provide an estimate. "Fulcrum" often sends:
            #   aiorpcx.jsonrpc.RPCError: (-32603, 'internal error: bitcoind request timed out')
            if e.code == JSONRPC.INTERNAL_ERROR:
                res = -1
            else:
                raise
        # check response
        if res != -1:
            assert_non_negative_int_or_float(res)
            res = int(res * bitcoin.COIN)  # scale server value to sat/kbyte
        return res
1528

1529

1530
def _assert_header_does_not_check_against_any_chain(header: dict) -> None:
    """Sanity check: raise if `header` connects to any locally known chain."""
    if blockchain.check_header(header):
        raise Exception('bad_header must not check!')
1534

1535

1536
def _first_matching_explanation(server_msg: str, messages: Dict[str, Optional[str]]) -> Optional[str]:
    """Return a safe message for the first known substring found in server_msg, or None.

    `messages` maps known server-message substrings to an optional human-friendly
    explanation; a None value means the substring itself is safe to show to the user.
    Matching respects the insertion order of `messages`.
    """
    for substring, explanation in messages.items():
        if substring in server_msg:
            return explanation if explanation else substring
    return None


def sanitize_tx_broadcast_response(server_msg) -> str:
    """Map an untrusted tx-broadcast error from the server to a known safe string.

    Unfortunately, bitcoind and hence the Electrum protocol doesn't return a useful
    error code. So, we use substring matching to grok the error message.
    server_msg is untrusted input so it should not be shown to the user. see #4968
    """
    server_msg = str(server_msg)
    server_msg = server_msg.replace("\n", r"\n")

    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/script/script_error.cpp
    script_error_messages = {
        r"Script evaluated without error but finished with a false/empty top stack element",
        r"Script failed an OP_VERIFY operation",
        r"Script failed an OP_EQUALVERIFY operation",
        r"Script failed an OP_CHECKMULTISIGVERIFY operation",
        r"Script failed an OP_CHECKSIGVERIFY operation",
        r"Script failed an OP_NUMEQUALVERIFY operation",
        r"Script is too big",
        r"Push value size limit exceeded",
        r"Operation limit exceeded",
        r"Stack size limit exceeded",
        r"Signature count negative or greater than pubkey count",
        r"Pubkey count negative or limit exceeded",
        r"Opcode missing or not understood",
        r"Attempted to use a disabled opcode",
        r"Operation not valid with the current stack size",
        r"Operation not valid with the current altstack size",
        r"OP_RETURN was encountered",
        r"Invalid OP_IF construction",
        r"Negative locktime",
        r"Locktime requirement not satisfied",
        r"Signature hash type missing or not understood",
        r"Non-canonical DER signature",
        r"Data push larger than necessary",
        r"Only push operators allowed in signatures",
        r"Non-canonical signature: S value is unnecessarily high",
        r"Dummy CHECKMULTISIG argument must be zero",
        r"OP_IF/NOTIF argument must be minimal",
        r"Signature must be zero for failed CHECK(MULTI)SIG operation",
        r"NOPx reserved for soft-fork upgrades",
        r"Witness version reserved for soft-fork upgrades",
        r"Taproot version reserved for soft-fork upgrades",
        r"OP_SUCCESSx reserved for soft-fork upgrades",
        r"Public key version reserved for soft-fork upgrades",
        r"Public key is neither compressed or uncompressed",
        r"Stack size must be exactly one after execution",
        r"Extra items left on stack after execution",
        r"Witness program has incorrect length",
        r"Witness program was passed an empty witness",
        r"Witness program hash mismatch",
        r"Witness requires empty scriptSig",
        r"Witness requires only-redeemscript scriptSig",
        r"Witness provided for non-witness script",
        r"Using non-compressed keys in segwit",
        r"Invalid Schnorr signature size",
        r"Invalid Schnorr signature hash type",
        r"Invalid Schnorr signature",
        r"Invalid Taproot control block size",
        r"Too much signature validation relative to witness weight",
        r"OP_CHECKMULTISIG(VERIFY) is not available in tapscript",
        r"OP_IF/NOTIF argument must be minimal in tapscript",
        r"Using OP_CODESEPARATOR in non-witness script",
        r"Signature is found in scriptCode",
    }
    for substring in script_error_messages:
        if substring in server_msg:
            return substring
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/validation.cpp
    # grep "REJECT_"
    # grep "TxValidationResult"
    # should come after script_error.cpp (due to e.g. "non-mandatory-script-verify-flag")
    validation_error_messages = {
        r"coinbase": None,
        r"tx-size-small": None,
        r"non-final": None,
        r"txn-already-in-mempool": None,
        r"txn-mempool-conflict": None,
        r"txn-already-known": None,
        r"non-BIP68-final": None,
        r"bad-txns-nonstandard-inputs": None,
        r"bad-witness-nonstandard": None,
        r"bad-txns-too-many-sigops": None,
        r"mempool min fee not met":
            ("mempool min fee not met\n" +
             _("Your transaction is paying a fee that is so low that the bitcoin node cannot "
               "fit it into its mempool. The mempool is already full of hundreds of megabytes "
               "of transactions that all pay higher fees. Try to increase the fee.")),
        r"min relay fee not met": None,
        r"absurdly-high-fee": None,
        r"max-fee-exceeded": None,
        r"too-long-mempool-chain": None,
        r"bad-txns-spends-conflicting-tx": None,
        r"insufficient fee": ("insufficient fee\n" +
             _("Your transaction is trying to replace another one in the mempool but it "
               "does not meet the rules to do so. Try to increase the fee.")),
        r"too many potential replacements": None,
        r"replacement-adds-unconfirmed": None,
        r"mempool full": None,
        r"non-mandatory-script-verify-flag": None,
        r"mandatory-script-verify-flag-failed": None,
        r"Transaction check failed": None,
    }
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/rpc/rawtransaction.cpp
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/util/error.cpp
    # https://github.com/bitcoin/bitcoin/blob/3f83c744ac28b700090e15b5dda2260724a56f49/src/common/messages.cpp#L126
    # grep "RPC_TRANSACTION"
    # grep "RPC_DESERIALIZATION_ERROR"
    # grep "TransactionError"
    rawtransaction_error_messages = {
        r"Missing inputs": None,
        r"Inputs missing or spent": None,
        r"transaction already in block chain": None,
        r"Transaction already in block chain": None,
        r"Transaction outputs already in utxo set": None,
        r"TX decode failed": None,
        r"Peer-to-peer functionality missing or disabled": None,
        r"Transaction rejected by AcceptToMemoryPool": None,
        r"AcceptToMemoryPool failed": None,
        r"Transaction rejected by mempool": None,
        r"Mempool internal error": None,
        r"Fee exceeds maximum configured by user": None,
        r"Unspendable output exceeds maximum configured by user": None,
        r"Transaction rejected due to invalid package": None,
    }
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/consensus/tx_verify.cpp
    # https://github.com/bitcoin/bitcoin/blob/c7ad94428ab6f54661d7a5441e1fdd0ebf034903/src/consensus/tx_check.cpp
    # grep "REJECT_"
    # grep "TxValidationResult"
    tx_verify_error_messages = {
        r"bad-txns-vin-empty": None,
        r"bad-txns-vout-empty": None,
        r"bad-txns-oversize": None,
        r"bad-txns-vout-negative": None,
        r"bad-txns-vout-toolarge": None,
        r"bad-txns-txouttotal-toolarge": None,
        r"bad-txns-inputs-duplicate": None,
        r"bad-cb-length": None,
        r"bad-txns-prevout-null": None,
        r"bad-txns-inputs-missingorspent":
            ("bad-txns-inputs-missingorspent\n" +
             _("You might have a local transaction in your wallet that this transaction "
               "builds on top. You need to either broadcast or remove the local tx.")),
        r"bad-txns-premature-spend-of-coinbase": None,
        r"bad-txns-inputvalues-outofrange": None,
        r"bad-txns-in-belowout": None,
        r"bad-txns-fee-outofrange": None,
    }
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/policy/policy.cpp
    # grep "reason ="
    # should come after validation.cpp (due to "tx-size" vs "tx-size-small")
    # should come after script_error.cpp (due to e.g. "version")
    policy_error_messages = {
        r"version": _("Transaction uses non-standard version."),
        r"tx-size": _("The transaction was rejected because it is too large (in bytes)."),
        r"scriptsig-size": None,
        r"scriptsig-not-pushonly": None,
        r"scriptpubkey":
            ("scriptpubkey\n" +
             _("Some of the outputs pay to a non-standard script.")),
        r"bare-multisig": None,
        r"dust":
            (_("Transaction could not be broadcast due to dust outputs.\n"
               "Some of the outputs are too small in value, probably lower than 1000 satoshis.\n"
               "Check the units, make sure you haven't confused e.g. mBTC and BTC.")),
        r"multi-op-return": _("The transaction was rejected because it contains multiple OP_RETURN outputs."),
    }
    # the tables must be consulted in this exact order (see comments above each table)
    for table in (validation_error_messages, rawtransaction_error_messages,
                  tx_verify_error_messages, policy_error_messages):
        msg = _first_matching_explanation(server_msg, table)
        if msg is not None:
            return msg
    # otherwise:
    return _("Unknown error")
1718

1719

1720
def check_cert(host, cert):
    """Parse a PEM certificate and print host/expiry info (prints traceback on parse failure)."""
    try:
        der = pem.dePem(cert, 'CERTIFICATE')
        parsed = x509.X509(der)
    except Exception:
        traceback.print_exc(file=sys.stdout)
        return

    try:
        parsed.check_date()
        expired = False
    except Exception:
        expired = True

    msg = "host: %s\n" % host
    msg += "has_expired: %s\n" % expired
    util.print_msg(msg)
1737

1738

1739
# Used by tests
1740
def _match_hostname(name, val):
1✔
1741
    if val == name:
×
1742
        return True
×
1743

1744
    return val.startswith('*.') and name.endswith(val[1:])
×
1745

1746

1747
def test_certificates():
    """Manual diagnostic: print info for every locally saved server certificate."""
    from .simple_config import SimpleConfig
    config = SimpleConfig()
    certs_dir = os.path.join(config.path, "certs")
    for filename in os.listdir(certs_dir):
        path = os.path.join(certs_dir, filename)
        with open(path, encoding='utf-8') as f:
            cert = f.read()
        check_cert(filename, cert)
1757

1758
# Manual entry point: inspect all locally pinned server certificates.
if __name__ == "__main__":
    test_certificates()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc