• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5763146712875008

07 Mar 2025 09:24AM UTC coverage: 60.719% (+0.3%) from 60.38%
5763146712875008

Pull #9586

CirrusCI

f321x
reduce scope of dont_settle_htlc_keys
Pull Request #9586: Add mechanism to block htlcs from settling back

6 of 10 new or added lines in 2 files covered. (60.0%)

3812 existing lines in 21 files now uncovered.

20679 of 34057 relevant lines covered (60.72%)

2.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

19.72
/electrum/network.py
1
# Electrum - Lightweight Bitcoin Client
2
# Copyright (c) 2011-2016 Thomas Voegtlin
3
#
4
# Permission is hereby granted, free of charge, to any person
5
# obtaining a copy of this software and associated documentation files
6
# (the "Software"), to deal in the Software without restriction,
7
# including without limitation the rights to use, copy, modify, merge,
8
# publish, distribute, sublicense, and/or sell copies of the Software,
9
# and to permit persons to whom the Software is furnished to do so,
10
# subject to the following conditions:
11
#
12
# The above copyright notice and this permission notice shall be
13
# included in all copies or substantial portions of the Software.
14
#
15
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
18
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
19
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
20
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
21
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22
# SOFTWARE.
23
import asyncio
4✔
24
import time
4✔
25
import queue
4✔
26
import os
4✔
27
import random
4✔
28
import re
4✔
29
from collections import defaultdict
4✔
30
import threading
4✔
31
import socket
4✔
32
import json
4✔
33
import sys
4✔
34
from typing import NamedTuple, Optional, Sequence, List, Dict, Tuple, TYPE_CHECKING, Iterable, Set, Any, TypeVar
4✔
35
import traceback
4✔
36
import concurrent
4✔
37
from concurrent import futures
4✔
38
import copy
4✔
39
import functools
4✔
40
from enum import IntEnum
4✔
41
from contextlib import nullcontext
4✔
42

43
import aiorpcx
4✔
44
from aiorpcx import ignore_after, NetAddress
4✔
45
from aiohttp import ClientResponse
4✔
46

47
from . import util
4✔
48
from .util import (log_exceptions, ignore_exceptions, OldTaskGroup,
4✔
49
                   bfh, make_aiohttp_session, send_exception_to_crash_reporter,
50
                   is_hash256_str, is_non_negative_integer, MyEncoder, NetworkRetryManager,
51
                   error_text_str_to_safe_str)
52
from .bitcoin import COIN, DummyAddress, DummyAddressUsedInTxException
4✔
53
from . import constants
4✔
54
from . import blockchain
4✔
55
from . import bitcoin
4✔
56
from . import dns_hacks
4✔
57
from .transaction import Transaction
4✔
58
from .blockchain import Blockchain, HEADER_SIZE
4✔
59
from .interface import (Interface, PREFERRED_NETWORK_PROTOCOL,
4✔
60
                        RequestTimedOut, NetworkTimeout, BUCKET_NAME_OF_ONION_SERVERS,
61
                        NetworkException, RequestCorrupted, ServerAddr)
62
from .version import PROTOCOL_VERSION
4✔
63
from .i18n import _
4✔
64
from .logging import get_logger, Logger
4✔
65

66
if TYPE_CHECKING:
4✔
UNCOV
67
    from collections.abc import Coroutine
×
68

UNCOV
69
    from .channel_db import ChannelDB
×
UNCOV
70
    from .lnrouter import LNPathFinder
×
UNCOV
71
    from .lnworker import LNGossip
×
72
    #from .lnwatcher import WatchTower
UNCOV
73
    from .daemon import Daemon
×
74
    from .simple_config import SimpleConfig
×
75

76

77
_logger = get_logger(__name__)
4✔
78

79

80
NUM_TARGET_CONNECTED_SERVERS = 10
4✔
81
NUM_STICKY_SERVERS = 4
4✔
82
NUM_RECENT_SERVERS = 20
4✔
83

84
T = TypeVar('T')
4✔
85

86

87
class ConnectionState(IntEnum):
4✔
88
    DISCONNECTED  = 0
4✔
89
    CONNECTING    = 1
4✔
90
    CONNECTED     = 2
4✔
91

92

93
def parse_servers(result: Sequence[Tuple[str, str, List[str]]]) -> Dict[str, dict]:
4✔
94
    """Convert servers list (from protocol method "server.peers.subscribe") into dict format.
95
    Also validate values, such as IP addresses and ports.
96
    """
UNCOV
97
    servers = {}
×
UNCOV
98
    for item in result:
×
UNCOV
99
        host = item[1]
×
UNCOV
100
        out = {}
×
UNCOV
101
        version = None
×
102
        pruning_level = '-'
×
103
        if len(item) > 2:
×
104
            for v in item[2]:
×
105
                if re.match(r"[st]\d*", v):
×
106
                    protocol, port = v[0], v[1:]
×
107
                    if port == '': port = constants.net.DEFAULT_PORTS[protocol]
×
108
                    ServerAddr(host, port, protocol=protocol)  # check if raises
×
109
                    out[protocol] = port
×
110
                elif re.match("v(.?)+", v):
×
111
                    version = v[1:]
×
112
                elif re.match(r"p\d*", v):
×
113
                    pruning_level = v[1:]
×
114
                if pruning_level == '': pruning_level = '0'
×
115
        if out:
×
116
            out['pruning'] = pruning_level
×
117
            out['version'] = version
×
118
            servers[host] = out
×
119
    return servers
×
120

121

122
def filter_version(servers):
4✔
123
    def is_recent(version):
×
124
        try:
×
UNCOV
125
            return util.versiontuple(version) >= util.versiontuple(PROTOCOL_VERSION)
×
UNCOV
126
        except Exception as e:
×
UNCOV
127
            return False
×
128
    return {k: v for k, v in servers.items() if is_recent(v.get('version'))}
×
129

130

131
def filter_noonion(servers):
4✔
132
    return {k: v for k, v in servers.items() if not k.endswith('.onion')}
×
133

134

135
def filter_protocol(hostmap, *, allowed_protocols: Iterable[str] = None) -> Sequence[ServerAddr]:
4✔
136
    """Filters the hostmap for those implementing protocol."""
137
    if allowed_protocols is None:
×
UNCOV
138
        allowed_protocols = {PREFERRED_NETWORK_PROTOCOL}
×
UNCOV
139
    eligible = []
×
UNCOV
140
    for host, portmap in hostmap.items():
×
UNCOV
141
        for protocol in allowed_protocols:
×
142
            port = portmap.get(protocol)
×
143
            if port:
×
144
                eligible.append(ServerAddr(host, port, protocol=protocol))
×
145
    return eligible
×
146

147

148
def pick_random_server(hostmap=None, *, allowed_protocols: Iterable[str],
4✔
149
                       exclude_set: Set[ServerAddr] = None) -> Optional[ServerAddr]:
150
    if hostmap is None:
×
UNCOV
151
        hostmap = constants.net.DEFAULT_SERVERS
×
UNCOV
152
    if exclude_set is None:
×
UNCOV
153
        exclude_set = set()
×
UNCOV
154
    servers = set(filter_protocol(hostmap, allowed_protocols=allowed_protocols))
×
155
    eligible = list(servers - exclude_set)
×
156
    return random.choice(eligible) if eligible else None
×
157

158

159
class NetworkParameters(NamedTuple):
4✔
160
    server: ServerAddr
4✔
161
    proxy: Optional[dict]
4✔
162
    auto_connect: bool
4✔
163
    oneserver: bool = False
4✔
164

165

166
proxy_modes = ['socks4', 'socks5']
4✔
167

168

169
def serialize_proxy(p):
4✔
UNCOV
170
    if not isinstance(p, dict):
×
UNCOV
171
        return None
×
172
    return ':'.join([p.get('mode'), p.get('host'), p.get('port')])
×
173

174

175
def deserialize_proxy(s: Optional[str], user: str = None, password: str = None) -> Optional[dict]:
4✔
176
    if not isinstance(s, str):
×
UNCOV
177
        return None
×
UNCOV
178
    if s.lower() == 'none':
×
UNCOV
179
        return None
×
UNCOV
180
    proxy = {"mode": "socks5", "host": "localhost"}
×
181

UNCOV
182
    args = s.split(':')
×
UNCOV
183
    if args[0] in proxy_modes:
×
UNCOV
184
        proxy['mode'] = args[0]
×
UNCOV
185
        args = args[1:]
×
186

UNCOV
187
    def is_valid_port(ps: str):
×
UNCOV
188
        try:
×
UNCOV
189
            return 0 < int(ps) < 65535
×
UNCOV
190
        except ValueError:
×
UNCOV
191
            return False
×
192

193
    def is_valid_host(ph: str):
×
UNCOV
194
        try:
×
UNCOV
195
            NetAddress(ph, '1')
×
196
        except ValueError:
×
UNCOV
197
            return False
×
UNCOV
198
        return True
×
199

200
    # detect migrate from old settings
201
    if len(args) == 4 and is_valid_host(args[0]) and is_valid_port(args[1]):  # host:port:user:pass,
×
202
        proxy['host'] = args[0]
×
203
        proxy['port'] = args[1]
×
UNCOV
204
        proxy['user'] = args[2]
×
205
        proxy['password'] = args[3]
×
206
        return proxy
×
207

208
    proxy['host'] = ':'.join(args[:-1])
×
209
    proxy['port'] = args[-1]
×
210

211
    if not is_valid_host(proxy['host']) or not is_valid_port(proxy['port']):
×
UNCOV
212
        return None
×
213

214
    proxy['user'] = user
×
215
    proxy['password'] = password
×
216

217
    return proxy
×
218

219

220
class BestEffortRequestFailed(NetworkException): pass
4✔
221

222

223
class TxBroadcastError(NetworkException):
4✔
224
    def get_message_for_gui(self):
4✔
225
        raise NotImplementedError()
×
226

227

228
class TxBroadcastHashMismatch(TxBroadcastError):
4✔
229
    def get_message_for_gui(self):
4✔
UNCOV
230
        return "{}\n{}\n\n{}" \
×
231
            .format(_("The server returned an unexpected transaction ID when broadcasting the transaction."),
232
                    _("Consider trying to connect to a different server, or updating Electrum."),
233
                    str(self))
234

235

236
class TxBroadcastServerReturnedError(TxBroadcastError):
4✔
237
    def get_message_for_gui(self):
4✔
UNCOV
238
        return "{}\n{}\n\n{}" \
×
239
            .format(_("The server returned an error when broadcasting the transaction."),
240
                    _("Consider trying to connect to a different server, or updating Electrum."),
241
                    str(self))
242

243

244
class TxBroadcastUnknownError(TxBroadcastError):
4✔
245
    def get_message_for_gui(self):
4✔
UNCOV
246
        return "{}\n{}" \
×
247
            .format(_("Unknown error when broadcasting the transaction."),
248
                    _("Consider trying to connect to a different server, or updating Electrum."))
249

250

251
class UntrustedServerReturnedError(NetworkException):
4✔
252
    def __init__(self, *, original_exception):
4✔
UNCOV
253
        self.original_exception = original_exception
×
254

255
    def get_message_for_gui(self) -> str:
4✔
UNCOV
256
        return str(self)
×
257

258
    def get_untrusted_message(self) -> str:
4✔
UNCOV
259
        e = self.original_exception
×
260
        return (f"<UntrustedServerReturnedError "
×
261
                f"[DO NOT TRUST THIS MESSAGE] original_exception: {error_text_str_to_safe_str(repr(e))}>")
262

263
    def __str__(self):
4✔
264
        # We should not show the untrusted text from self.original_exception,
265
        # to avoid accidentally showing it in the GUI.
266
        return _("The server returned an error.")
×
267

268
    def __repr__(self):
4✔
269
        # We should not show the untrusted text from self.original_exception,
270
        # to avoid accidentally showing it in the GUI.
UNCOV
271
        return f"<UntrustedServerReturnedError {str(self)!r}>"
×
272

273

274
_INSTANCE = None
4✔
275

276

277
class Network(Logger, NetworkRetryManager[ServerAddr]):
4✔
278
    """The Network class manages a set of connections to remote electrum
279
    servers, each connected socket is handled by an Interface() object.
280
    """
281

282
    LOGGING_SHORTCUT = 'n'
4✔
283

284
    taskgroup: Optional[OldTaskGroup]
4✔
285
    interface: Optional[Interface]
4✔
286
    interfaces: Dict[ServerAddr, Interface]
4✔
287
    _connecting_ifaces: Set[ServerAddr]
4✔
288
    _closing_ifaces: Set[ServerAddr]
4✔
289
    default_server: ServerAddr
4✔
290
    _recent_servers: List[ServerAddr]
4✔
291

292
    channel_db: Optional['ChannelDB'] = None
4✔
293
    lngossip: Optional['LNGossip'] = None
4✔
294
    path_finder: Optional['LNPathFinder'] = None
4✔
295

296
    def __init__(self, config: 'SimpleConfig', *, daemon: 'Daemon' = None):
4✔
297
        global _INSTANCE
UNCOV
298
        assert _INSTANCE is None, "Network is a singleton!"
×
UNCOV
299
        _INSTANCE = self
×
300

UNCOV
301
        Logger.__init__(self)
×
UNCOV
302
        NetworkRetryManager.__init__(
×
303
            self,
304
            max_retry_delay_normal=600,
305
            init_retry_delay_normal=15,
306
            max_retry_delay_urgent=10,
307
            init_retry_delay_urgent=1,
308
        )
309

UNCOV
310
        self.asyncio_loop = util.get_asyncio_loop()
×
UNCOV
311
        assert self.asyncio_loop.is_running(), "event loop not running"
×
312

313
        self.config = config
×
UNCOV
314
        self.daemon = daemon
×
315

UNCOV
316
        blockchain.read_blockchains(self.config)
×
UNCOV
317
        blockchain.init_headers_file_for_best_chain()
×
UNCOV
318
        self.logger.info(f"blockchains {list(map(lambda b: b.forkpoint, blockchain.blockchains.values()))}")
×
UNCOV
319
        self._blockchain_preferred_block = self.config.BLOCKCHAIN_PREFERRED_BLOCK  # type: Dict[str, Any]
×
UNCOV
320
        if self._blockchain_preferred_block is None:
×
321
            self._set_preferred_chain(None)
×
UNCOV
322
        self._blockchain = blockchain.get_best_chain()
×
323

UNCOV
324
        self._allowed_protocols = {PREFERRED_NETWORK_PROTOCOL}
×
325

UNCOV
326
        self.proxy = None  # type: Optional[dict]
×
UNCOV
327
        self.is_proxy_tor = None
×
328
        self._init_parameters_from_config()
×
329

UNCOV
330
        self.taskgroup = None
×
331

332
        # locks
UNCOV
333
        self.restart_lock = asyncio.Lock()
×
334
        self.bhi_lock = asyncio.Lock()
×
335
        self.recent_servers_lock = threading.RLock()       # <- re-entrant
×
UNCOV
336
        self.interfaces_lock = threading.Lock()            # for mutating/iterating self.interfaces
×
337

UNCOV
338
        self.server_peers = {}  # returned by interface (servers that the main interface knows about)
×
UNCOV
339
        self._recent_servers = self._read_recent_servers()  # note: needs self.recent_servers_lock
×
340

341
        self.banner = ''
×
UNCOV
342
        self.donation_address = ''
×
UNCOV
343
        self.relay_fee = None  # type: Optional[int]
×
344

UNCOV
345
        dir_path = os.path.join(self.config.path, 'certs')
×
346
        util.make_dir(dir_path)
×
347

348
        # the main server we are currently communicating with
UNCOV
349
        self.interface = None
×
UNCOV
350
        self.default_server_changed_event = asyncio.Event()
×
351
        # Set of servers we have an ongoing connection with.
352
        # For any ServerAddr, at most one corresponding Interface object
353
        # can exist at any given time. Depending on the state of that Interface,
354
        # the ServerAddr can be found in one of the following sets.
355
        # Note: during a transition, the ServerAddr can appear in two sets momentarily.
UNCOV
356
        self._connecting_ifaces = set()
×
UNCOV
357
        self.interfaces = {}  # these are the ifaces in "initialised and usable" state
×
UNCOV
358
        self._closing_ifaces = set()
×
359

360
        # Dump network messages (all interfaces).  Set at runtime from the console.
UNCOV
361
        self.debug = False
×
362

UNCOV
363
        self._set_status(ConnectionState.DISCONNECTED)
×
UNCOV
364
        self._has_ever_managed_to_connect_to_server = False
×
UNCOV
365
        self._was_started = False
×
366

367

368
    def has_internet_connection(self) -> bool:
4✔
369
        """Our guess whether the device has Internet-connectivity."""
UNCOV
370
        return self._has_ever_managed_to_connect_to_server
×
371

372
    def has_channel_db(self):
4✔
373
        return self.channel_db is not None
×
374

375
    def start_gossip(self):
4✔
376
        from . import lnrouter
×
377
        from . import channel_db
×
UNCOV
378
        from . import lnworker
×
UNCOV
379
        if not self.config.LIGHTNING_USE_GOSSIP:
×
UNCOV
380
            return
×
UNCOV
381
        if self.lngossip is None:
×
UNCOV
382
            self.channel_db = channel_db.ChannelDB(self)
×
UNCOV
383
            self.path_finder = lnrouter.LNPathFinder(self.channel_db)
×
UNCOV
384
            self.channel_db.load_data()
×
385
            self.lngossip = lnworker.LNGossip(self.config)
×
386
            self.lngossip.start_network(self)
×
387

388
    async def stop_gossip(self, *, full_shutdown: bool = False):
4✔
389
        if self.lngossip:
×
UNCOV
390
            await self.lngossip.stop()
×
391
            self.lngossip = None
×
392
            self.channel_db.stop()
×
393
            if full_shutdown:
×
394
                await self.channel_db.stopped_event.wait()
×
395
            self.channel_db = None
×
396
            self.path_finder = None
×
397

398
    @classmethod
4✔
399
    def run_from_another_thread(cls, coro: 'Coroutine[Any, Any, T]', *, timeout=None) -> T:
4✔
UNCOV
400
        loop = util.get_asyncio_loop()
×
401
        assert util.get_running_loop() != loop, 'must not be called from asyncio thread'
×
402
        fut = asyncio.run_coroutine_threadsafe(coro, loop)
×
403
        return fut.result(timeout)
×
404

405
    @staticmethod
4✔
406
    def get_instance() -> Optional["Network"]:
4✔
407
        """Return the global singleton network instance.
408
        Note that this can return None! If we are run with the --offline flag, there is no network.
409
        """
410
        return _INSTANCE
×
411

412
    def with_recent_servers_lock(func):
4✔
413
        def func_wrapper(self, *args, **kwargs):
4✔
414
            with self.recent_servers_lock:
×
UNCOV
415
                return func(self, *args, **kwargs)
×
416
        return func_wrapper
4✔
417

418
    def _read_recent_servers(self) -> List[ServerAddr]:
4✔
UNCOV
419
        if not self.config.path:
×
420
            return []
×
421
        path = os.path.join(self.config.path, "recent_servers")
×
UNCOV
422
        try:
×
UNCOV
423
            with open(path, "r", encoding='utf-8') as f:
×
424
                data = f.read()
×
425
                servers_list = json.loads(data)
×
UNCOV
426
            return [ServerAddr.from_str(s) for s in servers_list]
×
UNCOV
427
        except Exception:
×
UNCOV
428
            return []
×
429

430
    @with_recent_servers_lock
4✔
431
    def _save_recent_servers(self):
4✔
432
        if not self.config.path:
×
433
            return
×
UNCOV
434
        path = os.path.join(self.config.path, "recent_servers")
×
UNCOV
435
        s = json.dumps(self._recent_servers, indent=4, sort_keys=True, cls=MyEncoder)
×
436
        try:
×
UNCOV
437
            with open(path, "w", encoding='utf-8') as f:
×
438
                f.write(s)
×
439
        except Exception:
×
440
            pass
×
441

442
    async def _server_is_lagging(self) -> bool:
4✔
443
        sh = self.get_server_height()
×
444
        if not sh:
×
UNCOV
445
            self.logger.info('no height for main interface')
×
UNCOV
446
            return True
×
UNCOV
447
        lh = self.get_local_height()
×
448
        result = (lh - sh) > 1
×
UNCOV
449
        if result:
×
UNCOV
450
            self.logger.info(f'{self.default_server} is lagging ({sh} vs {lh})')
×
451
        return result
×
452

453
    def _set_status(self, status):
4✔
454
        self.connection_status = status
×
455
        util.trigger_callback('status')
×
456

457
    def is_connected(self):
4✔
458
        interface = self.interface
×
459
        return interface is not None and interface.is_connected_and_ready()
×
460

461
    def is_connecting(self):
4✔
462
        return self.connection_status == ConnectionState.CONNECTING
×
463

464
    def get_connection_status_for_GUI(self):
4✔
UNCOV
465
        ConnectionStates = {
×
466
            ConnectionState.DISCONNECTED: _('Disconnected'),
467
            ConnectionState.CONNECTING: _('Connecting'),
468
            ConnectionState.CONNECTED: _('Connected'),
469
        }
470
        return ConnectionStates[self.connection_status]
×
471

472
    async def _request_server_info(self, interface: 'Interface'):
4✔
473
        await interface.ready
×
474
        session = interface.session
×
475

UNCOV
476
        async def get_banner():
×
UNCOV
477
            self.banner = await interface.get_server_banner()
×
478
            util.trigger_callback('banner', self.banner)
×
479
        async def get_donation_address():
×
480
            self.donation_address = await interface.get_donation_address()
×
481
        async def get_server_peers():
×
UNCOV
482
            server_peers = await session.send_request('server.peers.subscribe')
×
UNCOV
483
            random.shuffle(server_peers)
×
UNCOV
484
            max_accepted_peers = len(constants.net.DEFAULT_SERVERS) + NUM_RECENT_SERVERS
×
UNCOV
485
            server_peers = server_peers[:max_accepted_peers]
×
486
            # note that 'parse_servers' also validates the data (which is untrusted input!)
UNCOV
487
            self.server_peers = parse_servers(server_peers)
×
488
            util.trigger_callback('servers', self.get_servers())
×
UNCOV
489
        async def get_relay_fee():
×
UNCOV
490
            self.relay_fee = await interface.get_relay_fee()
×
491

492
        async with OldTaskGroup() as group:
×
493
            await group.spawn(get_banner)
×
UNCOV
494
            await group.spawn(get_donation_address)
×
UNCOV
495
            await group.spawn(get_server_peers)
×
UNCOV
496
            await group.spawn(get_relay_fee)
×
497
            await group.spawn(self._request_fee_estimates(interface))
×
498

499
    async def _request_fee_estimates(self, interface):
4✔
500
        self.config.requested_fee_estimates()
×
501
        histogram = await interface.get_fee_histogram()
×
502
        self.config.mempool_fees = histogram
×
503
        self.logger.info(f'fee_histogram {len(histogram)}')
×
504
        util.trigger_callback('fee_histogram', self.config.mempool_fees)
×
505

506
    def get_parameters(self) -> NetworkParameters:
4✔
UNCOV
507
        return NetworkParameters(server=self.default_server,
×
508
                                 proxy=self.proxy,
509
                                 auto_connect=self.auto_connect,
510
                                 oneserver=self.oneserver)
511

512
    def _init_parameters_from_config(self) -> None:
4✔
513
        dns_hacks.configure_dns_resolver()
×
514
        self.auto_connect = self.config.NETWORK_AUTO_CONNECT
×
515
        self._set_default_server()
×
516
        self._set_proxy(deserialize_proxy(self.config.NETWORK_PROXY, self.config.NETWORK_PROXY_USER,
×
517
                                          self.config.NETWORK_PROXY_PASSWORD))
518
        self._maybe_set_oneserver()
×
519

520
    def get_donation_address(self):
4✔
521
        if self.is_connected():
×
522
            return self.donation_address
×
523

524
    def get_interfaces(self) -> List[ServerAddr]:
4✔
525
        """The list of servers for the connected interfaces."""
526
        with self.interfaces_lock:
×
527
            return list(self.interfaces)
×
528

529
    def get_status(self):
4✔
UNCOV
530
        n = len(self.get_interfaces())
×
UNCOV
531
        return _("Connected to {0} nodes.").format(n) if n > 1 else _("Connected to {0} node.").format(n) if n == 1 else _("Not connected")
×
532

533
    def get_fee_estimates(self):
4✔
UNCOV
534
        from statistics import median
×
UNCOV
535
        from .simple_config import FEE_ETA_TARGETS
×
536
        if self.auto_connect:
×
537
            with self.interfaces_lock:
×
UNCOV
538
                out = {}
×
UNCOV
539
                for n in FEE_ETA_TARGETS:
×
540
                    try:
×
UNCOV
541
                        out[n] = int(median(filter(None, [i.fee_estimates_eta.get(n) for i in self.interfaces.values()])))
×
UNCOV
542
                    except Exception:
×
543
                        continue
×
UNCOV
544
                return out
×
545
        else:
UNCOV
546
            if not self.interface:
×
UNCOV
547
                return {}
×
548
            return self.interface.fee_estimates_eta
×
549

550
    def update_fee_estimates(self, *, fee_est: Dict[int, int] = None):
4✔
551
        if fee_est is None:
×
552
            fee_est = self.get_fee_estimates()
×
UNCOV
553
        for nblock_target, fee in fee_est.items():
×
554
            self.config.update_fee_estimates(nblock_target, fee)
×
555
        if not hasattr(self, "_prev_fee_est") or self._prev_fee_est != fee_est:
×
556
            self._prev_fee_est = copy.copy(fee_est)
×
557
            self.logger.info(f'fee_estimates {fee_est}')
×
558
        util.trigger_callback('fee', self.config.fee_estimates)
×
559

560
    @with_recent_servers_lock
4✔
561
    def get_servers(self):
4✔
562
        # note: order of sources when adding servers here is crucial!
563
        # don't let "server_peers" overwrite anything,
564
        # otherwise main server can eclipse the client
565
        out = dict()
×
566
        # add servers received from main interface
567
        server_peers = self.server_peers
×
568
        if server_peers:
×
UNCOV
569
            out.update(filter_version(server_peers.copy()))
×
570
        # hardcoded servers
571
        out.update(constants.net.DEFAULT_SERVERS)
×
572
        # add recent servers
573
        for server in self._recent_servers:
×
574
            port = str(server.port)
×
575
            if server.host in out:
×
UNCOV
576
                out[server.host].update({server.protocol: port})
×
577
            else:
578
                out[server.host] = {server.protocol: port}
×
579
        # add bookmarks
580
        bookmarks = self.config.NETWORK_BOOKMARKED_SERVERS or []
×
581
        for server_str in bookmarks:
×
582
            try:
×
UNCOV
583
                server = ServerAddr.from_str(server_str)
×
UNCOV
584
            except ValueError:
×
UNCOV
585
                continue
×
UNCOV
586
            port = str(server.port)
×
UNCOV
587
            if server.host in out:
×
588
                out[server.host].update({server.protocol: port})
×
589
            else:
UNCOV
590
                out[server.host] = {server.protocol: port}
×
591
        # potentially filter out some
592
        if self.config.NETWORK_NOONION:
×
UNCOV
593
            out = filter_noonion(out)
×
UNCOV
594
        return out
×
595

596
    def _get_next_server_to_try(self) -> Optional[ServerAddr]:
4✔
UNCOV
597
        now = time.time()
×
598
        with self.interfaces_lock:
×
UNCOV
599
            connected_servers = set(self.interfaces) | self._connecting_ifaces | self._closing_ifaces
×
600
        # First try from recent servers. (which are persisted)
601
        # As these are servers we successfully connected to recently, they are
602
        # most likely to work. This also makes servers "sticky".
603
        # Note: with sticky servers, it is more difficult for an attacker to eclipse the client,
604
        #       however if they succeed, the eclipsing would persist. To try to balance this,
605
        #       we only give priority to recent_servers up to NUM_STICKY_SERVERS.
UNCOV
606
        with self.recent_servers_lock:
×
607
            recent_servers = list(self._recent_servers)
×
608
        recent_servers = [s for s in recent_servers if s.protocol in self._allowed_protocols]
×
609
        if len(connected_servers & set(recent_servers)) < NUM_STICKY_SERVERS:
×
610
            for server in recent_servers:
×
611
                if server in connected_servers:
×
UNCOV
612
                    continue
×
UNCOV
613
                if not self._can_retry_addr(server, now=now):
×
614
                    continue
×
615
                return server
×
616
        # try all servers we know about, pick one at random
UNCOV
617
        hostmap = self.get_servers()
×
UNCOV
618
        servers = list(set(filter_protocol(hostmap, allowed_protocols=self._allowed_protocols)) - connected_servers)
×
619
        random.shuffle(servers)
×
620
        for server in servers:
×
UNCOV
621
            if not self._can_retry_addr(server, now=now):
×
UNCOV
622
                continue
×
623
            return server
×
624
        return None
×
625

626
    def _set_default_server(self) -> None:
4✔
627
        # Server for addresses and transactions
628
        server = self.config.NETWORK_SERVER
×
629
        # Sanitize default server
630
        if server:
×
631
            try:
×
632
                self.default_server = ServerAddr.from_str(server)
×
633
            except Exception:
×
634
                self.logger.warning(f'failed to parse server-string ({server!r}); falling back to localhost:1:s.')
×
635
                self.default_server = ServerAddr.from_str("localhost:1:s")
×
636
        else:
UNCOV
637
            self.default_server = pick_random_server(allowed_protocols=self._allowed_protocols)
×
638
        assert isinstance(self.default_server, ServerAddr), f"invalid type for default_server: {self.default_server!r}"
×
639

640
    def _set_proxy(self, proxy: Optional[dict]):
4✔
UNCOV
641
        if self.proxy == proxy:
×
UNCOV
642
            return
×
643

644
        self.logger.info(f'setting proxy {proxy}')
×
645
        self.proxy = proxy
×
646

647
        # reset is_proxy_tor to unknown, and re-detect it:
648
        self.is_proxy_tor = None
×
649
        self._detect_if_proxy_is_tor()
×
650

UNCOV
651
        util.trigger_callback('proxy_set', self.proxy)
×
652

653
    def _detect_if_proxy_is_tor(self) -> None:
4✔
UNCOV
654
        def tor_probe_task(p):
×
UNCOV
655
            assert p is not None
×
UNCOV
656
            is_tor = util.is_tor_socks_port(p['host'], int(p['port']))
×
UNCOV
657
            if self.proxy == p:  # is this the proxy we probed?
×
658
                if self.is_proxy_tor != is_tor:
×
UNCOV
659
                    self.logger.info(f'Proxy is {"" if is_tor else "not "}TOR')
×
660
                    self.is_proxy_tor = is_tor
×
661
                util.trigger_callback('tor_probed', is_tor)
×
662

UNCOV
663
        proxy = self.proxy
×
664
        if proxy and proxy['mode'] == 'socks5':
×
UNCOV
665
            t = threading.Thread(target=tor_probe_task, args=(proxy,), daemon=True)
×
666
            t.start()
×
667

668
    @log_exceptions
4✔
669
    async def set_parameters(self, net_params: NetworkParameters):
4✔
UNCOV
670
        proxy = net_params.proxy
×
671
        proxy_str = serialize_proxy(proxy)
×
UNCOV
672
        proxy_user = proxy['user'] if proxy else None
×
673
        proxy_pass = proxy['password'] if proxy else None
×
674
        server = net_params.server
×
675
        # sanitize parameters
676
        try:
×
677
            if proxy:
×
678
                proxy_modes.index(proxy['mode']) + 1
×
679
                int(proxy['port'])
×
680
        except Exception:
×
681
            return
×
UNCOV
682
        self.config.NETWORK_AUTO_CONNECT = net_params.auto_connect
×
683
        self.config.NETWORK_ONESERVER = net_params.oneserver
×
UNCOV
684
        self.config.NETWORK_PROXY = proxy_str
×
685
        self.config.NETWORK_PROXY_USER = proxy_user
×
686
        self.config.NETWORK_PROXY_PASSWORD = proxy_pass
×
687
        self.config.NETWORK_SERVER = str(server)
×
688
        # abort if changes were not allowed by config
UNCOV
689
        if self.config.NETWORK_SERVER != str(server) \
×
690
                or self.config.NETWORK_PROXY != proxy_str \
691
                or self.config.NETWORK_PROXY_USER != proxy_user \
692
                or self.config.NETWORK_PROXY_PASSWORD != proxy_pass \
693
                or self.config.NETWORK_ONESERVER != net_params.oneserver:
UNCOV
694
            return
×
695

UNCOV
696
        proxy_changed = self.proxy != proxy
×
UNCOV
697
        oneserver_changed = self.oneserver != net_params.oneserver
×
UNCOV
698
        default_server_changed = self.default_server != server
×
699
        self._init_parameters_from_config()
×
700
        if not self._was_started:
×
701
            return
×
702

703
        async with self.restart_lock:
×
704
            if proxy_changed or oneserver_changed:
×
705
                # Restart the network
706
                await self.stop(full_shutdown=False)
×
707
                await self._start()
×
708
            elif default_server_changed:
×
UNCOV
709
                await self.switch_to_interface(server)
×
710
            else:
711
                await self.switch_lagging_interface()
×
712
        util.trigger_callback('network_updated')
×
713

714
    def _maybe_set_oneserver(self) -> None:
4✔
715
        oneserver = self.config.NETWORK_ONESERVER
×
716
        self.oneserver = oneserver
×
717
        self.num_server = NUM_TARGET_CONNECTED_SERVERS if not oneserver else 0
×
718

719
    def is_server_bookmarked(self, server: ServerAddr) -> bool:
4✔
UNCOV
720
        bookmarks = self.config.NETWORK_BOOKMARKED_SERVERS or []
×
721
        return str(server) in bookmarks
×
722

723
    def set_server_bookmark(self, server: ServerAddr, *, add: bool) -> None:
4✔
724
        server_str = str(server)
×
725
        with self.config.lock:
×
726
            bookmarks = self.config.NETWORK_BOOKMARKED_SERVERS or []
×
727
            if add:
×
728
                if server_str not in bookmarks:
×
UNCOV
729
                    bookmarks.append(server_str)
×
730
            else:  # remove
731
                if server_str in bookmarks:
×
UNCOV
732
                    bookmarks.remove(server_str)
×
UNCOV
733
            self.config.NETWORK_BOOKMARKED_SERVERS = bookmarks
×
734

735
    async def _switch_to_random_interface(self):
4✔
736
        '''Switch to a random connected server other than the current one'''
737
        servers = self.get_interfaces()    # Those in connected state
×
738
        if self.default_server in servers:
×
UNCOV
739
            servers.remove(self.default_server)
×
UNCOV
740
        if servers:
×
741
            await self.switch_to_interface(random.choice(servers))
×
742

743
    async def switch_lagging_interface(self):
4✔
744
        """If auto_connect and lagging, switch interface (only within fork)."""
UNCOV
745
        if self.auto_connect and await self._server_is_lagging():
×
746
            # switch to one that has the correct header (not height)
747
            best_header = self.blockchain().header_at_tip()
×
748
            with self.interfaces_lock: interfaces = list(self.interfaces.values())
×
749
            filtered = list(filter(lambda iface: iface.tip_header == best_header, interfaces))
×
750
            if filtered:
×
751
                chosen_iface = random.choice(filtered)
×
752
                await self.switch_to_interface(chosen_iface.server)
×
753

754
    async def switch_unwanted_fork_interface(self) -> None:
4✔
755
        """If auto_connect, maybe switch to another fork/chain."""
756
        if not self.auto_connect or not self.interface:
×
757
            return
×
758
        with self.interfaces_lock: interfaces = list(self.interfaces.values())
×
UNCOV
759
        pref_height = self._blockchain_preferred_block['height']
×
UNCOV
760
        pref_hash   = self._blockchain_preferred_block['hash']
×
761
        # shortcut for common case
762
        if pref_height == 0:
×
763
            return
×
764
        # maybe try switching chains; starting with most desirable first
765
        matching_chains = blockchain.get_chains_that_contain_header(pref_height, pref_hash)
×
766
        chains_to_try = list(matching_chains) + [blockchain.get_best_chain()]
×
767
        for rank, chain in enumerate(chains_to_try):
×
768
            # check if main interface is already on this fork
769
            if self.interface.blockchain == chain:
×
770
                return
×
771
            # switch to another random interface that is on this fork, if any
772
            filtered = [iface for iface in interfaces
×
773
                        if iface.blockchain == chain]
774
            if filtered:
×
775
                self.logger.info(f"switching to (more) preferred fork (rank {rank})")
×
776
                chosen_iface = random.choice(filtered)
×
UNCOV
777
                await self.switch_to_interface(chosen_iface.server)
×
778
                return
×
779
        self.logger.info("tried to switch to (more) preferred fork but no interfaces are on any")
×
780

781
    async def switch_to_interface(self, server: ServerAddr):
4✔
782
        """Switch to server as our main interface. If no connection exists,
783
        queue interface to be started. The actual switch will
784
        happen when the interface becomes ready.
785
        """
786
        self.default_server = server
×
UNCOV
787
        old_interface = self.interface
×
UNCOV
788
        old_server = old_interface.server if old_interface else None
×
789

790
        # Stop any current interface in order to terminate subscriptions,
791
        # and to cancel tasks in interface.taskgroup.
792
        if old_server and old_server != server:
×
793
            # don't wait for old_interface to close as that might be slow:
794
            await self.taskgroup.spawn(self._close_interface(old_interface))
×
795

796
        if server not in self.interfaces:
×
797
            self.interface = None
×
798
            await self.taskgroup.spawn(self._run_new_interface(server))
×
799
            return
×
800

801
        i = self.interfaces[server]
×
802
        if old_interface != i:
×
UNCOV
803
            if not i.is_connected_and_ready():
×
804
                return
×
805
            self.logger.info(f"switching to {server}")
×
806
            blockchain_updated = i.blockchain != self.blockchain()
×
807
            self.interface = i
×
UNCOV
808
            try:
×
809
                await i.taskgroup.spawn(self._request_server_info(i))
×
810
            except RuntimeError as e:  # see #7677
×
UNCOV
811
                if len(e.args) >= 1 and e.args[0] == 'task group terminated':
×
UNCOV
812
                    self.logger.warning(f"tried to switch to {server} but interface.taskgroup is already dead.")
×
813
                    self.interface = None
×
814
                    return
×
815
                raise
×
UNCOV
816
            util.trigger_callback('default_server_changed')
×
UNCOV
817
            self.default_server_changed_event.set()
×
818
            self.default_server_changed_event.clear()
×
819
            self._set_status(ConnectionState.CONNECTED)
×
UNCOV
820
            util.trigger_callback('network_updated')
×
UNCOV
821
            if blockchain_updated:
×
822
                util.trigger_callback('blockchain_updated')
×
823

824
    async def _close_interface(self, interface: Optional[Interface]):
4✔
825
        if not interface:
×
826
            return
×
827
        if interface.server in self._closing_ifaces:
×
UNCOV
828
            return
×
829
        self._closing_ifaces.add(interface.server)
×
830
        with self.interfaces_lock:
×
831
            if self.interfaces.get(interface.server) == interface:
×
UNCOV
832
                self.interfaces.pop(interface.server)
×
UNCOV
833
        if interface == self.interface:
×
UNCOV
834
            self.interface = None
×
835
        try:
×
836
            # this can take some time if server/connection is slow:
837
            await interface.close()
×
838
            await interface.got_disconnected.wait()
×
839
        finally:
UNCOV
840
            self._closing_ifaces.discard(interface.server)
×
841

842
    @with_recent_servers_lock
4✔
843
    def _add_recent_server(self, server: ServerAddr) -> None:
4✔
UNCOV
844
        self._on_connection_successfully_established(server)
×
845
        # list is ordered
846
        if server in self._recent_servers:
×
847
            self._recent_servers.remove(server)
×
848
        self._recent_servers.insert(0, server)
×
849
        self._recent_servers = self._recent_servers[:NUM_RECENT_SERVERS]
×
850
        self._save_recent_servers()
×
851

852
    async def connection_down(self, interface: Interface):
4✔
853
        '''A connection to server either went down, or was never made.
854
        We distinguish by whether it is in self.interfaces.'''
855
        if not interface: return
×
856
        if interface.server == self.default_server:
×
857
            self._set_status(ConnectionState.DISCONNECTED)
×
858
        await self._close_interface(interface)
×
UNCOV
859
        util.trigger_callback('network_updated')
×
860

861
    def get_network_timeout_seconds(self, request_type=NetworkTimeout.Generic) -> int:
4✔
UNCOV
862
        if self.config.NETWORK_TIMEOUT:
×
863
            return self.config.NETWORK_TIMEOUT
×
864
        if self.oneserver and not self.auto_connect:
×
865
            return request_type.MOST_RELAXED
×
UNCOV
866
        if self.proxy:
×
867
            return request_type.RELAXED
×
868
        return request_type.NORMAL
×
869

870
    @ignore_exceptions  # do not kill outer taskgroup
4✔
871
    @log_exceptions
4✔
872
    async def _run_new_interface(self, server: ServerAddr):
4✔
873
        if (server in self.interfaces
×
874
                or server in self._connecting_ifaces
875
                or server in self._closing_ifaces):
876
            return
×
877
        self._connecting_ifaces.add(server)
×
UNCOV
878
        if server == self.default_server:
×
UNCOV
879
            self.logger.info(f"connecting to {server} as new interface")
×
UNCOV
880
            self._set_status(ConnectionState.CONNECTING)
×
UNCOV
881
        self._trying_addr_now(server)
×
882

UNCOV
883
        interface = Interface(network=self, server=server)
×
884
        # note: using longer timeouts here as DNS can sometimes be slow!
885
        timeout = self.get_network_timeout_seconds(NetworkTimeout.Generic)
×
886
        try:
×
UNCOV
887
            await util.wait_for2(interface.ready, timeout)
×
UNCOV
888
        except BaseException as e:
×
UNCOV
889
            self.logger.info(f"couldn't launch iface {server} -- {repr(e)}")
×
890
            await interface.close()
×
UNCOV
891
            return
×
892
        else:
UNCOV
893
            with self.interfaces_lock:
×
894
                assert server not in self.interfaces
×
895
                self.interfaces[server] = interface
×
896
        finally:
897
            self._connecting_ifaces.discard(server)
×
898

899
        if server == self.default_server:
×
900
            await self.switch_to_interface(server)
×
901

902
        self._has_ever_managed_to_connect_to_server = True
×
903
        self._add_recent_server(server)
×
904
        util.trigger_callback('network_updated')
×
905
        # When the proxy settings were set, the proxy (if any) might have been unreachable,
906
        # resulting in a false-negative for Tor-detection. Given we just connected to a server, re-test now.
907
        self._detect_if_proxy_is_tor()
×
908

909
    def check_interface_against_healthy_spread_of_connected_servers(self, iface_to_check: Interface) -> bool:
4✔
910
        # main interface is exempt. this makes switching servers easier
911
        if iface_to_check.is_main_server():
×
912
            return True
×
913
        if not iface_to_check.bucket_based_on_ipaddress():
×
914
            return True
×
915
        # bucket connected interfaces
916
        with self.interfaces_lock:
×
917
            interfaces = list(self.interfaces.values())
×
918
        if iface_to_check in interfaces:
×
919
            interfaces.remove(iface_to_check)
×
920
        buckets = defaultdict(list)
×
UNCOV
921
        for iface in interfaces:
×
UNCOV
922
            buckets[iface.bucket_based_on_ipaddress()].append(iface)
×
923
        # check proposed server against buckets
924
        onion_servers = buckets[BUCKET_NAME_OF_ONION_SERVERS]
×
925
        if iface_to_check.is_tor():
×
926
            # keep number of onion servers below half of all connected servers
927
            if len(onion_servers) > NUM_TARGET_CONNECTED_SERVERS // 2:
×
928
                return False
×
929
        else:
930
            bucket = iface_to_check.bucket_based_on_ipaddress()
×
931
            if len(buckets[bucket]) > 0:
×
932
                return False
×
933
        return True
×
934

935
    def best_effort_reliable(func):
4✔
936
        @functools.wraps(func)
4✔
937
        async def make_reliable_wrapper(self: 'Network', *args, **kwargs):
4✔
938
            for i in range(10):
×
UNCOV
939
                iface = self.interface
×
940
                # retry until there is a main interface
UNCOV
941
                if not iface:
×
942
                    async with ignore_after(1):
×
UNCOV
943
                        await self.default_server_changed_event.wait()
×
944
                    continue  # try again
×
945
                assert iface.ready.done(), "interface not ready yet"
×
946
                # try actual request
947
                try:
×
948
                    async with OldTaskGroup(wait=any) as group:
×
UNCOV
949
                        task = await group.spawn(func(self, *args, **kwargs))
×
UNCOV
950
                        await group.spawn(iface.got_disconnected.wait())
×
UNCOV
951
                except RequestTimedOut:
×
UNCOV
952
                    await iface.close()
×
953
                    await iface.got_disconnected.wait()
×
954
                    continue  # try again
×
955
                except RequestCorrupted as e:
×
956
                    # TODO ban server?
957
                    iface.logger.exception(f"RequestCorrupted: {e}")
×
UNCOV
958
                    await iface.close()
×
UNCOV
959
                    await iface.got_disconnected.wait()
×
960
                    continue  # try again
×
961
                if task.done() and not task.cancelled():
×
962
                    return task.result()
×
963
                # otherwise; try again
964
            raise BestEffortRequestFailed('cannot establish a connection... gave up.')
×
965
        return make_reliable_wrapper
4✔
966

967
    def catch_server_exceptions(func):
4✔
968
        """Decorator that wraps server errors in UntrustedServerReturnedError,
969
        to avoid showing untrusted arbitrary text to users.
970
        """
971
        @functools.wraps(func)
4✔
972
        async def wrapper(self, *args, **kwargs):
4✔
UNCOV
973
            try:
×
974
                return await func(self, *args, **kwargs)
×
975
            except aiorpcx.jsonrpc.CodeMessageError as e:
×
976
                wrapped_exc = UntrustedServerReturnedError(original_exception=e)
×
977
                # log (sanitized) untrusted error text now, to ease debugging
978
                self.logger.debug(f"got error from server for {func.__qualname__}: {wrapped_exc.get_untrusted_message()!r}")
×
979
                raise wrapped_exc from e
×
980
        return wrapper
4✔
981

982
    @best_effort_reliable
4✔
983
    @catch_server_exceptions
4✔
984
    async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict:
4✔
985
        if self.interface is None:  # handled by best_effort_reliable
×
986
            raise RequestTimedOut()
×
987
        return await self.interface.get_merkle_for_transaction(tx_hash=tx_hash, tx_height=tx_height)
×
988

989
    @best_effort_reliable
4✔
990
    async def broadcast_transaction(self, tx: 'Transaction', *, timeout=None) -> None:
4✔
991
        """caller should handle TxBroadcastError"""
992
        if self.interface is None:  # handled by best_effort_reliable
×
993
            raise RequestTimedOut()
×
UNCOV
994
        if timeout is None:
×
995
            timeout = self.get_network_timeout_seconds(NetworkTimeout.Urgent)
×
UNCOV
996
        if any(DummyAddress.is_dummy_address(txout.address) for txout in tx.outputs()):
×
997
            raise DummyAddressUsedInTxException("tried to broadcast tx with dummy address!")
×
998
        try:
×
UNCOV
999
            out = await self.interface.session.send_request('blockchain.transaction.broadcast', [tx.serialize()], timeout=timeout)
×
1000
            # note: both 'out' and exception messages are untrusted input from the server
1001
        except (RequestTimedOut, asyncio.CancelledError, asyncio.TimeoutError):
×
1002
            raise  # pass-through
×
UNCOV
1003
        except aiorpcx.jsonrpc.CodeMessageError as e:
×
UNCOV
1004
            self.logger.info(f"broadcast_transaction error [DO NOT TRUST THIS MESSAGE]: {error_text_str_to_safe_str(repr(e))}")
×
1005
            raise TxBroadcastServerReturnedError(self.sanitize_tx_broadcast_response(e.message)) from e
×
UNCOV
1006
        except BaseException as e:  # intentional BaseException for sanity!
×
UNCOV
1007
            self.logger.info(f"broadcast_transaction error2 [DO NOT TRUST THIS MESSAGE]: {error_text_str_to_safe_str(repr(e))}")
×
UNCOV
1008
            send_exception_to_crash_reporter(e)
×
1009
            raise TxBroadcastUnknownError() from e
×
1010
        if out != tx.txid():
×
1011
            self.logger.info(f"unexpected txid for broadcast_transaction [DO NOT TRUST THIS MESSAGE]: "
×
1012
                             f"{error_text_str_to_safe_str(out)} != {tx.txid()}")
UNCOV
1013
            raise TxBroadcastHashMismatch(_("Server returned unexpected transaction ID."))
×
1014

1015
    async def try_broadcasting(self, tx, name) -> bool:
4✔
1016
        try:
×
1017
            await self.broadcast_transaction(tx)
×
1018
        except Exception as e:
×
1019
            self.logger.info(f'error: could not broadcast {name} {tx.txid()}, {str(e)}')
×
1020
            return False
×
1021
        else:
1022
            self.logger.info(f'success: broadcasting {name} {tx.txid()}')
×
1023
            return True
×
1024

1025
    @staticmethod
4✔
1026
    def sanitize_tx_broadcast_response(server_msg) -> str:
4✔
1027
        # Unfortunately, bitcoind and hence the Electrum protocol doesn't return a useful error code.
1028
        # So, we use substring matching to grok the error message.
1029
        # server_msg is untrusted input so it should not be shown to the user. see #4968
1030
        server_msg = str(server_msg)
×
1031
        server_msg = server_msg.replace("\n", r"\n")
×
1032

1033
        # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/script/script_error.cpp
UNCOV
1034
        script_error_messages = {
×
1035
            r"Script evaluated without error but finished with a false/empty top stack element",
1036
            r"Script failed an OP_VERIFY operation",
1037
            r"Script failed an OP_EQUALVERIFY operation",
1038
            r"Script failed an OP_CHECKMULTISIGVERIFY operation",
1039
            r"Script failed an OP_CHECKSIGVERIFY operation",
1040
            r"Script failed an OP_NUMEQUALVERIFY operation",
1041
            r"Script is too big",
1042
            r"Push value size limit exceeded",
1043
            r"Operation limit exceeded",
1044
            r"Stack size limit exceeded",
1045
            r"Signature count negative or greater than pubkey count",
1046
            r"Pubkey count negative or limit exceeded",
1047
            r"Opcode missing or not understood",
1048
            r"Attempted to use a disabled opcode",
1049
            r"Operation not valid with the current stack size",
1050
            r"Operation not valid with the current altstack size",
1051
            r"OP_RETURN was encountered",
1052
            r"Invalid OP_IF construction",
1053
            r"Negative locktime",
1054
            r"Locktime requirement not satisfied",
1055
            r"Signature hash type missing or not understood",
1056
            r"Non-canonical DER signature",
1057
            r"Data push larger than necessary",
1058
            r"Only push operators allowed in signatures",
1059
            r"Non-canonical signature: S value is unnecessarily high",
1060
            r"Dummy CHECKMULTISIG argument must be zero",
1061
            r"OP_IF/NOTIF argument must be minimal",
1062
            r"Signature must be zero for failed CHECK(MULTI)SIG operation",
1063
            r"NOPx reserved for soft-fork upgrades",
1064
            r"Witness version reserved for soft-fork upgrades",
1065
            r"Taproot version reserved for soft-fork upgrades",
1066
            r"OP_SUCCESSx reserved for soft-fork upgrades",
1067
            r"Public key version reserved for soft-fork upgrades",
1068
            r"Public key is neither compressed or uncompressed",
1069
            r"Stack size must be exactly one after execution",
1070
            r"Extra items left on stack after execution",
1071
            r"Witness program has incorrect length",
1072
            r"Witness program was passed an empty witness",
1073
            r"Witness program hash mismatch",
1074
            r"Witness requires empty scriptSig",
1075
            r"Witness requires only-redeemscript scriptSig",
1076
            r"Witness provided for non-witness script",
1077
            r"Using non-compressed keys in segwit",
1078
            r"Invalid Schnorr signature size",
1079
            r"Invalid Schnorr signature hash type",
1080
            r"Invalid Schnorr signature",
1081
            r"Invalid Taproot control block size",
1082
            r"Too much signature validation relative to witness weight",
1083
            r"OP_CHECKMULTISIG(VERIFY) is not available in tapscript",
1084
            r"OP_IF/NOTIF argument must be minimal in tapscript",
1085
            r"Using OP_CODESEPARATOR in non-witness script",
1086
            r"Signature is found in scriptCode",
1087
        }
UNCOV
1088
        for substring in script_error_messages:
×
UNCOV
1089
            if substring in server_msg:
×
1090
                return substring
×
1091
        # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/validation.cpp
1092
        # grep "REJECT_"
1093
        # grep "TxValidationResult"
1094
        # should come after script_error.cpp (due to e.g. "non-mandatory-script-verify-flag")
1095
        validation_error_messages = {
×
1096
            r"coinbase": None,
1097
            r"tx-size-small": None,
1098
            r"non-final": None,
1099
            r"txn-already-in-mempool": None,
1100
            r"txn-mempool-conflict": None,
1101
            r"txn-already-known": None,
1102
            r"non-BIP68-final": None,
1103
            r"bad-txns-nonstandard-inputs": None,
1104
            r"bad-witness-nonstandard": None,
1105
            r"bad-txns-too-many-sigops": None,
1106
            r"mempool min fee not met":
1107
                ("mempool min fee not met\n" +
1108
                 _("Your transaction is paying a fee that is so low that the bitcoin node cannot "
1109
                   "fit it into its mempool. The mempool is already full of hundreds of megabytes "
1110
                   "of transactions that all pay higher fees. Try to increase the fee.")),
1111
            r"min relay fee not met": None,
1112
            r"absurdly-high-fee": None,
1113
            r"max-fee-exceeded": None,
1114
            r"too-long-mempool-chain": None,
1115
            r"bad-txns-spends-conflicting-tx": None,
1116
            r"insufficient fee": ("insufficient fee\n" +
1117
                 _("Your transaction is trying to replace another one in the mempool but it "
1118
                   "does not meet the rules to do so. Try to increase the fee.")),
1119
            r"too many potential replacements": None,
1120
            r"replacement-adds-unconfirmed": None,
1121
            r"mempool full": None,
1122
            r"non-mandatory-script-verify-flag": None,
1123
            r"mandatory-script-verify-flag-failed": None,
1124
            r"Transaction check failed": None,
1125
        }
UNCOV
1126
        for substring in validation_error_messages:
×
UNCOV
1127
            if substring in server_msg:
×
1128
                msg = validation_error_messages[substring]
×
1129
                return msg if msg else substring
×
1130
        # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/rpc/rawtransaction.cpp
1131
        # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/util/error.cpp
1132
        # grep "RPC_TRANSACTION"
1133
        # grep "RPC_DESERIALIZATION_ERROR"
UNCOV
1134
        rawtransaction_error_messages = {
×
1135
            r"Missing inputs": None,
1136
            r"Inputs missing or spent": None,
1137
            r"transaction already in block chain": None,
1138
            r"Transaction already in block chain": None,
1139
            r"TX decode failed": None,
1140
            r"Peer-to-peer functionality missing or disabled": None,
1141
            r"Transaction rejected by AcceptToMemoryPool": None,
1142
            r"AcceptToMemoryPool failed": None,
1143
            r"Fee exceeds maximum configured by user": None,
1144
        }
UNCOV
1145
        for substring in rawtransaction_error_messages:
×
UNCOV
1146
            if substring in server_msg:
×
UNCOV
1147
                msg = rawtransaction_error_messages[substring]
×
UNCOV
1148
                return msg if msg else substring
×
1149
        # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/consensus/tx_verify.cpp
1150
        # https://github.com/bitcoin/bitcoin/blob/c7ad94428ab6f54661d7a5441e1fdd0ebf034903/src/consensus/tx_check.cpp
1151
        # grep "REJECT_"
1152
        # grep "TxValidationResult"
UNCOV
1153
        tx_verify_error_messages = {
×
1154
            r"bad-txns-vin-empty": None,
1155
            r"bad-txns-vout-empty": None,
1156
            r"bad-txns-oversize": None,
1157
            r"bad-txns-vout-negative": None,
1158
            r"bad-txns-vout-toolarge": None,
1159
            r"bad-txns-txouttotal-toolarge": None,
1160
            r"bad-txns-inputs-duplicate": None,
1161
            r"bad-cb-length": None,
1162
            r"bad-txns-prevout-null": None,
1163
            r"bad-txns-inputs-missingorspent":
1164
                ("bad-txns-inputs-missingorspent\n" +
1165
                 _("You might have a local transaction in your wallet that this transaction "
1166
                   "builds on top. You need to either broadcast or remove the local tx.")),
1167
            r"bad-txns-premature-spend-of-coinbase": None,
1168
            r"bad-txns-inputvalues-outofrange": None,
1169
            r"bad-txns-in-belowout": None,
1170
            r"bad-txns-fee-outofrange": None,
1171
        }
UNCOV
1172
        for substring in tx_verify_error_messages:
×
UNCOV
1173
            if substring in server_msg:
×
UNCOV
1174
                msg = tx_verify_error_messages[substring]
×
UNCOV
1175
                return msg if msg else substring
×
1176
        # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/policy/policy.cpp
1177
        # grep "reason ="
1178
        # should come after validation.cpp (due to "tx-size" vs "tx-size-small")
1179
        # should come after script_error.cpp (due to e.g. "version")
UNCOV
1180
        policy_error_messages = {
×
1181
            r"version": _("Transaction uses non-standard version."),
1182
            r"tx-size": _("The transaction was rejected because it is too large (in bytes)."),
1183
            r"scriptsig-size": None,
1184
            r"scriptsig-not-pushonly": None,
1185
            r"scriptpubkey":
1186
                ("scriptpubkey\n" +
1187
                 _("Some of the outputs pay to a non-standard script.")),
1188
            r"bare-multisig": None,
1189
            r"dust":
1190
                (_("Transaction could not be broadcast due to dust outputs.\n"
1191
                   "Some of the outputs are too small in value, probably lower than 1000 satoshis.\n"
1192
                   "Check the units, make sure you haven't confused e.g. mBTC and BTC.")),
1193
            r"multi-op-return": _("The transaction was rejected because it contains multiple OP_RETURN outputs."),
1194
        }
UNCOV
1195
        for substring in policy_error_messages:
×
UNCOV
1196
            if substring in server_msg:
×
UNCOV
1197
                msg = policy_error_messages[substring]
×
UNCOV
1198
                return msg if msg else substring
×
1199
        # otherwise:
UNCOV
1200
        return _("Unknown error")
×
1201

1202
    @best_effort_reliable
4✔
1203
    @catch_server_exceptions
4✔
1204
    async def request_chunk(self, height: int, tip=None, *, can_return_early=False):
4✔
UNCOV
1205
        if self.interface is None:  # handled by best_effort_reliable
×
UNCOV
1206
            raise RequestTimedOut()
×
UNCOV
1207
        return await self.interface.request_chunk(height, tip=tip, can_return_early=can_return_early)
×
1208

1209
    @best_effort_reliable
4✔
1210
    @catch_server_exceptions
4✔
1211
    async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
4✔
UNCOV
1212
        if self.interface is None:  # handled by best_effort_reliable
×
UNCOV
1213
            raise RequestTimedOut()
×
UNCOV
1214
        return await self.interface.get_transaction(tx_hash=tx_hash, timeout=timeout)
×
1215

1216
    @best_effort_reliable
4✔
1217
    @catch_server_exceptions
4✔
1218
    async def get_history_for_scripthash(self, sh: str) -> List[dict]:
4✔
UNCOV
1219
        if self.interface is None:  # handled by best_effort_reliable
×
UNCOV
1220
            raise RequestTimedOut()
×
UNCOV
1221
        return await self.interface.get_history_for_scripthash(sh)
×
1222

1223
    @best_effort_reliable
4✔
1224
    @catch_server_exceptions
4✔
1225
    async def listunspent_for_scripthash(self, sh: str) -> List[dict]:
4✔
1226
        if self.interface is None:  # handled by best_effort_reliable
×
1227
            raise RequestTimedOut()
×
UNCOV
1228
        return await self.interface.listunspent_for_scripthash(sh)
×
1229

1230
    @best_effort_reliable
4✔
1231
    @catch_server_exceptions
4✔
1232
    async def get_balance_for_scripthash(self, sh: str) -> dict:
4✔
UNCOV
1233
        if self.interface is None:  # handled by best_effort_reliable
×
UNCOV
1234
            raise RequestTimedOut()
×
UNCOV
1235
        return await self.interface.get_balance_for_scripthash(sh)
×
1236

1237
    @best_effort_reliable
4✔
1238
    @catch_server_exceptions
4✔
1239
    async def get_txid_from_txpos(self, tx_height, tx_pos, merkle):
4✔
UNCOV
1240
        if self.interface is None:  # handled by best_effort_reliable
×
UNCOV
1241
            raise RequestTimedOut()
×
UNCOV
1242
        return await self.interface.get_txid_from_txpos(tx_height, tx_pos, merkle)
×
1243

1244
    def blockchain(self) -> Blockchain:
4✔
1245
        interface = self.interface
×
1246
        if interface and interface.blockchain is not None:
×
UNCOV
1247
            self._blockchain = interface.blockchain
×
UNCOV
1248
        return self._blockchain
×
1249

1250
    def get_blockchains(self):
4✔
1251
        out = {}  # blockchain_id -> list(interfaces)
×
UNCOV
1252
        with blockchain.blockchains_lock: blockchain_items = list(blockchain.blockchains.items())
×
UNCOV
1253
        with self.interfaces_lock: interfaces_values = list(self.interfaces.values())
×
UNCOV
1254
        for chain_id, bc in blockchain_items:
×
UNCOV
1255
            r = list(filter(lambda i: i.blockchain==bc, interfaces_values))
×
UNCOV
1256
            if r:
×
UNCOV
1257
                out[chain_id] = r
×
UNCOV
1258
        return out
×
1259

1260
    def _set_preferred_chain(self, chain: Optional[Blockchain]):
4✔
UNCOV
1261
        if chain:
×
UNCOV
1262
            height = chain.get_max_forkpoint()
×
UNCOV
1263
            header_hash = chain.get_hash(height)
×
1264
        else:
UNCOV
1265
            height = 0
×
UNCOV
1266
            header_hash = constants.net.GENESIS
×
UNCOV
1267
        self._blockchain_preferred_block = {
×
1268
            'height': height,
1269
            'hash': header_hash,
1270
        }
1271
        self.config.BLOCKCHAIN_PREFERRED_BLOCK = self._blockchain_preferred_block
×
1272

1273
    async def follow_chain_given_id(self, chain_id: str) -> None:
4✔
UNCOV
1274
        bc = blockchain.blockchains.get(chain_id)
×
UNCOV
1275
        if not bc:
×
UNCOV
1276
            raise Exception('blockchain {} not found'.format(chain_id))
×
UNCOV
1277
        self._set_preferred_chain(bc)
×
1278
        # select server on this chain
UNCOV
1279
        with self.interfaces_lock: interfaces = list(self.interfaces.values())
×
UNCOV
1280
        interfaces_on_selected_chain = list(filter(lambda iface: iface.blockchain == bc, interfaces))
×
UNCOV
1281
        if len(interfaces_on_selected_chain) == 0: return
×
UNCOV
1282
        chosen_iface = random.choice(interfaces_on_selected_chain)  # type: Interface
×
1283
        # switch to server (and save to config)
UNCOV
1284
        net_params = self.get_parameters()
×
UNCOV
1285
        net_params = net_params._replace(server=chosen_iface.server)
×
UNCOV
1286
        await self.set_parameters(net_params)
×
1287

1288
    async def follow_chain_given_server(self, server: ServerAddr) -> None:
4✔
1289
        # note that server_str should correspond to a connected interface
UNCOV
1290
        iface = self.interfaces.get(server)
×
UNCOV
1291
        if iface is None:
×
UNCOV
1292
            return
×
1293
        self._set_preferred_chain(iface.blockchain)
×
1294
        # switch to server (and save to config)
1295
        net_params = self.get_parameters()
×
1296
        net_params = net_params._replace(server=server)
×
UNCOV
1297
        await self.set_parameters(net_params)
×
1298

1299
    def get_server_height(self) -> int:
4✔
1300
        """Length of header chain, as claimed by main interface."""
UNCOV
1301
        interface = self.interface
×
UNCOV
1302
        return interface.tip if interface else 0
×
1303

1304
    def get_local_height(self) -> int:
4✔
1305
        """Length of header chain, POW-verified.
1306
        In case of a chain split, this is for the branch the main interface is on,
1307
        but it is the tip of that branch (even if main interface is behind).
1308
        """
UNCOV
1309
        return self.blockchain().height()
×
1310

1311
    def export_checkpoints(self, path):
4✔
1312
        """Run manually to generate blockchain checkpoints.
1313
        Kept for console use only.
1314
        """
UNCOV
1315
        cp = self.blockchain().get_checkpoints()
×
UNCOV
1316
        with open(path, 'w', encoding='utf-8') as f:
×
1317
            f.write(json.dumps(cp, indent=4))
×
1318

1319
    async def _start(self):
4✔
UNCOV
1320
        assert not self.taskgroup
×
UNCOV
1321
        self.taskgroup = taskgroup = OldTaskGroup()
×
UNCOV
1322
        assert not self.interface and not self.interfaces
×
UNCOV
1323
        assert not self._connecting_ifaces
×
1324
        assert not self._closing_ifaces
×
1325
        self.logger.info('starting network')
×
1326
        self._clear_addr_retry_times()
×
UNCOV
1327
        self._init_parameters_from_config()
×
UNCOV
1328
        await self.taskgroup.spawn(self._run_new_interface(self.default_server))
×
1329

UNCOV
1330
        async def main():
×
1331
            self.logger.info(f"starting taskgroup ({hex(id(taskgroup))}).")
×
1332
            try:
×
1333
                # note: if a task finishes with CancelledError, that
1334
                # will NOT raise, and the group will keep the other tasks running
UNCOV
1335
                async with taskgroup as group:
×
UNCOV
1336
                    await group.spawn(self._maintain_sessions())
×
UNCOV
1337
                    [await group.spawn(job) for job in self._jobs]
×
1338
            except Exception as e:
×
1339
                self.logger.exception(f"taskgroup died ({hex(id(taskgroup))}).")
×
1340
            finally:
UNCOV
1341
                self.logger.info(f"taskgroup stopped ({hex(id(taskgroup))}).")
×
UNCOV
1342
        asyncio.run_coroutine_threadsafe(main(), self.asyncio_loop)
×
1343

1344
        util.trigger_callback('network_updated')
×
1345

1346
    def start(self, jobs: Iterable = None):
4✔
1347
        """Schedule starting the network, along with the given job co-routines.
1348

1349
        Note: the jobs will *restart* every time the network restarts, e.g. on proxy
1350
        setting changes.
1351
        """
1352
        self._was_started = True
×
1353
        self._jobs = jobs or []
×
1354
        asyncio.run_coroutine_threadsafe(self._start(), self.asyncio_loop)
×
1355

1356
    @log_exceptions
4✔
1357
    async def stop(self, *, full_shutdown: bool = True):
4✔
UNCOV
1358
        if not self._was_started:
×
1359
            self.logger.info("not stopping network as it was never started")
×
1360
            return
×
1361
        self.logger.info("stopping network")
×
1362
        # timeout: if full_shutdown, it is up to the caller to time us out,
1363
        #          otherwise if e.g. restarting due to proxy changes, we time out fast
1364
        async with (nullcontext() if full_shutdown else ignore_after(1)):
×
1365
            async with OldTaskGroup() as group:
×
UNCOV
1366
                await group.spawn(self.taskgroup.cancel_remaining())
×
UNCOV
1367
                if full_shutdown:
×
UNCOV
1368
                    await group.spawn(self.stop_gossip(full_shutdown=full_shutdown))
×
1369
        self.taskgroup = None
×
UNCOV
1370
        self.interface = None
×
UNCOV
1371
        self.interfaces = {}
×
1372
        self._connecting_ifaces.clear()
×
1373
        self._closing_ifaces.clear()
×
1374
        if not full_shutdown:
×
1375
            util.trigger_callback('network_updated')
×
1376

1377
    async def _ensure_there_is_a_main_interface(self):
4✔
1378
        if self.interface:
×
1379
            return
×
1380
        # if auto_connect is set, try a different server
UNCOV
1381
        if self.auto_connect and not self.is_connecting():
×
1382
            await self._switch_to_random_interface()
×
1383
        # if auto_connect is not set, or still no main interface, retry current
1384
        if not self.interface and not self.is_connecting():
×
UNCOV
1385
            if self._can_retry_addr(self.default_server, urgent=True):
×
UNCOV
1386
                await self.switch_to_interface(self.default_server)
×
1387

1388
    async def _maintain_sessions(self):
4✔
1389
        async def maybe_start_new_interfaces():
×
1390
            num_existing_ifaces = len(self.interfaces) + len(self._connecting_ifaces) + len(self._closing_ifaces)
×
1391
            for i in range(self.num_server - num_existing_ifaces):
×
1392
                # FIXME this should try to honour "healthy spread of connected servers"
1393
                server = self._get_next_server_to_try()
×
1394
                if server:
×
1395
                    await self.taskgroup.spawn(self._run_new_interface(server))
×
UNCOV
1396
        async def maintain_healthy_spread_of_connected_servers():
×
UNCOV
1397
            with self.interfaces_lock: interfaces = list(self.interfaces.values())
×
UNCOV
1398
            random.shuffle(interfaces)
×
1399
            for iface in interfaces:
×
1400
                if not self.check_interface_against_healthy_spread_of_connected_servers(iface):
×
UNCOV
1401
                    self.logger.info(f"disconnecting from {iface.server}. too many connected "
×
1402
                                     f"servers already in bucket {iface.bucket_based_on_ipaddress()}")
UNCOV
1403
                    await self._close_interface(iface)
×
UNCOV
1404
        async def maintain_main_interface():
×
UNCOV
1405
            await self._ensure_there_is_a_main_interface()
×
UNCOV
1406
            if self.is_connected():
×
1407
                if self.config.is_fee_estimates_update_required():
×
UNCOV
1408
                    await self.interface.taskgroup.spawn(self._request_fee_estimates, self.interface)
×
1409

UNCOV
1410
        while True:
×
UNCOV
1411
            await maybe_start_new_interfaces()
×
UNCOV
1412
            await maintain_healthy_spread_of_connected_servers()
×
1413
            await maintain_main_interface()
×
1414
            await asyncio.sleep(0.1)
×
1415

1416
    @classmethod
4✔
1417
    async def async_send_http_on_proxy(
4✔
1418
            cls, method: str, url: str, *,
1419
            params: dict = None,
1420
            body: bytes = None,
1421
            json: dict = None,
1422
            headers=None,
1423
            on_finish=None,
1424
            timeout=None,
1425
    ):
1426
        async def default_on_finish(resp: ClientResponse):
×
UNCOV
1427
            resp.raise_for_status()
×
1428
            return await resp.text()
×
1429
        if headers is None:
×
1430
            headers = {}
×
UNCOV
1431
        if on_finish is None:
×
UNCOV
1432
            on_finish = default_on_finish
×
1433
        network = cls.get_instance()
×
1434
        proxy = network.proxy if network else None
×
1435
        async with make_aiohttp_session(proxy, timeout=timeout) as session:
×
1436
            if method == 'get':
×
1437
                async with session.get(url, params=params, headers=headers) as resp:
×
UNCOV
1438
                    return await on_finish(resp)
×
1439
            elif method == 'post':
×
1440
                assert body is not None or json is not None, 'body or json must be supplied if method is post'
×
UNCOV
1441
                if body is not None:
×
1442
                    async with session.post(url, data=body, headers=headers) as resp:
×
UNCOV
1443
                        return await on_finish(resp)
×
UNCOV
1444
                elif json is not None:
×
UNCOV
1445
                    async with session.post(url, json=json, headers=headers) as resp:
×
UNCOV
1446
                        return await on_finish(resp)
×
1447
            else:
UNCOV
1448
                raise Exception(f"unexpected {method=!r}")
×
1449

1450
    @classmethod
4✔
1451
    def send_http_on_proxy(cls, method, url, **kwargs):
4✔
1452
        loop = util.get_asyncio_loop()
×
UNCOV
1453
        assert util.get_running_loop() != loop, 'must not be called from asyncio thread'
×
UNCOV
1454
        coro = asyncio.run_coroutine_threadsafe(cls.async_send_http_on_proxy(method, url, **kwargs), loop)
×
1455
        # note: _send_http_on_proxy has its own timeout, so no timeout here:
1456
        return coro.result()
×
1457

1458
    # methods used in scripts
1459
    async def get_peers(self):
4✔
UNCOV
1460
        while not self.is_connected():
×
UNCOV
1461
            await asyncio.sleep(1)
×
1462
        session = self.interface.session
×
1463
        return parse_servers(await session.send_request('server.peers.subscribe'))
×
1464

1465
    async def send_multiple_requests(
4✔
1466
            self,
1467
            servers: Sequence[ServerAddr],
1468
            method: str,
1469
            params: Sequence,
1470
            *,
1471
            timeout: int = None,
1472
    ):
1473
        if timeout is None:
×
UNCOV
1474
            timeout = self.get_network_timeout_seconds(NetworkTimeout.Urgent)
×
UNCOV
1475
        responses = dict()
×
1476
        async def get_response(server: ServerAddr):
×
1477
            interface = Interface(network=self, server=server)
×
UNCOV
1478
            try:
×
1479
                await util.wait_for2(interface.ready, timeout)
×
1480
            except BaseException as e:
×
UNCOV
1481
                await interface.close()
×
1482
                return
×
1483
            try:
×
1484
                res = await interface.session.send_request(method, params, timeout=10)
×
UNCOV
1485
            except Exception as e:
×
UNCOV
1486
                res = e
×
1487
            responses[interface.server] = res
×
1488
        async with OldTaskGroup() as group:
×
1489
            for server in servers:
×
UNCOV
1490
                await group.spawn(get_response(server))
×
1491
        return responses
×
1492

1493
    async def prune_offline_servers(self, hostmap):
4✔
1494
        peers = filter_protocol(hostmap, allowed_protocols=("t", "s",))
×
1495
        timeout = self.get_network_timeout_seconds(NetworkTimeout.Generic)
×
1496
        replies = await self.send_multiple_requests(peers, 'blockchain.headers.subscribe', [], timeout=timeout)
×
1497
        servers_replied = {serveraddr.host for serveraddr in replies.keys()}
×
1498
        servers_dict = {k: v for k, v in hostmap.items()
×
1499
                        if k in servers_replied}
UNCOV
1500
        return servers_dict
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc