spesmilo / electrum / 5351680889323520

18 Dec 2025 02:27PM UTC coverage: 62.356% (-0.06%) from 62.411%

Pull #10378 | CirrusCI | SomberNight
network: disconnected servers: do not filter out bookmarked raw ":t"

In the GUI, when displaying the list of disconnected servers, we used to
filter the list down to ":s" servers only. Now we also show ":t" servers
if they are bookmarked.

Likewise, if bookmarked, disconnected .onion servers are shown even
when not using a Tor proxy.

fixes https://github.com/spesmilo/electrum/issues/10374
Pull Request #10378: network gui: always show bookmarked servers in list
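
For context, the rule described above amounts to roughly the following filter. This is a minimal sketch of the described behavior; the names should_list_disconnected and using_tor_proxy are illustrative only, and the actual implementation is Network.get_disconnected_server_addrs() in the file listed below.

def should_list_disconnected(server, *, bookmarked: bool, using_tor_proxy: bool) -> bool:
    # Bookmarked servers are always listed, regardless of transport or .onion status.
    if bookmarked:
        return True
    # Non-bookmarked servers must use the preferred ":s" transport...
    if server.protocol != 's':
        return False
    # ...and non-bookmarked .onion hosts are only listed when a Tor proxy is in use.
    if server.host.endswith('.onion') and not using_tor_proxy:
        return False
    return True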

6 of 26 new or added lines in 2 files covered. (23.08%)

9 existing lines in 5 files now uncovered.

23740 of 38072 relevant lines covered (62.36%)

0.62 hits per line

Source File
/electrum/network.py (20.89%)
1
# Electrum - Lightweight Bitcoin Client
2
# Copyright (c) 2011-2016 Thomas Voegtlin
3
#
4
# Permission is hereby granted, free of charge, to any person
5
# obtaining a copy of this software and associated documentation files
6
# (the "Software"), to deal in the Software without restriction,
7
# including without limitation the rights to use, copy, modify, merge,
8
# publish, distribute, sublicense, and/or sell copies of the Software,
9
# and to permit persons to whom the Software is furnished to do so,
10
# subject to the following conditions:
11
#
12
# The above copyright notice and this permission notice shall be
13
# included in all copies or substantial portions of the Software.
14
#
15
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
18
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
19
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
20
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
21
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22
# SOFTWARE.
23
import asyncio
1✔
24
import time
1✔
25
import os
1✔
26
import random
1✔
27
import re
1✔
28
from collections import defaultdict
1✔
29
import threading
1✔
30
import json
1✔
31
from typing import (
1✔
32
    NamedTuple, Optional, Sequence, List, Dict, Tuple, TYPE_CHECKING, Iterable, Set, Any, TypeVar,
33
    Callable, Mapping,
34
)
35
import copy
1✔
36
import functools
1✔
37
from enum import IntEnum
1✔
38
from contextlib import nullcontext
1✔
39

40
import aiorpcx
1✔
41
from aiorpcx import ignore_after, NetAddress
1✔
42
from aiohttp import ClientResponse
1✔
43

44
from . import util
1✔
45
from .util import (
1✔
46
    log_exceptions, ignore_exceptions, OldTaskGroup, make_aiohttp_session,
47
    NetworkRetryManager, error_text_str_to_safe_str, detect_tor_socks_proxy
48
)
49
from . import constants
1✔
50
from . import blockchain
1✔
51
from . import dns_hacks
1✔
52
from .transaction import Transaction
1✔
53
from .blockchain import Blockchain
1✔
54
from .interface import (
1✔
55
    Interface, PREFERRED_NETWORK_PROTOCOL, RequestTimedOut, NetworkTimeout, BUCKET_NAME_OF_ONION_SERVERS,
56
    NetworkException, RequestCorrupted, ServerAddr, TxBroadcastError, KNOWN_ELEC_PROTOCOL_TRANSPORTS,
57
)
58
from .version import PROTOCOL_VERSION_MIN
1✔
59
from .i18n import _
1✔
60
from .logging import get_logger, Logger
1✔
61
from .fee_policy import FeeHistogram, FeeTimeEstimates, FEE_ETA_TARGETS
1✔
62

63

64
if TYPE_CHECKING:
65
    from collections.abc import Coroutine
66
    from .channel_db import ChannelDB
67
    from .lnrouter import LNPathFinder
68
    from .lnworker import LNGossip
69
    from .daemon import Daemon
70
    from .simple_config import SimpleConfig
71

72

73
_logger = get_logger(__name__)
1✔
74

75

76
NUM_TARGET_CONNECTED_SERVERS = 10
1✔
77
NUM_STICKY_SERVERS = 4
1✔
78
NUM_RECENT_SERVERS = 20
1✔
79

80
T = TypeVar('T')
1✔
81

82

83
class ConnectionState(IntEnum):
1✔
84
    DISCONNECTED  = 0
1✔
85
    CONNECTING    = 1
1✔
86
    CONNECTED     = 2
1✔
87

88

89
def parse_servers(result: Sequence[Tuple[str, str, List[str]]]) -> Dict[str, dict]:
1✔
90
    """Convert servers list (from protocol method "server.peers.subscribe") into dict format.
91
    Also validate values, such as IP addresses and ports.
92
    """
93
    servers = {}
×
94
    for item in result:
×
95
        host = item[1]
×
96
        out = {}
×
97
        version = None
×
98
        pruning_level = '-'
×
99
        if len(item) > 2:
×
100
            for v in item[2]:
×
101
                if re.match(r"[st]\d*", v):
×
102
                    protocol, port = v[0], v[1:]
×
103
                    if port == '': port = constants.net.DEFAULT_PORTS[protocol]
×
104
                    ServerAddr(host, port, protocol=protocol)  # check if raises
×
105
                    out[protocol] = port
×
106
                elif re.match("v(.?)+", v):
×
107
                    version = v[1:]
×
108
                elif re.match(r"p\d*", v):
×
109
                    pruning_level = v[1:]
×
110
                if pruning_level == '': pruning_level = '0'
×
111
        if out:
×
112
            out['pruning'] = pruning_level
×
113
            out['version'] = version
×
114
            servers[host] = out
×
115
    return servers
×
116

117

118
def filter_version(servers):
1✔
119
    def is_recent(version):
×
120
        try:
×
121
            return util.versiontuple(version) >= util.versiontuple(PROTOCOL_VERSION_MIN)
×
122
        except Exception as e:
×
123
            return False
×
124
    return {k: v for k, v in servers.items() if is_recent(v.get('version'))}
×
125

126

127
def filter_noonion(servers):
1✔
128
    return {k: v for k, v in servers.items() if not k.endswith('.onion')}
×
129

130

131
def filter_protocol(hostmap, *, allowed_protocols: Iterable[str] = None) -> Sequence[ServerAddr]:
1✔
132
    """Filters the hostmap for those implementing protocol."""
133
    if allowed_protocols is None:
×
134
        allowed_protocols = {PREFERRED_NETWORK_PROTOCOL}
×
135
    eligible = []
×
136
    for host, portmap in hostmap.items():
×
137
        for protocol in allowed_protocols:
×
138
            port = portmap.get(protocol)
×
139
            if port:
×
140
                eligible.append(ServerAddr(host, port, protocol=protocol))
×
141
    return eligible
×
142

143

144
def pick_random_server(hostmap=None, *, allowed_protocols: Iterable[str],
1✔
145
                       exclude_set: Set[ServerAddr] = None) -> Optional[ServerAddr]:
146
    if hostmap is None:
×
147
        hostmap = constants.net.DEFAULT_SERVERS
×
148
    if exclude_set is None:
×
149
        exclude_set = set()
×
150
    servers = set(filter_protocol(hostmap, allowed_protocols=allowed_protocols))
×
151
    eligible = list(servers - exclude_set)
×
152
    return random.choice(eligible) if eligible else None
×
153

154

155
def is_valid_port(ps: str):
1✔
156
    try:
×
157
        return 0 < int(ps) < 65535
×
158
    except ValueError:
×
159
        return False
×
160

161

162
def is_valid_host(ph: str):
1✔
163
    try:
×
164
        NetAddress(ph, '1')
×
165
    except ValueError:
×
166
        return False
×
167
    return True
×
168

169

170
class ProxySettings:
1✔
171
    MODES = ['socks4', 'socks5']
1✔
172

173
    probe_fut = None
1✔
174

175
    def __init__(self):
1✔
176
        self.enabled = False
1✔
177
        self.mode = 'socks5'
1✔
178
        self.host = ''
1✔
179
        self.port = ''
1✔
180
        self.user = None
1✔
181
        self.password = None
1✔
182

183
    def set_defaults(self):
1✔
184
        self.__init__()  # call __init__ for default values
×
185

186
    def serialize_proxy_cfgstr(self):
1✔
187
        return ':'.join([self.mode, self.host, self.port])
×
188

189
    def deserialize_proxy_cfgstr(self, s: Optional[str], user: str = None, password: str = None) -> None:
1✔
190
        if s is None or (isinstance(s, str) and s.lower() == 'none'):
×
191
            self.set_defaults()
×
192
            self.user = user
×
193
            self.password = password
×
194
            return
×
195

196
        if not isinstance(s, str):
×
197
            raise ValueError('proxy config not a string')
×
198

199
        args = s.split(':')
×
200
        if args[0] in ProxySettings.MODES:
×
201
            self.mode = args[0]
×
202
            args = args[1:]
×
203

204
        # detect migrate from old settings
205
        if len(args) == 4 and is_valid_host(args[0]) and is_valid_port(args[1]):  # host:port:user:pass,
×
206
            self.host = args[0]
×
207
            self.port = args[1]
×
208
            self.user = args[2]
×
209
            self.password = args[3]
×
210
        else:
211
            self.host = ':'.join(args[:-1])
×
212
            self.port = args[-1]
×
213
            self.user = user
×
214
            self.password = password
×
215

216
        if not is_valid_host(self.host) or not is_valid_port(self.port):
×
217
            self.enabled = False
×
218

219
    def to_dict(self):
1✔
220
        return {
×
221
            'enabled': self.enabled,
222
            'mode': self.mode,
223
            'host': self.host,
224
            'port': self.port,
225
            'user': self.user,
226
            'password': self.password
227
        }
228

229
    @classmethod
1✔
230
    def from_config(cls, config: 'SimpleConfig') -> 'ProxySettings':
1✔
231
        proxy = ProxySettings()
×
232
        proxy.deserialize_proxy_cfgstr(
×
233
            config.NETWORK_PROXY, config.NETWORK_PROXY_USER, config.NETWORK_PROXY_PASSWORD
234
        )
235
        proxy.enabled = config.NETWORK_PROXY_ENABLED
×
236
        return proxy
×
237

238
    @classmethod
1✔
239
    def from_dict(cls, d: dict) -> 'ProxySettings':
1✔
240
        proxy = ProxySettings()
1✔
241
        proxy.enabled = d.get('enabled', proxy.enabled)
1✔
242
        proxy.mode = d.get('mode', proxy.mode)
1✔
243
        proxy.host = d.get('host', proxy.host)
1✔
244
        proxy.port = d.get('port', proxy.port)
1✔
245
        proxy.user = d.get('user', proxy.user)
1✔
246
        proxy.password = d.get('password', proxy.password)
1✔
247
        return proxy
1✔
248

249
    @classmethod
1✔
250
    def probe_tor(cls, on_finished: Callable[[str | None, int | None], None]):
1✔
251
        async def detect_task(finished: Callable[[str | None, int | None], None]):
×
252
            try:
×
253
                net_addr = await detect_tor_socks_proxy()
×
254
                if net_addr is None:
×
255
                    finished('', -1)
×
256
                else:
257
                    host = net_addr[0]
×
258
                    port = net_addr[1]
×
259
                    finished(host, port)
×
260
            finally:
261
                cls.probe_fut = None
×
262

263
        if cls.probe_fut:  # one probe at a time
×
264
            return
×
265
        cls.probe_fut = asyncio.run_coroutine_threadsafe(detect_task(on_finished), util.get_asyncio_loop())
×
266

267
    def __eq__(self, other):
1✔
268
        return self.enabled == other.enabled \
1✔
269
            and self.mode == other.mode \
270
            and self.host == other.host \
271
            and self.port == other.port \
272
            and self.user == other.user \
273
            and self.password == other.password
274

275
    def __str__(self):
1✔
276
        return f'{self.enabled=} {self.mode=} {self.host=} {self.port=} {self.user=}'
×
277

278

279
class NetworkParameters(NamedTuple):
1✔
280
    server: ServerAddr
1✔
281
    proxy: ProxySettings
1✔
282
    auto_connect: bool
1✔
283
    oneserver: bool = False
1✔
284

285

286
class BestEffortRequestFailed(NetworkException): pass
1✔
287

288

289
class UntrustedServerReturnedError(NetworkException):
1✔
290
    def __init__(self, *, original_exception):
1✔
291
        self.original_exception = original_exception
×
292

293
    def get_message_for_gui(self) -> str:
1✔
294
        return str(self)
×
295

296
    def get_untrusted_message(self) -> str:
1✔
297
        e = self.original_exception
×
298
        return (f"<UntrustedServerReturnedError "
×
299
                f"[DO NOT TRUST THIS MESSAGE] original_exception: {error_text_str_to_safe_str(repr(e))}>")
300

301
    def __str__(self):
1✔
302
        # We should not show the untrusted text from self.original_exception,
303
        # to avoid accidentally showing it in the GUI.
304
        return _("The server returned an error.")
×
305

306
    def __repr__(self):
1✔
307
        # We should not show the untrusted text from self.original_exception,
308
        # to avoid accidentally showing it in the GUI.
309
        return f"<UntrustedServerReturnedError {str(self)!r}>"
×
310

311

312
_INSTANCE = None
1✔
313

314

315
class Network(Logger, NetworkRetryManager[ServerAddr]):
1✔
316
    """The Network class manages a set of connections to remote electrum
317
    servers, each connected socket is handled by an Interface() object.
318
    """
319

320
    taskgroup: Optional[OldTaskGroup]
1✔
321
    interface: Optional[Interface]
1✔
322
    interfaces: Dict[ServerAddr, Interface]
1✔
323
    _connecting_ifaces: Set[ServerAddr]
1✔
324
    _closing_ifaces: Set[ServerAddr]
1✔
325
    default_server: ServerAddr
1✔
326
    _recent_servers: List[ServerAddr]
1✔
327

328
    channel_db: Optional['ChannelDB'] = None
1✔
329
    lngossip: Optional['LNGossip'] = None
1✔
330
    path_finder: Optional['LNPathFinder'] = None
1✔
331

332
    def __init__(self, config: 'SimpleConfig', *, daemon: 'Daemon' = None):
1✔
333
        global _INSTANCE
334
        assert _INSTANCE is None, "Network is a singleton!"
×
335
        _INSTANCE = self
×
336

337
        Logger.__init__(self)
×
338
        NetworkRetryManager.__init__(
×
339
            self,
340
            max_retry_delay_normal=600,
341
            init_retry_delay_normal=15,
342
            max_retry_delay_urgent=10,
343
            init_retry_delay_urgent=1,
344
        )
345

346
        self.asyncio_loop = util.get_asyncio_loop()
×
347
        assert self.asyncio_loop.is_running(), "event loop not running"
×
348

349
        self.config = config
×
350
        self.daemon = daemon
×
351

352
        blockchain.read_blockchains(self.config)
×
353
        blockchain.init_headers_file_for_best_chain()
×
354
        self.logger.info(f"blockchains {list(map(lambda b: b.forkpoint, blockchain.blockchains.values()))}")
×
355
        self._blockchain_preferred_block = self.config.BLOCKCHAIN_PREFERRED_BLOCK  # type: Dict[str, Any]
×
356
        if self._blockchain_preferred_block is None:
×
357
            self._set_preferred_chain(None)
×
358
        self._blockchain = blockchain.get_best_chain()
×
359

360
        self._allowed_protocols = {PREFERRED_NETWORK_PROTOCOL}
×
361

362
        self.proxy = ProxySettings()
×
363
        self.is_proxy_tor = None  # type: Optional[bool]  # tri-state. None means unknown.
×
364
        self._init_parameters_from_config()
×
365

366
        self.taskgroup = None
×
367

368
        # locks
369
        self.restart_lock = asyncio.Lock()
×
370
        self.bhi_lock = asyncio.Lock()
×
371
        self.recent_servers_lock = threading.RLock()       # <- re-entrant
×
372
        self.interfaces_lock = threading.Lock()            # for mutating/iterating self.interfaces
×
373

374
        self.server_peers = {}  # returned by interface (servers that the main interface knows about)
×
375
        self._recent_servers = self._read_recent_servers()  # note: needs self.recent_servers_lock
×
376

377
        self.banner = ''
×
378
        self.donation_address = ''
×
379
        self.relay_fee = None  # type: Optional[int]
×
380

381
        dir_path = os.path.join(self.config.path, 'certs')
×
382
        util.make_dir(dir_path)
×
383

384
        # the main server we are currently communicating with
385
        self.interface = None
×
386
        self.default_server_changed_event = asyncio.Event()
×
387
        # Set of servers we have an ongoing connection with.
388
        # For any ServerAddr, at most one corresponding Interface object
389
        # can exist at any given time. Depending on the state of that Interface,
390
        # the ServerAddr can be found in one of the following sets.
391
        # Note: during a transition, the ServerAddr can appear in two sets momentarily.
392
        self._connecting_ifaces = set()
×
393
        self.interfaces = {}  # these are the ifaces in "initialised and usable" state
×
394
        self._closing_ifaces = set()
×
395

396
        # Dump network messages (all interfaces).  Set at runtime from the console.
397
        self.debug = False
×
398

399
        self._set_status(ConnectionState.DISCONNECTED)
×
400
        self._has_ever_managed_to_connect_to_server = False
×
401
        self._was_started = False
×
402

403
        self.mempool_fees = FeeHistogram()
×
404
        self.fee_estimates = FeeTimeEstimates()
×
405
        self.last_time_fee_estimates_requested = 0  # zero ensures immediate fees
×
406

407
    def has_internet_connection(self) -> bool:
1✔
408
        """Our guess whether the device has Internet-connectivity."""
409
        return self._has_ever_managed_to_connect_to_server
×
410

411
    def has_channel_db(self):
1✔
412
        return self.channel_db is not None
×
413

414
    def start_gossip(self):
1✔
415
        from . import lnrouter
×
416
        from . import channel_db
×
417
        from . import lnworker
×
418
        if not self.config.LIGHTNING_USE_GOSSIP:
×
419
            return
×
420
        if self.lngossip is None:
×
421
            self.channel_db = channel_db.ChannelDB(self)
×
422
            self.path_finder = lnrouter.LNPathFinder(self.channel_db)
×
423
            self.channel_db.load_data()
×
424
            self.lngossip = lnworker.LNGossip(self.config)
×
425
            self.lngossip.start_network(self)
×
426

427
    async def stop_gossip(self, *, full_shutdown: bool = False):
1✔
428
        if self.lngossip:
×
429
            await self.lngossip.stop()
×
430
            self.lngossip = None
×
431
            self.channel_db.stop()
×
432
            if full_shutdown:
×
433
                await self.channel_db.stopped_event.wait()
×
434
            self.channel_db = None
×
435
            self.path_finder = None
×
436

437
    @classmethod
1✔
438
    def run_from_another_thread(cls, coro: 'Coroutine[Any, Any, T]', *, timeout=None) -> T:
1✔
439
        loop = util.get_asyncio_loop()
×
440
        assert util.get_running_loop() != loop, 'must not be called from asyncio thread'
×
441
        fut = asyncio.run_coroutine_threadsafe(coro, loop)
×
442
        return fut.result(timeout)
×
443

444
    @staticmethod
1✔
445
    def get_instance() -> Optional["Network"]:
1✔
446
        """Return the global singleton network instance.
447
        Note that this can return None! If we are run with the --offline flag, there is no network.
448
        """
449
        return _INSTANCE
×
450

451
    def with_recent_servers_lock(func):
1✔
452
        def func_wrapper(self, *args, **kwargs):
1✔
453
            with self.recent_servers_lock:
×
454
                return func(self, *args, **kwargs)
×
455
        return func_wrapper
1✔
456

457
    def _read_recent_servers(self) -> List[ServerAddr]:
1✔
458
        if not self.config.path:
×
459
            return []
×
460
        path = os.path.join(self.config.path, "recent_servers")
×
461
        try:
×
462
            with open(path, "r", encoding='utf-8') as f:
×
463
                data = f.read()
×
464
                servers_list = json.loads(data)
×
465
            return [ServerAddr.from_str(s) for s in servers_list]
×
466
        except Exception:
×
467
            return []
×
468

469
    @with_recent_servers_lock
1✔
470
    def _save_recent_servers(self):
1✔
471
        if not self.config.path:
×
472
            return
×
473
        path = os.path.join(self.config.path, "recent_servers")
×
474
        s = json.dumps(self._recent_servers, indent=4, sort_keys=True, default=str)
×
475
        try:
×
476
            with open(path, "w", encoding='utf-8') as f:
×
477
                f.write(s)
×
478
        except Exception:
×
479
            pass
×
480

481
    async def _server_is_lagging(self) -> bool:
1✔
482
        sh = self.get_server_height()
×
483
        if not sh:
×
484
            self.logger.info('no height for main interface')
×
485
            return True
×
486
        lh = self.get_local_height()
×
487
        result = (lh - sh) > 1
×
488
        if result:
×
489
            self.logger.info(f'{self.default_server} is lagging ({sh} vs {lh})')
×
490
        return result
×
491

492
    def _set_status(self, status):
1✔
493
        self.connection_status = status
×
494
        util.trigger_callback('status')
×
495

496
    def is_connected(self):
1✔
497
        interface = self.interface
×
498
        return interface is not None and interface.is_connected_and_ready()
×
499

500
    def is_connecting(self):
1✔
501
        return self.connection_status == ConnectionState.CONNECTING
×
502

503
    def get_connection_status_for_GUI(self):
1✔
504
        ConnectionStates = {
×
505
            ConnectionState.DISCONNECTED: _('Disconnected'),
506
            ConnectionState.CONNECTING: _('Connecting'),
507
            ConnectionState.CONNECTED: _('Connected'),
508
        }
509
        return ConnectionStates[self.connection_status]
×
510

511
    async def _request_server_info(self, interface: 'Interface'):
1✔
512
        await interface.ready
×
513
        session = interface.session
×
514

515
        async def get_banner():
×
516
            self.banner = await interface.get_server_banner()
×
517
            util.trigger_callback('banner', self.banner)
×
518

519
        async def get_donation_address():
×
520
            self.donation_address = await interface.get_donation_address()
×
521

522
        async def get_server_peers():
×
523
            server_peers = await session.send_request('server.peers.subscribe')
×
524
            random.shuffle(server_peers)
×
525
            max_accepted_peers = len(constants.net.DEFAULT_SERVERS) + NUM_RECENT_SERVERS
×
526
            server_peers = server_peers[:max_accepted_peers]
×
527
            # note that 'parse_servers' also validates the data (which is untrusted input!)
528
            self.server_peers = parse_servers(server_peers)
×
529
            util.trigger_callback('servers', self.get_servers())
×
530

531
        async def get_relay_fee():
×
532
            self.relay_fee = await interface.get_relay_fee()
×
533

534
        async with OldTaskGroup() as group:
×
535
            await group.spawn(get_banner)
×
536
            await group.spawn(get_donation_address)
×
537
            await group.spawn(get_server_peers)
×
538
            await group.spawn(get_relay_fee)
×
539
            await group.spawn(self._request_fee_estimates(interface))
×
540

541
    async def _request_fee_estimates(self, interface):
1✔
542
        self.requested_fee_estimates()
×
543
        histogram = await interface.get_fee_histogram()
×
544
        self.mempool_fees.set_data(histogram)
×
545
        self.logger.info(f'fee_histogram {len(histogram)}')
×
546
        util.trigger_callback('fee_histogram', self.mempool_fees)
×
547

548
    def is_fee_estimates_update_required(self):
1✔
549
        """Checks time since last requested and updated fee estimates.
550
        Returns True if an update should be requested.
551
        """
552
        now = time.time()
×
553
        return now - self.last_time_fee_estimates_requested > 60
×
554

555
    def has_fee_etas(self):
1✔
556
        return self.fee_estimates.has_data()
×
557

558
    def has_fee_mempool(self) -> bool:
1✔
559
        return self.mempool_fees.has_data()
×
560

561
    def requested_fee_estimates(self):
1✔
562
        self.last_time_fee_estimates_requested = time.time()
×
563

564
    def get_parameters(self) -> NetworkParameters:
1✔
565
        return NetworkParameters(server=self.default_server,
×
566
                                 proxy=self.proxy,
567
                                 auto_connect=self.auto_connect,
568
                                 oneserver=self.oneserver)
569

570
    def _init_parameters_from_config(self) -> None:
1✔
571
        dns_hacks.configure_dns_resolver()
×
572
        self.auto_connect = self.config.NETWORK_AUTO_CONNECT
×
573
        if self.auto_connect and self.config.NETWORK_ONESERVER:
×
574
            # enabling both oneserver and auto_connect doesn't really make sense
575
            # assume oneserver is enabled for privacy reasons, disable auto_connect and assume server is unpredictable
576
            self.logger.warning(f'both "oneserver" and "auto_connect" options enabled, disabling "auto_connect" and resetting "server".')
×
577
            self.config.NETWORK_SERVER = ""  # let _set_default_server set harmless default (localhost)
×
578
            self.auto_connect = False
×
579

580
        self._set_default_server()
×
581
        self._set_proxy(ProxySettings.from_config(self.config))
×
582
        self._maybe_set_oneserver()
×
583

584
    def get_donation_address(self):
1✔
585
        if self.is_connected():
×
586
            return self.donation_address
×
587

588
    def get_interfaces(self) -> List[ServerAddr]:
1✔
589
        """The list of servers for the connected interfaces."""
590
        with self.interfaces_lock:
×
591
            return list(self.interfaces)
×
592

593
    def get_status(self):
1✔
594
        n = len(self.get_interfaces())
×
595
        return _("Connected to {0} nodes.").format(n) if n > 1 else _("Connected to {0} node.").format(n) if n == 1 else _("Not connected")
×
596

597
    def get_fee_estimates(self):
1✔
598
        from statistics import median
×
599
        if self.auto_connect:
×
600
            with self.interfaces_lock:
×
601
                out = {}
×
602
                for n in FEE_ETA_TARGETS[0:-1]:
×
603
                    try:
×
604
                        out[n] = int(median(filter(None, [i.fee_estimates_eta.get(n) for i in self.interfaces.values()])))
×
605
                    except Exception:
×
606
                        continue
×
607
                return out
×
608
        else:
609
            if not self.interface:
×
610
                return {}
×
611
            return self.interface.fee_estimates_eta
×
612

613
    def update_fee_estimates(self, *, fee_est: Dict[int, int] = None):
1✔
614
        if fee_est is None:
×
615
            if self.config.TEST_DISABLE_AUTOMATIC_FEE_ETA_UPDATE:
×
616
                return
×
617
            fee_est = self.get_fee_estimates()
×
618
        for nblock_target, fee in fee_est.items():
×
619
            self.fee_estimates.set_data(nblock_target, fee)
×
620
        if not hasattr(self, "_prev_fee_est") or self._prev_fee_est != fee_est:
×
621
            self._prev_fee_est = copy.copy(fee_est)
×
622
            self.logger.info(f'fee_estimates {fee_est}')
×
623
        util.trigger_callback('fee', self.fee_estimates)
×
624

625

626
    @with_recent_servers_lock
1✔
627
    def get_servers(self) -> Mapping[str, Mapping[str, str]]:
1✔
628
        # note: order of sources when adding servers here is crucial!
629
        # don't let "server_peers" overwrite anything,
630
        # otherwise main server can eclipse the client
631
        out = dict()
×
632
        # add servers received from main interface
633
        server_peers = self.server_peers
×
634
        if server_peers:
×
635
            out.update(filter_version(server_peers.copy()))
×
636
        # hardcoded servers
637
        out.update(constants.net.DEFAULT_SERVERS)
×
638
        # add recent servers
639
        for server in self._recent_servers:
×
640
            port = str(server.port)
×
641
            if server.host in out:
×
642
                out[server.host].update({server.protocol: port})
×
643
            else:
644
                out[server.host] = {server.protocol: port}
×
645
        # add bookmarks
646
        bookmarks = self.config.NETWORK_BOOKMARKED_SERVERS or []
×
647
        for server_str in bookmarks:
×
648
            try:
×
649
                server = ServerAddr.from_str(server_str)
×
650
            except ValueError:
×
651
                continue
×
652
            port = str(server.port)
×
653
            if server.host in out:
×
654
                out[server.host].update({server.protocol: port})
×
655
            else:
656
                out[server.host] = {server.protocol: port}
×
657
        # potentially filter out some
658
        if self.config.NETWORK_NOONION:
×
659
            out = filter_noonion(out)
×
660
        return out
×
661

662
    def get_disconnected_server_addrs(self) -> Sequence[ServerAddr]:
1✔
NEW
663
        hostmap = self.get_servers()
×
NEW
664
        disconnected_server_addrs = []  # type: List[ServerAddr]
×
NEW
665
        chains = self.get_blockchains()
×
NEW
666
        connected_hosts = set([iface.host for ifaces in chains.values() for iface in ifaces])
×
667
        # convert hostmap to list of ServerAddrs (one-to-many mapping)
NEW
668
        server_addrs = []
×
NEW
669
        for host, portmap in hostmap.items():
×
NEW
670
            for protocol in KNOWN_ELEC_PROTOCOL_TRANSPORTS:
×
NEW
671
                if port := portmap.get(protocol):
×
NEW
672
                    server_addrs.append(ServerAddr(host, port, protocol=protocol))
×
673
        # sort bookmarked servers to appear first
NEW
674
        server_addrs.sort(key=lambda x: (-self.is_server_bookmarked(x), str(x)))
×
675
        # filter out stuff
NEW
676
        for server in server_addrs:
×
NEW
677
            if server.host in connected_hosts:
×
NEW
678
                continue
×
NEW
679
            if not self.is_server_bookmarked(server):
×
NEW
680
                if server.protocol != PREFERRED_NETWORK_PROTOCOL:
×
NEW
681
                    continue
×
NEW
682
                if server.host.endswith('.onion') and not self.is_proxy_tor:
×
NEW
683
                    continue
×
NEW
684
            disconnected_server_addrs.append(server)
×
NEW
685
        return disconnected_server_addrs
×
686

687
    def _get_next_server_to_try(self) -> Optional[ServerAddr]:
1✔
688
        now = time.time()
×
689
        with self.interfaces_lock:
×
690
            connected_servers = set(self.interfaces) | self._connecting_ifaces | self._closing_ifaces
×
691
        # First try from recent servers. (which are persisted)
692
        # As these are servers we successfully connected to recently, they are
693
        # most likely to work. This also makes servers "sticky".
694
        # Note: with sticky servers, it is more difficult for an attacker to eclipse the client,
695
        #       however if they succeed, the eclipsing would persist. To try to balance this,
696
        #       we only give priority to recent_servers up to NUM_STICKY_SERVERS.
697
        with self.recent_servers_lock:
×
698
            recent_servers = list(self._recent_servers)
×
699
        recent_servers = [s for s in recent_servers if s.protocol in self._allowed_protocols]
×
700
        if len(connected_servers & set(recent_servers)) < NUM_STICKY_SERVERS:
×
701
            for server in recent_servers:
×
702
                if server in connected_servers:
×
703
                    continue
×
704
                if not self._can_retry_addr(server, now=now):
×
705
                    continue
×
706
                return server
×
707
        # try all servers we know about, pick one at random
708
        hostmap = self.get_servers()
×
709
        servers = list(set(filter_protocol(hostmap, allowed_protocols=self._allowed_protocols)) - connected_servers)
×
710
        random.shuffle(servers)
×
711
        for server in servers:
×
712
            if not self._can_retry_addr(server, now=now):
×
713
                continue
×
714
            return server
×
715
        return None
×
716

717
    def _set_default_server(self) -> None:
1✔
718
        # Server for addresses and transactions
719
        server = self.config.NETWORK_SERVER
×
720
        # Sanitize default server
721
        if server:
×
722
            try:
×
723
                self.default_server = ServerAddr.from_str(server)
×
724
            except Exception:
×
725
                self.logger.warning(f'failed to parse server-string ({server!r}); falling back to localhost:1:s.')
×
726
                self.default_server = ServerAddr.from_str("localhost:1:s")
×
727
        else:
728
            # if oneserver is enabled but no server specified then don't pick a random server
729
            if self.config.NETWORK_ONESERVER:
×
730
                self.logger.warning(f'"oneserver" option enabled, but no "server" defined; falling back to localhost:1:s.')
×
731
                self.default_server = ServerAddr.from_str("localhost:1:s")
×
732
            else:
733
                self.default_server = pick_random_server(allowed_protocols=self._allowed_protocols)
×
734
        assert isinstance(self.default_server, ServerAddr), f"invalid type for default_server: {self.default_server!r}"
×
735

736
    def _set_proxy(self, proxy: ProxySettings):
1✔
737
        if self.proxy == proxy:
×
738
            return
×
739

740
        self.logger.info(f'setting proxy {proxy}')
×
741
        self.proxy = proxy
×
742

743
        # reset is_proxy_tor to unknown, and re-detect it:
744
        self.is_proxy_tor = None
×
745
        self._detect_if_proxy_is_tor()
×
746

747
        util.trigger_callback('proxy_set', self.proxy)
×
748

749
    def _detect_if_proxy_is_tor(self) -> None:
1✔
750
        async def tor_probe_task(p):
×
751
            assert p is not None
×
752
            is_tor = await util.is_tor_socks_port(p.host, int(p.port))
×
753
            if self.proxy == p:  # is this the proxy we probed?
×
754
                if self.is_proxy_tor != is_tor:
×
755
                    self.logger.info(f'Proxy is {"" if is_tor else "not "}TOR')
×
756
                    self.is_proxy_tor = is_tor
×
757
                util.trigger_callback('tor_probed', is_tor)
×
758

759
        proxy = self.proxy
×
760
        if proxy and proxy.enabled and proxy.mode == 'socks5':
×
761
            asyncio.run_coroutine_threadsafe(tor_probe_task(proxy), self.asyncio_loop)
×
762

763
    @log_exceptions
1✔
764
    async def set_parameters(self, net_params: NetworkParameters):
1✔
765
        proxy = net_params.proxy
×
766
        proxy_str = proxy.serialize_proxy_cfgstr()
×
767
        proxy_enabled = proxy.enabled
×
768
        proxy_user = proxy.user
×
769
        proxy_pass = proxy.password
×
770
        server = net_params.server
×
771
        # sanitize parameters
772
        try:
×
773
            if proxy:
×
774
                # proxy_modes.index(proxy['mode']) + 1
775
                ProxySettings.MODES.index(proxy.mode) + 1
×
776
                # int(proxy['port'])
777
                int(proxy.port)
×
778
        except Exception:
×
779
            proxy.enabled = False
×
780
            # return
781
        self.config.NETWORK_AUTO_CONNECT = net_params.auto_connect
×
782
        self.config.NETWORK_ONESERVER = net_params.oneserver
×
783
        self.config.NETWORK_PROXY_ENABLED = proxy_enabled
×
784
        self.config.NETWORK_PROXY = proxy_str
×
785
        self.config.NETWORK_PROXY_USER = proxy_user
×
786
        self.config.NETWORK_PROXY_PASSWORD = proxy_pass
×
787
        self.config.NETWORK_SERVER = str(server)
×
788
        # abort if changes were not allowed by config
789
        if self.config.NETWORK_SERVER != str(server) \
×
790
                or self.config.NETWORK_PROXY_ENABLED != proxy_enabled \
791
                or self.config.NETWORK_PROXY != proxy_str \
792
                or self.config.NETWORK_PROXY_USER != proxy_user \
793
                or self.config.NETWORK_PROXY_PASSWORD != proxy_pass \
794
                or self.config.NETWORK_ONESERVER != net_params.oneserver:
795
            return
×
796

797
        proxy_changed = self.proxy != proxy
×
798
        oneserver_changed = self.oneserver != net_params.oneserver
×
799
        default_server_changed = self.default_server != server
×
800
        self._init_parameters_from_config()
×
801
        if not self._was_started:
×
802
            return
×
803

804
        async with self.restart_lock:
×
805
            if proxy_changed or oneserver_changed:
×
806
                # Restart the network
807
                await self.stop(full_shutdown=False)
×
808
                await self._start()
×
809
            elif default_server_changed:
×
810
                await self.switch_to_interface(server)
×
811
            else:
812
                await self.switch_lagging_interface()
×
813
        util.trigger_callback('network_updated')
×
814

815
    def _maybe_set_oneserver(self) -> None:
1✔
816
        oneserver = self.config.NETWORK_ONESERVER
×
817
        self.oneserver = oneserver
×
818
        self.num_server = NUM_TARGET_CONNECTED_SERVERS if not oneserver else 0
×
819

820
    def is_server_bookmarked(self, server: ServerAddr) -> bool:
1✔
821
        bookmarks = self.config.NETWORK_BOOKMARKED_SERVERS or []
×
822
        return str(server) in bookmarks
×
823

824
    def set_server_bookmark(self, server: ServerAddr, *, add: bool) -> None:
1✔
825
        server_str = str(server)
×
826
        with self.config.lock:
×
827
            bookmarks = self.config.NETWORK_BOOKMARKED_SERVERS or []
×
828
            if add:
×
829
                if server_str not in bookmarks:
×
830
                    bookmarks.append(server_str)
×
831
            else:  # remove
832
                if server_str in bookmarks:
×
833
                    bookmarks.remove(server_str)
×
834
            self.config.NETWORK_BOOKMARKED_SERVERS = bookmarks
×
835

836
    async def _switch_to_random_interface(self):
1✔
837
        '''Switch to a random connected server other than the current one'''
838
        servers = self.get_interfaces()    # Those in connected state
×
839
        if self.default_server in servers:
×
840
            servers.remove(self.default_server)
×
841
        if servers:
×
842
            await self.switch_to_interface(random.choice(servers))
×
843

844
    async def switch_lagging_interface(self):
1✔
845
        """If auto_connect and lagging, switch interface (only within fork)."""
846
        if self.auto_connect and await self._server_is_lagging():
×
847
            # switch to one that has the correct header (not height)
848
            best_header = self.blockchain().header_at_tip()
×
849
            with self.interfaces_lock: interfaces = list(self.interfaces.values())
×
850
            filtered = list(filter(lambda iface: iface.tip_header == best_header, interfaces))
×
851
            if filtered:
×
852
                chosen_iface = random.choice(filtered)
×
853
                await self.switch_to_interface(chosen_iface.server)
×
854

855
    async def switch_unwanted_fork_interface(self) -> None:
1✔
856
        """If auto_connect, maybe switch to another fork/chain."""
857
        if not self.auto_connect or not self.interface:
×
858
            return
×
859
        with self.interfaces_lock: interfaces = list(self.interfaces.values())
×
860
        pref_height = self._blockchain_preferred_block['height']
×
861
        pref_hash   = self._blockchain_preferred_block['hash']
×
862
        # shortcut for common case
863
        if pref_height == 0:
×
864
            return
×
865
        # maybe try switching chains; starting with most desirable first
866
        matching_chains = blockchain.get_chains_that_contain_header(pref_height, pref_hash)
×
867
        chains_to_try = list(matching_chains) + [blockchain.get_best_chain()]
×
868
        for rank, chain in enumerate(chains_to_try):
×
869
            # check if main interface is already on this fork
870
            if self.interface.blockchain == chain:
×
871
                return
×
872
            # switch to another random interface that is on this fork, if any
873
            filtered = [iface for iface in interfaces
×
874
                        if iface.blockchain == chain]
875
            if filtered:
×
876
                self.logger.info(f"switching to (more) preferred fork (rank {rank})")
×
877
                chosen_iface = random.choice(filtered)
×
878
                await self.switch_to_interface(chosen_iface.server)
×
879
                return
×
880
        self.logger.info("tried to switch to (more) preferred fork but no interfaces are on any")
×
881

882
    async def switch_to_interface(self, server: ServerAddr):
1✔
883
        """Switch to server as our main interface. If no connection exists,
884
        queue interface to be started. The actual switch will
885
        happen when the interface becomes ready.
886
        """
887
        assert isinstance(server, ServerAddr), f"expected ServerAddr, got {type(server)}"
×
888
        self.default_server = server
×
889
        old_interface = self.interface
×
890
        old_server = old_interface.server if old_interface else None
×
891

892
        # Stop any current interface in order to terminate subscriptions,
893
        # and to cancel tasks in interface.taskgroup.
894
        if old_server and old_server != server:
×
895
            # don't wait for old_interface to close as that might be slow:
896
            await self.taskgroup.spawn(self._close_interface(old_interface))
×
897

898
        if server not in self.interfaces:
×
899
            self.interface = None
×
900
            await self.taskgroup.spawn(self._run_new_interface(server))
×
901
            return
×
902

903
        i = self.interfaces[server]
×
904
        if old_interface != i:
×
905
            if not i.is_connected_and_ready():
×
906
                return
×
907
            self.logger.info(f"switching to {server}")
×
908
            blockchain_updated = i.blockchain != self.blockchain()
×
909
            self.interface = i
×
910
            try:
×
911
                await i.taskgroup.spawn(self._request_server_info(i))
×
912
            except RuntimeError as e:  # see #7677
×
913
                if len(e.args) >= 1 and e.args[0] == 'task group terminated':
×
914
                    self.logger.warning(f"tried to switch to {server} but interface.taskgroup is already dead.")
×
915
                    self.interface = None
×
916
                    return
×
917
                raise
×
918
            util.trigger_callback('default_server_changed')
×
919
            self.default_server_changed_event.set()
×
920
            self.default_server_changed_event.clear()
×
921
            self._set_status(ConnectionState.CONNECTED)
×
922
            util.trigger_callback('network_updated')
×
923
            if blockchain_updated:
×
924
                util.trigger_callback('blockchain_updated')
×
925

926
    async def _close_interface(self, interface: Optional[Interface]):
1✔
927
        if not interface:
×
928
            return
×
929
        if interface.server in self._closing_ifaces:
×
930
            return
×
931
        self._closing_ifaces.add(interface.server)
×
932
        with self.interfaces_lock:
×
933
            if self.interfaces.get(interface.server) == interface:
×
934
                self.interfaces.pop(interface.server)
×
935
        if interface == self.interface:
×
936
            self.interface = None
×
937
        try:
×
938
            # this can take some time if server/connection is slow:
939
            await interface.close()
×
940
            await interface.got_disconnected.wait()
×
941
        finally:
942
            self._closing_ifaces.discard(interface.server)
×
943

944
    @with_recent_servers_lock
1✔
945
    def _add_recent_server(self, server: ServerAddr) -> None:
1✔
946
        self._on_connection_successfully_established(server)
×
947
        # list is ordered
948
        if server in self._recent_servers:
×
949
            self._recent_servers.remove(server)
×
950
        self._recent_servers.insert(0, server)
×
951
        self._recent_servers = self._recent_servers[:NUM_RECENT_SERVERS]
×
952
        self._save_recent_servers()
×
953

954
    async def connection_down(self, interface: Interface):
1✔
955
        '''A connection to server either went down, or was never made.
956
        We distinguish by whether it is in self.interfaces.'''
957
        if not interface: return
×
958
        if interface.server == self.default_server:
×
959
            self._set_status(ConnectionState.DISCONNECTED)
×
960
        await self._close_interface(interface)
×
961
        util.trigger_callback('network_updated')
×
962

963
    def get_network_timeout_seconds(self, request_type=NetworkTimeout.Generic) -> int:
1✔
964
        if self.config.NETWORK_TIMEOUT:
×
965
            return self.config.NETWORK_TIMEOUT
×
966
        if self.oneserver and not self.auto_connect:
×
967
            return request_type.MOST_RELAXED
×
968
        if self.proxy and self.proxy.enabled:
×
969
            return request_type.RELAXED
×
970
        return request_type.NORMAL
×
971

972
    @ignore_exceptions  # do not kill outer taskgroup
1✔
973
    @log_exceptions
1✔
974
    async def _run_new_interface(self, server: ServerAddr):
1✔
975
        assert isinstance(server, ServerAddr), f"expected ServerAddr, got {type(server)}"
×
976
        if (server in self.interfaces
×
977
                or server in self._connecting_ifaces
978
                or server in self._closing_ifaces):
979
            return
×
980
        self._connecting_ifaces.add(server)
×
981
        if server == self.default_server:
×
982
            self.logger.info(f"connecting to {server} as new interface")
×
983
            self._set_status(ConnectionState.CONNECTING)
×
984
        self._trying_addr_now(server)
×
985

986
        interface = Interface(network=self, server=server)
×
987
        # note: using longer timeouts here as DNS can sometimes be slow!
988
        timeout = self.get_network_timeout_seconds(NetworkTimeout.Generic)
×
989
        try:
×
990
            await util.wait_for2(interface.ready, timeout)
×
991
        except BaseException as e:
×
992
            self.logger.info(f"couldn't launch iface {server} -- {repr(e)}")
×
993
            await interface.close()
×
994
            return
×
995
        else:
996
            with self.interfaces_lock:
×
997
                assert server not in self.interfaces
×
998
                self.interfaces[server] = interface
×
999
        finally:
1000
            self._connecting_ifaces.discard(server)
×
1001

1002
        if server == self.default_server:
×
1003
            await self.switch_to_interface(server)
×
1004

1005
        self._has_ever_managed_to_connect_to_server = True
×
1006
        self._add_recent_server(server)
×
1007
        util.trigger_callback('network_updated')
×
1008
        # When the proxy settings were set, the proxy (if any) might have been unreachable,
1009
        # resulting in a false-negative for Tor-detection. Given we just connected to a server, re-test now.
1010
        self._detect_if_proxy_is_tor()
×
1011

1012
    def check_interface_against_healthy_spread_of_connected_servers(self, iface_to_check: Interface) -> bool:
1✔
1013
        # main interface is exempt. this makes switching servers easier
1014
        if iface_to_check.is_main_server():
×
1015
            return True
×
1016
        if not iface_to_check.bucket_based_on_ipaddress():
×
1017
            return True
×
1018
        # bucket connected interfaces
1019
        with self.interfaces_lock:
×
1020
            interfaces = list(self.interfaces.values())
×
1021
        if iface_to_check in interfaces:
×
1022
            interfaces.remove(iface_to_check)
×
1023
        buckets = defaultdict(list)
×
1024
        for iface in interfaces:
×
1025
            buckets[iface.bucket_based_on_ipaddress()].append(iface)
×
1026
        # check proposed server against buckets
1027
        onion_servers = buckets[BUCKET_NAME_OF_ONION_SERVERS]
×
1028
        if iface_to_check.is_tor():
×
1029
            # keep number of onion servers below half of all connected servers
1030
            if len(onion_servers) > NUM_TARGET_CONNECTED_SERVERS // 2:
×
1031
                return False
×
1032
        else:
1033
            bucket = iface_to_check.bucket_based_on_ipaddress()
×
1034
            if len(buckets[bucket]) > 0:
×
1035
                return False
×
1036
        return True
×
1037

1038
    def best_effort_reliable(func):
1✔
1039
        @functools.wraps(func)
1✔
1040
        async def make_reliable_wrapper(self: 'Network', *args, **kwargs):
1✔
1041
            for i in range(10):
×
1042
                iface = self.interface
×
1043
                # retry until there is a main interface
1044
                if not iface:
×
1045
                    async with ignore_after(1):
×
1046
                        await self.default_server_changed_event.wait()
×
1047
                    continue  # try again
×
1048
                assert iface.ready.done(), "interface not ready yet"
×
1049
                # try actual request
1050
                try:
×
1051
                    async with OldTaskGroup(wait=any) as group:
×
1052
                        task = await group.spawn(func(self, *args, **kwargs))
×
1053
                        await group.spawn(iface.got_disconnected.wait())
×
1054
                except RequestTimedOut:
×
1055
                    await iface.close()
×
1056
                    await iface.got_disconnected.wait()
×
1057
                    continue  # try again
×
1058
                except RequestCorrupted as e:
×
1059
                    # TODO ban server?
1060
                    iface.logger.exception(f"RequestCorrupted: {e}")
×
1061
                    await iface.close()
×
1062
                    await iface.got_disconnected.wait()
×
1063
                    continue  # try again
×
1064
                if task.done() and not task.cancelled():
×
1065
                    return task.result()
×
1066
                # otherwise; try again
1067
            raise BestEffortRequestFailed('cannot establish a connection... gave up.')
×
1068
        return make_reliable_wrapper
1✔
1069

1070
    def catch_server_exceptions(func):
1✔
1071
        """Decorator that wraps server errors in UntrustedServerReturnedError,
1072
        to avoid showing untrusted arbitrary text to users.
1073
        """
1074
        @functools.wraps(func)
1✔
1075
        async def wrapper(self, *args, **kwargs):
1✔
1076
            try:
×
1077
                return await func(self, *args, **kwargs)
×
1078
            except aiorpcx.jsonrpc.CodeMessageError as e:
×
1079
                wrapped_exc = UntrustedServerReturnedError(original_exception=e)
×
1080
                # log (sanitized) untrusted error text now, to ease debugging
1081
                self.logger.debug(f"got error from server for {func.__qualname__}: {wrapped_exc.get_untrusted_message()!r}")
×
1082
                raise wrapped_exc from e
×
1083
        return wrapper
1✔
1084

1085
    @best_effort_reliable
1✔
1086
    @catch_server_exceptions
1✔
1087
    async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict:
1✔
1088
        if self.interface is None:  # handled by best_effort_reliable
×
1089
            raise RequestTimedOut()
×
1090
        return await self.interface.get_merkle_for_transaction(tx_hash=tx_hash, tx_height=tx_height)
×
1091

1092
    @best_effort_reliable
1✔
1093
    async def broadcast_transaction(self, tx: 'Transaction', *, timeout=None) -> None:
1✔
1094
        """caller should handle TxBroadcastError"""
1095
        if self.interface is None:  # handled by best_effort_reliable
×
1096
            raise RequestTimedOut()
×
1097
        await self.interface.broadcast_transaction(tx, timeout=timeout)
×
1098

1099
    async def try_broadcasting(self, tx: 'Transaction', name: str) -> bool:
1✔
1100
        try:
×
1101
            await self.broadcast_transaction(tx)
×
1102
        except Exception as e:
×
1103
            self.logger.info(f'error: could not broadcast {name} {tx.txid()}, {str(e)}')
×
1104
            return False
×
1105
        else:
1106
            self.logger.info(f'success: broadcasting {name} {tx.txid()}')
×
1107
            return True
×
1108

1109
    @best_effort_reliable
1✔
1110
    @catch_server_exceptions
1✔
1111
    async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
1✔
1112
        if self.interface is None:  # handled by best_effort_reliable
×
1113
            raise RequestTimedOut()
×
1114
        return await self.interface.get_transaction(tx_hash=tx_hash, timeout=timeout)
×
1115

1116
    @best_effort_reliable
1✔
1117
    @catch_server_exceptions
1✔
1118
    async def get_history_for_scripthash(self, sh: str) -> List[dict]:
1✔
1119
        if self.interface is None:  # handled by best_effort_reliable
×
1120
            raise RequestTimedOut()
×
1121
        return await self.interface.get_history_for_scripthash(sh)
×
1122

1123
    @best_effort_reliable
1✔
1124
    @catch_server_exceptions
1✔
1125
    async def listunspent_for_scripthash(self, sh: str) -> List[dict]:
1✔
1126
        if self.interface is None:  # handled by best_effort_reliable
×
1127
            raise RequestTimedOut()
×
1128
        return await self.interface.listunspent_for_scripthash(sh)
×
1129

1130
    @best_effort_reliable
1✔
1131
    @catch_server_exceptions
1✔
1132
    async def get_balance_for_scripthash(self, sh: str) -> dict:
1✔
1133
        if self.interface is None:  # handled by best_effort_reliable
×
1134
            raise RequestTimedOut()
×
1135
        return await self.interface.get_balance_for_scripthash(sh)
×
1136

1137
    @best_effort_reliable
1✔
1138
    @catch_server_exceptions
1✔
1139
    async def get_txid_from_txpos(self, tx_height, tx_pos, merkle):
1✔
1140
        if self.interface is None:  # handled by best_effort_reliable
×
1141
            raise RequestTimedOut()
×
1142
        return await self.interface.get_txid_from_txpos(tx_height, tx_pos, merkle)
×
1143

1144
    def blockchain(self) -> Blockchain:
1✔
1145
        interface = self.interface
×
1146
        if interface and interface.blockchain is not None:
×
1147
            self._blockchain = interface.blockchain
×
1148
        return self._blockchain
×
1149

1150
    def get_blockchains(self) -> Mapping[str, Sequence[Interface]]:
1✔
1151
        out = {}  # blockchain_id -> list(interfaces)
×
1152
        with blockchain.blockchains_lock: blockchain_items = list(blockchain.blockchains.items())
×
1153
        with self.interfaces_lock: interfaces_values = list(self.interfaces.values())
×
1154
        for chain_id, bc in blockchain_items:
×
1155
            r = list(filter(lambda i: i.blockchain==bc, interfaces_values))
×
1156
            if r:
×
1157
                out[chain_id] = r
×
1158
        return out
×
1159

1160
    def _set_preferred_chain(self, chain: Optional[Blockchain]):
        if chain:
            height = chain.get_max_forkpoint()
            header_hash = chain.get_hash(height)
        else:
            height = 0
            header_hash = constants.net.GENESIS
        self._blockchain_preferred_block = {
            'height': height,
            'hash': header_hash,
        }
        self.config.BLOCKCHAIN_PREFERRED_BLOCK = self._blockchain_preferred_block

    async def follow_chain_given_id(self, chain_id: str) -> None:
        bc = blockchain.blockchains.get(chain_id)
        if not bc:
            raise Exception('blockchain {} not found'.format(chain_id))
        self._set_preferred_chain(bc)
        # select server on this chain
        with self.interfaces_lock: interfaces = list(self.interfaces.values())
        interfaces_on_selected_chain = list(filter(lambda iface: iface.blockchain == bc, interfaces))
        if len(interfaces_on_selected_chain) == 0: return
        chosen_iface = random.choice(interfaces_on_selected_chain)  # type: Interface
        # switch to server (and save to config)
        net_params = self.get_parameters()
        # we select a random interface, so set connection mode back to autoconnect
        net_params = net_params._replace(server=chosen_iface.server, auto_connect=True, oneserver=False)
        await self.set_parameters(net_params)

    def follow_chain_given_server(self, server: ServerAddr) -> None:
        # note that `server` should correspond to a connected interface
        iface = self.interfaces[server]
        self._set_preferred_chain(iface.blockchain)
        self.logger.debug(f"following {self.config.BLOCKCHAIN_PREFERRED_BLOCK=}")

    def get_server_height(self) -> int:
        """Length of header chain, as claimed by main interface."""
        interface = self.interface
        return interface.tip if interface else 0

    def get_local_height(self) -> int:
        """Length of header chain, POW-verified.
        In case of a chain split, this is for the branch the main interface is on,
        but it is the tip of that branch (even if main interface is behind).
        """
        return self.blockchain().height()

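    # Illustrative sketch (assumed, not upstream code): get_server_height() is the tip
    # claimed by the main server, while get_local_height() is the locally verified
    # header chain, so a rough "header sync lag" check could look like:
    #
    #     lag = network.get_server_height() - network.get_local_height()
    #     if lag > 0:
    #         print(f"{lag} headers behind the server tip")
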
    def export_checkpoints(self, path):
        """Run manually to generate blockchain checkpoints.
        Kept for console use only.
        """
        cp = self.blockchain().get_checkpoints()
        with open(path, 'w', encoding='utf-8') as f:
            f.write(json.dumps(cp, indent=4))

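    # Illustrative console usage (assumed, not upstream code): export_checkpoints() is
    # meant to be run manually, e.g.
    #
    #     network.export_checkpoints('checkpoints.json')
    #
    # which writes the current chain's checkpoints as indented JSON to the given path.
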
    async def _start(self):
        assert not self.taskgroup
        self.taskgroup = taskgroup = OldTaskGroup()
        assert not self.interface and not self.interfaces
        assert not self._connecting_ifaces
        assert not self._closing_ifaces
        self.logger.info('starting network')
        self._clear_addr_retry_times()
        self._init_parameters_from_config()
        await self.taskgroup.spawn(self._run_new_interface(self.default_server))

        async def main():
            self.logger.info(f"starting taskgroup ({hex(id(taskgroup))}).")
            try:
                # note: if a task finishes with CancelledError, that
                # will NOT raise, and the group will keep the other tasks running
                async with taskgroup as group:
                    await group.spawn(self._maintain_sessions())
                    [await group.spawn(job) for job in self._jobs]
            except Exception as e:
                self.logger.exception(f"taskgroup died ({hex(id(taskgroup))}).")
            finally:
                self.logger.info(f"taskgroup stopped ({hex(id(taskgroup))}).")
        asyncio.run_coroutine_threadsafe(main(), self.asyncio_loop)

        util.trigger_callback('network_updated')

    def start(self, jobs: Iterable = None):
        """Schedule starting the network, along with the given job co-routines.

        Note: the jobs will *restart* every time the network restarts, e.g. on proxy
        setting changes.
        """
        self._was_started = True
        self._jobs = jobs or []
        asyncio.run_coroutine_threadsafe(self._start(), self.asyncio_loop)

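    # Illustrative sketch (assumed, not upstream code): start() only schedules _start()
    # on the asyncio loop, and the given job coroutines are respawned whenever the
    # network (re)starts. A hypothetical caller might do:
    #
    #     async def my_job():
    #         while True:
    #             await asyncio.sleep(60)
    #
    #     network.start(jobs=[my_job])
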
    @log_exceptions
    async def stop(self, *, full_shutdown: bool = True):
        if not self._was_started:
            self.logger.info("not stopping network as it was never started")
            return
        self.logger.info("stopping network")
        # timeout: if full_shutdown, it is up to the caller to time us out,
        #          otherwise if e.g. restarting due to proxy changes, we time out fast
        async with (nullcontext() if full_shutdown else ignore_after(1)):
            async with OldTaskGroup() as group:
                await group.spawn(self.taskgroup.cancel_remaining())
                if full_shutdown:
                    await group.spawn(self.stop_gossip(full_shutdown=full_shutdown))
        self.taskgroup = None
        self.interface = None
        self.interfaces = {}
        self._connecting_ifaces.clear()
        self._closing_ifaces.clear()
        if not full_shutdown:
            util.trigger_callback('network_updated')

    async def _ensure_there_is_a_main_interface(self):
        if self.interface:
            return
        # if auto_connect is set, try a different server
        if self.auto_connect and not self.is_connecting():
            await self._switch_to_random_interface()
        # if auto_connect is not set, or still no main interface, retry current
        if not self.interface and not self.is_connecting():
            if self._can_retry_addr(self.default_server, urgent=True):
                await self.switch_to_interface(self.default_server)

    async def _maintain_sessions(self):
        async def maybe_start_new_interfaces():
            num_existing_ifaces = len(self.interfaces) + len(self._connecting_ifaces) + len(self._closing_ifaces)
            for i in range(self.num_server - num_existing_ifaces):
                # FIXME this should try to honour "healthy spread of connected servers"
                server = self._get_next_server_to_try()
                if server:
                    assert isinstance(server, ServerAddr), f"expected ServerAddr, got {type(server)}"
                    await self.taskgroup.spawn(self._run_new_interface(server))
        async def maintain_healthy_spread_of_connected_servers():
            with self.interfaces_lock: interfaces = list(self.interfaces.values())
            random.shuffle(interfaces)
            for iface in interfaces:
                if not self.check_interface_against_healthy_spread_of_connected_servers(iface):
                    self.logger.info(f"disconnecting from {iface.server}. too many connected "
                                     f"servers already in bucket {iface.bucket_based_on_ipaddress()}")
                    await self._close_interface(iface)
        async def maintain_main_interface():
            await self._ensure_there_is_a_main_interface()
            if self.is_connected():
                if self.is_fee_estimates_update_required():
                    await self.interface.taskgroup.spawn(self._request_fee_estimates, self.interface)

        while True:
            await maybe_start_new_interfaces()
            await maintain_healthy_spread_of_connected_servers()
            await maintain_main_interface()
            await asyncio.sleep(0.1)

    @classmethod
    async def async_send_http_on_proxy(
            cls, method: str, url: str, *,
            params: dict = None,
            body: bytes = None,
            json: dict = None,
            headers=None,
            on_finish=None,
            timeout=None,
    ):
        async def default_on_finish(resp: ClientResponse):
            resp.raise_for_status()
            return await resp.text()
        if headers is None:
            headers = {}
        if on_finish is None:
            on_finish = default_on_finish
        network = cls.get_instance()
        proxy = network.proxy if network else None
        async with make_aiohttp_session(proxy, timeout=timeout) as session:
            if method == 'get':
                async with session.get(url, params=params, headers=headers) as resp:
                    return await on_finish(resp)
            elif method == 'post':
                assert body is not None or json is not None, 'body or json must be supplied if method is post'
                if body is not None:
                    async with session.post(url, data=body, headers=headers) as resp:
                        return await on_finish(resp)
                elif json is not None:
                    async with session.post(url, json=json, headers=headers) as resp:
                        return await on_finish(resp)
            else:
                raise Exception(f"unexpected {method=!r}")

    @classmethod
    def send_http_on_proxy(cls, method, url, **kwargs):
        loop = util.get_asyncio_loop()
        assert util.get_running_loop() != loop, 'must not be called from asyncio thread'
        coro = asyncio.run_coroutine_threadsafe(cls.async_send_http_on_proxy(method, url, **kwargs), loop)
        # note: async_send_http_on_proxy has its own timeout, so no timeout here:
        return coro.result()

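    # Illustrative sketch (assumed, not upstream code): send_http_on_proxy() is the
    # blocking wrapper around async_send_http_on_proxy() and must not be called from
    # the asyncio thread. The URL below is hypothetical:
    #
    #     text = Network.send_http_on_proxy('get', 'https://example.com/status', timeout=10)
    #
    # For 'post', either `body` (raw bytes) or `json` (a dict) must be supplied.
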
    # methods used in scripts
    async def get_peers(self):
        while not self.is_connected():
            await asyncio.sleep(1)
        session = self.interface.session
        return parse_servers(await session.send_request('server.peers.subscribe'))

    async def send_multiple_requests(
            self,
            servers: Sequence[ServerAddr],
            method: str,
            params: Sequence,
            *,
            timeout: int = None,
    ):
        if timeout is None:
            timeout = self.get_network_timeout_seconds(NetworkTimeout.Urgent)
        responses = dict()
        async def get_response(server: ServerAddr):
            interface = Interface(network=self, server=server)
            try:
                await util.wait_for2(interface.ready, timeout)
            except BaseException as e:
                await interface.close()
                return
            try:
                res = await interface.session.send_request(method, params, timeout=10)
            except Exception as e:
                res = e
            responses[interface.server] = res
        async with OldTaskGroup() as group:
            for server in servers:
                await group.spawn(get_response(server))
        return responses

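    # Illustrative sketch (assumed, not upstream code): send_multiple_requests() opens a
    # short-lived interface to each server and returns {ServerAddr: result-or-exception}.
    # The server below is hypothetical:
    #
    #     servers = [ServerAddr.from_str('electrum.example.com:50002:s')]
    #     replies = await network.send_multiple_requests(
    #         servers, 'blockchain.headers.subscribe', [])
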
    async def prune_offline_servers(self, hostmap):
        peers = filter_protocol(hostmap, allowed_protocols=("t", "s",))
        timeout = self.get_network_timeout_seconds(NetworkTimeout.Generic)
        replies = await self.send_multiple_requests(peers, 'blockchain.headers.subscribe', [], timeout=timeout)
        servers_replied = {serveraddr.host for serveraddr in replies.keys()}
        servers_dict = {k: v for k, v in hostmap.items()
                        if k in servers_replied}
        return servers_dict