• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 6498044641804288

08 May 2025 02:35PM UTC coverage: 59.657% (-0.06%) from 59.713%
6498044641804288

push

CirrusCI

SomberNight
interface: add padding and some noise to protocol messages

basic countermeasures against traffic analysis

13 of 71 new or added lines in 1 file covered. (18.31%)

10 existing lines in 5 files now uncovered.

21542 of 36110 relevant lines covered (59.66%)

2.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

35.8
/electrum/interface.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2011 thomasv@gitorious
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25
import os
5✔
26
import re
5✔
27
import ssl
5✔
28
import sys
5✔
29
import time
5✔
30
import traceback
5✔
31
import asyncio
5✔
32
import socket
5✔
33
from typing import Tuple, Union, List, TYPE_CHECKING, Optional, Set, NamedTuple, Any, Sequence, Dict
5✔
34
from collections import defaultdict
5✔
35
from ipaddress import IPv4Network, IPv6Network, ip_address, IPv6Address, IPv4Address
5✔
36
import itertools
5✔
37
import logging
5✔
38
import hashlib
5✔
39
import functools
5✔
40
import random
5✔
41

42
import aiorpcx
5✔
43
from aiorpcx import RPCSession, Notification, NetAddress, NewlineFramer
5✔
44
from aiorpcx.curio import timeout_after, TaskTimeout
5✔
45
from aiorpcx.jsonrpc import JSONRPC, CodeMessageError
5✔
46
from aiorpcx.rawsocket import RSClient, RSTransport
5✔
47
import certifi
5✔
48

49
from .util import (ignore_exceptions, log_exceptions, bfh, ESocksProxy,
5✔
50
                   is_integer, is_non_negative_integer, is_hash256_str, is_hex_str,
51
                   is_int_or_float, is_non_negative_int_or_float, OldTaskGroup)
52
from . import util
5✔
53
from . import x509
5✔
54
from . import pem
5✔
55
from . import version
5✔
56
from . import blockchain
5✔
57
from .blockchain import Blockchain, HEADER_SIZE
5✔
58
from . import bitcoin
5✔
59
from . import constants
5✔
60
from .i18n import _
5✔
61
from .logging import Logger
5✔
62
from .transaction import Transaction
5✔
63
from .fee_policy import FEE_ETA_TARGETS
5✔
64

65
if TYPE_CHECKING:
5✔
66
    from .network import Network
×
67
    from .simple_config import SimpleConfig
×
68

69

70
# Path to the CA bundle shipped with certifi; used to validate CA-signed server certs.
ca_path = certifi.where()

# Bucket name used to group .onion (Tor hidden service) servers together.
BUCKET_NAME_OF_ONION_SERVERS = 'onion'

# Known transport protocols: 't' = plaintext TCP, 's' = TLS.
_KNOWN_NETWORK_PROTOCOLS = {'t', 's'}
PREFERRED_NETWORK_PROTOCOL = 's'
assert PREFERRED_NETWORK_PROTOCOL in _KNOWN_NETWORK_PROTOCOLS
77

78

79
class NetworkTimeout:
    """Timeout presets for network operations, in seconds."""
    # seconds
    class Generic:
        # default presets
        NORMAL = 30
        RELAXED = 45
        MOST_RELAXED = 600

    class Urgent(Generic):
        # tighter presets for operations where we want to fail fast
        NORMAL = 10
        RELAXED = 20
        MOST_RELAXED = 60
90

91

92
def assert_non_negative_integer(val: Any) -> None:
    """Raise RequestCorrupted unless *val* is a non-negative integer."""
    if is_non_negative_integer(val):
        return
    raise RequestCorrupted(f'{val!r} should be a non-negative integer')
95

96

97
def assert_integer(val: Any) -> None:
    """Raise RequestCorrupted unless *val* is an integer."""
    if is_integer(val):
        return
    raise RequestCorrupted(f'{val!r} should be an integer')
100

101

102
def assert_int_or_float(val: Any) -> None:
    """Raise RequestCorrupted unless *val* is an int or a float."""
    if is_int_or_float(val):
        return
    raise RequestCorrupted(f'{val!r} should be int or float')
105

106

107
def assert_non_negative_int_or_float(val: Any) -> None:
    """Raise RequestCorrupted unless *val* is a non-negative int or float."""
    if is_non_negative_int_or_float(val):
        return
    raise RequestCorrupted(f'{val!r} should be a non-negative int or float')
110

111

112
def assert_hash256_str(val: Any) -> None:
    """Raise RequestCorrupted unless *val* looks like a hash256 hex string."""
    if is_hash256_str(val):
        return
    raise RequestCorrupted(f'{val!r} should be a hash256 str')
115

116

117
def assert_hex_str(val: Any) -> None:
    """Raise RequestCorrupted unless *val* is a hex string."""
    if is_hex_str(val):
        return
    raise RequestCorrupted(f'{val!r} should be a hex str')
120

121

122
def assert_dict_contains_field(d: Any, *, field_name: str) -> Any:
    """Return ``d[field_name]``.

    Raises RequestCorrupted if *d* is not a dict or does not contain *field_name*.
    """
    if not isinstance(d, dict):
        raise RequestCorrupted(f'{d!r} should be a dict')
    if field_name in d:
        return d[field_name]
    raise RequestCorrupted(f'required field {field_name!r} missing from dict')
128

129

130
def assert_list_or_tuple(val: Any) -> None:
    """Raise RequestCorrupted unless *val* is a list or a tuple."""
    if isinstance(val, (list, tuple)):
        return
    raise RequestCorrupted(f'{val!r} should be a list or tuple')
133

134

135
class NotificationSession(RPCSession):
    """An aiorpcx RPCSession that supports server-push notifications.

    Notifications are dispatched to per-subscription asyncio queues, and the
    latest result per (method, params) key is cached so that repeated
    subscriptions need not hit the network again.
    """

    def __init__(self, *args, interface: 'Interface', **kwargs):
        super(NotificationSession, self).__init__(*args, **kwargs)
        # maps hashable (method, params) key -> list of subscribed queues
        self.subscriptions = defaultdict(list)
        # maps hashable (method, params) key -> last result seen for that key
        self.cache = {}
        # monotonically increasing id, only used for correlating log lines
        self._msg_counter = itertools.count(start=1)
        self.interface = interface
        self.cost_hard_limit = 0  # disable aiorpcx resource limits

    async def handle_request(self, request):
        """Handle an incoming message from the server.

        Only Notifications for methods we subscribed to are expected; anything
        else is treated as a protocol violation and closes the session.
        """
        self.maybe_log(f"--> {request}")
        try:
            if isinstance(request, Notification):
                # by Electrum protocol convention, the last arg is the result
                params, result = request.args[:-1], request.args[-1]
                key = self.get_hashable_key_for_rpc_call(request.method, params)
                if key in self.subscriptions:
                    self.cache[key] = result
                    for queue in self.subscriptions[key]:
                        await queue.put(request.args)
                else:
                    raise Exception(f'unexpected notification')
            else:
                raise Exception(f'unexpected request. not a notification')
        except Exception as e:
            self.interface.logger.info(f"error handling request {request}. exc: {repr(e)}")
            await self.close()

    async def send_request(self, *args, timeout=None, **kwargs):
        """Send a JSON-RPC request and await its response.

        Raises RequestTimedOut on timeout; re-raises protocol errors and
        cancellations after logging them.
        """
        # note: semaphores/timeouts/backpressure etc are handled by
        # aiorpcx. the timeout arg here in most cases should not be set
        msg_id = next(self._msg_counter)
        self.maybe_log(f"<-- {args} {kwargs} (id: {msg_id})")
        try:
            # note: RPCSession.send_request raises TaskTimeout in case of a timeout.
            # TaskTimeout is a subclass of CancelledError, which is *suppressed* in TaskGroups
            response = await util.wait_for2(
                super().send_request(*args, **kwargs),
                timeout)
        except (TaskTimeout, asyncio.TimeoutError) as e:
            self.maybe_log(f"--> request timed out: {args} (id: {msg_id})")
            raise RequestTimedOut(f'request timed out: {args} (id: {msg_id})') from e
        except CodeMessageError as e:
            self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
            raise
        except BaseException as e:  # cancellations, etc. are useful for debugging
            self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
            raise
        else:
            self.maybe_log(f"--> {response} (id: {msg_id})")
            return response

    def set_default_timeout(self, timeout):
        """Set both the response timeout and the max send delay (base-class attrs)."""
        assert hasattr(self, "sent_request_timeout")  # in base class
        self.sent_request_timeout = timeout
        assert hasattr(self, "max_send_delay")        # in base class
        self.max_send_delay = timeout

    async def subscribe(self, method: str, params: List, queue: asyncio.Queue):
        """Subscribe *queue* to notifications for (method, params).

        The current value (from cache or a fresh network request) is put on the
        queue immediately, in the same ``params + [result]`` shape as notifications.
        """
        # note: until the cache is written for the first time,
        # each 'subscribe' call might make a request on the network.
        key = self.get_hashable_key_for_rpc_call(method, params)
        self.subscriptions[key].append(queue)
        if key in self.cache:
            result = self.cache[key]
        else:
            result = await self.send_request(method, params)
            self.cache[key] = result
        await queue.put(params + [result])

    def unsubscribe(self, queue):
        """Unsubscribe a callback to free object references to enable GC."""
        # note: we can't unsubscribe from the server, so we keep receiving
        # subsequent notifications
        for v in self.subscriptions.values():
            if queue in v:
                v.remove(queue)

    @classmethod
    def get_hashable_key_for_rpc_call(cls, method, params):
        """Hashable index for subscriptions and cache"""
        return str(method) + repr(params)

    def maybe_log(self, msg: str) -> None:
        """Log *msg* at debug level iff verbose message dumping is enabled."""
        if not self.interface: return
        if self.interface.debug or self.interface.network.debug:
            self.interface.logger.debug(msg)

    def default_framer(self):
        # overridden so that max_size can be customized
        max_size = self.interface.network.config.NETWORK_MAX_INCOMING_MSG_SIZE
        assert max_size > 500_000, f"{max_size=} (< 500_000) is too small"
        return NewlineFramer(max_size=max_size)

    async def close(self, *, force_after: Optional[int] = None):
        """Closes the connection and waits for it to be closed.
        We try to flush buffered data to the wire, which can take some time.
        """
        if force_after is None:
            # We give up after a while and just abort the connection.
            # Note: specifically if the server is running Fulcrum, waiting seems hopeless,
            #       the connection must be aborted (see https://github.com/cculianu/Fulcrum/issues/76)
            # Note: if the ethernet cable was pulled or wifi disconnected, that too might
            #       wait until this timeout is triggered
            force_after = 1  # seconds
        await super().close(force_after=force_after)
241

242

243
# Base class for network-layer errors raised by this module.
class NetworkException(Exception): pass
244

245

246
class GracefulDisconnect(NetworkException):
    """Raised to tear down a server connection in a controlled manner.

    Handlers log it at ``log_level`` instead of treating it as a crash.
    """
    log_level = logging.INFO

    def __init__(self, *args, log_level=None, **kwargs):
        Exception.__init__(self, *args, **kwargs)
        if log_level is None:
            return
        # override the class-level default for this instance only
        self.log_level = log_level
253

254

255
class RequestTimedOut(GracefulDisconnect):
    def __str__(self):
        # user-facing, translated message
        return _("Network request timed out.")
258

259

260
# Server sent a malformed or protocol-violating response.
class RequestCorrupted(Exception): pass

# Errors related to the server's SSL certificate handling:
class ErrorParsingSSLCert(Exception): pass
class ErrorGettingSSLCertFromServer(Exception): pass
class ErrorSSLCertFingerprintMismatch(Exception): pass
# Mutually incompatible CLI/config options were given (e.g. --serverfingerprint with a CA-signed server).
class InvalidOptionCombination(Exception): pass
# Low-level connection establishment failed; __cause__ carries the original OSError/SSL error.
class ConnectError(NetworkException): pass
267

268

269
class _RSClient(RSClient):
    """RSClient that uses PaddedRSTransport by default and maps OSError to ConnectError."""

    def __init__(self, *, transport=None, **kwargs):
        RSClient.__init__(
            self,
            transport=(PaddedRSTransport if transport is None else transport),
            **kwargs,
        )

    async def create_connection(self):
        try:
            return await super().create_connection()
        except OSError as e:
            # note: using "from e" here will set __cause__ of ConnectError
            raise ConnectError(e) from e
281

282

283
class PaddedRSTransport(RSTransport):
    """A raw socket transport that provides basic countermeasures against traffic analysis
    by padding the jsonrpc payload with whitespaces to have ~uniform-size TCP packets.
    (it is assumed that a network observer does not see plaintext transport contents,
    due to it being wrapped e.g. in TLS)
    """

    # smallest payload we will put on the wire; shorter payloads get padded up to this
    MIN_PAYLOAD_SIZE = 1024

    session: Optional['NotificationSession']

    def __init__(self, *args, **kwargs):
        RSTransport.__init__(self, *args, **kwargs)
        self._sbuffer = bytearray()  # "send buffer"
        # NOTE(review): this ends up holding the task wrapping taskgroup.spawn(),
        # not the poller task itself (see connection_made) -- TODO confirm intended.
        self._sbuffer_task = None  # type: Optional[asyncio.Task]
        self._sbuffer_has_data_evt = asyncio.Event()
        self._last_send = time.monotonic()

    async def write(self, message):
        """Frame *message* and append it to the send buffer; flush if heuristics allow."""
        await self._can_send.wait()
        if self.is_closing():
            return
        framed_message = self._framer.frame(message)
        self._sbuffer += framed_message
        self._sbuffer_has_data_evt.set()
        self._maybe_consume_sbuffer()
        # clear the event if the flush above fully drained the buffer
        # NOTE(review): the event is only cleared here, not when the poller drains
        # the buffer, so the poll loop may keep waking every 0.5s -- TODO confirm.
        if not self._sbuffer:
            self._sbuffer_has_data_evt.clear()

    def _maybe_consume_sbuffer(self):
        """Maybe take some data from sbuffer and write it to the wire, padded
        to a power-of-two size to hide the true message length.
        """
        if not self._can_send.is_set() or self.is_closing():
            return
        buf = self._sbuffer
        if not buf:
            return
        # if there is enough data in the buffer, or if we haven't sent in a while, send now:
        if not (len(buf) >= self.MIN_PAYLOAD_SIZE or self._last_send + 1 < time.monotonic()):
            return
        # buffered data must end on a framed json-rpc message boundary
        assert buf[-2:] in (b"}\n", b"]\n"), f"unexpected json-rpc terminator: {buf[-2:]=!r}"
        # either (1) pad length to next power of two, to create "lsize" packet:
        payload_lsize = len(buf)
        total_lsize = max(self.MIN_PAYLOAD_SIZE, 2 ** (payload_lsize.bit_length()))
        npad_lsize = total_lsize - payload_lsize
        # or if that wasted a lot of bandwidth with padding, (2) defer sending some messages
        # and create a packet with half that size ("ssize", s for small)
        total_ssize = max(self.MIN_PAYLOAD_SIZE, total_lsize//2)
        # find the last newline-terminated message boundary that fits in total_ssize
        payload_ssize = buf.rfind(b"\n", 0, total_ssize)
        if payload_ssize != -1:
            payload_ssize += 1  # for "\n" char
            npad_ssize = total_ssize - payload_ssize
        else:
            npad_ssize = float("inf")  # no boundary fits: option (2) unavailable
        # decide between (1) and (2): prefer whichever wastes fewer padding bytes
        if npad_lsize <= npad_ssize:
            npad = npad_lsize
            p_idx = payload_lsize
        else:
            npad = npad_ssize
            p_idx = payload_ssize
        # pad by adding spaces near end
        assert buf[p_idx-2:p_idx] in (b"}\n", b"]\n"), f"unexpected json-rpc terminator: {buf[p_idx-2:p_idx]=!r}"
        self.session.maybe_log(
            f"PaddedRSTransport. calling low-level write(). "
            f"chose between (lsize:{payload_lsize}+{npad_lsize}, ssize:{payload_ssize}+{npad_ssize}). "
            f"won: {'tie' if npad_lsize == npad_ssize else 'lsize' if npad_lsize < npad_ssize else 'ssize'}."
        )
        # insert the whitespace padding just before the final "}\n" / "]\n" terminator,
        # which keeps the payload valid json
        buf2 = buf[:p_idx - 2] + (npad * b" ") + buf[p_idx - 2:p_idx]
        self._asyncio_transport.write(buf2)
        self._last_send = time.monotonic()
        del self._sbuffer[:p_idx]

    async def _poll_sbuffer(self):
        # periodically flush data that write() deferred (the "haven't sent in a while" case)
        while True:
            await asyncio.sleep(0.5)  # gives time for buffer to grow
            await self._sbuffer_has_data_evt.wait()  # lowers CPU cost compared to pure polling
            self._maybe_consume_sbuffer()

    def connection_made(self, transport: asyncio.BaseTransport):
        super().connection_made(transport)
        # run the poller inside the interface's taskgroup so it dies with the interface
        coro = self.session.interface.taskgroup.spawn(self._poll_sbuffer())
        self._sbuffer_task = self.loop.create_task(coro)
364

365

366
class ServerAddr:
    """The address of an Electrum server: host, port and transport protocol."""

    def __init__(self, host: str, port: Union[int, str], *, protocol: str = None):
        assert isinstance(host, str), repr(host)
        if protocol is None:
            protocol = 's'
        if not host:
            raise ValueError('host must not be empty')
        if host.startswith('[') and host.endswith(']'):  # bracketed IPv6 literal
            host = host[1:-1]
        try:
            net_addr = NetAddress(host, port)  # this validates host and port
        except Exception as e:
            raise ValueError(f"cannot construct ServerAddr: invalid host or port (host={host}, port={port})") from e
        if protocol not in _KNOWN_NETWORK_PROTOCOLS:
            raise ValueError(f"invalid network protocol: {protocol}")
        self.host = str(net_addr.host)  # canonical form (if e.g. IPv6 address)
        self.port = int(net_addr.port)
        self.protocol = protocol
        self._net_addr_str = str(net_addr)

    @classmethod
    def from_str(cls, s: str) -> 'ServerAddr':
        """Constructs a ServerAddr or raises ValueError."""
        # host might be IPv6 address, hence do rsplit:
        host, port, protocol = str(s).rsplit(':', 2)
        return ServerAddr(host=host, port=port, protocol=protocol)

    @classmethod
    def from_str_with_inference(cls, s: str) -> Optional['ServerAddr']:
        """Construct ServerAddr from str, guessing missing details.
        Does not raise - just returns None if guessing failed.
        Ongoing compatibility not guaranteed.
        """
        if not s:
            return None
        host = ""
        if s[0] == "[" and "]" in s:  # IPv6 address
            host_end = s.index("]")
            host = s[1:host_end]
            s = s[host_end+1:]
        items = str(s).rsplit(':', 2)
        if len(items) < 2:
            return None  # although maybe we could guess the port too?
        host = host or items[0]
        port = items[1]
        protocol = items[2] if len(items) >= 3 else PREFERRED_NETWORK_PROTOCOL
        try:
            return ServerAddr(host=host, port=port, protocol=protocol)
        except ValueError:
            return None

    def to_friendly_name(self) -> str:
        # note: this method is closely linked to from_str_with_inference
        return self.net_addr_str() if self.protocol == 's' else str(self)  # hide trailing ":s"

    def __str__(self):
        return f'{self.net_addr_str()}:{self.protocol}'

    def to_json(self) -> str:
        return str(self)

    def __repr__(self):
        return f'<ServerAddr host={self.host} port={self.port} protocol={self.protocol}>'

    def net_addr_str(self) -> str:
        return self._net_addr_str

    def __eq__(self, other):
        if not isinstance(other, ServerAddr):
            return False
        mine = (self.host, self.port, self.protocol)
        theirs = (other.host, other.port, other.protocol)
        return mine == theirs

    def __ne__(self, other):
        return not (self == other)

    def __hash__(self):
        return hash((self.host, self.port, self.protocol))
451

452

453
def _get_cert_path_for_host(*, config: 'SimpleConfig', host: str) -> str:
    """Return the path under the config dir where *host*'s certificate is pinned."""
    try:
        ip = ip_address(host)
    except ValueError:
        # not an IP address literal: use the hostname as the filename
        filename = host
    else:
        # IPv6 literals contain ':' which is problematic in filenames on some
        # platforms, so use a hex-encoded form instead; IPv4 is used as-is
        filename = f"ipv6_{ip.packed.hex()}" if isinstance(ip, IPv6Address) else host
    return os.path.join(config.path, 'certs', filename)
463

464

465
class Interface(Logger):
    """A single connection to an Electrum server."""

    # single-letter tag used by Logger for log-line shortcuts
    LOGGING_SHORTCUT = 'i'
469
    def __init__(self, *, network: 'Network', server: ServerAddr):
        # resolves once the session is established and a blockchain was chosen;
        # cancelled if the connection fails/closes before that
        self.ready = network.asyncio_loop.create_future()
        self.got_disconnected = asyncio.Event()
        self.server = server
        Logger.__init__(self)
        assert network.config.path
        self.cert_path = _get_cert_path_for_host(config=network.config, host=self.host)
        self.blockchain = None  # type: Optional[Blockchain]
        self._requested_chunks = set()  # type: Set[int]
        self.network = network
        self.session = None  # type: Optional[NotificationSession]
        self._ipaddr_bucket = None
        # Set up proxy.
        # - for servers running on localhost, the proxy is not used. If user runs their own server
        #   on same machine, this lets them enable the proxy (which is used for e.g. FX rates).
        #   note: we could maybe relax this further and bypass the proxy for all private
        #         addresses...? e.g. 192.168.x.x
        if util.is_localhost(server.host):
            self.logger.info(f"looks like localhost: not using proxy for this server")
            self.proxy = None
        else:
            self.proxy = ESocksProxy.from_network_settings(network)

        # Latest block header and corresponding height, as claimed by the server.
        # Note that these values are updated before they are verified.
        # Especially during initial header sync, verification can take a long time.
        # Failing verification will get the interface closed.
        self.tip_header = None
        self.tip = 0

        # fee estimates reported by this server, keyed by confirmation target
        self.fee_estimates_eta = {}  # type: Dict[int, int]

        # Dump network messages (only for this interface).  Set at runtime from the console.
        self.debug = False

        self.taskgroup = OldTaskGroup()

        # schedule the main loop of this interface on the network's event loop
        async def spawn_task():
            task = await self.network.taskgroup.spawn(self.run())
            task.set_name(f"interface::{str(server)}")
        asyncio.run_coroutine_threadsafe(spawn_task(), self.network.asyncio_loop)
510

511
    @property
    def host(self):
        # convenience accessor for the server's host
        return self.server.host

    @property
    def port(self):
        # convenience accessor for the server's port
        return self.server.port

    @property
    def protocol(self):
        # convenience accessor for the server's protocol ('t' or 's')
        return self.server.protocol

    def diagnostic_name(self):
        # used by Logger to tag log lines belonging to this interface
        return self.server.net_addr_str()

    def __str__(self):
        return f"<Interface {self.diagnostic_name()}>"
528

529
    async def is_server_ca_signed(self, ca_ssl_context):
        """Given a CA enforcing SSL context, returns True if the connection
        can be established. Returns False if the server has a self-signed
        certificate but otherwise is okay. Any other failures raise.
        """
        try:
            await self.open_session(ca_ssl_context, exit_early=True)
        except ConnectError as e:
            cause = e.__cause__
            # verify_code 18 is OpenSSL's "self-signed certificate" verify error
            if (isinstance(cause, ssl.SSLCertVerificationError)
                    and cause.reason == 'CERTIFICATE_VERIFY_FAILED'
                    and cause.verify_code == 18):  # "self signed certificate"
                # Good. We will use this server as self-signed.
                return False
            # Not good. Cannot use this server.
            raise
        # Good. We will use this server as CA-signed.
        return True
547

548
    async def _try_saving_ssl_cert_for_first_time(self, ca_ssl_context):
        """Probe the server and record its certificate in our cert store.

        CA-signed servers are recorded as an empty file; self-signed servers get
        their certificate pinned on disk.
        """
        ca_signed = await self.is_server_ca_signed(ca_ssl_context)
        if ca_signed:
            if self._get_expected_fingerprint():
                raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
            with open(self.cert_path, 'w') as f:
                # empty file means this is CA signed, not self-signed
                f.write('')
        else:
            await self._save_certificate()
558

559
    def _is_saved_ssl_cert_available(self):
        """Return True if a usable saved certificate record exists for this server.

        An empty file means CA-signed; otherwise the file must contain a valid,
        unexpired, fingerprint-matching pinned certificate.
        Raises on parse errors or option conflicts; returns False (and deletes
        the pin) if the pinned certificate has expired.
        """
        if not os.path.exists(self.cert_path):
            return False
        with open(self.cert_path, 'r') as f:
            contents = f.read()
        if contents == '':  # CA signed
            if self._get_expected_fingerprint():
                raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
            return True
        # pinned self-signed cert
        try:
            b = pem.dePem(contents, 'CERTIFICATE')
        except SyntaxError as e:
            self.logger.info(f"error parsing already saved cert: {e}")
            raise ErrorParsingSSLCert(e) from e
        try:
            x = x509.X509(b)
        except Exception as e:
            self.logger.info(f"error parsing already saved cert: {e}")
            raise ErrorParsingSSLCert(e) from e
        try:
            x.check_date()
        except x509.CertificateError as e:
            self.logger.info(f"certificate has expired: {e}")
            os.unlink(self.cert_path)  # delete pinned cert only in this case
            return False
        self._verify_certificate_fingerprint(bytearray(b))
        return True
587

588
    async def _get_ssl_context(self):
        """Return the SSLContext to use for this server, or None for plaintext TCP.

        Fetches and stores the server's certificate on first use.
        """
        if self.protocol != 's':
            # using plaintext TCP
            return None

        # see if we already have cert for this server; or get it for the first time
        ca_sslc = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
        if not self._is_saved_ssl_cert_available():
            try:
                await self._try_saving_ssl_cert_for_first_time(ca_sslc)
            except (OSError, ConnectError, aiorpcx.socks.SOCKSError) as e:
                raise ErrorGettingSSLCertFromServer(e) from e
        # now we have a file saved in our certificate store
        siz = os.stat(self.cert_path).st_size
        if siz == 0:
            # CA signed cert
            sslc = ca_sslc
        else:
            # pinned self-signed cert
            sslc = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=self.cert_path)
            # note: Flag "ssl.VERIFY_X509_STRICT" is enabled by default in python 3.13+ (disabled in older versions).
            #       We explicitly disable it as it breaks lots of servers.
            sslc.verify_flags &= ~ssl.VERIFY_X509_STRICT
            sslc.check_hostname = False
        return sslc
613

614
    def handle_disconnect(func):
        """Decorator for Interface coroutines: swallow expected disconnect errors
        and always tear the connection down afterwards.
        """
        @functools.wraps(func)
        async def wrapper_func(self: 'Interface', *args, **kwargs):
            try:
                return await func(self, *args, **kwargs)
            except GracefulDisconnect as e:
                self.logger.log(e.log_level, f"disconnecting due to {repr(e)}")
            except aiorpcx.jsonrpc.RPCError as e:
                self.logger.warning(f"disconnecting due to {repr(e)}")
                self.logger.debug(f"(disconnect) trace for {repr(e)}", exc_info=True)
            finally:
                # teardown runs on every exit path, including unexpected exceptions
                self.got_disconnected.set()
                await self.network.connection_down(self)
                # if was not 'ready' yet, schedule waiting coroutines:
                self.ready.cancel()
        return wrapper_func
630

631
    @ignore_exceptions  # do not kill network.taskgroup
    @log_exceptions
    @handle_disconnect
    async def run(self):
        """Main loop of the interface: establish the session; returns (and thereby
        triggers teardown via @handle_disconnect) on connection errors.
        """
        try:
            ssl_context = await self._get_ssl_context()
        except (ErrorParsingSSLCert, ErrorGettingSSLCertFromServer) as e:
            self.logger.info(f'disconnecting due to: {repr(e)}')
            return
        try:
            await self.open_session(ssl_context)
        except (asyncio.CancelledError, ConnectError, aiorpcx.socks.SOCKSError) as e:
            # make SSL errors for main interface more visible (to help servers ops debug cert pinning issues)
            if (isinstance(e, ConnectError) and isinstance(e.__cause__, ssl.SSLError)
                    and self.is_main_server() and not self.network.auto_connect):
                self.logger.warning(f'Cannot connect to main server due to SSL error '
                                    f'(maybe cert changed compared to "{self.cert_path}"). Exc: {repr(e)}')
            else:
                self.logger.info(f'disconnecting due to: {repr(e)}')
            return
651

652
    def _mark_ready(self) -> None:
        """Pick a blockchain for this interface and resolve the *ready* future.

        Idempotent; raises GracefulDisconnect if *ready* was already cancelled.
        """
        if self.ready.cancelled():
            raise GracefulDisconnect('conn establishment was too slow; *ready* future was cancelled')
        if self.ready.done():
            return

        assert self.tip_header
        chain = blockchain.check_header(self.tip_header)
        if not chain:
            # server tip does not match any known chain; follow our best chain for now
            self.blockchain = blockchain.get_best_chain()
        else:
            self.blockchain = chain
        assert self.blockchain is not None

        self.logger.info(f"set blockchain with height {self.blockchain.height()}")

        self.ready.set_result(1)
669

670
    def is_connected_and_ready(self) -> bool:
        """True iff the handshake completed and the connection has not been lost."""
        if self.got_disconnected.is_set():
            return False
        return self.ready.done()
672

673
    async def _save_certificate(self) -> None:
        """Fetch the server's certificate and pin it to disk (PEM), retrying up to 10 times."""
        if not os.path.exists(self.cert_path):
            # we may need to retry this a few times, in case the handshake hasn't completed
            for _ in range(10):
                dercert = await self._fetch_certificate()
                if dercert:
                    self.logger.info("succeeded in getting cert")
                    self._verify_certificate_fingerprint(dercert)
                    with open(self.cert_path, 'w') as f:
                        cert = ssl.DER_cert_to_PEM_cert(dercert)
                        # workaround android bug
                        cert = re.sub("([^\n])-----END CERTIFICATE-----","\\1\n-----END CERTIFICATE-----",cert)
                        f.write(cert)
                        # even though close flushes, we can't fsync when closed.
                        # and we must flush before fsyncing, cause flush flushes to OS buffer
                        # fsync writes to OS buffer to disk
                        f.flush()
                        os.fsync(f.fileno())
                    break
                await asyncio.sleep(1)
            else:
                raise GracefulDisconnect("could not get certificate after 10 tries")
696
    async def _fetch_certificate(self) -> bytes:
        """Connect with certificate verification disabled and return the server's
        certificate in DER (binary) form."""
        ssl_context = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_CLIENT)
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE
        async with _RSClient(session_factory=RPCSession,
                             host=self.host, port=self.port,
                             ssl=ssl_context, proxy=self.proxy) as session:
            transport = session.transport._asyncio_transport  # type: asyncio.BaseTransport
            ssl_obj = transport.get_extra_info("ssl_object")  # type: ssl.SSLObject
            return ssl_obj.getpeercert(binary_form=True)

707
    def _get_expected_fingerprint(self) -> Optional[str]:
        """Return the user-configured certificate fingerprint, or None.

        Only the main server can have a configured fingerprint; for other
        servers this always returns None.
        """
        if not self.is_main_server():
            return None
        return self.network.config.NETWORK_SERVERFINGERPRINT
711
    def _verify_certificate_fingerprint(self, certificate):
        """Check the sha256 of *certificate* (DER bytes) against the configured
        fingerprint, if any.

        Raises ErrorSSLCertFingerprintMismatch (and fires the 'cert_mismatch'
        callback) when they differ. No-op if no fingerprint is configured.
        """
        expected = self._get_expected_fingerprint()
        if not expected:
            return
        actual = hashlib.sha256(certificate).hexdigest()
        if actual.lower() != expected.lower():
            util.trigger_callback('cert_mismatch')
            raise ErrorSSLCertFingerprintMismatch('Refusing to connect to server due to cert fingerprint mismatch')
        self.logger.info("cert fingerprint verification passed")

722
    async def get_block_header(self, height, assert_mode):
        """Request the block header at *height* and return it deserialized."""
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        self.logger.info(f'requesting block header {height} in mode {assert_mode}')
        # use lower timeout as we usually have network.bhi_lock here
        timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        raw_hex = await self.session.send_request('blockchain.block.header', [height], timeout=timeout)
        return blockchain.deserialize_header(bytes.fromhex(raw_hex), height)

731
    async def request_chunk(self, height: int, tip=None, *, can_return_early=False):
        """Request and connect the chunk of (up to 2016) headers containing *height*.

        Returns (conn, num_headers), where conn is the result of
        blockchain.connect_chunk (falsy if the chunk did not connect), or
        returns None early if can_return_early and the same chunk is already
        being requested concurrently.
        Raises RequestCorrupted if the server response is malformed.
        """
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        index = height // 2016  # chunk index
        if can_return_early and index in self._requested_chunks:
            return
        self.logger.info(f"requesting chunk from height {height}")
        size = 2016
        if tip is not None:
            # don't ask for headers past the server's tip
            size = min(size, tip - index * 2016 + 1)
            size = max(size, 0)
        try:
            # record the in-flight request so concurrent callers can dedupe
            self._requested_chunks.add(index)
            res = await self.session.send_request('blockchain.block.headers', [index * 2016, size])
        finally:
            self._requested_chunks.discard(index)
        # validate the (untrusted) response before using it
        assert_dict_contains_field(res, field_name='count')
        assert_dict_contains_field(res, field_name='hex')
        assert_dict_contains_field(res, field_name='max')
        assert_non_negative_integer(res['count'])
        assert_non_negative_integer(res['max'])
        assert_hex_str(res['hex'])
        if len(res['hex']) != HEADER_SIZE * 2 * res['count']:
            raise RequestCorrupted('inconsistent chunk hex and count')
        # we never request more than 2016 headers, but we enforce those fit in a single response
        if res['max'] < 2016:
            raise RequestCorrupted(f"server uses too low 'max' count for block.headers: {res['max']} < 2016")
        if res['count'] != size:
            raise RequestCorrupted(f"expected {size} headers but only got {res['count']}")
        conn = self.blockchain.connect_chunk(index, res['hex'])
        if not conn:
            return conn, 0
        return conn, res['count']

765
    def is_main_server(self) -> bool:
        """True iff this interface is (or is about to become) the network's main server."""
        if self.network.interface == self:
            return True
        return self.network.interface is None and self.network.default_server == self.server
769
    async def open_session(self, sslc, exit_early=False):
        """Open the RPC session, negotiate the protocol version, and run the
        long-lived per-interface tasks (ping/fees/headers/monitor) until
        disconnect.

        sslc: ssl.SSLContext used for the connection.
        exit_early: return right after version negotiation (used e.g. to
            probe/save the certificate).
        Raises GracefulDisconnect on protocol violations or (some) server errors.
        """
        session_factory = lambda *args, iface=self, **kwargs: NotificationSession(*args, **kwargs, interface=iface)
        async with _RSClient(session_factory=session_factory,
                             host=self.host, port=self.port,
                             ssl=sslc, proxy=self.proxy) as session:
            self.session = session  # type: NotificationSession
            self.session.set_default_timeout(self.network.get_network_timeout_seconds(NetworkTimeout.Generic))
            try:
                ver = await session.send_request('server.version', [self.client_name(), version.PROTOCOL_VERSION])
            except aiorpcx.jsonrpc.RPCError as e:
                raise GracefulDisconnect(e)  # probably 'unsupported protocol version'
            if exit_early:
                return
            # the server must echo back exactly the protocol version we asked for
            if ver[1] != version.PROTOCOL_VERSION:
                raise GracefulDisconnect(f'server violated protocol-version-negotiation. '
                                         f'we asked for {version.PROTOCOL_VERSION!r}, they sent {ver[1]!r}')
            if not self.network.check_interface_against_healthy_spread_of_connected_servers(self):
                raise GracefulDisconnect(f'too many connected servers already '
                                         f'in bucket {self.bucket_based_on_ipaddress()}')
            self.logger.info(f"connection established. version: {ver}")

            try:
                # run all per-interface background tasks; exits when any of them raises
                async with self.taskgroup as group:
                    await group.spawn(self.ping)
                    await group.spawn(self.request_fee_estimates)
                    await group.spawn(self.run_fetch_blocks)
                    await group.spawn(self.monitor_connection)
            except aiorpcx.jsonrpc.RPCError as e:
                # these server-side errors are treated as a graceful disconnect,
                # not a crash; anything else propagates
                if e.code in (
                    JSONRPC.EXCESSIVE_RESOURCE_USAGE,
                    JSONRPC.SERVER_BUSY,
                    JSONRPC.METHOD_NOT_FOUND,
                    JSONRPC.INTERNAL_ERROR,
                ):
                    log_level = logging.WARNING if self.is_main_server() else logging.INFO
                    raise GracefulDisconnect(e, log_level=log_level) from e
                raise
            finally:
                self.got_disconnected.set()  # set this ASAP, ideally before any awaits

809
    async def monitor_connection(self):
        """Poll once per second; raise GracefulDisconnect once the session is gone."""
        while True:
            await asyncio.sleep(1)
            # If the session/transport is no longer open, we disconnect.
            # e.g. if the remote cleanly sends EOF, we would handle that here.
            # note: If the user pulls the ethernet cable or disconnects wifi,
            #       ideally we would detect that here, so that the GUI/etc can reflect that.
            #       - On Android, this seems to work reliably , where asyncio.BaseProtocol.connection_lost()
            #         gets called with e.g. ConnectionAbortedError(103, 'Software caused connection abort').
            #       - On desktop Linux/Win, it seems BaseProtocol.connection_lost() is not called in such cases.
            #         Hence, in practice the connection issue will only be detected the next time we try
            #         to send a message (plus timeout), which can take minutes...
            session = self.session
            if not session or session.is_closing():
                raise GracefulDisconnect('session was closed')

824
    async def ping(self):
        """Periodically ping the server so it knows we are still here.

        The random sleep interval (0..300 s) plus the trailing noise messages
        provide some cover traffic against traffic analysis.
        """
        while True:
            delay = random.random() * 300
            await asyncio.sleep(delay)
            await self.session.send_request('server.ping')
            await self._maybe_send_noise()

832
    async def _maybe_send_noise(self):
        """With geometrically decaying probability (0.2 per round), send extra
        pings after a short random delay, as noise against traffic analysis."""
        while True:
            if random.random() >= 0.2:
                break
            await asyncio.sleep(random.random())
            await self.session.send_request('server.ping')

837
    async def request_fee_estimates(self):
        """Loop forever: poll the server for ETA-based fee estimates about once a minute."""
        while True:
            async with OldTaskGroup() as group:
                fee_tasks = []
                # query all ETA targets except the last one, concurrently
                for i in FEE_ETA_TARGETS[0:-1]:
                    fee_tasks.append((i, await group.spawn(self.get_estimatefee(i))))
            # the taskgroup has exited here, so all tasks are done
            for nblock_target, task in fee_tasks:
                fee = task.result()
                if fee < 0: continue  # -1 means the server could not estimate
                assert isinstance(fee, int)
                self.fee_estimates_eta[nblock_target] = fee
            self.network.update_fee_estimates()
            await asyncio.sleep(60)

851
    async def close(self, *, force_after: Optional[int] = None):
        """Closes the connection and waits for it to be closed.
        We try to flush buffered data to the wire, which can take some time.

        force_after: max seconds to wait for a graceful close before forcing it.
        """
        if self.session:
            await self.session.close(force_after=force_after)
        # monitor_connection will cancel tasks

859
    async def run_fetch_blocks(self):
        """Subscribe to the server's header notifications and process each new tip.

        Runs forever; raises GracefulDisconnect if the server's tip falls
        below the hard-coded checkpoint.
        """
        header_queue = asyncio.Queue()
        await self.session.subscribe('blockchain.headers.subscribe', [], header_queue)
        while True:
            item = await header_queue.get()
            raw_header = item[0]
            height = raw_header['height']
            header = blockchain.deserialize_header(bfh(raw_header['hex']), height)
            self.tip_header = header
            self.tip = height
            if self.tip < constants.net.max_checkpoint():
                raise GracefulDisconnect('server tip below max checkpoint')
            # first tip received -> the interface becomes "ready"
            self._mark_ready()
            blockchain_updated = await self._process_header_at_tip()
            # header processing done
            if blockchain_updated:
                util.trigger_callback('blockchain_updated')
            util.trigger_callback('network_updated')
            await self.network.switch_unwanted_fork_interface()
            await self.network.switch_lagging_interface()
            # cover traffic against traffic analysis, spawned so we don't block here
            await self.taskgroup.spawn(self._maybe_send_noise())

881
    async def _process_header_at_tip(self) -> bool:
        """Connect the current tip header into our local blockchain(s).

        Returns:
        False - boring fast-forward: we already have this header as part of this blockchain from another interface,
        True - new header we didn't have, or reorg
        """
        height, header = self.tip, self.tip_header
        # bhi_lock serializes header processing across all interfaces
        async with self.network.bhi_lock:
            if self.blockchain.height() >= height and self.blockchain.check_header(header):
                # another interface amended the blockchain
                return False
            _, height = await self.step(height, header)
            # in the simple case, height == self.tip+1
            if height <= self.tip:
                await self.sync_until(height)
            return True

897
    async def sync_until(self, height, next_height=None):
        """Catch the local chain up from *height* to *next_height* (default: server tip).

        Uses bulk chunk requests when far behind (>10 headers), otherwise
        single-header steps. Returns (last, height) as produced by step().
        Raises GracefulDisconnect if the server's chain conflicts with our
        checkpoints/genesis.
        """
        if next_height is None:
            next_height = self.tip
        last = None
        while last is None or height <= next_height:
            # remember previous state to guard against making no progress
            prev_last, prev_height = last, height
            if next_height > height + 10:
                could_connect, num_headers = await self.request_chunk(height, next_height)
                if not could_connect:
                    if height <= constants.net.max_checkpoint():
                        raise GracefulDisconnect('server chain conflicts with checkpoints or genesis')
                    last, height = await self.step(height)
                    continue
                util.trigger_callback('blockchain_updated')
                util.trigger_callback('network_updated')
                # advance to the end of the chunk we just connected
                height = (height // 2016 * 2016) + num_headers
                assert height <= next_height+1, (height, self.tip)
                last = 'catchup'
            else:
                last, height = await self.step(height)
            assert (prev_last, prev_height) != (last, height), 'had to prevent infinite loop in interface.sync_until'
        return last, height

920
    async def step(self, height, header=None):
        """Process a single header at *height*, resolving forks if needed.

        Returns a (mode, next_height) tuple where mode is one of
        'catchup' / 'no_fork' / 'fork'. Headers may carry a 'mock' key
        (used by tests) that overrides the chain check/connect functions.
        """
        assert 0 <= height <= self.tip, (height, self.tip)
        if header is None:
            header = await self.get_block_header(height, 'catchup')

        chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
        if chain:
            self.blockchain = chain if isinstance(chain, Blockchain) else self.blockchain
            # note: there is an edge case here that is not handled.
            # we might know the blockhash (enough for check_header) but
            # not have the header itself. e.g. regtest chain with only genesis.
            # this situation resolves itself on the next block
            return 'catchup', height+1

        can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height)
        if not can_connect:
            self.logger.info(f"can't connect new block: {height=}")
            # walk backwards to find where our chain and the server's diverge
            height, header, bad, bad_header = await self._search_headers_backwards(height, header)
            chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
            can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height)
            assert chain or can_connect
        if can_connect:
            self.logger.info(f"new block: {height=}")
            height += 1
            if isinstance(can_connect, Blockchain):  # not when mocking
                self.blockchain = can_connect
                self.blockchain.save_header(header)
            return 'catchup', height

        # neither checks nor connects: binary-search the exact forkpoint, then fork
        good, bad, bad_header = await self._search_headers_binary(height, bad, bad_header, chain)
        return await self._resolve_potential_chain_fork_given_forkpoint(good, bad, bad_header)

952
    async def _search_headers_binary(self, height, bad, bad_header, chain):
        """Binary-search between a known-good height and a known-bad height for
        the exact forkpoint.

        Invariant: *good* checks against one of our chains, *bad* does not.
        Returns (good, bad, bad_header) with good + 1 == bad.
        """
        assert bad == bad_header['block_height']
        _assert_header_does_not_check_against_any_chain(bad_header)

        self.blockchain = chain if isinstance(chain, Blockchain) else self.blockchain
        good = height
        while True:
            assert good < bad, (good, bad)
            height = (good + bad) // 2
            self.logger.info(f"binary step. good {good}, bad {bad}, height {height}")
            header = await self.get_block_header(height, 'binary')
            chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
            if chain:
                self.blockchain = chain if isinstance(chain, Blockchain) else self.blockchain
                good = height
            else:
                bad = height
                bad_header = header
            if good + 1 == bad:
                break

        # the first bad header must at least connect on top of the last good one
        mock = 'mock' in bad_header and bad_header['mock']['connect'](height)
        real = not mock and self.blockchain.can_connect(bad_header, check_height=False)
        if not real and not mock:
            raise Exception('unexpected bad header during binary: {}'.format(bad_header))
        _assert_header_does_not_check_against_any_chain(bad_header)

        self.logger.info(f"binary search exited. good {good}, bad {bad}")
        return good, bad, bad_header

982
    async def _resolve_potential_chain_fork_given_forkpoint(self, good, bad, bad_header):
        """Given the exact forkpoint (good + 1 == bad), either keep catching up
        on the current chain or create a new fork.

        Returns ('no_fork', height) or ('fork', height).
        """
        assert good + 1 == bad
        assert bad == bad_header['block_height']
        _assert_header_does_not_check_against_any_chain(bad_header)
        # 'good' is the height of a block 'good_header', somewhere in self.blockchain.
        # bad_header connects to good_header; bad_header itself is NOT in self.blockchain.

        bh = self.blockchain.height()
        assert bh >= good, (bh, good)
        if bh == good:
            # our chain simply ends at the forkpoint: no fork, just catch up
            height = good + 1
            self.logger.info(f"catching up from {height}")
            return 'no_fork', height

        # this is a new fork we don't yet have
        height = bad + 1
        self.logger.info(f"new fork at bad height {bad}")
        forkfun = self.blockchain.fork if 'mock' not in bad_header else bad_header['mock']['fork']
        b = forkfun(bad_header)  # type: Blockchain
        self.blockchain = b
        assert b.forkpoint == bad
        return 'fork', height

1005
    async def _search_headers_backwards(self, height, header):
        """Walk backwards (doubling the step each time) until we find a header
        that checks against, or connects to, one of our chains.

        Returns (height, header, bad, bad_header) where (height, header) is the
        first acceptable pair found and (bad, bad_header) the last rejected one.
        Raises GracefulDisconnect if we hit the checkpoint without agreement.
        """
        async def iterate():
            # returns True to keep searching backwards, False to stop;
            # mutates the enclosing height/header via nonlocal
            nonlocal height, header
            checkp = False
            if height <= constants.net.max_checkpoint():
                height = constants.net.max_checkpoint()
                checkp = True
            header = await self.get_block_header(height, 'backward')
            chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
            can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height)
            if chain or can_connect:
                return False
            if checkp:
                raise GracefulDisconnect("server chain conflicts with checkpoints")
            return True

        bad, bad_header = height, header
        _assert_header_does_not_check_against_any_chain(bad_header)
        with blockchain.blockchains_lock: chains = list(blockchain.blockchains.values())
        # start just above our highest local header (no point requesting higher)
        local_max = max([0] + [x.height() for x in chains]) if 'mock' not in header else float('inf')
        height = min(local_max + 1, height - 1)
        while await iterate():
            bad, bad_header = height, header
            # exponential backoff: double the distance from the tip each round
            delta = self.tip - height
            height = self.tip - 2 * delta

        _assert_header_does_not_check_against_any_chain(bad_header)
        self.logger.info(f"exiting backward mode at {height}")
        return height, header, bad, bad_header

1035
    @classmethod
    def client_name(cls) -> str:
        """Client identifier we announce to servers in server.version."""
        return 'electrum/' + version.ELECTRUM_VERSION
1038

1039
    def is_tor(self):
        """True iff the configured host is a Tor hidden service (.onion)."""
        host = self.host
        return host.endswith('.onion')
1041

1042
    def ip_addr(self) -> Optional[str]:
        """Remote peer's IP address as a string, or None if not connected."""
        session = self.session
        if session:
            peer_addr = session.remote_address()
            if peer_addr:
                return str(peer_addr.host)
        return None

1049
    def bucket_based_on_ipaddress(self) -> str:
        """Group servers into buckets for diversity checks.

        Onion servers share one bucket; clearnet servers are bucketed by
        /16 (IPv4) or /48 (IPv6) prefix. '' means "no bucket" (e.g. localhost,
        or address could not be determined). The result is cached per instance.
        """
        def do_bucket():
            if self.is_tor():
                return BUCKET_NAME_OF_ONION_SERVERS
            try:
                ip_addr = ip_address(self.ip_addr())  # type: Union[IPv4Address, IPv6Address]
            except ValueError:
                # self.ip_addr() returned None or a non-IP string
                return ''
            if not ip_addr:
                return ''
            if ip_addr.is_loopback:  # localhost is exempt
                return ''
            if ip_addr.version == 4:
                slash16 = IPv4Network(ip_addr).supernet(prefixlen_diff=32-16)
                return str(slash16)
            elif ip_addr.version == 6:
                slash48 = IPv6Network(ip_addr).supernet(prefixlen_diff=128-48)
                return str(slash48)
            return ''

        # compute once, then serve from cache
        if not self._ipaddr_bucket:
            self._ipaddr_bucket = do_bucket()
        return self._ipaddr_bucket
1072

1073
    async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict:
        """Request the merkle proof for *tx_hash*, validating the response shape."""
        if not is_hash256_str(tx_hash):
            raise Exception(f"{repr(tx_hash)} is not a txid")
        if not is_non_negative_integer(tx_height):
            raise Exception(f"{repr(tx_height)} is not a block height")
        # do request
        res = await self.session.send_request('blockchain.transaction.get_merkle', [tx_hash, tx_height])
        # sanity-check the (untrusted) response before returning it
        block_height = assert_dict_contains_field(res, field_name='block_height')
        merkle = assert_dict_contains_field(res, field_name='merkle')
        pos = assert_dict_contains_field(res, field_name='pos')
        # note: tx_height was just a hint to the server, don't enforce the response to match it
        assert_non_negative_integer(block_height)
        assert_non_negative_integer(pos)
        assert_list_or_tuple(merkle)
        for node_hash in merkle:
            assert_hash256_str(node_hash)
        return res
1091

1092
    async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
        """Fetch a raw transaction by txid; verify it deserializes and hashes
        back to the requested txid before returning the hex string."""
        if not is_hash256_str(tx_hash):
            raise Exception(f"{repr(tx_hash)} is not a txid")
        raw = await self.session.send_request('blockchain.transaction.get', [tx_hash], timeout=timeout)
        # validate response
        if not is_hex_str(raw):
            raise RequestCorrupted(f"received garbage (non-hex) as tx data (txid {tx_hash}): {raw!r}")
        parsed_tx = Transaction(raw)
        try:
            parsed_tx.deserialize()  # see if raises
        except Exception as exc:
            raise RequestCorrupted(f"cannot deserialize received transaction (txid {tx_hash})") from exc
        if parsed_tx.txid() != tx_hash:
            raise RequestCorrupted(f"received tx does not match expected txid {tx_hash} (got {parsed_tx.txid()})")
        return raw
1107

1108
    async def get_history_for_scripthash(self, sh: str) -> List[dict]:
        """Request the tx history of a scripthash, validating ordering and uniqueness.

        Confirmed txs (height > 0) must come first in increasing height order;
        mempool txs (height in (-1, 0)) must come last and carry a 'fee' field.
        Raises RequestCorrupted on any violation.
        """
        if not is_hash256_str(sh):
            raise Exception(f"{repr(sh)} is not a scripthash")
        # do request
        res = await self.session.send_request('blockchain.scripthash.get_history', [sh])
        # check response
        assert_list_or_tuple(res)
        prev_height = 1
        for tx_item in res:
            height = assert_dict_contains_field(tx_item, field_name='height')
            assert_dict_contains_field(tx_item, field_name='tx_hash')
            assert_integer(height)
            assert_hash256_str(tx_item['tx_hash'])
            if height in (-1, 0):
                assert_dict_contains_field(tx_item, field_name='fee')
                assert_non_negative_integer(tx_item['fee'])
                prev_height = float("inf")  # this ensures confirmed txs can't follow mempool txs
            else:
                # check monotonicity of heights
                if height < prev_height:
                    raise RequestCorrupted(f'heights of confirmed txs must be in increasing order')
                prev_height = height
        hashes = set(map(lambda item: item['tx_hash'], res))
        if len(hashes) != len(res):
            # Either server is sending garbage... or maybe if server is race-prone
            # a recently mined tx could be included in both last block and mempool?
            # Still, it's simplest to just disregard the response.
            raise RequestCorrupted(f"server history has non-unique txids for sh={sh}")
        return res
1137

1138
    async def listunspent_for_scripthash(self, sh: str) -> List[dict]:
        """Request the UTXOs of a scripthash, validating each item's fields."""
        if not is_hash256_str(sh):
            raise Exception(f"{repr(sh)} is not a scripthash")
        # do request
        res = await self.session.send_request('blockchain.scripthash.listunspent', [sh])
        # check response
        assert_list_or_tuple(res)
        for utxo_item in res:
            for field in ('tx_pos', 'value', 'tx_hash', 'height'):
                assert_dict_contains_field(utxo_item, field_name=field)
            for field in ('tx_pos', 'value', 'height'):
                assert_non_negative_integer(utxo_item[field])
            assert_hash256_str(utxo_item['tx_hash'])
        return res
1155

1156
    async def get_balance_for_scripthash(self, sh: str) -> dict:
        """Request the confirmed/unconfirmed balance of a scripthash."""
        if not is_hash256_str(sh):
            raise Exception(f"{repr(sh)} is not a scripthash")
        # do request
        res = await self.session.send_request('blockchain.scripthash.get_balance', [sh])
        # check response
        for field in ('confirmed', 'unconfirmed'):
            assert_dict_contains_field(res, field_name=field)
        assert_non_negative_integer(res['confirmed'])
        assert_integer(res['unconfirmed'])  # the unconfirmed delta may be negative
        return res
1167

1168
    async def get_txid_from_txpos(self, tx_height: int, tx_pos: int, merkle: bool):
        """Look up the txid at (block height, position); optionally with merkle proof."""
        if not is_non_negative_integer(tx_height):
            raise Exception(f"{repr(tx_height)} is not a block height")
        if not is_non_negative_integer(tx_pos):
            raise Exception(f"{repr(tx_pos)} should be non-negative integer")
        # do request
        res = await self.session.send_request(
            'blockchain.transaction.id_from_pos',
            [tx_height, tx_pos, merkle],
        )
        # check response: either a dict with proof, or a bare txid string
        if not merkle:
            assert_hash256_str(res)
            return res
        assert_dict_contains_field(res, field_name='tx_hash')
        assert_dict_contains_field(res, field_name='merkle')
        assert_hash256_str(res['tx_hash'])
        assert_list_or_tuple(res['merkle'])
        for branch_hash in res['merkle']:
            assert_hash256_str(branch_hash)
        return res
1189

1190
    async def get_fee_histogram(self) -> Sequence[Tuple[Union[float, int], int]]:
        """Request the mempool fee histogram: (fee_rate, vsize) pairs that must
        be sorted by strictly decreasing fee rate.

        Raises RequestCorrupted if the server response is malformed.
        """
        # do request
        res = await self.session.send_request('mempool.get_fee_histogram')
        # check response
        assert_list_or_tuple(res)
        prev_fee = float('inf')
        for item in res:
            # validate item shape first: a bare `fee, s = item` unpack would raise
            # TypeError/ValueError on garbage instead of RequestCorrupted
            assert_list_or_tuple(item)
            if len(item) != 2:
                raise RequestCorrupted(f'histogram item is not a (fee, size) pair: {item!r}')
            fee, s = item
            assert_non_negative_int_or_float(fee)
            assert_non_negative_integer(s)
            if fee >= prev_fee:  # check monotonicity
                raise RequestCorrupted('fees must be in decreasing order')
            prev_fee = fee
        return res
1203

1204
    async def get_server_banner(self) -> str:
        """Request the server's banner text."""
        # do request
        res = await self.session.send_request('server.banner')
        # check response
        if isinstance(res, str):
            return res
        raise RequestCorrupted(f'{res!r} should be a str')
1211

1212
    async def get_donation_address(self) -> str:
        """Request the server's donation address; returns '' if unset or unrecognized."""
        # do request
        res = await self.session.send_request('server.donation_address')
        # check response
        if not res:  # ignore empty string
            return ''
        if bitcoin.is_address(res):
            return res
        # note: do not hard-fail -- allow server to use future-type
        #       bitcoin address we do not recognize
        self.logger.info(f"invalid donation address from server: {repr(res)}")
        return ''
1224

1225
    async def get_relay_fee(self) -> int:
        """Returns the min relay feerate in sat/kbyte."""
        # do request
        res = await self.session.send_request('blockchain.relayfee')
        # check response
        assert_non_negative_int_or_float(res)
        # server reports BTC/kbyte: convert to sat/kbyte and clamp at zero
        return max(0, int(res * bitcoin.COIN))
1234

1235
    async def get_estimatefee(self, num_blocks: int) -> int:
        """Returns a feerate estimate for getting confirmed within
        num_blocks blocks, in sat/kbyte.
        Returns -1 if the server could not provide an estimate.
        """
        if not is_non_negative_integer(num_blocks):
            raise Exception(f"{repr(num_blocks)} is not a num_blocks")
        # do request
        try:
            res = await self.session.send_request('blockchain.estimatefee', [num_blocks])
        except aiorpcx.jsonrpc.ProtocolError as e:
            # The protocol spec says the server itself should already have returned -1
            # if it cannot provide an estimate, however apparently "electrs" does not conform
            # and sends an error instead. Convert it here:
            if "cannot estimate fee" in e.message:
                res = -1
            else:
                raise
        except aiorpcx.jsonrpc.RPCError as e:
            # The protocol spec says the server itself should already have returned -1
            # if it cannot provide an estimate. "Fulcrum" often sends:
            #   aiorpcx.jsonrpc.RPCError: (-32603, 'internal error: bitcoind request timed out')
            if e.code == JSONRPC.INTERNAL_ERROR:
                res = -1
            else:
                raise
        # check response
        if res != -1:
            # server reports BTC/kbyte; convert to sat/kbyte
            assert_non_negative_int_or_float(res)
            res = int(res * bitcoin.COIN)
        return res
1266

1267

1268
def _assert_header_does_not_check_against_any_chain(header: dict) -> None:
5✔
1269
    chain_bad = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
5✔
1270
    if chain_bad:
5✔
1271
        raise Exception('bad_header must not check!')
×
1272

1273

1274
def check_cert(host, cert):
    """Parse a PEM certificate and print whether it has expired for *host*.

    Prints a traceback and returns silently if the cert cannot be parsed.
    """
    try:
        der = pem.dePem(cert, 'CERTIFICATE')
        parsed = x509.X509(der)
    except Exception:
        traceback.print_exc(file=sys.stdout)
        return

    try:
        parsed.check_date()
    except Exception:
        expired = True
    else:
        expired = False

    msg = "host: %s\n"%host
    msg += "has_expired: %s\n"% expired
    util.print_msg(msg)
1291

1292

1293
# Used by tests
1294
def _match_hostname(name, val):
5✔
1295
    if val == name:
×
1296
        return True
×
1297

1298
    return val.startswith('*.') and name.endswith(val[1:])
×
1299

1300

1301
def test_certificates():
    """Run check_cert over every certificate pinned in the config's certs dir."""
    from .simple_config import SimpleConfig
    config = SimpleConfig()
    certs_dir = os.path.join(config.path, "certs")
    for filename in os.listdir(certs_dir):
        cert_path = os.path.join(certs_dir, filename)
        with open(cert_path, encoding='utf-8') as f:
            pem_text = f.read()
        check_cert(filename, pem_text)
1311

1312
if __name__ == "__main__":
    # manual smoke test: validate all locally pinned server certificates
    test_certificates()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc