spesmilo / electrum / 4824330053353472

05 Mar 2025 03:01PM UTC coverage: 60.374% (-0.008%) from 60.382%

push · CirrusCI · SomberNight
Merge branch 'pr/9507': qt: refactor NetworkChoiceLayout to ProxyWidget+ServerWidget

ref https://github.com/spesmilo/electrum/pull/9507

53 of 146 new or added lines in 5 files covered. (36.3%)

19 existing lines in 4 files now uncovered.

20815 of 34477 relevant lines covered (60.37%)

3.02 hits per line

Source File: /electrum/network.py

1
# Electrum - Lightweight Bitcoin Client
2
# Copyright (c) 2011-2016 Thomas Voegtlin
3
#
4
# Permission is hereby granted, free of charge, to any person
5
# obtaining a copy of this software and associated documentation files
6
# (the "Software"), to deal in the Software without restriction,
7
# including without limitation the rights to use, copy, modify, merge,
8
# publish, distribute, sublicense, and/or sell copies of the Software,
9
# and to permit persons to whom the Software is furnished to do so,
10
# subject to the following conditions:
11
#
12
# The above copyright notice and this permission notice shall be
13
# included in all copies or substantial portions of the Software.
14
#
15
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
18
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
19
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
20
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
21
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22
# SOFTWARE.
23
import asyncio
5✔
24
import time
5✔
25
import queue
5✔
26
import os
5✔
27
import random
5✔
28
import re
5✔
29
from collections import defaultdict
5✔
30
import threading
5✔
31
import socket
5✔
32
import json
5✔
33
import sys
5✔
34
from typing import (
5✔
35
    NamedTuple, Optional, Sequence, List, Dict, Tuple, TYPE_CHECKING, Iterable, Set, Any, TypeVar,
36
    Callable
37
)
38
import traceback
5✔
39
import concurrent
5✔
40
from concurrent import futures
5✔
41
import copy
5✔
42
import functools
5✔
43
from enum import IntEnum
5✔
44
from contextlib import nullcontext
5✔
45

46
import aiorpcx
5✔
47
from aiorpcx import ignore_after, NetAddress
5✔
48
from aiohttp import ClientResponse
5✔
49

50
from . import util
5✔
51
from .util import (log_exceptions, ignore_exceptions, OldTaskGroup,
5✔
52
                   bfh, make_aiohttp_session, send_exception_to_crash_reporter,
53
                   is_hash256_str, is_non_negative_integer, MyEncoder, NetworkRetryManager,
54
                   error_text_str_to_safe_str, detect_tor_socks_proxy)
55
from .bitcoin import COIN, DummyAddress, DummyAddressUsedInTxException
5✔
56
from . import constants
5✔
57
from . import blockchain
5✔
58
from . import bitcoin
5✔
59
from . import dns_hacks
5✔
60
from .transaction import Transaction
5✔
61
from .blockchain import Blockchain, HEADER_SIZE
5✔
62
from .interface import (Interface, PREFERRED_NETWORK_PROTOCOL,
5✔
63
                        RequestTimedOut, NetworkTimeout, BUCKET_NAME_OF_ONION_SERVERS,
64
                        NetworkException, RequestCorrupted, ServerAddr)
65
from .version import PROTOCOL_VERSION
5✔
66
from .i18n import _
5✔
67
from .logging import get_logger, Logger
5✔
68
from .fee_policy import FeeHistogram, FeeTimeEstimates, FEE_ETA_TARGETS
5✔
69

70

71
if TYPE_CHECKING:
5✔
72
    from collections.abc import Coroutine
×
73

74
    from .channel_db import ChannelDB
×
75
    from .lnrouter import LNPathFinder
×
76
    from .lnworker import LNGossip
×
77
    #from .lnwatcher import WatchTower
78
    from .daemon import Daemon
×
79
    from .simple_config import SimpleConfig
×
80

81

82
_logger = get_logger(__name__)
5✔
83

84

85
NUM_TARGET_CONNECTED_SERVERS = 10
5✔
86
NUM_STICKY_SERVERS = 4
5✔
87
NUM_RECENT_SERVERS = 20
5✔
88

89
T = TypeVar('T')
5✔
90

91

92
class ConnectionState(IntEnum):
5✔
93
    DISCONNECTED  = 0
5✔
94
    CONNECTING    = 1
5✔
95
    CONNECTED     = 2
5✔
96

97

98
def parse_servers(result: Sequence[Tuple[str, str, List[str]]]) -> Dict[str, dict]:
5✔
99
    """Convert servers list (from protocol method "server.peers.subscribe") into dict format.
100
    Also validate values, such as IP addresses and ports.
101
    """
102
    servers = {}
×
103
    for item in result:
×
104
        host = item[1]
×
105
        out = {}
×
106
        version = None
×
107
        pruning_level = '-'
×
108
        if len(item) > 2:
×
109
            for v in item[2]:
×
110
                if re.match(r"[st]\d*", v):
×
111
                    protocol, port = v[0], v[1:]
×
112
                    if port == '': port = constants.net.DEFAULT_PORTS[protocol]
×
113
                    ServerAddr(host, port, protocol=protocol)  # check if raises
×
114
                    out[protocol] = port
×
115
                elif re.match("v(.?)+", v):
×
116
                    version = v[1:]
×
117
                elif re.match(r"p\d*", v):
×
118
                    pruning_level = v[1:]
×
119
                if pruning_level == '': pruning_level = '0'
×
120
        if out:
×
121
            out['pruning'] = pruning_level
×
122
            out['version'] = version
×
123
            servers[host] = out
×
124
    return servers
×
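
As a quick illustration of the conversion described in the docstring above (this sketch is not part of network.py; the host names are invented), parse_servers turns the raw peer tuples from "server.peers.subscribe" into a host-keyed dict:

# Illustrative only: hypothetical input as received from "server.peers.subscribe".
peers = [
    ("203.0.113.5", "electrum.example.org", ["v1.4.2", "s50002", "t50001", "p10000"]),
    ("198.51.100.7", "backup.example.net", ["v1.4", "s"]),  # empty port -> default port for "s"
]
out = parse_servers(peers)
# Expected shape of the result (assuming mainnet default ports):
# {
#     "electrum.example.org": {"s": "50002", "t": "50001", "pruning": "10000", "version": "1.4.2"},
#     "backup.example.net":   {"s": "50002", "pruning": "-", "version": "1.4"},
# }
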
125

126

127
def filter_version(servers):
5✔
128
    def is_recent(version):
×
129
        try:
×
130
            return util.versiontuple(version) >= util.versiontuple(PROTOCOL_VERSION)
×
131
        except Exception as e:
×
132
            return False
×
133
    return {k: v for k, v in servers.items() if is_recent(v.get('version'))}
×
134

135

136
def filter_noonion(servers):
5✔
137
    return {k: v for k, v in servers.items() if not k.endswith('.onion')}
×
138

139

140
def filter_protocol(hostmap, *, allowed_protocols: Iterable[str] = None) -> Sequence[ServerAddr]:
5✔
141
    """Filters the hostmap for those implementing protocol."""
142
    if allowed_protocols is None:
×
143
        allowed_protocols = {PREFERRED_NETWORK_PROTOCOL}
×
144
    eligible = []
×
145
    for host, portmap in hostmap.items():
×
146
        for protocol in allowed_protocols:
×
147
            port = portmap.get(protocol)
×
148
            if port:
×
149
                eligible.append(ServerAddr(host, port, protocol=protocol))
×
150
    return eligible
×
151

152

153
def pick_random_server(hostmap=None, *, allowed_protocols: Iterable[str],
5✔
154
                       exclude_set: Set[ServerAddr] = None) -> Optional[ServerAddr]:
155
    if hostmap is None:
×
156
        hostmap = constants.net.DEFAULT_SERVERS
×
157
    if exclude_set is None:
×
158
        exclude_set = set()
×
159
    servers = set(filter_protocol(hostmap, allowed_protocols=allowed_protocols))
×
160
    eligible = list(servers - exclude_set)
×
161
    return random.choice(eligible) if eligible else None
×
162

163

164
def is_valid_port(ps: str):
5✔
NEW
165
    try:
×
NEW
166
        return 0 < int(ps) < 65535
×
NEW
167
    except ValueError:
×
NEW
168
        return False
×
169

170

171
def is_valid_host(ph: str):
5✔
NEW
172
    try:
×
NEW
173
        NetAddress(ph, '1')
×
NEW
174
    except ValueError:
×
NEW
175
        return False
×
NEW
176
    return True
×
177

178

179
class ProxySettings:
5✔
180
    MODES = ['socks4', 'socks5']
5✔
181

182
    probe_fut = None
5✔
183

184
    def __init__(self):
5✔
185
        self.enabled = False
5✔
186
        self.mode = 'socks5'
5✔
187
        self.host = ''
5✔
188
        self.port = ''
5✔
189
        self.user = None
5✔
190
        self.password = None
5✔
191

192
    def set_defaults(self):
5✔
NEW
193
        self.__init__()  # call __init__ for default values
×
194

195
    def serialize_proxy_cfgstr(self):
5✔
NEW
196
        return ':'.join([self.mode, self.host, self.port])
×
197

198
    def deserialize_proxy_cfgstr(self, s: Optional[str], user: str = None, password: str = None) -> None:
5✔
NEW
199
        if s is None or (isinstance(s, str) and s.lower() == 'none'):
×
NEW
200
            self.set_defaults()
×
NEW
201
            self.user = user
×
NEW
202
            self.password = password
×
NEW
203
            return
×
204

NEW
205
        if not isinstance(s, str):
×
NEW
206
            raise ValueError('proxy config not a string')
×
207

NEW
208
        args = s.split(':')
×
NEW
209
        if args[0] in ProxySettings.MODES:
×
NEW
210
            self.mode = args[0]
×
NEW
211
            args = args[1:]
×
212

213
        # detect migration from old settings format
NEW
214
        if len(args) == 4 and is_valid_host(args[0]) and is_valid_port(args[1]):  # host:port:user:pass,
×
NEW
215
            self.host = args[0]
×
NEW
216
            self.port = args[1]
×
NEW
217
            self.user = args[2]
×
NEW
218
            self.password = args[3]
×
219
        else:
NEW
220
            self.host = ':'.join(args[:-1])
×
NEW
221
            self.port = args[-1]
×
NEW
222
            self.user = user
×
NEW
223
            self.password = password
×
224

NEW
225
        if not is_valid_host(self.host) or not is_valid_port(self.port):
×
NEW
226
            self.enabled = False
×
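
For illustration (this snippet is not part of the file), the two helpers above round-trip the "mode:host:port" config-string format roughly as follows; the legacy four-part form is also recognized:

# Illustrative sketch of the config-string format handled by
# serialize_proxy_cfgstr()/deserialize_proxy_cfgstr().
proxy = ProxySettings()
proxy.deserialize_proxy_cfgstr("socks5:127.0.0.1:9050")
assert (proxy.mode, proxy.host, proxy.port) == ("socks5", "127.0.0.1", "9050")
assert proxy.serialize_proxy_cfgstr() == "socks5:127.0.0.1:9050"

# Legacy "host:port:user:pass" strings (no mode prefix) are detected and migrated:
proxy.deserialize_proxy_cfgstr("127.0.0.1:9050:alice:hunter2")
assert (proxy.user, proxy.password) == ("alice", "hunter2")
# Note: the enabled flag is tracked separately (e.g. NETWORK_PROXY_ENABLED in the config).
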
227

228
    def to_dict(self):
5✔
NEW
229
        return {
×
230
            'enabled': self.enabled,
231
            'mode': self.mode,
232
            'host': self.host,
233
            'port': self.port,
234
            'user': self.user,
235
            'password': self.password
236
        }
237

238
    @classmethod
5✔
239
    def from_config(cls, config: 'SimpleConfig') -> 'ProxySettings':
5✔
NEW
240
        proxy = ProxySettings()
×
NEW
241
        proxy.deserialize_proxy_cfgstr(
×
242
            config.NETWORK_PROXY, config.NETWORK_PROXY_USER, config.NETWORK_PROXY_PASSWORD
243
        )
NEW
244
        proxy.enabled = config.NETWORK_PROXY_ENABLED
×
UNCOV
245
        return proxy
×
246

247
    @classmethod
5✔
248
    def from_dict(cls, d: dict) -> 'ProxySettings':
5✔
249
        proxy = ProxySettings()
5✔
250
        proxy.enabled = d.get('enabled', proxy.enabled)
5✔
251
        proxy.mode = d.get('mode', proxy.mode)
5✔
252
        proxy.host = d.get('host', proxy.host)
5✔
253
        proxy.port = d.get('port', proxy.port)
5✔
254
        proxy.user = d.get('user', proxy.user)
5✔
255
        proxy.password = d.get('password', proxy.password)
5✔
256
        return proxy
5✔
257

258
    @classmethod
5✔
259
    def probe_tor(cls, on_finished: Callable[[str | None, int | None], None]):
5✔
NEW
260
        async def detect_task(finished: Callable[[str | None, int | None], None]):
×
NEW
261
            net_addr = await detect_tor_socks_proxy()
×
NEW
262
            if net_addr is None:
×
NEW
263
                finished('', -1)
×
264
            else:
NEW
265
                host = net_addr[0]
×
NEW
266
                port = net_addr[1]
×
NEW
267
                finished(host, port)
×
NEW
268
            cls.probe_fut = None
×
269

NEW
270
        if cls.probe_fut:  # one probe at a time
×
NEW
271
            return
×
NEW
272
        cls.probe_fut = asyncio.run_coroutine_threadsafe(detect_task(on_finished), util.get_asyncio_loop())
×
273

274
    def __eq__(self, other):
5✔
275
        return self.enabled == other.enabled \
5✔
276
            and self.mode == other.mode \
277
            and self.host == other.host \
278
            and self.port == other.port \
279
            and self.user == other.user \
280
            and self.password == other.password
281

282
    def __str__(self):
5✔
NEW
283
        return f'{self.enabled=} {self.mode=} {self.host=} {self.port=} {self.user=}'
×
284

285

286
class NetworkParameters(NamedTuple):
5✔
287
    server: ServerAddr
5✔
288
    proxy: ProxySettings
5✔
289
    auto_connect: bool
5✔
290
    oneserver: bool = False
5✔
291

292

293
class BestEffortRequestFailed(NetworkException): pass
5✔
294

295

296
class TxBroadcastError(NetworkException):
5✔
297
    def get_message_for_gui(self):
5✔
298
        raise NotImplementedError()
×
299

300

301
class TxBroadcastHashMismatch(TxBroadcastError):
5✔
302
    def get_message_for_gui(self):
5✔
303
        return "{}\n{}\n\n{}" \
×
304
            .format(_("The server returned an unexpected transaction ID when broadcasting the transaction."),
305
                    _("Consider trying to connect to a different server, or updating Electrum."),
306
                    str(self))
307

308

309
class TxBroadcastServerReturnedError(TxBroadcastError):
5✔
310
    def get_message_for_gui(self):
5✔
311
        return "{}\n{}\n\n{}" \
×
312
            .format(_("The server returned an error when broadcasting the transaction."),
313
                    _("Consider trying to connect to a different server, or updating Electrum."),
314
                    str(self))
315

316

317
class TxBroadcastUnknownError(TxBroadcastError):
5✔
318
    def get_message_for_gui(self):
5✔
319
        return "{}\n{}" \
×
320
            .format(_("Unknown error when broadcasting the transaction."),
321
                    _("Consider trying to connect to a different server, or updating Electrum."))
322

323

324
class UntrustedServerReturnedError(NetworkException):
5✔
325
    def __init__(self, *, original_exception):
5✔
326
        self.original_exception = original_exception
×
327

328
    def get_message_for_gui(self) -> str:
5✔
329
        return str(self)
×
330

331
    def get_untrusted_message(self) -> str:
5✔
332
        e = self.original_exception
×
333
        return (f"<UntrustedServerReturnedError "
×
334
                f"[DO NOT TRUST THIS MESSAGE] original_exception: {error_text_str_to_safe_str(repr(e))}>")
335

336
    def __str__(self):
5✔
337
        # We should not show the untrusted text from self.original_exception,
338
        # to avoid accidentally showing it in the GUI.
339
        return _("The server returned an error.")
×
340

341
    def __repr__(self):
5✔
342
        # We should not show the untrusted text from self.original_exception,
343
        # to avoid accidentally showing it in the GUI.
344
        return f"<UntrustedServerReturnedError {str(self)!r}>"
×
345

346

347
_INSTANCE = None
5✔
348

349

350
class Network(Logger, NetworkRetryManager[ServerAddr]):
5✔
351
    """The Network class manages a set of connections to remote electrum
352
    servers; each connected socket is handled by an Interface() object.
353
    """
354

355
    LOGGING_SHORTCUT = 'n'
5✔
356

357
    taskgroup: Optional[OldTaskGroup]
5✔
358
    interface: Optional[Interface]
5✔
359
    interfaces: Dict[ServerAddr, Interface]
5✔
360
    _connecting_ifaces: Set[ServerAddr]
5✔
361
    _closing_ifaces: Set[ServerAddr]
5✔
362
    default_server: ServerAddr
5✔
363
    _recent_servers: List[ServerAddr]
5✔
364

365
    channel_db: Optional['ChannelDB'] = None
5✔
366
    lngossip: Optional['LNGossip'] = None
5✔
367
    path_finder: Optional['LNPathFinder'] = None
5✔
368

369
    def __init__(self, config: 'SimpleConfig', *, daemon: 'Daemon' = None):
5✔
370
        global _INSTANCE
371
        assert _INSTANCE is None, "Network is a singleton!"
×
372
        _INSTANCE = self
×
373

374
        Logger.__init__(self)
×
375
        NetworkRetryManager.__init__(
×
376
            self,
377
            max_retry_delay_normal=600,
378
            init_retry_delay_normal=15,
379
            max_retry_delay_urgent=10,
380
            init_retry_delay_urgent=1,
381
        )
382

383
        self.asyncio_loop = util.get_asyncio_loop()
×
384
        assert self.asyncio_loop.is_running(), "event loop not running"
×
385

386
        self.config = config
×
387
        self.daemon = daemon
×
388

389
        blockchain.read_blockchains(self.config)
×
390
        blockchain.init_headers_file_for_best_chain()
×
391
        self.logger.info(f"blockchains {list(map(lambda b: b.forkpoint, blockchain.blockchains.values()))}")
×
392
        self._blockchain_preferred_block = self.config.BLOCKCHAIN_PREFERRED_BLOCK  # type: Dict[str, Any]
×
393
        if self._blockchain_preferred_block is None:
×
394
            self._set_preferred_chain(None)
×
395
        self._blockchain = blockchain.get_best_chain()
×
396

397
        self._allowed_protocols = {PREFERRED_NETWORK_PROTOCOL}
×
398

NEW
399
        self.proxy = ProxySettings()
×
400
        self.is_proxy_tor = None
×
401
        self._init_parameters_from_config()
×
402

403
        self.taskgroup = None
×
404

405
        # locks
406
        self.restart_lock = asyncio.Lock()
×
407
        self.bhi_lock = asyncio.Lock()
×
408
        self.recent_servers_lock = threading.RLock()       # <- re-entrant
×
409
        self.interfaces_lock = threading.Lock()            # for mutating/iterating self.interfaces
×
410

411
        self.server_peers = {}  # returned by interface (servers that the main interface knows about)
×
412
        self._recent_servers = self._read_recent_servers()  # note: needs self.recent_servers_lock
×
413

414
        self.banner = ''
×
415
        self.donation_address = ''
×
416
        self.relay_fee = None  # type: Optional[int]
×
417

418
        dir_path = os.path.join(self.config.path, 'certs')
×
419
        util.make_dir(dir_path)
×
420

421
        # the main server we are currently communicating with
422
        self.interface = None
×
423
        self.default_server_changed_event = asyncio.Event()
×
424
        # Set of servers we have an ongoing connection with.
425
        # For any ServerAddr, at most one corresponding Interface object
426
        # can exist at any given time. Depending on the state of that Interface,
427
        # the ServerAddr can be found in one of the following sets.
428
        # Note: during a transition, the ServerAddr can appear in two sets momentarily.
429
        self._connecting_ifaces = set()
×
430
        self.interfaces = {}  # these are the ifaces in "initialised and usable" state
×
431
        self._closing_ifaces = set()
×
432

433
        # Dump network messages (all interfaces).  Set at runtime from the console.
434
        self.debug = False
×
435

436
        self._set_status(ConnectionState.DISCONNECTED)
×
437
        self._has_ever_managed_to_connect_to_server = False
×
438
        self._was_started = False
×
439

440
        self.mempool_fees = FeeHistogram()
×
441
        self.fee_estimates = FeeTimeEstimates()
×
442
        self.last_time_fee_estimates_requested = 0  # zero ensures immediate fees
×
443

444

445
    def has_internet_connection(self) -> bool:
5✔
446
        """Our guess whether the device has Internet-connectivity."""
447
        return self._has_ever_managed_to_connect_to_server
×
448

449
    def has_channel_db(self):
5✔
450
        return self.channel_db is not None
×
451

452
    def start_gossip(self):
5✔
453
        from . import lnrouter
×
454
        from . import channel_db
×
455
        from . import lnworker
×
456
        if not self.config.LIGHTNING_USE_GOSSIP:
×
457
            return
×
458
        if self.lngossip is None:
×
459
            self.channel_db = channel_db.ChannelDB(self)
×
460
            self.path_finder = lnrouter.LNPathFinder(self.channel_db)
×
461
            self.channel_db.load_data()
×
462
            self.lngossip = lnworker.LNGossip(self.config)
×
463
            self.lngossip.start_network(self)
×
464

465
    async def stop_gossip(self, *, full_shutdown: bool = False):
5✔
466
        if self.lngossip:
×
467
            await self.lngossip.stop()
×
468
            self.lngossip = None
×
469
            self.channel_db.stop()
×
470
            if full_shutdown:
×
471
                await self.channel_db.stopped_event.wait()
×
472
            self.channel_db = None
×
473
            self.path_finder = None
×
474

475
    @classmethod
5✔
476
    def run_from_another_thread(cls, coro: 'Coroutine[Any, Any, T]', *, timeout=None) -> T:
5✔
477
        loop = util.get_asyncio_loop()
×
478
        assert util.get_running_loop() != loop, 'must not be called from asyncio thread'
×
479
        fut = asyncio.run_coroutine_threadsafe(coro, loop)
×
480
        return fut.result(timeout)
×
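
As a usage sketch (not taken from the file): code running outside the asyncio thread, such as GUI code, can drive a network coroutine through this helper, for example:

# Illustrative only; assumes we are NOT on the asyncio thread and that a Network
# singleton exists (Network.get_instance() returns None with --offline).
network = Network.get_instance()
if network is not None:
    merkle = Network.run_from_another_thread(
        network.get_merkle_for_transaction(tx_hash="00" * 32, tx_height=100_000),  # placeholder args
        timeout=30,
    )
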
481

482
    @staticmethod
5✔
483
    def get_instance() -> Optional["Network"]:
5✔
484
        """Return the global singleton network instance.
485
        Note that this can return None! If we are run with the --offline flag, there is no network.
486
        """
487
        return _INSTANCE
×
488

489
    def with_recent_servers_lock(func):
5✔
490
        def func_wrapper(self, *args, **kwargs):
5✔
491
            with self.recent_servers_lock:
×
492
                return func(self, *args, **kwargs)
×
493
        return func_wrapper
5✔
494

495
    def _read_recent_servers(self) -> List[ServerAddr]:
5✔
496
        if not self.config.path:
×
497
            return []
×
498
        path = os.path.join(self.config.path, "recent_servers")
×
499
        try:
×
500
            with open(path, "r", encoding='utf-8') as f:
×
501
                data = f.read()
×
502
                servers_list = json.loads(data)
×
503
            return [ServerAddr.from_str(s) for s in servers_list]
×
504
        except Exception:
×
505
            return []
×
506

507
    @with_recent_servers_lock
5✔
508
    def _save_recent_servers(self):
5✔
509
        if not self.config.path:
×
510
            return
×
511
        path = os.path.join(self.config.path, "recent_servers")
×
512
        s = json.dumps(self._recent_servers, indent=4, sort_keys=True, cls=MyEncoder)
×
513
        try:
×
514
            with open(path, "w", encoding='utf-8') as f:
×
515
                f.write(s)
×
516
        except Exception:
×
517
            pass
×
518

519
    async def _server_is_lagging(self) -> bool:
5✔
520
        sh = self.get_server_height()
×
521
        if not sh:
×
522
            self.logger.info('no height for main interface')
×
523
            return True
×
524
        lh = self.get_local_height()
×
525
        result = (lh - sh) > 1
×
526
        if result:
×
527
            self.logger.info(f'{self.default_server} is lagging ({sh} vs {lh})')
×
528
        return result
×
529

530
    def _set_status(self, status):
5✔
531
        self.connection_status = status
×
532
        util.trigger_callback('status')
×
533

534
    def is_connected(self):
5✔
535
        interface = self.interface
×
536
        return interface is not None and interface.is_connected_and_ready()
×
537

538
    def is_connecting(self):
5✔
539
        return self.connection_status == ConnectionState.CONNECTING
×
540

541
    def get_connection_status_for_GUI(self):
5✔
542
        ConnectionStates = {
×
543
            ConnectionState.DISCONNECTED: _('Disconnected'),
544
            ConnectionState.CONNECTING: _('Connecting'),
545
            ConnectionState.CONNECTED: _('Connected'),
546
        }
547
        return ConnectionStates[self.connection_status]
×
548

549
    async def _request_server_info(self, interface: 'Interface'):
5✔
550
        await interface.ready
×
551
        session = interface.session
×
552

553
        async def get_banner():
×
554
            self.banner = await interface.get_server_banner()
×
555
            util.trigger_callback('banner', self.banner)
×
556
        async def get_donation_address():
×
557
            self.donation_address = await interface.get_donation_address()
×
558
        async def get_server_peers():
×
559
            server_peers = await session.send_request('server.peers.subscribe')
×
560
            random.shuffle(server_peers)
×
561
            max_accepted_peers = len(constants.net.DEFAULT_SERVERS) + NUM_RECENT_SERVERS
×
562
            server_peers = server_peers[:max_accepted_peers]
×
563
            # note that 'parse_servers' also validates the data (which is untrusted input!)
564
            self.server_peers = parse_servers(server_peers)
×
565
            util.trigger_callback('servers', self.get_servers())
×
566
        async def get_relay_fee():
×
567
            self.relay_fee = await interface.get_relay_fee()
×
568

569
        async with OldTaskGroup() as group:
×
570
            await group.spawn(get_banner)
×
571
            await group.spawn(get_donation_address)
×
572
            await group.spawn(get_server_peers)
×
573
            await group.spawn(get_relay_fee)
×
574
            await group.spawn(self._request_fee_estimates(interface))
×
575

576
    async def _request_fee_estimates(self, interface):
5✔
577
        self.requested_fee_estimates()
×
578
        histogram = await interface.get_fee_histogram()
×
579
        self.mempool_fees.set_data(histogram)
×
580
        self.logger.info(f'fee_histogram {len(histogram)}')
×
581
        util.trigger_callback('fee_histogram', self.mempool_fees)
×
582

583
    def is_fee_estimates_update_required(self):
5✔
584
        """Checks time since last requested and updated fee estimates.
585
        Returns True if an update should be requested.
586
        """
587
        now = time.time()
×
588
        return now - self.last_time_fee_estimates_requested > 60
×
589

590
    def has_fee_etas(self):
5✔
591
        return self.fee_estimates.has_data()
×
592

593
    def has_fee_mempool(self) -> bool:
5✔
594
        return self.mempool_fees.has_data()
×
595

596
    def requested_fee_estimates(self):
5✔
597
        self.last_time_fee_estimates_requested = time.time()
×
598

599
    def get_parameters(self) -> NetworkParameters:
5✔
600
        return NetworkParameters(server=self.default_server,
×
601
                                 proxy=self.proxy,
602
                                 auto_connect=self.auto_connect,
603
                                 oneserver=self.oneserver)
604

605
    def _init_parameters_from_config(self) -> None:
5✔
606
        dns_hacks.configure_dns_resolver()
×
607
        self.auto_connect = self.config.NETWORK_AUTO_CONNECT
×
608
        self._set_default_server()
×
NEW
609
        self._set_proxy(ProxySettings.from_config(self.config))
×
UNCOV
610
        self._maybe_set_oneserver()
×
611

612
    def get_donation_address(self):
5✔
613
        if self.is_connected():
×
614
            return self.donation_address
×
615

616
    def get_interfaces(self) -> List[ServerAddr]:
5✔
617
        """The list of servers for the connected interfaces."""
618
        with self.interfaces_lock:
×
619
            return list(self.interfaces)
×
620

621
    def get_status(self):
5✔
622
        n = len(self.get_interfaces())
×
623
        return _("Connected to {0} nodes.").format(n) if n > 1 else _("Connected to {0} node.").format(n) if n == 1 else _("Not connected")
×
624

625
    def get_fee_estimates(self):
5✔
626
        from statistics import median
×
627
        if self.auto_connect:
×
628
            with self.interfaces_lock:
×
629
                out = {}
×
630
                for n in FEE_ETA_TARGETS[0:-1]:
×
631
                    try:
×
632
                        out[n] = int(median(filter(None, [i.fee_estimates_eta.get(n) for i in self.interfaces.values()])))
×
633
                    except Exception:
×
634
                        continue
×
635
                return out
×
636
        else:
637
            if not self.interface:
×
638
                return {}
×
639
            return self.interface.fee_estimates_eta
×
640

641
    def update_fee_estimates(self, *, fee_est: Dict[int, int] = None):
5✔
642
        if fee_est is None:
×
643
            fee_est = self.get_fee_estimates()
×
644
        for nblock_target, fee in fee_est.items():
×
645
            self.fee_estimates.set_data(nblock_target, fee)
×
646
        if not hasattr(self, "_prev_fee_est") or self._prev_fee_est != fee_est:
×
647
            self._prev_fee_est = copy.copy(fee_est)
×
648
            self.logger.info(f'fee_estimates {fee_est}')
×
649
        util.trigger_callback('fee', self.fee_estimates)
×
650

651

652
    @with_recent_servers_lock
5✔
653
    def get_servers(self):
5✔
654
        # note: order of sources when adding servers here is crucial!
655
        # don't let "server_peers" overwrite anything,
656
        # otherwise main server can eclipse the client
657
        out = dict()
×
658
        # add servers received from main interface
659
        server_peers = self.server_peers
×
660
        if server_peers:
×
661
            out.update(filter_version(server_peers.copy()))
×
662
        # hardcoded servers
663
        out.update(constants.net.DEFAULT_SERVERS)
×
664
        # add recent servers
665
        for server in self._recent_servers:
×
666
            port = str(server.port)
×
667
            if server.host in out:
×
668
                out[server.host].update({server.protocol: port})
×
669
            else:
670
                out[server.host] = {server.protocol: port}
×
671
        # add bookmarks
672
        bookmarks = self.config.NETWORK_BOOKMARKED_SERVERS or []
×
673
        for server_str in bookmarks:
×
674
            try:
×
675
                server = ServerAddr.from_str(server_str)
×
676
            except ValueError:
×
677
                continue
×
678
            port = str(server.port)
×
679
            if server.host in out:
×
680
                out[server.host].update({server.protocol: port})
×
681
            else:
682
                out[server.host] = {server.protocol: port}
×
683
        # potentially filter out some
684
        if self.config.NETWORK_NOONION:
×
685
            out = filter_noonion(out)
×
686
        return out
×
687

688
    def _get_next_server_to_try(self) -> Optional[ServerAddr]:
5✔
689
        now = time.time()
×
690
        with self.interfaces_lock:
×
691
            connected_servers = set(self.interfaces) | self._connecting_ifaces | self._closing_ifaces
×
692
        # First try from recent servers. (which are persisted)
693
        # As these are servers we successfully connected to recently, they are
694
        # most likely to work. This also makes servers "sticky".
695
        # Note: with sticky servers, it is more difficult for an attacker to eclipse the client,
696
        #       however if they succeed, the eclipsing would persist. To try to balance this,
697
        #       we only give priority to recent_servers up to NUM_STICKY_SERVERS.
698
        with self.recent_servers_lock:
×
699
            recent_servers = list(self._recent_servers)
×
700
        recent_servers = [s for s in recent_servers if s.protocol in self._allowed_protocols]
×
701
        if len(connected_servers & set(recent_servers)) < NUM_STICKY_SERVERS:
×
702
            for server in recent_servers:
×
703
                if server in connected_servers:
×
704
                    continue
×
705
                if not self._can_retry_addr(server, now=now):
×
706
                    continue
×
707
                return server
×
708
        # try all servers we know about, pick one at random
709
        hostmap = self.get_servers()
×
710
        servers = list(set(filter_protocol(hostmap, allowed_protocols=self._allowed_protocols)) - connected_servers)
×
711
        random.shuffle(servers)
×
712
        for server in servers:
×
713
            if not self._can_retry_addr(server, now=now):
×
714
                continue
×
715
            return server
×
716
        return None
×
717

718
    def _set_default_server(self) -> None:
5✔
719
        # Server for addresses and transactions
720
        server = self.config.NETWORK_SERVER
×
721
        # Sanitize default server
722
        if server:
×
723
            try:
×
724
                self.default_server = ServerAddr.from_str(server)
×
725
            except Exception:
×
726
                self.logger.warning(f'failed to parse server-string ({server!r}); falling back to localhost:1:s.')
×
727
                self.default_server = ServerAddr.from_str("localhost:1:s")
×
728
        else:
729
            self.default_server = pick_random_server(allowed_protocols=self._allowed_protocols)
×
730
        assert isinstance(self.default_server, ServerAddr), f"invalid type for default_server: {self.default_server!r}"
×
731

732
    def _set_proxy(self, proxy: ProxySettings):
5✔
733
        if self.proxy == proxy:
×
734
            return
×
735

736
        self.logger.info(f'setting proxy {proxy}')
×
737
        self.proxy = proxy
×
738

739
        # reset is_proxy_tor to unknown, and re-detect it:
740
        self.is_proxy_tor = None
×
741
        self._detect_if_proxy_is_tor()
×
742

743
        util.trigger_callback('proxy_set', self.proxy)
×
744

745
    def _detect_if_proxy_is_tor(self) -> None:
5✔
NEW
746
        async def tor_probe_task(p):
×
747
            assert p is not None
×
NEW
748
            is_tor = await util.is_tor_socks_port(p.host, int(p.port))
×
749
            if self.proxy == p:  # is this the proxy we probed?
×
750
                if self.is_proxy_tor != is_tor:
×
751
                    self.logger.info(f'Proxy is {"" if is_tor else "not "}TOR')
×
752
                    self.is_proxy_tor = is_tor
×
753
                util.trigger_callback('tor_probed', is_tor)
×
754

755
        proxy = self.proxy
×
NEW
756
        if proxy and proxy.enabled and proxy.mode == 'socks5':
×
757
            # FIXME GC issues? do we need to store the Future?
NEW
758
            asyncio.run_coroutine_threadsafe(tor_probe_task(proxy), self.asyncio_loop)
×
759

760
    @log_exceptions
5✔
761
    async def set_parameters(self, net_params: NetworkParameters):
5✔
762
        proxy = net_params.proxy
×
NEW
763
        proxy_str = proxy.serialize_proxy_cfgstr()
×
NEW
764
        proxy_enabled = proxy.enabled
×
NEW
765
        proxy_user = proxy.user
×
NEW
766
        proxy_pass = proxy.password
×
UNCOV
767
        server = net_params.server
×
768
        # sanitize parameters
769
        try:
×
770
            if proxy:
×
771
                # proxy_modes.index(proxy['mode']) + 1
NEW
772
                ProxySettings.MODES.index(proxy.mode) + 1
×
773
                # int(proxy['port'])
NEW
774
                int(proxy.port)
×
775
        except Exception:
×
NEW
776
            proxy.enabled = False
×
777
            # return
778
        self.config.NETWORK_AUTO_CONNECT = net_params.auto_connect
×
779
        self.config.NETWORK_ONESERVER = net_params.oneserver
×
NEW
780
        self.config.NETWORK_PROXY_ENABLED = proxy_enabled
×
781
        self.config.NETWORK_PROXY = proxy_str
×
782
        self.config.NETWORK_PROXY_USER = proxy_user
×
783
        self.config.NETWORK_PROXY_PASSWORD = proxy_pass
×
784
        self.config.NETWORK_SERVER = str(server)
×
785
        # abort if changes were not allowed by config
786
        if self.config.NETWORK_SERVER != str(server) \
×
787
                or self.config.NETWORK_PROXY_ENABLED != proxy_enabled \
788
                or self.config.NETWORK_PROXY != proxy_str \
789
                or self.config.NETWORK_PROXY_USER != proxy_user \
790
                or self.config.NETWORK_PROXY_PASSWORD != proxy_pass \
791
                or self.config.NETWORK_ONESERVER != net_params.oneserver:
792
            return
×
793

794
        proxy_changed = self.proxy != proxy
×
795
        oneserver_changed = self.oneserver != net_params.oneserver
×
796
        default_server_changed = self.default_server != server
×
797
        self._init_parameters_from_config()
×
798
        if not self._was_started:
×
799
            return
×
800

801
        async with self.restart_lock:
×
802
            if proxy_changed or oneserver_changed:
×
803
                # Restart the network
804
                await self.stop(full_shutdown=False)
×
805
                await self._start()
×
806
            elif default_server_changed:
×
807
                await self.switch_to_interface(server)
×
808
            else:
809
                await self.switch_lagging_interface()
×
810
        util.trigger_callback('network_updated')
×
811

812
    def _maybe_set_oneserver(self) -> None:
5✔
813
        oneserver = self.config.NETWORK_ONESERVER
×
814
        self.oneserver = oneserver
×
815
        self.num_server = NUM_TARGET_CONNECTED_SERVERS if not oneserver else 0
×
816

817
    def is_server_bookmarked(self, server: ServerAddr) -> bool:
5✔
818
        bookmarks = self.config.NETWORK_BOOKMARKED_SERVERS or []
×
819
        return str(server) in bookmarks
×
820

821
    def set_server_bookmark(self, server: ServerAddr, *, add: bool) -> None:
5✔
822
        server_str = str(server)
×
823
        with self.config.lock:
×
824
            bookmarks = self.config.NETWORK_BOOKMARKED_SERVERS or []
×
825
            if add:
×
826
                if server_str not in bookmarks:
×
827
                    bookmarks.append(server_str)
×
828
            else:  # remove
829
                if server_str in bookmarks:
×
830
                    bookmarks.remove(server_str)
×
831
            self.config.NETWORK_BOOKMARKED_SERVERS = bookmarks
×
832

833
    async def _switch_to_random_interface(self):
5✔
834
        '''Switch to a random connected server other than the current one'''
835
        servers = self.get_interfaces()    # Those in connected state
×
836
        if self.default_server in servers:
×
837
            servers.remove(self.default_server)
×
838
        if servers:
×
839
            await self.switch_to_interface(random.choice(servers))
×
840

841
    async def switch_lagging_interface(self):
5✔
842
        """If auto_connect and lagging, switch interface (only within fork)."""
843
        if self.auto_connect and await self._server_is_lagging():
×
844
            # switch to one that has the correct header (not height)
845
            best_header = self.blockchain().header_at_tip()
×
846
            with self.interfaces_lock: interfaces = list(self.interfaces.values())
×
847
            filtered = list(filter(lambda iface: iface.tip_header == best_header, interfaces))
×
848
            if filtered:
×
849
                chosen_iface = random.choice(filtered)
×
850
                await self.switch_to_interface(chosen_iface.server)
×
851

852
    async def switch_unwanted_fork_interface(self) -> None:
5✔
853
        """If auto_connect, maybe switch to another fork/chain."""
854
        if not self.auto_connect or not self.interface:
×
855
            return
×
856
        with self.interfaces_lock: interfaces = list(self.interfaces.values())
×
857
        pref_height = self._blockchain_preferred_block['height']
×
858
        pref_hash   = self._blockchain_preferred_block['hash']
×
859
        # shortcut for common case
860
        if pref_height == 0:
×
861
            return
×
862
        # maybe try switching chains; starting with most desirable first
863
        matching_chains = blockchain.get_chains_that_contain_header(pref_height, pref_hash)
×
864
        chains_to_try = list(matching_chains) + [blockchain.get_best_chain()]
×
865
        for rank, chain in enumerate(chains_to_try):
×
866
            # check if main interface is already on this fork
867
            if self.interface.blockchain == chain:
×
868
                return
×
869
            # switch to another random interface that is on this fork, if any
870
            filtered = [iface for iface in interfaces
×
871
                        if iface.blockchain == chain]
872
            if filtered:
×
873
                self.logger.info(f"switching to (more) preferred fork (rank {rank})")
×
874
                chosen_iface = random.choice(filtered)
×
875
                await self.switch_to_interface(chosen_iface.server)
×
876
                return
×
877
        self.logger.info("tried to switch to (more) preferred fork but no interfaces are on any")
×
878

879
    async def switch_to_interface(self, server: ServerAddr):
5✔
880
        """Switch to server as our main interface. If no connection exists,
881
        queue interface to be started. The actual switch will
882
        happen when the interface becomes ready.
883
        """
884
        self.default_server = server
×
885
        old_interface = self.interface
×
886
        old_server = old_interface.server if old_interface else None
×
887

888
        # Stop any current interface in order to terminate subscriptions,
889
        # and to cancel tasks in interface.taskgroup.
890
        if old_server and old_server != server:
×
891
            # don't wait for old_interface to close as that might be slow:
892
            await self.taskgroup.spawn(self._close_interface(old_interface))
×
893

894
        if server not in self.interfaces:
×
895
            self.interface = None
×
896
            await self.taskgroup.spawn(self._run_new_interface(server))
×
897
            return
×
898

899
        i = self.interfaces[server]
×
900
        if old_interface != i:
×
901
            if not i.is_connected_and_ready():
×
902
                return
×
903
            self.logger.info(f"switching to {server}")
×
904
            blockchain_updated = i.blockchain != self.blockchain()
×
905
            self.interface = i
×
906
            try:
×
907
                await i.taskgroup.spawn(self._request_server_info(i))
×
908
            except RuntimeError as e:  # see #7677
×
909
                if len(e.args) >= 1 and e.args[0] == 'task group terminated':
×
910
                    self.logger.warning(f"tried to switch to {server} but interface.taskgroup is already dead.")
×
911
                    self.interface = None
×
912
                    return
×
913
                raise
×
914
            util.trigger_callback('default_server_changed')
×
915
            self.default_server_changed_event.set()
×
916
            self.default_server_changed_event.clear()
×
917
            self._set_status(ConnectionState.CONNECTED)
×
918
            util.trigger_callback('network_updated')
×
919
            if blockchain_updated:
×
920
                util.trigger_callback('blockchain_updated')
×
921

922
    async def _close_interface(self, interface: Optional[Interface]):
5✔
923
        if not interface:
×
924
            return
×
925
        if interface.server in self._closing_ifaces:
×
926
            return
×
927
        self._closing_ifaces.add(interface.server)
×
928
        with self.interfaces_lock:
×
929
            if self.interfaces.get(interface.server) == interface:
×
930
                self.interfaces.pop(interface.server)
×
931
        if interface == self.interface:
×
932
            self.interface = None
×
933
        try:
×
934
            # this can take some time if server/connection is slow:
935
            await interface.close()
×
936
            await interface.got_disconnected.wait()
×
937
        finally:
938
            self._closing_ifaces.discard(interface.server)
×
939

940
    @with_recent_servers_lock
5✔
941
    def _add_recent_server(self, server: ServerAddr) -> None:
5✔
942
        self._on_connection_successfully_established(server)
×
943
        # list is ordered
944
        if server in self._recent_servers:
×
945
            self._recent_servers.remove(server)
×
946
        self._recent_servers.insert(0, server)
×
947
        self._recent_servers = self._recent_servers[:NUM_RECENT_SERVERS]
×
948
        self._save_recent_servers()
×
949

950
    async def connection_down(self, interface: Interface):
5✔
951
        '''A connection to server either went down, or was never made.
952
        We distinguish by whether it is in self.interfaces.'''
953
        if not interface: return
×
954
        if interface.server == self.default_server:
×
955
            self._set_status(ConnectionState.DISCONNECTED)
×
956
        await self._close_interface(interface)
×
957
        util.trigger_callback('network_updated')
×
958

959
    def get_network_timeout_seconds(self, request_type=NetworkTimeout.Generic) -> int:
5✔
960
        if self.config.NETWORK_TIMEOUT:
×
961
            return self.config.NETWORK_TIMEOUT
×
962
        if self.oneserver and not self.auto_connect:
×
963
            return request_type.MOST_RELAXED
×
NEW
964
        if self.proxy and self.proxy.enabled:
×
965
            return request_type.RELAXED
×
966
        return request_type.NORMAL
×
967

968
    @ignore_exceptions  # do not kill outer taskgroup
5✔
969
    @log_exceptions
5✔
970
    async def _run_new_interface(self, server: ServerAddr):
5✔
971
        if (server in self.interfaces
×
972
                or server in self._connecting_ifaces
973
                or server in self._closing_ifaces):
974
            return
×
975
        self._connecting_ifaces.add(server)
×
976
        if server == self.default_server:
×
977
            self.logger.info(f"connecting to {server} as new interface")
×
978
            self._set_status(ConnectionState.CONNECTING)
×
979
        self._trying_addr_now(server)
×
980

981
        interface = Interface(network=self, server=server)
×
982
        # note: using longer timeouts here as DNS can sometimes be slow!
983
        timeout = self.get_network_timeout_seconds(NetworkTimeout.Generic)
×
984
        try:
×
985
            await util.wait_for2(interface.ready, timeout)
×
986
        except BaseException as e:
×
987
            self.logger.info(f"couldn't launch iface {server} -- {repr(e)}")
×
988
            await interface.close()
×
989
            return
×
990
        else:
991
            with self.interfaces_lock:
×
992
                assert server not in self.interfaces
×
993
                self.interfaces[server] = interface
×
994
        finally:
995
            self._connecting_ifaces.discard(server)
×
996

997
        if server == self.default_server:
×
998
            await self.switch_to_interface(server)
×
999

1000
        self._has_ever_managed_to_connect_to_server = True
×
1001
        self._add_recent_server(server)
×
1002
        util.trigger_callback('network_updated')
×
1003
        # When the proxy settings were set, the proxy (if any) might have been unreachable,
1004
        # resulting in a false-negative for Tor-detection. Given we just connected to a server, re-test now.
1005
        self._detect_if_proxy_is_tor()
×
1006

1007
    def check_interface_against_healthy_spread_of_connected_servers(self, iface_to_check: Interface) -> bool:
5✔
1008
        # main interface is exempt. this makes switching servers easier
1009
        if iface_to_check.is_main_server():
×
1010
            return True
×
1011
        if not iface_to_check.bucket_based_on_ipaddress():
×
1012
            return True
×
1013
        # bucket connected interfaces
1014
        with self.interfaces_lock:
×
1015
            interfaces = list(self.interfaces.values())
×
1016
        if iface_to_check in interfaces:
×
1017
            interfaces.remove(iface_to_check)
×
1018
        buckets = defaultdict(list)
×
1019
        for iface in interfaces:
×
1020
            buckets[iface.bucket_based_on_ipaddress()].append(iface)
×
1021
        # check proposed server against buckets
1022
        onion_servers = buckets[BUCKET_NAME_OF_ONION_SERVERS]
×
1023
        if iface_to_check.is_tor():
×
1024
            # keep number of onion servers below half of all connected servers
1025
            if len(onion_servers) > NUM_TARGET_CONNECTED_SERVERS // 2:
×
1026
                return False
×
1027
        else:
1028
            bucket = iface_to_check.bucket_based_on_ipaddress()
×
1029
            if len(buckets[bucket]) > 0:
×
1030
                return False
×
1031
        return True
×
1032

1033
    def best_effort_reliable(func):
5✔
1034
        @functools.wraps(func)
5✔
1035
        async def make_reliable_wrapper(self: 'Network', *args, **kwargs):
5✔
1036
            for i in range(10):
×
1037
                iface = self.interface
×
1038
                # retry until there is a main interface
1039
                if not iface:
×
1040
                    async with ignore_after(1):
×
1041
                        await self.default_server_changed_event.wait()
×
1042
                    continue  # try again
×
1043
                assert iface.ready.done(), "interface not ready yet"
×
1044
                # try actual request
1045
                try:
×
1046
                    async with OldTaskGroup(wait=any) as group:
×
1047
                        task = await group.spawn(func(self, *args, **kwargs))
×
1048
                        await group.spawn(iface.got_disconnected.wait())
×
1049
                except RequestTimedOut:
×
1050
                    await iface.close()
×
1051
                    await iface.got_disconnected.wait()
×
1052
                    continue  # try again
×
1053
                except RequestCorrupted as e:
×
1054
                    # TODO ban server?
1055
                    iface.logger.exception(f"RequestCorrupted: {e}")
×
1056
                    await iface.close()
×
1057
                    await iface.got_disconnected.wait()
×
1058
                    continue  # try again
×
1059
                if task.done() and not task.cancelled():
×
1060
                    return task.result()
×
1061
                # otherwise; try again
1062
            raise BestEffortRequestFailed('cannot establish a connection... gave up.')
×
1063
        return make_reliable_wrapper
5✔
1064

1065
    def catch_server_exceptions(func):
5✔
1066
        """Decorator that wraps server errors in UntrustedServerReturnedError,
1067
        to avoid showing untrusted arbitrary text to users.
1068
        """
1069
        @functools.wraps(func)
5✔
1070
        async def wrapper(self, *args, **kwargs):
5✔
1071
            try:
×
1072
                return await func(self, *args, **kwargs)
×
1073
            except aiorpcx.jsonrpc.CodeMessageError as e:
×
1074
                wrapped_exc = UntrustedServerReturnedError(original_exception=e)
×
1075
                # log (sanitized) untrusted error text now, to ease debugging
1076
                self.logger.debug(f"got error from server for {func.__qualname__}: {wrapped_exc.get_untrusted_message()!r}")
×
1077
                raise wrapped_exc from e
×
1078
        return wrapper
5✔
1079

1080
    @best_effort_reliable
5✔
1081
    @catch_server_exceptions
5✔
1082
    async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict:
5✔
1083
        if self.interface is None:  # handled by best_effort_reliable
×
1084
            raise RequestTimedOut()
×
1085
        return await self.interface.get_merkle_for_transaction(tx_hash=tx_hash, tx_height=tx_height)
×
1086

1087
    @best_effort_reliable
5✔
1088
    async def broadcast_transaction(self, tx: 'Transaction', *, timeout=None) -> None:
5✔
1089
        """caller should handle TxBroadcastError"""
1090
        if self.interface is None:  # handled by best_effort_reliable
×
1091
            raise RequestTimedOut()
×
1092
        if timeout is None:
×
1093
            timeout = self.get_network_timeout_seconds(NetworkTimeout.Urgent)
×
1094
        if any(DummyAddress.is_dummy_address(txout.address) for txout in tx.outputs()):
×
1095
            raise DummyAddressUsedInTxException("tried to broadcast tx with dummy address!")
×
1096
        try:
×
1097
            out = await self.interface.session.send_request('blockchain.transaction.broadcast', [tx.serialize()], timeout=timeout)
×
1098
            # note: both 'out' and exception messages are untrusted input from the server
1099
        except (RequestTimedOut, asyncio.CancelledError, asyncio.TimeoutError):
×
1100
            raise  # pass-through
×
1101
        except aiorpcx.jsonrpc.CodeMessageError as e:
×
1102
            self.logger.info(f"broadcast_transaction error [DO NOT TRUST THIS MESSAGE]: {error_text_str_to_safe_str(repr(e))}")
×
1103
            raise TxBroadcastServerReturnedError(self.sanitize_tx_broadcast_response(e.message)) from e
×
1104
        except BaseException as e:  # intentional BaseException for sanity!
×
1105
            self.logger.info(f"broadcast_transaction error2 [DO NOT TRUST THIS MESSAGE]: {error_text_str_to_safe_str(repr(e))}")
×
1106
            send_exception_to_crash_reporter(e)
×
1107
            raise TxBroadcastUnknownError() from e
×
1108
        if out != tx.txid():
×
1109
            self.logger.info(f"unexpected txid for broadcast_transaction [DO NOT TRUST THIS MESSAGE]: "
×
1110
                             f"{error_text_str_to_safe_str(out)} != {tx.txid()}")
1111
            raise TxBroadcastHashMismatch(_("Server returned unexpected transaction ID."))
×
1112

1113
    async def try_broadcasting(self, tx, name) -> bool:
5✔
1114
        try:
×
1115
            await self.broadcast_transaction(tx)
×
1116
        except Exception as e:
×
1117
            self.logger.info(f'error: could not broadcast {name} {tx.txid()}, {str(e)}')
×
1118
            return False
×
1119
        else:
1120
            self.logger.info(f'success: broadcasting {name} {tx.txid()}')
×
1121
            return True
×
1122

1123
    @staticmethod
5✔
1124
    def sanitize_tx_broadcast_response(server_msg) -> str:
5✔
1125
        # Unfortunately, bitcoind and hence the Electrum protocol doesn't return a useful error code.
1126
        # So, we use substring matching to grok the error message.
1127
        # server_msg is untrusted input so it should not be shown to the user. see #4968
1128
        server_msg = str(server_msg)
×
1129
        server_msg = server_msg.replace("\n", r"\n")
×
1130

1131
        # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/script/script_error.cpp
        script_error_messages = {
            r"Script evaluated without error but finished with a false/empty top stack element",
            r"Script failed an OP_VERIFY operation",
            r"Script failed an OP_EQUALVERIFY operation",
            r"Script failed an OP_CHECKMULTISIGVERIFY operation",
            r"Script failed an OP_CHECKSIGVERIFY operation",
            r"Script failed an OP_NUMEQUALVERIFY operation",
            r"Script is too big",
            r"Push value size limit exceeded",
            r"Operation limit exceeded",
            r"Stack size limit exceeded",
            r"Signature count negative or greater than pubkey count",
            r"Pubkey count negative or limit exceeded",
            r"Opcode missing or not understood",
            r"Attempted to use a disabled opcode",
            r"Operation not valid with the current stack size",
            r"Operation not valid with the current altstack size",
            r"OP_RETURN was encountered",
            r"Invalid OP_IF construction",
            r"Negative locktime",
            r"Locktime requirement not satisfied",
            r"Signature hash type missing or not understood",
            r"Non-canonical DER signature",
            r"Data push larger than necessary",
            r"Only push operators allowed in signatures",
            r"Non-canonical signature: S value is unnecessarily high",
            r"Dummy CHECKMULTISIG argument must be zero",
            r"OP_IF/NOTIF argument must be minimal",
            r"Signature must be zero for failed CHECK(MULTI)SIG operation",
            r"NOPx reserved for soft-fork upgrades",
            r"Witness version reserved for soft-fork upgrades",
            r"Taproot version reserved for soft-fork upgrades",
            r"OP_SUCCESSx reserved for soft-fork upgrades",
            r"Public key version reserved for soft-fork upgrades",
            r"Public key is neither compressed or uncompressed",
            r"Stack size must be exactly one after execution",
            r"Extra items left on stack after execution",
            r"Witness program has incorrect length",
            r"Witness program was passed an empty witness",
            r"Witness program hash mismatch",
            r"Witness requires empty scriptSig",
            r"Witness requires only-redeemscript scriptSig",
            r"Witness provided for non-witness script",
            r"Using non-compressed keys in segwit",
            r"Invalid Schnorr signature size",
            r"Invalid Schnorr signature hash type",
            r"Invalid Schnorr signature",
            r"Invalid Taproot control block size",
            r"Too much signature validation relative to witness weight",
            r"OP_CHECKMULTISIG(VERIFY) is not available in tapscript",
            r"OP_IF/NOTIF argument must be minimal in tapscript",
            r"Using OP_CODESEPARATOR in non-witness script",
            r"Signature is found in scriptCode",
        }
        for substring in script_error_messages:
            if substring in server_msg:
                return substring
        # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/validation.cpp
        # grep "REJECT_"
        # grep "TxValidationResult"
        # should come after script_error.cpp (due to e.g. "non-mandatory-script-verify-flag")
        validation_error_messages = {
            r"coinbase": None,
            r"tx-size-small": None,
            r"non-final": None,
            r"txn-already-in-mempool": None,
            r"txn-mempool-conflict": None,
            r"txn-already-known": None,
            r"non-BIP68-final": None,
            r"bad-txns-nonstandard-inputs": None,
            r"bad-witness-nonstandard": None,
            r"bad-txns-too-many-sigops": None,
            r"mempool min fee not met":
                ("mempool min fee not met\n" +
                 _("Your transaction is paying a fee that is so low that the bitcoin node cannot "
                   "fit it into its mempool. The mempool is already full of hundreds of megabytes "
                   "of transactions that all pay higher fees. Try to increase the fee.")),
            r"min relay fee not met": None,
            r"absurdly-high-fee": None,
            r"max-fee-exceeded": None,
            r"too-long-mempool-chain": None,
            r"bad-txns-spends-conflicting-tx": None,
            r"insufficient fee": ("insufficient fee\n" +
                 _("Your transaction is trying to replace another one in the mempool but it "
                   "does not meet the rules to do so. Try to increase the fee.")),
            r"too many potential replacements": None,
            r"replacement-adds-unconfirmed": None,
            r"mempool full": None,
            r"non-mandatory-script-verify-flag": None,
            r"mandatory-script-verify-flag-failed": None,
            r"Transaction check failed": None,
        }
        for substring in validation_error_messages:
            if substring in server_msg:
                msg = validation_error_messages[substring]
                return msg if msg else substring
        # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/rpc/rawtransaction.cpp
        # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/util/error.cpp
        # grep "RPC_TRANSACTION"
        # grep "RPC_DESERIALIZATION_ERROR"
        rawtransaction_error_messages = {
            r"Missing inputs": None,
            r"Inputs missing or spent": None,
            r"transaction already in block chain": None,
            r"Transaction already in block chain": None,
            r"TX decode failed": None,
            r"Peer-to-peer functionality missing or disabled": None,
            r"Transaction rejected by AcceptToMemoryPool": None,
            r"AcceptToMemoryPool failed": None,
            r"Fee exceeds maximum configured by user": None,
        }
        for substring in rawtransaction_error_messages:
            if substring in server_msg:
                msg = rawtransaction_error_messages[substring]
                return msg if msg else substring
        # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/consensus/tx_verify.cpp
        # https://github.com/bitcoin/bitcoin/blob/c7ad94428ab6f54661d7a5441e1fdd0ebf034903/src/consensus/tx_check.cpp
        # grep "REJECT_"
        # grep "TxValidationResult"
        tx_verify_error_messages = {
            r"bad-txns-vin-empty": None,
            r"bad-txns-vout-empty": None,
            r"bad-txns-oversize": None,
            r"bad-txns-vout-negative": None,
            r"bad-txns-vout-toolarge": None,
            r"bad-txns-txouttotal-toolarge": None,
            r"bad-txns-inputs-duplicate": None,
            r"bad-cb-length": None,
            r"bad-txns-prevout-null": None,
            r"bad-txns-inputs-missingorspent":
                ("bad-txns-inputs-missingorspent\n" +
                 _("You might have a local transaction in your wallet that this transaction "
                   "builds on top. You need to either broadcast or remove the local tx.")),
            r"bad-txns-premature-spend-of-coinbase": None,
            r"bad-txns-inputvalues-outofrange": None,
            r"bad-txns-in-belowout": None,
            r"bad-txns-fee-outofrange": None,
        }
        for substring in tx_verify_error_messages:
            if substring in server_msg:
                msg = tx_verify_error_messages[substring]
                return msg if msg else substring
        # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/policy/policy.cpp
        # grep "reason ="
        # should come after validation.cpp (due to "tx-size" vs "tx-size-small")
        # should come after script_error.cpp (due to e.g. "version")
        policy_error_messages = {
            r"version": _("Transaction uses non-standard version."),
            r"tx-size": _("The transaction was rejected because it is too large (in bytes)."),
            r"scriptsig-size": None,
            r"scriptsig-not-pushonly": None,
            r"scriptpubkey":
                ("scriptpubkey\n" +
                 _("Some of the outputs pay to a non-standard script.")),
            r"bare-multisig": None,
            r"dust":
                (_("Transaction could not be broadcast due to dust outputs.\n"
                   "Some of the outputs are too small in value, probably lower than 1000 satoshis.\n"
                   "Check the units, make sure you haven't confused e.g. mBTC and BTC.")),
            r"multi-op-return": _("The transaction was rejected because it contains multiple OP_RETURN outputs."),
        }
        for substring in policy_error_messages:
            if substring in server_msg:
                msg = policy_error_messages[substring]
                return msg if msg else substring
        # otherwise:
        return _("Unknown error")
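
    # Illustrative example (added here for exposition; not part of upstream Electrum):
    # a hypothetical server reply such as
    #     server_msg = "the transaction was rejected by network rules.\n\ndust"
    # contains the substring "dust", so the lookup above returns the friendlier
    # dust-outputs explanation instead of the raw server text.  The exact wording of
    # server_msg is an assumption; only the substring matching is taken from the code.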

    @best_effort_reliable
    @catch_server_exceptions
    async def request_chunk(self, height: int, tip=None, *, can_return_early=False):
        if self.interface is None:  # handled by best_effort_reliable
            raise RequestTimedOut()
        return await self.interface.request_chunk(height, tip=tip, can_return_early=can_return_early)

    @best_effort_reliable
    @catch_server_exceptions
    async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
        if self.interface is None:  # handled by best_effort_reliable
            raise RequestTimedOut()
        return await self.interface.get_transaction(tx_hash=tx_hash, timeout=timeout)

    @best_effort_reliable
    @catch_server_exceptions
    async def get_history_for_scripthash(self, sh: str) -> List[dict]:
        if self.interface is None:  # handled by best_effort_reliable
            raise RequestTimedOut()
        return await self.interface.get_history_for_scripthash(sh)

    @best_effort_reliable
    @catch_server_exceptions
    async def listunspent_for_scripthash(self, sh: str) -> List[dict]:
        if self.interface is None:  # handled by best_effort_reliable
            raise RequestTimedOut()
        return await self.interface.listunspent_for_scripthash(sh)

    @best_effort_reliable
    @catch_server_exceptions
    async def get_balance_for_scripthash(self, sh: str) -> dict:
        if self.interface is None:  # handled by best_effort_reliable
            raise RequestTimedOut()
        return await self.interface.get_balance_for_scripthash(sh)

    @best_effort_reliable
    @catch_server_exceptions
    async def get_txid_from_txpos(self, tx_height, tx_pos, merkle):
        if self.interface is None:  # handled by best_effort_reliable
            raise RequestTimedOut()
        return await self.interface.get_txid_from_txpos(tx_height, tx_pos, merkle)

    def blockchain(self) -> Blockchain:
        interface = self.interface
        if interface and interface.blockchain is not None:
            self._blockchain = interface.blockchain
        return self._blockchain

    def get_blockchains(self):
        out = {}  # blockchain_id -> list(interfaces)
        with blockchain.blockchains_lock: blockchain_items = list(blockchain.blockchains.items())
        with self.interfaces_lock: interfaces_values = list(self.interfaces.values())
        for chain_id, bc in blockchain_items:
            r = list(filter(lambda i: i.blockchain==bc, interfaces_values))
            if r:
                out[chain_id] = r
        return out
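
    # Illustrative sketch (not in the upstream file): the mapping above can be used
    # to see how many connected servers follow each branch after a chain split, e.g.
    #     for chain_id, ifaces in network.get_blockchains().items():
    #         print(chain_id, [str(iface.server) for iface in ifaces])
    # where `network` is assumed to be a running Network instance.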

    def _set_preferred_chain(self, chain: Optional[Blockchain]):
        if chain:
            height = chain.get_max_forkpoint()
            header_hash = chain.get_hash(height)
        else:
            height = 0
            header_hash = constants.net.GENESIS
        self._blockchain_preferred_block = {
            'height': height,
            'hash': header_hash,
        }
        self.config.BLOCKCHAIN_PREFERRED_BLOCK = self._blockchain_preferred_block

    async def follow_chain_given_id(self, chain_id: str) -> None:
        bc = blockchain.blockchains.get(chain_id)
        if not bc:
            raise Exception('blockchain {} not found'.format(chain_id))
        self._set_preferred_chain(bc)
        # select server on this chain
        with self.interfaces_lock: interfaces = list(self.interfaces.values())
        interfaces_on_selected_chain = list(filter(lambda iface: iface.blockchain == bc, interfaces))
        if len(interfaces_on_selected_chain) == 0: return
        chosen_iface = random.choice(interfaces_on_selected_chain)  # type: Interface
        # switch to server (and save to config)
        net_params = self.get_parameters()
        net_params = net_params._replace(server=chosen_iface.server)
        await self.set_parameters(net_params)

    async def follow_chain_given_server(self, server: ServerAddr) -> None:
        # note that `server` should correspond to a connected interface
        iface = self.interfaces.get(server)
        if iface is None:
            return
        self._set_preferred_chain(iface.blockchain)
        # switch to server (and save to config)
        net_params = self.get_parameters()
        net_params = net_params._replace(server=server)
        await self.set_parameters(net_params)

    def get_server_height(self) -> int:
        """Length of header chain, as claimed by main interface."""
        interface = self.interface
        return interface.tip if interface else 0

    def get_local_height(self) -> int:
        """Length of header chain, POW-verified.
        In case of a chain split, this is for the branch the main interface is on,
        but it is the tip of that branch (even if main interface is behind).
        """
        return self.blockchain().height()
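
    # Illustrative relationship between the two heights above (hypothetical sketch):
    #     lag = network.get_server_height() - network.get_local_height()
    # `lag` is positive while headers claimed by the server are still being
    # downloaded and POW-verified locally, and shrinks to zero once caught up.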

    def export_checkpoints(self, path):
        """Run manually to generate blockchain checkpoints.
        Kept for console use only.
        """
        cp = self.blockchain().get_checkpoints()
        with open(path, 'w', encoding='utf-8') as f:
            f.write(json.dumps(cp, indent=4))
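
    # Illustrative console usage (the path is a placeholder):
    #     network.export_checkpoints('/tmp/checkpoints.json')
    # This writes the checkpoints of the currently followed chain as indented JSON.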

    async def _start(self):
        assert not self.taskgroup
        self.taskgroup = taskgroup = OldTaskGroup()
        assert not self.interface and not self.interfaces
        assert not self._connecting_ifaces
        assert not self._closing_ifaces
        self.logger.info('starting network')
        self._clear_addr_retry_times()
        self._init_parameters_from_config()
        await self.taskgroup.spawn(self._run_new_interface(self.default_server))

        async def main():
            self.logger.info(f"starting taskgroup ({hex(id(taskgroup))}).")
            try:
                # note: if a task finishes with CancelledError, that
                # will NOT raise, and the group will keep the other tasks running
                async with taskgroup as group:
                    await group.spawn(self._maintain_sessions())
                    [await group.spawn(job) for job in self._jobs]
            except Exception as e:
                self.logger.exception(f"taskgroup died ({hex(id(taskgroup))}).")
            finally:
                self.logger.info(f"taskgroup stopped ({hex(id(taskgroup))}).")
        asyncio.run_coroutine_threadsafe(main(), self.asyncio_loop)

        util.trigger_callback('network_updated')

    def start(self, jobs: Iterable = None):
        """Schedule starting the network, along with the given job co-routines.

        Note: the jobs will *restart* every time the network restarts, e.g. on proxy
        setting changes.
        """
        self._was_started = True
        self._jobs = jobs or []
        asyncio.run_coroutine_threadsafe(self._start(), self.asyncio_loop)
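
    # Illustrative usage (the job coroutine is a placeholder):
    #     network.start(jobs=[my_long_running_watcher()])
    # The job coroutines are spawned inside the network taskgroup, so they are
    # restarted together with the network (see docstring above).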

    @log_exceptions
    async def stop(self, *, full_shutdown: bool = True):
        if not self._was_started:
            self.logger.info("not stopping network as it was never started")
            return
        self.logger.info("stopping network")
        # timeout: if full_shutdown, it is up to the caller to time us out,
        #          otherwise if e.g. restarting due to proxy changes, we time out fast
        async with (nullcontext() if full_shutdown else ignore_after(1)):
            async with OldTaskGroup() as group:
                await group.spawn(self.taskgroup.cancel_remaining())
                if full_shutdown:
                    await group.spawn(self.stop_gossip(full_shutdown=full_shutdown))
        self.taskgroup = None
        self.interface = None
        self.interfaces = {}
        self._connecting_ifaces.clear()
        self._closing_ifaces.clear()
        if not full_shutdown:
            util.trigger_callback('network_updated')

    async def _ensure_there_is_a_main_interface(self):
        if self.interface:
            return
        # if auto_connect is set, try a different server
        if self.auto_connect and not self.is_connecting():
            await self._switch_to_random_interface()
        # if auto_connect is not set, or still no main interface, retry current
        if not self.interface and not self.is_connecting():
            if self._can_retry_addr(self.default_server, urgent=True):
                await self.switch_to_interface(self.default_server)

    async def _maintain_sessions(self):
        async def maybe_start_new_interfaces():
            num_existing_ifaces = len(self.interfaces) + len(self._connecting_ifaces) + len(self._closing_ifaces)
            for i in range(self.num_server - num_existing_ifaces):
                # FIXME this should try to honour "healthy spread of connected servers"
                server = self._get_next_server_to_try()
                if server:
                    await self.taskgroup.spawn(self._run_new_interface(server))
        async def maintain_healthy_spread_of_connected_servers():
            with self.interfaces_lock: interfaces = list(self.interfaces.values())
            random.shuffle(interfaces)
            for iface in interfaces:
                if not self.check_interface_against_healthy_spread_of_connected_servers(iface):
                    self.logger.info(f"disconnecting from {iface.server}. too many connected "
                                     f"servers already in bucket {iface.bucket_based_on_ipaddress()}")
                    await self._close_interface(iface)
        async def maintain_main_interface():
            await self._ensure_there_is_a_main_interface()
            if self.is_connected():
                if self.is_fee_estimates_update_required():
                    await self.interface.taskgroup.spawn(self._request_fee_estimates, self.interface)

        while True:
            await maybe_start_new_interfaces()
            await maintain_healthy_spread_of_connected_servers()
            await maintain_main_interface()
            await asyncio.sleep(0.1)

    @classmethod
    async def async_send_http_on_proxy(
            cls, method: str, url: str, *,
            params: dict = None,
            body: bytes = None,
            json: dict = None,
            headers=None,
            on_finish=None,
            timeout=None,
    ):
        async def default_on_finish(resp: ClientResponse):
            resp.raise_for_status()
            return await resp.text()
        if headers is None:
            headers = {}
        if on_finish is None:
            on_finish = default_on_finish
        network = cls.get_instance()
        proxy = network.proxy if network else None
        async with make_aiohttp_session(proxy, timeout=timeout) as session:
            if method == 'get':
                async with session.get(url, params=params, headers=headers) as resp:
                    return await on_finish(resp)
            elif method == 'post':
                assert body is not None or json is not None, 'body or json must be supplied if method is post'
                if body is not None:
                    async with session.post(url, data=body, headers=headers) as resp:
                        return await on_finish(resp)
                elif json is not None:
                    async with session.post(url, json=json, headers=headers) as resp:
                        return await on_finish(resp)
            else:
                raise Exception(f"unexpected {method=!r}")

    @classmethod
    def send_http_on_proxy(cls, method, url, **kwargs):
        loop = util.get_asyncio_loop()
        assert util.get_running_loop() != loop, 'must not be called from asyncio thread'
        coro = asyncio.run_coroutine_threadsafe(cls.async_send_http_on_proxy(method, url, **kwargs), loop)
        # note: async_send_http_on_proxy has its own timeout, so no timeout here:
        return coro.result()
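
    # Illustrative usage from a non-asyncio thread (the URL is a placeholder):
    #     text = Network.send_http_on_proxy('get', 'https://example.com/status')
    # The request is routed through the user's configured proxy, if any.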

    # methods used in scripts
    async def get_peers(self):
        while not self.is_connected():
            await asyncio.sleep(1)
        session = self.interface.session
        return parse_servers(await session.send_request('server.peers.subscribe'))

    async def send_multiple_requests(
            self,
            servers: Sequence[ServerAddr],
            method: str,
            params: Sequence,
            *,
            timeout: int = None,
    ):
        if timeout is None:
            timeout = self.get_network_timeout_seconds(NetworkTimeout.Urgent)
        responses = dict()
        async def get_response(server: ServerAddr):
            interface = Interface(network=self, server=server)
            try:
                await util.wait_for2(interface.ready, timeout)
            except BaseException as e:
                await interface.close()
                return
            try:
                res = await interface.session.send_request(method, params, timeout=10)
            except Exception as e:
                res = e
            responses[interface.server] = res
        async with OldTaskGroup() as group:
            for server in servers:
                await group.spawn(get_response(server))
        return responses
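
    # Illustrative usage (assuming `servers` is a list of ServerAddr):
    #     replies = await network.send_multiple_requests(
    #         servers, 'blockchain.headers.subscribe', [])
    # Each responding server maps to its result, or to the Exception raised while
    # querying it; servers that never became ready are simply absent.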

    async def prune_offline_servers(self, hostmap):
        peers = filter_protocol(hostmap, allowed_protocols=("t", "s",))
        timeout = self.get_network_timeout_seconds(NetworkTimeout.Generic)
        replies = await self.send_multiple_requests(peers, 'blockchain.headers.subscribe', [], timeout=timeout)
        servers_replied = {serveraddr.host for serveraddr in replies.keys()}
        servers_dict = {k: v for k, v in hostmap.items()
                        if k in servers_replied}
        return servers_dict
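
    # Illustrative usage (hostmap would typically come from get_peers()):
    #     alive = await network.prune_offline_servers(await network.get_peers())
    # Only hosts that answered 'blockchain.headers.subscribe' within the timeout
    # are kept in the returned dict.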